Workflow orchestration and management framework for building resilient data pipelines.
—
Prefect's state management system provides comprehensive control over workflow execution through immutable state objects and lifecycle functions. States represent the current status of flows and tasks, enabling fine-grained control over execution, retry logic, and error handling.
Factory functions for creating different types of states that represent various stages in the lifecycle of flows and tasks.
def Completed(cls: type = State, **kwargs: Any) -> State:
    """
    Create a completed state indicating successful execution.

    Parameters:
    - cls: State class to construct (defaults to State)
    - **kwargs: State fields, supplied by keyword:
        - data: Result data from the completed operation
        - name: Optional state name
        - message: Optional descriptive message
        - type: State type (defaults to COMPLETED)

    Returns:
    State object representing successful completion
    """
def Failed(cls: type = State, **kwargs: Any) -> State:
    """
    Create a failed state indicating execution failure.

    Parameters:
    - cls: State class to construct (defaults to State)
    - **kwargs: State fields, supplied by keyword:
        - data: Error information or exception details
        - name: Optional state name
        - message: Optional error message
        - type: State type (defaults to FAILED)

    Returns:
    State object representing failure
    """
def Running(cls: type = State, **kwargs: Any) -> State:
    """
    Create a running state indicating active execution.

    Parameters:
    - cls: State class to construct (defaults to State)
    - **kwargs: State fields, supplied by keyword:
        - data: Optional data about the running operation
        - name: Optional state name
        - message: Optional status message
        - type: State type (defaults to RUNNING)

    Returns:
    State object representing active execution
    """
def Scheduled(
    scheduled_time: Optional[datetime] = None,
    name: Optional[str] = None,
    message: Optional[str] = None,
    type: Optional[StateType] = None,
) -> State:
    """
    Create a scheduled state for future execution.

    Parameters:
    - scheduled_time: When the operation is scheduled to run
      (NOTE(review): the behaviour when this is omitted is not shown here)
    - name: Optional state name
    - message: Optional scheduling message
    - type: State type; the signature default is None, which is documented
      as resolving to SCHEDULED

    Returns:
    State object representing scheduled execution
    """
def Pending(cls: type = State, **kwargs: Any) -> State:
    """
    Create a pending state for operations awaiting execution.

    Parameters:
    - cls: State class to construct (defaults to State)
    - **kwargs: State fields, supplied by keyword:
        - name: Optional state name
        - message: Optional pending message
        - type: State type (defaults to PENDING)

    Returns:
    State object representing pending execution
    """
def Crashed(cls: type = State, **kwargs: Any) -> State:
    """
    Create a crashed state for unexpected failures.

    Parameters:
    - cls: State class to construct (defaults to State)
    - **kwargs: State fields, supplied by keyword:
        - data: Crash information or exception details
        - name: Optional state name
        - message: Optional crash message
        - type: State type (defaults to CRASHED)

    Returns:
    State object representing system crash
    """
def Cancelled(cls: type = State, **kwargs: Any) -> State:
    """
    Create a cancelled state for deliberately stopped operations.

    Parameters:
    - cls: State class to construct (defaults to State)
    - **kwargs: State fields, supplied by keyword:
        - data: Optional cancellation data
        - name: Optional state name
        - message: Optional cancellation message
        - type: State type (defaults to CANCELLED)

    Returns:
    State object representing cancellation
    """
def Cancelling(
    data: Any = None,
    name: Optional[str] = None,
    message: Optional[str] = None,
    type: Optional[StateType] = None,
) -> State:
    """
    Create a cancelling state for operations in the process of being cancelled.

    Parameters:
    - data: Optional cancellation data
    - name: Optional state name
    - message: Optional cancellation message
    - type: State type; the signature default is None, which is documented
      as resolving to CANCELLING

    Returns:
    State object representing active cancellation
    """
def Paused(
    cls: type = State,
    timeout_seconds: Optional[int] = None,
    pause_expiration_time: Optional[datetime] = None,
    reschedule: bool = False,
    pause_key: Optional[str] = None,
    **kwargs: Any,
) -> State:
    """
    Create a paused state for temporarily halted operations.

    Parameters:
    - cls: State class to construct (defaults to State)
    - timeout_seconds: Optional pause timeout, in seconds
    - pause_expiration_time: Optional datetime at which the pause expires
    - reschedule: Reschedule flag for the pause (defaults to False)
    - pause_key: Optional key identifying the pause point
    - **kwargs: State fields, supplied by keyword:
        - data: Optional pause data
        - name: Optional state name
        - message: Optional pause message
        - type: State type (defaults to PAUSED)

    NOTE(review): how timeout_seconds and pause_expiration_time interact when
    both are supplied is not shown here; confirm against the implementation.

    Returns:
    State object representing paused execution
    """
