
# Pipeline Execution

Kedro provides multiple execution strategies for running pipelines with different parallelization approaches. Runners handle the orchestration of node execution, data loading/saving, and dependency resolution.

## Capabilities

### Abstract Runner

Base class defining the interface for all pipeline runners with common execution patterns.

```python { .api }
class AbstractRunner:
    """Abstract base class for all pipeline runners."""

    def run(self, pipeline, catalog, hook_manager=None, run_id=None, only_missing_outputs=False):
        """
        Run pipeline with given catalog.

        Args:
            pipeline (Pipeline): Pipeline to execute
            catalog (CatalogProtocol): Data catalog for loading/saving datasets
            hook_manager (PluginManager, optional): Hook manager for lifecycle events
            run_id (str, optional): Run identifier for tracking
            only_missing_outputs (bool): Whether to skip nodes whose outputs already exist

        Returns:
            dict: Mapping of dataset names to final node that produced them
        """

    def run_only_missing(self, pipeline, catalog, hook_manager=None, run_id=None):
        """
        Run pipeline but skip nodes whose outputs already exist.

        Args:
            pipeline (Pipeline): Pipeline to execute
            catalog (CatalogProtocol): Data catalog for loading/saving datasets
            hook_manager (PluginManager, optional): Hook manager for lifecycle events
            run_id (str, optional): Run identifier for tracking

        Returns:
            dict: Mapping of dataset names to final node that produced them
        """

    def create_default_data_set(self, ds_name):
        """
        Create default dataset for missing catalog entries.

        Args:
            ds_name (str): Dataset name

        Returns:
            AbstractDataset: Default dataset instance
        """
```

### Sequential Runner

Executes pipeline nodes sequentially in a single thread with simple dependency resolution.

```python { .api }
class SequentialRunner(AbstractRunner):
    """Runs pipeline nodes sequentially in a single thread."""

    def __init__(self, is_async=False):
        """
        Initialize sequential runner.

        Args:
            is_async (bool): Whether to run nodes asynchronously
        """
```

### Parallel Runner

Executes pipeline nodes in parallel using multiprocessing for CPU-intensive workloads.

```python { .api }
class ParallelRunner(AbstractRunner):
    """Runs pipeline nodes in parallel using multiprocessing."""

    def __init__(self, max_workers=None, is_async=False):
        """
        Initialize parallel runner.

        Args:
            max_workers (int, optional): Maximum number of worker processes
            is_async (bool): Whether to run nodes asynchronously
        """
```

### Thread Runner

Executes pipeline nodes in parallel using threading for I/O-bound workloads.

```python { .api }
class ThreadRunner(AbstractRunner):
    """Runs pipeline nodes in parallel using threading."""

    def __init__(self, max_workers=None, is_async=False):
        """
        Initialize thread runner.

        Args:
            max_workers (int, optional): Maximum number of worker threads
            is_async (bool): Whether to run nodes asynchronously
        """
```

### Task Utilities

Support classes for parallel execution and task management.

```python { .api }
class Task:
    """Represents a runnable task for parallel execution."""

    def __init__(self, node, catalog):
        """
        Initialize task.

        Args:
            node (Node): Pipeline node to execute
            catalog (DataCatalog): Data catalog for loading/saving
        """

    def run(self):
        """
        Execute the task.

        Returns:
            Any: Task execution result
        """

    @property
    def node(self):
        """Pipeline node being executed."""

    @property
    def catalog(self):
        """Data catalog for dataset operations."""
```

## Usage Examples

### Basic Runner Usage

```python
from kedro.pipeline import node, pipeline
from kedro.io import DataCatalog, MemoryDataset
from kedro.runner import SequentialRunner

# Define processing functions
def process_data(input_data):
    return [x * 2 for x in input_data]

def summarize_data(processed_data):
    return {"count": len(processed_data), "sum": sum(processed_data)}

# Create pipeline
data_pipeline = pipeline([
    node(process_data, "raw_data", "processed_data"),
    node(summarize_data, "processed_data", "summary")
])

# Set up catalog
catalog = DataCatalog({
    "raw_data": MemoryDataset([1, 2, 3, 4, 5]),
    "processed_data": MemoryDataset(),
    "summary": MemoryDataset()
})

# Run with sequential runner
runner = SequentialRunner()
result = runner.run(data_pipeline, catalog)

# Access results
summary = catalog.load("summary")
print(summary)  # {"count": 5, "sum": 30}
```

