ZenML is a unified MLOps framework that extends battle-tested machine learning operations principles to support the entire AI stack, from classical machine learning models to advanced AI agents.
Install with the Tessl CLI: `npx @tessl/cli install tessl/pypi-zenml@0.90.0`

The framework provides comprehensive pipeline orchestration, experiment tracking, model versioning, and reproducibility management with integration capabilities across major cloud platforms and ML tools.
# Installation: pip install zenml
import zenml
from zenml import pipeline, step, Model, ArtifactConfig
from zenml.client import Client


# Define a step
@step
def load_data() -> dict:
    """Load training data."""
    return {"train": [1, 2, 3], "test": [4, 5, 6]}


@step
def train_model(data: dict) -> float:
    """Train a model and return accuracy."""
    # Training logic here
    return 0.95


# Define a pipeline
@pipeline(
    name="ml_pipeline",
    enable_cache=True,
    model=Model(name="my_model", version="1.0.0")
)
def ml_pipeline():
    """ML training pipeline."""
    data = load_data()
    accuracy = train_model(data)
    return accuracy


# Run the pipeline
if __name__ == "__main__":
    ml_pipeline()

    # Access results via Client
    client = Client()
    latest_run = client.get_pipeline("ml_pipeline").runs[0]
    print(f"Pipeline run: {latest_run.id}")

# ZenML's architecture is built on several key abstractions:
Core decorators for defining ML workflows and their constituent steps, with support for configuration, caching, hooks, and execution contexts.
def pipeline(
    _func=None,
    *,
    name: str = None,
    enable_cache: bool = None,
    enable_artifact_metadata: bool = None,
    enable_step_logs: bool = None,
    environment: dict = None,
    secrets: list = None,
    enable_pipeline_logs: bool = None,
    settings: dict = None,
    tags: list = None,
    extra: dict = None,
    on_failure=None,
    on_success=None,
    on_init=None,
    on_init_kwargs: dict = None,
    on_cleanup=None,
    model: Model = None,
    retry=None,
    substitutions: dict = None,
    execution_mode=None,
    cache_policy=None
):
    """
    Decorator to define a ZenML pipeline.

    Usable bare (``@pipeline``) or with configuration (``@pipeline(...)``)
    via the optional positional ``_func``; all options are keyword-only.

    Parameters:
    - name: Pipeline name (defaults to function name)
    - enable_cache: Enable step caching
    - enable_artifact_metadata: Enable artifact metadata logging
    - enable_step_logs: Enable step logging
    - environment: Environment variables to set when running this pipeline
    - secrets: Secrets to set as environment variables (list of UUIDs or names)
    - enable_pipeline_logs: Enable pipeline logs
    - settings: Stack component settings
    - tags: Tags to apply to runs of the pipeline
    - extra: Extra pipeline metadata
    - on_failure: Failure hook callable
    - on_success: Success hook callable
    - on_init: Callback function to run on initialization of the pipeline
    - on_init_kwargs: Arguments for the init hook
    - on_cleanup: Callback function to run on cleanup of the pipeline
    - model: Model configuration for Model Control Plane
    - retry: Retry configuration for the pipeline steps
    - substitutions: Extra placeholders to use in the name templates
    - execution_mode: The execution mode to use for the pipeline
    - cache_policy: Cache policy for this pipeline

    Returns:
    Pipeline decorator function
    """
def step(
    _func=None,
    *,
    name: str = None,
    enable_cache: bool = None,
    enable_artifact_metadata: bool = None,
    enable_artifact_visualization: bool = None,
    enable_step_logs: bool = None,
    experiment_tracker: bool | str = None,
    step_operator: bool | str = None,
    output_materializers=None,
    environment: dict = None,
    secrets: list = None,
    settings: dict = None,
    extra: dict = None,
    on_failure=None,
    on_success=None,
    model: Model = None,
    retry=None,
    substitutions: dict = None,
    cache_policy=None
):
    """
    Decorator to define a ZenML step.

    Usable bare (``@step``) or with configuration (``@step(...)``)
    via the optional positional ``_func``; all options are keyword-only.

    Parameters:
    - name: Step name (defaults to function name)
    - enable_cache: Enable caching for this step
    - enable_artifact_metadata: Enable artifact metadata for this step
    - enable_artifact_visualization: Enable artifact visualization for this step
    - enable_step_logs: Enable step logs for this step
    - experiment_tracker: Experiment tracker to use
    - step_operator: Step operator for remote execution
    - output_materializers: Custom materializers for outputs
    - environment: Environment variables to set when running this step
    - secrets: Secrets to set as environment variables (list of UUIDs or names)
    - settings: Stack component settings
    - extra: Extra step metadata
    - on_failure: Failure hook callable
    - on_success: Success hook callable
    - model: Model configuration
    - retry: Retry configuration in case of step failure
    - substitutions: Extra placeholders for the step name
    - cache_policy: Cache policy for this step

    Returns:
    Step decorator function
    """
def get_pipeline_context():
    """
    Get the current pipeline execution context.

    Returns:
    PipelineContext: Context object with pipeline metadata

    Raises:
    RuntimeError: If called outside pipeline execution
    """
def get_step_context():
    """
    Get the current step execution context.

    Returns:
    StepContext: Context object with step metadata and utilities

    Raises:
    RuntimeError: If called outside step execution
    """

# Functions for saving, loading, registering artifacts, and logging
# metadata outside the standard step output flow.
def save_artifact(
    data,
    name: str,
    version: str = None,
    artifact_type=None,
    tags: list = None,
    extract_metadata: bool = True,
    include_visualizations: bool = True,
    user_metadata: dict = None,
    materializer: type = None,
    uri: str = None
):
    """
    Save an artifact to the artifact store.

    Parameters:
    - data: Data to save
    - name: Artifact name
    - version: Artifact version (auto-generated if None)
    - artifact_type: Type of artifact (e.g., ArtifactType.MODEL, ArtifactType.DATA)
    - tags: List of tags
    - extract_metadata: Extract metadata automatically
    - include_visualizations: Generate visualizations
    - user_metadata: Custom metadata dict
    - materializer: Custom materializer class
    - uri: Optional URI to use for the artifact (advanced usage)

    Returns:
    ArtifactVersionResponse: Created artifact version
    """
def load_artifact(
    name_or_id: str,
    version: str = None
):
    """
    Load an artifact from the artifact store.

    Parameters:
    - name_or_id: Artifact name or UUID
    - version: Artifact version (loads latest if None)

    Returns:
    Data object loaded from artifact store
    """
def register_artifact(
    uri: str,
    name: str,
    version: str = None,
    tags: list = None,
    user_metadata: dict = None,
    artifact_type=None,
    materializer: type = None
):
    """
    Register an existing artifact from a URI.

    Parameters:
    - uri: URI of existing artifact
    - name: Artifact name
    - version: Artifact version
    - tags: List of tags
    - user_metadata: Custom metadata
    - artifact_type: Type of artifact
    - materializer: Custom materializer

    Returns:
    ArtifactVersionResponse: Registered artifact version
    """
def log_artifact_metadata(
    metadata: dict,
    artifact_name: str = None,
    artifact_version: str = None
):
    """
    Log metadata for an artifact.

    Parameters:
    - metadata: Metadata dict to log
    - artifact_name: Artifact name (uses current context if None)
    - artifact_version: Artifact version
    """

# The Model class and related functions for organizing artifacts,
# metadata, and versions in a centralized model namespace.
class Model:
    """
    Model configuration for grouping artifacts and metadata.

    Attributes:
    - name: Model name
    - version: Model version or stage
    - license: Model license
    - description: Model description
    - audience: Target audience
    - use_cases: Use cases
    - limitations: Known limitations
    - trade_offs: Trade-offs made
    - ethics: Ethical considerations
    - tags: List of tags
    - save_models_to_registry: Auto-save to model registry
    - suppress_class_validation_warnings: Suppress warnings
    """

    def __init__(
        self,
        name: str,
        version: str = None,
        license: str = None,
        description: str = None,
        audience: str = None,
        use_cases: str = None,
        limitations: str = None,
        trade_offs: str = None,
        ethics: str = None,
        tags: list = None,
        save_models_to_registry: bool = True,
        suppress_class_validation_warnings: bool = False
    ):
        """Initialize Model configuration."""
