
扩大成功:优化您的 Rag Pipeline 和克服 Llm 限制的 5 个关键策略
简介
大型语言模型 (LLM) 擅长文本生成,并展现出生成类似人类文本的卓越能力,但在处理过时信息方面存在不足。这就是检索增强生成 (RAG) 发挥作用的地方;它通过添加外部数据来解决此限制,从而提高准确性并减少 AI 的“幻觉”。然而,扩展 RAG 具有挑战性;缓慢的数据检索会削弱性能,影响用户体验。
研究表明,大量 RAG 管道(高达 90%)在扩展时会遇到性能下降,这主要是由于检索过程缓慢。
一个典型的 RAG 管道包含两个主要组件:
1. 检索机制:此组件负责从外部来源获取相关信息。它包括:
- 索引:将数据组织成结构化格式,以促进高效检索。
- 搜索算法:采用近似最近邻 (ANN) 搜索等方法,以便在大型数据集中快速定位相关数据点。
2. 生成模型:一个 LLM,它处理检索到的信息,将其与内部知识相结合,以生成连贯且上下文丰富的响应。
RAG 管道,包括索引、检索和生成
随着数据量和并发查询数量的增加,检索机制通常会成为性能瓶颈。低效的检索会导致延迟增加,进而影响 AI 应用程序的整体响应速度和可扩展性。
由于可扩展性问题,出现的问题如下:
“为什么 我的 AI 模型响应时间这么长?”
“为什么 我的 RAG 管道在扩展时感觉很迟缓?”
检索延迟每增加 500 毫秒,就会导致参与度下降。在规模化的情况下,这是 AI 性能的无声杀手。
因此,优化检索过程对于保持系统性能和用户满意度至关重要。
你知道 RAG 为什么会变慢吗?
在检索增强生成 (RAG) 系统中,高效的检索机制对于性能至关重要,尤其是在数据扩展时。有几个因素会导致检索时间变慢:
1. 低效的索引实践
像暴力 K-最近邻 (KNN) 搜索这样的朴素方法涉及将查询向量与数据集中的所有向量进行比较。随着数据集的增长,这种方法在计算上变得昂贵,导致延迟增加。
示例:使用 LangChain 和 Transformers 进行暴力 KNN 搜索
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS
from langchain.retrievers import KNNRetriever
## Sample data
documents = [
{"id": "1", "text": "The quick brown fox jumps over the lazy dog."},
{"id": "2", "text": "A fast, dark-colored fox leaps over a sleepy canine."},
{"id": "3", "text": "Lorem ipsum dolor sit amet, consectetur adipiscing elit."},
# Add more documents as needed
]
## Initialize embeddings model
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
## Create FAISS vector store and add documents
vector_store = FAISS.from_documents(documents, embeddings)
## Initialize KNN retriever
retriever = KNNRetriever(vector_store=vector_store, k=2)
## Query
query = "A quick brown fox"
retrieved_docs = retriever.get_relevant_documents(query)
## Display results
for doc in retrieved_docs:
print(f"Document ID: {doc['id']}, Text: {doc['text']}")
虽然这种方法很直接,但由于其计算强度,它无法很好地扩展到大型数据集。
2. 过载的向量数据库
在没有适当过滤的情况下,随意将嵌入添加到向量数据库会导致不可管理的数据量。然后,未经过滤的查询必须筛选大量数据,从而淹没检索管道并增加延迟。
示例:管理向量数据库过载
from sentence_transformers import SentenceTransformer
from langchain.vectorstores import Chroma
## Initialize the model and vector store
model = SentenceTransformer('all-MiniLM-L6-v2')
vector_store = Chroma()
## Sample data
documents = ["Document 1 content", "Document 2 content", "Document 3 content"]
## Indexing documents
for doc in documents:
embedding = model.encode(doc)
vector_store.add_texts([doc], embeddings=[embedding])
## Querying without filtering
query = "Sample query text"
query_embedding = model.encode(query)
results = vector_store.similarity_search_by_vector(query_embedding, k=5)
在这里,similarity_search_by_vector 函数会在所有存储的嵌入中进行搜索,而没有任何过滤,随着嵌入数量的增加,这可能会效率低下。
3. 次优的查询处理
同时查询所有文档,尤其是在高维嵌入空间中,会降低性能。处理高维嵌入会增加计算复杂性,从而导致检索时间变慢。
示例:执行暴力相似性搜索,没有任何索引或过滤,随着数据集的增长,这在计算上变得昂贵。
from transformers import AutoTokenizer, AutoModel
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS
from langchain.docstore.document import Document
import torch
## Initialize the Hugging Face model and tokenizer
model_name = "sentence-transformers/all-MiniLM-L6-v2"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModel.from_pretrained(model_name)
## Sample documents
documents = [
"Document 1 content about machine learning.",
"Document 2 content about artificial intelligence.",
"Document 3 content about deep learning.",
# Add more documents as needed
]
## Convert documents to LangChain Document format
lc_documents = [Document(page_content=doc) for doc in documents]
## Initialize HuggingFaceEmbeddings
embeddings = HuggingFaceEmbeddings(model_name=model_name)
## Create a FAISS vector store without indexing (brute-force search)
vector_store = FAISS.from_documents(lc_documents, embedding=embeddings)
## Sample query
query = "Tell me about neural networks."
对查询进行编码
query_embedding = embeddings.embed_query(query)
执行相似性搜索,不进行任何过滤或高效索引
results = vector_store.similarity_search_by_vector(query_embedding, k=3)
显示结果
for i, result in enumerate(results, 1):
print(f"Result {i}: {result.page_content}")
如何优化检索速度?
优化检索速度对于检索增强生成(RAG)系统的效率至关重要。实现近似最近邻(ANN)索引可以通过减少搜索复杂度来显著提高检索性能。
1. 实现近似最近邻(ANN)索引
ANN 算法通过近似最近邻而不是执行详尽的搜索来提高检索速度。这种近似将计算复杂度从 O(n) 降低到 O(log n) 甚至常数时间,具体取决于算法和数据分布。
近似最近邻(ANN)索引
分层可导航小世界(HNSW):
HNSW 适用于文档少于 100,000 个的数据集。它构建了一个基于图的索引,其中节点代表数据点,边根据邻近度连接节点。这种结构允许在搜索操作期间进行高效的遍历。
import hnswlib
import numpy as np
## 样本数据:1000 个维度为 128 的向量
data = np.random.randn(1000, 128).astype('float32')
## 初始化 HNSW 索引
dim = data.shape[1]
num_elements = data.shape[0]
index = hnswlib.Index(space='l2', dim=dim)
index.init_index(max_elements=num_elements, ef_construction=200, M=16)
## 将数据添加到索引
index.add_items(data)
## 查询索引
query_vector = np.random.randn(1, 128).astype('float32')
labels, distances = index.knn_query(query_vector, k=5)
print("最近邻:", labels)
print("距离:", distances)
hnswlib 为 1,000 个向量构建了一个 HNSW 索引,每个向量有 128 个维度。knn_query 函数检索给定查询向量的前 5 个最近邻。
具有乘积量化(IVF-PQ)的倒排文件系统:
IVF-PQ 针对大规模检索进行了优化,涉及数百万个文档。它结合了倒排文件系统(IVF)将数据集划分为簇,并结合乘积量化(PQ)来压缩每个簇内的向量,从而减少内存使用并加速搜索。
import faiss
import numpy as np
## 样本数据:1,000,000 个维度为 128 的向量
data = np.random.randn(1000000, 128).astype('float32')
## 定义簇的数量
nlist = 100
## 创建索引
quantizer = faiss.IndexFlatL2(128) # 粗量化器
index = faiss.IndexIVFPQ(quantizer, 128, nlist, 16, 8)
## 16:子量化器的数量
## 8:每个码本的位数
## 训练索引
index.train(data)
## 将数据添加到索引
index.add(data)
## 查询索引
query_vector = np.random.randn(1, 128).astype('float32')
index.nprobe = 10 # 要搜索的簇的数量
distances, indices = index.search(query_vector, k=5)
print("最近邻:", indices)
print("距离:", distances)
faiss 为 1,000,000 个向量创建了一个 IVF-PQ 索引。nprobe 参数控制要搜索的簇的数量,从而平衡速度和准确性。
可扩展最近邻(ScaNN):
ScaNN 专为极大规模系统而设计,可有效处理数十亿个向量。它采用各向异性向量量化和树状聚类来加速搜索操作。
import tensorflow as tf
import scann
import numpy as np
## 样本数据:模拟的文本数据,表示为向量
## 假设每个向量代表文档的嵌入
num_documents = 10000000
embedding_dimension = 128
data = np.random.randn(num_documents, embedding_dimension).astype('float32')
## 构建 ScaNN 搜索器,用于高效的相似性搜索
## 参数经过调整,用于速度和准确性之间的权衡
searcher = scann.scann_ops_pybind.builder(data, 10, "dot_product").tree(
num_leaves=2000, num_leaves_to_search=100, training_sample_size=250000).score_ah(
2, anisotropic_quantization_threshold=0.2).reorder(100).build()
## 样本查询:查询文档的嵌入
query_vector = np.random.randn(1, embedding_dimension).astype('float32')
## 执行搜索以找到最相似的文档
neighbors, distances = searcher.search_batched(query_vector)
## 输出:显示最近文档的索引及其相似性得分
print("最近文档索引:", neighbors)
print("相似性得分:", distances)
scann 为 10,000,000 个向量构建了一个索引。search_batched 函数检索所提供的查询向量的最近邻
2. 预检索过滤技术
预检索过滤技术可以通过在进行计算密集型向量搜索之前缩小搜索空间来显著提高性能。两种有效的方法是基于元数据的过滤和应用稀疏检索技术(如 BM25)。
预检索过滤
基于元数据的过滤
基于元数据的过滤涉及利用文档属性(例如类别、日期或用户上下文)来限制搜索范围。通过应用这些过滤器,系统减少了要考虑的文档数量,从而缩短了检索时间并降低了计算负载。
from langchain_core.documents import Document
from langchain.vectorstores import FAISS
from langchain.embeddings import OpenAIEmbeddings
## 具有元数据的样本文档
documents = [
Document(page_content="凯尔特人是我最喜欢的球队。", metadata={"topic": "sports"}),
Document(page_content="昨天股市崩盘。", metadata={"topic": "finance"}),
Document(page_content="人工智能技术的新进展。", metadata={"topic": "technology"}),
Document(page_content="下个月将举行地方选举。", metadata={"topic": "politics"}),
]
## 初始化嵌入和向量存储
embeddings = OpenAIEmbeddings()
vector_store = FAISS.from_documents(documents, embeddings)
## 定义元数据过滤器
metadata_filter = {"topic": "technology"}
## 在检索之前应用过滤器
filtered_docs = [doc for doc in documents if doc.metadata.get("topic") == metadata_filter["topic"]]
## 将过滤后的文档添加到向量存储
vector_store = FAISS.from_documents(filtered_docs, embeddings)
## 执行搜索查询
query = "人工智能的最新趋势是什么?"
retrieved_docs = vector_store.similarity_search(query)
## 显示检索到的文档
for doc in retrieved_docs:
print(doc.page_content)
首先根据“topic”元数据过滤文档,然后将其添加到向量存储中,确保在相似性搜索期间仅考虑相关文档。
稀疏检索方法(例如,BM25)
稀疏检索技术,如 BM25,依赖于精确的术语匹配,对于文档的初步筛选非常有效。通过在基于矢量的搜索之前应用 BM25,系统可以快速识别并缩小最相关的文档范围,从而减少后续密集检索方法的计算负担。
from langchain_core.documents import Document
from langchain_community.retrievers import BM25Retriever
## 示例文档
documents = [
Document(page_content="凯尔特人是我最喜欢的球队。"),
Document(page_content="股市昨天崩盘了。"),
Document(page_content="人工智能技术的新进展。"),
Document(page_content="下个月将举行地方选举。"),
]
## 初始化 BM25 检索器
retriever = BM25Retriever.from_documents(documents)
## 执行搜索查询
query = "人工智能进展"
retrieved_docs = retriever.get_relevant_documents(query)
## 显示检索到的文档
for doc in retrieved_docs:
print(doc.page_content)
BM25 检索器快速筛选包含与“人工智能进展”相关的术语的文档,为任何后续处理提供了一组精简的文档。
缓存和异步检索
实现这一目标的两种有效策略是缓存频繁的查询和实现异步检索方法。
缓存频繁的查询
缓存涉及存储昂贵操作(例如 LLM 响应)的结果,以便更有效地服务于未来的请求。通过缓存频繁的查询,我们可以防止冗余计算并减少延迟。
from langchain.llms import OpenAI
from langchain.cache import InMemoryCache
from langchain import set_global_cache
## 初始化 LLM
llm = OpenAI(model="text-davinci-003", temperature=0.7)
## 设置内存缓存
cache = InMemoryCache()
set_global_cache(cache)
## 使用缓存获取响应的函数
def get_response(prompt):
return llm(prompt)
## 使用示例
prompt = "法国的首都是什么?"
response = get_response(prompt)
print(response)
InMemoryCache 将响应存储在内存中,允许快速检索重复的查询。对于持久性缓存,LangChain 支持其他后端,如 SQLite。
如果您正在使用 Hugging Face 的 Transformers 库,您可以使用 Python 的 functools.lru_cache
实现缓存
from transformers import pipeline
from functools import lru_cache
## 初始化管道
qa_pipeline = pipeline("question-answering", model="distilbert-base-uncased-distilled-squad")
## 定义一个缓存装饰器
@lru_cache(maxsize=128)
def get_answer(question, context):
result = qa_pipeline(question=question, context=context)
return result['answer']
## 使用示例
context = "巴黎是法国的首都。"
question = "法国的首都是什么?"
answer = get_answer(question, context)
print(answer)
lru_cache 缓存 get_answer 函数的输出,避免了重复问题的冗余计算。
异步检索方法
异步编程允许 RAG 系统同时处理多个检索请求,从而缩短总响应时间。通过异步获取数据,LLM 可以在等待检索操作完成的同时处理其他任务。
import asyncio
from langchain.llms import OpenAI
## 初始化 LLM
llm = OpenAI(model="text-davinci-003", temperature=0.7)
## 异步获取响应的函数
async def get_response_async(prompt):
return await llm.ainvoke(prompt)
## 使用示例
prompt = "给我讲一个关于猫的笑话。"
## 运行异步函数
response = asyncio.run(get_response_async(prompt))
print(response)
ainvoke 允许 LLM 异步处理提示,使系统能够同时处理其他任务。
对于 Hugging Face 的 Transformers,您可以使用 transformers
库对异步操作的支持:
import asyncio
from transformers import pipeline
## 初始化管道
qa_pipeline = pipeline("question-answering", model="distilbert-base-uncased-distilled-squad")
## 异步获取答案的函数
async def get_answer_async(question, context):
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, qa_pipeline, {"question": question, "context": context})
return result['answer']
## 使用示例
context = "埃菲尔铁塔位于巴黎。"
question = "埃菲尔铁塔位于哪里?"
## 运行异步函数
answer = asyncio.run(get_answer_async(question, context))
print(answer)
run_in_executor 允许同步的 qa_pipeline 在单独的线程中运行,从而实现异步执行。
结合缓存和异步检索
通过将缓存与异步检索集成,RAG 系统可以实现最佳性能。可以立即返回缓存的响应,而异步检索确保高效地处理未缓存的请求。
实施这些策略需要仔细考虑特定的用例和工作负载特征。但是,如果应用得当,它们可以显著提高生成式 AI 应用程序的响应速度和可扩展性。
行动号召
在快速发展的 AI 领域中,确保您的检索增强生成 (RAG) 管道的效率和可扩展性至关重要。作为 AI 从业者和系统架构师,必须批判性地评估和改进这些管道,重点优化检索流程。忽略此方面可能导致延迟增加、用户参与度降低以及大规模潜在的系统故障。
立即采取行动:
- 评估您当前的 RAG 管道:分析您的检索机制的性能。找出瓶颈和发生延迟的区域。
- 实施缓存策略:对频繁查询使用缓存,以最大限度地减少冗余的检索操作,从而提高响应时间。
- 采用异步检索方法:使您的大型语言模型 (LLM) 能够与检索操作并行处理数据,从而减少总体延迟。
通过主动解决这些问题,您可以增强 AI 系统的性能和可扩展性,从而提高用户满意度和参与度。
进一步学习机会:
为了加深您对 RAG 及其应用的理解和专业知识,请考虑探索以下资源:
- 从头开始学习 RAG: 这个全面的 YouTube 播放列表提供了 RAG 实现的基础知识和高级技术。
- 金融领域的 LLM + RAG 用例: 探索 RAG 与 LLM 结合如何通过实际用例和见解彻底改变金融领域。
抓住这些学习机会,保持在 AI 创新的前沿,并确保您的系统高效且可扩展。
RAG Pipeline crashed when data is not correctly optimised stored or retrieved