
构建无sql数据分析ai代理:使用langchain和duckdb实现快速数据洞察
- Rifx.Online
- Large Language Models , Data Science , AI Applications
- 05 Mar, 2025
SQL 和 AI:使用 LangChain 和 DuckDB 构建 AI 代理
照片由 James A. Molnar 提供,来源于 Unsplash
SQL 一直以来是大多数数据分析任务的基本语言。我们经常会提出需要数据来获得答案的问题。SQL 对于将业务需求转换为可执行的数据检索代码至关重要。然而,借助 AI,我们可以开发一个 AI 代理,能够在不需要 SQL 专业知识的情况下解决业务询问。
本文将演示如何使用 LangChain 和 DuckDB 从零开始构建一个 AI 代理。在为 SQL 构建自己的 AI 代理后,您可以相当快速地完成数据分析任务。更有趣的是,我们可以使用来自 Kaggle 的随机数据集来测试 AI 在 SQL 中分析数据的能力。
Tools & Data Used Today
- DuckDB: DuckDB 是一个内嵌式 SQL OLAP 数据库管理系统;它易于安装,并且可以在几秒钟内从 CSV/JSON 文件中编写分析 SQL 查询。
- LangChain: LangChain 是一个可组合的框架,用于与 LLMs 一起构建。鉴于 AI 代理的快速增长,开发我们的代理的一个关键组成部分是使用像 LangChain 这样的框架来管理 LLM 过程并执行特定的内置操作。
- OpenAI API Key: LangChain 只是一个与 LLMs 一起构建应用程序的框架。为了理解分析问题,使用已知模式解析它,然后生成结果,我们仍然需要一个 LLM 解决方案。OpenAI 的一个强大 LLM 是
gpt-4o
,这是 2024 年 5 月开发的最新模型。我们将使用这个模型作为我们 SQL 生成 AI 代理的“大脑”。 - Netflix Dataset: 我们将使用来自 Kaggle 的 Netflix Movies and TV Shows 数据集作为测试我们 SQL 生成 AI 代理的样本。
SQL生成AI代理的高级工作流程
SQL生成工作流程的AI代理
从高层次来看,我们的SQL生成AI代理遵循三个主要步骤。
-
write_query: 所有的魔法都发生在这里,生成SQL所需的输入知识是最重要的方面。我们提供一个分析性问题作为输入,并获得一个SQL查询作为输出。例如,我们可以将以下问题作为输入:“给我提供年总收入超过两百万的商店。”输出将是一个可执行的SQL查询,如下所示:
SELECT store, SUM(sale_price) revenue FROM sales GROUP BY store HAVING SUM(sale_price) > 2000000000
-
execute_query: 在从write_query步骤获得SQL语句后,此步骤使用从LLM生成的SQL在DuckDB上运行以获取实际结果。这本质上与自己编写的SQL语句的运行过程相同。我们可以利用LangChain来管理此工作流程的状态:write_query的输出将作为execute_query的输入。
-
generate_answer: 尽管步骤2的结果以表格格式获得,但解释起来仍然可能很困难。在generate_answer步骤中,我们通过将表格结果反馈给LLM来完善我们的答案,期望它产生更易于理解的输出。
通过遵循这三个步骤,我们将能够创建一个可靠的AI代理,能够帮助解决业务查询,而无需了解SQL语法。以下是上述三个步骤的输出示例。
我问的问题是:“你能获取每位导演的总剧集数,并按总剧集数降序排序前3位导演吗?”
{'write_query': {'query': "SELECT director, COUNT(*) as total_shows \\nFROM read_csv_auto('data/netflix_titles.csv') \\nWHERE director IS NOT NULL \\nGROUP BY director \\nORDER BY total_shows DESC \\nLIMIT 3;"}}
{'execute_query': {'result': [Document(metadata={}, page_content='director: Rajiv Chilaka\\ntotal_shows: 19'), Document(metadata={}, page_content='director: Raúl Campos, Jan Suter\\ntotal_shows: 18'), Document(metadata={}, page_content='director: Suhas Kadav\\ntotal_shows: 16')]}}
{'generate_answer': {'answer': '每位导演的总剧集数,按总剧集数降序排序的前3位导演如下:\\n\\n1. Rajiv Chilaka - 19剧集\\n2. Raúl Campos, Jan Suter - 18剧集\\n3. Suhas Kadav - 16剧集'}}
第0步. 选择适合SQL的LLM。
最初,我们需要正确配置所有环境变量,选择gpt-4o作为我们的基础LLM模型,然后利用LangChain中心的sql-query-system-prompt来简化SQL生成过程。
import os
from langchain_openai import ChatOpenAI
from langchain import hub
os.environ["LANGCHAIN_API_KEY"] = os.environ.get("LANGCHAIN_API_KEY")
os.environ["LANGCHAIN_TRACING_V2"] = os.environ.get("LANGCHAIN_TRACING_V2")
os.environ["OPENAI_API_KEY"] = os.environ.get("OPENAI_API_KEY")
llm = ChatOpenAI(model="gpt-4o")
query_prompt_template = hub.pull("langchain-ai/sql-query-system-prompt")
sql-query-system-prompt 是一个提示模板,后面跟着4个关键参数需要填写:dialect, top_k, table_info和input。我们将在下一步中填写这些参数。
第一步:构建 write_query 函数
为了保持工作流的输入和输出可交换,我们将创建一个名为 State
的类,以帮助保持每个步骤的输出。
从第 0 步开始,我们有 sql-query-system-prompt
作为提示模板。为了提高生成 SQL 的准确性,有必要告知 LLM 我们感兴趣的特定 SQL 风格。由于我们将使用 DuckDB,因此可以通过提供 duckdb
作为 dialect 来实现。
另一个需要提供的关键值是 table_info
,它可以为 LLM 提供有关表的模式或示例行的更多见解。由于我们直接从 CSV 文件读取表,我们将利用 DuckDB 中的 read_csv_auto
函数,使用 CSV 文件路径。table_info
将作为表名,并在最终的 SQL 语句中用作 from
子句。
最后,我们将生成的 SQL 作为查询返回,以便在下一步中对 DuckDB 执行。
from typing_extensions import Annotated
class State():
question: str
query: str
result: str
answer: str
class QueryOutput(TypedDict):
query: Annotated[str, ..., "语法上有效的 SQL 查询。"]
def write_query(state: State):
"""生成 SQL 查询以获取信息。"""
prompt = query_prompt_template.invoke(
{
"dialect": "duckdb",
"top_k": 10,
"table_info": f"read_csv_auto('{file_path}')",
"input": state["question"],
}
)
structured_llm = llm.with_structured_output(QueryOutput)
result = structured_llm.invoke(prompt)
return {"query": result["query"]}
让我们首先测试输出,以查看生成的 SQL 是否符合预期。
sql_query = write_query({"question": "你能获取每位导演的总节目数,并按总节目数降序排序吗?"})
print(sql_query)
输出是一个有效的 SQL 语句,似乎能够有效利用列。
{'query': "SELECT director, COUNT(*) as total_shows \\nFROM read_csv_auto('data/netflix_titles.csv') \\nWHERE director IS NOT NULL\\nGROUP BY director \\nORDER BY total_shows DESC \\nLIMIT 10;"}
第2步. 构建 execute_query 函数
现在我们有了 SQL 语句,我们应该能够通过 DuckDB 运行它进行测试。LangChain 提供的 duckDB loader 便于与 LangChain 的文档加载器接口集成。
状态对象的输出可以被检索并传递给 DuckDBLoader,它将执行从 LLM 生成的查询并运行 DuckDB,继续执行第1步中的 SQL 查询。
由于我们完全依赖于 LLM 生成的 SQL,这一步容易出现各种错误。为了防止数据丢失或在未通知的情况下意外操作,我们要么避免进行任何 DDL 类型的问题,要么加入人工审核流程。此外,我们可以在数据库端实施一些权限控制,以防止意外执行特定的 SQL 查询。
from langchain_community.document_loaders import DuckDBLoader
def execute_query(state: State):
"""Execute SQL query."""
data = DuckDBLoader(state["query"]).load()
return {'result': data}
在 DuckDB 中执行的查询的输出如下所示。
{'result': [
Document(metadata={}, page_content='director: Rajiv Chilaka\\ntotal_shows: 19'),
Document(metadata={}, page_content='director: Raúl Campos, Jan Suter\\ntotal_shows: 18'),
Document(metadata={}, page_content='director: Suhas Kadav\\ntotal_shows: 16')
]}
第 3 步. 构建 generate_answer 函数
这个过程的最后一步是利用 SQL 结果生成一个更易于人类理解的段落。如果您能够直接解析 SQL 结果,这一步可能不是必需的。然而,利用 LLM 来总结输出可能是一个非常有益的步骤。
generate_answer 步骤是一个简单的过程,涉及将所有信息结合起来,并允许 LLM 总结结果。
def generate_answer(state: State):
"""使用检索到的信息作为上下文回答问题。"""
prompt = (
"给定以下用户问题、相应的 SQL query 和 SQL result,回答用户问题。\\n\\n"
f'问题: {state["question"]}\\n'
f'SQL Query: {state["query"]}\\n'
f'SQL Result: {state["result"]}'
)
response = llm.invoke(prompt)
return {"answer": response.content}
这是 generate_answer 步骤的输出,LLM 还提供了换行以便于理解为段落。
{'answer':
'每位导演的总节目数,按总节目数降序排列的前 3 位导演如下:\\n\\n
1. Rajiv Chilaka - 19 个节目\\n
2. Raúl Campos, Jan Suter - 18 个节目\\n
3. Suhas Kadav - 16 个节目'
}
合并所有步骤
最终,我们可以利用LangChain构建一个图,从而无缝地自动化工作流管理和状态控制。
from langgraph.graph import START, StateGraph
graph_builder = StateGraph(State).add_sequence(
[write_query, execute_query, generate_answer]
)
graph_builder.add_edge(START, "write_query")
graph = graph_builder.compile()
使用 LangChain Smith 监控请求
为了更深入地了解整体请求生命周期,我们可以利用 LangChain Smith 获取更详细的信息。该工具有效地为用户提供了对我们 AI 代理所做请求的可视化,防止我们将 LLM 视为单纯的黑箱。
这是 LangChain Smith 提供的 LangGraph。我们可以获得关于图中每一步的见解,以及与每一步相关的延迟、输入和输出。
LangChain Smith 的示例查询
附加示例
让我们利用我们的 AI 代理对 Netflix 数据集提出额外的询问,以确定它是否能够生成准确的 SQL 并提供正确的答案。
Q1: 你能获取以字母 D 开头的节目数量吗?
该查询准确识别了适当的列标题,并正确计算了节目的数量。
for step in graph.stream(
{"question": "Can you get the number of shows that start with letter D?"}, stream_mode="updates"
):
print(step)
Output is
{'write_query': {'query': "SELECT COUNT(*) FROM read_csv_auto('data/netflix_titles.csv') WHERE title LIKE 'D%';"}}
{'execute_query': {'result': [Document(metadata={}, page_content='count_star(): 375')]}}
{'generate_answer': {'answer': 'There are 375 shows that start with the letter D.'}}
Q2: 你能获取导演 Rajiv Chilaka 制作的每个节目的发行年份之间的年数,并按发行年份排序吗?
这一点让我感到惊讶。当我们利用窗口函数制定越来越复杂的 SQL 时,LLM 能够有效理解这个询问。
for step in graph.stream(
{"question": "Can you get the how many years between each show director Rajiv Chilaka produced, sort by release years?"}, stream_mode="updates"
):
print(step)
Output is
{'write_query': {'query': "SELECT title, release_year, release_year - LAG(release_year) OVER (ORDER BY release_year) AS years_between_releases\\nFROM read_csv_auto('data/netflix_titles.csv')\\nWHERE director = 'Rajiv Chilaka'\\nORDER BY release_year\\nLIMIT 10;"}}
{'execute_query': {'result': [Document(metadata={}, page_content='title: Chhota Bheem & Ganesh\\nrelease_year: 2009\\nyears_between_releases: None'), Document(metadata={}, page_content='title: Chhota Bheem aur Krishna\\nrelease_year: 2009\\nyears_between_releases: 0'), Document(metadata={}, page_content='title: Chhota Bheem & Krishna: Pataliputra- City of the Dead\\nrelease_year: 2010\\nyears_between_releases: 1'), Document(metadata={}, page_content='title: Chhota Bheem: Bheem vs Aliens\\nrelease_year: 2010\\nyears_between_releases: 0'), Document(metadata={}, page_content='title: Chhota Bheem & Krishna: Mayanagari\\nrelease_year: 2011\\nyears_between_releases: 1'), Document(metadata={}, page_content='title: Chhota Bheem: Journey to Petra\\nrelease_year: 2011\\nyears_between_releases: 0'), Document(metadata={}, page_content='title: Chhota Bheem: Master of Shaolin\\nrelease_year: 2011\\nyears_between_releases: 0'), Document(metadata={}, page_content='title: Chhota Bheem Aur Hanuman\\nrelease_year: 2012\\nyears_between_releases: 1'), Document(metadata={}, page_content='title: Chhota Bheem: Dholakpur to Kathmandu\\nrelease_year: 2012\\nyears_between_releases: 0'), Document(metadata={}, page_content='title: Chhota Bheem: The Rise of Kirmada\\nrelease_year: 2012\\nyears_between_releases: 0')]}}
{'generate_answer': {'answer': 'Based on the SQL result, here are the number of years between each show that director Rajiv Chilaka produced, sorted by release years:\\n\\n1. "Chhota Bheem & Ganesh" - Released in 2009 (No previous release to compare, so the difference is None)\\n2. "Chhota Bheem aur Krishna" - Released in 2009 (0 years since the previous release)\\n3. "Chhota Bheem & Krishna: Pataliputra- City of the Dead" - Released in 2010 (1 year since the previous release)\\n4. "Chhota Bheem: Bheem vs Aliens" - Released in 2010 (0 years since the previous release)\\n5. "Chhota Bheem & Krishna: Mayanagari" - Released in 2011 (1 year since the previous release)\\n6. "Chhota Bheem: Journey to Petra" - Released in 2011 (0 years since the previous release)\\n7. "Chhota Bheem: Master of Shaolin" - Released in 2011 (0 years since the previous release)\\n8. "Chhota Bheem Aur Hanuman" - Released in 2012 (1 year since the previous release)\\n9. "Chhota Bheem: Dholakpur to Kathmandu" - Released in 2012 (0 years since the previous release)\\n10. "Chhota Bheem: The Rise of Kirmada" - Released in 2012 (0 years since the previous release)'}}
最终思考
利用上述示例,我们可以制定复杂的查询,以有效地从 LLM 中引出响应并从 DuckDB 数据库中检索请求。由于 LLM 技术的进步,进行数据分析不再需要具备 SQL 背景。这可能引发对数据分析师角色的进一步疑问:LLM 是否会生成 SQL,从而增强他们的能力或危及他们的职位?
在本文中,我们观察到 AI 代理能够毫无问题地完成特定任务。尽管仍有一些领域需要人类与 AI 代理互动,但人类所需的时间比例已显著减少。我预期到 2025 年,将会增加构建 AI 代理以解决特定领域相关问题的案例。
要访问构建 AI 代理以生成 SQL 的完整代码,这里有一个笔记本来展示所有代码:https://github.com/ChengzhiZhao/AIAgentExamples/tree/main/SQLGenerationAgent