Deploying an ADK app to Vertex AI Agent Engine and consuming it
This is a recurring need: we constantly have to deploy an agentic app to Vertex AI and then consume it.
To deploy, we can use the following code.
import logging
import os
import vertexai
from dotenv import set_key
from vertexai import agent_engines
from vertexai.preview.reasoning_engines import AdkApp
from rag.agent import root_agent
# Verbose logging so deployment progress (and SDK internals) are visible.
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Required environment configuration for Vertex AI.
GOOGLE_CLOUD_PROJECT = os.getenv("GOOGLE_CLOUD_PROJECT")
GOOGLE_CLOUD_LOCATION = os.getenv("GOOGLE_CLOUD_LOCATION")
STAGING_BUCKET = os.getenv("STAGING_BUCKET")

# Define the path to the .env file relative to this script.
ENV_FILE_PATH = os.path.abspath(
    os.path.join(os.path.dirname(__file__), "..", ".env")
)

vertexai.init(
    project=GOOGLE_CLOUD_PROJECT,
    location=GOOGLE_CLOUD_LOCATION,
    staging_bucket=STAGING_BUCKET,
)

logger.info("deploying app...")

# Wrap the ADK root agent so Agent Engine can host it; tracing aids debugging.
app = AdkApp(
    agent=root_agent,
    enable_tracing=True,
)

# Consistency fix: use the module logger, not the root logger, as above.
logger.debug("deploying agent to agent engine:")

# Pin the runtime dependencies the deployed agent needs on Agent Engine.
remote_app = agent_engines.create(
    app,
    requirements=[
        "google-cloud-aiplatform[adk,agent-engines]==1.108.0",
        "google-adk==1.10.0",
        "python-dotenv",
        "google-auth",
        "tqdm",
        "requests",
        "llama-index",
    ],
    extra_packages=[
        "./rag",  # ship the local agent package alongside the app
    ],
)

# BUG FIX: the original f-string literal was split across two physical lines
# without continuation, which is a SyntaxError. Log on one line with lazy
# %-style arguments instead of an f-string.
logger.info(
    "Deployed agent to Vertex AI Agent Engine successfully, resource name: %s",
    remote_app.resource_name,
)
Consuming the deployed agent is straightforward as well.
import asyncio
import json
import os
import vertexai
from dotenv import load_dotenv
from google.adk.sessions import VertexAiSessionService
from vertexai import agent_engines
def pretty_print_event(event):
    """Print a single agent event, truncating oversized payloads.

    Text parts are capped at 200 characters; serialized function-call
    arguments and function responses are capped at 100 characters each.
    Events lacking a "content" key are dumped verbatim.
    """
    TEXT_LIMIT = 200
    ARGS_LIMIT = 100
    RESPONSE_LIMIT = 100

    def _clip(value, limit):
        # Replace the tail with "..." when the value exceeds the limit.
        return value if len(value) <= limit else value[: limit - 3] + "..."

    who = event.get("author", "unknown")

    # Events without content (e.g. control events) are printed as-is.
    if "content" not in event:
        print(f"[{who}]: {event}")
        return

    for piece in event["content"].get("parts", []):
        if "text" in piece:
            print(f"[{who}]: {_clip(piece['text'], TEXT_LIMIT)}")
        elif "functionCall" in piece:
            call = piece["functionCall"]
            print(f"[{who}]: Function call: {call.get('name', 'unknown')}")
            serialized = json.dumps(call.get("args", {}))
            print(f"  Args: {_clip(serialized, ARGS_LIMIT)}")
        elif "functionResponse" in piece:
            reply = piece["functionResponse"]
            print(f"[{who}]: Function response: {reply.get('name', 'unknown')}")
            serialized = json.dumps(reply.get("response", {}))
            print(f"  Response: {_clip(serialized, RESPONSE_LIMIT)}")
# Load environment configuration and connect to Vertex AI.
load_dotenv()

project_id = os.getenv("GOOGLE_CLOUD_PROJECT")
region = os.getenv("GOOGLE_CLOUD_LOCATION")

vertexai.init(
    project=project_id,
    location=region,
)

# Sessions for the deployed agent are managed remotely by Vertex AI.
session_service = VertexAiSessionService(
    project=project_id,
    location=region,
)

AGENT_ENGINE_ID = os.getenv("AGENT_ENGINE_ID")
USER_ID = "123"

# create_session is async; drive it to completion with asyncio.run.
session = asyncio.run(
    session_service.create_session(
        app_name=AGENT_ENGINE_ID,
        user_id=USER_ID,
    )
)

# Handle to the already-deployed Agent Engine instance.
agent_engine = agent_engines.get(AGENT_ENGINE_ID)

queries = [
    "Hi, how are you?",
    "According to the MD&A, how might the increasing proportion of revenues derived from non-advertising sources like Google Cloud and devices potentially impact Alphabet's overall operating margin, and why?",
    "The report mentions significant investments in AI. What specific connection is drawn between these AI investments and the company's expectations regarding future capital expenditures?",
    "Thanks, I got all the information I need. Goodbye!",
]

# Stream each query through the remote agent and pretty-print every event.
for question in queries:
    print(f"\n[user]: {question}")
    for event in agent_engine.stream_query(
        user_id=USER_ID,
        session_id=session.id,
        message=question,
    ):
        pretty_print_event(event)
Comments