CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-prefect

Workflow orchestration and management framework for building resilient data pipelines.

Pending
Overview
Eval results
Files

docs/context-utilities.md

Context & Utilities

Prefect's context and utility systems provide runtime information, logging capabilities, execution annotations, and transaction management. These components enable advanced workflow control, debugging, and data consistency across distributed execution environments.

Capabilities

Logging

Access to structured logging within flows and tasks for observability and debugging.

def get_run_logger(name: Optional[str] = None) -> logging.Logger:
    """
    Get a logger for the current flow or task run.

    Creates a logger that automatically includes run context information
    such as flow run ID, task run ID, and other execution metadata.

    Parameters:
    - name: Custom logger name; None (the default) selects the current
      flow/task name

    Returns:
    Logger instance configured for the current run context

    The logger automatically includes:
    - Run IDs for correlation
    - Timestamps
    - Context metadata
    - Structured formatting for Prefect UI
    """

def get_logger(name: Optional[str] = None) -> logging.Logger:
    """
    Get a general-purpose Prefect logger.

    Parameters:
    - name: Logger name; None (the default) selects the calling module

    Returns:
    Standard Python logger configured for Prefect
    """

def disable_run_logger() -> None:
    """
    Turn off run-specific logging for the current context.

    Helpful for preventing automatic log capture, or while
    debugging logging issues.
    """

Usage Examples

from prefect import flow, task, get_run_logger
from prefect.logging import get_logger, disable_run_logger

# Module-level logger
module_logger = get_logger(__name__)

@task
def process_data(data):
    """Run complex_operation on ``data``, logging record counts and any failure."""
    run_logger = get_run_logger()
    run_logger.info(f"Processing {len(data)} records")

    try:
        processed = complex_operation(data)
        run_logger.info(f"Successfully processed {len(processed)} records")
    except Exception as exc:
        # Record the failure in the run log, then re-raise it unchanged.
        run_logger.error(f"Processing failed: {exc}")
        raise
    return processed

@flow
def data_pipeline():
    """Extract data, process it, and log progress with the run logger."""
    run_logger = get_run_logger()
    run_logger.info("Starting data pipeline")

    # Log structured data
    config_fields = {
        "batch_size": 1000,
        "env": "production"
    }
    run_logger.info("Configuration", extra=config_fields)

    processed = process_data(extract_data())
    run_logger.info(f"Pipeline completed successfully with {len(processed)} records")
    return processed

# Disable logging in specific contexts
@task
def quiet_task():
    """Call disable_run_logger() and return a fixed completion string."""
    # NOTE(review): released Prefect versions appear to expose
    # disable_run_logger as a context manager (`with disable_run_logger():`);
    # confirm that a bare call like this one actually suppresses run logs.
    disable_run_logger()
    # This task won't generate run logs
    return "completed quietly"

Context Management

Context managers and utilities for managing execution context and metadata.

def tags(*tags: str, **kwargs) -> ContextManager:
    """
    Context manager that attaches tags to flow and task runs.

    Every flow or task executed inside the context automatically
    receives the given tags.

    Parameters:
    - tags: Tag strings to apply
    - **kwargs: Additional tag-related configuration

    Returns:
    Context manager that applies tags to nested executions

    Usage:
    with tags("production", "etl"):
        # Any flows/tasks run here get these tags
        my_flow()
    """

class FlowRunContext:
    """
    Information about the flow run that is currently executing.

    Attributes:
    - flow: The Flow object being executed
    - flow_run: The FlowRun object for current execution
    - parameters: Flow parameters
    - task_runner: Task runner instance
    - client: Prefect client for API access
    - background_tasks: Background task set
    """

    flow: Optional[Flow]
    flow_run: Optional[FlowRun]
    parameters: Dict[str, Any]
    task_runner: Optional[TaskRunner]
    client: Optional[PrefectClient]
    background_tasks: Optional[Set[asyncio.Task]]

    @classmethod
    def get(cls) -> Optional["FlowRunContext"]:
        """Return the current flow run context (Optional per the annotation)."""

    def __enter__(self) -> "FlowRunContext":
        """Enter the flow run context."""

    def __exit__(self, *args) -> None:
        """Exit the flow run context."""

