`tessl install tessl/pypi-kedro@1.1.0`

Kedro helps you build production-ready data and analytics pipelines.
Agent success: 98% (agent success rate when using this tile).
Improvement: 1.32x (agent success rate improvement when using this tile, compared to baseline).
Baseline: 74% (agent success rate without this tile).
Implement lifecycle hooks to extend Kedro's behavior without modifying core code.
import logging

from kedro.framework.hooks import hook_impl

logger = logging.getLogger(__name__)


class ProjectHooks:
    """Project-wide hook implementations, registered via ``HOOKS`` in settings.py."""

    @hook_impl
    def after_node_run(self, node, outputs):
        """Log which node finished and the names of the outputs it produced.

        Args:
            node: The node that has just run; only its ``name`` is read.
            outputs: Mapping of the node's outputs; only its keys are logged.
        """
        logger.info(f"Node {node.name} completed")
        output_names = list(outputs)
        logger.info(f"Outputs: {output_names}")


# settings.py
from my_project.hooks import ProjectHooks

HOOKS = (ProjectHooks(),)


class NodeHooks:
    """Log the start, successful completion and failure of every node."""

    @hook_impl
    def before_node_run(self, node, catalog, inputs):
        """Log that ``node`` is about to start."""
        logger.info(f"Starting node: {node.name}")

    @hook_impl
    def after_node_run(self, node, outputs):
        """Log that ``node`` finished without raising."""
        logger.info(f"Completed node: {node.name}")

    @hook_impl
    def on_node_error(self, error, node):
        """Log the exception ``error`` raised while ``node`` was running."""
        logger.error(f"Node {node.name} failed: {error}")


class PipelineHooks:
    """Log the lifecycle of a whole pipeline run."""

    @hook_impl
    def before_pipeline_run(self, pipeline, catalog):
        """Log how many nodes the pipeline about to run contains."""
        node_count = len(pipeline.nodes)
        logger.info(f"Starting pipeline with {node_count} nodes")

    @hook_impl
    def after_pipeline_run(self, pipeline):
        """Log that the pipeline run completed successfully."""
        logger.info("Pipeline completed successfully")

    @hook_impl
    def on_pipeline_error(self, error, pipeline):
        """Log the exception that aborted the pipeline run."""
        logger.error(f"Pipeline failed: {error}")


class DatasetHooks:
    """Log every dataset load and save performed through the catalog."""

    @hook_impl
    def before_dataset_loaded(self, dataset_name):
        """Log the name of the dataset about to be loaded."""
        logger.info(f"Loading dataset: {dataset_name}")

    @hook_impl
    def after_dataset_loaded(self, dataset_name, data):
        """Log the size of freshly loaded ``data`` when it supports ``len()``."""
        if not hasattr(data, "__len__"):
            return
        record_count = len(data)
        logger.info(f"Loaded {dataset_name}: {record_count} records")

    @hook_impl
    def before_dataset_saved(self, dataset_name, data):
        """Log the name of the dataset about to be saved."""
        logger.info(f"Saving dataset: {dataset_name}")

    @hook_impl
    def after_dataset_saved(self, dataset_name):
        """Log the name of the dataset that was just saved."""
        logger.info(f"Saved dataset: {dataset_name}")


class CatalogHooks:
    """Inspect and extend the DataCatalog once it has been built."""

    @hook_impl
    def after_catalog_created(self, catalog):
        """Log the catalog size, then register an extra in-memory dataset."""
        registered = catalog.filter()
        logger.info(f"Catalog created with {len(registered)} datasets")
        # The catalog accepts new datasets through dict-style assignment.
        from kedro.io import MemoryDataset

        catalog["runtime_metrics"] = MemoryDataset()


import time
class TimingHooks:
    """Log how long the pipeline run and each individual node take.

    Durations are measured with ``time.perf_counter()``. It is a monotonic
    clock, so system clock adjustments cannot skew the results; they could
    with ``time.time()``.
    """

    def __init__(self):
        # Node name -> perf_counter() reading taken in before_node_run.
        self.node_times = {}
        # perf_counter() reading taken in before_pipeline_run; None until then.
        self.pipeline_start = None

    @hook_impl
    def before_pipeline_run(self, run_params):
        self.pipeline_start = time.perf_counter()

    @hook_impl
    def after_pipeline_run(self):
        # Without a start mark there is nothing to measure; raising here would
        # turn a successful run into a failed one.
        if self.pipeline_start is None:
            return
        duration = time.perf_counter() - self.pipeline_start
        self.pipeline_start = None
        logger.info(f"Pipeline completed in {duration:.2f}s")

    @hook_impl
    def before_node_run(self, node):
        self.node_times[node.name] = time.perf_counter()

    @hook_impl
    def after_node_run(self, node):
        # pop() so finished nodes don't accumulate in node_times across runs.
        start = self.node_times.pop(node.name, None)
        if start is None:
            return
        duration = time.perf_counter() - start
        logger.info(f"Node {node.name} took {duration:.2f}s")


class ValidationHooks:
    """Basic data-quality checks around dataset loads and saves."""

    @hook_impl
    def after_dataset_loaded(self, dataset_name, data):
        """Validate data after loading: warn when a sized dataset is empty."""
        if hasattr(data, "__len__") and len(data) == 0:
            logger.warning(f"Empty dataset: {dataset_name}")

    @hook_impl
    def before_dataset_saved(self, dataset_name, data):
        """Validate data before saving.

        Raises:
            ValueError: If ``data`` is None.
        """
        if data is None:
            raise ValueError(f"Cannot save None to {dataset_name}")


class NotificationHooks:
    """Send an alert whenever a node or the pipeline fails.

    ``send_alert`` is not defined in this example. It is assumed to be a
    project-provided helper (Slack, e-mail, ...), so supply your own.
    Delivery is best-effort: a failure to send is logged instead of being
    raised out of the error hook.
    """

    @hook_impl
    def on_node_error(self, error, node):
        """Send notification on node failure."""
        self._notify(f"Node {node.name} failed: {error}")

    @hook_impl
    def on_pipeline_error(self, error):
        """Send notification on pipeline failure."""
        self._notify(f"Pipeline failed: {error}")

    @staticmethod
    def _notify(message):
        """Deliver ``message`` via ``send_alert``; never raises."""
        try:
            send_alert(message)
        except Exception as exc:
            # NOTE(review): an exception escaping an error hook would likely
            # surface instead of the original failure, so swallow and log it.
            logger.warning(f"Failed to send alert: {exc}")


class MetricsHooks:
    """Count node and dataset events during a pipeline run and log the totals."""

    def __init__(self):
        self.metrics = self._zeroed_metrics()

    @staticmethod
    def _zeroed_metrics():
        """Return a fresh counter dict with every metric set to 0."""
        return {
            "nodes_run": 0,
            "nodes_failed": 0,
            "datasets_loaded": 0,
            "datasets_saved": 0,
        }

    @hook_impl
    def before_pipeline_run(self):
        # The instance in settings.HOOKS is created once, when settings.py is
        # imported, and may serve several runs in one process. Reset the
        # counters so each run starts from zero instead of accumulating.
        self.metrics = self._zeroed_metrics()

    @hook_impl
    def after_node_run(self, node):
        self.metrics["nodes_run"] += 1

    @hook_impl
    def on_node_error(self, error, node):
        self.metrics["nodes_failed"] += 1

    @hook_impl
    def after_dataset_loaded(self, dataset_name):
        self.metrics["datasets_loaded"] += 1

    @hook_impl
    def after_dataset_saved(self, dataset_name):
        self.metrics["datasets_saved"] += 1

    @hook_impl
    def after_pipeline_run(self):
        logger.info(f"Pipeline metrics: {self.metrics}")

    @hook_impl
    def on_pipeline_error(self):
        # after_pipeline_run only fires when the run succeeds; without this hook
        # the "nodes_failed" counter would never be reported.
        logger.info(f"Pipeline metrics: {self.metrics}")


class DynamicCatalogHooks:
    """Register datasets whose contents are only known at run time."""

    @hook_impl
    def after_catalog_created(self, catalog):
        """Add dynamic datasets to catalog."""
        from kedro.io import MemoryDataset
        import datetime

        # Use dict-style assignment to register datasets.
        # NOTE(review): the older ``catalog.add(name, dataset)`` method is not
        # expected to exist on the Kedro 1.x DataCatalog; confirm for your version.
        # Add timestamp dataset
        catalog["run_timestamp"] = MemoryDataset(datetime.datetime.now())
        # Add metrics placeholder
        catalog["pipeline_metrics"] = MemoryDataset()


# settings.py
from my_project.hooks import (
    TimingHooks,
    ValidationHooks,
    NotificationHooks,
    MetricsHooks,
)

HOOKS = (
    TimingHooks(),
    ValidationHooks(),
    NotificationHooks(),
    MetricsHooks(),
)

# Hook execution order: HOOKS entries are registered in tuple order, but
# pluggy calls implementations of the same hook in LIFO order. The hook
# registered LAST therefore runs FIRST:


class Hook1:
    @hook_impl
    def after_node_run(self, node):
        print("Hook 1")


class Hook2:
    @hook_impl
    def after_node_run(self, node):
        print("Hook 2")


# settings.py
HOOKS = (Hook1(), Hook2())
# Output when a node runs (last registered is called first):
# Hook 2
# Hook 1
# To pin an implementation's position regardless of registration order, use
# @hook_impl(tryfirst=True) or @hook_impl(trylast=True).

# ✅ Good: Fast logging
@hook_impl
def after_node_run(self, node):
    # Pass arguments to the logger rather than pre-formatting an f-string:
    # logging only builds the message when the record is actually emitted.
    logger.info("Node %s completed", node.name)


# ❌ Bad: Heavy computation
@hook_impl
def after_node_run(self, node, outputs):
    # Don't do expensive operations in hooks
    expensive_analysis(outputs)


@hook_impl
def after_dataset_saved(self, dataset_name):
    """Notify that ``dataset_name`` was saved; delivery is best-effort."""
    try:
        send_notification(dataset_name)
    except Exception as e:
        # Log error but don't fail the pipeline
        logger.warning("Failed to send notification: %s", e)


from kedro.pipeline import Node
from kedro.io import DataCatalog


@hook_impl
def before_node_run(self, node: Node, catalog: DataCatalog) -> None:
    """Example no-op hook whose annotations document the argument types.

    Type hints improve clarity and IDE support; the body intentionally does nothing.
    """


# monitoring_hooks.py
class MonitoringHooks:
    """Home for performance-monitoring and metrics hook implementations."""


# validation_hooks.py
class ValidationHooks:
    """Home for data-quality validation hook implementations."""


# notification_hooks.py
class NotificationHooks:
    """Home for alerting and notification hook implementations."""


# See also: