Type something to search...
用Google Gemini 2.0 API构建RAG!推荐系统的未来在哪里?

用Google Gemini 2.0 API构建RAG!推荐系统的未来在哪里?

LangChain与Vertex AI RAG引擎在亚马逊产品数据上的比较

谷歌在人工智能竞赛中似乎一直处于落后,但在2025年之前发布的Gemini 2.0让人感觉他们终于在某种程度上赶上了。我起初不确定会有什么期待,但在试用后,我对其能力感到由衷的印象深刻。它甚至让我怀疑像ChatGPT、Claude或Llama这样的工具是否仍然必要。Gemini实时多模态API轻松处理复杂的商业问题,并与谷歌现有的工具如搜索和地图无缝集成。此外,谷歌的托管产品,如RAG引擎AI代理构建器Vertex AI for Retail都很出色,此外与LangChain灵活AI开发合作,使得构建适应性强的AI解决方案变得更加简单。

为了进行测试,我使用了Hugging Face提供的2023年亚马逊产品数据集来构建基于RAG的产品推荐系统,比较LangChain与Vertex AI RAG引擎。这个体验既实用又富有洞察力,展示了Gemini在解决现实商业挑战和促进灵活AI开发方面的巨大潜力。

接下来,我将探索Vertex AI for Retail和高级AI代理构建器的功能,并在进展中分享我的见解。

引言

本文探讨了用于产品推荐的检索增强生成(RAG)的实现,特别关注构建一个能够回答用户查询并建议相关产品的系统。我们将深入研究在谷歌云平台上的两种主要方法:利用流行的LangChain框架和完全托管的Vertex AI RAG引擎,两者都使用Vertex AI Search作为底层向量数据库。虽然PgVector、BigQuery、Pinecone、Weaviate和Vertex AI Feature Store等其他向量数据库也可以与这些RAG解决方案集成,但本文将集中于Vertex AI Search以进行针对性的比较,并在结论中提供其他数据库选项的简要概述。

什么是检索增强生成(RAG)?

检索增强生成(RAG)是一种强大的AI技术,结合了三种方法的优势:检索增强生成

  1. 检索: 在这一步中,RAG系统首先搜索大量文档(知识库),以找到与给定输入查询或提示相关的信息。这就像在数据库中搜索或使用搜索引擎。目标是找到最相关的文档或段落,这些文档可能包含查询的答案或提供有用的背景信息。
  2. 增强: 这是一个关键步骤,检索到的信息用于增强原始查询或提示。检索到的文档(或其部分)被添加到将要输入语言模型的内容中。这为模型提供了它本身无法访问的外部知识。
  3. 生成: 最后,一个大型语言模型(LLM)接受增强的输入(原始查询 + 检索到的上下文),并利用这些信息生成全面而连贯的响应。LLM现在可以利用检索步骤中的额外信息,生成更为知情、准确和上下文相关的输出。

从本质上讲,RAG通过外部知识增强了LLM的生成能力。检索到的上下文帮助LLM提供更准确和真实的响应,减少幻觉(编造信息),并回答超出其初始训练数据的主题问题。

类比: 想象一下你正在写一篇论文。你不仅依赖于自己的记忆(就像标准的LLM),还可以使用图书馆(检索组件)。你首先在图书馆中搜索相关的书籍和文章(检索),然后将这些来源的信息整合到你的论文提示中(增强)。最后,你写下你的论文,借鉴你自己的知识以及从图书馆获得的新信息(生成)。

RAG在产品推荐中的力量

在产品推荐的背景下,RAG可以显著增强用户体验并改善商业结果。具体表现如下:

  • 增强的产品推荐: RAG可以通过检索与用户查询或浏览历史相关的产品,考虑文本描述、特征甚至客户评价,提供更相关和个性化的产品推荐。
  • 改进的搜索功能: RAG可以超越关键词匹配,理解用户查询的语义含义,从而在在线商店或产品目录中产生更准确和有用的搜索结果。
  • AI驱动的聊天机器人: RAG使得创建智能聊天机器人成为可能,这些机器人可以回答客户关于产品、运输、退货及其他相关主题的问题,通过检索产品目录、常见问题解答和其他相关文档中的信息。
  • 动态内容生成: RAG可以用于生成动态产品描述、营销材料或甚至基于检索到的产品信息和用户偏好的个性化电子邮件内容。

通过将大型语言模型与外部知识源结合,RAG帮助克服知识截止和幻觉等限制,使其在产品推荐和其他应用中更可靠和有用。

谷歌还提供了Vertex AI Search for Retail,它包括用于产品发现的预配置场景,且易于使用。

https://cloud.google.com/solutions/retail-product-discovery?hl=en

谷歌云上的RAG:LangChain与Vertex AI RAG引擎

本文探讨了在谷歌云上构建RAG应用的两种主要方法:

  • LangChain 一个流行的开源框架,用于开发大型语言模型的应用。它提供了一种灵活和模块化的方式来构建RAG管道。虽然LangChain可以与各种谷歌云服务集成,如Vertex AI Search、PgVector和BigQuery,以及其他向量数据库如Pinecone和Weaviate,但本文将重点使用LangChain与Vertex AI Search
  • Vertex AI RAG引擎 一项较新的完全托管服务,专门设计用于在谷歌云上构建RAG应用。它通过处理许多底层复杂性,包括数据摄取、嵌入生成和向量存储/检索,简化了开发过程。本文将主要关注使用Vertex AI Vector Search作为其底层向量数据库。其他选项如PineconeWeaviateVertex AI Feature Store和默认的RagManagedDb也可用,但在此仅在结论中提及以供了解。

其他向量数据库概述

虽然本文重点介绍Vertex AI Search,但这里还有一些与LangChain和Vertex AI RAG引擎兼容的其他向量数据库选项:

选择合适的解决方案 & 向量数据库

最佳的RAG解决方案和向量数据库取决于几个因素:

  • 项目规模和复杂性: 对于简单的原型或小规模应用,RAG引擎与RagManagedDb或LangChain与BigQuery可能足够。对于更大、更复杂的项目,考虑使用LangChain与Vertex AI Search或RAG引擎与Vertex AI Search、Pinecone或Weaviate。
  • 性能要求: 如果低延迟检索至关重要,Vertex AI Search、Pinecone和Weaviate通常是强有力的选择。
  • 现有基础设施: 如果您已经在使用PostgreSQL,LangChain与PgVector可能适合您。如果您在Google Cloud上投入较多,Vertex AI服务可能更方便。
  • 开发时间和精力: RAG引擎由于其托管特性提供更快的开发速度。LangChain提供更多的灵活性,但可能需要更多的开发时间。
  • 成本: 仔细评估每项服务(向量数据库、LLM、RAG引擎等)的定价模型,以估算您用例的整体成本。
  • 团队专业知识: 选择与您团队的技能和经验相匹配的解决方案。
  • 多模态需求: 如果您计划在未来纳入多模态嵌入(图像、视频),请确保所选的向量数据库支持它们或有明确的集成路径。

最后的想法

LangChain和Vertex AI RAG引擎都提供了在Google Cloud上构建RAG应用程序的强大方式。LangChain提供灵活性和控制,而RAG引擎则提供简化的托管体验。通过理解每种方法的优缺点,并仔细考虑您项目的具体要求,您可以选择最适合您需求的解决方案,并构建一个强大而有效的RAG驱动产品推荐系统。请记住,RAG领域正在快速发展,因此跟上这两个框架和基础技术的最新进展至关重要。

技术实现细节

数据集:Hugging Face 亚马逊评论(美容类)

为了演示本文中的概念,我们将使用一个真实世界的数据集:Hugging Face 亚马逊评论数据集,特别是**“raw_meta_All_Beauty”**拆分。

所需的 APIs 和服务

要构建我们的 RAG 应用程序,需要启用多个 Google Cloud APIs 和服务。您可以使用 Google Cloud Console(图形界面)或 gcloud 命令行工具 来完成此操作。以下是启用所需 APIs 的步骤:

选项 1:使用 Google Cloud 控制台

  1. 导航到 Google Cloud 控制台: https://console.cloud.google.com
  2. 选择您的项目(例如,rag-product-recommendation 或您选择的项目名称)。
  3. 点击 “+ 启用 API 和服务” 按钮。
  4. 搜索并启用以下 API:

核心 API(必需):

  • Vertex AI API 启用 API & 创建 API 密钥/凭据以使用 https://console.cloud.google.com/apis/library/aiplatform.googleapis.com 文档:https://cloud.google.com/vertex-ai

选项 2:使用 gcloud CLI

如果您更喜欢使用命令行,可以使用以下命令启用所有必需的 API:

gcloud services enable \
    aiplatform.googleapis.com \
    generativelanguage.googleapis.com  

安装 Google Cloud CLI: 如果您没有安装,请按照此处的说明进行操作:https://cloud.google.com/sdk/docs/install

IAM 权限

确保您使用的用户或服务帐户具有访问和管理所需资源的必要 IAM 权限。如果您需要完全控制,还需要授予 Vertex AI 管理员访问权限。以下是一些基本的 IAM 角色:

## 替换为您的项目 ID 和服务帐户
PROJECT_ID="rag-product-recommendation"
SERVICE_ACCOUNT="rag-sa@${PROJECT_ID}.iam.gserviceaccount.com"

## 如果您需要完全控制,则授予 Vertex AI 管理员访问权限
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:${SERVICE_ACCOUNT}" \
    --role="roles/aiplatform.admin"

## 添加存储管理员以管理向量和嵌入
gcloud projects add-iam-policy-binding $PROJECT_ID \
    --member="serviceAccount:${SERVICE_ACCOUNT}" \
    --role="roles/storage.admin"

安装库(SDK)

我们将使用以下 Python 库来进行此项目:

  • google-cloud-aiplatform:Vertex AI 的 Python SDK。
  • google-cloud-storage:用于与 Google Cloud Storage 交互。
  • datasets:用于下载 Hugging Face 数据集。
  • langchain:用于使用 LangChain 框架实现 RAG。
  • langchain-google-vertexai:用于将 LangChain 与 Google Vertex AI 服务集成。

打开您的终端或命令提示符并运行以下命令:

pip install --upgrade google-cloud-aiplatform google-cloud-storage datasets langchain langchain-google-vertexai

数据加载、预处理和存储

在此阶段,我们将使用辅助函数加载、清理并将数据集存储在 GCS 中。

从 Hugging Face 加载 Amazon Beauty 数据集

我们首先使用 Hugging Face datasets 库加载数据集。

from datasets import load_dataset

def load_product_data():
    
    print("正在加载 Amazon Beauty 产品数据集...")

    # 加载数据集
    dataset = load_dataset(
        "McAuley-Lab/Amazon-Reviews-2023",
        "raw_meta_All_Beauty",
        trust_remote_code=True
    )

    # 转换为字典列表以便于处理
    products = list(dataset['full'])

    print(f"加载了 {len(products)} 个产品")
    return products


