tessl install tessl/pypi-kedro@1.1.0

Kedro helps you build production-ready data and analytics pipelines.
Agent Success
Agent success rate when using this tile
98%
Improvement
Agent success rate improvement when using this tile compared to baseline
1.32x
Baseline
Agent success rate without this tile
74%
Build a data processing pipeline runner that executes multiple data transformation tasks in parallel using multiprocessing.
Your system should provide the following components:
@generates
from typing import Any, Callable, Dict, List, Optional
class Node:
    """Represents a computational unit in the pipeline."""

    def __init__(
        self,
        func: Callable,
        inputs: List[str],
        outputs: List[str],
        name: Optional[str] = None
    ):
        """
        Create a node.

        Args:
            func: The function to execute
            inputs: List of input dataset names
            outputs: List of output dataset names
            name: Optional name for the node; defaults to the function's
                ``__name__`` so every node is identifiable in logs/errors
        """
        self.func = func
        # Defensive copies: later mutation of the caller's lists must not
        # silently rewire the pipeline.
        self.inputs = list(inputs)
        self.outputs = list(outputs)
        self.name = name if name is not None else getattr(func, "__name__", repr(func))

    def __repr__(self) -> str:
        return (
            f"Node(name={self.name!r}, inputs={self.inputs}, "
            f"outputs={self.outputs})"
        )
class Pipeline:
    """Represents a collection of nodes to execute."""

    def __init__(self, nodes: List["Node"]):
        """
        Create a pipeline.

        Args:
            nodes: List of nodes in the pipeline
        """
        # Materialize and copy: accepts any iterable, and insulates the
        # pipeline from caller-side mutation of the original list.
        self.nodes = list(nodes)

    def __len__(self) -> int:
        """Number of nodes in the pipeline."""
        return len(self.nodes)
class DataCatalog:
    """Manages datasets for the pipeline."""

    def __init__(self, datasets: Optional[Dict[str, Any]] = None):
        """
        Create a data catalog.

        Args:
            datasets: Initial datasets as a dictionary
        """
        # Copy so the catalog owns its storage; ``None`` yields an empty
        # catalog (no mutable-default pitfall — a fresh dict per instance).
        self._datasets: Dict[str, Any] = dict(datasets) if datasets else {}

    def load(self, dataset_name: str) -> Any:
        """Load a dataset.

        Raises:
            KeyError: If the dataset was never registered or saved.
        """
        try:
            return self._datasets[dataset_name]
        except KeyError:
            # Re-raise with a pipeline-oriented message; suppress the
            # uninformative original context.
            raise KeyError(
                f"Dataset '{dataset_name}' not found in catalog"
            ) from None

    def save(self, dataset_name: str, data: Any) -> None:
        """Save a dataset, overwriting any existing entry of that name."""
        self._datasets[dataset_name] = data
class ParallelRunner:
"""Executes pipeline nodes in parallel using multiprocessing."""
def __init__(self, max_workers: Optional[int] = None):
"""
Create a parallel runner.
Args:
max_workers: Maximum number of parallel workers (None = CPU count)
"""
pass
def run(self, pipeline: Pipeline, catalog: DataCatalog) -> None:
"""
Execute the pipeline in parallel.
Args:
pipeline: The pipeline to execute
catalog: The data catalog containing input datasets
"""
passProvides pipeline execution framework with parallel runner support.
@satisfied-by