CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-prefect

Workflow orchestration and management framework for building resilient data pipelines.

Pending
Overview
Eval results
Files

state-management.mddocs/

State Management

Prefect's state management system provides comprehensive control over workflow execution through immutable state objects and lifecycle functions. States represent the current status of flows and tasks, enabling fine-grained control over execution, retry logic, and error handling.

Capabilities

State Creation Functions

Factory functions for creating different types of states that represent various stages in the lifecycle of flows and tasks.

def Completed(cls: type = State, **kwargs: Any) -> State:
    """
    Create a completed state indicating successful execution.
    
    Parameters:
    - data: Result data from the completed operation
    - name: Optional state name
    - message: Optional descriptive message
    - type: State type (defaults to COMPLETED)
    
    Returns:
    State object representing successful completion
    """

def Failed(cls: type = State, **kwargs: Any) -> State:
    """
    Create a failed state indicating execution failure.
    
    Parameters:
    - data: Error information or exception details
    - name: Optional state name
    - message: Optional error message
    - type: State type (defaults to FAILED)
    
    Returns:
    State object representing failure
    """

def Running(cls: type = State, **kwargs: Any) -> State:
    """
    Create a running state indicating active execution.
    
    Parameters:
    - data: Optional data about the running operation
    - name: Optional state name
    - message: Optional status message
    - type: State type (defaults to RUNNING)
    
    Returns:
    State object representing active execution
    """

def Scheduled(
    scheduled_time: datetime = None,
    name: str = None,
    message: str = None,
    type: StateType = None,
) -> State:
    """
    Create a scheduled state for future execution.
    
    Parameters:
    - scheduled_time: When the operation is scheduled to run
    - name: Optional state name
    - message: Optional scheduling message
    - type: State type (defaults to SCHEDULED)
    
    Returns:
    State object representing scheduled execution
    """

def Pending(cls: type = State, **kwargs: Any) -> State:
    """
    Create a pending state for operations awaiting execution.
    
    Parameters:
    - name: Optional state name
    - message: Optional pending message
    - type: State type (defaults to PENDING)
    
    Returns:
    State object representing pending execution
    """

def Crashed(cls: type = State, **kwargs: Any) -> State:
    """
    Create a crashed state for unexpected failures.
    
    Parameters:
    - data: Crash information or exception details
    - name: Optional state name
    - message: Optional crash message
    - type: State type (defaults to CRASHED)
    
    Returns:
    State object representing system crash
    """

def Cancelled(cls: type = State, **kwargs: Any) -> State:
    """
    Create a cancelled state for deliberately stopped operations.
    
    Parameters:
    - data: Optional cancellation data
    - name: Optional state name
    - message: Optional cancellation message
    - type: State type (defaults to CANCELLED)
    
    Returns:
    State object representing cancellation
    """

def Cancelling(
    data: Any = None,
    name: str = None,
    message: str = None,
    type: StateType = None,
) -> State:
    """
    Create a cancelling state for operations in the process of being cancelled.
    
    Parameters:
    - data: Optional cancellation data
    - name: Optional state name
    - message: Optional cancellation message
    - type: State type (defaults to CANCELLING)
    
    Returns:
    State object representing active cancellation
    """

def Paused(
    cls: type = State,
    timeout_seconds: Optional[int] = None,
    pause_expiration_time: Optional[datetime] = None,
    reschedule: bool = False,
    pause_key: Optional[str] = None,
    **kwargs: Any,
) -> State:
    """
    Create a paused state for temporarily halted operations.
    
    Parameters:
    - data: Optional pause data
    - name: Optional state name
    - message: Optional pause message
    - type: State type (defaults to PAUSED)
    
    Returns:
    State object representing paused execution
    """

def Suspended(
    cls: type = State,
    timeout_seconds: Optional[int] = None,
    pause_expiration_time: Optional[datetime] = None,
    pause_key: Optional[str] = None,
    **kwargs: Any,
) -> State:
    """
    Create a suspended state for long-term halted operations.
    
    Parameters:
    - data: Optional suspension data
    - name: Optional state name
    - message: Optional suspension message
    - type: State type (defaults to SUSPENDED)
    
    Returns:
    State object representing suspended execution
    """

def AwaitingRetry(
    cls: type = State,
    scheduled_time: Optional[datetime] = None,
    **kwargs: Any,
) -> State:
    """
    Create an awaiting retry state for operations scheduled to retry.
    
    Parameters:
    - scheduled_time: When the retry is scheduled
    - name: Optional state name
    - message: Optional retry message
    - type: State type (defaults to AWAITING_RETRY)
    
    Returns:
    State object representing scheduled retry
    """

def Retrying(cls: type = State, **kwargs: Any) -> State:
    """
    Create a retrying state for operations currently being retried.
    
    Parameters:
    - data: Optional retry data
    - name: Optional state name
    - message: Optional retry message
    - type: State type (defaults to RETRYING)
    
    Returns:
    State object representing active retry
    """

def Late(
    cls: type = State,
    scheduled_time: Optional[datetime] = None,
    **kwargs: Any,
) -> State:
    """
    Create a late state for operations that missed their scheduled time.
    
    Parameters:
    - data: Optional lateness data
    - name: Optional state name
    - message: Optional lateness message
    - type: State type (defaults to LATE)
    
    Returns:
    State object representing late execution
    """

Usage Examples

from prefect import flow, task
from prefect.states import Completed, Failed, Running, Scheduled
from datetime import datetime, timedelta

@task
def process_data():
    try:
        # Processing logic
        result = {"processed": 100}
        return Completed(data=result, message="Processing successful")
    except Exception as e:
        return Failed(data=str(e), message="Processing failed")

@flow
def scheduled_workflow():
    # Schedule for future execution
    future_time = datetime.now() + timedelta(hours=1)
    return Scheduled(scheduled_time=future_time, message="Scheduled for later")

# Using states in flow logic
@flow
def conditional_flow():
    task_state = process_data()
    if task_state.is_completed():
        return "Success"
    elif task_state.is_failed():
        return "Failure"
    else:
        return "Unknown"

Flow Run Control

Functions for controlling the lifecycle of running flows, including pause, resume, and suspension operations.

def pause_flow_run(
    flow_run_id: Optional[UUID] = None,
    timeout: int = 300,
    poll_interval: int = 10,
    reschedule: bool = False,
    key: Optional[str] = None,
) -> None:
    """
    Pause a flow run, halting execution until manually resumed.
    
    Parameters:
    - flow_run_id: ID of the flow run to pause (defaults to current run)
    - timeout: Maximum time to wait for pause acknowledgment (seconds)
    - poll_interval: Polling interval for pause status (seconds)
    - reschedule: Whether to reschedule the flow run after pausing
    - key: Optional key for identifying the pause point
    
    Raises:
    TimeoutError: If pause is not acknowledged within timeout
    """

def resume_flow_run(
    flow_run_id: UUID,
    run_input: Dict[str, Any] = None,
) -> None:
    """
    Resume a paused flow run.
    
    Parameters:
    - flow_run_id: ID of the paused flow run to resume
    - run_input: Optional input data to provide when resuming
    
    Raises:
    ValueError: If flow run is not in a paused state
    """

def suspend_flow_run(
    flow_run_id: Optional[UUID] = None,
    timeout: int = 300,
    poll_interval: int = 10,
    key: Optional[str] = None,
) -> None:
    """
    Suspend a flow run for long-term storage and later resumption.
    
    Parameters:
    - flow_run_id: ID of the flow run to suspend (defaults to current run)
    - timeout: Maximum time to wait for suspension acknowledgment (seconds)
    - poll_interval: Polling interval for suspension status (seconds)
    - key: Optional key for identifying the suspension point
    
    Raises:
    TimeoutError: If suspension is not acknowledged within timeout
    """

Usage Examples

from prefect import flow, get_run_logger
from prefect.flow_runs import pause_flow_run, resume_flow_run, suspend_flow_run
from prefect.client.orchestration import get_client

@flow
def interactive_flow():
    logger = get_run_logger()
    
    logger.info("Starting workflow")
    
    # Pause for manual review
    logger.info("Pausing for manual review")
    pause_flow_run(key="manual_review", timeout=600)
    
    logger.info("Resumed after manual review")
    
    # Continue processing
    return "Workflow completed"

@flow
def long_running_flow():
    logger = get_run_logger()
    
    # Process first batch
    logger.info("Processing first batch")
    
    # Suspend for overnight processing
    logger.info("Suspending for overnight processing") 
    suspend_flow_run(key="overnight_break")
    
    # Resume processing next day
    logger.info("Resuming processing")
    
    return "Long workflow completed"

# Resume a suspended flow programmatically
async def resume_workflow(flow_run_id: str):
    client = get_client()
    await resume_flow_run(flow_run_id, run_input={"resumed_at": datetime.now()})

State Utilities

Utility functions for working with states, including result extraction and state conversion.

def get_state_result(
    state: State,
    raise_on_failure: bool = True,
) -> Any:
    """
    Extract the result data from a state object.
    
    Parameters:
    - state: State object to extract result from
    - raise_on_failure: Whether to raise exception for failed states
    
    Returns:
    The result data stored in the state
    
    Raises:
    Exception: If state represents a failure and raise_on_failure is True
    """

def to_state_create(state: State) -> StateCreate:
    """
    Convert a State object to StateCreate format for API submission.
    
    Parameters:
    - state: State object to convert
    
    Returns:
    StateCreate object suitable for API operations
    """

def exception_to_crashed_state(
    exception: Exception,
    message: str = None,
) -> State:
    """
    Convert an exception to a crashed state.
    
    Parameters:
    - exception: Exception that caused the crash
    - message: Optional message to include
    
    Returns:
    Crashed state containing exception information
    """

def exception_to_failed_state(
    exception: Exception,
    message: str = None,
) -> State:
    """
    Convert an exception to a failed state.
    
    Parameters:
    - exception: Exception that caused the failure
    - message: Optional message to include
    
    Returns:
    Failed state containing exception information
    """

def format_exception(exception: Exception) -> str:
    """
    Format an exception for display in state messages.
    
    Parameters:
    - exception: Exception to format
    
    Returns:
    Formatted string representation of the exception
    """

Usage Examples

from prefect import task, flow
from prefect.states import get_state_result, exception_to_failed_state

@task
def risky_task():
    try:
        # Risky operation
        result = complex_operation()
        return result
    except Exception as e:
        # Convert exception to failed state
        return exception_to_failed_state(e, "Complex operation failed")

@flow
def result_processing_flow():
    task_state = risky_task()
    
    # Extract result safely
    try:
        result = get_state_result(task_state)
        return f"Success: {result}"
    except Exception:
        return "Task failed, handling gracefully"

State Class

The core State class representing the current status of flows and tasks.

class State:
    """
    Immutable state object representing the current status of a flow or task.
    
    Attributes:
    - type: StateType enum value (COMPLETED, FAILED, etc.)
    - name: Optional descriptive name
    - message: Optional status message
    - data: Associated data or result
    - timestamp: When the state was created
    - state_details: Additional state-specific information
    """
    
    def __init__(
        self,
        type: StateType,
        name: str = None,
        message: str = None,
        data: Any = None,
        timestamp: datetime = None,
        state_details: StateDetails = None,
    ):
        """Initialize a state object."""
    
    def is_scheduled(self) -> bool:
        """Check if state represents scheduled execution."""
    
    def is_pending(self) -> bool:
        """Check if state represents pending execution."""
    
    def is_running(self) -> bool:
        """Check if state represents active execution."""
    
    def is_completed(self) -> bool:
        """Check if state represents successful completion."""
    
    def is_failed(self) -> bool:
        """Check if state represents failure."""
    
    def is_crashed(self) -> bool:
        """Check if state represents a system crash."""
    
    def is_cancelled(self) -> bool:
        """Check if state represents cancellation."""
    
    def is_cancelling(self) -> bool:
        """Check if state represents active cancellation."""
    
    def is_paused(self) -> bool:
        """Check if state represents a pause."""
    
    def is_suspended(self) -> bool:
        """Check if state represents suspension."""
    
    def is_final(self) -> bool:
        """Check if state represents a final (terminal) status."""
    
    def copy(
        self,
        *,
        type: StateType = None,
        name: str = None,
        message: str = None,
        data: Any = None,
    ) -> "State":
        """Create a copy of the state with modified attributes."""
    
    def result(self, raise_on_failure: bool = True) -> Any:
        """
        Get the result from the state.
        
        Parameters:
        - raise_on_failure: Whether to raise on failed states
        
        Returns:
        The result data from the state
        """