class TaskRunContext:
    """
    Information about the task run that is currently executing.

    Attributes:
    - task: The Task object being executed
    - task_run: The TaskRun object for current execution
    - parameters: Task parameters
    - client: Prefect client for API access
    """

    task: Optional[Task]
    task_run: Optional[TaskRun]
    parameters: Dict[str, Any]
    client: Optional[PrefectClient]

    @classmethod
    def get(cls) -> Optional["TaskRunContext"]:
        """Return the current task run context (Optional per the annotation)."""

Usage Examples

from prefect import flow, task, tags
from prefect.context import FlowRunContext, TaskRunContext

@task
def analyze_data():
    """Print the task name and task run ID when a task run context is available."""
    # Access task context
    task_ctx = TaskRunContext.get()
    if not task_ctx:
        return "analysis complete"

    print(f"Task: {task_ctx.task.name}")
    print(f"Task Run ID: {task_ctx.task_run.id}")
    return "analysis complete"

@flow
def data_workflow():
    """Print flow context details, then run analyze_data with and without tags."""
    # Access flow context
    flow_ctx = FlowRunContext.get()
    if flow_ctx:
        print(f"Flow: {flow_ctx.flow.name}")
        print(f"Flow Run ID: {flow_ctx.flow_run.id}")

    outcomes = []

    # Use tags context manager
    with tags("critical", "production"):
        # These tasks inherit the tags
        outcomes.append(analyze_data())
        outcomes.append(analyze_data())

    # These tasks don't have the tags
    outcomes.append(analyze_data())

    return outcomes

# Nested tag contexts
@flow
def complex_workflow():
    """Run extract, transform, and load steps, each inside its own nested tag context."""
    with tags("pipeline"):
        with tags("extract"):
            extracted = extract_data()

        with tags("transform"):
            transformed = transform_data(extracted)

        with tags("load"):
            loaded = load_data(transformed)

    return loaded

Execution Annotations

Annotations for controlling task execution behavior, particularly in mapping operations.

class unmapped:
    """
    Annotation marking an input as unmapped in mapping operations.

    With task.map(), wrap an argument in unmapped() to signal that it
    should be passed as-is to every mapped call instead of being mapped over.
    """

    def __init__(self, value: Any):
        """
        Create the unmapped annotation.

        Parameters:
        - value: The value to keep unmapped
        """

class allow_failure:
    """
    Annotation that lets failed task results flow downstream.

    By default, dependent tasks do not run when a task fails. Wrapping
    a task call with allow_failure() lets downstream tasks receive the
    failed state and handle it gracefully.
    """

    def __init__(self, value: Any):
        """
        Create the allow_failure annotation.

        Parameters:
        - value: The task call or value to allow failure for
        """

class quote:
    """
    Annotation that prevents expression evaluation in task parameters.

    Wrap a value in quote() to hand it to a task as a literal value
    instead of having it evaluated beforehand.

    NOTE(review): Prefect's own reference appears to describe quote as a way
    to skip introspection of an input rather than to defer evaluation —
    confirm the wording above against the library documentation.
    """

    def __init__(self, expr: Any):
        """
        Create the quote annotation.

        Parameters:
        - expr: Expression to quote
        """

# Backward compatibility alias
Quote = quote

Usage Examples

from prefect import flow, task
from prefect.utilities.annotations import unmapped, allow_failure, quote

@task
def process_item(item, config, multiplier):
    """Scale ``item`` by ``config["factor"]`` and then by ``multiplier``."""
    scaled = item * config["factor"]
    return scaled * multiplier

@task
def handle_result(result):
    """Format ``result`` as an error message if it is an exception, else as a success message."""
    return f"Error: {result}" if isinstance(result, Exception) else f"Success: {result}"

