RAG系统概述

RAG,即Retrieval-Augmented Generation,是一种结合了检索(Retrieval)和生成(Generation)的模型架构,用于处理复杂的自然语言处理任务。RAG系统通过检索相关信息并结合生成模型来提供准确的回答。

如何搭建RAG应用

搭建一个RAG应用涉及以下步骤:

  1. 选择模型:确定适合的预训练模型作为基础。

  2. 数据准备:收集和预处理数据,以适配模型训练。

  3. 微调模型:在特定任务上微调模型,以提高其性能。

  4. 部署应用:将训练好的模型部署到实际应用中。

开源RAG应用

目前,开源社区提供了一些RAG应用的实现,可以通过以下资源进行学习和使用:

  • Huggingface:提供了多种预训练模型和工具,便于快速搭建RAG系统。

  • GitHub:许多开发者在GitHub上分享了他们的RAG项目和代码,可以作为学习和参考的资源。

学习资源

  • 大模型学习资料整理:这篇文章详细介绍了如何从零开始学习大模型,并搭建个人或企业的RAG系统。

  • streamlit教程:介绍了如何使用streamlit快速构建交互式应用,对于搭建RAG系统的前端界面非常有帮助。

个人博客推荐

博主在个人博客上持续更新有关大模型学习、RAG系统搭建和评估优化的相关内容,值得关注。
在这里插入图片描述

检索增强生成(Retrieval Augmented Generation, RAG)

简介检索增强生成(RAG)是一种先进的技术,它通过结合大规模知识库的检索能力与大型语言模型(LLM)的生成能力,显著提升了AI在特定任务中的表现。RAG由Meta AI的研究人员于2020年提出,旨在解决LLM在信息滞后、模型幻觉、私有数据匮乏、内容不可追溯和长文本处理能力弱等问题。

RAG的优势

  1. 信息更新:RAG能够动态引入最新数据,提供时效性更强的回答。
  2. 事实准确性:通过检索到的原始资料,增强了生成内容的可信度。
  3. 私有数据整合:可以整合专属知识库,提供定制化服务。
  4. 内容可追溯:生成内容与原始资料的链接,提升了内容的可追溯性。
  5. 长文本处理:通过检索整合长文本信息,突破了输入长度的限制。

RAG的工作流程RAG系统的工作流程包括四个主要阶段:数据处理、检索、增强和生成。

数据处理阶段

  • 数据清洗:对原始数据进行清洗,去除多余换行和特殊符号。
  • 数据转换:将处理后的数据转换为检索模型可用的格式。
  • 数据存储:将数据存储在相应的数据库中。

检索阶段

  • 问题输入:将用户问题输入检索系统。
  • 信息检索:从数据库中检索出相关信息。

增强阶段

  • 信息处理:对检索到的信息进行进一步处理和增强。
  • 信息优化:优化信息以便于生成模型更好地理解和使用。

生成阶段

  • 信息输入:将增强后的信息输入到生成模型。
  • 内容生成:生成模型根据输入信息生成答案。

搭建RAG应用搭建RAG应用需要经过以下步骤:

数据处理

  • 加载文件:使用langchain下的document_loaders加载不同格式的文件。
  • 文本分块:选择合适的分块方法和大小,对文本进行分块处理。

总结RAG技术通过整合检索和生成,为LLM提供了一个强大的工具,使其能够访问专属知识库和最新数据,从而在生成响应时提供更准确、更新的信息。

1
import os from langchain_community.document_loaders import PyPDFLoader, Docx2txtLoader, UnstructuredFileLoader from langchain_text_splitters import RecursiveCharacterTextSplitter def load_document(file): """ 加载PDF、DOC、TXT文档 :param file: :return: """ name, extension = os.path.splitext(file) if extension == '.pdf': print(f'Loading {file}') loader = PyPDFLoader(file) elif extension == '.docx': print(f'Loading {file}') loader = Docx2txtLoader(file) elif extension == '.txt': loader = UnstructuredFileLoader(file) else: print('Document format is not supported!') return None data = loader.load() return data def chunk_data(data, chunk_size=256, chunk_overlap=150): """ 将数据分割成块 :param data: :param chunk_size: chunk块大小 :param chunk_overlap: 重叠部分大小 :return: """ text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap) chunks = text_splitter.split_documents(data) return chunks

embedding模型存储

将分块后的文本,使用embedding模型持久化存储,目前常用的中文模型bge-large-zh-v1.5。持久化存储后,避免每次都去embedding一次,消耗很长的时间。下次使用时,直接加载模型就可以了。

1
import os from langchain_community.embeddings import HuggingFaceBgeEmbeddings, OpenAIEmbeddings from langchain_community.vectorstores import Chroma, FAISS def get_embedding(embedding_name): """ 根据embedding名称去加载embedding模型 :param embedding_name: 路径或者名称 :return: """ if embedding_name == "bge": embedding_path = os.environ[embedding_name] model_kwargs = {'device': 'cpu'} return HuggingFaceBgeEmbeddings(model_name=embedding_path, model_kwargs=model_kwargs) if embedding_name == "bce": return None # create embeddings using OpenAIEmbeddings() and save them in a Chroma vector store def create_embeddings_chroma(chunks): embeddings = OpenAIEmbeddings() vector_store = Chroma.from_documents(chunks, embeddings) # if you want to use a specific directory for chromadb # vector_store = Chroma.from_documents(chunks, embeddings, persist_directory='./mychroma_db') return vector_store def create_embeddings_faiss(vector_db_path, embedding_name, chunks): """ 使用FAISS向量数据库,并保存 :param vector_db_path: 向量 :param embedding_name: :param chunks: :return: """ embeddings = get_embedding(embedding_name) db = FAISS.from_documents(chunks, embeddings) if not os.path.isdir(vector_db_path): os.mkdir(vector_db_path) db.save_local(folder_path=vector_db_path) return db def load_embeddings_faiss(vector_db_path, embedding_name): """ 加载向量库 :param vector_db_path: :param embedding_name: :return: """ embeddings = get_embedding(embedding_name) db = FAISS.load_local(vector_db_path, embeddings, allow_dangerous_deserialization=True) return db

构建模型

采用了函数和类两种方式定义模型:

  • 函数:get_llm_model定义了基本的参数,model、prompt、temperature、max_tokens、n_ctx
  • 自定义类:
1
import os import sys import time from abc import ABC from langchain_core.callbacks import CallbackManagerForLLMRun from llama_cpp import Llama from langchain.llms.base import LLM from pydantic import Field from typing import Dict, Any, Mapping, Optional, List BASE_DIR = os.path.dirname(__file__) # PRJ_DIR上层目录 # PRJ_DIR = os.path.abspath(os.path.join(BASE_DIR, "..")) sys.path.append(BASE_DIR) def get_llm_model( prompt: str = None, model: str = None, temperature: float = 0.0, max_token: int = 2048, n_ctx: int = 512): """ 根据模型名称去加载模型,返回response数据 :param prompt: :param model: :param temperature: :param max_token: :param n_ctx: :return: """ if model in ['Qwen_q2']: model_path = os.environ[model] llm = Llama(model_path=model_path, n_ctx=n_ctx) start = time.time() response = llm.create_chat_completion( messages=[ { "role": "system", "content": "你是一个智能超级助手,请用专业的词语回答问题,整体上下文带有逻辑性,如果不知道,请不要乱说", }, { "role": "user", "content": "{}".format(prompt) }, ], temperature=temperature, max_tokens=max_token, stream=False ) cost = time.time() - start print(f"模型生成时间:{cost}") print(f"大模型回复:\n{response}") return response['choices'][0]['message']['content'] class QwenLLM(LLM): """ 自定义QwenLLM """ model_name: str = "Qwen_q2" # 访问时延上限 request_timeout: float = None # 温度系数 temperature: float = 0.1 # 窗口大小 n_ctx = 2048 # token大小 max_tokens = 1024 # 必备的可选参数 model_kwargs: Dict[str, Any] = Field(default_factory=dict) def _call(self, prompt: str, stop: Optional[List[str]] = None, run_manager: Optional[CallbackManagerForLLMRun] = None, **kwargs: Any): qwen_path = os.environ[self.model_name] print("qwen_path:", qwen_path) llm = Llama(model_path=qwen_path, n_ctx=self.n_ctx) response = llm.create_chat_completion( messages=[ { "role": "system", "content": "你是一个智能超级助手,请用[中文]专业的词语回答问题,整体上下文带有逻辑性,并以markdown格式输出", }, { "role": "user", "content": "{}".format(prompt) }, ], temperature=self.temperature, max_tokens=self.max_tokens, stream=False ) # prompt工程提示 # print(f"Qwen prompt: \n{prompt}") # response = lla( # prompt=prompt, # temperature=self.temperature, # max_tokens=self.max_tokens # ) print(f"Qwen response: \n{response}") # return response['choices'][0]['text'] return response['choices'][0]['message']['content'] @property def _llm_type(self) -> str: return "Llama3" # 定义一个返回默认参数的方法 @property def _default_params(self) -> Dict[str, Any]: """获取调用默认参数。""" normal_params = { "temperature": self.temperature, "request_timeout": self.request_timeout, "n_ctx": self.n_ctx, "max_tokens": self.max_tokens } # print(type(self.model_kwargs)) return {**normal_params} @property def _identifying_params(self) -> Mapping[str, Any]: """Get the identifying parameters.""" return {**{"model_name": self.model_name}, **self._default_params}

