# Pipeline Construction

Kedro's pipeline system provides tools for building directed acyclic graphs (DAGs) of computation nodes with automatic dependency resolution. Pipelines enable modular, reusable data processing workflows with filtering, composition, and transformation capabilities.

## Capabilities

### Pipeline Class

Container for nodes with dependency management, filtering, and composition operations.

```python { .api }
class Pipeline:
    """Collection of nodes with dependency management and filtering."""

    def __init__(self, nodes, *, inputs=None, outputs=None, parameters=None, tags=None, namespace=None, prefix_datasets_with_namespace=True):
        """
        Initialize pipeline with nodes.

        Args:
            nodes (Iterable[Node] or Pipeline): Collection of pipeline nodes or existing pipeline
            inputs (str or set or dict, optional): Input names to expose as connection points
            outputs (str or set or dict, optional): Output names to expose as connection points
            parameters (str or set or dict, optional): Parameter names to expose
            tags (str or Iterable[str], optional): Tags to apply to all nodes
            namespace (str, optional): Namespace for the pipeline
            prefix_datasets_with_namespace (bool): Whether to prefix dataset names with namespace
        """

    def filter(self, tags=None, from_nodes=None, to_nodes=None, node_names=None, from_inputs=None, to_outputs=None):
        """
        Filter pipeline based on various criteria.

        Args:
            tags (str or Iterable[str], optional): Filter by node tags
            from_nodes (str or Iterable[str], optional): Include nodes from specified nodes
            to_nodes (str or Iterable[str], optional): Include nodes up to specified nodes
            node_names (str or Iterable[str], optional): Filter by node names
            from_inputs (str or Iterable[str], optional): Include nodes from specified inputs
            to_outputs (str or Iterable[str], optional): Include nodes to specified outputs

        Returns:
            Pipeline: Filtered pipeline
        """

    def tag(self, tags):
        """
        Add tags to all nodes in pipeline.

        Args:
            tags (str or Iterable[str]): Tags to add

        Returns:
            Pipeline: Pipeline with tagged nodes
        """

    def only_nodes(self, *node_names):
        """
        Create pipeline with only specified nodes.

        Args:
            *node_names: Node names to include

        Returns:
            Pipeline: Pipeline with only specified nodes
        """

    def from_nodes(self, *node_names):
        """
        Create pipeline starting from specified nodes.

        Args:
            *node_names: Node names to start from

        Returns:
            Pipeline: Pipeline starting from specified nodes
        """

    def to_nodes(self, *node_names):
        """
        Create pipeline ending at specified nodes.

        Args:
            *node_names: Node names to end at

        Returns:
            Pipeline: Pipeline ending at specified nodes
        """

    def from_inputs(self, *inputs):
        """
        Create pipeline starting from specified inputs.

        Args:
            *inputs: Input dataset names

        Returns:
            Pipeline: Pipeline starting from specified inputs
        """

    def to_outputs(self, *outputs):
        """
        Create pipeline ending at specified outputs.

        Args:
            *outputs: Output dataset names

        Returns:
            Pipeline: Pipeline ending at specified outputs
        """

    def describe(self):
        """
        Describe pipeline structure.

        Returns:
            str: Pipeline description
        """

    def __add__(self, other):
        """
        Combine pipelines using + operator.

        Args:
            other (Pipeline): Pipeline to combine with

        Returns:
            Pipeline: Combined pipeline
        """

    def __or__(self, other):
        """
        Combine pipelines using | operator (union).

        Args:
            other (Pipeline): Pipeline to combine with

        Returns:
            Pipeline: Combined pipeline
        """

    @property
    def nodes(self):
        """List of nodes in pipeline."""

    @property
    def all_inputs(self):
        """Set of all input dataset names."""

    @property
    def all_outputs(self):
        """Set of all output dataset names."""
```

### Node Class

Individual computation unit that transforms inputs to outputs via Python functions.

```python { .api }
class Node:
    """Represents a single computation unit in a pipeline."""

    def __init__(self, func, inputs, outputs, name=None, tags=None, confirms=None):
        """
        Initialize node.

        Args:
            func (Callable): Python function to execute
            inputs (str or list): Input dataset name(s)
            outputs (str or list): Output dataset name(s)
            name (str, optional): Node name (defaults to function name)
            tags (str or Iterable[str], optional): Node tags
            confirms (str or list, optional): Dataset names to confirm
        """

    def run(self, inputs):
        """
        Execute node with given inputs.

        Args:
            inputs (dict): Input data mapped by dataset names

        Returns:
            dict: Output data mapped by dataset names
        """

    def bind(self, **kwargs):
        """
        Bind specific values to node inputs.

        Args:
            **kwargs: Input names and values to bind

        Returns:
            Node: New node with bound inputs
        """

    def tag(self, tags):
        """
        Add tags to node.

        Args:
            tags (str or Iterable[str]): Tags to add

        Returns:
            Node: Node with added tags
        """

    def describe(self):
        """
        Describe node configuration.

        Returns:
            dict: Node description
        """

    def to_dict(self):
        """
        Convert node to dictionary representation.

        Returns:
            dict: Node as dictionary
        """

    @classmethod
    def from_dict(cls, node_dict):
        """
        Create node from dictionary representation.

        Args:
            node_dict (dict): Node dictionary

        Returns:
            Node: Node instance
        """

    @property
    def name(self):
        """Node name."""

    @property
    def inputs(self):
        """Node inputs."""

    @property
    def outputs(self):
        """Node outputs."""

    @property
    def tags(self):
        """Node tags."""

class GroupedNodes:
    """Represents a group of nodes that can be treated as a single unit."""

    def __init__(self, nodes):
        """
        Initialize grouped nodes.

        Args:
            nodes (Iterable[Node]): Nodes to group
        """

    def run(self, inputs):
        """Execute all nodes in group."""

    def tag(self, tags):
        """Add tags to all nodes in group."""

    def describe(self):
        """Describe grouped nodes."""
```

### Factory Functions

Convenient factory functions for creating nodes and pipelines.

```python { .api }
def node(func, inputs, outputs, name=None, tags=None, confirms=None):
    """
    Create a Node instance.

    Args:
        func (Callable): Python function to execute
        inputs (str or list): Input dataset name(s)
        outputs (str or list): Output dataset name(s)
        name (str, optional): Node name
        tags (str or Iterable[str], optional): Node tags
        confirms (str or list, optional): Dataset names to confirm

    Returns:
        Node: Node instance
    """

def pipeline(pipe, inputs=None, outputs=None, parameters=None, tags=None):
    """
    Create a Pipeline instance.

    Args:
        pipe (Iterable[Node] or Pipeline): Nodes or existing pipeline
        inputs (dict, optional): Input mapping for pipeline parameterization
        outputs (dict, optional): Output mapping for pipeline parameterization
        parameters (dict, optional): Parameter mapping for pipeline
        tags (str or Iterable[str], optional): Tags to apply to all nodes

    Returns:
        Pipeline: Pipeline instance
    """
```

## Usage Examples

### Basic Node and Pipeline Creation

```python
from kedro.pipeline import node, pipeline

# Define processing functions
def clean_data(raw_data):
    """Remove nulls and duplicates."""
    return [x for x in raw_data if x is not None]

def transform_data(clean_data):
    """Apply transformations."""
    return [x * 2 for x in clean_data]

def aggregate_data(transformed_data):
    """Calculate summary statistics."""
    return {
        'count': len(transformed_data),
        'sum': sum(transformed_data),
        'average': sum(transformed_data) / len(transformed_data)
    }

# Create nodes
clean_node = node(
    func=clean_data,
    inputs="raw_data",
    outputs="clean_data",
    name="clean_data_node",
    tags=["preprocessing"]
)

transform_node = node(
    func=transform_data,
    inputs="clean_data",
    outputs="transformed_data",
    name="transform_data_node",
    tags=["processing"]
)

aggregate_node = node(
    func=aggregate_data,
    inputs="transformed_data",
    outputs="summary_stats",
    name="aggregate_data_node",
    tags=["aggregation"]
)

# Create pipeline
data_pipeline = pipeline([
    clean_node,
    transform_node,
    aggregate_node
])
```

### Pipeline Filtering and Composition