@flow
def mapping_example():
    """Map process_item over a list while sharing one config and one multiplier."""
    shared_config = {"factor": 10}  # Shared configuration
    shared_multiplier = 2  # Another shared value

    # Map over items, but config and multiplier are unmapped
    return process_item.map(
        [1, 2, 3, 4, 5],
        unmapped(shared_config),      # Same config for all
        unmapped(shared_multiplier)   # Same multiplier for all
    )

@task
def risky_task(value):
    """Return ``value`` doubled; raise ValueError when ``value`` is below zero."""
    is_negative = value < 0
    if is_negative:
        raise ValueError("Negative value")
    doubled = value * 2
    return doubled

@flow
def failure_handling_example():
    """
    Map risky_task over mixed values and pass failures on to handle_result.

    Returns:
    The mapped handle_result results, one per input value
    """
    values = [-1, 2, 3, -4, 5]

    # Negative values make risky_task raise, so some mapped runs fail.
    results = risky_task.map(values)

    # allow_failure must wrap the upstream task results handed to the
    # downstream task; wrapping the raw input list annotates nothing that
    # can fail, so handle_result would never see the failed runs.
    handled = handle_result.map(allow_failure(results))

    return handled

@task
def expression_task(expr_string):
    """Return ``expr_string`` formatted behind an "Expression: " label."""
    # Receive the quoted expression as a string
    labelled = f"Expression: {expr_string}"
    return labelled

@flow
def quote_example():
    """Call expression_task once with a computed sum and once with a quoted string."""
    x, y = 10, 20

    # Without quote, x + y is evaluated to 30
    evaluated = expression_task(x + y)

    # With quote, "x + y" is passed as a string
    # NOTE(review): the argument is already a string literal before quote()
    # wraps it — confirm what expression_task actually receives here.
    quoted = expression_task(quote("x + y"))

    return evaluated, quoted

Transaction Management

Transaction context for ensuring data consistency across task executions.

class Transaction:
    """
    Context manager that coordinates task execution as a transaction.

    Offers isolation and coordination for tasks that must run as a
    single unit with rollback capabilities.
    """

    def __init__(
        self,
        key: Optional[str] = None,
        timeout: Optional[float] = None,
    ):
        """
        Set up a transaction context.

        Parameters:
        - key: Unique identifier for the transaction
        - timeout: Maximum time to hold the transaction (seconds)
        """

    def __enter__(self) -> "Transaction":
        """
        Enter the transaction context.

        Returns:
        Transaction instance for use in context
        """

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """
        Leave the transaction context.

        Parameters:
        - exc_type: Exception type if an error occurred
        - exc_val: Exception value if an error occurred
        - exc_tb: Exception traceback if an error occurred

        The transaction is rolled back if an exception occurred;
        otherwise it is committed.
        """

    def commit(self) -> None:
        """Commit this transaction."""

    def rollback(self) -> None:
        """Roll this transaction back."""

    @property
    def is_committed(self) -> bool:
        """Whether the transaction has been committed."""

    @property
    def is_rolled_back(self) -> bool:
        """Whether the transaction has been rolled back."""

Usage Examples

from prefect import flow, task
from prefect.transactions import Transaction

@task
def update_database(data):
    """Print an update message for ``data`` and return a confirmation string."""
    # Database update logic
    print(f"Updating database with {data}")
    confirmation = f"Updated: {data}"
    return confirmation

@task
def send_notification(message):
    """Print a notification line for ``message`` and return a confirmation string."""
    # Notification logic
    print(f"Sending notification: {message}")
    confirmation = f"Sent: {message}"
    return confirmation

@task
def log_audit(action):
    """Print an audit line for ``action`` and return a confirmation string."""
    # Audit logging
    print(f"Audit log: {action}")
    confirmation = f"Logged: {action}"
    return confirmation

@flow
def transactional_workflow(data):
    """Run update, notification, and audit tasks inside one Transaction.

    Returns a dict of the three task results plus the transaction key, or an
    error dict when any exception escapes the transaction block.
    """
    try:
        with Transaction(key="data-update", timeout=300) as txn:
            # All tasks in this context are part of the transaction
            updated = update_database(data)
            notified = send_notification(f"Data updated: {data}")
            audited = log_audit(f"Updated data for {data}")

            # If any task fails, the entire transaction rolls back
            summary = {
                "update": updated,
                "notification": notified,
                "audit": audited,
                "transaction_id": txn.key
            }
            return summary
    except Exception as exc:
        # Handle transaction failure
        return {"error": str(exc), "status": "rolled_back"}

# Manual transaction control
@flow
def manual_transaction_example():
    """Drive a Transaction by hand with explicit enter, commit, and rollback calls.

    Returns a dict with both update results and status "committed", or an
    error dict with status "rolled_back" when an exception is caught.
    """
    txn = Transaction(key="manual-txn")
    
    try:
        # NOTE(review): __enter__ sits inside the try, so a failure while
        # entering also reaches the rollback() call below — confirm that
        # rolling back a never-entered transaction is safe.
        txn.__enter__()
        
        # Do work
        result1 = update_database("batch1")
        result2 = update_database("batch2")
        
        # Explicit commit
        txn.commit()
        
        return {"results": [result1, result2], "status": "committed"}
        
    except Exception as e:
        # Explicit rollback
        txn.rollback()
        return {"error": str(e), "status": "rolled_back"}
    
    finally:
        # __exit__ is only called when neither commit nor rollback was recorded.
        # NOTE(review): after a successful commit __exit__ is therefore never
        # paired with the __enter__ above — confirm no context cleanup is lost.
        if not (txn.is_committed or txn.is_rolled_back):
            txn.__exit__(None, None, None)

Context Serialization

Utilities for serializing and managing context across process boundaries.

def serialize_context() -> Dict[str, Any]:
    """
    Serialize the active Prefect context so it can cross process boundaries.

    Returns:
    Dictionary of serialized context information, covering:
    - Flow run context
    - Task run context
    - Settings context
    - Tag context
    """

def hydrated_context(context_data: Dict[str, Any]) -> ContextManager:
    """
    Context manager that restores a previously serialized context.

    Parameters:
    - context_data: Serialized context from serialize_context()

    Returns:
    Context manager that applies the hydrated context
    """

Usage Examples

from prefect.context import serialize_context, hydrated_context
from prefect import flow, task
import json

@flow
def parent_flow():
    """Serialize the current context, round-trip it through JSON, and run child_task under it."""
    # Serialize current context
    snapshot = serialize_context()

    # Could save to file, send to another process, etc.
    # NOTE(review): default=str stringifies any value json cannot encode, so
    # the round-tripped dict may differ from the snapshot — confirm that
    # hydrated_context accepts it.
    encoded = json.dumps(snapshot, default=str)

    # Later, restore the context
    decoded = json.loads(encoded)

    with hydrated_context(decoded):
        # This runs with the restored context
        child_task()

@task
def child_task():
    """Print the task run ID from the current task run context, if one exists."""
    # TaskRunContext is not among this example's imports; import it here so
    # the lookup below does not raise NameError.
    from prefect.context import TaskRunContext

    # This task has access to the restored context
    context = TaskRunContext.get()
    if context:
        print(f"Restored task context: {context.task_run.id}")

Input Management

Flow run input/output management for interactive workflows and human-in-the-loop processes.

from prefect.input import (
    RunInput,
    RunInputMetadata,
    Keyset,
    GetInputHandler,
    send_input,
    receive_input,
    create_flow_run_input,
    read_flow_run_input,
    delete_flow_run_input,
)

async def send_input(
    run_input: Any,
    flow_run_id: UUID,
    sender: Optional[str] = None,
    key_prefix: Optional[str] = None,
) -> None:
    """
    Deliver input to a flow run as part of an interactive workflow.

    Parameters:
    - run_input: Input data to send (JSON-serializable)
    - flow_run_id: ID of target flow run
    - sender: Optional identifier of sender
    - key_prefix: Optional prefix for input keys
    """

