LangChain으로 LLM RAG 챗봇 구축 VII

2024. 4. 23. 19:34python/intermediate

5단계: LangChain 에이전트 배포

 

마침내 병원 시스템 챗봇 역할을 하는 LangChain 에이전트가 작동하게 되었습니다. 마지막으로 해야 할 일은 이해관계자들에게 챗봇을 제공하는 것입니다. 이를 위해 챗봇을 FastAPI 엔드포인트로 배포하고 Streamlit UI를 생성하여 엔드포인트와 상호 작용합니다.

시작하기 전에 프로젝트의 루트 디렉터리에 chatbot_frontend/ 및 tests/ 라는 두 개의 새 폴더를 만듭니다. 또한 다음 위치에 추가 파일과 chatbot_api/에 폴더를 추가해야 합니다.

 

 

 

FastAPI 앱을 빌드하려면 chatbot_api에 새 파일이 필요하며 , tests/에는 에이전트에 비동기 요청을 보내는 기능을 보여주는 두 개의 스크립트가 있습니다. 마지막으로 chatbot_frontend/는 챗봇과 인터페이스할 Streamlit UI용 코드가 있습니다. 에이전트를 제공할 FastAPI 애플리케이션을 만드는 것부터 시작하겠습니다.

 

FastAPI를 사용하여 에이전트 제공

 

FastAPI는 표준 유형 힌트를 기반으로 Python으로 API를 구축하기 위한 현대적인 고성능 웹 프레임워크입니다. 개발 속도, 런타임 속도, 훌륭한 커뮤니티 지원을 포함한 많은 훌륭한 기능이 함께 제공되므로 챗봇 에이전트 서비스를 위한 탁월한 선택입니다.

POST 요청을 통해 에이전트를 제공하므로 첫 번째 단계는 요청 본문에 가져올 것으로 예상되는 데이터와 요청이 반환하는 데이터를 정의하는 것입니다. FastAPI는 Pydantic을 사용하여 이 작업을 수행합니다 .

 
[ ]:
 
 
# chatbot_api/src/models/hospital_rag_query.py
from pydantic import BaseModel
class HospitalQueryInput(BaseModel):
    text: str
class HospitalQueryOutput(BaseModel):
    input: str
    output: str
    intermediate_steps: list[str]
 
 

이 스크립트에서는 POST 요청 본문 text에 챗봇이 응답하는 쿼리를 나타내는 필드가 포함되어 있는지 확인하는 데 사용되는 Pydantic 모델 HospitalQueryInput과 HospitalQueryOutput. HospitalQueryInput을 정의합니다. HospitalQueryOutput는 사용자에게 다시 전송된 응답 본문에 input, output 및 intermediate_step 필드가 포함되어 있는지 확인합니다.

FastAPI의 가장 큰 특징 중 하나는 비동기식 제공 기능입니다. 에이전트가 외부 서버에서 호스팅되는 OpenAI 모델을 호출하기 때문에 에이전트가 응답을 기다리는 동안 항상 지연 시간이 발생합니다. 이는 비동기 프로그래밍을 사용할 수 있는 완벽한 기회입니다.

OpenAI가 각 에이전트의 요청에 응답할 때까지 기다리는 대신 에이전트가 여러 요청을 연속으로 수행하고 수신된 응답을 저장하도록 할 수 있습니다. 이렇게 하면 에이전트가 응답해야 하는 쿼리가 여러 개 있는 경우 많은 시간을 절약할 수 있습니다.

이전에 논의한 것처럼 Neo4j에 간헐적으로 연결 문제가 있을 수 있으며 일반적으로 새 연결을 설정하면 해결됩니다. 이 때문에 비동기 함수에 작동하는 재시도 논리를 구현해야 합니다.

 
[ ]:
 
 
 
# chatbot_api/src/utils/async_utils.py
import asyncio
def async_retry(max_retries: int=3, delay: int=1):
    def decorator(func):
        async def wrapper(*args, **kwargs):
            for attempt in range(1, max_retries + 1):
                try:
                    result = await func(*args, **kwargs)
                    return result
                except Exception as e:
                    print(f"Attempt {attempt} failed: {str(e)}")
                    await asyncio.sleep(delay)
            raise ValueError(f"Failed after {max_retries} attempts")
        return wrapper
    return decorator
 
 

