<!-- docs/hooks.md -->
# Hook System

Kedro's hook system provides a plugin architecture for extending framework functionality at various lifecycle points. Hooks enable custom behavior injection during node execution, pipeline runs, and catalog operations without modifying core code.

## Capabilities

### Hook Implementation

Decorator for marking methods as hook implementations with the plugin system.

```python { .api }
def hook_impl(func):
    """
    Decorator to mark methods as hook implementations.

    Args:
        func (Callable): Function to mark as hook implementation

    Returns:
        Callable: Decorated function with hook metadata

    Usage:
        @hook_impl
        def before_node_run(self, node, catalog, inputs, is_async, session_id):
            # Custom logic before node execution
            pass
    """
```

### Hook Manager

Factory function for creating hook managers that coordinate plugin execution.

```python { .api }
def _create_hook_manager():
    """
    Create hook manager for plugin system.

    Returns:
        HookManager: Configured hook manager instance

    Note:
        This is an internal function used by the framework.
        Users typically don't call this directly.
    """
```

## Available Hook Specifications

### Node Execution Hooks

Hooks that run during individual node execution lifecycle.

```python { .api }
class NodeHookSpecs:
    """Hook specifications for node execution events."""

    def before_node_run(self, node, catalog, inputs, is_async, session_id):
        """
        Called before a node runs.

        Args:
            node (Node): The node to be executed
            catalog (DataCatalog): Data catalog instance
            inputs (dict): Node inputs
            is_async (bool): Whether node runs asynchronously
            session_id (str): Session identifier
        """

    def after_node_run(self, node, catalog, inputs, outputs, is_async, session_id):
        """
        Called after a node runs successfully.

        Args:
            node (Node): The executed node
            catalog (DataCatalog): Data catalog instance
            inputs (dict): Node inputs that were used
            outputs (dict): Node outputs that were produced
            is_async (bool): Whether node ran asynchronously
            session_id (str): Session identifier
        """

    def on_node_error(self, error, node, catalog, inputs, is_async, session_id):
        """
        Called when a node execution fails.

        Args:
            error (Exception): The exception that occurred
            node (Node): The node that failed
            catalog (DataCatalog): Data catalog instance
            inputs (dict): Node inputs that were used
            is_async (bool): Whether node was running asynchronously
            session_id (str): Session identifier
        """
```

### Pipeline Execution Hooks

Hooks that run during pipeline execution lifecycle.

```python { .api }
class PipelineHookSpecs:
    """Hook specifications for pipeline execution events."""

    def before_pipeline_run(self, run_params, pipeline, catalog):
        """
        Called before a pipeline runs.

        Args:
            run_params (dict): Parameters for the pipeline run
            pipeline (Pipeline): The pipeline to be executed
            catalog (DataCatalog): Data catalog instance
        """

    def after_pipeline_run(self, run_params, pipeline, catalog):
        """
        Called after a pipeline runs successfully.

        Args:
            run_params (dict): Parameters used for the pipeline run
            pipeline (Pipeline): The executed pipeline
            catalog (DataCatalog): Data catalog instance
        """

    def on_pipeline_error(self, error, run_params, pipeline, catalog):
        """
        Called when a pipeline execution fails.

        Args:
            error (Exception): The exception that occurred
            run_params (dict): Parameters used for the pipeline run
            pipeline (Pipeline): The pipeline that failed
            catalog (DataCatalog): Data catalog instance
        """
```

### Dataset Hooks

Hooks that run during dataset operations.

```python { .api }
class DatasetHookSpecs:
    """Hook specifications for dataset operation events."""

    def before_dataset_loaded(self, dataset_name, node):
        """
        Called before a dataset is loaded.

        Args:
            dataset_name (str): Name of the dataset to be loaded
            node (Node): Node requesting the dataset (if applicable)
        """

    def after_dataset_loaded(self, dataset_name, data, node):
        """
        Called after a dataset is loaded.

        Args:
            dataset_name (str): Name of the dataset that was loaded
            data: The loaded data
            node (Node): Node that requested the dataset (if applicable)
        """

    def before_dataset_saved(self, dataset_name, data, node):
        """
        Called before a dataset is saved.

        Args:
            dataset_name (str): Name of the dataset to be saved
            data: The data to be saved
            node (Node): Node producing the dataset (if applicable)
        """

    def after_dataset_saved(self, dataset_name, data, node):
        """
        Called after a dataset is saved.

        Args:
            dataset_name (str): Name of the dataset that was saved
            data: The data that was saved
            node (Node): Node that produced the dataset (if applicable)
        """
```

## Usage Examples

### Basic Hook Implementation

