用于医疗保健的 LangGraph:综合技术指南
- Rifx.Online
- Programming , Machine Learning , Health
- 25 Nov, 2024
构建生产就绪的医疗运营代理
LangGraph简介
什么是 LangGraph?
LangGraph 是一个 Python 库,旨在构建具有状态的多参与者应用程序,使用大型语言模型(LLMs)。可以将其视为一个工具集,帮助您创建能够:
- 在对话中记住上下文
- 通过多个步骤做出决策
- 在不同的 AI “参与者”或组件之间协调
- 在整个过程中维护和更新状态
简单来说,LangGraph 帮助您构建能够思考、行动和记忆的 AI 应用程序——就像人类在处理复杂任务时,通过多个步骤保持对已完成工作的跟踪一样。
关键优势和特点
1. 循环处理
与单向流动的传统管道不同,LangGraph 允许循环和回路:
## Example of a basic cycle in LangGraph
from langgraph.graph import StateGraph, END
## Define a simple state structure
class AgentState(TypedDict):
messages: list
counter: int
## Create a graph
graph = StateGraph(AgentState)
## Add nodes
graph.add_node("think", think_function)
graph.add_node("act", act_function)
## Create a cycle
graph.add_edge("think", "act")
graph.add_conditional_edges(
"act",
should_continue, # Function that decides whether to continue
{True: "think", False: END}
)
2. 内置状态管理
LangGraph 自动处理状态,使维护上下文变得简单:
## Example of state management
class ConversationState(TypedDict):
messages: Annotated[list[AnyMessage], operator.add] # Automatically combines messages
context: dict
attempts: int
## State is automatically passed between nodes
def process_message(state: ConversationState):
current_messages = state['messages']
state['attempts'] += 1
return {"messages": [new_message]}
3. 人工介入能力
您可以暂停执行以获取人工输入或批准:
## Example of human intervention
graph = StateGraph(AgentState)
graph.add_node("agent", agent_function)
graph.add_node("tool", tool_function)
## Compile with interrupts
compiled = graph.compile(
checkpointer=memory,
interrupt_before=["tool"] # Pause before tool execution
)
何时使用 LangGraph
LangGraph 特别适用于以下情况:
- 构建复杂工作流程:当您的 AI 需要遵循多步骤流程并进行决策时。
## Example of a multi-step workflow graph.
add_node("analyze", analyze_input)
graph.add_node("research", research_topic)
graph.add_node("synthesize", create_response)
2. 维护上下文:当您的应用程序需要记住并使用先前的交互时。
3. 协调多个组件:当您需要系统的不同部分协同工作时。
## Multiple specialized nodes working together
graph.add_node("researcher", research_agent)
graph.add_node("writer", writing_agent)
graph.add_node("editor", editing_agent)
4. 启用人工监督:当您需要在过程中进行人工验证或输入时。
LangGraph 在以下应用中表现出色:
- 复杂的客户服务系统
- 研究和分析工具
- 教育应用程序
- 内容生成管道
- 决策支持系统
开始使用
要开始使用 LangGraph,首先使用 pip 安装它:
pip install langgraph
基本项目结构:
my_langgraph_project/
├── agent.py # 主代理逻辑
├── tools/ # 自定义工具
├── nodes/ # 图节点
└── state.py # 状态定义
理解 LangGraph 中的状态管理
LangGraph中的状态是什么?
LangGraph中的状态就像一个容器,保存着您的AI应用程序需要记住和处理的所有重要信息。可以将其视为AI的“记忆”和“工作空间”的结合体。
基本状态示例
from typing import TypedDict, Annotated
import operator
from langchain_core.messages import AnyMessage
class BasicState(TypedDict):
# 对话中的消息列表
messages: Annotated[list[AnyMessage], operator.add]
# 当前正在处理的任务
current_task: str
# 已采取的步骤数
steps: Annotated[int, operator.add]
状态模式和 TypedDict
状态模式定义了您状态的结构,使用 Python 的 TypedDict。这确保您的状态是良好组织和类型安全的。
复杂状态示例
class HospitalState(TypedDict):
# 基本信息
messages: Annotated[list[AnyMessage], operator.add]
department: str
priority_level: int
# 指标跟踪
metrics: Dict[str, Any]
# 操作跟踪
task_history: List[str]
timestamp: datetime
# 上下文信息
context: Dict[str, Any]
注释和归约器
注释告诉 LangGraph 如何组合或更新状态值。归约器是定义值如何合并的函数。
常见注解
## Simple addition for numbers
counter: Annotated[int, operator.add]
## List concatenation
messages: Annotated[list, operator.add]
## Custom reducer for complex merging
def merge_metrics(old: dict, new: dict) -> dict:
"""Custom reducer to merge metrics dictionaries"""
result = old.copy()
for key, value in new.items():
if key in result:
result[key] = (result[key] + value) / 2 # Average values
else:
result[key] = value
return result
metrics: Annotated[Dict[str, float], merge_metrics]
状态持久性
LangGraph 提供多种方式来持久化状态,使您的应用能够在会话间保持上下文或从中断中恢复。
基于内存的持久化
from langgraph.checkpoint.memory import MemorySaver
## 创建内存存储
memory = MemorySaver()
## 在图中使用内存
graph = StateGraph(AgentState)
compiled_graph = graph.compile(checkpointer=memory)
SQLite 持久化
from langgraph.checkpoint.sqlite import SqliteSaver
## 创建 SQLite 存储
sqlite_saver = SqliteSaver.from_conn_string("sqlite:///state.db")
## 在图中使用 SQLite
graph = StateGraph(AgentState)
compiled_graph = graph.compile(checkpointer=sqlite_saver)
与状态一起工作
访问状态
def process_node(state: AgentState) -> dict:
# Read from state
current_messages = state["messages"]
current_task = state["current_task"]
# Process information...
# Return updates to state
return {
"messages": [new_message],
"steps": 1 # Will be added to current steps
}
状态更新与历史
## Get current state
current_state = graph.get_state(thread_id)
## Get state history
state_history = graph.get_state_history(thread_id)
## Update state
new_state = {"messages": [new_message], "steps": 1}
graph.update_state(thread_id, new_state)
最佳实践
- 定义清晰的状态模式:始终定义清晰的状态模式,以表示您需要跟踪的所有数据。
class TaskState(TypedDict):
task_id: str
status: str
progress: int
timestamps: Dict[str, datetime]
2. 使用适当的 Reducers:选择或创建适当的 reducers,以正确处理您的数据合并需求。
3. 原子性地处理状态更新:以完整的原子操作更新状态,以保持一致性。
4. 实施错误恢复:使用状态持久性来实施错误恢复机制。
try:
result = process_task(state)
except Exception:
# 恢复之前的状态
previous_state = graph.get_state_history(thread_id)[-2]
graph.update_state(thread_id, previous_state.values)
真实世界示例:聊天系统状态
这是聊天系统中状态管理的一个实际示例:
class ChatState(TypedDict):
# 自动连接的消息
messages: Annotated[list[AnyMessage], operator.add]
# 更新的用户偏好
user_preferences: Dict[str, Any]
# 会话指标
metrics: Annotated[Dict[str, float], merge_metrics]
# 会话信息
session_id: str
last_activity: datetime
def chat_node(state: ChatState) -> Dict:
"""处理聊天消息并更新状态"""
# 访问当前状态
current_messages = state["messages"]
user_prefs = state["user_preferences"]
# 处理消息...
# 返回状态更新
return {
"messages": [new_message],
"metrics": {"response_time": 0.5},
"last_activity": datetime.now()
}
这个结构使聊天系统能够:
- 维护对话历史
- 跟踪用户偏好
- 监控性能指标
- 处理会话管理
LangGraph 结构与基本组件
2.2 图结构
节点及其角色
节点是 LangGraph 应用程序的构建块。每个节点代表一个特定的功能或操作。
from langgraph.graph import StateGraph
from typing import TypedDict
## Define state structure
class AgentState(TypedDict):
messages: list
context: dict
## Create graph
graph = StateGraph(AgentState)
## Define node functions
def analyze_input(state: AgentState) -> dict:
"""Analyzes user input and updates state"""
return {"context": {"analysis_complete": True}}
def process_data(state: AgentState) -> dict:
"""Processes data based on analysis"""
return {"messages": ["Processing complete"]}
## Add nodes to graph
graph.add_node("analyzer", analyze_input)
graph.add_node("processor", process_data)
边类型
- 普通边:节点之间的简单连接
## Add regular edge
graph.add_edge("analyzer", "processor")
2. 条件边:具有路由逻辑的边
def should_use_tool(state: AgentState) -> bool:
"""Determines if tool usage is needed"""
return state.get("requires_tool", False)
## Add conditional edges
graph.add_conditional_edges(
"processor",
should_use_tool,
{
True: "tool_node",
False: "output_node"
}
)
入口点和出口点
## Set entry point
graph.set_entry_point("analyzer")
## Define endpoint condition
def is_complete(state: AgentState) -> bool:
"""Checks if processing is complete"""
return state.get("complete", False)
## Add endpoint
graph.add_conditional_edges(
"output_node",
is_complete,
{
True: END,
False: "processor"
}
)
图形编译
## Compile graph with optional checkpointing
compiled_graph = graph.compile(
checkpointer=memory_saver, # Optional state persistence
)
2.3 基本组件
消息处理
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
class ChatState(TypedDict):
messages: list[Any]
def handle_messages(state: ChatState) -> dict:
"""处理状态中的消息"""
messages = state["messages"]
latest_message = messages[-1]
# 根据消息类型处理
if isinstance(latest_message, HumanMessage):
# 处理用户输入
response = process_user_input(latest_message.content)
elif isinstance(latest_message, AIMessage):
# 处理AI响应
response = process_ai_response(latest_message.content)
return {"messages": [AIMessage(content=response)]}
工具集成
from langchain_core.tools import tool
from langchain_community.tools.tavily_search import TavilySearchResults
## Define custom tool
@tool
def calculate_metrics(data: str) -> dict:
"""Calculate important metrics from data"""
return {"result": process_data(data)}
## Create tool node
def tool_node(state: AgentState) -> dict:
"""Execute tools based on state"""
tool_name = state["required_tool"]
tool_input = state["tool_input"]
if tool_name == "search":
tool = TavilySearchResults()
elif tool_name == "calculate":
tool = calculate_metrics
result = tool.invoke(tool_input)
return {"messages": [f"Tool result: {result}"]}
代理配置
from langchain_openai import ChatOpenAI
class Agent:
def __init__(
self,
model: str = "gpt-3.5-turbo",
temperature: float = 0,
tools: List[Tool] = None
):
# 初始化 LLM
self.llm = ChatOpenAI(
model=model,
temperature=temperature
).bind_tools(tools)
# 构建图
self.graph = self._build_graph()
def _build_graph(self) -> StateGraph:
"""构建代理的处理图"""
graph = StateGraph(AgentState)
# 添加节点
graph.add_node("input", self.process_input)
graph.add_node("analyze", self.analyze)
graph.add_node("tools", self.execute_tools)
graph.add_node("output", self.synthesize_output)
# 添加边
graph.add_edge("input", "analyze")
graph.add_conditional_edges(
"analyze",
self.needs_tools,
{True: "tools", False: "output"}
)
graph.add_edge("tools", "output")
return graph.compile()
基本工作流程
## Example: Question-answering workflow
def qa_workflow():
# Create graph
graph = StateGraph(AgentState)
# Add processing nodes
graph.add_node("understand_question", process_question)
graph.add_node("search", search_for_answer)
graph.add_node("synthesize", create_answer)
# Create workflow
graph.add_edge("understand_question", "search")
graph.add_edge("search", "synthesize")
# Compile
workflow = graph.compile()
# Execute
result = workflow.invoke({
"messages": ["What is the capital of France?"],
"context": {}
})
return result
最佳实践
- 节点设计
- 保持节点专注于单一职责
- 确保清晰的输入/输出契约
- 优雅地处理错误
2. 边缘管理
- 对于复杂逻辑使用条件边缘
- 保持路由条件简单且可测试
- 清晰地记录边缘条件
3. 状态管理
- 定义清晰的状态模式
- 以不可变的方式更新状态
- 使用适当的 reducers
4. 工具集成
- 用适当的错误处理包装工具
- 记录工具的能力和限制
- 独立测试工具
5. 工作流组织
- 将复杂工作流拆分为子图
- 使用有意义的节点名称
- 监控状态转换
构建您的第一个 LangGraph 代理
3.1 简单 ReAct 代理
设置环境
首先,让我们设置我们的开发环境:
## Create virtual environment
python -m venv langraph-env
source langraph-env/bin/activate # On Windows: .\langraph-env\Scripts\activate
## Install required packages
pip install langgraph langchain-openai python-dotenv
配置设置:
import os
from dotenv import load_dotenv
from langgraph.graph import StateGraph, END
## Load environment variables
load_dotenv()
基础代理结构
让我们构建一个可以处理待办事项列表的任务管理代理:
from typing import TypedDict, Annotated
from datetime import datetime
class TaskState(TypedDict):
"""任务跟踪的状态管理"""
tasks: Annotated[list[dict], operator.add] # 任务列表
current_action: str # 当前正在执行的操作
completed_tasks: Annotated[list[dict], operator.add] # 已完成的任务
last_update: datetime # 最后状态更新
class TaskAgent:
def __init__(self, model, tools):
self.model = model
self.tools = tools
self.graph = self._build_graph()
def _build_graph(self):
graph = StateGraph(TaskState)
# 添加核心节点
graph.add_node("process", self.process_input)
graph.add_node("execute", self.execute_action)
graph.add_node("update", self.update_tasks)
# 添加边
graph.add_edge("process", "execute")
graph.add_conditional_edges(
"execute",
self.should_continue,
{True: "update", False: END}
)
graph.add_edge("update", "process")
return graph.compile()
添加工具和操作
from langchain_core.tools import tool
class TaskTools:
@tool
def add_task(self, title: str, due_date: str, priority: str) -> dict:
"""Add a new task to the list"""
return {
"id": str(uuid.uuid4()),
"title": title,
"due_date": due_date,
"priority": priority,
"status": "pending"
}
@tool
def complete_task(self, task_id: str) -> dict:
"""Mark a task as complete"""
return {
"task_id": task_id,
"status": "completed",
"completion_date": datetime.now().isoformat()
}
@tool
def list_tasks(self, filter_by: str = "all") -> list:
"""List all tasks with optional filtering"""
return [task for task in self.tasks if
filter_by == "all" or task["status"] == filter_by]
运行代理循环
def run_task_agent():
# Initialize agent
model = ChatOpenAI(model="gpt-4")
tools = TaskTools()
agent = TaskAgent(model, [tools])
# Initial state
initial_state = {
"tasks": [],
"current_action": None,
"completed_tasks": [],
"last_update": datetime.now()
}
# Run agent
response = agent.graph.invoke(initial_state)
return response
3.2 增强代理功能
状态管理实现
让我们通过更好的状态管理来增强我们的任务代理:
from dataclasses import dataclass
from typing import Optional, List
@dataclass
class Task:
id: str
title: str
due_date: datetime
priority: str
status: str
assigned_to: Optional[str] = None
tags: List[str] = field(default_factory=list)
class EnhancedTaskState(TypedDict):
tasks: Dict[str, Task]
workflow_status: str
last_action: Optional[str]
error_count: int
metrics: Dict[str, Any]
class StateManager:
def __init__(self):
self.checkpointer = SqliteSaver("tasks.db")
def save_checkpoint(self, state: EnhancedTaskState):
"""将当前状态保存到持久存储"""
return self.checkpointer.save(state)
def restore_checkpoint(self, checkpoint_id: str):
"""从检查点恢复状态"""
return self.checkpointer.load(checkpoint_id)
工具绑定与执行
from langchain.agents import AgentExecutor
from langchain.tools import BaseTool
class EnhancedTaskTools:
def __init__(self):
self.tools = self._initialize_tools()
def _initialize_tools(self) -> List[BaseTool]:
return [
self.add_task,
self.update_task,
self.delete_task,
self.assign_task,
self.generate_report
]
def bind_to_agent(self, agent):
"""将工具绑定到代理,带有适当的错误处理"""
try:
bound_agent = agent.bind_tools(self.tools)
return bound_agent
except Exception as e:
logger.error(f"工具绑定失败: {str(e)}")
raise
消息流控制
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage
class MessageController:
def __init__(self):
self.message_queue = []
self.error_handlers = {}
def process_message_flow(self, state: EnhancedTaskState, message: dict):
"""控制消息流,带有错误处理和重试逻辑"""
try:
# 将消息添加到状态
self.message_queue.append(message)
# 根据消息类型进行处理
if isinstance(message, HumanMessage):
return self._handle_human_message(state, message)
elif isinstance(message, AIMessage):
return self._handle_ai_message(state, message)
except Exception as e:
return self._handle_error(state, e)
def _handle_human_message(self, state: EnhancedTaskState, message: HumanMessage):
"""处理人类消息,具有上下文意识"""
context = self._build_context(state)
return {
"type": "human_input",
"content": message.content,
"context": context
}
def _build_context(self, state: EnhancedTaskState):
"""为消息处理构建上下文"""
return {
"active_tasks": len(state["tasks"]),
"workflow_status": state["workflow_status"],
"last_action": state["last_action"]
}
示例用法
## Initialize enhanced task agent
state_manager = StateManager()
message_controller = MessageController()
tools = EnhancedTaskTools()
## Create initial state
initial_state = {
"tasks": {},
"workflow_status": "initialized",
"last_action": None,
"error_count": 0,
"metrics": {
"tasks_completed": 0,
"average_completion_time": 0
}
}
## Run agent with enhanced features
def run_enhanced_agent(user_input: str):
# Create message
message = HumanMessage(content=user_input)
# Process through message controller
processed_message = message_controller.process_message_flow(
initial_state,
message
)
# Execute agent
response = agent.invoke(processed_message)
# Save checkpoint
state_manager.save_checkpoint(response["state"])
return response
该实现展示了:
- 结构化的状态管理与类型安全
- 增强的工具集成与错误处理
- 精细化的消息流控制
- 持久化的状态存储
- 上下文感知的处理
代理现在可以:
- 管理复杂的任务工作流
- 优雅地处理错误
- 维护持久状态
- 为决策提供丰富的上下文
- 跟踪指标和性能
LangGraph 使用手册
核心概念
状态管理
from typing import TypedDict, Annotated
import operator
## Define state schema
class AgentState(TypedDict):
messages: Annotated[list[AnyMessage], operator.add] # Messages with add reducer
counter: Annotated[int, operator.add] # Numbers with add reducer
data: dict # Regular dictionary without reducer
图形组件
1. 节点
- 处理状态并返回更新的基本功能
def node_function(state: AgentState) -> dict:
# Process state
return {"key": "updated_value"}
2. 边
- 连接节点以定义流动
## Add basic edge
builder.add_edge("node1", "node2")
## Add conditional edge
def route_next(state):
return "node2" if condition else "node3"
builder.add_conditional_edges(
"node1",
route_next,
{
"condition1": "node2",
"condition2": "node3"
}
)
基本图形设置
from langgraph.graph import StateGraph, END
## Initialize graph
builder = StateGraph(AgentState)
## Add nodes
builder.add_node("node1", node1_function)
builder.add_node("node2", node2_function)
## Set entry point
builder.set_entry_point("node1")
## Add edges
builder.add_edge("node1", "node2")
builder.add_edge("node2", END)
## Compile graph
graph = builder.compile()
常见模式
1. LLM 代理模式
from langchain_core.messages import SystemMessage, HumanMessage
class Agent:
def __init__(self, llm, tools):
self.llm = llm.bind_tools(tools)
self.tools = {t.name: t for t in tools}
# 构建图
builder = StateGraph(AgentState)
builder.add_node("agent", self.agent_node)
builder.add_node("tools", self.tools_node)
builder.set_entry_point("agent")
# 添加边
builder.add_conditional_edges(
"agent",
self.should_continue,
{True: "tools", False: END}
)
builder.add_edge("tools", "agent")
self.graph = builder.compile()
def agent_node(self, state):
# 使用 LLM 处理
return {"messages": [response]}
def tools_node(self, state):
# 执行工具
return {"messages": [result]}
2. 状态持久化
from langgraph.checkpoint.sqlite import SqliteSaver
## 内存存储
memory = SqliteSaver.from_conn_string(":memory:")
## 文件存储
memory = SqliteSaver.from_conn_string("sqlite:///path/to/db.sqlite")
## 添加到图
graph = builder.compile(checkpointer=memory)
3. 流式支持
## Regular streaming
for event in graph.stream(initial_state, thread_config):
print(event)
## Async streaming with token streaming
async for event in graph.astream_events(
initial_state,
thread_config,
version="v1"
):
if event["event"] == "on_chat_model_stream":
content = event["data"]["chunk"].content
if content:
print(content, end="")
4. 人工参与循环
## Add interrupt points
graph = builder.compile(
checkpointer=memory,
interrupt_before=["critical_node"]
)
## Get current state
state = graph.get_state(thread_config)
## Update state
graph.update_state(thread_config, new_state)
## Continue execution
graph.continue_from_interrupt(thread_config)
最佳实践
- 状态管理
- 使用 TypedDict 进行结构化状态
- 为可合并字段添加 reducers (operator.add)
- 保持状态简洁且专注
2. 图形设计
- 将复杂流程拆分为离散节点
- 使用条件边进行动态路由
- 添加错误处理节点以增强健壮性
3. 工具集成
- 使用 bind_tools() 将工具绑定到 LLM
- 使用 ToolNode 进行一致的工具执行
- 优雅地处理工具错误
4. 内存与持久性
- 根据需求使用适当的检查点
- 为长时间运行的图形实现状态清理
- 考虑状态大小和存储要求
5. 错误处理
- 为错误情况添加后备节点
- 为瞬态故障实现重试逻辑
- 记录状态转换以便调试
常见导入
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.prebuilt import ToolNode
from langchain_core.messages import SystemMessage, HumanMessage, AIMessage
from typing import TypedDict, Annotated
import operator
LangGraph 术语和概念 — 简明指南
核心概念
1. 状态管理
它是什么: 将状态视为您代理的“记忆”。它是您存储需要在代理的不同部分之间传递的所有重要信息的地方。
简单示例: 就像一个记事本,您在上面写下:
- 对话历史
- 您所做的任何计算
- 您需要记住的任何重要信息
在代码术语中: 通常使用 TypedDict 定义以指定要存储的信息:
class AgentState(TypedDict):
messages: list # Store conversation
counter: int # Keep track of numbers
memory: dict # Store other information
2. 节点
它是什么: 您的代理中的单个“站点”,执行特定任务。每个节点就像一个有特定工作的工人。
简单示例: 就像有不同的专家:
- 一个人与客户沟通(LLM 节点)
- 另一个进行计算(工具节点)
- 另一个做出决策(路由节点)
3. 边
它是什么: 节点之间的连接,显示信息如何从一个节点流向另一个节点。
简单示例: 就像一张地图显示:
- 下一步去哪里
- 有哪些可用路径
- 何时停止
4. 图表
它是什么: 您的代理工作完整蓝图,显示所有节点及其连接方式。
简单示例: 像一个流程图,显示:
- 从哪里开始
- 采取哪些步骤
- 何时结束
5. 工具
它是什么: 代理可以使用的特殊能力或功能,以执行特定任务。
简单示例: 例如,给你的代理访问权限:
- 计算器
- 搜索引擎
- 天气服务
6. 检查点/内存
它是什么: 一个保存代理状态的系统,以便它可以在运行之间记住事情或在出现问题时恢复。
简单示例: 就像有一本日记,你可以:
- 记录重要信息
- 回顾发生过的事情
- 从你停下的地方继续
7. StreamEvents
它是什么: 一种获取有关您的代理正在做什么的实时更新的方法。
简单示例: 就像观察某人:
- 一字一句地输入消息
- 逐步解决问题
- 实时做出决策
高级概念
8. Reducers
它是什么: 用于组合或更新状态信息的规则。
简单示例: 就像拥有以下规则:
- 向对话中添加新消息
- 更新当前总计
- 将新信息与旧信息合并
9. 条件边
它是什么: 根据特定条件选择不同路径的特殊连接。
简单示例: 像决策树:
- 如果 A,去步骤 1
- 如果 B,去步骤 2
- 如果 C,停止
10. 中断
它是什么: 可以暂停代理以检查或修改其正在执行的操作的点。
简单示例: 就像有检查点,你可以:
- 审查正在发生的事情
- 如有需要进行更改
- 决定是否继续
11. 线程配置
它是什么: 与代理进行一次运行或对话特定的设置和信息。
简单示例: 就像为以下内容创建一个单独的文件夹:
- 每个对话
- 每个用户
- 每个任务
重要的状态类型
12. MessagesState
它是什么: 一种专门用于处理对话的状态类型。
简单示例: 像一个聊天记录,跟踪:
- 谁说了什么
- 以什么顺序
- 采取了什么行动
13. AgentState
它是什么: 您为特定代理定义的自定义状态类型。
简单示例: 就像创建一个自定义表单,包含:
- 您需要的字段
- 要存储的数据类型
- 如何更新数据
执行概念
14. 编译
它是什么: 将您的图形设计转换为可以实际运行的东西的过程。
简单示例: 就像将:
- 一份蓝图变成一座建筑
- 一份食谱变成一顿饭
- 一项计划付诸行动
15. 入口点
它是什么: 图的起始点——执行开始的地方。
简单示例: 像:
- 食谱中的第一步
- 一段对话的开始
- 游戏的开始
16. 结束状态
它是什么: 代理应该停止的条件或点。
简单示例: 就像知道什么时候停止,因为:
- 任务已完成
- 找到了答案
- 达到了限制
最佳实践术语
17. 图形验证
它是什么: 在运行图形之前检查图形是否正确设置。
简单示例: 就像检查:
- 所有连接是否合理
- 所需信息是否存在
- 是否没有死胡同
18. 错误处理
它是什么: 处理出错情况的系统。
简单示例: 就像为以下情况制定计划:
- 当工具故障时
- 当信息缺失时
- 当响应意外时
19. 状态验证
它是什么: 确保您状态中的信息是正确和完整的。
简单示例: 就像检查:
- 必填字段已填写
- 数据格式正确
- 值是合理的
20. 节点类型
它是什么: 具有特定目的的不同类别的节点。
简单示例:
- 工具节点:用于使用工具
- LLM节点:用于AI响应
- 路由节点:用于做出决策
- 动作节点:用于执行任务
医疗运营人工智能助手概述
医疗运营人工智能助手是一个智能系统,旨在通过自动化管理和实时决策支持来简化和优化医院运营。该复杂的人工智能代理将医疗运营的多个关键方面整合成一个协调的、响应迅速的系统。
UI 组件(左侧边栏)
- 设置面板
- 选择部门:下拉菜单,选项包括:
- 所有部门(当前选择)
- 单独部门,如急诊科,重症监护室等。
- 允许针对特定部门筛选数据
- 优先级:滑块控制
- 范围:低到危急
- 当前设置为“中等”
- 帮助优先处理任务和警报
- 时间范围(小时):滑块控制
- 范围:1–24小时
- 当前设置为8小时
- 控制数据分析的时间框架
2. 快速操作
- 报告按钮:生成综合报告
- 刷新按钮:更新所有指标和数据
3. 紧急模式
- 切换开关“激活紧急协议”
- 激活时,会触发紧急情况的特殊协议
- 当前未激活
4. 帮助部分
- 使用指南下拉菜单
- 提供文档和帮助
主仪表板(右侧面板)
- 标题
- 标题:“医疗运营助手”
- 状态指示器:🟢 在线
- 副标题:“您的人工智能驱动的医疗运营管理解决方案。”
2. 关键指标仪表板
- 床位占用率:75.0%(正常 🟢)
- 患者满意度:8.5/10(↗ +0.5\)
- 可用工作人员:70(低 🔴)
- 资源利用率:75.0%(↘ -2%)
3. 聊天界面
- 查询输入框
- 当前查询:“当前急诊室等待时间是多少?”
- 时间戳:19:51
4. 响应组件
a) 关键洞察
- 当前科室等待时间:
- 急诊室:45分钟(⚠️ 超出目标)
- ICU:5分钟(✓ 在目标内)
- 普通病房:25分钟(✓ 在目标内)
- 手术室:30分钟(⚡ 接近目标)
- 儿科:20分钟(✓ 在目标内)
b) 可操作建议
- 👥 向急诊室部署额外的分诊护士
- 🔄 优化患者交接程序
- 📱 实施实时等待时间更新
- 🏥 在需要时激活溢出协议
c) 优先行动 需要立即采取的行动:
- 🚨 将非紧急病例从急诊室转移
- 👨⚕️ 在接下来的2小时内增加急诊室工作人员
- 📢 每15分钟更新等待患者情况
d) 实施时间表
- 🕐 0–1小时:人员重新分配
- 🕒 1–2小时:流程优化
- 🕓 2–4小时:情况重新评估
- 🕔 4+小时:长期监测
动态更新
仪表板显示实时变化:
- 初始指标:
- 床位占用率:75.0%
- 患者满意度:8.5/10
- 可用员工:70
- 资源利用率:75.0%
- 更新指标(经过建议后):
- 床位占用率:85.0%(高 🟡)
- 患者满意度:7.8/10(↘ -0.3)
- 可用员工:77(低 🔴)
- 资源利用率:82.0%(↘ -2%)
此界面提供:
- 实时监控
- 互动控制
- 全面的数据可视化
- 明确的行动项和建议
- 应急响应能力
- 历史数据追踪
- 部门特定的洞察
还有更多问题:
Show me bed occupancy across all departments?
问题:
Can you analyze bed utilization in the Emergency Department?
问题:
Which department has the longest wait times right now?
核心能力
1. 病人流动管理
- 监控和优化病人等待时间
- 分析床位容量和利用率
- 预测出院时间
- 管理入院优先级
- 优化部门间病人转移
2. 资源分配
- 跟踪医疗用品和设备
- 监控资源利用情况
- 生成补货建议
- 优化各部门资源分配
- 防止关键短缺
3. 质量监测
- 分析患者满意度指标
- 监测临床结果
- 跟踪医疗标准的遵循情况
- 生成质量改进建议
- 确定需要关注的领域
4. 员工排班
- 优化员工排班
- 分析劳动力指标
- 根据患者负荷计算人员需求
- 管理班次覆盖
- 确保员工分配的最佳化
关键特性
- 实时分析:持续监测和分析医院运营
- 预测能力:在问题发生之前预见潜在问题
- 自动决策支持:提供基于数据的建议
- 互动界面:用户友好的操作控制仪表板
- 可定制工作流程:可根据特定医院需求进行调整
优势
- 运营效率
- 减少等待时间
- 优化资源利用
- 提高员工生产力
- 增强患者流动性
2. 质量改善
- 提高患者满意度
- 改善临床结果
- 增强合规性
- 一致的护理服务
3. 成本优化
- 高效的资源分配
- 减少运营浪费
- 优化人员配置
- 更好的库存管理
4. 员工满意度
- 平衡的工作负载分配
- 优化的排班
- 减少行政负担
- 更好的资源可用性
The system is built using LangGraph, leveraging its powerful state management and workflow capabilities to create a robust, scalable solution for modern healthcare operations. It combines advanced AI capabilities with practical operational needs to deliver a comprehensive hospital management solution.
目标用户
- 医院管理员
- 部门经理
- 临床工作人员
- 运营经理
- 质量控制团队
- 资源经理
技术基础
建立在现代技术栈之上:
- LangGraph 用于工作流管理
- 高级 AI 模型用于决策
- 实时数据处理
- 安全且合规的架构
- 可扩展和可维护的设计
这个 AI 助手代表了医疗运营管理的重大进展,为日常医院运营带来了自动化、智能和效率,同时保持对优质护理服务和患者满意度的关注。
我的文件夹结构:
核心组件
1. 主要应用文件
agent.py
: 系统的核心 - 包含主 HealthcareAgent 类,负责协调所有操作streamlit_app.py
: Web 界面的入口setup.py
: 项目配置和部署设置
2. 源代码 (src/)
配置 (config/)
settings.py
: 系统范围的设置和配置prompts.py
: 不同操作的AI提示和模板
模型 (models/)
state.py
: 定义医院运营的状态管理结构- 处理指标、部门信息和运营状态
处理节点 (nodes/)
input_analyzer.py
: 分析传入请求task_router.py
: 将任务路由到适当的处理程序patient_flow.py
: 管理与患者相关的操作resource_manager.py
: 处理资源分配quality_monitor.py
: 监测医疗质量指标staff_scheduler.py
: 管理员工排班output_synthesizer.py
: 生成最终响应
操作工具 (tools/)
patient_tools.py
: 患者管理工具resource_tools.py
: 资源管理工具quality_tools.py
: 质量监控工具scheduling_tools.py
: 员工排班工具
用户界面 (ui/)
app.py
: 主 UI 应用程序components/
: UI 组件 (聊天, 头部, 指标, 侧边栏)styles/
: UI 样式和主题assets/
: 图标和图片
工具 (utils/)
error_handlers.py
: 错误管理logger.py
: 日志系统validators.py
: 数据验证
3. 测试与质量保证
tests/
: 单元测试和集成测试test_healthcare_agent_basic.py
: 基本功能测试test_healthcare_scenarios.py
: 复杂场景测试
4. 支持文件
requirements.txt
: Python 包依赖environment.yml
: Conda 环境配置logs/
: 系统日志和操作记录examples/
: 使用示例和演示
该结构遵循模块化、可维护的设计,其中:
- 每个组件都有特定的责任
- 模块之间松耦合但高度内聚
- 测试在多个层面集成
- 配置与实现分离
- UI 组件是模块化和可重用的
- 工具提供系统中通用的功能
├── README.md
├── environment.yml
├── examples
│ └── usage_examples.py
├── logs
│ ├── daily_2024-11-07.log
│ └── healthcare_ops_agent.log
├── pytest.ini
├── requirements.txt
├── setup.py
├── src
│ ├── __init__.py
│ ├── __pycache__
│ │ ├── __init__.cpython-311.pyc
│ │ └── agent.cpython-311.pyc
│ ├── agent.py
│ ├── config
│ │ ├── __init__.py
│ │ ├── __pycache__
│ │ │ ├── __init__.cpython-311.pyc
│ │ │ ├── prompts.cpython-311.pyc
│ │ │ └── settings.cpython-311.pyc
│ │ ├── prompts.py
│ │ └── settings.py
│ ├── models
│ │ ├── __init__.py
│ │ ├── __pycache__
│ │ │ ├── __init__.cpython-311.pyc
│ │ │ └── state.cpython-311.pyc
│ │ └── state.py
│ ├── nodes
│ │ ├── __init__.py
│ │ ├── __pycache__
│ │ │ ├── __init__.cpython-311.pyc
│ │ │ ├── input_analyzer.cpython-311.pyc
│ │ │ ├── output_synthesizer.cpython-311.pyc
│ │ │ ├── patient_flow.cpython-311.pyc
│ │ │ ├── quality_monitor.cpython-311.pyc
│ │ │ ├── resource_manager.cpython-311.pyc
│ │ │ ├── staff_scheduler.cpython-311.pyc
│ │ │ └── task_router.cpython-311.pyc
│ │ ├── input_analyzer.py
│ │ ├── output_synthesizer.py
│ │ ├── patient_flow.py
│ │ ├── quality_monitor.py
│ │ ├── resource_manager.py
│ │ ├── staff_scheduler.py
│ │ └── task_router.py
│ ├── tools
│ │ ├── __init__.py
│ │ ├── __pycache__
│ │ │ ├── __init__.cpython-311.pyc
│ │ │ ├── patient_tools.cpython-311.pyc
│ │ │ ├── quality_tools.cpython-311.pyc
│ │ │ ├── resource_tools.cpython-311.pyc
│ │ │ └── scheduling_tools.cpython-311.pyc
│ │ ├── patient_tools.py
│ │ ├── quality_tools.py
│ │ ├── resource_tools.py
│ │ └── scheduling_tools.py
│ ├── ui
│ │ ├── __init__.py
│ │ ├── __pycache__
│ │ │ ├── __init__.cpython-311.pyc
│ │ │ └── app.cpython-311.pyc
│ │ ├── app.py
│ │ ├── assets
│ │ │ ├── icons
│ │ │ └── images
│ │ ├── components
│ │ │ ├── __init__.py
│ │ │ ├── chat.py
│ │ │ ├── header.py
│ │ │ ├── metrics.py
│ │ │ └── sidebar.py
│ │ └── styles
│ │ ├── __init__.py
│ │ ├── __pycache__
│ │ │ ├── __init__.cpython-311.pyc
│ │ │ └── theme.cpython-311.pyc
│ │ ├── custom.css
│ │ └── theme.py
│ └── utils
│ ├── __init__.py
│ ├── __pycache__
│ │ ├── __init__.cpython-311.pyc
│ │ ├── error_handlers.cpython-311.pyc
│ │ ├── logger.cpython-311.pyc
│ │ └── validators.cpython-311.pyc
│ ├── error_handlers.py
│ ├── logger.py
│ └── validators.py
├── streamlit_app.py
├── test_healthcare_agent_basic.py
├── test_healthcare_scenarios.py
└── tests
├── __init__.py
├── conftest.py
├── test_agent.py
├── test_nodes
│ ├── test_input_analyzer.py
│ └── test_patient_flow.py
└── test_tools
└── test_patient_tools.py
组件细分
来自 src/agent.py
,这是我们的主要代理初始化:
class HealthcareAgent:
def __init__(self, api_key: Optional[str] = None):
try:
# 初始化设置并验证
self.settings = Settings()
if api_key:
self.settings.OPENAI_API_KEY = api_key
self.settings.validate_settings()
# 初始化 LLM
self.llm = ChatOpenAI(
model=self.settings.MODEL_NAME,
temperature=self.settings.MODEL_TEMPERATURE,
api_key=self.settings.OPENAI_API_KEY
)
# 初始化工具
self.tools = self._initialize_tools()
# 初始化节点
self.nodes = self._initialize_nodes()
# 构建图
self.graph = self._build_graph()
logger.info("Healthcare Agent initialized successfully")
except Exception as e:
logger.error(f"Error initializing Healthcare Agent: {str(e)}")
raise HealthcareError(
message="Failed to initialize Healthcare Agent",
error_code="INIT_ERROR",
details={"error": str(e)}
)
这个初始化展示了所有组件是如何结合在一起的:
- 设置配置
- LLM 设置
- 工具初始化
- 节点设置
- 图构建
状态设计
来自 src/models/state.py
,这是我们的状态结构:
class HospitalState(TypedDict):
"""Main state management for the agent"""
messages: Annotated[List[AnyMessage], operator.add]
current_task: TaskType
priority_level: PriorityLevel
department: Optional[str]
metrics: HospitalMetrics
analysis: Optional[AnalysisResult]
context: Dict[str, any] # Will include routing information
timestamp: datetime
thread_id: str
此状态设计跟踪:
- 对话消息
- 当前正在处理的任务
- 任务优先级
- 部门信息
- 医院指标
- 分析结果
- 上下文信息
工具组织
从 src/tools/__init__.py
,我们将工具组织成功能类别:
__all__ = [
'PatientTools',
'ResourceTools',
'QualityTools',
'SchedulingTools'
]
每个工具集都有特定的职责:
- PatientTools: 管理患者流动和床位分配
- ResourceTools: 处理设备和物资
- QualityTools: 监控医疗质量指标
- SchedulingTools: 管理员工和资源调度
图结构
来自 src/agent.py
,我们构建处理图的方式如下:
def _build_graph(self):
"""Build the workflow graph with all nodes and edges"""
try:
# Initialize graph
builder = StateGraph(HospitalState)
# Add all nodes
for name, node in self.nodes.items():
builder.add_node(name, node)
# Set entry point
builder.set_entry_point("input_analyzer")
# Add edge from input analyzer to task router
builder.add_edge("input_analyzer", "task_router")
# Define conditional routing based on task router output
def route_next(state: Dict):
return state["context"]["next_node"]
# Add conditional edges from task router
builder.add_conditional_edges(
"task_router",
route_next,
{
"patient_flow": "patient_flow",
"resource_management": "resource_manager",
"quality_monitoring": "quality_monitor",
"staff_scheduling": "staff_scheduler",
"output_synthesis": "output_synthesizer"
}
)
图结构:
- 通过输入分析器接收输入
- 根据任务类型进行路由
- 通过专业节点处理任务
- 综合输出以进行响应
该架构使得:
- 模块化功能
- 清晰的关注点分离
- 便于维护和更新
- 强大的错误处理
- 可扩展的操作
核心组件深入分析:
1. HospitalState 实现
来自 src/models/state.py
:
class HospitalState(TypedDict):
"""代理的主要状态管理"""
messages: Annotated[List[AnyMessage], operator.add] # 会话历史
current_task: TaskType # 当前任务类型
priority_level: PriorityLevel # 任务优先级
department: Optional[str] # 当前部门
metrics: HospitalMetrics # 医院指标
analysis: Optional[AnalysisResult] # 分析结果
context: Dict[str, any] # 上下文/路由信息
timestamp: datetime
thread_id: str
def create_initial_state(thread_id: str) -> HospitalState:
"""使用默认值创建初始状态"""
return {
"messages": [],
"current_task": TaskType.GENERAL,
"priority_level": PriorityLevel.MEDIUM,
"department": None,
"metrics": {
"patient_flow": {
"total_beds": 300,
"occupied_beds": 240,
"waiting_patients": 15,
"average_wait_time": 35.0,
"admission_rate": 4.2,
"discharge_rate": 3.8,
"department_metrics": {}
},
# ... 其他指标初始化
},
"analysis": None,
"context": {},
"timestamp": datetime.now(),
"thread_id": thread_id
}
简单说明:
- HospitalState 就像是整个医院系统的快照
- 它跟踪会话、当前任务、优先级和指标
- 状态使用默认值创建,并在操作发生时更新
- 每个组件(指标、上下文等)都有其特定的目的
2. 指标跟踪
来自 src/models/state.py
:
class HospitalMetrics(TypedDict):
"""Combined hospital metrics"""
patient_flow: PatientFlowMetrics
resources: ResourceMetrics
quality: QualityMetrics
staffing: StaffingMetrics
last_updated: datetime
class PatientFlowMetrics(TypedDict):
"""Metrics related to patient flow"""
total_beds: int
occupied_beds: int
waiting_patients: int
average_wait_time: float
admission_rate: float
discharge_rate: float
department_metrics: Dict[str, 'Department']
简单解释:
- 跟踪医院内不同类型的指标
- 监控病人流动、资源使用、质量和人力配置
- 每种指标类型都有其特定的测量标准
- 随着操作的进行自动更新
3. 部门管理
来自 src/models/state.py
的部门状态:
class Department(TypedDict):
"""Department information"""
id: str
name: str
capacity: int
current_occupancy: int
staff_count: Dict[str, int]
wait_time: int
def validate_department_data(department_data: Dict[str, Any]) -> bool:
"""Validate department-specific data"""
required_fields = [
"capacity",
"current_occupancy",
"staff_count"
]
try:
# Check required fields
for field in required_fields:
if field not in department_data:
raise ValidationError(
message=f"Missing required field: {field}",
error_code="INVALID_DEPARTMENT_DATA"
)
# Validate capacity constraints
if department_data["current_occupancy"] > department_data["capacity"]:
raise ValidationError(
message="Current occupancy exceeds capacity",
error_code="INVALID_OCCUPANCY"
)
return True
except Exception as e:
logger.error(f"Department data validation failed: {str(e)}")
raise
简单说明:
- 每个部门维护其自身的状态信息
- 跟踪容量、占用、员工和等待时间
- 包含验证以确保数据准确性
- 有助于管理部门特定的操作
4. 任务路由
来自 src/nodes/task_router.py
:
class TaskRouterNode:
def __call__(self, state: HospitalState) -> Dict:
"""Route to appropriate node based on task type"""
try:
task_type = state["current_task"]
# Create base state update
state_update = {
"messages": state.get("messages", []),
"current_task": task_type,
"priority_level": state.get("priority_level"),
"context": state.get("context", {})
}
# Add routing information to context
if task_type == TaskType.PATIENT_FLOW:
state_update["context"]["next_node"] = "patient_flow"
elif task_type == TaskType.RESOURCE_MANAGEMENT:
state_update["context"]["next_node"] = "resource_management"
elif task_type == TaskType.QUALITY_MONITORING:
state_update["context"]["next_node"] = "quality_monitoring"
elif task_type == TaskType.STAFF_SCHEDULING:
state_update["context"]["next_node"] = "staff_scheduling"
else:
state_update["context"]["next_node"] = "output_synthesis"
return state_update
except Exception as e:
logger.error(f"Error in task routing: {str(e)}")
return {
"messages": state.get("messages", []),
"context": {"next_node": "output_synthesis"},
"current_task": state.get("current_task")
}
简单说明:
- 将任务路由到适当的处理节点
- 根据任务类型确定任务的去向
- 优雅地处理路由错误
- 在路由过程中保持任务上下文
工具与能力部分:
src/tools
目录:
1. 患者流动管理
From src/tools/patient_tools.py
:
class PatientTools:
@tool
def calculate_wait_time(
self,
department: str,
current_queue: int,
staff_available: int
) -> float:
"""根据队列和工作人员计算部门的预计等待时间"""
try:
# 每位患者的平均时间(分钟)
AVG_TIME_PER_PATIENT = 15
# 考虑工作人员的可用性
wait_time = (current_queue * AVG_TIME_PER_PATIENT) / max(staff_available, 1)
return round(wait_time, 1)
except Exception as e:
logger.error(f"计算等待时间时出错: {str(e)}")
raise
@tool
def analyze_bed_capacity(
self,
total_beds: int,
occupied_beds: int,
pending_admissions: int
) -> Dict:
"""分析床位容量并提供利用率指标"""
try:
capacity = {
"total_beds": total_beds,
"occupied_beds": occupied_beds,
"available_beds": total_beds - occupied_beds,
"utilization_rate": (occupied_beds / total_beds) * 100,
"pending_admissions": pending_admissions,
"status": "正常"
}
# 根据利用率确定状态
if capacity["utilization_rate"] > 90:
capacity["status"] = "危急"
elif capacity["utilization_rate"] > 80:
capacity["status"] = "高"
return capacity
except Exception as e:
logger.error(f"分析床位容量时出错: {str(e)}")
raise
简单说明:
- 管理患者等待时间和床位容量
- 根据当前情况计算实时估计
- 监控利用率和状态
- 提供危急情况的警报
2. 资源管理
来自 src/tools/resource_tools.py
:
class ResourceTools:
@tool
def analyze_supply_levels(
self,
current_inventory: Dict[str, float],
consumption_rate: Dict[str, float],
reorder_thresholds: Dict[str, float]
) -> Dict:
"""分析供应水平并生成补货建议"""
try:
analysis = {
"critical_items": [],
"reorder_needed": [],
"adequate_supplies": [],
"recommendations": []
}
for item, level in current_inventory.items():
threshold = reorder_thresholds.get(item, 0.2)
consumption = consumption_rate.get(item, 0)
# 剩余供应天数
days_remaining = level / consumption if consumption > 0 else float('inf')
if level <= threshold:
if days_remaining < 2:
analysis["critical_items"].append({
"item": item,
"current_level": level,
"days_remaining": days_remaining
})
else:
analysis["reorder_needed"].append({
"item": item,
"current_level": level,
"days_remaining": days_remaining
})
else:
analysis["adequate_supplies"].append(item)
return analysis
except Exception as e:
logger.error(f"分析供应水平时出错: {str(e)}")
raise
简单说明:
- 跟踪库存和供应水平
- 预测何时需要补货
- 识别关键短缺
- 提供供应管理建议
3. 质量监测
From src/tools/quality_tools.py
:
class QualityTools:
@tool
def analyze_patient_satisfaction(
self,
satisfaction_scores: List[float],
feedback_comments: List[str],
department: Optional[str] = None
) -> Dict:
"""Analyze patient satisfaction scores and feedback"""
try:
analysis = {
"metrics": {
"average_score": sum(satisfaction_scores) / len(satisfaction_scores),
"total_responses": len(satisfaction_scores),
"score_distribution": {},
"trend": "stable"
},
"feedback_analysis": {
"positive_themes": [],
"negative_themes": [],
"improvement_areas": []
},
"recommendations": []
}
# Analyze score distribution and feedback
for score in satisfaction_scores:
category = int(score)
analysis["metrics"]["score_distribution"][category] = \
analysis["metrics"]["score_distribution"].get(category, 0) + 1
# Generate recommendations
if analysis["metrics"]["average_score"] < 7.0:
analysis["recommendations"].append(
"Implement immediate satisfaction improvement plan"
)
return analysis
except Exception as e:
logger.error(f"Error analyzing patient satisfaction: {str(e)}")
raise
简单说明:
- 监测患者满意度和反馈
- 分析临床结果
- 跟踪标准的合规性
- 确定改进领域
4. 员工排班
From src/tools/scheduling_tools.py
:
class SchedulingTools:
@tool
def optimize_staff_schedule(
self,
staff_availability: List[Dict],
department_needs: Dict[str, Dict],
shift_preferences: Optional[List[Dict]] = None
) -> Dict:
"""生成优化的员工排班表"""
try:
schedule = {
"shifts": {},
"coverage_gaps": [],
"recommendations": [],
"staff_assignments": {}
}
# 处理每个部门的需求
for dept, needs in department_needs.items():
schedule["shifts"][dept] = {
"morning": [],
"afternoon": [],
"night": []
}
required_staff = needs.get("required_staff", {})
# 将可用员工匹配到班次
for staff in staff_availability:
if staff["department"] == dept and staff["available"]:
preferred_shift = self._get_preferred_shift(
staff, shift_preferences
)
schedule["shifts"][dept][preferred_shift].append(
staff["id"]
)
return schedule
except Exception as e:
logger.error(f"优化员工排班时出错: {str(e)}")
raise
简单说明:
- 创建最佳员工排班表
- 平衡部门需求与员工可用性
- 识别覆盖缺口
- 管理班次分配
每个工具:
- 有明确的错误处理
- 记录操作以便跟踪
- 提供详细分析
- 返回可操作的建议
节点实现:
1. 输入分析节点
来自 src/nodes/input_analyzer.py
:
class InputAnalyzerNode:
def __init__(self, llm):
self.llm = llm
self.system_prompt = PROMPTS["input_analyzer"]
def __call__(self, state: HospitalState) -> Dict:
try:
# Get the latest message
latest_message = state["messages"][-1]
# Prepare messages for LLM
messages = [
SystemMessage(content=self.system_prompt),
HumanMessage(content=latest_message.content)
]
# Get LLM response
response = self.llm.invoke(messages)
# Parse response to determine task type and priority
parsed_result = self._parse_llm_response(response.content)
return {
"current_task": parsed_result["task_type"],
"priority_level": parsed_result["priority"],
"department": parsed_result["department"],
"context": parsed_result["context"]
}
except Exception as e:
logger.error(f"Error in input analysis: {str(e)}")
raise
简单说明:
- 接收传入消息并理解其含义
- 确定所需任务的类型
- 分配优先级
- 确定相关部门
2. 任务路由节点
来自 src/nodes/task_router.py
:
class TaskRouterNode:
def __call__(self, state: HospitalState) -> Dict:
"""Route to appropriate node based on task type"""
try:
task_type = state["current_task"]
# Create base state update
state_update = {
"messages": state.get("messages", []),
"current_task": task_type,
"priority_level": state.get("priority_level"),
"context": state.get("context", {})
}
# Add routing information to context
if task_type == TaskType.PATIENT_FLOW:
state_update["context"]["next_node"] = "patient_flow"
elif task_type == TaskType.RESOURCE_MANAGEMENT:
state_update["context"]["next_node"] = "resource_management"
elif task_type == TaskType.QUALITY_MONITORING:
state_update["context"]["next_node"] = "quality_monitoring"
elif task_type == TaskType.STAFF_SCHEDULING:
state_update["context"]["next_node"] = "staff_scheduling"
else:
state_update["context"]["next_node"] = "output_synthesis"
return state_update
except Exception as e:
logger.error(f"Error in task routing: {str(e)}")
# Route to output synthesis on error
return {
"messages": state.get("messages", []),
"context": {"next_node": "output_synthesis"},
"current_task": state.get("current_task")
}
简单说明:
- 决定哪个操作节点应处理任务
- 在路由过程中维护任务上下文
- 具有内置错误处理
- 在需要时提供后备路由
3. 操作节点
来自 src/nodes/patient_flow.py
的示例:
class PatientFlowNode:
def __init__(self, llm):
self.llm = llm
self.system_prompt = PROMPTS["patient_flow"]
def __call__(self, state: HospitalState) -> Dict:
try:
# Get current metrics
metrics = state["metrics"]["patient_flow"]
# Format prompt with current metrics
formatted_prompt = self.system_prompt.format(
occupancy=self._calculate_occupancy(metrics),
wait_times=metrics["average_wait_time"],
department_capacity=self._get_department_capacity(metrics),
admission_rate=metrics["admission_rate"]
)
# Get LLM analysis
response = self.llm.invoke([
SystemMessage(content=formatted_prompt)
])
# Parse and structure the response
analysis = self._structure_analysis(response.content)
return {
"analysis": analysis,
"messages": [response]
}
except Exception as e:
logger.error(f"Error in patient flow analysis: {str(e)}")
raise
简单解释:
- 每个操作节点处理特定类型的任务
- 使用当前指标和状态来做出决策
- 产生结构化的分析和建议
- 维护操作上下文
4. 输出合成节点
来自 src/nodes/output_synthesizer.py
:
class OutputSynthesizerNode:
def __init__(self, llm):
self.llm = llm
self.system_prompt = PROMPTS["output_synthesis"]
def __call__(self, state: HospitalState) -> Dict:
try:
# Get analysis results from previous nodes
analysis = state.get("analysis", {})
# Format prompt with context
formatted_prompt = self.system_prompt.format(
context=self._format_context(state)
)
# Get LLM synthesis
response = self.llm.invoke([
SystemMessage(content=formatted_prompt)
])
# Structure the final output
final_output = self._structure_final_output(
response.content,
state["current_task"],
state["priority_level"]
)
return {
"messages": [response],
"analysis": final_output
}
except Exception as e:
logger.error(f"Error in output synthesis: {str(e)}")
raise
def _format_context(self, state: HospitalState) -> str:
"""Format relevant context for synthesis"""
return f"""
任务类型: {state['current_task']}
优先级: {state['priority_level']}
部门: {state['department'] or '所有部门'}
关键指标总结:
- 患者流动: {self._summarize_patient_flow(state)}
- 资源: {self._summarize_resources(state)}
- 质量: {self._summarize_quality(state)}
- 人员配置: {self._summarize_staffing(state)}
"""
简单解释:
- 汇总所有前置节点的结果
- 创建全面的响应
- 包含上下文和摘要
- 格式化输出以便清晰沟通
节点系统:
- 形成完整的处理管道
- 系统化处理任务
- 在处理过程中保持状态
- 提供清晰的错误处理和日志记录
- 确保输出格式一致
高级功能实现:
8.1 用户界面
让我们来看一下 src/ui/app.py
中的每个 UI 组件:
class HealthcareUI:
def __init__(self):
"""初始化医疗运营管理用户界面"""
try:
# 设置 Streamlit 页面配置
st.set_page_config(
page_title="医疗运营助手",
page_icon="🏥",
layout="wide",
initial_sidebar_state="expanded"
)
# 应用自定义主题
self.setup_theme()
# 初始化代理
self.agent = HealthcareAgent(os.getenv("OPENAI_API_KEY"))
# 初始化会话状态
if 'initialized' not in st.session_state:
st.session_state.initialized = True
st.session_state.messages = []
st.session_state.thread_id = datetime.now().strftime("%Y%m%d-%H%M%S")
st.session_state.current_department = "所有部门"
st.session_state.metrics_history = []
st.session_state.system_status = True
except Exception as e:
logger.error(f"初始化 UI 时出错: {str(e)}")
st.error("应用程序初始化失败。请刷新页面。")
来自 src/ui/components/metrics.py
的实时指标显示:
class MetricsComponent:
def render(self, metrics: Optional[Dict[str, Any]] = None):
"""渲染指标仪表板"""
try:
if not metrics:
metrics = self.default_metrics
st.markdown("### 📊 关键指标仪表板")
metrics_container = st.container()
with metrics_container:
# 第一行 - 关键指标
col1, col2, col3, col4 = st.columns(4)
with col1:
occupancy = (metrics['patient_flow']['occupied_beds'] /
metrics['patient_flow']['total_beds'] * 100)
st.metric(
"床位占用率 🛏️",
f"{occupancy:.1f}%",
"正常 🟢" if occupancy < 85 else "偏高 🟡"
)
with col2:
satisfaction = metrics['quality']['patient_satisfaction']
st.metric(
"患者满意度 😊",
f"{satisfaction}/10",
"↗ +0.5" if satisfaction > 8 else "↘ -0.3"
)
# 其他指标...
except Exception as e:
logger.error(f"渲染指标时出错: {str(e)}")
st.error("加载指标仪表板时出错")
8.2 监控与日志记录
来自 src/utils/error_handlers.py
:
class ErrorHandler:
@staticmethod
def retry(max_attempts: int = 3, delay: int = 1):
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
last_error = None
for attempt in range(max_attempts):
try:
return await func(*args, **kwargs)
except Exception as e:
last_error = e
if attempt < max_attempts - 1:
await asyncio.sleep(delay * (2 ** attempt))
continue
raise RuntimeError(
f"Failed after {max_attempts} attempts"
) from last_error
return wrapper
return decorator
@staticmethod
def validate_state(func):
@wraps(func)
async def wrapper(self, state: HospitalState, *args, **kwargs):
if not self._validate_state(state):
raise ValueError("Invalid state")
return await func(self, state, *args, **kwargs)
return wrapper
来自 src/utils/logger.py
的日志实现:
class CustomFormatter(logging.Formatter):
"""自定义格式化器,针对不同日志级别进行颜色编码"""
COLORS = {
'DEBUG': '\033[0;36m', # 青色
'INFO': '\033[0;32m', # 绿色
'WARNING': '\033[0;33m', # 黄色
'ERROR': '\033[0;31m', # 红色
'CRITICAL': '\033[0;37;41m' # 红底白字
}
RESET = '\033[0m'
def setup_logger(
name: str,
log_level: Optional[str] = None,
log_file: Optional[str] = None
) -> logging.Logger:
"""设置包含文件和控制台处理程序的日志记录器"""
try:
# 创建日志记录器
logger = logging.getLogger(name)
logger.setLevel(log_level or Settings.LOG_LEVEL)
# 创建格式化器
file_formatter = logging.Formatter(
'%(asctime)s - %(name)s - [%(levelname)s] - %(message)s'
)
# 添加处理程序
if log_file:
file_handler = RotatingFileHandler(
log_file,
maxBytes=10 * 1024 * 1024, # 10MB
backupCount=5
)
file_handler.setFormatter(file_formatter)
logger.addHandler(file_handler)
return logger
except Exception as e:
# 回退到基本日志记录
basic_logger = logging.getLogger(name)
basic_logger.setLevel(logging.INFO)
basic_logger.addHandler(logging.StreamHandler(sys.stdout))
basic_logger.error(f"设置日志记录器时出错: {str(e)}")
return basic_logger
来自 src/utils/validators.py
:
class Validator:
@staticmethod
def validate_state(state: Dict[str, Any]) -> bool:
"""验证状态结构和数据类型"""
required_keys = ["messages", "current_task", "metrics", "timestamp"]
try:
# 检查必需的键
for key in required_keys:
if key not in state:
raise ValidationError(
message=f"缺少必需的键: {key}",
error_code="INVALID_STATE_STRUCTURE"
)
# 验证时间戳
if not isinstance(state["timestamp"], datetime):
raise ValidationError(
message="无效的时间戳格式",
error_code="INVALID_TIMESTAMP"
)
return True
except Exception as e:
logger.error(f"状态验证失败: {str(e)}")
raise
关键特性:
- 用户界面:
- 交互式仪表板,实时指标
- 用于通信的聊天界面
- 可配置的控制和设置
- 状态监控和警报
2. 监控与日志记录:
- 综合错误处理
- 状态验证
- 详细的日志记录与轮换
- 性能监控
该系统提供:
- 实时操作可见性
- 错误恢复机制
- 数据验证
- 综合日志记录
- 性能跟踪
测试与质量保证:
9. 测试与质量保证
让我们来看一下 tests/
中的测试结构:
来自 tests/test_agent.py
:
class TestHealthcareAgent:
def test_agent_initialization(self, mock_settings):
"""测试代理初始化"""
agent = HealthcareAgent(api_key=mock_settings["OPENAI_API_KEY"])
assert agent is not None
assert agent.llm is not None
assert agent.tools is not None
assert agent.nodes is not None
def test_process_input(self, mock_hospital_state):
"""测试通过代理处理输入"""
agent = HealthcareAgent()
result = agent.process(
"当前急诊室等待时间是多少?",
thread_id="test-thread"
)
assert "response" in result
assert "analysis" in result
assert "metrics" in result
assert "timestamp" in result
@pytest.mark.asyncio
async def test_async_processing(self):
"""测试异步处理能力"""
agent = HealthcareAgent()
thread_id = "test-thread"
# 测试流式响应
async for event in agent.graph.astream_events(
{"messages": ["测试消息"]},
{"configurable": {"thread_id": thread_id}}
):
assert event is not None
来自 tests/test_healthcare_scenarios.py
:
class HealthcareAssistantTester:
def test_patient_flow(self):
"""测试与患者流相关的查询"""
queries = [
"显示所有科室的等待时间",
"当前急诊室的床位使用率是多少?",
"目前有多少患者在等待入院?",
"重症监护室的平均等待时间是多少?",
"显示过去8小时的患者流趋势",
"现在哪个科室的等待时间最长?"
]
self._run_test_batch("患者流", queries)
def test_emergency_scenarios(self):
"""测试紧急情况查询"""
queries = [
"为大规模伤亡事件启动紧急协议",
"需要急诊的床位可用状态",
"需要快速人员动员计划",
"需要紧急资源分配",
"急诊室的关键容量警报",
"急诊科溢出协议状态"
]
self._run_test_batch("紧急情况", queries)
10. 部署与运维
来自 setup.py
:
setup(
name='healthcare-ops-agent',
version='0.1.0',
description='Healthcare Operations Management Agent using LangGraph',
long_description=long_description,
long_description_content_type='text/markdown',
packages=find_packages(exclude=['tests*']),
install_requires=requirements,
classifiers=[
'Development Status :: 3 - Alpha',
'Intended Audience :: Healthcare Industry',
'License :: OSI Approved :: MIT License',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Programming Language :: Python :: 3.11',
],
python_requires='>=3.9',
include_package_data=True
)
来自 src/config/settings.py
的配置管理
class Settings:
"""医疗运营管理代理的配置设置"""
# OpenAI 配置
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
MODEL_NAME = "gpt-4o-mini-2024-07-18"
MODEL_TEMPERATURE = 0
# 应用设置
MAX_RETRIES = int(os.getenv("MAX_RETRIES", "3"))
REQUEST_TIMEOUT = int(os.getenv("REQUEST_TIMEOUT", "30"))
BATCH_SIZE = int(os.getenv("BATCH_SIZE", "10"))
# 日志配置
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
LOG_FILE = "logs/healthcare_ops_agent.log"
@classmethod
def validate_settings(cls) -> bool:
"""验证必需的设置"""
required_settings = [
"OPENAI_API_KEY",
"MODEL_NAME",
"MEMORY_TYPE"
]
for setting in required_settings:
if not getattr(cls, setting):
raise ValueError(f"缺少必需的设置: {setting}")
return True
医疗运营数据管理
当前实现
数据存储
目前,我们的演示数据存储在 src/models/state.py
:
def create_initial_state(thread_id: str) -> HospitalState:
"""Create initial state with default values"""
return {
"metrics": {
"patient_flow": {
"total_beds": 300,
"occupied_beds": 240,
"waiting_patients": 15,
"average_wait_time": 35.0,
"admission_rate": 4.2,
"discharge_rate": 3.8,
"department_metrics": {}
},
"resources": {
"equipment_availability": {},
"supply_levels": {},
"resource_utilization": 0.75,
"pending_requests": 5,
"critical_supplies": []
},
"quality": {
"patient_satisfaction": 8.5,
"care_outcomes": {},
"compliance_rate": 0.95,
"incident_count": 2,
"quality_scores": {},
"last_audit_date": datetime.now()
},
"staffing": {
"total_staff": 500,
"available_staff": {
"doctors": 50,
"nurses": 150,
"specialists": 30,
"support": 70
}
}
}
}
真实医院集成
在真实医院环境中,系统将与多个数据源集成:
1. 医院信息系统 (HIS)
## Example integration with HIS
class HISConnector:
def __init__(self, config: Dict):
self.connection = self._establish_connection(config)
async def get_patient_data(self) -> Dict:
"""Fetch real-time patient data from HIS"""
return await self.connection.query("""
SELECT department_id,
COUNT(*) as occupied_beds,
AVG(wait_time) as avg_wait
FROM patient_admissions
WHERE status = 'active'
GROUP BY department_id
""")
2. 电子健康记录 (EHR)
## Example EHR integration
class EHRSystem:
def get_department_metrics(self) -> Dict:
"""Real-time department statistics"""
return {
"patient_count": self.active_patients(),
"average_stay": self.calculate_los(),
"pending_discharges": self.get_discharge_queue()
}
3. 资源管理系统 (RMS)
## Example resource tracking integration
class ResourceTracker:
def get_inventory_levels(self) -> Dict:
"""Real-time inventory data"""
return {
"supplies": self.current_inventory(),
"equipment": self.equipment_status(),
"utilization": self.calculate_usage_rates()
}
关键集成点
- 实时数据源:
- 患者管理系统 (PMS)
- 电子健康记录 (EHR)
- 实验室信息系统 (LIS)
- 药房管理系统
- 资源管理系统 (RMS)
- 员工管理系统 (SMS)
class RealTimeData:
patient_data: Dict[str, Any] # Patient flow information
resource_data: Dict[str, Any] # Equipment and supply levels
staff_data: Dict[str, Any] # Staff availability and schedules
quality_metrics: Dict[str, Any] # Real-time quality indicators
2. 集成方法:
- REST APIs
- HL7 消息
- FHIR 标准
- 数据库连接器
- 消息队列
3. 示例集成代码:
class HospitalSystemIntegration:
def __init__(self):
self.ehr_system = EHRConnector()
self.his_system = HISConnector()
self.rms_system = RMSConnector()
async def get_real_time_state(self) -> HospitalState:
"""Gather real-time data from all systems"""
try:
return {
"patient_data": await self.ehr_system.get_patient_metrics(),
"resource_data": await self.rms_system.get_resource_levels(),
"staff_data": await self.his_system.get_staff_status(),
"quality_metrics": await self.get_quality_indicators()
}
except IntegrationError as e:
logger.error(f"Integration error: {str(e)}")
raise
您可以在以下位置找到完整的代码和实现:
- 设置您的环境
- 创建并激活虚拟环境
- 安装依赖:
pip install -r requirements.txt
- 将
.env.example
复制到.env
并添加您的 OpenAI API 密钥
- 所需配置 在运行代理之前,您需要配置:
- API 密钥:
- 将您的 OpenAI API 密钥添加到
.env
文件 - 可选:如果使用搜索功能,请添加 Tavily API 密钥
HOSPITAL_SETTINGS = {
"total_beds": 300, # Adjust to your hospital size
"departments": ["ER", "ICU", "General", "Surgery", "Pediatrics"],
"staff_roles": ["Doctor", "Nurse", "Specialist", "Support Staff"]
}
- 可自定义参数
您可以通过调整以下内容来修改代理的行为:
- 状态参数(在
src/models/state.py
中): - 患者流量指标
- 资源利用阈值
- 员工排班偏好
- 质量监测基准
部门配置:
Department = {
"capacity": int, # Department bed capacity
"current_occupancy": int, # Current patient count
"staff_count": Dict, # Staff by role
"wait_time": int # Average wait time in minutes
}
质量阈值:
QUALITY_THRESHOLDS = {
"min_satisfaction_score": 7.0,
"max_wait_time_minutes": 45,
"optimal_bed_utilization": 0.85,
"min_staff_ratio": {
"ICU": 0.5, # 1 nurse per 2 patients
"General": 0.25 # 1 nurse per 4 patients
}
}
测试代理
从基本查询开始,例如:
from src.agent import HealthcareAgent
agent = HealthcareAgent()
## Try some example queries
response = agent.process(
"What is the current ER occupancy and wait time?",
thread_id="test-thread"
)
response = agent.process(
"Show me staff distribution across departments",
thread_id="test-thread"
)
扩展代理
随意:
- 添加新部门或员工角色
- 创建自定义指标和阈值
- 在
src/tools/
中实现额外工具 - 在
src/nodes/
中添加新分析节点
结论
最佳实践总结:
- 状态管理:
- 保持状态不可变
- 验证状态转换
- 维护状态历史
- 错误处理:
- 实施全面的错误处理
- 使用后备机制
- 提供清晰的错误信息
- 测试:
- 编写全面的测试
- 测试真实场景
- 监控性能
未来增强:
- 额外功能:
- 多医院支持
- 高级分析
- 预测能力
- 技术改进:
- 增强缓存
- 更好的并行化
- 更复杂的错误恢复
- 集成能力:
- EHR系统集成
- 移动应用支持
- 实时警报
## Sample code block
def example_function():
return "This code block remains unchanged."
参考 :
Langraph
https://github.com/langchain-ai/langgraph
选择您的 AI 代理框架:
开发者体验与学习曲线
在开发者体验方面,每个框架提供了不同的方式,以满足不同类型的开发者和用例。AutoGen 以其以开发者为中心的设计脱颖而出,特别是在代码生成场景中表现出色。其架构围绕多智能体对话构建,使习惯于调试和迭代开发的开发者感到自然。该框架提供了强大的代码生成、测试和调试工具,使其对软件开发团队特别有吸引力。尽管 AutoGen 的学习曲线适中,但其文档内容全面,并包含许多实用示例,帮助开发者快速入门。
LangGraph 采取了更结构化的开发者体验方法,实施了基于图的架构,对曾经使用工作流引擎或状态机的开发者来说会感到熟悉。这种结构化的方法伴随着更陡峭的学习曲线,因为开发者需要理解节点、边和状态管理等概念。然而,这种学习投资在需要对智能体行为和状态进行精确控制的复杂应用中是值得的。该框架与 LangChain 的集成意味着已经熟悉该生态系统的开发者会发现过渡相对顺利。
CrewAI 可能是四个框架中开发者体验最为友好的。其基于角色的系统自然地映射到开发者如何将复杂任务拆分为更小、可管理的部分。该框架的文档因其清晰和示例丰富而受到特别赞誉。新开发者可以快速启动一个简单的智能体,尽管他们可能需要投入更多时间以充分利用该框架在复杂应用中的能力。
OpenAI Swarm 采取了独特的极简主义开发者体验。作为一个实验性框架,它强调教育价值而非生产就绪性。其轻量级设计使开发者能够快速理解整个系统,是学习智能体架构的优秀工具。然而,这种简单性也带来了局限性——希望构建生产系统的开发者可能需要寻找其他选择。
对于在这些框架之间做出选择的团队而言,决策通常取决于他们的具体需求和限制。如果您的团队高度关注代码生成和自动化,AutoGen 的专业工具使其成为一个引人注目的选择。对于需要精确控制的复杂工作流,LangGraph 的结构化方法可能值得付出更陡峭的学习曲线。希望快速入门且摩擦最小的团队可能更喜欢 CrewAI 的直观角色系统。而对于那些主要希望学习智能体系统或构建简单原型的开发者,OpenAI Swarm 提供了一个优秀的教育平台。
架构与实现哲学:
每个框架采用独特的架构方法,反映其核心设计哲学。LangGraph 的架构围绕图形系统构建,将代理交互视为节点和有向图中的转换。这种方法在复杂工作流程中提供了卓越的清晰度——开发人员可以准确地可视化不同组件如何交互以及状态如何在系统中流动。该框架与 LangChain 的紧密集成意味着它继承了强大的功能,如内存管理和工具集成,尽管这有时会使其在简单应用中显得比较笨重。
AutoGen 采取根本不同的方法,围绕代理之间的基于对话的交互结构化其架构。这种设计反映了自然的问题解决过程,其中不同的专家协作解决复杂问题。每个代理可以维护自己的上下文和能力,使其在需要多轮精炼或验证的任务中尤其有效。该框架对代码生成的强烈强调意味着它包含直接内置于其架构中的复杂代码执行、测试和错误处理机制。
CrewAI 的架构围绕角色基础团队的概念构建。CrewAI 不关注数据或对话的技术流,而是强调代理的组织结构。这种方法使其在商业应用中特别直观,因为不同的代理需要以类似人类团队互动的方式进行协作。该框架的轻量级架构意味着更容易入门,尽管在复杂场景中可能需要更多的自定义实现。
OpenAI Swarm 引入了一种令人耳目一新的简单架构模式,基于例程和交接。其设计强调教育清晰度而非生产特性,实施了足够的结构以展示核心代理概念,而不会使开发人员感到复杂的负担。这种极简主义的方法使其成为理解代理系统基础知识的优秀工具,尽管在生产使用中可能需要显著增强。
状态管理与内存:
状态管理是这些框架之间最关键的差异之一。LangGraph在这一领域表现出色,提供了复杂的状态管理能力,包括内置持久性和暂停与恢复执行的能力。这使得它特别适合于长时间运行的任务或在多个交互中保持上下文的场景。该框架的检查点系统允许在故障后可靠恢复,并支持时间旅行调试等高级功能。
AutoGen的状态管理方法更侧重于在代理之间维护对话上下文。虽然它没有LangGraph那样的持久性,但其状态管理非常适合其主要用例,即代码生成和优化。该框架有效地维护对话历史和中间结果,尽管开发人员可能需要为长时间运行的应用程序实施额外的持久性层。
CrewAI实现了一个更简单但有效的状态管理系统,专注于维护角色特定的上下文。CrewAI系统中的每个代理可以维护自己的状态,框架在必要时提供了代理之间共享信息的机制。这种方法适用于典型的业务流程,但对于复杂的状态管理需求可能需要额外的实现。
OpenAI Swarm采取了一种最小的状态管理方法,在调用之间基本上是无状态的。这种设计选择反映了其教育重点,使理解代理的工作方式变得更加容易,但这意味着开发人员需要为超出简单交互的任何内容实现自己的状态管理解决方案。
生产准备和扩展
在生产部署方面,这些框架在其能力和重点上表现出显著差异。LangGraph 脱颖而出,成为最具生产就绪性的框架,具备强大的错误处理、内置监控功能和良好的扩展支持。它与 LangChain 的集成意味着它继承了丰富的生产部署工具生态系统,包括速率限制和回退处理等功能。
AutoGen 也表现出强大的生产就绪性,特别是在以开发为中心的工作流程中。其错误处理非常复杂,尤其是在代码执行方面,并且它包含管理并发代理交互的功能。该框架的模块化设计使其相对容易进行横向扩展,尽管开发人员可能需要为非常大规模的部署实施额外的基础设施。
CrewAI 处于一个有趣的中间地带。虽然它在多个组织中被用于生产,但在企业级部署中可能需要更多的自定义实现。该框架专注于直观设计和快速启动,这意味着一些高级生产功能需要由开发人员添加,尽管其活跃的社区通常会分享常见挑战的解决方案。
OpenAI Swarm 明确标记为实验性,不适用于生产使用。它的价值在于教育和原型制作,尝试在生产中使用它将需要大量额外开发。
现实世界的应用和用例
这些框架的不同优势在考虑现实世界应用时尤为明显。LangGraph 在复杂的企业场景中表现出色,如工作流自动化、客户服务系统以及需要复杂 RAG(检索增强生成)的应用。其强大的状态管理和控制流能力使其成为对可靠性和可预测性要求极高的关键任务应用的理想选择。
AutoGen 在以开发为中心的应用中找到了最佳切入点。它在代码生成、调试辅助和自动化测试场景中尤其有效。许多组织使用它来增强其开发工作流程,创建可以帮助从代码审查到文档生成等各个方面的系统。
CrewAI 在业务流程自动化场景中表现突出,重点在于基于角色的协作。它在销售自动化、客户服务和项目管理等应用中尤其有效,这些应用中不同的代理需要在明确定义的角色中协同工作。
OpenAI Swarm 虽然不适合生产环境,但在教育和原型设计中发挥着重要作用。它对于学习代理架构的团队或在更强大的框架中实施之前快速测试概念的团队尤为宝贵。
安装说明:
## LangGraph
pip install langgraph
## AutoGen
pip install pyautogen
## CrewAI
pip install crewai
## OpenAI Swarm (experimental)
pip install git+https://github.com/openai/swarm.git
框架参考与资源
LangGraph (v0.0.15)
基于图的框架,用于构建有状态的多参与者应用程序
GitHub: https://github.com/langchain-ai/langgraph
文档: https://python.langchain.com/docs/langgraph
示例: https://python.langchain.com/docs/langgraph/examples
AutoGen (v1.0.0)
微软的多智能体对话框架
GitHub: https://github.com/microsoft/autogen
文档: https://microsoft.github.io/autogen/
CrewAI (v0.14.1)
用于协调基于角色的 AI 代理的框架
GitHub: https://github.com/joaomdmoura/crewai
示例: https://github.com/joaomdmoura/crewai-examples
OpenAI Swarm (实验版)
用于教育目的的实验框架
GitHub: https://github.com/openai/swarm
Cookbook: https://cookbook.openai.com/examples/orchestrating_agents