```python
from kedro.pipeline import node, pipeline

# Create multiple processing pipelines
preprocessing_pipeline = pipeline([
    node(validate_data, "raw_data", "validated_data", tags=["validation"]),
    node(clean_data, "validated_data", "clean_data", tags=["cleaning"])
])

feature_pipeline = pipeline([
    node(extract_features, "clean_data", "features", tags=["feature_extraction"]),
    node(scale_features, "features", "scaled_features", tags=["scaling"])
])

model_pipeline = pipeline([
    node(train_model, ["scaled_features", "parameters:model"], "trained_model", tags=["training"]),
    node(evaluate_model, ["trained_model", "test_data"], "metrics", tags=["evaluation"])
])

# Combine pipelines
full_pipeline = preprocessing_pipeline + feature_pipeline + model_pipeline

# Filter by tags
training_pipeline = full_pipeline.filter(tags=["training", "evaluation"])

# Filter by node names
validation_only = full_pipeline.only_nodes("validate_data_node")

# Filter by inputs/outputs
feature_to_model = full_pipeline.from_inputs("clean_data").to_outputs("metrics")
```

### Pipeline Parameterization

```python
from kedro.pipeline import node, pipeline

def process_with_config(data, config):
    """Process data with configuration parameters."""
    return [x * config["multiplier"] for x in data]

# Create parameterized pipeline
def create_processing_pipeline():
    return pipeline([
        node(
            func=process_with_config,
            inputs=["input_data", "parameters:processing_config"],
            outputs="processed_data",
            name="process_data"
        )
    ])

# Use with different parameter sets
pipeline_v1 = create_processing_pipeline()

# Create pipeline variant with different inputs/outputs
pipeline_v2 = pipeline(
    pipeline_v1,
    inputs={"input_data": "alternative_input"},
    outputs={"processed_data": "alternative_output"}
)
```

### Node Input/Output Patterns

```python
from kedro.pipeline import node

# Single input/output
simple_node = node(
    func=lambda x: x * 2,
    inputs="input_data",
    outputs="output_data"
)

# Multiple inputs
multi_input_node = node(
    func=lambda x, y: x + y,
    inputs=["data_a", "data_b"],
    outputs="combined_data"
)

# Multiple outputs
multi_output_node = node(
    func=lambda data: (data[:5], data[5:]),
    inputs="full_data",
    outputs=["first_half", "second_half"]
)

# Dictionary inputs (for named parameters)
dict_input_node = node(
    func=lambda data, config: process_data(data, **config),
    inputs={"data": "raw_data", "config": "parameters:processing"},
    outputs="processed_data"
)

# Parameter inputs
param_node = node(
    func=lambda data, multiplier: [x * multiplier for x in data],
    inputs=["input_data", "parameters:multiplier"],
    outputs="scaled_data"
)
```

### Advanced Pipeline Operations

```python
from kedro.pipeline import node, pipeline

# Create base pipeline
base_pipeline = pipeline([
    node(load_data, None, "raw_data"),
    node(preprocess, "raw_data", "clean_data"),
    node(analyze, "clean_data", "results")
])

# Apply tags to entire pipeline
tagged_pipeline = base_pipeline.tag(["analysis", "v1"])

# Create conditional pipeline branches
training_branch = pipeline([
    node(split_data, "clean_data", ["train_data", "test_data"]),
    node(train_model, "train_data", "model"),
    node(evaluate_model, ["model", "test_data"], "metrics")
])

inference_branch = pipeline([
    node(load_model, "parameters:model_path", "model"),
    node(predict, ["model", "clean_data"], "predictions")
])

# Combine with base pipeline
full_training_pipeline = base_pipeline + training_branch
full_inference_pipeline = base_pipeline + inference_branch

# Create pipeline that runs different branches based on mode
def create_conditional_pipeline(mode="training"):
    if mode == "training":
        return full_training_pipeline
    else:
        return full_inference_pipeline
```

## Types

```python { .api }
from typing import Callable, Dict, List, Set, Any, Optional, Union, Iterable

NodeFunc = Callable[..., Any]
NodeInputs = Union[str, List[str], Dict[str, str]]
NodeOutputs = Union[str, List[str]]
NodeTags = Union[str, Set[str], List[str]]
NodeName = str
DatasetName = str
ParameterName = str
```