새소식

Project

LoGO 해외로고 프로젝트 - RAG 3.

  • -

LoGO 해외로고, 해외진출을 희망하는 대한민국 기업을 위한 정보 검색 서비스에서 RAG 부분

세번째 게시물이다.

 

https://github.com/khw11044/KT_BIGPRO_RAG

 

GitHub - khw11044/KT_BIGPRO_RAG

Contribute to khw11044/KT_BIGPRO_RAG development by creating an account on GitHub.

github.com

 

위 깃헙링크에서 코드를 따라하면 되겠다.

해당 게시물에서는 빈 프로젝트 폴더에서 시작해서 하나하나 코딩을 해본다.

세번째 게시물은 RAG pipeline에서 Chat 생성기를 통해 본격적으로 생성물을 만들어 낸다.

1. API Key를 로드한다.

# API KEY를 환경변수로 관리하기 위한 설정 파일
from dotenv import load_dotenv
# API KEY 정보로드
load_dotenv()

2. 필요 라이브러리를 임포트한다.

# langchain_openai에서 ChatOpenAI(LLM)과 OpenAIEmbeddings(임베딩모델: text를 vector화하는 모델)을 load
from langchain_openai import ChatOpenAI, OpenAIEmbeddings

# PDF파일등 데이터를 Chroma형식의 vectorDB에 저장하고 리트리버가 수집한 데이터에 접근하기 위해 Chroma를 load
from langchain_community.vectorstores import Chroma

# 우리가 만든 config.py에서 모델등 옵션들을 수정
from utils.config import config, metadata_field_info

# 추가 데이터를 업로드하기 위해 file을 다큐먼트로 만들고 (convert_file_to_documents) 다큐먼트를 자르는 (split_document) 함수를 load합니다.
from utils.update import convert_file_to_documents

from langchain.retrievers.multi_query import MultiQueryRetriever
from langchain.retrievers.self_query.base import SelfQueryRetriever
# Ensemble retriever
from langchain.retrievers import EnsembleRetriever
from langchain_community.retrievers import BM25Retriever
import pickle

3. 리트리버까지만 있는 RAG pipeline

아래는 chat 부분없이 2번째 게시물까지 만들었던 코드이다.

class Ragpipeline:
    def __init__(self):
        # chatGPT API를 통해 llm 모델 로드
        self.llm = ChatOpenAI(
            model=config["llm_predictor"]["model_name"],  # chatgpt 모델 이름
            temperature=config["llm_predictor"]["temperature"],  # 창의성 0~1
        )

        # 초기화 리스트들 
        self.vector_store   = self.init_vectorDB()                  # 1. RAG가 접근할 vectorDB를 초기화합니다.
        self.retriever      = self.init_retriever()                 # 2. LLM이 질문에 대한 답변을 생성하기 전 질문에 관련된 컨텐츠 기반 답변을 생성하기 위해, 컨텐츠를 검색해 찾는 리트리버를 초기화 합니다. 
        self.bm25_retriever = self.init_bm25_retriever()
        self.ensemble_retriever = self.init_ensemble_retriever()
        self.mq_ensemble_retriever = self.init_mq_ensemble_retriever()

    def init_vectorDB(self, persist_directory=config["chroma"]["persist_dir"]):
        """vectorDB 설정"""
        embeddings = OpenAIEmbeddings(model=config["embed_model"]["model_name"])  # VectorDB에 저장될 데이터를 임베딩할 모델을 선언합니다.
        vector_store = Chroma(
            persist_directory=persist_directory,  # 기존에 vectordb가 있으면 해당 위치의 vectordb를 load하고 없으면 새로 생성합니다.
            embedding_function=embeddings,                      # 새롭게 데이터가 vectordb에 넣어질때 사용할 임베딩 방식을 정합니다, 저희는 위에서 선언한 embeddings를 사용합니다.
            collection_name = 'india',                          # india라는 이름을 정해줌으로써 나중에 vector store 관리 가능 
            collection_metadata = {'hnsw:space': 'cosine'},     # cosine 말고 l2 가 default / collection_metadata를 통해 유사도 검색에 사용될 공간('hnsw:space')을 'cosine'으로 지정하여, 코사인 유사도를 사용
        )
        return vector_store

    def init_retriever(self):            
        # base retriever 3 
        retriever = self.vector_store.as_retriever(
            search_type="mmr",                                              # mmr 검색 방법으로 
            search_kwargs={'fetch_k': 10, "k": 5, 'lambda_mult': 0.4},      # 상위 10개의 관련 context에서 최종 5개를 추리고 'lambda_mult'는 관련성과 다양성 사이의 균형을 조정하는 파라메타 default 값이 0.5
        )
        return retriever

    def init_bm25_retriever(self):
        all_docs = pickle.load(open(config["pkl_path"], 'rb'))
        bm25_retriever = BM25Retriever.from_documents(all_docs)
        bm25_retriever.k = 1                                            # BM25Retriever의 검색 결과 개수를 1로 설정합니다.
        return bm25_retriever

    def init_ensemble_retriever(self):
        ensemble_retriever = EnsembleRetriever(
            retrievers=[self.bm25_retriever, self.retriever],
            weights=[0.4, 0.6],
            search_type=config["ensemble_search_type"],  # mmr
        )
        return ensemble_retriever

    # 멀티쿼리 - 앙상블
    def init_mq_ensemble_retriever(self):
        mq_ensemble_retriever = MultiQueryRetriever.from_llm(
            llm=self.llm,
            retriever=self.ensemble_retriever
        )
        return mq_ensemble_retriever

 

