CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-zenml

ZenML is a unified MLOps framework that extends battle-tested machine learning operations principles to support the entire AI stack, from classical machine learning models to advanced AI agents.

Overview
Eval results
Files

docs/config.md

Configuration

Configuration classes for Docker containerization, resource allocation, scheduling, caching, and other pipeline/step settings.

Capabilities

Docker Settings

class DockerSettings:
    """
    Configuration for Docker containerization.

    Controls how ZenML builds and runs Docker containers for pipeline steps.

    Attributes:
    - parent_image: Base Docker image
    - dockerfile: Path to custom Dockerfile
    - build_context_root: Build context directory
    - build_options: Additional Docker build options dict
    - install_stack_requirements: Install stack component requirements
    - apt_packages: List of apt packages to install
    - requirements: List of pip requirements or path to requirements.txt
    - required_integrations: List of ZenML integrations to install
    - required_hub_plugins: List of ZenML Hub plugins
    - replicate_local_python_environment: Replicate local environment
    - environment: Environment variables dict
    - user: User to run container as
    - python_package_installer: Package installer (from PythonPackageInstaller enum)
    - python_package_installer_args: Additional installer arguments
    - skip_build: Skip building new image
    - target_repository: Target repository for built image
    """

    def __init__(
        self,
        parent_image: str | None = None,
        dockerfile: str | None = None,
        build_context_root: str | None = None,
        build_options: dict | None = None,
        install_stack_requirements: bool = True,
        apt_packages: list | None = None,
        requirements: list | None = None,
        required_integrations: list | None = None,
        required_hub_plugins: list | None = None,
        replicate_local_python_environment: str | None = None,
        environment: dict | None = None,
        user: str | None = None,
        python_package_installer: str | None = None,
        python_package_installer_args: dict | None = None,
        skip_build: bool = False,
        target_repository: str | None = None
    ):
        """
        Initialize Docker settings.

        All parameters default to None (or the shown literal), meaning
        "use ZenML's built-in default" for that option.

        Example:
        ```python
        from zenml.config import DockerSettings

        docker_settings = DockerSettings(
            parent_image="python:3.9-slim",
            requirements=["pandas==2.0.0", "scikit-learn==1.3.0"],
            apt_packages=["git", "curl"],
            environment={"MY_VAR": "value"},
            python_package_installer="pip"
        )
        ```
        """

Import from:

from zenml.config import DockerSettings

Resource Settings

class ResourceSettings:
    """
    Hardware resource settings for steps and deployed pipelines.

    Attributes:
    - cpu_count: Amount of CPU cores (can be fractional, e.g., 0.5)
    - gpu_count: Number of GPUs
    - memory: Memory allocation string (e.g., "4GB", "512MB")
    - min_replicas: Minimum number of replicas (for deployed pipelines)
    - max_replicas: Maximum number of replicas (for deployed pipelines)
    - autoscaling_metric: Metric for autoscaling ("cpu", "memory", "concurrency", "rps")
    - autoscaling_target: Target value for autoscaling metric
    - max_concurrency: Maximum concurrent requests per instance
    """

    def __init__(
        self,
        cpu_count: float | None = None,
        gpu_count: int | None = None,
        memory: str | None = None,
        min_replicas: int | None = None,
        max_replicas: int | None = None,
        autoscaling_metric: str | None = None,
        autoscaling_target: float | None = None,
        max_concurrency: int | None = None
    ):
        """
        Initialize resource settings.

        Parameters left as None are not requested from the orchestrator.

        Parameters:
        - cpu_count: Number of CPU cores (can be fractional, e.g., 0.5, 2.0)
        - gpu_count: Number of GPUs to allocate
        - memory: Memory to allocate (with unit, e.g., "4GB", "512MB", "2048MB")
        - min_replicas: Minimum replicas (for deployers/deployed pipelines)
        - max_replicas: Maximum replicas (for deployers/deployed pipelines)
        - autoscaling_metric: Metric for autoscaling - "cpu", "memory", "concurrency", "rps"
        - autoscaling_target: Target value for the metric (e.g., 75.0 for CPU percentage)
        - max_concurrency: Max concurrent requests per instance

        Example:
        ```python
        from zenml.config import ResourceSettings

        # Basic resources
        resources = ResourceSettings(
            cpu_count=8.0,
            gpu_count=2,
            memory="16GB"
        )

        # Deployed pipeline with autoscaling
        deployed_resources = ResourceSettings(
            cpu_count=2.0,
            memory="4GB",
            min_replicas=1,
            max_replicas=10,
            autoscaling_metric="cpu",
            autoscaling_target=75.0,
            max_concurrency=50
        )
        ```
        """

Import from:

from zenml.config import ResourceSettings
from zenml.steps import ResourceSettings

Schedule

class Schedule:
    """
    Schedule configuration for pipeline runs.

    Supports both cron-based and interval-based scheduling.

    Attributes:
    - name: Schedule name
    - cron_expression: Cron expression (e.g., "0 0 * * *")
    - start_time: Schedule start datetime
    - end_time: Schedule end datetime
    - interval_second: Interval as timedelta between runs
    - catchup: Whether to catch up on missed runs
    - run_once_start_time: When to run the pipeline once
    """

    def __init__(
        self,
        name: str | None = None,
        cron_expression: str | None = None,
        start_time: datetime | None = None,
        end_time: datetime | None = None,
        interval_second: timedelta | None = None,
        catchup: bool = False,
        run_once_start_time: datetime | None = None
    ):
        """
        Initialize schedule configuration.

        Use either cron_expression or interval_second, not both.

        Parameters:
        - name: Schedule name
        - cron_expression: Cron expression
        - start_time: When to start the schedule
        - end_time: When to end the schedule
        - interval_second: Run interval as timedelta object (not a plain
          number of seconds)
        - catchup: Whether to catch up on missed runs
        - run_once_start_time: When to run the pipeline once

        Example:
        ```python
        from zenml.config import Schedule
        from datetime import datetime, timedelta

        # Cron schedule - daily at midnight
        daily_schedule = Schedule(
            name="daily_training",
            cron_expression="0 0 * * *",
            start_time=datetime.now()
        )

        # Interval schedule - every 2 hours
        interval_schedule = Schedule(
            name="periodic_check",
            interval_second=timedelta(hours=2),
            start_time=datetime.now(),
            end_time=datetime.now() + timedelta(days=30)
        )
        ```
        """

Import from:

from zenml.config import Schedule
from zenml.pipelines import Schedule

Step Retry Config

class StepRetryConfig:
    """
    Retry behavior for a failing step.

    Determines how many times a step is re-attempted after a failure and
    how long to wait between attempts.

    Attributes:
    - max_retries: Maximum number of retries
    - delay: Delay between retries in seconds
    - backoff: Backoff multiplier for delay
    """

    def __init__(
        self,
        max_retries: int = 0,
        delay: int = 1,
        backoff: int = 1
    ):
        """
        Create a retry configuration.

        Parameters:
        - max_retries: How many retry attempts are allowed before giving up
          (0 means no retries)
        - delay: Wait time before the first retry, in seconds
        - backoff: Factor applied to the delay after each attempt; values
          greater than 1 yield exponential backoff

        Example:
        ```python
        from zenml.config import StepRetryConfig

        # Three attempts with waits of 5s, 10s and 20s between them
        retry_config = StepRetryConfig(
            max_retries=3,
            delay=5,
            backoff=2
        )
        ```
        """

Import from:

from zenml.config import StepRetryConfig

Cache Policy