@async_retry의 세부 사항에 대해 걱정하지 마십시오. 당신이 알아야 할 것은 실패할 경우 비동기 함수를 다시 시도한다는 것입니다. 다음에는 이것이 어디에 사용되는지 살펴보겠습니다.

챗봇 API의 구동 논리는 chatbot_api/src/main.py과 같습니다.

 
[ ]:
 
 
 
# chatbot_api/src/main.py
from fastapi import FastAPI
from agents.hospital_rag_agent import hospital_rag_agent_executor
from models.hospital_rag_query import HospitalQueryInput, HospitalQueryOutput
from utils.async_utils import async_retry
app = FastAPI(
    title="Hospital Chatbot",
    description="Endpoints for a hospital system graph RAG chatbot",
)
@async_retry(max_retries=10, delay=1)
async def invoke_agent_with_retry(query: str):
    """Retry the agent if a tool fails to run.
    This can help when there are intermittent connection issues
    to external APIs.
    """
    return await hospital_rag_agent_executor.ainvoke({"input": query})
@app.get("/")
async def get_status():
    return {"status": "running"}
@app.post("/hospital-rag-agent")
async def query_hospital_agent(query: HospitalQueryInput) -> HospitalQueryOutput:
    query_response = await invoke_agent_with_retry(query.text)
    query_response["intermediate_steps"] = [
        str(s) for s in query_response["intermediate_steps"]
    ]
    return query_response
 
 

FastAPI, 에이전트 실행기, POST 요청을 위해 생성한 Pydantic 모델 및 @async_retry을 가져옵니다. 그런 다음 FastAPI 객체를 인스턴스화하고 에이전트를 비동기식으로 실행하는 함수인 invoke_agent_with_retry()를 정의합니다. invoke_agent_with_retry() 위의 @async_retry 데코레이터는 실패하기 전에 1초의 지연을 두고 함수가 10번 재시도되도록 보장합니다.

마지막으로 /hospital-rag-agent에서 에이전트에 POST 요청을 제공하는 query_hospital_agent()을 정의합니다. 이 함수는 요청 본문에서 text 필드를 추출하여 에이전트에 전달하고 에이전트의 응답을 사용자에게 반환합니다.

Docker를 사용하여 이 API를 제공하고 컨테이너 내부에서 실행할 다음 진입점 파일을 정의하려고 합니다.

 

# chatbot_api/src/entrypoint.sh

#!/bin/bash

# Run any setup steps or pre-processing tasks here
echo "Starting hospital RAG FastAPI service..."

# Start the main application
uvicorn main:app --host 0.0.0.0 --port 8000

 

uvicorn main:app --host 0.0.0.0 --port 8000 명령은 컴퓨터의 포트 8000에서 FastAPI 애플리케이션을 실행합니다. FastAPI 앱을 위한 Dockerfile의 구동은 다음과 같습니다.

 

# chatbot_api/Dockerfile

FROM python:3.11-slim

WORKDIR /app
COPY ./src/ /app

COPY ./pyproject.toml /code/pyproject.toml
RUN pip install /code/.

EXPOSE 8000
CMD ["sh", "entrypoint.sh"]

 

Dockerfile는 컨테이너가 python:3.11-slim 배포판을 사용하고, 컨테이너 내의 /app 디렉터리에 chatbot_api/src/에서 콘텐츠를 복사하고, pyproject.toml에서 종속성을 설치하고, entrypoint.sh를 실행하도록 지시합니다

마지막으로 해야 할 일은 FastAPI 컨테이너를 포함하는 docker-compose.yml 파일을 업데이트하는 것입니다.

 

  # YAML

version: '3'

services:
     hospital_neo4j_etl:
         build:
             context: ./hospital_neo4j_etl
         env_file:
             - .env

     chatbot_api:
         build:
             context: ./chatbot_api
         env_file:
             - .env
         depends_on:
             - hospital_neo4j_etl
         ports:
             - "8000:8000"

 

여기에 ./chatbot_api의 Dockerfile에서 파생된 chatbot_api 서비스를 추가합니다 . 이는 hospital_neo4j_etl에 의존하며 포트 8000에에서 실행됩니다.

API를 실행하려면이전에 빌드한 ETL과 함께 터미널을 열고 다음을 실행하세요.

 

