tessl install tessl/pypi-kedro@1.1.0

Kedro helps you build production-ready data and analytics pipelines.
Agent Success: 98% (agent success rate when using this tile)
Improvement: 1.32x (agent success rate improvement when using this tile compared to baseline)
Baseline: 74% (agent success rate without this tile)
Extend Kedro's behavior through lifecycle hooks that execute at key moments during pipeline execution.
from kedro.framework.hooks import hook_impl
from kedro.framework.hooks.manager import _create_hook_manager
from kedro.framework.hooks.markers import HOOK_NAMESPACE, hook_spec
from kedro.io import CatalogProtocol
# Hook specification classes (for type checking and documentation)
from kedro.framework.hooks.specs import (
DataCatalogSpecs,
NodeSpecs,
PipelineSpecs,
DatasetSpecs,
KedroContextSpecs
)

def hook_impl(func: Callable) -> Callable:
"""
Decorator to mark methods as hook implementations.
Parameters:
- func: Method to mark as hook implementation
Returns:
Decorated function registered as hook implementation
Example:
>>> from kedro.framework.hooks import hook_impl
>>> class MyHooks:
... @hook_impl
... def after_node_run(self, node, outputs):
... print(f"Node {node.name} completed")
"""HOOK_NAMESPACE: str # "kedro" - Namespace identifier for Kedro hooks
hook_spec: HookspecMarker # Decorator for marking hook specifications
"""
Marker for hook specification methods.
Used internally to define hook specifications in hook spec classes.
Example:
>>> from kedro.framework.hooks.markers import hook_spec
>>> class MyHookSpecs:
... @hook_spec
... def my_hook(self, arg1, arg2):
... pass
"""def _create_hook_manager() -> PluginManager:
"""
Create a hook manager for managing and executing hooks.
Returns:
PluginManager instance configured for Kedro hooks
Note:
This is typically called internally by KedroSession.
Users register hooks via the HOOKS tuple in settings.py.
"""Hook specification classes define the hook points available in Kedro's execution lifecycle. These classes use the @hook_spec decorator and define the method signatures that hook implementations must follow. Users typically don't need to import or inherit from these classes directly - they simply implement methods with matching signatures and decorate them with @hook_impl.
class DataCatalogSpecs:
"""
Hook specifications for DataCatalog lifecycle events.
Defines the after_catalog_created hook point.
"""
@hook_spec
def after_catalog_created(
self,
catalog: CatalogProtocol,
conf_catalog: dict[str, Any],
conf_creds: dict[str, Any],
parameters: dict[str, Any],
save_version: str,
load_versions: dict[str, str]
) -> None:
"""
Hook called after the DataCatalog is created.
Allows modification or inspection of the catalog after instantiation
but before it's used by the pipeline.
Parameters:
- catalog: The created catalog instance (implements CatalogProtocol)
- conf_catalog: Catalog configuration dictionary
- conf_creds: Credentials configuration dictionary
- parameters: Parameters dictionary added to catalog after creation
- save_version: Version string for saving datasets
- load_versions: Dictionary mapping dataset names to load versions
Example:
>>> @hook_impl
>>> def after_catalog_created(self, catalog, **kwargs):
... # Add runtime data to catalog
... catalog["runtime_data"] = MemoryDataset()
"""class NodeSpecs:
"""
Hook specifications for Node execution lifecycle events.
Defines before_node_run, after_node_run, and on_node_error hook points.
"""
@hook_spec
def before_node_run(
self,
node: Node,
catalog: CatalogProtocol,
inputs: dict[str, Any],
is_async: bool,
run_id: str
) -> dict[str, Any] | None:
"""
Hook called before a node runs.
Parameters:
- node: Node instance about to be executed
- catalog: Catalog instance for the run (implements CatalogProtocol)
- inputs: Dictionary mapping dataset names to loaded input data
- is_async: Whether running asynchronously
- run_id: Unique run identifier
Returns:
Optional dictionary mapping dataset names to new values.
If returned, these values will UPDATE the node inputs (shallow merge).
Only the returned keys are modified; other inputs remain unchanged.
Input Modification Behavior:
- Return None or {} to leave inputs unchanged
- Return {"dataset1": new_value} to override just dataset1
- Returned values must match expected input types for the node function
- Changes apply only to this specific node execution
WARNING: Modifying inputs can make pipeline behavior non-deterministic
and harder to debug. Use sparingly for cross-cutting concerns like:
- Adding timestamps or run metadata
- Injecting credentials or configuration
- Data sanitization or validation
Examples:
>>> @hook_impl
>>> def before_node_run(self, node, inputs, **kwargs):
... print(f"Starting node: {node.name}")
... # Example 1: Add metadata
... if "params:metadata" in inputs:
... return {"params:metadata": {**inputs["params:metadata"], "run_time": time.time()}}
...
>>> @hook_impl
>>> def before_node_run(self, node, inputs, **kwargs):
... # Example 2: Inject credentials
... if node.name == "fetch_api_data":
... return {"credentials": load_secure_credentials()}
"""
@hook_spec
def after_node_run(
self,
node: Node,
catalog: CatalogProtocol,
inputs: dict[str, Any],
outputs: dict[str, Any],
is_async: bool,
run_id: str
) -> None:
"""
Hook called after a node successfully runs.
Parameters:
- node: Node instance that was executed
- catalog: Catalog instance for the run (implements CatalogProtocol)
- inputs: Dictionary mapping dataset names to loaded input data
- outputs: Dictionary mapping dataset names to computed output data
- is_async: Whether running asynchronously
- run_id: Unique run identifier
Example:
>>> @hook_impl
>>> def after_node_run(self, node, outputs, **kwargs):
... print(f"Completed node: {node.name}")
"""
@hook_spec
def on_node_error(
self,
error: Exception,
node: Node,
catalog: CatalogProtocol,
inputs: dict[str, Any],
is_async: bool,
run_id: str
) -> None:
"""
Hook called when a node execution fails.
Parameters:
- error: Exception that was raised during node execution
- node: Node instance that failed
- catalog: Catalog instance for the run (implements CatalogProtocol)
- inputs: Dictionary mapping dataset names to loaded input data
- is_async: Whether running asynchronously
- run_id: Unique run identifier
Example:
>>> @hook_impl
>>> def on_node_error(self, error, node, **kwargs):
... print(f"Node {node.name} failed: {error}")
"""class PipelineSpecs:
"""
Hook specifications for Pipeline execution lifecycle events.
Defines before_pipeline_run, after_pipeline_run, and on_pipeline_error hook points.
These hooks are invoked by KedroSession when running pipelines through
session.run(). They provide pipeline-level lifecycle tracking for monitoring,
logging, and integration with external systems.
Note: Pipeline-level hooks are called once per pipeline execution, while
node-level hooks are called for each individual node. Use node-level hooks
(before_node_run, after_node_run, on_node_error) for fine-grained tracking
of individual computation steps.
"""
@hook_spec
def before_pipeline_run(
self,
run_params: dict[str, Any],
pipeline: Pipeline,
catalog: CatalogProtocol
) -> None:
"""
Hook called before a pipeline runs.
Parameters:
- run_params: Runtime parameters dictionary with keys:
* run_id: str - Unique run identifier
* project_path: str - Path to Kedro project
* env: str - Environment name
* kedro_version: str - Kedro version
* tags: Optional[list[str]] - Pipeline tags filter
* from_nodes: Optional[list[str]] - Starting node names
* to_nodes: Optional[list[str]] - Ending node names
* node_names: Optional[list[str]] - Specific nodes to run
* from_inputs: Optional[list[str]] - Starting dataset names
* to_outputs: Optional[list[str]] - Ending dataset names
* load_versions: Optional[dict[str, str]] - Dataset version overrides
* runtime_params: Optional[dict[str, Any]] - Additional runtime params
* pipeline_name: str - Name of the pipeline
* namespace: Optional[str] - Pipeline namespace
* runner: str - Runner class name
- pipeline: Pipeline instance to be executed
- catalog: Catalog instance for the run (implements CatalogProtocol)
Example:
>>> @hook_impl
>>> def before_pipeline_run(self, run_params, pipeline, **kwargs):
... print(f"Starting {run_params['pipeline_name']} with {len(pipeline.nodes)} nodes")
"""
@hook_spec
def after_pipeline_run(
self,
run_params: dict[str, Any],
run_result: dict[str, Any],
pipeline: Pipeline,
catalog: CatalogProtocol
) -> None:
"""
Hook called after a pipeline successfully completes.
Parameters:
- run_params: Runtime parameters dictionary (see before_pipeline_run for schema)
- run_result: Dictionary containing pipeline execution results
- pipeline: Pipeline instance that was executed
- catalog: Catalog instance used during the run (implements CatalogProtocol)
Example:
>>> @hook_impl
>>> def after_pipeline_run(self, run_params, pipeline, **kwargs):
... print(f"Pipeline {run_params['pipeline_name']} completed successfully")
"""
@hook_spec
def on_pipeline_error(
self,
error: Exception,
run_params: dict[str, Any],
pipeline: Pipeline,
catalog: CatalogProtocol
) -> None:
"""
Hook called when a pipeline execution fails.
Parameters:
- error: Exception that was raised during pipeline execution
- run_params: Runtime parameters dictionary (see before_pipeline_run for schema)
- pipeline: Pipeline instance that failed
- catalog: Catalog instance used during the run (implements CatalogProtocol)
Example:
>>> @hook_impl
>>> def on_pipeline_error(self, error, run_params, pipeline, **kwargs):
... print(f"Pipeline {run_params['pipeline_name']} failed: {error}")
"""class DatasetSpecs:
"""
Hook specifications for Dataset I/O lifecycle events.
Defines hooks for dataset load and save operations.
"""
@hook_spec
def before_dataset_loaded(self, dataset_name: str, node: Node) -> None:
"""
Hook called before loading a dataset from the catalog.
Parameters:
- dataset_name: Name of dataset to be loaded
- node: Node requesting the load
Example:
>>> @hook_impl
>>> def before_dataset_loaded(self, dataset_name, node, **kwargs):
... print(f"Loading dataset: {dataset_name} for node: {node.name}")
"""
@hook_spec
def after_dataset_loaded(
self,
dataset_name: str,
data: Any,
node: Node
) -> None:
"""
Hook called after loading a dataset from the catalog.
Parameters:
- dataset_name: Name of loaded dataset
- data: The actual loaded data (not the dataset instance)
- node: Node that requested the load
Example:
>>> @hook_impl
>>> def after_dataset_loaded(self, dataset_name, data, **kwargs):
... print(f"Loaded dataset: {dataset_name}, type: {type(data).__name__}")
"""
@hook_spec
def before_dataset_saved(self, dataset_name: str, data: Any, node: Node) -> None:
"""
Hook called before saving a dataset to the catalog.
Parameters:
- dataset_name: Name of dataset to be saved
- data: The actual data to be saved (not the dataset instance)
- node: Node producing the output
Example:
>>> @hook_impl
>>> def before_dataset_saved(self, dataset_name, data, node, **kwargs):
... print(f"Saving dataset: {dataset_name} from node: {node.name}")
"""
@hook_spec
def after_dataset_saved(self, dataset_name: str, data: Any, node: Node) -> None:
"""
Hook called after saving a dataset to the catalog.
Parameters:
- dataset_name: Name of saved dataset
- data: The actual data that was saved (not the dataset instance)
- node: Node that produced the output
Example:
>>> @hook_impl
>>> def after_dataset_saved(self, dataset_name, data, **kwargs):
... print(f"Saved dataset: {dataset_name}")
"""class KedroContextSpecs:
"""
Hook specifications for KedroContext lifecycle events.
"""
@hook_spec
def after_context_created(
self,
context: KedroContext
) -> None:
"""
Hook called after KedroContext is created.
Parameters:
- context: The created KedroContext instance
Example:
>>> @hook_impl
>>> def after_context_created(self, context):
... print(f"Context created for env: {context.env}")
"""Register hooks in settings.py:
# settings.py
from my_project.hooks import MyHooks, MonitoringHooks
HOOKS = (
MyHooks(),
MonitoringHooks(),
)

Example: timing node execution with hooks.

from kedro.framework.hooks import hook_impl
import time
class TimingHooks:
def __init__(self):
self.node_times = {}
@hook_impl
def before_node_run(self, node, **kwargs):
self.node_times[node.name] = time.time()
@hook_impl
def after_node_run(self, node, **kwargs):
duration = time.time() - self.node_times[node.name]
print(f"Node {node.name} took {duration:.2f}s")from kedro.framework.hooks import hook_impl
class DatasetLoggingHooks:
@hook_impl
def before_dataset_loaded(self, dataset_name, **kwargs):
print(f"Loading: {dataset_name}")
@hook_impl
def after_dataset_saved(self, dataset_name, **kwargs):
print(f"Saved: {dataset_name}")from kedro.framework.hooks import hook_impl
class PipelineMonitoringHooks:
@hook_impl
def before_pipeline_run(self, pipeline, **kwargs):
print(f"Starting pipeline with {len(pipeline.nodes)} nodes")
@hook_impl
def after_pipeline_run(self, **kwargs):
print("Pipeline completed successfully")
@hook_impl
def on_pipeline_error(self, error, **kwargs):
print(f"Pipeline failed: {error}")For testing or programmatic usage outside of a Kedro project, you can create a hook manager and register hooks directly:
from kedro.framework.hooks import hook_impl
from kedro.framework.hooks.manager import _create_hook_manager
from kedro.runner import SequentialRunner
from kedro.io import DataCatalog, MemoryDataset
from kedro.pipeline import node, pipeline
# Define hooks
class MyHooks:
def __init__(self):
self.calls = []
@hook_impl
def before_node_run(self, node, **kwargs):
self.calls.append(f"before:{node.name}")
@hook_impl
def after_node_run(self, node, **kwargs):
self.calls.append(f"after:{node.name}")
# Create hook manager and register hooks
hook_manager = _create_hook_manager()
my_hooks = MyHooks()
hook_manager.register(my_hooks)
# Use with runner
def process_data(data):
return [x * 2 for x in data]
pipe = pipeline([
node(process_data, "input", "output", name="process")
])
catalog = DataCatalog({
"input": MemoryDataset([1, 2, 3]),
"output": MemoryDataset()
})
runner = SequentialRunner()
runner.run(pipe, catalog, hook_manager=hook_manager)
# Check hook calls
print(my_hooks.calls)  # ['before:process', 'after:process']

Note: In production Kedro projects, hooks are typically registered via the HOOKS tuple in settings.py rather than programmatically.
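Hook classes are also plain Python objects, so simple unit tests can call their methods directly, without a hook manager or runner. A minimal sketch (the test function and mock node are illustrative assumptions), reusing the TimingHooks class defined above:

from unittest.mock import MagicMock

def test_timing_hooks_record_start_times():
    hooks = TimingHooks()  # the TimingHooks example class from above
    fake_node = MagicMock()
    fake_node.name = "process"  # stand-in for a kedro.pipeline.node.Node

    hooks.before_node_run(node=fake_node)
    hooks.after_node_run(node=fake_node)  # prints the measured duration

    assert "process" in hooks.node_times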
See also: