# Core Workflows

Core workflow functionality in Prefect: creating flows and tasks with decorators, controlling their execution, serving flows as deployments, and managing the flow lifecycle. These pieces form the foundation of Prefect's orchestration capabilities.

## Capabilities

### Flow Decorator

Creates Prefect flows from Python functions, adding workflow orchestration with configurable retries, timeouts, state tracking, and observability.

```python { .api }
def flow(
    fn=None,
    *,
    name: str = None,
    description: str = None,
    version: str = None,
    flow_run_name: Union[str, Callable[[], str]] = None,
    task_runner: TaskRunner = None,
    timeout_seconds: Union[int, float] = None,
    validate_parameters: bool = True,
    retries: int = None,
    retry_delay_seconds: Union[int, float] = None,
    persist_result: bool = None,
    result_storage: ResultStorage = None,
    result_serializer: ResultSerializer = None,
    cache_result_in_memory: bool = True,
    on_completion: List[FlowStateHook] = None,
    on_failure: List[FlowStateHook] = None,
    on_cancellation: List[FlowStateHook] = None,
    on_crashed: List[FlowStateHook] = None,
    on_running: List[FlowStateHook] = None,
    log_prints: bool = None,
):
    """
    Decorator to create Prefect flows from functions.

    Parameters:
    - fn: Function to decorate (provided automatically when used as a decorator)
    - name: Name for the flow (defaults to the function name, with underscores
      replaced by hyphens)
    - description: Description of the flow's purpose
    - version: Version string for the flow
    - flow_run_name: Name for flow runs; either a string template formatted with
      the flow's parameters or a callable that returns a string
    - task_runner: Task runner for executing tasks within the flow
    - timeout_seconds: Maximum runtime for the flow, in seconds
    - validate_parameters: Whether to validate flow parameters against type hints
    - retries: Number of retry attempts on failure
    - retry_delay_seconds: Delay between retry attempts, in seconds
    - persist_result: Whether to persist flow results
    - result_storage: Storage backend for results
    - result_serializer: Serializer for results
    - cache_result_in_memory: Whether to cache results in memory
    - on_completion: Hooks to run when the flow completes successfully
    - on_failure: Hooks to run when the flow fails
    - on_cancellation: Hooks to run when the flow is cancelled
    - on_crashed: Hooks to run when the flow crashes
    - on_running: Hooks to run when the flow starts running
    - log_prints: Whether to capture print statements as log messages

    Returns:
        Flow object when used as a decorator
    """
```

#### Usage Examples

```python
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner

# Basic flow
@flow
def my_workflow():
    """Simple workflow example."""
    return "Hello, Prefect!"

# Flow with configuration
@flow(
    name="Data Processing Pipeline",
    description="Process incoming data files",
    version="1.0.0",
    retries=3,
    retry_delay_seconds=60,
    task_runner=ThreadPoolTaskRunner(max_workers=4),
    timeout_seconds=3600,
)
def data_pipeline(file_path: str):
    """Data processing workflow with retry and timeout configuration."""
    # Workflow logic here
    pass

# Flow with hooks: each hook receives the flow, the flow run, and the new state
@flow(
    on_completion=[lambda flow, flow_run, state: print("Flow completed!")],
    on_failure=[lambda flow, flow_run, state: print("Flow failed!")],
)
def monitored_flow():
    """Flow with state change hooks for monitoring."""
    pass
```

### Task Decorator

Creates Prefect tasks from Python functions, enabling concurrent execution through task runners, result caching, retries, and state management.