이제 Chat부분을 추가해보자.

4. Chat

우리는 사용자와 챗봇이 대화하며 사용자가 질문하는 것을 정확하고 신속하게 답변해주길 원한다.

다음은 구축한 데이터셋 기반 대답을 만들어 내기 위해 Langchain을 사용하기 위한 라이브러리들이다.

from langchain.chains import create_history_aware_retriever, create_retrieval_chain
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain.schema import HumanMessage

from utils.redis_utils import save_message_to_redis, get_messages_from_redis
from utils.prompt import *

 

다음 코드는 LLM과 리트리버, 그리고 프롬프트가 연결된 채팅 생성기이다.

 

# 한 이유: 현재 대화가 이어지도록 하면서 검색기가 가져온 정보들을 바탕으로 대화 할 수 있게 하기 위해서
def init_chat_chain(self):
    # 참고 링크 : https://python.langchain.com/v0.1/docs/modules/chains/

    # 1. 이어지는 대화가 되도록 대화기록과 체인
    history_aware_retriever = create_history_aware_retriever(           # llm과 retriever, 그리고 prompt를 연결한다. 이 체인은 대화 기록을 가져온 다음 이를 사용하여 기본 검색기에 전달되는 검색어를 생성합니다.
        self.llm, self.retriever, contextualize_q_prompt                # contextualize_q_prompt의 주요 목표는 사용자의 질문을 이해하기 쉽게 다시 작성하는 것입니다. 전반적으로 대화 맥락을 이해합니다.
    )

    # 2. 문서들의 내용을 답변할 수 있도록 리트리버와 체인
    question_answer_chain = create_stuff_documents_chain(               # prompt와 llm을 연결하여 나중에 리트리버가 제공하는 문서의 내용들과 연결될 준비를 시켜놓습니다. 
        self.llm, qa_prompt)

    #. 1번과 2번을 서로 체인
    rag_chat_chain = create_retrieval_chain(                            # 이 체인은 사용자 쿼리를 받아 retriever에게 전달되어 관련 문서를 가져옵니다. 그런 다음 해당 문서(및 원래 입력)가 LLM으로 전달되어 응답을 생성합니다.
        history_aware_retriever, question_answer_chain)

    print("[초기화] RAG chain 초기화 완료")
    return rag_chat_chain

 

