Type something to search...
构建高效并行工作流:使用langgraph实现llm应用的5个关键步骤

构建高效并行工作流:使用langgraph实现llm应用的5个关键步骤

大型语言模型与 LangGraph

大型语言模型 (LLMs) 已成为自然语言处理的强大工具,但使用它们编排复杂的工作流可能会很具挑战性。这就是 LangGraph 进入视野的地方——一个专门设计的框架,旨在为 LLM 应用带来结构和效率。

LangGraph 通过将您的 LLM 应用视为有向图而脱颖而出,其中每个节点代表一个特定操作,边定义数据流。它的强大之处在于能够处理并行处理,允许多个操作在不依赖于彼此输出的情况下同时执行。

在本指南中,我们将探讨一个实际示例:构建一个基于主题的摘要生成器。想象一下,您希望生成关于某个主题的全面摘要,但您想同时从多个角度进行探索,而不是线性的方法。我们的应用将接受一个主题(例如“鸟类”),并并行生成几个子主题,为每个子主题创建摘要,然后选择最佳摘要——同时保持清晰的状态管理和类型安全。

这个现实世界的用例展示了 LangGraph 的并行处理能力如何显著提高效率,并创建更复杂的 LLM 应用。让我们一步一步深入了解如何构建这个系统。

理解构建模块

LangGraph 的强大之处在于其核心组件,它们协同工作以创建结构化的 LLM 应用程序。其核心是 StateGraph,它作为您应用程序逻辑的主要容器。您工作流中的每个操作由一个 Node 表示,当可能时,它可以独立并行处理数据。

节点之间的数据流通过基于 TypedDict 的状态进行管理,确保类型安全和清晰的数据契约。这些状态可以通过操作符如 operator.add 进行注释,以处理并行处理结果。边连接这些节点,定义了您应用程序中的顺序路径和条件路径。

这种结构化的方法使我们能够构建复杂的并行工作流,同时在我们的 LLM 应用程序中保持清晰和可靠性。

状态管理

LangGraph 使用 TypedDict 定义结构化状态,确保清晰性和类型安全。Annotations,特别是 Annotated[list, operator.add],在将多个输出合并为单个列表方面发挥了关键作用,从而实现高效的并行处理。状态设计的最佳实践包括:

  • 保持状态定义简洁而富有表现力
  • 使用清晰的类型注释以便于维护
  • 在可行的情况下确保不可变性
  • 逻辑地构建状态以支持并行性和条件流

核心组件

节点和边的设计

在 LangGraph 中,节点是遵循单一责任原则的基本处理单元。每个节点都有一个明确的特定任务——无论是生成主题、创建摘要,还是选择最佳内容。

状态管理

状态转换通过定义数据流清晰契约的 TypedDict 类来处理。OverallState 维护全局上下文,而 SummaryState 管理单个并行操作的数据。

并行流配置

边定义了数据在节点之间的流动方式。使用 add_conditional_edges,我们可以创建并行处理路径。比如:

graph.add_conditional_edges(
    "generate_topics", 
    continue_to_summaries, 
    ["generate_summary"]
)

这个设置允许多个摘要生成同时运行,从而提高效率。

示例实现:LangGraph 并行工作流处理

我们的实现通过构建一个主题摘要系统来演示并行处理,该系统:

  1. 以“鸟类”为主题
  2. 生成多个子主题(例如,“迁徙模式”、“筑巢习性”、“觅食行为”)
  3. 同时为每个子主题创建摘要
  4. 精炼并选择最佳摘要

这种并行方法使我们能够同时处理多个摘要,与顺序处理相比,显著减少了整体执行时间。

1. 设置环境

要构建我们的并行处理应用程序,我们需要使用关键依赖项设置环境:

pip install langgraph langchain-deepseek pydantic typing-extensions

我们将 DeepSeek 配置为我们的 LLM,并设置特定参数:

model = ChatDeepSeek(
    model="deepseek-chat",
    temperature=0,
    max_tokens=1000,
    timeout=None,
    max_retries=2
)

此设置为我们的并行处理工作流提供了基础,确保从我们的 LLM 中获得可靠和一致的响应。

2. 定义数据模型

LangGraph 应用程序需要良好结构的数据模型,以确保可靠的处理和类型安全。我们的实现使用了三个关键组件:

