
Building an AI-Powered Financial Analysis Application: From Data to Decisions with Qdrant and a Multi-Agent Architecture
- Rifx.Online
- Large Language Models, AI Applications, Case Studies
- 05 Mar, 2025
In this blog we take a close look at the architecture and implementation of an AI-powered financial analysis application built on Streamlit, Qdrant, and LlamaIndex.
The application uses multiple ReAct-style agents, backed by OpenAI, Ollama, and Anthropic models, to process a company's quarterly financial data. The user sees the output of all three agents side by side, and can either pick the best answer or combine the three results to reach a better decision.
Written by M K Pavan Kumar
Amid the generative AI transformation, enterprises are using advanced agent-based systems to extract insights from complex datasets. This application demonstrates an approach to processing and analyzing corporate financial data through a multi-agent architecture. Driven by OpenAI, Ollama, and Anthropic agents, the system interacts with Qdrant, a powerful vector database, and delivers actionable insights through a Streamlit interface.
The architecture combines these tools to ingest, index, and query financial data efficiently. By employing several specialized agents, the system achieves scalability, modularity, and precise information retrieval, making it well suited to dynamic financial analysis scenarios.
Architecture Overview
The architecture is built around three key components:
- Streamlit interface: Users interact with the system through a user-friendly Streamlit front end. Queries are sent from the front end to the back end, where they are processed by the agents.
- Agent layer: Three specialized agents are implemented, each driven by a different large language model (OpenAI, Ollama, and Anthropic):
  - OpenAI agent: Handles structured queries, with a cap on the number of function calls to keep performance in check.
  - Ollama agent (ReAct): Focuses on contextual, dynamic query handling.
  - Anthropic agent (ReAct): Provides strong query understanding and response generation.
- Qdrant vector database: Serves as the central store for the indexed financial documents. The March, June, and September financial data is ingested and indexed into Qdrant by a custom Python script.
Data preparation and indexing: Data from several quarters is ingested and indexed into Qdrant. If the collection does not exist, new indices are created from the documents; otherwise the existing index is reused. This keeps data processing efficient and avoids redundant ingestion.
Query engine setup: Each index is turned into a query engine that performs similarity-based search using an OpenAI LLM. The engines are configured to retrieve the three most relevant results.
Tool creation and integration: A query tool is created for each financial quarter, with a detailed description that gives user queries accurate context. These tools are then plugged into the respective agents, enabling precise information retrieval and response generation.
This layered architecture allows modular, scalable, and efficient analysis of financial data, delivering actionable insights with minimal latency.
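The fan-out at the heart of this design can be sketched in plain Python. The "agents" below are stand-in stubs, not the real LlamaIndex agents; the point is only to show one query being dispatched to three independent workers and the answers collected side by side:

```python
from concurrent.futures import ThreadPoolExecutor

# Stand-in "agents": each just labels the query it receives.
# In the real app these would be the OpenAI, Ollama, and Anthropic agents.
def make_stub_agent(name):
    return lambda query: f"{name} answered: {query}"

agents = {
    "openai": make_stub_agent("openai"),
    "ollama": make_stub_agent("ollama"),
    "anthropic": make_stub_agent("anthropic"),
}

def fan_out(query):
    """Send one query to every agent and collect the responses by name."""
    with ThreadPoolExecutor(max_workers=len(agents)) as pool:
        futures = {name: pool.submit(agent, query) for name, agent in agents.items()}
        return {name: f.result() for name, f in futures.items()}

results = fan_out("Analyze Uber's revenue growth")
```

The Streamlit app shown later calls the agents sequentially; the thread pool here is just one way the same fan-out could be parallelized.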
Implementation:
The project structure looks like this:
.
├── agent_query_planning.iml
├── .env
├── data
│ ├── 10k
│ │ ├── lyft_2021.pdf
│ │ └── uber_2021.pdf
│ └── 10q
│ ├── uber_10q_june_2022.pdf
│ ├── uber_10q_march_2022.pdf
│ └── uber_10q_sept_2022.pdf
├── financial_data_analysing_agent.py
└── requirements.txt
The environment file (.env) and requirements.txt are shown below.
OPENAI_API_KEY=sk-proj-
ANTHROPIC_API_KEY=sk-ant-
LANGFUSE_SECRET_KEY=sk-lf-
LANGFUSE_PUBLIC_KEY=pk-lf-
LANGFUSE_HOST=http://localhost:3000
openai
python-dotenv
qdrant-client
llama-index
llama-index-vector-stores-qdrant
llama-index-embeddings-ollama
llama-index-embeddings-openai
llama-index-llms-anthropic
llama-index-llms-ollama
llama-index-llms-openai
llama-index-agent-openai
streamlit
langfuse
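The keys in .env are typically loaded with load_dotenv() from python-dotenv (listed in requirements.txt above). As an illustration of what that parsing does, here is a stdlib-only sketch; the function name and demo key are made up for the example:

```python
import os

# Illustrative only: mimics what python-dotenv's load_dotenv() does with a
# file like the .env above - parse KEY=VALUE lines into os.environ.
def load_env_lines(lines):
    for line in lines:
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue  # skip blanks, comments, and malformed lines
        key, _, value = line.partition("=")
        os.environ.setdefault(key.strip(), value.strip())

load_env_lines([
    "# comment lines and blanks are skipped",
    "",
    "DEMO_API_KEY=demo-value",
])
```

In the actual application you would simply call `load_dotenv()` at startup so the OpenAI, Anthropic, and Langfuse clients can pick up their keys from the environment.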
Step 1: First, let's initialize the models and some global settings.
from llama_index.core import Settings
from llama_index.llms.openai import OpenAI
from llama_index.llms.ollama import Ollama
from llama_index.llms.anthropic import Anthropic


def initialize_models():
    # Global chunking settings used when the documents are indexed.
    Settings.chunk_size = 512
    Settings.chunk_overlap = 20
    openai_llm = OpenAI(model="gpt-4")
    ollama_llm = Ollama(model="llama3.2:latest", temperature=0.2, base_url="http://localhost:11434/")
    anthropic_llm = Anthropic(model="claude-3-5-sonnet-20240620")
    return openai_llm, ollama_llm, anthropic_llm
Step 2: Let's load the data from the PDF files, as shown below.
from llama_index.core import SimpleDirectoryReader


def load_documents():
    # One reader per quarterly 10-Q filing.
    march_data = SimpleDirectoryReader(input_files=['./data/10q/uber_10q_march_2022.pdf']).load_data(show_progress=True)
    june_data = SimpleDirectoryReader(input_files=['./data/10q/uber_10q_june_2022.pdf']).load_data(show_progress=True)
    sept_data = SimpleDirectoryReader(input_files=['./data/10q/uber_10q_sept_2022.pdf']).load_data(show_progress=True)
    return march_data, june_data, sept_data
Step 3: Let's set up the vector store.
from qdrant_client import QdrantClient
from llama_index.core import StorageContext
from llama_index.vector_stores.qdrant import QdrantVectorStore


def setup_vector_store():
    qdrant_store_client = QdrantClient(url="http://localhost:6333/", api_key="th3s3cr3tk3y")
    vector_store = QdrantVectorStore(collection_name="financial_data", client=qdrant_store_client)
    storage_ctx = StorageContext.from_defaults(vector_store=vector_store)
    return qdrant_store_client, vector_store, storage_ctx
Step 4: Let's create the indices and query engines, as shown below.
from llama_index.core import VectorStoreIndex


def create_indices(qdrant_store_client, vector_store, storage_ctx, march_data, june_data, sept_data):
    if not qdrant_store_client.collection_exists(collection_name="financial_data"):
        march_index = VectorStoreIndex.from_documents(documents=march_data, storage_context=storage_ctx)
        june_index = VectorStoreIndex.from_documents(documents=june_data, storage_context=storage_ctx)
        sept_index = VectorStoreIndex.from_documents(documents=sept_data, storage_context=storage_ctx)
    else:
        # Note: in this branch all three indices are rebuilt from the same
        # "financial_data" collection, so they query the same underlying data.
        march_index = VectorStoreIndex.from_vector_store(vector_store=vector_store)
        june_index = VectorStoreIndex.from_vector_store(vector_store=vector_store)
        sept_index = VectorStoreIndex.from_vector_store(vector_store=vector_store)
    return march_index, june_index, sept_index


def setup_query_engines(march_index, june_index, sept_index, openai_llm):
    # Each engine retrieves the 3 most similar chunks and answers with GPT-4.
    march_engine = march_index.as_query_engine(similarity_top_k=3, llm=openai_llm)
    june_engine = june_index.as_query_engine(similarity_top_k=3, llm=openai_llm)
    sept_engine = sept_index.as_query_engine(similarity_top_k=3, llm=openai_llm)
    return march_engine, june_engine, sept_engine
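The `similarity_top_k=3` setting means each engine keeps only the three document chunks whose embeddings are closest to the query embedding. Qdrant does this over real embedding vectors at scale; the toy version below, with tiny hand-made 2-d vectors, shows what "top 3 by cosine similarity" means:

```python
import math

def cosine(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

def top_k(query_vec, doc_vecs, k=3):
    """Return indices of the k most similar document vectors, best first."""
    ranked = sorted(range(len(doc_vecs)),
                    key=lambda i: cosine(query_vec, doc_vecs[i]),
                    reverse=True)
    return ranked[:k]

# Four "document" vectors; the query points along the x-axis.
docs = [(1.0, 0.0), (0.9, 0.1), (0.0, 1.0), (0.7, 0.7)]
best = top_k((1.0, 0.0), docs, k=3)  # indices of the 3 closest docs
```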
Step 5: Let's create the tools that implement RAG.
from llama_index.core.tools import QueryEngineTool


def create_query_tools(march_engine, june_engine, sept_engine):
    query_tools = [
        QueryEngineTool.from_defaults(
            query_engine=sept_engine,
            name="sept_2022",
            description="Provides information about Uber's quarterly financials ending September 2022"
        ),
        QueryEngineTool.from_defaults(
            query_engine=june_engine,
            name="june_2022",
            description="Provides information about Uber's quarterly financials ending June 2022"
        ),
        QueryEngineTool.from_defaults(
            query_engine=march_engine,
            name="march_2022",
            description="Provides information about Uber's quarterly financials ending March 2022"
        )
    ]
    return query_tools
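The month-specific names and descriptions matter: they are what lets an agent decide which quarterly engine to call for a given query. The real agents make this decision with the LLM; the naive keyword router below is only a toy illustration of the same idea:

```python
# Hypothetical router, not part of the app: matches a query to a tool name
# by looking for the month keyword. Real LlamaIndex agents let the LLM read
# each tool's description and choose.
TOOLS = {
    "march_2022": "Uber quarterly financials ending March 2022",
    "june_2022": "Uber quarterly financials ending June 2022",
    "sept_2022": "Uber quarterly financials ending September 2022",
}

def route(query):
    query = query.lower()
    for name in TOOLS:
        month = name.split("_")[0]  # "march", "june", "sept"
        if month in query:
            return name
    return None  # no quarter mentioned; an agent might query all tools

chosen = route("What was Uber's revenue in June 2022?")
```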
Step 6: Let's create the agents.
from llama_index.core import get_response_synthesizer
from llama_index.core.agent import ReActAgent
from llama_index.core.tools import QueryPlanTool
from llama_index.agent.openai import OpenAIAgent


def setup_agents(query_tools, openai_llm, ollama_llm, anthropic_llm):
    response_synthesizer = get_response_synthesizer()
    # The OpenAI agent plans across all three quarterly tools at once.
    query_plan_tool = QueryPlanTool.from_defaults(
        query_engine_tools=query_tools,
        response_synthesizer=response_synthesizer
    )
    openai_agent = OpenAIAgent.from_tools(
        [query_plan_tool],
        max_function_calls=10,
        llm=openai_llm,
        verbose=True,
    )
    ollama_agent = ReActAgent.from_tools(
        query_tools,
        llm=ollama_llm,
        verbose=True
    )
    anthropic_agent = ReActAgent.from_tools(
        query_tools,
        llm=anthropic_llm,
        verbose=True
    )
    return openai_agent, ollama_agent, anthropic_agent
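Under the hood, a ReAct agent alternates Thought, Action, and Observation steps until it can answer. The sketch below replaces the LLM with a scripted stub so the loop itself is visible; the step texts, tool, and answer are invented for the example:

```python
# Scripted stand-in for the LLM: emits one (kind, payload) step per call.
def scripted_llm(step):
    script = [
        ("thought", "I need Q2 revenue, so I should call the june_2022 tool."),
        ("action", ("june_2022", "Uber revenue Q2 2022")),
        ("answer", "Revenue grew quarter over quarter."),
    ]
    return script[step]

def june_2022_tool(question):
    # A real tool would run the quarterly query engine here.
    return f"[observation for: {question}]"

def react_loop(llm, tools, max_steps=5):
    """Thought -> Action -> Observation, repeated until an answer appears."""
    trace = []
    for step in range(max_steps):
        kind, payload = llm(step)
        trace.append(kind)
        if kind == "action":
            tool_name, tool_input = payload
            tools[tool_name](tool_input)  # feed back as an observation
            trace.append("observation")
        elif kind == "answer":
            return payload, trace
    return None, trace

answer, trace = react_loop(scripted_llm, {"june_2022": june_2022_tool})
```

With `verbose=True`, the real agents print exactly this kind of thought/action/observation trace to the console.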
Step 7: Finally, let's build the Streamlit user interface.
import streamlit as st


def main():
    st.set_page_config(layout="wide")
    st.title("Agentic Financial RAG - Model Comparison")
    # Initialize the session state once per browser session.
    if 'agents_initialized' not in st.session_state:
        with st.spinner("Initializing models and loading documents..."):
            openai_llm, ollama_llm, anthropic_llm = initialize_models()
            march_data, june_data, sept_data = load_documents()
            qdrant_store_client, vector_store, storage_ctx = setup_vector_store()
            march_index, june_index, sept_index = create_indices(
                qdrant_store_client, vector_store, storage_ctx,
                march_data, june_data, sept_data
            )
            march_engine, june_engine, sept_engine = setup_query_engines(
                march_index, june_index, sept_index, openai_llm
            )
            query_tools = create_query_tools(march_engine, june_engine, sept_engine)
            openai_agent, ollama_agent, anthropic_agent = setup_agents(query_tools, openai_llm, ollama_llm,
                                                                      anthropic_llm)
            st.session_state.openai_agent = openai_agent
            st.session_state.ollama_agent = ollama_agent
            st.session_state.anthropic_agent = anthropic_agent
            st.session_state.agents_initialized = True
    # Query input
    # e.g. "Analyze Uber's revenue growth across March, June, and September"
    query = st.text_input("Enter your query:", "")
    if st.button("Analyze"):
        col1, col2, col3 = st.columns(3)
        with col1:
            st.subheader("OpenAI Response (GPT-4)")
            with st.spinner("Fetching OpenAI response..."):
                openai_response = st.session_state.openai_agent.query(query)
                st.write(openai_response.response)
        with col2:
            st.subheader("Ollama Response (Llama 3.2)")
            with st.spinner("Fetching Ollama response..."):
                ollama_response = st.session_state.ollama_agent.query(query)
                st.write(ollama_response.response)
        with col3:
            st.subheader("Anthropic Response (Claude 3.5 Sonnet)")
            with st.spinner("Fetching Anthropic response..."):
                anthropic_response = st.session_state.anthropic_agent.query(query)
                st.write(anthropic_response.response)
Monitoring LLM or agentic applications is crucial in the generative AI world, and in this application we integrated Langfuse for that purpose. To start a self-hosted Langfuse instance, run the following docker compose.
services:
  langfuse-worker:
    image: langfuse/langfuse-worker:3
    restart: always
    depends_on: &langfuse-depends-on
      minio:
        condition: service_healthy
      redis:
        condition: service_healthy
      clickhouse:
        condition: service_healthy
    ports:
      - "3030:3030"
    environment: &langfuse-worker-env
      DATABASE_URL: postgresql://root:[email protected]:5432/langfuse
      SALT: "mysalt"
      ENCRYPTION_KEY: "0000000000000000000000000000000000000000000000000000000000000000"
      TELEMETRY_ENABLED: ${TELEMETRY_ENABLED:-true}
      LANGFUSE_ENABLE_EXPERIMENTAL_FEATURES: ${LANGFUSE_ENABLE_EXPERIMENTAL_FEATURES:-true}
      CLICKHOUSE_MIGRATION_URL: ${CLICKHOUSE_MIGRATION_URL:-clickhouse://clickhouse:9000}
      CLICKHOUSE_URL: ${CLICKHOUSE_URL:-http://clickhouse:8123}
      CLICKHOUSE_USER: ${CLICKHOUSE_USER:-clickhouse}
      CLICKHOUSE_PASSWORD: ${CLICKHOUSE_PASSWORD:-clickhouse}
      CLICKHOUSE_CLUSTER_ENABLED: ${CLICKHOUSE_CLUSTER_ENABLED:-false}
      LANGFUSE_S3_EVENT_UPLOAD_BUCKET: ${LANGFUSE_S3_EVENT_UPLOAD_BUCKET:-langfuse}
      LANGFUSE_S3_EVENT_UPLOAD_REGION: ${LANGFUSE_S3_EVENT_UPLOAD_REGION:-auto}
      LANGFUSE_S3_EVENT_UPLOAD_ACCESS_KEY_ID: ${LANGFUSE_S3_EVENT_UPLOAD_ACCESS_KEY_ID:-minio}
      LANGFUSE_S3_EVENT_UPLOAD_SECRET_ACCESS_KEY: ${LANGFUSE_S3_EVENT_UPLOAD_SECRET_ACCESS_KEY:-miniosecret}
      LANGFUSE_S3_EVENT_UPLOAD_ENDPOINT: ${LANGFUSE_S3_EVENT_UPLOAD_ENDPOINT:-http://minio:9000}
      LANGFUSE_S3_EVENT_UPLOAD_FORCE_PATH_STYLE: ${LANGFUSE_S3_EVENT_UPLOAD_FORCE_PATH_STYLE:-true}
      LANGFUSE_S3_EVENT_UPLOAD_PREFIX: ${LANGFUSE_S3_EVENT_UPLOAD_PREFIX:-events/}
      LANGFUSE_S3_MEDIA_UPLOAD_BUCKET: ${LANGFUSE_S3_MEDIA_UPLOAD_BUCKET:-langfuse}
      LANGFUSE_S3_MEDIA_UPLOAD_REGION: ${LANGFUSE_S3_MEDIA_UPLOAD_REGION:-auto}
      LANGFUSE_S3_MEDIA_UPLOAD_ACCESS_KEY_ID: ${LANGFUSE_S3_MEDIA_UPLOAD_ACCESS_KEY_ID:-minio}
      LANGFUSE_S3_MEDIA_UPLOAD_SECRET_ACCESS_KEY: ${LANGFUSE_S3_MEDIA_UPLOAD_SECRET_ACCESS_KEY:-miniosecret}
      LANGFUSE_S3_MEDIA_UPLOAD_ENDPOINT: ${LANGFUSE_S3_MEDIA_UPLOAD_ENDPOINT:-http://minio:9000}
      LANGFUSE_S3_MEDIA_UPLOAD_FORCE_PATH_STYLE: ${LANGFUSE_S3_MEDIA_UPLOAD_FORCE_PATH_STYLE:-true}
      LANGFUSE_S3_MEDIA_UPLOAD_PREFIX: ${LANGFUSE_S3_MEDIA_UPLOAD_PREFIX:-media/}
      REDIS_HOST: ${REDIS_HOST:-redis}
      REDIS_PORT: ${REDIS_PORT:-6379}
      REDIS_AUTH: ${REDIS_AUTH:-myredissecret}
  langfuse-web:
    image: langfuse/langfuse:3
    restart: always
    depends_on: *langfuse-depends-on
    ports:
      - "3000:3000"
    environment:
      <<: *langfuse-worker-env
      NEXTAUTH_URL: http://localhost:3000
      NEXTAUTH_SECRET: mysecret
      LANGFUSE_INIT_ORG_ID: ${LANGFUSE_INIT_ORG_ID:-}
      LANGFUSE_INIT_ORG_NAME: ${LANGFUSE_INIT_ORG_NAME:-}
      LANGFUSE_INIT_PROJECT_ID: ${LANGFUSE_INIT_PROJECT_ID:-}
      LANGFUSE_INIT_PROJECT_NAME: ${LANGFUSE_INIT_PROJECT_NAME:-}
      LANGFUSE_INIT_PROJECT_PUBLIC_KEY: ${LANGFUSE_INIT_PROJECT_PUBLIC_KEY:-}
      LANGFUSE_INIT_PROJECT_SECRET_KEY: ${LANGFUSE_INIT_PROJECT_SECRET_KEY:-}
      LANGFUSE_INIT_USER_EMAIL: ${LANGFUSE_INIT_USER_EMAIL:-}
      LANGFUSE_INIT_USER_NAME: ${LANGFUSE_INIT_USER_NAME:-}
      LANGFUSE_INIT_USER_PASSWORD: ${LANGFUSE_INIT_USER_PASSWORD:-}
      LANGFUSE_SDK_CI_SYNC_PROCESSING_ENABLED: ${LANGFUSE_SDK_CI_SYNC_PROCESSING_ENABLED:-false}
      LANGFUSE_READ_FROM_POSTGRES_ONLY: ${LANGFUSE_READ_FROM_POSTGRES_ONLY:-false}
      LANGFUSE_READ_FROM_CLICKHOUSE_ONLY: ${LANGFUSE_READ_FROM_CLICKHOUSE_ONLY:-true}
      LANGFUSE_RETURN_FROM_CLICKHOUSE: ${LANGFUSE_RETURN_FROM_CLICKHOUSE:-true}
  clickhouse:
    image: clickhouse/clickhouse-server
    restart: always
    user: "101:101"
    container_name: clickhouse
    hostname: clickhouse
    environment:
      CLICKHOUSE_DB: default
      CLICKHOUSE_USER: clickhouse
      CLICKHOUSE_PASSWORD: clickhouse
    volumes:
      - langfuse_clickhouse_data:/var/lib/clickhouse
      - langfuse_clickhouse_logs:/var/log/clickhouse-server
    ports:
      - "8123:8123"
      - "9000:9000"
    healthcheck:
      test: wget --no-verbose --tries=1 --spider http://localhost:8123/ping || exit 1
      interval: 5s
      timeout: 5s
      retries: 10
      start_period: 1s
  minio:
    image: minio/minio
    restart: always
    container_name: minio
    entrypoint: sh
    command: -c 'mkdir -p /data/langfuse && minio server --address ":9000" --console-address ":9001" /data'
    environment:
      MINIO_ROOT_USER: minio
      MINIO_ROOT_PASSWORD: miniosecret
    ports:
      - "9090:9000"
      - "9091:9001"
    volumes:
      - langfuse_minio_data:/data
    healthcheck:
      test: ["CMD", "mc", "ready", "local"]
      interval: 1s
      timeout: 5s
      retries: 5
      start_period: 1s
  redis:
    image: redis:7
    restart: always
    command: >
      --requirepass ${REDIS_AUTH:-myredissecret}
    ports:
      - 6379:6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 3s
      timeout: 10s
      retries: 10

volumes:
  langfuse_clickhouse_data:
    driver: local
  langfuse_clickhouse_logs:
    driver: local
  langfuse_minio_data:
    driver: local
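What does an observability layer like Langfuse actually record for each agent call? At minimum the input, the output, and the latency of every traced span. The toy decorator below illustrates that idea with the standard library only; it is not the Langfuse SDK, whose real integration wires a callback handler into LlamaIndex using the keys from .env:

```python
import time
from functools import wraps

TRACES = []  # in Langfuse, these records would be shipped to the server

def traced(name):
    """Toy span recorder: captures input, output, and latency per call."""
    def decorator(fn):
        @wraps(fn)
        def wrapper(query):
            start = time.perf_counter()
            output = fn(query)
            TRACES.append({
                "name": name,
                "input": query,
                "output": output,
                "latency_s": time.perf_counter() - start,
            })
            return output
        return wrapper
    return decorator

@traced("openai_agent")
def answer(query):
    # Stand-in for an agent's query() call.
    return f"stub answer to: {query}"

result = answer("revenue growth?")
```

The Langfuse dashboard at http://localhost:3000 presents exactly this kind of per-call data, per model, once the SDK is hooked into the application.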
Execution:
Below are the outputs of the application and the observability dashboard.
Comparing the analyses from the three LLMs supports better decision-making.
Conclusion:
This multi-agent system demonstrates the seamless integration of advanced LLMs, a vector database, and a user-friendly interface to analyze financial datasets and extract insights. With the OpenAI, Ollama, and Anthropic agents working in concert on top of Qdrant's robust indexing capabilities, the system offers a powerful, scalable solution for financial data exploration. The Langfuse integration adds a layer of observability, providing real-time monitoring and performance insights for every LLM involved, which ensures transparency, efficiency, and tighter operational control. The architecture highlights the transformative potential of combining state-of-the-art generative AI technologies to streamline complex data analysis workflows.