ZenML is a unified MLOps framework that extends battle-tested machine learning operations principles to support the entire AI stack, from classical machine learning models to advanced AI agents.
Configuration classes for Docker containerization, resource allocation, scheduling, caching, and other pipeline/step settings.
class DockerSettings:
    """Configuration for Docker containerization.

    Controls how ZenML builds and runs Docker containers for pipeline steps.

    Import from::

        from zenml.config import DockerSettings

    Attributes:
        parent_image: Base Docker image.
        dockerfile: Path to a custom Dockerfile.
        build_context_root: Build context directory.
        build_options: Additional Docker build options dict.
        install_stack_requirements: Install stack component requirements.
        apt_packages: List of apt packages to install.
        requirements: List of pip requirements or path to requirements.txt.
        required_integrations: List of ZenML integrations to install.
        required_hub_plugins: List of ZenML Hub plugins.
        replicate_local_python_environment: Replicate local environment.
        environment: Environment variables dict.
        user: User to run container as.
        python_package_installer: Package installer (from PythonPackageInstaller enum).
        python_package_installer_args: Additional installer arguments.
        skip_build: Skip building new image.
        target_repository: Target repository for built image.
    """

    def __init__(
        self,
        parent_image: str = None,
        dockerfile: str = None,
        build_context_root: str = None,
        build_options: dict = None,
        install_stack_requirements: bool = True,
        apt_packages: list = None,
        requirements: list = None,
        required_integrations: list = None,
        required_hub_plugins: list = None,
        replicate_local_python_environment: str = None,
        environment: dict = None,
        user: str = None,
        python_package_installer: str = None,
        python_package_installer_args: dict = None,
        skip_build: bool = False,
        target_repository: str = None,
    ):
        """Initialize Docker settings.

        Example:
            ```python
            from zenml.config import DockerSettings

            docker_settings = DockerSettings(
                parent_image="python:3.9-slim",
                requirements=["pandas==2.0.0", "scikit-learn==1.3.0"],
                apt_packages=["git", "curl"],
                environment={"MY_VAR": "value"},
                python_package_installer="pip",
            )
            ```
        """
        # None defaults mean "unset"; values are stored as given, no defensive copies.
        self.parent_image = parent_image
        self.dockerfile = dockerfile
        self.build_context_root = build_context_root
        self.build_options = build_options
        self.install_stack_requirements = install_stack_requirements
        self.apt_packages = apt_packages
        self.requirements = requirements
        self.required_integrations = required_integrations
        self.required_hub_plugins = required_hub_plugins
        self.replicate_local_python_environment = replicate_local_python_environment
        self.environment = environment
        self.user = user
        self.python_package_installer = python_package_installer
        self.python_package_installer_args = python_package_installer_args
        self.skip_build = skip_build
        self.target_repository = target_repository
class ResourceSettings:
    """Hardware resource settings for steps and deployed pipelines.

    Import from::

        from zenml.config import ResourceSettings
        from zenml.steps import ResourceSettings

    Attributes:
        cpu_count: Amount of CPU cores (can be fractional, e.g., 0.5).
        gpu_count: Number of GPUs.
        memory: Memory allocation string (e.g., "4GB", "512MB").
        min_replicas: Minimum number of replicas (for deployed pipelines).
        max_replicas: Maximum number of replicas (for deployed pipelines).
        autoscaling_metric: Metric for autoscaling ("cpu", "memory", "concurrency", "rps").
        autoscaling_target: Target value for the autoscaling metric (e.g., 75.0 for CPU percentage).
        max_concurrency: Maximum concurrent requests per instance.
    """

    def __init__(
        self,
        cpu_count: float = None,
        gpu_count: int = None,
        memory: str = None,
        min_replicas: int = None,
        max_replicas: int = None,
        autoscaling_metric: str = None,
        autoscaling_target: float = None,
        max_concurrency: int = None,
    ):
        """Initialize resource settings.

        Example:
            ```python
            from zenml.config import ResourceSettings

            # Basic resources
            resources = ResourceSettings(cpu_count=8.0, gpu_count=2, memory="16GB")

            # Deployed pipeline with autoscaling
            deployed_resources = ResourceSettings(
                cpu_count=2.0,
                memory="4GB",
                min_replicas=1,
                max_replicas=10,
                autoscaling_metric="cpu",
                autoscaling_target=75.0,
                max_concurrency=50,
            )
            ```
        """
        # All fields are optional; None means "let the orchestrator/deployer decide".
        self.cpu_count = cpu_count
        self.gpu_count = gpu_count
        self.memory = memory
        self.min_replicas = min_replicas
        self.max_replicas = max_replicas
        self.autoscaling_metric = autoscaling_metric
        self.autoscaling_target = autoscaling_target
        self.max_concurrency = max_concurrency
from datetime import datetime, timedelta


class Schedule:
    """Schedule configuration for pipeline runs.

    Supports both cron-based and interval-based scheduling. Use either
    ``cron_expression`` or ``interval_second``, not both.

    Import from::

        from zenml.config import Schedule
        from zenml.pipelines import Schedule

    Attributes:
        name: Schedule name.
        cron_expression: Cron expression (e.g., "0 0 * * *").
        start_time: Schedule start datetime.
        end_time: Schedule end datetime.
        interval_second: Interval as timedelta between runs.
        catchup: Whether to catch up on missed runs.
        run_once_start_time: When to run the pipeline once.
    """

    def __init__(
        self,
        name: str = None,
        cron_expression: str = None,
        start_time: datetime = None,
        end_time: datetime = None,
        interval_second: timedelta = None,
        catchup: bool = False,
        run_once_start_time: datetime = None,
    ):
        """Initialize schedule configuration.

        Raises:
            ValueError: If both cron_expression and interval_second are given.

        Example:
            ```python
            from zenml.config import Schedule
            from datetime import datetime, timedelta

            # Cron schedule - daily at midnight
            daily_schedule = Schedule(
                name="daily_training",
                cron_expression="0 0 * * *",
                start_time=datetime.now(),
            )

            # Interval schedule - every 2 hours
            interval_schedule = Schedule(
                name="periodic_check",
                interval_second=timedelta(hours=2),
                start_time=datetime.now(),
                end_time=datetime.now() + timedelta(days=30),
            )
            ```
        """
        # The two scheduling modes are mutually exclusive per the class contract.
        if cron_expression is not None and interval_second is not None:
            raise ValueError(
                "Use either cron_expression or interval_second, not both."
            )
        self.name = name
        self.cron_expression = cron_expression
        self.start_time = start_time
        self.end_time = end_time
        self.interval_second = interval_second
        self.catchup = catchup
        self.run_once_start_time = run_once_start_time
class StepRetryConfig:
    """Configuration for step retry behavior.

    Controls how steps are retried on failure.

    Import from::

        from zenml.config import StepRetryConfig

    Attributes:
        max_retries: Maximum number of retry attempts.
        delay: Initial delay between retries in seconds.
        backoff: Multiplier applied to the delay on each retry (exponential backoff).
    """

    def __init__(
        self,
        max_retries: int = 0,
        delay: int = 1,
        backoff: int = 1,
    ):
        """Initialize retry configuration.

        Example:
            ```python
            from zenml.config import StepRetryConfig

            # Retry up to 3 times with exponential backoff: 5s, 10s, 20s
            retry_config = StepRetryConfig(
                max_retries=3,
                delay=5,
                backoff=2,
            )
            ```
        """
        # max_retries=0 (the default) disables retrying entirely.
        self.max_retries = max_retries
        self.delay = delay
        self.backoff = backoff
