Type something to search...
构建动态订单管理系统:使用langgraph的5个步骤实现高效工作流

构建动态订单管理系统:使用langgraph的5个步骤实现高效工作流

AI 生成的 LangGraph 插图

在这个极其详细的教程中,我们将探索 LangGraph — 一个强大的库,用于协调复杂的多步骤工作流,结合大型语言模型(LLMs) — 并将其应用于一个常见的电子商务问题:根据用户的查询决定是下单还是取消订单。在博客结束时,您将了解如何:

  1. 在 Python 环境中设置 LangGraph
  2. 加载和管理数据(例如,库存和客户)。
  3. 定义节点(工作流中的单个任务)。
  4. 构建包含条件分支的节点和边的图。
  5. 可视化和测试工作流。

我们将逐步进行,详细解释每个概念 — 非常适合初学者以及希望使用大型语言模型(LLMs)构建动态或循环工作流的人。

目录

什么是 LangGraph?

问题陈述:订单管理

导入说明

数据加载和状态定义

创建工具和大型语言模型集成

定义工作流节点

构建工作流图

可视化和测试工作流

结论

什么是 LangGraph?

LangGraph 是一个将图形化方法引入 LangChain 工作流的库。传统的管道通常线性地从一个步骤移动到另一个步骤,但现实世界的任务往往需要分支、条件逻辑,甚至循环(重试失败的步骤、澄清用户输入等)。

LangGraph 的主要特性:

  • 节点:单个任务或功能(例如,检查库存、计算运费)。
  • :定义节点之间数据和控制的流动。可以是条件的。
  • 共享状态:每个节点可以返回更新全局状态对象的数据,避免手动传递数据。
  • 工具集成:轻松结合外部工具或函数,供大型语言模型调用。
  • 人机协作(可选):插入需要人工审核的节点。

问题陈述:订单管理

在这种情况下,用户的查询可以关于下新订单取消现有订单:

  • PlaceOrder: 检查商品可用性,计算运费,并模拟支付。
  • CancelOrder: 提取order_id并将订单标记为已取消。

因为我们必须分支(决定“PlaceOrder”与“CancelOrder”),我们将使用LangGraph创建一个条件流程:

  1. 分类查询。
  2. 如果PlaceOrder,则转到检查库存、运送和支付。
  3. 如果CancelOrder,则解析出order_id并调用取消工具。

导入说明

下面是您提供的代码的第一部分,展示了导入和环境设置。我们在代码后添加了注释,以解释每个部分。

import os
import pandas as pd
import random
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import ToolNode
from langgraph.graph import StateGraph, MessagesState, START, END
from langchain_core.runnables.graph import MermaidDrawMethod
from IPython.display import display, Image
from typing import Literal
from langchain_core.prompts import ChatPromptTemplate
from typing import Dict, TypedDict

os.environ["OPENAI_API_KEY"] = ""

langchain_core.tools, langchain_openai, ToolNode 等

  • tool(一个装饰器)将 Python 函数转换为 LLM 可以调用的“工具”。
  • ChatOpenAI 是我们与 GPT 模型对话的 LLM 客户端。
  • ToolNode 是来自 langgraph.prebuilt 的预构建节点,负责工具执行。
  • StateGraphMessagesStateSTARTEND 来自 langgraph.graph——它们对于定义我们的工作流至关重要。
  • MermaidDrawMethod 有助于将工作流可视化为 Mermaid.js 图表。

数据加载和状态定义

数据链接:Data

在下一个代码片段中,我们加载 CSV 文件(用于库存和客户)并 转换 它们为字典。我们还定义了我们的状态类型字典。

inventory_df = pd.read_csv("inventory.csv")
customers_df = pd.read_csv("customers.csv")

inventory = inventory_df.set_index("item_id").T.to_dict()
customers = customers_df.set_index("customer_id").T.to_dict()

class State(TypedDict): query: str category: str next_node: str item_id: str order_status: str cost: str payment_status: str location: str quantity: int

CSV 到字典

  • inventorycustomers 是以 item_idcustomer_id 为键的字典。这使得像 inventory[item_51] 这样的查找变得简单。

状态

  • 一个类型字典,以便我们知道期望哪些字段。例如,querycategoryitem_id 等。
  • category 通常是“PlaceOrder”或“CancelOrder”。
  • next_node 可以存储下一个节点名称,尽管我们依赖图的边进行转换。
  • 这有助于在一个对象中跟踪所有内容——库存检查、支付状态 等。