다음 코드는 사용자별 대화 세션을 관리해주고 이어서 대화가 될 수 있게 해주기 위해 Redis를 적용한 부분이다. 최종적으로 이어지는 대화에 수집한 데이터 기반으로 답변을 만들어지며, 이 대화들은 저장하고 불러올 수 있다.

 


# 한 이유: 사용자별 대화 세션을 관리해주고 이어서 대화가 될 수 있게 해주기 위해 
def chat_generation(self, question: str) -> dict:
    def get_session_history(session_id=None, user_email=None):
        session_id = session_id if session_id else self.current_session_id
        user_email = user_email if user_email else self.current_user_email

        if session_id not in self.session_histories:
            self.session_histories[session_id] = ChatMessageHistory()
            # Redis에서 세션 히스토리 불러오기
            history_messages = get_messages_from_redis(user_email, session_id)
            for message in history_messages:
                self.session_histories[session_id].add_message(HumanMessage(content=message))

        return self.session_histories[session_id]

    final_chain = self.chain

    # 특정 유형의 작업(체인)에 메시지 기록을 추가, 대화형 애플리케이션 또는 복잡한 데이터 처리 작업을 구현할 때 이전 메시지의 맥락을 유지해야 할 필요가 있을 때 유용
    conversational_rag_chain = RunnableWithMessageHistory(      
        final_chain,                                # 실행할 Runnable 객체
        get_session_history,                        # 세션 기록을 가져오는 함수
        input_messages_key="input",                 # 입력 메시지의 키
        history_messages_key="chat_history",        # 기록 메시지의 키
        output_messages_key="answer"                # 출력 메시지의 키 
    )
    response = conversational_rag_chain.invoke(
        {"input": question},
        config={"configurable": {"session_id": self.current_session_id}}            # 같은 session_id 를 입력하면 이전 대화 스레드의 내용을 가져오기 때문에 이어서 대화가 가능!
    )

    # Redis에 세션 히스토리 저장
    save_message_to_redis(self.current_user_email, self.current_session_id, question)
    save_message_to_redis(self.current_user_email, self.current_session_id, response["answer"])

    return response

 

이제 위 내용을 클래스 안에 넣자

