解锁研究潜力:用 OCI AI Agents 在 5 个简单步骤中创建智能研究助手
在当今快速发展的技术领域,人工智能已成为我们开展研究和分析信息时不可或缺的一部分。本文介绍的简单方案可帮助您搭建一个研究助手聊天机器人,让您能够直接与自己的文档进行对话。
项目概述
Oracle Cloud Infrastructure 的 AI Agents 提供了强大、可扩展且安全的 AI 功能,可以改变企业进行研究和分析信息的方式。我们的应用程序通过一个流畅、用户友好的界面(使用 Streamlit 构建)利用这些功能,使每个人都可以使用先进的 AI 交互。
先决条件
- 创建 OCI AI Agent 端点。
- 创建带有“所有对象”选项的数据源。
- 在与 OCI AI Agent 相同的区间中创建 Bucket。
- 用于授权的 IAM 策略 — https://docs.oracle.com/en-us/iaas/Content/generative-ai-agents/iam-policies.htm
主要特点
- 无缝身份验证:应用程序与 OCI 的身份验证系统无缝集成,使用您现有的 OCI CLI 配置凭据进行安全访问。
- 交互式聊天界面:该应用程序使用 Streamlit 构建,提供响应迅速且直观的聊天体验,感觉自然且引人入胜。
- 会话持久性:您的聊天记录将在整个会话中保留,允许上下文对话并轻松参考以前的交互。
- 企业就绪:该应用程序具有内置的错误处理和用户反馈机制,专为可靠性和专业用途而设计。
- 数据隐私:所有文件都安全地存储在 OCI 对象存储 Bucket 中。
限制
- 目前 OCI AI Agents 仅支持 TXT 和 PDF 格式的文件。
- 要摄取的文件的最大大小为 100 MB。
- 您的租户必须已订阅美国中西部(芝加哥)区域,因为生成式 AI Agents 仅在此区域可用。
- 您的 OCI AI Agents 和 Buckets 应该在同一个区间中(这只是此应用程序的限制,而不是通用的 OCI AI Agent 限制)。
技术实现
该应用程序使用 Python 构建,并利用了几项关键技术:
- Streamlit:用于创建响应迅速的现代 Web 界面
- OCI Python SDK:用于与 Oracle Cloud Infrastructure 无缝集成
- Python 3.8+:确保与现代 Python 功能的兼容性(下面的代码使用了赋值表达式 `:=`,该语法需要 Python 3.8 或更高版本)
入门
设置应用程序非常简单:
1. 创建并激活 Python 虚拟环境:
macOS / Linux:
python -m venv .venv
source .venv/bin/activate
或
Windows:
python -m venv .venv
.venv\Scripts\activate
2. 使用 pip 安装所需的依赖项:
pip install -r requirements.txt
3. 在 OCI CLI 配置文件 (~/.oci/config) 中配置您的 OCI 凭据。
4. 将下面的代码保存为一个 Python 文件(例如 app.py),然后在已激活的虚拟环境中运行该应用程序:
streamlit run app.py
5. 在应用程序中填入所需的参数并开始聊天。
P.S.:每当在此应用程序所使用的 Bucket 中添加或删除对象之后,请在应用程序中运行一次“创建摄取作业”,以便重新摄取 Bucket 中的内容。
代码
import streamlit as st
import oci
import os
import time
# Configure the browser tab (title and Oracle favicon) and use the wide page layout.
# NOTE(review): Streamlit expects set_page_config to run before any other st.* command
# in the script -- confirm nothing executed earlier emits Streamlit output.
st.set_page_config(
page_title="OCI AI Research Assistant",
page_icon="https://www.oracle.com/favicon.ico",
layout="wide"
)
from oci.config import from_file
from oci.generative_ai_agent_runtime.generative_ai_agent_runtime_client import GenerativeAiAgentRuntimeClient
from oci.generative_ai_agent_runtime.models.chat_details import ChatDetails
from oci.generative_ai_agent import GenerativeAiAgentClient
from oci.generative_ai_agent.models import CreateDataIngestionJobDetails
from oci.object_storage import ObjectStorageClient
from oci.object_storage.models import CreateBucketDetails
from tempfile import NamedTemporaryFile
def initialize_oci_clients(profile_name="DEFAULT", agent_endpoint_id=None, service_endpoint=None):
    """Initialize OCI clients with the specified profile and create a chat session.

    Args:
        profile_name: Name of the profile to load from the OCI config file.
        agent_endpoint_id: Agent endpoint to open a chat session against. When
            falsy, no session is created here.
        service_endpoint: Base URL of the Generative AI Agents runtime service.
            Defaults to the us-chicago-1 endpoint, which is what this function
            always used before the parameter existed.

    Returns:
        A ``(genai_client, object_storage_client, identity_client, config)``
        tuple, or ``(None, None, None, None)`` when initialization fails (the
        error is reported through ``st.error``).
    """
    try:
        st.write(f"Attempting to load config profile: {profile_name}")
        config = from_file(profile_name=profile_name)
        st.write("Config loaded successfully")
        st.write(f"Using region: {config.get('region')}")

        # The endpoint is region specific; keep the historical default but let
        # callers override it instead of editing the source.
        if service_endpoint is None:
            service_endpoint = "https://agent-runtime.generativeai.us-chicago-1.oci.oraclecloud.com"
        st.write(f"Using service endpoint: {service_endpoint}")

        # Initialize GenAI client with service endpoint
        genai_client = GenerativeAiAgentRuntimeClient(
            config,
            service_endpoint=service_endpoint
        )

        # Create a session only once per Streamlit session: the id is cached in
        # st.session_state so that reruns reuse the same conversation.
        if agent_endpoint_id and 'chat_session_id' not in st.session_state:
            try:
                create_session_response = genai_client.create_session(
                    create_session_details=oci.generative_ai_agent_runtime.models.CreateSessionDetails(
                        display_name="USER_Session",
                        description="User Session"),
                    agent_endpoint_id=agent_endpoint_id)
                st.session_state.chat_session_id = create_session_response.data.id
                st.write("Chat session created successfully")
            except Exception as e:
                # A failed session is not fatal: the clients are still returned.
                st.error(f"Error creating chat session: {str(e)}")

        # Initialize Object Storage client
        object_storage_client = ObjectStorageClient(config)
        # Initialize Identity client
        identity_client = oci.identity.IdentityClient(config)
        st.write("OCI clients initialized")
        return genai_client, object_storage_client, identity_client, config
    except Exception as e:
        st.error(f"Error initializing OCI clients: {str(e)}")
        st.error("Please check if your OCI config file (~/.oci/config) exists and contains the correct profile")
        return None, None, None, None