State Groups

Utility for grouping and categorizing states by their types.

class StateGroup:
    """
    Groups states by their type for easier categorization and handling.
    """
    
    # Final states that represent workflow completion
    FINAL = frozenset([
        StateType.COMPLETED,
        StateType.FAILED,
        StateType.CRASHED,
        StateType.CANCELLED,
    ])
    
    # Running states that represent active execution
    RUNNING = frozenset([
        StateType.RUNNING,
        StateType.CANCELLING,
        StateType.RETRYING,
    ])
    
    # Waiting states that represent pending execution
    WAITING = frozenset([
        StateType.SCHEDULED,
        StateType.PENDING,
        StateType.AWAITING_RETRY,
        StateType.PAUSED,
        StateType.SUSPENDED,
    ])
    
    @classmethod
    def is_final(cls, state_type: StateType) -> bool:
        """Check if a state type is final."""
        return state_type in cls.FINAL
    
    @classmethod  
    def is_running(cls, state_type: StateType) -> bool:
        """Check if a state type represents running execution."""
        return state_type in cls.RUNNING
    
    @classmethod
    def is_waiting(cls, state_type: StateType) -> bool:
        """Check if a state type represents waiting for execution."""
        return state_type in cls.WAITING

Usage Examples

from prefect.states import StateGroup, StateType

def handle_state(state_type: StateType):
    if StateGroup.is_final(state_type):
        print("Workflow has completed")
    elif StateGroup.is_running(state_type):
        print("Workflow is actively running")
    elif StateGroup.is_waiting(state_type):
        print("Workflow is waiting to run")

Types

Types related to state management:

from typing import Any, Optional, Dict
from datetime import datetime
from enum import Enum
from uuid import UUID

class StateType(str, Enum):
    """Enumeration of all possible state types."""
    SCHEDULED = "SCHEDULED"
    PENDING = "PENDING" 
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"
    CRASHED = "CRASHED"
    PAUSED = "PAUSED"
    SUSPENDED = "SUSPENDED"
    AWAITING_RETRY = "AWAITING_RETRY"
    RETRYING = "RETRYING"
    CANCELLING = "CANCELLING"
    LATE = "LATE"

class StateDetails:
    """Additional details for specific state types."""
    flow_run_id: Optional[UUID]
    task_run_id: Optional[UUID]
    child_flow_run_id: Optional[UUID]
    scheduled_time: Optional[datetime]
    cache_key: Optional[str]
    pause_timeout: Optional[datetime]
    pause_reschedule: Optional[bool]
    pause_key: Optional[str]
    run_input_keyset: Optional[Dict[str, Any]]

class StateCreate:
    """State creation format for API operations."""
    type: StateType
    name: Optional[str]
    message: Optional[str]
    state_details: Optional[StateDetails]
    data: Optional[Any]

Install with Tessl CLI

npx tessl i tessl/pypi-prefect

docs

client-api.md

configuration.md

context-utilities.md

core-workflows.md

deployments.md

index.md

runtime-context.md

state-management.md

variables.md

tile.json