In this example, we are going to use Google Cloud Run to update a Vertex AI RAG corpus. When we have a new document, we upload it to Cloud Storage; the upload generates an event that triggers our Cloud Run service, which imports the file into the RAG corpus.
To set up our Cloud Run service with an event trigger, we give the trigger a name and then configure the target resource. As you can see, we are using the event type "google.cloud.storage.object.v1.finalized", which fires when we create a new object, or when we overwrite an existing object and Cloud Storage creates a new generation of that object.
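The same trigger can also be created from the CLI with Eventarc instead of the console; a sketch, where the trigger, service, bucket, and service-account names are all placeholders:

```shell
# Create an Eventarc trigger that sends Cloud Storage
# object-finalized events to the Cloud Run service.
gcloud eventarc triggers create rag-update-trigger \
  --location=us-central1 \
  --destination-run-service=rag-update-service \
  --destination-run-region=us-central1 \
  --event-filters="type=google.cloud.storage.object.v1.finalized" \
  --event-filters="bucket=my-docs-bucket" \
  --service-account=my-sa@my-project.iam.gserviceaccount.com
```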
This is the code in our Cloud Run service.
import functions_framework
import os
import vertexai
from vertexai.preview import rag

# Configuration from environment variables
PROJECT_ID = os.environ.get("PROJECT_ID")
LOCATION = os.environ.get("LOCATION", "us-central1")
RAG_CORPUS_ID = os.environ.get("RAG_CORPUS_ID")

vertexai.init(project=PROJECT_ID, location=LOCATION)

# Triggered by a change in a storage bucket
@functions_framework.cloud_event
def hello_gcs(cloud_event):
    data = cloud_event.data

    event_id = cloud_event["id"]
    event_type = cloud_event["type"]
    bucket = data["bucket"]
    name = data["name"]
    metageneration = data["metageneration"]
    timeCreated = data["timeCreated"]
    updated = data["updated"]

    print(f"Event ID: {event_id}")
    print(f"Event type: {event_type}")
    print(f"Bucket: {bucket}")
    print(f"File: {name}")
    print(f"Metageneration: {metageneration}")
    print(f"Created: {timeCreated}")
    print(f"Updated: {updated}")

    # Only import file types the RAG corpus supports
    supported_extensions = ('.pdf', '.txt', '.html', '.docx')
    if not name.lower().endswith(supported_extensions):
        print(f"Skipping unsupported file: {name}")
        return

    gcs_uri = f"gs://{bucket}/{name}"
    corpus_resource_name = f"projects/{PROJECT_ID}/locations/{LOCATION}/ragCorpora/{RAG_CORPUS_ID}"

    print(f"Processing file: {gcs_uri} for Corpus: {RAG_CORPUS_ID}")

    try:
        # Trigger incremental indexing:
        # we target ONLY the changed file to save costs and time
        response = rag.import_files(
            corpus_name=corpus_resource_name,
            paths=[gcs_uri],
            chunk_size=512,  # matches our pyproject.toml settings
            chunk_overlap=50,
            # Production tuning: avoid hitting embedding API rate limits
            max_embedding_requests_per_min=1000,
        )
        print(f"Import job triggered successfully: {response}")
    except Exception as e:
        print(f"Error indexing {gcs_uri}: {str(e)}")
        # In production, you might send this to an Error Reporting API or Slack
        raise
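The extension guard at the top of the handler can be exercised on its own before deploying; a minimal sketch, where the helper name `should_index` is our own and not part of the service code:

```python
# The same extension check the handler uses to decide
# whether an uploaded object should be imported.
SUPPORTED_EXTENSIONS = ('.pdf', '.txt', '.html', '.docx')

def should_index(name: str) -> bool:
    """Return True if the object name has a supported extension."""
    return name.lower().endswith(SUPPORTED_EXTENSIONS)

print(should_index("Report.PDF"))  # the check is case-insensitive
print(should_index("data.csv"))    # unsupported files are skipped
```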
And this is our requirements.txt:
functions-framework~=3.0
google-cloud-aiplatform[adk,agent-engines]>=1.108.0
google-adk>=1.10.0
pydantic-settings>=2.8.1
tabulate>=0.9.0
google-auth>=2.36.0
requests>=2.32.3
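With the source and requirements.txt in place, the service can be deployed from source; a sketch, where the service name and region are placeholders:

```shell
# Build and deploy the service from the current directory.
gcloud run deploy rag-update-service \
  --source . \
  --region us-central1 \
  --no-allow-unauthenticated
```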
To add or edit your environment variables, click on "Edit and deploy new revision", then look for "Variables & Secrets".
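The same environment variables can also be set from the CLI; a sketch, where the service name, region, and values are placeholders:

```shell
# Set the environment variables the handler reads at startup.
gcloud run services update rag-update-service \
  --region=us-central1 \
  --update-env-vars=PROJECT_ID=my-project,LOCATION=us-central1,RAG_CORPUS_ID=1234567890
```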
After you upload a file to the Cloud Storage bucket, our Vertex AI RAG corpus gets updated.
It is very important that the service account you are using is granted permission to access Vertex AI.
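One way to grant that permission is an IAM binding on the project; a sketch, where the project ID and service-account email are placeholders:

```shell
# Grant the service account access to Vertex AI resources.
gcloud projects add-iam-policy-binding my-project \
  --member="serviceAccount:my-sa@my-project.iam.gserviceaccount.com" \
  --role="roles/aiplatform.user"
```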