ZenML is a unified MLOps framework that extends battle-tested machine learning operations principles to support the entire AI stack, from classical machine learning models to advanced AI agents.
Core decorators and context objects for defining ML workflows and their constituent steps. ZenML pipelines are directed acyclic graphs (DAGs) of steps that define reproducible ML workflows with automatic versioning, caching, and lineage tracking.
Decorator to define a ZenML pipeline from a Python function.
def pipeline(
_func=None,
*,
name: str = None,
enable_cache: bool = None,
enable_artifact_metadata: bool = None,
enable_step_logs: bool = None,
environment: dict = None,
secrets: list = None,
enable_pipeline_logs: bool = None,
settings: dict = None,
tags: list = None,
extra: dict = None,
on_failure=None,
on_success=None,
on_init=None,
on_init_kwargs: dict = None,
on_cleanup=None,
model: Model = None,
retry=None,
substitutions: dict = None,
execution_mode=None,
cache_policy=None
):
"""
Decorator to define a ZenML pipeline.
Parameters:
- name: Pipeline name (defaults to function name)
- enable_cache: Enable step caching
- enable_artifact_metadata: Enable artifact metadata logging
- enable_step_logs: Enable step logging
- environment: Environment variables to set when running this pipeline
- secrets: Secrets to set as environment variables (list of UUIDs or names)
- enable_pipeline_logs: Enable pipeline logs
- settings: Stack component settings dict
- tags: Tags to apply to runs of the pipeline
- extra: Extra pipeline metadata dict
- on_failure: Failure hook callable or list of callables
- on_success: Success hook callable or list of callables
- on_init: Callback function to run on initialization of the pipeline
- on_init_kwargs: Arguments for the init hook
- on_cleanup: Callback function to run on cleanup of the pipeline
- model: Model configuration for Model Control Plane
- retry: Retry configuration for the pipeline steps
- substitutions: Extra placeholders to use in the name templates
- execution_mode: The execution mode to use for the pipeline
- cache_policy: Cache policy for this pipeline
Returns:
Pipeline decorator function that wraps the pipeline function
Example:
```python
from zenml import pipeline, step, Model
@pipeline(
name="training_pipeline",
enable_cache=True,
model=Model(name="my_model", version="1.0.0")
)
def my_pipeline():
data = load_data()
model = train_model(data)
return model
```
"""Import from:
from zenml import pipelineDecorator to define a ZenML step from a Python function.
def step(
_func=None,
*,
name: str = None,
enable_cache: bool = None,
enable_artifact_metadata: bool = None,
enable_artifact_visualization: bool = None,
enable_step_logs: bool = None,
experiment_tracker: bool | str = None,
step_operator: bool | str = None,
output_materializers=None,
environment: dict = None,
secrets: list = None,
settings: dict = None,
extra: dict = None,
on_failure=None,
on_success=None,
model: Model = None,
retry=None,
substitutions: dict = None,
cache_policy=None
):
"""
Decorator to define a ZenML step.
Parameters:
- name: Step name (defaults to function name)
- enable_cache: Enable caching for this step
- enable_artifact_metadata: Enable artifact metadata for this step
- enable_artifact_visualization: Enable artifact visualization for this step
- enable_step_logs: Enable step logs for this step
- experiment_tracker: Name of experiment tracker component to use, or bool to enable/disable
- step_operator: Name of step operator for remote execution, or bool to enable/disable
- output_materializers: Custom materializers for outputs (single class or dict mapping output names to classes)
- environment: Environment variables to set when running this step
- secrets: Secrets to set as environment variables (list of UUIDs or names)
- settings: Stack component settings dict
- extra: Extra step metadata dict
- on_failure: Failure hook callable or list of callables
- on_success: Success hook callable or list of callables
- model: Model configuration for Model Control Plane
- retry: Retry configuration in case of step failure
- substitutions: Extra placeholders for the step name
- cache_policy: Cache policy for this step
Returns:
Step decorator function that wraps the step function
Example:
```python
from zenml import step
from typing import Tuple
@step
def load_data() -> Tuple[list, list]:
train_data = [1, 2, 3, 4, 5]
test_data = [6, 7, 8, 9, 10]
return train_data, test_data
@step(enable_cache=False)
def train_model(data: list) -> float:
# Training logic
accuracy = 0.95
return accuracy
```
"""Import from:
from zenml import stepBase class for implementing custom steps with additional functionality.
class BaseStep:
"""
Base class for implementing custom steps.
Use the @step decorator for most cases. This class is for advanced
scenarios requiring additional control over step behavior.
"""
def entrypoint(self, *args, **kwargs):
"""
Main execution method to be implemented by subclass.
Parameters:
- *args: Positional arguments
- **kwargs: Keyword arguments
Returns:
Step outputs
"""Import from:
from zenml.steps import BaseStepAccess pipeline execution context and metadata.
class PipelineContext:
"""
Pipeline execution context object.
Attributes:
- name: Pipeline name
- run_name: Pipeline run name
- pipeline_run: PipelineRunResponse object with full run details
- extra: Extra metadata dict
- model: Model configuration if set
"""
@property
def name(self) -> str:
"""Get pipeline name."""
@property
def run_name(self) -> str:
"""Get pipeline run name."""
@property
def pipeline_run(self):
"""
Get pipeline run response object.
Returns:
PipelineRunResponse: Full pipeline run details including status, timestamps, configuration
"""
@property
def extra(self) -> dict:
"""Get extra metadata."""
@property
def model(self):
"""
Get model configuration.
Returns:
ModelVersionResponse or None
"""
def get_pipeline_context() -> PipelineContext:
"""
Get the current pipeline execution context.
Returns:
PipelineContext: Context object with pipeline metadata
Raises:
RuntimeError: If called outside pipeline execution
Example:
```python
from zenml import step, get_pipeline_context
@step
def my_step():
context = get_pipeline_context()
print(f"Running in pipeline: {context.name}")
print(f"Run name: {context.run_name}")
if context.model:
print(f"Model: {context.model.name}")
```
"""Import from:
from zenml import get_pipeline_context
from zenml.pipelines import PipelineContextAccess step execution context and utilities.
class StepContext:
"""
Step execution context object.
Provides access to step metadata, pipeline information, and utilities
for interacting with the stack during step execution.
Attributes:
- step_name: Current step name
- pipeline_name: Parent pipeline name
- run_name: Pipeline run name
- step_run: StepRunResponse object with full step run details
- model: Model configuration if set
- inputs: Input artifacts metadata
- outputs: Output artifacts metadata
"""
@property
def step_name(self) -> str:
"""Get step name."""
@property
def pipeline_name(self) -> str:
"""Get pipeline name."""
@property
def run_name(self) -> str:
"""Get pipeline run name."""
@property
def step_run(self):
"""
Get step run response object.
Returns:
StepRunResponse: Full step run details including status, timestamps, configuration
"""
@property
def model(self):
"""
Get model configuration.
Returns:
ModelVersionResponse or None
"""
@property
def inputs(self) -> dict:
"""Get input artifacts metadata."""
@property
def outputs(self) -> dict:
"""Get output artifacts metadata."""
def get_step_context() -> StepContext:
"""
Get the current step execution context.
Returns:
StepContext: Context object with step metadata and utilities
Raises:
RuntimeError: If called outside step execution
Example:
```python
from zenml import step, get_step_context
@step
def my_step(data: list) -> float:
context = get_step_context()
print(f"Running step: {context.step_name}")
print(f"In pipeline: {context.pipeline_name}")
print(f"Run: {context.run_name}")
# Access step run details
print(f"Status: {context.step_run.status}")
# Training logic
accuracy = 0.95
return accuracy
```
"""Import from:
from zenml import get_step_context
from zenml.steps import StepContextConfiguration for scheduling pipeline runs.
class Schedule:
"""
Schedule configuration for pipeline runs.
Supports cron expressions and interval-based scheduling.
Attributes:
- name: Schedule name
- cron_expression: Cron expression (e.g., "0 0 * * *" for daily at midnight)
- start_time: Schedule start datetime
- end_time: Schedule end datetime
- interval_second: Interval as timedelta between runs (for periodic schedules)
- catchup: Whether to catch up on missed runs
- run_once_start_time: When to run the pipeline once (for one-time schedules)
"""
def __init__(
self,
name: str = None,
cron_expression: str = None,
start_time: datetime = None,
end_time: datetime = None,
interval_second: timedelta = None,
catchup: bool = False,
run_once_start_time: datetime = None
):
"""
Initialize schedule configuration.
Use either cron_expression or interval_second, not both.
Parameters:
- name: Schedule name
- cron_expression: Cron expression for schedule
- start_time: When to start the schedule
- end_time: When to end the schedule
- interval_second: Run interval as timedelta object
- catchup: Whether to catch up on missed runs
- run_once_start_time: When to run the pipeline once
Example:
```python
from zenml import pipeline, Schedule
from datetime import datetime, timedelta
# Daily schedule
schedule = Schedule(
name="daily_training",
cron_expression="0 0 * * *",
start_time=datetime.now()
)
# Interval-based schedule (every 2 hours)
schedule = Schedule(
name="periodic_training",
interval_second=timedelta(hours=2),
start_time=datetime.now()
)
@pipeline(schedule=schedule)
def my_pipeline():
# Pipeline definition
pass
```
"""Import from:
from zenml import Schedule
from zenml.config import Schedule
from zenml.pipelines import ScheduleResource allocation settings for steps.
class ResourceSettings:
"""
Settings for resource allocation (CPU, GPU, memory).
Attributes:
- cpu_count: Number of CPUs
- gpu_count: Number of GPUs
- memory: Memory allocation (e.g., "4GB", "512MB")
"""
def __init__(
self,
cpu_count: int = None,
gpu_count: int = None,
memory: str = None
):
"""
Initialize resource settings.
Parameters:
- cpu_count: Number of CPUs to allocate
- gpu_count: Number of GPUs to allocate
- memory: Memory to allocate (e.g., "4GB", "512MB")
Example:
```python
from zenml import step
from zenml.config import ResourceSettings
@step(
settings={
"resources": ResourceSettings(
cpu_count=4,
gpu_count=1,
memory="8GB"
)
}
)
def train_model(data: list) -> float:
# Training with allocated resources
return 0.95
```
"""Import from:
from zenml.config import ResourceSettings
from zenml.steps import ResourceSettingsfrom zenml import pipeline, step
from typing import Tuple
@step
def load_data() -> Tuple[list, list]:
    """Produce the training and test splits consumed by the pipeline."""
    return [1, 2, 3, 4, 5], [6, 7, 8, 9, 10]
