Workspace: tessl
Visibility: Public
Describes: pkg:pypi/kedro@1.1.x

tessl/pypi-kedro

tessl install tessl/pypi-kedro@1.1.0

Kedro helps you build production-ready data and analytics pipelines

Agent Success: 98% (agent success rate when using this tile)
Improvement: 1.32x (success rate improvement when using this tile compared to baseline)
Baseline: 74% (agent success rate without this tile)

evals/scenario-8/task.md

Data Processing Pipeline Runner

Build a data processing pipeline runner that executes multiple data transformation tasks in parallel using multiprocessing.

Requirements

Your system should:

  1. Create a data processing pipeline with at least 4 computational nodes that can be executed in parallel where dependencies allow
  2. Use multiprocessing to execute independent nodes concurrently
  3. Control the maximum number of parallel workers
  4. Ensure nodes execute in the correct order based on their data dependencies, as sketched in the example after this list
  5. Handle both in-memory datasets and the results of node computations
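
The core of such a runner is dependency-aware scheduling. Below is a minimal sketch of one approach, wave scheduling on top of concurrent.futures.ProcessPoolExecutor; the run_waves helper and its nodes mapping of name to (func, inputs, outputs) are invented for illustration and are not part of the API specified under Implementation.

from concurrent.futures import ProcessPoolExecutor


def run_waves(nodes, datasets, max_workers=None):
    """Execute nodes in dependency order, one wave of ready nodes at a time.

    nodes: dict of name -> (func, input_names, output_names); the functions
    must be picklable (defined at module level) to cross process boundaries.
    datasets: dict of dataset name -> in-memory value; updated in place.
    """
    remaining = dict(nodes)
    with ProcessPoolExecutor(max_workers=max_workers) as pool:
        while remaining:
            # A node is ready once every one of its input datasets exists.
            ready = {name: spec for name, spec in remaining.items()
                     if all(i in datasets for i in spec[1])}
            if not ready:
                raise ValueError("cycle or missing input dataset")
            futures = {name: pool.submit(func, *(datasets[i] for i in ins))
                       for name, (func, ins, _) in ready.items()}
            for name, future in futures.items():
                outs = ready[name][2]
                result = future.result()
                values = result if len(outs) > 1 else (result,)
                datasets.update(zip(outs, values))
                del remaining[name]
    return datasets

A production runner would dispatch each node as soon as its own inputs become available rather than waiting for a whole wave to finish, but the ordering guarantee is the same.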

Capabilities

Parallel Pipeline Execution

  • Given a pipeline with 3 independent nodes (no shared dependencies), when executed with a parallel runner, all 3 nodes run concurrently @test
  • Given a runner limited to 2 workers, when executed on a pipeline with 4 independent nodes, at most 2 nodes execute simultaneously @test

Dependency-Based Ordering

  • Given a pipeline where node B depends on node A's output, when executed in parallel, node A completes before node B starts @test
  • Given a diamond-shaped dependency (A -> {B, C} -> D), when executed in parallel, nodes B and C run concurrently after A, and D runs after both B and C complete @test
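
To make the diamond case concrete, here is a hedged sketch of how such a check could be written against the API specified under Implementation. The stamped helper and the timestamp-based assertions are illustrative assumptions, not the tile's actual test suite, and timing-based checks like this can be flaky.

import time


def stamped(value):
    start = time.time()
    time.sleep(0.2)  # make concurrency observable in the timestamps
    return {"value": value, "start": start, "end": time.time()}


def node_a():     return stamped("a")
def node_b(a):    return stamped(a["value"] + "b")
def node_c(a):    return stamped(a["value"] + "c")
def node_d(b, c): return stamped(b["value"] + c["value"])


if __name__ == "__main__":  # guard required by spawn-based multiprocessing
    diamond = Pipeline([
        Node(node_a, [], ["A"]),
        Node(node_b, ["A"], ["B"]),
        Node(node_c, ["A"], ["C"]),
        Node(node_d, ["B", "C"], ["D"]),
    ])
    catalog = DataCatalog()
    ParallelRunner().run(diamond, catalog)

    a, b, c, d = (catalog.load(k) for k in "ABCD")
    assert b["start"] >= a["end"] and c["start"] >= a["end"]  # A before B and C
    assert d["start"] >= max(b["end"], c["end"])              # D after both
    assert b["start"] < c["end"] and c["start"] < b["end"]    # B and C overlapped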

Implementation

@generates

API

from typing import Any, Callable, Dict, List, Optional


class Node:
    """Represents a computational unit in the pipeline."""

    def __init__(
        self,
        func: Callable,
        inputs: List[str],
        outputs: List[str],
        name: Optional[str] = None
    ):
        """
        Create a node.

        Args:
            func: The function to execute
            inputs: List of input dataset names
            outputs: List of output dataset names
            name: Optional name for the node
        """
        pass


class Pipeline:
    """Represents a collection of nodes to execute."""

    def __init__(self, nodes: List[Node]):
        """
        Create a pipeline.

        Args:
            nodes: List of nodes in the pipeline
        """
        pass


class DataCatalog:
    """Manages datasets for the pipeline."""

    def __init__(self, datasets: Optional[Dict[str, Any]] = None):
        """
        Create a data catalog.

        Args:
            datasets: Initial datasets as a dictionary
        """
        pass

    def load(self, dataset_name: str) -> Any:
        """Load a dataset."""
        pass

    def save(self, dataset_name: str, data: Any) -> None:
        """Save a dataset."""
        pass


class ParallelRunner:
    """Executes pipeline nodes in parallel using multiprocessing."""

    def __init__(self, max_workers: Optional[int] = None):
        """
        Create a parallel runner.

        Args:
            max_workers: Maximum number of parallel workers (None = CPU count)
        """
        pass

    def run(self, pipeline: Pipeline, catalog: DataCatalog) -> None:
        """
        Execute the pipeline in parallel.

        Args:
            pipeline: The pipeline to execute
            catalog: The data catalog containing input datasets
        """
        pass
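
A hedged usage sketch of the API above, wiring up the four-node pipeline from the requirements with a two-worker cap; the transformation functions are placeholders invented for illustration.

def load_raw():
    return list(range(10))

def drop_odds(raw):
    return [x for x in raw if x % 2 == 0]

def scale(raw):
    return [x * 10 for x in raw]

def combine(evens, scaled):
    return evens + scaled


pipeline = Pipeline([
    Node(load_raw, [], ["raw"], name="load_raw"),
    Node(drop_odds, ["raw"], ["evens"], name="drop_odds"),
    Node(scale, ["raw"], ["scaled"], name="scale"),
    Node(combine, ["evens", "scaled"], ["final"], name="combine"),
])

if __name__ == "__main__":  # spawn-based start methods re-import this module
    catalog = DataCatalog()  # starts empty; nodes populate it as they finish
    ParallelRunner(max_workers=2).run(pipeline, catalog)
    print(catalog.load("final"))  # drop_odds and scale may have run concurrently

Because node functions cross process boundaries, they must be picklable, which in practice means defining them at module level.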

Dependencies { .dependencies }

kedro { .dependency }

Provides pipeline execution framework with parallel runner support.

@satisfied-by
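
For orientation, here is the same shape written against Kedro's own API, which this dependency provides. This is a minimal sketch based on my reading of recent Kedro releases (kedro.pipeline.node and pipeline, kedro.io.DataCatalog and MemoryDataset, kedro.runner.ParallelRunner); the exact catalog class the parallel runner accepts has shifted across versions, so verify the details against the kedro 1.1.x documentation.

from kedro.io import DataCatalog, MemoryDataset
from kedro.pipeline import node, pipeline
from kedro.runner import ParallelRunner


def square(xs):
    return [x * x for x in xs]


def total(squares):
    return sum(squares)


pipe = pipeline([
    node(square, inputs="numbers", outputs="squares", name="square"),
    node(total, inputs="squares", outputs="answer", name="total"),
])

if __name__ == "__main__":
    catalog = DataCatalog({"numbers": MemoryDataset([1, 2, 3])})
    ParallelRunner(max_workers=2).run(pipe, catalog)  # total waits for square
    print(catalog.load("answer"))  # 14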