Workflow orchestration and management framework for building resilient data pipelines.
npx @tessl/cli install tessl/pypi-prefect@3.4.0
Prefect is a workflow orchestration and management framework for building resilient data pipelines in Python. It provides a modern approach to workflow management with features like automatic retries, dynamic mapping, caching, and real-time observability, enabling robust and scalable data workflows.
pip install prefect
import prefect
from prefect import flow, task, get_run_logger, get_client, State
Common imports for workflow development:
from prefect import flow, task
from prefect.tasks import task_input_hash
from prefect.deployments import deploy
from prefect.client.orchestration import get_client
from prefect.states import Completed, Failed, Running
from prefect.context import TaskRunContext
from prefect import flow, task, get_run_logger
@task
def extract_data(url: str):
    """Extract data from a source.

    Logs the URL and returns a fixed placeholder payload instead of
    performing a real fetch.

    Args:
        url: Location of the data source; echoed back in the result.

    Returns:
        A dict with a constant ``"records"`` value of 100 and ``url``
        under ``"source"``.
    """
    logger = get_run_logger()
    logger.info(f"Extracting data from {url}")
    # Simulate data extraction
    return {"records": 100, "source": url}
@task
def transform_data(raw_data: dict):
    """Transform the extracted data.

    Args:
        raw_data: Mapping that must contain a ``"records"`` entry.

    Returns:
        A dict carrying that entry under ``"processed_records"`` plus a
        constant ``"status"`` of ``"transformed"``.
    """
    logger = get_run_logger()
    logger.info("Transforming data")
    return {
        "processed_records": raw_data["records"],
        "status": "transformed"
    }
@task
def load_data(processed_data: dict):
    """Load data to destination.

    Only logs the record count; nothing is written anywhere.

    Args:
        processed_data: Mapping that must contain ``"processed_records"``.

    Returns:
        The constant dict ``{"loaded": True}``.
    """
    logger = get_run_logger()
    logger.info(f"Loading {processed_data['processed_records']} records")
    return {"loaded": True}
@flow(name="ETL Pipeline")
def etl_flow(source_url: str):
    """Complete ETL workflow.

    Chains the three steps: the output of ``extract_data`` feeds
    ``transform_data``, whose output feeds ``load_data``.

    Args:
        source_url: Passed straight through to ``extract_data``.

    Returns:
        Whatever ``load_data`` returns.
    """
    raw = extract_data(source_url)
    processed = transform_data(raw)
    result = load_data(processed)
    return result
if __name__ == "__main__":
# Run the flow
result = etl_flow("https://api.example.com/data")
print(result)Prefect's architecture is built around several key concepts:
This design enables building resilient workflows with automatic error handling, observability, and scalable execution across diverse infrastructure environments.
Flow and task creation with decorators, execution control, serving deployments, and flow lifecycle management.
def flow(
    fn=None,
    *,
    name: Optional[str] = None,
    description: Optional[str] = None,
    version: Optional[str] = None,
    flow_run_name: Optional[str] = None,
    task_runner: Optional[TaskRunner] = None,
    timeout_seconds: Optional[Union[int, float]] = None,
    validate_parameters: bool = True,
    retries: Optional[int] = None,
    retry_delay_seconds: Optional[Union[int, float]] = None,
    persist_result: Optional[bool] = None,
    result_storage: Optional[ResultStorage] = None,
    result_serializer: Optional[ResultSerializer] = None,
    cache_result_in_memory: bool = True,
    on_completion: Optional[List[FlowStateHook]] = None,
    on_failure: Optional[List[FlowStateHook]] = None,
    on_cancellation: Optional[List[FlowStateHook]] = None,
    on_crashed: Optional[List[FlowStateHook]] = None,
    on_running: Optional[List[FlowStateHook]] = None,
    log_prints: Optional[bool] = None,
):
    """Signature stub for the ``flow`` decorator.

    Only the call signature is recorded here; the body is a placeholder.
    ``fn`` may be given positionally, every other option is keyword-only.
    Options that default to ``None`` are annotated ``Optional[...]`` to
    match the explicit style used by the other stubs in this document.
    """
    ...
def task(
    fn=None,
    *,
    name: Optional[str] = None,
    description: Optional[str] = None,
    tags: Optional[Iterable[str]] = None,
    version: Optional[str] = None,
    cache_key_fn: Optional[
        Callable[[TaskRunContext, Dict[str, Any]], Optional[str]]
    ] = None,
    # `timedelta` is the name this document imports
    # (`from datetime import datetime, timedelta`); `datetime.timedelta`
    # would resolve against the `datetime` class under that import.
    cache_expiration: Optional[timedelta] = None,
    task_run_name: Optional[Union[str, Callable[[], str]]] = None,
    retries: Optional[int] = None,
    retry_delay_seconds: Optional[Union[int, float]] = None,
    retry_condition_fn: Optional[RetryConditionCallable] = None,
    persist_result: Optional[bool] = None,
    result_storage: Optional[ResultStorage] = None,
    result_serializer: Optional[ResultSerializer] = None,
    cache_result_in_memory: bool = True,
    timeout_seconds: Optional[Union[int, float]] = None,
    log_prints: Optional[bool] = None,
    refresh_cache: Optional[bool] = None,
    on_completion: Optional[List[StateHookCallable]] = None,
    on_failure: Optional[List[StateHookCallable]] = None,
):
    """Signature stub for the ``task`` decorator.

    Only the call signature is recorded here; the body is a placeholder.
    ``fn`` may be given positionally, every other option is keyword-only.
    Options that default to ``None`` are annotated ``Optional[...]`` to
    match the explicit style used by the other stubs in this document.
    """
    ...
def serve(*flows: Flow, **kwargs) -> None: ...
def aserve(*flows: Flow, **kwargs) -> None: ...State creation functions, state utilities, and flow run lifecycle control.
def Completed(cls: type = State, **kwargs: Any) -> State:
    """Signature stub for the ``Completed`` state factory."""
    ...

def Failed(cls: type = State, **kwargs: Any) -> State:
    """Signature stub for the ``Failed`` state factory."""
    ...

def Running(cls: type = State, **kwargs: Any) -> State:
    """Signature stub for the ``Running`` state factory."""
    ...

def Pending(cls: type = State, **kwargs: Any) -> State:
    """Signature stub for the ``Pending`` state factory."""
    ...

def Cancelled(cls: type = State, **kwargs: Any) -> State:
    """Signature stub for the ``Cancelled`` state factory."""
    ...

def Crashed(cls: type = State, **kwargs: Any) -> State:
    """Signature stub for the ``Crashed`` state factory."""
    ...
def Scheduled(
    scheduled_time: Optional[datetime] = None,
    name: Optional[str] = None,
    message: Optional[str] = None,
    type: Optional[StateType] = None,
) -> State:
    """Signature stub for the ``Scheduled`` state factory.

    Every argument is optional and defaults to ``None``.
    """
    # NOTE(review): the sibling factory stubs in this document take
    # `cls: type = State` and `**kwargs`; this one is listed without
    # them. Confirm against the Prefect API.
    ...
def Paused(
    cls: type = State,
    timeout_seconds: Optional[int] = None,
    pause_expiration_time: Optional[datetime] = None,
    reschedule: bool = False,
    pause_key: Optional[str] = None,
    **kwargs: Any,
) -> State:
    """Signature stub for the ``Paused`` state factory.

    The parameters are listed without descriptions in this document.
    """
    ...
def Suspended(
    cls: type = State,
    timeout_seconds: Optional[int] = None,
    pause_expiration_time: Optional[datetime] = None,
    pause_key: Optional[str] = None,
    **kwargs: Any,
) -> State:
    """Signature stub for the ``Suspended`` state factory.

    Same parameters as the ``Paused`` stub except that there is no
    ``reschedule`` flag.
    """
    ...
def AwaitingRetry(
    cls: type = State,
    scheduled_time: Optional[datetime] = None,
    **kwargs: Any,
) -> State:
    """Signature stub for the ``AwaitingRetry`` state factory."""
    ...
def Retrying(cls: type = State, **kwargs: Any) -> State:
    """Signature stub for the ``Retrying`` state factory."""
    ...
def pause_flow_run(
    wait_for_input: Optional[type] = None,
    timeout: int = 3600,
    poll_interval: int = 10,
    key: Optional[str] = None,
) -> Any:
    """Signature stub for ``pause_flow_run``.

    Defaults shown: ``timeout`` is 3600 and ``poll_interval`` is 10
    (units are not stated in this document — presumably seconds).
    """
    ...
def resume_flow_run(
flow_run_id: UUID,
run_input: Optional[Dict[str, Any]] = None,
) -> None: ...Deploy flows to work pools, manage deployments, and run deployed workflows.
def deploy(
    *deployments: RunnerDeployment,
    work_pool_name: Optional[str] = None,
    image: Optional[Union[str, DockerImage]] = None,
    build: bool = True,
    push: bool = True,
    print_next_steps_message: bool = True,
    ignore_warnings: bool = False,
) -> List[UUID]:
    """Signature stub for ``deploy``.

    Deployments are passed positionally; everything else is keyword-only.
    The return annotation is a list of ``UUID`` values.
    """
    ...
def initialize_project(
    name: Optional[str] = None,
    recipe: Optional[str] = None,
) -> None:
    """Signature stub: both ``name`` and ``recipe`` are optional strings."""
    ...
def run_deployment(
name: str,
parameters: Dict[str, Any] = None,
scheduled_time: datetime = None,
flow_run_name: str = None,
timeout: int = None,
poll_interval: int = 10,
tags: List[str] = None,
idempotency_key: str = None,
work_queue_name: str = None,
) -> FlowRun: ...HTTP clients for interacting with the Prefect API server and cloud services.
def get_client(
    httpx_settings: Optional[dict] = None,
    sync_client: Optional[bool] = None,
) -> Union[PrefectClient, SyncPrefectClient]:
    """Signature stub for ``get_client``.

    The return annotation allows either client class; presumably
    ``sync_client`` selects between them — confirm against the Prefect API.
    """
    ...
class PrefectClient:
def __init__(
self,
api: str = None,
api_key: str = None,
api_version: str = None,
httpx_settings: dict = None,
): ...
async def create_flow_run(
self,
flow: Flow,
name: str = None,
parameters: Dict[str, Any] = None,
context: Dict[str, Any] = None,
tags: List[str] = None,
parent_task_run_id: UUID = None,
state: State = None,
) -> FlowRun: ...Context management, logging, annotations, and transaction support.
def get_run_logger(name: Optional[str] = None) -> logging.Logger:
    """Signature stub: optional logger ``name``; annotated to return a ``logging.Logger``."""
    ...
def tags(*tags: str, **kwargs) -> ContextManager:
    """Signature stub: takes any number of tag strings; annotated as a context manager."""
    ...
class unmapped:
    """Annotation wrapper stub whose constructor takes a single value."""

    # Placeholder body: the stub does not store ``value``.
    def __init__(self, value: Any): ...
class allow_failure:
    """Annotation wrapper stub whose constructor takes a single value."""

    # Placeholder body: the stub does not store ``value``.
    def __init__(self, value: Any): ...
class Transaction:
def __init__(
self,
key: Optional[str] = None,
timeout: Optional[float] = None,
): ...
def __enter__(self) -> "Transaction": ...
def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...Access current execution context including deployment parameters, flow run metadata, and task run information during workflow execution.
# Runtime context modules for accessing execution information
import prefect.runtime.deployment
import prefect.runtime.flow_run
import prefect.runtime.task_run
# Deployment context attributes
prefect.runtime.deployment.id: str
prefect.runtime.deployment.name: str
prefect.runtime.deployment.parameters: Dict[str, Any]
prefect.runtime.deployment.version: str
# Flow run context attributes
prefect.runtime.flow_run.id: str
prefect.runtime.flow_run.name: str
prefect.runtime.flow_run.parameters: Dict[str, Any]
prefect.runtime.flow_run.tags: List[str]
# Task run context attributes
prefect.runtime.task_run.id: str
prefect.runtime.task_run.name: str
prefect.runtime.task_run.task_key: strNamed, mutable JSON values that can be shared across tasks, flows, and deployments for configuration and data storage.
from prefect.variables import Variable
class Variable(BaseModel):
name: str
value: StrictVariableValue
tags: Optional[List[str]]
@classmethod
def get(
cls,
name: str,
default: StrictVariableValue = None,
) -> StrictVariableValue: ...
@classmethod
async def aget(
cls,
name: str,
default: StrictVariableValue = None,
) -> StrictVariableValue: ...
@classmethod
def set(
cls,
name: str,
value: StrictVariableValue,
tags: Optional[List[str]] = None,
overwrite: bool = False,
) -> "Variable": ...
@classmethod
async def aset(
cls,
name: str,
value: StrictVariableValue,
tags: Optional[List[str]] = None,
overwrite: bool = False,
) -> "Variable": ...Settings, blocks, and configuration management for Prefect infrastructure and credentials.
class Block(BaseModel):
    """Model stub listing ``save`` and the ``load`` classmethod."""

    # Annotated to return a UUID; `overwrite` defaults to False.
    def save(
        self,
        name: str,
        overwrite: bool = False,
    ) -> UUID: ...

    @classmethod
    def load(cls, name: str) -> "Block": ...
