# Pipelines and Steps

Core decorators and context objects for defining ML workflows and their constituent steps. ZenML pipelines are directed acyclic graphs (DAGs) of steps that define reproducible ML workflows with automatic versioning, caching, and lineage tracking.
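
To make the caching behavior mentioned above concrete, here is a plain-Python sketch of content-based step caching: a cache key is derived from the step name and its input values, so a re-run with identical inputs can reuse the stored output. This is an illustrative model only, not ZenML's actual implementation.

```python
import hashlib
import json

def cache_key(step_name: str, inputs: dict) -> str:
    # Derive a stable key from the step identity and its inputs.
    payload = json.dumps({"step": step_name, "inputs": inputs}, sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()

cache = {}

def run_step(step_name, fn, inputs):
    key = cache_key(step_name, inputs)
    if key in cache:
        return cache[key]      # cache hit: skip execution entirely
    result = fn(**inputs)      # cache miss: execute and remember
    cache[key] = result
    return result

calls = []

def double(x):
    calls.append(1)            # count real executions
    return [v * 2 for v in x]

first = run_step("double", double, {"x": [1, 2, 3]})
second = run_step("double", double, {"x": [1, 2, 3]})
# The second call is served from the cache; double() ran only once.
```

In ZenML itself, caching is controlled per pipeline or per step via `enable_cache` and `cache_policy`, described below.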

## Capabilities
### Pipeline Decorator
7
8
Decorator to define a ZenML pipeline from a Python function.
9
10
```python { .api }
11
def pipeline(
12
_func=None,
13
*,
14
name: str = None,
15
enable_cache: bool = None,
16
enable_artifact_metadata: bool = None,
17
enable_step_logs: bool = None,
18
environment: dict = None,
19
secrets: list = None,
20
enable_pipeline_logs: bool = None,
21
settings: dict = None,
22
tags: list = None,
23
extra: dict = None,
24
on_failure=None,
25
on_success=None,
26
on_init=None,
27
on_init_kwargs: dict = None,
28
on_cleanup=None,
29
model: Model = None,
30
retry=None,
31
substitutions: dict = None,
32
execution_mode=None,
33
cache_policy=None
34
):
35
"""
36
Decorator to define a ZenML pipeline.
37
38
Parameters:
39
- name: Pipeline name (defaults to function name)
40
- enable_cache: Enable step caching
41
- enable_artifact_metadata: Enable artifact metadata logging
42
- enable_step_logs: Enable step logging
43
- environment: Environment variables to set when running this pipeline
44
- secrets: Secrets to set as environment variables (list of UUIDs or names)
45
- enable_pipeline_logs: Enable pipeline logs
46
- settings: Stack component settings dict
47
- tags: Tags to apply to runs of the pipeline
48
- extra: Extra pipeline metadata dict
49
- on_failure: Failure hook callable or list of callables
50
- on_success: Success hook callable or list of callables
51
- on_init: Callback function to run on initialization of the pipeline
52
- on_init_kwargs: Arguments for the init hook
53
- on_cleanup: Callback function to run on cleanup of the pipeline
54
- model: Model configuration for Model Control Plane
55
- retry: Retry configuration for the pipeline steps
56
- substitutions: Extra placeholders to use in the name templates
57
- execution_mode: The execution mode to use for the pipeline
58
- cache_policy: Cache policy for this pipeline
59
60
Returns:
61
Pipeline decorator function that wraps the pipeline function
62
63
Example:
64
```python
65
from zenml import pipeline, step, Model
66
67
@pipeline(
68
name="training_pipeline",
69
enable_cache=True,
70
model=Model(name="my_model", version="1.0.0")
71
)
72
def my_pipeline():
73
data = load_data()
74
model = train_model(data)
75
return model
76
```
77
"""
78
```
79
80
Import from:
81
82
```python
83
from zenml import pipeline
84
```

### Step Decorator

Decorator to define a ZenML step from a Python function.

```python { .api }
def step(
    _func=None,
    *,
    name: str = None,
    enable_cache: bool = None,
    enable_artifact_metadata: bool = None,
    enable_artifact_visualization: bool = None,
    enable_step_logs: bool = None,
    experiment_tracker: bool | str = None,
    step_operator: bool | str = None,
    output_materializers=None,
    environment: dict = None,
    secrets: list = None,
    settings: dict = None,
    extra: dict = None,
    on_failure=None,
    on_success=None,
    model: Model = None,
    retry=None,
    substitutions: dict = None,
    cache_policy=None
):
    """
    Decorator to define a ZenML step.

    Parameters:
    - name: Step name (defaults to function name)
    - enable_cache: Enable caching for this step
    - enable_artifact_metadata: Enable artifact metadata for this step
    - enable_artifact_visualization: Enable artifact visualization for this step
    - enable_step_logs: Enable step logs for this step
    - experiment_tracker: Name of experiment tracker component to use, or bool to enable/disable
    - step_operator: Name of step operator for remote execution, or bool to enable/disable
    - output_materializers: Custom materializers for outputs (single class or dict mapping output names to classes)
    - environment: Environment variables to set when running this step
    - secrets: Secrets to set as environment variables (list of UUIDs or names)
    - settings: Stack component settings dict
    - extra: Extra step metadata dict
    - on_failure: Failure hook callable or list of callables
    - on_success: Success hook callable or list of callables
    - model: Model configuration for Model Control Plane
    - retry: Retry configuration in case of step failure
    - substitutions: Extra placeholders for the step name
    - cache_policy: Cache policy for this step

    Returns:
        Step decorator function that wraps the step function

    Example:
        ```python
        from zenml import step
        from typing import Tuple

        @step
        def load_data() -> Tuple[list, list]:
            train_data = [1, 2, 3, 4, 5]
            test_data = [6, 7, 8, 9, 10]
            return train_data, test_data

        @step(enable_cache=False)
        def train_model(data: list) -> float:
            # Training logic
            accuracy = 0.95
            return accuracy
        ```
    """
```

Import from:

```python
from zenml import step
```
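
The `retry` parameter above configures automatic re-execution of a failed step. The semantics can be sketched in plain Python as retry-with-exponential-backoff; the parameter names `max_retries`, `delay`, and `backoff` here are illustrative of a typical retry configuration, not ZenML's exact API.

```python
import time

def run_with_retry(fn, max_retries=3, delay=0.01, backoff=2.0):
    """Call fn, retrying on failure with exponentially growing delays."""
    attempt = 0
    while True:
        try:
            return fn()
        except Exception:
            attempt += 1
            if attempt > max_retries:
                raise            # retries exhausted: propagate the failure
            time.sleep(delay)
            delay *= backoff     # wait longer before each subsequent retry

attempts = []

def flaky():
    attempts.append(1)
    if len(attempts) < 3:
        raise RuntimeError("transient failure")
    return "ok"

result = run_with_retry(flaky)   # succeeds on the third attempt
```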

### Base Step Class

Base class for implementing custom steps with additional functionality.

```python { .api }
class BaseStep:
    """
    Base class for implementing custom steps.

    Use the @step decorator for most cases. This class is for advanced
    scenarios requiring additional control over step behavior.
    """

    def entrypoint(self, *args, **kwargs):
        """
        Main execution method to be implemented by subclass.

        Parameters:
        - *args: Positional arguments
        - **kwargs: Keyword arguments

        Returns:
            Step outputs
        """
```

Import from:

```python
from zenml.steps import BaseStep
```

### Pipeline Context

Access pipeline context and metadata. Call `get_pipeline_context()` from within the pipeline function; inside a running step, use `get_step_context()` instead.

```python { .api }
class PipelineContext:
    """
    Pipeline execution context object.

    Attributes:
    - name: Pipeline name
    - run_name: Pipeline run name
    - pipeline_run: PipelineRunResponse object with full run details
    - extra: Extra metadata dict
    - model: Model configuration if set
    """

    @property
    def name(self) -> str:
        """Get pipeline name."""

    @property
    def run_name(self) -> str:
        """Get pipeline run name."""

    @property
    def pipeline_run(self):
        """
        Get pipeline run response object.

        Returns:
            PipelineRunResponse: Full pipeline run details including status, timestamps, configuration
        """

    @property
    def extra(self) -> dict:
        """Get extra metadata."""

    @property
    def model(self):
        """
        Get model configuration.

        Returns:
            ModelVersionResponse or None
        """


def get_pipeline_context() -> PipelineContext:
    """
    Get the current pipeline context.

    Returns:
        PipelineContext: Context object with pipeline metadata

    Raises:
        RuntimeError: If called outside a pipeline

    Example:
        ```python
        from zenml import pipeline, get_pipeline_context

        @pipeline
        def my_pipeline():
            context = get_pipeline_context()
            print(f"Running in pipeline: {context.name}")
            if context.model:
                print(f"Model: {context.model.name}")
        ```
    """
```

Import from:

```python
from zenml import get_pipeline_context
from zenml.pipelines import PipelineContext
```

### Step Context

Access step execution context and utilities.

```python { .api }
class StepContext:
    """
    Step execution context object.

    Provides access to step metadata, pipeline information, and utilities
    for interacting with the stack during step execution.

    Attributes:
    - step_name: Current step name
    - pipeline_name: Parent pipeline name
    - run_name: Pipeline run name
    - step_run: StepRunResponse object with full step run details
    - model: Model configuration if set
    - inputs: Input artifacts metadata
    - outputs: Output artifacts metadata
    """

    @property
    def step_name(self) -> str:
        """Get step name."""

    @property
    def pipeline_name(self) -> str:
        """Get pipeline name."""

    @property
    def run_name(self) -> str:
        """Get pipeline run name."""

    @property
    def step_run(self):
        """
        Get step run response object.

        Returns:
            StepRunResponse: Full step run details including status, timestamps, configuration
        """

    @property
    def model(self):
        """
        Get model configuration.

        Returns:
            ModelVersionResponse or None
        """

    @property
    def inputs(self) -> dict:
        """Get input artifacts metadata."""

    @property
    def outputs(self) -> dict:
        """Get output artifacts metadata."""


def get_step_context() -> StepContext:
    """
    Get the current step execution context.

    Returns:
        StepContext: Context object with step metadata and utilities

    Raises:
        RuntimeError: If called outside step execution

    Example:
        ```python
        from zenml import step, get_step_context

        @step
        def my_step(data: list) -> float:
            context = get_step_context()
            print(f"Running step: {context.step_name}")
            print(f"In pipeline: {context.pipeline_name}")
            print(f"Run: {context.run_name}")

            # Access step run details
            print(f"Status: {context.step_run.status}")

            # Training logic
            accuracy = 0.95
            return accuracy
        ```
    """
```

Import from:

```python
from zenml import get_step_context
from zenml.steps import StepContext
```

### Schedule Configuration

Configuration for scheduling pipeline runs.

```python { .api }
class Schedule:
    """
    Schedule configuration for pipeline runs.

    Supports cron expressions and interval-based scheduling.

    Attributes:
    - name: Schedule name
    - cron_expression: Cron expression (e.g., "0 0 * * *" for daily at midnight)
    - start_time: Schedule start datetime
    - end_time: Schedule end datetime
    - interval_second: Interval as timedelta between runs (for periodic schedules)
    - catchup: Whether to catch up on missed runs
    - run_once_start_time: When to run the pipeline once (for one-time schedules)
    """

    def __init__(
        self,
        name: str = None,
        cron_expression: str = None,
        start_time: datetime = None,
        end_time: datetime = None,
        interval_second: timedelta = None,
        catchup: bool = False,
        run_once_start_time: datetime = None
    ):
        """
        Initialize schedule configuration.

        Use either cron_expression or interval_second, not both.

        Parameters:
        - name: Schedule name
        - cron_expression: Cron expression for schedule
        - start_time: When to start the schedule
        - end_time: When to end the schedule
        - interval_second: Run interval as timedelta object
        - catchup: Whether to catch up on missed runs
        - run_once_start_time: When to run the pipeline once

        Example:
            ```python
            from zenml import pipeline, Schedule
            from datetime import datetime, timedelta

            @pipeline
            def my_pipeline():
                # Pipeline definition
                pass

            # Daily schedule
            schedule = Schedule(
                name="daily_training",
                cron_expression="0 0 * * *",
                start_time=datetime.now()
            )

            # Interval-based schedule (every 2 hours)
            schedule = Schedule(
                name="periodic_training",
                interval_second=timedelta(hours=2),
                start_time=datetime.now()
            )

            # Attach the schedule and trigger the pipeline
            my_pipeline.with_options(schedule=schedule)()
            ```
        """
```

Import from:

```python
from zenml import Schedule
from zenml.config import Schedule
from zenml.pipelines import Schedule
```
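
The interaction between `interval_second` and `catchup` can be illustrated with a short stdlib sketch: with `catchup=True` every missed interval since `start_time` is owed a run, while with `catchup=False` only the most recent due run is kept. This models the documented semantics; it is not ZenML scheduler code.

```python
from datetime import datetime, timedelta

def runs_between(start, interval, now, catchup):
    """Compute the run times an interval schedule owes between start and now."""
    due = []
    t = start
    while t <= now:
        due.append(t)
        t += interval
    if not catchup and due:
        due = due[-1:]   # no catchup: only the latest due run fires
    return due

start = datetime(2024, 1, 1, 0, 0)
now = datetime(2024, 1, 1, 6, 0)
every_two_hours = timedelta(hours=2)

all_runs = runs_between(start, every_two_hours, now, catchup=True)
latest_only = runs_between(start, every_two_hours, now, catchup=False)
# all_runs covers 00:00, 02:00, 04:00, 06:00; latest_only is just 06:00
```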

### Resource Settings

Resource allocation settings for steps.

```python { .api }
class ResourceSettings:
    """
    Settings for resource allocation (CPU, GPU, memory).

    Attributes:
    - cpu_count: Number of CPUs
    - gpu_count: Number of GPUs
    - memory: Memory allocation (e.g., "4GB", "512MB")
    """

    def __init__(
        self,
        cpu_count: int = None,
        gpu_count: int = None,
        memory: str = None
    ):
        """
        Initialize resource settings.

        Parameters:
        - cpu_count: Number of CPUs to allocate
        - gpu_count: Number of GPUs to allocate
        - memory: Memory to allocate (e.g., "4GB", "512MB")

        Example:
            ```python
            from zenml import step
            from zenml.config import ResourceSettings

            @step(
                settings={
                    "resources": ResourceSettings(
                        cpu_count=4,
                        gpu_count=1,
                        memory="8GB"
                    )
                }
            )
            def train_model(data: list) -> float:
                # Training with allocated resources
                return 0.95
            ```
        """
```

Import from:

```python
from zenml.config import ResourceSettings
from zenml.steps import ResourceSettings
```
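
To clarify the `"4GB"` / `"512MB"` memory-string format used above, here is a small stdlib sketch that parses such strings into byte counts. The accepted units and binary multipliers are an assumption for illustration, not ZenML's exact parser.

```python
import re

def parse_memory(memory: str) -> int:
    """Convert a memory string like "4GB" or "512MB" into bytes."""
    units = {"KB": 1024, "MB": 1024**2, "GB": 1024**3, "TB": 1024**4}
    match = re.fullmatch(r"(\d+)\s*(KB|MB|GB|TB)", memory.strip(), re.IGNORECASE)
    if not match:
        raise ValueError(f"Unrecognized memory string: {memory!r}")
    amount, unit = match.groups()
    return int(amount) * units[unit.upper()]

print(parse_memory("4GB"))    # 4294967296
print(parse_memory("512MB"))  # 536870912
```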

## Usage Examples

### Basic Pipeline with Multiple Steps

```python
from zenml import pipeline, step
from typing import Tuple

@step
def load_data() -> Tuple[list, list]:
    """Load training and test data."""
    train_data = [1, 2, 3, 4, 5]
    test_data = [6, 7, 8, 9, 10]
    return train_data, test_data

@step
def preprocess_data(train: list, test: list) -> Tuple[list, list]:
    """Preprocess data."""
    train_processed = [x * 2 for x in train]
    test_processed = [x * 2 for x in test]
    return train_processed, test_processed

@step
def train_model(data: list) -> dict:
    """Train a model."""
    return {"accuracy": 0.95, "loss": 0.05}

@step
def evaluate_model(model: dict, test_data: list) -> float:
    """Evaluate model on test data."""
    return model["accuracy"] * 0.98

@pipeline
def ml_pipeline():
    """Complete ML training pipeline."""
    train, test = load_data()
    train_processed, test_processed = preprocess_data(train, test)
    model = train_model(train_processed)
    accuracy = evaluate_model(model, test_processed)
    return accuracy

if __name__ == "__main__":
    ml_pipeline()
```
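
The data dependencies in a pipeline function like the one above form the DAG that determines execution order. A sketch of that ordering with the stdlib `graphlib` module, using the four steps from the example (the dependency map is hand-written here for illustration; ZenML derives it automatically from which outputs feed which inputs):

```python
from graphlib import TopologicalSorter

# Each step maps to the set of steps whose outputs it consumes.
dependencies = {
    "load_data": set(),
    "preprocess_data": {"load_data"},
    "train_model": {"preprocess_data"},
    "evaluate_model": {"train_model", "preprocess_data"},
}

# A topological sort yields a valid execution order for the DAG.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```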

### Pipeline with Model Control Plane

```python
from zenml import pipeline, step, Model

@step
def train_model(data: list) -> dict:
    """Train and return model."""
    return {"weights": [0.1, 0.2, 0.3], "accuracy": 0.95}

@pipeline(
    model=Model(
        name="text_classifier",
        version="1.0.0",
        license="Apache-2.0",
        description="Text classification model",
        tags=["nlp", "classification"]
    )
)
def training_pipeline():
    """Pipeline with model tracking."""
    data = [1, 2, 3, 4, 5]
    model = train_model(data)
    return model

if __name__ == "__main__":
    training_pipeline()
```

### Pipeline with Hooks

```python
from zenml import pipeline, step
from zenml.hooks import alerter_success_hook, alerter_failure_hook

@step
def train_model(data: list) -> float:
    """Train model."""
    return 0.95

# The built-in alerter hooks are passed as callables; they post
# notifications via the alerter configured in the active stack.
@pipeline(
    on_success=alerter_success_hook,
    on_failure=alerter_failure_hook
)
def monitored_pipeline():
    """Pipeline with alerting."""
    data = [1, 2, 3]
    accuracy = train_model(data)
    return accuracy
```
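
The hook mechanism itself follows a simple pattern: the framework runs the wrapped function, then invokes the matching callback. A plain-Python sketch of that pattern (illustrative only, not ZenML internals):

```python
events = []

def on_success():
    events.append("success")

def on_failure(exception):
    events.append(f"failure: {exception}")

def run_with_hooks(fn, success_hook=None, failure_hook=None):
    """Run fn, then invoke the matching hook; failures still propagate."""
    try:
        result = fn()
    except Exception as e:
        if failure_hook:
            failure_hook(e)   # failure hook sees the exception
        raise
    if success_hook:
        success_hook()
    return result

result = run_with_hooks(lambda: 0.95, success_hook=on_success, failure_hook=on_failure)

try:
    run_with_hooks(lambda: 1 / 0, success_hook=on_success, failure_hook=on_failure)
except ZeroDivisionError:
    pass   # the failure hook fired before the exception re-raised
```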

### Accessing Context in Steps

Inside a running step, use `get_step_context()`; `get_pipeline_context()` is intended for use within the pipeline function itself, during composition.

```python
from zenml import step, get_step_context

@step
def contextual_step(data: list) -> dict:
    """Step that uses context."""
    context = get_step_context()

    print(f"Step: {context.step_name}")
    print(f"Pipeline: {context.pipeline_name}")
    print(f"Run: {context.run_name}")

    # Access model if configured
    if context.model:
        print(f"Model: {context.model.name}")

    return {
        "step": context.step_name,
        "pipeline": context.pipeline_name,
        "processed_data": [x * 2 for x in data]
    }
```

### Step with Resource Allocation

```python
from zenml import step
from zenml.config import ResourceSettings

@step(
    settings={
        "resources": ResourceSettings(
            cpu_count=8,
            gpu_count=2,
            memory="16GB"
        )
    }
)
def gpu_intensive_step(data: list) -> dict:
    """Step requiring GPU resources."""
    # GPU training logic
    return {"model": "trained_model", "accuracy": 0.98}
```