使用自定义工具构建和服务 RAG 代理:完整指南
- Rifx.Online
- Programming , Generative AI , Data Science
- 14 Jan, 2025
目标
本文的目标是演示如何使用 LangGraph 和 LangChain 创建一个大型语言模型(LLM)代理,该代理将在一组文档上执行检索增强生成(RAG)。此外,我们将探讨如何构建一个工具,以便进行 API 调用,从而使 LLM 能够从外部来源获取实时知识。最后,我们将使用 Flask API(FastAPI)在本地提供此代理,并使用本地 PostgreSQL 数据库服务器存储与 LLM 的聊天记录。
注意:LLM 可能已经在维基百科文章和其他来源上进行了训练,因此具备一些关于金融的内在知识。然而,我们在此上下文中使用它们是为了有效地说明如何创建 RAG 工具。
什么是工具?
大型语言模型(LLMs)虽然强大,但受限于其训练时所依赖的知识。为了让LLM能够响应:
- LLM未见过的事件(例如,发生在其训练期之后的事件),或
- LLM未公开或之前未见过的文档数据,
我们需要为其提供工具。
工具是我们提供给LLM的功能。这些工具可以从简单的实用程序(如进行数学计算)到复杂的操作(如调用API和使用外部资源生成响应)不等。对工具使用的适当文档能够为LLM提供关于何时以及为何使用该工具的上下文,以及它所需的参数。
在本教程中,我们将创建以下两个工具并将其绑定到OpenAI的gpt-4o-mini
LLM:
- RAG工具:该工具将使用关于金融的维基百科文章进行检索增强生成。
- 股市趋势工具:该工具将使用Finnhub API在指定日期范围内检索给定股票代码的新闻文章。它将帮助回答查询并解释股市趋势。
注意:要使用OpenAI模型,您需要一个API密钥,可以通过在OpenAI平台上注册并请求一个来获得。一些OpenAI模型可能需要付费。或者,您可以探索其他选项,如Mistral、Llama和Cohere,这些选项在使用您的数据训练其模型的情况下提供对某些LLM的免费访问。
RAG工具
细分
RAG,或称为检索增强生成,是一种人工智能框架或架构,专注于为大型语言模型(LLM)提供外部知识来源。当我们需要向LLM提供未经过训练的数据时,这种技术特别有用。
让我们讨论一下这个工具的每个组件:
- 知识库(维基百科文章)
知识库由用于为LLM提供额外知识的文档组成。在实际应用中,这可能包括年度报告、公司相关文档或与任务相关的任何领域特定数据。 - 文档加载器
文档加载器负责从知识库加载数据。该组件提供标准接口以加载任何类型的数据,并以标准化的方式格式化,以便LangChain进行处理。 - 分割器(分块)
加载的文档通常由大文本和较小部分组成,只有某些部分与查询相关。为了提高准确性并确保LLM仅处理相关部分,我们将文档分割(分块)为更小的部分。分块有几种方法可供选择:
a. 基于字符的文本分割:根据您定义的分隔符分割文档。
b. 递归字符分割:通过保持段落完整(如果在分块大小之内)并确保同一分块内的句子不溢出到下一个分块来尝试保持结构。
c. 基于文档结构的分割:对于逻辑结构清晰的文档(如HTML或XML),标签定义了分割。
d. 基于语义意义的分割:(我们将使用的方法)根据文本的语义意义进行分割。这种方法确保分块根据内容相关性而非字符或逻辑结构进行分离,因此在文本包含主题变化时非常理想。 - 嵌入层
一旦文档被分块,文本将被转换为数值表示,称为嵌入,LLM可以理解。嵌入层将把分块映射或*“嵌入”*为数值。我们将使用Hugging Face Transformers库来生成嵌入,但会依赖于LangChain核心库中更方便的HuggingFaceEmbeddings
类。 - 向量存储
在为文档块创建嵌入后,我们将它们存储在向量存储中。类似于关系数据库存储和索引表格数据,向量存储经过优化以将嵌入存储为向量并执行语义搜索。语义搜索通过将查询嵌入为向量并计算其与向量存储中存储的向量的相似性来工作。
向量存储可以从简单的使用计算机内存的向量存储到云上的分布式向量存储(如Pinecone)不等。
在本教程中,我们将使用FAISS。FAISS在实时检索向量方面表现相当不错。FAISS可以通过压缩大型向量来优化内存存储。您可以在这里了解更多关于低内存使用的信息。 - 存储在磁盘上的索引
计算嵌入是时间密集型的,并且由于向量存储驻留在内存中,因此在意外关闭期间存在数据丢失的风险。为了确保嵌入在系统会话之间持久化,我们将本地将向量存储索引保存到磁盘,以便在重启后可以重新使用。
Finnhub API 工具
Finnhub.io 提供了一种股票 API,具有从查询股票价格到股票市场新闻文章的一系列端点。我们将使用他们的 公司新闻 端点,传递股票代码和一个日期范围,以获取与该时间段相关的文章。我们将使用 Finnhub 提供的 API 密钥向其 API 发送请求。
什么是代理?
代理是旨在承担高级任务并利用LLM作为推理引擎的系统。它们依赖于LLM根据特定任务决定下一步行动。LangChain建议使用LangGraph来构建代理,因为它允许创建类似图形的结构来定义代理内部的控制流。
LangGraph 中图代理的核心概念:
- 节点节点表示代理图中的顶点。在 LangGraph 中,节点本质上是包含代理逻辑的 Python 函数。
- 状态状态指的是应用程序的当前快照或图执行过程中的检查点。
- 边边定义了代理图中的逻辑流。不同的边类型适用于各种用例: 普通边:表示节点之间的固定或无条件过渡。 条件边:根据函数的输出表示向不同节点的过渡。
教程中的代理工作流程
在本教程中,我们将定义一个代理,该代理执行以下步骤:
- 接收用户的查询。
- 确定是否需要工具,并决定使用哪个工具。
- 使用所选工具的响应来调用之前定义的LLM,或者如果未选择工具,则使用LLM的响应。
- 将响应返回给用户。
- 通过将消息存储在Postgres数据库中来持久化对话(在“服务代理”部分中解释)。
开发代理
库
- langchain我们将使用核心的 LangChain 库来将组件链在一起并构建 RAG 架构。它还将用于创建 Finnhub API 工具。
a. langchain 库使用一个称为 Runnable 的基本单元。
b. Runnable 抽象并封装了访问 LLM 应用架构基本组件(例如,LLMs、向量存储等)所需的实际代码。
c. 每个 Runnable 单元具有一致的接口,使其能够以相同的功能被调用、批处理、流式传输等。 - langchain-openai 和 langchain-community这些库帮助将第三方 API 作为 Runnables 集成到 LangChain 中,确保所有组件无缝协作。可以将 LangChain 视为中介,简化对 API 的访问,而无需您处理不同 API 方法的复杂性。
- langgraphLangGraph 让您为定义代理中的控制流创建图形结构。
- unstructured在加载文档时使用,unstructured 是在 langchain 中使用文档加载器的库要求。
- faiss-gpu我们将使用 FAISS(Facebook AI 相似性搜索)作为我们的向量存储。如果 GPU 内存不可用,您可以改用 faiss-cpu 以利用 CPU 内存。
开发环境
我们将在 Python 笔记本中开发代理,以实现互动和迭代的开发过程。
步骤 0. 安装依赖
我们将安装开发阶段所需的所有 Python 依赖项。
%pip install --quiet --upgrade langchain langchain-openai langchain-community unstructured faiss-gpu python-dotenv
设置环境变量(仅用于开发目的):
import getpass
import os
os.environ["OPENAI_API_KEY"] = getpass.getpass()
os.environ["FINNHUB_API_KEY"] = getpass.getpass()
使用 getpass 可以让您输入 API 密钥的输入字段。
RAG 工具开发
第一步. 文档加载器
在本教程中,我们将使用存储在 links.txt
文件中的预定义 URL 列表加载维基百科文章。注意:如果您想从本地目录加载文档而不是 URL,可以使用 DirectoryLoader
with open('links.txt', 'r') as f:
links = f.readlines()
## Remove duplicates and newline characters from each link
links = list(set([link.strip() for link in links]))
我们将使用 langchain_community
库中的 WebBaseLoader 从提供的链接加载文档。
from langchain_community.document_loaders import WebBaseLoader
## Initialize the WebBaseLoader with the list of links
wikipedia_loader = WebBaseLoader(links)
## Load the documents
wikipedia_docs = wikipedia_loader.load()
第2步. 分块
在本教程中,我们将使用Sentence Transformers库将文本拆分成块。SentenceTransformersTokenTextSplitter
根据文档的语义意义进行拆分,确保生成的块具有意义并且在上下文中相关。
工作原理
- 分割器使用
sentence-transformers/all-mpnet-base-v2
模型(或您选择的其他模型)以句子级别理解文档内容。 - 文档通过模型处理,生成表示文本含义的 tokens。
- 基于句子级别的相似性,tokens 要么被分组在一起,要么被分离成不同的块。
- 最后,tokens 被解码回原始文本,保持其上下文的完整性。
from langchain_text_splitters import SentenceTransformersTokenTextSplitter
text_splitter = SentenceTransformersTokenTextSplitter(
chunk_size=1000,
chunk_overlap=200,
length_function=len,
)
splits = text_splitter.split_documents(wikipedia_docs)
第3步. 嵌入层
我们将使用 all-MiniLM-L6-v2
模型为文档块生成嵌入。这个过程类似于我们之前讨论的句子变换器,其中文本被标记化。这些标记表示文档的含义,并帮助 LLM 高效地检索相关块。
关键细节
- 填充标记: 在嵌入文本时,我们添加填充标记以确保所有输入具有一致的大小,即使某些文本块比其他文本块短。
all-MiniLM-L6-v2
模型的输入大小限制为 256 tokens。- 填充确保一致性,并避免因输入长度不一而导致的错误。
from langchain.embeddings import HuggingFaceEmbeddings
embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
embeddings.client.tokenizer.add_special_tokens({'pad_token': '[PAD]'})
第 4 步. 向量存储
我们将使用 langchain_community.vectorstores.FAISS 库来使用 FAISS 作为我们的向量存储。
from langchain_community.vectorstores import FAISS
db = FAISS.from_documents(splits, embedding=embeddings)
第 5 步. 本地保存索引
现在我们已经创建了基于分割/块的向量存储,并使用了之前定义的嵌入,我们将本地保存索引。
db.save_local("faiss_index")
第6步. 从磁盘加载索引
当我们需要在应用程序中使用本地保存的索引时(当我们想要提供应用程序时),我们需要再次定义我们使用的嵌入。我们可以如下加载它:
embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
embeddings.client.tokenizer.add_special_tokens({'pad_token': '[PAD]'})
db = FAISS.load_local(
"faiss_index", embeddings, allow_dangerous_deserialization=True
)
第7步. 创建检索工具
要创建RAG工具,第一步是定义一个检索对象。该对象将决定检索相关片段的策略。例如,我们可以选择根据相似性检索前K个结果,或者设置一个相似性阈值,仅返回超过该阈值的片段。在本教程中,我们将保持简单,使用基于相似性的前片段匹配。
retriever = db.as_retriever(
search_type="similarity", search_kwargs={"k": 1})
我们使用检索对象创建一个检索工具,该工具将在我们的代理中使用。创建该工具的参数如下:
- 检索对象(第一个参数)。
- 工具名称(第二个参数)。
- 工具描述(第三个参数)。
retriever_tool = create_retriever_tool(
retriever,
"explain_financial_terms",
"Explain financial terms in the query",)
这就结束了RAG工具的开发部分。
Finnhub API 工具开发
第8步. Finnhub API工具
首先,让我们创建一个函数,向Finnhub API的公司新闻端点发送GET请求。
def news_helper(symbol: str, start_date: str, last_date: str):
API_KEY = os.environ["FINNHUB_API_KEY"]
API_ENDPOINT = "https://finnhub.io/api/v1/company-news"
queryString = f"{API_ENDPOINT}?symbol={symbol}&from={start_date}&to={last_date}&token={API_KEY}"
# Send the search query to the Search API
response = requests.get(queryString)
# Read the response
articles = response.json()[-5:]
summaries = [article["summary"] for article in articles]
return ",".join(summaries)
LangChain提供了一种简单的方法来使用@tool
装饰器实现工具。通过添加这个装饰器,函数变成了一个StructuredTool
对象。下面,我们定义了search_news_for_symbol
工具,其中包含有关何时使用该工具以及预期输入和输出的文档。news_helper
函数在search_news_for_symbol
中被调用,以向Finnhub API端点发送API请求。
from langchain_core.tools import tool
@tool
def search_news_for_symbol(symbol: str, start_date: str, last_date: str) -> str:
"""在给定股票代码的时间段内搜索新闻文章。例如:NVDA、MSFT、TSLA等。
Args:
symbol: 要搜索的股票代码。
start_date: 搜索的开始日期。
last_date: 搜索的结束日期。
Returns:
包含新闻文章的字符串。
"""
company_news = news_helper(symbol=symbol, start_date=start_date, last_date=last_date)
return company_news
当你打印search_news_for_symbol
工具时,你会看到它被定义为一个StructuredTool
,并具有一个func
属性,该属性引用search_news_for_symbol
函数(包含代码的Python函数)。这完成了Finnhub API工具的创建。
第9步. 将工具绑定到LLM
LLM 和工具
最后,我们来到了文章的 LLM 部分。在这一步中,我们将定义 LangChain OpenAI 聊天类,并将工具绑定到 LLM。通过绑定工具,我们允许 LLM 在需要时访问和利用它们。
我们创建一个包含工具名称的列表——具体来说是 Finnhub API 工具 和 RAG 工具。这样,LLM 就知道何时使用这些工具来完成特定任务。
from langchain_openai import ChatOpenAI
tools = [retriever_tool, search_news_for_symbol]
llm = ChatOpenAI(model="gpt-4o-mini").bind_tools(tools)
代理开发
使用之前定义的工具,我们现在将创建一个代理。如前所述,我们将定义一个包含状态、节点和边的图。
StateGraph
类是 LangGraph 中用于为代理创建图的主要类。它使用 State 作为变量来跟踪当前代理环境。例如,它可以使用 MessagesState 类,该类有助于跟踪代理中的所有消息。
from langgraph.graph import StateGraph, MessagesState
workflow = StateGraph(MessagesState)
我们将定义的第一个节点将处理发送用户询问的查询以及之前的消息。它还将包含在处理查询时来自工具的消息。
def llm_node(state: MessagesState):
messages = state['messages']
response = llm.invoke(messages)
# We return a list, because this will get added to the existing list
return {"messages": [response]}
接下来,我们将定义一个 ToolNode,它本质上是一个 Runnable(来自 LangChain)。它接收消息作为输入,并返回来自工具的消息。
from langgraph.prebuilt import ToolNode
tool_node = ToolNode(tools) # tools: list of tools we defined earlier
现在,我们将把 llm_node 和 tool_node 添加到图中。
workflow.add_node("llm_node", llm_node) # agent
workflow.add_node("tools", tool_node)
我们将定义一个 should_continue
函数,该函数将负责在图中进行路由。如果状态中的最后一条消息表明 LLM 应该调用一个工具,我们将把图路由到 tools 节点。如果最后一条消息没有调用任何工具,我们将路由到 END 节点以停止图并返回响应。
from langgraph.graph import END
def should_continue(state: MessagesState) -> Literal["tools", END]:
messages = state['messages']
last_message = messages[-1]
# If the LLM makes a tool call, then we route to the "tools" node
if last_message.tool_calls:
return "tools"
# Otherwise, we stop (reply to the user)
return END
我们现在已经添加了节点,但图中仍然缺少节点之间的边。我们将逐步添加边,从图的起始状态 (START) 开始,该状态将接收输入。添加边时,第一个参数是 FROM 节点,第二个参数是 TO 节点。
from langgraph.graph import START
workflow.add_edge(START, "llm_node")
接下来,我们将添加一个 条件边,其中 FROM 节点是固定的,但 TO 节点取决于 should_continue
函数的输出。
workflow.add_conditional_edges(
"llm_node",
should_continue,
)
现在,我们从 llm_node 到 tools 节点有了一条条件边,具体取决于 should_continue
函数的输出。我们还需要添加一条从 tools 节点返回到 llm_node 的边。
workflow.add_edge("tools", 'llm_node')
我们已经定义了节点和边,并且我们几乎准备好编译图了。在最后一步之前,让我们定义一个地方来存储图的状态,或“检查点”它。这有助于我们跟踪代理的对话和消息历史。根据所使用的存储类型,消息的持续时间可以从短暂到永久存储。
现在,我们可以使用 MemorySaver 作为一种快速简便的方法,在我们开发代理时将消息存储在内存中。然而,为了确保在清除内存时不会丢失消息,我们将在提供应用程序时将其存储在像 Postgres 这样的数据库中。
## In Memory store
checkpointer = MemorySaver()
现在我们已经定义了图和检查点,让我们编译它。在编译图时传递 checkpointer 对象。运行图上的 compile 函数后,您将能够可视化图。就是这样!代理图已准备好调用。
graph = workflow.compile(checkpointer=checkpointer)
graph
要调用图,我们可以使用用户提示。我们将提示作为 HumanMessage 对象传递,这有助于 LLM 跟踪对话并理解消息的来源。
from langchain_core.messages import HumanMessage
prompt = "解释期权交易"
final_state = graph.invoke(
{"messages": [HumanMessage(content=prompt)]},
config={"configurable": {"thread_id": 42}}
)
要更详细地了解代理中发生的转换,您可以打印 final_state 中的每条消息。
for message in final_state["messages"]:
print(message)
Output:
content='解释期权交易' additional_kwargs={} response_metadata={} id='b3ca2cc4-9604-4139-8b6f-e4105329d65a'
content='' additional_kwargs={'tool_calls': [{'id': 'call_tAJd7qTJxgeHNqaeiz5KKAyg', 'function': {'arguments': '{"query":"期权交易"}', 'name': 'explain_financial_terms'}, 'type': 'function'}], 'refusal': None} response_metadata={'token_usage': {'completion_tokens': 19, 'prompt_tokens': 149, 'total_tokens': 168, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-4o-mini-2024-07-18', 'system_fingerprint': 'fp_0aa8d3e20b', 'finish_reason': 'tool_calls', 'logprobs': None} id='run-f4080ffb-4449-4837-921e-2c5a3604f16c-0' tool_calls=[{'name': 'explain_financial_terms', 'args': {'query': '期权交易'}, 'id': 'call_tAJd7qTJxgeHNqaeiz5KKAyg', 'type': 'tool_call'}] usage_metadata={'input_tokens': 149, 'output_tokens': 19, 'total_tokens': 168, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_details': {'audio': 0, 'reasoning': 0}}
content='8 ] 对期权定价理论作出了重要改进。费舍尔·布莱克和迈伦·斯科尔斯在1968年证明,动态调整投资组合消除了证券的预期收益,从而发明了风险中性论点。[9][10]他们的思考基于市场研究人员和实践者之前的工作,包括上述提到的工作,以及谢恩·卡索夫和爱德华·O·索普的工作。布莱克和斯科尔斯随后试图将公式应用于市场,但因缺乏风险管理而遭受财务损失。1970年,他们决定重返学术界。[11]经过三年的努力,这一公式——因其公开而以他们的名字命名——于1973年在《政治经济学杂志》上发表,标题为“期权和公司负债的定价”。[12][13][14]罗伯特·C·默顿是第一个发表论文扩展期权定价模型数学理解的人,并创造了“布莱克-斯科尔斯期权定价模型”这一术语。该公式引发了期权交易的繁荣,并为芝加哥期权交易所及世界各地其他期权市场的活动提供了数学合法性。[15]默顿和斯科尔斯因其工作获得了1997年诺贝尔经济学奖,委员会指出他们发现的风险中性动态修订是将期权与基础证券风险分开的突破。[16]尽管因1995年去世而不符合获奖资格,但布莱克被瑞典学院提及为贡献者。[17]布莱克-斯科尔斯模型假设市场由至少一个风险资产(通常称为股票)和一个无风险资产(通常称为货币市场、现金或债券)组成。关于资产的以下假设(与资产名称相关)' name='explain_financial_terms' id='bcf9ffc0-5a04-41e7-b95f-2acb1006eba8' tool_call_id='call_tAJd7qTJxgeHNqaeiz5KKAyg'
content='期权交易涉及期权合约的买卖,期权合约是金融衍生品,赋予买方在特定到期日之前或在特定到期日以预定价格(执行价格)买入或卖出基础资产的权利,但没有义务。\n\n### 期权交易的关键概念:\n\n1. **期权合约**:主要有两种类型的期权合约:\n - **看涨期权**:赋予持有者以执行价格购买基础资产的权利。\n - **看跌期权**:赋予持有者以执行价格出售基础资产的权利。\n\n2. **执行价格**:可以以该价格买入或卖出基础资产的预定价格。\n\n3. **到期日**:如果未行使,期权合约在此日期将失效。\n\n4. **期权费**:购买期权所支付的价格,对买方而言是成本,对卖方(写手)而言是收入。\n\n5. **基础资产**:期权合约所基于的金融工具(例如,股票、商品、指数)。\n\n### 定价模型:\n布莱克-斯科尔斯模型是定价期权的最著名方法之一。由费舍尔·布莱克、迈伦·斯科尔斯和罗伯特·默顿开发,提供了一种数学公式,根据基础资产价格、执行价格、到期时间和波动性等各种因素来确定期权的公允价格。\n\n### 期权的用途:\n期权交易可用于多种目的,包括:\n- **对冲**:保护投资免受潜在损失。\n- **投机**:押注资产未来价格的变动以获取利润。\n- **收入生成**:写期权以收取期权费。\n\n### 风险:\n期权交易可能存在风险,尤其对经验不足的交易者而言。潜在损失可能相当可观,特别是在交易策略涉及杠杆或复杂头寸时。\n\n总体而言,期权交易是一种复杂的金融实践,需要对市场动态、定价模型和风险管理策略有良好的理解。' additional_kwargs={'refusal': None} response_metadata={'token_usage': {'completion_tokens': 415, 'prompt_tokens': 585, 'total_tokens': 1000, 'completion_tokens_details': {'accepted_prediction_tokens': 0, 'audio_tokens': 0, 'reasoning_tokens': 0, 'rejected_prediction_tokens': 0}, 'prompt_tokens_details': {'audio_tokens': 0, 'cached_tokens': 0}}, 'model_name': 'gpt-4o-mini-2024-07-18', 'system_fingerprint': 'fp_0aa8d3e20b', 'finish_reason': 'stop', 'logprobs': None} id='run-cef2ef0b-d6b6-4c4d-882f-a15537f9ce99-0' usage_metadata={'input_tokens': 585, 'output_tokens': 415, 'total_tokens': 1000, 'input_token_details': {'audio': 0, 'cache_read': 0}, 'output_token_details': {'audio': 0, 'reasoning': 0}}
服务代理
在服务代理时,我们将超越 Python 笔记本,专注于如何使用 REST API 服务代理。我们将通过将构建和编译代理的步骤与工具本身分开来构建项目结构。我们将使用 PostgresDB 来确保聊天记录的持久性,而不是使用内存检查点。以下是我们将使用的目录结构:
.
├── agent.py
├── faiss_index
│ ├── index.faiss
│ └── index.pkl
├── main.py
├── requirements.txt
└── utils
├── __init__.py
├── nodes.py
├── state.py
└── tools.py
3 directories, 9 files
目录结构说明:
- agent.py:包含向图中添加节点、边以及检查点的步骤。
- faiss_index:包含 FAISS 索引文件(
index.faiss
和index.pkl
)。 - utils:包含定义节点、状态(如适用)和工具的文件。
- main.py:定义 API 路由并处理 Postgres 数据库连接池,同时调用代理。
- requirements.txt:列出用于服务代理的依赖项。
步骤 0. 设置 PostgresDB
您可以参考任何关于如何在 PostgresDB 中设置数据库的教程。我们只需要数据库的连接字符串。在我的情况下,连接字符串如下:
"postgresql://username:password@localhost:5433/DatabaseName?sslmode=disable"
第一步. 安装依赖项并设置环境变量
我们将为在服务时使用的包创建一个 requirements.txt 文件。除了之前的包外,我们还有一些新的包将会使用:
- fastapi[standard]: 我们将使用 fastapi,这是创建 Flask API 的快速方法,并且足够适合我们当前的用例。
- psycopg: Python 的 PostgreSQL 适配器。将帮助我们以编程方式连接到 PostgresDB。
- psycopg-pool: 此包将创建一个与 PostgresDB 的连接池。连接池有助于维护连接并重用它,而不是打开和删除连接。
- langgraph-checkpoint-postgres: Langgraph 对 PostgresDB 的 Checkpointer 类的实现。
您的 requirements.txt 将如下所示:
langchain
langchain-openai
langchain-community
unstructured
langgraph
faiss-cpu
sentence-transformers
fastapi[standard]
psycopg
psycopg-pool
langgraph-checkpoint-postgres
要在本地加载环境变量,我们可以使用 load_dotenv
Python 库。这更适合本地开发,但对于像 Heroku 这样的托管服务,请按照服务的说明设置环境变量。
在根目录中创建一个 .env
文件,内容如下:
FINNHUB_API_KEY=c3****
OPENAI_API_KEY=sk****
DB_URI="postgresql://username:password@localhost:5433/DatabaseName?sslmode=disable"
在 main.py 文件中,要从 .env 文件加载变量,请使用以下代码:
from dotenv import load_dotenv
load_dotenv()
第2步. 将FAISS索引复制到根目录
将之前保存的包含FAISS索引的文件夹复制到根目录。该文件夹应包括index.faiss
和index.pkl
文件。
第 3 步. 复制 utils 子目录中的节点和工具
在 utils/tools.py
中,定义工具如下:
from langchain_core.tools import tool
import requests
import os
from langchain.tools.retriever import create_retriever_tool
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import HuggingFaceEmbeddings
def get_retriever_tool():
embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
embeddings.client.tokenizer.add_special_tokens({'pad_token': '[PAD]'})
db = FAISS.load_local(
"faiss_index", embeddings, allow_dangerous_deserialization=True
)
retriever = db.as_retriever(
search_type="similarity", search_kwargs={"k": 1})
retriever_tool = create_retriever_tool(
retriever,
"explain_financial_terms",
"Explain financial terms in the query",)
return retriever_tool
def news_helper(symbol: str, start_date: str, last_date: str):
# "c3smgt2ad3ide69e4jtg"
FINNHUB_API_KEY = os.environ["FINNHUB_API_KEY"]
API_ENDPOINT = "https://finnhub.io/api/v1/company-news"
queryString = f"{API_ENDPOINT}?symbol={symbol}&from={start_date}&to={last_date}&token={FINNHUB_API_KEY}"
# Send the search query to the Search API
response = requests.get(queryString)
# Read the response
articles = response.json()[-5:]
summaries = [article["summary"] for article in articles]
return ",".join(summaries)
@tool
def search_news_for_symbol(symbol: str, start_date: str, last_date: str) -> str:
"""在给定的股票代码的时间段内搜索新闻文章。例如:NVDA、MSFT、TSLA 等。
参数:
symbol: 要搜索的股票代码。
start_date: 搜索的开始日期。
last_date: 搜索的结束日期。
返回:
包含新闻文章的字符串。
"""
company_news = news_helper(
symbol=symbol, start_date=start_date, last_date=last_date)
return company_news
def get_tools():
return [get_retriever_tool(), search_news_for_symbol]
如您所见,大部分代码直接来自开发阶段。我们在 get_retriever_tool() 中添加了创建检索工具的过程,该过程只调用一次。我们有 get_tools 函数从 tools.py 文件中返回工具列表。
在 utils/nodes.py
中,定义节点:
from .tools import get_tools
from langgraph.prebuilt import ToolNode
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI
from langgraph.graph import END, MessagesState
from typing import Literal
tools = get_tools()
llm = ChatOpenAI(model="gpt-4o-mini").bind_tools(tools)
def agent(state: MessagesState):
messages = state['messages']
response = llm.invoke(messages)
return {"messages": [response]}
tool_node = ToolNode(tools)
def should_continue(state: MessagesState) -> Literal["tools", END]:
messages = state['messages']
last_message = messages[-1]
# 如果 LLM 发起工具调用,则我们路由到 "tools" 节点
if last_message.tool_calls:
return "tools"
# 否则,我们停止(回复用户)
return END
我们在这个文件中将工具绑定到 LLM。我们像在开发阶段那样定义节点。
第4步. 添加图形创建和编译步骤
在 agent.py
中,定义图形创建和编译步骤:
from utils.nodes import agent, tool_node, should_continue
from langgraph.graph import START, StateGraph, MessagesState
def get_graph(checkpointer):
workflow = StateGraph(MessagesState)
workflow.add_node("agent", agent) # agent
workflow.add_node("tools", tool_node)
workflow.add_edge(START, "agent")
workflow.add_conditional_edges(
"agent",
should_continue,
)
workflow.add_edge("tools", 'agent')
graph = workflow.compile(checkpointer=checkpointer)
return graph
注意:我们从 main.py 将一个 checkpointer 传递到 agent.py
第5步:设置API
main.py:
from dotenv import load_dotenv
load_dotenv()
from fastapi import FastAPI
from agent import get_graph
from langchain_core.messages import HumanMessage
from psycopg_pool import ConnectionPool
import os
from langgraph.checkpoint.postgres import PostgresSaver
app = FastAPI()
connection_kwargs = {
"autocommit": True,
"prepare_threshold": 0,
}
pool = ConnectionPool(
# 示例配置
conninfo=os.environ['DB_URI'],
max_size=20,
kwargs=connection_kwargs,
)
@app.get("/")
def query_llm(query: str) -> str:
checkpointer = PostgresSaver(pool)
# checkpointer = MemorySaver()
checkpointer.setup()
graph = get_graph(checkpointer)
final_state = graph.invoke(
{"messages": [HumanMessage(content=query)]},
config={"configurable": {"thread_id": "1"}}
)
last_message = final_state['messages'][-1].content
return last_message
@app.on_event("shutdown")
async def shutdown_event():
pool.close()
- 首先,我们使用python-dotenv库加载环境变量。我们还导入了服务API所需的所有依赖项。
- 我们调用FastAPI类并将其存储在app变量中。
- 然后,我们使用Postgres数据库创建一个ConnectionPool。我们使用存储在“DB_URI”环境变量中的连接字符串。
- 我们在“/”路由上定义GET方法并添加一个函数。
- 在query_llm()中,我们将查询作为请求中的参数。我们定义checkpointer并进行设置。我们从agent.py中的get_graph()获取图形。
- 我们使用查询调用图形并返回最终状态的最后一条消息,即响应。
- 在文件末尾,我们添加了一个应用程序关闭事件。此函数在API关闭时运行。在这里,我们将关闭与Postgres数据库的连接池。
您可以使用以下命令运行API:
fastapi dev main.py
API默认在localhost:8000上提供服务,您可以使用Postman等工具向API发送请求,或者在localhost:8000/docs上使用FastAPI文档进行测试。以下是一个查询示例:
"Explain NVDA trend from 01/01/2024 to 01/01/2025"
您还可以使用curl如下进行上述查询:
curl -X 'GET' \
'http://127.0.0.1:8000/?query=Explain%20NVDA%20trend%20from%2001%2F01%2F2024%20to%2001%2F01%2F2025' \
-H 'accept: application/json'
这是localhost:8000/docs页面上的响应:
当您使用API查询代理时,您会看到在您指定的Postgres数据库中创建了一些表,类似于这些:
这是发送上述请求后检查点表的内容:
结论
此过程涵盖了使用 FastAPI 和 PostgreSQL 构建和提供 RAG agent 的完整生命周期:
- 开发:定义工具、节点和图以管理状态。
- 提供:使用 FastAPI 提供代理并处理请求。
- 持久性:将对话历史存储在 PostgreSQL 中以进行长期存储。
通过遵循这些步骤,您将拥有一个完全功能的 REST API,可以与 RAG agent 交互,同时在多个交互中保持状态。您可以根据需要测试、监控和扩展服务。这总结了从开发到提供的整个 RAG agent 生命周期!
几个额外的要点:
- 对于托管 Flask API,render.com 是一个不错的平台,提供慷慨的免费套餐。
- 确保为 API 添加身份验证或速率限制,以确保如果您托管 API,不会产生过高的费用。
- RAG 工具中使用的库和组件可以根据您的偏好或需求轻松替换为其他替代品。