@step
def preprocess_data(train: list, test: list) -> Tuple[list, list]:
    """Apply a simple x -> 2*x scaling to both data splits."""
    scaled_train = [2 * value for value in train]
    scaled_test = [2 * value for value in test]
    return scaled_train, scaled_test
@step
def train_model(data: list) -> dict:
    """Fit a (stub) model and report its evaluation metrics."""
    metrics = {"accuracy": 0.95, "loss": 0.05}
    return metrics
@step
def evaluate_model(model: dict, test_data: list) -> float:
    """Score the trained model, applying a small holdout penalty."""
    holdout_factor = 0.98
    return holdout_factor * model["accuracy"]
@pipeline
def ml_pipeline():
    """End-to-end training DAG: load -> preprocess -> train -> evaluate."""
    raw_train, raw_test = load_data()
    clean_train, clean_test = preprocess_data(raw_train, raw_test)
    trained = train_model(clean_train)
    return evaluate_model(trained, clean_test)
if __name__ == "__main__":
    ml_pipeline()

from zenml import pipeline, step, Model
@step
def train_model(data: list) -> dict:
    """Fit a stub model and return its weights together with accuracy."""
    fitted = {"weights": [0.1, 0.2, 0.3], "accuracy": 0.95}
    return fitted
@pipeline(
    model=Model(
        name="text_classifier",
        version="1.0.0",
        license="Apache-2.0",
        description="Text classification model",
        tags=["nlp", "classification"]
    )
)
def training_pipeline():
    """Train a model while registering it with the Model Control Plane."""
    training_data = [1, 2, 3, 4, 5]
    trained = train_model(training_data)
    return trained
