0
# Hooks
1
2
Pre-built hooks for alerting and custom hook utilities. Hooks enable executing custom logic in response to pipeline and step success or failure events.
3
4
## Capabilities
5
6
### Alerter Success Hook
7
8
```python { .api }
9
def alerter_success_hook() -> None:
10
"""
11
Standard success hook that executes after step finishes successfully.
12
13
This hook uses any `BaseAlerter` that is configured within the active
14
stack to post a success notification message. The message includes
15
pipeline name, run name, step name, and parameters.
16
17
The hook automatically detects the alerter from the active stack.
18
If no alerter is configured, a warning is logged and the hook is skipped.
19
20
Example:
21
```python
22
from zenml import pipeline, step
23
from zenml.hooks import alerter_success_hook
24
25
@step(on_success=alerter_success_hook)
26
def train_model(data: list) -> dict:
27
return {"model": "trained", "accuracy": 0.95}
28
29
@pipeline(on_success=alerter_success_hook)
30
def my_pipeline():
31
data = [1, 2, 3]
32
train_model(data)
33
```
34
"""
35
```
36
37
Import from:
38
39
```python
40
from zenml.hooks import alerter_success_hook
41
```
42
43
### Alerter Failure Hook
44
45
```python { .api }
46
def alerter_failure_hook(exception: BaseException) -> None:
47
"""
48
Standard failure hook that executes after step fails.
49
50
This hook uses any `BaseAlerter` that is configured within the active
51
stack to post a failure notification message. The message includes
52
pipeline name, run name, step name, parameters, and exception details
53
with traceback.
54
55
The hook automatically detects the alerter from the active stack.
56
If no alerter is configured, a warning is logged and the hook is skipped.
57
58
Parameters:
59
- exception: Original exception that led to step failing
60
61
Example:
62
```python
63
from zenml import pipeline, step
64
from zenml.hooks import alerter_failure_hook
65
66
@step(on_failure=alerter_failure_hook)
67
def train_model(data: list) -> dict:
68
if not data:
69
raise ValueError("No data provided")
70
return {"model": "trained"}
71
72
@pipeline(on_failure=alerter_failure_hook)
73
def my_pipeline():
74
data = []
75
train_model(data)
76
```
77
"""
78
```
79
80
Import from:
81
82
```python
83
from zenml.hooks import alerter_failure_hook
84
```
85
86
### Resolve and Validate Hook
87
88
```python { .api }
89
def resolve_and_validate_hook(hook):
90
"""
91
Utility to resolve and validate custom hooks.
92
93
Resolves hook specifications (strings, functions, or callables)
94
and validates they meet hook requirements.
95
96
Parameters:
97
- hook: Hook specification (string path, function, or callable)
98
99
Returns:
100
Resolved and validated hook callable
101
102
Raises:
103
HookValidationException: If hook is invalid
104
105
Example:
106
```python
107
from zenml.hooks import resolve_and_validate_hook
108
109
def my_custom_hook():
110
print("Custom hook executed")
111
112
# Validate hook
113
validated = resolve_and_validate_hook(my_custom_hook)
114
```
115
"""
116
```
117
118
Import from:
119
120
```python
121
from zenml.hooks import resolve_and_validate_hook
122
```
123
124
## Usage Examples
125
126
### Basic Alerter Hooks
127
128
```python
129
from zenml import pipeline, step
130
from zenml.hooks import alerter_success_hook, alerter_failure_hook
131
132
@step
133
def train_model(data: list) -> dict:
134
"""Training step."""
135
return {"model": "trained", "accuracy": 0.95}
136
137
@pipeline(
138
on_success=alerter_success_hook,
139
on_failure=alerter_failure_hook
140
)
141
def monitored_pipeline():
142
"""Pipeline with alerting.
143
144
The hooks will use any alerter configured in the active stack.
145
"""
146
data = [1, 2, 3, 4, 5]
147
model = train_model(data)
148
return model
149
```
150
151
### Step-Level Alerter Hooks
152
153
```python
154
from zenml import step
155
from zenml.hooks import alerter_success_hook, alerter_failure_hook
156
157
@step(
158
on_success=alerter_success_hook,
159
on_failure=alerter_failure_hook
160
)
161
def critical_step(data: list) -> dict:
162
"""Critical step with both success and failure alerting."""
163
if not data:
164
raise ValueError("No data provided")
165
return {"processed": data}
166
```
167
168
### Custom Hook Function
169
170
```python
171
from zenml import pipeline, get_pipeline_context
172
173
def custom_success_hook():
174
"""Custom success hook."""
175
context = get_pipeline_context()
176
print(f"Pipeline {context.name} completed!")
177
print(f"Run name: {context.run_name}")
178
179
# Custom logic (e.g., trigger downstream processes)
180
# trigger_deployment()
181
# update_dashboard()
182
183
@pipeline(on_success=custom_success_hook)
184
def pipeline_with_custom_hook():
185
"""Pipeline with custom hook."""
186
pass
187
```
188
189
### Hook with Arguments
190
191
```python
192
from zenml import pipeline
193
194
def custom_hook_with_args(threshold: float):
195
"""Hook factory that creates a hook with arguments."""
196
def hook():
197
print(f"Checking threshold: {threshold}")
198
# Custom logic using threshold
199
return hook
200
201
@pipeline(
202
on_success=custom_hook_with_args(threshold=0.95)
203
)
204
def threshold_pipeline():
205
"""Pipeline with parameterized hook."""
206
pass
207
```
208
209
### Error Handling in Hooks
210
211
```python
212
from zenml import pipeline, get_pipeline_context
213
214
def safe_success_hook():
215
"""Success hook with error handling."""
216
try:
217
context = get_pipeline_context()
218
print(f"Pipeline {context.name} succeeded")
219
220
# Potentially failing operations
221
# send_notification()
222
# update_external_system()
223
224
except Exception as e:
225
print(f"Hook failed but pipeline succeeded: {e}")
226
# Log error but don't fail pipeline
227
228
@pipeline(on_success=safe_success_hook)
229
def resilient_pipeline():
230
"""Pipeline with resilient hooks."""
231
pass
232
```
233
234
### Conditional Hooks
235
236
```python
237
from zenml import pipeline, get_pipeline_context
238
import os
239
240
def conditional_alert():
241
"""Alert only in production environment."""
242
if os.getenv("ENV") == "production":
243
# Send alert
244
print("Production alert sent")
245
else:
246
print("Non-production environment, skipping alert")
247
248
@pipeline(on_success=conditional_alert)
249
def environment_aware_pipeline():
250
"""Pipeline with environment-aware alerting."""
251
pass
252
```
253
254
### Hook with External Service
255
256
```python
257
from zenml import pipeline
258
import requests
259
260
def webhook_success_hook(webhook_url: str):
261
"""Create hook that calls external webhook."""
262
def hook():
263
try:
264
response = requests.post(
265
webhook_url,
266
json={
267
"status": "success",
268
"pipeline": "my_pipeline",
269
"timestamp": "2024-01-15T10:00:00Z"
270
},
271
timeout=10
272
)
273
response.raise_for_status()
274
print("Webhook notification sent")
275
except Exception as e:
276
print(f"Failed to send webhook: {e}")
277
278
return hook
279
280
@pipeline(
281
on_success=webhook_success_hook("https://api.example.com/webhook")
282
)
283
def webhook_pipeline():
284
"""Pipeline with webhook notification."""
285
pass
286
```
287
288
### Combining Hooks and Metadata
289
290
```python
291
from zenml import pipeline, step, log_metadata, get_pipeline_context
292
293
def metadata_logging_hook():
294
"""Hook that logs additional metadata."""
295
context = get_pipeline_context()
296
297
log_metadata({
298
"completion_hook_executed": True,
299
"pipeline_name": context.name,
300
"run_name": context.run_name
301
})
302
303
@step
304
def training_step(data: list) -> dict:
305
return {"model": "trained"}
306
307
@pipeline(on_success=metadata_logging_hook)
308
def metadata_aware_pipeline():
309
"""Pipeline that logs metadata on success."""
310
train_data = [1, 2, 3]
311
model = training_step(train_data)
312
```
313
314
### Validating Custom Hooks
315
316
```python
317
from zenml.hooks import resolve_and_validate_hook
318
from zenml.exceptions import HookValidationException
319
320
def my_hook():
321
"""Valid hook."""
322
print("Hook executed")
323
324
def invalid_hook(required_arg):
325
"""Invalid hook - requires arguments."""
326
print(f"This won't work: {required_arg}")
327
328
# Validate hooks
329
try:
330
valid = resolve_and_validate_hook(my_hook)
331
print("Valid hook")
332
except HookValidationException as e:
333
print(f"Invalid: {e}")
334
335
try:
336
invalid = resolve_and_validate_hook(invalid_hook)
337
except HookValidationException as e:
338
print(f"Hook validation failed: {e}")
339
```
340
341
### String Path Hooks
342
343
```python
344
from zenml import pipeline
345
346
# Hook defined in a module
347
# my_hooks.py:
348
# def success_notification():
349
# print("Success!")
350
351
@pipeline(
352
on_success="my_hooks.success_notification"
353
)
354
def string_path_pipeline():
355
"""Pipeline using string path to hook."""
356
pass
357
```
358
359
### Hook Execution Context
360
361
```python
362
from zenml import pipeline, step, get_pipeline_context, get_step_context
363
364
def detailed_success_hook():
365
"""Hook that accesses execution context."""
366
try:
367
# Try to get pipeline context
368
pipeline_ctx = get_pipeline_context()
369
print(f"Pipeline: {pipeline_ctx.name}")
370
print(f"Run: {pipeline_ctx.run_name}")
371
372
if pipeline_ctx.model:
373
print(f"Model: {pipeline_ctx.model.name}")
374
375
# Access run details
376
run = pipeline_ctx.pipeline_run
377
print(f"Status: {run.status}")
378
print(f"Start time: {run.start_time}")
379
380
except Exception as e:
381
print(f"Context access error: {e}")
382
383
@pipeline(on_success=detailed_success_hook)
384
def context_aware_pipeline():
385
"""Pipeline with context-aware hook."""
386
pass
387
```
388