Describes pkg:pypi/kedro@1.1.x

tessl/pypi-kedro

tessl install tessl/pypi-kedro@1.1.0

Kedro helps you build production-ready data and analytics pipelines

Agent success: 98% (agent success rate when using this tile)
Improvement: 1.32x (improvement over the baseline success rate)
Baseline: 74% (agent success rate without this tile)

docs/api/runners.md

Runners API Reference

Execute pipelines sequentially, or in parallel using multiprocessing or threading.

Module Import

from kedro.runner import SequentialRunner, ParallelRunner, ThreadRunner, AbstractRunner
from kedro.io import DataCatalog, CatalogProtocol, SharedMemoryCatalogProtocol

SequentialRunner

class SequentialRunner:
    """Run pipeline nodes sequentially in topological order."""

    def __init__(self, is_async: bool = False):
        """
        Initialize SequentialRunner.

        Parameters:
        - is_async: If True, load and save node inputs/outputs asynchronously with threads
        """

    def run(
        self,
        pipeline: Pipeline,
        catalog: CatalogProtocol | SharedMemoryCatalogProtocol,
        hook_manager: PluginManager | None = None,
        run_id: str | None = None,
        only_missing_outputs: bool = False
    ) -> dict[str, Any]:
        """
        Run the pipeline sequentially.

        Parameters:
        - pipeline: Pipeline to execute
        - catalog: DataCatalog (or any CatalogProtocol/SharedMemoryCatalogProtocol implementation) for data I/O
        - hook_manager: Optional PluginManager instance for executing hooks (before_node_run, after_node_run, etc.)
        - run_id: Unique identifier for this run (used in logging and session tracking)
        - only_missing_outputs: Only run nodes with missing outputs

        Returns:
        Dictionary mapping output dataset names to AbstractDataset instances.
        The returned datasets are typically MemoryDataset instances containing
        the node outputs. IMPORTANT: Call .load() on each dataset to retrieve
        the actual data values.

        Example:
        >>> outputs = runner.run(pipeline, catalog)
        >>> # outputs is dict[str, AbstractDataset], not dict[str, actual_data]
        >>> result_data = outputs["my_output"].load()  # Extract actual data
        """

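For orientation, here is a minimal end-to-end sketch of a sequential run, written against the signatures above. It is a sketch, not canonical usage: the node function, pipeline, and dataset names (double, demo_pipeline, raw, doubled) are invented for illustration, and it assumes the node/pipeline helpers from kedro.pipeline and MemoryDataset from kedro.io.

from kedro.io import DataCatalog, MemoryDataset
from kedro.pipeline import node, pipeline
from kedro.runner import SequentialRunner

def double(x):
    """Illustrative node function: double the input value."""
    return x * 2

# One-node pipeline that reads "raw" and produces "doubled"
demo_pipeline = pipeline(
    [node(double, inputs="raw", outputs="doubled", name="double_node")]
)

# The catalog supplies the input; the unregistered output stays in memory
catalog = DataCatalog({"raw": MemoryDataset(21)})

runner = SequentialRunner()
outputs = runner.run(demo_pipeline, catalog)

# run() returns dataset objects, not raw values - call .load() to get the data
print(outputs["doubled"].load())  # 42
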
ParallelRunner

class ParallelRunner:
    """Run pipeline nodes in parallel using multiprocessing."""

    def __init__(
        self,
        max_workers: int | None = None,
        is_async: bool = False
    ):
        """
        Initialize ParallelRunner.

        Parameters:
        - max_workers: Maximum number of parallel processes (defaults to CPU count)
        - is_async: If True, load and save node inputs/outputs asynchronously with threads
        """

    def run(
        self,
        pipeline: Pipeline,
        catalog: SharedMemoryCatalogProtocol,
        hook_manager: PluginManager | None = None,
        run_id: str | None = None,
        only_missing_outputs: bool = False
    ) -> dict[str, Any]:
        """
        Run pipeline in parallel using multiprocessing.

        Parameters:
        - pipeline: Pipeline to execute
        - catalog: Must be a SharedMemoryDataCatalog (or implement SharedMemoryCatalogProtocol);
                  a regular DataCatalog is converted internally - see the note below
        - hook_manager: Optional PluginManager instance for executing hooks
        - run_id: Unique identifier for this run
        - only_missing_outputs: Only run nodes with missing outputs

        Returns:
        Dictionary mapping output dataset names to AbstractDataset instances.
        The returned datasets are typically MemoryDataset instances containing
        the node outputs. IMPORTANT: Call .load() on each dataset to retrieve
        the actual data values.

        Example:
        >>> outputs = runner.run(pipeline, catalog)
        >>> result_data = outputs["my_output"].load()  # Extract actual data

        Note:
        - ParallelRunner automatically converts DataCatalog to SharedMemoryDataCatalog
          internally if needed - manual conversion is NOT required
        - Requires catalog datasets to be serializable/picklable for multiprocessing
        - Nodes run in parallel when dependencies allow
        - Catalog must support SharedMemoryCatalogProtocol (validate_catalog(), set_manager_datasets())
        """

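As a sketch of multiprocessing execution, the example below runs two independent branches with ParallelRunner. It relies on the automatic DataCatalog-to-SharedMemoryDataCatalog conversion described in the note above, keeps the node functions at module level, and uses a __main__ guard so they can be pickled on spawn-based platforms; the pipeline, node, and dataset names are invented for illustration.

from kedro.io import DataCatalog, MemoryDataset
from kedro.pipeline import node, pipeline
from kedro.runner import ParallelRunner

def square(xs):
    """Illustrative CPU-bound node function."""
    return [x ** 2 for x in xs]

def cube(xs):
    """Second independent node so both branches can run in parallel."""
    return [x ** 3 for x in xs]

# Two independent branches reading the same input
demo_pipeline = pipeline(
    [
        node(square, inputs="numbers", outputs="squares", name="square_node"),
        node(cube, inputs="numbers", outputs="cubes", name="cube_node"),
    ]
)

if __name__ == "__main__":  # guard required for multiprocessing on spawn-based platforms
    catalog = DataCatalog({"numbers": MemoryDataset(list(range(10)))})
    runner = ParallelRunner(max_workers=2)
    outputs = runner.run(demo_pipeline, catalog)
    print(outputs["squares"].load())
    print(outputs["cubes"].load())
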
ThreadRunner

class ThreadRunner:
    """Run pipeline nodes in parallel using threading."""

    def __init__(
        self,
        max_workers: int | None = None,
        is_async: bool = False
    ):
        """
        Initialize ThreadRunner.

        Parameters:
        - max_workers: Maximum number of parallel threads
        - is_async: If True, load and save node inputs/outputs asynchronously with threads
        """

    def run(
        self,
        pipeline: Pipeline,
        catalog: CatalogProtocol | SharedMemoryCatalogProtocol,
        hook_manager: PluginManager | None = None,
        run_id: str | None = None,
        only_missing_outputs: bool = False
    ) -> dict[str, Any]:
        """
        Run pipeline in parallel using threading.

        Parameters:
        - pipeline: Pipeline to execute
        - catalog: DataCatalog (or any CatalogProtocol/SharedMemoryCatalogProtocol implementation) for data I/O
        - hook_manager: Optional PluginManager instance for executing hooks
        - run_id: Unique identifier for this run
        - only_missing_outputs: Only run nodes with missing outputs

        Returns:
        Dictionary mapping output dataset names to AbstractDataset instances.
        The returned datasets are typically MemoryDataset instances containing
        the node outputs. IMPORTANT: Call .load() on each dataset to retrieve
        the actual data values.

        Example:
        >>> outputs = runner.run(pipeline, catalog)
        >>> result_data = outputs["my_output"].load()  # Extract actual data

        Note:
        - Nodes run in parallel when dependencies allow
        - Shares memory space (no pickling required)
        - Subject to Python GIL for CPU-bound operations
        - Best for I/O-bound workloads
        """

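To illustrate why ThreadRunner suits I/O-bound work, the sketch below runs two independent nodes that simulate slow network calls with time.sleep. With two worker threads the waits overlap, so the run takes roughly one second rather than two; the URLs, node names, and timings are invented for illustration.

import time

from kedro.io import DataCatalog, MemoryDataset
from kedro.pipeline import node, pipeline
from kedro.runner import ThreadRunner

def fetch_slow(url):
    """Illustrative I/O-bound node: simulates a slow network call."""
    time.sleep(1)  # stand-in for request latency
    return f"payload from {url}"

demo_pipeline = pipeline(
    [
        node(fetch_slow, inputs="url_a", outputs="payload_a", name="fetch_a"),
        node(fetch_slow, inputs="url_b", outputs="payload_b", name="fetch_b"),
    ]
)

catalog = DataCatalog(
    {
        "url_a": MemoryDataset("https://example.com/a"),
        "url_b": MemoryDataset("https://example.com/b"),
    }
)

runner = ThreadRunner(max_workers=2)
start = time.perf_counter()
outputs = runner.run(demo_pipeline, catalog)
print(outputs["payload_a"].load())
print(outputs["payload_b"].load())
print(f"elapsed ~{time.perf_counter() - start:.1f}s")  # both waits overlap in threads
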
Runner Selection Guide

| Runner | Use for | Advantages | Limitations |
| --- | --- | --- | --- |
| SequentialRunner | Debugging, simple workflows | Predictable, easy to debug | No parallelism |
| ParallelRunner | CPU-bound workloads | True parallelism, bypasses GIL | Requires picklable data |
| ThreadRunner | I/O-bound workloads | Low overhead, shared memory | Subject to GIL |

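If the choice needs to be made at runtime, a small helper along these lines can encode the table; choose_runner and its arguments are hypothetical, not part of the Kedro API.

from kedro.runner import ParallelRunner, SequentialRunner, ThreadRunner

def choose_runner(parallel: bool, io_bound: bool, max_workers: int | None = None):
    """Hypothetical helper mapping the selection table to a runner instance."""
    if not parallel:
        return SequentialRunner()  # debugging, simple workflows
    if io_bound:
        return ThreadRunner(max_workers=max_workers)  # low overhead, subject to GIL
    return ParallelRunner(max_workers=max_workers)  # processes bypass the GIL
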
Usage Examples

Sequential Execution

from kedro.runner import SequentialRunner

runner = SequentialRunner()
outputs = runner.run(pipeline, catalog)
# Note: outputs is a dict of dataset names to Dataset objects
# Use .load() to access the actual data
result = outputs["my_output"].load()

Parallel Execution (Multiprocessing)

from kedro.runner import ParallelRunner

runner = ParallelRunner(max_workers=4)
outputs = runner.run(pipeline, catalog)
# Access data from dataset objects
result = outputs["my_output"].load()

Parallel Execution (Threading)

from kedro.runner import ThreadRunner

runner = ThreadRunner(max_workers=8)
outputs = runner.run(pipeline, catalog)
# Access data from dataset objects
result = outputs["my_output"].load()

Incremental Execution

# Only run nodes whose outputs don't exist
runner = SequentialRunner()
outputs = runner.run(pipeline, catalog, only_missing_outputs=True)
# Access results using .load()
result = outputs["my_output"].load()

See also:

  • Parallel Execution Guide - Detailed runner usage
  • Pipeline API - Create pipelines to run