def log_model_metadata(
    metadata: dict,
    model_name: str = None,
    model_version: str = None
):
    """
    Log metadata for a model version.

    Parameters:
    - metadata: Metadata dict to log
    - model_name: Model name (uses current context if None)
    - model_version: Model version
    """
def link_artifact_to_model(
    artifact_version,
    model=None
):
    """
    Link an artifact to a model version.

    Parameters:
    - artifact_version: ArtifactVersionResponse object to link
    - model: Model object to link to (uses current context if None)

    Raises:
    RuntimeError: If called without model parameter and no model context exists
    """

# The Client class provides programmatic access to all ZenML resources
# including stacks, pipelines, artifacts, models, secrets, and more.
class Client:
    """
    Main interface for interacting with ZenML programmatically.

    Singleton instance access:
        client = Client()
    """

    @staticmethod
    def get_instance():
        """Get singleton client instance."""

    # NOTE(review): the original property getters were declared without
    # `self`, which raises TypeError on attribute access; fixed here.
    @property
    def active_stack(self):
        """Get the active stack object."""

    @property
    def active_stack_model(self):
        """Get the active stack model."""

    @property
    def active_project(self):
        """Get the active project."""

    @property
    def active_user(self):
        """Get the active user."""

# Stack and stack component classes for configuring ML infrastructure.
class Stack:
    """Complete ZenML stack configuration."""


class StackComponent:
    """Base class for stack components."""


class StackComponentConfig:
    """Base configuration for stack components."""


class Flavor:
    """Flavor of a stack component."""

# Configuration classes for Docker, resources, scheduling, caching, and more.
class DockerSettings:
    """Configuration for Docker containerization."""


class ResourceSettings:
    """Resource allocation settings for steps."""


class Schedule:
    """Schedule configuration for pipeline runs."""


class StepRetryConfig:
    """Configuration for step retry behavior."""


class CachePolicy:
    """Configuration for step caching behavior."""

# Built-in materializers for serializing and deserializing Python objects.
class BuiltInMaterializer:
    """Materializer for built-in Python types."""


class BuiltInContainerMaterializer:
    """Materializer for container types (list, dict, tuple, set)."""


class CloudpickleMaterializer:
    """Materializer using cloudpickle for serialization."""


class PydanticMaterializer:
    """Materializer for Pydantic models."""

# Base classes and implementations for stack components including
# orchestrators, artifact stores, container registries, and more.
ZenML includes 67 integrations with cloud providers, ML frameworks, orchestrators, experiment trackers, and MLOps tools.
Functions for logging metadata and managing tags across resources.
def log_metadata(
    metadata: dict,
    infer_resource: bool = True
):
    """
    Generic function to log metadata.

    Parameters:
    - metadata: Metadata dict to log
    - infer_resource: Infer resource from context
    """
def log_step_metadata(
    metadata: dict,
    step_name: str = None
):
    """
    Log metadata for a step.

    Parameters:
    - metadata: Metadata dict to log
    - step_name: Step name (uses current context if None)
    """
def add_tags(
    tags: list,
    *,
    pipeline: str = None,
    run: str = None,
    artifact: str = None,
    # ... additional resource type parameters
):
    """
    Add tags to various resource types.

    Parameters:
    - tags: List of tag names to add
    - pipeline: ID or name of pipeline to tag
    - run: ID or name of pipeline run to tag
    - artifact: ID or name of artifact to tag
    - (additional parameters for other resource types)
    """
def remove_tags(
    tags: list,
    *,
    pipeline: str = None,
    run: str = None,
    artifact: str = None,
    # ... additional resource type parameters
):
    """
    Remove tags from various resource types.

    Parameters:
    - tags: List of tag names to remove
    - pipeline: ID or name of pipeline
    - run: ID or name of pipeline run
    - artifact: ID or name of artifact
    - (additional parameters for other resource types)
    """
class Tag:
    """Tag model for categorizing resources."""

# Pre-built hooks for alerting and custom hook utilities.
def alerter_success_hook() -> None:
    """
    Standard success hook that executes after step finishes successfully.

    This hook uses any `BaseAlerter` configured in the active stack
    to post a success notification message.
    """
def alerter_failure_hook(exception: BaseException) -> None:
    """
    Standard failure hook that executes after step fails.

    This hook uses any `BaseAlerter` configured in the active stack
    to post a failure notification message with exception details.

    Parameters:
    - exception: Original exception that led to step failing
    """
def resolve_and_validate_hook(hook):
    """
    Utility to resolve and validate custom hooks.

    Parameters:
    - hook: Hook specification

    Returns:
    Resolved hook callable
    """

# Exception classes for error handling.
class ZenMLBaseException(Exception):
    """Base exception for all ZenML exceptions."""


class AuthorizationException(ZenMLBaseException):
    """Authorization/access errors."""


class DoesNotExistException(ZenMLBaseException):
    """Entity not found errors."""


class ValidationError(ZenMLBaseException):
    """Model/data validation errors."""


class EntityExistsError(ZenMLBaseException):
    """Entity already exists errors."""

# Important enumerations used throughout the API.
class ExecutionStatus(str, Enum):
    """Pipeline/step execution status."""
    INITIALIZING = "initializing"
    PROVISIONING = "provisioning"
    FAILED = "failed"
    COMPLETED = "completed"
    RUNNING = "running"
    CACHED = "cached"


class StackComponentType(str, Enum):
    """Stack component types."""
    ORCHESTRATOR = "orchestrator"
    ARTIFACT_STORE = "artifact_store"
    CONTAINER_REGISTRY = "container_registry"
    # ... and more


class ModelStages(str, Enum):
    """Model lifecycle stages."""
    STAGING = "staging"
    PRODUCTION = "production"
    ARCHIVED = "archived"
    LATEST = "latest"

# Artifact configuration classes for controlling step outputs.
class ArtifactConfig:
    """
    Configuration for artifacts produced by steps.

    Attributes:
    - name: Artifact name
    - version: Artifact version strategy
    - tags: List of tags
    - run_metadata: Metadata to attach
    - artifact_type: Optional type of the artifact
    """

    def __init__(
        self,
        name: str = None,
        version: str = None,
        tags: list = None,
        run_metadata: dict = None,
        artifact_type=None
    ):
        """Initialize artifact configuration."""
class ExternalArtifact:
    """
    External artifacts provide values as input to ZenML steps.

    Can be used to provide any value as input to a step without writing
    an additional step that returns this value.

    Attributes:
    - value: The artifact value (any Python object)
    - materializer: Materializer to use for saving the artifact value
    - store_artifact_metadata: Whether to store metadata
    - store_artifact_visualizations: Whether to store visualizations
    """

    def __init__(
        self,
        value=None,
        materializer: type = None,
        store_artifact_metadata: bool = True,
        store_artifact_visualizations: bool = True
    ):
        """Initialize external artifact with a value to upload."""

# Utility functions for dashboard, environment, and other operations.
def show(port: int = None):
    """
    Opens the ZenML dashboard in a browser.

    Parameters:
    - port: Port number (optional)
    """

# Service abstractions for long-running processes like model servers
# and deployments.
class BaseService:
    """Abstract base class for services."""


class ServiceConfig:
    """Configuration for services."""


class ServiceStatus:
    """Status tracking for services."""

# 200+ Pydantic model classes for API request/response objects.
Custom type definitions for specialized content.
class HTMLString(str):
    """String subclass for HTML content."""


class MarkdownString(str):
    """String subclass for Markdown content."""


class CSVString(str):
    """String subclass for CSV content."""


class JSONString(str):
    """String subclass for JSON content."""