tessl/pypi-kedro

Kedro helps you build production-ready data and analytics pipelines
Runners API Reference

Execute pipelines in different modes: sequentially, or in parallel using multiprocessing or threading.

Module Import

from kedro.runner import SequentialRunner, ParallelRunner, ThreadRunner, AbstractRunner
from kedro.io import DataCatalog, CatalogProtocol, SharedMemoryCatalogProtocol

SequentialRunner

class SequentialRunner:
    """Run pipeline nodes sequentially in topological order."""

    def __init__(self, is_async: bool = False):
        """
        Initialize SequentialRunner.

        Parameters:
        - is_async: If True, load node inputs and save node outputs
          asynchronously using threads; defaults to synchronous I/O
        """

    def run(
        self,
        pipeline: Pipeline,
        catalog: CatalogProtocol | SharedMemoryCatalogProtocol,
        hook_manager: PluginManager | None = None,
        run_id: str | None = None,
        only_missing_outputs: bool = False
    ) -> dict[str, Any]:
        """
        Run the pipeline sequentially.

        Parameters:
        - pipeline: Pipeline to execute
        - catalog: DataCatalog (or any CatalogProtocol/SharedMemoryCatalogProtocol implementation) for data I/O
        - hook_manager: Optional PluginManager instance for executing hooks (before_node_run, after_node_run, etc.)
        - run_id: Unique identifier for this run (used in logging and session tracking)
        - only_missing_outputs: Only run nodes with missing outputs

        Returns:
        Dictionary mapping output dataset names to AbstractDataset instances.
        The returned datasets are typically MemoryDataset instances containing
        the node outputs. IMPORTANT: Call .load() on each dataset to retrieve
        the actual data values.

        Example:
        >>> outputs = runner.run(pipeline, catalog)
        >>> # outputs is dict[str, AbstractDataset], not dict[str, actual_data]
        >>> result_data = outputs["my_output"].load()  # Extract actual data
        """

ParallelRunner

class ParallelRunner:
    """Run pipeline nodes in parallel using multiprocessing."""

    def __init__(
        self,
        max_workers: int | None = None,
        is_async: bool = False
    ):
        """
        Initialize ParallelRunner.

        Parameters:
        - max_workers: Maximum number of parallel processes (defaults to CPU count)
        - is_async: If True, load node inputs and save node outputs asynchronously using threads
        """

    def run(
        self,
        pipeline: Pipeline,
        catalog: SharedMemoryCatalogProtocol,
        hook_manager: PluginManager | None = None,
        run_id: str | None = None,
        only_missing_outputs: bool = False
    ) -> dict[str, Any]:
        """
        Run pipeline in parallel using multiprocessing.

        Parameters:
        - pipeline: Pipeline to execute
        - catalog: Typed as SharedMemoryCatalogProtocol; pass a SharedMemoryDataCatalog
                  (or another SharedMemoryCatalogProtocol implementation). A regular
                  DataCatalog is converted to SharedMemoryDataCatalog internally if needed.
        - hook_manager: Optional PluginManager instance for executing hooks
        - run_id: Unique identifier for this run
        - only_missing_outputs: Only run nodes with missing outputs

        Returns:
        Dictionary mapping output dataset names to AbstractDataset instances.
        The returned datasets are typically MemoryDataset instances containing
        the node outputs. IMPORTANT: Call .load() on each dataset to retrieve
        the actual data values.

        Example:
        >>> outputs = runner.run(pipeline, catalog)
        >>> result_data = outputs["my_output"].load()  # Extract actual data

        Note:
        - ParallelRunner automatically converts DataCatalog to SharedMemoryDataCatalog
          internally if needed - manual conversion is NOT required
        - Requires catalog datasets to be serializable/picklable for multiprocessing
        - Nodes run in parallel when dependencies allow
        - Catalog must support SharedMemoryCatalogProtocol (validate_catalog(), set_manager_datasets())
        """

ThreadRunner

class ThreadRunner:
    """Run pipeline nodes in parallel using threading."""

    def __init__(
        self,
        max_workers: int | None = None,
        is_async: bool = False
    ):
        """
        Initialize ThreadRunner.

        Parameters:
        - max_workers: Maximum number of parallel threads
        - is_async: If True, load node inputs and save node outputs asynchronously using threads
        """

    def run(
        self,
        pipeline: Pipeline,
        catalog: CatalogProtocol | SharedMemoryCatalogProtocol,
        hook_manager: PluginManager | None = None,
        run_id: str | None = None,
        only_missing_outputs: bool = False
    ) -> dict[str, Any]:
        """
        Run pipeline in parallel using threading.

        Parameters:
        - pipeline: Pipeline to execute
        - catalog: DataCatalog (or any CatalogProtocol/SharedMemoryCatalogProtocol implementation) for data I/O
        - hook_manager: Optional PluginManager instance for executing hooks
        - run_id: Unique identifier for this run
        - only_missing_outputs: Only run nodes with missing outputs

        Returns:
        Dictionary mapping output dataset names to AbstractDataset instances.
        The returned datasets are typically MemoryDataset instances containing
        the node outputs. IMPORTANT: Call .load() on each dataset to retrieve
        the actual data values.

        Example:
        >>> outputs = runner.run(pipeline, catalog)
        >>> result_data = outputs["my_output"].load()  # Extract actual data

        Note:
        - Nodes run in parallel when dependencies allow
        - Shares memory space (no pickling required)
        - Subject to Python GIL for CPU-bound operations
        - Best for I/O-bound workloads
        """

Runner Selection Guide

| Runner | Use For | Advantages | Limitations |
|---|---|---|---|
| SequentialRunner | Debugging, simple workflows | Predictable, easy to debug | No parallelism |
| ParallelRunner | CPU-bound workloads | True parallelism, bypasses GIL | Requires picklable data |
| ThreadRunner | I/O-bound workloads | Low overhead, shared memory | Subject to GIL |

Usage Examples

Sequential Execution

from kedro.runner import SequentialRunner

runner = SequentialRunner()
outputs = runner.run(pipeline, catalog)
# Note: outputs is a dict of dataset names to Dataset objects
# Use .load() to access the actual data
result = outputs["my_output"].load()

Parallel Execution (Multiprocessing)

from kedro.runner import ParallelRunner

runner = ParallelRunner(max_workers=4)
outputs = runner.run(pipeline, catalog)
# Access data from dataset objects
result = outputs["my_output"].load()

Parallel Execution (Threading)

from kedro.runner import ThreadRunner

runner = ThreadRunner(max_workers=8)
outputs = runner.run(pipeline, catalog)
# Access data from dataset objects
result = outputs["my_output"].load()

Incremental Execution

# Only run nodes whose outputs don't exist
runner = SequentialRunner()
outputs = runner.run(pipeline, catalog, only_missing_outputs=True)
# Access results using .load()
result = outputs["my_output"].load()

See also:

  • Parallel Execution Guide - Detailed runner usage
  • Pipeline API - Create pipelines to run

Install with Tessl CLI

npx tessl i tessl/pypi-kedro