def list_objects(object_storage_client, namespace, bucket_name):
    """List every object in a bucket, following pagination.

    Args:
        object_storage_client: Client exposing ``list_objects(**kwargs)`` whose
            response has ``data.objects`` and ``data.next_start_with``.
        namespace: Object Storage namespace name.
        bucket_name: Name of the bucket to list.

    Returns:
        A list with the object summaries of all pages, or ``[]`` when the
        request fails (the error is reported through ``st.error``).
    """
    try:
        collected = []
        request = {
            "namespace_name": namespace,
            "bucket_name": bucket_name,
            "fields": "name,size,timeCreated",  # Only fetch essential fields
        }
        while True:
            page = object_storage_client.list_objects(**request).data
            collected.extend(page.objects or [])
            # The service returns one page at a time; next_start_with names the
            # first object of the following page and is empty on the last one.
            next_start = getattr(page, "next_start_with", None)
            if not next_start:
                return collected
            request["start"] = next_start
    except Exception as e:
        st.error(f"Error listing objects: {str(e)}")
        return []
def upload_file(object_storage_client, namespace, bucket_name, file):
    """Upload a file to object storage.

    Args:
        object_storage_client: Client exposing ``put_object(**kwargs)``.
        namespace: Object Storage namespace name.
        bucket_name: Destination bucket.
        file: File-like object with ``read()`` and ``name``; an optional
            ``type`` attribute is forwarded as the content type.

    Returns:
        ``True`` when the upload call completes, ``False`` when it raises (the
        error is reported through ``st.error``).
    """
    try:
        # Read the file content
        file_content = file.read()
        object_storage_client.put_object(
            namespace_name=namespace,
            bucket_name=bucket_name,
            object_name=file.name,
            put_object_body=file_content,
            content_type=getattr(file, "type", None),
        )
        return True
    except Exception as e:
        st.error(f"Error uploading file: {str(e)}")
        return False
