Skip to content

Commit 65c4de8

Browse files
committed
Small fixes and better docstrings
1 parent ddb368a commit 65c4de8

File tree

12 files changed

+232
-32
lines changed

12 files changed

+232
-32
lines changed

examples/weather_agent/pipelines/weather_agent.py

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""Weather Agent Pipeline."""
22

33
import os
4+
import time
45

56
from pipelines.hooks import (
67
InitConfig,
@@ -10,9 +11,10 @@
1011
from steps import analyze_weather_with_llm, get_weather
1112

1213
from zenml import pipeline
13-
from zenml.config import DockerSettings
14+
from zenml.config import DeploymentSettings, DockerSettings
1415

1516
# Import enums for type-safe capture mode configuration
17+
from zenml.config.deployment_settings import MiddlewareSpec
1618
from zenml.config.docker_settings import PythonPackageInstaller
1719
from zenml.config.resource_settings import ResourceSettings
1820

@@ -22,6 +24,58 @@
2224
python_package_installer=PythonPackageInstaller.UV,
2325
)
2426

27+
28+
class RequestTimingMiddleware:
29+
"""ASGI middleware to measure request processing time.
30+
31+
Uses the standard ASGI interface (scope, receive, send) which works
32+
across all ASGI frameworks: FastAPI, Django, Starlette, Quart, etc.
33+
"""
34+
35+
def __init__(self, app):
36+
self.app = app
37+
38+
async def __call__(self, scope, receive, send):
39+
"""Process ASGI request with timing measurement.
40+
41+
Args:
42+
scope: ASGI connection scope (contains request info).
43+
receive: Async callable to receive ASGI events.
44+
send: Async callable to send ASGI events.
45+
"""
46+
if scope["type"] != "http":
47+
return await self.app(scope, receive, send)
48+
49+
start_time = time.time()
50+
51+
async def send_wrapper(message):
52+
"""Intercept response to add timing header."""
53+
if message["type"] == "http.response.start":
54+
process_time = (time.time() - start_time) * 1000
55+
headers = list(message.get("headers", []))
56+
headers.append(
57+
(
58+
b"x-process-time-ms",
59+
str(process_time).encode(),
60+
)
61+
)
62+
message = {**message, "headers": headers}
63+
64+
await send(message)
65+
66+
await self.app(scope, receive, send_wrapper)
67+
68+
69+
deployment_settings = DeploymentSettings(
70+
custom_middlewares=[
71+
MiddlewareSpec(
72+
middleware=RequestTimingMiddleware,
73+
order=10,
74+
native=True,
75+
),
76+
],
77+
)
78+
2579
environment = {}
2680
if os.getenv("OPENAI_API_KEY"):
2781
environment["OPENAI_API_KEY"] = os.getenv("OPENAI_API_KEY")
@@ -34,6 +88,7 @@
3488
on_cleanup=cleanup_hook,
3589
settings={
3690
"docker": docker_settings,
91+
"deployment": deployment_settings,
3792
"deployer.gcp": {
3893
"allow_unauthenticated": True,
3994
# "location": "us-central1",

src/zenml/config/deployment_settings.py

Lines changed: 124 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -446,16 +446,131 @@ class SecureHeadersConfig(BaseModel):
446446

447447

448448
class DeploymentSettings(BaseSettings):
449-
"""Settings for the pipeline deployment."""
449+
"""Settings for the pipeline deployment.
450+
451+
Use these settings to fully customize all aspects of the uvicorn web server
452+
and ASGI web application that constitute the pipeline deployment.
453+
454+
Note that these settings are only available at the pipeline level.
455+
456+
The following customizations can be used to configure aspects that are
457+
framework-agnostic (i.e. not specific to a particular ASGI framework like
458+
FastAPI, Django, Flask, etc.):
459+
460+
* the ASGI application details: `app_title`, `app_description`,
461+
`app_version` and `app_kwargs`
462+
* the URL paths for the various built-in endpoints: `root_url_path`,
463+
`docs_url_path`, `redoc_url_path`, `invoke_url_path`, `health_url_path`,
464+
`info_url_path` and `metrics_url_path`
465+
* the location of dashboard static files can be provided to replace the
466+
default UI that is included with the deployment ASGI application:
467+
`dashboard_files_path`
468+
* the CORS configuration: `cors`
469+
* the secure headers configuration: `secure_headers`
470+
* the thread pool size: `thread_pool_size`
471+
* custom application startup and shutdown hooks: `startup_hook_source`,
472+
`shutdown_hook_source`, `startup_hook_kwargs` and `shutdown_hook_kwargs`
473+
* uvicorn server configuration: `uvicorn_host`, `uvicorn_port`,
474+
`uvicorn_workers` and `uvicorn_kwargs`
475+
476+
In addition to the above, the following advanced features can be used to
477+
customize the implementation-specific details of the deployment application:
478+
479+
* custom endpoints (e.g. custom metrics, custom health, etc.): `custom_endpoints`
480+
* custom middlewares (e.g. authentication, logging, etc.): `custom_middlewares`
481+
* application building extensions - these are pluggable components that can
482+
be used to add advanced framework-specific features like custom authentication,
483+
logging, metrics, etc.: `app_extensions`
484+
485+
Ultimately, if neither of the above are sufficient, the user can provide a
486+
custom implementations for the two core components that are used to build
487+
and run the deployment application itself:
488+
489+
* the deployment app runner - this is the component that is responsible for
490+
building and running the ASGI application. It is represented by the
491+
`zenml.deployers.server.BaseDeploymentAppRunner` class.
492+
See: `deployment_app_runner_source` and `deployment_app_runner_kwargs`
493+
* the deployment service - this is the component that is responsible for
494+
handling the business logic of the pipeline deployment. It is represented by
495+
the `zenml.deployers.server.BaseDeploymentService` class. See:
496+
`deployment_service_source` and `deployment_service_kwargs`
497+
498+
Both of these base classes or their existing implementations can be extended
499+
and provided as sources in the deployment settings to be loaded at runtime.
450500
451-
# This settings is only available at the pipeline level
501+
Attributes:
502+
app_title: Title of the deployment application.
503+
app_description: Description of the deployment application.
504+
app_version: Version of the deployment application.
505+
app_kwargs: Arbitrary framework-specific keyword arguments to be passed
506+
to the deployment ASGI application constructor.
507+
508+
include_default_endpoints: Whether to include the default endpoints in
509+
the ASGI application.
510+
include_default_middleware: Whether to include the default middleware
511+
in the ASGI application.
512+
513+
root_url_path: Root URL path.
514+
docs_url_path: URL path for the OpenAPI documentation endpoint.
515+
redoc_url_path: URL path for the Redoc documentation endpoint.
516+
invoke_url_path: URL path for the invoke endpoint.
517+
health_url_path: URL path for the health check endpoint.
518+
info_url_path: URL path for the info endpoint.
519+
metrics_url_path: URL path for the metrics endpoint.
520+
dashboard_files_path: Path where the dashboard static files are located.
521+
This can be used to replace the default UI that is included with the
522+
deployment ASGI application. The referenced directory must contain
523+
at a minimum an `index.html` file and a `assets` directory. The path
524+
can be absolute or relative to the root of the repository
525+
initialized with `zenml init` or relative to the current working
526+
directory.
527+
528+
cors: Configuration for CORS.
529+
secure_headers: Configuration for secure headers.
530+
thread_pool_size: Size of the thread pool for the ASGI application.
531+
532+
startup_hook: Custom startup hook for the ASGI application.
533+
shutdown_hook: Custom shutdown hook for the ASGI application.
534+
startup_hook_kwargs: Keyword arguments for the startup hook.
535+
shutdown_hook_kwargs: Keyword arguments for the shutdown hook.
536+
537+
custom_endpoints: Custom endpoints for the ASGI application. See the
538+
`EndpointSpec` class for more details.
539+
custom_middlewares: Custom middlewares for the ASGI application. See the
540+
`MiddlewareSpec` class for more details.
541+
app_extensions: App extensions used to build the ASGI application. See
542+
the `AppExtensionSpec` class for more details.
543+
544+
uvicorn_host: Host of the uvicorn server.
545+
uvicorn_port: Port of the uvicorn server.
546+
uvicorn_workers: Number of workers for the uvicorn server.
547+
log_level: Log level for the deployment application.
548+
uvicorn_kwargs: Keyword arguments for the uvicorn server.
549+
550+
deployment_app_runner_source: Source of the deployment app runner. Must
551+
point to a class that extends the
552+
`zenml.deployers.server.BaseDeploymentAppRunner` class.
553+
deployment_app_runner_kwargs: Keyword arguments for the deployment app
554+
runner. These will be passed to the constructor of the deployment app
555+
runner class.
556+
deployment_service_source: Source of the deployment service. Must point
557+
to a class that extends the
558+
`zenml.deployers.server.BaseDeploymentService` class.
559+
deployment_service_kwargs: Keyword arguments for the deployment service.
560+
These will be passed to the constructor of the deployment service class.
561+
"""
562+
563+
# These settings are only available at the pipeline level
452564
LEVEL: ClassVar[ConfigurationLevel] = ConfigurationLevel.PIPELINE
453565

454566
app_title: Optional[str] = None
455567
app_description: Optional[str] = None
456568
app_version: Optional[str] = None
457569
app_kwargs: Dict[str, Any] = {}
458570

571+
include_default_endpoints: bool = True
572+
include_default_middleware: bool = True
573+
459574
root_url_path: str = DEFAULT_DEPLOYMENT_APP_ROOT_URL_PATH
460575
docs_url_path: str = DEFAULT_DEPLOYMENT_APP_DOCS_URL_PATH
461576
redoc_url_path: str = DEFAULT_DEPLOYMENT_APP_REDOC_URL_PATH
@@ -464,15 +579,15 @@ class DeploymentSettings(BaseSettings):
464579
info_url_path: str = DEFAULT_DEPLOYMENT_APP_INFO_URL_PATH
465580
metrics_url_path: str = DEFAULT_DEPLOYMENT_APP_METRICS_URL_PATH
466581

467-
dashboard_files_path: str = ""
582+
dashboard_files_path: Optional[str] = None
468583

469584
cors: CORSConfig = CORSConfig()
470585
secure_headers: SecureHeadersConfig = SecureHeadersConfig()
471586

472587
thread_pool_size: int = DEFAULT_DEPLOYMENT_APP_THREAD_POOL_SIZE
473588

474-
startup_hook_source: Optional[SourceOrObjectField] = None
475-
shutdown_hook_source: Optional[SourceOrObjectField] = None
589+
startup_hook: Optional[SourceOrObjectField] = None
590+
shutdown_hook: Optional[SourceOrObjectField] = None
476591
startup_hook_kwargs: Dict[str, Any] = {}
477592
shutdown_hook_kwargs: Dict[str, Any] = {}
478593

@@ -483,12 +598,6 @@ class DeploymentSettings(BaseSettings):
483598
# Pluggable app extensions for advanced features
484599
app_extensions: Optional[List[AppExtensionSpec]] = None
485600

486-
# Include default endpoints in the deployment application
487-
include_default_endpoints: bool = True
488-
489-
# Include default middleware in the deployment application
490-
include_default_middleware: bool = True
491-
492601
uvicorn_host: str = "0.0.0.0"
493602
uvicorn_port: int = 8000
494603
uvicorn_workers: int = 1
@@ -503,10 +612,10 @@ class DeploymentSettings(BaseSettings):
503612

504613
def load_sources(self) -> None:
505614
"""Load source string into callable."""
506-
if self.startup_hook_source is not None:
507-
self.startup_hook_source.load()
508-
if self.shutdown_hook_source is not None:
509-
self.shutdown_hook_source.load()
615+
if self.startup_hook is not None:
616+
self.startup_hook.load()
617+
if self.shutdown_hook is not None:
618+
self.shutdown_hook.load()
510619
if self.deployment_app_runner_source is not None:
511620
self.deployment_app_runner_source.load()
512621
if self.deployment_service_source is not None:

src/zenml/deployers/containerized_deployer.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@
3838
class ContainerizedDeployer(BaseDeployer, ABC):
3939
"""Base class for all containerized deployers."""
4040

41-
CONTAINER_REQUIREMENTS: List[str] = []
41+
# TODO: this needs to come from the deployment settings or from the
42+
# app runner class itself
43+
CONTAINER_REQUIREMENTS: List[str] = ["uvicorn", "fastapi", "secure~=0.3.0"]
4244

4345
@staticmethod
4446
def get_image(snapshot: PipelineSnapshotResponse) -> str:

src/zenml/deployers/docker/docker_deployer.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,6 @@ def from_deployment(
135135
class DockerDeployer(ContainerizedDeployer):
136136
"""Deployer responsible for deploying pipelines locally using Docker."""
137137

138-
CONTAINER_REQUIREMENTS: List[str] = ["uvicorn", "fastapi"]
139138
_docker_client: Optional[DockerClient] = None
140139

141140
@property

src/zenml/deployers/server/app.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
)
4949
from zenml.deployers.server.service import (
5050
BasePipelineDeploymentService,
51-
DefaultPipelineDeploymentService,
51+
PipelineDeploymentService,
5252
)
5353
from zenml.integrations.registry import integration_registry
5454
from zenml.logger import get_logger
@@ -276,7 +276,7 @@ def load_deployment_service(
276276
)
277277
if settings.deployment_service_source is None:
278278
service_cls: Type[BasePipelineDeploymentService] = (
279-
DefaultPipelineDeploymentService
279+
PipelineDeploymentService
280280
)
281281
else:
282282
try:
@@ -647,10 +647,10 @@ def _run_startup_hook(self) -> None:
647647
Raises:
648648
ValueError: If the startup hook is not callable.
649649
"""
650-
if not self.settings.startup_hook_source:
650+
if not self.settings.startup_hook:
651651
return
652652