### Parallel Execution

```python
from kedro.runner import ParallelRunner
from kedro.pipeline import node, pipeline
from kedro.io import DataCatalog, MemoryDataset

# CPU-intensive processing functions
def expensive_computation_a(data):
    # Simulate expensive computation
    import time
    time.sleep(1)
    return [x ** 2 for x in data]

def expensive_computation_b(data):
    # Simulate expensive computation
    import time
    time.sleep(1)
    return [x ** 3 for x in data]

def combine_results(results_a, results_b):
    return list(zip(results_a, results_b))

# Create pipeline with parallel branches
parallel_pipeline = pipeline([
    node(expensive_computation_a, "input_data", "squared_data", name="square"),
    node(expensive_computation_b, "input_data", "cubed_data", name="cube"),
    node(combine_results, ["squared_data", "cubed_data"], "combined_data", name="combine")
])

# Set up catalog
catalog = DataCatalog({
    "input_data": MemoryDataset([1, 2, 3, 4, 5]),
    "squared_data": MemoryDataset(),
    "cubed_data": MemoryDataset(),
    "combined_data": MemoryDataset()
})

# Run with parallel runner (square and cube nodes run in parallel)
parallel_runner = ParallelRunner(max_workers=2)
result = parallel_runner.run(parallel_pipeline, catalog)

# Access results
combined = catalog.load("combined_data")
print(combined)  # [(1, 1), (4, 8), (9, 27), (16, 64), (25, 125)]
```

### Thread Runner for I/O Operations

```python
from kedro.runner import ThreadRunner
from kedro.pipeline import node, pipeline
from kedro.io import DataCatalog, MemoryDataset
import time

# I/O-bound functions
def fetch_data_source_a():
    # Simulate API call
    time.sleep(0.5)
    return {"source": "A", "data": [1, 2, 3]}

def fetch_data_source_b():
    # Simulate API call
    time.sleep(0.5)
    return {"source": "B", "data": [4, 5, 6]}

def merge_sources(source_a, source_b):
    return {
        "sources": [source_a["source"], source_b["source"]],
        "combined_data": source_a["data"] + source_b["data"]
    }

# Create pipeline with I/O operations
io_pipeline = pipeline([
    node(fetch_data_source_a, None, "data_a", name="fetch_a"),
    node(fetch_data_source_b, None, "data_b", name="fetch_b"),
    node(merge_sources, ["data_a", "data_b"], "merged_data", name="merge")
])

# Set up catalog
catalog = DataCatalog({
    "data_a": MemoryDataset(),
    "data_b": MemoryDataset(),
    "merged_data": MemoryDataset()
})

# Run with thread runner (fetch operations run concurrently)
thread_runner = ThreadRunner(max_workers=2)
result = thread_runner.run(io_pipeline, catalog)

# Access results
merged = catalog.load("merged_data")
print(merged)  # {"sources": ["A", "B"], "combined_data": [1, 2, 3, 4, 5, 6]}
```

### Partial Pipeline Execution