class CachePolicy:
    """Configuration for step caching behavior.

    Controls which components are included in the cache key to determine
    when step outputs can be reused.

    Import from::

        from zenml.config import CachePolicy

    Attributes:
        include_step_code: Include step code in cache key (default: True).
        include_step_parameters: Include step parameters in cache key (default: True).
        include_artifact_values: Include artifact values in cache key (default: True).
        include_artifact_ids: Include artifact IDs in cache key (default: True).
        ignored_inputs: List of input names to ignore in cache key (default: None).
    """

    def __init__(
        self,
        include_step_code: bool = True,
        include_step_parameters: bool = True,
        include_artifact_values: bool = True,
        include_artifact_ids: bool = True,
        ignored_inputs: list = None,
    ):
        """Initialize cache policy.

        Example:
            ```python
            from zenml.config import CachePolicy

            # Default policy - includes everything
            default_policy = CachePolicy()

            # Ignore specific inputs
            selective_cache = CachePolicy(
                ignored_inputs=["timestamp", "random_seed"]
            )

            # Only cache based on step code, ignore parameters
            code_only = CachePolicy(
                include_step_code=True,
                include_step_parameters=False,
                include_artifact_values=False,
                include_artifact_ids=False,
            )
            ```
        """
        self.include_step_code = include_step_code
        self.include_step_parameters = include_step_parameters
        self.include_artifact_values = include_artifact_values
        self.include_artifact_ids = include_artifact_ids
        self.ignored_inputs = ignored_inputs

    @classmethod
    def default(cls):
        """Get the default cache policy.

        Returns:
            CachePolicy: Default policy with all flags enabled.
        """
        return cls()

    @classmethod
    def from_string(cls, value: str):
        """Create a cache policy from a string.

        Parameters:
            value: String value (currently supports "default").

        Returns:
            CachePolicy: Cache policy instance.

        Raises:
            ValueError: If the string is not a valid cache policy.
        """
        if value == "default":
            return cls.default()
        raise ValueError(f"Invalid cache policy string: {value!r}")
class StoreConfiguration:
    """Configuration for the ZenML store backend.

    Controls connection to a ZenML server or local store.

    Import from::

        from zenml.config import StoreConfiguration

    Attributes:
        type: Store type (SQL or REST).
        url: Store URL.
        secrets_store: Secrets store configuration.
        backup_secrets_store: Backup secrets store configuration.
    """

    def __init__(
        self,
        type: str = None,  # shadows builtin; name mandated by the documented attribute
        url: str = None,
        secrets_store=None,
        backup_secrets_store=None,
    ):
        """Initialize store configuration with the documented attributes, all optional."""
        self.type = type
        self.url = url
        self.secrets_store = secrets_store
        self.backup_secrets_store = backup_secrets_store
from enum import Enum


class PythonPackageInstaller(str, Enum):
    """Python package installer options.

    Import from::

        from zenml.config import PythonPackageInstaller
    """

    PIP = "pip"  # standard pip installer
    UV = "uv"  # uv, a faster pip alternative
from enum import Enum


class ByteUnit(str, Enum):
    """Units for memory/storage specifications.

    Import from::

        from zenml.config import ByteUnit
    """

    # Decimal (SI) units
    KB = "KB"
    MB = "MB"
    GB = "GB"
    TB = "TB"
    # Binary (IEC) units
    KIB = "KiB"
    MIB = "MiB"
    GIB = "GiB"
    TIB = "TiB"
from zenml import pipeline, step
from zenml.config import DockerSettings

# Docker settings shared by the pipeline below.
docker_settings = DockerSettings(
    parent_image="python:3.9-slim",
    requirements=["tensorflow==2.13.0", "numpy==1.24.0"],
    apt_packages=["libgomp1"],
    environment={
        "TF_ENABLE_ONEDNN_OPTS": "0",
        "CUDA_VISIBLE_DEVICES": "0",
    },
)


@step
def train_model(data: list) -> dict:
    """Train a model inside the configured Docker container."""
    import tensorflow as tf  # provided by the requirements above

    # Training logic
    return {"model": "trained"}