products = load_product_data()

数据清理和预处理

此阶段涉及清理加载的数据,以确保其质量和一致性。

def clean_product_data(products, debug_sample_size=5):
   
    print("\n开始进行增强的价格分析数据清理...")
    initial_count = len(products)
    
    # 增强的价格问题跟踪
    price_issues = {
        'none_price': {'count': 0, 'examples': []},  # 专门针对 'None' 字符串值
        'missing_price': {'count': 0, 'examples': []},  # 针对 None 类型值
        'non_numeric_price': {'count': 0, 'examples': []},  # 针对其他无效格式
        'empty_string': {'count': 0, 'examples': []},  # 针对空字符串
        'other_issues': {'count': 0, 'examples': []}  # 针对意外情况
    }
    
    cleaned_products = []
    
    for product in products:
        if not product.get('title'):
            continue
            
        try:
            price = None
            raw_price = product.get('price')
            
            # 分类价格问题
            if raw_price is None:
                price_issues['missing_price']['count'] += 1
                if len(price_issues['missing_price']['examples']) < debug_sample_size:
                    price_issues['missing_price']['examples'].append({
                        'title': product.get('title'),
                        'raw_price': 'None (type)',
                        'price_type': type(raw_price).__name__
                    })
                continue
                
            if isinstance(raw_price, str):
                if raw_price.lower() == 'none':
                    price_issues['none_price']['count'] += 1
                    if len(price_issues['none_price']['examples']) < debug_sample_size:
                        price_issues['none_price']['examples'].append({
                            'title': product.get('title'),
                            'raw_price': raw_price
                        })
                    continue
                    
                if not raw_price.strip():
                    price_issues['empty_string']['count'] += 1
                    if len(price_issues['empty_string']['examples']) < debug_sample_size:
                        price_issues['empty_string']['examples'].append({
                            'title': product.get('title'),
                            'raw_price': '空字符串'
                        })
                    continue
                
                # 尝试清理并转换非 None 字符串价格
                try:
                    cleaned_price = raw_price.replace('$', '').replace(',', '').strip()
                    price = float(cleaned_price)
                except ValueError:
                    price_issues['non_numeric_price']['count'] += 1
                    if len(price_issues['non_numeric_price']['examples']) < debug_sample_size:
                        price_issues['non_numeric_price']['examples'].append({
                            'title': product.get('title'),
                            'raw_price': raw_price,
                            'price_type': type(raw_price).__name__
                        })
                    continue
                    
            elif isinstance(raw_price, (int, float)):
                price = float(raw_price)
            else:
                price_issues['other_issues']['count'] += 1
                if len(price_issues['other_issues']['examples']) < debug_sample_size:
                    price_issues['other_issues']['examples'].append({
                        'title': product.get('title'),
                        'raw_price': raw_price,
                        'price_type': type(raw_price).__name__
                    })
                continue
                
            if price is not None and price > 0:
                cleaned_product = product.copy()
                cleaned_product['price'] = price
                cleaned_products.append(cleaned_product)
                
        except Exception as e:
            price_issues['other_issues']['count'] += 1
            if len(price_issues['other_issues']['examples']) < debug_sample_size:
                price_issues['other_issues']['examples'].append({
                    'title': product.get('title'),
                    'raw_price': raw_price,
                    'error': str(e)
                })
    
    # 打印详细分析
    print("\n详细价格分析:")
    print(f"总共分析的产品数量:{initial_count}")
    print(f"具有有效价格的产品数量:{len(cleaned_products)}")
    
    print("\n价格问题细分:")
    total_issues = 0
    for issue_type, data in price_issues.items():
        if data['count'] > 0:
            print(f"\n{issue_type.replace('_', ' ').title()}:")
            print(f"数量: {data['count']}")
            total_issues += data['count']
            if data['examples']:
                print("示例产品:")
                for idx, example in enumerate(data['examples'][:debug_sample_size], 1):
                    print(f"  {idx}. {example}")
    
    print(f"\n总无效价格数量:{total_issues}")
    print(f"最终有效产品数量:{len(cleaned_products)}")
    
    return cleaned_products

## 运行增强分析
cleaned_products = clean_product_data(products)

为 RAG 准备数据(JSONL 格式)

为了有效地准备数据以供 Vertex AI RAG 引擎摄取,我们将把清理后的产品数据格式化为 JSON Lines(JSONL)格式,并在每个文件中存储多个产品。

  • JSON Lines 格式: 在 JSONL 中,每一行都是一个有效的 JSON 对象。此格式非常适合大型数据集,并且受 RAG Engine 的 import_files() 方法支持。
  • 每个文件多个产品: 我们将多个产品分组到每个 JSONL 文件中,而不是为每个产品创建单独的文件。这减少了文件的总数,并可以提高导入效率。
from google.cloud import storage
import os
import json

## Configuration for Google Cloud Storage
PROJECT_ID = "rag-product-recommendation"  # Replace with your project ID
BUCKET_NAME = "rag-genaipros"  # Replace with your bucket name
OUTPUT_PREFIX = "amazon_product_data"
GCS_BUCKET_PATH = f"gs://{BUCKET_NAME}/{OUTPUT_PREFIX}"

def upload_batch_to_gcs(bucket, batch, output_prefix, file_counter):
    
    file_name = f"{output_prefix}/products_{file_counter}.jsonl"
    blob = bucket.blob(file_name)
    
    # Convert batch to JSONL format
    jsonl_data = "\n".join(json.dumps(product) for product in batch)
    
    # Upload to GCS 
    blob.upload_from_string(jsonl_data, content_type="application/jsonl")
    print(f"Uploaded {len(batch)} products to gs://{bucket.name}/{file_name}")

def prepare_data_for_rag_jsonl(cleaned_products, bucket_name, output_prefix, products_per_file=1000):
    
    print(f"\nPreparing data upload to GCS...")
    
    # Initialize GCS client
    storage_client = storage.Client(project=PROJECT_ID)
    bucket = storage_client.bucket(bucket_name)
    
    # Batch processing variables
    file_counter = 0
    product_counter = 0
    batch = []
    total_products = len(cleaned_products)

    for product in cleaned_products:
        # Create a standardized product entry
        product_data = {
            "title": product.get("title", ""),
            "price": product.get("price", "N/A"),
            "rating": product.get("average_rating", "N/A"),
            "category": product.get("main_category", ""),
            "features": product.get("features", ""),
            "description": product.get("description", ""),
            # Keep URLs for potential future multi-modal implementation
            "image_urls": product.get("image_urls", []),
            "video_urls": product.get("video_urls", []),
        }
        batch.append(product_data)
        product_counter += 1

        # When batch is full, upload it
        if product_counter >= products_per_file:
            upload_batch_to_gcs(bucket, batch, output_prefix, file_counter)
            print(f"Progress: {min((file_counter + 1) * products_per_file, total_products)}/{total_products} products processed")
            file_counter += 1
            product_counter = 0
            batch = []

    # Upload any remaining products
    if batch:
        upload_batch_to_gcs(bucket, batch, output_prefix, file_counter)
        print(f"Final batch uploaded: {total_products}/{total_products} products processed")

为什么将数据加载到 Google Cloud Storage (GCS)?

将数据存储在 Google Cloud Storage (GCS) 中是构建带有 Vertex AI RAG 引擎的 RAG 管道的关键步骤。虽然数据集最初可以通过 datasets 库直接从 Hugging Face 加载,但 GCS 确保数据在 Google Cloud 生态系统内可访问,从而通过 rag.import_files() 方法实现无缝导入到 RAG 语料库中。GCS 提供了处理大数据集所需的可扩展性和效率,能够高效处理数百万条产品记录。它还支持数据管理和版本控制,使您能够轻松跟踪产品目录随时间的变化。此外,将数据存储在 GCS 中可以与其他 Google Cloud 服务(如用于预处理的 Dataflow 和用于 CI/CD 的 Cloud Build)顺畅集成,为您的 RAG 解决方案创建一个统一且优化的管道。

选择嵌入模型

在本系列文章中,我们将使用 Vertex AI 的 text-embedding-005 模型。这是一个强大的文本嵌入模型,为各种下游任务提供高质量的嵌入,包括信息检索。

text-embedding-005 的主要特性:

  • 高质量嵌入: 该模型在一个庞大的数据集上进行训练,生成能够有效捕捉语义意义的嵌入。
  • 768 维度: 该模型生成 768 维的嵌入,为文本提供丰富的表示。
  • 优化检索: 该模型专门设计用于像我们这样需要根据文本内容查找相似项的用例。

参考: https://cloud.google.com/vertex-ai/generative-ai/docs/model-reference/text-embeddings-api

初始化嵌入模型

在生成嵌入之前,我们需要使用 Vertex 初始化嵌入模型。

from langchain_google_vertexai import VertexAIEmbeddings
TEXT_EMBEDDING_MODEL_NAME = "text-embedding-005"
text_embedding_model = VertexAIEmbeddings(model_name=TEXT_EMBEDDING_MODEL_NAME)
print(f"Initialized embedding model: {TEXT_EMBEDDING_MODEL_NAME}")

从 Google Cloud Storage (GCS) 加载数据

由于我们将同时使用 LangChain 和 Vertex AI RAG 引擎,因此我们需要确保数据对两者都可访问。我们已经在上一篇文章中将数据上传到 GCS,因此这里我们将定义一个从 GCS 加载数据的函数:

TEXT_EMBEDDING_MODEL_NAME = "text-embedding-005"
INPUT_PREFIX = "amazon_product_data"  # 存储 JSONL 文件的前缀
GCS_INPUT_PATH = f"gs://{BUCKET_NAME}/{INPUT_PREFIX}"
from langchain_google_vertexai import VertexAIEmbeddings

text_embedding_model = VertexAIEmbeddings(model_name=TEXT_EMBEDDING_MODEL_NAME)
print(f"Initialized embedding model: {TEXT_EMBEDDING_MODEL_NAME}")

def load_data_from_gcs(bucket_name, input_prefix):
    print(f"\nLoading data from GCS: {GCS_INPUT_PATH}...")
    storage_client = storage.Client(project=PROJECT_ID)
    bucket = storage_client.bucket(bucket_name)

    products = []
    blobs = bucket.list_blobs(prefix=input_prefix)
    for blob in blobs:
        if blob.name.endswith(".jsonl"):
            print(f"Processing: {blob.name}")
            file_content = blob.download_as_string()
            for line in file_content.decode("utf-8").splitlines():
                try:
                    product = json.loads(line)
                    products.append(product)
                except json.JSONDecodeError as e:
                    print(f"Error decoding JSON from line: {line}. Error: {e}")

    print(f"Loaded {len(products)} products from GCS")
    return products


