diff --git a/src/google/adk/cli/cli_deploy.py b/src/google/adk/cli/cli_deploy.py index 0dedae6de..e3cfc82d0 100644 --- a/src/google/adk/cli/cli_deploy.py +++ b/src/google/adk/cli/cli_deploy.py @@ -121,7 +121,6 @@ def to_cloud_run( port: int, trace_to_cloud: bool, with_ui: bool, - log_level: str, verbosity: str, adk_version: str, allow_origins: Optional[list[str]] = None, @@ -153,11 +152,11 @@ def to_cloud_run( app_name: The name of the app, by default, it's basename of `agent_folder`. temp_folder: The temp folder for the generated Cloud Run source files. port: The port of the ADK api server. - allow_origins: The list of allowed origins for the ADK api server. trace_to_cloud: Whether to enable Cloud Trace. with_ui: Whether to deploy with UI. verbosity: The verbosity level of the CLI. adk_version: The ADK version to use in Cloud Run. + allow_origins: The list of allowed origins for the ADK api server. session_service_uri: The URI of the session service. artifact_service_uri: The URI of the artifact service. memory_service_uri: The URI of the memory service. @@ -182,7 +181,7 @@ def to_cloud_run( if os.path.exists(requirements_txt_path) else '' ) - click.echo('Copying agent source code complete.') + click.echo('Copying agent source code completed.') # create Dockerfile click.echo('Creating Dockerfile...') @@ -236,7 +235,7 @@ def to_cloud_run( '--port', str(port), '--verbosity', - log_level.lower() if log_level else verbosity, + verbosity, '--labels', 'created-by=adk', ], @@ -309,7 +308,7 @@ def to_agent_engine( try: click.echo('Copying agent source code...') shutil.copytree(agent_folder, temp_folder) - click.echo('Copying agent source code complete.') + click.echo('Copying agent source code completed.') click.echo('Initializing Vertex AI...') import sys @@ -417,3 +416,210 @@ def to_agent_engine( finally: click.echo(f'Cleaning up the temp folder: {temp_folder}') shutil.rmtree(temp_folder) + + +def to_gke( + *, + agent_folder: str, + project: Optional[str], + region: Optional[str], + cluster_name: str, + service_name: str, + app_name: str, + temp_folder: str, + port: int, + trace_to_cloud: bool, + with_ui: bool, + verbosity: str, + adk_version: str, + allow_origins: Optional[list[str]] = None, + session_service_uri: Optional[str] = None, + artifact_service_uri: Optional[str] = None, + memory_service_uri: Optional[str] = None, +): + """Deploys an agent to Google Kubernetes Engine(GKE). + + Args: + agent_folder: The folder (absolute path) containing the agent source code. + project: Google Cloud project id. + region: Google Cloud region. + cluster_name: The name of the GKE cluster. + service_name: The service name in GKE. + app_name: The name of the app, by default, it's basename of `agent_folder`. + temp_folder: The local directory to use as a temporary workspace for preparing deployment artifacts. The tool populates this folder with a copy of the agent's source code and auto-generates necessary files like a Dockerfile and deployment.yaml. + port: The port of the ADK api server. + trace_to_cloud: Whether to enable Cloud Trace. + with_ui: Whether to deploy with UI. + verbosity: The verbosity level of the CLI. + adk_version: The ADK version to use in GKE. + allow_origins: The list of allowed origins for the ADK api server. + session_service_uri: The URI of the session service. + artifact_service_uri: The URI of the artifact service. + memory_service_uri: The URI of the memory service. + """ + click.secho( + '\nšŸš€ Starting ADK Agent Deployment to GKE...', fg='cyan', bold=True + ) + click.echo('--------------------------------------------------') + # Resolve project early to show the user which one is being used + project = _resolve_project(project) + click.echo(f' Project: {project}') + click.echo(f' Region: {region}') + click.echo(f' Cluster: {cluster_name}') + click.echo('--------------------------------------------------\n') + + app_name = app_name or os.path.basename(agent_folder) + + click.secho('STEP 1: Preparing build environment...', bold=True) + click.echo(f' - Using temporary directory: {temp_folder}') + + # remove temp_folder if exists + if os.path.exists(temp_folder): + click.echo(' - Removing existing temporary directory...') + shutil.rmtree(temp_folder) + + try: + # copy agent source code + click.echo(' - Copying agent source code...') + agent_src_path = os.path.join(temp_folder, 'agents', app_name) + shutil.copytree(agent_folder, agent_src_path) + requirements_txt_path = os.path.join(agent_src_path, 'requirements.txt') + install_agent_deps = ( + f'RUN pip install -r "/app/agents/{app_name}/requirements.txt"' + if os.path.exists(requirements_txt_path) + else '' + ) + click.secho('āœ… Environment prepared.', fg='green') + + allow_origins_option = ( + f'--allow_origins={",".join(allow_origins)}' if allow_origins else '' + ) + + # create Dockerfile + click.secho('\nSTEP 2: Generating deployment files...', bold=True) + click.echo(' - Creating Dockerfile...') + host_option = '--host=0.0.0.0' if adk_version > '0.5.0' else '' + dockerfile_content = _DOCKERFILE_TEMPLATE.format( + gcp_project_id=project, + gcp_region=region, + app_name=app_name, + port=port, + command='web' if with_ui else 'api_server', + install_agent_deps=install_agent_deps, + service_option=_get_service_option_by_adk_version( + adk_version, + session_service_uri, + artifact_service_uri, + memory_service_uri, + ), + trace_to_cloud_option='--trace_to_cloud' if trace_to_cloud else '', + allow_origins_option=allow_origins_option, + adk_version=adk_version, + host_option=host_option, + ) + dockerfile_path = os.path.join(temp_folder, 'Dockerfile') + os.makedirs(temp_folder, exist_ok=True) + with open(dockerfile_path, 'w', encoding='utf-8') as f: + f.write( + dockerfile_content, + ) + click.secho(f'āœ… Dockerfile generated: {dockerfile_path}', fg='green') + + # Build and push the Docker image + click.secho( + '\nSTEP 3: Building container image with Cloud Build...', bold=True + ) + click.echo( + ' (This may take a few minutes. Raw logs from gcloud will be shown' + ' below.)' + ) + project = _resolve_project(project) + image_name = f'gcr.io/{project}/{service_name}' + subprocess.run( + ['gcloud', 'builds', 'submit', '--tag', image_name, temp_folder], + check=True, + ) + click.secho('āœ… Container image built and pushed successfully.', fg='green') + + # Create a Kubernetes deployment + click.echo(' - Creating Kubernetes deployment.yaml...') + deployment_yaml = f""" +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {service_name} +spec: + replicas: 1 + selector: + matchLabels: + app: {service_name} + template: + metadata: + labels: + app: {service_name} + spec: + containers: + - name: {service_name} + image: {image_name} + ports: + - containerPort: {port} +--- +apiVersion: v1 +kind: Service +metadata: + name: {service_name} +spec: + type: LoadBalancer + selector: + app: {service_name} + ports: + - port: 80 + targetPort: {port} +""" + deployment_yaml_path = os.path.join(temp_folder, 'deployment.yaml') + with open(deployment_yaml_path, 'w', encoding='utf-8') as f: + f.write(deployment_yaml) + click.secho( + f'āœ… Kubernetes deployment manifest generated: {deployment_yaml_path}', + fg='green', + ) + + # Apply the deployment + click.secho('\nSTEP 4: Applying deployment to GKE cluster...', bold=True) + click.echo(' - Getting cluster credentials...') + subprocess.run( + [ + 'gcloud', + 'container', + 'clusters', + 'get-credentials', + cluster_name, + '--region', + region, + '--project', + project, + ], + check=True, + ) + click.echo(' - Applying Kubernetes manifest...') + result = subprocess.run( + ['kubectl', 'apply', '-f', temp_folder], + check=True, + capture_output=True, # <-- Add this + text=True, # <-- Add this + ) + + # 2. Print the captured output line by line + click.secho( + ' - The following resources were applied to the cluster:', fg='green' + ) + for line in result.stdout.strip().split('\n'): + click.echo(f' - {line}') + + finally: + click.secho('\nSTEP 5: Cleaning up...', bold=True) + click.echo(f' - Removing temporary directory: {temp_folder}') + shutil.rmtree(temp_folder) + click.secho( + '\nšŸŽ‰ Deployment to GKE finished successfully!', fg='cyan', bold=True + ) diff --git a/src/google/adk/cli/cli_tools_click.py b/src/google/adk/cli/cli_tools_click.py index f3095c2ff..78ac21ca1 100644 --- a/src/google/adk/cli/cli_tools_click.py +++ b/src/google/adk/cli/cli_tools_click.py @@ -42,11 +42,6 @@ from .utils import evals from .utils import logs -LOG_LEVELS = click.Choice( - ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], - case_sensitive=False, -) - class HelpfulCommand(click.Command): """Command that shows full help on error instead of just the error message. @@ -475,15 +470,6 @@ def decorator(func): ), default=None, ) - @click.option( - "--eval_storage_uri", - type=str, - help=( - "Optional. The evals storage URI to store agent evals," - " supported URIs: gs://." - ), - default=None, - ) @click.option( "--memory_service_uri", type=str, @@ -544,6 +530,13 @@ def fast_api_common_options(): """Decorator to add common fast api options to click commands.""" def decorator(func): + @click.option( + "--host", + type=str, + help="Optional. The binding host of the server", + default="127.0.0.1", + show_default=True, + ) @click.option( "--port", type=int, @@ -557,7 +550,10 @@ def decorator(func): ) @click.option( "--log_level", - type=LOG_LEVELS, + type=click.Choice( + ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], + case_sensitive=False, + ), default="INFO", help="Optional. Set the logging level", ) @@ -593,13 +589,6 @@ def wrapper(*args, **kwargs): @main.command("web") -@click.option( - "--host", - type=str, - help="Optional. The binding host of the server", - default="127.0.0.1", - show_default=True, -) @fast_api_common_options() @adk_services_options() @deprecated_adk_services_options() @@ -633,7 +622,7 @@ def cli_web( Example: - adk web --port=[port] path/to/agents_dir + adk web --session_service_uri=[uri] --port=[port] path/to/agents_dir """ logs.setup_adk_logger(getattr(logging, log_level.upper())) @@ -687,16 +676,6 @@ async def _lifespan(app: FastAPI): @main.command("api_server") -@click.option( - "--host", - type=str, - help="Optional. The binding host of the server", - default="127.0.0.1", - show_default=True, -) -@fast_api_common_options() -@adk_services_options() -@deprecated_adk_services_options() # The directory of agents, where each sub-directory is a single agent. # By default, it is the current working directory @click.argument( @@ -706,6 +685,9 @@ async def _lifespan(app: FastAPI): ), default=os.getcwd(), ) +@fast_api_common_options() +@adk_services_options() +@deprecated_adk_services_options() def cli_api_server( agents_dir: str, eval_storage_uri: Optional[str] = None, @@ -729,7 +711,7 @@ def cli_api_server( Example: - adk api_server --port=[port] path/to/agents_dir + adk api_server --session_service_uri=[uri] --port=[port] path/to/agents_dir """ logs.setup_adk_logger(getattr(logging, log_level.upper())) @@ -792,7 +774,19 @@ def cli_api_server( " of the AGENT source code)." ), ) -@fast_api_common_options() +@click.option( + "--port", + type=int, + default=8000, + help="Optional. The port of the ADK API server (default: 8000).", +) +@click.option( + "--trace_to_cloud", + is_flag=True, + show_default=True, + default=False, + help="Optional. Whether to enable Cloud Trace for cloud run.", +) @click.option( "--with_ui", is_flag=True, @@ -803,11 +797,6 @@ def cli_api_server( " only)" ), ) -@click.option( - "--verbosity", - type=LOG_LEVELS, - help="Deprecated. Use --log_level instead.", -) @click.option( "--temp_folder", type=str, @@ -821,6 +810,20 @@ def cli_api_server( " (default: a timestamped folder in the system temp directory)." ), ) +@click.option( + "--verbosity", + type=click.Choice( + ["debug", "info", "warning", "error", "critical"], case_sensitive=False + ), + default="WARNING", + help="Optional. Override the default verbosity level.", +) +@click.argument( + "agent", + type=click.Path( + exists=True, dir_okay=True, file_okay=False, resolve_path=True + ), +) @click.option( "--adk_version", type=str, @@ -842,12 +845,6 @@ def cli_api_server( ) @adk_services_options() @deprecated_adk_services_options() -@click.argument( - "agent", - type=click.Path( - exists=True, dir_okay=True, file_okay=False, resolve_path=True - ), -) def cli_deploy_cloud_run( agent: str, project: Optional[str], @@ -858,11 +855,8 @@ def cli_deploy_cloud_run( port: int, trace_to_cloud: bool, with_ui: bool, + verbosity: str, adk_version: str, - log_level: Optional[str] = None, - verbosity: str = "WARNING", - reload: bool = True, - allow_origins: Optional[list[str]] = None, session_service_uri: Optional[str] = None, artifact_service_uri: Optional[str] = None, memory_service_uri: Optional[str] = None, @@ -879,7 +873,6 @@ def cli_deploy_cloud_run( adk deploy cloud_run --project=[project] --region=[region] path/to/my_agent """ - log_level = log_level or verbosity session_service_uri = session_service_uri or session_db_url artifact_service_uri = artifact_service_uri or artifact_storage_uri try: @@ -892,9 +885,7 @@ def cli_deploy_cloud_run( temp_folder=temp_folder, port=port, trace_to_cloud=trace_to_cloud, - allow_origins=allow_origins, with_ui=with_ui, - log_level=log_level, verbosity=verbosity, adk_version=adk_version, session_service_uri=session_service_uri, @@ -1036,3 +1027,156 @@ def cli_deploy_agent_engine( ) except Exception as e: click.secho(f"Deploy failed: {e}", fg="red", err=True) + + +@deploy.command("gke") +@click.option( + "--project", + type=str, + help=( + "Required. Google Cloud project to deploy the agent. When absent," + " default project from gcloud config is used." + ), +) +@click.option( + "--region", + type=str, + help=( + "Required. Google Cloud region to deploy the agent. When absent," + " gcloud run deploy will prompt later." + ), +) +@click.option( + "--cluster_name", + type=str, + help="Required. The name of the GKE cluster.", +) +@click.option( + "--service_name", + type=str, + default="adk-default-service-name", + help=( + "Optional. The service name to use in GKE (default:" + " 'adk-default-service-name')." + ), +) +@click.option( + "--app_name", + type=str, + default="", + help=( + "Optional. App name of the ADK API server (default: the folder name" + " of the AGENT source code)." + ), +) +@click.option( + "--port", + type=int, + default=8000, + help="Optional. The port of the ADK API server (default: 8000).", +) +@click.option( + "--trace_to_cloud", + is_flag=True, + show_default=True, + default=False, + help="Optional. Whether to enable Cloud Trace for GKE.", +) +@click.option( + "--with_ui", + is_flag=True, + show_default=True, + default=False, + help=( + "Optional. Deploy ADK Web UI if set. (default: deploy ADK API server" + " only)" + ), +) +@click.option( + "--temp_folder", + type=str, + default=os.path.join( + tempfile.gettempdir(), + "gke_deploy_src", + datetime.now().strftime("%Y%m%d_%H%M%S"), + ), + help=( + "Optional. Temp folder for the generated GKE source files" + " (default: a timestamped folder in the system temp directory)." + ), +) +@click.option( + "--verbosity", + type=click.Choice( + ["debug", "info", "warning", "error", "critical"], case_sensitive=False + ), + default="WARNING", + help="Optional. Override the default verbosity level.", +) +@click.argument( + "agent", + type=click.Path( + exists=True, dir_okay=True, file_okay=False, resolve_path=True + ), +) +@click.option( + "--adk_version", + type=str, + default=version.__version__, + show_default=True, + help=( + "Optional. The ADK version used in GKE deployment. (default: the" + " version in the dev environment)" + ), +) +@adk_services_options() +@deprecated_adk_services_options() +def cli_deploy_gke( + agent: str, + project: Optional[str], + region: Optional[str], + cluster_name: str, + service_name: str, + app_name: str, + temp_folder: str, + port: int, + trace_to_cloud: bool, + with_ui: bool, + verbosity: str, + adk_version: str, + session_service_uri: Optional[str] = None, + artifact_service_uri: Optional[str] = None, + memory_service_uri: Optional[str] = None, + session_db_url: Optional[str] = None, # Deprecated + artifact_storage_uri: Optional[str] = None, # Deprecated +): + """Deploys an agent to GKE. + + AGENT: The path to the agent source code folder. + + Example: + + adk deploy gke --project=[project] --region=[region] --cluster_name=[cluster_name] path/to/my_agent + """ + session_service_uri = session_service_uri or session_db_url + artifact_service_uri = artifact_service_uri or artifact_storage_uri + try: + cli_deploy.to_gke( + agent_folder=agent, + project=project, + region=region, + cluster_name=cluster_name, + service_name=service_name, + app_name=app_name, + temp_folder=temp_folder, + port=port, + trace_to_cloud=trace_to_cloud, + with_ui=with_ui, + verbosity=verbosity, + adk_version=adk_version, + session_service_uri=session_service_uri, + artifact_service_uri=artifact_service_uri, + memory_service_uri=memory_service_uri, + ) + except Exception as e: + click.secho(f"Deploy failed: {e}", fg="red", err=True) diff --git a/tests/unittests/cli/utils/test_cli_deploy.py b/tests/unittests/cli/utils/test_cli_deploy.py index d3b2a538c..9c35dfa91 100644 --- a/tests/unittests/cli/utils/test_cli_deploy.py +++ b/tests/unittests/cli/utils/test_cli_deploy.py @@ -17,22 +17,26 @@ from __future__ import annotations +import importlib from pathlib import Path import shutil import subprocess +import sys import tempfile import types from typing import Any from typing import Callable from typing import Dict +from typing import Generator from typing import List from typing import Tuple from unittest import mock import click -import google.adk.cli.cli_deploy as cli_deploy import pytest +import src.google.adk.cli.cli_deploy as cli_deploy + # Helpers class _Recorder: @@ -44,30 +48,92 @@ def __init__(self) -> None: def __call__(self, *args: Any, **kwargs: Any) -> None: self.calls.append((args, kwargs)) + def get_last_call_args(self) -> Tuple[Any, ...]: + """Returns the positional arguments of the last call.""" + if not self.calls: + raise IndexError("No calls have been recorded.") + return self.calls[-1][0] + + def get_last_call_kwargs(self) -> Dict[str, Any]: + """Returns the keyword arguments of the last call.""" + if not self.calls: + raise IndexError("No calls have been recorded.") + return self.calls[-1][1] + # Fixtures @pytest.fixture(autouse=True) def _mute_click(monkeypatch: pytest.MonkeyPatch) -> None: """Suppress click.echo to keep test output clean.""" monkeypatch.setattr(click, "echo", lambda *a, **k: None) + monkeypatch.setattr(click, "secho", lambda *a, **k: None) + + +@pytest.fixture(autouse=True) +def reload_cli_deploy(): + """Reload cli_deploy before each test.""" + importlib.reload(cli_deploy) + yield # This allows the test to run after the module has been reloaded. @pytest.fixture() -def agent_dir(tmp_path: Path) -> Callable[[bool], Path]: - """Return a factory that creates a dummy agent directory tree.""" +def agent_dir(tmp_path: Path) -> Callable[[bool, bool], Path]: + """ + Return a factory that creates a dummy agent directory tree. - def _factory(include_requirements: bool) -> Path: + Args: + tmp_path: The temporary path fixture provided by pytest. + + Returns: + A factory function that takes two booleans: + - include_requirements: Whether to include a `requirements.txt` file. + - include_env: Whether to include a `.env` file. + """ + + def _factory(include_requirements: bool, include_env: bool) -> Path: base = tmp_path / "agent" base.mkdir() (base / "agent.py").write_text("# dummy agent") (base / "__init__.py").touch() if include_requirements: (base / "requirements.txt").write_text("pytest\n") + if include_env: + (base / ".env").write_text('TEST_VAR="test_value"\n') return base return _factory +@pytest.fixture +def mock_vertex_ai( + monkeypatch: pytest.MonkeyPatch, +) -> Generator[mock.MagicMock, None, None]: + """Mocks the entire vertexai module and its sub-modules.""" + mock_vertexai = mock.MagicMock() + mock_agent_engines = mock.MagicMock() + mock_vertexai.agent_engines = mock_agent_engines + mock_vertexai.init = mock.MagicMock() + mock_agent_engines.create = mock.MagicMock() + mock_agent_engines.ModuleAgent = mock.MagicMock( + return_value="mock-agent-engine-object" + ) + + sys.modules["vertexai"] = mock_vertexai + sys.modules["vertexai.agent_engines"] = mock_agent_engines + + # Also mock dotenv + mock_dotenv = mock.MagicMock() + mock_dotenv.dotenv_values = mock.MagicMock(return_value={"FILE_VAR": "value"}) + sys.modules["dotenv"] = mock_dotenv + + yield mock_vertexai + + # Cleanup: remove mocks from sys.modules + del sys.modules["vertexai"] + del sys.modules["vertexai.agent_engines"] + del sys.modules["dotenv"] + + # _resolve_project def test_resolve_project_with_option() -> None: """It should return the explicit project value untouched.""" @@ -87,97 +153,192 @@ def test_resolve_project_from_gcloud(monkeypatch: pytest.MonkeyPatch) -> None: mocked_echo.assert_called_once() -# _get_service_option_by_adk_version -def test_get_service_option_by_adk_version() -> None: - """It should return the explicit project value untouched.""" - assert cli_deploy._get_service_option_by_adk_version( - adk_version="1.3.0", - session_uri="sqlite://", - artifact_uri="gs://bucket", - memory_uri="rag://", - ) == ( - "--session_service_uri=sqlite:// " - "--artifact_service_uri=gs://bucket " - "--memory_service_uri=rag://" - ) - - assert ( - cli_deploy._get_service_option_by_adk_version( - adk_version="1.2.0", - session_uri="sqlite://", - artifact_uri="gs://bucket", - memory_uri="rag://", - ) - == "--session_db_url=sqlite:// --artifact_storage_uri=gs://bucket" +def test_resolve_project_from_gcloud_fails( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """It should raise an exception if the gcloud command fails.""" + monkeypatch.setattr( + subprocess, + "run", + mock.Mock(side_effect=subprocess.CalledProcessError(1, "cmd", "err")), ) + with pytest.raises(subprocess.CalledProcessError): + cli_deploy._resolve_project(None) + + +@pytest.mark.parametrize( + "adk_version, session_uri, artifact_uri, memory_uri, expected", + [ + ( + "1.3.0", + "sqlite://s", + "gs://a", + "rag://m", + ( + "--session_service_uri=sqlite://s --artifact_service_uri=gs://a" + " --memory_service_uri=rag://m" + ), + ), + ( + "1.2.5", + "sqlite://s", + "gs://a", + "rag://m", + "--session_db_url=sqlite://s --artifact_storage_uri=gs://a", + ), + ( + "0.5.0", + "sqlite://s", + "gs://a", + "rag://m", + "--session_db_url=sqlite://s", + ), + ( + "1.3.0", + "sqlite://s", + None, + None, + "--session_service_uri=sqlite://s ", + ), + ( + "1.3.0", + None, + "gs://a", + "rag://m", + " --artifact_service_uri=gs://a --memory_service_uri=rag://m", + ), + ("1.2.0", None, "gs://a", None, " --artifact_storage_uri=gs://a"), + ], +) +# _get_service_option_by_adk_version +def test_get_service_option_by_adk_version( + adk_version: str, + session_uri: str | None, + artifact_uri: str | None, + memory_uri: str | None, + expected: str, +) -> None: + """It should return the correct service URI flags for a given ADK version.""" assert ( cli_deploy._get_service_option_by_adk_version( - adk_version="0.5.0", - session_uri="sqlite://", - artifact_uri="gs://bucket", - memory_uri="rag://", + adk_version=adk_version, + session_uri=session_uri, + artifact_uri=artifact_uri, + memory_uri=memory_uri, ) - == "--session_db_url=sqlite://" + == expected ) -# to_cloud_run @pytest.mark.parametrize("include_requirements", [True, False]) +@pytest.mark.parametrize("with_ui", [True, False]) def test_to_cloud_run_happy_path( monkeypatch: pytest.MonkeyPatch, - agent_dir: Callable[[bool], Path], + agent_dir: Callable[[bool, bool], Path], + tmp_path: Path, include_requirements: bool, + with_ui: bool, ) -> None: """ - End-to-end execution test for `to_cloud_run` covering both presence and - absence of *requirements.txt*. - """ - tmp_dir = Path(tempfile.mkdtemp()) - src_dir = agent_dir(include_requirements) + End-to-end execution test for `to_cloud_run`. - copy_recorder = _Recorder() + This test verifies that for a given configuration: + 1. The agent source files are correctly copied to a temporary build context. + 2. A valid Dockerfile is generated with the correct parameters. + 3. The `gcloud run deploy` command is constructed with the correct arguments. + """ + src_dir = agent_dir(include_requirements, False) run_recorder = _Recorder() - # Cache the ORIGINAL copytree before patching - original_copytree = cli_deploy.shutil.copytree - - def _recording_copytree(*args: Any, **kwargs: Any): - copy_recorder(*args, **kwargs) - return original_copytree(*args, **kwargs) - - monkeypatch.setattr(cli_deploy.shutil, "copytree", _recording_copytree) - # Skip actual cleanup so that we can inspect generated files later. - monkeypatch.setattr(cli_deploy.shutil, "rmtree", lambda *_a, **_k: None) monkeypatch.setattr(subprocess, "run", run_recorder) + # Mock rmtree to prevent actual deletion during test run but record calls + rmtree_recorder = _Recorder() + monkeypatch.setattr(shutil, "rmtree", rmtree_recorder) + # Execute the function under test cli_deploy.to_cloud_run( agent_folder=str(src_dir), project="proj", region="asia-northeast1", service_name="svc", - app_name="app", - temp_folder=str(tmp_dir), + app_name="agent", + temp_folder=str(tmp_path), port=8080, trace_to_cloud=True, - with_ui=True, + with_ui=with_ui, verbosity="info", - log_level="info", + allow_origins=["http://localhost:3000", "https://my-app.com"], session_service_uri="sqlite://", artifact_service_uri="gs://bucket", memory_service_uri="rag://", - adk_version="0.0.5", + adk_version="1.3.0", ) - # Assertions + # 1. Assert that source files were copied correctly + agent_dest_path = tmp_path / "agents" / "agent" + assert (agent_dest_path / "agent.py").is_file() + assert (agent_dest_path / "__init__.py").is_file() + assert ( + agent_dest_path / "requirements.txt" + ).is_file() == include_requirements + + # 2. Assert that the Dockerfile was generated correctly + dockerfile_path = tmp_path / "Dockerfile" + assert dockerfile_path.is_file() + dockerfile_content = dockerfile_path.read_text() + + expected_command = "web" if with_ui else "api_server" + assert f"CMD adk {expected_command} --port=8080" in dockerfile_content + assert "FROM python:3.11-slim" in dockerfile_content assert ( - len(copy_recorder.calls) == 1 - ), "Agent sources must be copied exactly once." - assert run_recorder.calls, "gcloud command should be executed at least once." - assert (tmp_dir / "Dockerfile").exists(), "Dockerfile must be generated." + 'RUN adduser --disabled-password --gecos "" myuser' in dockerfile_content + ) + assert "USER myuser" in dockerfile_content + assert "ENV GOOGLE_CLOUD_PROJECT=proj" in dockerfile_content + assert "ENV GOOGLE_CLOUD_LOCATION=asia-northeast1" in dockerfile_content + assert "RUN pip install google-adk==1.3.0" in dockerfile_content + assert "--trace_to_cloud" in dockerfile_content + + if include_requirements: + assert ( + 'RUN pip install -r "/app/agents/agent/requirements.txt"' + in dockerfile_content + ) + else: + assert "RUN pip install -r" not in dockerfile_content - # Manual cleanup because we disabled rmtree in the monkeypatch. - shutil.rmtree(tmp_dir, ignore_errors=True) + assert ( + "--allow_origins=http://localhost:3000,https://my-app.com" + in dockerfile_content + ) + + # 3. Assert that the gcloud command was constructed correctly + assert len(run_recorder.calls) == 1 + gcloud_args = run_recorder.get_last_call_args()[0] + + expected_gcloud_command = [ + "gcloud", + "run", + "deploy", + "svc", + "--source", + str(tmp_path), + "--project", + "proj", + "--region", + "asia-northeast1", + "--port", + "8080", + "--verbosity", + "info", + "--labels", + "created-by=adk", + ] + assert gcloud_args == expected_gcloud_command + + # 4. Assert cleanup was performed + assert str(rmtree_recorder.get_last_call_args()[0]) == str(tmp_path) def test_to_cloud_run_cleans_temp_dir( @@ -186,7 +347,7 @@ def test_to_cloud_run_cleans_temp_dir( ) -> None: """`to_cloud_run` should always delete the temporary folder on exit.""" tmp_dir = Path(tempfile.mkdtemp()) - src_dir = agent_dir(False) + src_dir = agent_dir(False, False) deleted: Dict[str, Path] = {} @@ -207,7 +368,6 @@ def _fake_rmtree(path: str | Path, *a: Any, **k: Any) -> None: trace_to_cloud=False, with_ui=False, verbosity="info", - log_level="info", adk_version="1.0.0", session_service_uri=None, artifact_service_uri=None, @@ -215,3 +375,258 @@ def _fake_rmtree(path: str | Path, *a: Any, **k: Any) -> None: ) assert deleted["path"] == tmp_dir + + +def test_to_cloud_run_cleans_temp_dir_on_failure( + monkeypatch: pytest.MonkeyPatch, + agent_dir: Callable[[bool, bool], Path], +) -> None: + """`to_cloud_run` should always delete the temporary folder on exit, even if gcloud fails.""" + tmp_dir = Path(tempfile.mkdtemp()) + src_dir = agent_dir(False, False) + + rmtree_recorder = _Recorder() + monkeypatch.setattr(shutil, "rmtree", rmtree_recorder) + # Make the gcloud command fail + monkeypatch.setattr( + subprocess, + "run", + mock.Mock(side_effect=subprocess.CalledProcessError(1, "gcloud")), + ) + + with pytest.raises(subprocess.CalledProcessError): + cli_deploy.to_cloud_run( + agent_folder=str(src_dir), + project="proj", + region="us-central1", + service_name="svc", + app_name="app", + temp_folder=str(tmp_dir), + port=8080, + trace_to_cloud=False, + with_ui=False, + verbosity="info", + adk_version="1.0.0", + session_service_uri=None, + artifact_service_uri=None, + memory_service_uri=None, + ) + + # Check that rmtree was called on the temp folder in the finally block + assert rmtree_recorder.calls, "shutil.rmtree should have been called" + assert str(rmtree_recorder.get_last_call_args()[0]) == str(tmp_dir) + + +@pytest.mark.usefixtures("mock_vertex_ai") +@pytest.mark.parametrize("has_reqs", [True, False]) +@pytest.mark.parametrize("has_env", [True, False]) +def test_to_agent_engine_happy_path( + monkeypatch: pytest.MonkeyPatch, + agent_dir: Callable[[bool, bool], Path], + tmp_path: Path, + has_reqs: bool, + has_env: bool, +) -> None: + """ + Tests the happy path for the `to_agent_engine` function. + + Verifies: + 1. Source files are copied. + 2. `adk_app.py` is created correctly. + 3. `requirements.txt` is handled (created if not present). + 4. `.env` file is read if present. + 5. `vertexai.init` and `agent_engines.create` are called with the correct args. + 6. Cleanup is performed. + """ + src_dir = agent_dir(has_reqs, has_env) + temp_folder = tmp_path / "build" + rmtree_recorder = _Recorder() + + monkeypatch.setattr(shutil, "rmtree", rmtree_recorder) + + # Execute + cli_deploy.to_agent_engine( + agent_folder=str(src_dir), + temp_folder=str(temp_folder), + adk_app="my_adk_app", + staging_bucket="gs://my-staging-bucket", + trace_to_cloud=True, + project="my-gcp-project", + region="us-central1", + display_name="My Test Agent", + description="A test agent.", + ) + + # 1. Verify file operations + assert (temp_folder / "agent.py").is_file() + assert (temp_folder / "__init__.py").is_file() + + # 2. Verify adk_app.py creation + adk_app_path = temp_folder / "my_adk_app.py" + assert adk_app_path.is_file() + content = adk_app_path.read_text() + assert "from agent import root_agent" in content + assert "adk_app = AdkApp(" in content + assert "enable_tracing=True" in content + + # 3. Verify requirements handling + reqs_path = temp_folder / "requirements.txt" + assert reqs_path.is_file() + if not has_reqs: + # It should have been created with the default content + assert "google-cloud-aiplatform[adk,agent_engines]" in reqs_path.read_text() + + # 4. Verify Vertex AI SDK calls + vertexai = sys.modules["vertexai"] + vertexai.init.assert_called_once_with( + project="my-gcp-project", + location="us-central1", + staging_bucket="gs://my-staging-bucket", + ) + + # 5. Verify env var handling + dotenv = sys.modules["dotenv"] + if has_env: + dotenv.dotenv_values.assert_called_once() + expected_env_vars = {"FILE_VAR": "value"} + else: + dotenv.dotenv_values.assert_not_called() + expected_env_vars = None + + # 6. Verify agent_engines.create call + vertexai.agent_engines.create.assert_called_once() + create_kwargs = vertexai.agent_engines.create.call_args.kwargs + assert create_kwargs["agent_engine"] == "mock-agent-engine-object" + assert create_kwargs["display_name"] == "My Test Agent" + assert create_kwargs["description"] == "A test agent." + assert create_kwargs["requirements"] == str(reqs_path) + assert create_kwargs["extra_packages"] == [str(temp_folder)] + assert create_kwargs["env_vars"] == expected_env_vars + + # 7. Verify cleanup + assert str(rmtree_recorder.get_last_call_args()[0]) == str(temp_folder) + + +@pytest.mark.parametrize("include_requirements", [True, False]) +def test_to_gke_happy_path( + monkeypatch: pytest.MonkeyPatch, + agent_dir: Callable[[bool, bool], Path], + tmp_path: Path, + include_requirements: bool, +) -> None: + """ + Tests the happy path for the `to_gke` function. + + Verifies: + 1. Source files are copied and Dockerfile is created. + 2. `gcloud builds submit` is called to build the image. + 3. `deployment.yaml` is created with the correct content. + 4. `gcloud container get-credentials` and `kubectl apply` are called. + 5. Cleanup is performed. + """ + src_dir = agent_dir(include_requirements, False) + run_recorder = _Recorder() + rmtree_recorder = _Recorder() + + def mock_subprocess_run(*args, **kwargs): + # We still use the recorder to check which commands were called + run_recorder(*args, **kwargs) + + # The command is the first positional argument, e.g., ['kubectl', 'apply', ...] + command_list = args[0] + + # Check if this is the 'kubectl apply' call + if command_list and command_list[0:2] == ["kubectl", "apply"]: + # If it is, return a fake process object with a .stdout attribute + # This mimics the real output from kubectl. + fake_stdout = "deployment.apps/gke-svc created\nservice/gke-svc created" + return types.SimpleNamespace(stdout=fake_stdout) + + # For all other subprocess.run calls (like 'gcloud builds submit'), + # we don't need a return value, so the default None is fine. + return None + + monkeypatch.setattr(subprocess, "run", mock_subprocess_run) + monkeypatch.setattr(shutil, "rmtree", rmtree_recorder) + + # Execute + cli_deploy.to_gke( + agent_folder=str(src_dir), + project="gke-proj", + region="us-east1", + cluster_name="my-gke-cluster", + service_name="gke-svc", + app_name="agent", + temp_folder=str(tmp_path), + port=9090, + trace_to_cloud=False, + with_ui=True, + verbosity="debug", + adk_version="1.2.0", + allow_origins=["http://localhost:3000", "https://my-app.com"], + session_service_uri="sqlite:///", + artifact_service_uri="gs://gke-bucket", + ) + + # 1. Verify Dockerfile (basic check) + dockerfile_path = tmp_path / "Dockerfile" + assert dockerfile_path.is_file() + dockerfile_content = dockerfile_path.read_text() + assert "CMD adk web --port=9090" in dockerfile_content + assert "RUN pip install google-adk==1.2.0" in dockerfile_content + + # 2. Verify command executions by checking each recorded call + assert len(run_recorder.calls) == 3, "Expected 3 subprocess calls" + + # Call 1: gcloud builds submit + build_args = run_recorder.calls[0][0][0] + expected_build_args = [ + "gcloud", + "builds", + "submit", + "--tag", + "gcr.io/gke-proj/gke-svc", + str(tmp_path), + ] + assert build_args == expected_build_args + + # Call 2: gcloud container clusters get-credentials + creds_args = run_recorder.calls[1][0][0] + expected_creds_args = [ + "gcloud", + "container", + "clusters", + "get-credentials", + "my-gke-cluster", + "--region", + "us-east1", + "--project", + "gke-proj", + ] + assert creds_args == expected_creds_args + + assert ( + "--allow_origins=http://localhost:3000,https://my-app.com" + in dockerfile_content + ) + + # Call 3: kubectl apply + apply_args = run_recorder.calls[2][0][0] + expected_apply_args = ["kubectl", "apply", "-f", str(tmp_path)] + assert apply_args == expected_apply_args + + # 3. Verify deployment.yaml content + deployment_yaml_path = tmp_path / "deployment.yaml" + assert deployment_yaml_path.is_file() + yaml_content = deployment_yaml_path.read_text() + + assert "kind: Deployment" in yaml_content + assert "kind: Service" in yaml_content + assert "name: gke-svc" in yaml_content + assert "image: gcr.io/gke-proj/gke-svc" in yaml_content + assert f"containerPort: 9090" in yaml_content + assert f"targetPort: 9090" in yaml_content + assert "type: LoadBalancer" in yaml_content + + # 4. Verify cleanup + assert str(rmtree_recorder.get_last_call_args()[0]) == str(tmp_path) diff --git a/tests/unittests/cli/utils/test_cli_tools_click.py b/tests/unittests/cli/utils/test_cli_tools_click.py index da45442a4..396e72d81 100644 --- a/tests/unittests/cli/utils/test_cli_tools_click.py +++ b/tests/unittests/cli/utils/test_cli_tools_click.py @@ -28,12 +28,12 @@ import click from click.testing import CliRunner -from google.adk.cli import cli_tools_click -from google.adk.evaluation import local_eval_set_results_manager -from google.adk.sessions import Session +import google.adk.evaluation.local_eval_sets_manager as managerModule from pydantic import BaseModel import pytest +from src.google.adk.cli import cli_tools_click + # Helpers class _Recorder(BaseModel): @@ -50,13 +50,14 @@ def __call__(self, *args: Any, **kwargs: Any) -> None: # noqa: D401 def _mute_click(monkeypatch: pytest.MonkeyPatch) -> None: """Suppress click output during tests.""" monkeypatch.setattr(click, "echo", lambda *a, **k: None) - monkeypatch.setattr(click, "secho", lambda *a, **k: None) + # Keep secho for error messages + # monkeypatch.setattr(click, "secho", lambda *a, **k: None) # validate_exclusive def test_validate_exclusive_allows_single() -> None: """Providing exactly one exclusive option should pass.""" - ctx = click.Context(cli_tools_click.main) + ctx = click.Context(cli_tools_click.cli_run) param = SimpleNamespace(name="replay") assert ( cli_tools_click.validate_exclusive(ctx, param, "file.json") == "file.json" @@ -65,7 +66,7 @@ def test_validate_exclusive_allows_single() -> None: def test_validate_exclusive_blocks_multiple() -> None: """Providing two exclusive options should raise UsageError.""" - ctx = click.Context(cli_tools_click.main) + ctx = click.Context(cli_tools_click.cli_run) param1 = SimpleNamespace(name="replay") param2 = SimpleNamespace(name="resume") @@ -156,10 +157,6 @@ def _boom(*_a: Any, **_k: Any) -> None: # noqa: D401 monkeypatch.setattr(cli_tools_click.cli_deploy, "to_cloud_run", _boom) - # intercept click.secho(error=True) output - captured: List[str] = [] - monkeypatch.setattr(click, "secho", lambda msg, **__: captured.append(msg)) - agent_dir = tmp_path / "agent3" agent_dir.mkdir() runner = CliRunner() @@ -168,7 +165,73 @@ def _boom(*_a: Any, **_k: Any) -> None: # noqa: D401 ) assert result.exit_code == 0 - assert any("Deploy failed: boom" in m for m in captured) + assert "Deploy failed: boom" in result.output + + +# cli deploy agent_engine +def test_cli_deploy_agent_engine_success( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + """Successful path should call cli_deploy.to_agent_engine.""" + rec = _Recorder() + monkeypatch.setattr(cli_tools_click.cli_deploy, "to_agent_engine", rec) + + agent_dir = tmp_path / "agent_ae" + agent_dir.mkdir() + runner = CliRunner() + result = runner.invoke( + cli_tools_click.main, + [ + "deploy", + "agent_engine", + "--project", + "test-proj", + "--region", + "us-central1", + "--staging_bucket", + "gs://mybucket", + str(agent_dir), + ], + ) + assert result.exit_code == 0 + assert rec.calls, "cli_deploy.to_agent_engine must be invoked" + called_kwargs = rec.calls[0][1] + assert called_kwargs.get("project") == "test-proj" + assert called_kwargs.get("region") == "us-central1" + assert called_kwargs.get("staging_bucket") == "gs://mybucket" + + +# cli deploy gke +def test_cli_deploy_gke_success( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + """Successful path should call cli_deploy.to_gke.""" + rec = _Recorder() + monkeypatch.setattr(cli_tools_click.cli_deploy, "to_gke", rec) + + agent_dir = tmp_path / "agent_gke" + agent_dir.mkdir() + runner = CliRunner() + result = runner.invoke( + cli_tools_click.main, + [ + "deploy", + "gke", + "--project", + "test-proj", + "--region", + "us-central1", + "--cluster_name", + "my-cluster", + str(agent_dir), + ], + ) + assert result.exit_code == 0 + assert rec.calls, "cli_deploy.to_gke must be invoked" + called_kwargs = rec.calls[0][1] + assert called_kwargs.get("project") == "test-proj" + assert called_kwargs.get("region") == "us-central1" + assert called_kwargs.get("cluster_name") == "my-cluster" # cli eval @@ -176,16 +239,30 @@ def test_cli_eval_missing_deps_raises( tmp_path: Path, monkeypatch: pytest.MonkeyPatch ) -> None: """If cli_eval sub-module is missing, command should raise ClickException.""" - # Ensure .cli_eval is not importable orig_import = builtins.__import__ - def _fake_import(name: str, *a: Any, **k: Any): - if name.endswith(".cli_eval") or name == "google.adk.cli.cli_eval": - raise ModuleNotFoundError() - return orig_import(name, *a, **k) + def _fake_import(name: str, globals=None, locals=None, fromlist=(), level=0): + if name == "google.adk.cli.cli_eval" or (level > 0 and "cli_eval" in name): + raise ModuleNotFoundError(f"Simulating missing {name}") + return orig_import(name, globals, locals, fromlist, level) monkeypatch.setattr(builtins, "__import__", _fake_import) + agent_dir = tmp_path / "agent_missing_deps" + agent_dir.mkdir() + (agent_dir / "__init__.py").touch() + eval_file = tmp_path / "dummy.json" + eval_file.touch() + + runner = CliRunner() + result = runner.invoke( + cli_tools_click.main, + ["eval", str(agent_dir), str(eval_file)], + ) + assert result.exit_code != 0 + assert isinstance(result.exception, SystemExit) + assert cli_tools_click.MISSING_EVAL_DEPENDENCIES_MESSAGE in result.output + # cli web & api_server (uvicorn patched) @pytest.fixture() @@ -207,18 +284,18 @@ def run(self) -> None: monkeypatch.setattr( cli_tools_click.uvicorn, "Server", lambda *_a, **_k: _DummyServer() ) - monkeypatch.setattr( - cli_tools_click, "get_fast_api_app", lambda **_k: object() - ) return rec def test_cli_web_invokes_uvicorn( - tmp_path: Path, _patch_uvicorn: _Recorder + tmp_path: Path, _patch_uvicorn: _Recorder, monkeypatch: pytest.MonkeyPatch ) -> None: """`adk web` should configure and start uvicorn.Server.run.""" agents_dir = tmp_path / "agents" agents_dir.mkdir() + monkeypatch.setattr( + cli_tools_click, "get_fast_api_app", lambda **_k: object() + ) runner = CliRunner() result = runner.invoke(cli_tools_click.main, ["web", str(agents_dir)]) assert result.exit_code == 0 @@ -226,148 +303,76 @@ def test_cli_web_invokes_uvicorn( def test_cli_api_server_invokes_uvicorn( - tmp_path: Path, _patch_uvicorn: _Recorder + tmp_path: Path, _patch_uvicorn: _Recorder, monkeypatch: pytest.MonkeyPatch ) -> None: """`adk api_server` should configure and start uvicorn.Server.run.""" agents_dir = tmp_path / "agents_api" agents_dir.mkdir() + monkeypatch.setattr( + cli_tools_click, "get_fast_api_app", lambda **_k: object() + ) runner = CliRunner() result = runner.invoke(cli_tools_click.main, ["api_server", str(agents_dir)]) assert result.exit_code == 0 assert _patch_uvicorn.calls, "uvicorn.Server.run must be called" -def test_cli_eval_success_path( - tmp_path: Path, monkeypatch: pytest.MonkeyPatch +def test_cli_web_passes_service_uris( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch, _patch_uvicorn: _Recorder ) -> None: - """Test the success path of `adk eval` by fully executing it with a stub module, up to summary generation.""" - import asyncio - import sys - import types - - # stub cli_eval module - stub = types.ModuleType("google.adk.cli.cli_eval") - eval_sets_manager_stub = types.ModuleType( - "google.adk.evaluation.local_eval_sets_manager" - ) - - class _EvalMetric: - - def __init__(self, metric_name: str, threshold: float) -> None: - ... - - class _EvalCaseResult(BaseModel): - eval_set_id: str - eval_id: str - final_eval_status: Any - user_id: str - session_id: str - session_details: Optional[Session] = None - eval_metric_results: list = {} - overall_eval_metric_results: list = {} - eval_metric_result_per_invocation: list = {} - - class EvalCase(BaseModel): - eval_id: str - - class EvalSet(BaseModel): - eval_set_id: str - eval_cases: list[EvalCase] - - def mock_save_eval_set_result(cls, *args, **kwargs): - return None + """`adk web` should pass service URIs to get_fast_api_app.""" + agents_dir = tmp_path / "agents" + agents_dir.mkdir() - monkeypatch.setattr( - local_eval_set_results_manager.LocalEvalSetResultsManager, - "save_eval_set_result", - mock_save_eval_set_result, - ) + mock_get_app = _Recorder() + monkeypatch.setattr(cli_tools_click, "get_fast_api_app", mock_get_app) - # minimal enum-like namespace - _EvalStatus = types.SimpleNamespace(PASSED="PASSED", FAILED="FAILED") - - # helper funcs - stub.EvalMetric = _EvalMetric - stub.EvalCaseResult = _EvalCaseResult - stub.EvalStatus = _EvalStatus - stub.MISSING_EVAL_DEPENDENCIES_MESSAGE = "stub msg" - - stub.get_evaluation_criteria_or_default = lambda _p: {"foo": 1.0} - stub.get_root_agent = lambda _p: object() - stub.try_get_reset_func = lambda _p: None - stub.parse_and_get_evals_to_run = lambda _paths: {"set1.json": ["e1", "e2"]} - eval_sets_manager_stub.load_eval_set_from_file = lambda x, y: EvalSet( - eval_set_id="test_eval_set_id", - eval_cases=[EvalCase(eval_id="e1"), EvalCase(eval_id="e2")], + runner = CliRunner() + result = runner.invoke( + cli_tools_click.main, + [ + "web", + str(agents_dir), + "--session_service_uri", + "sqlite:///test.db", + "--artifact_service_uri", + "gs://mybucket", + "--memory_service_uri", + "rag://mycorpus", + ], ) + assert result.exit_code == 0 + assert mock_get_app.calls + called_kwargs = mock_get_app.calls[0][1] + assert called_kwargs.get("session_service_uri") == "sqlite:///test.db" + assert called_kwargs.get("artifact_service_uri") == "gs://mybucket" + assert called_kwargs.get("memory_service_uri") == "rag://mycorpus" - # Create an async generator function for run_evals - async def mock_run_evals(*_a, **_k): - yield _EvalCaseResult( - eval_set_id="set1.json", - eval_id="e1", - final_eval_status=_EvalStatus.PASSED, - user_id="user", - session_id="session1", - overall_eval_metric_results=[{ - "metricName": "some_metric", - "threshold": 0.0, - "score": 1.0, - "evalStatus": _EvalStatus.PASSED, - }], - ) - yield _EvalCaseResult( - eval_set_id="set1.json", - eval_id="e2", - final_eval_status=_EvalStatus.FAILED, - user_id="user", - session_id="session2", - overall_eval_metric_results=[{ - "metricName": "some_metric", - "threshold": 0.0, - "score": 0.0, - "evalStatus": _EvalStatus.FAILED, - }], - ) - - stub.run_evals = mock_run_evals - - # Replace asyncio.run with a function that properly handles coroutines - def mock_asyncio_run(coro): - # Create a new event loop - loop = asyncio.new_event_loop() - try: - return loop.run_until_complete(coro) - finally: - loop.close() - - monkeypatch.setattr(cli_tools_click.asyncio, "run", mock_asyncio_run) - - # inject stub - monkeypatch.setitem(sys.modules, "google.adk.cli.cli_eval", stub) - monkeypatch.setitem( - sys.modules, - "google.adk.evaluation.local_eval_sets_manager", - eval_sets_manager_stub, - ) - # create dummy agent directory - agent_dir = tmp_path / "agent5" - agent_dir.mkdir() - (agent_dir / "__init__.py").touch() +def test_cli_web_passes_deprecated_uris( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch, _patch_uvicorn: _Recorder +) -> None: + """`adk web` should use deprecated URIs if new ones are not provided.""" + agents_dir = tmp_path / "agents" + agents_dir.mkdir() - # inject monkeypatch - monkeypatch.setattr( - cli_tools_click.envs, "load_dotenv_for_agent", lambda *a, **k: None - ) + mock_get_app = _Recorder() + monkeypatch.setattr(cli_tools_click, "get_fast_api_app", mock_get_app) runner = CliRunner() result = runner.invoke( cli_tools_click.main, - ["eval", str(agent_dir), str(tmp_path / "dummy_eval.json")], + [ + "web", + str(agents_dir), + "--session_db_url", + "sqlite:///deprecated.db", + "--artifact_storage_uri", + "gs://deprecated", + ], ) - assert result.exit_code == 0 - assert "Eval Run Summary" in result.output - assert "Tests passed: 1" in result.output - assert "Tests failed: 1" in result.output + assert mock_get_app.calls + called_kwargs = mock_get_app.calls[0][1] + assert called_kwargs.get("session_service_uri") == "sqlite:///deprecated.db" + assert called_kwargs.get("artifact_service_uri") == "gs://deprecated"