ZenML is a unified MLOps framework that extends battle-tested machine learning operations principles to support the entire AI stack, from classical machine learning models to advanced AI agents.
Pre-built hooks for alerting and custom hook utilities. Hooks enable executing custom logic in response to pipeline and step success or failure events.
def alerter_success_hook() -> None:
"""
Standard success hook that executes after step finishes successfully.
This hook uses any `BaseAlerter` that is configured within the active
stack to post a success notification message. The message includes
pipeline name, run name, step name, and parameters.
The hook automatically detects the alerter from the active stack.
If no alerter is configured, a warning is logged and the hook is skipped.
Example:
```python
from zenml import pipeline, step
from zenml.hooks import alerter_success_hook
@step(on_success=alerter_success_hook)
def train_model(data: list) -> dict:
return {"model": "trained", "accuracy": 0.95}
@pipeline(on_success=alerter_success_hook)
def my_pipeline():
data = [1, 2, 3]
train_model(data)
```
"""Import from:
from zenml.hooks import alerter_success_hookdef alerter_failure_hook(exception: BaseException) -> None:
"""
Standard failure hook that executes after step fails.
This hook uses any `BaseAlerter` that is configured within the active
stack to post a failure notification message. The message includes
pipeline name, run name, step name, parameters, and exception details
with traceback.
The hook automatically detects the alerter from the active stack.
If no alerter is configured, a warning is logged and the hook is skipped.
Parameters:
- exception: Original exception that led to step failing
Example:
```python
from zenml import pipeline, step
from zenml.hooks import alerter_failure_hook
@step(on_failure=alerter_failure_hook)
def train_model(data: list) -> dict:
if not data:
raise ValueError("No data provided")
return {"model": "trained"}
@pipeline(on_failure=alerter_failure_hook)
def my_pipeline():
data = []
train_model(data)
```
"""Import from:
from zenml.hooks import alerter_failure_hookdef resolve_and_validate_hook(hook):
"""
Utility to resolve and validate custom hooks.
Resolves hook specifications (strings, functions, or callables)
and validates they meet hook requirements.
Parameters:
- hook: Hook specification (string path, function, or callable)
Returns:
Resolved and validated hook callable
Raises:
HookValidationException: If hook is invalid
Example:
```python
from zenml.hooks import resolve_and_validate_hook
def my_custom_hook():
print("Custom hook executed")
# Validate hook
validated = resolve_and_validate_hook(my_custom_hook)
```
"""Import from:
from zenml.hooks import resolve_and_validate_hookfrom zenml import pipeline, step
from zenml.hooks import alerter_success_hook, alerter_failure_hook
@step
def train_model(data: list) -> dict:
"""Training step."""
return {"model": "trained", "accuracy": 0.95}
@pipeline(
on_success=alerter_success_hook,
on_failure=alerter_failure_hook
)
def monitored_pipeline():
"""Pipeline with alerting.
The hooks will use any alerter configured in the active stack.
"""
data = [1, 2, 3, 4, 5]
model = train_model(data)
return modelfrom zenml import step
from zenml.hooks import alerter_success_hook, alerter_failure_hook
@step(
on_success=alerter_success_hook,
on_failure=alerter_failure_hook
)
def critical_step(data: list) -> dict:
"""Critical step with both success and failure alerting."""
if not data:
raise ValueError("No data provided")
return {"processed": data}from zenml import pipeline, get_pipeline_context
def custom_success_hook():
"""Custom success hook."""
context = get_pipeline_context()
print(f"Pipeline {context.name} completed!")
print(f"Run name: {context.run_name}")
# Custom logic (e.g., trigger downstream processes)
# trigger_deployment()
# update_dashboard()
@pipeline(on_success=custom_success_hook)
def pipeline_with_custom_hook():
"""Pipeline with custom hook."""
passfrom zenml import pipeline
def custom_hook_with_args(threshold: float):
"""Hook factory that creates a hook with arguments."""
def hook():
print(f"Checking threshold: {threshold}")
# Custom logic using threshold
return hook
@pipeline(
on_success=custom_hook_with_args(threshold=0.95)
)
def threshold_pipeline():
"""Pipeline with parameterized hook."""
passfrom zenml import pipeline, get_pipeline_context
def safe_success_hook():
"""Success hook with error handling."""
try:
context = get_pipeline_context()
print(f"Pipeline {context.name} succeeded")
# Potentially failing operations
# send_notification()
# update_external_system()
except Exception as e:
print(f"Hook failed but pipeline succeeded: {e}")
# Log error but don't fail pipeline
@pipeline(on_success=safe_success_hook)
def resilient_pipeline():
"""Pipeline with resilient hooks."""
passfrom zenml import pipeline, get_pipeline_context
import os
def conditional_alert():
"""Alert only in production environment."""
if os.getenv("ENV") == "production":
# Send alert
print("Production alert sent")
else:
print("Non-production environment, skipping alert")
@pipeline(on_success=conditional_alert)
def environment_aware_pipeline():
"""Pipeline with environment-aware alerting."""
passfrom zenml import pipeline
import requests
def webhook_success_hook(webhook_url: str):
"""Create hook that calls external webhook."""
def hook():
try:
response = requests.post(
webhook_url,
json={
"status": "success",
"pipeline": "my_pipeline",
"timestamp": "2024-01-15T10:00:00Z"
},
timeout=10
)
response.raise_for_status()
print("Webhook notification sent")
except Exception as e:
print(f"Failed to send webhook: {e}")
return hook
@pipeline(
on_success=webhook_success_hook("https://api.example.com/webhook")
)
def webhook_pipeline():
"""Pipeline with webhook notification."""
passfrom zenml import pipeline, step, log_metadata, get_pipeline_context
def metadata_logging_hook():
"""Hook that logs additional metadata."""
context = get_pipeline_context()
log_metadata({
"completion_hook_executed": True,
"pipeline_name": context.name,
"run_name": context.run_name
})
@step
def training_step(data: list) -> dict:
return {"model": "trained"}
@pipeline(on_success=metadata_logging_hook)
def metadata_aware_pipeline():
"""Pipeline that logs metadata on success."""
train_data = [1, 2, 3]
model = training_step(train_data)from zenml.hooks import resolve_and_validate_hook
from zenml.exceptions import HookValidationException
def my_hook():
"""Valid hook."""
print("Hook executed")
def invalid_hook(required_arg):
"""Invalid hook - requires arguments."""
print(f"This won't work: {required_arg}")
# Validate hooks
try:
valid = resolve_and_validate_hook(my_hook)
print("Valid hook")
except HookValidationException as e:
print(f"Invalid: {e}")
try:
invalid = resolve_and_validate_hook(invalid_hook)
except HookValidationException as e:
print(f"Hook validation failed: {e}")from zenml import pipeline
# Hook defined in a module
# my_hooks.py:
# def success_notification():
# print("Success!")
@pipeline(
on_success="my_hooks.success_notification"
)
def string_path_pipeline():
"""Pipeline using string path to hook."""
passfrom zenml import pipeline, step, get_pipeline_context, get_step_context
def detailed_success_hook():
"""Hook that accesses execution context."""
try:
# Try to get pipeline context
pipeline_ctx = get_pipeline_context()
print(f"Pipeline: {pipeline_ctx.name}")
print(f"Run: {pipeline_ctx.run_name}")
if pipeline_ctx.model:
print(f"Model: {pipeline_ctx.model.name}")
# Access run details
run = pipeline_ctx.pipeline_run
print(f"Status: {run.status}")
print(f"Start time: {run.start_time}")
except Exception as e:
print(f"Context access error: {e}")
@pipeline(on_success=detailed_success_hook)
def context_aware_pipeline():
"""Pipeline with context-aware hook."""
passInstall with Tessl CLI
npx tessl i tessl/pypi-zenml