# Pipeline Construction

Kedro's pipeline system provides tools for building directed acyclic graphs (DAGs) of computation nodes with automatic dependency resolution. Pipelines enable modular, reusable data processing workflows with filtering, composition, and transformation capabilities.

## Capabilities

### Pipeline Class

Container for nodes with dependency management, filtering, and composition operations.

```python { .api }
class Pipeline:
    """Collection of nodes with dependency management and filtering."""

    def __init__(self, nodes, *, inputs=None, outputs=None, parameters=None, tags=None, namespace=None, prefix_datasets_with_namespace=True):
        """
        Initialize pipeline with nodes.

        Args:
            nodes (Iterable[Node] or Pipeline): Collection of pipeline nodes or existing pipeline
            inputs (str or set or dict, optional): Input names to expose as connection points
            outputs (str or set or dict, optional): Output names to expose as connection points
            parameters (str or set or dict, optional): Parameter names to expose
            tags (str or Iterable[str], optional): Tags to apply to all nodes
            namespace (str, optional): Namespace for the pipeline
            prefix_datasets_with_namespace (bool): Whether to prefix dataset names with namespace
        """

    def filter(self, tags=None, from_nodes=None, to_nodes=None, node_names=None, from_inputs=None, to_outputs=None):
        """
        Filter pipeline based on various criteria.

        Args:
            tags (str or Iterable[str], optional): Filter by node tags
            from_nodes (str or Iterable[str], optional): Include nodes from specified nodes
            to_nodes (str or Iterable[str], optional): Include nodes up to specified nodes
            node_names (str or Iterable[str], optional): Filter by node names
            from_inputs (str or Iterable[str], optional): Include nodes from specified inputs
            to_outputs (str or Iterable[str], optional): Include nodes to specified outputs

        Returns:
            Pipeline: Filtered pipeline
        """

    def tag(self, tags):
        """
        Add tags to all nodes in pipeline.

        Args:
            tags (str or Iterable[str]): Tags to add

        Returns:
            Pipeline: Pipeline with tagged nodes
        """

    def only_nodes(self, *node_names):
        """
        Create pipeline with only specified nodes.

        Args:
            *node_names: Node names to include

        Returns:
            Pipeline: Pipeline with only specified nodes
        """

    def from_nodes(self, *node_names):
        """
        Create pipeline starting from specified nodes.

        Args:
            *node_names: Node names to start from

        Returns:
            Pipeline: Pipeline starting from specified nodes
        """

    def to_nodes(self, *node_names):
        """
        Create pipeline ending at specified nodes.

        Args:
            *node_names: Node names to end at

        Returns:
            Pipeline: Pipeline ending at specified nodes
        """

    def from_inputs(self, *inputs):
        """
        Create pipeline starting from specified inputs.

        Args:
            *inputs: Input dataset names

        Returns:
            Pipeline: Pipeline starting from specified inputs
        """

    def to_outputs(self, *outputs):
        """
        Create pipeline ending at specified outputs.

        Args:
            *outputs: Output dataset names

        Returns:
            Pipeline: Pipeline ending at specified outputs
        """

    def describe(self):
        """
        Describe pipeline structure.

        Returns:
            str: Pipeline description
        """

    def __add__(self, other):
        """
        Combine pipelines using + operator.

        Args:
            other (Pipeline): Pipeline to combine with

        Returns:
            Pipeline: Combined pipeline
        """

    def __or__(self, other):
        """
        Combine pipelines using | operator (union).

        Args:
            other (Pipeline): Pipeline to combine with

        Returns:
            Pipeline: Combined pipeline
        """

    @property
    def nodes(self):
        """List of nodes in pipeline."""

    @property
    def all_inputs(self):
        """Set of all input dataset names."""

    @property
    def all_outputs(self):
        """Set of all output dataset names."""
```

### Node Class

Individual computation unit that transforms inputs to outputs via Python functions.

```python { .api }
class Node:
    """Represents a single computation unit in a pipeline."""

    def __init__(self, func, inputs, outputs, name=None, tags=None, confirms=None):
        """
        Initialize node.

        Args:
            func (Callable): Python function to execute
            inputs (str or list): Input dataset name(s)
            outputs (str or list): Output dataset name(s)
            name (str, optional): Node name (defaults to function name)
            tags (str or Iterable[str], optional): Node tags
            confirms (str or list, optional): Dataset names to confirm
        """

    def run(self, inputs):
        """
        Execute node with given inputs.

        Args:
            inputs (dict): Input data mapped by dataset names

        Returns:
            dict: Output data mapped by dataset names
        """

    def bind(self, **kwargs):
        """
        Bind specific values to node inputs.

        Args:
            **kwargs: Input names and values to bind

        Returns:
            Node: New node with bound inputs
        """

    def tag(self, tags):
        """
        Add tags to node.

        Args:
            tags (str or Iterable[str]): Tags to add

        Returns:
            Node: Node with added tags
        """

    def describe(self):
        """
        Describe node configuration.

        Returns:
            dict: Node description
        """

    def to_dict(self):
        """
        Convert node to dictionary representation.

        Returns:
            dict: Node as dictionary
        """

    @classmethod
    def from_dict(cls, node_dict):
        """
        Create node from dictionary representation.

        Args:
            node_dict (dict): Node dictionary

        Returns:
            Node: Node instance
        """

    @property
    def name(self):
        """Node name."""

    @property
    def inputs(self):
        """Node inputs."""

    @property
    def outputs(self):
        """Node outputs."""

    @property
    def tags(self):
        """Node tags."""


class GroupedNodes:
    """Represents a group of nodes that can be treated as a single unit."""

    def __init__(self, nodes):
        """
        Initialize grouped nodes.

        Args:
            nodes (Iterable[Node]): Nodes to group
        """

    def run(self, inputs):
        """Execute all nodes in group."""

    def tag(self, tags):
        """Add tags to all nodes in group."""

    def describe(self):
        """Describe grouped nodes."""
```

### Factory Functions

Convenient factory functions for creating nodes and pipelines.

```python { .api }
def node(func, inputs, outputs, name=None, tags=None, confirms=None):
    """
    Create a Node instance.

    Args:
        func (Callable): Python function to execute
        inputs (str or list): Input dataset name(s)
        outputs (str or list): Output dataset name(s)
        name (str, optional): Node name
        tags (str or Iterable[str], optional): Node tags
        confirms (str or list, optional): Dataset names to confirm

    Returns:
        Node: Node instance
    """

def pipeline(pipe, inputs=None, outputs=None, parameters=None, tags=None):
    """
    Create a Pipeline instance.

    Args:
        pipe (Iterable[Node] or Pipeline): Nodes or existing pipeline
        inputs (dict, optional): Input mapping for pipeline parameterization
        outputs (dict, optional): Output mapping for pipeline parameterization
        parameters (dict, optional): Parameter mapping for pipeline
        tags (str or Iterable[str], optional): Tags to apply to all nodes

    Returns:
        Pipeline: Pipeline instance
    """
```

## Usage Examples

### Basic Node and Pipeline Creation

```python
from kedro.pipeline import node, pipeline

# Define processing functions
def clean_data(raw_data):
    """Remove null entries."""
    return [x for x in raw_data if x is not None]

def transform_data(clean_data):
    """Apply transformations."""
    return [x * 2 for x in clean_data]

def aggregate_data(transformed_data):
    """Calculate summary statistics."""
    return {
        'count': len(transformed_data),
        'sum': sum(transformed_data),
        'average': sum(transformed_data) / len(transformed_data)
    }

# Create nodes
clean_node = node(
    func=clean_data,
    inputs="raw_data",
    outputs="clean_data",
    name="clean_data_node",
    tags=["preprocessing"]
)

transform_node = node(
    func=transform_data,
    inputs="clean_data",
    outputs="transformed_data",
    name="transform_data_node",
    tags=["processing"]
)

aggregate_node = node(
    func=aggregate_data,
    inputs="transformed_data",
    outputs="summary_stats",
    name="aggregate_data_node",
    tags=["aggregation"]
)

# Create pipeline
data_pipeline = pipeline([
    clean_node,
    transform_node,
    aggregate_node
])
```

### Pipeline Filtering and Composition

