CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-zenml

ZenML is a unified MLOps framework that extends battle-tested machine learning operations principles to support the entire AI stack, from classical machine learning models to advanced AI agents.

Overview
Eval results
Files

docs/hooks.md

Hooks

Pre-built hooks for alerting and custom hook utilities. Hooks enable executing custom logic in response to pipeline and step success or failure events.

Capabilities

Alerter Success Hook

def alerter_success_hook() -> None:
    """
    Post a success notification once a step has finished successfully.

    The notification is sent through whichever `BaseAlerter` is configured
    in the active stack; the alerter is detected automatically. The message
    contains the pipeline name, run name, step name, and parameters.

    When the active stack has no alerter configured, a warning is logged
    and the hook is skipped.

    Example:
    ```python
    from zenml import pipeline, step
    from zenml.hooks import alerter_success_hook

    @step(on_success=alerter_success_hook)
    def train_model(data: list) -> dict:
        return {"model": "trained", "accuracy": 0.95}

    @pipeline(on_success=alerter_success_hook)
    def my_pipeline():
        data = [1, 2, 3]
        train_model(data)
    ```
    """

Import from:

from zenml.hooks import alerter_success_hook

Alerter Failure Hook

def alerter_failure_hook(exception: BaseException) -> None:
    """
    Post a failure notification after a step has failed.

    The notification is sent through whichever `BaseAlerter` is configured
    in the active stack; the alerter is detected automatically. The message
    contains the pipeline name, run name, step name, parameters, and the
    exception details including the traceback.

    When the active stack has no alerter configured, a warning is logged
    and the hook is skipped.

    Parameters:
    - exception: The original exception that caused the step to fail

    Example:
    ```python
    from zenml import pipeline, step
    from zenml.hooks import alerter_failure_hook

    @step(on_failure=alerter_failure_hook)
    def train_model(data: list) -> dict:
        if not data:
            raise ValueError("No data provided")
        return {"model": "trained"}

    @pipeline(on_failure=alerter_failure_hook)
    def my_pipeline():
        data = []
        train_model(data)
    ```
    """

Import from:

from zenml.hooks import alerter_failure_hook

Resolve and Validate Hook

def resolve_and_validate_hook(hook):
    """
    Resolve a custom hook specification and check that it is a valid hook.

    A hook may be specified as a string path, a function, or any other
    callable. The specification is resolved and then validated against
    the requirements hooks must meet.

    Parameters:
    - hook: The hook specification (string path, function, or callable)

    Returns:
    The resolved and validated hook callable

    Raises:
    HookValidationException: If the hook is invalid

    Example:
    ```python
    from zenml.hooks import resolve_and_validate_hook

    def my_custom_hook():
        print("Custom hook executed")

    # Validate hook
    validated = resolve_and_validate_hook(my_custom_hook)
    ```
    """

Import from:

from zenml.hooks import resolve_and_validate_hook

Usage Examples

Basic Alerter Hooks

from zenml import pipeline, step
from zenml.hooks import alerter_success_hook, alerter_failure_hook

@step
def train_model(data: list) -> dict:
    """Pretend to train a model and report a fixed result."""
    training_result = {"model": "trained", "accuracy": 0.95}
    return training_result

@pipeline(on_success=alerter_success_hook, on_failure=alerter_failure_hook)
def monitored_pipeline():
    """Run the training step with success and failure alerting attached.

    Both hooks rely on whatever alerter is configured in the active stack.
    """
    return train_model([1, 2, 3, 4, 5])

Step-Level Alerter Hooks

from zenml import step
from zenml.hooks import alerter_success_hook, alerter_failure_hook

@step(on_success=alerter_success_hook, on_failure=alerter_failure_hook)
def critical_step(data: list) -> dict:
    """Process `data`, alerting on both success and failure.

    Raises:
    ValueError: If `data` is empty
    """
    if data:
        return {"processed": data}
    # Raising here is what triggers the failure hook.
    raise ValueError("No data provided")

Custom Hook Function

from zenml import pipeline, get_pipeline_context

def custom_success_hook():
    """Print the pipeline name and run name when the hook fires."""
    ctx = get_pipeline_context()

    pipeline_name = ctx.name
    print(f"Pipeline {pipeline_name} completed!")

    run_name = ctx.run_name
    print(f"Run name: {run_name}")

    # Place for follow-up logic, e.g. triggering downstream processes:
    #   trigger_deployment()
    #   update_dashboard()

@pipeline(on_success=custom_success_hook)
def pipeline_with_custom_hook():
    """Empty pipeline that runs `custom_success_hook` on success."""

Hook with Arguments

from zenml import pipeline

def custom_hook_with_args(threshold: float):
    """Build a zero-argument hook that remembers `threshold`.

    Parameters:
    - threshold: Value captured by the returned hook

    Returns:
    A callable taking no arguments that prints the captured threshold.
    """
    # NOTE(review): the returned hook is a closure; confirm that the
    # orchestration layer can resolve nested functions as hooks.
    def hook():
        """Report the threshold captured by the factory."""
        message = f"Checking threshold: {threshold}"
        print(message)
        # Logic that depends on the threshold belongs here.

    return hook

@pipeline(on_success=custom_hook_with_args(threshold=0.95))
def threshold_pipeline():
    """Empty pipeline whose success hook is built with a threshold of 0.95."""

Error Handling in Hooks

from zenml import pipeline, get_pipeline_context

def safe_success_hook():
    """Report pipeline success without ever raising from the hook.

    Any exception raised inside the hook is printed and swallowed, so a
    failing hook does not fail the pipeline.
    """
    try:
        pipeline_name = get_pipeline_context().name
        print(f"Pipeline {pipeline_name} succeeded")

        # Operations that may fail go here, for example:
        #   send_notification()
        #   update_external_system()

    except Exception as err:
        # Deliberately best-effort: report the problem and carry on.
        print(f"Hook failed but pipeline succeeded: {err}")

@pipeline(on_success=safe_success_hook)
def resilient_pipeline():
    """Empty pipeline that uses the error-tolerant `safe_success_hook`."""

Conditional Hooks

from zenml import pipeline, get_pipeline_context
import os

def conditional_alert():
    """Send an alert only when the `ENV` environment variable is "production"."""
    in_production = os.environ.get("ENV") == "production"
    if not in_production:
        print("Non-production environment, skipping alert")
        return

    # Send alert
    print("Production alert sent")

@pipeline(on_success=conditional_alert)
def environment_aware_pipeline():
    """Empty pipeline whose success alert depends on the environment."""

Hook with External Service

from zenml import pipeline
import requests

def webhook_success_hook(webhook_url: str, pipeline_name: str = "my_pipeline"):
    """Create a success hook that notifies an external webhook.

    Parameters:
    - webhook_url: URL that receives the JSON notification via HTTP POST
    - pipeline_name: Pipeline name reported in the payload
      (defaults to "my_pipeline")

    Returns:
    A zero-argument callable. When called, it POSTs a JSON payload with
    the keys "status", "pipeline", and "timestamp" to `webhook_url`.
    Request failures are printed and not re-raised.
    """
    def hook():
        """Send the success notification; never raise on delivery failure."""
        # Local import keeps this example self-contained.
        from datetime import datetime, timezone

        payload = {
            "status": "success",
            "pipeline": pipeline_name,
            # Stamp the moment the hook fires (UTC, ISO 8601 with "Z")
            # instead of sending a fixed placeholder date.
            "timestamp": datetime.now(timezone.utc).strftime(
                "%Y-%m-%dT%H:%M:%SZ"
            ),
        }
        try:
            response = requests.post(webhook_url, json=payload, timeout=10)
            # Treat 4xx/5xx responses as delivery failures too.
            response.raise_for_status()
        except requests.RequestException as e:
            # Best-effort notification: report the problem and carry on.
            print(f"Failed to send webhook: {e}")
        else:
            print("Webhook notification sent")

    return hook

@pipeline(on_success=webhook_success_hook("https://api.example.com/webhook"))
def webhook_pipeline():
    """Empty pipeline that notifies a webhook on success."""

Combining Hooks and Metadata

from zenml import pipeline, step, log_metadata, get_pipeline_context

def metadata_logging_hook():
    """Log extra metadata recording that the completion hook ran."""
    ctx = get_pipeline_context()

    hook_metadata = {
        "completion_hook_executed": True,
        "pipeline_name": ctx.name,
        "run_name": ctx.run_name,
    }
    log_metadata(hook_metadata)

@step
def training_step(data: list) -> dict:
    """Pretend to train a model and report a fixed result."""
    outcome = {"model": "trained"}
    return outcome

@pipeline(on_success=metadata_logging_hook)
def metadata_aware_pipeline():
    """Run the training step and log metadata once it succeeds."""
    training_step([1, 2, 3])

Validating Custom Hooks

from zenml.hooks import resolve_and_validate_hook
from zenml.exceptions import HookValidationException

def my_hook():
    """Hook that takes no arguments, so it passes validation."""
    message = "Hook executed"
    print(message)

def invalid_hook(required_arg):
    """Hook that is invalid because it demands a required argument."""
    message = f"This won't work: {required_arg}"
    print(message)

# Validate the hook that takes no arguments.
try:
    valid = resolve_and_validate_hook(my_hook)
except HookValidationException as error:
    print(f"Invalid: {error}")
else:
    print("Valid hook")

# Validate the hook that requires an argument and report any rejection.
try:
    invalid = resolve_and_validate_hook(invalid_hook)
except HookValidationException as error:
    print(f"Hook validation failed: {error}")

String Path Hooks

from zenml import pipeline

# Hook defined in a module
# my_hooks.py:
# def success_notification():
#     print("Success!")

@pipeline(on_success="my_hooks.success_notification")
def string_path_pipeline():
    """Empty pipeline whose success hook is given as a string path."""

Hook Execution Context

from zenml import pipeline, step, get_pipeline_context, get_step_context

def detailed_success_hook():
    """Print details from the pipeline context when the hook fires.

    Any error raised while reading the context is printed and swallowed.
    """
    try:
        # Look up the pipeline context first; everything else reads from it.
        ctx = get_pipeline_context()
        print(f"Pipeline: {ctx.name}")
        print(f"Run: {ctx.run_name}")

        # The model line is printed only when the context has a model.
        if ctx.model:
            print(f"Model: {ctx.model.name}")

        # Details of the run itself.
        current_run = ctx.pipeline_run
        print(f"Status: {current_run.status}")
        print(f"Start time: {current_run.start_time}")

    except Exception as err:
        print(f"Context access error: {err}")

@pipeline(on_success=detailed_success_hook)
def context_aware_pipeline():
    """Empty pipeline that runs `detailed_success_hook` on success."""

Install with Tessl CLI

npx tessl i tessl/pypi-zenml

docs

artifact-config.md

artifacts.md

client.md

config.md

enums.md

exceptions.md

hooks.md

index.md

integrations.md

materializers.md

metadata-tags.md

models.md

pipelines-and-steps.md

pydantic-models.md

services.md

stack-components.md

stacks.md

types.md

utilities.md

tile.json