@pipeline(
    settings={
        "docker": docker_settings,
    }
)
def training_pipeline():
    data = [1, 2, 3]
    model = train_model(data)
    return model
from zenml import step
from zenml.config import ResourceSettings


@step(
    settings={
        "resources": ResourceSettings(
            cpu_count=16,
            gpu_count=4,
            memory="64GB",
        )
    }
)
def large_scale_training(data: list) -> dict:
    """Step requiring significant resources."""
    # Heavy training logic
    return {"model": "large_model"}
from zenml import pipeline
from zenml.config import Schedule
from datetime import datetime, timedelta

# Daily schedule at 2 AM, running for one year.
schedule = Schedule(
    name="nightly_training",
    cron_expression="0 2 * * *",
    start_time=datetime.now(),
    end_time=datetime.now() + timedelta(days=365),
)


@pipeline(schedule=schedule)
def scheduled_pipeline():
    # Pipeline definition
    pass
from zenml import step
from zenml.config import StepRetryConfig

# Retry up to 5 times; delays grow as 10s, 20s, 40s, ... (backoff=2).
retry_config = StepRetryConfig(
    max_retries=5,
    delay=10,
    backoff=2,
)


@step(
    settings={
        "retry": retry_config,
    }
)
def flaky_external_api_call(endpoint: str) -> dict:
    """Step that might fail due to network issues."""
    # API call that might fail
    return {"status": "success"}
from zenml import step
from zenml.config import CachePolicy

# Ignore specific inputs in the cache key.
selective_cache = CachePolicy(
    ignored_inputs=["timestamp", "random_seed"],
)


@step(cache_policy=selective_cache)
def process_data(data: dict, timestamp: str, random_seed: int) -> dict:
    """Step that ignores timestamp and random_seed for caching."""
    # Process data - the cache key only considers the 'data' input.
    return {"processed": data}


# Cache only based on step code; ignore parameters and artifacts.
code_only_cache = CachePolicy(
    include_step_code=True,
    include_step_parameters=False,
    include_artifact_values=False,
    include_artifact_ids=False,
)


@step(cache_policy=code_only_cache)
def generate_random_data() -> list:
    """Step cached only by code version."""
    import random

    return [random.random() for _ in range(10)]


# Use the default cache policy.
@step(cache_policy=CachePolicy.default())
def standard_processing(input_data: str) -> dict:
    """Step with default caching behavior."""
    return {"result": input_data}
from zenml import pipeline, step
from zenml.config import DockerSettings, ResourceSettings, StepRetryConfig

docker_settings = DockerSettings(
    parent_image="nvidia/cuda:11.8.0-cudnn8-runtime-ubuntu22.04",
    requirements=["torch==2.0.0", "transformers==4.30.0"],
    environment={"HF_HOME": "/cache"},
)

resource_settings = ResourceSettings(
    cpu_count=8,
    gpu_count=2,
    memory="32GB",
)

retry_config = StepRetryConfig(
    max_retries=3,
    delay=30,
    backoff=2,
)


@step(
    settings={
        "docker": docker_settings,
        "resources": resource_settings,
        "retry": retry_config,
    }
)
def train_transformer(data: str) -> dict:
    """LLM training with full configuration."""
    # Training logic
    return {"model": "trained_transformer"}


@pipeline(
    settings={
        "docker": docker_settings,
    }
)
def llm_pipeline():
    model = train_transformer("training_data")
    return model
from zenml import pipeline
from zenml.config import DockerSettings

# Use a custom Dockerfile and build context.
docker_settings = DockerSettings(
    dockerfile="./docker/Dockerfile",
    build_context_root="./",
    build_options={
        "buildargs": {
            "BASE_IMAGE": "python:3.9",
        }
    },
)


@pipeline(
    settings={
        "docker": docker_settings,
    }
)
def custom_docker_pipeline():
    # Pipeline using the custom Docker image
    pass
Install with the Tessl CLI: `npx tessl i tessl/pypi-zenml`