CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-prefect

Workflow orchestration and management framework for building resilient data pipelines.

Pending
Overview
Eval results
Files

configuration.mddocs/

Configuration

Prefect's configuration system provides comprehensive management of settings, blocks for reusable configurations, and variables for dynamic workflow parameterization. This enables secure credential management, environment-specific configurations, and flexible workflow customization.

Capabilities

Blocks

Reusable configuration objects for credentials, connections, and infrastructure settings that can be shared across flows and deployments.

class Block(BaseModel):
    """
    Base class for Prefect blocks - reusable configuration objects.
    
    Blocks provide a way to store and share configuration, credentials,
    and other reusable components across flows and deployments. They
    support serialization, validation, and secure storage.
    """
    
    _block_type_name: ClassVar[str]
    _block_type_slug: ClassVar[str]
    _logo_url: ClassVar[Optional[str]]
    _description: ClassVar[Optional[str]]
    _documentation_url: ClassVar[Optional[str]]
    
    def save(
        self,
        name: str,
        overwrite: bool = False,
    ) -> UUID:
        """
        Save the block to Prefect storage.
        
        Parameters:
        - name: Name to save the block under
        - overwrite: Whether to overwrite existing block with same name
        
        Returns:
        UUID of the saved block
        
        Raises:
        ValueError: If block with same name exists and overwrite=False
        """
    
    @classmethod
    def load(cls, name: str) -> "Block":
        """
        Load a saved block by name.
        
        Parameters:
        - name: Name of the block to load
        
        Returns:
        Block instance loaded from storage
        
        Raises:
        ValueError: If block with given name doesn't exist
        """
    
    @classmethod
    def register_type_and_schema(cls) -> None:
        """Register the block type and schema with Prefect."""
    
    def dict(self, **kwargs) -> Dict[str, Any]:
        """Convert block to dictionary representation."""
    
    def json(self, **kwargs) -> str:
        """Convert block to JSON representation."""
    
    @classmethod
    def schema(cls, **kwargs) -> Dict[str, Any]:
        """Get the JSON schema for the block type."""

Usage Examples

from prefect.blocks.core import Block
from pydantic import Field
from typing import Optional

# Custom block definition
class DatabaseConfig(Block):
    """Database connection configuration block."""
    
    _block_type_name = "Database Config"
    _block_type_slug = "database-config"
    _description = "Configuration for database connections"
    
    host: str = Field(description="Database host")
    port: int = Field(default=5432, description="Database port")
    database: str = Field(description="Database name")
    username: str = Field(description="Database username")
    password: str = Field(description="Database password")
    ssl_mode: Optional[str] = Field(default="prefer", description="SSL mode")
    
    def get_connection_string(self) -> str:
        """Generate connection string from configuration."""
        return f"postgresql://{self.username}:{self.password}@{self.host}:{self.port}/{self.database}?sslmode={self.ssl_mode}"

# Save and load blocks
def setup_database_config():
    # Create block instance
    db_config = DatabaseConfig(
        host="localhost",
        port=5432,
        database="myapp",
        username="user",
        password="secret"
    )
    
    # Save for reuse
    block_id = db_config.save(name="production-db")
    
    return block_id

# Use in flows
from prefect import flow, task

@task
def connect_to_database():
    # Load the saved block
    db_config = DatabaseConfig.load("production-db")
    connection_string = db_config.get_connection_string()
    
    # Use configuration
    print(f"Connecting to: {connection_string}")
    return "connected"

@flow
def database_workflow():
    connection = connect_to_database()
    return connection

Settings Management

Access and management of Prefect settings and configuration profiles.

def get_settings_context() -> SettingsContext:
    """
    Get the current settings context.
    
    Returns:
    SettingsContext containing current profile and settings
    """

class SettingsContext:
    """
    Context object containing Prefect settings and profile information.
    
    Attributes:
    - profile: Current configuration profile
    - settings: Dictionary of current settings
    """
    
    profile: Profile
    settings: Dict[str, Any]
    
    def update_settings(self, **kwargs) -> None:
        """Update settings in the current context."""
    
    def get_setting(self, key: str, default: Any = None) -> Any:
        """Get a specific setting value."""

class Profile:
    """
    Configuration profile containing Prefect settings.
    
    Attributes:
    - name: Profile name
    - settings: Profile-specific settings
    """
    
    name: str
    settings: Dict[str, Any]
    
    @classmethod
    def load_from_file(cls, path: str) -> "Profile":
        """Load profile from configuration file."""
    
    def save_to_file(self, path: str) -> None:
        """Save profile to configuration file."""

Usage Examples

from prefect.context import get_settings_context
from prefect import flow, task