def generate_text_embeddings_for_products(products, batch_size=1000):
    print(f"\nGenerating text embeddings for {len(products)} products...")
    all_embeddings = []

    for i in range(0, len(products), batch_size):
        batch = products[i:i + batch_size]
        batch_embeddings = []
        texts = [
            f"Title: {product.get('title', '')} Description: {product.get('description', '')} Features: {product.get('features', '')}"
            for product in batch
        ]

        try:
            batch_embeddings = text_embedding_model.embed_documents(texts)
            all_embeddings.extend(batch_embeddings)
            print(f"Processed batch {i // batch_size + 1}/{len(products) // batch_size + 1}")
        except Exception as e:
            print(f"Error generating embeddings for batch starting at index {i}: {e}")

    print(f"Generated embeddings for {len(all_embeddings)} products.")
    return all_embeddings


products = load_data_from_gcs(BUCKET_NAME, INPUT_PREFIX)
embeddings = generate_text_embeddings_for_products(products)

创建 LangChain Document 对象

现在,我们将使用来自 GCS 的 products 数据以及嵌入来创建 LangChain Document 对象,这些对象将用于我们的 LangChain RAG 实现。

from langchain_core.documents import Document

def create_langchain_documents(cleaned_products):
    
    documents = []
    
    for product in cleaned_products:
        # 创建与 GCS 中使用的相同文本内容结构
        text_content = (
            f"Title: {product.get('title', '')}\n"
            f"Price: ${product.get('price', 'N/A')}\n"
            f"Rating: {product.get('average_rating', 'N/A')} stars\n"
            f"Category: {product.get('main_category', '')}\n"
            f"Features: {product.get('features', '')}\n"
            f"Description: {product.get('description', '')}"
        ).strip()
        
        # 创建与 GCS JSONL 结构完全匹配的元数据
        metadata = {
            "title": product.get("title", ""),
            "price": product.get("price", "N/A"),
            "rating": product.get("average_rating", "N/A"),
            "category": product.get("main_category", ""),
            "features": product.get("features", ""),
            "description": product.get("description", ""),
            "image_urls": product.get("image_urls", []),
            "video_urls": product.get("video_urls", [])
        }
        
        # 创建具有对齐结构的 Document 对象
        documents.append(
            Document(
                page_content=text_content,
                metadata=metadata
            )
        )       
    return documents
documents = create_langchain_documents(cleaned_products)

创建 Vertex AI 搜索索引

首先,我们需要在 Vertex AI 搜索中创建一个索引,以存储我们的产品嵌入。索引本质上是一个数据容器,允许高效的相似性搜索。

## 创建索引
indexes = aiplatform.MatchingEngineIndex.list()
INDEX_RESOURCE_NAME = None

for index in indexes:
    if index.display_name == "amazon_beauty_langchain_index":
        INDEX_RESOURCE_NAME = index.resource_name
        print(f"Found existing index: {INDEX_RESOURCE_NAME}")
        break

if not INDEX_RESOURCE_NAME:
    my_index = aiplatform.MatchingEngineIndex.create_tree_ah_index(
        display_name="amazon_beauty_langchain_index",
        description="Langchain Index for Amazon Beauty product embeddings",
        dimensions=768, 
        approximate_neighbors_count=10,
        leaf_node_embedding_count=500,
        leaf_nodes_to_search_percent=7,
        distance_measure_type="DOT_PRODUCT_DISTANCE",
        feature_norm_type="UNIT_L2_NORM",
        index_update_method="BATCH_UPDATE",
    )
    INDEX_RESOURCE_NAME = my_index.resource_name
    print(f"Created index: {INDEX_RESOURCE_NAME}")
## 创建索引
indexes = aiplatform.MatchingEngineIndex.list()
INDEX_RESOURCE_NAME = None

for index in indexes:
    if index.display_name == "amazon_beauty_ragengine_index":
        INDEX_RESOURCE_NAME = index.resource_name
        print(f"Found existing index: {INDEX_RESOURCE_NAME}")
        break

if not INDEX_RESOURCE_NAME:
    my_index = aiplatform.MatchingEngineIndex.create_tree_ah_index(
        display_name="amazon_beauty_ragengine_index",
        description="RAG Engine Index for Amazon Beauty product embeddings",
        dimensions=768, 
        approximate_neighbors_count=10,
        leaf_node_embedding_count=500,
        leaf_nodes_to_search_percent=7,
        distance_measure_type="DOT_PRODUCT_DISTANCE",
        feature_norm_type="UNIT_L2_NORM",
        index_update_method="STREAM_UPDATE",
    )
    INDEX_RESOURCE_NAME = my_index.resource_name
    print(f"Created index: {INDEX_RESOURCE_NAME}")

使用流和批量更新创建索引的说明

在此设置中,为不同目的创建了两个 Vertex AI 搜索索引。RAG 引擎索引使用 STREAM_UPDATE 以支持连续、低延迟的更新,使其非常适合实时应用程序,其中数据经常变化并且需要即时可用以进行检索。另一方面,LangChain索引使用 BATCH_UPDATE,更适合数据更新不那么频繁的场景,允许以优化性能的方式进行高效的批量更新。两个索引均配置为使用 Tree-AH 算法,这是近似最近邻(ANN)搜索的强大默认选择,并根据嵌入维度、邻居数量和搜索准确性等参数进行了调整,以处理由模型生成的嵌入。这种双索引策略确保了灵活性,满足 RAG 流水线中实时和批处理需求。

创建 Vertex AI 搜索端点

接下来,我们需要一个端点来对我们的索引进行查询。我将在这里展示其中一个作为示例。

## 创建端点 
endpoints = aiplatform.MatchingEngineIndexEndpoint.list()
ENDPOINT_RESOURCE_NAME = None