```python { .api }
def task(
    fn=None,
    *,
    name: str = None,
    description: str = None,
    tags: Iterable[str] = None,
    version: str = None,
    cache_key_fn: Callable[[TaskRunContext, Dict[str, Any]], Optional[str]] = None,
    cache_expiration: datetime.timedelta = None,
    task_run_name: Union[str, Callable[[], str]] = None,
    retries: int = None,
    retry_delay_seconds: Union[int, float, List[float], Callable[[int], List[float]]] = None,
    retry_condition_fn: Callable[..., bool] = None,
    persist_result: bool = None,
    result_storage: ResultStorage = None,
    result_serializer: ResultSerializer = None,
    cache_result_in_memory: bool = True,
    timeout_seconds: Union[int, float] = None,
    log_prints: bool = None,
    refresh_cache: bool = None,
    on_completion: List[StateHookCallable] = None,
    on_failure: List[StateHookCallable] = None,
):
    """
    Decorator to create Prefect tasks from functions.

    Parameters:
    - fn: Function to decorate (provided automatically when used as a decorator)
    - name: Name for the task (defaults to the function name)
    - description: Description of the task's purpose
    - tags: Tags to apply to the task and its runs
    - version: Version string for the task
    - cache_key_fn: Function called with the task run context and the task's
      parameters that returns a cache key
    - cache_expiration: Duration after which cached results expire
    - task_run_name: Name for task runs; either a string template formatted with
      the task's parameters or a callable that returns a string
    - retries: Number of retry attempts on failure
    - retry_delay_seconds: Delay between retry attempts, in seconds; a list sets
      per-attempt delays, and a callable (such as the one returned by
      exponential_backoff) receives the number of retries and returns such a list
    - retry_condition_fn: Function called with (task, task_run, state) after a
      failure; the task retries only if it returns True
    - persist_result: Whether to persist task results
    - result_storage: Storage backend for results
    - result_serializer: Serializer for results
    - cache_result_in_memory: Whether to cache results in memory
    - timeout_seconds: Maximum runtime for the task, in seconds
    - log_prints: Whether to capture print statements as log messages
    - refresh_cache: Whether to ignore any cached result, rerun the task, and
      update the cache
    - on_completion: Hooks to run when the task completes successfully
    - on_failure: Hooks to run when the task fails

    Returns:
        Task object when used as a decorator
    """
```

#### Usage Examples

```python
from datetime import timedelta

from prefect import flow, task
from prefect.tasks import task_input_hash

# Basic task
@task
def process_data(data):
    """Simple task example."""
    return data * 2

# Task with caching
@task(
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
    retries=3,
    retry_delay_seconds=30,
)
def expensive_computation(input_value):
    """Task with caching and retry configuration."""
    # Expensive computation here
    return input_value ** 2

# Task with custom retry condition
def should_retry(task, task_run, state):
    """Retry only when the failure message mentions a timeout."""
    return "timeout" in state.message.lower() if state.message else False

@task(
    retries=5,
    retry_condition_fn=should_retry,
    timeout_seconds=300,
    tags=["api", "external"],
)
def api_call(endpoint: str):
    """Task with custom retry logic for API calls."""
    # API call logic here
    pass

# Using tasks in a flow: calling a task inside a flow runs it and returns its result
@flow
def workflow():
    data = process_data(10)                # 20
    result = expensive_computation(data)   # 400
    return result
```

### Flow Class

The `Flow` class represents a Prefect workflow definition, with methods for execution, configuration, and serving. The `@flow` decorator returns an instance of this class.