创建工具和大型语言模型集成

现在我们定义我们的大型语言模型和工具。这里的主要工具是 cancel_order,它使用大型语言模型从查询中提取 order_id

@tool
def cancel_order(query: str) -> dict:
    """模拟订单取消"""
    order_id = llm.with_structured_output(method='json_mode').invoke(f'从以下文本中以json格式提取order_id: {query}')['order_id']

    if not order_id:
        return {"error": "缺少 'order_id'."}

    return {"order_status": "订单已取消"}
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

tools_2 = [cancel_order]
llm_with_tools_2 = llm.bind_tools(tools_2)
tool_node_2 = ToolNode(tools_2)

@tool:

  • cancel_order 函数现在是大型语言模型可以调用的工具,如果它决定需要取消订单。

提取 order_id:

  • 我们调用 llm.with_structured_output(method='json_mode') 来指示大型语言模型返回JSON。然后我们解析出 'order_id'

大型语言模型初始化:

  • model="gpt-4o-mini" 是选择的模型,temperature=0 用于确定性响应。

绑定和 ToolNode:

  • llm.bind_tools(tools_2) 将我们的大型语言模型与 cancel_order 工具连接起来。
  • ToolNode 是一个专用节点,可以自动处理这些绑定的工具。

定义工作流节点

我们现在将开始逐个定义节点。

模型调用节点

这些节点可以调用模型

def call_model_2(state: MessagesState):
    """使用大型语言模型决定下一步。"""
    messages = state["messages"]
    response = llm_with_tools_2.invoke(str(messages))
    return {"messages": [response]}
def call_tools_2(state: MessagesState) -> Literal["tools_2", END]:
    """根据工具调用路由工作流。"""
    messages = state["messages"]
    last_message = messages[-1]

    if last_message.tool_calls:
        return "tools_2"
    return END
  • **call_model_2**:获取对话(messages)并将其传递给绑定工具的大型语言模型。如果大型语言模型触发工具调用,我们将在call_tools_2中检测到。
  • **call_tools_2**:检查大型语言模型是否请求了工具调用(tool_calls)。如果是,我们路由到"tools_2",即ToolNode;否则,我们结束工作流。

分类查询

在这里,我们定义一个节点用于分类查询:

def categorize_query(state: MessagesState) -> MessagesState:
    """将用户查询分类为下订单或取消订单"""
    prompt = ChatPromptTemplate.from_template(
        "将用户查询分类为下订单或取消订单"
        "回应为 'PlaceOrder' 或 'CancelOrder' 查询: {state}"
    )

    chain = prompt | ChatOpenAI(temperature=0)
    category = chain.invoke({"state": state}).content

    return {"query": state, "category": category}
  • 该节点使用大型语言模型来分类用户的查询。返回值在状态中设置 "category"

检查库存

def check_inventory(state: MessagesState) -> MessagesState:
    """Check if the requested item is in stock."""

    item_id = llm.with_structured_output(method='json_mode').invoke(f'Extract item_id from the following text in json format: {state}')['item_id']
    quantity = llm.with_structured_output(method='json_mode').invoke(f'Extract quantity from the following text in json format: {state}')['quantity']

    if not item_id or not quantity:
        return {"error": "Missing 'item_id' or 'quantity'."}

    if inventory.get(item_id, {}).get("stock", 0) >= quantity:
        print("IN STOCK")
        return {"status": "In Stock"}
    
    return {"query": state, "order_status": "Out of Stock"}
  • 尝试从对话中解析 item_idquantity
  • 检查 inventory[item_id]["stock"] 以确认可用性。

计算运费

我们为特定客户定义了一个节点来计算运费

def compute_shipping(state: MessagesState) -> MessagesState:
    """Calculate shipping costs."""
    item_id = llm.with_structured_output(method='json_mode').invoke(f'Extract item_id from the following text in json format: {state}')['item_id']
    quantity = llm.with_structured_output(method='json_mode').invoke(f'Extract quantity from the following text in json format: {state}')['quantity']
    customer_id = llm.with_structured_output(method='json_mode').invoke(f'Extract customer_id from the following text in json format: {state}')['customer_id']
    location = customers[customer_id]['location']

    if not item_id or not quantity or not location:
        return {"error": "Missing 'item_id', 'quantity', or 'location'."}

    weight_per_item = inventory[item_id]["weight"]
    total_weight = weight_per_item * quantity
    rates = {"local": 5, "domestic": 10, "international": 20}
    cost = total_weight * rates.get(location, 10)
    print(cost, location)

    return {"query": state, "cost": f"${cost:.2f}"}
  • 从用户的查询中检索 customer_id,然后在 customers 字典中查找他们的 location
  • 根据物品的重量、数量和用户的位置计算运费。

处理支付

我们将定义一个用于处理支付的节点:

def process_payment(state: State) -> State:
    """Simulate payment processing."""
    cost = llm.with_structured_output(method='json_mode').invoke(f'Extract cost from the following text in json format: {state}')

    if not cost:
        return {"error": "Missing 'amount'."}
    print(f"PAYMENT PROCESSED: {cost} and order successfully placed!")
    payment_outcome = random.choice(["Success", "Failed"])
    return {"payment_status": payment_outcome}
  • 使用 random.choice 来模拟成功或失败。
  • 在生产系统中,您将与真实的支付网关集成。

路由功能

我们现在定义一个用于路由查询的节点:

def route_query_1(state: State) -> str:
    """Route the query based on its category."""
    print(state)
    if state["category"] == "PlaceOrder":
        return "PlaceOrder"
    elif state["category"] == "CancelOrder":
        return "CancelOrder"
  • 决定下一个要遵循的路径:“PlaceOrder”或“CancelOrder”。在 LangGraph 中,我们将“PlaceOrder”映射到 CheckInventory 节点,将“CancelOrder”映射到 CancelOrder 节点。

构建工作流图

下面,我们创建一个 状态图,添加节点,并定义边和条件边。

workflow = StateGraph(MessagesState)

workflow.add_node("RouteQuery", categorize_query)
workflow.add_node("CheckInventory", check_inventory)
workflow.add_node("ComputeShipping", compute_shipping)
workflow.add_node("ProcessPayment", process_payment)

workflow.add_conditional_edges(
    "RouteQuery",
    route_query_1,
    {
        "PlaceOrder": "CheckInventory",
        "CancelOrder": "CancelOrder"
    }
)
workflow.add_node("CancelOrder", call_model_2)
workflow.add_node("tools_2", tool_node_2)

workflow.add_edge(START, "RouteQuery")
workflow.add_edge("CheckInventory", "ComputeShipping")
workflow.add_edge("ComputeShipping", "ProcessPayment")
workflow.add_conditional_edges("CancelOrder", call_tools_2)
workflow.add_edge("tools_2", "CancelOrder")
workflow.add_edge("ProcessPayment", END)

**状态图(MessagesState)**:

  • 我们指定 MessagesState 来保存对话数据。

节点:

  • RouteQuery 是分类用户意图的入口节点。
  • CheckInventoryComputeShippingProcessPayment 处理下单流程。
  • CancelOrdertools_2 处理取消订单流程。

条件边:

  • 调用 workflow.add_conditional_edges("RouteQuery", route_query_1, ...) 确保如果是“下单”,我们转到 CheckInventory;如果是“取消订单”,则转到 CancelOrder

循环:

  • 当用户点击 CancelOrder 时,我们检查 LLM 是否触发了工具调用 (call_tools_2)。如果是,则转到 tools_2工具节点);在工具被调用后,返回到 CancelOrder,给 LLM 机会产生进一步的动作或结束。

结束:

  • ProcessPayment 导致 结束,结束“下单”路径。

可视化和测试工作流

下一个代码片段将工作流编译成一个代理,将其呈现为Mermaid图,并使用示例查询进行测试。

agent = workflow.compile()

mermaid_graph = agent.get_graph()
mermaid_png = mermaid_graph.draw_mermaid_png(draw_method=MermaidDrawMethod.API)
display(Image(mermaid_png))

user_query = "我想取消订单ID 223"
for chunk in agent.stream(
    {"messages": [("user", user_query)]},
    stream_mode="values",
):
    chunk["messages"][-1].pretty_print()

auser_query = "customer_id: customer_14 : 我想为item_51下订单,订单数量为4,且为国内"
for chunk in agent.stream(
    {"messages": [("user", auser_query)]},
    stream_mode="values",
):
    chunk["messages"][-1].pretty_print()

编译

  • agent = workflow.compile() 将我们的节点/边定义转换为可执行的代理。

可视化

  • 我们获得了一个Mermaid图(mermaid_png),可以在Jupyter笔记本中显示以进行调试或演示。

测试查询

  • 第一次测试:“我想取消订单ID 223” 应该路由到 CancelOrder

第一次测试的输出

  • 第二次测试:“customer_id: customer_14 : 我想为item_51下订单……” 应该路由到下单工作流。

第二次测试的输出

结论:

通过利用 LangGraph,我们构建了一个动态的、分支的工作流,根据用户意图进行下单或取消订单。我们演示了:

  • 如何使用 LLM 节点 (categorize_query) 对查询进行分类。
  • 如何绑定工具 (cancel_order) 并将其集成到工作流中。
  • 如何通过独立的节点检查库存、计算运费和处理付款。
  • 如何使用 Mermaid.js 可视化整个工作流。

这种方法具有可扩展性:您可以添加更多步骤(例如,地址验证、促销代码)或额外的分支(例如,更新现有订单),而无需重写一个单一的脚本。如果您需要 循环 来重试失败的付款或验证用户确认,LangGraph 也可以处理这些。

Related Posts

结合chatgpt-o3-mini与perplexity Deep Research的3步提示:提升论文写作质量的终极指南

结合chatgpt-o3-mini与perplexity Deep Research的3步提示:提升论文写作质量的终极指南

AI 研究报告和论文写作 合并两个系统指令以获得两个模型的最佳效果 Perplexity AI 的 Deep Research 工具提供专家级的研究报告,而 OpenAI 的 ChatGPT-o3-mini-high 擅长推理。我发现你可以将它们结合起来生成令人难以置信的论文,这些论文比任何一个模型单独撰写的都要好。你只需要将这个一次性提示复制到 **

阅读更多
让 Excel 过时的 10 种 Ai 工具:实现数据分析自动化,节省手工作业时间

让 Excel 过时的 10 种 Ai 工具:实现数据分析自动化,节省手工作业时间

Non members click here作为一名软件开发人员,多年来的一个发现总是让我感到惊讶,那就是人们还在 Excel

阅读更多
使用 ChatGPT 搜索网络功能的 10 种创意方法

使用 ChatGPT 搜索网络功能的 10 种创意方法

例如,提示和输出 你知道可以使用 ChatGPT 的“搜索网络”功能来完成许多任务,而不仅仅是基本的网络搜索吗? 对于那些不知道的人,ChatGPT 新的“搜索网络”功能提供实时信息。 截至撰写此帖时,该功能仅对使用 ChatGPT 4o 和 4o-mini 的付费会员开放。 ![](https://images.weserv.nl/?url=https://cdn-im

阅读更多
掌握Ai代理:解密Google革命性白皮书的10个关键问题解答

掌握Ai代理:解密Google革命性白皮书的10个关键问题解答

10 个常见问题解答 本文是我推出的一个名为“10 个常见问题解答”的新系列的一部分。在本系列中,我旨在通过回答关于该主题的十个最常见问题来分解复杂的概念。我的目标是使用简单的语言和相关的类比,使这些想法易于理解。 图片来自 [Solen Feyissa](https://unsplash.com/@solenfeyissa?utm_source=medium&utm_medi

阅读更多
在人工智能和技术领域保持领先地位的 10 项必学技能 📚

在人工智能和技术领域保持领先地位的 10 项必学技能 📚

在人工智能和科技这样一个动态的行业中,保持领先意味着不断提升你的技能。无论你是希望深入了解人工智能模型性能、掌握数据分析,还是希望通过人工智能转变传统领域如法律,这些课程都是你成功的捷径。以下是一个精心策划的高价值课程列表,可以助力你的职业发展,并让你始终处于创新的前沿。 1. 生成性人工智能简介课程: [生成性人工智能简介](https://genai.works

阅读更多
揭开真相!深度探悉DeepSeek AI的十大误区,您被误导了吗?

揭开真相!深度探悉DeepSeek AI的十大误区,您被误导了吗?

在AI军备竞赛中分辨事实与虚构 DeepSeek AI真的是它所宣传的游戏规则改变者,还是仅仅聪明的营销和战略炒作?👀 虽然一些人将其视为AI效率的革命性飞跃,但另一些人则认为它的成功建立在借用(甚至窃取的)创新和可疑的做法之上。传言称,DeepSeek的首席执行官在疫情期间像囤积卫生纸一样囤积Nvidia芯片——这只是冰山一角。 从其声称的550万美元培训预算到使用Open

阅读更多
Type something to search...