
# Core Workflows

Core workflow functionality in Prefect, including flow and task creation with decorators, execution control, serving deployments, and flow lifecycle management. This forms the foundation of Prefect's orchestration capabilities.

## Capabilities

### Flow Decorator

Creates Prefect flows from Python functions, enabling workflow orchestration with built-in retry logic, state management, and observability.

```python { .api }
def flow(
    fn=None,
    *,
    name: str = None,
    description: str = None,
    version: str = None,
    flow_run_name: Union[str, Callable[[], str]] = None,
    task_runner: TaskRunner = None,
    timeout_seconds: Union[int, float] = None,
    validate_parameters: bool = True,
    retries: int = None,
    retry_delay_seconds: Union[int, float] = None,
    persist_result: bool = None,
    result_storage: ResultStorage = None,
    result_serializer: ResultSerializer = None,
    cache_result_in_memory: bool = True,
    on_completion: List[FlowStateHook] = None,
    on_failure: List[FlowStateHook] = None,
    on_cancellation: List[FlowStateHook] = None,
    on_crashed: List[FlowStateHook] = None,
    on_running: List[FlowStateHook] = None,
    log_prints: bool = None,
):
    """
    Decorator to create Prefect flows from functions.

    Parameters:
    - fn: Function to decorate (provided automatically when used as decorator)
    - name: Name for the flow (defaults to function name)
    - description: Description of the flow's purpose
    - version: Version string for the flow
    - flow_run_name: Name template for flow runs
    - task_runner: Task runner for executing tasks within the flow
    - timeout_seconds: Maximum runtime for the flow
    - validate_parameters: Whether to validate flow parameters against type hints
    - retries: Number of retry attempts on failure
    - retry_delay_seconds: Delay between retry attempts
    - persist_result: Whether to persist flow results
    - result_storage: Storage backend for results
    - result_serializer: Serializer for results
    - cache_result_in_memory: Whether to cache results in memory
    - on_completion: Hooks to run when flow completes successfully
    - on_failure: Hooks to run when flow fails
    - on_cancellation: Hooks to run when flow is cancelled
    - on_crashed: Hooks to run when flow crashes
    - on_running: Hooks to run when flow starts running
    - log_prints: Whether to log print statements

    Returns:
    Flow object when used as decorator
    """
```

#### Usage Examples

```python
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

# Basic flow
@flow
def my_workflow():
    """Simple workflow example."""
    return "Hello, Prefect!"

# Flow with configuration
@flow(
    name="Data Processing Pipeline",
    description="Process incoming data files",
    version="1.0.0",
    retries=3,
    retry_delay_seconds=60,
    task_runner=ThreadPoolTaskRunner(max_workers=4),
    timeout_seconds=3600
)
def data_pipeline(file_path: str):
    """Data processing workflow with retry and timeout configuration."""
    # Workflow logic here
    pass

# Flow with hooks
@flow(
    on_completion=[lambda flow, flow_run, state: print("Flow completed!")],
    on_failure=[lambda flow, flow_run, state: print("Flow failed!")],
)
def monitored_flow():
    """Flow with state change hooks for monitoring."""
    pass
```
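To make the `retries` and `retry_delay_seconds` settings concrete, here is a minimal plain-Python sketch of a retry loop. It does not import Prefect and is not how Prefect implements retries; `run_with_retries` and its `sleep` parameter are hypothetical names. It assumes that `retries=3` means up to three further attempts after the first failure, so four attempts in total.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_retries(
    fn: Callable[[], T],
    retries: int = 0,
    retry_delay_seconds: float = 0.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn, retrying up to `retries` times after the first failure."""
    attempt = 0
    while True:
        try:
            return fn()
        except Exception:
            if attempt >= retries:
                raise  # retries exhausted: surface the last error
            attempt += 1
            sleep(retry_delay_seconds)


# Usage: a function that fails twice, then succeeds on the third call.
calls = {"count": 0}


def flaky() -> str:
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")
    return "ok"


result = run_with_retries(flaky, retries=3, retry_delay_seconds=0)
```

The point of the sketch is the counting: the first call is not a retry, so a function that always fails is invoked `retries + 1` times before the error propagates.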

### Task Decorator

Creates Prefect tasks from Python functions, enabling parallel execution, caching, retries, and state management.

```python { .api }
def task(
    fn=None,
    *,
    name: str = None,
    description: str = None,
    tags: Iterable[str] = None,
    version: str = None,
    cache_key_fn: Callable[..., str] = None,
    cache_expiration: datetime.timedelta = None,
    task_run_name: Union[str, Callable[[], str]] = None,
    retries: int = None,
    retry_delay_seconds: Union[int, float] = None,
    retry_condition_fn: Callable[..., bool] = None,
    persist_result: bool = None,
    result_storage: ResultStorage = None,
    result_serializer: ResultSerializer = None,
    cache_result_in_memory: bool = True,
    timeout_seconds: Union[int, float] = None,
    log_prints: bool = None,
    refresh_cache: bool = None,
    on_completion: List[StateHookCallable] = None,
    on_failure: List[StateHookCallable] = None,
):
    """
    Decorator to create Prefect tasks from functions.

    Parameters:
    - fn: Function to decorate (provided automatically when used as decorator)
    - name: Name for the task (defaults to function name)
    - description: Description of the task's purpose
    - tags: Tags to apply to the task and its runs
    - version: Version string for the task
    - cache_key_fn: Function to generate cache keys from task inputs
    - cache_expiration: Duration after which cached results expire
    - task_run_name: Name template for task runs
    - retries: Number of retry attempts on failure
    - retry_delay_seconds: Delay between retry attempts
    - retry_condition_fn: Function to determine if task should retry
    - persist_result: Whether to persist task results
    - result_storage: Storage backend for results
    - result_serializer: Serializer for results
    - cache_result_in_memory: Whether to cache results in memory
    - timeout_seconds: Maximum runtime for the task
    - log_prints: Whether to log print statements
    - refresh_cache: Whether to refresh cached results
    - on_completion: Hooks to run when task completes successfully
    - on_failure: Hooks to run when task fails

    Returns:
    Task object when used as decorator
    """
```

#### Usage Examples

```python
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta

# Basic task
@task
def process_data(data):
    """Simple task example."""
    return data * 2

# Task with caching
@task(
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
    retries=3,
    retry_delay_seconds=30
)
def expensive_computation(input_value):
    """Task with caching and retry configuration."""
    # Expensive computation here
    return input_value ** 2

# Task with custom retry condition
def should_retry(task, task_run, state):
    """Custom retry condition function."""
    return "timeout" in state.message.lower() if state.message else False

@task(
    retries=5,
    retry_condition_fn=should_retry,
    timeout_seconds=300,
    tags=["api", "external"]
)
def api_call(endpoint: str):
    """Task with custom retry logic for API calls."""
    # API call logic here
    pass

# Using tasks in a flow
@flow
def workflow():
    data = process_data(10)
    result = expensive_computation(data)
    return result
```
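The idea behind `cache_key_fn` and `cache_expiration` can be sketched without Prefect: derive a key from the inputs, and reuse a stored result while it is younger than the expiration. This is only an illustration under assumptions. `input_hash`, `cached_call`, and the in-memory `_cache` dictionary are hypothetical, and Prefect's own key function and result storage work differently.

```python
import hashlib
import json
import time
from typing import Any, Callable, Dict, Tuple

# Maps cache key -> (stored_at, result). Hypothetical in-memory store.
_cache: Dict[str, Tuple[float, Any]] = {}


def input_hash(fn: Callable[..., Any], arguments: Dict[str, Any]) -> str:
    """Build a deterministic key from the function name and its arguments."""
    payload = json.dumps({"fn": fn.__qualname__, "args": arguments}, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def cached_call(
    fn: Callable[..., Any],
    arguments: Dict[str, Any],
    expiration_seconds: float,
    now: Callable[[], float] = time.monotonic,
) -> Any:
    """Return a cached result if one exists and has not expired."""
    key = input_hash(fn, arguments)
    hit = _cache.get(key)
    if hit is not None and now() - hit[0] < expiration_seconds:
        return hit[1]
    result = fn(**arguments)
    _cache[key] = (now(), result)
    return result


# Usage: the second call with the same input reuses the stored result.
runs = []


def square(input_value: int) -> int:
    runs.append(input_value)
    return input_value ** 2


first = cached_call(square, {"input_value": 4}, expiration_seconds=3600)
second = cached_call(square, {"input_value": 4}, expiration_seconds=3600)
third = cached_call(square, {"input_value": 5}, expiration_seconds=3600)
```

After these calls, `runs` holds only `4` and `5`, because the repeated input never reached `square` a second time.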

### Flow Class

The Flow class represents a Prefect workflow definition with methods for execution, validation, and deployment.

```python { .api }
class Flow(Generic[P, R]):
    """
    Prefect workflow definition class.

    Attributes:
    - name: Flow name
    - fn: The decorated function
    - description: Flow description
    - version: Flow version
    - flow_run_name: Template for flow run names
    - task_runner: Task runner for executing tasks
    - timeout_seconds: Flow timeout
    - validate_parameters: Whether to validate parameters
    - retries: Number of retry attempts
    - retry_delay_seconds: Delay between retries
    - isasync: Whether the flow function is async
    """

    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> R:
        """Execute the flow with given parameters."""

    def with_options(
        self,
        *,
        name: str = None,
        description: str = None,
        version: str = None,
        flow_run_name: Union[str, Callable] = None,
        retries: int = None,
        retry_delay_seconds: Union[int, float] = None,
        timeout_seconds: Union[int, float] = None,
        validate_parameters: bool = None,
        persist_result: bool = None,
        result_storage: ResultStorage = None,
        result_serializer: ResultSerializer = None,
        cache_result_in_memory: bool = None,
        log_prints: bool = None,
    ) -> "Flow[P, R]":
        """Create a copy of the flow with modified options."""

    def serve(
        self,
        name: str = None,
        tags: List[str] = None,
        parameters: Dict[str, Any] = None,
        description: str = None,
        version: str = None,
        enforce_parameter_schema: bool = None,
        pause_on_shutdown: bool = True,
        print_starting_message: bool = True,
        limit: int = None,
        webserver: bool = False,
        **kwargs
    ) -> None:
        """Serve the flow for remote execution."""
```
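The copy-on-modify behaviour of `with_options` can be illustrated with a small frozen dataclass. This is a sketch, not Prefect's `Flow`: `MiniFlow` is a hypothetical stand-in, and it assumes that an option left as `None` keeps its current value, which matches the `None` defaults in the signature above.

```python
from dataclasses import dataclass, replace
from typing import Callable, Optional


@dataclass(frozen=True)
class MiniFlow:
    """Hypothetical stand-in for a flow definition (not Prefect's Flow)."""

    fn: Callable[..., object]
    name: str
    retries: int = 0
    timeout_seconds: Optional[float] = None

    def with_options(self, **overrides: object) -> "MiniFlow":
        # Drop None values so that unspecified options keep their current value.
        changes = {k: v for k, v in overrides.items() if v is not None}
        return replace(self, **changes)

    def __call__(self, *args: object, **kwargs: object) -> object:
        return self.fn(*args, **kwargs)


base = MiniFlow(fn=lambda x: x + 1, name="base", retries=1)
patient = base.with_options(name="patient", retries=5)
```

Here `base` is left untouched and `patient` is a separate object that wraps the same function with different settings, which is what makes it safe to derive several variants from one definition.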

### Task Class

The Task class represents a Prefect task definition with methods for execution and configuration.

```python { .api }
class Task(Generic[P, R]):
    """
    Prefect task definition class.

    Attributes:
    - name: Task name
    - fn: The decorated function
    - description: Task description
    - version: Task version
    - tags: Task tags
    - cache_key_fn: Cache key generation function
    - cache_expiration: Cache expiration duration
    - retries: Number of retry attempts
    - retry_delay_seconds: Delay between retries
    - retry_condition_fn: Custom retry condition function
    - timeout_seconds: Task timeout
    - isasync: Whether the task function is async
    """

    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> R:
        """Execute the task with given parameters."""

    def with_options(
        self,
        *,
        name: str = None,
        description: str = None,
        tags: Iterable[str] = None,
        version: str = None,
        cache_key_fn: Callable = None,
        cache_expiration: timedelta = None,
        retries: int = None,
        retry_delay_seconds: Union[int, float] = None,
        retry_condition_fn: Callable = None,
        persist_result: bool = None,
        result_storage: ResultStorage = None,
        result_serializer: ResultSerializer = None,
        cache_result_in_memory: bool = None,
        timeout_seconds: Union[int, float] = None,
        log_prints: bool = None,
        refresh_cache: bool = None,
    ) -> "Task[P, R]":
        """Create a copy of the task with modified options."""

    def map(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> List[R]:
        """Execute the task over iterable inputs."""
```
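As a rough mental model for `map`, the sketch below runs one call per input element on a thread pool and collects the results in input order. It uses only the standard library; `map_concurrently` is a hypothetical helper, and it simplifies by blocking and returning plain values, whereas Prefect's `map` may hand back futures depending on the version.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, Iterable, List, TypeVar

A = TypeVar("A")
B = TypeVar("B")


def map_concurrently(
    fn: Callable[[A], B], inputs: Iterable[A], max_workers: int = 4
) -> List[B]:
    """Submit fn once per input and collect results in input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures: List[Future] = [pool.submit(fn, item) for item in inputs]
        return [future.result() for future in futures]


def process_data(data: int) -> int:
    return data * 2


doubled = map_concurrently(process_data, [1, 2, 3])
```

Results come back in the order of the inputs, not in completion order, because the futures are read in the order they were submitted.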

### Serving Flows

Functions for serving flows as deployments that can be triggered remotely. The module-level `serve()` and `aserve()` accept deployment objects rather than flows; create them with `Flow.to_deployment()`, which is also where per-deployment settings such as name, tags, and parameters belong. To serve a single flow directly, use `Flow.serve()`.

```python { .api }
def serve(
    *args: RunnerDeployment,
    pause_on_shutdown: bool = True,
    print_starting_message: bool = True,
    limit: int = None,
    **kwargs
) -> None:
    """
    Serve one or more deployments for remote execution.

    Parameters:
    - args: One or more deployments to serve, created with Flow.to_deployment()
    - pause_on_shutdown: Whether to pause deployment schedules on shutdown
    - print_starting_message: Whether to print startup message
    - limit: Maximum number of concurrent flow runs
    - kwargs: Additional keyword arguments passed to the runner (for example, webserver)
    """

async def aserve(
    *args: RunnerDeployment,
    pause_on_shutdown: bool = True,
    print_starting_message: bool = True,
    limit: int = None,
    **kwargs
) -> None:
    """
    Asynchronously serve one or more deployments for remote execution.

    Same parameters as serve() but runs asynchronously.
    """
```

#### Usage Examples

```python
from prefect import flow, serve

@flow
def data_pipeline():
    # Pipeline logic here
    pass

@flow
def monitoring_flow():
    # Monitoring logic here
    pass

# Serve multiple flows: convert each flow to a deployment first
if __name__ == "__main__":
    serve(
        data_pipeline.to_deployment(name="data-pipeline", tags=["production", "data"]),
        monitoring_flow.to_deployment(name="monitoring", tags=["production", "data"]),
        limit=10,
        webserver=True
    )
```
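The `limit` setting caps how many flow runs execute at the same time. A minimal way to picture such a cap is a semaphore around each run. The sketch below uses only `asyncio`; `run_with_limit`, `fake_flow_run`, and the `stats` dictionary are hypothetical, and nothing here reflects how Prefect's runner enforces the limit internally.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List


async def run_with_limit(
    jobs: List[Callable[[], Awaitable[None]]], limit: int
) -> None:
    """Run all jobs, allowing at most `limit` of them to execute at once."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(job: Callable[[], Awaitable[None]]) -> None:
        async with semaphore:
            await job()

    await asyncio.gather(*(guarded(job) for job in jobs))


# Usage: record the highest number of jobs that were active together.
stats: Dict[str, int] = {"active": 0, "peak": 0}


async def fake_flow_run() -> None:
    stats["active"] += 1
    stats["peak"] = max(stats["peak"], stats["active"])
    await asyncio.sleep(0.01)  # stand-in for real work
    stats["active"] -= 1


asyncio.run(run_with_limit([fake_flow_run] * 6, limit=2))
```

With six jobs and `limit=2`, the recorded peak never exceeds two, while all six jobs still finish.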

### Caching and Performance

Task caching and retry-delay utilities for improving performance and avoiding redundant computation.

```python { .api }
def task_input_hash(context: TaskRunContext, arguments: Dict[str, Any]) -> Optional[str]:
    """
    Generate a cache key from task inputs.

    Creates a hash of the task inputs for use as a cache key. This function
    can be used as the cache_key_fn parameter in task decorators.

    Parameters:
    - context: Run context of the task being executed
    - arguments: Mapping of parameter names to the values passed to the task

    Returns:
    String hash of the inputs, or None if the inputs cannot be hashed
    """

def exponential_backoff(backoff_factor: float) -> Callable[[int], List[float]]:
    """
    Build an exponential backoff delay schedule.

    Returns a callable that takes the number of retries and produces a list
    of increasingly longer delays, useful for handling rate limits and
    temporary failures. Pass the returned callable as retry_delay_seconds.

    Parameters:
    - backoff_factor: Scale of the schedule; the delay for retry r is
      backoff_factor * 2**r seconds

    Returns:
    Callable mapping a retry count to a list of delays in seconds
    """
```

#### Usage Examples

```python
from prefect import task
from prefect.tasks import task_input_hash, exponential_backoff
from datetime import timedelta

@task(
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1)
)
def cached_computation(x, y):
    """Task with input-based caching."""
    # Expensive computation
    return x ** y

# Custom retry with exponential backoff
@task(
    retries=5,
    # exponential_backoff returns a callable; Prefect calls it with the retry count
    retry_delay_seconds=exponential_backoff(backoff_factor=2)
)
def api_with_backoff():
    """Task with exponential backoff retry."""
    # API call that might need retries
    pass
```
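To see what an exponential backoff schedule looks like in numbers, here is a small stand-alone sketch. `backoff_delays` is a hypothetical helper, not part of Prefect, and it assumes each delay is `backoff_factor * 2**attempt`, with the first retry counted as attempt 0.

```python
from typing import List


def backoff_delays(backoff_factor: float, retries: int) -> List[float]:
    """Return one delay per retry, doubling each time."""
    return [backoff_factor * 2**attempt for attempt in range(retries)]


delays = backoff_delays(backoff_factor=2.0, retries=5)
print(delays)  # [2.0, 4.0, 8.0, 16.0, 32.0]
```

Because the delays double, five retries with a factor of 2 already spread over about a minute in total, which is why a modest factor is usually enough for rate-limited APIs.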

## Types

Types specific to workflow functionality:

```python { .api }
from typing import Any, Callable, Dict, Generic, Iterable, List, Optional, TypedDict, TypeVar, Union
from typing_extensions import ParamSpec
from datetime import timedelta

P = ParamSpec("P")  # Parameters (ParamSpec, so that P.args and P.kwargs are valid)
R = TypeVar("R")  # Return type

# Hook types
FlowStateHook = Callable[[Flow, FlowRun, State], None]
StateHookCallable = Callable[[Task, TaskRun, State], None]
RetryConditionCallable = Callable[[Task, TaskRun, State], bool]

# Task runner interface
class TaskRunner:
    """Base class for task execution backends."""
    pass

# Configuration types
class ResultStorage:
    """Result storage interface."""
    pass

class ResultSerializer:
    """Result serialization interface."""
    pass

# Task options type
class TaskOptions(TypedDict, total=False):
    name: Optional[str]
    description: Optional[str]
    tags: Optional[Iterable[str]]
    version: Optional[str]
    cache_key_fn: Optional[Callable[..., str]]
    cache_expiration: Optional[timedelta]
    retries: Optional[int]
    retry_delay_seconds: Optional[Union[int, float]]
    retry_condition_fn: Optional[RetryConditionCallable]
    timeout_seconds: Optional[Union[int, float]]
```
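The hook types above all share one shape: a callable that receives the definition, the run, and the state. The sketch below shows how a list of such hooks could be dispatched by state name. It is an illustration only. `MiniState`, `Hook`, and `run_hooks` are hypothetical, plain strings stand in for the flow and flow-run objects, and Prefect's own dispatch logic is not shown here.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass
class MiniState:
    """Hypothetical stand-in for a run state."""

    name: str  # for example "Completed" or "Failed"
    message: str = ""


# (flow name, flow run id, state) -> None, mirroring the three-argument hook shape
Hook = Callable[[str, str, MiniState], None]


def run_hooks(
    hooks: Dict[str, List[Hook]], flow: str, flow_run: str, state: MiniState
) -> None:
    """Call every hook registered for the state's name, in order."""
    for hook in hooks.get(state.name, []):
        hook(flow, flow_run, state)


events: List[str] = []
hooks: Dict[str, List[Hook]] = {
    "Completed": [lambda f, r, s: events.append(f"{f}/{r} completed")],
    "Failed": [lambda f, r, s: events.append(f"{f}/{r} failed: {s.message}")],
}

run_hooks(hooks, "data_pipeline", "run-1", MiniState("Failed", "timeout"))
run_hooks(hooks, "data_pipeline", "run-2", MiniState("Completed"))
```

Only the hooks registered for the state that was actually reached are called, so a failed run never triggers the completion hooks.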