```python
from kedro.framework.hooks import hook_impl
import logging

class ProjectHooks:
    """Custom hooks for project-specific functionality."""

    @hook_impl
    def before_node_run(self, node, catalog, inputs, is_async, session_id):
        """Log node execution start."""
        logging.info(f"Starting execution of node: {node.name}")
        logging.info(f"Node inputs: {list(inputs.keys())}")

    @hook_impl
    def after_node_run(self, node, catalog, inputs, outputs, is_async, session_id):
        """Log node execution completion."""
        logging.info(f"Completed execution of node: {node.name}")
        logging.info(f"Node outputs: {list(outputs.keys())}")

    @hook_impl
    def on_node_error(self, error, node, catalog, inputs, is_async, session_id):
        """Log node execution errors."""
        logging.error(f"Node {node.name} failed with error: {str(error)}")
        # Could also send alerts, save error state, etc.

# Register hooks in settings.py
HOOKS = (ProjectHooks(),)
```

### Performance Monitoring Hooks

```python
from kedro.framework.hooks import hook_impl
import time
from collections import defaultdict

class PerformanceHooks:
    """Hooks for monitoring execution performance."""

    def __init__(self):
        self.node_times = defaultdict(list)
        self.pipeline_start_time = None

    @hook_impl
    def before_pipeline_run(self, run_params, pipeline, catalog):
        """Record pipeline start time."""
        self.pipeline_start_time = time.time()
        print(f"Starting pipeline with {len(pipeline.nodes)} nodes")

    @hook_impl
    def after_pipeline_run(self, run_params, pipeline, catalog):
        """Report pipeline execution statistics."""
        total_time = time.time() - self.pipeline_start_time
        print(f"Pipeline completed in {total_time:.2f} seconds")

        # Report node-level statistics
        print("\nNode execution times:")
        for node_name, times in self.node_times.items():
            avg_time = sum(times) / len(times)
            print(f" {node_name}: {avg_time:.2f}s (avg of {len(times)} runs)")

    @hook_impl
    def before_node_run(self, node, catalog, inputs, is_async, session_id):
        """Record node start time."""
        node._start_time = time.time()

    @hook_impl
    def after_node_run(self, node, catalog, inputs, outputs, is_async, session_id):
        """Record node execution time."""
        execution_time = time.time() - node._start_time
        self.node_times[node.name].append(execution_time)

        if execution_time > 10: # Warn about slow nodes
            print(f"WARNING: Node {node.name} took {execution_time:.2f}s to execute")

# Usage in settings.py
HOOKS = (PerformanceHooks(),)
```

### Data Validation Hooks

```python
from kedro.framework.hooks import hook_impl
import pandas as pd

class DataValidationHooks:
    """Hooks for automatic data validation."""

    @hook_impl
    def after_dataset_loaded(self, dataset_name, data, node):
        """Validate loaded data."""
        if isinstance(data, pd.DataFrame):
            # Check for missing data
            if data.isnull().sum().sum() > 0:
                print(f"WARNING: Dataset {dataset_name} contains missing values")

            # Check for empty datasets
            if len(data) == 0:
                raise ValueError(f"Dataset {dataset_name} is empty")

            # Log dataset info
            print(f"Loaded {dataset_name}: {data.shape[0]} rows, {data.shape[1]} columns")

    @hook_impl
    def before_dataset_saved(self, dataset_name, data, node):
        """Validate data before saving."""
        if isinstance(data, pd.DataFrame):
            # Ensure no infinity values
            if data.select_dtypes(include=[float]).isin([float('inf'), -float('inf')]).sum().sum() > 0:
                raise ValueError(f"Dataset {dataset_name} contains infinity values")

            # Ensure reasonable data types
            for col in data.columns:
                if data[col].dtype == 'object' and dataset_name.endswith('_numeric'):
                    print(f"WARNING: Numeric dataset {dataset_name} has object column: {col}")

# Usage in settings.py
HOOKS = (DataValidationHooks(),)
```

### ML Experiment Tracking Hooks