```python { .api }
class Flow(Generic[P, R]):
    """
    Prefect workflow definition class.

    Attributes:
    - name: Flow name
    - fn: The decorated function
    - description: Flow description
    - version: Flow version
    - flow_run_name: Template for flow run names
    - task_runner: Task runner for executing tasks
    - timeout_seconds: Flow timeout, in seconds
    - validate_parameters: Whether to validate parameters
    - retries: Number of retry attempts
    - retry_delay_seconds: Delay between retries, in seconds
    - isasync: Whether the flow function is async
    """

    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> R:
        """Execute the flow with the given parameters."""

    def with_options(
        self,
        *,
        name: str = None,
        description: str = None,
        version: str = None,
        flow_run_name: Union[str, Callable] = None,
        retries: int = None,
        retry_delay_seconds: Union[int, float] = None,
        timeout_seconds: Union[int, float] = None,
        validate_parameters: bool = None,
        persist_result: bool = None,
        result_storage: ResultStorage = None,
        result_serializer: ResultSerializer = None,
        cache_result_in_memory: bool = None,
        log_prints: bool = None,
    ) -> "Flow[P, R]":
        """Create a copy of the flow with modified options."""

    def serve(
        self,
        name: str = None,
        tags: List[str] = None,
        parameters: Dict[str, Any] = None,
        description: str = None,
        version: str = None,
        enforce_parameter_schema: bool = None,
        pause_on_shutdown: bool = True,
        print_starting_message: bool = True,
        limit: int = None,
        webserver: bool = False,
        **kwargs
    ) -> None:
        """
        Create a deployment for the flow and run a long-lived process that
        executes its scheduled or remotely triggered runs until stopped.
        """
```

### Task Class

The `Task` class represents a Prefect task definition, with methods for execution, configuration, and mapping over inputs. The `@task` decorator returns an instance of this class.

```python { .api }
class Task(Generic[P, R]):
    """
    Prefect task definition class.

    Attributes:
    - name: Task name
    - fn: The decorated function
    - description: Task description
    - version: Task version
    - tags: Task tags
    - cache_key_fn: Cache key generation function
    - cache_expiration: Cache expiration duration
    - retries: Number of retry attempts
    - retry_delay_seconds: Delay between retries, in seconds
    - retry_condition_fn: Custom retry condition function
    - timeout_seconds: Task timeout, in seconds
    - isasync: Whether the task function is async
    """

    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> R:
        """Execute the task with the given parameters."""

    def with_options(
        self,
        *,
        name: str = None,
        description: str = None,
        tags: Iterable[str] = None,
        version: str = None,
        cache_key_fn: Callable = None,
        cache_expiration: timedelta = None,
        retries: int = None,
        retry_delay_seconds: Union[int, float, List[float], Callable[[int], List[float]]] = None,
        retry_condition_fn: Callable = None,
        persist_result: bool = None,
        result_storage: ResultStorage = None,
        result_serializer: ResultSerializer = None,
        cache_result_in_memory: bool = None,
        timeout_seconds: Union[int, float] = None,
        log_prints: bool = None,
        refresh_cache: bool = None,
    ) -> "Task[P, R]":
        """Create a copy of the task with modified options."""

    def map(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> PrefectFutureList[R]:
        """
        Submit one task run per element of the iterable arguments.

        Returns a list of futures; call .result() on it to wait for and
        collect the return values.
        """
```

### Serving Flows

Functions for serving flows as deployments that can be triggered remotely. The top-level `serve` and `aserve` functions take deployment objects, typically created with `Flow.to_deployment()`, and run them from a single long-running process. Deployment settings such as name, tags, parameters, description, and version are passed to `to_deployment()`; to serve a single flow, `Flow.serve()` accepts them directly.

```python { .api }
def serve(
    *args: RunnerDeployment,
    pause_on_shutdown: bool = True,
    print_starting_message: bool = True,
    limit: int = None,
    **kwargs
) -> None:
    """
    Serve one or more deployments from a single process.

    Parameters:
    - args: One or more deployments to serve, e.g. created with Flow.to_deployment()
    - pause_on_shutdown: Whether to pause the deployments' schedules on shutdown
    - print_starting_message: Whether to print a startup message to the console
    - limit: Maximum number of concurrent flow runs
    - kwargs: Additional keyword arguments passed to the underlying runner,
      such as webserver=True to start the runner's webserver
    """

async def aserve(
    *args: RunnerDeployment,
    pause_on_shutdown: bool = True,
    print_starting_message: bool = True,
    limit: int = None,
    **kwargs
) -> None:
    """
    Asynchronously serve one or more deployments.

    Same parameters as serve(), but must be awaited from async code.
    """
```

#### Usage Examples