def receive_input(
    input_type: Type[T],
    timeout: Optional[float] = 3600,
    poll_interval: float = 10,
    raise_timeout_error: bool = False,
    exclude_keys: Optional[Set[str]] = None,
    key_prefix: Optional[str] = None,
    flow_run_id: Optional[UUID] = None,
    with_metadata: bool = False,
) -> GetInputHandler[T]:
    """
    Receive input addressed to the current flow run.

    Parameters:
    - input_type: Type of input to receive
    - timeout: Maximum wait time in seconds (default 3600)
    - poll_interval: Polling interval in seconds (default 10)
    - raise_timeout_error: Whether to raise error on timeout
    - exclude_keys: Keys to exclude from input
    - key_prefix: Prefix for input keys
    - flow_run_id: Specific flow run ID (defaults to current)
    - with_metadata: Whether to include metadata

    Returns:
    Input handler for receiving typed input
    """

def create_flow_run_input(
    flow_run_id: UUID,
    key: str,
    value: Any,
    sender: Optional[str] = None,
) -> RunInput:
    """
    Create a flow run input when managing inputs manually.

    Parameters:
    - flow_run_id: Target flow run ID
    - key: Input key identifier
    - value: Input value (JSON-serializable)
    - sender: Optional sender identifier

    Returns:
    Created RunInput object
    """

class RunInput(BaseModel):
    """Flow run input data container."""
    # ID of the flow run this input is attached to.
    flow_run_id: UUID
    # Input key identifier.
    key: str
    # Input payload; presumably JSON-serializable — confirm against the API.
    value: Any
    # Optional sender identifier.
    sender: Optional[str]
    # presumably the creation timestamp — TODO confirm
    created: datetime

class RunInputMetadata(BaseModel):
    """Metadata for flow run inputs."""
    # Input key identifier.
    key: str
    # Optional sender identifier.
    sender: Optional[str]
    # presumably the creation timestamp — TODO confirm
    created: datetime

class Keyset:
    """Set of input keys for input management."""

    def __init__(self, keys: Set[str]):
        """
        Store the keys that make up this keyset.

        Parameters:
        - keys: Set of key strings
        """
        self.keys = keys

    def __contains__(self, key: str) -> bool:
        """
        Check if key is in keyset.

        Parameters:
        - key: Key to look up

        Returns:
        True when the key is one of the stored keys, otherwise False
        """
        # Without a body this method returns None, which makes every
        # `key in keyset` test evaluate to False.
        return key in self.keys

class GetInputHandler(Generic[T]):
    """Handler for receiving typed input."""

    def get(self, key: str, default: Optional[T] = None) -> Optional[T]:
        """
        Get input by key.

        Parameters:
        - key: Input key to look up
        - default: Fallback value; None when not supplied, so callers
          must be prepared for a None result
        """

    def __getitem__(self, key: str) -> T:
        """Get input by key (dict-like access)."""

    def keys(self) -> Set[str]:
        """Get available input keys."""

Usage Examples

from prefect import flow, task
from prefect.input import send_input, receive_input
from prefect.states import Paused
import asyncio

# Interactive workflow with human input
@flow
def approval_workflow(document_id: str):
    """Workflow requiring human approval."""
    # Process document
    processed_doc = process_document(document_id)

    # Pause for human review
    approval_handler = receive_input(
        input_type=dict,
        timeout=3600,  # 1 hour timeout
        key_prefix="approval"
    )

    # Wait for approval input
    decision = approval_handler.get("decision")
    comments = approval_handler.get("comments", "")

    # Anything other than an explicit "approved" decision is a rejection.
    if decision != "approved":
        return reject_document(processed_doc, comments)
    return finalize_document(processed_doc, comments)

