转变您的数据分析:在 5 个步骤中使用 Crew AI 多代理系统构建交互式可视化应用程序
import os
import re
import sqlite3
import warnings
import dotenv
import streamlit as st
import pandas as pd
from src.logo_title import logo_title
### Crew AI + Tools imports
from crewai import Agent, Crew, Process, Task
from crewai.tools import tool
from langchain_openai import ChatOpenAI
### SQL Tools from langchain_community
from langchain_community.tools.sql_database.tool import (
InfoSQLDatabaseTool,
ListSQLDatabaseTool,
QuerySQLCheckerTool,
QuerySQLDataBaseTool,
)
from langchain_community.utilities.sql_database import SQLDatabase
warnings.filterwarnings("ignore", category=UserWarning, module="pydantic")
### Load Environment Variables
dotenv.load_dotenv()
openai_api_key = os.getenv("OPENAI_API_KEY", "")
说明:
- 环境变量: 我们使用
dotenv
加载敏感的 API 密钥。 - Crew AI 和 LangChain: 这些库为我们的代理提供了 GPT-4 的支持,从而实现了自然语言处理,用于 SQL 生成、分析和可视化。
2. 使用正则表达式提取代码块
我们创建一个辅助函数,用于从用三个反引号包裹的文本中提取 Python 代码。当 AI 代理返回代码片段时,这很有用。
### Regex to Extract Code Snippet in triple backticks
flexible_pattern = re.compile(
r"```python\s*(.*?)(```|$)", # match until '```' or end-of-string
re.DOTALL | re.IGNORECASE
)
def extract_code_block(raw_text: str) -> str:
"""
Finds the substring after '```python' up to either the next triple backticks or end of string.
Returns that as the code snippet. If not found, returns empty string.
"""
match = flexible_pattern.search(raw_text)
if match:
code_part = match.group(1)
# Remove leftover triple backticks just in case
code_part = code_part.replace("```", "").strip()
return code_part
return ""
说明:
- 正则表达式模式: 匹配用三个反引号括起来的代码块。
- 辅助函数: 清理并提取代码,以便稍后执行。
3. 创建数据可视化代理
我们定义一个 AI 代理,专门根据用户提示生成 Plotly 代码。期望该代码创建表示可视化的 fig
对象。
### Data Visualization Agent
data_viz_agent = Agent(
role="Data Visualization Agent",
goal=(
"Generate Python code using Plotly to visualize data based on user queries. "
"Your code must be wrapped in triple backticks: ```python ... ``` and produce a 'fig' object."
),
backstory=(
"You are an expert data scientist. You have a local CSV file (already prepared). "
"Use Plotly to create a figure (e.g. fig = px.bar(...)). "
"Do not show or save the figure. Just produce the code snippet in triple backticks."
),
tools=[], # no extra tools needed for code generation
llm=ChatOpenAI(
model_name="gpt-4",
temperature=0.0,
),
verbose=True,
)
data_viz_task = Task(
description="",
expected_output="A snippet of valid Plotly code in triple backticks that creates a 'fig' object.",
agent=data_viz_agent,
)
viz_crew = Crew(
agents=[data_viz_agent],
tasks=[data_viz_task],
)
说明:
- 代理定义: 数据可视化代理的任务是生成 Plotly 代码。
- 任务和 Crew: 我们将代理打包成一个任务和一个 crew,以便稍后可以在应用程序工作流程中调用它。
4. 构建 SQL 和分析代理
接下来,我们设置多个代理和工具,用于 SQL 查询生成、检查和数据分析。
### SQL / Analysis Agents and Tools
@tool("list_tables")
def list_tables_tool() -> str:
"""List the available tables in the DB."""
return ListSQLDatabaseTool(db=db).invoke("")
@tool("tables_schema")
def tables_schema_tool(tables: str) -> str:
"""Show schema & sample rows for the given tables (comma-separated)."""
return InfoSQLDatabaseTool(db=db).invoke(tables)
@tool("execute_sql")
def execute_sql_tool(sql_query: str) -> str:
"""Execute a SQL query against the DB. Returns the result as a string."""
return QuerySQLDataBaseTool(db=db).invoke(sql_query)
@tool("check_sql")
def check_sql_tool(sql_query: str) -> str:
"""Check if the SQL query is correct. Returns suggestions/fixes or success message."""
try:
llm_checker = ChatOpenAI(model_name="gpt-4", temperature=0)
query_checker_tool = QuerySQLCheckerTool(db=db, llm=llm_checker)
return query_checker_tool.invoke({"query": sql_query})
except Exception as e:
return f"Error using QuerySQLCheckerTool: {str(e)}"
带有数据样本的 SQL Developer Agent
sample_rows = df.head(5).to_csv(index=False) # (Assuming df is loaded later)
sql_dev = Agent(
role="SQL Developer",
goal="Construct and execute SQL queries based on user requests",
backstory=(f"""
You are an experienced database engineer who is master at creating efficient and complex SQL queries.
You have a deep understanding of how different databases work and how to optimize queries.
Here is a sample of the dataset for reference:
{sample_rows}
Use the 'list_tables' to find available tables.
Use the 'tables_schema' to understand the metadata for the 'list_tables'.
Use the 'execute_sql' to execute queries against the database.
Use the 'check_sql' to check your queries for correctness.
"""),
llm=ChatOpenAI(model_name="gpt-4o-mini", temperature=0),
tools=[list_tables_tool, tables_schema_tool, execute_sql_tool, check_sql_tool],
allow_delegation=False,
verbose=True,
)
data_analyst = Agent(
role="Senior Data Analyst",
goal="Analyze the data from the SQL developer and provide insights",
backstory="You analyze datasets using Python and produce clear, concise insights.",
llm=ChatOpenAI(model_name="gpt-4o-mini", temperature=0),
allow_delegation=False,
verbose=True,
)
report_writer = Agent(
role="Report Writer",
goal="Summarize the analysis into a short, executive-level report",
backstory="You create concise reports highlighting the most important findings.",
llm=ChatOpenAI(model_name="gpt-4o-mini", temperature=0),
allow_delegation=False,
verbose=True,
)
### Define tasks for the SQL pipeline
extract_data = Task(
description="User Query: {query}\nWrite any needed SQL, run it, return the result.",
expected_output="Raw data from the SQL query as text or structured data.",
agent=sql_dev,
)
analyze_data = Task(
description="Analyze the data from the SQL Developer. Provide a detailed explanation for {query}.",
expected_output="Detailed analysis text",
agent=data_analyst,
context=[extract_data],
)
write_report = Task(
description="Write an executive summary of the report from the analysis.",
expected_output="Short bullet-point or paragraph summarizing the analysis.",
agent=report_writer,
context=[analyze_data],
)
main_crew = Crew(
agents=[sql_dev, data_analyst, report_writer],
tasks=[extract_data, analyze_data, write_report],
process=Process.sequential,
verbose=True,
output_log_file="crew.log"
)
说明:
- SQL 工具: 我们定义了列出表、显示模式信息、执行 SQL 和检查查询正确性的函数。
- 用于 SQL 任务的 Agent: SQL Developer Agent 构建查询,数据分析师 Agent 解释结果,报告撰写者 Agent 总结发现。
- Crew 和任务: 这些 Agent 通过任务进行协调,形成一个顺序管道。
5. 设置 Streamlit 应用程序
现在,我们将所有组件集成到一个 Streamlit 应用程序中。本节处理文件上传、显示数据预览和初始化 SQLite 数据库。
st.set_page_config(layout="wide")
logo_title("Data Analyst Agent App")
### Upload CSV file for both data analysis and Plotly visualization
uploaded_file = st.file_uploader("Upload a CSV (for both data analysis and Plotly visualization)", type="csv")
if not uploaded_file:
st.info("Please upload a CSV file to begin.")
st.stop()
### Attempt to read CSV
try:
df = pd.read_csv(uploaded_file)
except Exception as e:
st.error(f"Error reading uploaded CSV: {e}")
st.stop()
### Show data preview
st.subheader("Preview of Uploaded Data")
if st.checkbox("Show Full Data"):
st.dataframe(df)
else:
st.dataframe(df.head())
### Capitalize columns for consistency
df.columns = [col.capitalize() for col in df.columns]
### Generate a sample of the first 5 rows for reference
sample_rows = df.head(5).to_csv(index=False)
### Create (or re-create) local SQLite DB from the uploaded CSV
db_file = "temp_db.sqlite"
def init_database():
if os.path.isfile(db_file):
os.remove(db_file)
conn = sqlite3.connect(db_file)
table_name = "data_table"
df.to_sql(name=table_name, con=conn, if_exists="replace", index=False)
conn.close()
if "db_initialized" not in st.session_state:
st.session_state["db_initialized"] = False
if not st.session_state["db_initialized"]:
init_database()
st.session_state["db_initialized"] = True
st.success("Database created from uploaded CSV.")
### Build the SQLDatabase object
database_uri = f"sqlite:///{db_file}"
db = SQLDatabase.from_uri(database_uri)
说明:
- 文件上传: 用户上传一个 CSV 文件,该文件被读入 Pandas DataFrame 中。
- 数据预览: 应用程序显示前几行或完整的数据集。
- 数据库初始化: CSV 数据被加载到本地 SQLite 数据库中,允许我们的 SQL Agent 查询它。
6. 运行多 Agent SQL 查询管道
用户现在可以询问有关他们数据的问题。 SQL Developer Agent 根据用户输入构建和执行查询,然后管道提供详细的分析和执行摘要报告。
st.subheader("Ask a Question about the Data")
user_query = st.text_input("Example: 'Show the average score by subject'", "")
if st.button("Generate Report"):
if not user_query.strip():
st.warning("Please enter a query.")
st.stop()
## Kick off the multi-agent pipeline
inputs = {"query": user_query}
with st.spinner("Running the multi-agent pipeline..."):
result = main_crew.kickoff(inputs=inputs)
st.success("Done! See below for the final result.")
## Show each Task's raw output
st.header("Detailed Report & Analysis")
for i, task_output in enumerate(result.tasks_output):
agent_raw_text = task_output.raw or "No raw output found."
st.write(agent_raw_text)
st.write("---")
说明:
- 用户查询: 输入关于数据的自然语言问题。
- 管道执行: 多 Agent 系统按顺序运行:
- SQL Developer Agent 生成并执行查询。
- 数据分析师 Agent 解释结果。
- 报告撰写者 Agent 总结分析。
- 显示: 为了透明度和洞察力,显示每个 Agent 的输出。
7. 生成交互式数据可视化
最后一个功能允许您创建交互式可视化。数据可视化 Agent 生成 Plotly 代码,该代码被执行以渲染动态图表。
st.subheader("Visualize the Data")
viz_prompt = st.text_area(
"Write your instructions. Example:\n'Please create a bar chart of average Score by Subject using the data'"
)
if st.button("Generate Plot"):
if not viz_prompt.strip():
st.warning("Please enter a visualization prompt.")
st.stop()
### 将 DataFrame 保存为临时 CSV 文件供 Agent 参考
```python
csv_path = "temp.csv"
df.to_csv(csv_path, index=False)
agent_prompt = f"""
我们有一个名为 'temp.csv' 的 CSV 文件,包含以下列: {', '.join(df.columns)}.
以下是前几行:
{df.head(5).to_csv(index=False)}
用户想要一个 Plotly 图表: "{viz_prompt}"
用三个反引号生成代码。
"""
data_viz_task.description = agent_prompt
with st.spinner("正在询问数据可视化 Agent 获取代码..."):
viz_result = viz_crew.kickoff()
agent_response_text = viz_result.raw or ""
generated_code = extract_code_block(agent_response_text)
if not generated_code:
st.error("在 Agent 的回复中未找到 Plotly 代码!")
st.stop()
## Remove any calls that would immediately display the figure
safe_code = generated_code.replace("fig.show()", "")
env = {}
try:
exec(safe_code, env)
if "fig" in env:
fig = env["fig"]
fig.update_layout(title="Agent 生成的可视化图表")
st.plotly_chart(fig, use_container_width=True)
else:
st.warning("在代码中未找到 'fig' 对象。")
except Exception as e:
st.error(f"执行生成的代码时出错:\n{e}")
说明:
- 用户可视化提示: 输入您想要的图表类型(例如,条形图、折线图)的指令。
- Agent 提示: 数据可视化 Agent 会获得 CSV 文件详细信息和示例行。
- 代码生成和执行: Agent 返回用三个反引号包裹的 Plotly 代码。该代码被提取、清理并执行以渲染交互式图表。
结论
这个交互式应用程序将 AI 驱动的 Agent、SQL 查询和动态可视化结合成一个无缝的体验。我们实现了以下目标:
- 数据上传和预览: 轻松导入 CSV 数据并可视化预览。
- SQL 查询和分析: 使用自然语言生成和执行 SQL 查询,然后进行详细的分析和汇总报告。
- 交互式可视化: 以最少的努力生成自定义的 Plotly 可视化图表。
无论您是希望简化工作流程的数据科学家,还是希望从复杂数据集中获得清晰见解的业务分析师,此应用程序都能让您以引人入胜的交互方式探索您的数据。