# Shell

$ docker-compose up --build

 

모든 것이 성공적으로 실행되면 http://localhost:8000/docs#/.과 유사한 화면이 표시됩니다.

 

문서 페이지를 사용하여 hospital-rag-agent 엔드포인트를 테스트할 수 있지만 여기에서 비동기 요청을 할 수는 없습니다. 엔드포인트가 비동기 요청을 처리하는 방법을 보려면 httpx와 같은 라이브러리를 사용하여 테스트할 수 있습니다 .

참고: 아래 테스트를 실행하기 전에 가상 환경에 httpx를 설치 해야 합니다.

비동기식 요청으로 얼마나 많은 시간이 절약되는지 확인하려면 먼저 동기식 요청을 사용하여 벤치마크를 설정해 보세요. 다음 스크립트를 만듭니다.

 
[ ]:
 
 
 
 
# tests/sync_agent_requests.py
import time
import requests
CHATBOT_URL = "http://localhost:8000/hospital-rag-agent"
questions = [
   "What is the current wait time at Wallace-Hamilton hospital?",
   "Which hospital has the shortest wait time?",
   "At which hospitals are patients complaining about billing and insurance issues?",
   "What is the average duration in days for emergency visits?",
   "What are patients saying about the nursing staff at Castaneda-Hardy?",
   "What was the total billing amount charged to each payer for 2023?",
   "What is the average billing amount for Medicaid visits?",
   "How many patients has Dr. Ryan Brown treated?",
   "Which physician has the lowest average visit duration in days?",
   "How many visits are open and what is their average duration in days?",
   "Have any patients complained about noise?",
   "How much was billed for patient 789's stay?",
   "Which physician has billed the most to cigna?",
   "Which state had the largest percent increase in Medicaid visits from 2022 to 2023?",
]
request_bodies = [{"text": q} for q in questions]
start_time = time.perf_counter()
outputs = [requests.post(CHATBOT_URL, json=data) for data in request_bodies]
end_time = time.perf_counter()
print(f"Run time: {end_time - start_time} seconds")
 
 

이 스크립트에서는 requests 및 time를 가져오고, 챗봇에 대한 URL을 정의하고, 질문 목록을 만들고, 목록의 모든 질문에 대한 응답을 받는 데 걸리는 시간을 기록합니다. 터미널을 열고 sync_agent_requests.py를 실행하면 14가지 질문에 모두 답하는 데 시간이 얼마나 걸리는지 확인할 수 있습니다.

 

# Shell

(venv) $ python tests/sync_agent_requests.py

 

인터넷 속도와 채팅 모델의 가용성에 따라 결과가 약간 다를 수 있지만 이 스크립트를 실행하는 데 약 68초가 소요되는 것을 볼 수 있습니다. 다음으로 동일한 질문에 대한 답변을 비동기적으로 얻을 수 있습니다.

 
[ ]:
 
 
 
 
# tests/async_agent_requests.py
import asyncio
import time
import httpx
CHATBOT_URL = "http://localhost:8000/hospital-rag-agent"
async def make_async_post(url, data):
    timeout = httpx.Timeout(timeout=120)
    async with httpx.AsyncClient() as client:
        response = await client.post(url, json=data, timeout=timeout)
        return response
async def make_bulk_requests(url, data):
    tasks = [make_async_post(url, payload) for payload in data]
    responses = await asyncio.gather(*tasks)
    outputs = [r.json()["output"] for r in responses]
    return outputs
questions = [
   "What is the current wait time at Wallace-Hamilton hospital?",
   "Which hospital has the shortest wait time?",
   "At which hospitals are patients complaining about billing and insurance issues?",
   "What is the average duration in days for emergency visits?",
   "What are patients saying about the nursing staff at Castaneda-Hardy?",
   "What was the total billing amount charged to each payer for 2023?",
   "What is the average billing amount for Medicaid visits?",
   "How many patients has Dr. Ryan Brown treated?",
   "Which physician has the lowest average visit duration in days?",
   "How many visits are open and what is their average duration in days?",
   "Have any patients complained about noise?",
   "How much was billed for patient 789's stay?",
   "Which physician has billed the most to cigna?",
   "Which state had the largest percent increase in Medicaid visits from 2022 to 2023?",
]
request_bodies = [{"text": q} for q in questions]
start_time = time.perf_counter()
outputs = asyncio.run(make_bulk_requests(CHATBOT_URL, request_bodies))
end_time = time.perf_counter()
print(f"Run time: {end_time - start_time} seconds")
 
 

