
Conversation

@stefannica stefannica commented Oct 15, 2025

Framework-Agnostic Deployment App Factory

Overview

This PR implements a framework-agnostic deployment ASGI app factory system that allows customizing every aspect of the ASGI web application that powers pipeline deployments:

  • an app factory abstraction compatible with any ASGI framework (e.g. FastAPI, Django, Flask, Falcon, Quart, BlackSheep, etc.); currently only the FastAPI version is implemented
  • unified endpoint/middleware specifications that make it possible to add endpoints and middleware via configuration, independently of the ASGI framework
  • app extensions that make it possible to contribute framework-specific code to the process of building the ASGI application (e.g. add middleware, endpoints, routes, advanced security mechanisms, etc.)

If necessary, the core components of the deployment server (the app factory, a.k.a. app runner, and the deployment service classes) can be extended, and custom implementations can be used instead of the built-in ones via configuration options.

Key Components Implemented

zenml.config.source.SourceOrObject

A hybrid type that can hold either a source string or a loaded object:

  • Accepts strings, Source objects, or actual importable objects (types, functions, global variables)
  • Lazy loading via the load() method
  • Lazy serialization to source strings
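The hybrid pattern can be sketched in plain Python. This is a minimal illustration of the idea, not ZenML's actual implementation; the class name and internals below are assumptions:

```python
# Minimal sketch of the "source string OR loaded object" pattern: holds a
# dotted import path or an already-loaded object, resolving lazily either way.
import importlib
from typing import Any, Optional


class SourceOrObjectSketch:
    """Holds either a dotted import path or an already-loaded object."""

    def __init__(self, value: Any) -> None:
        if isinstance(value, str):
            self._source: Optional[str] = value
            self._object: Any = None
        else:
            self._source = None
            self._object = value

    def load(self) -> Any:
        """Lazily import the referenced object the first time it is needed."""
        if self._object is None:
            module_path, _, attr = self._source.rpartition(".")
            self._object = getattr(importlib.import_module(module_path), attr)
        return self._object

    def to_source_string(self) -> str:
        """Lazily serialize back to an importable dotted path."""
        if self._source is None:
            obj = self._object
            self._source = f"{obj.__module__}.{obj.__qualname__}"
        return self._source


import json

# From a string: nothing is imported until load() is called.
ref = SourceOrObjectSketch("json.dumps")
assert ref.load()({"a": 1}) == '{"a": 1}'

# From an object: serialization derives the source string.
ref2 = SourceOrObjectSketch(json.dumps)
assert ref2.to_source_string() == "json.dumps"
```

The real SourceOrObject additionally accepts Source objects and integrates with pydantic serialization, but the lazy load/serialize round-trip is the core idea.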

zenml.config.deployment_config.DeploymentSettings

Similar to DockerSettings, this can be used to customize the configuration and behavior of the ASGI application run by the pipeline deployment server:

  • Contains a rich set of settings, from descriptive metadata and URL paths to custom endpoints, middleware, and extensions
  • Relies on SourceOrObject to encode the location of classes and functions used as values
  • Can only be used at the pipeline level

zenml.deployers.server.app.BaseDeploymentAppRunner

This is the abstract factory used to build and run the pipeline deployment ASGI application according to the specifications in the DeploymentSettings. This ASGI application is just a wrapper around the core service that implements the pipeline deployment operations (see next point). The responsibilities of this component are:

  • Construct an ASGI application using one of the supported frameworks (e.g. FastAPI, Django, Flask, Falcon, Quart, BlackSheep, etc.) and according to the configuration values in the DeploymentSettings.
  • Provide a unified, framework-agnostic API that can be used to extend the ASGI application with custom endpoints and middleware.
  • Implement the REST API specific logic around the core deployment service.

The only built-in implementation provided for this class is the one for FastAPI: zenml.deployers.server.fastapi.app.FastAPIDeploymentAppRunner.

Users can implement this abstract class to support any other ASGI capable framework.
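The contract can be sketched as an abstract base class with a toy subclass standing in for a real framework. The method and class names below are illustrative assumptions, not ZenML's exact API:

```python
# Hedged sketch of the app runner contract: the base class exposes a
# framework-agnostic surface; each subclass builds the app for one framework.
from abc import ABC, abstractmethod
from typing import Any, Callable, Dict, Tuple


class AppRunnerSketch(ABC):
    """Builds an ASGI app for one framework; the service stays framework-free."""

    def __init__(self, service: Any) -> None:
        self.service = service

    @abstractmethod
    def build_app(self) -> Any:
        """Construct the framework-specific ASGI application."""

    @abstractmethod
    def add_endpoint(self, path: str, method: str, handler: Callable) -> None:
        """Framework-agnostic hook used by endpoint specifications."""


class DictAppRunner(AppRunnerSketch):
    """Toy 'framework': routes stored in a dict, enough to show the contract."""

    def __init__(self, service: Any) -> None:
        super().__init__(service)
        self.routes: Dict[Tuple[str, str], Callable] = {}

    def build_app(self) -> Dict[Tuple[str, str], Callable]:
        # A real runner would also wire in the service's invoke/health logic.
        self.add_endpoint("/health", "GET", lambda: {"status": "ok"})
        return self.routes

    def add_endpoint(self, path: str, method: str, handler: Callable) -> None:
        self.routes[(method, path)] = handler


app = DictAppRunner(service=None).build_app()
assert app[("GET", "/health")]() == {"status": "ok"}
```

A FastAPI implementation would do the same registration against a FastAPI instance; a Django or Quart one against their respective routers.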

zenml.deployers.server.app.BaseDeploymentAppRunnerFlavor

This implements a simple flavor system on top of BaseDeploymentAppRunner classes, making it possible to collect software requirements for app runner implementations without importing the implementations themselves.
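The value of the flavor layer is that requirements can be gathered (e.g. when building container images) without importing heavy framework code. A minimal sketch of that idea, with illustrative names:

```python
# Sketch of a "flavor" layer: a lightweight class that advertises a runner's
# software requirements and a lazy source path to its implementation.
from typing import List, Type


class AppRunnerFlavorSketch:
    """Describes an app runner: its requirements and how to load it."""

    name: str = ""
    requirements: List[str] = []

    @classmethod
    def implementation_source(cls) -> str:
        raise NotImplementedError


class FastAPIFlavorSketch(AppRunnerFlavorSketch):
    name = "fastapi"
    # Collected for image builds without importing FastAPI itself.
    requirements = ["fastapi", "uvicorn"]

    @classmethod
    def implementation_source(cls) -> str:
        # Dotted path resolved lazily, only when actually serving.
        return "zenml.deployers.server.fastapi.app.FastAPIDeploymentAppRunner"


def collect_requirements(
    flavors: List[Type[AppRunnerFlavorSketch]],
) -> List[str]:
    """Union of requirements across flavors, e.g. for a Dockerfile."""
    return sorted({req for flavor in flavors for req in flavor.requirements})


assert collect_requirements([FastAPIFlavorSketch]) == ["fastapi", "uvicorn"]
```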

zenml.deployers.server.service.BasePipelineDeploymentService

This is the base class for the service that runs the core pipeline deployment logic. The responsibilities of this component are:

  • Initialize and clean up the global state shared by all pipeline deployment invocations (e.g. run the init hooks, configure the orchestrator).
  • Provide information about the input and output deployment schemas.
  • Run one or more actual pipelines according to an invocation request (i.e. input parameters) and return their outputs.
  • Implement rudimentary health-check, status, and metrics reporting.
  • Remain ASGI/HTTP agnostic: this code must not depend on any web framework.

The only built-in implementation provided for this class is the one that uses the local orchestrator to run pipelines: zenml.deployers.server.service.PipelineDeploymentService.

Users can extend this abstract class to provide their own custom logic for running one or more pipelines.
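Because the service is HTTP-agnostic, its contract is just plain Python methods: dictionaries in, dictionaries out. A hedged sketch of that contract (names are assumptions, not ZenML's exact API):

```python
# Sketch of the HTTP-agnostic service contract: no request/response objects,
# so the same service can sit behind any app runner / ASGI framework.
from abc import ABC, abstractmethod
from typing import Any, Dict


class DeploymentServiceSketch(ABC):
    @abstractmethod
    def initialize(self) -> None:
        """Run init hooks and set up shared state (once per process)."""

    @abstractmethod
    def invoke(self, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Run the pipeline for one invocation request."""

    def health(self) -> Dict[str, str]:
        """Rudimentary health check; subclasses may override."""
        return {"status": "ok"}


class EchoService(DeploymentServiceSketch):
    """Trivial implementation: 'runs' a pipeline that echoes its inputs."""

    def initialize(self) -> None:
        self.ready = True

    def invoke(self, parameters: Dict[str, Any]) -> Dict[str, Any]:
        return {"outputs": parameters}


svc = EchoService()
svc.initialize()
assert svc.invoke({"x": 1}) == {"outputs": {"x": 1}}
assert svc.health() == {"status": "ok"}
```

The built-in PipelineDeploymentService plays the role of EchoService here, delegating to the local orchestrator instead of echoing.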

Other Changes

  • upgrades the secure package to the latest version (1.0.1) and removes all code handling the X-XSS-Protection security header, which is no longer supported
  • adds an install_deployment_requirements flag to DockerSettings and includes deployment settings in the set of software requirements used for building container images
  • updates existing Deployer stack components (Docker, GCP, AWS) to remove fields from their configurations that are now customizable via the DeploymentSettings (ports, health-check API URL paths)
  • changes the entrypoint for the deployment container to take in only one argument: the deployment UUID
  • moves software requirements for deployment containers from ContainerizedDeployer.CONTAINER_REQUIREMENTS to the BaseDeploymentAppRunnerFlavor class
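The single-argument entrypoint change can be sketched as follows; everything else is looked up from the deployment record once the UUID is known. This is an illustrative stand-in, not the actual entrypoint code:

```python
# Sketch of a single-argument container entrypoint: the deployment UUID is
# the only CLI input; all other configuration comes from the deployment record.
import argparse
import uuid


def parse_entrypoint_args(argv):
    parser = argparse.ArgumentParser(
        description="Deployment server entrypoint"
    )
    parser.add_argument(
        "deployment_id",
        type=uuid.UUID,
        help="UUID of the deployment to serve",
    )
    return parser.parse_args(argv)


args = parse_entrypoint_args(["8c6bb6ba-93c4-4f29-9f2a-d0f33e1b6d4b"])
assert args.deployment_id == uuid.UUID("8c6bb6ba-93c4-4f29-9f2a-d0f33e1b6d4b")
# From here a real entrypoint would load the deployment, build the ASGI app
# via the configured app runner, and start the server.
```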

Examples

1. Basic Configuration Examples

Configure URL Paths

from zenml import pipeline
from zenml.config.deployment_settings import DeploymentSettings

deployment_settings = DeploymentSettings(
    # Customize endpoint paths
    invoke_url_path="/predict",
    health_url_path="/healthz",
    metrics_url_path="/api/metrics",
    docs_url_path="/api-docs",
    
    # App metadata
    app_title="Customer Churn Prediction Service",
    app_description="ML service for predicting customer churn",
    app_version="1.2.0",
)

@pipeline(settings={"deployment": deployment_settings})
def my_pipeline():
    ...

Configure CORS

from zenml.config.deployment_settings import (
    DeploymentSettings,
    CORSConfig,
)

deployment_settings = DeploymentSettings(
    cors=CORSConfig(
        allow_origins=[
            "https://myapp.example.com",
            "https://admin.example.com",
        ],
        allow_methods=["GET", "POST", "OPTIONS"],
        allow_headers=["Content-Type", "Authorization"],
        allow_credentials=True,
    ),
)

Configure Security Headers

from zenml.config.deployment_settings import (
    DeploymentSettings,
    SecureHeadersConfig,
)

deployment_settings = DeploymentSettings(
    secure_headers=SecureHeadersConfig(
        hsts="max-age=31536000; includeSubDomains",
        xfo="DENY",
        csp=(
            "default-src 'self'; "
            "script-src 'self' 'unsafe-inline'; "
            "style-src 'self' 'unsafe-inline'; "
            "img-src 'self' data: https:;"
        ),
        referrer="strict-origin-when-cross-origin",
        cache="no-store",
    ),
)

2. Custom Endpoints

Framework-Agnostic: Simple Endpoint Function

from typing import Any, Dict

from zenml.config.deployment_settings import (
    DeploymentSettings,
    EndpointSpec,
    EndpointMethod,
)

async def health_detailed() -> Dict[str, Any]:
    """Detailed health check with system metrics."""
    import psutil
    
    return {
        "status": "healthy",
        "cpu_percent": psutil.cpu_percent(),
        "memory_percent": psutil.virtual_memory().percent,
        "disk_percent": psutil.disk_usage('/').percent,
    }

deployment_settings = DeploymentSettings(
    custom_endpoints=[
        EndpointSpec(
            path="/health/detailed",
            method=EndpointMethod.GET,
            handler=health_detailed,
            auth_required=False,
        ),
    ],
)

Framework-Agnostic: Endpoint Builder Function

from typing import Callable

from pydantic import BaseModel
from zenml.config.deployment_settings import (
    DeploymentSettings,
    EndpointSpec,
    EndpointMethod,
)
from zenml.deployers.server import BaseDeploymentAppRunner

class PredictionRequest(BaseModel):
    features: list[float]

class PredictionResponse(BaseModel):
    prediction: float
    confidence: float
    model_version: str

def create_custom_predict_endpoint(
    app_runner: BaseDeploymentAppRunner,
    model_path: str,
) -> Callable:
    """Builder function that creates a custom prediction endpoint.
    
    This pattern allows you to access the app_runner and inject
    configuration at build time.
    """
    # Load model during app build time (once)
    import joblib
    model = joblib.load(model_path)
    
    async def predict_endpoint(
        request: PredictionRequest,
    ) -> PredictionResponse:
        """The actual endpoint implementation."""
        prediction = model.predict([request.features])[0]
        confidence = model.predict_proba([request.features]).max()
        
        return PredictionResponse(
            prediction=float(prediction),
            confidence=float(confidence),
            model_version=app_runner.deployment.name,
        )
    
    return predict_endpoint

deployment_settings = DeploymentSettings(
    custom_endpoints=[
        EndpointSpec(
            path="/predict/custom",
            method=EndpointMethod.POST,
            handler=create_custom_predict_endpoint,
            init_kwargs={"model_path": "/models/custom_model.pkl"},
            auth_required=True,
        ),
    ],
)

FastAPI-Specific: Native FastAPI Router

from fastapi import APIRouter, HTTPException, Depends
from zenml.config.deployment_settings import (
    DeploymentSettings,
    EndpointSpec,
    EndpointMethod,
)

# Create a native FastAPI router with all FastAPI features
admin_router = APIRouter(prefix="/admin", tags=["admin"])

@admin_router.get("/stats")
async def get_stats():
    """Get deployment statistics."""
    return {"total_requests": 1000, "uptime_hours": 72}

@admin_router.post("/reload")
async def reload_model():
    """Reload the model."""
    # Implementation here
    return {"status": "reloaded"}

@admin_router.delete("/cache")
async def clear_cache():
    """Clear the cache."""
    # Implementation here
    return {"status": "cleared"}

# Register as native endpoint
deployment_settings = DeploymentSettings(
    custom_endpoints=[
        EndpointSpec(
            path="",  # Router has its own prefix
            method=EndpointMethod.GET,
            handler=admin_router,
            native=True,  # Treat as native FastAPI object
            auth_required=True,
        ),
    ],
)

3. Custom Middleware

Framework-Agnostic: Simple Middleware Function

from zenml.config.deployment_settings import (
    DeploymentSettings,
    MiddlewareSpec,
)
import time

class RequestTimingMiddleware:
    """ASGI middleware to measure request processing time.
    
    Uses the standard ASGI interface (scope, receive, send) which works
    across all ASGI frameworks: FastAPI, Django, Starlette, Quart, etc.
    """
    
    def __init__(self, app):
        self.app = app
    
    async def __call__(self, scope, receive, send):
        """Process ASGI request with timing measurement.
        
        Args:
            scope: ASGI connection scope (contains request info).
            receive: Async callable to receive ASGI events.
            send: Async callable to send ASGI events.
        """
        if scope["type"] != "http":
            return await self.app(scope, receive, send)
        
        start_time = time.time()
        
        async def send_wrapper(message):
            """Intercept response to add timing header."""
            if message["type"] == "http.response.start":
                process_time = (time.time() - start_time) * 1000
                headers = list(message.get("headers", []))
                headers.append((
                    b"x-process-time-ms",
                    str(process_time).encode(),
                ))
                message = {**message, "headers": headers}
            
            await send(message)
        
        await self.app(scope, receive, send_wrapper)

deployment_settings = DeploymentSettings(
    custom_middlewares=[
        MiddlewareSpec(
            middleware=RequestTimingMiddleware,
            order=10,
        ),
    ],
)

FastAPI-Specific: Native FastAPI Middleware

from fastapi.middleware.gzip import GZipMiddleware
from zenml.config.deployment_settings import (
    DeploymentSettings,
    MiddlewareSpec,
)

deployment_settings = DeploymentSettings(
    custom_middlewares=[
        # Use native FastAPI GZip middleware
        MiddlewareSpec(
            middleware=GZipMiddleware,
            native=True,
            init_kwargs={"minimum_size": 1000},
            order=100,  # Run late in the chain
        ),
    ],
)

4. App Extensions

Simple Extension Function

from zenml.config.deployment_settings import (
    DeploymentSettings,
    AppExtensionSpec,
)
from zenml.deployers.server import BaseDeploymentAppRunner

def add_monitoring_extension(
    app_runner: BaseDeploymentAppRunner,
    prometheus_path: str = "/prometheus",
):
    """Simple extension that adds Prometheus metrics endpoint."""
    from prometheus_client import Counter, make_asgi_app
    
    # Create Prometheus metrics (registered globally; incremented elsewhere,
    # e.g. from middleware)
    request_counter = Counter(
        'deployment_requests_total',
        'Total requests',
    )
    
    # Mount Prometheus metrics app
    metrics_app = make_asgi_app()
    app_runner.asgi_app.mount(prometheus_path, metrics_app)
    
    print(f"✅ Prometheus metrics available at {prometheus_path}")

deployment_settings = DeploymentSettings(
    app_extensions=[
        AppExtensionSpec(
            extension=add_monitoring_extension,
            extension_kwargs={"prometheus_path": "/metrics/prometheus"},
        ),
    ],
)

Pre-requisites

Please ensure you have done the following:

  • I have read the CONTRIBUTING.md document.
  • I have added tests to cover my changes.
  • I have based my new branch on develop and the open PR is targeting develop. If your branch wasn't based on develop, read the contribution guide on rebasing your branch to develop.
  • IMPORTANT: I made sure that my changes are reflected properly in the following resources:
    • ZenML Docs
    • Dashboard: Needs to be communicated to the frontend team.
    • Templates: Might need adjustments (that are not reflected in the template tests) in case of non-breaking changes and deprecations.
    • Projects: Depending on the version dependencies, different projects might get affected.

Types of changes

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to change)
  • Other (add details above)

@github-actions bot added the internal (To filter out internal PRs and issues) and enhancement (New feature or request) labels on Oct 15, 2025
@stefannica force-pushed the feature/customizable-deployment-servers branch from dd63272 to 55719bb on October 15, 2025 at 13:26
@stefannica marked this pull request as ready for review on October 15, 2025 at 21:23
@stefannica force-pushed the feature/customizable-deployment-servers branch from 45049ac to 65c4de8 on October 15, 2025 at 21:46

Review thread on:

__all__ = [
"BaseStep",
"ResourceSettings",

Reviewer: This can break user code and needs to be mentioned in the release notes + makes this PR breaking. Is that intentional?

Author (stefannica): I think it was a mistake that this was listed here, and I fixed it. Nobody should import ResourceSettings from this module, given that all our docs and examples use the proper import module for this, and if they do, they will see their mistake and correct it. I can mention it in the release notes, but I won't mark this PR as breaking over such a small thing.

Reviewer:

  • I don't think it's a subjective discussion whether something is breaking or not. This import was previously part of our public API (which the steps module certainly is), and this PR removes it. So anyone who imported ResourceSettings from here has a broken codebase after the upgrade. That makes this PR and the next release a breaking release, no matter what you or I think.
  • This import was definitely not in here by mistake, because I remember when Hamza told me he wanted it there. The reason is that resource settings are usually defined on steps, and we wanted both to be importable from the same module for convenience (from zenml.steps import step, ResourceSettings).

Just FYI, I actually agree with you that the import shouldn't be here, and I'm all for removing it. Whether we do that as part of this PR or in a huge bulk PR where we do breaking changes whenever we do the next breaking release, I'll leave that up to you.

Author (stefannica): > This import was definitely not in here by mistake

I wasn't aware of this. I'll bring it back.

Review thread on:

self._source = resolve(self._object)
return self._source

def to_source_string(self) -> str:

Reviewer: Any reason you're serializing this to a string? The Source object actually contains more information and is used in all other places where we serialize sources.

Author (stefannica): I don't think I understand this question. I need to serialize this to a JSON-able data type (dict, string, list, etc.). This is only used in serialization, so I can't return a Source object here instead. It needs to be something that json.dump(...) can work with.

Reviewer: Maybe I'm missing something, but you're not JSON-serializing this by itself, only as an attribute of other pydantic models, correct? And Source itself is a pydantic model, which works just fine if you use DeploymentSettings.model_dump(...).

Author (stefannica): I see what you mean. I'll try it.

Author (stefannica): Yep, this isn't possible, because this function is already used in the PlainSerializer serialization config and must return a JSON-friendly type.

Author (stefannica): I reworked the entire SourceOrObject class so that it now extends the Source class, which feels like a better design.

@socket-security bot commented Oct 17, 2025

Review the following changes in direct dependencies:

Package: asgiref@3.10.0 (added)
Scores: Supply Chain Security 100, Vulnerability 100, Quality 100, Maintenance 100, License 100

@github-actions
Copy link
Contributor

github-actions bot commented Oct 17, 2025

ZenML CLI Performance Comparison (Threshold: 1.0s, Timeout: 60s, Slow: 5s)

❌ Failed Commands on Current Branch (feature/customizable-deployment-servers)

  • zenml stack list: Command failed on run 1 (exit code: 1)
  • zenml pipeline list: Command failed on run 1 (exit code: 1)
  • zenml model list: Command failed on run 1 (exit code: 1)

🚨 New Failures Introduced

The following commands fail on your branch but worked on the target branch:

  • zenml stack list
  • zenml pipeline list
  • zenml model list

Performance Comparison

Command | develop Time (s) | feature/customizable-deployment-servers Time (s) | Difference | Status
--- | --- | --- | --- | ---
zenml --help | 1.361138 ± 0.011045 | 1.390522 ± 0.018055 | +0.029s | ✓ No significant change
zenml model list | Not tested | Failed | N/A | ❌ Broken in current branch
zenml pipeline list | Not tested | Failed | N/A | ❌ Broken in current branch
zenml stack --help | 1.346265 ± 0.017146 | 1.378075 ± 0.024860 | +0.032s | ✓ No significant change
zenml stack list | Not tested | Failed | N/A | ❌ Broken in current branch

Summary

  • Total commands analyzed: 5
  • Commands compared for timing: 2
  • Commands improved: 0 (0.0% of compared)
  • Commands degraded: 0 (0.0% of compared)
  • Commands unchanged: 2 (100.0% of compared)
  • Failed commands: 3 (NEW FAILURES INTRODUCED)
  • Timed out commands: 0
  • Slow commands: 0

Environment Info

  • Target branch: Linux 6.11.0-1018-azure
  • Current branch: Linux 6.11.0-1018-azure
  • Test timestamp: 2025-10-22T15:07:29Z
  • Timeout: 60 seconds
  • Slow threshold: 5 seconds

@github-actions
Copy link
Contributor

github-actions bot commented Oct 17, 2025

Documentation Link Check Results

  • Absolute links check: ❌ failed — there are broken absolute links in the documentation (see workflow logs for details)
  • Relative links check: ✅ passed
Last checked: 2025-10-23 09:32:41 UTC



@stefannica stefannica requested a review from schustmi October 21, 2025 06:50

Labels: enhancement (New feature or request), internal (To filter out internal PRs and issues), run-slow-ci
