or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

artifact-config.md artifacts.md client.md config.md enums.md exceptions.md hooks.md index.md integrations.md materializers.md metadata-tags.md models.md pipelines-and-steps.md pydantic-models.md services.md stack-components.md stacks.md types.md utilities.md

docs/hooks.md

0

# Hooks

1

2

Pre-built hooks for alerting and custom hook utilities. Hooks enable executing custom logic in response to pipeline and step success or failure events.

3

4

## Capabilities

5

6

### Alerter Success Hook

7

8

```python { .api }

9

def alerter_success_hook() -> None:

10

"""

11

Standard success hook that executes after a step finishes successfully.

12

13

This hook uses any `BaseAlerter` that is configured within the active

14

stack to post a success notification message. The message includes

15

pipeline name, run name, step name, and parameters.

16

17

The hook automatically detects the alerter from the active stack.

18

If no alerter is configured, a warning is logged and the hook is skipped.

19

20

Example:

21

```python

22

from zenml import pipeline, step

23

from zenml.hooks import alerter_success_hook

24

25

@step(on_success=alerter_success_hook)

26

def train_model(data: list) -> dict:

27

return {"model": "trained", "accuracy": 0.95}

28

29

@pipeline(on_success=alerter_success_hook)

30

def my_pipeline():

31

data = [1, 2, 3]

32

train_model(data)

33

```

34

"""

35

```

36

37

Import from:

38

39

```python

40

from zenml.hooks import alerter_success_hook

41

```

42

43

### Alerter Failure Hook

44

45

```python { .api }

46

def alerter_failure_hook(exception: BaseException) -> None:

47

"""

48

Standard failure hook that executes after a step fails.

49

50

This hook uses any `BaseAlerter` that is configured within the active

51

stack to post a failure notification message. The message includes

52

pipeline name, run name, step name, parameters, and exception details

53

with traceback.

54

55

The hook automatically detects the alerter from the active stack.

56

If no alerter is configured, a warning is logged and the hook is skipped.

57

58

Parameters:

59

- exception: The original exception that caused the step to fail

60

61

Example:

62

```python

63

from zenml import pipeline, step

64

from zenml.hooks import alerter_failure_hook

65

66

@step(on_failure=alerter_failure_hook)

67

def train_model(data: list) -> dict:

68

if not data:

69

raise ValueError("No data provided")

70

return {"model": "trained"}

71

72

@pipeline(on_failure=alerter_failure_hook)

73

def my_pipeline():

74

data = []

75

train_model(data)

76

```

77

"""

78

```

79

80

Import from:

81

82

```python

83

from zenml.hooks import alerter_failure_hook

84

```

85

86

### Resolve and Validate Hook

87

88

```python { .api }

89

def resolve_and_validate_hook(hook):

90

"""

91

Utility to resolve and validate custom hooks.

92

93

Resolves hook specifications (strings, functions, or callables)

94

and validates that they meet hook requirements.

95

96

Parameters:

97

- hook: Hook specification (string path, function, or callable)

98

99

Returns:

100

Resolved and validated hook callable

101

102

Raises:

103

HookValidationException: If the hook is invalid

104

105

Example:

106

```python

107

from zenml.hooks import resolve_and_validate_hook

108

109

def my_custom_hook():

110

print("Custom hook executed")

111

112

# Validate hook

113

validated = resolve_and_validate_hook(my_custom_hook)

114

```

115

"""

116

```

117

118

Import from:

119

120

```python

121

from zenml.hooks import resolve_and_validate_hook

122

```

123

124

## Usage Examples

125

126

### Basic Alerter Hooks

127

128

```python

129

from zenml import pipeline, step

130

from zenml.hooks import alerter_success_hook, alerter_failure_hook

131

132

@step

133

def train_model(data: list) -> dict:

134

"""Training step."""

135

return {"model": "trained", "accuracy": 0.95}

136

137

@pipeline(

138

on_success=alerter_success_hook,

139

on_failure=alerter_failure_hook

140

)

141

def monitored_pipeline():

142

"""Pipeline with alerting.

143

144

The hooks will use any alerter configured in the active stack.

145

"""

146

data = [1, 2, 3, 4, 5]

147

model = train_model(data)

148

return model

149

```

150

151

### Step-Level Alerter Hooks

152

153

```python

154

from zenml import step

155

from zenml.hooks import alerter_success_hook, alerter_failure_hook

156

157

@step(

158

on_success=alerter_success_hook,

159

on_failure=alerter_failure_hook

160

)

161

def critical_step(data: list) -> dict:

162

"""Critical step with both success and failure alerting."""

163

if not data:

164

raise ValueError("No data provided")

165

return {"processed": data}

166

```

167

168

### Custom Hook Function

169

170

```python

171

from zenml import pipeline, get_pipeline_context

172

173

def custom_success_hook():

174

"""Custom success hook."""

175

context = get_pipeline_context()

176

print(f"Pipeline {context.name} completed!")

177

print(f"Run name: {context.run_name}")

178

179

# Custom logic (e.g., trigger downstream processes)

180

# trigger_deployment()

181

# update_dashboard()

182

183

@pipeline(on_success=custom_success_hook)

184

def pipeline_with_custom_hook():

185

"""Pipeline with custom hook."""

186

pass

187

```

188

189

### Hook with Arguments

190

191

```python

192

from zenml import pipeline

193

194

def custom_hook_with_args(threshold: float):

195

"""Hook factory that creates a hook with arguments."""

196

def hook():

197

print(f"Checking threshold: {threshold}")

198

# Custom logic using threshold

199

return hook

200

201

@pipeline(

202

on_success=custom_hook_with_args(threshold=0.95)

203

)

204

def threshold_pipeline():

205

"""Pipeline with parameterized hook."""

206

pass

207

```

208

209

### Error Handling in Hooks

210

211

```python

212

from zenml import pipeline, get_pipeline_context

213

214

def safe_success_hook():

215

"""Success hook with error handling."""

216

try:

217

context = get_pipeline_context()

218

print(f"Pipeline {context.name} succeeded")

219

220

# Potentially failing operations

221

# send_notification()

222

# update_external_system()

223

224

except Exception as e:

225

print(f"Hook failed but pipeline succeeded: {e}")

226

# Log error but don't fail pipeline

227

228

@pipeline(on_success=safe_success_hook)

229

def resilient_pipeline():

230

"""Pipeline with resilient hooks."""

231

pass

232

```

233

234

### Conditional Hooks

235

236

```python

237

from zenml import pipeline, get_pipeline_context

238

import os

239

240

def conditional_alert():

241

"""Alert only in production environment."""

242

if os.getenv("ENV") == "production":

243

# Send alert

244

print("Production alert sent")

245

else:

246

print("Non-production environment, skipping alert")

247

248

@pipeline(on_success=conditional_alert)

249

def environment_aware_pipeline():

250

"""Pipeline with environment-aware alerting."""

251

pass

252

```

253

254

### Hook with External Service

255

256

```python

257

from zenml import pipeline

258

import requests

259

260

def webhook_success_hook(webhook_url: str):

261

"""Create hook that calls external webhook."""

262

def hook():

263

try:

264

response = requests.post(

265

webhook_url,

266

json={

267

"status": "success",

268

"pipeline": "my_pipeline",

269

"timestamp": "2024-01-15T10:00:00Z"

270

},

271

timeout=10

272

)

273

response.raise_for_status()

274

print("Webhook notification sent")

275

except Exception as e:

276

print(f"Failed to send webhook: {e}")

277

278

return hook

279

280

@pipeline(

281

on_success=webhook_success_hook("https://api.example.com/webhook")

282

)

283

def webhook_pipeline():

284

"""Pipeline with webhook notification."""

285

pass

286

```

287

288

### Combining Hooks and Metadata

289

290

```python

291

from zenml import pipeline, step, log_metadata, get_pipeline_context

292

293

def metadata_logging_hook():

294

"""Hook that logs additional metadata."""

295

context = get_pipeline_context()

296

297

log_metadata({

298

"completion_hook_executed": True,

299

"pipeline_name": context.name,

300

"run_name": context.run_name

301

})

302

303

@step

304

def training_step(data: list) -> dict:

305

return {"model": "trained"}

306

307

@pipeline(on_success=metadata_logging_hook)

308

def metadata_aware_pipeline():

309

"""Pipeline that logs metadata on success."""

310

train_data = [1, 2, 3]

311

model = training_step(train_data)

312

```

313

314

### Validating Custom Hooks

315

316

```python

317

from zenml.hooks import resolve_and_validate_hook

318

from zenml.exceptions import HookValidationException

319

320

def my_hook():

321

"""Valid hook."""

322

print("Hook executed")

323

324

def invalid_hook(required_arg):

325

"""Invalid hook - requires arguments."""

326

print(f"This won't work: {required_arg}")

327

328

# Validate hooks

329

try:

330

valid = resolve_and_validate_hook(my_hook)

331

print("Valid hook")

332

except HookValidationException as e:

333

print(f"Invalid: {e}")

334

335

try:

336

invalid = resolve_and_validate_hook(invalid_hook)

337

except HookValidationException as e:

338

print(f"Hook validation failed: {e}")

339

```

340

341

### String Path Hooks

342

343

```python

344

from zenml import pipeline

345

346

# Hook defined in a module

347

# my_hooks.py:

348

# def success_notification():

349

# print("Success!")

350

351

@pipeline(

352

on_success="my_hooks.success_notification"

353

)

354

def string_path_pipeline():

355

"""Pipeline using string path to hook."""

356

pass

357

```

358

359

### Hook Execution Context

360

361

```python

362

from zenml import pipeline, step, get_pipeline_context, get_step_context

363

364

def detailed_success_hook():

365

"""Hook that accesses execution context."""

366

try:

367

# Try to get pipeline context

368

pipeline_ctx = get_pipeline_context()

369

print(f"Pipeline: {pipeline_ctx.name}")

370

print(f"Run: {pipeline_ctx.run_name}")

371

372

if pipeline_ctx.model:

373

print(f"Model: {pipeline_ctx.model.name}")

374

375

# Access run details

376

run = pipeline_ctx.pipeline_run

377

print(f"Status: {run.status}")

378

print(f"Start time: {run.start_time}")

379

380

except Exception as e:

381

print(f"Context access error: {e}")

382

383

@pipeline(on_success=detailed_success_hook)

384

def context_aware_pipeline():

385

"""Pipeline with context-aware hook."""

386

pass

387

```

388