async_agent_requests.py에서는 pyhttpx를 비동기식으로 요청을 수행하는 데 사용한다는 점을 제외하면 sync_agent_requests에서와 동일한 요청을 수행합니다. 결과는 다음과 같습니다.

 

# Shell

(venv) $ python tests/async_agent_requests.py

 

다시 한 번 말씀드리지만, 이 작업을 실행하는 데 걸리는 정확한 시간은 개인마다 다를 수 있지만 14개 요청을 비동기식으로 수행하는 것이 대략 4배 더 빠르다는 것을 알 수 있습니다. 에이전트를 비동기식으로 배포하면 인프라 수요를 늘리지 않고도 요청이 많은 볼륨으로 확장할 수 있습니다. 항상 예외가 있기는 하지만, 코드가 네트워크 바인딩 요청을 할 때 REST 엔드포인트를 비동기식으로 제공하는 것은 일반적으로 좋은 생각입니다.

FastAPI 엔드포인트가 작동하면 엔드포인트에 액세스할 수 있는 모든 사람이 에이전트에 액세스할 수 있게 됩니다. 이는 Streamlit을 사용하여 다음에 수행할 작업인 챗봇 UI에 에이전트를 통합하는 데 유용합니다.

 

Streamlit을 사용하여 채팅 UI 만들기

 

이해관계자는 수동으로 API를 요청하지 않고도 에이전트와 상호 작용할 수 있는 방법이 필요합니다. 이를 수용하기 위해 이해관계자와 API 간의 인터페이스 역할을 하는 Streamlit 앱을 구축합니다. Streamlit UI에 대한 종속성은 다음과 같습니다.

 

# chatbot_frontend/pyproject.toml

[project]
name = "chatbot_frontend"
version = "0.1"
dependencies = [
     "requests==2.31.0",
     "streamlit==1.29.0"
]

[project.optional-dependencies]
dev = ["black", "flake8"]

 

Streamlit 앱의 운전 코드는 chatbot_frontend/src/main.py 위치에 있습니다.

 
[ ]:
 
 
 
 
# chatbot_frontend/src/main.py
import os
import requests
import streamlit as st
CHATBOT_URL = os.getenv("CHATBOT_URL", "http://localhost:8000/hospital-rag-agent")
with st.sidebar:
    st.header("About")
    st.markdown(
        """
        This chatbot interfaces with a
        [LangChain](https://python.langchain.com/docs/get_started/introduction)
        agent designed to answer questions about the hospitals, patients,
        visits, physicians, and insurance payers in  a fake hospital system.
        The agent uses  retrieval-augment generation (RAG) over both
        structured and unstructured data that has been synthetically generated.
        """
    )
    st.header("Example Questions")
    st.markdown("- Which hospitals are in the hospital system?")
    st.markdown("- What is the current wait time at wallace-hamilton hospital?")
    st.markdown(
        "- At which hospitals are patients complaining about billing and "
        "insurance issues?"
    )
    st.markdown("- What is the average duration in days for closed emergency visits?")
    st.markdown(
        "- What are patients saying about the nursing staff at "
        "Castaneda-Hardy?"
    )
    st.markdown("- What was the total billing amount charged to each payer for 2023?")
    st.markdown("- What is the average billing amount for medicaid visits?")
    st.markdown("- Which physician has the lowest average visit duration in days?")
    st.markdown("- How much was billed for patient 789's stay?")
    st.markdown(
        "- Which state had the largest percent increase in medicaid visits "
        "from 2022 to 2023?"
    )
    st.markdown("- What is the average billing amount per day for Aetna patients?")
    st.markdown("- How many reviews have been written from patients in Florida?")
    st.markdown(
        "- For visits that are not missing chief complaints, "
        "what percentage have reviews?"
    )
    st.markdown(
        "- What is the percentage of visits that have reviews for each hospital?"
    )
    st.markdown(
        "- Which physician has received the most reviews for this visits "
        "they've attended?"
    )
    st.markdown("- What is the ID for physician James Cooper?")
    st.markdown(
        "- List every review for visits treated by physician 270. Don't leave any out."
    )