import os
import time
import streamlit as st
import oci
from oci.config import from_file
from oci.generative_ai_agent_runtime.models import ChatDetails
from oci.generative_ai_agent.models import CreateDataIngestionJobDetails
from oci.generative_ai_agent.generative_ai_agent_client import GenerativeAiAgentClient
from oci.object_storage import ObjectStorageClient
from oci.identity import IdentityClient
def initialize_oci_clients(profile_name="DEFAULT", agent_endpoint_id=None):
    """Initialize the OCI clients used by the application.

    Args:
        profile_name: Name of the profile to load from the OCI config file.
        agent_endpoint_id: Accepted for call compatibility; this function does
            not use it.

    Returns:
        A ``(genai_client, object_storage_client, identity_client, config)``
        tuple, or ``(None, None, None, None)`` when initialization fails (the
        error is reported through ``st.error``).
    """
    try:
        # Load configuration from the OCI config file
        config = from_file(profile_name=profile_name)
        # The chat/session API lives on the *runtime* client. The runtime
        # package has no GenerativeAiAgentClient, so the previous attribute
        # lookup always raised and this function always returned Nones.
        # NOTE(review): no service_endpoint is passed, so the SDK derives it
        # from the profile's region -- confirm the agent exists in that region.
        genai_client = oci.generative_ai_agent_runtime.GenerativeAiAgentRuntimeClient(config=config)
        # Initialize the Object Storage client
        object_storage_client = ObjectStorageClient(config=config)
        # Initialize the Identity client
        identity_client = IdentityClient(config=config)
        return genai_client, object_storage_client, identity_client, config
    except Exception as e:
        st.error(f"Error initializing OCI clients: {str(e)}")
        return None, None, None, None
def upload_file_to_object_storage(object_storage_client, namespace, bucket_name, file):
    """Store the contents of ``file`` in the bucket under the file's own name.

    Returns ``True`` when the put request completes and ``False`` when anything
    raises; failures are shown to the user via ``st.error``.
    """
    try:
        payload = file.getvalue()
        # Not every file-like object exposes a MIME type; fall back to None.
        mime_type = getattr(file, 'type', None)
        object_storage_client.put_object(
            namespace_name=namespace,
            bucket_name=bucket_name,
            object_name=file.name,
            put_object_body=payload,
            content_type=mime_type,
        )
    except Exception as exc:
        st.error(f"Error uploading file: {str(exc)}")
        return False
    else:
        return True
def delete_object(object_storage_client, namespace, bucket_name, object_name):
    """Delete an object from object storage and verify that it is gone.

    Args:
        object_storage_client: Client exposing ``delete_object`` and
            ``head_object``.
        namespace: Object Storage namespace name.
        bucket_name: Bucket that holds the object.
        object_name: Name of the object to delete.

    Returns:
        ``True`` when the delete request succeeded and the object is either
        confirmed missing or could not be checked; ``False`` when the delete
        request failed or the object is still present afterwards.
    """
    try:
        # Send the delete request to service
        object_storage_client.delete_object(
            namespace_name=namespace,
            bucket_name=bucket_name,
            object_name=object_name
        )
        # Verify deletion by trying to get object metadata
        try:
            object_storage_client.head_object(
                namespace_name=namespace,
                bucket_name=bucket_name,
                object_name=object_name
            )
        except Exception as head_error:
            # Only a 404 proves the object is gone. Any other failure means
            # the check itself broke: the delete request was accepted, so keep
            # reporting success, but tell the user it is unverified instead of
            # silently swallowing the error.
            if getattr(head_error, "status", None) != 404:
                st.warning(f"Deleted {object_name}, but could not verify: {str(head_error)}")
            return True
        st.error(f"Failed to delete {object_name}. Object still exists.")
        return False
    except Exception as e:
        st.error(f"Error deleting object: {str(e)}")
        return False
def list_data_sources(profile_name, compartment_id):
    """Return the ACTIVE data sources of a compartment.

    A management client is built from the given config profile on every call.
    Any failure is reported through ``st.error`` and yields an empty list.
    """
    try:
        agent_client = GenerativeAiAgentClient(from_file(profile_name=profile_name))
        listing = agent_client.list_data_sources(
            compartment_id=compartment_id,
            lifecycle_state="ACTIVE",
        )
        # An empty payload is treated the same as "no data sources".
        if not listing.data:
            return []
        return listing.data.items
    except Exception as exc:
        st.error(f"Error listing data sources: {str(exc)}")
        return []