@task
def environment_aware_task():
    # Access current settings
    context = get_settings_context()
    
    print(f"Current profile: {context.profile.name}")
    print(f"API URL: {context.get_setting('PREFECT_API_URL')}")
    
    # Conditional logic based on environment
    if context.profile.name == "production":
        return "production-mode-result"
    else:
        return "development-mode-result"

@flow
def settings_aware_flow():
    context = get_settings_context()
    
    # Log current configuration
    print(f"Running with profile: {context.profile.name}")
    
    result = environment_aware_task()
    return result

Variables

Dynamic variables for runtime parameterization and configuration management.

class Variable:
    """
    Prefect variable for dynamic configuration.
    
    Variables provide a way to store and retrieve dynamic values
    that can be updated without changing flow code.
    """
    
    def __init__(
        self,
        name: str,
        value: Any = None,
        tags: List[str] = None,
    ):
        """
        Initialize a variable.
        
        Parameters:
        - name: Variable name
        - value: Variable value
        - tags: Optional tags for organization
        """
    
    def save(self, overwrite: bool = True) -> UUID:
        """
        Save the variable to Prefect storage.
        
        Parameters:
        - overwrite: Whether to overwrite existing variable
        
        Returns:
        UUID of the saved variable
        """
    
    @classmethod
    def get(
        cls,
        name: str,
        default: Any = None,
    ) -> Any:
        """
        Get a variable value by name.
        
        Parameters:
        - name: Variable name to retrieve
        - default: Default value if variable doesn't exist
        
        Returns:
        Variable value or default
        """
    
    @classmethod
    def set(
        cls,
        name: str,
        value: Any,
        tags: List[str] = None,
        overwrite: bool = True,
    ) -> UUID:
        """
        Set a variable value.
        
        Parameters:
        - name: Variable name
        - value: Variable value
        - tags: Optional tags
        - overwrite: Whether to overwrite existing variable
        
        Returns:
        UUID of the variable
        """
    
    @classmethod
    def delete(cls, name: str) -> bool:
        """
        Delete a variable.
        
        Parameters:
        - name: Variable name to delete
        
        Returns:
        True if variable was deleted
        """

Usage Examples

from prefect.variables import Variable
from prefect import flow, task

@task
def configurable_task():
    # Get dynamic configuration
    batch_size = Variable.get("batch_size", default=100)
    max_retries = Variable.get("max_retries", default=3)
    
    print(f"Processing with batch_size={batch_size}, max_retries={max_retries}")
    
    return {"batch_size": batch_size, "max_retries": max_retries}

@flow
def dynamic_workflow():
    # Set runtime variables
    Variable.set("batch_size", 500, tags=["processing"])
    Variable.set("max_retries", 5, tags=["reliability"])
    
    # Use variables in task
    config = configurable_task()
    
    return config

# Variable management
def setup_environment_variables():
    # Production settings
    Variable.set("api_endpoint", "https://api.production.com", tags=["production", "api"])
    Variable.set("timeout_seconds", 30, tags=["production", "performance"])
    Variable.set("log_level", "INFO", tags=["production", "logging"])
    
    # Development settings
    Variable.set("debug_mode", True, tags=["development"])
    Variable.set("mock_external_apis", True, tags=["development", "testing"])

Built-in Block Types

Prefect provides several built-in block types for common infrastructure and service integrations.

# System blocks
class Secret(Block):
    """Secure storage for sensitive values."""
    value: str = Field(description="Secret value")

class JSON(Block):
    """Storage for JSON data."""
    value: Dict[str, Any] = Field(description="JSON data")

class String(Block):
    """Storage for string values."""
    value: str = Field(description="String value")

# Notification blocks  
class SlackWebhook(Block):
    """Slack webhook integration."""
    url: str = Field(description="Slack webhook URL")
    
    def notify(self, message: str) -> None:
        """Send notification to Slack."""

class EmailServer(Block):
    """Email server configuration."""
    smtp_server: str = Field(description="SMTP server host")
    smtp_port: int = Field(default=587, description="SMTP port")
    username: str = Field(description="SMTP username")
    password: str = Field(description="SMTP password")
    
    def send_email(
        self,
        to: List[str],
        subject: str,
        body: str,
    ) -> None:
        """Send email notification."""

# Infrastructure blocks
class DockerContainer(Block):
    """Docker container configuration."""
    image: str = Field(description="Container image")
    command: Optional[List[str]] = Field(description="Container command")
    env: Optional[Dict[str, str]] = Field(description="Environment variables")

class KubernetesJob(Block):
    """Kubernetes job configuration."""
    namespace: str = Field(default="default", description="Kubernetes namespace")
    image: str = Field(description="Container image")
    job_manifest: Dict[str, Any] = Field(description="Kubernetes job manifest")

Usage Examples

from prefect.blocks.system import Secret, JSON
from prefect.blocks.notifications import SlackWebhook
from prefect import flow, task

