Type something to search...
使用 pydanticai 和 postgresql 创建一个 rag ai 代理:开发者的全面分步指南

使用 pydanticai 和 postgresql 创建一个 rag ai 代理:开发者的全面分步指南

使用 PydanticAI 创建 RAG 应用程序

在本文中,我将逐步向您展示如何使用 PydanticAI 创建 RAG (Retrieval Augmented Generation) 应用程序。与手动实现 RAG 相比,代码更简单、更清晰。

先决条件:

PydanticAI RAG Agent

在开始之前,您需要以下内容:

数据库设置

我们需要设置数据库,获取连接字符串,并使用以下模式创建一个干净的表:

DB_SCHEMA = """
    CREATE EXTENSION IF NOT EXISTS vector;

CREATE TABLE IF NOT EXISTS text_chunks (
        id serial PRIMARY KEY,
        chunk text NOT NULL,
        embedding vector(1536) NOT NULL
    );
    CREATE INDEX IF NOT EXISTS idx_text_chunks_embedding ON text_chunks USING hnsw (embedding vector_l2_ops);
    """

数据库的其余代码如下所示:

from __future__ import annotations as _annotations

from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import List
import pydantic_core

import asyncpg
import httpx
import fitz
import json
from pydantic import BaseModel
from pydantic_ai import Agent, RunContext
from pydantic_ai.models.openai import OpenAIModel

from openai import AsyncOpenAI

DB_DSN = "database-dsn-goes-here"
OPENAI_API_KEY = "sk-proj-your-api-key-goes-here"

@asynccontextmanager
async def database_connect(create_db: bool = False):
    """Manage database connection pool."""
    pool = await asyncpg.create_pool(DB_DSN)
    try:
        if create_db:
            async with pool.acquire() as conn:
                await conn.execute(DB_SCHEMA)
        yield pool
    finally:
        await pool.close()

class Chunk(BaseModel):
    chunk: str

async def split_text_into_chunks(text: str, max_words: int = 400, overlap: float = 0.2) -> List[Chunk]:
    """Split long text into smaller chunks based on word count with overlap."""
    words = text.split()
    chunks = []
    step_size = int(max_words * (1 - overlap))

    for start in range(0, len(words), step_size):
        end = start + max_words
        chunk_words = words[start:end]
        if chunk_words:
            chunks.append(Chunk(chunk=" ".join(chunk_words)))

    return chunks

async def insert_chunks(pool: asyncpg.Pool, chunks: List[Chunk], openai_client: AsyncOpenAI):
    """Insert text chunks into the database with embeddings."""
    for chunk in chunks:
        embedding_response = await openai_client.embeddings.create(
            input=chunk.chunk,
            model="text-embedding-3-small"
        )

        # Extract embedding data and convert to JSON format
        assert len(embedding_response.data) == 1, f"Expected 1 embedding, got {len(embedding_response.data)}"
        embedding_data = json.dumps(embedding_response.data[0].embedding)

        # Insert into the database
        await pool.execute(
            'INSERT INTO text_chunks (chunk, embedding) VALUES ($1, $2)',
            chunk.chunk,
            embedding_data
        )

async def download_pdf(url: str) -> bytes:
    """Download PDF from a given URL."""
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        response.raise_for_status()
        return response.content

def extract_text_from_pdf(pdf_content: bytes) -> str:
    """Extract text from PDF content."""
    document = fitz.open(stream=pdf_content, filetype="pdf")
    text = ""
    for page_num in range(document.page_count):
        page = document.load_page(page_num)
        text += page.get_text()
    return text

async def add_pdf_to_db(url: str):
    """Download PDF, extract text, and add to the embeddings database."""
    openai_client = AsyncOpenAI(api_key=OPENAI_API_KEY)
    pdf_content = await download_pdf(url)
    text = extract_text_from_pdf(pdf_content)
    async with database_connect(create_db=True) as pool:
        chunks = await split_text_into_chunks(text)
        await insert_chunks(pool, chunks, openai_client)

async def update_db_with_pdf(url: str):
    """Download PDF, extract text, and update the embeddings database."""
    openai_client = AsyncOpenAI(api_key=OPENAI_API_KEY)
    pdf_content = await download_pdf(url)
    text = extract_text_from_pdf(pdf_content)
    async with database_connect() as pool:
        chunks = await split_text_into_chunks(text)
        await insert_chunks(pool, chunks, openai_client)

async def execute_url_pdf(url: str):
    """
    Check if the database table exists, and call the appropriate function
    to handle the PDF URL.
    """
    async with database_connect() as pool:
        table_exists = await pool.fetchval("""
            SELECT EXISTS (
                SELECT FROM information_schema.tables
                WHERE table_name = 'text_chunks'
            )
        """)

        if table_exists:
            # If the table exists, update the database
            print("Table exists. Updating database with PDF content.")
            await update_db_with_pdf(url)
        else:
            # If the table does not exist, add the PDF and create the table
            print("Table does not exist. Adding PDF and creating the table.")
            await add_pdf_to_db(url)

以上代码将执行以下操作:

  1. 使用您提供的 DB_DSN 字符串连接到数据库,如果表尚不存在,则根据模式创建一个新表。
  2. 获取 PDF 文档并从文档中提取文本。
  3. 将文档分成 20% 重叠的块。
  4. 使用 OpenAI 嵌入模型获取块并创建嵌入。
  5. 将创建的嵌入与块一起保存到数据库中。

此代码还允许您根据需要将新的 PDF 文档添加到同一表中,因此您可以上传多个文档。

💡 提示:此代码目前仅允许上传 PDF 文件。您可以通过添加函数来扩展它,以提取其他文档类型的内容,例如:Word 文档、Excel 电子表格、Powerpoint 等。

🚀 通过使用更复杂的块方法来保留上下文并优化 RAG 输出,改进代码。

PydanticAI RAG Agent 代码

添加以下代码以创建一个带有检索工具的 PydanticAI agent。RAG 仅作为检索工具添加到 agent 中,这意味着您可以将此工具与其他许多工具一起添加,例如我们在上一篇文章中探讨的 AI Agent CRUD 工具

@dataclass
class Deps:
    pool: asyncpg.Pool
    openai: AsyncOpenAI

### 初始化 agent
model = OpenAIModel("gpt-4o", api_key=OPENAI_API_KEY)
rag_agent = Agent(model, deps_type=Deps)

@rag_agent.tool
async def retrieve(context: RunContext[Deps], search_query: str) -> str:
    """根据搜索查询检索文档部分。

参数:
        context: 调用上下文。
        search_query: 搜索查询。
    """
    print("正在检索..............")
    embedding = await context.deps.openai.embeddings.create(
            input=search_query,
            model='text-embedding-3-small',
        )

assert (
        len(embedding.data) == 1
    ), f'Expected 1 embedding, got {len(embedding.data)}, doc query: {search_query!r}'

embedding = embedding.data[0].embedding
    embedding_json = pydantic_core.to_json(embedding).decode()
    rows = await context.deps.pool.fetch(
        'SELECT chunk FROM text_chunks ORDER BY embedding <-> $1 LIMIT 5',
        embedding_json,
    )
    from_db = '\n\n'.join(
    f'# Chunk:\n{row["chunk"]}\n'
    for row in rows
    )
    return from_db

async def run_agent(question: str):
    """运行 agent 并执行基于 RAG 的问答的入口点。"""

## 设置 agent 和依赖项
    async with database_connect() as pool:
        openai_client = AsyncOpenAI(api_key=OPENAI_API_KEY)

async with database_connect(False) as pool:
            deps = Deps(openai=openai_client, pool=pool)
            base_instruction = f"使用 'retrieve' 工具获取信息以帮助您回答这个问题:{question}"
            answer = await rag_agent.run(base_instruction, deps=deps)
            return answer.data

我们的 agent 称为 rag_agent,我们使用一个包含数据库连接和 OpenAI 客户端的 Dependency 类对其进行初始化。此依赖项将允许 rag_agent 连接到数据库并使用数据库中的信息回答问题,只需使用该工具即可。

这很棒,因为 AI agent 将决定是否应该调用该工具,并且工作将在后台进行,我们将获得完整的响应,而无需执行多个 AI 调用来首先过滤数据库,然后回答问题。

前端应用程序:Streamlit

可以通过前端 streamlit 应用程序访问 agent。该应用程序的代码如下所示:

import streamlit as st
import asyncio
from aiagent import execute_url_pdf, run_agent

### Streamlit 页面配置
st.set_page_config(
    page_title="AI 助手 📚🤖",
    page_icon="📚",
    layout="wide"
)

### 标题
st.title("AI 助手 📚🤖")
st.write("与您的基于 PDF 的 AI 助手互动。使用以下选项上传 PDF 或提问。")

### 带有两列的布局
col1, col2 = st.columns(2)