def create_ingestion_job(profile_name, compartment_id, data_source_id):
    """Start a data ingestion job for a data source.

    A management client is built from the given config profile on every call.
    Returns the created job (the response's ``data``) or ``None`` on failure;
    failures are reported through ``st.error``.
    """
    try:
        agent_client = GenerativeAiAgentClient(from_file(profile_name=profile_name))
        # The current epoch second keeps display names distinct between runs.
        job_details = CreateDataIngestionJobDetails(
            compartment_id=compartment_id,
            data_source_id=data_source_id,
            display_name=f"Ingestion-Job-{int(time.time())}",
            description="Data ingestion job created from Research Assistant",
        )
        created = agent_client.create_data_ingestion_job(
            create_data_ingestion_job_details=job_details
        )
        return created.data
    except Exception as exc:
        st.error(f"Error creating ingestion job: {str(exc)}")
        return None
def get_chat_response(client, agent_endpoint_id, message):
    """Send a user message to the agent and return the reply text.

    Args:
        client: Runtime client exposing ``create_session`` and ``chat``.
        agent_endpoint_id: Agent endpoint to talk to; must be non-blank.
        message: The user's message.

    Returns:
        The reply text when the response message carries ``content.text``;
        otherwise the raw ``response.data.message`` value. ``None`` when
        validation, session creation or the chat call fails (the error is
        reported through ``st.error``).
    """
    try:
        # Validate agent endpoint ID
        if not agent_endpoint_id or not agent_endpoint_id.strip():
            st.error("Agent Endpoint ID is required")
            return None

        # Ensure we have a session ID; it is cached in st.session_state so the
        # conversation keeps its context across Streamlit reruns.
        if 'chat_session_id' not in st.session_state:
            try:
                create_session_response = client.create_session(
                    create_session_details=oci.generative_ai_agent_runtime.models.CreateSessionDetails(
                        display_name="USER_Session",
                        description="User Session"),
                    agent_endpoint_id=agent_endpoint_id)
                st.session_state.chat_session_id = create_session_response.data.id
            except Exception as e:
                st.error(f"Error creating chat session: {str(e)}")
                return None

        # Send the chat request
        response = client.chat(
            agent_endpoint_id=agent_endpoint_id,
            chat_details=ChatDetails(
                user_message=message,
                should_stream=False,  # Streaming is not implemented by this app
                session_id=st.session_state.chat_session_id
            )
        )

        # The SDK wraps the reply in a message object; callers render and store
        # the result as text, so unwrap it when the text is available.
        reply = response.data.message
        reply_text = getattr(getattr(reply, "content", None), "text", None)
        return reply_text if reply_text is not None else reply
    except Exception as e:
        st.error(f"Error getting chat response: {str(e)}")
        return None
def _read_profile_names(config_path):
    """Return the profile (section) names declared in an OCI config file."""
    if not os.path.exists(config_path):
        return []
    with open(config_path, 'r') as f:
        stripped_lines = (line.strip() for line in f)
        # Section headers look like "[NAME]"; strip the brackets.
        return [line.strip('[]') for line in stripped_lines if line.startswith('[')]


def _render_chat():
    """Render the chat history and handle one new user prompt."""
    st.markdown("""
欢迎使用您的 AI 研究助手! 提出任何问题,我将帮助您找到所需的信息。
""")
    # Initialize the chat history if it does not exist yet
    if 'messages' not in st.session_state:
        st.session_state.messages = []

    # Replay the stored conversation (Streamlit redraws the page on every run)
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.markdown(message["content"])

    if prompt := st.chat_input("您想研究什么?"):
        st.session_state.messages.append({"role": "user", "content": prompt})
        with st.chat_message("user"):
            st.markdown(prompt)

        with st.chat_message("assistant"):
            try:
                response = get_chat_response(
                    st.session_state.genai_client,
                    st.session_state.agent_endpoint_id,
                    prompt
                )
                if response:
                    st.markdown(response)
                    st.session_state.messages.append({"role": "assistant", "content": response})
            except Exception as e:
                st.error(f"获取响应时出错: {str(e)}")


def _render_ingestion_section():
    """Render the data source picker and the 'create ingestion job' button."""
    st.header("数据摄取")
    data_sources = list_data_sources(
        st.session_state.profile_name,
        st.session_state.compartment_id
    )
    if not data_sources:
        st.warning("未在存储分区间找到活动数据源。")
        return

    # Map a readable label to each data source id for the selectbox
    data_source_options = {f"{ds.display_name} ({ds.id})": ds.id for ds in data_sources}
    selected_source = st.selectbox(
        "选择数据源",
        options=list(data_source_options.keys())
    )
    if st.button("创建摄取作业", type="primary"):
        with st.spinner("正在创建摄取作业..."):
            result = create_ingestion_job(
                st.session_state.profile_name,
                st.session_state.compartment_id,
                data_source_options[selected_source]
            )
        if result:
            st.success(f"已创建摄取作业: {result.id}")


