实时生成嵌入的力量:利用mongodb流处理提升生成ai应用的响应速度和准确性
- Rifx.Online
- Machine Learning , Data Science , AI Applications
- 23 Feb, 2025
与 Kenny Gorman 共同创作
衷心感谢 Nicolas Benhamou 的慷慨技术支持和鼓励。
在过去十年中,我们在实时、以数据为中心的应用程序方面发生了显著变化。无论是在电子商务推荐、欺诈检测还是物联网事件分析中,用户现在都期待由持续数据流驱动的即时、上下文相关的响应。MongoDB 灵活的文档模型和实时能力使其非常适合这些动态工作负载,尤其是与像 Kafka 这样的流处理解决方案配合使用时。
实时 AI 的需求在生成即时向量嵌入时尤为相关,这对于驱动语义搜索、个性化推荐和生成 AI 助手等应用至关重要。流处理解决方案允许随着新数据的到来更新嵌入,确保 AI 驱动的洞察保持新鲜和准确。
在本文中,我们将探讨 MongoDB 流处理如何为 AI 应用程序提供实时嵌入生成,确保模型与最新的产品、用户和内容更新保持同步。
照片由 Felix Dubois-Robert 提供,来源于 Unsplash
为什么实时 AI 和嵌入生成很重要?
生成性 AI 已成为主流,驱动着生成高质量文本、提供智能搜索和推动个性化推荐的应用程序。一项关键技术,检索增强生成 (RAG),通过将最新的、特定领域的数据(以嵌入的形式)注入生成过程来增强 AI 模型。嵌入作为 AI 系统的语义记忆,使其能够理解产品、用户偏好和上下文查询之间的关系。
然而,AI 驱动的应用程序只有在使用最新可用数据时才能提供相关和准确的响应。如果没有实时更新,推荐系统可能会建议缺货的产品,或者 AI 聊天机器人可能会提供过时的定价信息。支持这些 AI 驱动的功能需要一个能够处理实时更新和高吞吐量操作工作负载的基础设施。
通过利用流处理,我们可以在更新发生时持续将原始数据转换为嵌入。这确保了 AI 模型在生成响应之前检索到最新信息,提高了准确性、可靠性和用户体验。
所有图像由作者创建
图 1: 检索增强生成 RAG 的简单结构
为什么选择 MongoDB 进行实时操作数据?
MongoDB 不仅是一个强大的操作数据库,而且非常适合实时 AI 工作负载,特别是即时嵌入生成。它处理高速度更新、灵活的模式变化和实时索引的能力使其非常适合于流处理应用程序,在这些应用程序中,嵌入会根据新数据不断演变。
对于以快速插入、更新和查询为特征的操作 (OLTP) 工作负载,MongoDB 提供高吞吐量和最低延迟,使其非常适合于对以下要求的应用程序:
- 高效的读/写模式 — 其文档模型能够容纳复杂、不断演变的数据(如产品目录),而不受严格模式的限制。
- 水平可扩展性 — 分片在数据量和请求增加时保持一致的性能。
- 多功能查询能力 — 除了标准的 CRUD 操作外,它还允许临时查询、高级索引和向量索引(在 MongoDB Atlas 中可用)以支持 AI 驱动的搜索和推荐。
此外,MongoDB 可以通过连接器(如 Kafka Connect 或 Atlas 集群)无缝集成到流处理架构中,以摄取、转换和存储从产品、用户或事件更新生成的实时嵌入。这使得低延迟的 AI 应用程序成为可能,确保向量搜索和检索增强生成 (RAG) 工作流能够使用最新数据进行操作。
Bridging RAG with Real-Time Pipelines
检索增强生成 (RAG) 的强大之处在于使用最相关、最新的数据来指导大型语言模型的响应。如果没有新鲜的嵌入,AI驱动的应用程序可能会面临以下风险:
- 不正确的推荐 — 建议过时的商品或缺货产品。
- 不准确的产品信息 — 显示旧价格、商品规格或可用性。
- 用户满意度下降 — 当系统“知道”的信息少于您网站的静态列表时,用户会感到沮丧。
MongoDB 流处理确保新的嵌入可以立即用于 AI 驱动的搜索、问答和推荐,以防止这些陷阱。
想象一下,一个客户在电子商务网站上搜索“无线降噪耳机”。如果 AI 助手依赖过时的嵌入,它可能会推荐已停产的产品或无法显示新发布的型号。实时流处理确保嵌入保持最新,因此 AI 助手始终可以检索到最相关的产品。
现实生活中的使用案例:实时嵌入生成用于 AI 驱动的产品目录
在快速变化的电子商务世界中,产品目录不断演变。新产品发布,价格波动,库存可用性实时变化。 AI 驱动的体验必须与时俱进,以确保客户获得准确的推荐和搜索结果。
众多 MongoDB 客户利用 MongoDB 作为一个具有动态产品目录的电子商务平台:
- 频繁更新:新产品被引入,现有产品被修改,有些产品被移除。
- 即时用户参与:购物者寻找产品,期待定制的推荐,或与 AI 助手互动以获取指导。
- AI 增强能力:生成式 AI 将产品信息转换为营销材料、聊天回复或改善发现体验。
MongoDB 流处理在处理这些动态变化中发挥着关键作用。每当产品更新发生时,MongoDB 触发实时嵌入生成,确保 AI 模型即时访问最新的向量表示,以便进行搜索、推荐和生成响应。
例如,假设零售商添加了一个新的智能手表。在这种情况下,产品详情(品牌、特性、价格)必须即时转换为嵌入,以便可以通过语义搜索发现或与类似商品一起推荐。如果没有实时嵌入,AI 可能会在数小时甚至数天内无法识别新产品。
为了确保准确性,实时嵌入生成至关重要——瞬间将更新的产品信息转换为向量嵌入,以便 AI 模型能够访问。
自动嵌入通过流处理:工作原理
随着产品和数据的演变,嵌入必须即时刷新。MongoDB 流处理自动化了这一过程,确保 AI 驱动的应用程序保持最新。以下是其工作原理:
端到端即时嵌入生成
-
实时事件 => Kafka 或 MongoDB Atlas 集群
当新产品或现有产品的任何更新被引入时,管道开始工作。这些更改可以作为实时事件发布到 Kafka,或直接插入到您的 MongoDB Atlas 集群中,随时准备处理。
-
MongoDB Atlas 流处理
一旦您在 Atlas 中配置了流处理实例(使用上述步骤),它将持续监控这些事件。任何新的或修改的产品文档都会触发您的流处理管道,无论数据是来自 Kafka 还是 Atlas 集群本身。
-
调用外部 REST API ($https 操作符)
每当管道检测到更新时,Atlas 可以调用外部 REST 端点(使用 $https 操作符)——通常是一个专门用于生成嵌入的微服务。此步骤通常将产品的描述或其他元数据传递给该服务。
-
嵌入模型
外部服务使用嵌入模型或其他算法来创建数值向量嵌入。这些向量捕捉每个产品的语义本质——特征、类别或文本描述——从而实现先进的 AI 驱动搜索和推荐。
-
实时目录更新
新计算的嵌入随后直接附加到存储在 MongoDB Atlas 中的产品文档。这确保在产品被添加或更新的几秒钟内,其嵌入立即可用于下游处理。
-
前端集成
有了这些嵌入,前端应用程序(如电子商务网站)可以解锁下一层次的功能:
- 个性化推荐 — 根据向量相似性识别相似或互补的产品。
- 检索增强生成 (RAG) — 向 AI 模型提供最新的产品细节,以进行实时问答或内容生成。
- 向量/混合搜索 — 让用户进行语义搜索(例如,“环保跑鞋”)或结合关键词 + 向量过滤器来细化结果。
实现即时嵌入生成
本节描述了如何设置 MongoDB 流处理,以自动化 AI 驱动应用程序的嵌入生成。我们将配置 MongoDB 以:
- 通过流处理捕获实时产品更新
- 触发嵌入生成 API(例如,OpenAI)
- 将嵌入存储在 MongoDB 中,以便进行向量搜索和推荐
前提条件
在开始之前,请确保您具备以下条件:
- 启用流处理的 MongoDB Atlas
- Kafka 或 MongoDB 集群(用于实时事件捕获)
- 一个外部嵌入服务(例如,OpenAI、Hugging Face 或自定义模型)
- 启用 MongoDB Atlas 向量搜索(有关向量搜索的更多详细信息,请参阅此 教程。)
步骤 1:创建 MongoDB 流处理器
点击创建实例以创建流处理器。
步骤 2:创建流处理器连接
我们将首先设置与 MongoDB 流处理和嵌入生成 API 的连接。
建立与 OpenAI 嵌入 API 的连接
运行以下 curl 命令,以在 MongoDB Atlas 和 OpenAI 之间创建安全的 HTTPS 连接:
curl - user "<publicApiKey\>:<privateApiKey\>" - digest \\
- header "Content-Type: application/json" \\
- header "Accept: application/vnd.atlas.2023–02–01+json" \\
- include \\
- data '{ "name": "OpenAIEmbedConnection", "type": "Https", "url": "https://api.openai.com/v1/embeddings", "headers": { "Authorization": "Bearer YOUR\_OPENAI\_API\_KEY" } }' \\
- request POST "https://cloud.mongodb.com/api/atlas/v2/groups/<projectID\>/streams/<tenantName\>/connections"
建立与 MongoDB 集群的连接
步骤 3:连接到 MongoDB 流处理
使用 mongosh
连接到您的 MongoDB Atlas 流处理实例:
mongosh "mongodb://atlas-stream-67acd00c1f602e74fd99b167-bofm7.westus.z.query.mongodb.net/" - tls - authenticationDatabase admin - username <db\_username\> - password <db\_password\>
步骤 4:定义流处理管道
现在,我们配置 MongoDB 流处理以:
- 监控产品集合的更新
- 触发嵌入 API
- 将生成的嵌入存储在 vectoredProduct 中
流处理配置
将以下 JSON 保存为 pipeline.json:
[
{
"$source": {
"connectionName": "atlas\_connection",
"db": "retail",
"coll": "product",
"config": { "fullDocument": "whenAvailable" }
}
},
{
"$https": {
"connectionName": "OpenAIEmbedConnection",
"path": "",
"method": "POST",
"headers": { "Content-Type": "application/json" },
"payload": [
{
"$project": {
"model": { "$literal": "text-embedding-ada-002" },
"input": "$fullDocument.description"
}
}
],
"as": "embedding\_response"
}
},
{
"$addFields": {
"embedding": {
"$getField": {
"field": "embedding",
"input": { "$arrayElemAt": [ "$embedding\_response.data", 0 ] }
}
}
}
},
{
"$unset": "embedding\_response"
},
{
"$merge": {
"into": {
"connectionName": "atlas\_connection",
"db": "retail",
"coll": "vectoredProduct"
}
}
}
]
步骤 5:测试集成
要测试端到端工作流,请将新产品插入产品集合:
db.product.insertOne({
"name": "环保跑鞋",
"description": "由可持续材料制成的轻便跑鞋。",
"category": "鞋类",
"price": 129.99
})
预期行为:
- MongoDB 变更流检测到新产品。
- 流处理管道触发嵌入请求。
- 嵌入存储在 vectoredProduct 中。
步骤 6:在 MongoDB Atlas 中查询向量搜索
现在,我们可以使用向量搜索检索相似产品:
db.vectoredProduct.aggregate([
{
"$vectorSearch": {
"index": "product\_embeddings",
"queryVector": [0.3412, -0.9281, 0.1123, …],
"path": "embedding",
"numCandidates": 5,
"limit": 3
}
}
])
这有什么作用:
- 使用向量相似性查找三个最相关的产品。
- 针对电子商务、推荐和问答中的实时 AI 进行了优化。
结论:释放流处理在AI及更多领域的全部潜力
AI驱动的应用在实时数据方面表现出色。在超个性化推荐、智能搜索、欺诈检测和预测分析等领域的成功依赖于即时处理数据。
通过MongoDB流处理,企业可以做的不仅仅是即时生成嵌入。通过利用实时事件管道、AI驱动的自动化和动态数据丰富,组织可以:
- 通过实时推荐提供超个性化的用户体验
- 通过持续刷新嵌入加强检索增强生成 (RAG)
- 通过即时分析交易改善欺诈检测和异常识别
- 通过响应实时数据更新提升供应链和运营效率
- 通过不断更新的模型促进预测分析和预测
实时流处理不仅增强了AI能力,还提高了系统的速度、响应能力和对业务需求的适应性。