
LangGraph + DeepSeek-R1 + 函数调用 + Agentic RAG(惊人的结果)
- Rifx.Online
- Programming/Scripting , Chatbots , Generative AI
- 28 Feb, 2025
在这个视频中,我将快速演示如何使用 LangGraph、Deepseek-R1、函数调用和 Agentic RAG 创建一个多智能体聊天机器人,以便为您的业务或个人使用构建一个强大的代理聊天机器人。
在我上一个视频中,我谈到了 LangChain 和 Deepseek-R1。一位观众问我是否可以使用 LangGraph 实现。
我满足了这个请求——不仅如此,我还通过函数调用和 Agentic RAG 增强了聊天机器人。
“但是高,Deepseek-R1 不支持函数调用!”
是的,你说得对——但让我告诉你,我想出了一个聪明的主意。如果你坚持看到视频的最后,我会告诉你如何做到这一点!
我在之前的文章中多次提到过函数调用,我们已经知道,函数调用是一种技术,允许 LLM 根据对话内容自主选择和调用预定义的函数。这些函数可用于执行各种任务。
Agentic RAG 是一种通过使用代理来改善一般 RAG 问题的 RAG。传统 RAG 的问题在于 AI 处理所有内容都是“按顺序”进行的,因此如果发生错误,例如无法检索数据,所有后续处理将变得毫无意义。
Agentic RAG 通过使用“代理”来解决这些问题。代理不仅选择要使用的工具,还允许进行思维循环,因此它可以循环多个过程,直到获得所需的信息并满足用户的期望。
所以,让我给你演示一个实时聊天机器人,向你展示我的意思。
我们有两个不同的数据库,研究和开发,可以从中获取答案。我创建了一些示例问题供你测试。让我输入问题:‘项目 A 的状态是什么?’ 如果你看看聊天机器人是如何生成输出的,
它使用 RecursiveCharacterTextSplitter 将文本拆分成更小的块,并使用 splitter.create_documents()
将其转换为文档。这些文档作为向量嵌入存储在 ChromaDB 中,以便进行高效的相似性搜索。
使用 LangChain 中的 create_retriever_tool
创建了一个检索器,并开发了一个 AI 代理,将用户查询分类为研究或开发,连接到 DeepSeek-R1(温度 = 0.7)以获取响应,并检索相关文档。
我在开发聊天机器人时面临的一个大问题是 DeepSeek-R1 不支持像 OpenAI 那样的函数调用,因此我创建了一个基于文本的命令系统,而不是使用函数调用。与其强迫 DeepSeek-R1 直接使用函数,我设计它输出特定的文本格式。
然后,我构建了一个包装器,将这些文本命令转换为工具操作。这样,我们就得到了与函数调用相同的功能,但以 DeepSeek-R1 可以处理的方式。
一个评分函数检查检索到的文档是否存在——如果找到,处理继续;如果没有,查询会被重写以提高清晰度。一个生成函数使用 DeepSeek-R1 总结检索到的文档并格式化响应。
一个决策函数根据消息内容确定是否需要工具,并创建了一个 LangGraph 工作流来管理该过程,从代理开始,如果需要,将查询路由到检索,当找到文档时生成答案,或者在必要时重写问题并重试。最后,它在一个绿色框中生成最终答案。
到视频结束时,您将了解什么是 Agentic RAG,为什么我们需要 Agentic RAG,以及如何将函数调用与 DeepSeek-R1 结合使用来创建超级 AI 代理。
什么是 Agentic RAG?
Agentic RAG 是一种整合了 Agent 能力的 RAG,其核心能力是自主推理和行动。
因此,Agentic RAG 将 AI 代理的自主规划能力(如路由、行动步骤、反思等)引入传统 RAG,以适应更复杂的 RAG 查询任务。
RAG = LLM + 知识库 + 检索器
为什么我们需要 Agentic RAG❓
Agentic RAG 的 “agent” 功能主要体现在检索阶段。与传统 RAG 的检索过程相比,Agentic RAG 更具能力:
- 决定是否进行搜索(自主决策)
- 选择使用哪个搜索引擎(自主规划)
- 评估检索到的上下文并决定是否重新搜索(自我规划)
- 确定是否使用外部工具
Agentic RAG 与传统 RAG
开始编码
现在,让我们继续关于如何使用 LangGraph + DeepSeek-R1 + Function Call + Agentic RAG 构建 AI 聊天机器人的指南。在创建这个聊天机器人时,我们将为代码的运行创建一个理想的环境。我们需要安装必要的 Python 库。为此,我们将对以下库进行 pip 安装。
pip install -r requirements.txt
安装完成后,我们导入重要的依赖项,如 langchain、Langgraph 和 Streamlit。
from langchain_openai import OpenAIEmbeddings from langchain_community.vectorstores import Chroma from langchain_core.messages import HumanMessage, AIMessage, ToolMessage from langchain.text_splitter import RecursiveCharacterTextSplitter from langgraph.graph import END, StateGraph, START from langgraph.prebuilt import ToolNode from langgraph.graph.message import add_messages from typing_extensions import TypedDict, Annotated from typing import Sequence from langchain_openai import OpenAIEmbeddings import re import os import streamlit as st import requests from langchain.tools.retriever import create_retriever_tool
我创建了两个数据库。第一组称为 research_texts
,包括一些与研究相关的陈述,如关于 AI 和机器学习的报告和论文。
第二组 development_texts
包括有关正在进行的项目的陈述,例如 UI 设计、功能测试和产品发布前的优化。
但请根据您的用例自由定制数据。
research_texts = [ “研究报告:一种新 AI 模型提高图像识别准确率至 98% 的结果”, “学术论文摘要:为什么变压器成为自然语言处理的主流架构”, “使用量子计算的机器学习方法最新趋势” ]
development_texts = [ “项目 A:UI 设计完成,API 集成进行中”, “项目 B:测试新功能 X,需要修复漏洞”, “产品 Y:在发布前的性能优化阶段” ]
让我们通过将数据拆分为更小的部分,将其转换为文档对象,然后创建向量嵌入来处理数据。
我使用 RecursiveCharacterTextSplitter
将长文本拆分为更小的块,确保每个块的最大字符数为 100。
然后,我使用 splitter.create_documents()
将拆分的文本转换为文档对象。这些文档对象在与 Deepseek_R1 进行搜索时将非常有用。
splitter = RecursiveCharacterTextSplitter(chunk_size=100, chunk_overlap=10)
research_docs = splitter.create_documents(research_texts) development_docs = splitter.create_documents(development_texts)
我们将研究和开发文档存储为向量嵌入,并使用检索器使其可搜索。
首先,我使用 Chroma.from_documents()
为 research_docs
创建一个向量存储,其中 embedding=embeddings
将文本转换为数值向量,collection_name="research_collection"
确保文档存储在专用集合中以便高效检索。
一旦研究和开发文本都存储为向量,我们就使用检索来查找和提取向量存储中最相关的文档。
research_vectorstore = Chroma.from_documents( documents=research_docs, embedding=embeddings, collection_name=“research_collection” )
development_vectorstore = Chroma.from_documents( documents=development_docs, embedding=embeddings, collection_name=“development_collection” )
research_retriever = research_vectorstore.as_retriever() development_retriever = development_vectorstore.as_retriever()
然后,我们为 AI 代理提供一个工具,您首先需要创建一个检索器,然后导入该工具。
我使用 create_retriever_tool
函数在 LangChain 中创建一个用于检索文档的工具。通过该工具检索到的文档可以从该工具包装的函数的返回值中提取。
research_tool = create_retriever_tool( research_retriever, “research_db_tool”, “从研究数据库中搜索信息。” )
development_tool = create_retriever_tool( development_retriever, “development_db_tool”, “从开发数据库中搜索信息。” )
tools = [research_tool, development_tool]
我开发了一个代理函数,作为用户问题的智能路由器。我提取用户的消息,并使用提示将其分类为研究或开发相关。
我连接到 DeepSeek AI,温度设置为 0.7,以获得平衡的响应。当 API 响应时,我检查它是研究还是开发查询,然后使用适当的检索器查找相关文档。我将所有内容格式化为标准消息格式,包括查询和结果。
如果它不适合任何类别,我将返回直接答案。可以把它想象成一个交通控制员——将问题引导到正确的数据库,以获得最佳答案。
class AgentState(TypedDict): messages: Annotated[Sequence[AIMessage|HumanMessage|ToolMessage], add_messages]
def agent(state: AgentState): print(“---CALL AGENT---”) messages = state[“messages”]
if isinstance(messages[0], tuple): user_message = messages[0][1] else: user_message = messages[0].content
prompt = f"""给定这个用户问题:“{user_message}” 如果它是关于研究或学术主题,请按照此格式准确回应: SEARCH_RESEARCH: <搜索词>
如果它是关于开发状态,请按照此格式准确回应: SEARCH_DEV: <搜索词>
否则,直接回答。 """
headers = { “Accept”: “application/json”, “Authorization”: f”Bearer sk-1cddf19f9dc4466fa3ecea6fe10abec0”, “Content-Type”: “application/json” }
data = { “model”: “deepseek-chat”, “messages”: [{“role”: “user”, “content”: prompt}], “temperature”: 0.7, “max_tokens”: 1024 }
response = requests.post( “https://api.deepseek.com/v1/chat/completions”, headers=headers, json=data, verify=False )
if response.status_code == 200: response_text = response.json()[‘choices’][0][‘message’][‘content’] print(“原始响应:”, response_text)
if "SEARCH\_RESEARCH:" in response\_text:
query = response\_text.split("SEARCH\_RESEARCH:")\[1\].strip()
results = research\_retriever.invoke(query)
return {"messages": \[AIMessage(content=f'Action: research\_db\_tool\\n{{"query": "{query}"}}\\n\\nResults: {str(results)}')\]}
elif "SEARCH\_DEV:" in response\_text:
query = response\_text.split("SEARCH\_DEV:")\[1\].strip()
results = development\_retriever.invoke(query)
return {"messages": \[AIMessage(content=f'Action: development\_db\_tool\\n{{"query": "{query}"}}\\n\\nResults: {str(results)}')\]}
else:
return {"messages": \[AIMessage(content=response\_text)\]}
else: raise Exception(f”API 调用失败: {response.text}”) 然后我们设计了一个评分函数,用于检查我们的搜索是否成功。我查看最后收到的消息,并检查它是否包含任何检索到的文档。
如果我找到文档(以“Results: [Document”标记),我会发出信号,继续生成答案。
如果我没有找到任何文档,我会建议重写问题以获得更好的结果。这就像一个质量检查——确保我们在继续之前拥有正确的材料。
def simple_grade_documents(state: AgentState): messages = state[“messages”] last_message = messages[-1] print(“评估消息:”, last_message.content)
if “Results: [Document” in last_message.content: print(“---找到文档,继续生成---”) return “generate” else: print(“---未找到文档,尝试重写---”) return “rewrite” 我开发了一个生成函数,从我们找到的文档中创建最终答案。我首先获取原始问题和我们在搜索中找到的任何文档。
我提取文档内容,并创建一个提示,要求 DeepSeek-R1 总结发现,重点关注关键的研究进展。我设置 API 调用,指定头部和温度为 0.7,以实现平衡的创造力。
一旦我得到响应,我将其格式化为用户友好的消息。如果 API 调用出现问题,我会通知您错误。
def generate(state: AgentState): print(“---生成最终答案---”) messages = state[“messages”] question = messages[0].content if isinstance(messages[0], tuple) else messages[0].content last_message = messages[-1]
docs = "" if “Results: [” in last_message.content: results_start = last_message.content.find(“Results: [”) docs = last_message.content[results_start:] print(“找到的文档:”, docs)
headers = { “Accept”: “application/json”, “Authorization”: f”Bearer sk-1cddf19f9dc4466fa3ecea6fe10abec0”, “Content-Type”: “application/json” }
prompt = f"""根据这些研究文档,总结 AI 的最新进展: 问题:{question} 文档:{docs} 重点提取和综合研究论文中的关键发现。 """
data = { “model”: “deepseek-chat”, “messages”: [{“role”: “user”, “content”: prompt}], “temperature”: 0.7, “max_tokens”: 1024 }
print(“正在向 API 发送生成请求…”) response = requests.post( “https://api.deepseek.com/v1/chat/completions”, headers=headers, json=data, verify=False
)
if response.status_code == 200:
response_text = response.json()['choices'][0]['message']['content']
print("最终答案:", response_text)
return {"messages": [AIMessage(content=response_text)]}
else:
raise Exception(f"API 调用失败: {response.text}")
我设计了一个重写函数,帮助改善模糊的问题。我将原始问题提交给 DeepSeek,使其更具体、更清晰。我设置了带有自定义头部的 API 调用,并将温度设定为 0.7,然后请求模型重写问题。当我收到响应时,我检查它是否正常工作——如果正常,我返回改进的问题;如果不正常,将打印错误。
def rewrite(state: AgentState):
print("---重写问题---")
messages = state["messages"]
original_question = messages[0].content if len(messages) > 0 else "N/A"
headers = {
"Accept": "application/json",
"Authorization": f"Bearer sk-1cddf19f9dc4466fa3ecea6fe10abec0",
"Content-Type": "application/json"
}
data = {
"model": "deepseek-chat",
"messages": [{
"role": "user",
"content": f"将这个问题重写得更具体、更清晰: {original_question}"
}],
"temperature": 0.7,
"max_tokens": 1024
}
print("发送重写请求...")
response = requests.post(
"https://api.deepseek.com/v1/chat/completions",
headers=headers,
json=data,
verify=False
)
print("状态码:", response.status_code)
print("响应:", response.text)
if response.status_code == 200:
response_text = response.json()['choices'][0]['message']['content']
print("重写的问题:", response_text)
return {"messages": [AIMessage(content=response_text)]}
else:
raise Exception(f"API 调用失败: {response.text}")
我创建了一个决策函数,检查是否需要根据消息内容使用任何工具。我查看最后一条消息,检查它是否匹配我们的工具模式(即以“Action:”开头)。
如果匹配,我会发出信号,表示我们应该使用工具检索信息。如果不匹配,我会发出信号,表示我们应该结束过程。
tools_pattern = re.compile(r"Action: .*")
def custom_tools_condition(state: AgentState):
messages = state["messages"]
last_message = messages[-1]
content = last_message.content
print("检查工具条件:", content)
if tools_pattern.match(content):
print("移动到检索...")
return "tools"
print("移动到结束...")
return END
让我们创建一个工作流系统,将所有函数按逻辑顺序连接起来。我使用了 LangGraph,其中每个函数都是一个节点,从第一个接收问题的代理开始。我使用边设置了特殊路径——当代理需要工具时,它会去检索信息;如果不需要,则结束。在检索后,我检查文档:好的结果进入生成答案,而差的结果则返回重写问题。
workflow = StateGraph(AgentState)
workflow.add_node("agent", agent)
retrieve_node = ToolNode(tools)
workflow.add_node("retrieve", retrieve_node)
workflow.add_node("rewrite", rewrite)
workflow.add_node("generate", generate)
workflow.add_edge(START, "agent")
workflow.add_conditional_edges(
"agent",
custom_tools_condition,
{
"tools": "retrieve",
END: END
}
)
workflow.add_conditional_edges("retrieve", simple_grade_documents)
workflow.add_edge("generate", END)
workflow.add_edge("rewrite", "agent")
app = workflow.compile()
让我们创建一个函数,管理问题在我们系统中的流动。我获取用户的问题,并通过工作流应用处理它,将每个事件(如搜索、找到文档或生成答案)收集到一个列表中。
def process_question(user_question, app, config):
"""通过工作流处理用户问题"""
events = []
for event in app.stream({"messages": [("user", user_question)]}, config):
events.append(event)
return events
然后我们使用 Streamlit 创建一个清晰的布局,并将数据库内容放在侧边栏中以便于参考。在主区域,我创建了一个文本输入框来输入问题,并将屏幕分成两列。在第一列中,我处理问题的处理:当您点击“获取答案”按钮时,我显示一个加载旋转器,并通过工作流处理您的问题。
我在可展开的部分中显示任何找到的文档,并用绿色成功框显示最终答案。在第二列中,我添加了有用的说明和示例问题。
def main():
st.set_page_config(
page_title="AI 研究与开发助手",
layout="wide",
initial_sidebar_state="expanded"
)
st.markdown("""
<style>
.stApp {
background-color: #f8f9fa;
}
.stButton > button {
width: 100%;
margin-top: 20px;
}
.data-box {
padding: 20px;
border-radius: 10px;
margin: 10px 0;
}
.research-box {
background-color: #e3f2fd;
border-left: 5px solid #1976d2;
}
.dev-box {
background-color: #e8f5e9;
border-left: 5px solid #43a047;
}
</style>
""", unsafe_allow_html=True)
with st.sidebar:
st.header("📚 可用数据")
st.subheader("研究数据库")
for text in research_texts:
st.markdown(f'<div class="data-box research-box">{text}</div>', unsafe_allow_html=True)
st.subheader("开发数据库")
for text in development_texts:
st.markdown(f'<div class="data-box dev-box">{text}</div>', unsafe_allow_html=True)
st.title("🤖 AI 研究与开发助手")
st.markdown("---")
query = st.text_area("输入您的问题:", height=100, placeholder="例如,人工智能研究的最新进展是什么?")
col1, col2 = st.columns([1, 2])
with col1:
if st.button("🔍 获取答案", use_container_width=True):
if query:
with st.spinner('正在处理您的问题...'):
events = process_question(query, app, {"configurable": {"thread_id": "1"}})
for event in events:
if 'agent' in event:
with st.expander("🔄 处理步骤", expanded=True):
content = event['agent']['messages'][0].content
if "结果:" in content:
st.markdown("### 📑 检索到的文档:")
docs_start = content.find("结果:")
docs = content[docs_start:]
st.info(docs)
elif 'generate' in event:
st.markdown("### ✨ 最终答案:")
st.success(event['generate']['messages'][0].content)
else:
st.warning("⚠️ 请先输入一个问题!")
with col2:
st.markdown("""
### 🎯 如何使用
1. 在文本框中输入您的问题
2. 点击“获取答案”进行处理
3. 查看检索到的文档和最终答案
### 💡 示例问题
- 人工智能研究的最新进展是什么?
- A 项目的状态如何?
- 当前机器学习的趋势是什么?
""")
if __name__ == "__main__":
main()
结论:
Agentic RAG 和 DeepSeek-R1 功能调用 是创新的信息检索和生成技术,能够自主采取行动。它们甚至可以通过将复杂任务拆分为更小的步骤并在需要时调用适当的工具来完成复杂任务,推动人工智能在实时决策和动态内容生成方面的边界。随着这些技术的发展,它们的应用将变得更加多样化,推动各个行业的创新。
如果这篇文章对您的朋友可能有帮助,请 转发给他们。
参考文献:
📚 欢迎查看我的其他文章: