使用 PydanticAI 创建人工智能代理 CRUD 应用程序:逐步进行
很高兴再次回到 Skolo Online!在我们2025年的第一篇教程中,我们将深入探讨 PydanticAI。在本文中,我们将涵盖以下内容:
- 如何开始使用 Pydantic AI — “你好,世界”教程
- 设置 PostgreSQL 数据库
- 更复杂的 PydanticAI 实现,其中两个代理协同工作以执行用户请求。代理 1 理解用户意图,代理 2 执行该意图。
最后,我们将在大约一个小时内构建一个笔记应用程序,您可以提供自然语言提示,例如:“请为我创建一个笔记并添加以下内容”,应用程序将理解该查询并在数据库中执行,添加新的笔记。您还可以列出可用的笔记或查看详细的笔记。
有关逐步的视觉演示,请查看 配套的 YouTube 视频.
AI代理简介
您可能听说过“生成式AI”这个术语——但什么是AI代理呢?本质上,它们是利用先进语言模型以更自主的方式处理任务的下一代工具。它们可以解析用户查询,提取相关信息,并以结构化的方式与外部服务进行交互。
上面的图表展示了AI是如何从基本的生成模型发展到能够与各种工具协作的复杂AI代理的。
Spotlight on PydanticAI
PydanticAI 是一个 Python 代理框架,旨在简化使用生成式 AI 开发生产级应用程序的过程。它由 Pydantic 团队开发——Pydantic 是众多 Python 库和框架中不可或缺的验证层——PydanticAI 强调类型安全,并与 mypy 和 pyright 等静态类型检查器无缝集成。
PydanticAI 的主要特点包括:
- 结构化响应:利用 Pydantic 验证和结构化模型输出,确保运行之间的一致性。
- 依赖注入系统:提供一个可选系统,以向代理的系统提示、工具和结果验证器提供数据和服务,增强测试和迭代开发。
- 流式响应:支持 LLM 输出的连续流式传输,并进行即时验证,以快速准确地获得结果。
目前处于早期测试阶段,PydanticAI 的 API 可能会发生变化,开发团队欢迎反馈,以改进和增强其功能。
设置环境
在安装 pydantic-ai 之前,请确认您的 Python 版本为 3.9 或更高:
python --version
然后,创建并激活一个虚拟环境,接着安装 pydantic-ai:
virtualenv skoloenv
source skoloenv/bin/activate
pip install pydantic-ai
理解 PydanticAI 的核心
在 PydanticAI 中,主要的工作单元是 Agent
类。通过使用它,您可以对多种模型进行查询。您可以在 官方文档 中查看兼容模型的完整列表。以下是一个快速示例,展示如何使用 OpenAI 模型初始化一个 Agent:
from pydantic_ai import Agent
from pydantic_ai.models.openai import OpenAIModel
## 引用您所需的模型
model = OpenAIModel('gpt-4o', api_key='add-your-api-key-here')
agent = Agent(model)
result = agent.run_sync("What is Bitcoin?")
print(result.data)
我们建议将您的 API 密钥存储为环境变量:
export OPENAI_API_KEY='your-api-key'
以下是您可以与 PydanticAI 一起使用的所有可用模型的列表:
KnownModelName = Literal[
"openai:gpt-4o",
"openai:gpt-4o-mini",
"openai:gpt-4-turbo",
"openai:gpt-4",
"openai:o1-preview",
"openai:o1-mini",
"openai:o1",
"openai:gpt-3.5-turbo",
"groq:llama-3.3-70b-versatile",
"groq:llama-3.1-70b-versatile",
"groq:llama3-groq-70b-8192-tool-use-preview",
"groq:llama3-groq-8b-8192-tool-use-preview",
"groq:llama-3.1-70b-specdec",
"groq:llama-3.1-8b-instant",
"groq:llama-3.2-1b-preview",
"groq:llama-3.2-3b-preview",
"groq:llama-3.2-11b-vision-preview",
"groq:llama-3.2-90b-vision-preview",
"groq:llama3-70b-8192",
"groq:llama3-8b-8192",
"groq:mixtral-8x7b-32768",
"groq:gemma2-9b-it",
"groq:gemma-7b-it",
"gemini-1.5-flash",
"gemini-1.5-pro",
"gemini-2.0-flash-exp",
"vertexai:gemini-1.5-flash",
"vertexai:gemini-1.5-pro",
"mistral:mistral-small-latest",
"mistral:mistral-large-latest",
"mistral:codestral-latest",
"mistral:mistral-moderation-latest",
"ollama:codellama",
"ollama:gemma",
"ollama:gemma2",
"ollama:llama3",
"ollama:llama3.1",
"ollama:llama3.2",
"ollama:llama3.2-vision",
"ollama:llama3.3",
"ollama:mistral",
"ollama:mistral-nemo",
"ollama:mixtral",
"ollama:phi3",
"ollama:qwq",
"ollama:qwen",
"ollama:qwen2",
"ollama:qwen2.5",
"ollama:starcoder2",
"claude-3-5-haiku-latest",
"claude-3-5-sonnet-latest",
"claude-3-opus-latest",
"test",
]
扩展 PydanticAI 与外部工具
我们将利用 PostgreSQL 数据库,并构建一个数据库连接类,该类将作为依赖项添加到我们的 PydanticAI 代理中,这将允许代理执行数据库功能。
PostgreSQL 设置
首先,您需要一个可用且干净的 PostgreSQL 数据库。在上面链接的视频中,我向您展示了如何在 DigitalOcean 上设置一个(请注意,您可以使用任何 PostgreSQL 数据库)。
您可以使用此联盟链接开始使用 DigitalOcean — https://m.do.co/c/7d9a2c75356d
一旦您有了数据库,获取连接字符串 — 您将在下一步中需要它。
然后安装以下内容:
pip install psycopg2
pip install asyncpg
我们将创建几个 Python 函数来设置一个“notes”表并检查它是否存在:
import psycopg2
DB_DSN = "database-connection-string"
def create_notes_table():
"""
如果不存在,则建立一个“notes”表,包含“id”、“title”和“text”。
"""
create_table_query = """
CREATE TABLE IF NOT EXISTS notes (
id SERIAL PRIMARY KEY,
title VARCHAR(200) UNIQUE NOT NULL,
text TEXT NOT NULL
);
"""
try:
connection = psycopg2.connect(DB_DSN)
cursor = connection.cursor()
cursor.execute(create_table_query)
connection.commit()
print("成功创建或验证了 'notes' 表。")
except psycopg2.Error as e:
print(f"创建表时出错: {e}")
finally:
if connection:
cursor.close()
connection.close()
def check_table_exists(table_name: str) -> bool:
"""
检查指定的表是否存在于数据库中。
"""
query = """
SELECT EXISTS (
SELECT 1
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = %s
);
"""
try:
connection = psycopg2.connect(DB_DSN)
cursor = connection.cursor()
cursor.execute(query, (table_name,))
exists = cursor.fetchone()[0]
return exists
except psycopg2.Error as e:
print(f"检查表时出错: {e}")
return False
finally:
if connection:
cursor.close()
connection.close()
确保您可以运行 check_table_exists(“notes”) 函数,并且您得到“True”响应。这告诉您数据库连接正常,并且您已成功创建“notes”表。
接下来,我们将引入一个异步类来管理笔记操作,例如添加笔记、检索笔记和列出标题。这是“Agent”将使用的类。
import asyncpg
from typing import Optional, List
class DatabaseConn:
def __init__(self):
"""
存储连接的 DSN(数据源名称)。
"""
self.dsn = DB_DSN
async def _connect(self):
"""
打开与 PostgreSQL 的异步连接。
"""
return await asyncpg.connect(self.dsn)
async def add_note(self, title: str, text: str) -> bool:
"""
插入具有给定标题和文本的笔记。
如果存在相同标题的笔记,则不会覆盖。
"""
query = """
INSERT INTO notes (title, text)
VALUES ($1, $2)
ON CONFLICT (title) DO NOTHING;
"""
conn = await self._connect()
try:
result = await conn.execute(query, title, text)
return result == "INSERT 0 1"
finally:
await conn.close()
async def get_note_by_title(self, title: str) -> Optional[dict]:
"""
检索与指定标题匹配的笔记。返回一个字典或 None。
"""
query = "SELECT title, text FROM notes WHERE title = $1;"
conn = await self._connect()
try:
record = await conn.fetchrow(query, title)
if record:
return {"title": record["title"], "text": record["text"]}
return None
finally:
await conn.close()
async def list_all_titles(self) -> List[str]:
"""
获取并返回所有笔记标题。
"""
query = "SELECT title FROM notes ORDER BY title;"
conn = await self._connect()
try:
results = await conn.fetch(query)
return [row["title"] for row in results]
finally:
await conn.close()
将笔记与 PydanticAI 集成
为了将这些组件结合在一起,我们将创建两个不同的 Agents:
- 一个 意图提取代理 — 确定用户是想创建、列出还是检索笔记。
- 一个 动作处理代理 — 实际使用我们的数据库代码处理数据。
以下是一个 main.py 结构示例:
from dataclasses import dataclass
from pydantic import BaseModel
from pydantic_ai import Agent, RunContext
from typing import Optional, List
from database import DatabaseConn
from pydantic_ai.models.openai import OpenAIModel
OPENAI_API_KEY = "enter-your-openai-api-key-here"
@dataclass
class NoteIntent:
action: str
title: Optional[str] = None
text: Optional[str] = None
@dataclass
class NoteDependencies:
db: DatabaseConn
class NoteResponse(BaseModel):
message: str
note: Optional[dict] = None
titles: Optional[List[str]] = None
## 1. 用于解析用户意图的代理
intent_model = OpenAIModel('gpt-4o-mini', api_key=OPENAI_API_KEY)
intent_agent = Agent(
intent_model,
result_type=NoteIntent,
system_prompt=(
"You are an intent extraction assistant. Understand what the user wants "
"(e.g., create, retrieve, list) and extract the relevant data like title and text. "
"Your output format must be a JSON-like structure with keys: action, title, text."
)
)
## 2. 用于执行识别到的动作的代理
action_model = OpenAIModel('gpt-4o-mini', api_key=OPENAI_API_KEY)
action_agent = Agent(
action_model,
deps_type=NoteDependencies,
result_type=NoteResponse,
system_prompt=(
"Based on the identified user intent, carry out the requested action on the note storage. "
"Actions can include: 'create' (add note), 'retrieve' (get note), or 'list' (list all notes)."
)
)
## action_agent 的工具
@action_agent.tool
async def create_note_tool(ctx: RunContext[NoteDependencies], title: str, text: str) -> NoteResponse:
db = ctx.deps.db
success = await db.add_note(title, text)
return NoteResponse(message="CREATED:SUCCESS" if success else "CREATED:FAILED")
@action_agent.tool
async def retrieve_note_tool(ctx: RunContext[NoteDependencies], title: str) -> NoteResponse:
db = ctx.deps.db
note = await db.get_note_by_title(title)
return NoteResponse(message="GET:SUCCESS", note=note) if note else NoteResponse(message="GET:FAILED")
@action_agent.tool
async def list_notes_tool(ctx: RunContext[NoteDependencies]) -> NoteResponse:
db = ctx.deps.db
all_titles = await db.list_all_titles()
return NoteResponse(message="LIST:SUCCESS", titles=all_titles)
async def handle_user_query(user_input: str, deps: NoteDependencies) -> NoteResponse:
# 确定用户意图
intent = await intent_agent.run(user_input)
print(intent.data)
if intent.data.action == "create":
query = f"Create a note named '{intent.data.title}' with the text '{intent.data.text}'."
response = await action_agent.run(query, deps=deps)
return response.data
elif intent.data.action == "retrieve":
query = f"Retrieve the note titled '{intent.data.title}'."
response = await action_agent.run(query, deps=deps)
return response.data
elif intent.data.action == "list":
query = "List the titles of all notes."
response = await action_agent.run(query, deps=deps)
return response.data
else:
return NoteResponse(message="Action not recognized.")
async def ask(query: str):
db_conn = DatabaseConn()
note_deps = NoteDependencies(db=db_conn)
return await handle_user_query(query, note_deps)
这个设置完成了繁重的工作。第一个代理确定用户意图;第二个代理知道调用哪个工具(创建、检索或列出笔记)。
注意 — note_deps = NoteDependencies(db=db_conn) 类接收数据库连接,并被添加到第二个代理以允许其执行数据库查询。
构建 Streamlit 前端
最后一步是通过一个简单的网页用户界面使一切可访问。安装 Streamlit 非常简单:
pip install streamlit
然后创建一个 app.py 文件:
import asyncio
import streamlit as st
from main import ask # 从你的 main.py 中导入 ask 函数
st.set_page_config(page_title="Note Manager", layout="centered")
st.title("我的笔记仪表板")
st.write("在下面输入指令以创建、检索或列出笔记。")
user_input = st.text_area("你想做什么?", placeholder="例如,'创建一个关于我周一会议的笔记。'")
if st.button("提交"):
if not user_input.strip():
st.error("请输入内容。")
else:
with st.spinner("正在处理..."):
try:
response = asyncio.run(ask(user_input))
if response.note is not None:
st.success(response.message)
st.subheader(f"笔记标题: {response.note.get('title', '')}")
st.write(response.note.get('text', '未找到内容。'))
elif response.titles is not None:
st.success(response.message)
if response.titles:
st.subheader("当前标题:")
for t in response.titles:
st.write(f"- {t}")
else:
st.info("还没有可用的笔记。")
else:
st.info(response.message)
except Exception as e:
st.error(f"错误: {e}")
然后你可以通过以下命令启动它:
streamlit run app.py
总结
只需一点努力,我们就构建了一个强大的 Note Management 工具,使用了:
- PydanticAI 来解析用户请求和结构化数据
- PostgreSQL 来存储笔记
- Streamlit 来提供一个精美的交互式网页界面
查看 GitHub 仓库 获取更多详情: https://github.com/skolo-online/ai-agents
如果您想要更深入的演示,可以在这里观看 YouTube 视频: https://www.youtube.com/watch?v=OAbyC1kMj5w