tessl/pypi-kedro

tessl install tessl/pypi-kedro@1.1.0

Kedro helps you build production-ready data and analytics pipelines

Hooks API Reference

Extend Kedro's behavior through lifecycle hooks that execute at key moments during pipeline execution.

Imports

from kedro.framework.hooks import hook_impl
from kedro.framework.hooks.manager import _create_hook_manager
from kedro.framework.hooks.markers import HOOK_NAMESPACE, hook_spec
from kedro.io import CatalogProtocol

# Hook specification classes (for type checking and documentation)
from kedro.framework.hooks.specs import (
    DataCatalogSpecs,
    NodeSpecs,
    PipelineSpecs,
    DatasetSpecs,
    KedroContextSpecs
)

Hook Decorator

def hook_impl(func: Callable) -> Callable:
    """
    Decorator to mark methods as hook implementations.

    Parameters:
    - func: Method to mark as hook implementation

    Returns:
    Decorated function registered as hook implementation

    Example:
    >>> from kedro.framework.hooks import hook_impl
    >>> class MyHooks:
    ...     @hook_impl
    ...     def after_node_run(self, node, outputs):
    ...         print(f"Node {node.name} completed")
    """

Hook Constants

HOOK_NAMESPACE: str  # "kedro" - Namespace identifier for Kedro hooks

hook_spec: HookspecMarker  # Decorator for marking hook specifications
    """
    Marker for hook specification methods.
    Used internally to define hook specifications in hook spec classes.

    Example:
    >>> from kedro.framework.hooks.markers import hook_spec
    >>> class MyHookSpecs:
    ...     @hook_spec
    ...     def my_hook(self, arg1, arg2):
    ...         pass
    """

Hook Manager

def _create_hook_manager() -> PluginManager:
    """
    Create a hook manager for managing and executing hooks.

    Returns:
    PluginManager instance configured for Kedro hooks

    Note:
    This is typically called internally by KedroSession.
    Users register hooks via settings.py HOOKS tuple.
    """

Hook Specifications

Hook specification classes define the hook points available in Kedro's execution lifecycle. They use the @hook_spec decorator to declare the method signatures that hook implementations must follow. You typically do not need to import or inherit from these classes: implement methods with matching names, accept the parameters you need (a subset of the spec's parameters is allowed), and decorate them with @hook_impl, as sketched below.
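
For instance, a single plain class can implement hook points from several spec classes side by side; matching method names plus @hook_impl are all that is required. ProjectHooks and its method bodies are illustrative:

from kedro.framework.hooks import hook_impl

class ProjectHooks:  # no spec class is imported or inherited
    @hook_impl
    def after_catalog_created(self, catalog, **kwargs):  # from DataCatalogSpecs
        print("Catalog created")

    @hook_impl
    def before_node_run(self, node, **kwargs):  # from NodeSpecs
        print(f"Running {node.name}")

    @hook_impl
    def after_dataset_saved(self, dataset_name, **kwargs):  # from DatasetSpecs
        print(f"Saved {dataset_name}")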

Data Catalog Hooks

class DataCatalogSpecs:
    """
    Hook specifications for DataCatalog lifecycle events.
    Defines the after_catalog_created hook point.
    """

    @hook_spec
    def after_catalog_created(
        self,
        catalog: CatalogProtocol,
        conf_catalog: dict[str, Any],
        conf_creds: dict[str, Any],
        parameters: dict[str, Any],
        save_version: str,
        load_versions: dict[str, str]
    ) -> None:
        """
        Hook called after the DataCatalog is created.

        Allows modification or inspection of the catalog after instantiation
        but before it's used by the pipeline.

        Parameters:
        - catalog: The created catalog instance (implements CatalogProtocol)
        - conf_catalog: Catalog configuration dictionary
        - conf_creds: Credentials configuration dictionary
        - parameters: Parameters dictionary added to catalog after creation
        - save_version: Version string for saving datasets
        - load_versions: Dictionary mapping dataset names to load versions

        Example:
        >>> @hook_impl
        ... def after_catalog_created(self, catalog, **kwargs):
        ...     # Add runtime data to catalog
        ...     catalog["runtime_data"] = MemoryDataset()
        """

Node Hooks

class NodeSpecs:
    """
    Hook specifications for Node execution lifecycle events.
    Defines before_node_run, after_node_run, and on_node_error hook points.
    """

    @hook_spec
    def before_node_run(
        self,
        node: Node,
        catalog: CatalogProtocol,
        inputs: dict[str, Any],
        is_async: bool,
        run_id: str
    ) -> dict[str, Any] | None:
        """
        Hook called before a node runs.

        Parameters:
        - node: Node instance about to be executed
        - catalog: Catalog instance for the run (implements CatalogProtocol)
        - inputs: Dictionary mapping dataset names to loaded input data
        - is_async: Whether running asynchronously
        - run_id: Unique run identifier

        Returns:
        Optional dictionary mapping dataset names to new values.
        If returned, these values will UPDATE the node inputs (shallow merge).
        Only the returned keys are modified; other inputs remain unchanged.

        Input Modification Behavior:
        - Return None or {} to leave inputs unchanged
        - Return {"dataset1": new_value} to override just dataset1
        - Returned values must match expected input types for the node function
        - Changes apply only to this specific node execution

        WARNING: Modifying inputs can make pipeline behavior non-deterministic
        and harder to debug. Use sparingly for cross-cutting concerns like:
        - Adding timestamps or run metadata
        - Injecting credentials or configuration
        - Data sanitization or validation

        Examples:
        >>> @hook_impl
        ... def before_node_run(self, node, inputs, **kwargs):
        ...     print(f"Starting node: {node.name}")
        ...     # Example 1: Add metadata
        ...     if "params:metadata" in inputs:
        ...         return {"params:metadata": {**inputs["params:metadata"], "run_time": time.time()}}
        ...
        >>> @hook_impl
        ... def before_node_run(self, node, inputs, **kwargs):
        ...     # Example 2: Inject credentials
        ...     if node.name == "fetch_api_data":
        ...         return {"credentials": load_secure_credentials()}
        """

    @hook_spec
    def after_node_run(
        self,
        node: Node,
        catalog: CatalogProtocol,
        inputs: dict[str, Any],
        outputs: dict[str, Any],
        is_async: bool,
        run_id: str
    ) -> None:
        """
        Hook called after a node successfully runs.

        Parameters:
        - node: Node instance that was executed
        - catalog: Catalog instance for the run (implements CatalogProtocol)
        - inputs: Dictionary mapping dataset names to loaded input data
        - outputs: Dictionary mapping dataset names to computed output data
        - is_async: Whether running asynchronously
        - run_id: Unique run identifier

        Example:
        >>> @hook_impl
        ... def after_node_run(self, node, outputs, **kwargs):
        ...     print(f"Completed node: {node.name}")
        """

    @hook_spec
    def on_node_error(
        self,
        error: Exception,
        node: Node,
        catalog: CatalogProtocol,
        inputs: dict[str, Any],
        is_async: bool,
        run_id: str
    ) -> None:
        """
        Hook called when a node execution fails.

        Parameters:
        - error: Exception that was raised during node execution
        - node: Node instance that failed
        - catalog: Catalog instance for the run (implements CatalogProtocol)
        - inputs: Dictionary mapping dataset names to loaded input data
        - is_async: Whether running asynchronously
        - run_id: Unique run identifier

        Example:
        >>> @hook_impl
        ... def on_node_error(self, error, node, **kwargs):
        ...     print(f"Node {node.name} failed: {error}")
        """

Pipeline Hooks

class PipelineSpecs:
    """
    Hook specifications for Pipeline execution lifecycle events.
    Defines before_pipeline_run, after_pipeline_run, and on_pipeline_error hook points.

    These hooks are invoked by KedroSession when running pipelines through
    session.run(). They provide pipeline-level lifecycle tracking for monitoring,
    logging, and integration with external systems.

    Note: Pipeline-level hooks are called once per pipeline execution, while
    node-level hooks are called for each individual node. Use node-level hooks
    (before_node_run, after_node_run, on_node_error) for fine-grained tracking
    of individual computation steps.
    """

    @hook_spec
    def before_pipeline_run(
        self,
        run_params: dict[str, Any],
        pipeline: Pipeline,
        catalog: CatalogProtocol
    ) -> None:
        """
        Hook called before a pipeline runs.

        Parameters:
        - run_params: Runtime parameters dictionary with keys:
          * run_id: str - Unique run identifier
          * project_path: str - Path to Kedro project
          * env: str - Environment name
          * kedro_version: str - Kedro version
          * tags: Optional[list[str]] - Pipeline tags filter
          * from_nodes: Optional[list[str]] - Starting node names
          * to_nodes: Optional[list[str]] - Ending node names
          * node_names: Optional[list[str]] - Specific nodes to run
          * from_inputs: Optional[list[str]] - Starting dataset names
          * to_outputs: Optional[list[str]] - Ending dataset names
          * load_versions: Optional[dict[str, str]] - Dataset version overrides
          * runtime_params: Optional[dict[str, Any]] - Additional runtime params
          * pipeline_name: str - Name of the pipeline
          * namespace: Optional[str] - Pipeline namespace
          * runner: str - Runner class name
        - pipeline: Pipeline instance to be executed
        - catalog: Catalog instance for the run (implements CatalogProtocol)

        Example:
        >>> @hook_impl
        ... def before_pipeline_run(self, run_params, pipeline, **kwargs):
        ...     print(f"Starting {run_params['pipeline_name']} with {len(pipeline.nodes)} nodes")
        """

    @hook_spec
    def after_pipeline_run(
        self,
        run_params: dict[str, Any],
        run_result: dict[str, Any],
        pipeline: Pipeline,
        catalog: CatalogProtocol
    ) -> None:
        """
        Hook called after a pipeline successfully completes.

        Parameters:
        - run_params: Runtime parameters dictionary (see before_pipeline_run for schema)
        - run_result: Dictionary containing the pipeline run's results (the output returned by the runner)
        - pipeline: Pipeline instance that was executed
        - catalog: Catalog instance used during the run (implements CatalogProtocol)

        Example:
        >>> @hook_impl
        ... def after_pipeline_run(self, run_params, pipeline, **kwargs):
        ...     print(f"Pipeline {run_params['pipeline_name']} completed successfully")
        """

    @hook_spec
    def on_pipeline_error(
        self,
        error: Exception,
        run_params: dict[str, Any],
        pipeline: Pipeline,
        catalog: CatalogProtocol
    ) -> None:
        """
        Hook called when a pipeline execution fails.

        Parameters:
        - error: Exception that was raised during pipeline execution
        - run_params: Runtime parameters dictionary (see before_pipeline_run for schema)
        - pipeline: Pipeline instance that failed
        - catalog: Catalog instance used during the run (implements CatalogProtocol)

        Example:
        >>> @hook_impl
        ... def on_pipeline_error(self, error, run_params, pipeline, **kwargs):
        ...     print(f"Pipeline {run_params['pipeline_name']} failed: {error}")
        """

Dataset Hooks

class DatasetSpecs:
    """
    Hook specifications for Dataset I/O lifecycle events.
    Defines hooks for dataset load and save operations.
    """

    @hook_spec
    def before_dataset_loaded(self, dataset_name: str, node: Node) -> None:
        """
        Hook called before loading a dataset from the catalog.

        Parameters:
        - dataset_name: Name of dataset to be loaded
        - node: Node requesting the load

        Example:
        >>> @hook_impl
        ... def before_dataset_loaded(self, dataset_name, node, **kwargs):
        ...     print(f"Loading dataset: {dataset_name} for node: {node.name}")
        """

    @hook_spec
    def after_dataset_loaded(
        self,
        dataset_name: str,
        data: Any,
        node: Node
    ) -> None:
        """
        Hook called after loading a dataset from the catalog.

        Parameters:
        - dataset_name: Name of loaded dataset
        - data: The actual loaded data (not the dataset instance)
        - node: Node that requested the load

        Example:
        >>> @hook_impl
        ... def after_dataset_loaded(self, dataset_name, data, **kwargs):
        ...     print(f"Loaded dataset: {dataset_name}, type: {type(data).__name__}")
        """

    @hook_spec
    def before_dataset_saved(self, dataset_name: str, data: Any, node: Node) -> None:
        """
        Hook called before saving a dataset to the catalog.

        Parameters:
        - dataset_name: Name of dataset to be saved
        - data: The actual data to be saved (not the dataset instance)
        - node: Node producing the output

        Example:
        >>> @hook_impl
        ... def before_dataset_saved(self, dataset_name, data, node, **kwargs):
        ...     print(f"Saving dataset: {dataset_name} from node: {node.name}")
        """

    @hook_spec
    def after_dataset_saved(self, dataset_name: str, data: Any, node: Node) -> None:
        """
        Hook called after saving a dataset to the catalog.

        Parameters:
        - dataset_name: Name of saved dataset
        - data: The actual data that was saved (not the dataset instance)
        - node: Node that produced the output

        Example:
        >>> @hook_impl
        ... def after_dataset_saved(self, dataset_name, data, **kwargs):
        ...     print(f"Saved dataset: {dataset_name}")
        """

Context Hooks

class KedroContextSpecs:
    """
    Hook specifications for KedroContext lifecycle events.
    """

    @hook_spec
    def after_context_created(
        self,
        context: KedroContext
    ) -> None:
        """
        Hook called after KedroContext is created.

        Parameters:
        - context: The created KedroContext instance

        Example:
        >>> @hook_impl
        ... def after_context_created(self, context):
        ...     print(f"Context created for env: {context.env}")
        """

Hook Registration

Register hooks in settings.py:

# settings.py
from my_project.hooks import MyHooks, MonitoringHooks

HOOKS = (
    MyHooks(),
    MonitoringHooks(),
)
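
The hooks module referenced from settings.py might look like the following sketch; the module path my_project/hooks.py and the hook bodies are illustrative:

# my_project/hooks.py (illustrative)
from kedro.framework.hooks import hook_impl

class MyHooks:
    @hook_impl
    def before_pipeline_run(self, run_params, **kwargs):
        print(f"Starting run {run_params['run_id']}")

class MonitoringHooks:
    @hook_impl
    def on_node_error(self, error, node, **kwargs):
        print(f"ALERT: node {node.name} failed: {error!r}")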

Usage Examples

Node Timing Hooks

from kedro.framework.hooks import hook_impl
import time

class TimingHooks:
    def __init__(self):
        self.node_times = {}

    @hook_impl
    def before_node_run(self, node, **kwargs):
        self.node_times[node.name] = time.time()

    @hook_impl
    def after_node_run(self, node, **kwargs):
        duration = time.time() - self.node_times[node.name]
        print(f"Node {node.name} took {duration:.2f}s")

Dataset Logging Hooks

from kedro.framework.hooks import hook_impl

class DatasetLoggingHooks:
    @hook_impl
    def before_dataset_loaded(self, dataset_name, **kwargs):
        print(f"Loading: {dataset_name}")

    @hook_impl
    def after_dataset_saved(self, dataset_name, **kwargs):
        print(f"Saved: {dataset_name}")

Pipeline Monitoring Hooks

from kedro.framework.hooks import hook_impl

class PipelineMonitoringHooks:
    @hook_impl
    def before_pipeline_run(self, pipeline, **kwargs):
        print(f"Starting pipeline with {len(pipeline.nodes)} nodes")

    @hook_impl
    def after_pipeline_run(self, **kwargs):
        print("Pipeline completed successfully")

    @hook_impl
    def on_pipeline_error(self, error, **kwargs):
        print(f"Pipeline failed: {error}")

Programmatic Hook Usage

For testing or programmatic usage outside of a Kedro project, you can create a hook manager and register hooks directly:

from kedro.framework.hooks import hook_impl
from kedro.framework.hooks.manager import _create_hook_manager
from kedro.runner import SequentialRunner
from kedro.io import DataCatalog, MemoryDataset
from kedro.pipeline import node, pipeline

# Define hooks
class MyHooks:
    def __init__(self):
        self.calls = []

    @hook_impl
    def before_node_run(self, node, **kwargs):
        self.calls.append(f"before:{node.name}")

    @hook_impl
    def after_node_run(self, node, **kwargs):
        self.calls.append(f"after:{node.name}")

# Create hook manager and register hooks
hook_manager = _create_hook_manager()
my_hooks = MyHooks()
hook_manager.register(my_hooks)

# Use with runner
def process_data(data):
    return [x * 2 for x in data]

pipe = pipeline([
    node(process_data, "input", "output", name="process")
])

catalog = DataCatalog({
    "input": MemoryDataset([1, 2, 3]),
    "output": MemoryDataset()
})

runner = SequentialRunner()
runner.run(pipe, catalog, hook_manager=hook_manager)

# Check hook calls
print(my_hooks.calls)  # ['before:process', 'after:process']

Note: In production Kedro projects, hooks are typically registered via the HOOKS tuple in settings.py rather than programmatically.

See also:

  • Hooks and Plugins Guide - Implementation patterns
  • CLI Hooks - CLI-specific hooks