npx @tessl/cli install tessl/pypi-prefect@3.4.00
# Prefect

Prefect is a workflow orchestration and management framework for building resilient data pipelines in Python. It turns ordinary Python functions into flows and tasks and adds automatic retries, dynamic mapping, caching, and real-time observability, so data workflows can recover from failures and scale.
## Package Information

- **Package Name**: prefect
- **Language**: Python
- **Installation**: `pip install prefect`
## Core Imports

```python
import prefect
from prefect import flow, task, get_run_logger, get_client, State
```

Common imports for workflow development:

```python
from prefect import flow, task
from prefect.tasks import task_input_hash
from prefect.deployments import deploy
from prefect.client.orchestration import get_client
from prefect.states import Completed, Failed, Running
from prefect.context import TaskRunContext
```
## Basic Usage

```python
from prefect import flow, task, get_run_logger


@task
def extract_data(url: str):
    """Extract data from a source."""
    logger = get_run_logger()
    logger.info(f"Extracting data from {url}")
    # Simulate data extraction
    return {"records": 100, "source": url}


@task
def transform_data(raw_data: dict):
    """Transform the extracted data."""
    logger = get_run_logger()
    logger.info("Transforming data")
    return {
        "processed_records": raw_data["records"],
        "status": "transformed",
    }


@task
def load_data(processed_data: dict):
    """Load data to destination."""
    logger = get_run_logger()
    logger.info(f"Loading {processed_data['processed_records']} records")
    return {"loaded": True}


@flow(name="ETL Pipeline")
def etl_flow(source_url: str):
    """Complete ETL workflow."""
    raw = extract_data(source_url)
    processed = transform_data(raw)
    result = load_data(processed)
    return result


if __name__ == "__main__":
    # Calling the flow function runs it and returns its result
    result = etl_flow("https://api.example.com/data")
    print(result)
```
## Architecture

Prefect's architecture is built around several key concepts:

- **Flows**: Top-level workflow containers that define the execution logic and the dependencies between tasks
- **Tasks**: Individual units of work that can be cached, retried, and run in parallel
- **States**: Immutable snapshots representing the current status of flows and tasks (Pending, Running, Completed, Failed, etc.)
- **Deployments**: Infrastructure-aware flow configurations that enable scheduled and triggered execution
- **Work Pools**: An infrastructure abstraction layer for executing flows across different environments
- **Blocks**: Reusable configuration objects for credentials, connections, and infrastructure settings

Together, these pieces let workflows handle errors automatically (through retries and explicit state tracking), stay observable while they run, and execute at scale across diverse infrastructure environments.
## Capabilities

### Core Workflows

Flow and task creation with decorators, execution control, serving deployments, and flow lifecycle management.

```python { .api }
def flow(
    fn=None,
    *,
    name: str = None,
    description: str = None,
    version: str = None,
    flow_run_name: str = None,
    task_runner: TaskRunner = None,
    timeout_seconds: Union[int, float] = None,
    validate_parameters: bool = True,
    retries: int = None,
    retry_delay_seconds: Union[int, float] = None,
    persist_result: bool = None,
    result_storage: ResultStorage = None,
    result_serializer: ResultSerializer = None,
    cache_result_in_memory: bool = True,
    on_completion: List[FlowStateHook] = None,
    on_failure: List[FlowStateHook] = None,
    on_cancellation: List[FlowStateHook] = None,
    on_crashed: List[FlowStateHook] = None,
    on_running: List[FlowStateHook] = None,
    log_prints: bool = None,
): ...

def task(
    fn=None,
    *,
    name: str = None,
    description: str = None,
    tags: Iterable[str] = None,
    version: str = None,
    cache_key_fn: Callable[[TaskRunContext, Dict[str, Any]], Optional[str]] = None,
    cache_expiration: timedelta = None,
    task_run_name: Union[str, Callable[[], str]] = None,
    retries: int = None,
    retry_delay_seconds: Union[int, float] = None,
    retry_condition_fn: RetryConditionCallable = None,
    persist_result: bool = None,
    result_storage: ResultStorage = None,
    result_serializer: ResultSerializer = None,
    cache_result_in_memory: bool = True,
    timeout_seconds: Union[int, float] = None,
    log_prints: bool = None,
    refresh_cache: bool = None,
    on_completion: List[StateHookCallable] = None,
    on_failure: List[StateHookCallable] = None,
): ...

# serve/aserve take deployments, e.g. created with my_flow.to_deployment(name=...)
def serve(*args: RunnerDeployment, **kwargs) -> None: ...
async def aserve(*args: RunnerDeployment, **kwargs) -> None: ...
```

[Core Workflows](./core-workflows.md)
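To make `cache_key_fn` and `cache_expiration` concrete, here is a toy in-memory cache. It is a sketch under stated assumptions, not how Prefect stores results: `input_hash` is a hypothetical stand-in for `task_input_hash` that hashes the function name and the JSON-encoded arguments, the wrapper accepts keyword arguments only, and an entry is reused only while it is younger than `cache_expiration`.

```python
import hashlib
import json
import time
from datetime import timedelta
from typing import Any, Callable, Dict, Optional, Tuple


def input_hash(fn_name: str, arguments: Dict[str, Any]) -> str:
    """Hypothetical cache key: a hash of the function name plus its arguments."""
    payload = json.dumps({"fn": fn_name, "args": arguments}, sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()


def cached(cache_expiration: Optional[timedelta] = None):
    """Toy decorator: reuse a stored result while it is younger than cache_expiration."""

    def decorator(fn: Callable[..., Any]) -> Callable[..., Any]:
        store: Dict[str, Tuple[float, Any]] = {}  # cache key -> (created_at, result)

        def wrapper(**kwargs: Any) -> Any:  # keyword-only to keep the key simple
            key = input_hash(fn.__name__, kwargs)
            now = time.monotonic()
            if key in store:
                created_at, result = store[key]
                expired = (
                    cache_expiration is not None
                    and now - created_at >= cache_expiration.total_seconds()
                )
                if not expired:
                    return result  # cache hit: the function body is skipped
            result = fn(**kwargs)
            store[key] = (now, result)
            return result

        return wrapper

    return decorator


executions = []


@cached(cache_expiration=timedelta(hours=1))
def fetch(url: str) -> int:
    executions.append(url)  # records real executions only
    return len(url)


fetch(url="https://example.com")
fetch(url="https://example.com")  # same inputs: served from the cache
fetch(url="https://example.org")  # different inputs: new key, runs again
print(len(executions))  # 2
```

The key design point carried over from the real option is that the cache key depends only on inputs, so any change in arguments produces a cache miss.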
### State Management

State creation functions, state utilities, and flow run lifecycle control.

```python { .api }
def Completed(cls: type = State, **kwargs: Any) -> State: ...

def Failed(cls: type = State, **kwargs: Any) -> State: ...

def Running(cls: type = State, **kwargs: Any) -> State: ...

def Pending(cls: type = State, **kwargs: Any) -> State: ...

def Cancelled(cls: type = State, **kwargs: Any) -> State: ...

def Crashed(cls: type = State, **kwargs: Any) -> State: ...

def Scheduled(
    cls: type = State,
    scheduled_time: Optional[datetime] = None,
    **kwargs: Any,
) -> State: ...

def Paused(
    cls: type = State,
    timeout_seconds: Optional[int] = None,
    pause_expiration_time: Optional[datetime] = None,
    reschedule: bool = False,
    pause_key: Optional[str] = None,
    **kwargs: Any,
) -> State: ...

def Suspended(
    cls: type = State,
    timeout_seconds: Optional[int] = None,
    pause_expiration_time: Optional[datetime] = None,
    pause_key: Optional[str] = None,
    **kwargs: Any,
) -> State: ...

def AwaitingRetry(
    cls: type = State,
    scheduled_time: Optional[datetime] = None,
    **kwargs: Any,
) -> State: ...

def Retrying(cls: type = State, **kwargs: Any) -> State: ...

# pause_flow_run and resume_flow_run are importable from prefect.flow_runs
def pause_flow_run(
    wait_for_input: Optional[type] = None,
    timeout: int = 3600,
    poll_interval: int = 10,
    key: Optional[str] = None,
) -> Any: ...

def resume_flow_run(
    flow_run_id: UUID,
    run_input: Optional[Dict[str, Any]] = None,
) -> None: ...
```

[State Management](./state-management.md)
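Several of the constructors listed under State Management produce states whose *name* differs from their *type*: `AwaitingRetry` is a `SCHEDULED` state, `Retrying` is a `RUNNING` state, and `Suspended` is a `PAUSED` state, while only `COMPLETED`, `FAILED`, `CANCELLED`, and `CRASHED` count as final. The toy model below (plain dataclasses, not Prefect's `State` class; `ToyState` and `TERMINAL_TYPES` are illustrative names) captures that distinction.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class StateType(str, Enum):
    SCHEDULED = "SCHEDULED"
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"
    CRASHED = "CRASHED"
    PAUSED = "PAUSED"
    CANCELLING = "CANCELLING"


# Types after which a run will not change state again
TERMINAL_TYPES = {
    StateType.COMPLETED,
    StateType.FAILED,
    StateType.CANCELLED,
    StateType.CRASHED,
}


@dataclass(frozen=True)  # frozen: states are immutable snapshots
class ToyState:
    type: StateType
    name: Optional[str] = None

    def __post_init__(self) -> None:
        # Default the display name to the capitalised type, e.g. "Completed"
        if self.name is None:
            object.__setattr__(self, "name", self.type.value.capitalize())

    def is_final(self) -> bool:
        return self.type in TERMINAL_TYPES


# Named states layered on top of a base type
awaiting_retry = ToyState(StateType.SCHEDULED, name="AwaitingRetry")
retrying = ToyState(StateType.RUNNING, name="Retrying")
suspended = ToyState(StateType.PAUSED, name="Suspended")
completed = ToyState(StateType.COMPLETED)

for state in (awaiting_retry, retrying, suspended, completed):
    print(f"{state.name:<14} type={state.type.value:<10} final={state.is_final()}")
```

Checking the *type* rather than the name is what keeps logic such as "is this run finished?" correct for every named variant.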
### Deployments

Deploy flows to work pools, manage deployments, and run deployed workflows.

```python { .api }
def deploy(
    *deployments: RunnerDeployment,
    work_pool_name: Optional[str] = None,
    image: Optional[Union[str, DockerImage]] = None,
    build: bool = True,
    push: bool = True,
    print_next_steps_message: bool = True,
    ignore_warnings: bool = False,
) -> List[UUID]: ...

def initialize_project(
    name: str = None,
    recipe: str = None,
) -> None: ...

def run_deployment(
    name: str,
    parameters: Dict[str, Any] = None,
    scheduled_time: datetime = None,
    flow_run_name: str = None,
    timeout: int = None,
    poll_interval: int = 10,
    tags: List[str] = None,
    idempotency_key: str = None,
    work_queue_name: str = None,
) -> FlowRun: ...
```

[Deployments](./deployments.md)
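`run_deployment` creates a flow run and then waits on it, checking its state every `poll_interval` seconds until the run reaches a final state or `timeout` elapses; Prefect documents `timeout=0` as returning immediately and `timeout=None` as polling indefinitely. The sketch below shows that polling pattern in isolation. `wait_for_final_state` and the scripted `fake_states` are hypothetical; the real function queries the Prefect API instead of calling a local function.

```python
import time
from typing import Callable, Iterator, Optional

FINAL_STATES = {"COMPLETED", "FAILED", "CANCELLED", "CRASHED"}


def wait_for_final_state(
    get_state: Callable[[], str],
    timeout: Optional[float] = None,
    poll_interval: float = 10,
) -> str:
    """Poll `get_state` until a final state is seen or `timeout` seconds pass.

    timeout=0 returns after the first check; timeout=None polls indefinitely.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    state = get_state()
    while state not in FINAL_STATES:
        if deadline is not None and time.monotonic() >= deadline:
            break  # give up and return the last observed state
        time.sleep(poll_interval)
        state = get_state()
    return state


# Usage: a scripted sequence stands in for the API's view of a flow run
fake_states: Iterator[str] = iter(["SCHEDULED", "PENDING", "RUNNING", "COMPLETED"])
print(wait_for_final_state(lambda: next(fake_states), poll_interval=0.01))  # COMPLETED
```

Returning the last observed state on timeout (rather than raising) lets the caller decide what an unfinished run means for them.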
### Client API

HTTP clients for interacting with the Prefect API server and cloud services.

```python { .api }
def get_client(
    httpx_settings: dict = None,
    sync_client: bool = None,
) -> Union[PrefectClient, SyncPrefectClient]: ...

class PrefectClient:
    def __init__(
        self,
        api: str = None,
        api_key: str = None,
        api_version: str = None,
        httpx_settings: dict = None,
    ): ...

    async def create_flow_run(
        self,
        flow: Flow,
        name: str = None,
        parameters: Dict[str, Any] = None,
        context: Dict[str, Any] = None,
        tags: List[str] = None,
        parent_task_run_id: UUID = None,
        state: State = None,
    ) -> FlowRun: ...
```

[Client API](./client-api.md)
### Context & Utilities

Context management, logging, annotations, and transaction support.

```python { .api }
def get_run_logger(
    context: Optional[RunContext] = None,
    **kwargs: Any,
) -> Union[logging.Logger, logging.LoggerAdapter]: ...

def tags(*tags: str, **kwargs) -> ContextManager: ...

class unmapped:
    def __init__(self, value: Any): ...

class allow_failure:
    def __init__(self, value: Any): ...

class Transaction:
    def __init__(
        self,
        key: Optional[str] = None,
        timeout: Optional[float] = None,
    ): ...

    def __enter__(self) -> "Transaction": ...
    def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...
```

[Context & Utilities](./context-utilities.md)
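`unmapped` marks an argument that should be passed whole to every mapped task run instead of being iterated over. The pure-Python sketch below reproduces that broadcasting rule with a hypothetical `toy_map` helper and its own `unmapped` class; in Prefect you would call `my_task.map(...)`, which submits one task run per element rather than calling the function inline.

```python
from typing import Any, Callable, Dict, List


class unmapped:
    """Toy stand-in for prefect.unmapped: wraps a value that must not be iterated."""

    def __init__(self, value: Any) -> None:
        self.value = value


def toy_map(fn: Callable[..., Any], **kwargs: Any) -> List[Any]:
    """Call `fn` once per element of the iterable arguments.

    Iterable arguments must share one length; `unmapped` values are passed
    unchanged to every call. With no iterable arguments, nothing is called.
    """
    static: Dict[str, Any] = {
        k: v.value for k, v in kwargs.items() if isinstance(v, unmapped)
    }
    mapped: Dict[str, List[Any]] = {
        k: list(v) for k, v in kwargs.items() if not isinstance(v, unmapped)
    }
    lengths = {len(v) for v in mapped.values()}
    if len(lengths) > 1:
        raise ValueError(f"Mapped arguments have different lengths: {sorted(lengths)}")
    n = lengths.pop() if lengths else 0
    return [fn(**{k: v[i] for k, v in mapped.items()}, **static) for i in range(n)]


def scale(x: int, factor: int) -> int:
    return x * factor


print(toy_map(scale, x=[1, 2, 3], factor=unmapped(10)))  # [10, 20, 30]
# Without unmapped, a list argument is itself iterated element by element
print(toy_map(scale, x=[1, 2, 3], factor=[1, 2, 3]))  # [1, 4, 9]
```

The wrapper matters most when the constant argument is itself iterable, such as a lookup list that every call needs in full.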
### Runtime Context Access

Access current execution context including deployment parameters, flow run metadata, and task run information during workflow execution.

```python { .api }
# Runtime context modules for accessing execution information
import prefect.runtime.deployment
import prefect.runtime.flow_run
import prefect.runtime.task_run

# Deployment context attributes
prefect.runtime.deployment.id: str
prefect.runtime.deployment.name: str
prefect.runtime.deployment.parameters: Dict[str, Any]
prefect.runtime.deployment.version: str

# Flow run context attributes
prefect.runtime.flow_run.id: str
prefect.runtime.flow_run.name: str
prefect.runtime.flow_run.parameters: Dict[str, Any]
prefect.runtime.flow_run.tags: List[str]

# Task run context attributes
prefect.runtime.task_run.id: str
prefect.runtime.task_run.name: str
prefect.runtime.task_run.task_key: str
```

[Runtime Context Access](./runtime-context.md)
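The `prefect.runtime` modules answer questions like "which flow run am I in?" from whatever run is active at the moment they are read. The sketch below is a toy illustration of that general pattern using the standard library's `contextvars`; `FlowRunInfo`, `current_flow_run`, `run_in_flow`, and `flow_run_name` are hypothetical names, not Prefect APIs.

```python
import contextvars
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import Any, Dict, Iterator, Optional


@dataclass
class FlowRunInfo:
    id: str
    name: str
    parameters: Dict[str, Any] = field(default_factory=dict)


# Hypothetical context variable holding the active run (None outside any run)
current_flow_run: contextvars.ContextVar[Optional[FlowRunInfo]] = contextvars.ContextVar(
    "current_flow_run", default=None
)


@contextmanager
def run_in_flow(info: FlowRunInfo) -> Iterator[None]:
    """Make `info` the active flow run for the duration of the block."""
    token = current_flow_run.set(info)
    try:
        yield
    finally:
        current_flow_run.reset(token)  # restore whatever was active before


def flow_run_name() -> Optional[str]:
    """Toy analogue of reading prefect.runtime.flow_run.name."""
    info = current_flow_run.get()
    return info.name if info else None


print(flow_run_name())  # None: no run is active
with run_in_flow(FlowRunInfo(id="run-1", name="crimson-otter", parameters={"n": 3})):
    print(flow_run_name())  # crimson-otter
print(flow_run_name())  # None again after the block exits
```

Because context variables are isolated per thread and per asyncio task, concurrent runs in one process each see their own value.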
### Variables Management

Named, mutable JSON values that can be shared across tasks, flows, and deployments for configuration and data storage.

```python { .api }
from prefect.variables import Variable

class Variable(BaseModel):
    name: str
    value: StrictVariableValue
    tags: Optional[List[str]]

    @classmethod
    def get(
        cls,
        name: str,
        default: StrictVariableValue = None,
    ) -> StrictVariableValue: ...

    @classmethod
    async def aget(
        cls,
        name: str,
        default: StrictVariableValue = None,
    ) -> StrictVariableValue: ...

    @classmethod
    def set(
        cls,
        name: str,
        value: StrictVariableValue,
        tags: Optional[List[str]] = None,
        overwrite: bool = False,
    ) -> "Variable": ...

    @classmethod
    async def aset(
        cls,
        name: str,
        value: StrictVariableValue,
        tags: Optional[List[str]] = None,
        overwrite: bool = False,
    ) -> "Variable": ...
```

[Variables Management](./variables.md)
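The main subtlety of `Variable.set` is the `overwrite` flag: setting a name that already exists fails unless `overwrite=True`, while `Variable.get` returns `default` for unknown names. The in-memory `ToyVariableStore` below is a hypothetical model of those semantics for illustration only; real variables are stored by the Prefect API, and the error message here is not Prefect's.

```python
import json
from typing import Any, Dict, List, Optional


class ToyVariableStore:
    """Hypothetical in-memory model of Variable.get / Variable.set semantics."""

    def __init__(self) -> None:
        self._values: Dict[str, Any] = {}
        self._tags: Dict[str, List[str]] = {}

    def set(
        self,
        name: str,
        value: Any,
        tags: Optional[List[str]] = None,
        overwrite: bool = False,
    ) -> None:
        if name in self._values and not overwrite:
            raise ValueError(f"Variable {name!r} already exists; pass overwrite=True")
        json.dumps(value)  # values must be JSON-serializable (raises TypeError otherwise)
        self._values[name] = value
        self._tags[name] = list(tags or [])

    def get(self, name: str, default: Any = None) -> Any:
        return self._values.get(name, default)


store = ToyVariableStore()
store.set("batch_size", 500, tags=["etl"])
print(store.get("batch_size"))  # 500
print(store.get("missing", default=100))  # 100
store.set("batch_size", 1000, overwrite=True)
print(store.get("batch_size"))  # 1000
```

Requiring an explicit `overwrite=True` protects shared configuration from being replaced by accident when two flows pick the same name.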
### Configuration

Settings, blocks, and configuration management for Prefect infrastructure and credentials.

```python { .api }
class Block(BaseModel):
    def save(
        self,
        name: str,
        overwrite: bool = False,
    ) -> UUID: ...

    @classmethod
    def load(cls, name: str) -> "Block": ...

def get_settings_context() -> SettingsContext: ...
```

[Configuration](./configuration.md)
## Types

Core types used throughout the Prefect API:

```python { .api }
from __future__ import annotations

import logging
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Callable, Dict, Generic, List, Optional, ParamSpec, TypeVar, Union
from uuid import UUID

# Generic type parameters
P = ParamSpec("P")  # Parameters of the wrapped function
R = TypeVar("R")  # Return type

class Flow(Generic[P, R]):
    name: str
    fn: Callable[P, R]
    description: Optional[str]
    version: Optional[str]

class Task(Generic[P, R]):
    name: str
    fn: Callable[P, R]
    description: Optional[str]
    version: Optional[str]

class State:
    type: StateType
    name: Optional[str]
    message: Optional[str]
    data: Any
    timestamp: datetime

class FlowRun:
    id: UUID
    name: str
    flow_id: UUID
    state: State
    parameters: Dict[str, Any]

class TaskRun:
    id: UUID
    name: str
    task_key: str
    flow_run_id: UUID
    state: State

# Enums and Constants
class StateType(str, Enum):
    SCHEDULED = "SCHEDULED"
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"
    CRASHED = "CRASHED"
    PAUSED = "PAUSED"
    CANCELLING = "CANCELLING"
    # Suspended, AwaitingRetry, and Retrying are named states, not types:
    # their types are PAUSED, SCHEDULED, and RUNNING respectively.

# Context types
class TaskRunContext:
    """Context information available during task execution."""
    pass

# Callable types
FlowStateHook = Callable[[Flow, FlowRun, State], None]
StateHookCallable = Callable[[Task, TaskRun, State], None]
RetryConditionCallable = Callable[[Task, TaskRun, State], bool]

# Configuration types
class TaskRunner:
    pass

class ResultStorage:
    pass

class ResultSerializer:
    pass

# Deployment types
class RunnerDeployment:
    """Deployment configuration for flows."""
    pass

class DockerImage:
    """Docker image configuration for deployments."""
    pass

class DeploymentImage:
    """Deployment image configuration."""
    pass
```