
构建动态订单管理系统:使用langgraph的5个步骤实现高效工作流
AI 生成的 LangGraph 插图
在这个极其详细的教程中,我们将探索 LangGraph — 一个强大的库,用于协调复杂的多步骤工作流,结合大型语言模型(LLMs) — 并将其应用于一个常见的电子商务问题:根据用户的查询决定是下单还是取消订单。在博客结束时,您将了解如何:
- 在 Python 环境中设置 LangGraph。
- 加载和管理数据(例如,库存和客户)。
- 定义节点(工作流中的单个任务)。
- 构建包含条件分支的节点和边的图。
- 可视化和测试工作流。
我们将逐步进行,详细解释每个概念 — 非常适合初学者以及希望使用大型语言模型(LLMs)构建动态或循环工作流的人。
目录
什么是 LangGraph?
LangGraph 是一个将图形化方法引入 LangChain 工作流的库。传统的管道通常线性地从一个步骤移动到另一个步骤,但现实世界的任务往往需要分支、条件逻辑,甚至循环(重试失败的步骤、澄清用户输入等)。
LangGraph 的主要特性:
- 节点:单个任务或功能(例如,检查库存、计算运费)。
- 边:定义节点之间数据和控制的流动。可以是条件的。
- 共享状态:每个节点可以返回更新全局状态对象的数据,避免手动传递数据。
- 工具集成:轻松结合外部工具或函数,供大型语言模型调用。
- 人机协作(可选):插入需要人工审核的节点。
问题陈述:订单管理
在这种情况下,用户的查询可以是关于下新订单或取消现有订单:
- PlaceOrder: 检查商品可用性,计算运费,并模拟支付。
- CancelOrder: 提取
order_id
并将订单标记为已取消。
因为我们必须分支(决定“PlaceOrder”与“CancelOrder”),我们将使用LangGraph创建一个条件流程:
- 分类查询。
- 如果PlaceOrder,则转到检查库存、运送和支付。
- 如果CancelOrder,则解析出
order_id
并调用取消工具。
导入说明
下面是您提供的代码的第一部分,展示了导入和环境设置。我们在代码后添加了注释,以解释每个部分。
import os
import pandas as pd
import random
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import ToolNode
from langgraph.graph import StateGraph, MessagesState, START, END
from langchain_core.runnables.graph import MermaidDrawMethod
from IPython.display import display, Image
from typing import Literal
from langchain_core.prompts import ChatPromptTemplate
from typing import Dict, TypedDict
os.environ["OPENAI_API_KEY"] = ""
langchain_core.tools, langchain_openai, ToolNode 等:
tool
(一个装饰器)将 Python 函数转换为 LLM 可以调用的“工具”。ChatOpenAI
是我们与 GPT 模型对话的 LLM 客户端。ToolNode
是来自langgraph.prebuilt
的预构建节点,负责工具执行。StateGraph
、MessagesState
、START
、END
来自langgraph.graph
——它们对于定义我们的工作流至关重要。MermaidDrawMethod
有助于将工作流可视化为 Mermaid.js 图表。
数据加载和状态定义
数据链接:Data
在下一个代码片段中,我们加载 CSV 文件(用于库存和客户)并 转换 它们为字典。我们还定义了我们的状态类型字典。
inventory_df = pd.read_csv("inventory.csv")
customers_df = pd.read_csv("customers.csv")
inventory = inventory_df.set_index("item_id").T.to_dict()
customers = customers_df.set_index("customer_id").T.to_dict()
class State(TypedDict): query: str category: str next_node: str item_id: str order_status: str cost: str payment_status: str location: str quantity: int
CSV 到字典:
inventory
和customers
是以item_id
或customer_id
为键的字典。这使得像inventory[item_51]
这样的查找变得简单。
状态:
- 一个类型字典,以便我们知道期望哪些字段。例如,
query
、category
、item_id
等。 category
通常是“PlaceOrder”或“CancelOrder”。next_node
可以存储下一个节点名称,尽管我们依赖图的边进行转换。- 这有助于在一个对象中跟踪所有内容——库存检查、支付状态 等。
创建工具和大型语言模型集成
现在我们定义我们的大型语言模型和工具。这里的主要工具是 cancel_order
,它使用大型语言模型从查询中提取 order_id
。
@tool
def cancel_order(query: str) -> dict:
"""模拟订单取消"""
order_id = llm.with_structured_output(method='json_mode').invoke(f'从以下文本中以json格式提取order_id: {query}')['order_id']
if not order_id:
return {"error": "缺少 'order_id'."}
return {"order_status": "订单已取消"}
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
tools_2 = [cancel_order]
llm_with_tools_2 = llm.bind_tools(tools_2)
tool_node_2 = ToolNode(tools_2)
@tool:
cancel_order
函数现在是大型语言模型可以调用的工具,如果它决定需要取消订单。
提取 order_id
:
- 我们调用
llm.with_structured_output(method='json_mode')
来指示大型语言模型返回JSON。然后我们解析出'order_id'
。
大型语言模型初始化:
model="gpt-4o-mini"
是选择的模型,temperature=0
用于确定性响应。
绑定和 ToolNode
:
llm.bind_tools(tools_2)
将我们的大型语言模型与cancel_order
工具连接起来。ToolNode
是一个专用节点,可以自动处理这些绑定的工具。
定义工作流节点
我们现在将开始逐个定义节点。
模型调用节点
这些节点可以调用模型
def call_model_2(state: MessagesState):
"""使用大型语言模型决定下一步。"""
messages = state["messages"]
response = llm_with_tools_2.invoke(str(messages))
return {"messages": [response]}
def call_tools_2(state: MessagesState) -> Literal["tools_2", END]:
"""根据工具调用路由工作流。"""
messages = state["messages"]
last_message = messages[-1]
if last_message.tool_calls:
return "tools_2"
return END
**call_model_2**
:获取对话(messages
)并将其传递给绑定工具的大型语言模型。如果大型语言模型触发工具调用,我们将在call_tools_2
中检测到。**call_tools_2**
:检查大型语言模型是否请求了工具调用(tool_calls
)。如果是,我们路由到"tools_2"
,即ToolNode
;否则,我们结束工作流。
分类查询
在这里,我们定义一个节点用于分类查询:
def categorize_query(state: MessagesState) -> MessagesState:
"""将用户查询分类为下订单或取消订单"""
prompt = ChatPromptTemplate.from_template(
"将用户查询分类为下订单或取消订单"
"回应为 'PlaceOrder' 或 'CancelOrder' 查询: {state}"
)
chain = prompt | ChatOpenAI(temperature=0)
category = chain.invoke({"state": state}).content
return {"query": state, "category": category}
- 该节点使用大型语言模型来分类用户的查询。返回值在状态中设置
"category"
。
检查库存
def check_inventory(state: MessagesState) -> MessagesState:
"""Check if the requested item is in stock."""
item_id = llm.with_structured_output(method='json_mode').invoke(f'Extract item_id from the following text in json format: {state}')['item_id']
quantity = llm.with_structured_output(method='json_mode').invoke(f'Extract quantity from the following text in json format: {state}')['quantity']
if not item_id or not quantity:
return {"error": "Missing 'item_id' or 'quantity'."}
if inventory.get(item_id, {}).get("stock", 0) >= quantity:
print("IN STOCK")
return {"status": "In Stock"}
return {"query": state, "order_status": "Out of Stock"}
- 尝试从对话中解析
item_id
和quantity
。 - 检查
inventory[item_id]["stock"]
以确认可用性。
计算运费
我们为特定客户定义了一个节点来计算运费
def compute_shipping(state: MessagesState) -> MessagesState:
"""Calculate shipping costs."""
item_id = llm.with_structured_output(method='json_mode').invoke(f'Extract item_id from the following text in json format: {state}')['item_id']
quantity = llm.with_structured_output(method='json_mode').invoke(f'Extract quantity from the following text in json format: {state}')['quantity']
customer_id = llm.with_structured_output(method='json_mode').invoke(f'Extract customer_id from the following text in json format: {state}')['customer_id']
location = customers[customer_id]['location']
if not item_id or not quantity or not location:
return {"error": "Missing 'item_id', 'quantity', or 'location'."}
weight_per_item = inventory[item_id]["weight"]
total_weight = weight_per_item * quantity
rates = {"local": 5, "domestic": 10, "international": 20}
cost = total_weight * rates.get(location, 10)
print(cost, location)
return {"query": state, "cost": f"${cost:.2f}"}
- 从用户的查询中检索 customer_id,然后在
customers
字典中查找他们的 location。 - 根据物品的重量、数量和用户的位置计算运费。
处理支付
我们将定义一个用于处理支付的节点:
def process_payment(state: State) -> State:
"""Simulate payment processing."""
cost = llm.with_structured_output(method='json_mode').invoke(f'Extract cost from the following text in json format: {state}')
if not cost:
return {"error": "Missing 'amount'."}
print(f"PAYMENT PROCESSED: {cost} and order successfully placed!")
payment_outcome = random.choice(["Success", "Failed"])
return {"payment_status": payment_outcome}
- 使用
random.choice
来模拟成功或失败。 - 在生产系统中,您将与真实的支付网关集成。
路由功能
我们现在定义一个用于路由查询的节点:
def route_query_1(state: State) -> str:
"""Route the query based on its category."""
print(state)
if state["category"] == "PlaceOrder":
return "PlaceOrder"
elif state["category"] == "CancelOrder":
return "CancelOrder"
- 决定下一个要遵循的路径:“PlaceOrder”或“CancelOrder”。在 LangGraph 中,我们将“PlaceOrder”映射到 CheckInventory 节点,将“CancelOrder”映射到 CancelOrder 节点。
构建工作流图
下面,我们创建一个 状态图
,添加节点,并定义边和条件边。
workflow = StateGraph(MessagesState)
workflow.add_node("RouteQuery", categorize_query)
workflow.add_node("CheckInventory", check_inventory)
workflow.add_node("ComputeShipping", compute_shipping)
workflow.add_node("ProcessPayment", process_payment)
workflow.add_conditional_edges(
"RouteQuery",
route_query_1,
{
"PlaceOrder": "CheckInventory",
"CancelOrder": "CancelOrder"
}
)
workflow.add_node("CancelOrder", call_model_2)
workflow.add_node("tools_2", tool_node_2)
workflow.add_edge(START, "RouteQuery")
workflow.add_edge("CheckInventory", "ComputeShipping")
workflow.add_edge("ComputeShipping", "ProcessPayment")
workflow.add_conditional_edges("CancelOrder", call_tools_2)
workflow.add_edge("tools_2", "CancelOrder")
workflow.add_edge("ProcessPayment", END)
**状态图(MessagesState)**
:
- 我们指定
MessagesState
来保存对话数据。
节点:
RouteQuery
是分类用户意图的入口节点。CheckInventory
、ComputeShipping
和ProcessPayment
处理下单流程。CancelOrder
和tools_2
处理取消订单流程。
条件边:
- 调用
workflow.add_conditional_edges("RouteQuery", route_query_1, ...)
确保如果是“下单”,我们转到CheckInventory
;如果是“取消订单”,则转到CancelOrder
。
循环:
- 当用户点击
CancelOrder
时,我们检查 LLM 是否触发了工具调用 (call_tools_2
)。如果是,则转到tools_2
(工具节点
);在工具被调用后,返回到CancelOrder
,给 LLM 机会产生进一步的动作或结束。
结束:
ProcessPayment
导致结束
,结束“下单”路径。
可视化和测试工作流
下一个代码片段将工作流编译成一个代理,将其呈现为Mermaid图,并使用示例查询进行测试。
agent = workflow.compile()
mermaid_graph = agent.get_graph()
mermaid_png = mermaid_graph.draw_mermaid_png(draw_method=MermaidDrawMethod.API)
display(Image(mermaid_png))
user_query = "我想取消订单ID 223"
for chunk in agent.stream(
{"messages": [("user", user_query)]},
stream_mode="values",
):
chunk["messages"][-1].pretty_print()
auser_query = "customer_id: customer_14 : 我想为item_51下订单,订单数量为4,且为国内"
for chunk in agent.stream(
{"messages": [("user", auser_query)]},
stream_mode="values",
):
chunk["messages"][-1].pretty_print()
编译:
agent = workflow.compile()
将我们的节点/边定义转换为可执行的代理。
可视化:
- 我们获得了一个Mermaid图(
mermaid_png
),可以在Jupyter笔记本中显示以进行调试或演示。
测试查询:
- 第一次测试:“我想取消订单ID 223” 应该路由到
CancelOrder
。
第一次测试的输出
- 第二次测试:“customer_id: customer_14 : 我想为item_51下订单……” 应该路由到下单工作流。
第二次测试的输出
结论:
通过利用 LangGraph,我们构建了一个动态的、分支的工作流,根据用户意图进行下单或取消订单。我们演示了:
- 如何使用 LLM 节点 (
categorize_query
) 对查询进行分类。 - 如何绑定工具 (
cancel_order
) 并将其集成到工作流中。 - 如何通过独立的节点检查库存、计算运费和处理付款。
- 如何使用 Mermaid.js 可视化整个工作流。
这种方法具有可扩展性:您可以添加更多步骤(例如,地址验证、促销代码)或额外的分支(例如,更新现有订单),而无需重写一个单一的脚本。如果您需要 循环 来重试失败的付款或验证用户确认,LangGraph 也可以处理这些。