tessl install tessl/pypi-kedro@1.1.0

Kedro helps you build production-ready data and analytics pipelines.
| Metric | Description | Value |
|---|---|---|
| Agent Success | Agent success rate when using this tile | 98% |
| Improvement | Agent success rate improvement when using this tile compared to baseline | 1.32x |
| Baseline | Agent success rate without this tile | 74% |
Execute pipelines in different modes: sequentially, or in parallel using multiprocessing or threading.
from kedro.runner import SequentialRunner, ParallelRunner, ThreadRunner, AbstractRunner
from kedro.io import DataCatalog, CatalogProtocol, SharedMemoryCatalogProtocol
from kedro.pipeline import Pipeline
from pluggy import PluginManager
from typing import Any

class SequentialRunner:
    """Run pipeline nodes sequentially in topological order."""

    def __init__(self, is_async: bool = False):
        """Initialize SequentialRunner."""

    def run(
        self,
        pipeline: Pipeline,
        catalog: CatalogProtocol | SharedMemoryCatalogProtocol,
        hook_manager: PluginManager | None = None,
        run_id: str | None = None,
        only_missing_outputs: bool = False
    ) -> dict[str, Any]:
        """
        Run the pipeline sequentially.

        Parameters:
        - pipeline: Pipeline to execute
        - catalog: DataCatalog (or any CatalogProtocol/SharedMemoryCatalogProtocol implementation) for data I/O
        - hook_manager: Optional PluginManager instance for executing hooks (before_node_run, after_node_run, etc.)
        - run_id: Unique identifier for this run (used in logging and session tracking)
        - only_missing_outputs: Only run nodes with missing outputs

        Returns:
        Dictionary mapping output dataset names to AbstractDataset instances.
        The returned datasets are typically MemoryDataset instances containing
        the node outputs. IMPORTANT: Call .load() on each dataset to retrieve
        the actual data values.

        Example:
        >>> outputs = runner.run(pipeline, catalog)
        >>> # outputs is dict[str, AbstractDataset], not dict[str, actual_data]
        >>> result_data = outputs["my_output"].load()  # Extract actual data
        """

class ParallelRunner:
"""Run pipeline nodes in parallel using multiprocessing."""
def __init__(
self,
max_workers: int | None = None,
is_async: bool = False
):
"""
Initialize ParallelRunner.
Parameters:
- max_workers: Maximum number of parallel processes (defaults to CPU count)
- is_async: Whether to run asynchronously
"""
def run(
self,
pipeline: Pipeline,
catalog: SharedMemoryCatalogProtocol,
hook_manager: PluginManager | None = None,
run_id: str | None = None,
only_missing_outputs: bool = False
) -> dict[str, Any]:
"""
Run pipeline in parallel using multiprocessing.
Parameters:
- pipeline: Pipeline to execute
- catalog: MUST be SharedMemoryDataCatalog (or implement SharedMemoryCatalogProtocol)
Regular DataCatalog will fail - use SharedMemoryDataCatalog for parallel execution
- hook_manager: Optional PluginManager instance for executing hooks
- run_id: Unique identifier for this run
- only_missing_outputs: Only run nodes with missing outputs
Returns:
Dictionary mapping output dataset names to AbstractDataset instances.
The returned datasets are typically MemoryDataset instances containing
the node outputs. IMPORTANT: Call .load() on each dataset to retrieve
the actual data values.
Example:
>>> outputs = runner.run(pipeline, catalog)
>>> result_data = outputs["my_output"].load() # Extract actual data
Note:
- ParallelRunner automatically converts DataCatalog to SharedMemoryDataCatalog
internally if needed - manual conversion is NOT required
- Requires catalog datasets to be serializable/picklable for multiprocessing
- Nodes run in parallel when dependencies allow
- Catalog must support SharedMemoryCatalogProtocol (validate_catalog(), set_manager_datasets())
"""class ThreadRunner:
"""Run pipeline nodes in parallel using threading."""
def __init__(
self,
max_workers: int | None = None,
is_async: bool = False
):
"""
Initialize ThreadRunner.
Parameters:
- max_workers: Maximum number of parallel threads
- is_async: Whether to run asynchronously
"""
def run(
self,
pipeline: Pipeline,
catalog: CatalogProtocol | SharedMemoryCatalogProtocol,
hook_manager: PluginManager | None = None,
run_id: str | None = None,
only_missing_outputs: bool = False
) -> dict[str, Any]:
"""
Run pipeline in parallel using threading.
Parameters:
- pipeline: Pipeline to execute
- catalog: DataCatalog (or any CatalogProtocol/SharedMemoryCatalogProtocol implementation) for data I/O
- hook_manager: Optional PluginManager instance for executing hooks
- run_id: Unique identifier for this run
- only_missing_outputs: Only run nodes with missing outputs
Returns:
Dictionary mapping output dataset names to AbstractDataset instances.
The returned datasets are typically MemoryDataset instances containing
the node outputs. IMPORTANT: Call .load() on each dataset to retrieve
the actual data values.
Example:
>>> outputs = runner.run(pipeline, catalog)
>>> result_data = outputs["my_output"].load() # Extract actual data
Note:
- Nodes run in parallel when dependencies allow
- Shares memory space (no pickling required)
- Subject to Python GIL for CPU-bound operations
- Best for I/O-bound workloads
"""| Runner | Use For | Advantages | Limitations |
|---|---|---|---|
| SequentialRunner | Debugging, simple workflows | Predictable, easy to debug | No parallelism |
| ParallelRunner | CPU-bound workloads | True parallelism, bypasses GIL | Requires picklable data |
| ThreadRunner | I/O-bound workloads | Low overhead, shared memory | Subject to GIL |
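The snippets below assume a pipeline and catalog already exist. For context, here is a minimal, self-contained sketch of what that setup might look like; the node function, dataset names, and values are illustrative, not part of the Kedro API, and only the pipeline's unconsumed ("free") outputs are typically returned by run().

from kedro.io import DataCatalog, MemoryDataset
from kedro.pipeline import Pipeline, node
from kedro.runner import SequentialRunner

def double(x):
    # Trivial node function used only for illustration
    return x * 2

# Two-node pipeline: "raw" -> "doubled" -> "quadrupled" (names are illustrative)
pipeline = Pipeline([
    node(double, inputs="raw", outputs="doubled", name="double_raw"),
    node(double, inputs="doubled", outputs="quadrupled", name="double_again"),
])

# Catalog provides the input; unregistered outputs default to in-memory datasets
catalog = DataCatalog({"raw": MemoryDataset(21)})

outputs = SequentialRunner().run(pipeline, catalog)
result = outputs["quadrupled"].load()  # 84, per the dataset-returning behaviour described above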
from kedro.runner import SequentialRunner
runner = SequentialRunner()
outputs = runner.run(pipeline, catalog)
# Note: outputs is a dict of dataset names to Dataset objects
# Use .load() to access the actual data
result = outputs["my_output"].load()from kedro.runner import ParallelRunner
runner = ParallelRunner(max_workers=4)
outputs = runner.run(pipeline, catalog)
# Access data from dataset objects
result = outputs["my_output"].load()from kedro.runner import ThreadRunner
runner = ThreadRunner(max_workers=8)
outputs = runner.run(pipeline, catalog)
# Access data from dataset objects
result = outputs["my_output"].load()# Only run nodes whose outputs don't exist
runner = SequentialRunner()
outputs = runner.run(pipeline, catalog, only_missing_outputs=True)
# Access results using .load()
result = outputs["my_output"].load()See also: