Workflow orchestration and management framework for building resilient data pipelines.
Core workflow functionality in Prefect, including flow and task creation with decorators, execution control, serving deployments, and flow lifecycle management. This forms the foundation of Prefect's orchestration capabilities.
Creates Prefect flows from Python functions, enabling workflow orchestration with built-in retry logic, state management, and observability.
def flow(
    fn=None,
    *,
    name: str = None,
    description: str = None,
    version: str = None,
    flow_run_name: Union[str, Callable[[], str]] = None,
    task_runner: TaskRunner = None,
    timeout_seconds: Union[int, float] = None,
    validate_parameters: bool = True,
    retries: int = None,
    retry_delay_seconds: Union[int, float] = None,
    persist_result: bool = None,
    result_storage: ResultStorage = None,
    result_serializer: ResultSerializer = None,
    cache_result_in_memory: bool = True,
    on_completion: List[FlowStateHook] = None,
    on_failure: List[FlowStateHook] = None,
    on_cancellation: List[FlowStateHook] = None,
    on_crashed: List[FlowStateHook] = None,
    on_running: List[FlowStateHook] = None,
    log_prints: bool = None,
):
    """
    Decorator to create Prefect flows from functions.

    Parameters:
    - fn: Function to decorate (provided automatically when used as decorator)
    - name: Name for the flow (defaults to function name)
    - description: Description of the flow's purpose
    - version: Version string for the flow
    - flow_run_name: Name template for flow runs
    - task_runner: Task runner for executing tasks within the flow
    - timeout_seconds: Maximum runtime for the flow
    - validate_parameters: Whether to validate flow parameters against type hints
    - retries: Number of retry attempts on failure
    - retry_delay_seconds: Delay between retry attempts
    - persist_result: Whether to persist flow results
    - result_storage: Storage backend for results
    - result_serializer: Serializer for results
    - cache_result_in_memory: Whether to cache results in memory
    - on_completion: Hooks to run when flow completes successfully
    - on_failure: Hooks to run when flow fails
    - on_cancellation: Hooks to run when flow is cancelled
    - on_crashed: Hooks to run when flow crashes
    - on_running: Hooks to run when flow starts running
    - log_prints: Whether to log print statements

    Returns:
    Flow object when used as decorator
    """


# Example usage:
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

# Basic flow
@flow
def my_workflow():
    """Simple workflow example."""
    return "Hello, Prefect!"

# Flow with configuration
@flow(
    name="Data Processing Pipeline",
    description="Process incoming data files",
    version="1.0.0",
    retries=3,
    retry_delay_seconds=60,
    task_runner=ThreadPoolTaskRunner(max_workers=4),
    timeout_seconds=3600
)
def data_pipeline(file_path: str):
    """Data processing workflow with retry and timeout configuration."""
    # Workflow logic here
    pass

# Flow with hooks
@flow(
    on_completion=[lambda flow, flow_run, state: print("Flow completed!")],
    on_failure=[lambda flow, flow_run, state: print("Flow failed!")],
)
def monitored_flow():
    """Flow with state change hooks for monitoring."""
    pass

# Creates Prefect tasks from Python functions, enabling parallel execution,
# caching, retries, and state management.
def task(
    fn=None,
    *,
    name: str = None,
    description: str = None,
    tags: Iterable[str] = None,
    version: str = None,
    cache_key_fn: Callable[..., str] = None,
    cache_expiration: datetime.timedelta = None,
    task_run_name: Union[str, Callable[[], str]] = None,
    retries: int = None,
    retry_delay_seconds: Union[int, float] = None,
    retry_condition_fn: Callable[..., bool] = None,
    persist_result: bool = None,
    result_storage: ResultStorage = None,
    result_serializer: ResultSerializer = None,
    cache_result_in_memory: bool = True,
    timeout_seconds: Union[int, float] = None,
    log_prints: bool = None,
    refresh_cache: bool = None,
    on_completion: List[StateHookCallable] = None,
    on_failure: List[StateHookCallable] = None,
):
    """
    Decorator to create Prefect tasks from functions.

    Parameters:
    - fn: Function to decorate (provided automatically when used as decorator)
    - name: Name for the task (defaults to function name)
    - description: Description of the task's purpose
    - tags: Tags to apply to the task and its runs
    - version: Version string for the task
    - cache_key_fn: Function to generate cache keys from task inputs
    - cache_expiration: Duration after which cached results expire
    - task_run_name: Name template for task runs
    - retries: Number of retry attempts on failure
    - retry_delay_seconds: Delay between retry attempts
    - retry_condition_fn: Function to determine if task should retry
    - persist_result: Whether to persist task results
    - result_storage: Storage backend for results
    - result_serializer: Serializer for results
    - cache_result_in_memory: Whether to cache results in memory
    - timeout_seconds: Maximum runtime for the task
    - log_prints: Whether to log print statements
    - refresh_cache: Whether to refresh cached results
    - on_completion: Hooks to run when task completes successfully
    - on_failure: Hooks to run when task fails

    Returns:
    Task object when used as decorator
    """


# Example usage:
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta

# Basic task
@task
def process_data(data):
    """Simple task example."""
    return data * 2

# Task with caching
@task(
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1),
    retries=3,
    retry_delay_seconds=30
)
def expensive_computation(input_value):
    """Task with caching and retry configuration."""
    # Expensive computation here
    return input_value ** 2

# Task with custom retry condition
def should_retry(task, task_run, state):
    """Custom retry condition function."""
    return "timeout" in state.message.lower() if state.message else False

@task(
    retries=5,
    retry_condition_fn=should_retry,
    timeout_seconds=300,
    tags=["api", "external"]
)
def api_call(endpoint: str):
    """Task with custom retry logic for API calls."""
    # API call logic here
    pass

# Using tasks in a flow
@flow
def workflow():
    data = process_data(10)
    result = expensive_computation(data)
    return result

# The Flow class represents a Prefect workflow definition with methods for
# execution, validation, and deployment.
class Flow(Generic[P, R]):
    """
    Prefect workflow definition class.

    Attributes:
    - name: Flow name
    - fn: The decorated function
    - description: Flow description
    - version: Flow version
    - flow_run_name: Template for flow run names
    - task_runner: Task runner for executing tasks
    - timeout_seconds: Flow timeout
    - validate_parameters: Whether to validate parameters
    - retries: Number of retry attempts
    - retry_delay_seconds: Delay between retries
    - isasync: Whether the flow function is async
    """

    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> R:
        """Execute the flow with given parameters."""

    def with_options(
        self,
        *,
        name: str = None,
        description: str = None,
        version: str = None,
        flow_run_name: Union[str, Callable] = None,
        retries: int = None,
        retry_delay_seconds: Union[int, float] = None,
        timeout_seconds: Union[int, float] = None,
        validate_parameters: bool = None,
        persist_result: bool = None,
        result_storage: ResultStorage = None,
        result_serializer: ResultSerializer = None,
        cache_result_in_memory: bool = None,
        log_prints: bool = None,
    ) -> "Flow[P, R]":
        """Create a copy of the flow with modified options."""

    def serve(
        self,
        name: str = None,
        tags: List[str] = None,
        parameters: Dict[str, Any] = None,
        description: str = None,
        version: str = None,
        enforce_parameter_schema: bool = None,
        pause_on_shutdown: bool = True,
        print_starting_message: bool = True,
        limit: int = None,
        webserver: bool = False,
        **kwargs
    ) -> None:
        """Serve the flow for remote execution."""