### 第 1 列:通过 URL 上传 PDF
with col1:
    st.subheader("📄 上传 PDF")
    pdf_url = st.text_input("输入 PDF 文档的 URL:", placeholder="https://example.com/document.pdf")
    if st.button("📥 将 PDF 添加到数据库"):
        if pdf_url:
            with st.spinner("正在处理 PDF 并更新数据库..."):
                try:
                    asyncio.run(execute_url_pdf(pdf_url))
                    st.success("PDF 已成功处理并添加到数据库!")
                except Exception as e:
                    st.error(f"处理 PDF 时出错:{e}")
        else:
            st.warning("请输入有效的 URL。")

### 第 2 列:提问
with col2:
    st.subheader("❓ 提问")
    question = st.text_input("输入您的问题:", placeholder="全栈开发人员的职责是什么?")
    if st.button("🔍 获取答案"):
        if question:
            with st.spinner("思考中..."):
                try:
                    answer = asyncio.run(run_agent(question))
                    st.success("这是答案:")
                    st.write(answer)
                except Exception as e:
                    st.error(f"获取答案时出错:{e}")
        else:
            st.warning("请输入有效的问题。")

### 页脚
st.markdown("---")
st.write("✨ 由 [Skolo Online](https://skolo.online) 和 Pydantic AI 提供支持")

从前端,您可以:

  1. 上传 PDF 文档
  2. 向 PydanticAI Agent 提问有关您上传的 PDF 的问题

前端应该如下所示:

Streamlit PydanticAI RAG 应用程序

Related Posts

结合chatgpt-o3-mini与perplexity Deep Research的3步提示:提升论文写作质量的终极指南

结合chatgpt-o3-mini与perplexity Deep Research的3步提示:提升论文写作质量的终极指南

AI 研究报告和论文写作 合并两个系统指令以获得两个模型的最佳效果 Perplexity AI 的 Deep Research 工具提供专家级的研究报告,而 OpenAI 的 ChatGPT-o3-mini-high 擅长推理。我发现你可以将它们结合起来生成令人难以置信的论文,这些论文比任何一个模型单独撰写的都要好。你只需要将这个一次性提示复制到 **

阅读更多
让 Excel 过时的 10 种 Ai 工具:实现数据分析自动化,节省手工作业时间

让 Excel 过时的 10 种 Ai 工具:实现数据分析自动化,节省手工作业时间

Non members click here作为一名软件开发人员,多年来的一个发现总是让我感到惊讶,那就是人们还在 Excel

阅读更多
使用 ChatGPT 搜索网络功能的 10 种创意方法

使用 ChatGPT 搜索网络功能的 10 种创意方法

例如,提示和输出 你知道可以使用 ChatGPT 的“搜索网络”功能来完成许多任务,而不仅仅是基本的网络搜索吗? 对于那些不知道的人,ChatGPT 新的“搜索网络”功能提供实时信息。 截至撰写此帖时,该功能仅对使用 ChatGPT 4o 和 4o-mini 的付费会员开放。 ![](https://images.weserv.nl/?url=https://cdn-im

阅读更多
掌握Ai代理:解密Google革命性白皮书的10个关键问题解答

掌握Ai代理:解密Google革命性白皮书的10个关键问题解答

10 个常见问题解答 本文是我推出的一个名为“10 个常见问题解答”的新系列的一部分。在本系列中,我旨在通过回答关于该主题的十个最常见问题来分解复杂的概念。我的目标是使用简单的语言和相关的类比,使这些想法易于理解。 图片来自 [Solen Feyissa](https://unsplash.com/@solenfeyissa?utm_source=medium&utm_medi

阅读更多
在人工智能和技术领域保持领先地位的 10 项必学技能 📚

在人工智能和技术领域保持领先地位的 10 项必学技能 📚

在人工智能和科技这样一个动态的行业中,保持领先意味着不断提升你的技能。无论你是希望深入了解人工智能模型性能、掌握数据分析,还是希望通过人工智能转变传统领域如法律,这些课程都是你成功的捷径。以下是一个精心策划的高价值课程列表,可以助力你的职业发展,并让你始终处于创新的前沿。 1. 生成性人工智能简介课程: [生成性人工智能简介](https://genai.works

阅读更多
揭开真相!深度探悉DeepSeek AI的十大误区,您被误导了吗?

揭开真相!深度探悉DeepSeek AI的十大误区,您被误导了吗?

在AI军备竞赛中分辨事实与虚构 DeepSeek AI真的是它所宣传的游戏规则改变者,还是仅仅聪明的营销和战略炒作?👀 虽然一些人将其视为AI效率的革命性飞跃,但另一些人则认为它的成功建立在借用(甚至窃取的)创新和可疑的做法之上。传言称,DeepSeek的首席执行官在疫情期间像囤积卫生纸一样囤积Nvidia芯片——这只是冰山一角。 从其声称的550万美元培训预算到使用Open

阅读更多
Type something to search...