
# Pipelines and Steps

Core decorators and context objects for defining ML workflows and their constituent steps. ZenML pipelines are directed acyclic graphs (DAGs) of steps that define reproducible ML workflows with automatic versioning, caching, and lineage tracking.
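Since a pipeline is a DAG, the orchestrator may start a step only after all of its upstream steps have finished. The resulting execution order can be illustrated in plain Python with the standard library's topological sorter (the step names and dependency map here are hypothetical, not ZenML API):

```python
from graphlib import TopologicalSorter

# Hypothetical dependency map: each step lists the steps it consumes from.
dag = {
    "load_data": set(),
    "preprocess": {"load_data"},
    "train": {"preprocess"},
    "evaluate": {"train", "preprocess"},
}

# graphlib resolves a valid execution order for the DAG.
order = list(TopologicalSorter(dag).static_order())
print(order)  # ['load_data', 'preprocess', 'train', 'evaluate']
```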

## Capabilities

### Pipeline Decorator

Decorator to define a ZenML pipeline from a Python function.

```python { .api }
def pipeline(
    _func=None,
    *,
    name: str = None,
    enable_cache: bool = None,
    enable_artifact_metadata: bool = None,
    enable_step_logs: bool = None,
    environment: dict = None,
    secrets: list = None,
    enable_pipeline_logs: bool = None,
    settings: dict = None,
    tags: list = None,
    extra: dict = None,
    on_failure=None,
    on_success=None,
    on_init=None,
    on_init_kwargs: dict = None,
    on_cleanup=None,
    model: Model = None,
    retry=None,
    substitutions: dict = None,
    execution_mode=None,
    cache_policy=None
):
    """
    Decorator to define a ZenML pipeline.

    Parameters:
    - name: Pipeline name (defaults to the function name)
    - enable_cache: Enable step caching
    - enable_artifact_metadata: Enable artifact metadata logging
    - enable_step_logs: Enable step logging
    - environment: Environment variables to set when running this pipeline
    - secrets: Secrets to set as environment variables (list of UUIDs or names)
    - enable_pipeline_logs: Enable pipeline logs
    - settings: Stack component settings dict
    - tags: Tags to apply to runs of the pipeline
    - extra: Extra pipeline metadata dict
    - on_failure: Failure hook callable or list of callables
    - on_success: Success hook callable or list of callables
    - on_init: Callback function to run on initialization of the pipeline
    - on_init_kwargs: Arguments for the init hook
    - on_cleanup: Callback function to run on cleanup of the pipeline
    - model: Model configuration for the Model Control Plane
    - retry: Retry configuration for the pipeline steps
    - substitutions: Extra placeholders to use in the name templates
    - execution_mode: The execution mode to use for the pipeline
    - cache_policy: Cache policy for this pipeline

    Returns:
    Pipeline decorator function that wraps the pipeline function

    Example:
    ```python
    from zenml import pipeline, step, Model

    @pipeline(
        name="training_pipeline",
        enable_cache=True,
        model=Model(name="my_model", version="1.0.0")
    )
    def my_pipeline():
        data = load_data()
        model = train_model(data)
        return model
    ```
    """
```

Import from:

```python
from zenml import pipeline
```
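
When `enable_cache` is on, a step whose inputs (and code) are unchanged is skipped and its stored outputs are reused. The idea can be sketched with a simple memo keyed on the step name and its inputs (an illustration of the concept only, not ZenML's actual cache implementation, which also fingerprints step code and configuration):

```python
cache: dict = {}

def run_step(name, fn, *inputs):
    # A cache hit requires the same step name and the same inputs.
    key = (name, inputs)
    if key in cache:
        return cache[key]
    result = fn(*inputs)
    cache[key] = result
    return result

double = lambda xs: tuple(x * 2 for x in xs)
first = run_step("preprocess", double, (1, 2, 3))
second = run_step("preprocess", double, (1, 2, 3))  # served from cache
print(first == second)  # True
```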

### Step Decorator

Decorator to define a ZenML step from a Python function.

```python { .api }
def step(
    _func=None,
    *,
    name: str = None,
    enable_cache: bool = None,
    enable_artifact_metadata: bool = None,
    enable_artifact_visualization: bool = None,
    enable_step_logs: bool = None,
    experiment_tracker: bool | str = None,
    step_operator: bool | str = None,
    output_materializers=None,
    environment: dict = None,
    secrets: list = None,
    settings: dict = None,
    extra: dict = None,
    on_failure=None,
    on_success=None,
    model: Model = None,
    retry=None,
    substitutions: dict = None,
    cache_policy=None
):
    """
    Decorator to define a ZenML step.

    Parameters:
    - name: Step name (defaults to the function name)
    - enable_cache: Enable caching for this step
    - enable_artifact_metadata: Enable artifact metadata for this step
    - enable_artifact_visualization: Enable artifact visualization for this step
    - enable_step_logs: Enable step logs for this step
    - experiment_tracker: Name of the experiment tracker component to use, or bool to enable/disable
    - step_operator: Name of the step operator for remote execution, or bool to enable/disable
    - output_materializers: Custom materializers for outputs (single class or dict mapping output names to classes)
    - environment: Environment variables to set when running this step
    - secrets: Secrets to set as environment variables (list of UUIDs or names)
    - settings: Stack component settings dict
    - extra: Extra step metadata dict
    - on_failure: Failure hook callable or list of callables
    - on_success: Success hook callable or list of callables
    - model: Model configuration for the Model Control Plane
    - retry: Retry configuration in case of step failure
    - substitutions: Extra placeholders for the step name
    - cache_policy: Cache policy for this step

    Returns:
    Step decorator function that wraps the step function

    Example:
    ```python
    from zenml import step
    from typing import Tuple

    @step
    def load_data() -> Tuple[list, list]:
        train_data = [1, 2, 3, 4, 5]
        test_data = [6, 7, 8, 9, 10]
        return train_data, test_data

    @step(enable_cache=False)
    def train_model(data: list) -> float:
        # Training logic
        accuracy = 0.95
        return accuracy
    ```
    """
```

Import from:

```python
from zenml import step
```
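
The `retry` parameter tells the orchestrator to re-run a failing step before giving up. Conceptually, it wraps step execution in a loop like the following (a plain-Python sketch of the idea; the real retry configuration also supports delay and backoff settings):

```python
def run_with_retry(fn, max_retries=3):
    # Attempt the step up to max_retries + 1 times,
    # re-raising the last error if every attempt fails.
    last_error = None
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except Exception as err:
            last_error = err
    raise last_error

attempts = []
def flaky_step():
    attempts.append(1)
    if len(attempts) < 3:
        raise RuntimeError("transient failure")
    return 0.95

print(run_with_retry(flaky_step))  # 0.95, after two failed attempts
```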

### Base Step Class

Base class for implementing custom steps with additional functionality.

```python { .api }
class BaseStep:
    """
    Base class for implementing custom steps.

    Use the @step decorator for most cases. This class is for advanced
    scenarios requiring additional control over step behavior.
    """

    def entrypoint(self, *args, **kwargs):
        """
        Main execution method to be implemented by the subclass.

        Parameters:
        - *args: Positional arguments
        - **kwargs: Keyword arguments

        Returns:
        Step outputs
        """
```

Import from:

```python
from zenml.steps import BaseStep
```
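
A subclass puts the step's logic in `entrypoint`. Since the section above only documents the interface, here is a minimal stand-in showing the shape of a custom step (the `BaseStep` stub below is local to this example so it runs anywhere; real code would subclass `zenml.steps.BaseStep` instead):

```python
class BaseStep:
    # Local stand-in for zenml.steps.BaseStep, reduced to the
    # documented interface: invoking the step runs its entrypoint.
    def __call__(self, *args, **kwargs):
        return self.entrypoint(*args, **kwargs)

    def entrypoint(self, *args, **kwargs):
        raise NotImplementedError

class NormalizeStep(BaseStep):
    """Custom step that scales values into [0, 1]."""

    def entrypoint(self, values: list) -> list:
        top = max(values)
        return [v / top for v in values]

step_instance = NormalizeStep()
print(step_instance([2, 4, 8]))  # [0.25, 0.5, 1.0]
```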

### Pipeline Context

Access pipeline execution context and metadata.

```python { .api }
class PipelineContext:
    """
    Pipeline execution context object.

    Attributes:
    - name: Pipeline name
    - run_name: Pipeline run name
    - pipeline_run: PipelineRunResponse object with full run details
    - extra: Extra metadata dict
    - model: Model configuration if set
    """

    @property
    def name(self) -> str:
        """Get pipeline name."""

    @property
    def run_name(self) -> str:
        """Get pipeline run name."""

    @property
    def pipeline_run(self):
        """
        Get pipeline run response object.

        Returns:
        PipelineRunResponse: Full pipeline run details including status, timestamps, and configuration
        """

    @property
    def extra(self) -> dict:
        """Get extra metadata."""

    @property
    def model(self):
        """
        Get model configuration.

        Returns:
        ModelVersionResponse or None
        """


def get_pipeline_context() -> PipelineContext:
    """
    Get the current pipeline execution context.

    Returns:
    PipelineContext: Context object with pipeline metadata

    Raises:
    RuntimeError: If called outside pipeline execution

    Example:
    ```python
    from zenml import step, get_pipeline_context

    @step
    def my_step():
        context = get_pipeline_context()
        print(f"Running in pipeline: {context.name}")
        print(f"Run name: {context.run_name}")
        if context.model:
            print(f"Model: {context.model.name}")
    ```
    """
```

Import from:

```python
from zenml import get_pipeline_context
from zenml.pipelines import PipelineContext
```

### Step Context

Access step execution context and utilities.

```python { .api }
class StepContext:
    """
    Step execution context object.

    Provides access to step metadata, pipeline information, and utilities
    for interacting with the stack during step execution.

    Attributes:
    - step_name: Current step name
    - pipeline_name: Parent pipeline name
    - run_name: Pipeline run name
    - step_run: StepRunResponse object with full step run details
    - model: Model configuration if set
    - inputs: Input artifacts metadata
    - outputs: Output artifacts metadata
    """

    @property
    def step_name(self) -> str:
        """Get step name."""

    @property
    def pipeline_name(self) -> str:
        """Get pipeline name."""

    @property
    def run_name(self) -> str:
        """Get pipeline run name."""

    @property
    def step_run(self):
        """
        Get step run response object.

        Returns:
        StepRunResponse: Full step run details including status, timestamps, and configuration
        """

    @property
    def model(self):
        """
        Get model configuration.

        Returns:
        ModelVersionResponse or None
        """

    @property
    def inputs(self) -> dict:
        """Get input artifacts metadata."""

    @property
    def outputs(self) -> dict:
        """Get output artifacts metadata."""


def get_step_context() -> StepContext:
    """
    Get the current step execution context.

    Returns:
    StepContext: Context object with step metadata and utilities

    Raises:
    RuntimeError: If called outside step execution

    Example:
    ```python
    from zenml import step, get_step_context

    @step
    def my_step(data: list) -> float:
        context = get_step_context()
        print(f"Running step: {context.step_name}")
        print(f"In pipeline: {context.pipeline_name}")
        print(f"Run: {context.run_name}")

        # Access step run details
        print(f"Status: {context.step_run.status}")

        # Training logic
        accuracy = 0.95
        return accuracy
    ```
    """
```

Import from:

```python
from zenml import get_step_context
from zenml.steps import StepContext
```

### Schedule Configuration

Configuration for scheduling pipeline runs.

```python { .api }
class Schedule:
    """
    Schedule configuration for pipeline runs.

    Supports cron expressions and interval-based scheduling.

    Attributes:
    - name: Schedule name
    - cron_expression: Cron expression (e.g., "0 0 * * *" for daily at midnight)
    - start_time: Schedule start datetime
    - end_time: Schedule end datetime
    - interval_second: Interval as timedelta between runs (for periodic schedules)
    - catchup: Whether to catch up on missed runs
    - run_once_start_time: When to run the pipeline once (for one-time schedules)
    """

    def __init__(
        self,
        name: str = None,
        cron_expression: str = None,
        start_time: datetime = None,
        end_time: datetime = None,
        interval_second: timedelta = None,
        catchup: bool = False,
        run_once_start_time: datetime = None
    ):
        """
        Initialize schedule configuration.

        Use either cron_expression or interval_second, not both.

        Parameters:
        - name: Schedule name
        - cron_expression: Cron expression for the schedule
        - start_time: When to start the schedule
        - end_time: When to end the schedule
        - interval_second: Run interval as a timedelta object
        - catchup: Whether to catch up on missed runs
        - run_once_start_time: When to run the pipeline once

        Example:
        ```python
        from zenml import pipeline, Schedule
        from datetime import datetime, timedelta

        # Daily schedule
        schedule = Schedule(
            name="daily_training",
            cron_expression="0 0 * * *",
            start_time=datetime.now()
        )

        # Interval-based schedule (every 2 hours)
        schedule = Schedule(
            name="periodic_training",
            interval_second=timedelta(hours=2),
            start_time=datetime.now()
        )

        @pipeline
        def my_pipeline():
            # Pipeline definition
            pass

        # Attach the schedule when configuring the pipeline run
        my_pipeline.with_options(schedule=schedule)()
        ```
        """
```

Import from:

```python
from zenml import Schedule
from zenml.config import Schedule
from zenml.pipelines import Schedule
```
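
To make the interval semantics concrete, the run times a periodic schedule implies can be computed directly with `datetime` arithmetic (plain Python, independent of ZenML; the start time is arbitrary):

```python
from datetime import datetime, timedelta

start = datetime(2024, 1, 1, 0, 0)
interval = timedelta(hours=2)

# First four run times of a periodic schedule starting at `start`.
run_times = [start + i * interval for i in range(4)]
print(run_times[-1])  # 2024-01-01 06:00:00
```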

### Resource Settings

Resource allocation settings for steps.

```python { .api }
class ResourceSettings:
    """
    Settings for resource allocation (CPU, GPU, memory).

    Attributes:
    - cpu_count: Number of CPUs
    - gpu_count: Number of GPUs
    - memory: Memory allocation (e.g., "4GB", "512MB")
    """

    def __init__(
        self,
        cpu_count: int = None,
        gpu_count: int = None,
        memory: str = None
    ):
        """
        Initialize resource settings.

        Parameters:
        - cpu_count: Number of CPUs to allocate
        - gpu_count: Number of GPUs to allocate
        - memory: Memory to allocate (e.g., "4GB", "512MB")

        Example:
        ```python
        from zenml import step
        from zenml.config import ResourceSettings

        @step(
            settings={
                "resources": ResourceSettings(
                    cpu_count=4,
                    gpu_count=1,
                    memory="8GB"
                )
            }
        )
        def train_model(data: list) -> float:
            # Training with allocated resources
            return 0.95
        ```
        """
```

Import from:

```python
from zenml.config import ResourceSettings
from zenml.steps import ResourceSettings
```

## Usage Examples

### Basic Pipeline with Multiple Steps

```python
from zenml import pipeline, step
from typing import Tuple

@step
def load_data() -> Tuple[list, list]:
    """Load training and test data."""
    train_data = [1, 2, 3, 4, 5]
    test_data = [6, 7, 8, 9, 10]
    return train_data, test_data

@step
def preprocess_data(train: list, test: list) -> Tuple[list, list]:
    """Preprocess data."""
    train_processed = [x * 2 for x in train]
    test_processed = [x * 2 for x in test]
    return train_processed, test_processed

@step
def train_model(data: list) -> dict:
    """Train a model."""
    return {"accuracy": 0.95, "loss": 0.05}

@step
def evaluate_model(model: dict, test_data: list) -> float:
    """Evaluate the model on test data."""
    return model["accuracy"] * 0.98

@pipeline
def ml_pipeline():
    """Complete ML training pipeline."""
    train, test = load_data()
    train_processed, test_processed = preprocess_data(train, test)
    model = train_model(train_processed)
    accuracy = evaluate_model(model, test_processed)
    return accuracy

if __name__ == "__main__":
    ml_pipeline()
```

### Pipeline with Model Control Plane

```python
from zenml import pipeline, step, Model

@step
def train_model(data: list) -> dict:
    """Train and return a model."""
    return {"weights": [0.1, 0.2, 0.3], "accuracy": 0.95}

@pipeline(
    model=Model(
        name="text_classifier",
        version="1.0.0",
        license="Apache-2.0",
        description="Text classification model",
        tags=["nlp", "classification"]
    )
)
def training_pipeline():
    """Pipeline with model tracking."""
    data = [1, 2, 3, 4, 5]
    model = train_model(data)
    return model

if __name__ == "__main__":
    training_pipeline()
```

### Pipeline with Hooks

```python
from zenml import pipeline, step
from zenml.hooks import alerter_success_hook, alerter_failure_hook

@step
def train_model(data: list) -> float:
    """Train model."""
    return 0.95

# Hooks are passed as callables; the built-in alerter hooks post the
# run outcome through the alerter component of the active stack.
@pipeline(
    on_success=alerter_success_hook,
    on_failure=alerter_failure_hook
)
def monitored_pipeline():
    """Pipeline with alerting."""
    data = [1, 2, 3]
    accuracy = train_model(data)
    return accuracy
```

### Accessing Context in Steps

```python
from zenml import step, get_step_context, get_pipeline_context

@step
def contextual_step(data: list) -> dict:
    """Step that uses context."""
    step_context = get_step_context()
    pipeline_context = get_pipeline_context()

    print(f"Step: {step_context.step_name}")
    print(f"Pipeline: {pipeline_context.name}")
    print(f"Run: {pipeline_context.run_name}")

    # Access model if configured
    if pipeline_context.model:
        print(f"Model: {pipeline_context.model.name}")

    return {
        "step": step_context.step_name,
        "pipeline": pipeline_context.name,
        "processed_data": [x * 2 for x in data]
    }
```

634

635

### Step with Resource Allocation

636

637

```python
from zenml import step
from zenml.config import ResourceSettings

@step(
    settings={
        "resources": ResourceSettings(
            cpu_count=8,
            gpu_count=2,
            memory="16GB"
        )
    }
)
def gpu_intensive_step(data: list) -> dict:
    """Step requiring GPU resources."""
    # GPU training logic
    return {"model": "trained_model", "accuracy": 0.98}
```