for endpoint in endpoints:
    if endpoint.display_name == "amazon_beauty_langchain_endpoint":
        ENDPOINT_RESOURCE_NAME = endpoint.resource_name
        print(f"Found existing endpoint: {ENDPOINT_RESOURCE_NAME}")
        break

if not ENDPOINT_RESOURCE_NAME:
    my_index_endpoint = aiplatform.MatchingEngineIndexEndpoint.create(
        display_name="amazon_beauty_langchain_endpoint",
        description="Endpoint for serving product recommendations",
        public_endpoint_enabled=True  # 如果不需要公共端点,则设置为 False
    )
    ENDPOINT_RESOURCE_NAME = my_index_endpoint.resource_name
    print(f"Created endpoint: {ENDPOINT_RESOURCE_NAME}")

将索引部署到端点

现在,我们将索引部署到端点,使其准备好处理查询。

## 将索引部署到端点 ---
try:
    my_index = aiplatform.MatchingEngineIndex(INDEX_RESOURCE_NAME)
    my_index_endpoint = aiplatform.MatchingEngineIndexEndpoint(ENDPOINT_RESOURCE_NAME)

    if my_index and my_index_endpoint:
        
        index_deployed = False
        for deployed_index in my_index_endpoint.deployed_indexes:
            if deployed_index.index == my_index.resource_name:
                index_deployed = True
                print(
                    f"Index {my_index.resource_name} is already deployed to endpoint {my_index_endpoint.resource_name}"
                )
                break

        if not index_deployed:
            
            deployed_index_id = "amazon_beauty_langchain_deployed_index"

            my_index_endpoint.deploy_index(
                index=my_index, deployed_index_id=deployed_index_id
            )
            print(
                f"Deployed index {my_index.resource_name} to endpoint {my_index_endpoint.resource_name} with deployed_index_id: {deployed_index_id}"
            )
        else:
            print("Skipping deployment as the index is already deployed.")
    else:
        print("Index or endpoint not defined.")

except Exception as e:
    print(f"Error during index deployment: {e}")

在 Vertex AI Search 中使用 LangChain 存储嵌入

在这里,我们使用 LangChain 的 VectorSearchVectorStore 连接到 Vertex AI Search,并添加我们的文档及其预计算的嵌入。

from langchain_google_vertexai import VectorSearchVectorStore
vector_store = VectorSearchVectorStore.from_components(
    project_id=PROJECT_ID,
    region=LOCATION,
    index_id=INDEX_RESOURCE_NAME.split("/")[-1],
    endpoint_id=ENDPOINT_RESOURCE_NAME.split("/")[-1],
    embedding=text_embedding_model,
    gcs_bucket_name=BUCKET_NAME.replace("gs://", "").split("/")[0]
    )

batch_size = 5
for i in range(0, len(documents), batch_size):
    batch_docs = documents[i : i + batch_size]
    vector_store.add_documents(documents=batch_docs)
    print(
        f"Added batch {i // batch_size + 1}/{len(documents) // batch_size + 1} to Vertex AI Search"
    )

print("Embeddings added to Vertex AI Search.")

使用 LangChain 构建 RAG 链

现在,我们使用 LangChain 的组件构建 RAG 链:

from langchain_google_vertexai import ChatVertexAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

llm = ChatVertexAI(
    model_name="gemini-exp-1206", project=PROJECT_ID, location=LOCATION #gemini-1.5-pro
)

## 定义提示模板
prompt_template = """
You are a helpful product recommender. Answer the question based on the context below.
If you don't know the answer, just say that you don't know. Don't try to make up an answer.

Context:
{context}

Question: {question}
"""

prompt = ChatPromptTemplate.from_template(prompt_template)
## 创建检索器
retriever = vector_store.as_retriever(search_kwargs={"k": 5})  # 检索前 5 个文档
## 构建 RAG 链
rag_chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)
print("RAG chain created with LangChain and Vertex AI Search.")
query = "Recommend a good moisturizing cream for sensitive skin under $30."
response = rag_chain.invoke(query)
print(f"Query: {query}")
print(f"Response: {response}")

RAG 实现 — Vertex AI RAG 引擎与 Vertex AI Search

现在,我们将探索 Google Cloud 完全托管的解决方案:Vertex AI RAG 引擎。RAG 引擎通过处理数据摄取、嵌入生成、向量存储和检索,简化了创建 RAG 应用程序的过程。

Vertex AI RAG 引擎工作流程:

  1. 创建 RAG 语料库: RAG 语料库充当 RAG 引擎中数据的容器。
  2. 配置向量数据库: 我们将现有的 Vertex AI 向量搜索索引和端点链接到 RAG 语料库。
  3. 导入数据: 我们将产品数据(以 JSONL 格式)从 GCS 导入 RAG 语料库。RAG 引擎将使用默认的文本嵌入模型自动生成嵌入(如果需要,您也可以在创建语料库时指定不同的模型)。
  4. 构建 RAG 工具和模型: 我们将使用 Tool.from_retrieval() 创建检索工具,并将其与 Gemini 模型集成。
  5. 查询和生成: 我们将使用增强了检索工具的 Gemini 模型,根据 RAG 语料库中的数据回答用户查询。
## Vertex AI Vector Search 作为向量数据库
vector_db = rag.VertexVectorSearch(
    index=my_index.resource_name, index_endpoint=my_index_endpoint.resource_name
)

## RAG 语料库
DISPLAY_NAME = "amazon-beauty-rag-engine"