2.1 用于结构化输出的 Pydantic 模型

class Subjects(BaseModel):
    subjects: list[str]

class Summary(BaseModel):
    summary: str

class BestSummary(BaseModel):
    id: int = Field(description="最佳摘要的索引,从 0 开始", ge=0)

这些 Pydantic 模型确保 LLM 输出正确结构化并经过验证。它们充当 LLM 与我们的应用程序之间的合同,防止意外的数据格式。

2.2 状态定义

class OverallState(TypedDict):
    topic: str
    subjects: list
    summaries: Annotated[list, operator.add]
    best_selected_summary: str

class SummaryState(TypedDict):
    subject: str
    summary: Annotated[list, operator.add]

状态定义有两个关键目的:

  • 管理整体工作流状态 (OverallState)
  • 处理单个并行处理状态 (TopicState)

2.3 类型安全考虑

  • 使用 TypedDict 确保编译时类型检查
  • Annotatedoperator.add 使并行输出的正确合并成为可能
  • 字段描述和验证器(如 ge=0)防止无效数据
  • 全球状态与节点特定状态之间的清晰分离防止状态污染

这个强大的类型系统有助于及早捕捉错误,并使代码库更易于维护和自我文档化。

本节解释了您的 LangGraph 应用程序的核心数据建模方面,强调通过 Pydantic 和 TypedDict 类实现的类型安全和状态管理。它展示了适当的数据建模如何支持并行处理,同时保持代码的可靠性。

3. 构建处理节点

在我们的 LangGraph 应用中,节点是基本的处理单元。每个节点都有特定的责任,并且可以独立执行或在可能的情况下并行执行。

3.1 主题生成节点

该节点接受初始主题和多个相关主题进行探索。

def generate_topics(state: OverallState) -> OverallState:
    prompt = subjects_prompt.format(topic=state["topic"])
    response = model.with_structured_output(Subjects).invoke(prompt)
    return {"subjects": response.subjects}

关键特性:

  • 使用格式化提示生成相关主题
  • 返回主题以进行并行处理
  • 结构化输出确保数据格式一致

3.2 并行摘要生成节点

该节点并行处理单个主题以生成详细摘要。

def generate_summary(state: SummaryState) -> SummaryState:
    prompt = summary_prompt.format(topic=state["subject"])
    response = model.with_structured_output(Summary).invoke(prompt)
    return {"summary": [response.summary]}

关键特性:

  • 独立处理单个主题
  • 可对多个主题并行执行
  • 返回结构化输出以确保一致性

3.3 摘要精炼节点

该节点接受生成的摘要并将其精炼得更加有趣和详细。

def refine_summary(state: SummaryState) -> OverallState:
    prompt = refine_summary_prompt.format(summary=state["summary"])
    response = model.with_structured_output(Summary).invoke(prompt)
    return {"summaries": [response.summary]}

关键特性:

  • 提高生成摘要的质量
  • 保持一致的状态结构
  • 独立处理精炼内容
  • 将 SummaryState 转换为 OverallState 进行最终处理

3.4 最佳摘要选择节点

该节点评估所有精炼的摘要并选择最合适的一个。

def best_summary(state: OverallState) -> OverallState:
    summaries = "\n\n".join(state["summaries"])
    prompt = best_summary_prompt.format(topic=state["topic"], summaries=summaries)
    response = model.with_structured_output(BestSummary).invoke(prompt)
    return {"best_selected_summary": state["summaries"][response.id]}

关键特性:

  • 将所有摘要组合以进行比较
  • 使用结构化输出进行一致选择
  • 返回最佳摘要的索引
  • 用选定的摘要更新最终状态

3.5 节点连接逻辑

节点通过条件边连接,以实现并行处理:

def continue_to_summaries(state: OverallState):
    return [Send("generate_summary", {"subject": s}) for s in state["subjects"]]

def continue_to_refine(state: SummaryState):
    return Send("refine_summary", state)

关键特性:

  • continue_to_summaries: 将主题映射到并行摘要生成任务
  • continue_to_refine: 将精炼的摘要引导到最终选择
  • 使用 Send 对象管理状态转换
  • 通过适当的状态映射实现并行处理