def _render_object_storage_section():
    """Render upload, listing and per-object delete controls for the bucket."""
    st.header("对象存储")

    st.subheader("上传文件")
    uploaded_file = st.file_uploader("选择要上传的文件", key="sidebar_uploader")
    if uploaded_file is not None:
        if st.button("上传"):
            if upload_file_to_object_storage(st.session_state.object_storage_client,
                                             st.session_state.namespace,
                                             st.session_state.bucket_name,
                                             uploaded_file):
                st.success(f"文件 {uploaded_file.name} 上传成功!")
                # Rerun so the listing below includes the new object
                st.rerun()

    st.subheader("存储桶中的对象")
    objects = list_objects(
        st.session_state.object_storage_client,
        st.session_state.namespace,
        st.session_state.bucket_name
    )
    if st.button("刷新对象", type="primary"):
        st.rerun()

    if not objects:
        st.info("此存储桶中未找到任何对象")
        return

    for obj in objects:
        col1, col2 = st.columns([3, 1])
        with col1:
            st.write(f"📄 {obj.name}\n{obj.size:,} bytes")
        with col2:
            # Widget keys must be unique, so derive them from the object name
            delete_button_key = f"delete_{obj.name}"
            if st.button("🗑", key=delete_button_key, help=f"删除 {obj.name}"):
                try:
                    with st.spinner(f"正在删除 {obj.name}..."):
                        if delete_object(
                            st.session_state.object_storage_client,
                            st.session_state.namespace,
                            st.session_state.bucket_name,
                            obj.name
                        ):
                            st.success(f"已删除 {obj.name}")
                            st.rerun()
                        else:
                            st.error(f"删除 {obj.name} 失败")
                except Exception as e:
                    st.error(f"删除 {obj.name} 时出错: {str(e)}")
        st.divider()


def main():
    """Main function for the OCI AI Research Assistant application.

    Renders the configuration sidebar, initializes the OCI clients on demand
    and keeps them in ``st.session_state``; once clients are available it
    renders the chat area and the ingestion / object storage tools.
    """
    st.title("OCI AI Research Assistant")

    # Configuration Section in Sidebar
    with st.sidebar:
        st.header("Configuration")

        # Offer the profiles found in ~/.oci/config
        available_profiles = _read_profile_names(os.path.expanduser("~/.oci/config"))
        profile_name = st.selectbox(
            "OCI Profile Name",
            options=available_profiles,
            index=available_profiles.index("DEFAULT") if "DEFAULT" in available_profiles else 0
        )
        agent_endpoint_id = st.text_input("Agent Endpoint ID")
        compartment_id = st.text_input("Compartment ID")

        # Object Storage Configuration
        namespace = st.text_input("Namespace Name")
        bucket_name = st.text_input("Bucket Name")

        if st.button("Initialize Clients"):
            # Validate required inputs
            if not agent_endpoint_id or not agent_endpoint_id.strip():
                st.error("Agent Endpoint ID is required")
                return

            # Store all inputs in session state so they survive reruns
            st.session_state.profile_name = profile_name
            st.session_state.agent_endpoint_id = agent_endpoint_id
            st.session_state.compartment_id = compartment_id
            st.session_state.namespace = namespace
            st.session_state.bucket_name = bucket_name

            genai_client, object_storage_client, identity_client, config = initialize_oci_clients(
                profile_name=profile_name,
                agent_endpoint_id=agent_endpoint_id
            )
            # Only publish the clients when initialization fully succeeded; the
            # sections below key off their presence in session state.
            if all([genai_client, object_storage_client, identity_client, config]):
                st.session_state.genai_client = genai_client
                st.session_state.object_storage_client = object_storage_client

    # Main chat area
    if 'genai_client' in st.session_state:
        _render_chat()

    # Ingestion and object storage tools in the sidebar
    if 'object_storage_client' in st.session_state:
        with st.sidebar:
            st.markdown("---")
            _render_ingestion_section()
            st.markdown("---")
            _render_object_storage_section()
# Run the app only when this file is executed as the entry script.
if __name__ == "__main__":
    main()