构建应用

1
import sys import streamlit as st import os from langchain.chains import RetrievalQA from langchain.chat_models import ChatOpenAI import tiktoken from dotenv import load_dotenv, find_dotenv from langchain_core.prompts import PromptTemplate BASE_DIR = os.path.dirname(__file__) PRJ_DIR = os.path.abspath(os.path.join(BASE_DIR, "..")) sys.path.append(PRJ_DIR) from streamlit_demo.custom_llm import QwenLLM from streamlit_demo.embedding_oper import create_embeddings_faiss, create_embeddings_chroma, load_embeddings_faiss from streamlit_demo.prepare_data import load_document, chunk_data _ = load_dotenv(find_dotenv(), override=True) vector_db_path = os.path.join(BASE_DIR, "vector_db") print(f"vector_db_path: {vector_db_path}") DEFAULT_TEMPLATE = """ 你是一个聪明的超级智能助手,请用专业且富有逻辑顺序的句子回复,并以中文形式且markdown形式输出。 检索到的信息: {context} 问题: {question} """ def ask_and_get_answer_from_local(model_name, vector_db, prompt, top_k=5): """ 从本地加载大模型 :param model_name: 模型名称 :param vector_db: :param prompt: :param top_k: :return: """ docs_and_scores = vector_db.similarity_search_with_score(prompt, k=top_k) print("docs_and_scores: ", docs_and_scores) # knowledge = [doc.page_content for doc in docs_and_scores] # print("检索到的知识:", knowledge) if model_name == "Qwen_q2": llm = QwenLLM(model_name=model_name, temperature=0.4) prompt_template = PromptTemplate(input_variables=["context", "question"], template=DEFAULT_TEMPLATE) retriever = vector_db.as_retriever(search_type='similarity', search_kwargs={'k': top_k}) chain = RetrievalQA.from_chain_type(llm=llm, chain_type="stuff", retriever=retriever, chain_type_kwargs={"prompt": prompt_template}, return_source_documents=True) answer = chain({"query": prompt, "top_k": top_k}) print(f"answers: {answer}") # answer = chain.run(prompt) # answer = answer['choices'][0]['message']['content'] answer = answer['result'] return answer def ask_and_get_answer(vector_store, q, k=3): llm = ChatOpenAI(model='gpt-3.5-turbo', temperature=1) retriever = vector_store.as_retriever(search_type='similarity', search_kwargs={'k': k}) chain = RetrievalQA.from_chain_type(llm=llm, chain_type="stuff", retriever=retriever) answer = chain.run(q) return answer # calculate embedding cost using tiktoken def calculate_embedding_cost(texts): enc = tiktoken.encoding_for_model('text-embedding-ada-002') total_tokens = sum([len(enc.encode(page.page_content)) for page in texts]) # print(f'Total Tokens: {total_tokens}') # print(f'Embedding Cost in USD: {total_tokens / 1000 * 0.0004:.6f}') return total_tokens, total_tokens / 1000 * 0.0004 # clear the chat history from streamlit session state def clear_history(): if 'history' in st.session_state: del st.session_state['history'] if __name__ == "__main__": # st.image('img.png') st.subheader('LLM Question-Answering Application 🤖') with st.sidebar: # text_input for the OpenAI API key (alternative to python-dotenv and .env) api_key = st.text_input('OpenAI API Key:', type='password') if api_key: os.environ['OPENAI_API_KEY'] = api_key llm = st.selectbox( label="请选择本地大模型", options=('Qwen_q2', 'Qwen_q6') ) # 向量数据库 embedding = st.selectbox( "请选择向量数据库", ('FAISS', 'Chroma') ) # file uploader widget uploaded_file = st.file_uploader('上传文件:', type=['pdf', 'docx', 'txt']) # chunk size number widget chunk_size = st.number_input('chunk_size:', min_value=100, max_value=2048, value=512, on_change=clear_history) # chunk overlap chunk_overlap = st.number_input(label="chunk_overlap", min_value=0, max_value=1024, value=150, on_change=clear_history) # k number input widget k = st.number_input('top_k', min_value=1, max_value=20, value=3, on_change=clear_history) # add data button widget add_data = st.button('添加数据', on_click=clear_history) # 输出方式 output_type = st.selectbox("选择输出方式", ('普通输出', '流式输出')) if uploaded_file and add_data: # if the user browsed a file with st.spinner('Reading, chunking and embedding file ...'): # writing the file from RAM to the current directory on disk bytes_data = uploaded_file.read() file_name = os.path.join('./', uploaded_file.name) with open(file_name, 'wb') as f: f.write(bytes_data) data = load_document(file_name) chunks = chunk_data(data, chunk_size=chunk_size, chunk_overlap=chunk_overlap) st.write(f'Chunk size: {chunk_size}, chunk_overlap: {len(chunks)} Chunks: {len(chunks)}') tokens, embedding_cost = calculate_embedding_cost(chunks) st.write(f'Embedding cost: ${embedding_cost:.4f}') # creating the embeddings and returning the Chroma vector store # 指定选择向量库和embedding类型,还可改进 if embedding == "FAISS": vector_store = create_embeddings_faiss(vector_db_path=vector_db_path, embedding_name="bge", chunks=chunks) elif embedding == "Chroma": vector_store = create_embeddings_chroma(chunks) # saving the vector store in the streamlit session state (to be persistent between reruns) st.session_state.vs = vector_store st.success('File uploaded, chunked and embedded successfully.') # 初始化history if "messages" not in st.session_state: st.session_state.messages = [] # 展示对话 for msg in st.session_state.messages: with st.chat_message(msg['role']): st.markdown(msg["content"]) # React to user input if prompt := st.chat_input("Say something"): # Display user message in chat message container with st.chat_message("user"): st.markdown(prompt) # Add user message to chat history st.session_state.messages.append({"role": "user", "content": prompt}) # load local vector db if 'vs' not in st.session_state: # st.warning(body='正在努力加载模型中...', icon="⚠️") vector_store = load_embeddings_faiss(vector_db_path, "bge") st.session_state.vs = vector_store st.toast('Load vector store db success!', icon='😍') # 普通方式输出 if prompt is not None: vector_store = st.session_state.vs # if vector_store is None: # st.warning(body='正在努力加载模型中,稍后再试', icon="⚠️") if output_type == "普通输出" and vector_store is not None: response = "" if llm == "GPT": response = ask_and_get_answer(vector_store, prompt, k) elif llm == "Qwen_q2": response = ask_and_get_answer_from_local(model_name="Qwen_q2", vector_db=vector_store, prompt=prompt, top_k=k) # Display assistant response in chat message container with st.chat_message("assistant"): st.markdown(response) # Add assistant response to chat history st.session_state.messages.append({"role": "assistant", "content": response}) else: # 流式输出 # stream_res = get_llm_model_with_stream(prompt=prompt, model="Qwen_q2") # with st.chat_message("assistant"): # content = st.write_stream(stream_res) # print("流式输出:", content) # st.session_state.messages.append({"role": "assistant", "content": content}) print("流式输出") # run the app: streamlit run ./chat_doc.py

结果展示

使用步骤:

  1. 选择参数,然后上传本地的文件
  2. 开始添加数据,用于数据处理和embedding持久化存储
    在这里插入图片描述

在这里插入图片描述

开源的RAG应用

以下是一些开源的RAG(Retrieval-Augmented Generation)应用,它们利用了检索增强生成模型来提高应用的智能性和效率。

QAnything

  • GitHub 链接: QAnything

  • 简介: QAnything 是一个开源项目,旨在通过RAG技术提升问答系统的准确性和效率。

AnythingLLM

  • GitHub 链接: AnythingLLM

  • 简介: AnythingLLM 是一个全功能的桌面和Docker AI应用程序,具备完整的RAG和AI代理功能。

ragflow

  • GitHub 链接: ragflow

  • 简介: ragflow 是一个开源项目,提供了自动化工作流、代码空间、GitHub Copilot等丰富的功能。 这些项目都为开发者提供了强大的工具和平台,以利用RAG技术构建更智能的应用程序。