# Setup blocks
def setup_infrastructure_blocks():
    # Save API key securely
    api_secret = Secret(value="sk-1234567890abcdef")
    api_secret.save("openai-api-key")
    
    # Save configuration
    config = JSON(value={
        "model": "gpt-4",
        "temperature": 0.7,
        "max_tokens": 1000
    })
    config.save("llm-config")
    
    # Setup notifications
    slack = SlackWebhook(url="https://hooks.slack.com/services/...")
    slack.save("team-notifications")

@task
def ai_task():
    # Load secure credentials
    api_key = Secret.load("openai-api-key").value
    config = JSON.load("llm-config").value
    
    # Use configuration
    print(f"Using model: {config['model']}")
    # API call logic here
    
    return "AI processing complete"

@flow
def ai_workflow():
    result = ai_task()
    
    # Send notification
    slack = SlackWebhook.load("team-notifications")
    slack.notify(f"Workflow completed: {result}")
    
    return result

Environment Configuration

Utilities for managing environment-specific configurations and deployment settings.

def get_current_profile() -> Profile:
    """Get the currently active configuration profile."""

def set_current_profile(profile_name: str) -> None:
    """Set the active configuration profile."""

def list_profiles() -> List[str]:
    """List all available configuration profiles."""

def create_profile(
    name: str,
    settings: Dict[str, Any],
    activate: bool = False,
) -> Profile:
    """
    Create a new configuration profile.
    
    Parameters:
    - name: Profile name
    - settings: Profile settings dictionary
    - activate: Whether to activate the new profile
    
    Returns:
    Created Profile object
    """

def delete_profile(name: str) -> bool:
    """
    Delete a configuration profile.
    
    Parameters:
    - name: Profile name to delete
    
    Returns:
    True if profile was deleted
    """

Usage Examples

from prefect.profiles import (
    get_current_profile,
    create_profile,
    set_current_profile,
    list_profiles
)

# Profile management
def setup_environments():
    # Create development profile
    dev_profile = create_profile(
        name="development",
        settings={
            "PREFECT_API_URL": "http://localhost:4200/api",
            "PREFECT_LOGGING_LEVEL": "DEBUG",
            "PREFECT_LOCAL_STORAGE_PATH": "./dev-storage"
        }
    )
    
    # Create production profile
    prod_profile = create_profile(
        name="production",
        settings={
            "PREFECT_API_URL": "https://api.prefect.cloud/api/accounts/.../workspaces/...",
            "PREFECT_API_KEY": "pnu_...",
            "PREFECT_LOGGING_LEVEL": "INFO"
        }
    )
    
    return dev_profile, prod_profile

def switch_environment(env_name: str):
    """Switch to specified environment profile."""
    available = list_profiles()
    
    if env_name in available:
        set_current_profile(env_name)
        current = get_current_profile()
        print(f"Switched to profile: {current.name}")
    else:
        print(f"Profile '{env_name}' not found. Available: {available}")

Types

Types related to configuration and blocks:

from typing import Any, Dict, List, Optional, ClassVar, Union
from uuid import UUID
from pydantic import BaseModel, Field
from datetime import datetime

class BlockDocument:
    """Document representing a saved block."""
    id: UUID
    name: str
    block_type_id: UUID
    block_schema_id: UUID
    data: Dict[str, Any]
    created: datetime
    updated: datetime

class BlockType:
    """Block type registration information."""
    id: UUID
    name: str
    slug: str
    logo_url: Optional[str]
    documentation_url: Optional[str]
    description: Optional[str]
    code_example: Optional[str]

class BlockSchema:
    """JSON schema for a block type."""
    id: UUID
    checksum: str
    fields: Dict[str, Any]
    block_type_id: UUID
    version: str

class VariableDocument:
    """Document representing a saved variable."""
    id: UUID
    name: str
    value: Any
    tags: List[str]
    created: datetime
    updated: datetime

# Configuration types
class ProfileSettings:
    """Settings within a configuration profile."""
    
    PREFECT_API_URL: Optional[str]
    PREFECT_API_KEY: Optional[str]
    PREFECT_LOGGING_LEVEL: Optional[str]
    PREFECT_LOCAL_STORAGE_PATH: Optional[str]
    # ... many other settings

class DeploymentConfiguration:
    """Configuration for deployment environments."""
    work_pool_name: str
    job_variables: Dict[str, Any]
    build_steps: List[Dict[str, Any]]
    push_steps: List[Dict[str, Any]]
    pull_steps: List[Dict[str, Any]]

Install with Tessl CLI

npx tessl i tessl/pypi-prefect

docs

client-api.md

configuration.md

context-utilities.md

core-workflows.md

deployments.md

index.md

runtime-context.md

state-management.md

variables.md

tile.json