if __name__ == "__main__":
    training_pipeline()

from zenml import pipeline, step
from zenml.hooks import alerter_success_hook, alerter_failure_hook
@step
def train_model(data: list) -> float:
    """Stub training step that reports a fixed accuracy score."""
    accuracy = 0.95
    return accuracy
@pipeline(
on_success=alerter_success_hook("slack_alerter", "Training completed!"),
on_failure=alerter_failure_hook("slack_alerter", "Training failed!")
)
def monitored_pipeline():
"""Pipeline with alerting."""
data = [1, 2, 3]
accuracy = train_model(data)
    return accuracy

from zenml import step, get_step_context, get_pipeline_context
@step
def contextual_step(data: list) -> dict:
"""Step that uses context."""
step_context = get_step_context()
pipeline_context = get_pipeline_context()
print(f"Step: {step_context.step_name}")
print(f"Pipeline: {pipeline_context.name}")
print(f"Run: {pipeline_context.run_name}")
# Access model if configured
if pipeline_context.model:
print(f"Model: {pipeline_context.model.name}")
return {
"step": step_context.step_name,
"pipeline": pipeline_context.name,
"processed_data": [x * 2 for x in data]
    }

from zenml import step
from zenml.config import ResourceSettings
@step(
settings={
"resources": ResourceSettings(
cpu_count=8,
gpu_count=2,
memory="16GB"
)
}
)
def gpu_intensive_step(data: list) -> dict:
"""Step requiring GPU resources."""
# GPU training logic
return {"model": "trained_model", "accuracy": 0.98}Install with Tessl CLI
npx tessl i tessl/pypi-zenml