Workflow orchestration and management framework for building resilient data pipelines.

Prefect's context and utility systems provide runtime information, logging capabilities, execution annotations, and transaction management. These components enable advanced workflow control, debugging, and data consistency across distributed execution environments.
Access to structured logging within flows and tasks for observability and debugging.
def get_run_logger(name: str = None) -> logging.Logger:
    """
    Return a logger bound to the active flow or task run.

    The logger automatically carries run context metadata — flow run ID,
    task run ID, timestamps, and other execution details — and emits
    structured records the Prefect UI can correlate with the run.

    Parameters:
    - name: Custom logger name (defaults to current flow/task name)

    Returns:
    Logger instance configured for the current run context
    """
def get_logger(name: str = None) -> logging.Logger:
    """
    Return a general-purpose Prefect logger, independent of any run.

    Parameters:
    - name: Logger name (defaults to calling module)

    Returns:
    Standard Python logger configured for Prefect
    """
def disable_run_logger() -> None:
    """
    Disable run-specific logging for the current context.

    Useful when you want to prevent automatic log capture
    or when debugging logging issues.
    """

from prefect import flow, task, get_run_logger
from prefect.logging import get_logger, disable_run_logger
# Module-level logger for code that runs outside any flow/task run context
module_logger = get_logger(__name__)
@task
def process_data(data):
    """Process a batch of records, logging progress and any failure."""
    run_log = get_run_logger()
    run_log.info(f"Processing {len(data)} records")
    try:
        result = complex_operation(data)
        run_log.info(f"Successfully processed {len(result)} records")
        return result
    except Exception as e:
        # Surface the failure in the run logs before re-raising it
        run_log.error(f"Processing failed: {e}")
        raise
@flow
def data_pipeline():
    """Extract data, process it, and log structured progress throughout."""
    log = get_run_logger()
    log.info("Starting data pipeline")
    # Attach structured fields so the Prefect UI can index them
    run_settings = {"batch_size": 1000, "env": "production"}
    log.info("Configuration", extra=run_settings)
    data = extract_data()
    result = process_data(data)
    log.info(f"Pipeline completed successfully with {len(result)} records")
    return result
# Disable logging in specific contexts
@task
def quiet_task():
disable_run_logger()
# This task won't generate run logs
return "completed quietly"Context managers and utilities for managing execution context and metadata.
def tags(*tags: str, **kwargs) -> ContextManager:
    """
    Context manager that applies tags to nested flow and task runs.

    Every flow or task executed inside the context automatically
    receives the given tags.

    Parameters:
    - tags: Tag strings to apply
    - **kwargs: Additional tag-related configuration

    Returns:
    Context manager that applies tags to nested executions

    Usage:
        with tags("production", "etl"):
            # Any flows/tasks run here get these tags
            my_flow()
    """
class FlowRunContext:
    """
    Context object containing information about the current flow run.

    Attributes:
    - flow: The Flow object being executed
    - flow_run: The FlowRun object for current execution
    - parameters: Flow parameters
    - task_runner: Task runner instance
    - client: Prefect client for API access
    - background_tasks: Background task set
    """
    # Flow/FlowRun/TaskRunner/PrefectClient are declared elsewhere in the package
    flow: Optional[Flow]
    flow_run: Optional[FlowRun]
    parameters: Dict[str, Any]
    task_runner: Optional[TaskRunner]
    client: Optional[PrefectClient]
    background_tasks: Optional[Set[asyncio.Task]]

    @classmethod
    def get(cls) -> Optional["FlowRunContext"]:
        """Get the current flow run context, or None when not in a flow run."""

    def __enter__(self) -> "FlowRunContext":
        """Enter the context."""

    def __exit__(self, *args) -> None:
        """Exit the context."""
class TaskRunContext:
    """
    Context object containing information about the current task run.

    Attributes:
    - task: The Task object being executed
    - task_run: The TaskRun object for current execution
    - parameters: Task parameters
    - client: Prefect client for API access
    """
    # Task/TaskRun/PrefectClient are declared elsewhere in the package
    task: Optional[Task]
    task_run: Optional[TaskRun]
    parameters: Dict[str, Any]
    client: Optional[PrefectClient]

    @classmethod
    def get(cls) -> Optional["TaskRunContext"]:
        """Get the current task run context, or None when not in a task run."""

from prefect import flow, task, tags
from prefect.context import FlowRunContext, TaskRunContext
@task
def analyze_data():
    """Print identifying details of the current task run, if any."""
    # Access task context
    ctx = TaskRunContext.get()
    if ctx:
        print(f"Task: {ctx.task.name}")
        print(f"Task Run ID: {ctx.task_run.id}")
    return "analysis complete"
@flow
def data_workflow():
    """Run analyses with and without an applied tag context."""
    # Access flow context
    ctx = FlowRunContext.get()
    if ctx:
        print(f"Flow: {ctx.flow.name}")
        print(f"Flow Run ID: {ctx.flow_run.id}")
    # Tasks launched inside the tags() block inherit the tags
    with tags("critical", "production"):
        result1 = analyze_data()
        result2 = analyze_data()
    # Outside the block, tasks run untagged
    result3 = analyze_data()
    return [result1, result2, result3]
# Nested tag contexts
@flow
def complex_workflow():
with tags("pipeline"):
with tags("extract"):
extract_result = extract_data()
with tags("transform"):
transform_result = transform_data(extract_result)
with tags("load"):
load_result = load_data(transform_result)
return load_resultAnnotations for controlling task execution behavior, particularly in mapping operations.
class unmapped:
    """
    Marks an input as constant (not mapped over) in mapping operations.

    Wrap an argument to task.map() with unmapped() so every mapped call
    receives the same value instead of one element per call.
    """

    def __init__(self, value: Any):
        """
        Initialize unmapped annotation.

        Parameters:
        - value: The value to keep unmapped
        """
class allow_failure:
    """
    Lets failed task results flow to downstream tasks.

    By default a failed task prevents its dependents from running.
    Wrapping an upstream result with allow_failure() lets downstream
    tasks run anyway and receive the failed state to handle gracefully.
    """

    def __init__(self, value: Any):
        """
        Initialize allow_failure annotation.

        Parameters:
        - value: The task call or value to allow failure for
        """
class quote:
    """
    Prevents expression evaluation in task parameters.

    Wrap a value with quote() to pass it through as a literal
    instead of having it evaluated before reaching the task.
    """

    def __init__(self, expr: Any):
        """
        Initialize quote annotation.

        Parameters:
        - expr: Expression to quote
        """
# Backward compatibility alias
Quote = quote

from prefect import flow, task
from prefect.utilities.annotations import unmapped, allow_failure, quote
@task
def process_item(item, config, multiplier):
    """Scale one item by the configured factor and a shared multiplier."""
    scaled = item * config["factor"]
    return scaled * multiplier
@task
def handle_result(result):
    """Render an upstream outcome as a success or error message."""
    if not isinstance(result, Exception):
        return f"Success: {result}"
    return f"Error: {result}"
@flow
def mapping_example():
    """Map a task over items while holding config and multiplier fixed."""
    items = [1, 2, 3, 4, 5]
    config = {"factor": 10}  # Shared configuration
    multiplier = 2  # Another shared value
    # Each mapped call gets one element of items but the SAME config
    # and multiplier, thanks to the unmapped() wrappers.
    results = process_item.map(
        items,
        unmapped(config),
        unmapped(multiplier),
    )
    return results
@task
def risky_task(value):
    """Double the value; negative inputs raise a ValueError."""
    if value >= 0:
        return value * 2
    raise ValueError("Negative value")
@flow
def failure_handling_example():
    """Map over values that may fail and handle failures downstream."""
    values = [-1, 2, 3, -4, 5]
    # Map over the values directly; the negative ones will raise.
    results = risky_task.map(values)
    # Wrap the upstream results with allow_failure so the downstream
    # mapped task still runs and receives the failed states instead of
    # being skipped (see the allow_failure annotation's contract).
    handled = handle_result.map(allow_failure(results))
    return handled
@task
def expression_task(expr_string):
    """Format whatever expression value (or quoted string) it receives."""
    return f"Expression: {expr_string}"
@flow
def quote_example():
x = 10
y = 20
# Without quote, x + y is evaluated to 30
result1 = expression_task(x + y)
# With quote, "x + y" is passed as a string
result2 = expression_task(quote("x + y"))
return result1, result2Transaction context for ensuring data consistency across task executions.
class Transaction:
    """
    Transaction context manager for coordinating task execution.

    Provides isolation and coordination mechanisms for tasks that
    need to execute as a unit with rollback capabilities.
    """

    def __init__(
        self,
        key: Optional[str] = None,
        timeout: Optional[float] = None,
    ):
        """
        Initialize a transaction context.

        Parameters:
        - key: Unique identifier for the transaction
        - timeout: Maximum time to hold the transaction (seconds)
        """

    def __enter__(self) -> "Transaction":
        """
        Enter the transaction context.

        Returns:
        Transaction instance for use in context
        """

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """
        Exit the transaction context.

        Parameters:
        - exc_type: Exception type if an error occurred
        - exc_val: Exception value if an error occurred
        - exc_tb: Exception traceback if an error occurred

        If an exception occurred, the transaction is rolled back.
        Otherwise, it is committed.
        """

    def commit(self) -> None:
        """Commit the transaction."""

    def rollback(self) -> None:
        """Roll back the transaction."""

    @property
    def is_committed(self) -> bool:
        """Check if transaction has been committed."""

    @property
    def is_rolled_back(self) -> bool:
        """Check if transaction has been rolled back."""

from prefect import flow, task
from prefect.transactions import Transaction
@task
def update_database(data):
    """Placeholder database update that reports what it would write."""
    print(f"Updating database with {data}")
    return f"Updated: {data}"
@task
def send_notification(message):
    """Placeholder notification sender that echoes the message."""
    print(f"Sending notification: {message}")
    return f"Sent: {message}"
@task
def log_audit(action):
    """Placeholder audit logger that echoes the recorded action."""
    print(f"Audit log: {action}")
    return f"Logged: {action}"
@flow
def transactional_workflow(data):
    """Run update, notification, and audit steps as one transaction."""
    try:
        with Transaction(key="data-update", timeout=300) as txn:
            # Every task inside this block participates in the transaction;
            # a failure in any of them rolls the whole unit back.
            update_result = update_database(data)
            notification_result = send_notification(f"Data updated: {data}")
            audit_result = log_audit(f"Updated data for {data}")
            return {
                "update": update_result,
                "notification": notification_result,
                "audit": audit_result,
                "transaction_id": txn.key,
            }
    except Exception as e:
        # Transaction failed and was rolled back; report the error
        return {"error": str(e), "status": "rolled_back"}
# Manual transaction control
@flow
def manual_transaction_example():
txn = Transaction(key="manual-txn")
try:
txn.__enter__()
# Do work
result1 = update_database("batch1")
result2 = update_database("batch2")
# Explicit commit
txn.commit()
return {"results": [result1, result2], "status": "committed"}
except Exception as e:
# Explicit rollback
txn.rollback()
return {"error": str(e), "status": "rolled_back"}
finally:
if not (txn.is_committed or txn.is_rolled_back):
txn.__exit__(None, None, None)Utilities for serializing and managing context across process boundaries.
def serialize_context() -> Dict[str, Any]:
    """
    Serialize the current Prefect context for cross-process communication.

    Returns:
    Dictionary of serialized context information, including the flow
    run context, task run context, settings context, and tag context.
    """
def hydrated_context(context_data: Dict[str, Any]) -> ContextManager:
    """
    Context manager that restores serialized context.

    Parameters:
    - context_data: Serialized context from serialize_context()

    Returns:
    Context manager that applies the hydrated context
    """

from prefect.context import serialize_context, hydrated_context
from prefect import flow, task
import json
@flow
def parent_flow():
    """Round-trip the current context through JSON and run a task under it."""
    # Serialize current context; it could be written to a file,
    # sent to another process, etc.
    context_data = serialize_context()
    context_json = json.dumps(context_data, default=str)
    # Later, rebuild the context from the serialized form
    restored_data = json.loads(context_json)
    with hydrated_context(restored_data):
        # Runs with the restored context applied
        child_task()
@task
def child_task():
# This task has access to the restored context
context = TaskRunContext.get()
if context:
print(f"Restored task context: {context.task_run.id}")Flow run input/output management for interactive workflows and human-in-the-loop processes.
from prefect.input import (
RunInput,
RunInputMetadata,
Keyset,
GetInputHandler,
send_input,
receive_input,
create_flow_run_input,
read_flow_run_input,
delete_flow_run_input,
)
async def send_input(
    run_input: Any,
    flow_run_id: UUID,
    sender: Optional[str] = None,
    key_prefix: Optional[str] = None,
) -> None:
    """
    Send input to a flow run for interactive workflows.

    Parameters:
    - run_input: Input data to send (JSON-serializable)
    - flow_run_id: ID of target flow run
    - sender: Optional identifier of sender
    - key_prefix: Optional prefix for input keys
    """
def receive_input(
    input_type: Type[T],
    timeout: Optional[float] = 3600,
    poll_interval: float = 10,
    raise_timeout_error: bool = False,
    exclude_keys: Optional[Set[str]] = None,
    key_prefix: Optional[str] = None,
    flow_run_id: Optional[UUID] = None,
    with_metadata: bool = False,
) -> GetInputHandler[T]:
    """
    Receive input for the current flow run, polling until it arrives.

    Parameters:
    - input_type: Type of input to receive
    - timeout: Maximum wait time in seconds
    - poll_interval: Polling interval in seconds
    - raise_timeout_error: Whether to raise error on timeout
    - exclude_keys: Keys to exclude from input
    - key_prefix: Prefix for input keys
    - flow_run_id: Specific flow run ID (defaults to current)
    - with_metadata: Whether to include metadata

    Returns:
    Input handler for receiving typed input
    """
def create_flow_run_input(
    flow_run_id: UUID,
    key: str,
    value: Any,
    sender: Optional[str] = None,
) -> RunInput:
    """
    Create a flow run input record for manual input management.

    Parameters:
    - flow_run_id: Target flow run ID
    - key: Input key identifier
    - value: Input value (JSON-serializable)
    - sender: Optional sender identifier

    Returns:
    Created RunInput object
    """
class RunInput(BaseModel):
    """Flow run input data container."""
    # Target flow run the input belongs to
    flow_run_id: UUID
    # Key identifying this input within the run
    key: str
    # JSON-serializable input payload
    value: Any
    # Optional identifier of who sent the input
    sender: Optional[str]
    # When the input record was created
    created: datetime
class RunInputMetadata(BaseModel):
    """Metadata for flow run inputs (no payload, just descriptors)."""
    # Key identifying the input
    key: str
    # Optional identifier of who sent the input
    sender: Optional[str]
    # When the input record was created
    created: datetime
class Keyset:
    """A collection of input keys used for input management."""

    def __init__(self, keys: Set[str]):
        # Hold the raw set of keys for membership checks
        self.keys = keys

    def __contains__(self, key: str) -> bool:
        """Check if key is in keyset."""
class GetInputHandler(Generic[T]):
    """Handler for receiving typed input."""

    def get(self, key: str, default: T = None) -> T:
        """Get input by key, returning default when the key is absent."""

    def __getitem__(self, key: str) -> T:
        """Get input by key (dict-like access)."""

    def keys(self) -> Set[str]:
        """Get available input keys."""

from prefect import flow, task
from prefect.input import send_input, receive_input
from prefect.states import Paused
import asyncio
# Interactive workflow with human input
@flow
def approval_workflow(document_id: str):
    """Workflow requiring human approval."""
    # Process the document before asking for review
    processed_doc = process_document(document_id)
    # Pause until a reviewer submits a decision (or the timeout elapses)
    approval_input = receive_input(
        input_type=dict,
        timeout=3600,  # 1 hour timeout
        key_prefix="approval",
    )
    decision = approval_input.get("decision")
    comments = approval_input.get("comments", "")
    if decision == "approved":
        return finalize_document(processed_doc, comments)
    return reject_document(processed_doc, comments)
# Sending input to paused flow
async def send_approval():
    """Send approval input to paused flow."""
    flow_run_id = UUID("...")  # Get from flow run
    decision_payload = {
        "decision": "approved",
        "comments": "Looks good, approved for publication",
    }
    await send_input(
        run_input=decision_payload,
        flow_run_id=flow_run_id,
        sender="manager@company.com",
        key_prefix="approval",
    )
# Multi-step input collection
@flow
def data_collection_workflow():
    """Collect multiple inputs over time."""
    # Step 1: wait for initial parameters
    config_input = receive_input(
        input_type=dict,
        key_prefix="config",
        timeout=1800,  # 30 minutes
    )
    config = config_input.get("parameters")

    # Step 2: process with the supplied configuration
    results = process_with_config(config)

    # Step 3: wait for a reviewer's feedback on the results
    feedback_input = receive_input(
        input_type=dict,
        key_prefix="feedback",
        timeout=3600,  # 1 hour
    )
    feedback = feedback_input.get("review")
    if feedback.get("needs_revision"):
        # Incorporate the feedback and produce a revised result
        return revise_results(results, feedback)
    return finalize_results(results)
@task
def process_document(doc_id: str):
    """Stub: mark a document as processed."""
    return {"id": doc_id, "status": "processed"}
@task
def finalize_document(doc: dict, comments: str):
    """Stub: mark a document as finalized, attaching reviewer comments."""
    return {"id": doc["id"], "status": "finalized", "comments": comments}
@task
def reject_document(doc: dict, reason: str):
return {"id": doc["id"], "status": "rejected", "reason": reason}Types related to context and utilities:
from typing import Any, Dict, List, Optional, Set, ContextManager
from uuid import UUID
import asyncio
import logging
class ContextModel:
    """Base model for context data objects."""

    @classmethod
    def get(cls) -> Optional["ContextModel"]:
        """Return the active context instance, if any."""
class TagsContext(ContextModel):
    """Context tracking the tags applied to the current scope."""

    # Tags currently in effect for nested flow/task runs
    current_tags: Set[str]

    def add_tags(self, *tags: str) -> None:
        """Add tags to current context."""

    def remove_tags(self, *tags: str) -> None:
        """Remove tags from current context."""
class SettingsContext(ContextModel):
    """Context for Prefect settings."""
    # Active configuration profile (Profile is declared elsewhere in the package)
    profile: Profile
    # Resolved setting values keyed by setting name
    settings: Dict[str, Any]
# Annotation base class
class BaseAnnotation:
    """Shared behavior for execution annotations: hold and expose a value."""

    def __init__(self, value: Any):
        # Keep the wrapped value untouched
        self.value = value

    def unwrap(self) -> Any:
        """Return the wrapped value unchanged."""
        return self.value
# Log eavesdropper for capturing logs
class LogEavesdropper:
"""Utility for capturing and managing log output."""
def __init__(self, logger: logging.Logger):
"""Initialize log eavesdropper."""
def __enter__(self) -> "LogEavesdropper":
"""Start capturing logs."""
def __exit__(self, *args) -> None:
"""Stop capturing logs."""
def get_logs(self) -> List[logging.LogRecord]:
"""Get captured log records."""Install with Tessl CLI
npx tessl i tessl/pypi-prefect