class CachePolicy:
    """
    Configuration for step caching behavior.

    Controls which components are included in the cache key to determine when step outputs can be reused.

    Attributes:
    - include_step_code: Include step code in cache key (default: True)
    - include_step_parameters: Include step parameters in cache key (default: True)
    - include_artifact_values: Include artifact values in cache key (default: True)
    - include_artifact_ids: Include artifact IDs in cache key (default: True)
    - ignored_inputs: List of input names to ignore in cache key (default: None)
    """

    def __init__(
        self,
        include_step_code: bool = True,
        include_step_parameters: bool = True,
        include_artifact_values: bool = True,
        include_artifact_ids: bool = True,
        ignored_inputs: list = None
    ):
        """
        Initialize cache policy.

        Parameters:
        - include_step_code: Whether to include step code in cache key
        - include_step_parameters: Whether to include step parameters in cache key
        - include_artifact_values: Whether to include artifact values in cache key
        - include_artifact_ids: Whether to include artifact IDs in cache key
        - ignored_inputs: List of input names to ignore when computing cache key

        Example:
        ```python
        from zenml.config import CachePolicy

        # Default policy - includes everything
        default_policy = CachePolicy()

        # Ignore specific inputs
        selective_cache = CachePolicy(
            ignored_inputs=["timestamp", "random_seed"]
        )

        # Only cache based on step code, ignore parameters
        code_only = CachePolicy(
            include_step_code=True,
            include_step_parameters=False,
            include_artifact_values=False,
            include_artifact_ids=False
        )
        ```
        """

    @classmethod
    def default():
        """
        Get the default cache policy.

        Returns:
        CachePolicy: Default policy with all flags enabled
        """

    @classmethod
    def from_string(value: str):
        """
        Create a cache policy from a string.

        Parameters:
        - value: String value (currently supports "default")

        Returns:
        CachePolicy: Cache policy instance

        Raises:
        ValueError: If string is not a valid cache policy
        """

Import from:

from zenml.config import CachePolicy

Store Configuration

class StoreConfiguration:
    """
    Connection settings for the ZenML store backend.

    Describes how the client talks to a ZenML server or a local store.

    Attributes:
    - type: Store type (SQL or REST)
    - url: Store URL
    - secrets_store: Secrets store configuration
    - backup_secrets_store: Backup secrets store configuration
    """

Import from:

from zenml.config import StoreConfiguration

Python Package Installer Enum

class PythonPackageInstaller(str, Enum):
    """
    Enumeration of supported Python package installers.

    Members:
    - PIP: the standard pip installer
    - UV: uv, a faster pip alternative
    """

    PIP = "pip"  # standard installer
    UV = "uv"    # faster drop-in replacement

Import from:

from zenml.config import PythonPackageInstaller

Byte Unit Enum

class ByteUnit(str, Enum):
    """
    Units usable in memory/storage specifications.

    Covers both decimal (KB, MB, GB, TB; powers of 1000) and binary
    (KiB, MiB, GiB, TiB; powers of 1024) unit names.
    """

    # Decimal units
    KB = "KB"
    MB = "MB"
    GB = "GB"
    TB = "TB"
    # Binary units
    KIB = "KiB"
    MIB = "MiB"
    GIB = "GiB"
    TIB = "TiB"

Import from:

from zenml.config import ByteUnit

Usage Examples

Docker Settings for Pipeline

from zenml import pipeline, step
from zenml.config import DockerSettings

# Container configuration applied to the pipeline below via its settings.
docker_settings = DockerSettings(
    parent_image="python:3.9-slim",
    requirements=["tensorflow==2.13.0", "numpy==1.24.0"],
    apt_packages=["libgomp1"],  # system library TensorFlow links against
    environment={
        "TF_ENABLE_ONEDNN_OPTS": "0",
        "CUDA_VISIBLE_DEVICES": "0"
    }
)

@step
def train_model(data: list) -> dict:
    # Imported inside the step: tensorflow only needs to exist in the
    # container image, not in the environment that defines the pipeline.
    import tensorflow as tf
    # Training logic
    return {"model": "trained"}

@pipeline(
    settings={
        "docker": docker_settings  # keyed as "docker" per ZenML settings convention
    }
)
def training_pipeline():
    data = [1, 2, 3]
    model = train_model(data)
    return model

Resource Allocation

from zenml import step
from zenml.config import ResourceSettings

# Request 16 CPU cores, 4 GPUs and 64GB of memory for this single step.
@step(
    settings={
        "resources": ResourceSettings(
            cpu_count=16,
            gpu_count=4,
            memory="64GB"
        )
    }
)
def large_scale_training(data: list) -> dict:
    """Step requiring significant resources."""
    # Heavy training logic
    return {"model": "large_model"}

Scheduled Pipeline

from zenml import pipeline
from zenml.config import Schedule
from datetime import datetime, timedelta