## 创建 RAG 语料库
rag_corpus = rag.create_corpus(display_name=DISPLAY_NAME, vector_db=vector_db)
print(f"Created RAG Corpus resource: {rag_corpus.name}")

## 从 GCS 导入数据到 RAG 语料库 

response = rag.import_files(
    corpus_name=rag_corpus.name,
    paths=[GCS_BUCKET_PATH],
    chunk_size=1024,
    chunk_overlap=100,
)
print(f"Data imported into RAG Corpus: {response}")
## 为 RAG 语料库创建检索工具
rag_resource = rag.RagResource(rag_corpus=rag_corpus.name)

rag_retrieval_tool = Tool.from_retrieval(
    retrieval=rag.Retrieval(
        source=rag.VertexRagStore(
            rag_resources=[rag_resource],
            similarity_top_k=5,
            vector_distance_threshold=0.4,
        )
    )
)

## 将检索工具添加到 Gemini 模型
## rag_model = GenerativeModel("gemini-2.0-flash-exp", tools=[rag_retrieval_tool])
rag_model = GenerativeModel("gemini-exp-1206", tools=[rag_retrieval_tool])
print("RAG Tool and Model created.")

## --- 查询和生成响应 ---

query = "Recommend a good moisturizing cream for sensitive skin under $30."
response = rag_model.generate_content(query)
print(f"Query: {query}")
print(f"Response: {response.text}")

与聊天机器人接口集成

我们构建的 RAG 应用程序,无论是使用 LangChain 还是 Vertex AI RAG 引擎,都可以通过与聊天机器人接口的集成进一步增强。这允许对您的产品推荐进行互动和对话式访问。虽然我们在这里不会深入讨论完整的实现细节,但一种方法是将您的 RAG 模型部署为 web 服务,例如在 Google Cloud Functions 上,然后将其连接到对话式 AI 平台,如 Google Cloud 的 Vertex AI Agent Builder。Agent Builder 提供了一个用户友好的无代码环境,用于设计和部署聊天机器人。通过在 Agent Builder 中配置一个指向您部署的 RAG 函数的 webhook,您可以无缝地将用户查询路由到您的 RAG 模型,使聊天机器人能够根据检索到的信息和底层语言模型的生成能力提供智能的数据驱动产品推荐。简而言之,RAG 应用程序可以轻松地暴露为聊天机器人。

from google.cloud import aiplatform
from vertexai.preview import rag
from vertexai.preview.generative_models import GenerativeModel, Tool
import functions_framework

## 配置值
PROJECT_ID = "rag-product-recommendation"
LOCATION = "us-central1"
INDEX_DISPLAY_NAME = "amazon_beauty_ragengine_index"
ENDPOINT_DISPLAY_NAME = "amazon_beauty_ragengine_endpoint"
RAG_CORPUS_DISPLAY_NAME = "amazon-beauty-rag-engine"


def initialize_rag_components():
    """
    通过设置向量搜索和语料库组件初始化 RAG 系统。
    此函数遵循与您笔记本中相同的模式,但适应于 Cloud Functions 环境。
    """
    try:
        # 首先,我们需要获取我们的索引
        indexes = aiplatform.MatchingEngineIndex.list(
            filter=f'display_name="{INDEX_DISPLAY_NAME}"',
            project=PROJECT_ID,
            location=LOCATION,
        )
        if not indexes:
            raise ValueError("Index not found")
        my_index = indexes[0]

        # 然后获取我们的端点
        endpoints = aiplatform.MatchingEngineIndexEndpoint.list(
            filter=f'display_name="{ENDPOINT_DISPLAY_NAME}"',
            project=PROJECT_ID,
            location=LOCATION,
        )
        if not endpoints:
            raise ValueError("Endpoint not found")
        my_index_endpoint = endpoints[0]

        # 设置向量搜索
        vector_db = rag.VertexVectorSearch(
            index=my_index.resource_name,
            index_endpoint=my_index_endpoint.resource_name
        )

        # 我们将尝试直接获取语料库,而不是使用项目参数列出语料库
        try:
            # 首先尝试获取现有语料库
            rag_corpus = next(
                (corpus for corpus in rag.list_corpora()
                 if corpus.display_name == RAG_CORPUS_DISPLAY_NAME),
                None
            )

            if not rag_corpus:
                # 如果不存在,则创建新语料库
                rag_corpus = rag.create_corpus(
                    display_name=RAG_CORPUS_DISPLAY_NAME,
                    vector_db=vector_db
                )
                print(f"Created new RAG Corpus: {rag_corpus.name}")
            else:
                print(f"Using existing RAG Corpus: {rag_corpus.name}")

        except Exception as e:
            print(f"Error with corpus operations: {e}")
            raise

        # 创建 RAG 资源和检索工具
        rag_resource = rag.RagResource(rag_corpus=rag_corpus.name)
        rag_retrieval_tool = Tool.from_retrieval(
            retrieval=rag.Retrieval(
                source=rag.VertexRagStore(
                    rag_resources=[rag_resource],
                    similarity_top_k=5,
                    vector_distance_threshold=0.4
                )
            )
        )

        # 使用检索工具初始化模型
        return GenerativeModel("gemini-exp-1206", tools=[rag_retrieval_tool])

    except Exception as e:
        print(f"Error in initialize_rag_components: {e}")
        raise


## 全局初始化模型
rag_model = initialize_rag_components()


