From 7ddd66d30dfff03d82028e5cf1cac6850260f549 Mon Sep 17 00:00:00 2001 From: Summer Date: Tue, 24 Jun 2025 17:25:49 +0000 Subject: [PATCH 1/4] [06/24] Add gke_code_executor.py --- src/google/adk/code_executors/__init__.py | 11 + .../adk/code_executors/gke_code_executor.py | 223 ++++++++++++++++++ 2 files changed, 234 insertions(+) create mode 100644 src/google/adk/code_executors/gke_code_executor.py diff --git a/src/google/adk/code_executors/__init__.py b/src/google/adk/code_executors/__init__.py index c0f1046f7..3797ea03b 100644 --- a/src/google/adk/code_executors/__init__.py +++ b/src/google/adk/code_executors/__init__.py @@ -49,3 +49,14 @@ ' Executor with agents, please install it. If not, you can ignore this' ' warning.' ) + +try: + from .gke_code_executor import GkeCodeExecutor + + __all__.append('GkeCodeExecutor') +except ImportError: + logger.debug( + 'The kubernetes sdk is not installed. If you want to use the GKE Code' + ' Executor with agents, please install it. If not, you can ignore this' + ' warning.' + ) \ No newline at end of file diff --git a/src/google/adk/code_executors/gke_code_executor.py b/src/google/adk/code_executors/gke_code_executor.py new file mode 100644 index 000000000..dba2dcb4d --- /dev/null +++ b/src/google/adk/code_executors/gke_code_executor.py @@ -0,0 +1,223 @@ +import logging +import uuid +from typing import Any + +from google.adk.agents.invocation_context import InvocationContext +from google.adk.code_executors.base_code_executor import BaseCodeExecutor +from google.adk.code_executors.code_execution_utils import CodeExecutionInput, CodeExecutionResult + +from kubernetes import client, config +from kubernetes.client.rest import ApiException +from kubernetes.watch import Watch + +logger = logging.getLogger(__name__) + +class GkeCodeExecutor(BaseCodeExecutor): + """ + A secure, robust, and efficient code executor that runs Python code in a + sandboxed gVisor Pod on GKE. + + Features includes: + - Secure code execution via ConfigMaps and a strict security context. + - Kubernetes-native job and pod garbage collection via TTL. + - Efficient, event-driven waiting using the Kubernetes watch API. + - Explicit resource limits to prevent abuse. + """ + namespace: str = "default" + image: str = "python:3.11-slim" + timeout_seconds: int = 3000 + cpu_limit: str = "500m" + mem_limit: str = "512Mi" + use_gvisor_sandbox: bool = True + + _batch_v1: Any = None + _core_v1: Any = None + + def __init__(self, **data): + """ + Initializes the Pydantic model and the Kubernetes clients. + """ + super().__init__(**data) + + try: + config.load_incluster_config() + logger.info("Using in-cluster Kubernetes configuration.") + except config.ConfigException: + logger.info("In-cluster config not found. Falling back to local kubeconfig.") + config.load_kube_config() + + self._batch_v1 = client.BatchV1Api() + self._core_v1 = client.CoreV1Api() + + def execute_code( + self, + invocation_context: InvocationContext, + code_execution_input: CodeExecutionInput, + ) -> CodeExecutionResult: + """ + Orchestrates the secure execution of a code snippet on GKE. + """ + job_name = f"adk-exec-{uuid.uuid4().hex[:10]}" + configmap_name = f"code-src-{job_name}" + + try: + # 1. Create a ConfigMap to hold the code securely. + self._create_code_configmap(configmap_name, code_execution_input.code) + # 2. Create the Job manifest with all security features. + job_manifest = self._create_job_manifest(job_name, configmap_name) + # 3. Create and run the Job on the cluster. + self._batch_v1.create_namespaced_job( + body=job_manifest, namespace=self.namespace + ) + logger.info(f"Submitted Job '{job_name}' to namespace '{self.namespace}'.") + # 4. Efficiently watch for the Job's completion. + return self._watch_job_completion(job_name) + + except Exception as e: + logger.error( + f"An unexpected error occurred during execution of job '{job_name}': {e}", + exc_info=True, + ) + return CodeExecutionResult(stderr=f"Executor failed: {e}") + finally: + # 5. Always clean up the ConfigMap. The Job is cleaned up by Kubernetes. + self._cleanup_configmap(configmap_name) + + def _create_job_manifest(self, job_name: str, configmap_name: str) -> client.V1Job: + """Creates the complete V1Job object with security best practices.""" + # Define the container that will run the code. + container = client.V1Container( + name="code-runner", + image=self.image, + command=["python3", "/app/code.py"], + volume_mounts=[ + client.V1VolumeMount(name="code-volume", mount_path="/app") + ], + # BEST PRACTICE: Enforce a strict security context. + security_context=client.V1SecurityContext( + run_as_non_root=True, + run_as_user=1001, + allow_privilege_escalation=False, + read_only_root_filesystem=True, + capabilities=client.V1Capabilities(drop=["ALL"]), + ), + # BEST PRACTICE: Set resource limits to prevent abuse. + resources=client.V1ResourceRequirements( + requests={"cpu": "100m", "memory": "128Mi"}, + limits={"cpu": self.cpu_limit, "memory": self.mem_limit}, + ), + ) + + # Pod Spec Customization for A/B Testing + pod_spec_args = { + "restart_policy": "Never", + "containers": [container], + "volumes": [ + client.V1Volume( + name="code-volume", + config_map=client.V1ConfigMapVolumeSource(name=configmap_name), + ) + ], + } + + if self.use_gvisor_sandbox: + pod_spec_args["runtime_class_name"] = "gvisor" + pod_spec_args["node_selector"] = { + "cloud.google.com/gke-nodepool": "gvisor-nodepool" + } + pod_spec_args["tolerations"] = [ + client.V1Toleration( + key="sandbox.gke.io/runtime", + operator="Equal", + value="gvisor", + effect="NoSchedule", + ) + ] + else: + pod_spec_args["node_selector"] = { + "cloud.google.com/gke-nodepool": "standard-nodepool" + } + + # Define the pod spec, mounting the code and targeting gVisor. + pod_spec = client.V1PodSpec(**pod_spec_args) + + # Define the Job specification. + job_spec = client.V1JobSpec( + template=client.V1PodTemplateSpec(spec=pod_spec), + backoff_limit=0, # Do not retry the Job on failure. + # BEST PRACTICE: Let the Kubernetes TTL controller handle cleanup. + # This is more robust than client-side cleanup. + ttl_seconds_after_finished=600, # Garbage collect after 10 minutes. + ) + + # Assemble and return the final Job object. + return client.V1Job( + api_version="batch/v1", + kind="Job", + metadata=client.V1ObjectMeta(name=job_name), + spec=job_spec, + ) + + def _watch_job_completion(self, job_name: str) -> CodeExecutionResult: + """Uses the watch API to efficiently wait for job completion.""" + watch = Watch() + try: + for event in watch.stream( + self._batch_v1.list_namespaced_job, + namespace=self.namespace, + field_selector=f"metadata.name={job_name}", + timeout_seconds=self.timeout_seconds, + ): + job = event["object"] + if job.status.succeeded: + watch.stop() + logger.info(f"Job '{job_name}' succeeded.") + logs = self._get_pod_logs(job_name) + return CodeExecutionResult(stdout=logs) + if job.status.failed: + watch.stop() + logger.error(f"Job '{job_name}' failed.") + logs = self._get_pod_logs(job_name) + return CodeExecutionResult(stderr=f"Job failed. Logs:\n{logs}") + + # If the loop finishes without returning, the watch timed out. + raise TimeoutError( + f"Job '{job_name}' did not complete within {self.timeout_seconds}s." + ) + finally: + watch.stop() + + def _get_pod_logs(self, job_name: str) -> str: + """Retrieves logs from the pod created by the specified job.""" + try: + pods = self._core_v1.list_namespaced_pod( + namespace=self.namespace, label_selector=f"job-name={job_name}", limit=1 + ) + if not pods.items: + return "Error: Could not find pod for job." + pod_name = pods.items[0].metadata.name + + return self._core_v1.read_namespaced_pod_log( + name=pod_name, namespace=self.namespace + ) + except ApiException as e: + logger.error(f"Could not retrieve logs for job '{job_name}': {e}") + return f"Error retrieving logs: {e.reason}" + + def _create_code_configmap(self, name: str, code: str) -> None: + """Creates a ConfigMap to hold the Python code.""" + body = client.V1ConfigMap( + metadata=client.V1ObjectMeta(name=name), data={"code.py": code} + ) + self._core_v1.create_namespaced_config_map( + namespace=self.namespace, body=body + ) + + def _cleanup_configmap(self, name: str) -> None: + """Deletes a ConfigMap.""" + try: + self._core_v1.delete_namespaced_config_map(name=name, namespace=self.namespace) + logger.info(f"Cleaned up ConfigMap '{name}'.") + except ApiException as e: + if e.status != 404: + logger.warning(f"Could not delete ConfigMap '{name}': {e.reason}") From e8635b98bd709fe2955a425646648c9b4fad4d8a Mon Sep 17 00:00:00 2001 From: Summer Date: Tue, 24 Jun 2025 19:20:42 +0000 Subject: [PATCH 2/4] [06/24] Add gke_code_executor.py --- .../adk/code_executors/gke_code_executor.py | 119 +++++++++--------- 1 file changed, 58 insertions(+), 61 deletions(-) diff --git a/src/google/adk/code_executors/gke_code_executor.py b/src/google/adk/code_executors/gke_code_executor.py index dba2dcb4d..f0abeb982 100644 --- a/src/google/adk/code_executors/gke_code_executor.py +++ b/src/google/adk/code_executors/gke_code_executor.py @@ -1,6 +1,6 @@ import logging import uuid -from typing import Any +from typing import Optional from google.adk.agents.invocation_context import InvocationContext from google.adk.code_executors.base_code_executor import BaseCodeExecutor @@ -13,37 +13,42 @@ logger = logging.getLogger(__name__) class GkeCodeExecutor(BaseCodeExecutor): - """ - A secure, robust, and efficient code executor that runs Python code in a - sandboxed gVisor Pod on GKE. + """Executes Python code in a secure gVisor-sandboxed Pod on GKE. + + This executor securely runs code by dynamically creating a Kubernetes Job for + each execution request. The user's code is mounted via a ConfigMap, and the + Pod is hardened with a strict security context and resource limits. - Features includes: - - Secure code execution via ConfigMaps and a strict security context. - - Kubernetes-native job and pod garbage collection via TTL. + Key Features: + - Sandboxed execution using the gVisor runtime. + - Ephemeral, per-execution environments using Kubernetes Jobs. + - Secure-by-default Pod configuration (non-root, no privileges). + - Automatic garbage collection of completed Jobs and Pods via TTL. - Efficient, event-driven waiting using the Kubernetes watch API. - - Explicit resource limits to prevent abuse. """ namespace: str = "default" image: str = "python:3.11-slim" - timeout_seconds: int = 3000 + timeout_seconds: int = 300 cpu_limit: str = "500m" mem_limit: str = "512Mi" - use_gvisor_sandbox: bool = True - _batch_v1: Any = None - _core_v1: Any = None + _batch_v1: client.BatchV1Api + _core_v1: client.CoreV1Api def __init__(self, **data): - """ - Initializes the Pydantic model and the Kubernetes clients. + """Initializes the executor and the Kubernetes API clients. + + This constructor supports overriding default class attributes (like + 'namespace', 'image', etc.) by passing them as keyword arguments. It + automatically configures the Kubernetes client to work either within a + cluster (in-cluster config) or locally using a kubeconfig file. """ super().__init__(**data) - try: config.load_incluster_config() logger.info("Using in-cluster Kubernetes configuration.") except config.ConfigException: - logger.info("In-cluster config not found. Falling back to local kubeconfig.") + logger.info("In-cluster config not found. Falling back to kubeconfig.") config.load_kube_config() self._batch_v1 = client.BatchV1Api() @@ -54,33 +59,40 @@ def execute_code( invocation_context: InvocationContext, code_execution_input: CodeExecutionInput, ) -> CodeExecutionResult: - """ - Orchestrates the secure execution of a code snippet on GKE. - """ + """Orchestrates the secure execution of a code snippet on GKE.""" job_name = f"adk-exec-{uuid.uuid4().hex[:10]}" configmap_name = f"code-src-{job_name}" try: - # 1. Create a ConfigMap to hold the code securely. self._create_code_configmap(configmap_name, code_execution_input.code) - # 2. Create the Job manifest with all security features. job_manifest = self._create_job_manifest(job_name, configmap_name) - # 3. Create and run the Job on the cluster. + self._batch_v1.create_namespaced_job( body=job_manifest, namespace=self.namespace ) logger.info(f"Submitted Job '{job_name}' to namespace '{self.namespace}'.") - # 4. Efficiently watch for the Job's completion. - return self._watch_job_completion(job_name) + return self._watch_for_job_completion(job_name) + except ApiException as e: + logger.error( + "A Kubernetes API error occurred during job" + f" '{job_name}': {e.reason}", + exc_info=True, + ) + return CodeExecutionResult(stderr=f"Kubernetes API error: {e.reason}") + except TimeoutError as e: + logger.error(e, exc_info=True) + logs = self._get_pod_logs(job_name) + stderr = f"Executor timed out: {e}\n\nPod Logs:\n{logs}" + return CodeExecutionResult(stderr=stderr) except Exception as e: logger.error( - f"An unexpected error occurred during execution of job '{job_name}': {e}", + f"An unexpected error occurred during job '{job_name}': {e}", exc_info=True, ) - return CodeExecutionResult(stderr=f"Executor failed: {e}") + return CodeExecutionResult(stderr=f"An unexpected executor error occurred: {e}") finally: - # 5. Always clean up the ConfigMap. The Job is cleaned up by Kubernetes. + # The Job is cleaned up by the TTL controller, and we ensure the ConfigMap is always deleted. self._cleanup_configmap(configmap_name) def _create_job_manifest(self, job_name: str, configmap_name: str) -> client.V1Job: @@ -93,7 +105,7 @@ def _create_job_manifest(self, job_name: str, configmap_name: str) -> client.V1J volume_mounts=[ client.V1VolumeMount(name="code-volume", mount_path="/app") ], - # BEST PRACTICE: Enforce a strict security context. + # Enforce a strict security context. security_context=client.V1SecurityContext( run_as_non_root=True, run_as_user=1001, @@ -101,52 +113,38 @@ def _create_job_manifest(self, job_name: str, configmap_name: str) -> client.V1J read_only_root_filesystem=True, capabilities=client.V1Capabilities(drop=["ALL"]), ), - # BEST PRACTICE: Set resource limits to prevent abuse. + # Set resource limits to prevent abuse. resources=client.V1ResourceRequirements( requests={"cpu": "100m", "memory": "128Mi"}, limits={"cpu": self.cpu_limit, "memory": self.mem_limit}, ), ) - # Pod Spec Customization for A/B Testing - pod_spec_args = { - "restart_policy": "Never", - "containers": [container], - "volumes": [ + # Use tolerations to request a gVisor node. + pod_spec = client.V1PodSpec( + restart_policy="Never", + containers=[container], + volumes=[ client.V1Volume( name="code-volume", config_map=client.V1ConfigMapVolumeSource(name=configmap_name), ) ], - } - - if self.use_gvisor_sandbox: - pod_spec_args["runtime_class_name"] = "gvisor" - pod_spec_args["node_selector"] = { - "cloud.google.com/gke-nodepool": "gvisor-nodepool" - } - pod_spec_args["tolerations"] = [ + runtime_class_name="gvisor", # Request the gVisor runtime. + tolerations=[ client.V1Toleration( key="sandbox.gke.io/runtime", operator="Equal", value="gvisor", effect="NoSchedule", ) - ] - else: - pod_spec_args["node_selector"] = { - "cloud.google.com/gke-nodepool": "standard-nodepool" - } - - # Define the pod spec, mounting the code and targeting gVisor. - pod_spec = client.V1PodSpec(**pod_spec_args) + ], + ) - # Define the Job specification. job_spec = client.V1JobSpec( template=client.V1PodTemplateSpec(spec=pod_spec), backoff_limit=0, # Do not retry the Job on failure. - # BEST PRACTICE: Let the Kubernetes TTL controller handle cleanup. - # This is more robust than client-side cleanup. + # Kubernetes TTL controller will handle Job/Pod cleanup. ttl_seconds_after_finished=600, # Garbage collect after 10 minutes. ) @@ -162,10 +160,11 @@ def _watch_job_completion(self, job_name: str) -> CodeExecutionResult: """Uses the watch API to efficiently wait for job completion.""" watch = Watch() try: + field_selector = f"metadata.name={job_name}" for event in watch.stream( self._batch_v1.list_namespaced_job, namespace=self.namespace, - field_selector=f"metadata.name={job_name}", + field_selector=field_selector, timeout_seconds=self.timeout_seconds, ): job = event["object"] @@ -179,13 +178,13 @@ def _watch_job_completion(self, job_name: str) -> CodeExecutionResult: logger.error(f"Job '{job_name}' failed.") logs = self._get_pod_logs(job_name) return CodeExecutionResult(stderr=f"Job failed. Logs:\n{logs}") + finally: + watch.stop() # If the loop finishes without returning, the watch timed out. raise TimeoutError( f"Job '{job_name}' did not complete within {self.timeout_seconds}s." ) - finally: - watch.stop() def _get_pod_logs(self, job_name: str) -> str: """Retrieves logs from the pod created by the specified job.""" @@ -194,14 +193,14 @@ def _get_pod_logs(self, job_name: str) -> str: namespace=self.namespace, label_selector=f"job-name={job_name}", limit=1 ) if not pods.items: + logger.warning(f"Could not find Pod for Job '{job_name}' to retrieve logs.") return "Error: Could not find pod for job." pod_name = pods.items[0].metadata.name - return self._core_v1.read_namespaced_pod_log( name=pod_name, namespace=self.namespace ) except ApiException as e: - logger.error(f"Could not retrieve logs for job '{job_name}': {e}") + logger.error(f"API error retrieving logs for job '{job_name}': {e.reason}") return f"Error retrieving logs: {e.reason}" def _create_code_configmap(self, name: str, code: str) -> None: @@ -209,9 +208,7 @@ def _create_code_configmap(self, name: str, code: str) -> None: body = client.V1ConfigMap( metadata=client.V1ObjectMeta(name=name), data={"code.py": code} ) - self._core_v1.create_namespaced_config_map( - namespace=self.namespace, body=body - ) + self._core_v1.create_namespaced_config_map(namespace=self.namespace, body=body) def _cleanup_configmap(self, name: str) -> None: """Deletes a ConfigMap.""" From 3dd917ba6fce0b27720a4e9d1e28b20571538f09 Mon Sep 17 00:00:00 2001 From: Summer Date: Tue, 24 Jun 2025 19:22:04 +0000 Subject: [PATCH 3/4] [06/24] Add gke_code_executor.py --- src/google/adk/code_executors/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/google/adk/code_executors/__init__.py b/src/google/adk/code_executors/__init__.py index 3797ea03b..aff6477be 100644 --- a/src/google/adk/code_executors/__init__.py +++ b/src/google/adk/code_executors/__init__.py @@ -59,4 +59,4 @@ 'The kubernetes sdk is not installed. If you want to use the GKE Code' ' Executor with agents, please install it. If not, you can ignore this' ' warning.' - ) \ No newline at end of file + ) From bdbda057f625870ae83fab338e33303fd26712b5 Mon Sep 17 00:00:00 2001 From: Summer Date: Tue, 24 Jun 2025 19:47:34 +0000 Subject: [PATCH 4/4] [06/24] Add gke_code_executor.py --- src/google/adk/code_executors/gke_code_executor.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/google/adk/code_executors/gke_code_executor.py b/src/google/adk/code_executors/gke_code_executor.py index f0abeb982..ecd847aa7 100644 --- a/src/google/adk/code_executors/gke_code_executor.py +++ b/src/google/adk/code_executors/gke_code_executor.py @@ -1,6 +1,5 @@ import logging import uuid -from typing import Optional from google.adk.agents.invocation_context import InvocationContext from google.adk.code_executors.base_code_executor import BaseCodeExecutor