# Daily schedule at 2 AM
schedule = Schedule(
    name="nightly_training",
    cron_expression="0 2 * * *",  # minute hour day-of-month month day-of-week
    start_time=datetime.now(),
    end_time=datetime.now() + timedelta(days=365)  # stop after one year
)

@pipeline(schedule=schedule)
def scheduled_pipeline():
    # Pipeline definition
    pass

Retry Configuration

from zenml import step
from zenml.config import StepRetryConfig

# Up to 5 retries; with backoff=2 the waits double: 10s, 20s, 40s, 80s, 160s.
retry_config = StepRetryConfig(
    max_retries=5,
    delay=10,
    backoff=2
)

@step(
    settings={
        "retry": retry_config
    }
)
def flaky_external_api_call(endpoint: str) -> dict:
    """Step that might fail due to network issues."""
    # API call that might fail
    return {"status": "success"}

Cache Policy

from zenml import step
from zenml.config import CachePolicy

# Ignore specific inputs in cache key
selective_cache = CachePolicy(
    ignored_inputs=["timestamp", "random_seed"]
)

@step(cache_policy=selective_cache)
def process_data(data: dict, timestamp: str, random_seed: int) -> dict:
    """Step that ignores timestamp and random_seed for caching."""
    # Process data - cache will only consider 'data' input
    return {"processed": data}

# Cache only based on step code, ignore parameters and artifacts
code_only_cache = CachePolicy(
    include_step_code=True,
    include_step_parameters=False,
    include_artifact_values=False,
    include_artifact_ids=False
)

@step(cache_policy=code_only_cache)
def generate_random_data() -> list:
    """Step cached only by code version."""
    # Data is random, so only re-run when the code itself changes.
    import random
    return [random.random() for _ in range(10)]

# Use default cache policy (all cache-key components enabled)
@step(cache_policy=CachePolicy.default())
def standard_processing(input_data: str) -> dict:
    """Step with default caching behavior."""
    return {"result": input_data}

Combined Settings

from zenml import pipeline, step
from zenml.config import DockerSettings, ResourceSettings, StepRetryConfig

# CUDA base image plus the Python packages the training step needs.
docker_settings = DockerSettings(
    parent_image="nvidia/cuda:11.8.0-cudnn8-runtime-ubuntu22.04",
    requirements=["torch==2.0.0", "transformers==4.30.0"],
    environment={"HF_HOME": "/cache"}  # Hugging Face cache directory
)

# Hardware requested for the training step.
resource_settings = ResourceSettings(
    cpu_count=8,
    gpu_count=2,
    memory="32GB"
)

# Retry up to 3 times, waiting 30s, 60s, 120s between attempts.
retry_config = StepRetryConfig(
    max_retries=3,
    delay=30,
    backoff=2
)

@step(
    settings={
        "docker": docker_settings,
        "resources": resource_settings,
        "retry": retry_config
    }
)
def train_transformer(data: str) -> dict:
    """LLM training with full configuration."""
    # Training logic
    return {"model": "trained_transformer"}

@pipeline(
    settings={
        "docker": docker_settings
    }
)
def llm_pipeline():
    model = train_transformer("training_data")
    return model

Custom Docker Image

from zenml import pipeline
from zenml.config import DockerSettings

# Use custom Dockerfile
docker_settings = DockerSettings(
    dockerfile="./docker/Dockerfile",
    build_context_root="./",  # directory used as the Docker build context
    build_options={
        "buildargs": {  # Docker build arguments (matched by ARG in the Dockerfile)
            "BASE_IMAGE": "python:3.9"
        }
    }
)

@pipeline(
    settings={
        "docker": docker_settings
    }
)
def custom_docker_pipeline():
    # Pipeline using custom Docker image
    pass

Install with Tessl CLI

npx tessl i tessl/pypi-zenml

docs

artifact-config.md

artifacts.md

client.md

config.md

enums.md

exceptions.md

hooks.md

index.md

integrations.md

materializers.md

metadata-tags.md

models.md

pipelines-and-steps.md

pydantic-models.md

services.md

stack-components.md

stacks.md

types.md

utilities.md

tile.json