3.6 使用的提示

节点使用精心设计的提示来引导大型语言模型:

subjects_prompt = """Generate a comma separated list of between 2 and 5 examples related to: {topic}."""

summary_prompt = """Generate a summary of the following topic: {topic}"""

best_summary_prompt = """Below are a bunch of summaries about {topic}. Select the best one! Return the ID of the best one.
{summaries}"""

refine_summary_prompt = """Refine the following summary to make it more concise and informative: {summary}."""

该节点架构实现了:

  1. 多个主题的并行处理
  2. 明确的关注点分离
  3. 类型安全的状态转换
  4. 高效的工作流管理

每个节点都被设计为独立和无状态,使系统更易于维护和调试。与顺序方法相比,并行处理能力显著减少了整体执行时间。

4. 工作流编排

LangGraph 中的工作流编排涉及构建一个有向图,为并行处理配置边,并设置执行流程。让我们逐一解析每个组件:

4.1 图构建

图是使用 StateGraph 类构建的,并编译成可执行应用程序:

def compile_graph():
  # Initialize the graph with our OverallState type
  graph = StateGraph(OverallState)
  # Add processing nodes
  graph.add_node("generate_topics", generate_topics)
  graph.add_node("generate_summary", generate_summary)
  graph.add_node("refine_summary", refine_summary)
  graph.add_node("best_summary", best_summary)
  # Define the sequential flow
  graph.add_edge(START, "generate_topics")
  graph.add_edge("best_summary", END)
  # Add conditional edges for parallel processing
  graph.add_conditional_edges(
    "generate_topics",
    continue_to_summaries,
    ["generate_summary"]
  )
  graph.add_conditional_edges(
    "generate_summary",
    continue_to_refine,
    ["refine_summary"]
  )
  graph.add_edge("refine_summary", "best_summary")
  # Compile the graph into an executable
  return graph.compile()

4.2 条件边配置

条件边通过根据状态动态创建路径来实现并行处理。

4.3 图的执行流程

工作流遵循以下顺序:

  1. 开始 → generate_topics
  2. generate_topics → 多个 generate_summary 实例(并行)
  3. generate_summary → refine_summary
  4. refine_summary → best_summary
  5. best_summary → 结束

5. 可视化和调试

LangGraph 提供强大的工具来可视化工作流、检查状态和调试并行处理。让我们详细探讨这些功能。

基本图形可视化 LangGraph 可以生成 mermaid 图表来可视化您的工作流:

app = compile_graph()
png_image = app.get_graph().draw_mermaid_png()

with open("langgraph_diagram.png", "wb") as f:
    f.write(png_image)

Image 2

6. 执行和运行时行为

6.1 基本执行

执行 LangGraph 工作流的最简单方法是通过 _invoke_ 方法:

app = compile_graph()

state = app.invoke({"topic": "birds"})
print(state)

预期输出结构:

{
  'topic': 'birds',
  'subjects': ['sparrow', 'eagle', 'penguin', 'parrot'],
  'summaries': [
    'Sparrows, small passerine birds of the family Passeridae, are adaptable and social, thriving in urban and rural areas worldwide. They primarily feed on seeds and insects, contributing to ecosystems by controlling pests and dispersing seeds. Known for their distinctive chirping, sparrows often form flocks. However, habitat loss and pollution threaten some species, causing population declines in certain regions.',
    'Eagles are large birds of prey in the Accipitridae family, known for their powerful build, keen eyesight, and role as apex predators. Found on every continent except Antarctica, they inhabit diverse environments like forests, mountains, and plains. With over 60 species, including the bald and golden eagles, they are cultural symbols of power and freedom, featured in mythology, folklore, and national emblems.',
    'Penguins are aquatic, flightless birds primarily found in the Southern Hemisphere, especially Antarctica. Adapted for swimming, they have countershaded plumage and flippers. Penguins feed on krill, fish, and squid, spending half their lives on land and half in water. They live in large colonies and exhibit unique behaviors like waddling and tobogganing. Species range from the small blue penguin to the large emperor penguin. Threats include climate change, overfishing, and pollution.',
    'Parrots are colorful, intelligent birds in the order Psittaciformes, found in tropical and subtropical regions. Known for mimicking sounds, including human speech, they have strong, curved beaks and zygodactyl feet, making them excellent climbers. Social and often seen in flocks, parrots have a varied diet of seeds, fruits, and nuts. Popular as pets for their vibrant colors and interactive nature, they require significant care. Conservation efforts are crucial due to threats from habitat destruction and the pet trade.'
  ],
  'best_selected_summary': 'Sparrows, small passerine birds of the family Passeridae, are adaptable and social, thriving in urban and rural areas worldwide. They primarily feed on seeds and insects, contributing to ecosystems by controlling pests and dispersing seeds. Known for their distinctive chirping, sparrows often form flocks. However, habitat loss and pollution threaten some species, causing population declines in certain regions.'
}