st.title("Hospital System Chatbot")
st.info(
    "Ask me questions about patients, visits, insurance payers, hospitals, "
    "physicians, reviews, and wait times!"
)
if "messages" not in st.session_state:
    st.session_state.messages = []
for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        if "output" in message.keys():
            st.markdown(message["output"])
        if "explanation" in message.keys():
            with st.status("How was this generated", state="complete"):
                st.info(message["explanation"])
if prompt := st.chat_input("What do you want to know?"):
    st.chat_message("user").markdown(prompt)
    st.session_state.messages.append({"role": "user", "output": prompt})
    data = {"text": prompt}
    with st.spinner("Searching for an answer..."):
        response = requests.post(CHATBOT_URL, json=data)
        if response.status_code == 200:
            output_text = response.json()["output"]
            explanation = response.json()["intermediate_steps"]
        else:
            output_text = """An error occurred while processing your message.
            Please try again or rephrase your message."""
            explanation = output_text
    st.chat_message("assistant").markdown(output_text)
    st.status("How was this generated", state="complete").info(explanation)
    st.session_state.messages.append(
        {
            "role": "assistant",
            "output": output_text,
            "explanation": explanation,
        }
    )
 
 

Streamlit 학습은 이 튜토리얼의 초점이 아니므로 이 코드에 대한 자세한 설명은 얻지 못합니다. 그러나 이 UI의 기능에 대한 높은 수준의 개요는 다음과 같습니다.

  • 사용자가 새로운 쿼리를 할 때마다 전체 채팅 기록이 저장되고 표시됩니다.
  • UI는 사용자의 입력을 받아 에이전트 엔드포인트에 동기식 POST 요청을 보냅니다.
  • 가장 최근의 상담원 응답이 채팅 하단에 표시되고 채팅 기록에 추가됩니다.
  • 에이전트가 사용자에게 제공한 응답을 생성한 방법에 대한 설명입니다. 이는 에이전트가 올바른 도구를 호출했는지 확인할 수 있고 도구가 올바르게 작동했는지 확인할 수 있으므로 감사 목적으로 유용합니다.

완료한 대로 UI를 실행하기 위한 진입점 파일을 만듭니다.

 

# chatbot_frontend/src/entrypoint.sh

#!/bin/bash

# Run any setup steps or pre-processing tasks here
echo "Starting hospital chatbot frontend..."

# Run the ETL script
streamlit run main.py

 

마지막으로 UI용 이미지를 생성하는 Docker 파일은 다음과 같습니다.

 

# chatbot_frontend/Dockerfile

FROM python:3.11-slim

WORKDIR /app

COPY ./src/ /app

COPY ./pyproject.toml /code/pyproject.toml
RUN pip install /code/.

CMD ["sh", "entrypoint.sh"]

 

이는 Dockerfile 이전에 생성한 것과 동일합니다. 이로써 전체 챗봇 애플리케이션을 엔드투엔드(end-to-end)로 실행할 준비가 되었습니다.

 

Docker Compose를 사용하여 프로젝트 조정

 

이제 챗봇을 실행하는 데 필요한 모든 코드를 작성했습니다. 이 마지막 단계는 docker-compose로 프로젝트를 빌드하고 실행합니다. 그렇게 하기 전에 프로젝트 디렉터리에 다음 파일과 폴더가 모두 있는지 확인하세요.

 

 

.env 파일에는 다음 환경 변수가 있어야 합니다. 대부분은 이 튜토리얼의 앞부분에서 생성했지만 CHATBOT_URLStreamlit 앱이 API를 찾을 수 있도록 하나의 새 항목도 추가해야 합니다.

 

# .env

OPENAI_API_KEY=<YOUR_OPENAI_API_KEY>

NEO4J_URI=<YOUR_NEO4J_URI>
NEO4J_USERNAME=<YOUR_NEO4J_USERNAME>
NEO4J_PASSWORD=<YOUR_NEO4J_PASSWORD>

