0
# Hook System
1
2
Kedro's hook system provides a plugin architecture for extending framework functionality at various lifecycle points. Hooks enable custom behavior injection during node execution, pipeline runs, and catalog operations without modifying core code.
3
4
## Capabilities
5
6
### Hook Implementation
7
8
Decorator that marks a method as a hook implementation so the plugin system can discover and call it.
9
10
```python { .api }
11
def hook_impl(func):
12
"""
13
Decorator to mark methods as hook implementations.
14
15
Args:
16
func (Callable): Function to mark as hook implementation
17
18
Returns:
19
Callable: Decorated function with hook metadata
20
21
Usage:
22
@hook_impl
23
def before_node_run(self, node, catalog, inputs, is_async, session_id):
24
# Custom logic before node execution
25
pass
26
"""
27
```
28
29
### Hook Manager
30
31
Factory function for creating hook managers that coordinate plugin execution.
32
33
```python { .api }
34
def _create_hook_manager():
35
"""
36
Create hook manager for plugin system.
37
38
Returns:
39
HookManager: Configured hook manager instance
40
41
Note:
42
This is an internal function used by the framework.
43
Users typically don't call this directly.
44
"""
45
```
46
47
## Available Hook Specifications
48
49
### Node Execution Hooks
50
51
Hooks that run during individual node execution lifecycle.
52
53
```python { .api }
54
class NodeHookSpecs:
55
"""Hook specifications for node execution events."""
56
57
def before_node_run(self, node, catalog, inputs, is_async, session_id):
58
"""
59
Called before a node runs.
60
61
Args:
62
node (Node): The node to be executed
63
catalog (DataCatalog): Data catalog instance
64
inputs (dict): Node inputs
65
is_async (bool): Whether node runs asynchronously
66
session_id (str): Session identifier
67
"""
68
69
def after_node_run(self, node, catalog, inputs, outputs, is_async, session_id):
70
"""
71
Called after a node runs successfully.
72
73
Args:
74
node (Node): The executed node
75
catalog (DataCatalog): Data catalog instance
76
inputs (dict): Node inputs that were used
77
outputs (dict): Node outputs that were produced
78
is_async (bool): Whether node ran asynchronously
79
session_id (str): Session identifier
80
"""
81
82
def on_node_error(self, error, node, catalog, inputs, is_async, session_id):
83
"""
84
Called when a node execution fails.
85
86
Args:
87
error (Exception): The exception that occurred
88
node (Node): The node that failed
89
catalog (DataCatalog): Data catalog instance
90
inputs (dict): Node inputs that were used
91
is_async (bool): Whether node was running asynchronously
92
session_id (str): Session identifier
93
"""
94
```
95
96
### Pipeline Execution Hooks
97
98
Hooks that run during pipeline execution lifecycle.
99
100
```python { .api }
101
class PipelineHookSpecs:
102
"""Hook specifications for pipeline execution events."""
103
104
def before_pipeline_run(self, run_params, pipeline, catalog):
105
"""
106
Called before a pipeline runs.
107
108
Args:
109
run_params (dict): Parameters for the pipeline run
110
pipeline (Pipeline): The pipeline to be executed
111
catalog (DataCatalog): Data catalog instance
112
"""
113
114
def after_pipeline_run(self, run_params, pipeline, catalog):
115
"""
116
Called after a pipeline runs successfully.
117
118
Args:
119
run_params (dict): Parameters used for the pipeline run
120
pipeline (Pipeline): The executed pipeline
121
catalog (DataCatalog): Data catalog instance
122
"""
123
124
def on_pipeline_error(self, error, run_params, pipeline, catalog):
125
"""
126
Called when a pipeline execution fails.
127
128
Args:
129
error (Exception): The exception that occurred
130
run_params (dict): Parameters used for the pipeline run
131
pipeline (Pipeline): The pipeline that failed
132
catalog (DataCatalog): Data catalog instance
133
"""
134
```
135
136
### Dataset Hooks
137
138
Hooks that run during dataset operations.
139
140
```python { .api }
141
class DatasetHookSpecs:
142
"""Hook specifications for dataset operation events."""
143
144
def before_dataset_loaded(self, dataset_name, node):
145
"""
146
Called before a dataset is loaded.
147
148
Args:
149
dataset_name (str): Name of the dataset to be loaded
150
node (Node): Node requesting the dataset (if applicable)
151
"""
152
153
def after_dataset_loaded(self, dataset_name, data, node):
154
"""
155
Called after a dataset is loaded.
156
157
Args:
158
dataset_name (str): Name of the dataset that was loaded
159
data: The loaded data
160
node (Node): Node that requested the dataset (if applicable)
161
"""
162
163
def before_dataset_saved(self, dataset_name, data, node):
164
"""
165
Called before a dataset is saved.
166
167
Args:
168
dataset_name (str): Name of the dataset to be saved
169
data: The data to be saved
170
node (Node): Node producing the dataset (if applicable)
171
"""
172
173
def after_dataset_saved(self, dataset_name, data, node):
174
"""
175
Called after a dataset is saved.
176
177
Args:
178
dataset_name (str): Name of the dataset that was saved
179
data: The data that was saved
180
node (Node): Node that produced the dataset (if applicable)
181
"""
182
```
183
184
## Usage Examples
185
186
### Basic Hook Implementation
187
188
```python
189
from kedro.framework.hooks import hook_impl
190
import logging
191
192
class ProjectHooks:
193
"""Custom hooks for project-specific functionality."""
194
195
@hook_impl
196
def before_node_run(self, node, catalog, inputs, is_async, session_id):
197
"""Log node execution start."""
198
logging.info(f"Starting execution of node: {node.name}")
199
logging.info(f"Node inputs: {list(inputs.keys())}")
200
201
@hook_impl
202
def after_node_run(self, node, catalog, inputs, outputs, is_async, session_id):
203
"""Log node execution completion."""
204
logging.info(f"Completed execution of node: {node.name}")
205
logging.info(f"Node outputs: {list(outputs.keys())}")
206
207
@hook_impl
208
def on_node_error(self, error, node, catalog, inputs, is_async, session_id):
209
"""Log node execution errors."""
210
logging.error(f"Node {node.name} failed with error: {str(error)}")
211
# Could also send alerts, save error state, etc.
212
213
# Register hooks in settings.py
214
HOOKS = (ProjectHooks(),)
215
```
216
217
### Performance Monitoring Hooks
218
219
```python
220
from kedro.framework.hooks import hook_impl
221
import time
222
from collections import defaultdict
223
224
class PerformanceHooks:
225
"""Hooks for monitoring execution performance."""
226
227
def __init__(self):
228
self.node_times = defaultdict(list)
229
self.pipeline_start_time = None
230
231
@hook_impl
232
def before_pipeline_run(self, run_params, pipeline, catalog):
233
"""Record pipeline start time."""
234
self.pipeline_start_time = time.time()
235
print(f"Starting pipeline with {len(pipeline.nodes)} nodes")
236
237
@hook_impl
238
def after_pipeline_run(self, run_params, pipeline, catalog):
239
"""Report pipeline execution statistics."""
240
total_time = time.time() - self.pipeline_start_time
241
print(f"Pipeline completed in {total_time:.2f} seconds")
242
243
# Report node-level statistics
244
print("\nNode execution times:")
245
for node_name, times in self.node_times.items():
246
avg_time = sum(times) / len(times)
247
print(f" {node_name}: {avg_time:.2f}s (avg of {len(times)} runs)")
248
249
@hook_impl
250
def before_node_run(self, node, catalog, inputs, is_async, session_id):
251
"""Record node start time."""
252
node._start_time = time.time()  # NOTE(review): attaches an ad-hoc private attribute to the Node object — confirm Node allows this
253
254
@hook_impl
255
def after_node_run(self, node, catalog, inputs, outputs, is_async, session_id):
256
"""Record node execution time."""
257
execution_time = time.time() - node._start_time
258
self.node_times[node.name].append(execution_time)
259
260
if execution_time > 10: # Warn about slow nodes
261
print(f"WARNING: Node {node.name} took {execution_time:.2f}s to execute")
262
263
# Usage in settings.py
264
HOOKS = (PerformanceHooks(),)
265
```
266
267
### Data Validation Hooks
268
269
```python
270
from kedro.framework.hooks import hook_impl
271
import pandas as pd
272
273
class DataValidationHooks:
274
"""Hooks for automatic data validation."""
275
276
@hook_impl
277
def after_dataset_loaded(self, dataset_name, data, node):
278
"""Validate loaded data."""
279
if isinstance(data, pd.DataFrame):
280
# Check for missing data
281
if data.isnull().sum().sum() > 0:
282
print(f"WARNING: Dataset {dataset_name} contains missing values")
283
284
# Check for empty datasets
285
if len(data) == 0:
286
raise ValueError(f"Dataset {dataset_name} is empty")
287
288
# Log dataset info
289
print(f"Loaded {dataset_name}: {data.shape[0]} rows, {data.shape[1]} columns")
290
291
@hook_impl
292
def before_dataset_saved(self, dataset_name, data, node):
293
"""Validate data before saving."""
294
if isinstance(data, pd.DataFrame):
295
# Ensure no infinity values
296
if data.select_dtypes(include=[float]).isin([float('inf'), -float('inf')]).sum().sum() > 0:
297
raise ValueError(f"Dataset {dataset_name} contains infinity values")
298
299
# Ensure reasonable data types
300
for col in data.columns:
301
if data[col].dtype == 'object' and dataset_name.endswith('_numeric'):
302
print(f"WARNING: Numeric dataset {dataset_name} has object column: {col}")
303
304
# Usage in settings.py
305
HOOKS = (DataValidationHooks(),)
306
```
307
308
### ML Experiment Tracking Hooks
309
310
```python
311
from kedro.framework.hooks import hook_impl
312
import mlflow
313
import joblib
314
from pathlib import Path
315
316
class MLflowHooks:
317
"""Hooks for MLflow experiment tracking integration."""
318
319
def __init__(self, experiment_name="kedro-experiment"):
320
mlflow.set_experiment(experiment_name)
321
322
@hook_impl
323
def before_pipeline_run(self, run_params, pipeline, catalog):
324
"""Start MLflow run."""
325
mlflow.start_run()
326
327
# Log pipeline parameters
328
if 'tags' in run_params:
329
mlflow.set_tags({"pipeline_tags": str(run_params['tags'])})
330
331
mlflow.log_param("pipeline_nodes", len(pipeline.nodes))
332
333
@hook_impl
334
def after_pipeline_run(self, run_params, pipeline, catalog):
335
"""End MLflow run."""
336
mlflow.end_run()
337
338
@hook_impl
339
def after_node_run(self, node, catalog, inputs, outputs, is_async, session_id):
340
"""Log model artifacts and metrics."""
341
# Log models
342
for output_name, output_data in outputs.items():
343
if output_name.endswith('_model'):
344
# Save and log model
345
Path("models").mkdir(parents=True, exist_ok=True)  # ensure the output directory exists before saving
model_path = f"models/{output_name}.pkl"
346
joblib.dump(output_data, model_path)
347
mlflow.log_artifact(model_path)
348
349
# Log metrics
350
elif output_name.endswith('_metrics') and isinstance(output_data, dict):
351
for metric_name, metric_value in output_data.items():
352
if isinstance(metric_value, (int, float)):
353
mlflow.log_metric(f"{node.name}_{metric_name}", metric_value)
354
355
# Usage in settings.py
356
HOOKS = (MLflowHooks(experiment_name="my-kedro-project"),)
357
```
358
359
### Error Recovery Hooks
360
361
```python
362
from kedro.framework.hooks import hook_impl
363
import logging
364
from datetime import datetime
365
366
class ErrorRecoveryHooks:
367
"""Hooks for error recovery and resilience."""
368
369
def __init__(self):
370
self.failed_nodes = []
371
self.recovery_strategies = {
372
'data_loading_node': self._recover_data_loading,
373
'model_training_node': self._recover_model_training,
374
}
375
376
@hook_impl
377
def on_node_error(self, error, node, catalog, inputs, is_async, session_id):
378
"""Attempt error recovery for failed nodes."""
379
self.failed_nodes.append({
380
'node_name': node.name,
381
'error': str(error),
382
'timestamp': datetime.now(),
383
'inputs': list(inputs.keys())
384
})
385
386
# Attempt recovery if strategy exists
387
if node.name in self.recovery_strategies:
388
logging.info(f"Attempting recovery for node: {node.name}")
389
try:
390
self.recovery_strategies[node.name](node, catalog, inputs, error)
391
logging.info(f"Successfully recovered node: {node.name}")
392
except Exception as recovery_error:
393
logging.error(f"Recovery failed for {node.name}: {recovery_error}")
394
395
def _recover_data_loading(self, node, catalog, inputs, error):
396
"""Recovery strategy for data loading failures."""
397
# Try alternative data source
398
alternative_dataset = f"{node.inputs[0]}_backup"
399
if catalog.exists(alternative_dataset):
400
# Temporarily replace dataset
401
original_data = catalog._data_sets[node.inputs[0]]  # NOTE: ``_data_sets`` is a private Kedro attribute; value kept so the swap can be undone
402
catalog._data_sets[node.inputs[0]] = catalog._data_sets[alternative_dataset]
403
logging.info(f"Using backup dataset for {node.inputs[0]}")
404
405
def _recover_model_training(self, node, catalog, inputs, error):
406
"""Recovery strategy for model training failures."""
407
# Use simpler model parameters
408
if 'parameters' in inputs:
409
params = inputs['parameters'].copy()
410
params['model_complexity'] = 'simple'
411
catalog.save('parameters:recovery_params', params)
412
logging.info("Switched to simpler model parameters")
413
414
@hook_impl
415
def after_pipeline_run(self, run_params, pipeline, catalog):
416
"""Report error summary after pipeline completion."""
417
if self.failed_nodes:
418
logging.warning(f"Pipeline completed with {len(self.failed_nodes)} node failures:")
419
for failure in self.failed_nodes:
420
logging.warning(f" - {failure['node_name']}: {failure['error']}")
421
422
# Usage in settings.py
423
HOOKS = (ErrorRecoveryHooks(),)
424
```
425
426
### Multiple Hook Classes
427
428
```python
429
from kedro.framework.hooks import hook_impl
import logging
430
431
class LoggingHooks:
432
"""Hooks for enhanced logging."""
433
434
@hook_impl
435
def before_pipeline_run(self, run_params, pipeline, catalog):
436
logging.info("=== Pipeline Execution Started ===")
437
438
class SecurityHooks:
439
"""Hooks for security and compliance."""
440
441
@hook_impl
442
def before_dataset_loaded(self, dataset_name, node):
443
if 'sensitive' in dataset_name:
444
logging.info(f"Accessing sensitive dataset: {dataset_name}")
445
446
class NotificationHooks:
447
"""Hooks for external notifications."""
448
449
@hook_impl
450
def on_pipeline_error(self, error, run_params, pipeline, catalog):
451
# Send alert to monitoring system
452
send_alert(f"Pipeline failed: {str(error)}")  # send_alert: user-defined alerting helper, not provided by Kedro
453
454
# Register multiple hook classes in settings.py
455
HOOKS = (
456
LoggingHooks(),
457
SecurityHooks(),
458
NotificationHooks(),
459
)
460
```
461
462
## Types
463
464
```python { .api }
465
from typing import Any, Dict, List, Optional, Callable
466
from kedro.pipeline import Node, Pipeline
467
from kedro.io import DataCatalog
468
469
HookImplementation = Callable[..., Any]
470
HookManager = Any
471
SessionId = str
472
RunParams = Dict[str, Any]
473
DatasetName = str
474
NodeInputs = Dict[str, Any]
475
NodeOutputs = Dict[str, Any]
476
ErrorType = Exception
477
```