653-
startup_hook = self.settings.startup_hook_source.load()
653+
startup_hook = self.settings.startup_hook.load()
654654

655655
if not callable(startup_hook):
656656
raise ValueError(
@@ -690,10 +690,10 @@ def _run_shutdown_hook(self) -> None:
690690
Raises:
691691
ValueError: If the shutdown hook is not callable.
692692
"""
693-
if not self.settings.shutdown_hook_source:
693+
if not self.settings.shutdown_hook:
694694
return
695695

696-
shutdown_hook = self.settings.shutdown_hook_source.load()
696+
shutdown_hook = self.settings.shutdown_hook.load()
697697

698698
if not shutdown_hook:
699699
return

src/zenml/deployers/server/service.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,7 @@ class PipelineInvokeResponse(BaseDeploymentInvocationResponse):
312312
return PipelineInvokeRequest, PipelineInvokeResponse
313313

314314

315-
class DefaultPipelineDeploymentService(BasePipelineDeploymentService):
315+
class PipelineDeploymentService(BasePipelineDeploymentService):
316316
"""Default pipeline deployment service implementation."""
317317

318318
def initialize(self) -> None:

src/zenml/deployers/utils.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121
import requests
2222

2323
from zenml.client import Client
24-
from zenml.config.deployment_settings import DEFAULT_DEPLOYMENT_APP_INVOKE_URL_PATH
24+
from zenml.config.deployment_settings import (
25+
DEFAULT_DEPLOYMENT_APP_INVOKE_URL_PATH,
26+
)
2527
from zenml.config.step_configurations import Step
2628
from zenml.deployers.exceptions import (
2729
DeploymentHTTPError,

src/zenml/integrations/aws/deployers/aws_deployer.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -253,8 +253,6 @@ def from_deployment(
253253
class AWSDeployer(ContainerizedDeployer):
254254
"""Deployer responsible for deploying pipelines on AWS App Runner."""
255255

256-
CONTAINER_REQUIREMENTS: List[str] = ["uvicorn", "fastapi"]
257-
258256
_boto_session: Optional[boto3.Session] = None
259257
_region: Optional[str] = None
260258
_app_runner_client: Optional[Any] = None

src/zenml/integrations/gcp/deployers/gcp_deployer.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -247,8 +247,6 @@ def from_deployment(
247247
class GCPDeployer(ContainerizedDeployer, GoogleCredentialsMixin):
248248
"""Deployer responsible for deploying pipelines on GCP Cloud Run."""
249249

250-
CONTAINER_REQUIREMENTS: List[str] = ["uvicorn", "fastapi"]
251-
252250
_credentials: Optional[Any] = None
253251
_project_id: Optional[str] = None
254252
_cloud_run_client: Optional[run_v2.ServicesClient] = None

src/zenml/orchestrators/step_launcher.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
ENV_ZENML_STEP_OPERATOR,
2727
handle_bool_env_var,
2828
)
29-
from zenml.deployers.server import runtime
3029
from zenml.enums import ExecutionMode, ExecutionStatus
3130
from zenml.environment import get_run_environment_dict
3231
from zenml.exceptions import RunInterruptedException, RunStoppedException
@@ -424,6 +423,8 @@ def _run_step(
424423
step_run: The model of the current step run.
425424
force_write_logs: The context for the step logs.
426425
"""
426+
from zenml.deployers.server import runtime
427+
427428
step_run_info = StepRunInfo(
428429
config=self._step.config,
429430
pipeline=self._snapshot.pipeline_configuration,

0 commit comments

Comments
 (0)