```python
from kedro.framework.hooks import hook_impl
import mlflow
import joblib
from pathlib import Path

class MLflowHooks:
    """Hooks for MLflow experiment tracking integration."""

    def __init__(self, experiment_name="kedro-experiment"):
        mlflow.set_experiment(experiment_name)

    @hook_impl
    def before_pipeline_run(self, run_params, pipeline, catalog):
        """Start MLflow run."""
        mlflow.start_run()

        # Log pipeline parameters
        if 'tags' in run_params:
            mlflow.set_tags({"pipeline_tags": str(run_params['tags'])})

        mlflow.log_param("pipeline_nodes", len(pipeline.nodes))

    @hook_impl
    def after_pipeline_run(self, run_params, pipeline, catalog):
        """End MLflow run."""
        mlflow.end_run()

    @hook_impl
    def after_node_run(self, node, catalog, inputs, outputs, is_async, session_id):
        """Log model artifacts and metrics."""
        # Log models
        for output_name, output_data in outputs.items():
            if output_name.endswith('_model'):
                # Save and log model
                model_path = f"models/{output_name}.pkl"
                joblib.dump(output_data, model_path)
                mlflow.log_artifact(model_path)

            # Log metrics
            elif output_name.endswith('_metrics') and isinstance(output_data, dict):
                for metric_name, metric_value in output_data.items():
                    if isinstance(metric_value, (int, float)):
                        mlflow.log_metric(f"{node.name}_{metric_name}", metric_value)

# Usage in settings.py
HOOKS = (MLflowHooks(experiment_name="my-kedro-project"),)
```

### Error Recovery Hooks

```python
from kedro.framework.hooks import hook_impl
import logging
from datetime import datetime

class ErrorRecoveryHooks:
    """Hooks for error recovery and resilience."""

    def __init__(self):
        self.failed_nodes = []
        self.recovery_strategies = {
            'data_loading_node': self._recover_data_loading,
            'model_training_node': self._recover_model_training,
        }

    @hook_impl
    def on_node_error(self, error, node, catalog, inputs, is_async, session_id):
        """Attempt error recovery for failed nodes."""
        self.failed_nodes.append({
            'node_name': node.name,
            'error': str(error),
            'timestamp': datetime.now(),
            'inputs': list(inputs.keys())
        })

        # Attempt recovery if strategy exists
        if node.name in self.recovery_strategies:
            logging.info(f"Attempting recovery for node: {node.name}")
            try:
                self.recovery_strategies[node.name](node, catalog, inputs, error)
                logging.info(f"Successfully recovered node: {node.name}")
            except Exception as recovery_error:
                logging.error(f"Recovery failed for {node.name}: {recovery_error}")

    def _recover_data_loading(self, node, catalog, inputs, error):
        """Recovery strategy for data loading failures."""
        # Try alternative data source
        alternative_dataset = f"{node.inputs[0]}_backup"
        if catalog.exists(alternative_dataset):
            # Temporarily replace dataset
            original_data = catalog._data_sets[node.inputs[0]]
            catalog._data_sets[node.inputs[0]] = catalog._data_sets[alternative_dataset]
            logging.info(f"Using backup dataset for {node.inputs[0]}")

    def _recover_model_training(self, node, catalog, inputs, error):
        """Recovery strategy for model training failures."""
        # Use simpler model parameters
        if 'parameters' in inputs:
            params = inputs['parameters'].copy()
            params['model_complexity'] = 'simple'
            catalog.save('parameters:recovery_params', params)
            logging.info("Switched to simpler model parameters")

    @hook_impl
    def after_pipeline_run(self, run_params, pipeline, catalog):
        """Report error summary after pipeline completion."""
        if self.failed_nodes:
            logging.warning(f"Pipeline completed with {len(self.failed_nodes)} node failures:")
            for failure in self.failed_nodes:
                logging.warning(f" - {failure['node_name']}: {failure['error']}")

# Usage in settings.py
HOOKS = (ErrorRecoveryHooks(),)
```

### Multiple Hook Classes

```python
from kedro.framework.hooks import hook_impl

class LoggingHooks:
    """Hooks for enhanced logging."""

    @hook_impl
    def before_pipeline_run(self, run_params, pipeline, catalog):
        logging.info("=== Pipeline Execution Started ===")

class SecurityHooks:
    """Hooks for security and compliance."""

    @hook_impl
    def before_dataset_loaded(self, dataset_name, node):
        if 'sensitive' in dataset_name:
            logging.info(f"Accessing sensitive dataset: {dataset_name}")

class NotificationHooks:
    """Hooks for external notifications."""

    @hook_impl
    def on_pipeline_error(self, error, run_params, pipeline, catalog):
        # Send alert to monitoring system
        send_alert(f"Pipeline failed: {str(error)}")

# Register multiple hook classes in settings.py
HOOKS = (
    LoggingHooks(),
    SecurityHooks(),
    NotificationHooks(),
)
```

## Types

```python { .api }
from typing import Any, Dict, List, Optional, Callable
from kedro.pipeline import Node, Pipeline
from kedro.io import DataCatalog

HookImplementation = Callable[..., Any]
HookManager = Any
SessionId = str
RunParams = Dict[str, Any]
DatasetName = str
NodeInputs = Dict[str, Any]
NodeOutputs = Dict[str, Any]
ErrorType = Exception
```