tessl/pypi-kedro

tessl install tessl/pypi-kedro@1.1.0

Kedro helps you build production-ready data and analytics pipelines

docs/api/runners-advanced.md

Runners Advanced APIs

Detailed runner implementation classes for custom runner development.

Module Import

from kedro.runner import AbstractRunner
from concurrent.futures import ProcessPoolExecutor
from multiprocessing.managers import SyncManager

AbstractRunner

class AbstractRunner:
    """
    Abstract base class for all Pipeline runner implementations.
    Defines the interface and common behavior for pipeline execution.
    """

    def run(
        self,
        pipeline: Pipeline,
        catalog: DataCatalog,
        hook_manager: Any | None = None,
        run_id: str | None = None,
        only_missing_outputs: bool = False
    ) -> dict[str, Any]:
        """
        Run the pipeline.

        Parameters:
        - pipeline: Pipeline to execute
        - catalog: DataCatalog for data I/O operations
        - hook_manager: Optional hook manager for lifecycle callbacks
        - run_id: Unique identifier for tracking this run
        - only_missing_outputs: If True, only run nodes whose outputs don't
          already exist in the catalog

        Returns:
        Dictionary mapping output dataset names to MemoryDataset instances

        Note:
        The only_missing_outputs parameter enables incremental pipeline execution
        by filtering out nodes whose persistent outputs already exist in the
        catalog. The runner uses reverse topological order to determine which
        nodes need to run based on:
        1. Nodes with no outputs always run
        2. Nodes with missing persistent outputs must run
        3. Nodes producing ephemeral/missing data for downstream nodes must run
        """

    def _run(
        self,
        pipeline: Pipeline,
        catalog: DataCatalog,
        hook_manager: Any | None = None,
        run_id: str | None = None
    ) -> None:
        """
        Internal method implementing the run logic.
        Must be implemented by subclasses.

        Parameters:
        - pipeline: Pipeline to execute
        - catalog: DataCatalog for data I/O
        - hook_manager: Optional hook manager
        - run_id: Unique run identifier
        """

    def _filter_pipeline_for_missing_outputs(
        self,
        pipeline: Pipeline,
        catalog: DataCatalog
    ) -> Pipeline:
        """
        Filter pipeline to only include nodes with missing outputs.

        Uses reverse topological traversal to determine which nodes are needed.
        Starts from outputs and works backwards, including nodes if:
        - They have no outputs (side effects)
        - Their persistent outputs don't exist
        - Downstream nodes need their ephemeral outputs

        Parameters:
        - pipeline: Pipeline to filter
        - catalog: DataCatalog to check for existing outputs

        Returns:
        Filtered pipeline with only necessary nodes
        """

ParallelRunnerManager

class ParallelRunnerManager:
    """
    Manager for coordinating parallel runner execution.
    Handles process pool management, data sharing via multiprocessing,
    and synchronization of catalog state across processes.

    Note:
    This is an internal implementation detail of ParallelRunner.
    Users typically don't interact with this class directly.
    """

    def __init__(self, max_workers: int | None = None):
        """
        Initialize ParallelRunnerManager.

        Parameters:
        - max_workers: Maximum number of worker processes
                      (defaults to CPU count if None)
        """

    def _init_manager(self) -> SyncManager:
        """
        Initialize multiprocessing SyncManager.

        Returns:
        Started SyncManager instance for sharing data across processes
        """

    def _create_process_pool(self) -> ProcessPoolExecutor:
        """
        Create process pool for parallel execution.

        Returns:
        ProcessPoolExecutor with configured max_workers
        """

    def _sync_catalog(
        self,
        catalog: DataCatalog,
        manager: SyncManager
    ) -> SharedMemoryDataCatalog:
        """
        Synchronize catalog for multiprocessing.

        Converts regular DataCatalog to SharedMemoryDataCatalog and
        sets up shared memory for datasets.

        Parameters:
        - catalog: Original DataCatalog
        - manager: SyncManager for shared memory

        Returns:
        SharedMemoryDataCatalog ready for parallel execution
        """

    def run_nodes_in_parallel(
        self,
        nodes: list[Node],
        catalog: SharedMemoryDataCatalog,
        run_id: str
    ) -> None:
        """
        Execute nodes in parallel across worker processes.

        Parameters:
        - nodes: List of nodes to execute
        - catalog: Shared memory catalog
        - run_id: Unique run identifier
        """

Task

class Task:
    """
    Represents a single task wrapping node execution with catalog and hooks.
    Coordinates node execution, data loading/saving, and hook callbacks.

    Used internally by runners to execute pipeline nodes with proper
    resource management and error handling.
    """

    def __init__(
        self,
        node: Node,
        catalog: DataCatalog,
        hook_manager: Any | None = None,
        is_async: bool = False,
        run_id: str | None = None,
        parallel: bool = False
    ):
        """
        Initialize Task.

        Parameters:
        - node: Node to execute
        - catalog: DataCatalog for loading inputs and saving outputs
        - hook_manager: Optional hook manager for lifecycle callbacks
        - is_async: Whether task runs asynchronously
        - run_id: Unique run identifier for tracking
        - parallel: Whether task is running in parallel mode
        """

    def execute(self) -> Node:
        """
        Execute the task (run the node with its inputs/outputs).

        Process:
        1. Load node inputs from catalog
        2. Call before_node_run hooks
        3. Execute node function
        4. Save node outputs to catalog
        5. Call after_node_run hooks
        6. Handle errors with on_node_error hooks

        Returns:
        The node that was executed

        Raises:
        TaskError: If task execution fails (wraps underlying exceptions)
        """

    def __call__(self) -> Node:
        """
        Make the Task instance callable for ProcessPoolExecutor.

        This allows Task instances to be submitted directly to process pools
        for parallel execution. Internally calls execute().

        Returns:
        The node that was executed
        """

    @property
    def node(self) -> Node:
        """
        Get the node being executed.

        Returns:
        Node instance
        """

TaskError

class TaskError(Exception):
    """
    Raised for task execution failures during pipeline node execution.
    Wraps underlying exceptions that occur during task execution in parallel runners.
    """

Usage Examples

Custom Runner Implementation

from kedro.runner import AbstractRunner
from kedro.runner.runner import Task

class CustomRunner(AbstractRunner):
    """Custom runner with specific execution logic."""

    def _run(self, pipeline, catalog, hook_manager=None, run_id=None):
        """Implement custom execution strategy."""
        # Get nodes in execution order
        nodes = pipeline.nodes

        # Execute each node
        for node in nodes:
            task = Task(
                node=node,
                catalog=catalog,
                hook_manager=hook_manager,
                run_id=run_id
            )
            task.execute()

Incremental Execution Logic

from kedro.runner import SequentialRunner

runner = SequentialRunner()

# Enable incremental execution
outputs = runner.run(
    pipeline,
    catalog,
    only_missing_outputs=True
)

# Runner internally calls _filter_pipeline_for_missing_outputs
# to determine which nodes to run based on existing outputs

Task Execution with Error Handling

from kedro.runner.runner import Task, TaskError

task = Task(
    node=my_node,
    catalog=catalog,
    run_id="run_001"
)

try:
    result_node = task.execute()
    print(f"Task completed: {result_node.name}")
except TaskError as e:
    print(f"Task failed: {e}")
    print(f"Original error: {e.__cause__}")

ParallelRunner Internals

from kedro.runner import ParallelRunner

runner = ParallelRunner(max_workers=4)

# Internally, ParallelRunner:
# 1. Creates ParallelRunnerManager
# 2. Initializes SyncManager for shared memory
# 3. Converts DataCatalog to SharedMemoryDataCatalog
# 4. Creates process pool
# 5. Executes nodes in parallel groups
# 6. Synchronizes results back to main process

Custom Task Processing

import time

from kedro.runner.runner import Task

class MonitoredTask(Task):
    """Task with monitoring."""

    def execute(self):
        """Execute with timing."""
        start = time.time()

        try:
            result = super().execute()
            duration = time.time() - start
            print(f"Task {self.node.name} took {duration:.2f}s")
            return result
        except Exception:
            duration = time.time() - start
            print(f"Task {self.node.name} failed after {duration:.2f}s")
            raise

Implementation Notes

Runner Responsibilities

  1. Pipeline Validation: Ensure pipeline is valid DAG
  2. Execution Order: Determine node execution order from dependencies
  3. Resource Management: Handle catalog, hooks, and process pools
  4. Error Handling: Catch and report node failures
  5. Output Collection: Gather and return pipeline outputs
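
For example, output collection (item 5) typically means loading the pipeline's free outputs back out of the catalog once the run has finished. A hedged sketch, assuming pipeline and catalog exist and every free output is resolvable through the catalog:

# pipeline.outputs() returns dataset names produced but never consumed by the pipeline
free_outputs = pipeline.outputs()
run_result = {name: catalog.load(name) for name in free_outputs}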

Task Lifecycle

  1. Initialization: Task created with node, catalog, hooks
  2. Input Loading: Load all required inputs from catalog
  3. Execution: Run node function with inputs
  4. Output Saving: Save all outputs to catalog
  5. Completion: Return executed node
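
Stripped of hooks and error wrapping, that lifecycle reduces to a load, run, save loop. A simplified, hypothetical sketch of what Task.execute coordinates for a single node (node and catalog are assumed to exist):

inputs = {name: catalog.load(name) for name in node.inputs}   # 2. input loading
outputs = node.run(inputs)                                    # 3. execution
for name, data in outputs.items():                            # 4. output saving
    catalog.save(name, data)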

Parallel Execution Flow

  1. Preparation: Convert catalog to shared memory catalog
  2. Grouping: Group nodes by execution level (parallel groups)
  3. Distribution: Distribute tasks across worker processes
  4. Synchronization: Wait for group completion before next group
  5. Cleanup: Release resources and return outputs
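
Steps 2 to 4 amount to level-wise scheduling: Kedro's Pipeline exposes grouped_nodes, a list of groups whose members have no dependencies on each other, so each group can be fanned out to the pool and joined before the next group starts. A hedged sketch of that flow; make_task stands in for an assumed factory that builds a Task per node:

from concurrent.futures import ProcessPoolExecutor, wait

def run_level_by_level(pipeline, make_task, max_workers=None):
    """Hypothetical sketch of grouped parallel execution, not Kedro's implementation."""
    with ProcessPoolExecutor(max_workers=max_workers) as pool:
        for group in pipeline.grouped_nodes:           # nodes within a group are independent
            futures = [pool.submit(make_task(node)) for node in group]
            wait(futures)                              # barrier: finish this level first
            for future in futures:
                future.result()                        # surface any worker-side errors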

See also:

  • Runners API - Main runner classes
  • Parallel Execution Guide - Usage patterns
  • Task Error Handling - Error handling patterns for tasks