def get_settings_context() -> SettingsContext: ...Core types used throughout the Prefect API:
import logging
from datetime import datetime, timedelta
from enum import Enum
from typing import (
    Any,
    Callable,
    ContextManager,
    Dict,
    Generic,
    Iterable,
    List,
    Optional,
    TypeVar,
    Union,
)
from uuid import UUID
# Generic type parameters
# NOTE(review): P is used below as the first argument of Callable[P, R].
# PEP 612 reserves that position for a ParamSpec (or an explicit argument
# list / ...), so a plain TypeVar may be rejected by type checkers — confirm.
P = TypeVar("P") # Parameters
R = TypeVar("R") # Return type
class Flow(Generic[P, R]):
    """Type stub, generic over the wrapped callable's parameters and return type."""

    name: str
    # The wrapped callable.
    fn: Callable[P, R]
    description: Optional[str]
    version: Optional[str]
class Task(Generic[P, R]):
    """Type stub, generic over the wrapped callable's parameters and return type."""

    name: str
    # The wrapped callable.
    fn: Callable[P, R]
    description: Optional[str]
    version: Optional[str]
class State:
    """Type stub listing the fields of a state object."""

    type: StateType
    name: Optional[str]
    message: Optional[str]
    data: Any
    timestamp: datetime
class FlowRun:
    """Type stub listing the fields of a flow run record."""

    id: UUID
    name: str
    flow_id: UUID
    state: State
    parameters: Dict[str, Any]
class TaskRun:
    """Type stub listing the fields of a task run record."""

    id: UUID
    name: str
    task_key: str
    flow_run_id: UUID
    state: State
# Enums and Constants
class StateType(str, Enum):
    """String-valued enumeration of state types; each value equals its member name."""

    SCHEDULED = "SCHEDULED"
    PENDING = "PENDING"
    RUNNING = "RUNNING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"
    CRASHED = "CRASHED"
    PAUSED = "PAUSED"
    # Added: Prefect's state-type enum also defines CANCELLING.
    CANCELLING = "CANCELLING"
    # NOTE(review): the three members below look like state *names* rather
    # than state *types* in Prefect — confirm against
    # prefect.client.schemas.objects.StateType before relying on them.
    SUSPENDED = "SUSPENDED"
    AWAITING_RETRY = "AWAITING_RETRY"
    RETRYING = "RETRYING"
# Context types
class TaskRunContext:
    """Context information available during task execution."""
# Callable types
# Flow-level hook: called with (Flow, FlowRun, State); returns None.
FlowStateHook = Callable[[Flow, FlowRun, State], None]
# Task-level hook: called with (Task, TaskRun, State); returns None.
StateHookCallable = Callable[[Task, TaskRun, State], None]
# Same arguments as StateHookCallable but returns a bool
# (presumably "should this task be retried?" — confirm).
RetryConditionCallable = Callable[[Task, TaskRun, State], bool]
# Configuration types
class TaskRunner:
    """Placeholder declaration; no members are listed in this document."""

class ResultStorage:
    """Placeholder declaration; no members are listed in this document."""

class ResultSerializer:
    """Placeholder declaration; no members are listed in this document."""
# Deployment types
class RunnerDeployment:
    """Deployment configuration for flows."""

class DockerImage:
    """Docker image configuration for deployments."""

class DeploymentImage:
    """Deployment image configuration."""