@functions_framework.http
def query_rag(request):
    """
    处理 RAG 查询的云函数。
    此函数接收 HTTP 请求并使用 RAG 模型返回响应。
    """
    try:
        request_json = request.get_json(silent=True)
        if not request_json or 'query' not in request_json:
            return {'error': 'Missing query parameter'}, 400

        query = request_json['query']
        response = rag_model.generate_content(query)
        return {'response': response.text}

    except Exception as e:
        print(f"Error processing query: {e}")
        return {'error': str(e)}, 500

部署 Google Cloud 函数

使用以下命令部署 query_rag_function

 gcloud functions deploy query_rag_function --region us-central1 --runtime python311 --source . --entry-point query_rag_function --trigger-http --allow-unauthenticated --timeout=540s

结论

在对用于产品推荐的检索增强生成(RAG)的全面探索中,我们经历了 Google Cloud 上两种独特而强大的方法:灵活的开源 LangChain 框架和完全托管的 Vertex AI RAG 引擎。这两种实现都利用 Vertex AI Search 作为核心向量数据库,展示了其在高效存储和检索产品嵌入方面的能力。通过比较这些方法,我们看到 LangChain 在 RAG 管道的每个方面提供了细粒度的控制,使其非常适合复杂场景和希望进行定制的开发人员。相反,Vertex AI RAG 引擎提供了一种简化的用户体验,自动化了许多复杂的过程,并与 Google 的 Gemini 模型实现了无缝集成。虽然我们的重点仍然是 Vertex AI Search,但我们也承认了向量数据库的更广泛生态系统,包括 PgVectorBigQueryPineconeWeaviate,每种数据库根据项目规模、性能需求和现有基础设施提供独特的优势。最终,在 LangChain 和 Vertex AI RAG 引擎之间的选择取决于您项目的具体需求、团队的专业知识以及对控制和易用性之间的理想平衡。本文为您提供了做出明智决策的知识,并为构建强大、智能的 RAG 应用程序以进行产品推荐及其他应用奠定了基础,为未来探索多模态数据和更复杂的对话 AI 接口铺平了道路。

Related Posts

使用 ChatGPT 搜索网络功能的 10 种创意方法

使用 ChatGPT 搜索网络功能的 10 种创意方法

例如,提示和输出 你知道可以使用 ChatGPT 的“搜索网络”功能来完成许多任务,而不仅仅是基本的网络搜索吗? 对于那些不知道的人,ChatGPT 新的“搜索网络”功能提供实时信息。 截至撰写此帖时,该功能仅对使用 ChatGPT 4o 和 4o-mini 的付费会员开放。 ![](https://images.weserv.nl/?url=https://cdn-im

阅读更多
在人工智能和技术领域保持领先地位的 10 项必学技能 📚

在人工智能和技术领域保持领先地位的 10 项必学技能 📚

在人工智能和科技这样一个动态的行业中,保持领先意味着不断提升你的技能。无论你是希望深入了解人工智能模型性能、掌握数据分析,还是希望通过人工智能转变传统领域如法律,这些课程都是你成功的捷径。以下是一个精心策划的高价值课程列表,可以助力你的职业发展,并让你始终处于创新的前沿。 1. 生成性人工智能简介课程: [生成性人工智能简介](https://genai.works

阅读更多
10 个强大的 Perplexity AI 提示,让您的营销任务自动化

10 个强大的 Perplexity AI 提示,让您的营销任务自动化

在当今快速变化的数字世界中,营销人员总是在寻找更智能的方法来简化他们的工作。想象一下,有一个个人助理可以为您创建受众档案,建议营销策略,甚至为您撰写广告文案。这听起来像是一个梦想? 多亏了像 Perplexity 这样的 AI 工具,这个梦想现在成为现实。通过正确的提示,您可以将 AI 转变为您的 个人营销助理。在本文中,我将分享 10 个强大的提示,帮助您自动

阅读更多
10+ 面向 UI/UX 设计师的顶级 ChatGPT 提示

10+ 面向 UI/UX 设计师的顶级 ChatGPT 提示

人工智能技术,如机器学习、自然语言处理和数据分析,正在重新定义传统设计方法。从自动化重复任务到实现个性化用户体验,人工智能使设计师能够更加专注于战略思维和创造力。随着这一趋势的不断增长,UI/UX 设计师越来越多地采用 AI 驱动的工具来促进他们的工作。利用人工智能不仅能提供基于数据的洞察,还为满足多样化用户需求的创新设计解决方案开辟了机会。 1. 用户角色开发 目的

阅读更多
在几分钟内完成数月工作的 100 种人工智能工具

在几分钟内完成数月工作的 100 种人工智能工具

人工智能(AI)的快速发展改变了企业的运作方式,使人们能够在短短几分钟内完成曾经需要几周或几个月的任务。从内容创作到网站设计,AI工具帮助专业人士节省时间,提高生产力,专注于创造力。以下是按功能分类的100个AI工具的全面列表,以及它们在现实世界中的使用实例。 1. 研究工具 研究可能耗时,但人工智能工具使查找、分析和组织数据变得更加容易。**ChatGPT, Cop

阅读更多
你从未知道的 17 个令人惊叹的 GitHub 仓库

你从未知道的 17 个令人惊叹的 GitHub 仓库

Github 隐藏的宝石!! 立即收藏的代码库 学习编程相对简单,但掌握编写更好代码的艺术要困难得多。GitHub 是开发者的宝藏,那里“金子”是其他人分享的精心编写的代码。通过探索 GitHub,您可以发现如何编写更清晰的代码,理解高质量代码的样子,并学习成为更熟练开发者的基本步骤。 1. notwaldorf/emoji-translate *谁需

阅读更多