```python
from kedro.pipeline import node, pipeline

# Create multiple processing pipelines
preprocessing_pipeline = pipeline([
    node(validate_data, "raw_data", "validated_data", tags=["validation"]),
    node(clean_data, "validated_data", "clean_data", tags=["cleaning"])
])

feature_pipeline = pipeline([
    node(extract_features, "clean_data", "features", tags=["feature_extraction"]),
    node(scale_features, "features", "scaled_features", tags=["scaling"])
])

model_pipeline = pipeline([
    node(train_model, ["scaled_features", "parameters:model"], "trained_model", tags=["training"]),
    node(evaluate_model, ["trained_model", "test_data"], "metrics", tags=["evaluation"])
])

# Combine pipelines
full_pipeline = preprocessing_pipeline + feature_pipeline + model_pipeline

# Filter by tags
training_pipeline = full_pipeline.filter(tags=["training", "evaluation"])

# Filter by node names
validation_only = full_pipeline.only_nodes("validate_data_node")

# Filter by inputs/outputs
feature_to_model = full_pipeline.from_inputs("clean_data").to_outputs("metrics")
```

### Pipeline Parameterization

```python
from kedro.pipeline import node, pipeline

def process_with_config(data, config):
    """Process data with configuration parameters."""
    return [x * config["multiplier"] for x in data]

# Create parameterized pipeline
def create_processing_pipeline():
    return pipeline([
        node(
            func=process_with_config,
            inputs=["input_data", "parameters:processing_config"],
            outputs="processed_data",
            name="process_data"
        )
    ])

# Use with different parameter sets
pipeline_v1 = create_processing_pipeline()

# Create pipeline variant with different inputs/outputs
pipeline_v2 = pipeline(
    pipeline_v1,
    inputs={"input_data": "alternative_input"},
    outputs={"processed_data": "alternative_output"}
)
```

### Node Input/Output Patterns

```python
from kedro.pipeline import node

# Single input/output
simple_node = node(
    func=lambda x: x * 2,
    inputs="input_data",
    outputs="output_data"
)

# Multiple inputs
multi_input_node = node(
    func=lambda x, y: x + y,
    inputs=["data_a", "data_b"],
    outputs="combined_data"
)

# Multiple outputs
multi_output_node = node(
    func=lambda data: (data[:5], data[5:]),
    inputs="full_data",
    outputs=["first_half", "second_half"]
)

# Dictionary inputs (for named parameters)
dict_input_node = node(
    func=lambda data, config: process_data(data, **config),
    inputs={"data": "raw_data", "config": "parameters:processing"},
    outputs="processed_data"
)

# Parameter inputs
param_node = node(
    func=lambda data, multiplier: [x * multiplier for x in data],
    inputs=["input_data", "parameters:multiplier"],
    outputs="scaled_data"
)
```

### Advanced Pipeline Operations

```python
from kedro.pipeline import node, pipeline

# Create base pipeline
base_pipeline = pipeline([
    node(load_data, None, "raw_data"),
    node(preprocess, "raw_data", "clean_data"),
    node(analyze, "clean_data", "results")
])

# Apply tags to entire pipeline
tagged_pipeline = base_pipeline.tag(["analysis", "v1"])

# Create conditional pipeline branches
training_branch = pipeline([
    node(split_data, "clean_data", ["train_data", "test_data"]),
    node(train_model, "train_data", "model"),
    node(evaluate_model, ["model", "test_data"], "metrics")
])

inference_branch = pipeline([
    node(load_model, "parameters:model_path", "model"),
    node(predict, ["model", "clean_data"], "predictions")
])

# Combine with base pipeline
full_training_pipeline = base_pipeline + training_branch
full_inference_pipeline = base_pipeline + inference_branch

# Create pipeline that runs different branches based on mode
def create_conditional_pipeline(mode="training"):
    if mode == "training":
        return full_training_pipeline
    else:
        return full_inference_pipeline
```

## Types

```python { .api }
from typing import Callable, Dict, List, Set, Any, Optional, Union, Iterable

NodeFunc = Callable[..., Any]
NodeInputs = Union[str, List[str], Dict[str, str]]
NodeOutputs = Union[str, List[str]]
NodeTags = Union[str, Set[str], List[str]]
NodeName = str
DatasetName = str
ParameterName = str
```