# State Management

Prefect's state management system controls workflow execution through immutable state objects and lifecycle functions. A state records the current status of a flow or task run, which enables fine-grained control over execution, retry logic, and error handling.

## Capabilities

### State Creation Functions

Factory functions for creating the states that represent each stage in the lifecycle of flows and tasks.

```python { .api }
def Completed(cls: type = State, **kwargs: Any) -> State:
    """
    Create a completed state indicating successful execution.

    Parameters:
    - data: Result data from the completed operation
    - name: Optional state name
    - message: Optional descriptive message
    - type: State type (defaults to COMPLETED)

    Returns:
    State object representing successful completion
    """

def Failed(cls: type = State, **kwargs: Any) -> State:
    """
    Create a failed state indicating execution failure.

    Parameters:
    - data: Error information or exception details
    - name: Optional state name
    - message: Optional error message
    - type: State type (defaults to FAILED)

    Returns:
    State object representing failure
    """

def Running(cls: type = State, **kwargs: Any) -> State:
    """
    Create a running state indicating active execution.

    Parameters:
    - data: Optional data about the running operation
    - name: Optional state name
    - message: Optional status message
    - type: State type (defaults to RUNNING)

    Returns:
    State object representing active execution
    """

def Scheduled(
    scheduled_time: datetime = None,
    name: str = None,
    message: str = None,
    type: StateType = None,
) -> State:
    """
    Create a scheduled state for future execution.

    Parameters:
    - scheduled_time: When the operation is scheduled to run
    - name: Optional state name
    - message: Optional scheduling message
    - type: State type (defaults to SCHEDULED)

    Returns:
    State object representing scheduled execution
    """

def Pending(cls: type = State, **kwargs: Any) -> State:
    """
    Create a pending state for operations awaiting execution.

    Parameters:
    - name: Optional state name
    - message: Optional pending message
    - type: State type (defaults to PENDING)

    Returns:
    State object representing pending execution
    """

def Crashed(cls: type = State, **kwargs: Any) -> State:
    """
    Create a crashed state for unexpected failures.

    Parameters:
    - data: Crash information or exception details
    - name: Optional state name
    - message: Optional crash message
    - type: State type (defaults to CRASHED)

    Returns:
    State object representing system crash
    """

def Cancelled(cls: type = State, **kwargs: Any) -> State:
    """
    Create a cancelled state for deliberately stopped operations.

    Parameters:
    - data: Optional cancellation data
    - name: Optional state name
    - message: Optional cancellation message
    - type: State type (defaults to CANCELLED)

    Returns:
    State object representing cancellation
    """

def Cancelling(
    data: Any = None,
    name: str = None,
    message: str = None,
    type: StateType = None,
) -> State:
    """
    Create a cancelling state for operations in the process of being cancelled.

    Parameters:
    - data: Optional cancellation data
    - name: Optional state name
    - message: Optional cancellation message
    - type: State type (defaults to CANCELLING)

    Returns:
    State object representing active cancellation
    """

def Paused(
    cls: type = State,
    timeout_seconds: Optional[int] = None,
    pause_expiration_time: Optional[datetime] = None,
    reschedule: bool = False,
    pause_key: Optional[str] = None,
    **kwargs: Any,
) -> State:
    """
    Create a paused state for temporarily halted operations.

    Parameters:
    - timeout_seconds: Seconds before the pause times out
    - pause_expiration_time: Absolute time at which the pause expires (alternative to timeout_seconds)
    - reschedule: Whether the run is rescheduled after pausing
    - pause_key: Optional key identifying the pause point
    - data: Optional pause data
    - name: Optional state name
    - message: Optional pause message
    - type: State type (defaults to PAUSED)

    Returns:
    State object representing paused execution
    """

def Suspended(
    cls: type = State,
    timeout_seconds: Optional[int] = None,
    pause_expiration_time: Optional[datetime] = None,
    pause_key: Optional[str] = None,
    **kwargs: Any,
) -> State:
    """
    Create a suspended state for long-term halted operations.

    Parameters:
    - timeout_seconds: Seconds before the suspension times out
    - pause_expiration_time: Absolute time at which the suspension expires (alternative to timeout_seconds)
    - pause_key: Optional key identifying the suspension point
    - data: Optional suspension data
    - name: Optional state name
    - message: Optional suspension message
    - type: State type (defaults to SUSPENDED)

    Returns:
    State object representing suspended execution
    """

def AwaitingRetry(
    cls: type = State,
    scheduled_time: Optional[datetime] = None,
    **kwargs: Any,
) -> State:
    """
    Create an awaiting retry state for operations scheduled to retry.

    Parameters:
    - scheduled_time: When the retry is scheduled
    - name: Optional state name
    - message: Optional retry message
    - type: State type (defaults to AWAITING_RETRY)

    Returns:
    State object representing scheduled retry
    """

def Retrying(cls: type = State, **kwargs: Any) -> State:
    """
    Create a retrying state for operations currently being retried.

    Parameters:
    - data: Optional retry data
    - name: Optional state name
    - message: Optional retry message
    - type: State type (defaults to RETRYING)

    Returns:
    State object representing active retry
    """

def Late(
    cls: type = State,
    scheduled_time: Optional[datetime] = None,
    **kwargs: Any,
) -> State:
    """
    Create a late state for operations that missed their scheduled time.

    Parameters:
    - scheduled_time: When the operation was scheduled to run
    - data: Optional lateness data
    - name: Optional state name
    - message: Optional lateness message
    - type: State type (defaults to LATE)

    Returns:
    State object representing late execution
    """
```

#### Usage Examples

```python
from datetime import datetime, timedelta, timezone

from prefect import flow, task
from prefect.states import Completed, Failed, Scheduled

@task
def process_data():
    try:
        # Processing logic
        result = {"processed": 100}
        return Completed(data=result, message="Processing successful")
    except Exception as e:
        return Failed(data=str(e), message="Processing failed")

@flow
def scheduled_workflow():
    # Schedule for future execution (timezone-aware timestamp)
    future_time = datetime.now(timezone.utc) + timedelta(hours=1)
    return Scheduled(scheduled_time=future_time, message="Scheduled for later")

# Using states in flow logic
@flow
def conditional_flow():
    # return_state=True returns the task's State instead of its result
    task_state = process_data(return_state=True)
    if task_state.is_completed():
        return "Success"
    elif task_state.is_failed():
        return "Failure"
    else:
        return "Unknown"
```
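
The factory functions in this section share one shape: each fixes a default state type and forwards the remaining keyword arguments to the state class. The sketch below illustrates that pattern only. `ToyState` and `make_factory` are hypothetical stand-ins that do not come from Prefect, and the real factories may validate or populate their arguments differently.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass(frozen=True)
class ToyState:
    """Stand-in for a state object (hypothetical, not Prefect's State)."""
    type: str
    name: Optional[str] = None
    message: Optional[str] = None
    data: Any = None


def make_factory(state_type: str) -> Callable[..., ToyState]:
    """Build a factory that defaults `type` and `name` but lets callers override them."""
    def factory(cls: type = ToyState, **kwargs: Any) -> ToyState:
        kwargs.setdefault("type", state_type)
        kwargs.setdefault("name", state_type.capitalize())
        return cls(**kwargs)
    return factory


completed = make_factory("COMPLETED")
failed = make_factory("FAILED")

ok = completed(data={"processed": 100}, message="Processing successful")
err = failed(message="Processing failed", name="ValidationFailed")

print(ok.type, ok.name)    # COMPLETED Completed
print(err.type, err.name)  # FAILED ValidationFailed
```

Passing `name` explicitly overrides the default, which is how a more specific label can be attached to an otherwise ordinary failed state in this toy model.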

### Flow Run Control

Functions for controlling the lifecycle of running flows, including pause, resume, and suspension operations.

```python { .api }
def pause_flow_run(
    flow_run_id: Optional[UUID] = None,
    timeout: int = 300,
    poll_interval: int = 10,
    reschedule: bool = False,
    key: Optional[str] = None,
) -> None:
    """
    Pause a flow run, halting execution until manually resumed.

    Parameters:
    - flow_run_id: ID of the flow run to pause (defaults to current run)
    - timeout: Maximum time to wait for pause acknowledgment (seconds)
    - poll_interval: Polling interval for pause status (seconds)
    - reschedule: Whether to reschedule the flow run after pausing
    - key: Optional key for identifying the pause point

    Raises:
    TimeoutError: If pause is not acknowledged within timeout
    """

def resume_flow_run(
    flow_run_id: UUID,
    run_input: Dict[str, Any] = None,
) -> None:
    """
    Resume a paused flow run.

    Parameters:
    - flow_run_id: ID of the paused flow run to resume
    - run_input: Optional input data to provide when resuming

    Raises:
    ValueError: If flow run is not in a paused state
    """

def suspend_flow_run(
    flow_run_id: Optional[UUID] = None,
    timeout: int = 300,
    poll_interval: int = 10,
    key: Optional[str] = None,
) -> None:
    """
    Suspend a flow run for long-term storage and later resumption.

    Parameters:
    - flow_run_id: ID of the flow run to suspend (defaults to current run)
    - timeout: Maximum time to wait for suspension acknowledgment (seconds)
    - poll_interval: Polling interval for suspension status (seconds)
    - key: Optional key for identifying the suspension point

    Raises:
    TimeoutError: If suspension is not acknowledged within timeout
    """
```

#### Usage Examples

```python
from datetime import datetime, timezone
from uuid import UUID

from prefect import flow, get_run_logger
from prefect.flow_runs import pause_flow_run, resume_flow_run, suspend_flow_run

@flow
def interactive_flow():
    logger = get_run_logger()

    logger.info("Starting workflow")

    # Pause for manual review
    logger.info("Pausing for manual review")
    pause_flow_run(key="manual_review", timeout=600)

    logger.info("Resumed after manual review")

    # Continue processing
    return "Workflow completed"

@flow
def long_running_flow():
    logger = get_run_logger()

    # Process first batch
    logger.info("Processing first batch")

    # Suspend for overnight processing
    logger.info("Suspending for overnight processing")
    suspend_flow_run(key="overnight_break")

    # Resume processing next day
    logger.info("Resuming processing")

    return "Long workflow completed"

# Resume a suspended flow programmatically
async def resume_workflow(flow_run_id: UUID):
    # Pass the resume time as an ISO 8601 string so the input is serializable
    resumed_at = datetime.now(timezone.utc).isoformat()
    await resume_flow_run(flow_run_id, run_input={"resumed_at": resumed_at})
```
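
The `timeout` and `poll_interval` parameters describe a poll-until-deadline loop. As a rough illustration of that control flow only, the sketch below waits for a condition and raises `TimeoutError` if it is not met in time. `wait_until` and `is_resumed` are hypothetical names, and the loop does not contact a Prefect API.

```python
import time
from typing import Callable


def wait_until(
    condition: Callable[[], bool],
    timeout: float = 300,
    poll_interval: float = 10,
) -> float:
    """Poll `condition` until it returns True; return the seconds waited.

    Raises TimeoutError if the deadline passes first.
    """
    start = time.monotonic()
    deadline = start + timeout
    while True:
        if condition():
            return time.monotonic() - start
        if time.monotonic() >= deadline:
            raise TimeoutError(f"condition not met within {timeout} seconds")
        # Sleep for one interval, but never past the deadline.
        time.sleep(max(0.0, min(poll_interval, deadline - time.monotonic())))


# Simulate a run that is resumed on the third status check.
checks = {"count": 0}


def is_resumed() -> bool:
    checks["count"] += 1
    return checks["count"] >= 3


waited = wait_until(is_resumed, timeout=1.0, poll_interval=0.01)
print(f"resumed after {checks['count']} checks")
```

A shorter `poll_interval` notices the resume sooner at the cost of more status checks; the deadline is enforced independently of the interval.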

### State Utilities

Utility functions for working with states, including result extraction and state conversion.

```python { .api }
def get_state_result(
    state: State,
    raise_on_failure: bool = True,
) -> Any:
    """
    Extract the result data from a state object.

    Parameters:
    - state: State object to extract result from
    - raise_on_failure: Whether to raise exception for failed states

    Returns:
    The result data stored in the state

    Raises:
    Exception: If state represents a failure and raise_on_failure is True
    """

def to_state_create(state: State) -> StateCreate:
    """
    Convert a State object to StateCreate format for API submission.

    Parameters:
    - state: State object to convert

    Returns:
    StateCreate object suitable for API operations
    """

def exception_to_crashed_state(
    exception: Exception,
    message: str = None,
) -> State:
    """
    Convert an exception to a crashed state.

    Parameters:
    - exception: Exception that caused the crash
    - message: Optional message to include

    Returns:
    Crashed state containing exception information
    """

def exception_to_failed_state(
    exception: Exception,
    message: str = None,
) -> State:
    """
    Convert an exception to a failed state.

    Parameters:
    - exception: Exception that caused the failure
    - message: Optional message to include

    Returns:
    Failed state containing exception information
    """

def format_exception(exception: Exception) -> str:
    """
    Format an exception for display in state messages.

    Parameters:
    - exception: Exception to format

    Returns:
    Formatted string representation of the exception
    """
```

#### Usage Examples

```python
from prefect import task, flow
from prefect.states import get_state_result, exception_to_failed_state

def complex_operation():
    # Placeholder for the risky work; replace with real logic
    return {"status": "ok"}

@task
def risky_task():
    try:
        # Risky operation
        result = complex_operation()
        return result
    except Exception as e:
        # Convert exception to failed state
        return exception_to_failed_state(e, message="Complex operation failed")

@flow
def result_processing_flow():
    # return_state=True returns the task's State instead of its result
    task_state = risky_task(return_state=True)

    # Extract result safely
    try:
        result = get_state_result(task_state)
        return f"Success: {result}"
    except Exception:
        return "Task failed, handling gracefully"
```
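
The `raise_on_failure` flag decides whether a failed state surfaces as an exception or as a value. The following sketch models that behavior with a plain dataclass. `ToyState`, `to_failed_state`, and `get_result` are illustrative stand-ins rather than Prefect functions, and any result persistence that a real implementation performs is omitted.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class ToyState:
    """Stand-in for a state object (hypothetical, not Prefect's State)."""
    type: str
    message: str = ""
    data: Any = None


def to_failed_state(exc: BaseException, message: str = "") -> ToyState:
    """Wrap an exception in a FAILED state, keeping the exception as data."""
    detail = f"{type(exc).__name__}: {exc}"
    text = f"{message}: {detail}" if message else detail
    return ToyState(type="FAILED", message=text, data=exc)


def get_result(state: ToyState, raise_on_failure: bool = True) -> Any:
    """Return the state's data, re-raising a stored exception when asked to."""
    if state.type == "FAILED" and isinstance(state.data, BaseException):
        if raise_on_failure:
            raise state.data
        return state.data  # hand the exception back as a value
    return state.data


try:
    raise ValueError("bad input")
except ValueError as exc:
    failed = to_failed_state(exc, "Validation failed")

print(failed.message)  # Validation failed: ValueError: bad input

# With raise_on_failure=False the caller inspects the exception instead of catching it.
error = get_result(failed, raise_on_failure=False)
print(type(error).__name__)  # ValueError
```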

### State Class

The core State class representing the current status of flows and tasks.

```python { .api }
class State:
    """
    Immutable state object representing the current status of a flow or task.

    Attributes:
    - type: StateType enum value (COMPLETED, FAILED, etc.)
    - name: Optional descriptive name
    - message: Optional status message
    - data: Associated data or result
    - timestamp: When the state was created
    - state_details: Additional state-specific information
    """

    def __init__(
        self,
        type: StateType,
        name: str = None,
        message: str = None,
        data: Any = None,
        timestamp: datetime = None,
        state_details: StateDetails = None,
    ):
        """Initialize a state object."""

    def is_scheduled(self) -> bool:
        """Check if state represents scheduled execution."""

    def is_pending(self) -> bool:
        """Check if state represents pending execution."""

    def is_running(self) -> bool:
        """Check if state represents active execution."""

    def is_completed(self) -> bool:
        """Check if state represents successful completion."""

    def is_failed(self) -> bool:
        """Check if state represents failure."""

    def is_crashed(self) -> bool:
        """Check if state represents a system crash."""

    def is_cancelled(self) -> bool:
        """Check if state represents cancellation."""

    def is_cancelling(self) -> bool:
        """Check if state represents active cancellation."""

    def is_paused(self) -> bool:
        """Check if state represents a pause."""

    def is_suspended(self) -> bool:
        """Check if state represents suspension."""

    def is_final(self) -> bool:
        """Check if state represents a final (terminal) status."""

    def copy(
        self,
        *,
        type: StateType = None,
        name: str = None,
        message: str = None,
        data: Any = None,
    ) -> "State":
        """Create a copy of the state with modified attributes."""

    def result(self, raise_on_failure: bool = True) -> Any:
        """
        Get the result from the state.

        Parameters:
        - raise_on_failure: Whether to raise on failed states

        Returns:
        The result data from the state
        """
```
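
Because states are immutable, a change of status is expressed by creating a new object rather than editing an existing one. A minimal sketch of that idea using a frozen dataclass follows. `ToyState`, its fields, and `FINAL_TYPES` are simplified stand-ins chosen for illustration, and they make no claim about how the `State` class itself is implemented.

```python
from dataclasses import FrozenInstanceError, dataclass, field, replace
from datetime import datetime, timezone
from typing import Any, Optional

# Terminal types used by this toy model only.
FINAL_TYPES = frozenset({"COMPLETED", "FAILED", "CRASHED", "CANCELLED"})


@dataclass(frozen=True)
class ToyState:
    """Stand-in for an immutable state object (hypothetical)."""
    type: str
    name: Optional[str] = None
    message: Optional[str] = None
    data: Any = None
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))

    def is_running(self) -> bool:
        return self.type == "RUNNING"

    def is_final(self) -> bool:
        return self.type in FINAL_TYPES


running = ToyState(type="RUNNING", message="Working")

# Attempting to mutate a frozen instance raises an error.
try:
    running.type = "COMPLETED"
except FrozenInstanceError:
    print("states cannot be modified in place")

# A transition produces a new object; the original is unchanged.
done = replace(running, type="COMPLETED", message="Finished", data=42)
print(running.is_final(), done.is_final())  # False True
```

Here `dataclasses.replace` plays the role that a `copy(...)` method with keyword overrides would play: unspecified fields carry over and only the named ones change.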

### State Groups

Utility for grouping and categorizing states by their types.

```python { .api }
class StateGroup:
    """
    Groups states by their type for easier categorization and handling.
    """

    # Final states that represent workflow completion
    FINAL = frozenset([
        StateType.COMPLETED,
        StateType.FAILED,
        StateType.CRASHED,
        StateType.CANCELLED,
    ])

    # Running states that represent active execution
    RUNNING = frozenset([
        StateType.RUNNING,
        StateType.CANCELLING,
        StateType.RETRYING,
    ])

    # Waiting states that represent pending execution
    WAITING = frozenset([
        StateType.SCHEDULED,
        StateType.PENDING,
        StateType.AWAITING_RETRY,
        StateType.PAUSED,
        StateType.SUSPENDED,
    ])

    @classmethod
    def is_final(cls, state_type: StateType) -> bool:
        """Check if a state type is final."""
        return state_type in cls.FINAL

    @classmethod
    def is_running(cls, state_type: StateType) -> bool:
        """Check if a state type represents running execution."""
        return state_type in cls.RUNNING

    @classmethod
    def is_waiting(cls, state_type: StateType) -> bool:
        """Check if a state type represents waiting for execution."""
        return state_type in cls.WAITING
```

#### Usage Examples

```python
from prefect.states import StateGroup, StateType

def handle_state(state_type: StateType):
    if StateGroup.is_final(state_type):
        print("Workflow has completed")
    elif StateGroup.is_running(state_type):
        print("Workflow is actively running")
    elif StateGroup.is_waiting(state_type):
        print("Workflow is waiting to run")
```

## Types

Types related to state management:

```python { .api }
from typing import Any, Optional, Dict
from datetime import datetime
from enum import Enum
from uuid import UUID

class StateType(str, Enum):
    """Enumeration of all possible state types."""
    SCHEDULED = "SCHEDULED"
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"
    CRASHED = "CRASHED"
    PAUSED = "PAUSED"
    SUSPENDED = "SUSPENDED"
    AWAITING_RETRY = "AWAITING_RETRY"
    RETRYING = "RETRYING"
    CANCELLING = "CANCELLING"
    LATE = "LATE"

class StateDetails:
    """Additional details for specific state types."""
    flow_run_id: Optional[UUID]
    task_run_id: Optional[UUID]
    child_flow_run_id: Optional[UUID]
    scheduled_time: Optional[datetime]
    cache_key: Optional[str]
    pause_timeout: Optional[datetime]
    pause_reschedule: Optional[bool]
    pause_key: Optional[str]
    run_input_keyset: Optional[Dict[str, Any]]

class StateCreate:
    """State creation format for API operations."""
    type: StateType
    name: Optional[str]
    message: Optional[str]
    state_details: Optional[StateDetails]
    data: Optional[Any]
```
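
Since `StateType` mixes in `str`, its members compare equal to plain strings and serialize to JSON without a custom encoder. The sketch below demonstrates this with a trimmed, locally defined copy of the enum, named `ToyStateType` to make clear that it is not imported from Prefect.

```python
import json
from enum import Enum


class ToyStateType(str, Enum):
    """Trimmed local copy of a str-based state enum (illustrative only)."""
    SCHEDULED = "SCHEDULED"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"


# Members behave like strings in comparisons.
print(ToyStateType.COMPLETED == "COMPLETED")  # True

# Look up a member from a raw value, e.g. one read from an API response.
parsed = ToyStateType("FAILED")
print(parsed is ToyStateType.FAILED)  # True

# json.dumps serializes the underlying string value.
payload = json.dumps({"type": ToyStateType.RUNNING})
print(payload)  # {"type": "RUNNING"}

# Unknown values are rejected with ValueError.
try:
    ToyStateType("DONE")
    rejected = False
except ValueError:
    rejected = True
print(rejected)  # True
```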