
PDF file - RAG

by 콜라찡 2024. 5. 21.

0. Import libraries and modules

!pip install docx==0.2.4
!pip install langchain==0.1.16
!pip install langchain_community==0.0.32
!pip install langchain_core==0.1.42
!pip install langchain_openai==0.1.3
!pip install numpy==1.23.5
!pip install pandas==2.2.2
!pip install faiss-cpu docx2txt pypdf langchain_pinecone

import os
import re

# note: ChatOpenAI and OpenAIEmbeddings come from langchain_openai;
# the legacy langchain.chat_models import is deprecated and would shadow them
from langchain.document_loaders import DirectoryLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import Docx2txtLoader, PyPDFLoader, WebBaseLoader
from langchain_community.vectorstores import Chroma, FAISS
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from pinecone import Pinecone, ServerlessSpec

1. Load the data

You can use DirectoryLoader (from langchain.document_loaders) to load documents in several formats at once, or load a local file directly. One caveat: always check the type of what the loader returns, whether it is a list or a dictionary. Most of the LangChain code that follows expects docs as a list of Document objects, each carrying its content in a dict-like pair of page_content and metadata. Clean the data as needed.
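
For a quick sanity check right after loading, inspect what the loader actually returned. A minimal sketch (sample.pdf is a hypothetical local file):

# the loaders return a list of Document objects, not a dict
docs = PyPDFLoader("sample.pdf").load()
print(type(docs))            # <class 'list'>
print(type(docs[0]))         # a langchain Document
print(docs[0].page_content[:200])
print(docs[0].metadata)      # e.g. source path and page number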

OPENAI_API_KEY = ""  # prefer loading this via python-dotenv (load_dotenv) over hard-coding it
os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY
# data loading
def doc_loader(file_path):
  _, file_ext = os.path.splitext(file_path)  # splitext keeps the leading dot, e.g. '.pdf'

  if file_ext.lower() == '.docx':
    txt_loader = DirectoryLoader(os.path.dirname(file_path), glob=os.path.basename(file_path), loader_cls=Docx2txtLoader)
    documents = txt_loader.load()
    print("Successfully extracted DOCX file")
    return documents

  elif file_ext.lower() == '.pdf':
    pdf_loader = DirectoryLoader(os.path.dirname(file_path), glob=os.path.basename(file_path), loader_cls=PyPDFLoader)
    documents = pdf_loader.load()
    print("Successfully extracted PDF file")
    return documents

  else:
    print("Unsupported file format")
    return []

documents = doc_loader("---file path---")

# cleaning
def extract_and_combine_text(documents):
  combined_text = ' '.join(doc.page_content for doc in documents)
  return combined_text

def clean_text(combined_text):
  return re.sub(r'\s+', ' ', combined_text)

combined_text = extract_and_combine_text(documents)
cleaned_text = clean_text(combined_text)
type(cleaned_text)
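
Note that cleaned_text is now a plain string, while the FAISS flow below splits the original documents list. If you would rather carry the cleaned string forward, the text splitter can wrap it back into Document objects with create_documents. A minimal sketch:

# re-wrap the cleaned string as Document objects so document-based flows still work
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
cleaned_docs = text_splitter.create_documents([cleaned_text])
print(len(cleaned_docs), type(cleaned_docs[0]))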

 

2. Split & embedding & db upsert per vector DB

First, set up the DB in an initial step. This is where the api_key gets connected.

In my experience, FAISS is free and the simplest of the lot: a single line of code finishes both the embedding and the DB upsert at once.
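
As a side note, the resulting FAISS index can also be saved to disk and reloaded later, so you do not have to re-embed every run. A minimal sketch, assuming the vectorstore built below; the "faiss_index" directory name is arbitrary, and recent langchain_community versions require the allow_dangerous_deserialization flag on load:

# persist the FAISS index locally and load it back with the same embedding model
vectorstore.save_local("faiss_index")
restored = FAISS.load_local("faiss_index", OpenAIEmbeddings(), allow_dangerous_deserialization=True)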

 

3. retriever / prompt / rag chain / response

Declare a retriever that will fetch the answer, give it a prompt, wire them together into a chain, and collect the response. Done.

# Used FAISS
chunk_size = 1000
chunk_overlap = 200
k = 3

# Initial
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
    raise EnvironmentError("OpenAI API key not found in environment variables.")

# Split
text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
split = text_splitter.split_documents(documents)

# Embedding, FAISS
vectorstore = FAISS.from_documents(documents=split, embedding=OpenAIEmbeddings(api_key=api_key))

# Retriever
retriever = vectorstore.as_retriever(search_kwargs={'k': k})

# prompt
template = """
You are a well-informed assistant, tasked to answer the question below using the provided context. Include sources only if they are specific and verifiable within the provided context. If unsure about any part of the question, state explicitly that you don't know.
---------
Context:
{context}
---------
Question: {question}
---------
Guidelines:
- Answer format: 
  1. Answer in English
  2. Sources:
- If there are no sources, respond with "I don't know."
- This is important! Be concise and directly address the question.

You must include the sources in your response:
"""
prompt = ChatPromptTemplate.from_template(template=template)

# llm chain, response
def get_response(question):
    llm = ChatOpenAI(model_name = "gpt-3.5-turbo", temperature=0)
    # print('llm completed')

    def format_docs(docs):
        # combined found docs
        return "\n\n".join(doc.page_content for doc in docs)

    rag_chain = (
        {"context": retriever | format_docs, "question": RunnablePassthrough()}
        | prompt
        | llm
        | StrOutputParser()
    )

    response = rag_chain.invoke(question)
    # print('chain execute')

    print(f"[HUMAN]\n{question}\n")
    print(f"[AI]\n{response}\n")
    print("===" * 20)

 

# FYI: plain similarity search with Pinecone

Pinecone is a paid service, and an upsert only works when ids and vectors are paired up in dictionary form. So I numbered the splits from 0 to assign ids and arranged the data as tuples. Pinecone also requires you to create an index first; the vectors are then upserted into that freshly created, empty index.

# used pinecone

# initial
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")  # load from the environment, same as the OpenAI key
pc = Pinecone(api_key=PINECONE_API_KEY)

def create_pinecone_index(index_name):
  if index_name not in pc.list_indexes().names():
    pc.create_index(
      name=index_name,
      dimension=3072,  # must match the embedding model; text-embedding-3-large returns 3072-dim vectors
      metric="cosine",
      spec=ServerlessSpec(
        cloud="aws",
        region="us-east-1"
      )
    )

index_name = 'pcbella'
create_pinecone_index(index_name)

index = pc.Index(index_name)
index.describe_index_stats()  # check the index before upserting


# split
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
split = text_splitter.split_documents(documents)

# embedding: one vector per split chunk, not one vector for the whole text
embeddings = OpenAIEmbeddings(model="text-embedding-3-large")
emb_data = embeddings.embed_documents([doc.page_content for doc in split])

# pair each vector with an id (numbered from 0, one per split) for the pinecone upsert
indexed_data = [(str(i), vec) for i, vec in enumerate(emb_data)]

# upsert in batches to keep each request small
batch_size = 128
for i in range(0, len(indexed_data), batch_size):
    index.upsert(vectors=indexed_data[i:i+batch_size])
index.describe_index_stats()  # confirm the vectors went in

# Search
def search_similarity(query, k):
  emb_query = embeddings.embed_query(query)
  response = index.query(vector=emb_query, top_k=k)  # newer pinecone clients take vector=, not queries=[...]
  return response

# Response
query = "What is CIF?"
k = 5
search_similarity(query,k)
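
Because only bare (id, vector) pairs were upserted, the matches come back as ids and scores with no text attached. To see the actual content, map each id back into the split list. A minimal sketch, assuming ids were assigned as str(i) above:

# ids were assigned as str(i), so split[int(id)] recovers the matching chunk
matches = search_similarity(query, k).matches
for m in matches:
    print(m.id, round(m.score, 3))
    print(split[int(m.id)].page_content[:200])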

 

 

 
