增强AI代理交互:使用PydanticAI和MongoDB实施记忆层
内存结构
我将内存结构划分为以下几个组件:
- User Profile 存储持久性细节(例如,姓名、年龄、兴趣爱好、对话偏好)。
- Memories 从过去的交互中提取的单个数据点,分类为事实、偏好或经验。
- Agent Experience 从交互中积累的知识,用于指导 agent 的响应策略,并延续到新的环境中。
- Task History 跟踪工作流程进度、用户-agent 交互和中间结果(例如,对话记录)。
MongoDB 文档模型
以下是这些概念如何映射到我们的 MongoDB 文档:
class Profile(BaseModel):
name: str = ""
age: int | None = None
interests: list[str] = Field(default_factory=list)
home: str = Field(default="", description="Description of the user's home town/neighborhood, etc.")
occupation: str = Field(default="", description="The user's current occupation or profession")
conversation_preferences: list[str] = Field(
default_factory=list,
description="A list of the user's preferred conversation styles, topics they want to avoid, etc.",
)
def __str__(self) -> str:
res = ""
if any(v for v in self.model_dump().values()):
res += f"<user_profile>\n{self.model_dump_json()}\n</user_profile>"
return res.strip()
@classmethod
def user_prompt(cls) -> str:
return "Create an updated detailed user profile from the current information you have. Make sure to incorporate the existing profile if it exists in <user_specific_experience>. Prefer to add new stuff to the profile rather than overwrite existing stuff. Unless of course it makes sense to overwrite existing stuff. For example, if the user says they are 25 years old, and the profile says they are 20 years old, then it makes sense to overwrite the profile with the new information."
class Memory(BaseModel):
"Save notable memories the user has shared with you for later recall."
id: SkipJsonSchema[UUID4] = Field(default_factory=uuid4)
created_at: SkipJsonSchema[datetime] = Field(default_factory=datetime.now)
context: str = Field(
description="The situation or circumstance where this memory may be relevant. Include any caveats or conditions that contextualize the memory. For example, if a user shares a preference, note if it only applies in certain situations (e.g., 'only at work'). Add any other relevant 'meta' details that help fully understand when and how to use this memory."
)
category: str = Field(description="Category of memory (e.g., 'preference', 'fact', 'experience')")
content: str = Field(description="The specific information, preference, or event being remembered.")
superseded_ids: list[str] = Field(
default_factory=list, description="IDs of memories this explicitly supersedes"
)
def __str__(self) -> str:
return self.model_dump_json(exclude={"superseded_ids"})
@classmethod
def user_prompt(cls) -> str:
return """
Analyze the conversation to identify important information that should be remembered for future interactions. Focus on:
1. Personal Details & Preferences:
- Stated preferences, likes, and dislikes
- Personal background information
- Professional or educational details
- Important relationships mentioned
2. Contextual Information:
- Time-sensitive information (upcoming events, deadlines)
- Location-specific details
- Current projects or goals
- Recent experiences shared
3. Interaction Patterns:
- Communication style preferences
- Topics they enjoy discussing
- Topics to avoid or handle sensitively
- Specific terminology or jargon they use
4. Previous Commitments:
- Promised follow-ups or continuations
- Unfinished discussions
- Expressed intentions for future interactions
For each memory identified:
- Include relevant context about when/how it should be used
- Note any temporal limitations or conditions
- Consider how it might affect future interactions
When creating memories that update existing information:
- If you have access to previous memories in <user_specific_experience>, check if any new information contradicts or updates them
- Include the IDs of any superseded memories in the `superseded_ids` field
- Example: If a user previously said they lived in New York (memory ID: abc-123) but now mentions moving to Boston,
create a new memory with superseded_ids=["abc-123"]
- Only generate the new memories, the older ones will automatically be overwritten based on the `superseded_ids` field.
Return a list of structured memories, each with clear context, category, and content.
Prioritize information that would be valuable for maintaining conversation continuity across sessions.
""".strip()
class AgentExperience(BaseModel):
procedural_knowledge: str = Field(
default="", description="Accumulated understanding of how to approach tasks in the agent's domain"
)
common_scenarios: list[str] = Field(
description="Frequently encountered situations and their typical contexts", default_factory=list
)
effective_strategies: list[str] = Field(
description="Proven approaches and methodologies that have worked well", default_factory=list
)
known_pitfalls: list[str] = Field(
description="Common challenges, edge cases, and how to handle them", default_factory=list
)
tool_patterns: list[str] = Field(
description="Effective ways to use different tools, organized by tool name", default_factory=list
)
heuristics: list[str] = Field(
description="Rules of thumb and decision-making guidelines that emerge from experience",
default_factory=list,
)
user_feedback: list[str] = Field(
description="Collection of user feedback when the user was not satisfied with the agent's response. This is to help improve the agent's technical skills and behavior. So basic responses from the user are not useful here.",
default_factory=list,
)
improvement_areas: list[str] = Field(
description="Identified areas for optimization or enhancement. Looking at the user_feedback can also help identify areas for improvement.",
default_factory=list,
)
@classmethod
def user_prompt(cls) -> str:
return """
Review this interaction and update the agent's accumulated experience, focusing on general patterns and learnings that apply across all users and sessions:
1. Knowledge Evolution:
- What general domain insights were gained that could benefit all users?
- What universal patterns or anti-patterns emerged?
- Which strategies proved effective regardless of user context?
2. Pattern Recognition:
- What common scenarios or use cases does this interaction represent?
- Which tool usage patterns were universally effective?
- What decision-making principles emerged that could apply broadly?
3. Heuristic Development:
- What general rules of thumb can be derived from this experience?
- How can existing heuristics be refined to be more universally applicable?
- What contextual factors consistently influence success across users?
Integrate this experience with existing knowledge in <agent_experience>:
- Focus on patterns that are likely to repeat across different users
- Develop heuristics that are user-agnostic
- Document tool usage patterns that work in general scenarios
- Identify improvement areas that would benefit all users
Important:
- Exclude
## 用户特定细节或偏好
- 侧重于普遍适用的技术和程序知识
- 捕捉一般原则而非具体实例
- 通过避免任何个人身份信息来维护隐私
专注于构建一个强大、不断发展的知识库,随着时间的推移提高代理对所有用户的有效性。
请记住,这是累积的经验——不要覆盖现有的知识,而是增强和完善它。
```python
def validate_memories(memories: list[Memory]) -> list[Memory]:
if not memories:
return []
superseded_ids = set()
for memory in memories:
superseded_ids.update(memory.superseded_ids)
memory_dict = {}
for memory in memories:
if str(memory.id) not in superseded_ids:
memory_dict[str(memory.id)] = Memory(**memory.model_dump(exclude={"superseded_ids"}))
return sorted(memory_dict.values(), key=lambda x: x.created_at)
class User(Document):
"""User document storing profile and accumulated memories"""
profile: Profile = Field(default_factory=Profile)
memories: list[Memory] = Field(default_factory=list)
class Settings:
name = "users"
validate_on_save = True
def __str__(self) -> str:
res = str(self.profile)
if self.memories:
mems = "\n\n".join([str(m) for m in self.memories])
res += f"\n\n<memories>\n{mems}\n</memories>\n\n"
return res.strip()
@field_validator("memories")
@classmethod
def validate_memories(cls, v: list[Memory]) -> list[Memory]:
return validate_memories(v)
def update_from_generated_user(self, generated_user: GeneratedUser) -> None:
self.profile = generated_user.profile or self.profile
self.memories.extend(generated_user.memories)
class Agent(Document):
"""Agent document storing configuration and accumulated experience"""
name: Annotated[str, Indexed(unique=True)]
model: KnownModelName
description: str = ""
system_prompt: str = ""
experience: AgentExperience = Field(default_factory=AgentExperience)
class Settings:
name = "agents"
validate_on_save = True
class Task(Document):
"""Task document tracking workflow progress"""
user: Link[User]
agent: Link[Agent]
status: TaskStatus = TaskStatus.CREATED
message_history: list[_messages.ModelMessage | dict[str, Any]] = Field(default_factory=list)
created_at: datetime = Field(default_factory=datetime.now)
updated_at: datetime = Field(default_factory=datetime.now)
class Settings:
name = "tasks"
indexes = ["user._id", "agent.name", "status", "created_at"]
validate_on_save = True
def experience_str(self) -> str:
return (
f"<agent_experience>\n{self.agent.experience.model_dump_json()}\n</agent_experience>\n\n"
f"<user_specific_experience>\n{self.user}\n</user_specific_experience>\n\n"
)
记忆更新与流程:逐步详解
以下是完整的流程,包含更新逻辑。每个步骤都展示了用户和代理数据的更新方式,以及所有内容如何集成到动态 system_prompt
中,以用于未来的任务。
1. 用户发起对话
在 MongoDB 中创建一个新的 Task
实例,将用户和代理关联起来。我们在那里存储 message_history
,随着对话的展开。
2. 代理检索之前的经验
当代理运行时,它会加载任何现有的用户详细信息和代理经验。这些将被注入到动态 system_prompt
中。
from pydantic_ai import Agent, RunContext
task_assistant = Agent(
name="task_assistant",
model="google-gla:gemini-1.5-flash",
system_prompt="You are a task assistant. Use your stored knowledge to help.",
deps_type=Task
)
@task_assistant.system_prompt(dynamic=True)
def system_prompt(ctx: RunContext[Task]) -> str:
return ctx.deps.experience_str()
3. 代理交互;message_history 增长
用户消息和代理响应会添加到 message_history
中。 内存更新被排队,但尚未处理。
4. 记忆代理处理对话
我们有一个单独的代理(通常称为 memory_agent
),它的工作是分析对话并生成对用户的资料和记忆以及代理的经验的更新。这发生在对话之后。
关键见解: 记忆代理处理完整的 message_history
,其中包括原始任务代理的动态 system_prompt
(来自 experience_str()
)。这意味着:
- 新的记忆是在先前代理经验的背景下创建的
- 资料更新会考虑来自先前会话的现有用户数据
- 记忆取代决策理解完整的历史背景
async def create_user_experience(memory_agent: Agent, message_history: list):
prepared_messages = prepare_message_history(message_history)
profile = await memory_agent.run(
user_prompt=Profile.user_prompt(),
result_type=Profile,
message_history=prepared_messages
)
memories = await memory_agent.run(
user_prompt=Memory.user_prompt(),
result_type=list[Memory],
message_history=prepared_messages
)
return GeneratedUser(profile=profile.data, memories=memories.data)
async def create_agent_experience(memory_agent: Agent, message_history: list):
prepared_messages = prepare_message_history(message_history)
return await memory_agent.run(
user_prompt=AgentExperience.user_prompt(),
result_type=AgentExperience,
message_history=prepared_messages
).data
5. 记忆被验证或取代
请注意,每个新的 Memory
都可以引用旧的记忆 ID 来覆盖。一旦我们有了新记忆的最终列表,我们就会整合它们,删除被标记为取代的旧记忆。
6. 将更新后的经验保存到 MongoDB
最后,我们将新生成的用户信息(资料 + 记忆)和更新后的代理经验合并到数据库中。
async def save_experience(
user: User,
generated_user: GeneratedUser,
agent: Agent,
agent_experience: AgentExperience
):
user.update_from_generated_user(generated_user)
agent.experience = agent_experience
await user.save()
await agent.save()
7. 未来任务会看到更新后的知识
任何针对同一用户/代理的新 Task
都会自动在动态 system_prompt
中包含更新后的记忆(步骤 2)。代理保留或扩展之前的学习,即使用户开始全新的对话或上下文。
示例场景
1. 初始推荐
- 用户:“给我推荐一部电影”
- 用户消息添加到
Task.message_history
- 代理根据空资料推荐浪漫喜剧
- 代理消息添加到
Task.message_history
2. 对话期间的用户反馈
- 用户:“我更喜欢动作片”
- 用户消息添加到
Task.message_history
- 代理继续对话,没有记忆/经验更新,但有更新后的
message_history
- 最后的消息:“动作大片标题怎么样?”
- 用户批准,任务被标记为
completed
- 完整的对话存储在
Task.message_history
中
3. 对话后处理
记忆代理分析完整历史:
- 使用
interests: ["action movies"]
更新资料 - 创建记忆:“用户表示喜欢动作片”
- 记录代理经验:“动作片推荐获得更好的参与度”
4. 第二天的新任务
system_prompt
现在包含更新后的偏好- 代理根据存储的记忆推荐“另一部动作惊悚片”
- 用户获得个性化回复,无需重新解释偏好
结论
构建自适应记忆系统
这种结构化的记忆方法通过结合 PydanticAI 的验证和 MongoDB 的灵活性,使 AI 代理能够通过交互来发展。
每次 run
都会通过资料细化、上下文感知的记忆和累积的经验来改进。 MongoDB 确保了原子、经过验证的更新,并有效地处理嵌套的数据结构和时间查询。 关键的创新是对话后处理流程,它将实时交互与后台记忆分析解耦,在保持聊天响应能力的同时实现复杂的更新。
结果
用户获得个性化的体验,而开发人员则利用自动上下文注入:
def experience_str(self) -> str:
return (
f"<agent_experience>\n{self.agent.experience.model_dump_json()}\n</agent_experience>\n\n"
f"<user_specific_experience>\n{self.user}\n</user_specific_experience>\n\n"
)