7. 完整代码:

import operator
from typing import Annotated, TypedDict

from langchain_deepseek import ChatDeepSeek

from langgraph.types import Send
from langgraph.graph import END, StateGraph, START

from pydantic import BaseModel, Field

subjects_prompt = """生成一个包含 2 到 5 个与主题相关的示例的逗号分隔列表: {topic}。"""
summary_prompt = """生成以下主题的摘要: {topic}"""
best_summary_prompt = """以下是关于 {topic} 的一系列摘要。选择最佳的一个!返回最佳摘要的 ID。

{summaries}"""

refine_summary_prompt = """精炼以下摘要,使其更简洁和信息丰富: {summary}。"""

class Subjects(BaseModel):
    subjects: list[str]

class Summary(BaseModel):
    summary: str

class BestSummary(BaseModel):
    id: int = Field(description="最佳摘要的索引,从 0 开始", ge=0)

model = ChatDeepSeek(
    model="deepseek-chat",
    temperature=0,
    max_tokens=1000,
    timeout=None,
    max_retries=2,
)

class OverallState(TypedDict):
    topic: str
    subjects: list

summaries: Annotated[list, operator.add]
best_selected_summary: str

class SummaryState(TypedDict):
    subject: str
    summary: Annotated[list, operator.add]

def generate_topics(state: OverallState) -> OverallState:
    prompt = subjects_prompt.format(topic=state["topic"])
    response = model.with_structured_output(Subjects).invoke(prompt)
    return {"subjects": response.subjects}

def generate_summary(state: SummaryState) -> SummaryState:
    prompt = summary_prompt.format(topic=state["subject"])
    response = model.with_structured_output(Summary).invoke(prompt)
    return {"summary": [response.summary]}

def refine_summary(state: SummaryState) -> OverallState:
    prompt = refine_summary_prompt.format(summary=state["summary"])
    response = model.with_structured_output(Summary).invoke(prompt)
    return {"summaries": [response.summary]}

def continue_to_summaries(state: OverallState):
    return [Send("generate_summary", {"subject": s}) for s in state["subjects"]]

def best_summary(state: OverallState) -> OverallState:
    summaries = "\n\n".join(state["summaries"])
    prompt = best_summary_prompt.format(topic=state["topic"], summaries=summaries)
    response = model.with_structured_output(BestSummary).invoke(prompt)
    return {"best_selected_summary": state["summaries"][response.id]}

def continue_to_refine(state: SummaryState):
    return Send("refine_summary", state)

def compile_graph():
    graph = StateGraph(OverallState)
    graph.add_node("generate_topics", generate_topics)
    graph.add_node("generate_summary", generate_summary)
    graph.add_node("refine_summary", refine_summary)
    graph.add_node("best_summary", best_summary)
    graph.add_edge(START, "generate_topics")
    graph.add_conditional_edges("generate_topics", continue_to_summaries, ["generate_summary"])
    graph.add_conditional_edges("generate_summary", continue_to_refine, ["refine_summary"])
    graph.add_edge("refine_summary", "best_summary")
    graph.add_edge("best_summary", END)
    app = graph.compile()
    return app

app = compile_graph()

png_image = app.get_graph().draw_mermaid_png()

with open("langgraph_diagram_n.png", "wb") as f:
    f.write(png_image)

state = app.invoke({"topic": "birds"})

print(state)

结论