def Suspended(
    cls: type = State,
    timeout_seconds: Optional[int] = None,
    pause_expiration_time: Optional[datetime] = None,
    pause_key: Optional[str] = None,
    **kwargs: Any,
) -> State:
    """
    Create a suspended state for long-term halted operations.

    Parameters:
    - cls: State class to construct (defaults to State)
    - timeout_seconds: Optional suspension timeout, in seconds
    - pause_expiration_time: Optional datetime at which the suspension expires
    - pause_key: Optional key identifying the suspension point
    - **kwargs: State fields, supplied by keyword:
        - data: Optional suspension data
        - name: Optional state name
        - message: Optional suspension message
        - type: State type (defaults to SUSPENDED)

    NOTE(review): how timeout_seconds and pause_expiration_time interact when
    both are supplied is not shown here; confirm against the implementation.

    Returns:
    State object representing suspended execution
    """
def AwaitingRetry(
    cls: type = State,
    scheduled_time: Optional[datetime] = None,
    **kwargs: Any,
) -> State:
    """
    Create an awaiting retry state for operations scheduled to retry.

    Parameters:
    - cls: State class to construct (defaults to State)
    - scheduled_time: When the retry is scheduled
    - **kwargs: State fields, supplied by keyword:
        - name: Optional state name
        - message: Optional retry message
        - type: State type (defaults to AWAITING_RETRY)

    Returns:
    State object representing scheduled retry
    """
def Retrying(cls: type = State, **kwargs: Any) -> State:
    """
    Create a retrying state for operations currently being retried.

    Parameters:
    - cls: State class to construct (defaults to State)
    - **kwargs: State fields, supplied by keyword:
        - data: Optional retry data
        - name: Optional state name
        - message: Optional retry message
        - type: State type (defaults to RETRYING)

    Returns:
    State object representing active retry
    """
def Late(
    cls: type = State,
    scheduled_time: Optional[datetime] = None,
    **kwargs: Any,
) -> State:
    """
    Create a late state for operations that missed their scheduled time.

    Parameters:
    - cls: State class to construct (defaults to State)
    - scheduled_time: Optional scheduled time associated with the state
      (presumably the time that was missed; confirm against the implementation)
    - **kwargs: State fields, supplied by keyword:
        - data: Optional lateness data
        - name: Optional state name
        - message: Optional lateness message
        - type: State type (defaults to LATE)

    Returns:
    State object representing late execution
    """

from prefect import flow, task
from prefect.states import Completed, Failed, Running, Scheduled
from datetime import datetime, timedelta
@task
def process_data():
    """Example task that reports its outcome by returning an explicit state."""
    try:
        # Processing logic
        payload = {"processed": 100}
        success = Completed(data=payload, message="Processing successful")
        return success
    except Exception as exc:
        # Any error is reported as a Failed state carrying the error text
        failure = Failed(data=str(exc), message="Processing failed")
        return failure
@flow
def scheduled_workflow():
    """Example flow that returns a state scheduled one hour from now."""
    # Schedule for future execution
    delay = timedelta(hours=1)
    run_at = datetime.now() + delay
    return Scheduled(scheduled_time=run_at, message="Scheduled for later")
# Using states in flow logic
@flow
def conditional_flow():
task_state = process_data()
if task_state.is_completed():
return "Success"
elif task_state.is_failed():
return "Failure"
else:
return "Unknown"Functions for controlling the lifecycle of running flows, including pause, resume, and suspension operations.
def pause_flow_run(
    flow_run_id: Optional[UUID] = None,
    timeout: int = 300,
    poll_interval: int = 10,
    reschedule: bool = False,
    key: Optional[str] = None,
) -> None:
    """
    Pause a flow run, halting execution until manually resumed.

    Parameters:
    - flow_run_id: ID of the flow run to pause (defaults to current run)
    - timeout: Maximum time to wait for pause acknowledgment (seconds, default 300)
    - poll_interval: Polling interval for pause status (seconds, default 10)
    - reschedule: Whether to reschedule the flow run after pausing (default False)
    - key: Optional key for identifying the pause point

    Returns:
    None

    Raises:
    TimeoutError: If pause is not acknowledged within timeout
    """
