Building RAG for Product Recommendation using Google Gemini 2.0 APIs
- Rifx.Online
- Programming , Machine Learning , Chatbots
- 11 Jan, 2025
A Comparison of LangChain and Vertex AI RAG Engine on Amazon Product Data using Vertex AI Search
Google has always seemed behind in the AI race, but with the release of Gemini 2.0 just before 2025, it feels like they’ve finally caught up in a meaningful way. I wasn’t sure what to expect at first, but after trying it out, I’m genuinely impressed by its capabilities. It’s even got me wondering if tools like ChatGPT, Claude, or Llama are still necessary. The Gemini live multimodal API handles complex business problems effortlessly and integrates smoothly with Google’s existing tools like Search and Maps. On top of that, Google’s managed offerings, such as the RAG Engine, AI Agent Builder, and Vertex AI for Retail, are solid, and their partnership with LangChain for flexible AI development makes building adaptable AI solutions more straightforward.
To put it to the test, I used the 2023 Amazon product dataset from Hugging Face to build a RAG-based product recommendation system, comparing LangChain with the Vertex AI RAG Engine. The experience was practical and insightful, showing how much potential Gemini has to offer, both in solving real-world business challenges and in enabling flexible AI development.
Next, I’ll be exploring the Vertex AI for Retail and Advanced AI Agent Builder capabilities and will share my insights as I progress.
Introduction
This article explores the implementation of Retrieval-Augmented Generation (RAG) for product recommendation, specifically focusing on building a system that can answer user queries and suggest relevant products. We’ll delve into two primary approaches on Google Cloud Platform: leveraging the popular LangChain framework and the fully managed Vertex AI RAG Engine, both utilizing Vertex AI Search as the underlying vector database. While other vector databases like PgVector, BigQuery, Pinecone, Weaviate, and Vertex AI Feature Store can be integrated with these RAG solutions, this article will concentrate on Vertex AI Search for a focused comparison and provide a brief overview of other database options in the conclusion.
What is Retrieval-Augmented Generation (RAG)?
Retrieval-Augmented Generation (RAG) is a powerful AI technique that combines the strengths of three approaches: retrieval, augmentation, and generation.
- Retrieval: In this step, a RAG system first searches a large corpus of documents (a knowledge base) to find information relevant to a given input query or prompt. This is like searching through a database or using a search engine. The goal is to find the most relevant documents or passages that might contain the answer to the query or provide useful context.
- Augmentation: This is the crucial step where the retrieved information is used to augment the original query or prompt. The retrieved documents (or parts of them) are added to the input that will be fed to the language model. This provides the model with external knowledge that it wouldn’t otherwise have access to.
- Generation: Finally, a large language model (LLM) takes the augmented input (original query + retrieved context) and uses it to generate a comprehensive and coherent response. The LLM can now leverage the additional information from the retrieval step to produce a more informed, accurate, and contextually relevant output.
In essence, RAG augments the generative capabilities of LLMs with external knowledge. The retrieved context helps the LLM to provide more accurate and factual responses, reduce hallucinations (making up information), and answer questions about topics beyond its initial training data.
Analogy: Imagine you’re writing an essay. Instead of relying solely on your memory (like a standard LLM), you can also use a library (the retrieval component). You first search the library for relevant books and articles (retrieval), then you integrate information from those sources into your essay prompt (augmentation). Finally, you write your essay, drawing on both your knowledge and the newly acquired information from the library (generation).
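The three steps can be sketched in a few lines of plain Python. This is only a toy illustration, not the pipeline built later in this article: retrieval here uses bag-of-words cosine similarity instead of learned embeddings, the three-item corpus is made up, and generate() is a placeholder where a real LLM call would go.

```python
import math
import re
from collections import Counter


def tokenize(text):
    """Lowercase the text and split it into alphanumeric tokens."""
    return re.findall(r"[a-z0-9]+", text.lower())


def cosine(a, b):
    """Cosine similarity between two token-count vectors (Counters)."""
    numerator = sum(a[t] * b[t] for t in set(a) & set(b))
    denominator = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(
        sum(v * v for v in b.values())
    )
    return numerator / denominator if denominator else 0.0


def retrieve(query, corpus, k=2):
    """Retrieval: rank documents by similarity to the query, keep the top k."""
    query_vector = Counter(tokenize(query))
    ranked = sorted(
        corpus,
        key=lambda doc: cosine(query_vector, Counter(tokenize(doc))),
        reverse=True,
    )
    return ranked[:k]


def augment(query, passages):
    """Augmentation: prepend the retrieved passages to the question."""
    context = "\n".join(f"- {p}" for p in passages)
    return f"Context:\n{context}\n\nQuestion: {query}"


def generate(prompt):
    """Generation: placeholder for an LLM call (hypothetical stub)."""
    return f"[an LLM would answer here, given a {len(prompt)}-character prompt]"


# Made-up mini corpus for illustration
corpus = [
    "Fragrance-free moisturizing cream for sensitive skin",
    "Volumizing shampoo for fine hair",
    "Matte red lipstick with long-lasting color",
]
query = "moisturizing cream for sensitive skin"
top = retrieve(query, corpus, k=1)
prompt = augment(query, top)
print(prompt)
print(generate(prompt))
```

In a real system the Counter vectors would be replaced by embeddings from a model and the sorted() call by a vector database lookup, but the retrieve, augment, generate shape stays the same.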
The Power of RAG for Product Recommendation
In the context of product recommendation, RAG can significantly enhance the user experience and improve business outcomes. Here’s how:
- Enhanced Product Recommendations: RAG can power more relevant and personalized product recommendations by retrieving products similar to a user’s query or browsing history, considering factors like textual descriptions, features, and even customer reviews.
- Improved Search Functionality: RAG can go beyond keyword matching to understand the semantic meaning of user queries, leading to more accurate and helpful search results within an online store or product catalog.
- AI-Powered Chatbots: RAG enables the creation of intelligent chatbots that can answer customer questions about products, shipping, returns, and other related topics by retrieving information from product catalogs, FAQs, and other relevant documents.
- Dynamic Content Generation: RAG can be used to generate dynamic product descriptions, marketing materials, or even personalized email content based on retrieved information about products and user preferences.
By grounding large language models in external knowledge sources, RAG helps overcome limitations like knowledge cutoffs and hallucinations, making them more reliable and useful for product recommendation and other applications.
Google is also offering Vertex AI Search for Retail, which includes preconfigured scenarios for product discovery and is easy to use.
https://cloud.google.com/solutions/retail-product-discovery?hl=en
RAG on Google Cloud: LangChain vs. Vertex AI RAG Engine
This article explores two primary approaches to building RAG applications on Google Cloud:
- LangChain: A popular open-source framework for developing applications with large language models. It provides a flexible and modular way to build RAG pipelines. While LangChain can integrate with various Google Cloud services like Vertex AI Search, PgVector, and BigQuery, as well as other vector databases like Pinecone and Weaviate, this article will focus on using LangChain with Vertex AI Search.
- Vertex AI RAG Engine: A newer, fully managed service specifically designed for building RAG applications on Google Cloud. It simplifies development by handling many underlying complexities, including data ingestion, embedding generation, and vector storage/retrieval. This article will primarily focus on using Vertex AI Vector Search as its underlying vector database. Other options like Pinecone, Weaviate, Vertex AI Feature Store, and the default RagManagedDb are also available but are included here only for awareness in the conclusion.
Overview of Other Vector Databases
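While this article focuses on Vertex AI Search, other vector database options exist: LangChain can also work with PgVector, BigQuery, Pinecone, and Weaviate, and the Vertex AI RAG Engine also supports Pinecone, Weaviate, Vertex AI Feature Store, and the default RagManagedDb.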
Choosing the Right Solution & Vector Database
The best RAG solution and vector database for your project depend on several factors:
- Project Scale and Complexity: For simple prototypes or small-scale applications, the RAG Engine with RagManagedDb or LangChain with BigQuery might suffice. For larger, more complex projects, consider LangChain with Vertex AI Search or the RAG Engine with Vertex AI Search, Pinecone, or Weaviate.
- Performance Requirements: If low-latency retrieval is critical, Vertex AI Search, Pinecone, and Weaviate are generally strong choices.
- Existing Infrastructure: If you’re already using PostgreSQL, LangChain with PgVector might be a good fit. If you’re heavily invested in Google Cloud, Vertex AI services are likely more convenient.
- Development Time and Effort: The RAG Engine offers faster development due to its managed nature. LangChain provides more flexibility but might require more development time.
- Cost: Carefully evaluate the pricing models of each service (vector database, LLM, RAG Engine, etc.) to estimate the overall cost for your use case.
- Team Expertise: Choose a solution that aligns with your team’s skills and experience.
- Multi-Modal Needs: If you plan to incorporate multi-modal embeddings (images, videos) in the future, ensure the chosen vector database supports them or has a clear path for integration.
Final Thoughts
Both LangChain and the Vertex AI RAG Engine offer powerful ways to build RAG applications on Google Cloud. LangChain provides flexibility and control, while the RAG Engine offers a streamlined, managed experience. By understanding the strengths and weaknesses of each approach, and carefully considering your project’s specific requirements, you can choose the solution that best fits your needs and build a robust and effective RAG-powered product recommendation system. Remember that the field of RAG is rapidly evolving, so staying up-to-date with the latest advancements in both frameworks and underlying technologies is essential.
TECHNICAL IMPLEMENTATION DETAILS
Dataset: Hugging Face Amazon Reviews (Beauty)
To demonstrate the concepts in this article, we’ll use a real-world dataset: the Hugging Face Amazon Reviews dataset, specifically the “raw_meta_All_Beauty” configuration (loaded below from its “full” split).
- Dataset Source: https://huggingface.co/datasets/McAuley-Lab/Amazon-Reviews-2023
- Content: This dataset contains metadata about beauty products listed on Amazon, including product titles, descriptions, features, prices, ratings, and categories.
Required APIs and Services
To build our RAG application, several Google Cloud APIs and services need to be enabled. You can do this using either the Google Cloud Console (graphical interface) or the gcloud command-line tool. Below are the steps for enabling the required APIs:
Option 1: Using Google Cloud Console
- Navigate to the Google Cloud Console:
https://console.cloud.google.com
- Select your project (e.g., rag-product-recommendation or your chosen project name).
- Click the “+ ENABLE APIS AND SERVICES” button.
- Search for and enable the following APIs:
Core APIs (Required):
- Vertex AI API
Enable the API and create an API key or credentials to use it: https://console.cloud.google.com/apis/library/aiplatform.googleapis.com
Documentation: https://cloud.google.com/vertex-ai
- Gemini API
Enable API: https://console.cloud.google.com/apis/library/generativelanguage.googleapis.com
Documentation: https://ai.google.dev/gemini-api
- Cloud Storage API
Enable API: https://console.cloud.google.com/apis/library/storage-component.googleapis.com
Documentation: https://cloud.google.com/storage/docs
Option 2: Using gcloud CLI
If you prefer the command line, you can enable all required APIs with a single command:
gcloud services enable \
  aiplatform.googleapis.com \
  generativelanguage.googleapis.com \
  storage-component.googleapis.com
Install the Google Cloud CLI: If you don’t have it, follow the instructions here: https://cloud.google.com/sdk/docs/install
IAM Permissions
Ensure that the user or service account you’re using has the necessary IAM permissions to access and manage the required resources. Grant Vertex AI admin access only if you need full control. Here are some essential IAM roles:
## Replace with your project ID and service account
PROJECT_ID="rag-product-recommendation"
SERVICE_ACCOUNT="rag-sa@${PROJECT_ID}.iam.gserviceaccount.com"
## Grant Vertex AI admin access if you need full control
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:${SERVICE_ACCOUNT}" \
--role="roles/aiplatform.admin"
## Add Storage admin for managing vectors and embeddings
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:${SERVICE_ACCOUNT}" \
--role="roles/storage.admin"
Installing Libraries (SDKs)
We will use the following Python libraries for this project:
- google-cloud-aiplatform: The Vertex AI SDK for Python.
- google-cloud-storage: For interacting with Google Cloud Storage.
- datasets: For downloading the Hugging Face dataset.
- langchain: For the RAG implementation using the LangChain framework.
- langchain-google-vertexai: For integrating LangChain with Google Vertex AI services.
Open your terminal or command prompt and run the following command:
pip install --upgrade google-cloud-aiplatform google-cloud-storage datasets langchain langchain-google-vertexai
Data Loading, Preprocessing, and Storage
This is where we will use our helper functions to load, clean and store the dataset in GCS.
Loading the Amazon Beauty Dataset from Hugging Face
We begin by loading the dataset using the Hugging Face datasets library.
from datasets import load_dataset

def load_product_data():
    print("Loading Amazon Beauty product dataset...")
    # Load the dataset
    dataset = load_dataset(
        "McAuley-Lab/Amazon-Reviews-2023",
        "raw_meta_All_Beauty",
        trust_remote_code=True
    )
    # Convert to list of dictionaries for easier processing
    products = list(dataset['full'])
    print(f"Loaded {len(products)} products")
    return products

products = load_product_data()
Data Cleaning and Preprocessing
This stage involves cleaning the loaded data to ensure its quality and consistency.
def clean_product_data(products, debug_sample_size=5):
    print("\nStarting data cleaning with enhanced price analysis...")
    initial_count = len(products)

    # Price issue tracking: a count plus a few example products per category
    price_issues = {
        'none_price': {'count': 0, 'examples': []},         # 'None' string values
        'missing_price': {'count': 0, 'examples': []},      # None type values
        'non_numeric_price': {'count': 0, 'examples': []},  # Other invalid formats
        'empty_string': {'count': 0, 'examples': []},       # Empty strings
        'other_issues': {'count': 0, 'examples': []},       # Unexpected cases
    }

    def record_issue(issue_type, example):
        # Count the issue and keep at most debug_sample_size examples
        issue = price_issues[issue_type]
        issue['count'] += 1
        if len(issue['examples']) < debug_sample_size:
            issue['examples'].append(example)

    cleaned_products = []
    for product in products:
        # Products without a title are skipped and not counted as price issues
        if not product.get('title'):
            continue
        title = product.get('title')
        raw_price = product.get('price')
        try:
            price = None
            # Categorize price issues
            if raw_price is None:
                record_issue('missing_price', {
                    'title': title,
                    'raw_price': 'None (type)',
                    'price_type': type(raw_price).__name__
                })
                continue
            if isinstance(raw_price, str):
                if raw_price.lower() == 'none':
                    record_issue('none_price', {'title': title, 'raw_price': raw_price})
                    continue
                if not raw_price.strip():
                    record_issue('empty_string', {'title': title, 'raw_price': 'empty string'})
                    continue
                # Try to clean and convert non-None string prices
                try:
                    cleaned_price = raw_price.replace('$', '').replace(',', '').strip()
                    price = float(cleaned_price)
                except ValueError:
                    record_issue('non_numeric_price', {
                        'title': title,
                        'raw_price': raw_price,
                        'price_type': type(raw_price).__name__
                    })
                    continue
            elif isinstance(raw_price, (int, float)):
                price = float(raw_price)
            else:
                record_issue('other_issues', {
                    'title': title,
                    'raw_price': raw_price,
                    'price_type': type(raw_price).__name__
                })
                continue
            # Keep only products with a positive numeric price
            if price is not None and price > 0:
                cleaned_product = product.copy()
                cleaned_product['price'] = price
                cleaned_products.append(cleaned_product)
        except Exception as e:
            record_issue('other_issues', {
                'title': title,
                'raw_price': raw_price,
                'error': str(e)
            })

    # Print detailed analysis
    print("\nDetailed Price Analysis:")
    print(f"Total products analyzed: {initial_count}")
    print(f"Products with valid prices: {len(cleaned_products)}")
    print("\nPrice Issue Breakdown:")
    total_issues = 0
    for issue_type, data in price_issues.items():
        if data['count'] > 0:
            print(f"\n{issue_type.replace('_', ' ').title()}:")
            print(f"Count: {data['count']}")
            total_issues += data['count']
            if data['examples']:
                print("Example products:")
                for idx, example in enumerate(data['examples'][:debug_sample_size], 1):
                    print(f"  {idx}. {example}")
    print(f"\nTotal invalid prices: {total_issues}")
    print(f"Final valid products: {len(cleaned_products)}")
    return cleaned_products

# Run the enhanced analysis
cleaned_products = clean_product_data(products)
Preparing Data for RAG (JSONL Format)
To prepare the data for ingestion into the Vertex AI RAG Engine, we will now format the cleaned product data as JSON Lines (JSONL) and store multiple products within each file.
- JSON Lines Format: In JSONL, each line is a valid JSON object. This format is well-suited for large datasets and is supported by the RAG Engine’s import_files() method.
- Multiple Products per File: Instead of creating a separate file for each product, we’ll group multiple products into each JSONL file. This reduces the total number of files and can improve import efficiency.
from google.cloud import storage
import json

# Configuration for Google Cloud Storage
PROJECT_ID = "rag-product-recommendation"  # Replace with your project ID
BUCKET_NAME = "rag-genaipros"  # Replace with your bucket name
OUTPUT_PREFIX = "amazon_product_data"
GCS_BUCKET_PATH = f"gs://{BUCKET_NAME}/{OUTPUT_PREFIX}"

def upload_batch_to_gcs(bucket, batch, output_prefix, file_counter):
    file_name = f"{output_prefix}/products_{file_counter}.jsonl"
    blob = bucket.blob(file_name)
    # Convert batch to JSONL format: one JSON object per line
    jsonl_data = "\n".join(json.dumps(product) for product in batch)
    # Upload to GCS
    blob.upload_from_string(jsonl_data, content_type="application/jsonl")
    print(f"Uploaded {len(batch)} products to gs://{bucket.name}/{file_name}")

def prepare_data_for_rag_jsonl(cleaned_products, bucket_name, output_prefix, products_per_file=1000):
    print("\nPreparing data upload to GCS...")
    # Initialize GCS client
    storage_client = storage.Client(project=PROJECT_ID)
    bucket = storage_client.bucket(bucket_name)
    # Batch processing variables
    file_counter = 0
    product_counter = 0
    batch = []
    total_products = len(cleaned_products)
    for product in cleaned_products:
        # Create a standardized product entry
        product_data = {
            "title": product.get("title", ""),
            "price": product.get("price", "N/A"),
            "rating": product.get("average_rating", "N/A"),
            "category": product.get("main_category", ""),
            "features": product.get("features", ""),
            "description": product.get("description", ""),
            # Keep URLs for potential future multi-modal implementation
            "image_urls": product.get("image_urls", []),
            "video_urls": product.get("video_urls", []),
        }
        batch.append(product_data)
        product_counter += 1
        # When batch is full, upload it
        if product_counter >= products_per_file:
            upload_batch_to_gcs(bucket, batch, output_prefix, file_counter)
            print(f"Progress: {min((file_counter + 1) * products_per_file, total_products)}/{total_products} products processed")
            file_counter += 1
            product_counter = 0
            batch = []
    # Upload any remaining products
    if batch:
        upload_batch_to_gcs(bucket, batch, output_prefix, file_counter)
    print(f"Final batch uploaded: {total_products}/{total_products} products processed")

# Upload the cleaned products so the JSONL files exist for the later steps
prepare_data_for_rag_jsonl(cleaned_products, BUCKET_NAME, OUTPUT_PREFIX)
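The batching and JSONL layout can be tried locally before touching GCS. The following is a minimal sketch that writes to a temporary directory instead of a bucket; write_jsonl_batches and read_jsonl are hypothetical helper names, and the sample records are made up rather than taken from the dataset.

```python
import json
import tempfile
from pathlib import Path


def write_jsonl_batches(records, out_dir, records_per_file=1000):
    """Write records to numbered JSONL files and return the paths written."""
    out_dir = Path(out_dir)
    paths = []
    for file_index, start in enumerate(range(0, len(records), records_per_file)):
        batch = records[start:start + records_per_file]
        path = out_dir / f"products_{file_index}.jsonl"
        # One JSON object per line, mirroring the upload format above
        path.write_text("\n".join(json.dumps(r) for r in batch), encoding="utf-8")
        paths.append(path)
    return paths


def read_jsonl(path):
    """Parse a JSONL file back into a list of dicts, skipping blank lines."""
    with open(path, encoding="utf-8") as handle:
        return [json.loads(line) for line in handle if line.strip()]


# Made-up sample records for illustration
sample = [{"title": f"Product {i}", "price": 10.0 + i} for i in range(5)]

with tempfile.TemporaryDirectory() as tmp:
    written = write_jsonl_batches(sample, tmp, records_per_file=2)
    file_names = [p.name for p in written]
    restored = [record for p in written for record in read_jsonl(p)]

print(file_names)
print(restored == sample)
```

Five records with two per file produce three files, the last one holding the single leftover record, which is the same "upload any remaining products" case handled at the end of the GCS function.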
Why Load Data to Google Cloud Storage (GCS)?
Storing data in Google Cloud Storage (GCS) is a crucial step when building a RAG pipeline with Vertex AI RAG Engine. While the dataset can initially be loaded directly from Hugging Face using the datasets library, GCS ensures the data is accessible within the Google Cloud ecosystem, enabling seamless ingestion into the RAG Corpus via the rag.import_files() method. GCS provides the scalability and efficiency needed to handle large datasets, offering high performance for processing millions of product records. It also supports data management and versioning, making it easy to track changes to your product catalog over time. Additionally, storing data in GCS allows smooth integration with other Google Cloud services, such as Dataflow for preprocessing and Cloud Build for CI/CD, creating a unified and optimized pipeline for your RAG solution.
Choosing an Embedding Model
For this article series, we’ll use the text-embedding-005 model from Vertex AI. This is a powerful text embedding model that provides high-quality embeddings for various downstream tasks, including information retrieval.
Key Features of text-embedding-005:
- High-Quality Embeddings: The model is trained on a massive dataset and generates embeddings that effectively capture semantic meaning.
- 768 Dimensions: The model produces embeddings with 768 dimensions, providing a rich representation of the text.
- Optimized for Retrieval: This model is specifically designed for use cases like ours, where we need to find similar items based on their textual content.
Reference: https://cloud.google.com/vertex-ai/generative-ai/docs/model-reference/text-embeddings-api
Initializing the Embedding Model
Before we can generate embeddings, we need to initialize the embedding model using the VertexAIEmbeddings class from langchain_google_vertexai:
from langchain_google_vertexai import VertexAIEmbeddings
TEXT_EMBEDDING_MODEL_NAME = "text-embedding-005"
text_embedding_model = VertexAIEmbeddings(model_name=TEXT_EMBEDDING_MODEL_NAME)
print(f"Initialized embedding model: {TEXT_EMBEDDING_MODEL_NAME}")
Loading Data from Google Cloud Storage (GCS)
Since we will be using both LangChain and Vertex AI RAG Engine, we need our data to be accessible to both. We uploaded our data to GCS earlier, so here we define one function to load it back from GCS and another to generate embeddings for it:
INPUT_PREFIX = "amazon_product_data"  # Prefix where JSONL files are stored
GCS_INPUT_PATH = f"gs://{BUCKET_NAME}/{INPUT_PREFIX}"

def load_data_from_gcs(bucket_name, input_prefix):
    print(f"\nLoading data from GCS: {GCS_INPUT_PATH}...")
    storage_client = storage.Client(project=PROJECT_ID)
    bucket = storage_client.bucket(bucket_name)
    products = []
    blobs = bucket.list_blobs(prefix=input_prefix)
    for blob in blobs:
        if blob.name.endswith(".jsonl"):
            print(f"Processing: {blob.name}")
            file_content = blob.download_as_bytes()
            for line in file_content.decode("utf-8").splitlines():
                try:
                    product = json.loads(line)
                    products.append(product)
                except json.JSONDecodeError as e:
                    print(f"Error decoding JSON from line: {line}. Error: {e}")
    print(f"Loaded {len(products)} products from GCS")
    return products

def generate_text_embeddings_for_products(products, batch_size=1000):
    print(f"\nGenerating text embeddings for {len(products)} products...")
    all_embeddings = []
    # Ceiling division so a partial final batch is counted exactly once
    total_batches = (len(products) + batch_size - 1) // batch_size
    for i in range(0, len(products), batch_size):
        batch = products[i:i + batch_size]
        texts = [
            f"Title: {product.get('title', '')} Description: {product.get('description', '')} Features: {product.get('features', '')}"
            for product in batch
        ]
        try:
            batch_embeddings = text_embedding_model.embed_documents(texts)
            all_embeddings.extend(batch_embeddings)
            print(f"Processed batch {i // batch_size + 1}/{total_batches}")
        except Exception as e:
            # A failed batch is skipped, so all_embeddings can end up shorter than products
            print(f"Error generating embeddings for batch starting at index {i}: {e}")
    print(f"Generated embeddings for {len(all_embeddings)} products.")
    return all_embeddings

products = load_data_from_gcs(BUCKET_NAME, INPUT_PREFIX)
embeddings = generate_text_embeddings_for_products(products)
Creating LangChain Document Objects
Now, we will use the cleaned product data (the same records uploaded to GCS) to create LangChain Document objects, which will be used in our LangChain RAG implementation.
from langchain_core.documents import Document

def create_langchain_documents(cleaned_products):
    documents = []
    for product in cleaned_products:
        # Create the same text content structure as used in GCS
        text_content = (
            f"Title: {product.get('title', '')}\n"
            f"Price: ${product.get('price', 'N/A')}\n"
            f"Rating: {product.get('average_rating', 'N/A')} stars\n"
            f"Category: {product.get('main_category', '')}\n"
            f"Features: {product.get('features', '')}\n"
            f"Description: {product.get('description', '')}"
        ).strip()
        # Create metadata matching GCS JSONL structure exactly
        metadata = {
            "title": product.get("title", ""),
            "price": product.get("price", "N/A"),
            "rating": product.get("average_rating", "N/A"),
            "category": product.get("main_category", ""),
            "features": product.get("features", ""),
            "description": product.get("description", ""),
            "image_urls": product.get("image_urls", []),
            "video_urls": product.get("video_urls", [])
        }
        # Create Document object with aligned structure
        documents.append(
            Document(
                page_content=text_content,
                metadata=metadata
            )
        )
    return documents

documents = create_langchain_documents(cleaned_products)
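One detail worth checking in the text content above: if fields such as features and description arrive as lists of strings (an assumption about this dataset that you should verify against your copy), the f-strings will embed the Python list representation, brackets and quotes included. A small helper can flatten such fields first. The sketch below uses a hypothetical flatten_field function and a made-up product record.

```python
def flatten_field(value, separator=" "):
    """Return a plain string for a field that may be a list, a string, or None."""
    if value is None:
        return ""
    if isinstance(value, (list, tuple)):
        # Join the non-empty items; str() guards against non-string entries
        items = (str(item).strip() for item in value)
        return separator.join(item for item in items if item)
    return str(value).strip()


# Made-up product record for illustration
example_product = {
    "title": "Gentle Daily Moisturizer",
    "features": ["Fragrance free", "For sensitive skin"],
    "description": [],
}

text = (
    f"Title: {flatten_field(example_product.get('title'))}\n"
    f"Features: {flatten_field(example_product.get('features'), '; ')}\n"
    f"Description: {flatten_field(example_product.get('description'))}"
)
print(text)
```

Whether this improves retrieval quality depends on the embedding model; it mainly keeps list punctuation out of the embedded text and the prompt.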
Creating a Vertex AI Search Index
First, we need to create an index in Vertex AI Search to store our product embeddings. An index is essentially a container for your data that allows for efficient similarity search.
from google.cloud import aiplatform

LOCATION = "us-central1"  # Assumed region; replace with your own
aiplatform.init(project=PROJECT_ID, location=LOCATION)

# Create (or reuse) the index for the LangChain pipeline
indexes = aiplatform.MatchingEngineIndex.list()
INDEX_RESOURCE_NAME = None
for index in indexes:
    if index.display_name == "amazon_beauty_langchain_index":
        INDEX_RESOURCE_NAME = index.resource_name
        print(f"Found existing index: {INDEX_RESOURCE_NAME}")
        break
if not INDEX_RESOURCE_NAME:
    my_index = aiplatform.MatchingEngineIndex.create_tree_ah_index(
        display_name="amazon_beauty_langchain_index",
        description="Langchain Index for Amazon Beauty product embeddings",
        dimensions=768,
        approximate_neighbors_count=10,
        leaf_node_embedding_count=500,
        leaf_nodes_to_search_percent=7,
        distance_measure_type="DOT_PRODUCT_DISTANCE",
        feature_norm_type="UNIT_L2_NORM",
        index_update_method="BATCH_UPDATE",
    )
    INDEX_RESOURCE_NAME = my_index.resource_name
    print(f"Created index: {INDEX_RESOURCE_NAME}")

# Create (or reuse) the index for the RAG Engine pipeline.
# Note: this block reassigns INDEX_RESOURCE_NAME and my_index,
# overwriting the LangChain values set above.
indexes = aiplatform.MatchingEngineIndex.list()
INDEX_RESOURCE_NAME = None
for index in indexes:
    if index.display_name == "amazon_beauty_ragengine_index":
        INDEX_RESOURCE_NAME = index.resource_name
        print(f"Found existing index: {INDEX_RESOURCE_NAME}")
        break
if not INDEX_RESOURCE_NAME:
    my_index = aiplatform.MatchingEngineIndex.create_tree_ah_index(
        display_name="amazon_beauty_ragengine_index",
        description="RAG Engine Index for Amazon Beauty product embeddings",
        dimensions=768,
        approximate_neighbors_count=10,
        leaf_node_embedding_count=500,
        leaf_nodes_to_search_percent=7,
        distance_measure_type="DOT_PRODUCT_DISTANCE",
        feature_norm_type="UNIT_L2_NORM",
        index_update_method="STREAM_UPDATE",
    )
    INDEX_RESOURCE_NAME = my_index.resource_name
    print(f"Created index: {INDEX_RESOURCE_NAME}")
Explanation of Index Creation with Stream and Batch Updates
In this setup, two Vertex AI Search indexes are created for different purposes. The RAG Engine index uses STREAM_UPDATE to support continuous, low-latency updates, making it ideal for real-time applications where data changes frequently and needs to be instantly available for retrieval. On the other hand, the LangChain index uses BATCH_UPDATE, which is better suited for scenarios where data updates occur less frequently, allowing for efficient bulk updates with optimized performance. Both indexes are configured with the Tree-AH algorithm, a robust default for approximate nearest neighbor (ANN) search, and are tuned with parameters like embedding dimensions, neighbor count, and search accuracy to handle embeddings generated by models. This dual-index strategy ensures flexibility, catering to both real-time and batch processing needs in the RAG pipeline.
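Both indexes pair DOT_PRODUCT_DISTANCE with UNIT_L2_NORM. A likely reason for that pairing is that the dot product of two unit-length vectors equals their cosine similarity, so the index effectively ranks by cosine. The sketch below checks that identity in plain Python on two made-up three-dimensional vectors; it illustrates the math only and says nothing about how Vertex AI implements it internally.

```python
import math


def dot(a, b):
    """Dot product of two equal-length vectors."""
    return sum(x * y for x, y in zip(a, b))


def l2_normalize(vector):
    """Scale a vector to unit L2 length."""
    norm = math.sqrt(dot(vector, vector))
    if norm == 0:
        raise ValueError("cannot normalize a zero vector")
    return [x / norm for x in vector]


def cosine_similarity(a, b):
    """Cosine similarity computed directly from the raw vectors."""
    return dot(a, b) / (math.sqrt(dot(a, a)) * math.sqrt(dot(b, b)))


# Made-up vectors standing in for a query embedding and a product embedding
query_vector = [0.2, 0.7, 0.1]
product_vector = [0.4, 1.2, 0.3]

score_unit_dot = dot(l2_normalize(query_vector), l2_normalize(product_vector))
score_cosine = cosine_similarity(query_vector, product_vector)
print(math.isclose(score_unit_dot, score_cosine))
```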
Creating a Vertex AI Search Endpoint
Next, we need an endpoint to serve queries against our index. I will show one of them here as an example.
# Create Endpoint
endpoints = aiplatform.MatchingEngineIndexEndpoint.list()
ENDPOINT_RESOURCE_NAME = None
for endpoint in endpoints:
    if endpoint.display_name == "amazon_beauty_langchain_endpoint":
        ENDPOINT_RESOURCE_NAME = endpoint.resource_name
        print(f"Found existing endpoint: {ENDPOINT_RESOURCE_NAME}")
        break
if not ENDPOINT_RESOURCE_NAME:
    my_index_endpoint = aiplatform.MatchingEngineIndexEndpoint.create(
        display_name="amazon_beauty_langchain_endpoint",
        description="Endpoint for serving product recommendations",
        public_endpoint_enabled=True  # Set to False if you don't need a public endpoint
    )
    ENDPOINT_RESOURCE_NAME = my_index_endpoint.resource_name
    print(f"Created endpoint: {ENDPOINT_RESOURCE_NAME}")
Deploying the Index to the Endpoint
Now, we deploy the index to the endpoint, making it ready to serve queries.
# Deploy Index to Endpoint
try:
    my_index = aiplatform.MatchingEngineIndex(INDEX_RESOURCE_NAME)
    my_index_endpoint = aiplatform.MatchingEngineIndexEndpoint(ENDPOINT_RESOURCE_NAME)
    if my_index and my_index_endpoint:
        # Check whether this index is already deployed to the endpoint
        index_deployed = False
        for deployed_index in my_index_endpoint.deployed_indexes:
            if deployed_index.index == my_index.resource_name:
                index_deployed = True
                print(
                    f"Index {my_index.resource_name} is already deployed to endpoint {my_index_endpoint.resource_name}"
                )
                break
        if not index_deployed:
            deployed_index_id = "amazon_beauty_langchain_deployed_index"
            my_index_endpoint.deploy_index(
                index=my_index, deployed_index_id=deployed_index_id
            )
            print(
                f"Deployed index {my_index.resource_name} to endpoint {my_index_endpoint.resource_name} with deployed_index_id: {deployed_index_id}"
            )
        else:
            print("Skipping deployment as the index is already deployed.")
    else:
        print("Index or endpoint not defined.")
except Exception as e:
    print(f"Error during index deployment: {e}")
Storing Embeddings in Vertex AI Search using LangChain
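Here, we use LangChain’s VectorSearchVectorStore to connect to Vertex AI Search and add our documents. Note that add_documents embeds each document with the embedding model passed to the store, so the embeddings computed earlier are not reused in this step.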
from langchain_google_vertexai import VectorSearchVectorStore

vector_store = VectorSearchVectorStore.from_components(
    project_id=PROJECT_ID,
    region=LOCATION,
    index_id=INDEX_RESOURCE_NAME.split("/")[-1],
    endpoint_id=ENDPOINT_RESOURCE_NAME.split("/")[-1],
    embedding=text_embedding_model,
    gcs_bucket_name=BUCKET_NAME.replace("gs://", "").split("/")[0]
)

batch_size = 5
# Ceiling division so a partial final batch is counted exactly once
total_batches = (len(documents) + batch_size - 1) // batch_size
for i in range(0, len(documents), batch_size):
    batch_docs = documents[i : i + batch_size]
    vector_store.add_documents(documents=batch_docs)
    print(
        f"Added batch {i // batch_size + 1}/{total_batches} to Vertex AI Search"
    )
print("Embeddings added to Vertex AI Search.")
Building a RAG Chain with LangChain
Now, we construct the RAG chain using LangChain’s components:
from langchain_google_vertexai import ChatVertexAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

llm = ChatVertexAI(
    model_name="gemini-exp-1206",  # gemini-1.5-pro
    project=PROJECT_ID,
    location=LOCATION,
)

# Define prompt template
prompt_template = """
You are a helpful product recommender. Answer the question based on the context below.
If you don't know the answer, just say that you don't know. Don't try to make up an answer.
Context:
{context}
Question: {question}
"""
prompt = ChatPromptTemplate.from_template(prompt_template)

# Create retriever
retriever = vector_store.as_retriever(search_kwargs={"k": 5})  # Retrieve top 5 documents

# Build RAG chain
rag_chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)
print("RAG chain created with LangChain and Vertex AI Search.")

query = "Recommend a good moisturizing cream for sensitive skin under $30."
response = rag_chain.invoke(query)
print(f"Query: {query}")
print(f"Response: {response}")
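In the chain above, the retriever's output (a list of Document objects) goes into the prompt as-is. A common refinement is to format the documents into plain text first, and optionally to enforce hard constraints such as a price ceiling in code rather than relying on the model. The sketch below shows both ideas with a stand-in Doc class so it runs without LangChain; format_docs and filter_by_max_price are hypothetical helper names, and the sample documents are made up.

```python
from dataclasses import dataclass, field


@dataclass
class Doc:
    """Stand-in for a LangChain Document: text plus a metadata dict."""
    page_content: str
    metadata: dict = field(default_factory=dict)


def format_docs(docs):
    """Join the text of retrieved documents into one context string."""
    return "\n\n".join(doc.page_content for doc in docs)


def filter_by_max_price(docs, max_price):
    """Keep documents whose metadata price is numeric and at or below max_price."""
    kept = []
    for doc in docs:
        price = doc.metadata.get("price")
        if isinstance(price, (int, float)) and price <= max_price:
            kept.append(doc)
    return kept


# Made-up retrieval results for illustration
retrieved = [
    Doc("Title: Cream A\nPrice: $24.99", {"price": 24.99}),
    Doc("Title: Cream B\nPrice: $45.0", {"price": 45.0}),
    Doc("Title: Cream C\nPrice: $N/A", {"price": "N/A"}),
]
context = format_docs(filter_by_max_price(retrieved, 30))
print(context)
```

With real LangChain objects, the usual way to apply such a function is to pipe it after the retriever in the chain's input mapping, for example "context": retriever | format_docs; treat that wiring as a suggestion to verify against your LangChain version.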
RAG Implementation — Vertex AI RAG Engine with Vertex AI Search
Now, we’ll explore Google Cloud’s fully managed solution: Vertex AI RAG Engine. The RAG Engine simplifies the process of creating RAG applications by handling data ingestion, embedding generation, vector storage, and retrieval behind the scenes.
Vertex AI RAG Engine Workflow:
- Create a RAG Corpus: A RAG Corpus acts as a container for your data within the RAG Engine.
- Configure Vector Database: We’ll link our existing Vertex AI Vector Search index and endpoint to the RAG Corpus.
- Import Data: We’ll import our product data (in JSONL format) from GCS into the RAG Corpus. The RAG Engine will automatically generate embeddings using a default text embedding model (you can also specify a different one during corpus creation if needed).
- Build RAG Tool and Model: We’ll create a retrieval tool using Tool.from_retrieval() and integrate it with a Gemini model.
- Query and Generate: We’ll use the Gemini model, augmented with the retrieval tool, to answer user queries based on the data in our RAG Corpus.
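The import step in the workflow above expects the product data as JSONL, meaning one JSON object per line. As a minimal sketch of that format, the snippet below writes a couple of records to a local file and reads them back. The records and field names (`title`, `price`, `category`) are hypothetical placeholders rather than the actual dataset schema, and uploading the file to GCS is left out.

```python
import json
import os
import tempfile

# Hypothetical product records; the field names are illustrative only.
products = [
    {"title": "Gentle Daily Moisturizer", "price": 18.99, "category": "Beauty"},
    {"title": "Hydrating Night Cream", "price": 24.50, "category": "Beauty"},
]


def write_jsonl(records, path):
    """Write one JSON object per line (the JSONL convention)."""
    with open(path, "w", encoding="utf-8") as f:
        for record in records:
            f.write(json.dumps(record, ensure_ascii=False) + "\n")


def read_jsonl(path):
    """Read a JSONL file back into a list of dicts, skipping blank lines."""
    with open(path, encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]


path = os.path.join(tempfile.mkdtemp(), "products.jsonl")
write_jsonl(products, path)
round_trip = read_jsonl(path)
print(f"Wrote {len(round_trip)} records to {path}")
```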
#Vertex AI Vector Search as the vector database
vector_db = rag.VertexVectorSearch(
index=my_index.resource_name, index_endpoint=my_index_endpoint.resource_name
)
## RAG corpus
DISPLAY_NAME = "amazon-beauty-rag-engine"
## Create RAG Corpus
rag_corpus = rag.create_corpus(display_name=DISPLAY_NAME, vector_db=vector_db)
print(f"Created RAG Corpus resource: {rag_corpus.name}")
## Importing Data from GCS into the RAG Corpus
response = rag.import_files(
corpus_name=rag_corpus.name,
paths=[GCS_BUCKET_PATH],
chunk_size=1024,
chunk_overlap=100,
)
print(f"Data imported into RAG Corpus: {response}")
## Create a retrieval tool for the RAG Corpus
rag_resource = rag.RagResource(rag_corpus=rag_corpus.name)
rag_retrieval_tool = Tool.from_retrieval(
retrieval=rag.Retrieval(
source=rag.VertexRagStore(
rag_resources=[rag_resource],
similarity_top_k=5,
vector_distance_threshold=0.4,
)
)
)
## Add the retrieval tool to the Gemini model
#rag_model = GenerativeModel("gemini-2.0-flash-exp", tools=[rag_retrieval_tool])
rag_model = GenerativeModel("gemini-exp-1206", tools=[rag_retrieval_tool])
print("RAG Tool and Model created.")
## --- Querying and Generating Responses ---
query = "Recommend a good moisturizing cream for sensitive skin under $30."
response = rag_model.generate_content(query)
print(f"Query: {query}")
print(f"Response: {response.text}")
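The two retrieval settings used above, `similarity_top_k=5` and `vector_distance_threshold=0.4`, are easier to reason about with a toy example. The sketch below is not the RAG Engine's implementation; it only illustrates the general idea under two assumptions: a smaller distance means a closer match, and only candidates with a distance below the threshold are kept. The candidate texts and distances are invented, and `select_chunks` is a hypothetical helper.

```python
def select_chunks(scored_chunks, top_k=5, distance_threshold=0.4):
    """Keep chunks with distance below the threshold, nearest first, capped at top_k."""
    close_enough = [(text, d) for text, d in scored_chunks if d < distance_threshold]
    close_enough.sort(key=lambda pair: pair[1])
    return close_enough[:top_k]


# Invented (text, distance) pairs for illustration
candidates = [
    ("Hydrating Night Cream for sensitive skin", 0.12),
    ("Gentle Daily Moisturizer", 0.21),
    ("Volumizing Mascara", 0.58),
    ("Fragrance-free Body Lotion", 0.33),
]
selected = select_chunks(candidates)
for text, distance in selected:
    print(f"{distance:.2f}  {text}")
```

Under these assumptions, a tighter threshold trades recall for precision: fewer, more relevant chunks reach the model, at the risk of returning nothing for queries that the corpus covers only loosely.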
Integrating with a Chatbot Interface
The RAG application we’ve built, whether with LangChain or the Vertex AI RAG Engine, can be exposed through a chatbot interface for interactive, conversational access to product recommendations. We won’t cover the full implementation here, but one approach is to deploy the RAG model as a web service, for example on Google Cloud Functions, and connect it to a conversational AI platform such as Google Cloud’s Vertex AI Agent Builder, which provides a no-code environment for designing and deploying chatbots. A webhook configured in Agent Builder points to the deployed function, so user queries are routed to the RAG model and the chatbot answers with recommendations grounded in the retrieved product data. The Cloud Function below wraps the RAG Engine model behind an HTTP endpoint:
from google.cloud import aiplatform
from vertexai.preview import rag
from vertexai.preview.generative_models import GenerativeModel, Tool
import functions_framework
## Configuration values
PROJECT_ID = "rag-product-recommendation"
LOCATION = "us-central1"
INDEX_DISPLAY_NAME = "amazon_beauty_ragengine_index"
ENDPOINT_DISPLAY_NAME = "amazon_beauty_ragengine_endpoint"
RAG_CORPUS_DISPLAY_NAME = "amazon-beauty-rag-engine"
def initialize_rag_components():
    """
    Initializes the RAG system by setting up vector search and corpus components.
    This function follows the same pattern as the notebook code, adapted for
    the Cloud Functions environment.
    """
    try:
        # First, we need to get our index
        indexes = aiplatform.MatchingEngineIndex.list(
            filter=f'display_name="{INDEX_DISPLAY_NAME}"',
            project=PROJECT_ID,
            location=LOCATION,
        )
        if not indexes:
            raise ValueError("Index not found")
        my_index = indexes[0]
        # Then get our endpoint
        endpoints = aiplatform.MatchingEngineIndexEndpoint.list(
            filter=f'display_name="{ENDPOINT_DISPLAY_NAME}"',
            project=PROJECT_ID,
            location=LOCATION,
        )
        if not endpoints:
            raise ValueError("Endpoint not found")
        my_index_endpoint = endpoints[0]
        # Set up vector search
        vector_db = rag.VertexVectorSearch(
            index=my_index.resource_name,
            index_endpoint=my_index_endpoint.resource_name,
        )
        # List corpora without a project parameter and match on display name
        try:
            # First try to get existing corpus
            rag_corpus = next(
                (corpus for corpus in rag.list_corpora()
                 if corpus.display_name == RAG_CORPUS_DISPLAY_NAME),
                None,
            )
            if not rag_corpus:
                # Create new corpus if it doesn't exist
                rag_corpus = rag.create_corpus(
                    display_name=RAG_CORPUS_DISPLAY_NAME,
                    vector_db=vector_db,
                )
                print(f"Created new RAG Corpus: {rag_corpus.name}")
            else:
                print(f"Using existing RAG Corpus: {rag_corpus.name}")
        except Exception as e:
            print(f"Error with corpus operations: {e}")
            raise
        # Create the RAG resource and retrieval tool
        rag_resource = rag.RagResource(rag_corpus=rag_corpus.name)
        rag_retrieval_tool = Tool.from_retrieval(
            retrieval=rag.Retrieval(
                source=rag.VertexRagStore(
                    rag_resources=[rag_resource],
                    similarity_top_k=5,
                    vector_distance_threshold=0.4,
                )
            )
        )
        # Initialize the model with the retrieval tool
        return GenerativeModel("gemini-exp-1206", tools=[rag_retrieval_tool])
    except Exception as e:
        print(f"Error in initialize_rag_components: {e}")
        raise
## Initialize the model globally
rag_model = initialize_rag_components()
@functions_framework.http
def query_rag(request):
    """
    Cloud Function that handles RAG queries.
    This function receives HTTP requests and returns responses using the RAG model.
    """
    try:
        request_json = request.get_json(silent=True)
        if not request_json or 'query' not in request_json:
            return {'error': 'Missing query parameter'}, 400
        query = request_json['query']
        response = rag_model.generate_content(query)
        return {'response': response.text}
    except Exception as e:
        print(f"Error processing query: {e}")
        return {'error': str(e)}, 500
gcloud functions deploy query_rag_function --region us-central1 --runtime python311 --source . --entry-point query_rag --trigger-http --allow-unauthenticated --timeout=540s
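Once deployed, the function accepts a POST request with a JSON body containing a `query` field, which is also what a chatbot webhook would need to send. The sketch below builds such a request with the standard library only. The URL is a hypothetical placeholder (use the one printed by the deploy command), `build_query_request` is a helper name introduced here, and the actual network call is left commented out because it needs a live deployment.

```python
import json
import urllib.request

# Hypothetical URL; replace with the one printed by `gcloud functions deploy`.
FUNCTION_URL = "https://REGION-PROJECT_ID.cloudfunctions.net/query_rag_function"


def build_query_request(url, query):
    """Build a POST request whose JSON body carries the 'query' field."""
    body = json.dumps({"query": query}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_query_request(
    FUNCTION_URL,
    "Recommend a good moisturizing cream for sensitive skin under $30.",
)
print(request.get_method(), request.full_url)
print(request.data.decode("utf-8"))

# To actually send it (requires a deployed function and network access):
# with urllib.request.urlopen(request, timeout=60) as resp:
#     print(json.loads(resp.read())["response"])
```

Note that `--allow-unauthenticated` makes the endpoint publicly callable, which is convenient for a demo; a production setup would more likely require authenticated calls from the chatbot platform.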
Conclusion
This article walked through two ways to build Retrieval-Augmented Generation (RAG) for product recommendation on Google Cloud: the open-source LangChain framework and the fully managed Vertex AI RAG Engine. Both implementations used Vertex AI Search as the vector database for storing and retrieving product embeddings.
LangChain gives granular control over each stage of the RAG pipeline, which makes it a good fit for complex scenarios and for developers who want customization. The Vertex AI RAG Engine, by contrast, automates ingestion, embedding generation, and retrieval, and plugs directly into Google’s Gemini models as a tool. Although the focus here stayed on Vertex AI Search, other vector databases, including PgVector, BigQuery, Pinecone, and Weaviate, can fill the same role, each with its own advantages depending on project scale, performance needs, and existing infrastructure.
Ultimately, the choice between LangChain and the Vertex AI RAG Engine depends on your project’s specific requirements, your team’s expertise, and the balance you want between control and ease of use. Natural next steps from here include multi-modal data and more sophisticated conversational AI interfaces.