tessl/pypi-zenml

ZenML is a unified MLOps framework that extends battle-tested machine learning operations principles to support the entire AI stack, from classical machine learning models to advanced AI agents.

Pipelines and Steps

Core decorators and context objects for defining ML workflows and their constituent steps. ZenML pipelines are directed acyclic graphs (DAGs) of steps that define reproducible ML workflows with automatic versioning, caching, and lineage tracking.
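
To make the DAG idea concrete, the following is a minimal standard-library sketch of how step dependencies determine an execution order. It is not ZenML's orchestration code; the step names and the `dependencies` mapping are hypothetical and chosen only for illustration.

```python
from graphlib import TopologicalSorter

# Hypothetical step names; each key maps to the set of steps it depends on.
dependencies = {
    "load_data": set(),
    "preprocess_data": {"load_data"},
    "train_model": {"preprocess_data"},
    "evaluate_model": {"train_model", "preprocess_data"},
}

# static_order() yields each step only after all of its upstream steps.
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
```

Because every step here has exactly one possible position, the order is `load_data`, `preprocess_data`, `train_model`, `evaluate_model`. A cycle in the mapping would raise `graphlib.CycleError`, which is why a pipeline graph has to be acyclic.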

Capabilities

Pipeline Decorator

Decorator to define a ZenML pipeline from a Python function.

def pipeline(
    _func=None,
    *,
    name: str = None,
    enable_cache: bool = None,
    enable_artifact_metadata: bool = None,
    enable_step_logs: bool = None,
    environment: dict = None,
    secrets: list = None,
    enable_pipeline_logs: bool = None,
    settings: dict = None,
    tags: list = None,
    extra: dict = None,
    on_failure=None,
    on_success=None,
    on_init=None,
    on_init_kwargs: dict = None,
    on_cleanup=None,
    model: Model = None,
    retry=None,
    substitutions: dict = None,
    execution_mode=None,
    cache_policy=None
):
    """
    Decorator to define a ZenML pipeline.

    Parameters:
    - name: Pipeline name (defaults to function name)
    - enable_cache: Enable step caching
    - enable_artifact_metadata: Enable artifact metadata logging
    - enable_step_logs: Enable step logging
    - environment: Environment variables to set when running this pipeline
    - secrets: Secrets to set as environment variables (list of UUIDs or names)
    - enable_pipeline_logs: Enable pipeline logs
    - settings: Stack component settings dict
    - tags: Tags to apply to runs of the pipeline
    - extra: Extra pipeline metadata dict
    - on_failure: Failure hook callable or list of callables
    - on_success: Success hook callable or list of callables
    - on_init: Callback function to run on initialization of the pipeline
    - on_init_kwargs: Arguments for the init hook
    - on_cleanup: Callback function to run on cleanup of the pipeline
    - model: Model configuration for Model Control Plane
    - retry: Retry configuration for the pipeline steps
    - substitutions: Extra placeholders to use in the name templates
    - execution_mode: The execution mode to use for the pipeline
    - cache_policy: Cache policy for this pipeline

    Returns:
    Pipeline decorator function that wraps the pipeline function

    Example:
    ```python
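    from zenml import pipeline, step, Model

    @step
    def load_data() -> list:
        return [1, 2, 3, 4, 5]

    @step
    def train_model(data: list) -> float:
        return 0.95

    @pipeline(
        name="training_pipeline",
        enable_cache=True,
        model=Model(name="my_model", version="1.0.0")
    )
    def my_pipeline():
        data = load_data()
        model = train_model(data)
        return model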
    ```
    """

Import from:

from zenml import pipeline

Step Decorator

Decorator to define a ZenML step from a Python function.

def step(
    _func=None,
    *,
    name: str = None,
    enable_cache: bool = None,
    enable_artifact_metadata: bool = None,
    enable_artifact_visualization: bool = None,
    enable_step_logs: bool = None,
    experiment_tracker: bool | str = None,
    step_operator: bool | str = None,
    output_materializers=None,
    environment: dict = None,
    secrets: list = None,
    settings: dict = None,
    extra: dict = None,
    on_failure=None,
    on_success=None,
    model: Model = None,
    retry=None,
    substitutions: dict = None,
    cache_policy=None
):
    """
    Decorator to define a ZenML step.

    Parameters:
    - name: Step name (defaults to function name)
    - enable_cache: Enable caching for this step
    - enable_artifact_metadata: Enable artifact metadata for this step
    - enable_artifact_visualization: Enable artifact visualization for this step
    - enable_step_logs: Enable step logs for this step
    - experiment_tracker: Name of experiment tracker component to use, or bool to enable/disable
    - step_operator: Name of step operator for remote execution, or bool to enable/disable
    - output_materializers: Custom materializers for outputs (single class or dict mapping output names to classes)
    - environment: Environment variables to set when running this step
    - secrets: Secrets to set as environment variables (list of UUIDs or names)
    - settings: Stack component settings dict
    - extra: Extra step metadata dict
    - on_failure: Failure hook callable or list of callables
    - on_success: Success hook callable or list of callables
    - model: Model configuration for Model Control Plane
    - retry: Retry configuration in case of step failure
    - substitutions: Extra placeholders for the step name
    - cache_policy: Cache policy for this step

    Returns:
    Step decorator function that wraps the step function

    Example:
    ```python
    from zenml import step
    from typing import Tuple

    @step
    def load_data() -> Tuple[list, list]:
        train_data = [1, 2, 3, 4, 5]
        test_data = [6, 7, 8, 9, 10]
        return train_data, test_data

    @step(enable_cache=False)
    def train_model(data: list) -> float:
        # Training logic
        accuracy = 0.95
        return accuracy
    ```
    """

Import from:

from zenml import step
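
Both decorators take `_func=None` followed by keyword-only parameters, which is the usual Python pattern for a decorator that works bare (`@step`) and with arguments (`@step(enable_cache=False)`). The sketch below shows that pattern in isolation. The `traced` decorator and its `config` attribute are hypothetical and are not part of ZenML.

```python
import functools
from typing import Any, Callable, Optional


def traced(
    _func: Optional[Callable] = None,
    *,
    name: Optional[str] = None,
    enable_cache: bool = True,
):
    """Hypothetical decorator usable as @traced or @traced(name=...)."""

    def wrap(func: Callable) -> Callable:
        @functools.wraps(func)
        def inner(*args: Any, **kwargs: Any) -> Any:
            return func(*args, **kwargs)

        # Attach the resolved configuration so it can be inspected later.
        inner.config = {"name": name or func.__name__, "enable_cache": enable_cache}
        return inner

    # Bare usage: Python passes the decorated function as _func.
    if _func is not None:
        return wrap(_func)
    # Parameterized usage: return the real decorator.
    return wrap


@traced
def load_data() -> list:
    return [1, 2, 3]


@traced(name="trainer", enable_cache=False)
def train_model(data: list) -> float:
    return 0.95
```

In this sketch the name falls back to the function name when none is given, mirroring the documented default for `name`.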

Base Step Class

Base class for implementing custom steps with additional functionality.

class BaseStep:
    """
    Base class for implementing custom steps.

    Use the @step decorator for most cases. This class is for advanced
    scenarios requiring additional control over step behavior.
    """

    def entrypoint(self, *args, **kwargs):
        """
        Main execution method to be implemented by subclass.

        Parameters:
        - *args: Positional arguments
        - **kwargs: Keyword arguments

        Returns:
        Step outputs
        """

Import from:

from zenml.steps import BaseStep

Pipeline Context

Access pipeline execution context and metadata.

class PipelineContext:
    """
    Pipeline execution context object.

    Attributes:
    - name: Pipeline name
    - run_name: Pipeline run name
    - pipeline_run: PipelineRunResponse object with full run details
    - extra: Extra metadata dict
    - model: Model configuration if set
    """

    @property
    def name(self) -> str:
        """Get pipeline name."""

    @property
    def run_name(self) -> str:
        """Get pipeline run name."""

    @property
    def pipeline_run(self):
        """
        Get pipeline run response object.

        Returns:
        PipelineRunResponse: Full pipeline run details including status, timestamps, configuration
        """

    @property
    def extra(self) -> dict:
        """Get extra metadata."""

    @property
    def model(self):
        """
        Get model configuration.

        Returns:
        ModelVersionResponse or None
        """


def get_pipeline_context() -> PipelineContext:
    """
    Get the current pipeline execution context.

    Returns:
    PipelineContext: Context object with pipeline metadata

    Raises:
    RuntimeError: If called outside pipeline execution

    Example:
    ```python
    from zenml import step, get_pipeline_context

    @step
    def my_step():
        context = get_pipeline_context()
        print(f"Running in pipeline: {context.name}")
        print(f"Run name: {context.run_name}")
        if context.model:
            print(f"Model: {context.model.name}")
    ```
    """

Import from:

from zenml import get_pipeline_context
from zenml.pipelines import PipelineContext
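
The accessor raises `RuntimeError` when no run is active. One common way to get that behavior in plain Python is a context variable that is set only for the duration of a run. The sketch below illustrates the idea under that assumption. `DemoContext`, `running`, and `get_demo_context` are hypothetical names, and this is not how ZenML is known to implement it.

```python
from contextlib import contextmanager
from contextvars import ContextVar
from dataclasses import dataclass
from typing import Iterator, Optional


@dataclass(frozen=True)
class DemoContext:
    """Hypothetical stand-in for a context object."""

    name: str
    run_name: str


_current: ContextVar[Optional[DemoContext]] = ContextVar("_current", default=None)


@contextmanager
def running(context: DemoContext) -> Iterator[None]:
    """Mark a block of code as executing inside a run."""
    token = _current.set(context)
    try:
        yield
    finally:
        # Restore the previous value even if the block raised.
        _current.reset(token)


def get_demo_context() -> DemoContext:
    """Return the active context, or raise if no run is in progress."""
    context = _current.get()
    if context is None:
        raise RuntimeError("No active run: call this only during execution.")
    return context
```

Code inside `with running(...)` can call `get_demo_context()` freely; the same call outside the block raises, which matches the documented contract of calling the accessor only during execution.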

Step Context

Access step execution context and utilities.

class StepContext:
    """
    Step execution context object.

    Provides access to step metadata, pipeline information, and utilities
    for interacting with the stack during step execution.

    Attributes:
    - step_name: Current step name
    - pipeline_name: Parent pipeline name
    - run_name: Pipeline run name
    - step_run: StepRunResponse object with full step run details
    - model: Model configuration if set
    - inputs: Input artifacts metadata
    - outputs: Output artifacts metadata
    """

    @property
    def step_name(self) -> str:
        """Get step name."""

    @property
    def pipeline_name(self) -> str:
        """Get pipeline name."""

    @property
    def run_name(self) -> str:
        """Get pipeline run name."""

    @property
    def step_run(self):
        """
        Get step run response object.

        Returns:
        StepRunResponse: Full step run details including status, timestamps, configuration
        """

    @property
    def model(self):
        """
        Get model configuration.

        Returns:
        ModelVersionResponse or None
        """

    @property
    def inputs(self) -> dict:
        """Get input artifacts metadata."""

    @property
    def outputs(self) -> dict:
        """Get output artifacts metadata."""


def get_step_context() -> StepContext:
    """
    Get the current step execution context.

    Returns:
    StepContext: Context object with step metadata and utilities

    Raises:
    RuntimeError: If called outside step execution

    Example:
    ```python
    from zenml import step, get_step_context

    @step
    def my_step(data: list) -> float:
        context = get_step_context()
        print(f"Running step: {context.step_name}")
        print(f"In pipeline: {context.pipeline_name}")
        print(f"Run: {context.run_name}")

        # Access step run details
        print(f"Status: {context.step_run.status}")

        # Training logic
        accuracy = 0.95
        return accuracy
    ```
    """

Import from:

from zenml import get_step_context
from zenml.steps import StepContext

Schedule Configuration

Configuration for scheduling pipeline runs.

class Schedule:
    """
    Schedule configuration for pipeline runs.

    Supports cron expressions and interval-based scheduling.

    Attributes:
    - name: Schedule name
    - cron_expression: Cron expression (e.g., "0 0 * * *" for daily at midnight)
    - start_time: Schedule start datetime
    - end_time: Schedule end datetime
    - interval_second: Interval as timedelta between runs (for periodic schedules)
    - catchup: Whether to catch up on missed runs
    - run_once_start_time: When to run the pipeline once (for one-time schedules)
    """

    def __init__(
        self,
        name: str = None,
        cron_expression: str = None,
        start_time: datetime = None,
        end_time: datetime = None,
        interval_second: timedelta = None,
        catchup: bool = False,
        run_once_start_time: datetime = None
    ):
        """
        Initialize schedule configuration.

        Use either cron_expression or interval_second, not both.

        Parameters:
        - name: Schedule name
        - cron_expression: Cron expression for schedule
        - start_time: When to start the schedule
        - end_time: When to end the schedule
        - interval_second: Run interval as timedelta object
        - catchup: Whether to catch up on missed runs
        - run_once_start_time: When to run the pipeline once

        Example:
        ```python
        from zenml import pipeline, Schedule
        from datetime import datetime, timedelta

        # Daily schedule
        schedule = Schedule(
            name="daily_training",
            cron_expression="0 0 * * *",
            start_time=datetime.now()
        )

        # Interval-based schedule (every 2 hours)
        schedule = Schedule(
            name="periodic_training",
            interval_second=timedelta(hours=2),
            start_time=datetime.now()
        )

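        @pipeline
        def my_pipeline():
            # Pipeline definition
            pass

        # The decorator signature has no schedule parameter; attach the
        # schedule when configuring the run instead.
        my_pipeline.with_options(schedule=schedule)()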
        ```
        """

Import from:

from zenml import Schedule
from zenml.config import Schedule
from zenml.pipelines import Schedule
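
The rule "use either cron_expression or interval_second, not both" and the meaning of a `timedelta` interval can be illustrated with a small standalone model. `DemoSchedule` below is a hypothetical stand-in that copies three of the documented field names. Its validation rules, including the requirement that an interval schedule has a `start_time`, are assumptions of this sketch rather than confirmed ZenML behavior.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import List, Optional


@dataclass
class DemoSchedule:
    """Hypothetical stand-in mirroring some documented Schedule fields."""

    cron_expression: Optional[str] = None
    interval_second: Optional[timedelta] = None
    start_time: Optional[datetime] = None

    def __post_init__(self) -> None:
        # Enforce the "either cron or interval, not both" rule.
        if self.cron_expression and self.interval_second:
            raise ValueError("Set cron_expression or interval_second, not both.")
        # Assumption of this sketch: an interval needs an anchor time.
        if self.interval_second and self.start_time is None:
            raise ValueError("An interval schedule needs a start_time.")

    def next_runs(self, count: int) -> List[datetime]:
        """Return the first `count` run times of an interval schedule."""
        if self.interval_second is None or self.start_time is None:
            raise ValueError("next_runs is only defined for interval schedules.")
        return [self.start_time + i * self.interval_second for i in range(count)]


every_two_hours = DemoSchedule(
    interval_second=timedelta(hours=2),
    start_time=datetime(2024, 1, 1, 0, 0),
)
print(every_two_hours.next_runs(3))
```

Passing a `timedelta` rather than a bare integer keeps the unit explicit, which is why the interval is written as `timedelta(hours=2)` and not as a number of seconds.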

Resource Settings

Resource allocation settings for steps.

class ResourceSettings:
    """
    Settings for resource allocation (CPU, GPU, memory).

    Attributes:
    - cpu_count: Number of CPUs
    - gpu_count: Number of GPUs
    - memory: Memory allocation (e.g., "4GB", "512MB")
    """

    def __init__(
        self,
        cpu_count: int = None,
        gpu_count: int = None,
        memory: str = None
    ):
        """
        Initialize resource settings.

        Parameters:
        - cpu_count: Number of CPUs to allocate
        - gpu_count: Number of GPUs to allocate
        - memory: Memory to allocate (e.g., "4GB", "512MB")

        Example:
        ```python
        from zenml import step
        from zenml.config import ResourceSettings

        @step(
            settings={
                "resources": ResourceSettings(
                    cpu_count=4,
                    gpu_count=1,
                    memory="8GB"
                )
            }
        )
        def train_model(data: list) -> float:
            # Training with allocated resources
            return 0.95
        ```
        """

Import from:

from zenml.config import ResourceSettings
from zenml.steps import ResourceSettings
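
Memory values such as "4GB" and "512MB" are strings made of an integer and a unit. The helper below sketches how such a string could be validated and converted to bytes. `memory_to_bytes` is a hypothetical function, and the unit table (decimal multiples for KB/MB/GB/TB, binary multiples for KiB/MiB/GiB/TiB) is an assumption of this sketch, not a statement of how ZenML interprets the value.

```python
import re

# Assumed unit table: decimal multiples for KB/MB/GB/TB,
# binary multiples for KiB/MiB/GiB/TiB.
_UNITS = {
    "KB": 10**3, "MB": 10**6, "GB": 10**9, "TB": 10**12,
    "KiB": 2**10, "MiB": 2**20, "GiB": 2**30, "TiB": 2**40,
}
_MEMORY_PATTERN = re.compile(r"^(\d+)(KB|KiB|MB|MiB|GB|GiB|TB|TiB)$")


def memory_to_bytes(memory: str) -> int:
    """Convert a string such as '4GB' or '512MB' to a number of bytes."""
    match = _MEMORY_PATTERN.match(memory)
    if match is None:
        raise ValueError(f"Unrecognized memory value: {memory!r}")
    amount, unit = match.groups()
    return int(amount) * _UNITS[unit]


print(memory_to_bytes("4GB"))
print(memory_to_bytes("512MB"))
```

Validating the string up front surfaces typos such as "8 gigs" at configuration time instead of when the step is scheduled.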

Usage Examples

Basic Pipeline with Multiple Steps

from zenml import pipeline, step
from typing import Tuple

@step
def load_data() -> Tuple[list, list]:
    """Load training and test data."""
    train_data = [1, 2, 3, 4, 5]
    test_data = [6, 7, 8, 9, 10]
    return train_data, test_data

@step
def preprocess_data(train: list, test: list) -> Tuple[list, list]:
    """Preprocess data."""
    train_processed = [x * 2 for x in train]
    test_processed = [x * 2 for x in test]
    return train_processed, test_processed

@step
def train_model(data: list) -> dict:
    """Train a model."""
    return {"accuracy": 0.95, "loss": 0.05}

@step
def evaluate_model(model: dict, test_data: list) -> float:
    """Evaluate model on test data."""
    return model["accuracy"] * 0.98

@pipeline
def ml_pipeline():
    """Complete ML training pipeline."""
    train, test = load_data()
    train_processed, test_processed = preprocess_data(train, test)
    model = train_model(train_processed)
    accuracy = evaluate_model(model, test_processed)
    return accuracy

if __name__ == "__main__":
    ml_pipeline()
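
Stripped of the decorators, the step bodies above are ordinary functions, so the data flow can be checked without a ZenML installation. The sketch below repeats the same logic in plain Python to show the values that move between steps. It says nothing about how ZenML stores or caches those values as artifacts.

```python
def load_data():
    """Return hard-coded training and test data."""
    return [1, 2, 3, 4, 5], [6, 7, 8, 9, 10]


def preprocess_data(train, test):
    """Double every value in both splits."""
    return [x * 2 for x in train], [x * 2 for x in test]


def train_model(data):
    """Return fixed metrics in place of a trained model."""
    return {"accuracy": 0.95, "loss": 0.05}


def evaluate_model(model, test_data):
    """Scale the stored accuracy by a fixed factor."""
    return model["accuracy"] * 0.98


# Same call order as the pipeline function.
train, test = load_data()
train_processed, test_processed = preprocess_data(train, test)
model = train_model(train_processed)
accuracy = evaluate_model(model, test_processed)
print(train_processed, test_processed, accuracy)
```

The preprocessed splits are `[2, 4, 6, 8, 10]` and `[12, 14, 16, 18, 20]`, and the final value is 0.95 × 0.98 ≈ 0.931, up to floating-point rounding.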

Pipeline with Model Control Plane

from zenml import pipeline, step, Model

@step
def train_model(data: list) -> dict:
    """Train and return model."""
    return {"weights": [0.1, 0.2, 0.3], "accuracy": 0.95}

@pipeline(
    model=Model(
        name="text_classifier",
        version="1.0.0",
        license="Apache-2.0",
        description="Text classification model",
        tags=["nlp", "classification"]
    )
)
def training_pipeline():
    """Pipeline with model tracking."""
    data = [1, 2, 3, 4, 5]
    model = train_model(data)
    return model

if __name__ == "__main__":
    training_pipeline()

Pipeline with Hooks

from zenml import pipeline, step
from zenml.hooks import alerter_success_hook, alerter_failure_hook

@step
def train_model(data: list) -> float:
    """Train model."""
    return 0.95

# Pass the hook functions themselves; they send messages through the
# alerter registered in the active stack.
@pipeline(
    on_success=alerter_success_hook,
    on_failure=alerter_failure_hook
)
def monitored_pipeline():
    """Pipeline with alerting."""
    data = [1, 2, 3]
    accuracy = train_model(data)
    return accuracy
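
The general shape of success and failure hooks can be sketched without ZenML: a runner calls one callback when the wrapped function returns and another when it raises. `run_with_hooks` and the `notify_*` functions below are hypothetical, and the choice to pass the exception to the failure hook is an assumption of this sketch.

```python
from typing import Any, Callable, List, Optional

events: List[str] = []


def run_with_hooks(
    func: Callable[[], Any],
    on_success: Optional[Callable[[], None]] = None,
    on_failure: Optional[Callable[[BaseException], None]] = None,
) -> Any:
    """Run func, then call exactly one of the two hooks."""
    try:
        result = func()
    except Exception as exc:
        if on_failure is not None:
            on_failure(exc)
        raise  # The original error still propagates to the caller.
    if on_success is not None:
        on_success()
    return result


def notify_success() -> None:
    events.append("success")


def notify_failure(exc: BaseException) -> None:
    events.append(f"failure: {exc}")


def failing_step() -> float:
    raise ValueError("bad input")


run_with_hooks(lambda: 0.95, on_success=notify_success, on_failure=notify_failure)
try:
    run_with_hooks(failing_step, on_success=notify_success, on_failure=notify_failure)
except ValueError:
    pass
print(events)
```

Note that the hooks are passed as function objects and invoked by the runner, not called at decoration time.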

Accessing Context in Steps

from zenml import step, get_step_context, get_pipeline_context

@step
def contextual_step(data: list) -> dict:
    """Step that uses context."""
    step_context = get_step_context()
    pipeline_context = get_pipeline_context()

    print(f"Step: {step_context.step_name}")
    print(f"Pipeline: {pipeline_context.name}")
    print(f"Run: {pipeline_context.run_name}")

    # Access model if configured
    if pipeline_context.model:
        print(f"Model: {pipeline_context.model.name}")

    return {
        "step": step_context.step_name,
        "pipeline": pipeline_context.name,
        "processed_data": [x * 2 for x in data]
    }

Step with Resource Allocation

from zenml import step
from zenml.config import ResourceSettings

@step(
    settings={
        "resources": ResourceSettings(
            cpu_count=8,
            gpu_count=2,
            memory="16GB"
        )
    }
)
def gpu_intensive_step(data: list) -> dict:
    """Step requiring GPU resources."""
    # GPU training logic
    return {"model": "trained_model", "accuracy": 0.98}

Install with Tessl CLI

npx tessl i tessl/pypi-zenml
