Parallel Execution Guide

Choose and configure runners for optimal pipeline execution performance.

Runner Comparison

Runner            Best For                     Advantages                      Limitations                        Max Workers
SequentialRunner  Debugging, simple workflows  Predictable, easy to debug      No parallelism                     N/A
ParallelRunner    CPU-bound tasks              True parallelism, bypasses GIL  Requires picklable data, overhead  CPU count
ThreadRunner      I/O-bound tasks              Low overhead, shared memory     Subject to GIL                     CPU count + 4

Sequential Execution

When to Use

  • Debugging pipelines
  • Simple workflows
  • When parallelism provides no benefit

Usage

from kedro.runner import SequentialRunner

runner = SequentialRunner()
outputs = runner.run(pipeline, catalog)
# Access actual data using .load()
result = outputs["my_output"].load()

Parallel Execution (Multiprocessing)

When to Use

  • CPU-bound tasks (data processing, computations)
  • Independent nodes that can run simultaneously
  • Large datasets that benefit from parallel processing

Usage

from kedro.runner import ParallelRunner

# Use default number of workers (CPU count)
runner = ParallelRunner()
outputs = runner.run(pipeline, catalog)
# Note: outputs is a dict of dataset names to Dataset objects
result = outputs["my_output"].load()

# Specify max workers
runner = ParallelRunner(max_workers=4)
outputs = runner.run(pipeline, catalog)
result = outputs["my_output"].load()

Requirements

Use SharedMemoryDataCatalog with ParallelRunner:

ParallelRunner requires a catalog that implements validate_catalog(), which SharedMemoryDataCatalog provides and the regular DataCatalog does not:

from kedro.io import SharedMemoryDataCatalog, MemoryDataset

# ✅ Correct - use SharedMemoryDataCatalog with ParallelRunner
catalog = SharedMemoryDataCatalog({
    "data": MemoryDataset([1, 2, 3]),
    "output": MemoryDataset()
})

runner = ParallelRunner()
outputs = runner.run(pipeline, catalog)

# ❌ Incorrect - regular DataCatalog will cause errors
catalog = DataCatalog({...})  # Missing validate_catalog() method
runner = ParallelRunner()  # Will fail during execution

All datasets must be picklable:

# ✅ Picklable datasets
from kedro_datasets.pandas import CSVDataset

catalog = SharedMemoryDataCatalog({
    "data": MemoryDataset([1, 2, 3]),  # Picklable
    "output": CSVDataset(filepath="output.csv")  # Picklable
})

# ❌ Non-picklable datasets
catalog = SharedMemoryDataCatalog({
    "connection": DatabaseConnection()  # May not be picklable
})

How It Works

# Pipeline with independent branches
pipeline([
    node(process_a, "input", "output_a"),  # Can run in parallel
    node(process_b, "input", "output_b"),  # Can run in parallel
    node(combine, ["output_a", "output_b"], "final")  # Waits for both
])

# With ParallelRunner:
# - process_a and process_b run simultaneously
# - combine waits for both to complete
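
The same idea as a runnable standalone sketch (function and dataset names are illustrative). The __main__ guard matters: ParallelRunner spawns worker processes that re-import the entry module, and node functions must live at module level so they can be pickled:

from kedro.io import MemoryDataset, SharedMemoryDataCatalog
from kedro.pipeline import node, pipeline
from kedro.runner import ParallelRunner

def process_a(data):
    return [x * 2 for x in data]

def process_b(data):
    return [x + 1 for x in data]

def combine(a, b):
    return a + b

branching = pipeline([
    node(process_a, "input", "output_a"),
    node(process_b, "input", "output_b"),
    node(combine, ["output_a", "output_b"], "final"),
])

if __name__ == "__main__":  # guard so spawned workers can re-import this module
    catalog = SharedMemoryDataCatalog({"input": MemoryDataset([1, 2, 3])})
    outputs = ParallelRunner().run(branching, catalog)
    print(outputs["final"].load())  # [2, 4, 6, 2, 3, 4]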

Thread-Based Parallel Execution

When to Use

  • I/O-bound tasks (API calls, file I/O, database queries)
  • Tasks that spend time waiting
  • When datasets aren't picklable

Usage

from kedro.runner import ThreadRunner

# Use default number of workers
runner = ThreadRunner()
outputs = runner.run(pipeline, catalog)
# Access data using .load()
result = outputs["my_output"].load()

# Specify max workers
runner = ThreadRunner(max_workers=8)
outputs = runner.run(pipeline, catalog)
result = outputs["my_output"].load()

Example: I/O-Bound Pipeline

import time

from kedro.io import DataCatalog
from kedro.pipeline import node, pipeline
from kedro.runner import ThreadRunner

def fetch_api_a():
    time.sleep(1)  # Simulated API call
    return {"source": "api_a"}

def fetch_api_b():
    time.sleep(1)  # Simulated API call
    return {"source": "api_b"}

def merge_data(data_a, data_b):
    return {**data_a, **data_b}

# Create pipeline
pipeline_io = pipeline([
    node(fetch_api_a, None, "data_a"),
    node(fetch_api_b, None, "data_b"),
    node(merge_data, ["data_a", "data_b"], "merged")
])

# Run with ThreadRunner (unregistered datasets default to in-memory datasets)
catalog = DataCatalog()
runner = ThreadRunner(max_workers=2)
outputs = runner.run(pipeline_io, catalog)
# Completes in ~1 second instead of ~2 seconds

Incremental Execution

Skip Nodes with Existing Outputs

# Only run nodes whose outputs don't exist
runner = SequentialRunner()
outputs = runner.run(
    pipeline,
    catalog,
    only_missing_outputs=True
)

How It Works

  1. Checks which nodes have missing persistent outputs
  2. Runs only necessary nodes
  3. Skips nodes with existing outputs

Example:

# Pipeline nodes: clean → process → train → evaluate (starting from raw data)
# If the clean, process, and train outputs already exist but the evaluate
# output is missing:
# - Runs only the evaluate node
# - Skips the clean, process, and train nodes

runner.run(pipeline, catalog, only_missing_outputs=True)

Choosing the Right Runner

Decision Tree

Do you need parallelism?
├─ No → SequentialRunner
└─ Yes
    ├─ CPU-bound tasks?
    │   ├─ Data picklable? → ParallelRunner
    │   └─ Data not picklable? → ThreadRunner (or refactor)
    └─ I/O-bound tasks? → ThreadRunner
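
The tree also translates into a small helper; this is a sketch, and the workload labels are assumptions rather than Kedro API:

from kedro.runner import ParallelRunner, SequentialRunner, ThreadRunner

def pick_runner(needs_parallelism, workload="cpu", data_picklable=True):
    # workload is a hypothetical label: "cpu" for CPU-bound, "io" for I/O-bound
    if not needs_parallelism:
        return SequentialRunner()
    if workload == "cpu" and data_picklable:
        return ParallelRunner()
    # I/O-bound work, or unpicklable data, falls back to threads
    return ThreadRunner()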

Examples by Use Case

Data Processing Pipeline (CPU-bound)

# Parallel data transformations
runner = ParallelRunner(max_workers=4)

API Integration Pipeline (I/O-bound)

# Parallel API calls
runner = ThreadRunner(max_workers=8)

ML Training Pipeline (Mixed)

# Use SequentialRunner for training (single GPU)
# Use ParallelRunner for parallel data preprocessing
preprocessing_runner = ParallelRunner()
training_runner = SequentialRunner()

Debugging

# Always use SequentialRunner for debugging
runner = SequentialRunner()

Performance Optimization

1. Pipeline Structure

# ✅ Good: Independent branches enable parallelism
pipeline([
    node(process_region_a, "data", "result_a"),
    node(process_region_b, "data", "result_b"),
    node(process_region_c, "data", "result_c"),
    node(combine, ["result_a", "result_b", "result_c"], "final")
])

# ❌ Less parallel: Sequential dependencies
pipeline([
    node(step1, "data", "out1"),
    node(step2, "out1", "out2"),
    node(step3, "out2", "out3"),
    node(step4, "out3", "final")
])

2. Optimal Worker Count

import os

# CPU-bound: Use CPU count
cpu_count = os.cpu_count()
runner = ParallelRunner(max_workers=cpu_count)

# I/O-bound: Can use more workers
runner = ThreadRunner(max_workers=cpu_count * 2)

3. Dataset Caching

from kedro.io import CachedDataset

# Cache expensive loads
catalog["expensive_data"] = CachedDataset(
    ExpensiveDataset("source.db")
)
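
With the cache in place, repeated loads within a run hit memory instead of the source (ExpensiveDataset above stands in for your own dataset class):

first = catalog.load("expensive_data")   # reads from the underlying dataset
again = catalog.load("expensive_data")   # returned from the in-memory cache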

4. Incremental Runs

# Skip recomputing existing outputs
runner.run(pipeline, catalog, only_missing_outputs=True)

Common Patterns

Pattern: Mixed Runner Strategy

# Different runners for different pipeline stages
preprocessing_pipe = pipeline([...], tags=["preprocessing"])
training_pipe = pipeline([...], tags=["training"])
full_pipeline = preprocessing_pipe + training_pipe

# Parallel preprocessing (requires a SharedMemoryDataCatalog, see above)
ParallelRunner().run(
    full_pipeline.only_nodes_with_tags("preprocessing"),
    catalog
)

# Sequential training
SequentialRunner().run(
    full_pipeline.only_nodes_with_tags("training"),
    catalog
)

Pattern: Dynamic Worker Count

import os

def get_optimal_workers(task_type):
    cpu_count = os.cpu_count() or 1

    if task_type == "cpu":
        return cpu_count
    elif task_type == "io":
        return cpu_count * 2
    else:
        return 1

# Use dynamically
runner = ParallelRunner(max_workers=get_optimal_workers("cpu"))

Pattern: Conditional Parallelism

# Use parallel execution only for large datasets
def run_pipeline(pipeline, catalog, data_size):
    if data_size > 1000000:
        runner = ParallelRunner(max_workers=4)
    else:
        runner = SequentialRunner()

    return runner.run(pipeline, catalog)

Troubleshooting

Issue: PickleError with ParallelRunner

Problem: Dataset can't be serialized

# Error: Can't pickle database connection
catalog["db"] = DatabaseConnection(...)

Solution 1: Use ThreadRunner

runner = ThreadRunner()

Solution 2: Make dataset picklable

from kedro.io import AbstractDataset

class PicklableDBDataset(AbstractDataset):
    def __init__(self, connection_params):
        self._params = connection_params  # Store params, not the live connection

    def load(self):
        conn = create_connection(self._params)  # Recreate in each process
        return conn.fetch_data()

    def save(self, data):
        raise NotImplementedError("Read-only dataset")

    def _describe(self):
        return {"connection_params": self._params}
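
Registered like any other dataset, the fixed class pickles cleanly because only the connection parameters are stored; create_connection and the parameter values below are placeholders from the example above:

catalog = SharedMemoryDataCatalog({
    "db_data": PicklableDBDataset({"host": "localhost", "port": 5432}),
})
runner = ParallelRunner()  # workers rebuild the connection on load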

Issue: Poor ParallelRunner Performance

Problem: Parallel execution slower than sequential

Causes:

  • Small datasets (overhead > benefit)
  • Sequential dependencies (no parallelism possible)
  • Excessive number of workers

Solution: Profile and adjust

import time

start = time.time()
SequentialRunner().run(pipeline, catalog)
seq_time = time.time() - start

start = time.time()
ParallelRunner(max_workers=4).run(pipeline, catalog)  # catalog must be a SharedMemoryDataCatalog
par_time = time.time() - start

print(f"Sequential: {seq_time:.2f}s, Parallel: {par_time:.2f}s")

Issue: ThreadRunner Not Faster

Problem: Thread-based execution not improving performance

Cause: CPU-bound tasks subject to GIL

Solution: Use ParallelRunner for CPU-bound tasks

# CPU-bound: Use processes
runner = ParallelRunner()

# I/O-bound: Use threads
runner = ThreadRunner()

Best Practices

1. Start with SequentialRunner

# Debug with SequentialRunner first
runner = SequentialRunner()
runner.run(pipeline, catalog)

# Then optimize with parallel runners

2. Match Runner to Workload

# CPU-intensive data processing
ParallelRunner()

# API calls, file I/O
ThreadRunner()

# Debugging, simple workflows
SequentialRunner()

3. Profile Before Optimizing

# Measure actual performance
import time

# Note: ParallelRunner needs a SharedMemoryDataCatalog (see Requirements above)
for runner in [SequentialRunner(), ParallelRunner(), ThreadRunner()]:
    start = time.time()
    runner.run(pipeline, catalog)
    print(f"{runner.__class__.__name__}: {time.time() - start:.2f}s")

4. Consider Memory Usage

# ParallelRunner creates copies in each process
# Can use significant memory for large datasets
# Monitor memory usage and adjust max_workers
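
One hedged way to act on this is to cap the worker count with an assumed per-worker memory budget; both constants below are rough illustrative estimates, not Kedro settings:

import os

PER_WORKER_GB = 2   # estimated peak memory per worker process (assumption)
AVAILABLE_GB = 8    # free memory on the machine (measure this in practice)

max_workers = min(os.cpu_count() or 1, max(1, AVAILABLE_GB // PER_WORKER_GB))
runner = ParallelRunner(max_workers=max_workers)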

See also:

  • Runners API Reference - Complete API documentation
  • Creating Pipelines Guide - Build parallelizable pipelines