LangGraph 提供了一个强大的框架,用于构建与大型语言模型(LLMs)一起的并行处理应用程序,如我们在主题摘要示例中所展示的。关键概念包括通过类型字典进行状态管理、使用条件边进行并行处理以及健壮的错误处理。我们所涵盖的最佳实践包括保持清晰的节点职责、实施适当的状态验证以及利用可视化工具进行调试。该框架在保持类型安全和状态一致性的同时处理并行操作,使其成为复杂 LLM 应用程序的优秀选择。下一步,开发人员应该探索高级功能,如自定义节点实现、与不同 LLM 提供者的集成以及实施更复杂的并行处理模式。宝贵的资源包括官方 LangGraph 文档、LangChain 的集成指南以及不断壮大的 LangGraph 开发者社区,分享实现和最佳实践。随着 LLM 应用程序的不断发展,LangGraph 在工作流管理方面的结构化方法将变得越来越有价值,以构建可扩展和可维护的应用程序。

References

Related Posts

结合chatgpt-o3-mini与perplexity Deep Research的3步提示:提升论文写作质量的终极指南

结合chatgpt-o3-mini与perplexity Deep Research的3步提示:提升论文写作质量的终极指南

AI 研究报告和论文写作 合并两个系统指令以获得两个模型的最佳效果 Perplexity AI 的 Deep Research 工具提供专家级的研究报告,而 OpenAI 的 ChatGPT-o3-mini-high 擅长推理。我发现你可以将它们结合起来生成令人难以置信的论文,这些论文比任何一个模型单独撰写的都要好。你只需要将这个一次性提示复制到 **

阅读更多
让 Excel 过时的 10 种 Ai 工具:实现数据分析自动化,节省手工作业时间

让 Excel 过时的 10 种 Ai 工具:实现数据分析自动化,节省手工作业时间

Non members click here作为一名软件开发人员,多年来的一个发现总是让我感到惊讶,那就是人们还在 Excel

阅读更多
使用 ChatGPT 搜索网络功能的 10 种创意方法

使用 ChatGPT 搜索网络功能的 10 种创意方法

例如,提示和输出 你知道可以使用 ChatGPT 的“搜索网络”功能来完成许多任务,而不仅仅是基本的网络搜索吗? 对于那些不知道的人,ChatGPT 新的“搜索网络”功能提供实时信息。 截至撰写此帖时,该功能仅对使用 ChatGPT 4o 和 4o-mini 的付费会员开放。 ![](https://images.weserv.nl/?url=https://cdn-im

阅读更多
掌握Ai代理:解密Google革命性白皮书的10个关键问题解答

掌握Ai代理:解密Google革命性白皮书的10个关键问题解答

10 个常见问题解答 本文是我推出的一个名为“10 个常见问题解答”的新系列的一部分。在本系列中,我旨在通过回答关于该主题的十个最常见问题来分解复杂的概念。我的目标是使用简单的语言和相关的类比,使这些想法易于理解。 图片来自 [Solen Feyissa](https://unsplash.com/@solenfeyissa?utm_source=medium&utm_medi

阅读更多
在人工智能和技术领域保持领先地位的 10 项必学技能 📚

在人工智能和技术领域保持领先地位的 10 项必学技能 📚

在人工智能和科技这样一个动态的行业中,保持领先意味着不断提升你的技能。无论你是希望深入了解人工智能模型性能、掌握数据分析,还是希望通过人工智能转变传统领域如法律,这些课程都是你成功的捷径。以下是一个精心策划的高价值课程列表,可以助力你的职业发展,并让你始终处于创新的前沿。 1. 生成性人工智能简介课程: [生成性人工智能简介](https://genai.works

阅读更多
揭开真相!深度探悉DeepSeek AI的十大误区,您被误导了吗?

揭开真相!深度探悉DeepSeek AI的十大误区,您被误导了吗?

在AI军备竞赛中分辨事实与虚构 DeepSeek AI真的是它所宣传的游戏规则改变者,还是仅仅聪明的营销和战略炒作?👀 虽然一些人将其视为AI效率的革命性飞跃,但另一些人则认为它的成功建立在借用(甚至窃取的)创新和可疑的做法之上。传言称,DeepSeek的首席执行官在疫情期间像囤积卫生纸一样囤积Nvidia芯片——这只是冰山一角。 从其声称的550万美元培训预算到使用Open

阅读更多
Type something to search...