# Sending input to paused flow
async def send_approval():
    """Send approval input to paused flow."""
    # UUID is not imported by this example's import block; import it here so
    # the constructor call below does not raise NameError.
    from uuid import UUID

    # "..." is a placeholder: substitute the real flow run ID before running,
    # since UUID() rejects this string with ValueError.
    flow_run_id = UUID("...")  # Get from flow run

    await send_input(
        run_input={
            "decision": "approved",
            "comments": "Looks good, approved for publication"
        },
        flow_run_id=flow_run_id,
        sender="manager@company.com",
        key_prefix="approval"
    )

# Multi-step input collection
@flow
def data_collection_workflow():
    """Collect multiple inputs over time."""

    # Step 1: Initial parameters
    config_input = receive_input(
        input_type=dict,
        key_prefix="config",
        timeout=1800  # 30 minutes
    )

    config = config_input.get("parameters")

    # Step 2: Data processing with config
    results = process_with_config(config)

    # Step 3: Review results and get feedback
    feedback_input = receive_input(
        input_type=dict,
        key_prefix="feedback",
        timeout=3600  # 1 hour
    )

    # get() falls back to None when no "review" input is present; substitute
    # an empty dict so the lookup below cannot fail with AttributeError.
    feedback = feedback_input.get("review") or {}

    if feedback.get("needs_revision"):
        # Process feedback and revise
        return revise_results(results, feedback)
    else:
        return finalize_results(results)

@task
def process_document(doc_id: str):
    """Return a record marking ``doc_id`` as processed."""
    record = {"id": doc_id, "status": "processed"}
    return record

@task
def finalize_document(doc: dict, comments: str):
    """Return a finalized record for ``doc`` carrying the reviewer's comments."""
    record = {"id": doc["id"], "status": "finalized", "comments": comments}
    return record

@task
def reject_document(doc: dict, reason: str):
    """Return a rejected record for ``doc`` carrying the rejection reason."""
    record = {"id": doc["id"], "status": "rejected", "reason": reason}
    return record

Types

Types related to context and utilities:

from typing import Any, Dict, List, Optional, Set, ContextManager
from uuid import UUID
import asyncio
import logging

class ContextModel:
    """Common base for context data objects."""

    @classmethod
    def get(cls) -> Optional["ContextModel"]:
        """Return the current context instance (Optional per the annotation)."""

class TagsContext(ContextModel):
    """Context that manages tags."""
    current_tags: Set[str]

    def add_tags(self, *tags: str) -> None:
        """Add the given tags to the current context."""

    def remove_tags(self, *tags: str) -> None:
        """Remove the given tags from the current context."""

class SettingsContext(ContextModel):
    """Context for Prefect settings."""
    # presumably the active settings profile — TODO confirm
    profile: Profile
    # Settings as a string-keyed mapping (per the annotation).
    settings: Dict[str, Any]

# Annotation base class
class BaseAnnotation:
    """Base class for execution annotations."""

    def __init__(self, value: Any):
        """
        Wrap a value in the annotation.

        Parameters:
        - value: The value being annotated
        """
        self.value = value

    def __repr__(self) -> str:
        """Show the concrete annotation type and wrapped value for debugging."""
        # type(self) rather than a hard-coded name so subclasses report themselves.
        return f"{type(self).__name__}({self.value!r})"

    def unwrap(self) -> Any:
        """Unwrap the annotated value."""
        return self.value

# Log eavesdropper for capturing logs
class LogEavesdropper:
    """Helper that captures and manages log output."""

    def __init__(self, logger: logging.Logger):
        """Set up the eavesdropper for the given logger."""

    def __enter__(self) -> "LogEavesdropper":
        """Begin capturing logs."""

    def __exit__(self, *args) -> None:
        """End log capture."""

    def get_logs(self) -> List[logging.LogRecord]:
        """Return the captured log records."""

Install with Tessl CLI

npx tessl i tessl/pypi-prefect

docs

client-api.md

configuration.md

context-utilities.md

core-workflows.md

deployments.md

index.md

runtime-context.md

state-management.md

variables.md

tile.json