```
tessl install tessl/pypi-kedro@1.1.0
```

Kedro helps you build production-ready data and analytics pipelines.
| Metric | Description | Value |
|---|---|---|
| Agent Success | Agent success rate when using this tile | 98% |
| Improvement | Agent success rate improvement when using this tile compared to baseline | 1.32x |
| Baseline | Agent success rate without this tile | 74% |
Choose and configure runners for optimal pipeline execution performance.
| Runner | Best For | Advantages | Limitations | Max Workers |
|---|---|---|---|---|
| SequentialRunner | Debugging, simple workflows | Predictable, easy to debug | No parallelism | N/A |
| ParallelRunner | CPU-bound tasks | True parallelism, bypasses GIL | Requires picklable data, overhead | CPU count |
| ThreadRunner | I/O-bound tasks | Low overhead, shared memory | Subject to GIL | CPU count + 4 |
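To make the table concrete, here is a minimal end-to-end sketch. The `double` and `total` node functions and the dataset names are illustrative, not part of Kedro; the `.load()` access follows the output convention shown in the examples below. Switching execution strategies only requires swapping the runner class.

```python
from kedro.io import DataCatalog, MemoryDataset
from kedro.pipeline import node, pipeline
from kedro.runner import SequentialRunner, ThreadRunner

# Two illustrative node functions
def double(xs):
    return [x * 2 for x in xs]

def total(xs):
    return sum(xs)

demo = pipeline([
    node(double, "numbers", "doubled"),
    node(total, "doubled", "result"),
])

catalog = DataCatalog({"numbers": MemoryDataset([1, 2, 3])})

# Swapping the runner class is the only change needed
for runner in (SequentialRunner(), ThreadRunner()):
    outputs = runner.run(demo, catalog)
    print(runner.__class__.__name__, outputs["result"].load())
```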
```python
from kedro.runner import SequentialRunner

runner = SequentialRunner()
outputs = runner.run(pipeline, catalog)

# Access the actual data using .load()
result = outputs["my_output"].load()
```

```python
from kedro.runner import ParallelRunner

# Use the default number of workers (CPU count)
runner = ParallelRunner()
outputs = runner.run(pipeline, catalog)

# Note: outputs is a dict of dataset names to Dataset objects
result = outputs["my_output"].load()

# Specify max_workers explicitly
runner = ParallelRunner(max_workers=4)
outputs = runner.run(pipeline, catalog)
result = outputs["my_output"].load()
```

Use SharedMemoryDataCatalog for ParallelRunner:
ParallelRunner requires a catalog that implements validate_catalog(). Use SharedMemoryDataCatalog instead of a regular DataCatalog:
```python
from kedro.io import DataCatalog, MemoryDataset, SharedMemoryDataCatalog

# ✅ Correct - use SharedMemoryDataCatalog with ParallelRunner
catalog = SharedMemoryDataCatalog({
    "data": MemoryDataset([1, 2, 3]),
    "output": MemoryDataset(),
})
runner = ParallelRunner()
outputs = runner.run(pipeline, catalog)

# ❌ Incorrect - a regular DataCatalog lacks validate_catalog()
catalog = DataCatalog({...})
runner = ParallelRunner()  # Will fail during execution
```

All datasets must be picklable:
```python
from kedro_datasets.pandas import CSVDataset

# ✅ Picklable datasets
catalog = SharedMemoryDataCatalog({
    "data": MemoryDataset([1, 2, 3]),            # Picklable
    "output": CSVDataset(filepath="output.csv"), # Picklable
})

# ❌ Non-picklable datasets
catalog = SharedMemoryDataCatalog({
    "connection": DatabaseConnection(),  # May not be picklable
})
```
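Since pickling failures only surface at execution time, a quick pre-flight check with the standard `pickle` module can help. `is_picklable` below is a hypothetical helper, not part of Kedro:

```python
import pickle

# Hypothetical helper: ParallelRunner needs this round-trip to succeed
def is_picklable(obj) -> bool:
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, TypeError, AttributeError):
        return False

# Example: MemoryDataset contents like plain lists are picklable
print(is_picklable([1, 2, 3]))  # True
```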
```python
from kedro.pipeline import node, pipeline

# Pipeline with independent branches
pipeline([
    node(process_a, "input", "output_a"),  # Can run in parallel
    node(process_b, "input", "output_b"),  # Can run in parallel
    node(combine, ["output_a", "output_b"], "final"),  # Waits for both
])

# With ParallelRunner:
# - process_a and process_b run simultaneously
# - combine waits for both to complete
```

```python
from kedro.runner import ThreadRunner

# Use default number of workers
runner = ThreadRunner()
outputs = runner.run(pipeline, catalog)
# Access data using .load()
result = outputs["my_output"].load()
# Specify max workers
runner = ThreadRunner(max_workers=8)
outputs = runner.run(pipeline, catalog)
result = outputs["my_output"].load()
```

```python
import time

from kedro.pipeline import node, pipeline
from kedro.runner import ThreadRunner

def fetch_api_a():
    time.sleep(1)  # Simulated API call
    return {"source": "api_a"}

def fetch_api_b():
    time.sleep(1)  # Simulated API call
    return {"source": "api_b"}

def merge_data(data_a, data_b):
    return {**data_a, **data_b}

# Create the pipeline
pipeline_io = pipeline([
    node(fetch_api_a, None, "data_a"),
    node(fetch_api_b, None, "data_b"),
    node(merge_data, ["data_a", "data_b"], "merged"),
])

# Run with ThreadRunner
runner = ThreadRunner(max_workers=2)
outputs = runner.run(pipeline_io, catalog)
# Completes in ~1 second instead of ~2 seconds
```

```python
# Only run nodes whose outputs don't exist
runner = SequentialRunner()
outputs = runner.run(
    pipeline,
    catalog,
    only_missing_outputs=True,
)
```

Example:
```python
# Pipeline: raw → clean → process → train → evaluate
# If the clean, process, and train outputs exist but the evaluate output is missing:
# - Runs only the evaluate node
# - Skips the raw, clean, process, and train nodes
runner.run(pipeline, catalog, only_missing_outputs=True)
```
```
Do you need parallelism?
├─ No → SequentialRunner
└─ Yes
    ├─ CPU-bound tasks?
    │   ├─ Data picklable? → ParallelRunner
    │   └─ Data not picklable? → ThreadRunner (or refactor)
    └─ I/O-bound tasks? → ThreadRunner
```
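The same decision logic can be captured in a small helper. This is a sketch, assuming you can characterize the workload up front; `choose_runner` and its flags are illustrative, not Kedro API:

```python
from kedro.runner import ParallelRunner, SequentialRunner, ThreadRunner

# Hypothetical helper encoding the decision tree above
def choose_runner(needs_parallelism: bool, cpu_bound: bool, picklable: bool):
    if not needs_parallelism:
        return SequentialRunner()
    if cpu_bound and picklable:
        return ParallelRunner()
    return ThreadRunner()  # I/O-bound work, or data that can't be pickled

runner = choose_runner(needs_parallelism=True, cpu_bound=True, picklable=True)
```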
Data Processing Pipeline (CPU-bound)

```python
# Parallel data transformations
runner = ParallelRunner(max_workers=4)
```

API Integration Pipeline (I/O-bound)

```python
# Parallel API calls
runner = ThreadRunner(max_workers=8)
```

ML Training Pipeline (Mixed)

```python
# Use SequentialRunner for training (single GPU)
# Use ParallelRunner for parallel data preprocessing
preprocessing_runner = ParallelRunner()
training_runner = SequentialRunner()
```

Debugging

```python
# Always use SequentialRunner for debugging
runner = SequentialRunner()
```
```python
# ✅ Good: Independent branches enable parallelism
pipeline([
    node(process_region_a, "data", "result_a"),
    node(process_region_b, "data", "result_b"),
    node(process_region_c, "data", "result_c"),
    node(combine, ["result_a", "result_b", "result_c"], "final"),
])

# ❌ Less parallel: Sequential dependencies
pipeline([
    node(step1, "data", "out1"),
    node(step2, "out1", "out2"),
    node(step3, "out2", "out3"),
    node(step4, "out3", "final"),
])
```
```python
import os

# CPU-bound: use the CPU count (os.cpu_count() can return None)
cpu_count = os.cpu_count() or 1
runner = ParallelRunner(max_workers=cpu_count)

# I/O-bound: can use more workers
runner = ThreadRunner(max_workers=cpu_count * 2)
```
# Cache expensive loads
catalog["expensive_data"] = CachedDataset(
ExpensiveDataset("source.db")
)# Skip recomputing existing outputs
```python
# Skip recomputing existing outputs
runner.run(pipeline, catalog, only_missing_outputs=True)
```
```python
# Different runners for different pipeline stages
preprocessing_pipe = pipeline([...], tags=["preprocessing"])
training_pipe = pipeline([...], tags=["training"])

# Parallel preprocessing
ParallelRunner().run(
    full_pipeline.only_nodes_with_tags("preprocessing"),
    catalog,
)

# Sequential training
SequentialRunner().run(
    full_pipeline.only_nodes_with_tags("training"),
    catalog,
)
```
```python
import os

def get_optimal_workers(task_type):
    cpu_count = os.cpu_count() or 1
    if task_type == "cpu":
        return cpu_count
    elif task_type == "io":
        return cpu_count * 2
    else:
        return 1

# Use dynamically
runner = ParallelRunner(max_workers=get_optimal_workers("cpu"))
```
```python
# Use parallel execution only for large datasets
def run_pipeline(pipeline, catalog, data_size):
    if data_size > 1_000_000:
        runner = ParallelRunner(max_workers=4)
    else:
        runner = SequentialRunner()
    return runner.run(pipeline, catalog)
```
Problem: Dataset can't be serialized

```python
# Error: Can't pickle database connection
catalog["db"] = DatabaseConnection(...)
```
Solution 1: Use ThreadRunner

```python
runner = ThreadRunner()
```
Solution 2: Make the dataset picklable

```python
from kedro.io import AbstractDataset

class PicklableDBDataset(AbstractDataset):
    def __init__(self, connection_params):
        self._params = connection_params  # Store params, not the connection

    def _load(self):
        conn = create_connection(self._params)  # Recreate in each process
        return conn.fetch_data()

    # (a complete dataset would also implement _save and _describe)
```
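A hypothetical usage sketch, building on the class above; the dataset name and connection parameters are illustrative:

```python
# Hypothetical: register the picklable wrapper so ParallelRunner
# can ship it to worker processes
catalog = SharedMemoryDataCatalog({
    "db_data": PicklableDBDataset({"host": "localhost", "port": 5432}),
})

runner = ParallelRunner()
outputs = runner.run(pipeline, catalog)
```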
Problem: Parallel execution slower than sequential

Causes: process startup and data-serialization overhead can outweigh the gains for small workloads or pipelines with few independent branches.

Solution: Profile and adjust
```python
import time

start = time.time()
SequentialRunner().run(pipeline, catalog)
seq_time = time.time() - start

start = time.time()
ParallelRunner(max_workers=4).run(pipeline, catalog)
par_time = time.time() - start

print(f"Sequential: {seq_time:.2f}s, Parallel: {par_time:.2f}s")
```

Problem: Thread-based execution not improving performance
Cause: CPU-bound tasks are subject to the GIL

Solution: Use ParallelRunner for CPU-bound tasks
```python
# CPU-bound: use processes
runner = ParallelRunner()

# I/O-bound: use threads
runner = ThreadRunner()
```

```python
# Debug with SequentialRunner first
runner = SequentialRunner()
runner.run(pipeline, catalog)
# Then optimize with parallel runners
```
```python
# CPU-intensive data processing
ParallelRunner()

# API calls, file I/O
ThreadRunner()

# Debugging, simple workflows
SequentialRunner()
```

```python
# Measure actual performance
import time

for runner_type in [SequentialRunner(), ParallelRunner(), ThreadRunner()]:
    start = time.time()
    runner_type.run(pipeline, catalog)
    print(f"{runner_type.__class__.__name__}: {time.time() - start:.2f}s")
```
```python
# ParallelRunner creates copies of data in each process
# and can use significant memory for large datasets.
# Monitor memory usage and adjust max_workers accordingly.
```
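One rough way to observe that cost from the driver process is the standard library's `resource` module; a minimal sketch, assuming a Unix-like system (`ru_maxrss` is kilobytes on Linux, bytes on macOS). Note it only measures the main process, not the workers:

```python
import resource

from kedro.runner import ParallelRunner

def peak_rss() -> int:
    # Peak resident set size of the current (main) process;
    # worker-process memory is not included in RUSAGE_SELF
    return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss

runner = ParallelRunner(max_workers=4)
outputs = runner.run(pipeline, catalog)
print(f"Peak RSS of driver after run: {peak_rss()}")
```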