```python
from prefect import flow, serve

@flow
def data_pipeline():
    # Pipeline logic here
    pass

@flow
def monitoring_flow():
    # Monitoring logic here
    pass

# Serve multiple flows: convert each flow to a deployment, then serve them together
if __name__ == "__main__":
    pipeline_deployment = data_pipeline.to_deployment(
        name="production-data-pipeline",
        tags=["production", "data"],
    )
    monitoring_deployment = monitoring_flow.to_deployment(
        name="production-monitoring",
        tags=["production", "data"],
    )
    serve(
        pipeline_deployment,
        monitoring_deployment,
        limit=10,
        webserver=True,
    )
```

### Caching and Performance

Utilities for caching task results and spacing out retries, which help avoid redundant computation and ride out transient failures.

```python { .api }
def task_input_hash(context: TaskRunContext, arguments: Dict[str, Any]) -> Optional[str]:
    """
    Generate a cache key from task inputs.

    Hashes the task together with its input arguments, so runs of the same task
    with identical inputs share a cache key. Pass this function as the
    cache_key_fn parameter of the task decorator; Prefect calls it with the
    task run context and the task's parameters.

    Parameters:
    - context: The active task run context
    - arguments: Dictionary of the task's input parameters

    Returns:
        String hash of the task and its inputs, or None if they cannot be hashed
    """

def exponential_backoff(backoff_factor: float) -> Callable[[int], List[float]]:
    """
    Configure exponential backoff for task retries.

    Returns a callable that Prefect invokes with the number of retries and that
    produces a list of delays: backoff_factor, then twice that, then four times
    that, and so on. Pass the result directly as retry_delay_seconds. Useful for
    handling rate limits and temporary failures.

    Parameters:
    - backoff_factor: Base delay, in seconds, for the first retry; each later
      retry doubles the previous delay

    Returns:
        Callable mapping a retry count to a list of delays in seconds
    """
```

#### Usage Examples

```python
from datetime import timedelta

from prefect import task
from prefect.tasks import exponential_backoff, task_input_hash

@task(
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
)
def cached_computation(x, y):
    """Task with input-based caching."""
    # Expensive computation
    return x ** y

# Custom retry with exponential backoff: delays of 2, 4, 8, 16, and 32 seconds
@task(
    retries=5,
    retry_delay_seconds=exponential_backoff(backoff_factor=2),
)
def api_with_backoff():
    """Task with exponential backoff retry."""
    # API call that might need retries
    pass
```

## Types

Types specific to workflow functionality:

```python { .api }
from datetime import timedelta
from typing import (  # ParamSpec requires Python 3.10+ (use typing_extensions on older versions)
    Any, Callable, Dict, Generic, Iterable, List, Optional, ParamSpec, TypedDict, TypeVar, Union,
)

P = ParamSpec("P")  # Parameters of the decorated function
R = TypeVar("R")  # Return type of the decorated function

# Hook types (Flow, FlowRun, Task, TaskRun, and State are Prefect classes)
FlowStateHook = Callable[["Flow", "FlowRun", "State"], None]
StateHookCallable = Callable[["Task", "TaskRun", "State"], None]
RetryConditionCallable = Callable[["Task", "TaskRun", "State"], bool]

# Task runner interface
class TaskRunner:
    """Base class for task execution backends."""
    pass

# Configuration types
class ResultStorage:
    """Result storage interface."""
    pass

class ResultSerializer:
    """Result serialization interface."""
    pass

# Task options type
class TaskOptions(TypedDict, total=False):
    name: Optional[str]
    description: Optional[str]
    tags: Optional[Iterable[str]]
    version: Optional[str]
    cache_key_fn: Optional[Callable[..., str]]
    cache_expiration: Optional[timedelta]
    retries: Optional[int]
    retry_delay_seconds: Optional[Union[int, float]]
    retry_condition_fn: Optional[RetryConditionCallable]
    timeout_seconds: Optional[Union[int, float]]
```