tessl/pypi-kedro

tessl install tessl/pypi-kedro@1.1.0

Kedro helps you build production-ready data and analytics pipelines

docs/guides/hooks-and-plugins.md

Hooks and Plugins Guide

Implement lifecycle hooks to extend Kedro's behavior without modifying core code.

Basic Hook Implementation

Step 1: Create Hook Class

from kedro.framework.hooks import hook_impl
import logging

logger = logging.getLogger(__name__)

class ProjectHooks:
    """Collection of project-wide hooks."""

    @hook_impl
    def after_node_run(self, node, outputs):
        logger.info(f"Node {node.name} completed")
        logger.info(f"Outputs: {list(outputs.keys())}")

Step 2: Register Hooks

# settings.py
from my_project.hooks import ProjectHooks

HOOKS = (
    ProjectHooks(),
)
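
With the hooks registered, any run of the project triggers them; kedro run from the project root is the usual entry point. The following is a minimal sketch for exercising the same lifecycle programmatically, assuming a standard Kedro project layout (the bootstrap and session APIs follow recent Kedro releases):

from pathlib import Path

from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project

project_path = Path.cwd()  # assumption: executed from the project root
bootstrap_project(project_path)

with KedroSession.create(project_path=project_path) as session:
    session.run()  # ProjectHooks.after_node_run fires after each node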

Available Hook Points

Node Hooks

class NodeHooks:
    @hook_impl
    def before_node_run(self, node, catalog, inputs):
        """Called before a node runs."""
        logger.info(f"Starting node: {node.name}")

    @hook_impl
    def after_node_run(self, node, outputs):
        """Called after a node successfully runs."""
        logger.info(f"Completed node: {node.name}")

    @hook_impl
    def on_node_error(self, error, node):
        """Called when a node execution fails."""
        logger.error(f"Node {node.name} failed: {error}")

Pipeline Hooks

class PipelineHooks:
    @hook_impl
    def before_pipeline_run(self, pipeline, catalog):
        """Called before a pipeline runs."""
        logger.info(f"Starting pipeline with {len(pipeline.nodes)} nodes")

    @hook_impl
    def after_pipeline_run(self, pipeline):
        """Called after a pipeline successfully completes."""
        logger.info("Pipeline completed successfully")

    @hook_impl
    def on_pipeline_error(self, error, pipeline):
        """Called when a pipeline execution fails."""
        logger.error(f"Pipeline failed: {error}")

Dataset Hooks

class DatasetHooks:
    @hook_impl
    def before_dataset_loaded(self, dataset_name):
        """Called before loading a dataset."""
        logger.info(f"Loading dataset: {dataset_name}")

    @hook_impl
    def after_dataset_loaded(self, dataset_name, data):
        """Called after loading a dataset."""
        if hasattr(data, '__len__'):
            logger.info(f"Loaded {dataset_name}: {len(data)} records")

    @hook_impl
    def before_dataset_saved(self, dataset_name, data):
        """Called before saving a dataset."""
        logger.info(f"Saving dataset: {dataset_name}")

    @hook_impl
    def after_dataset_saved(self, dataset_name):
        """Called after saving a dataset."""
        logger.info(f"Saved dataset: {dataset_name}")

Catalog Hooks

class CatalogHooks:
    @hook_impl
    def after_catalog_created(self, catalog):
        """Called after the DataCatalog is created."""
        logger.info(f"Catalog created with {len(catalog.filter())} datasets")

        # Can modify catalog using dict-like assignment
        from kedro.io import MemoryDataset
        catalog["runtime_metrics"] = MemoryDataset()

Common Hook Patterns

Pattern: Performance Monitoring

import time

class TimingHooks:
    def __init__(self):
        self.node_times = {}
        self.pipeline_start = None

    @hook_impl
    def before_pipeline_run(self, run_params):
        self.pipeline_start = time.time()

    @hook_impl
    def after_pipeline_run(self):
        duration = time.time() - self.pipeline_start
        logger.info(f"Pipeline completed in {duration:.2f}s")

    @hook_impl
    def before_node_run(self, node):
        self.node_times[node.name] = time.time()

    @hook_impl
    def after_node_run(self, node):
        duration = time.time() - self.node_times[node.name]
        logger.info(f"Node {node.name} took {duration:.2f}s")

Pattern: Data Validation

class ValidationHooks:
    @hook_impl
    def after_dataset_loaded(self, dataset_name, data):
        """Validate data after loading."""
        if hasattr(data, '__len__') and len(data) == 0:
            logger.warning(f"Empty dataset: {dataset_name}")

    @hook_impl
    def before_dataset_saved(self, dataset_name, data):
        """Validate data before saving."""
        if data is None:
            raise ValueError(f"Cannot save None to {dataset_name}")
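
For tabular data, the same hook point can enforce a schema. A hedged sketch assuming pandas DataFrames (the REQUIRED mapping and dataset name are illustrative):

import pandas as pd

class SchemaValidationHooks:
    REQUIRED = {"model_input": ["id", "target"]}  # illustrative mapping

    @hook_impl
    def before_dataset_saved(self, dataset_name, data):
        required = self.REQUIRED.get(dataset_name)
        if required and isinstance(data, pd.DataFrame):
            missing = set(required) - set(data.columns)
            if missing:
                raise ValueError(f"{dataset_name} missing columns: {missing}")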

Pattern: Error Notifications

class NotificationHooks:
    @hook_impl
    def on_node_error(self, error, node):
        """Send notification on node failure."""
        send_alert(f"Node {node.name} failed: {error}")

    @hook_impl
    def on_pipeline_error(self, error):
        """Send notification on pipeline failure."""
        send_alert(f"Pipeline failed: {error}")

Pattern: Metrics Collection

class MetricsHooks:
    def __init__(self):
        self.metrics = {
            "nodes_run": 0,
            "nodes_failed": 0,
            "datasets_loaded": 0,
            "datasets_saved": 0
        }

    @hook_impl
    def after_node_run(self, node):
        self.metrics["nodes_run"] += 1

    @hook_impl
    def on_node_error(self, error, node):
        self.metrics["nodes_failed"] += 1

    @hook_impl
    def after_dataset_loaded(self, dataset_name):
        self.metrics["datasets_loaded"] += 1

    @hook_impl
    def after_dataset_saved(self, dataset_name):
        self.metrics["datasets_saved"] += 1

    @hook_impl
    def after_pipeline_run(self):
        logger.info(f"Pipeline metrics: {self.metrics}")

Pattern: Dynamic Catalog Modification

class DynamicCatalogHooks:
    @hook_impl
    def after_catalog_created(self, catalog):
        """Add dynamic datasets to catalog."""
        from kedro.io import MemoryDataset
        import datetime

        # Add timestamp dataset (dict-style assignment, as in Catalog Hooks above)
        catalog["run_timestamp"] = MemoryDataset(datetime.datetime.now())

        # Add metrics placeholder
        catalog["pipeline_metrics"] = MemoryDataset()
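
Entries injected this way behave like any other catalog dataset, so downstream nodes can declare them as inputs. A sketch (the function and node names are illustrative):

from kedro.pipeline import node, pipeline

def log_run_start(run_timestamp):
    logger.info(f"Run started at {run_timestamp}")

example_pipeline = pipeline([
    node(log_run_start, inputs="run_timestamp", outputs=None, name="log_start"),
])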

Multiple Hook Classes

# settings.py
from my_project.hooks import (
    TimingHooks,
    ValidationHooks,
    NotificationHooks,
    MetricsHooks
)

HOOKS = (
    TimingHooks(),
    ValidationHooks(),
    NotificationHooks(),
    MetricsHooks()
)

Hook Execution Order

Hook implementations are called in LIFO (last in, first out) registration order, a pluggy behavior, so the last hook in the HOOKS tuple runs first:

class Hook1:
    @hook_impl
    def after_node_run(self, node):
        print("Hook 1")

class Hook2:
    @hook_impl
    def after_node_run(self, node):
        print("Hook 2")

# settings.py
HOOKS = (Hook1(), Hook2())

# Output when node runs:
# Hook 2
# Hook 1

Best Practices

1. Keep Hooks Lightweight

# ✅ Good: Fast logging
@hook_impl
def after_node_run(self, node):
    logger.info(f"Node {node.name} completed")

# ❌ Bad: Heavy computation
@hook_impl
def after_node_run(self, node, outputs):
    # Don't do expensive operations in hooks
    expensive_analysis(outputs)

2. Handle Errors Gracefully

@hook_impl
def after_dataset_saved(self, dataset_name):
    try:
        # send_notification is a placeholder for your own alerting helper
        send_notification(dataset_name)
    except Exception as e:
        # Log error but don't fail the pipeline
        logger.warning(f"Failed to send notification: {e}")

3. Use Type Hints

from kedro.pipeline import Node
from kedro.io import DataCatalog

@hook_impl
def before_node_run(
    self,
    node: Node,
    catalog: DataCatalog
) -> None:
    """Type hints improve clarity and IDE support."""
    pass

4. Organize by Concern

# monitoring_hooks.py
class MonitoringHooks:
    """Hooks for performance monitoring and metrics."""
    pass

# validation_hooks.py
class ValidationHooks:
    """Hooks for data quality validation."""
    pass

# notification_hooks.py
class NotificationHooks:
    """Hooks for alerting and notifications."""
    pass

See also:

  • Hooks API Reference - Complete API documentation