# Pipeline Execution

Kedro provides multiple execution strategies for running pipelines with different parallelization approaches. Runners handle the orchestration of node execution, data loading/saving, and dependency resolution.

## Capabilities

### Abstract Runner

Base class defining the interface for all pipeline runners with common execution patterns.

```python { .api }
class AbstractRunner:
    """Abstract base class for all pipeline runners.

    Concrete runners (sequential, parallel, threaded) implement the actual
    orchestration; this class documents the shared interface.
    """

    def run(self, pipeline, catalog, hook_manager=None, run_id=None, only_missing_outputs=False):
        """
        Run pipeline with given catalog.

        Args:
            pipeline (Pipeline): Pipeline to execute
            catalog (CatalogProtocol): Data catalog for loading/saving datasets
            hook_manager (PluginManager, optional): Hook manager for lifecycle events
            run_id (str, optional): Run identifier for tracking
            only_missing_outputs (bool): Whether to skip nodes whose outputs already exist

        Returns:
            dict: Mapping of dataset names to final node that produced them
        """

    def run_only_missing(self, pipeline, catalog, hook_manager=None, run_id=None):
        """
        Run pipeline but skip nodes whose outputs already exist.

        Args:
            pipeline (Pipeline): Pipeline to execute
            catalog (CatalogProtocol): Data catalog for loading/saving datasets
            hook_manager (PluginManager, optional): Hook manager for lifecycle events
            run_id (str, optional): Run identifier for tracking

        Returns:
            dict: Mapping of dataset names to final node that produced them
        """

    def create_default_data_set(self, ds_name):
        """
        Create default dataset for missing catalog entries.

        Args:
            ds_name (str): Dataset name

        Returns:
            AbstractDataset: Default dataset instance
        """
```

### Sequential Runner

Executes pipeline nodes sequentially in a single thread with simple dependency resolution.

```python { .api }
class SequentialRunner(AbstractRunner):
    """Runs pipeline nodes sequentially in a single thread."""

    def __init__(self, is_async=False):
        """
        Initialize sequential runner.

        Args:
            is_async (bool): Whether to load/save datasets asynchronously while
                nodes still execute one at a time
        """
```

### Parallel Runner

Executes pipeline nodes in parallel using multiprocessing for CPU-intensive workloads.

```python { .api }
class ParallelRunner(AbstractRunner):
    """Runs pipeline nodes in parallel using multiprocessing."""

    def __init__(self, max_workers=None, is_async=False):
        """
        Initialize parallel runner.

        Args:
            max_workers (int, optional): Maximum number of worker processes;
                defaults to an implementation-chosen value when None
            is_async (bool): Whether to run nodes asynchronously
        """
```

### Thread Runner

Executes pipeline nodes in parallel using threading for I/O-bound workloads.

```python { .api }
class ThreadRunner(AbstractRunner):
    """Runs pipeline nodes in parallel using threading."""

    def __init__(self, max_workers=None, is_async=False):
        """
        Initialize thread runner.

        Args:
            max_workers (int, optional): Maximum number of worker threads;
                defaults to an implementation-chosen value when None
            is_async (bool): Whether to run nodes asynchronously
        """
```

### Task Utilities

Support classes for parallel execution and task management.

```python { .api }
class Task:
    """Represents a runnable task for parallel execution."""

    def __init__(self, node, catalog):
        """
        Initialize task.

        Args:
            node (Node): Pipeline node to execute
            catalog (DataCatalog): Data catalog for loading/saving
        """

    def run(self):
        """
        Execute the task.

        Returns:
            Any: Task execution result
        """

    @property
    def node(self):
        """Pipeline node being executed."""

    @property
    def catalog(self):
        """Data catalog for dataset operations."""
```

## Usage Examples

### Basic Runner Usage

```python
from kedro.pipeline import node, pipeline
from kedro.io import DataCatalog, MemoryDataset
from kedro.runner import SequentialRunner

# Define processing functions
def process_data(input_data):
    return [x * 2 for x in input_data]

def summarize_data(processed_data):
    return {"count": len(processed_data), "sum": sum(processed_data)}

# Create pipeline: raw_data -> processed_data -> summary
data_pipeline = pipeline([
    node(process_data, "raw_data", "processed_data"),
    node(summarize_data, "processed_data", "summary")
])

# Set up catalog with in-memory datasets
catalog = DataCatalog({
    "raw_data": MemoryDataset([1, 2, 3, 4, 5]),
    "processed_data": MemoryDataset(),
    "summary": MemoryDataset()
})

# Run with sequential runner
runner = SequentialRunner()
result = runner.run(data_pipeline, catalog)

# Access results
summary = catalog.load("summary")
print(summary)  # {"count": 5, "sum": 30}
```

### Parallel Execution

```python
from kedro.runner import ParallelRunner
from kedro.pipeline import node, pipeline
from kedro.io import DataCatalog, MemoryDataset

# CPU-intensive processing functions
def expensive_computation_a(data):
    # Simulate expensive computation
    import time
    time.sleep(1)
    return [x ** 2 for x in data]

def expensive_computation_b(data):
    # Simulate expensive computation
    import time
    time.sleep(1)
    return [x ** 3 for x in data]

def combine_results(results_a, results_b):
    return list(zip(results_a, results_b))

# Create pipeline with two independent branches that can run concurrently
parallel_pipeline = pipeline([
    node(expensive_computation_a, "input_data", "squared_data", name="square"),
    node(expensive_computation_b, "input_data", "cubed_data", name="cube"),
    node(combine_results, ["squared_data", "cubed_data"], "combined_data", name="combine")
])

# Set up catalog
catalog = DataCatalog({
    "input_data": MemoryDataset([1, 2, 3, 4, 5]),
    "squared_data": MemoryDataset(),
    "cubed_data": MemoryDataset(),
    "combined_data": MemoryDataset()
})

# Run with parallel runner (square and cube nodes run in parallel)
parallel_runner = ParallelRunner(max_workers=2)
result = parallel_runner.run(parallel_pipeline, catalog)

# Access results
combined = catalog.load("combined_data")
print(combined)  # [(1, 1), (4, 8), (9, 27), (16, 64), (25, 125)]
```

### Thread Runner for I/O Operations

```python
from kedro.runner import ThreadRunner
from kedro.pipeline import node, pipeline
from kedro.io import DataCatalog, MemoryDataset
import time

# I/O-bound functions (API calls are simulated with sleeps)
def fetch_data_source_a():
    # Simulate API call
    time.sleep(0.5)
    return {"source": "A", "data": [1, 2, 3]}

def fetch_data_source_b():
    # Simulate API call
    time.sleep(0.5)
    return {"source": "B", "data": [4, 5, 6]}

def merge_sources(source_a, source_b):
    return {
        "sources": [source_a["source"], source_b["source"]],
        "combined_data": source_a["data"] + source_b["data"]
    }

# Create pipeline with two independent fetch nodes feeding a merge node
io_pipeline = pipeline([
    node(fetch_data_source_a, None, "data_a", name="fetch_a"),
    node(fetch_data_source_b, None, "data_b", name="fetch_b"),
    node(merge_sources, ["data_a", "data_b"], "merged_data", name="merge")
])

# Set up catalog
catalog = DataCatalog({
    "data_a": MemoryDataset(),
    "data_b": MemoryDataset(),
    "merged_data": MemoryDataset()
})

# Run with thread runner (fetch operations run concurrently)
thread_runner = ThreadRunner(max_workers=2)
result = thread_runner.run(io_pipeline, catalog)

# Access results
merged = catalog.load("merged_data")
print(merged)  # {"sources": ["A", "B"], "combined_data": [1, 2, 3, 4, 5, 6]}
```

### Partial Pipeline Execution

```python
from kedro.runner import SequentialRunner
from kedro.pipeline import node, pipeline
from kedro.io import DataCatalog, MemoryDataset

# Create multi-stage pipeline
# (load_raw_data, validate_data, clean_data, feature_engineering,
#  train_model, evaluate_model are assumed to be defined elsewhere)
full_pipeline = pipeline([
    node(load_raw_data, None, "raw_data", name="load"),
    node(validate_data, "raw_data", "validated_data", name="validate"),
    node(clean_data, "validated_data", "clean_data", name="clean"),
    node(feature_engineering, "clean_data", "features", name="features"),
    node(train_model, "features", "model", name="train"),
    node(evaluate_model, "model", "metrics", name="evaluate")
])

# Set up catalog with some existing data
catalog = DataCatalog({
    "raw_data": MemoryDataset(),
    "validated_data": MemoryDataset(),
    "clean_data": MemoryDataset([1, 2, 3, 4, 5]),  # Pre-existing clean data
    "features": MemoryDataset(),
    "model": MemoryDataset(),
    "metrics": MemoryDataset()
})

# Run only missing nodes (will skip load, validate, clean)
runner = SequentialRunner()
result = runner.run_only_missing(full_pipeline, catalog)

# Or run specific pipeline subset
feature_and_model_pipeline = full_pipeline.from_nodes("features").to_nodes("evaluate")
result = runner.run(feature_and_model_pipeline, catalog)
```

### Custom Runner Implementation

```python
from kedro.runner import AbstractRunner
import logging

class LoggingRunner(AbstractRunner):
    """Custom runner that logs detailed execution information."""

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    def run(self, pipeline, catalog, hook_manager=None, run_id=None):
        """Run pipeline sequentially with detailed logging.

        Signature mirrors ``AbstractRunner.run`` (``run_id`` rather than the
        inconsistent ``session_id`` used previously).
        """
        self.logger.info(f"Starting pipeline execution with {len(pipeline.nodes)} nodes")

        # Simple sequential execution with logging
        for node in pipeline.nodes:
            self.logger.info(f"Executing node: {node.name}")

            # Load inputs, creating default datasets for missing catalog entries
            inputs = {}
            for input_name in node.inputs:
                if catalog.exists(input_name):
                    inputs[input_name] = catalog.load(input_name)
                    self.logger.info(f"Loaded input: {input_name}")
                else:
                    # Create default dataset
                    catalog.add(input_name, self.create_default_data_set(input_name))
                    inputs[input_name] = catalog.load(input_name)
                    self.logger.warning(f"Created default dataset for: {input_name}")

            # Execute node; re-raise on failure so the caller sees the error
            try:
                outputs = node.run(inputs)
                self.logger.info(f"Node {node.name} executed successfully")

                # Save outputs (dict for multi-output nodes, single value otherwise)
                if isinstance(outputs, dict):
                    for output_name, output_data in outputs.items():
                        catalog.save(output_name, output_data)
                        self.logger.info(f"Saved output: {output_name}")
                else:
                    # Single output
                    output_name = node.outputs[0] if isinstance(node.outputs, list) else node.outputs
                    catalog.save(output_name, outputs)
                    self.logger.info(f"Saved output: {output_name}")

            except Exception as e:
                self.logger.error(f"Node {node.name} failed: {str(e)}")
                raise

        self.logger.info("Pipeline execution completed")
        return {}

# Usage (assumes `pipeline` and `catalog` are defined as in earlier examples)
logging.basicConfig(level=logging.INFO)
custom_runner = LoggingRunner()
result = custom_runner.run(pipeline, catalog)
```

### Runner Selection Based on Pipeline Characteristics

```python
def select_runner(pipeline, max_parallel_nodes=4):
    """Select appropriate runner based on pipeline characteristics.

    Args:
        pipeline: Pipeline whose node dependency structure is inspected.
        max_parallel_nodes (int): Threshold of independent nodes above which
            a process-based runner is preferred.

    Returns:
        AbstractRunner: ParallelRunner, ThreadRunner, or SequentialRunner.
    """
    # Map each node to the set of node names whose outputs it consumes
    node_dependencies = {}
    for node in pipeline.nodes:
        dependencies = set()
        for other_node in pipeline.nodes:
            if set(node.inputs) & set(other_node.outputs):
                dependencies.add(other_node.name)
        node_dependencies[node.name] = dependencies

    # Simple heuristic: if many nodes have no dependencies, use parallel execution
    independent_nodes = sum(1 for deps in node_dependencies.values() if not deps)

    if independent_nodes >= max_parallel_nodes:
        return ParallelRunner(max_workers=min(independent_nodes, 4))
    elif independent_nodes >= 2:
        return ThreadRunner(max_workers=min(independent_nodes, 2))
    else:
        return SequentialRunner()

# Usage
optimal_runner = select_runner(my_pipeline)
result = optimal_runner.run(my_pipeline, catalog)
```

## Types

```python { .api }
from typing import Dict, Optional, Any, Callable
from kedro.pipeline import Pipeline
from kedro.io import DataCatalog

# Type aliases used throughout the runner API
RunResult = Dict[str, Any]
HookManager = Any  # Hook manager type
SessionId = Optional[str]
WorkerCount = Optional[int]
```