def resume_flow_run(
    flow_run_id: UUID,
    run_input: Optional[Dict[str, Any]] = None,
) -> None:
    """
    Resume a paused flow run.

    Parameters:
    - flow_run_id: ID of the paused flow run to resume
    - run_input: Optional input data to provide when resuming (defaults to None)

    Returns:
    None

    Raises:
    ValueError: If flow run is not in a paused state
    """
def suspend_flow_run(
    flow_run_id: Optional[UUID] = None,
    timeout: int = 300,
    poll_interval: int = 10,
    key: Optional[str] = None,
) -> None:
    """
    Suspend a flow run for long-term storage and later resumption.

    Parameters:
    - flow_run_id: ID of the flow run to suspend (defaults to current run)
    - timeout: Maximum time to wait for suspension acknowledgment (seconds, default 300)
    - poll_interval: Polling interval for suspension status (seconds, default 10)
    - key: Optional key for identifying the suspension point

    Returns:
    None

    Raises:
    TimeoutError: If suspension is not acknowledged within timeout
    """

from prefect import flow, get_run_logger
from prefect.flow_runs import pause_flow_run, resume_flow_run, suspend_flow_run
from prefect.client.orchestration import get_client
@flow
def interactive_flow():
    """Example flow that pauses itself for a manual review step."""
    log = get_run_logger()
    log.info("Starting workflow")

    # Pause for manual review; allow up to 600 seconds
    log.info("Pausing for manual review")
    pause_flow_run(timeout=600, key="manual_review")
    log.info("Resumed after manual review")

    # Any remaining processing would go here
    return "Workflow completed"
@flow
def long_running_flow():
    """Example flow that suspends itself between two processing phases."""
    log = get_run_logger()

    # Phase 1: first batch
    log.info("Processing first batch")

    # Suspend the run at a named point for the overnight break
    log.info("Suspending for overnight processing")
    suspend_flow_run(key="overnight_break")

    # Phase 2: continues once the run has been resumed
    log.info("Resuming processing")
    return "Long workflow completed"
# Resume a suspended flow programmatically
async def resume_workflow(flow_run_id: str):
client = get_client()
await resume_flow_run(flow_run_id, run_input={"resumed_at": datetime.now()})Utility functions for working with states, including result extraction and state conversion.
def get_state_result(
    state: State,
    raise_on_failure: bool = True,
) -> Any:
    """
    Extract the result data from a state object.

    Parameters:
    - state: State object to extract result from
    - raise_on_failure: Whether to raise exception for failed states
      (defaults to True)

    Returns:
    The result data stored in the state

    Raises:
    Exception: If state represents a failure and raise_on_failure is True
    """
def to_state_create(state: State) -> StateCreate:
    """
    Convert a State object to StateCreate format for API submission.

    Parameters:
    - state: State object to convert

    Returns:
    StateCreate object suitable for API operations (StateCreate declares the
    fields type, name, message, state_details and data)
    """
def exception_to_crashed_state(
    exception: Exception,
    message: Optional[str] = None,
) -> State:
    """
    Convert an exception to a crashed state.

    Parameters:
    - exception: Exception that caused the crash
    - message: Optional message to include (defaults to None)

    Returns:
    Crashed state containing exception information
    """
def exception_to_failed_state(
    exception: Exception,
    message: Optional[str] = None,
) -> State:
    """
    Convert an exception to a failed state.

    Parameters:
    - exception: Exception that caused the failure
    - message: Optional message to include (defaults to None)

    Returns:
    Failed state containing exception information
    """
def format_exception(exception: Exception) -> str:
    """
    Format an exception for display in state messages.

    Parameters:
    - exception: Exception to format

    Returns:
    Formatted string representation of the exception
    """

from prefect import task, flow
from prefect.states import get_state_result, exception_to_failed_state
@task
def risky_task():
    """Example task that turns any error from its operation into a failed state."""
    try:
        # Risky operation
        return complex_operation()
    except Exception as exc:
        # Report the error as a failed state instead of letting it propagate
        return exception_to_failed_state(exc, "Complex operation failed")