class Ragpipeline:
    def __init__(self):
        # chatGPT API를 통해 llm 모델 로드
        self.llm = ChatOpenAI(
            model=config["llm_predictor"]["model_name"],  # chatgpt 모델 이름
            temperature=config["llm_predictor"]["temperature"],  # 창의성 0~1
        )

        # 초기화 리스트들 
        self.vector_store   = self.init_vectorDB()                  
        self.retriever      = self.init_retriever()                
        self.bm25_retriever = self.init_bm25_retriever()
        self.ensemble_retriever = self.init_ensemble_retriever()
        self.mq_ensemble_retriever = self.init_mq_ensemble_retriever()
        self.chain          = self.init_chat_chain()

        self.session_histories = {}
        self.current_user_email = None
        self.current_session_id = None

    def init_vectorDB(self, persist_directory=config["chroma"]["persist_dir"]):
        """vectorDB 설정"""
        embeddings = OpenAIEmbeddings(model=config["embed_model"]["model_name"])  # VectorDB에 저장될 데이터를 임베딩할 모델을 선언합니다.
        vector_store = Chroma(
            persist_directory=persist_directory,  # 기존에 vectordb가 있으면 해당 위치의 vectordb를 load하고 없으면 새로 생성합니다.
            embedding_function=embeddings,                      # 새롭게 데이터가 vectordb에 넣어질때 사용할 임베딩 방식을 정합니다, 저희는 위에서 선언한 embeddings를 사용합니다.
            collection_name = 'india',                          # india라는 이름을 정해줌으로써 나중에 vector store 관리 가능 
            collection_metadata = {'hnsw:space': 'cosine'},     # cosine 말고 l2 가 default / collection_metadata를 통해 유사도 검색에 사용될 공간('hnsw:space')을 'cosine'으로 지정하여, 코사인 유사도를 사용
        )
        return vector_store

    def init_retriever(self):            
        # base retriever 3 
        retriever = self.vector_store.as_retriever(
            search_type="mmr",                                              # mmr 검색 방법으로 
            search_kwargs={'fetch_k': 5, "k": 2, 'lambda_mult': 0.4},      # 상위 10개의 관련 context에서 최종 5개를 추리고 'lambda_mult'는 관련성과 다양성 사이의 균형을 조정하는 파라메타 default 값이 0.5
        )
        return retriever

    def init_bm25_retriever(self):
        all_docs = pickle.load(open(config["pkl_path"], 'rb'))
        bm25_retriever = BM25Retriever.from_documents(all_docs)
        bm25_retriever.k = 1                                            # BM25Retriever의 검색 결과 개수를 1로 설정합니다.
        return bm25_retriever

    def init_ensemble_retriever(self):
        ensemble_retriever = EnsembleRetriever(
            retrievers=[self.bm25_retriever, self.retriever],
            weights=[0.4, 0.6],
            search_type=config["ensemble_search_type"],  # mmr
        )
        return ensemble_retriever

    # 멀티쿼리 - 앙상블
    def init_mq_ensemble_retriever(self):
        mq_ensemble_retriever = MultiQueryRetriever.from_llm(
            llm=self.llm,
            retriever=self.ensemble_retriever
        )
        return mq_ensemble_retriever

    def init_chat_chain(self):
        # 1. 이어지는 대화가 되도록 대화기록과 체인
        history_aware_retriever = create_history_aware_retriever(self.llm, self.mq_ensemble_retriever, contextualize_q_prompt)      # self.mq_ensemble_retriever
        # 2. 문서들의 내용을 답변할 수 있도록 리트리버와 체인
        question_answer_chain = create_stuff_documents_chain(self.llm, qa_prompt)
        # 3. 1과 2를 합침
        rag_chat_chain = create_retrieval_chain(history_aware_retriever, question_answer_chain)

        return rag_chat_chain

    def chat_generation(self, question: str) -> dict:
        def get_session_history(session_id=None, user_email=None):
            session_id = session_id if session_id else self.current_session_id
            user_email = user_email if user_email else self.current_user_email

            if session_id not in self.session_histories:
                self.session_histories[session_id] = ChatMessageHistory()
                # Redis에서 세션 히스토리 불러오기
                history_messages = get_messages_from_redis(user_email, session_id)
                for message in history_messages:
                    self.session_histories[session_id].add_message(HumanMessage(content=message))

            return self.session_histories[session_id]

        # 특정 유형의 작업(체인)에 메시지 기록을 추가, 대화형 애플리케이션 또는 복잡한 데이터 처리 작업을 구현할 때 이전 메시지의 맥락을 유지해야 할 필요가 있을 때 유용
        conversational_rag_chain = RunnableWithMessageHistory(      
            self.chain,                                 # 실행할 Runnable 객체
            get_session_history,                        # 세션 기록을 가져오는 함수
            input_messages_key="input",                 # 입력 메시지의 키
            history_messages_key="chat_history",        # 기록 메시지의 키
            output_messages_key="answer"                # 출력 메시지의 키 
        )
        response = conversational_rag_chain.invoke(
            {"input": question},
            config={"configurable": {"session_id": self.current_session_id}}            # 같은 session_id 를 입력하면 이전 대화 스레드의 내용을 가져오기 때문에 이어서 대화가 가능!
        )

        # Redis에 세션 히스토리 저장
        save_message_to_redis(self.current_user_email, self.current_session_id, question)
        save_message_to_redis(self.current_user_email, self.current_session_id, response["answer"])

        return response

 

5. Chat 결과 확인

pipeline = Ragpipeline()

question = '인도 통관 및 운송에 대해서 알려줘.'
answer = pipeline.chat_generation(question)

print(answer)

print(answer.keys())
print(answer['chat_history'])

print(answer['answer'])

print(answer['context'])
Contents

포스팅 주소를 복사했습니다

이 글이 도움이 되었다면 공감 부탁드립니다.