# The Task class represents a Prefect task definition with methods for
# execution and configuration.
class Task(Generic[P, R]):
    """
    Prefect task definition class.

    Attributes:
    - name: Task name
    - fn: The decorated function
    - description: Task description
    - version: Task version
    - tags: Task tags
    - cache_key_fn: Cache key generation function
    - cache_expiration: Cache expiration duration
    - retries: Number of retry attempts
    - retry_delay_seconds: Delay between retries
    - retry_condition_fn: Custom retry condition function
    - timeout_seconds: Task timeout
    - isasync: Whether the task function is async
    """

    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> R:
        """Execute the task with given parameters."""

    def with_options(
        self,
        *,
        name: str = None,
        description: str = None,
        tags: Iterable[str] = None,
        version: str = None,
        cache_key_fn: Callable = None,
        cache_expiration: timedelta = None,
        retries: int = None,
        retry_delay_seconds: Union[int, float] = None,
        retry_condition_fn: Callable = None,
        persist_result: bool = None,
        result_storage: ResultStorage = None,
        result_serializer: ResultSerializer = None,
        cache_result_in_memory: bool = None,
        timeout_seconds: Union[int, float] = None,
        log_prints: bool = None,
        refresh_cache: bool = None,
    ) -> "Task[P, R]":
        """Create a copy of the task with modified options."""

    def map(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> List[R]:
        """Execute the task over iterable inputs."""


# Functions for serving flows as deployments that can be triggered remotely.
def serve(
    *flows: Flow,
    name: str = None,
    tags: List[str] = None,
    parameters: Dict[str, Any] = None,
    description: str = None,
    version: str = None,
    enforce_parameter_schema: bool = None,
    pause_on_shutdown: bool = True,
    print_starting_message: bool = True,
    limit: int = None,
    webserver: bool = False,
    **kwargs
) -> None:
    """
    Serve multiple flows for remote execution.

    Parameters:
    - flows: One or more flow objects to serve
    - name: Name for the served deployment
    - tags: Tags to apply to the served flows
    - parameters: Default parameters for flow runs
    - description: Description for the deployment
    - version: Version for the deployment
    - enforce_parameter_schema: Whether to validate parameters
    - pause_on_shutdown: Whether to pause on shutdown
    - print_starting_message: Whether to print startup message
    - limit: Maximum number of concurrent flow runs
    - webserver: Whether to start a webserver
    """
async def aserve(
    *flows: Flow,
    name: str = None,
    tags: List[str] = None,
    parameters: Dict[str, Any] = None,
    description: str = None,
    version: str = None,
    enforce_parameter_schema: bool = None,
    pause_on_shutdown: bool = True,
    print_starting_message: bool = True,
    limit: int = None,
    webserver: bool = False,
    **kwargs
) -> None:
    """
    Asynchronously serve multiple flows for remote execution.

    Same parameters as serve() but runs asynchronously.
    """


# Example usage:
from prefect import flow, serve

@flow
def data_pipeline():
    # Pipeline logic here
    pass

@flow
def monitoring_flow():
    # Monitoring logic here
    pass

# Serve multiple flows
if __name__ == "__main__":
    serve(
        data_pipeline,
        monitoring_flow,
        name="Production Flows",
        tags=["production", "data"],
        limit=10,
        webserver=True
    )

# Task caching utilities for improving performance and avoiding redundant
# computation.
def task_input_hash(*args, **kwargs) -> str:
    """
    Generate a cache key from task inputs.

    Creates a hash of the task inputs for use as a cache key. This function
    can be used as the cache_key_fn parameter in task decorators.

    Parameters:
    - args: Positional arguments to hash
    - kwargs: Keyword arguments to hash

    Returns:
    String hash of the inputs
    """
def exponential_backoff(backoff_factor: float = 2.0) -> Iterator[float]:
    """
    Generate exponential backoff delays.

    Yields increasingly longer delays for retry attempts, useful for
    handling rate limits and temporary failures.

    Parameters:
    - backoff_factor: Multiplier for each successive delay

    Yields:
    Float delay values in seconds
    """


# Example usage:
from prefect import task
from prefect.tasks import task_input_hash, exponential_backoff
from datetime import timedelta

@task(
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=1)
)
def cached_computation(x, y):
    """Task with input-based caching."""
    # Expensive computation
    return x ** y

# Custom retry with exponential backoff
@task(
    retries=5,
    retry_delay_seconds=list(exponential_backoff(2.0))[:5]
)
def api_with_backoff():
    """Task with exponential backoff retry."""
    # API call that might need retries
    pass

# Types specific to workflow functionality:
from typing import Callable, List, Optional, Union, Any, Iterable, Generic, TypeVar
from datetime import timedelta
P = TypeVar("P") # Parameters
R = TypeVar("R") # Return type
# Hook types
FlowStateHook = Callable[[Flow, FlowRun, State], None]
StateHookCallable = Callable[[Task, TaskRun, State], None]
RetryConditionCallable = Callable[[Task, TaskRun, State], bool]
# Task runner interface
class TaskRunner:
"""Base class for task execution backends."""
pass
# Configuration types
class ResultStorage:
"""Result storage interface."""
pass
class ResultSerializer:
"""Result serialization interface."""
pass
# Task options type
class TaskOptions(TypedDict, total=False):
name: Optional[str]
description: Optional[str]
tags: Optional[Iterable[str]]
version: Optional[str]
cache_key_fn: Optional[Callable[..., str]]
cache_expiration: Optional[timedelta]
retries: Optional[int]
retry_delay_seconds: Optional[Union[int, float]]
retry_condition_fn: Optional[RetryConditionCallable]
timeout_seconds: Optional[Union[int, float]]Install with Tessl CLI
npx tessl i tessl/pypi-prefect