@flow
def result_processing_flow():
task_state = risky_task()
# Extract result safely
try:
result = get_state_result(task_state)
return f"Success: {result}"
except Exception:
return "Task failed, handling gracefully"The core State class representing the current status of flows and tasks.
class State:
"""
Immutable state object representing the current status of a flow or task.
Attributes:
- type: StateType enum value (COMPLETED, FAILED, etc.)
- name: Optional descriptive name
- message: Optional status message
- data: Associated data or result
- timestamp: When the state was created
- state_details: Additional state-specific information
"""
def __init__(
self,
type: StateType,
name: str = None,
message: str = None,
data: Any = None,
timestamp: datetime = None,
state_details: StateDetails = None,
):
"""Initialize a state object."""
def is_scheduled(self) -> bool:
"""Check if state represents scheduled execution."""
def is_pending(self) -> bool:
"""Check if state represents pending execution."""
def is_running(self) -> bool:
"""Check if state represents active execution."""
def is_completed(self) -> bool:
"""Check if state represents successful completion."""
def is_failed(self) -> bool:
"""Check if state represents failure."""
def is_crashed(self) -> bool:
"""Check if state represents a system crash."""
def is_cancelled(self) -> bool:
"""Check if state represents cancellation."""
def is_cancelling(self) -> bool:
"""Check if state represents active cancellation."""
def is_paused(self) -> bool:
"""Check if state represents a pause."""
def is_suspended(self) -> bool:
"""Check if state represents suspension."""
def is_final(self) -> bool:
"""Check if state represents a final (terminal) status."""
def copy(
self,
*,
type: StateType = None,
name: str = None,
message: str = None,
data: Any = None,
) -> "State":
"""Create a copy of the state with modified attributes."""
def result(self, raise_on_failure: bool = True) -> Any:
"""
Get the result from the state.
Parameters:
- raise_on_failure: Whether to raise on failed states
Returns:
The result data from the state
"""Utility for grouping and categorizing states by their types.
class StateGroup:
    """
    Groups states by their type for easier categorization and handling.

    Three disjoint groups are defined: FINAL, RUNNING and WAITING.
    StateType.LATE is not a member of any of them, so all three predicates
    return False for it.
    """

    # Final states that represent workflow completion
    FINAL = frozenset([
        StateType.COMPLETED,
        StateType.FAILED,
        StateType.CRASHED,
        StateType.CANCELLED,
    ])

    # Running states that represent active execution
    RUNNING = frozenset([
        StateType.RUNNING,
        StateType.CANCELLING,
        StateType.RETRYING,
    ])

    # Waiting states that represent pending execution
    WAITING = frozenset([
        StateType.SCHEDULED,
        StateType.PENDING,
        StateType.AWAITING_RETRY,
        StateType.PAUSED,
        StateType.SUSPENDED,
    ])

    @classmethod
    def is_final(cls, state_type: StateType) -> bool:
        """Check if a state type is final (a member of FINAL)."""
        return state_type in cls.FINAL

    @classmethod
    def is_running(cls, state_type: StateType) -> bool:
        """Check if a state type represents running execution (a member of RUNNING)."""
        return state_type in cls.RUNNING

    @classmethod
    def is_waiting(cls, state_type: StateType) -> bool:
        """Check if a state type represents waiting for execution (a member of WAITING)."""
        return state_type in cls.WAITING

from prefect.states import StateGroup, StateType
def handle_state(state_type: StateType):
if StateGroup.is_final(state_type):
print("Workflow has completed")
elif StateGroup.is_running(state_type):
print("Workflow is actively running")
elif StateGroup.is_waiting(state_type):
print("Workflow is waiting to run")Types related to state management:
from typing import Any, Optional, Dict
from datetime import datetime
from enum import Enum
from uuid import UUID
class StateType(str, Enum):
    """
    Enumeration of all possible state types.

    Members also subclass str, and each member's value equals its name.
    """

    SCHEDULED = "SCHEDULED"
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"
    CRASHED = "CRASHED"
    PAUSED = "PAUSED"
    SUSPENDED = "SUSPENDED"
    AWAITING_RETRY = "AWAITING_RETRY"
    RETRYING = "RETRYING"
    CANCELLING = "CANCELLING"
    LATE = "LATE"
class StateDetails:
    """
    Additional details for specific state types.

    Every field is declared Optional; which state types populate which fields
    is not shown here.
    """

    # Identifiers of the runs this state relates to
    flow_run_id: Optional[UUID]
    task_run_id: Optional[UUID]
    child_flow_run_id: Optional[UUID]
    # Scheduling and caching details
    scheduled_time: Optional[datetime]
    cache_key: Optional[str]
    # Pause-related details
    # NOTE(review): presumably populated from the Paused/Suspended factory
    # arguments (timeout, reschedule, pause_key); confirm
    pause_timeout: Optional[datetime]
    pause_reschedule: Optional[bool]
    pause_key: Optional[str]
    run_input_keyset: Optional[Dict[str, Any]]
class StateCreate:
"""State creation format for API operations."""
type: StateType
name: Optional[str]
message: Optional[str]
state_details: Optional[StateDetails]
data: Optional[Any]Install with Tessl CLI
npx tessl i tessl/pypi-prefect