HOSPITALS_CSV_PATH=https://raw.githubusercontent.com/hfhoffman1144/langchain_neo4j_rag_app/main/data/hospitals.csv
PAYERS_CSV_PATH=https://raw.githubusercontent.com/hfhoffman1144/langchain_neo4j_rag_app/main/data/payers.csv
PHYSICIANS_CSV_PATH=https://raw.githubusercontent.com/hfhoffman1144/langchain_neo4j_rag_app/main/data/physicians.csv
PATIENTS_CSV_PATH=https://raw.githubusercontent.com/hfhoffman1144/langchain_neo4j_rag_app/main/data/patients.csv
VISITS_CSV_PATH=https://raw.githubusercontent.com/hfhoffman1144/langchain_neo4j_rag_app/main/data/visits.csv
REVIEWS_CSV_PATH=https://raw.githubusercontent.com/hfhoffman1144/langchain_neo4j_rag_app/main/data/reviews.csv

HOSPITAL_AGENT_MODEL=gpt-3.5-turbo-1106
HOSPITAL_CYPHER_MODEL=gpt-3.5-turbo-1106
HOSPITAL_QA_MODEL=gpt-3.5-turbo-0125

CHATBOT_URL=http://host.docker.internal:8000/hospital-rag-agent

 

docker-compose.yml 파일을 완성하려면 chatbot_frontend 서비스를 추가해야 합니다. 최종 docker-compose.yml 파일은 다음과 같아야 합니다.

 

# docker-compose.yml

version: '3'

services:
     hospital_neo4j_etl:
         build:
             context: ./hospital_neo4j_etl
         env_file:
             - .env

     chatbot_api:
         build:
             context: ./chatbot_api
         env_file:
             - .env
         depends_on:
             - hospital_neo4j_etl
         ports:
             - "8000:8000"

     chatbot_frontend:
         build:
             context: ./chatbot_frontend
         env_file:
             - .env
         depends_on:
             - chatbot_api
         ports:
            m - "8501:8501"

 

마지막으로 터미널을 열고 다음을 실행합니다.

 

# bash

docker-compose up --build

 

모든 것이 빌드되고 실행되면 다음 UI)(http://localhost:8501/)에 액세스하여 챗봇과 채팅을 시작할 수 있습니다.

 

 

완벽하게 작동하는 병원 시스템 챗봇을 처음부터 끝까지 구축하셨습니다. 시간을 내어 질문하고, 답변하기 좋은 질문의 종류를 확인하고, 실패한 부분을 찾아내고, 더 나은 프롬프트나 데이터를 통해 어떻게 개선할 수 있는지 생각해 보세요. 사이드바에 있는 예시 질문에 대한 답변이 성공적으로 이루어졌는지 확인하는 것부터 시작해 보세요.

 

결론

 

가상 병원 시스템에 대한 질문에 답변하는 RAG LangChain 챗봇을 성공적으로 설계, 구축 및 제공했습니다. 이 튜토리얼에서 구축한 챗봇을 개선할 수 있는 방법은 확실히 많습니다. 하지만 이제 LangChain을 자신의 데이터와 통합하는 방법을 제대로 이해하게 되므로 모든 종류의 맞춤형 챗봇을 구축할 수 있는 창의적인 자유를 얻을 수 있습니다.

이 튜토리얼에서는 다음 방법을 배웠습니다.

  • LangChain을 사용하여 개인화된 챗봇을 구축하세요.
  • 비즈니스 요구 사항 에 맞춰 조정 하고 사용 가능한 데이터를 활용하여 가상 병원 시스템을 위한 챗봇을 만듭니다.
  • 챗봇 디자인에서 그래프 데이터베이스 구현을 고려해보세요.
  • 프로젝트에 Neo4j AuraDB 인스턴스를 설정하세요.
  • Neo4j에서 구조화된 데이터 와 구조화되지 않은 데이터를 모두 가져올 수 있는 RAG 챗봇을 개발하세요.
  • FastAPI 및 Streamlit을 사용하여 챗봇을 배포하세요.

아래 링크를 사용하여 다운로드할 수 있는 지원 자료에서 이 프로젝트의 전체 소스 코드와 데이터를 찾을 수 있습니다.

코드 받기: LangChain 챗봇의 무료 소스 코드를 다운로드하려면 https://realpython.com/bonus/build-llm-rag-chatbot-with-langchain-code/ 를 클릭하세요 .

  •  
    LangChain으로 LLM RAG 챗봇 구축.ipynb
     
  •  
    Terminal 1
     
 
  •  
  •  
  •