ADK app deployment to Vertex AI and consuming it

This is for formality purposes and we constantly need to deploy and consumer agentic app that we deployed to Vertex AI. 

To deploy,  we can use the following code.

import logging
import os

import vertexai
from dotenv import set_key
from vertexai import agent_engines
from vertexai.preview.reasoning_engines import AdkApp

from rag.agent import root_agent

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

GOOGLE_CLOUD_PROJECT = os.getenv("GOOGLE_CLOUD_PROJECT")
GOOGLE_CLOUD_LOCATION = os.getenv("GOOGLE_CLOUD_LOCATION")
STAGING_BUCKET = os.getenv("STAGING_BUCKET")
# Define the path to the .env file relative to this script
ENV_FILE_PATH = os.path.abspath(
    os.path.join(os.path.dirname(__file__), "..", ".env")
)

vertexai.init(
    project=GOOGLE_CLOUD_PROJECT,
    location=GOOGLE_CLOUD_LOCATION,
    staging_bucket=STAGING_BUCKET,
)

logger.info("deploying app...")
app = AdkApp(
    agent=root_agent,
    enable_tracing=True,
)

logging.debug("deploying agent to agent engine:")

remote_app = agent_engines.create(
    app,
    requirements=[
        "google-cloud-aiplatform[adk,agent-engines]==1.108.0",
        "google-adk==1.10.0",
        "python-dotenv",
        "google-auth",
        "tqdm",
        "requests",
        "llama-index",
    ],
    extra_packages=[
        "./rag",
    ],
)

# log remote_app
logging.info(
    f"Deployed agent to Vertex AI Agent Engine
successfully, resource name: {remote_app.resource_name}"
)

To consume it, is straight forward as well.

import asyncio
import json
import os

import vertexai
from dotenv import load_dotenv
from google.adk.sessions import VertexAiSessionService
from vertexai import agent_engines


def pretty_print_event(event):
    """Pretty prints an event with truncation for long content."""
    max_args = 100
    max_part_length = 200
    max_response_length = 100

    if "content" not in event:
        print(f"[{event.get('author', 'unknown')}]: {event}")
        return

    author = event.get("author", "unknown")
    parts = event["content"].get("parts", [])

    for part in parts:
        if "text" in part:
            text = part["text"]
            # Truncate long text to 200 characters
            if len(text) > max_part_length:
                text = text[: max_part_length - 3] + "..."
            print(f"[{author}]: {text}")
        elif "functionCall" in part:
            func_call = part["functionCall"]
            print(
                f"[{author}]: Function call: {func_call.get('name', 'unknown')}"
            )
            # Truncate args if too long
            args = json.dumps(func_call.get("args", {}))
            if len(args) > max_args:
                args = args[: max_args - 3] + "..."
            print(f"  Args: {args}")
        elif "functionResponse" in part:
            func_response = part["functionResponse"]
            print(
                f"[{author}]: Function response: {func_response.get('name', 'unknown')}"
            )
            # Truncate response if too long
            response = json.dumps(func_response.get("response", {}))
            if len(response) > max_response_length:
                response = response[: max_response_length - 3] + "..."
            print(f"  Response: {response}")


load_dotenv()

vertexai.init(
    project=os.getenv("GOOGLE_CLOUD_PROJECT"),
    location=os.getenv("GOOGLE_CLOUD_LOCATION"),
)

session_service = VertexAiSessionService(
    project=os.getenv("GOOGLE_CLOUD_PROJECT"),
    location=os.getenv("GOOGLE_CLOUD_LOCATION"),
)

AGENT_ENGINE_ID = os.getenv("AGENT_ENGINE_ID")

session = asyncio.run(
    session_service.create_session(
        app_name=AGENT_ENGINE_ID,
        user_id="123",
    )
)

agent_engine = agent_engines.get(AGENT_ENGINE_ID)

queries = [
    "Hi, how are you?",
    "According to the MD&A, how might the increasing proportion of revenues derived from non-advertising sources like Google Cloud and devices potentially impact Alphabet's overall operating margin, and why?",
    "The report mentions significant investments in AI. What specific connection is drawn between these AI investments and the company's expectations regarding future capital expenditures?",
    "Thanks, I got all the information I need. Goodbye!",
]

for query in queries:
    print(f"\n[user]: {query}")
    for event in agent_engine.stream_query(
        user_id="123",
        session_id=session.id,
        message=query,
    ):
        pretty_print_event(event)


Comments

Popular posts from this blog

vllm : Failed to infer device type

NodeJS: Error: spawn EINVAL in window for node version 20.20 and 18.20

android studio kotlin source is null error