```python
from kedro.runner import SequentialRunner
from kedro.pipeline import node, pipeline
from kedro.io import DataCatalog, MemoryDataset

# Create multi-stage pipeline
# (load_raw_data, validate_data, clean_data, feature_engineering,
#  train_model, and evaluate_model are assumed to be defined elsewhere)
full_pipeline = pipeline([
    node(load_raw_data, None, "raw_data", name="load"),
    node(validate_data, "raw_data", "validated_data", name="validate"),
    node(clean_data, "validated_data", "clean_data", name="clean"),
    node(feature_engineering, "clean_data", "features", name="features"),
    node(train_model, "features", "model", name="train"),
    node(evaluate_model, "model", "metrics", name="evaluate")
])

# Set up catalog with some existing data
catalog = DataCatalog({
    "raw_data": MemoryDataset(),
    "validated_data": MemoryDataset(),
    "clean_data": MemoryDataset([1, 2, 3, 4, 5]),  # Pre-existing clean data
    "features": MemoryDataset(),
    "model": MemoryDataset(),
    "metrics": MemoryDataset()
})

# Run only missing nodes (will skip load, validate, clean)
runner = SequentialRunner()
result = runner.run_only_missing(full_pipeline, catalog)

# Or run specific pipeline subset
feature_and_model_pipeline = full_pipeline.from_nodes("features").to_nodes("evaluate")
result = runner.run(feature_and_model_pipeline, catalog)
```

### Custom Runner Implementation

```python
from kedro.runner import AbstractRunner
from kedro.io import MemoryDataset
import logging

class LoggingRunner(AbstractRunner):
    """Custom runner that logs detailed execution information."""

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    def run(self, pipeline, catalog, hook_manager=None, run_id=None):
        """Run pipeline with detailed logging."""
        self.logger.info(f"Starting pipeline execution with {len(pipeline.nodes)} nodes")

        # Simple sequential execution with logging
        for node in pipeline.nodes:
            self.logger.info(f"Executing node: {node.name}")

            # Load inputs
            inputs = {}
            for input_name in node.inputs:
                if catalog.exists(input_name):
                    inputs[input_name] = catalog.load(input_name)
                    self.logger.info(f"Loaded input: {input_name}")
                else:
                    # Create default dataset
                    catalog.add(input_name, self.create_default_data_set(input_name))
                    inputs[input_name] = catalog.load(input_name)
                    self.logger.warning(f"Created default dataset for: {input_name}")

            # Execute node
            try:
                outputs = node.run(inputs)
                self.logger.info(f"Node {node.name} executed successfully")

                # Save outputs
                if isinstance(outputs, dict):
                    for output_name, output_data in outputs.items():
                        catalog.save(output_name, output_data)
                        self.logger.info(f"Saved output: {output_name}")
                else:
                    # Single output
                    output_name = node.outputs[0] if isinstance(node.outputs, list) else node.outputs
                    catalog.save(output_name, outputs)
                    self.logger.info(f"Saved output: {output_name}")

            except Exception as e:
                self.logger.error(f"Node {node.name} failed: {str(e)}")
                raise

        self.logger.info("Pipeline execution completed")
        return {}

# Usage
logging.basicConfig(level=logging.INFO)
custom_runner = LoggingRunner()
result = custom_runner.run(pipeline, catalog)
```

### Runner Selection Based on Pipeline Characteristics

```python
def select_runner(pipeline, max_parallel_nodes=4):
    """Select appropriate runner based on pipeline characteristics."""

    # Count nodes that can run in parallel
    node_dependencies = {}

    for node in pipeline.nodes:
        dependencies = set()
        for other_node in pipeline.nodes:
            if set(node.inputs) & set(other_node.outputs):
                dependencies.add(other_node.name)
        node_dependencies[node.name] = dependencies

    # Simple heuristic: if many nodes have no dependencies, use parallel execution
    independent_nodes = sum(1 for deps in node_dependencies.values() if not deps)

    if independent_nodes >= max_parallel_nodes:
        return ParallelRunner(max_workers=min(independent_nodes, 4))
    elif independent_nodes >= 2:
        return ThreadRunner(max_workers=min(independent_nodes, 2))
    else:
        return SequentialRunner()

# Usage
optimal_runner = select_runner(my_pipeline)
result = optimal_runner.run(my_pipeline, catalog)
```

## Types

```python { .api }
from typing import Dict, Optional, Any, Callable
from kedro.pipeline import Pipeline
from kedro.io import DataCatalog

RunResult = Dict[str, Any]
HookManager = Any  # Hook manager type
SessionId = Optional[str]
WorkerCount = Optional[int]
```