使用 LangGraph 构建 RAG 研究多重代理
- ❓引言 — 天真的 RAG 与 代理 RAG
- 🧠 项目概述
- 📊 结果
- 🔚 结论
在本文中,我们介绍了一个实用项目,使用 LangGraph 开发了一个 RAG 研究多代理 工具。该工具旨在解决需要 多个来源 和 迭代步骤 才能得出最终答案的 复杂问题。它采用 混合搜索 和 Cohere 重新排序步骤 来检索文档,并且还包含一个 自我纠正机制,包括 幻觉检查 过程,以提高响应的可靠性,使其非常适合企业应用。
Github Repo 在这里.
1. 引言 — 天真的与代理的 RAG
出于项目的目的,天真的 RAG 方法不足以满足以下原因:
- 无法理解复杂查询:无法将复杂查询分解为多个可管理的子步骤,仅在单一层面处理查询,而不是分析每个步骤并得出统一的结论。
- 缺乏幻觉或错误处理:天真的 RAG 管道缺乏响应验证步骤和处理幻觉的机制,无法通过生成新响应来纠正错误。
- 缺乏动态工具使用:天真的 RAG 系统不允许根据工作流条件使用工具、调用外部 API 或与数据库交互。
因此,实施了 多代理 RAG 研究系统 来解决所有这些问题。基于代理的框架实际上允许:
- 路由和使用工具:路由代理 可以对用户的查询进行分类,并将流程引导到适当的节点或工具。这使得基于上下文的决策成为可能,例如确定文档是否需要完整摘要、是否需要更详细的信息,或问题是否超出范围。
- 规划子步骤:复杂查询通常需要分解为更小的可管理步骤。从查询开始,可以生成一系列步骤,以便在探索查询的不同方面时达到结论。例如,如果查询需要比较文档的两个不同部分,基于代理的方法将允许识别这种比较需求,分别检索两个来源,并在最终响应中将它们合并为比较分析。
- 反思和错误纠正:除了简单的响应生成,基于代理的方法还可以添加验证步骤,以解决潜在的幻觉、错误或未能准确回答用户查询的响应。这也使得集成 自我纠正 机制与 人机协作 成为可能,将人类输入纳入自动化过程。这样的功能使得基于代理的 RAG 系统成为企业应用中更强大和可靠的解决方案,其中可靠性是首要任务。
- 共享全局状态:代理工作流共享全局状态,简化了多个步骤之间状态的管理。这种共享状态对于在多代理过程的不同阶段保持一致性至关重要。
2. 项目概述
图形步骤:
- 分析和路由查询(自适应 RAG): 用户的查询被分类并路由到适当的节点。从那里,系统可以选择继续下一步(“研究计划生成”)、请求更多信息或如果查询超出范围则立即响应。
- 研究计划生成: 系统生成一个逐步的研究计划,根据请求的复杂性生成一个或多个步骤。然后返回一个特定步骤的列表,以解决用户的问题。
- 研究子图: 对于研究计划生成中定义的每个步骤,都会调用一个子图。具体而言,子图开始通过 LLM 生成两个查询。接下来,系统使用集成检索器(使用相似性搜索、BM25和MMR)检索与这些生成的查询相关的文档。然后,重排序步骤应用基于 Cohere 的上下文压缩,最终为所有步骤生成* k *个相关文档及其相关分数。
- 生成步骤: 基于相关文档,工具通过 LLM 生成答案。
- 幻觉检查(自我纠正 RAG 与人类在环): 有一个反思步骤,系统分析生成的答案,以确定其是否得到提供的上下文支持并解决所有方面。如果检查失败,图形工作流程将被中断,用户将被提示生成修订答案或结束该过程。
为了创建向量存储,实施了基于段落的切分方法,使用Docling和LangChain,向量数据库使用ChromaDB构建。
构建向量数据库
文档解析
对于具有复杂结构的PDF,包括复杂布局的表格,仔细选择用于解析的工具至关重要。许多库在处理具有复杂页面布局或表格结构的PDF时缺乏精确性。
为了解决这个问题,使用了Docling,一个开源库。它使文档解析变得简单高效,并允许导出到所需格式。它可以从多种常用文档格式(包括PDF、DOCX、PPTX、XLSX、图像、HTML、AsciiDoc和Markdown)读取和导出Markdown和JSON。Docling对PDF文档提供了全面的理解,包括表格结构、阅读顺序和页面布局。此外,它还支持扫描PDF的OCR。
PDF中包含的文本随后被转换为Markdown格式,这对于遵循基于段落的结构的分块是必要的。
from docling.document_converter import DocumentConverter
logger.info("Starting document processing.")
converter = DocumentConverter()
markdown_document = converter.convert(source).document.export_to_markdown()
提取的文本将具有类似于以下图像的结构。如图所示,PDF和表格解析提取的文本保留了原始格式。
根据标题并使用MarkdownHeaderTextSplitter
,输出文本随后被拆分为块, resulting in a list of 332 Document
objects (LangChain Document)。
from langchain_text_splitters import MarkdownHeaderTextSplitter
headers_to_split_on = [
("#", "Header 1"),
("##", "Header 2")
]
markdown_splitter = MarkdownHeaderTextSplitter(headers_to_split_on)
docs_list = markdown_splitter.split_text(markdown_document)
docs_list
## 输出示例
[Document(metadata={'Header 2': 'A letter from our Chief Sustainability Officer and our Senior Vice President of Learning and Sustainability'}, page_content="...."),
...]
## len(docs_list):
332
向量存储构建
我们构建一个向量数据库,以将句子存储为向量嵌入,并在该数据库中进行搜索。在这种情况下,我们使用 Chroma 并在本地目录 ‘db_vector
’ 中存储一个持久化数据库。
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
embd = OpenAIEmbeddings()
vectorstore_from_documents = Chroma.from_documents(
documents=docs_list,
collection_name="rag-chroma-google-v1",
embedding=embd,
persist_directory='db_vector'
)
主图构建
实现的系统包括两个图:
- 研究者图作为子图,负责生成将用于从向量数据库中检索和重新排序前k个文档的不同查询。
- 主图,包含主要工作流程,例如分析用户的查询、生成完成任务所需的步骤、生成响应以及通过人机协作机制检查幻觉。
主图结构
LangGraph 的一个核心概念是 state。每次图执行都会创建一个状态,该状态在执行的节点之间传递,每个节点在执行后用其返回值更新此内部状态。
让我们开始构建图状态的项目。为此,我们定义了两个类:
- Router: 包含将用户查询分类为“more-info”、“environmental”或“general”之一的结果。
- GradeHallucination: 包含指示响应中存在幻觉的二进制分数。
from pydantic import BaseModel, Field
class Router(TypedDict):
"""Classify user query."""
logic: str
type: Literal["more-info", "environmental", "general"]
from pydantic import BaseModel, Field
class GradeHallucinations(BaseModel):
"""Binary score for hallucination present in generation answer."""
binary_score: str = Field(
description="Answer is grounded in the facts, '1' or '0'"
)
定义的图状态为:
- InputState: 包含用户与代理之间交换的消息列表。
- AgentState: 包含
Router
对用户查询的分类、研究计划中要执行的步骤列表、代理可以参考的检索文档列表,以及二进制分数Gradehallucination
。
from dataclasses import dataclass, field
from typing import Annotated, Literal, TypedDict
from langchain_core.documents import Document
from langchain_core.messages import AnyMessage
from langgraph.graph import add_messages
from utils.utils import reduce_docs
@dataclass(kw_only=True)
class InputState:
"""Represents the input state for the agent.
This class defines the structure of the input state, which includes
the messages exchanged between the user and the agent. It serves as
a restricted version of the full State, providing a narrower interface
to the outside world compared to what is maintained iternally.
"""
messages: Annotated[list[AnyMessage], add_messages]
"""Messages track the primary execution state of the agent.
Typically accumulates a pattern of Human/AI/Human/AI messages.
Returns:
A new list of messages with the messages from `right` merged into `left`.
If a message in `right` has the same ID as a message in `left`, the
message from `right` will replace the message from `left`."""
## Primary agent state
@dataclass(kw_only=True)
class AgentState(InputState):
"""State of the retrieval graph / agent."""
router: Router = field(default_factory=lambda: Router(type="general", logic=""))
"""The router's classification of the user's query."""
steps: list[str] = field(default_factory=list)
"""A list of steps in the research plan."""
documents: Annotated[list[Document], reduce_docs] = field(default_factory=list)
"""Populated by the retriever. This is a list of documents that the agent can reference."""
hallucination: GradeHallucinations = field(default_factory=lambda: GradeHallucinations(binary_score="0"))
步骤 1:分析并路由查询
函数 analyze_and_route_query
返回并更新状态 AgentState
的 router
变量。函数 route_query
根据之前的查询分类确定下一步。
具体来说,这一步通过 Router
对象更新状态,该对象的 type
变量包含以下值之一:"more-info"
、"environmental"
或 "general"
。根据这些信息,工作流将被路由到适当的节点("create_research_plan"
、"ask_for_more_info"
或 "respond_to_general_query"
)。
async def analyze_and_route_query(
state: AgentState, *, config: RunnableConfig
) -> dict[str, Router]:
"""分析用户的查询并确定适当的路由。
此函数使用语言模型对用户的查询进行分类,并决定如何在对话流程中路由。
参数:
state (AgentState): 代理的当前状态,包括对话历史。
config (RunnableConfig): 用于查询分析的模型配置。
返回:
dict[str, Router]: 包含 'router' 键及分类结果(分类类型和逻辑)的字典。
"""
model = ChatOpenAI(model=GPT_4o, temperature=TEMPERATURE, streaming=True)
messages = [
{"role": "system", "content": ROUTER_SYSTEM_PROMPT}
] + state.messages
logging.info("---分析并路由查询---")
response = cast(
Router, await model.with_structured_output(Router).ainvoke(messages)
)
return {"router": response}
def route_query(
state: AgentState,
) -> Literal["create_research_plan", "ask_for_more_info", "respond_to_general_query"]:
"""根据查询分类确定下一步。
参数:
state (AgentState): 代理的当前状态,包括路由的分类。
返回:
Literal["create_research_plan", "ask_for_more_info", "respond_to_general_query"]: 要采取的下一步。
引发:
ValueError: 如果遇到未知的路由类型。
"""
_type = state.router["type"]
if _type == "environmental":
return "create_research_plan"
elif _type == "more-info":
return "ask_for_more_info"
elif _type == "general":
return "respond_to_general_query"
else:
raise ValueError(f"未知的路由类型 {_type}")
输出示例对于问题 “检索2019年都柏林数据中心的PUE效率值”:
{
"logic":"这是一个关于2019年都柏林数据中心环境效率的具体问题,与环境报告相关。",
"type":"environmental"
}
第一步 1.1 超出范围 / 需要更多信息
我们定义了函数 ask_for_more_info
和 respond_to_general_query
,通过调用 LLM 直接为用户生成响应:如果路由器判断用户需要更多信息,则执行第一个函数,而第二个函数则生成与我们主题无关的一般查询响应。在这种情况下,有必要将生成的响应连接到消息列表中,更新状态中的 messages
变量。
async def ask_for_more_info(
state: AgentState, *, config: RunnableConfig
) -> dict[str, list[BaseMessage]]:
"""生成一个请求用户提供更多信息的响应。
当路由器判断用户需要更多信息时调用此节点。
参数:
state (AgentState): 代理的当前状态,包括对话历史和路由逻辑。
config (RunnableConfig): 用于响应的模型配置。
返回:
dict[str, list[str]]: 一个字典,包含一个 'messages' 键,包含生成的响应。
"""
model = ChatOpenAI(model=GPT_4o_MINI, temperature=TEMPERATURE, streaming=True)
system_prompt = MORE_INFO_SYSTEM_PROMPT.format(
logic=state.router["logic"]
)
messages = [{"role": "system", "content": system_prompt}] + state.messages
response = await model.ainvoke(messages)
return {"messages": [response]}
async def respond_to_general_query(
state: AgentState, *, config: RunnableConfig
) -> dict[str, list[BaseMessage]]:
"""生成对与环境无关的一般查询的响应。
当路由器将查询分类为一般问题时调用此节点。
参数:
state (AgentState): 代理的当前状态,包括对话历史和路由逻辑。
config (RunnableConfig): 用于响应的模型配置。
返回:
dict[str, list[str]]: 一个字典,包含一个 'messages' 键,包含生成的响应。
"""
model = ChatOpenAI(model=GPT_4o_MINI, temperature=TEMPERATURE, streaming=True)
system_prompt = GENERAL_SYSTEM_PROMPT.format(
logic=state.router["logic"]
)
logging.info("---响应生成---")
messages = [{"role": "system", "content": system_prompt}] + state.messages
response = await model.ainvoke(messages)
return {"messages": [response]}
对问题 “阿尔塔穆拉的天气怎么样?” 的输出示例:
{
"logic":"What's the weather like in Altamura?",
"type":"general"
}
## ---响应生成---
"I appreciate your question, but I'm unable to provide information about the weather. My focus is on Environmental Reports. If you have any questions related to that topic, please let me know, and I'll be happy to help!"
第2步:创建研究计划
如果查询分类返回值为 "environmental"
,则用户的请求与文档相关,工作流将到达 create_research_plan
节点,其功能是为回答与环境相关的查询创建逐步的研究计划。
async def create_research_plan(
state: AgentState, *, config: RunnableConfig
) -> dict[str, list[str] | str]:
"""Create a step-by-step research plan for answering a environmental-related query.
Args:
state (AgentState): The current state of the agent, including conversation history.
config (RunnableConfig): Configuration with the model used to generate the plan.
Returns:
dict[str, list[str]]: A dictionary with a 'steps' key containing the list of research steps.
"""
class Plan(TypedDict):
"""Generate research plan."""
steps: list[str]
model = ChatOpenAI(model=GPT_4o_MINI, temperature=TEMPERATURE, streaming=True)
messages = [
{"role": "system", "content": RESEARCH_PLAN_SYSTEM_PROMPT}
] + state.messages
logging.info("---PLAN GENERATION---")
response = cast(Plan, await model.with_structured_output(Plan).ainvoke(messages))
return {"steps": response["steps"], "documents": "delete"}
输出示例对于问题 “检索2019年都柏林的数据中心PUE效率值”:
{
"steps":
["Look up the PUE (Power Usage Effectiveness) efficiency value for data centers specifically in Dublin for the year 2019 using statistical data sources."
]
}
在这种情况下,用户的请求仅需要一步来检索信息。
第3步:进行研究
此功能从研究计划中获取第一步并使用它进行研究。对于研究,该功能调用subgraph researcher_graph
,返回我们将在下一部分中探索的块列表。最后,我们通过移除刚刚执行的步骤来更新状态中的steps
变量。
async def conduct_research(state: AgentState) -> dict[str, Any]:
"""Execute the first step of the research plan.
This function takes the first step from the research plan and uses it to conduct research.
Args:
state (AgentState): The current state of the agent, including the research plan steps.
Returns:
dict[str, list[str]]: A dictionary with 'documents' containing the research results and
'steps' containing the remaining research steps.
Behavior:
- Invokes the researcher_graph with the first step of the research plan.
- Updates the state with the retrieved documents and removes the completed step.
"""
result = await researcher_graph.ainvoke({"question": state.steps[0]}) #graph call directly
docs = result["documents"]
step = state.steps[0]
logging.info(f"\n{len(docs)} documents retrieved in total for the step: {step}.")
return {"documents": result["documents"], "steps": state.steps[1:]}
第4步:研究者子图构建
如上图所示,该图由一个查询生成步骤组成,从主图传递的步骤开始,以及一个检索相关块的步骤。与主图相同,我们继续定义状态 QueryState
(研究者图中 retrieve_documents
节点的私有状态)和 ResearcherState
(研究者图的状态)。
"""States for the researcher subgraph.
This module defines the state structures used in the researcher subgraph.
"""
from dataclasses import dataclass, field
from typing import Annotated
from langchain_core.documents import Document
from utils.utils import reduce_docs
@dataclass(kw_only=True)
class QueryState:
"""Private state for the retrieve_documents node in the researcher graph."""
query: str
@dataclass(kw_only=True)
class ResearcherState:
"""State of the researcher graph / agent."""
question: str
"""A step in the research plan generated by the retriever agent."""
queries: list[str] = field(default_factory=list)
"""A list of search queries based on the question that the researcher generates."""
documents: Annotated[list[Document], reduce_docs] = field(default_factory=list)
"""Populated by the retriever. This is a list of documents that the agent can reference."""
第 4.1 步:生成查询
此步骤根据问题(研究计划中的一步)生成搜索查询。此功能使用 LLM 生成多样的搜索查询,以帮助回答问题。
async def generate_queries(
state: ResearcherState, *, config: RunnableConfig
) -> dict[str, list[str]]:
"""根据问题(研究计划中的一步)生成搜索查询。
此功能使用语言模型生成多样的搜索查询,以帮助回答问题。
参数:
state (ResearcherState): 研究者的当前状态,包括用户的问题。
config (RunnableConfig): 用于生成查询的模型配置。
返回:
dict[str, list[str]]: 一个字典,包含一个 'queries' 键,里面是生成的搜索查询列表。
"""
class Response(TypedDict):
queries: list[str]
logger.info("---生成查询---")
model = ChatOpenAI(model="gpt-4o-mini-2024-07-18", temperature=0)
messages = [
{"role": "system", "content": GENERATE_QUERIES_SYSTEM_PROMPT},
{"role": "human", "content": state.question},
]
response = cast(Response, await model.with_structured_output(Response).ainvoke(messages))
queries = response["queries"]
queries.append(state.question)
logger.info(f"查询: {queries}")
return {"queries": response["queries"]}
对问题 “检索2019年都柏林的数据中心PUE效率值”: 的输出示例:
{
"queries":[
"查找2019年都柏林数据中心的PUE(电力使用效率)效率值,使用统计数据源。"
"PUE效率值 数据中心 都柏林 2019",
"电力使用效率 统计 数据中心 都柏林 2019"
]
}
一旦生成查询,我们可以使用之前定义的持久数据库来定义向量存储。
def _setup_vectorstore() -> Chroma:
"""
设置并返回 Chroma 向量存储实例。
"""
embeddings = OpenAIEmbeddings()
return Chroma(
collection_name=VECTORSTORE_COLLECTION,
embedding_function=embeddings,
persist_directory=VECTORSTORE_DIRECTORY
)
在 RAG 系统中,最关键的部分是文档检索过程。 因此,特别关注所使用的技术:具体来说,选择了 混合检索器 作为 Hybrid Search 和 Cohere 进行重排序。
混合检索 是“关键字风格”搜索和“向量风格”搜索的结合。它既具有进行关键字搜索的优势,也具有利用嵌入和向量搜索进行语义搜索的优势。集成检索器是一种检索算法,旨在通过结合多个单独检索器的优势来增强信息检索的性能。这种方法被称为“集成检索”,使用一种称为互惠排名融合的方法对来自不同检索器的结果进行重排序和合并,从而提供比任何单个检索器更准确和相关的结果。
## 创建基础检索器
retriever_bm25 = BM25Retriever.from_documents(documents, search_kwargs={"k": TOP_K})
retriever_vanilla = vectorstore.as_retriever(search_type="similarity", search_kwargs={"k": TOP_K})
retriever_mmr = vectorstore.as_retriever(search_type="mmr", search_kwargs={"k": TOP_K})
ensemble_retriever = EnsembleRetriever(
retrievers=[retriever_vanilla, retriever_mmr, retriever_bm25],
weights=ENSEMBLE_WEIGHTS,
)
重排序 是一种可以用来提高 RAG 管道性能的技术。这是一种非常强大的方法,可以显著提升搜索系统的效果。简而言之,重排序接受一个查询和一个响应,并输出一个 相关性得分。通过这种方式,可以使用任何搜索系统来显示可能包含查询答案的多个文档,然后使用重排序端点对它们进行排序。
但是:我们为什么需要重排序步骤?
为了解决准确性的问题,采用了两阶段检索作为提高搜索质量的一种手段。在这些两阶段系统中,第一阶段模型(集成检索器)从更大的数据集中检索一组候选文档。然后,使用第二阶段模型(重排序器)对第一阶段模型检索到的文档进行重排序。此外,重排序模型,如 Cohere Rerank,是一种模型,当给定查询和文档对时,会输出相似度得分。该得分可用于重新排序与搜索查询最相关的文档。在重排序方法中,Cohere Rerank 模型因其显著提高搜索准确性的能力而脱颖而出。该模型通过使用深度学习直接评估每个文档与查询之间的对齐,偏离了传统的嵌入模型。Cohere Rerank 通过同时处理查询和文档输出相关性得分,从而实现更细致的文档选择过程。 (完整参考请见)
在这种情况下,检索到的文档被重排序,返回前两个最相关的文档。
from langchain.retrievers.contextual_compression import ContextualCompressionRetriever
from langchain_cohere import CohereRerank
from langchain_community.llms import Cohere
## 设置 Cohere 重排序
compressor = CohereRerank(top_n=2, model="rerank-english-v3.0")
## 构建压缩检索器
compression_retriever = ContextualCompressionRetriever(
base_compressor=compressor,
base_retriever=ensemble_retriever,
)
compression_retriever.invoke(
"检索2019年都柏林的数据中心PUE效率"
)
对问题 “检索2019年都柏林的数据中心PUE效率值”: 的输出示例:
[Document(metadata={'Header 2': 'Endnotes', 'relevance_score': 0.27009502}, page_content="- 1 此计算基于..."),
Document(metadata={'Header 2': '数据中心电网区域CFE', 'relevance_score': 0.20593424}, page_content="2023 \n| 国家..." )]
第 4.2 步:检索和重新排序文档函数
async def retrieve_and_rerank_documents(
state: QueryState, *, config: RunnableConfig
) -> dict[str, list[Document]]:
"""Retrieve documents based on a given query.
This function uses a retriever to fetch relevant documents for a given query.
Args:
state (QueryState): The current state containing the query string.
config (RunnableConfig): Configuration with the retriever used to fetch documents.
Returns:
dict[str, list[Document]]: A dictionary with a 'documents' key containing the list of retrieved documents.
"""
logger.info("---RETRIEVING DOCUMENTS---")
logger.info(f"Query for the retrieval process: {state.query}")
response = compression_retriever.invoke(state.query)
return {"documents": response}
第4.3步 构建子图
builder = StateGraph(ResearcherState)
builder.add_node(generate_queries)
builder.add_node(retrieve_and_rerank_documents)
builder.add_edge(START, "generate_queries")
builder.add_conditional_edges(
"generate_queries",
retrieve_in_parallel, # type: ignore
path_map=["retrieve_and_rerank_documents"],
)
builder.add_edge("retrieve_and_rerank_documents", END)
researcher_graph = builder.compile()
第5步:检查完成
使用 conditional_edge
,我们构建一个循环,其结束条件由 check_finished
返回的值决定。该函数检查由 create_research_plan
节点创建的步骤列表中是否还有更多步骤需要处理。一旦所有步骤完成,流程将继续到 respond
节点。
def check_finished(state: AgentState) -> Literal["respond", "conduct_research"]:
"""Determine if the research process is complete or if more research is needed.
This function checks if there are any remaining steps in the research plan:
- If there are, route back to the `conduct_research` node
- Otherwise, route to the `respond` node
Args:
state (AgentState): The current state of the agent, including the remaining research steps.
Returns:
Literal["respond", "conduct_research"]: The next step to take based on whether research is complete.
"""
if len(state.steps or []) > 0:
return "conduct_research"
else:
return "respond"
第6步:响应
根据进行的研究生成对用户查询的最终响应。此功能利用对话历史和研究者代理检索到的文档来制定全面的答案。
async def respond(
state: AgentState, *, config: RunnableConfig
) -> dict[str, list[BaseMessage]]:
"""Generate a final response to the user's query based on the conducted research.
This function formulates a comprehensive answer using the conversation history and the documents retrieved by the researcher.
Args:
state (AgentState): The current state of the agent, including retrieved documents and conversation history.
config (RunnableConfig): Configuration with the model used to respond.
Returns:
dict[str, list[str]]: A dictionary with a 'messages' key containing the generated response.
"""
print("--- RESPONSE GENERATION STEP ---")
model = ChatOpenAI(model="gpt-4o-2024-08-06", temperature=0)
context = format_docs(state.documents)
prompt = RESPONSE_SYSTEM_PROMPT.format(context=context)
messages = [{"role": "system", "content": prompt}] + state.messages
response = await model.ainvoke(messages)
return {"messages": [response]}
第7步:检查幻觉
此步骤检查在前一步中生成的LLM响应是否得到所检索文档中事实集合的支持,并给出二元评分。
async def check_hallucinations(
state: AgentState, *, config: RunnableConfig
) -> dict[str, Any]:
"""分析用户的查询,并检查响应是否得到所检索文档中事实集合的支持,
提供二元评分结果。
此函数使用语言模型分析用户的查询,并给出二元评分结果。
参数:
state (AgentState): 代理的当前状态,包括对话历史。
config (RunnableConfig): 用于查询分析的模型配置。
返回:
dict[str, Router]: 一个字典,包含 'router' 键及分类结果(分类类型和逻辑)。
"""
model = ChatOpenAI(model=GPT_4o_MINI, temperature=TEMPERATURE, streaming=True)
system_prompt = CHECK_HALLUCINATIONS.format(
documents=state.documents,
generation=state.messages[-1]
)
messages = [
{"role": "system", "content": system_prompt}
] + state.messages
logging.info("---检查幻觉---")
response = cast(GradeHallucinations, await model.with_structured_output(GradeHallucinations).ainvoke(messages))
return {"hallucination": response}
第8步:人工审批(人机协作)
如果LLM的响应不符合事实集,则可能包含幻觉。在这种情况下,图形会中断,用户可以控制下一步:仅重试最后一生成步骤,而无需重新启动整个工作流程或结束该过程。此人机协作步骤确保用户控制,同时避免意外循环或不希望的操作。
LangGraph中的interr
upt function通过在特定节点暂停图形,向人类呈现信息,并根据他们的输入恢复图形,从而实现人机协作工作流程。此功能适用于审批、编辑或收集额外输入等任务。interr
upt function与Comm
and对象结合使用,以根据人类提供的值恢复图形。
def human_approval(
state: AgentState,
):
_binary_score = state.hallucination.binary_score
if _binary_score == "1":
return "END"
else:
retry_generation = interrupt(
{
"question": "Is this correct?",
"llm_output": state.messages[-1]
})
if retry_generation == "y":
print("voglio continuare")
return "respond"
else:
return "END"
4.3 构建主图形
from langgraph.graph import END, START, StateGraph
from langgraph.checkpoint.memory import MemorySaver
checkpointer = MemorySaver()
builder = StateGraph(AgentState, input=InputState)
builder.add_node(analyze_and_route_query)
builder.add_edge(START, "analyze_and_route_query")
builder.add_conditional_edges("analyze_and_route_query", route_query)
builder.add_node(create_research_plan)
builder.add_node(ask_for_more_info)
builder.add_node(respond_to_general_query)
builder.add_node(conduct_research)
builder.add_node("respond", respond)
builder.add_node(check_hallucinations)
builder.add_conditional_edges("check_hallucinations", human_approval, {"END": END, "respond": "respond"})
builder.add_edge("create_research_plan", "conduct_research")
builder.add_conditional_edges("conduct_research", check_finished)
builder.add_edge("respond", "check_hallucinations")
graph = builder.compile(checkpointer=checkpointer)
构建主函数 (app.py)
“每个函数都被定义为 async
以便在生成步骤中启用流式行为。
from subgraph.graph_states import ResearcherState
from main_graph.graph_states import AgentState
from utils.utils import config, new_uuid
from subgraph.graph_builder import researcher_graph
from main_graph.graph_builder import InputState, graph
from langgraph.types import Command
import asyncio
import uuid
import asyncio
import time
import builtins
thread = {"configurable": {"thread_id": new_uuid()}}
async def process_query(query):
inputState = InputState(messages=query)
async for c, metadata in graph.astream(input=inputState, stream_mode="messages", config=thread):
if c.additional_kwargs.get("tool_calls"):
print(c.additional_kwargs.get("tool_calls")[0]["function"].get("arguments"), end="", flush=True)
if c.content:
time.sleep(0.05)
print(c.content, end="", flush=True)
if len(graph.get_state(thread)[-1]) > 0:
if len(graph.get_state(thread)[-1][0].interrupts) > 0:
response = input("\n响应可能包含不确定的信息。是否重试生成?如果是,请按 'y': ")
if response.lower() == 'y':
async for c, metadata in graph.astream(Command(resume=response), stream_mode="messages", config=thread):
if c.additional_kwargs.get("tool_calls"):
print(c.additional_kwargs.get("tool_calls")[0]["function"].get("arguments"), end="")
if c.content:
time.sleep(0.05)
print(c.content, end="", flush=True)
async def main():
input = builtins.input
print("请输入您的查询(输入 '-q' 退出):")
while True:
query = input("> ")
if query.strip().lower() == "-q":
print("正在退出...")
break
await process_query(query)
if __name__ == "__main__":
asyncio.run(main())
在第一次调用后,图形状态会检查中断。如果发现任何中断,可以使用以下命令再次调用图形:
graph.astream(Command(resume=response), stream_mode="messages", config=thread)
通过这种方式,工作流程将从中断的步骤恢复,而无需重新执行之前的步骤,使用相同的 thread_id
。
3. 结果
对于以下测试,使用了关于谷歌环境可持续性战略的年度报告,该报告可在此处免费获取。
实时测试
作为第一次测试,执行了以下查询以从不同表中提取不同值,结合了多步骤方法的能力,并利用了Docling库的解析功能。
复杂问题:“检索新加坡第二个设施在2019年和2022年的数据中心PUE效率值。同时检索2023年亚太地区的区域平均CFE。”
完整结果是正确的,并且幻觉检查已成功通过。
聊天机器人生成的步骤:
- “查找新加坡第二个设施在2019年和2022年的PUE效率值。”,
- “查找2023年亚太地区的区域平均CFE。”
生成的文本:*“- 新加坡第二个设施在2019年的电源使用效率(PUE)不可用,因为该年度的数据未提供。然而,2022年的PUE为1.21\。”
亚太地区2023年的区域平均无碳能源(CFE)为12%。”
完整输出:
请输入您的查询(输入 '-q' 以退出):
> 检索新加坡第二个设施在2019年和2022年的数据中心PUE效率值。同时检索2023年亚太地区的平均CFE
2025-01-10 20:39:53,381 - 信息 - ---分析并路由查询---
2025-01-10 20:39:53,381 - 信息 - 消息: [HumanMessage(content='Retrieve the data center PUE efficiency values in Singapore 2nd facility in 2019 and 2022. Also retrieve regional average CFE in Asia pacific in 2023 ', additional_kwargs={}, response_metadata={}, id='351a00e9-ecda-49e2-b069-19196348a82a')]
{"logic":"Retrieve the data center PUE efficiency values in Singapore 2nd facility in 2019 and 2022. Also retrieve regional average CFE in Asia pacific in 2023","type":"environmental"}2025-01-10 20:39:55,586 - 信息 - ---计划生成---
{"steps":["查找新加坡第二个设施在2019年和2022年的PUE效率值。","查找2023年亚太地区的平均CFE。"]}2025-01-10 20:39:57,323 - 信息 - ---生成查询---
{"queries":["PUE efficiency values Singapore 2nd facility 2019","PUE efficiency values Singapore 2nd facility 2022"]}2025-01-10 20:39:58,285 - 信息 - 查询: ['PUE efficiency values Singapore 2nd facility 2019', 'PUE efficiency values Singapore 2nd facility 2022', '查找新加坡第二个设施在2019年和2022年的PUE效率值。']
2025-01-10 20:39:58,288 - 信息 - ---检索文档---
2025-01-10 20:39:58,288 - 信息 - 检索过程查询: PUE efficiency values Singapore 2nd facility 2019
2025-01-10 20:39:59,568 - 信息 - ---检索文档---
2025-01-10 20:39:59,568 - 信息 - 检索过程查询: PUE efficiency values Singapore 2nd facility 2022
2025-01-10 20:40:00,891 - 信息 - ---检索文档---
2025-01-10 20:40:00,891 - 信息 - 检索过程查询: 查找新加坡第二个设施在2019年和2022年的PUE效率值。
2025-01-10 20:40:01,820 - 信息 -
总共检索到4份文档,步骤:查找新加坡第二个设施在2019年和2022年的PUE效率值。。
2025-01-10 20:40:01,825 - 信息 - ---生成查询---
{"queries":["亚太地区2023年平均CFE","亚太地区2023年CFE统计"]}2025-01-10 20:40:02,778 - 信息 - 查询: ['亚太地区2023年平均CFE', '亚太地区2023年CFE统计', '查找2023年亚太地区的平均CFE。']
2025-01-10 20:40:02,780 - 信息 - ---检索文档---
2025-01-10 20:40:02,780 - 信息 - 检索过程查询: 亚太地区2023年平均CFE
2025-01-10 20:40:03,757 - 信息 - ---检索文档---
2025-01-10 20:40:03,757 - 信息 - 检索过程查询: 亚太地区2023年CFE统计
2025-01-10 20:40:04,885 - 信息 - ---检索文档---
2025-01-10 20:40:04,885 - 信息 - 检索过程查询: 查找2023年亚太地区的平均CFE。
2025-01-10 20:40:06,526 - 信息 -
总共检索到4份文档,步骤:查找2023年亚太地区的平均CFE。。
2025-01-10 20:40:06,530 - 信息 - ---响应生成步骤---
- 新加坡第二个设施在2019年的PUE(电力使用效率)不可用,因为该年的数据未提供。然而,2022年的PUE为1.21 [e048d08a-4ef6-77b5-20d3-352dcec590b7]。
- 2023年亚太地区的平均无碳能源(CFE)为12% [9c489d2f-f16f-572b-abed-ee1d5d0ed379]。2025-01-10 20:40:14,918 - 信息 - ---检查幻觉---
{"binary_score":"1"}>
4. 结论
Agentic RAG:技术挑战与考虑
尽管性能有所提升,但实施Agentic RAG并非没有挑战:
- 延迟:代理交互的复杂性增加往往导致响应时间变长。在速度与准确性之间取得平衡是一项关键挑战。
- 评估与可观察性:随着Agentic RAG系统变得更加复杂,持续的评估与可观察性变得必要。
总之,Agentic RAG在人工智能领域标志着重大的突破。通过将大型语言模型的能力与自主推理和信息检索相结合,Agentic RAG引入了一种新的智能和灵活性标准。随着人工智能的不断发展,Agentic RAG将在各个行业中发挥基础性作用,改变我们使用技术的方式。
Github Repo 这里!
References:
https://langchainai.github.io/langgraph/concepts/human_in_the_loop/#approve-or-reject
https://www.kaggle.com/code/marcinrutecki/rag-ensemble-retriever-in-langchain
https://sustainability.google/reports/google-2024-environmental-report/
https://python.langchain.com/docs/integrations/retrievers/cohere-reranker/
https://infohub.delltechnologies.com/it-it/p/the-rise-of-agentic-rag-systems/
https://aws.amazon.com/it/blogs/machine-learning/improve-rag-performance-using-cohere-rerank/