tessl/pypi-prefect

Workflow orchestration and management framework for building resilient data pipelines.

Workspace: tessl
Visibility: Public
Describes: pkg:pypi/prefect@3.4.x

To install, run

npx @tessl/cli install tessl/pypi-prefect@3.4.0

Prefect

Prefect is a workflow orchestration and management framework for building resilient data pipelines in Python. It provides automatic retries, dynamic task mapping, result caching, and real-time observability, enabling robust and scalable data workflows.

Package Information

  • Package Name: prefect
  • Language: Python
  • Installation: pip install prefect

Core Imports

import prefect
from prefect import flow, task, get_run_logger, get_client, State

Common imports for workflow development:

from prefect import flow, task
from prefect.tasks import task_input_hash
from prefect.deployments import deploy
from prefect.client.orchestration import get_client
from prefect.states import Completed, Failed, Running
from prefect.context import TaskRunContext

Basic Usage

from prefect import flow, task, get_run_logger

@task
def extract_data(url: str):
    """Extract data from a source."""
    logger = get_run_logger()
    logger.info(f"Extracting data from {url}")
    # Simulate data extraction
    return {"records": 100, "source": url}

@task
def transform_data(raw_data: dict):
    """Transform the extracted data."""
    logger = get_run_logger()
    logger.info("Transforming data")
    return {
        "processed_records": raw_data["records"],
        "status": "transformed"
    }

@task
def load_data(processed_data: dict):
    """Load data to destination."""
    logger = get_run_logger()
    logger.info(f"Loading {processed_data['processed_records']} records")
    return {"loaded": True}

@flow(name="ETL Pipeline")
def etl_flow(source_url: str):
    """Complete ETL workflow."""
    raw = extract_data(source_url)
    processed = transform_data(raw)
    result = load_data(processed)
    return result

if __name__ == "__main__":
    # Run the flow
    result = etl_flow("https://api.example.com/data")
    print(result)

Architecture

Prefect's architecture is built around several key concepts:

  • Flows: Top-level workflow containers that define the execution logic and dependencies between tasks
  • Tasks: Individual units of work that can be cached, retried, and run in parallel
  • States: Immutable snapshots representing the current status of flows and tasks (Pending, Running, Completed, Failed, etc.)
  • Deployments: Infrastructure-aware flow configurations that enable scheduled and triggered execution
  • Work Pools: Infrastructure abstraction layer for executing flows across different environments
  • Blocks: Reusable configuration objects for credentials, connections, and infrastructure settings

This design enables building resilient workflows with automatic error handling, observability, and scalable execution across diverse infrastructure environments.
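
As a minimal sketch of how these concepts combine, the task below opts into the retry and caching behavior exposed through the @task decorator. The flaky fetch function, failure rate, and URL are illustrative only.

import random
from datetime import timedelta

from prefect import flow, task
from prefect.tasks import task_input_hash

@task(
    retries=3,                        # re-run the task up to three times on failure
    retry_delay_seconds=10,           # wait between attempts
    cache_key_fn=task_input_hash,     # reuse results for identical inputs
    cache_expiration=timedelta(hours=1),
)
def fetch(url: str) -> dict:
    # Simulated flaky call; a real task would perform the HTTP request here.
    if random.random() < 0.3:
        raise RuntimeError("transient error")
    return {"source": url, "records": 100}

@flow
def resilient_pipeline(url: str = "https://api.example.com/data") -> dict:
    return fetch(url)

if __name__ == "__main__":
    print(resilient_pipeline())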

Capabilities

Core Workflows

Flow and task creation with decorators, execution control, serving deployments, and flow lifecycle management.

def flow(
    fn=None,
    *,
    name: str = None,
    description: str = None,
    version: str = None,
    flow_run_name: str = None,
    task_runner: TaskRunner = None,
    timeout_seconds: Union[int, float] = None,
    validate_parameters: bool = True,
    retries: int = None,
    retry_delay_seconds: Union[int, float] = None,
    persist_result: bool = None,
    result_storage: ResultStorage = None,
    result_serializer: ResultSerializer = None,
    cache_result_in_memory: bool = True,
    on_completion: List[FlowStateHook] = None,
    on_failure: List[FlowStateHook] = None,
    on_cancellation: List[FlowStateHook] = None,
    on_crashed: List[FlowStateHook] = None,
    on_running: List[FlowStateHook] = None,
    log_prints: bool = None,
): ...

def task(
    fn=None,
    *,
    name: str = None,
    description: str = None,
    tags: Iterable[str] = None,
    version: str = None,
    cache_key_fn: Callable[[TaskRunContext, Dict[str, Any]], Optional[str]] = None,
    cache_expiration: datetime.timedelta = None,
    task_run_name: Union[str, Callable[[], str]] = None,
    retries: int = None,
    retry_delay_seconds: Union[int, float] = None,
    retry_condition_fn: RetryConditionCallable = None,
    persist_result: bool = None,
    result_storage: ResultStorage = None,
    result_serializer: ResultSerializer = None,
    cache_result_in_memory: bool = True,
    timeout_seconds: Union[int, float] = None,
    log_prints: bool = None,
    refresh_cache: bool = None,
    on_completion: List[StateHookCallable] = None,
    on_failure: List[StateHookCallable] = None,
): ...

def serve(*args: RunnerDeployment, **kwargs) -> None: ...
async def aserve(*args: RunnerDeployment, **kwargs) -> None: ...
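
As a hedged usage sketch (the flow names, deployment names, and one-hour interval are illustrative), serve() is typically passed deployments created with Flow.to_deployment and then blocks in the current process, executing scheduled runs of those flows:

from prefect import flow, serve

@flow
def say_hello(name: str = "world"):
    print(f"Hello, {name}!")

@flow
def say_goodbye(name: str = "world"):
    print(f"Goodbye, {name}!")

if __name__ == "__main__":
    # serve() blocks and runs scheduled work for both deployments in this process.
    serve(
        say_hello.to_deployment(name="hello-every-hour", interval=3600),
        say_goodbye.to_deployment(name="goodbye-on-demand"),
    )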

State Management

State creation functions, state utilities, and flow run lifecycle control.

def Completed(cls: type = State, **kwargs: Any) -> State: ...

def Failed(cls: type = State, **kwargs: Any) -> State: ...

def Running(cls: type = State, **kwargs: Any) -> State: ...

def Pending(cls: type = State, **kwargs: Any) -> State: ...

def Cancelled(cls: type = State, **kwargs: Any) -> State: ...

def Crashed(cls: type = State, **kwargs: Any) -> State: ...

def Scheduled(
    scheduled_time: datetime = None,
    name: str = None,
    message: str = None,
    type: StateType = None,
) -> State: ...

def Paused(
    cls: type = State,
    timeout_seconds: Optional[int] = None,
    pause_expiration_time: Optional[datetime] = None,
    reschedule: bool = False,
    pause_key: Optional[str] = None,
    **kwargs: Any,
) -> State: ...

def Suspended(
    cls: type = State,
    timeout_seconds: Optional[int] = None,
    pause_expiration_time: Optional[datetime] = None,
    pause_key: Optional[str] = None,
    **kwargs: Any,
) -> State: ...

def AwaitingRetry(
    cls: type = State,
    scheduled_time: Optional[datetime] = None,
    **kwargs: Any,
) -> State: ...

def Retrying(cls: type = State, **kwargs: Any) -> State: ...

def pause_flow_run(
    wait_for_input: Optional[type] = None,
    timeout: int = 3600,
    poll_interval: int = 10,
    key: Optional[str] = None,
) -> Any: ...

def resume_flow_run(
    flow_run_id: UUID,
    run_input: Optional[Dict[str, Any]] = None,
) -> None: ...
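
A brief sketch of how the state constructors and the return_state flag fit together; the threshold task and values are hypothetical:

from prefect import flow, task
from prefect.states import Completed, Failed

@task
def check_threshold(value: int):
    # Returning a State object sets this task run's terminal state explicitly.
    if value > 100:
        return Failed(message=f"value {value} exceeds threshold")
    return Completed(message="value within bounds")

@flow
def quality_gate(value: int = 42) -> bool:
    # return_state=True yields the task's final State instead of its result.
    state = check_threshold(value, return_state=True)
    print(state.type, state.message)
    return state.is_completed()

if __name__ == "__main__":
    print(quality_gate(150))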

Deployments

Deploy flows to work pools, manage deployments, and run deployed workflows.

def deploy(
    *deployments: RunnerDeployment,
    work_pool_name: Optional[str] = None,
    image: Optional[Union[str, DockerImage]] = None,
    build: bool = True,
    push: bool = True,
    print_next_steps_message: bool = True,
    ignore_warnings: bool = False,
) -> List[UUID]: ...

def initialize_project(
    name: str = None,
    recipe: str = None,
) -> None: ...

def run_deployment(
    name: str,
    parameters: Dict[str, Any] = None,
    scheduled_time: datetime = None,
    flow_run_name: str = None,
    timeout: int = None,
    poll_interval: int = 10,
    tags: List[str] = None,
    idempotency_key: str = None,
    work_queue_name: str = None,
) -> FlowRun: ...
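
For illustration, run_deployment triggers an existing deployment by its "<flow name>/<deployment name>" identifier. The identifier and parameters below are placeholders, and the call assumes a reachable Prefect API:

from prefect.deployments import run_deployment

flow_run = run_deployment(
    name="etl-pipeline/production",        # "<flow name>/<deployment name>"
    parameters={"source_url": "https://api.example.com/data"},
    timeout=0,                             # return immediately instead of waiting
)
print(flow_run.id, flow_run.state)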

Client API

HTTP clients for interacting with the Prefect API server and cloud services.

def get_client(
    httpx_settings: dict = None,
    sync_client: bool = None,
) -> Union[PrefectClient, SyncPrefectClient]: ...

class PrefectClient:
    def __init__(
        self,
        api: str = None,
        api_key: str = None,
        api_version: str = None,
        httpx_settings: dict = None,
    ): ...

    async def create_flow_run(
        self,
        flow: Flow,
        name: str = None,
        parameters: Dict[str, Any] = None,
        context: Dict[str, Any] = None,
        tags: List[str] = None,
        parent_task_run_id: UUID = None,
        state: State = None,
    ) -> FlowRun: ...
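
A small sketch of the async client, assuming a reachable Prefect API server; read_flow_runs is used here only to show the context-manager pattern:

import asyncio

from prefect import get_client

async def show_recent_runs(limit: int = 5):
    # get_client() returns an async PrefectClient usable as a context manager.
    async with get_client() as client:
        flow_runs = await client.read_flow_runs(limit=limit)
        for run in flow_runs:
            print(run.name, run.state_type)

if __name__ == "__main__":
    asyncio.run(show_recent_runs())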

Context & Utilities

Context management, logging, annotations, and transaction support.

def get_run_logger(name: str = None) -> logging.Logger: ...

def tags(*tags: str, **kwargs) -> ContextManager: ...

class unmapped:
    def __init__(self, value: Any): ...

class allow_failure:
    def __init__(self, value: Any): ...

class Transaction:
    def __init__(
        self,
        key: Optional[str] = None,
        timeout: Optional[float] = None,
    ): ...
    
    def __enter__(self) -> "Transaction": ...
    def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...
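
The sketch below combines tags() with task mapping and unmapped(); the items and multiplier are illustrative. allow_failure() can wrap an upstream result in the same way so a downstream task still runs when that upstream fails.

from typing import Optional

from prefect import flow, task, tags, unmapped

@task
def scale(item: int, multiplier: int) -> int:
    return item * multiplier

@task
def summarize(values) -> int:
    total = sum(values)
    print(f"total: {total}")
    return total

@flow
def fan_out(items: Optional[list[int]] = None) -> int:
    items = items or [1, 2, 3]
    with tags("example", "fan-out"):
        # unmapped() keeps multiplier constant while items is mapped over.
        futures = scale.map(items, multiplier=unmapped(10))
        # Downstream tasks resolve mapped futures passed as arguments.
        return summarize(futures)

if __name__ == "__main__":
    fan_out()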

Runtime Context Access

Access current execution context including deployment parameters, flow run metadata, and task run information during workflow execution.

# Runtime context modules for accessing execution information
import prefect.runtime.deployment
import prefect.runtime.flow_run  
import prefect.runtime.task_run

# Deployment context attributes
prefect.runtime.deployment.id: str
prefect.runtime.deployment.name: str
prefect.runtime.deployment.parameters: Dict[str, Any]
prefect.runtime.deployment.version: str

# Flow run context attributes  
prefect.runtime.flow_run.id: str
prefect.runtime.flow_run.name: str
prefect.runtime.flow_run.parameters: Dict[str, Any]
prefect.runtime.flow_run.tags: List[str]

# Task run context attributes
prefect.runtime.task_run.id: str
prefect.runtime.task_run.name: str
prefect.runtime.task_run.task_key: str
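
A short sketch of reading these attributes during a run; outside the corresponding context the attributes resolve to None or empty values, and the flow and task names here are arbitrary:

from prefect import flow, task, get_run_logger
from prefect.runtime import deployment, flow_run, task_run

@task
def describe_task_run():
    logger = get_run_logger()
    logger.info("task run %s (%s)", task_run.name, task_run.id)

@flow
def describe_run():
    logger = get_run_logger()
    logger.info("flow run %s (%s)", flow_run.name, flow_run.id)
    logger.info("deployment parameters: %s", deployment.parameters)
    describe_task_run()

if __name__ == "__main__":
    describe_run()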

Variables Management

Named, mutable JSON values that can be shared across tasks, flows, and deployments for configuration and data storage.

from prefect.variables import Variable

class Variable(BaseModel):
    name: str
    value: StrictVariableValue
    tags: Optional[List[str]]
    
    @classmethod
    def get(
        cls,
        name: str,
        default: StrictVariableValue = None,
    ) -> StrictVariableValue: ...

    @classmethod
    async def aget(
        cls,
        name: str, 
        default: StrictVariableValue = None,
    ) -> StrictVariableValue: ...

    @classmethod
    def set(
        cls,
        name: str,
        value: StrictVariableValue,
        tags: Optional[List[str]] = None,
        overwrite: bool = False,
    ) -> "Variable": ...

    @classmethod
    async def aset(
        cls,
        name: str,
        value: StrictVariableValue,
        tags: Optional[List[str]] = None,
        overwrite: bool = False,
    ) -> "Variable": ...
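
A minimal sketch, assuming a reachable Prefect API (variables are stored server-side); the variable name and values are placeholders:

from prefect import flow
from prefect.variables import Variable

# Store a JSON-serializable value under a name; overwrite replaces any existing value.
Variable.set("api_base_url", "https://api.example.com", overwrite=True)

@flow
def configured_flow():
    # Read the variable at run time, falling back to a default if it is unset.
    base_url = Variable.get("api_base_url", default="http://localhost:8000")
    print(f"Using API at {base_url}")

if __name__ == "__main__":
    configured_flow()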

Configuration

Settings, blocks, and configuration management for Prefect infrastructure and credentials.

class Block(BaseModel):
    def save(
        self,
        name: str,
        overwrite: bool = False,
    ) -> UUID: ...
    
    @classmethod
    def load(cls, name: str) -> "Block": ...

def get_settings_context() -> SettingsContext: ...
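
As an illustrative sketch (the block class, its fields, and the document name are made up), custom blocks subclass Block and are persisted by name through the Prefect API:

from prefect.blocks.core import Block

class DatabaseConfig(Block):
    """Hypothetical block holding database connection settings."""
    host: str
    port: int = 5432
    database: str

# Persist a named, reusable configuration document (requires a reachable Prefect API).
DatabaseConfig(host="db.example.com", database="analytics").save(
    "analytics-db", overwrite=True
)

# Load the stored configuration anywhere flows run.
config = DatabaseConfig.load("analytics-db")
print(config.host, config.port, config.database)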

Types

Core types used throughout the Prefect API:

from typing import Any, Callable, Dict, List, Optional, Union, TypeVar, Generic
from datetime import datetime, timedelta
from enum import Enum
from uuid import UUID
import logging

# Generic type parameters
P = TypeVar("P")  # Parameters
R = TypeVar("R")  # Return type

class Flow(Generic[P, R]):
    name: str
    fn: Callable[P, R]
    description: Optional[str]
    version: Optional[str]

class Task(Generic[P, R]):
    name: str
    fn: Callable[P, R]
    description: Optional[str]
    version: Optional[str]

class State:
    type: StateType
    name: Optional[str]
    message: Optional[str]
    data: Any
    timestamp: datetime

class FlowRun:
    id: UUID
    name: str
    flow_id: UUID
    state: State
    parameters: Dict[str, Any]

class TaskRun:
    id: UUID
    name: str
    task_key: str
    flow_run_id: UUID
    state: State

# Enums and Constants
class StateType(str, Enum):
    SCHEDULED = "SCHEDULED"
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"
    CANCELLING = "CANCELLING"
    CRASHED = "CRASHED"
    PAUSED = "PAUSED"

# AwaitingRetry, Retrying, and Suspended are state names layered on the
# SCHEDULED, RUNNING, and PAUSED types rather than separate StateType members.

# Context types
class TaskRunContext:
    """Context information available during task execution."""
    pass

# Callable types
FlowStateHook = Callable[[Flow, FlowRun, State], None]
StateHookCallable = Callable[[Task, TaskRun, State], None]
RetryConditionCallable = Callable[[Task, TaskRun, State], bool]

# Configuration types
class TaskRunner:
    pass

class ResultStorage:
    pass

class ResultSerializer:
    pass

# Deployment types
class RunnerDeployment:
    """Deployment configuration for flows."""
    pass

class DockerImage:
    """Docker image configuration for deployments."""
    pass

class DeploymentImage:
    """Deployment image configuration."""
    pass