tessl install tessl/pypi-kedro@1.1.0

Kedro helps you build production-ready data and analytics pipelines
Agent Success
Agent success rate when using this tile
98%
Improvement
Agent success rate improvement when using this tile compared to baseline
1.32x
Baseline
Agent success rate without this tile
74%
Detailed runner implementation classes for custom runner development.
from kedro.runner import AbstractRunner
from concurrent.futures import ProcessPoolExecutor
from multiprocessing.managers import SyncManager

class AbstractRunner:
"""
Abstract base class for all Pipeline runner implementations.
Defines the interface and common behavior for pipeline execution.
"""
def run(
    self,
    pipeline: Pipeline,
    catalog: DataCatalog,
    hook_manager: Any | None = None,
    run_id: str | None = None,
    only_missing_outputs: bool = False,
) -> dict[str, Any]:
    """
    Run the pipeline.

    Parameters:
    - pipeline: Pipeline to execute
    - catalog: DataCatalog for data I/O operations
    - hook_manager: Optional hook manager for lifecycle callbacks
    - run_id: Unique identifier for tracking this run
    - only_missing_outputs: If True, only run nodes whose outputs don't
      already exist in the catalog

    Returns:
        Dictionary mapping output dataset names to MemoryDataset instances

    Note:
        The only_missing_outputs parameter enables incremental pipeline
        execution by filtering out nodes whose persistent outputs already
        exist in the catalog. The runner uses reverse topological order to
        determine which nodes need to run based on:

        1. Nodes with no outputs always run
        2. Nodes with missing persistent outputs must run
        3. Nodes producing ephemeral/missing data for downstream nodes
           must run
    """
def _run(
    self,
    pipeline: Pipeline,
    catalog: DataCatalog,
    hook_manager: Any | None = None,
    run_id: str | None = None,
) -> None:
    """
    Internal method implementing the run logic.

    Must be implemented by subclasses.

    Parameters:
    - pipeline: Pipeline to execute
    - catalog: DataCatalog for data I/O
    - hook_manager: Optional hook manager
    - run_id: Unique run identifier
    """
def _filter_pipeline_for_missing_outputs(
self,
pipeline: Pipeline,
catalog: DataCatalog
) -> Pipeline:
"""
Filter pipeline to only include nodes with missing outputs.
Uses reverse topological traversal to determine which nodes are needed.
Starts from outputs and works backwards, including nodes if:
- They have no outputs (side effects)
- Their persistent outputs don't exist
- Downstream nodes need their ephemeral outputs
Parameters:
- pipeline: Pipeline to filter
- catalog: DataCatalog to check for existing outputs
Returns:
Filtered pipeline with only necessary nodes
"""class ParallelRunnerManager:
"""
Manager for coordinating parallel runner execution.
Handles process pool management, data sharing via multiprocessing,
and synchronization of catalog state across processes.
Note:
This is an internal implementation detail of ParallelRunner.
Users typically don't interact with this class directly.
"""
def __init__(self, max_workers: int | None = None):
    """
    Initialize ParallelRunnerManager.

    Parameters:
    - max_workers: Maximum number of worker processes
      (defaults to CPU count if None)
    """
def _init_manager(self) -> SyncManager:
    """
    Initialize multiprocessing SyncManager.

    Returns:
        Started SyncManager instance for sharing data across processes
    """
def _create_process_pool(self) -> ProcessPoolExecutor:
    """
    Create process pool for parallel execution.

    Returns:
        ProcessPoolExecutor with configured max_workers
    """
def _sync_catalog(
    self,
    catalog: DataCatalog,
    manager: SyncManager,
) -> SharedMemoryDataCatalog:
    """
    Synchronize catalog for multiprocessing.

    Converts regular DataCatalog to SharedMemoryDataCatalog and
    sets up shared memory for datasets.

    Parameters:
    - catalog: Original DataCatalog
    - manager: SyncManager for shared memory

    Returns:
        SharedMemoryDataCatalog ready for parallel execution
    """
def run_nodes_in_parallel(
self,
nodes: list[Node],
catalog: SharedMemoryDataCatalog,
run_id: str
) -> None:
"""
Execute nodes in parallel across worker processes.
Parameters:
- nodes: List of nodes to execute
- catalog: Shared memory catalog
- run_id: Unique run identifier
"""class Task:
"""
Represents a single task wrapping node execution with catalog and hooks.
Coordinates node execution, data loading/saving, and hook callbacks.
Used internally by runners to execute pipeline nodes with proper
resource management and error handling.
"""
def __init__(
    self,
    node: Node,
    catalog: DataCatalog,
    hook_manager: Any | None = None,
    is_async: bool = False,
    run_id: str | None = None,
    parallel: bool = False,
):
    """
    Initialize Task.

    Parameters:
    - node: Node to execute
    - catalog: DataCatalog for loading inputs and saving outputs
    - hook_manager: Optional hook manager for lifecycle callbacks
    - is_async: Whether task runs asynchronously
    - run_id: Unique run identifier for tracking
    - parallel: Whether task is running in parallel mode
    """
def execute(self) -> Node:
    """
    Execute the task (run the node with its inputs/outputs).

    Process:
        1. Load node inputs from catalog
        2. Call before_node_run hooks
        3. Execute node function
        4. Save node outputs to catalog
        5. Call after_node_run hooks
        6. Handle errors with on_node_error hooks

    Returns:
        The node that was executed

    Raises:
        TaskError: If task execution fails (wraps underlying exceptions)
    """
def __call__(self) -> Node:
    """
    Make the Task instance callable for ProcessPoolExecutor.

    This allows Task instances to be submitted directly to process pools
    for parallel execution. Internally calls execute().

    Returns:
        The node that was executed
    """
@property
def node(self) -> Node:
"""
Get the node being executed.
Returns:
Node instance
"""class TaskError(Exception):
"""
Raised for task execution failures during pipeline node execution.
Wraps underlying exceptions that occur during task execution in parallel runners.
"""

from kedro.runner import AbstractRunner
from kedro.runner.runner import Task
class CustomRunner(AbstractRunner):
    """Custom runner with specific execution logic."""

    def _run(self, pipeline, catalog, hook_manager=None, run_id=None):
        """Implement custom execution strategy.

        Wraps each node of ``pipeline`` in a ``Task`` and executes the
        tasks one after another, in the order ``pipeline.nodes`` yields them.
        """
        # Get nodes in execution order
        nodes = pipeline.nodes
        # Execute each node
        for node in nodes:
            task = Task(
                node=node,
                catalog=catalog,
                hook_manager=hook_manager,
                run_id=run_id,
            )
            task.execute()


from kedro.runner import SequentialRunner
runner = SequentialRunner()
# Enable incremental execution
outputs = runner.run(
    pipeline,
    catalog,
    only_missing_outputs=True,
)
# Runner internally calls _filter_pipeline_for_missing_outputs
# to determine which nodes to run based on existing outputs


from kedro.runner.runner import Task, TaskError
task = Task(
    node=my_node,
    catalog=catalog,
    run_id="run_001",
)
try:
    result_node = task.execute()
    print(f"Task completed: {result_node.name}")
except TaskError as e:
    # TaskError is a plain Exception subclass with no extra attributes, so
    # take the node name from the node we submitted and the underlying
    # error from the standard exception chain (falling back to e itself).
    print(f"Task failed: {my_node.name}")
    print(f"Original error: {e.__cause__ or e}")


from kedro.runner import ParallelRunner
runner = ParallelRunner(max_workers=4)
# Internally, ParallelRunner:
# 1. Creates ParallelRunnerManager
# 2. Initializes SyncManager for shared memory
# 3. Converts DataCatalog to SharedMemoryDataCatalog
# 4. Creates process pool
# 5. Executes nodes in parallel groups
# 6. Synchronizes results back to main process


from kedro.runner.runner import Task
class MonitoredTask(Task):
"""Task with monitoring."""
def execute(self):
"""Execute with timing."""
import time
start = time.time()
try:
result = super().execute()
duration = time.time() - start
print(f"Task {self.node.name} took {duration:.2f}s")
return result
except Exception as e:
duration = time.time() - start
print(f"Task {self.node.name} failed after {duration:.2f}s")
raiseSee also: