Workflow orchestration and management framework for building resilient data pipelines.
—
Prefect's configuration system provides comprehensive management of settings, blocks for reusable configurations, and variables for dynamic workflow parameterization. This enables secure credential management, environment-specific configurations, and flexible workflow customization.
Blocks are reusable configuration objects for credentials, connections, and infrastructure settings that can be shared across flows and deployments.
class Block(BaseModel):
    """
    Base class for Prefect blocks - reusable configuration objects.
    Blocks provide a way to store and share configuration, credentials,
    and other reusable components across flows and deployments. They
    support serialization, validation, and secure storage.
    """

    # Block type metadata; subclasses assign concrete values.
    _block_type_name: ClassVar[str]
    _block_type_slug: ClassVar[str]
    _logo_url: ClassVar[Optional[str]]
    _description: ClassVar[Optional[str]]
    _documentation_url: ClassVar[Optional[str]]

    def save(self, name: str, overwrite: bool = False) -> UUID:
        """
        Persist this block to Prefect storage.
        Parameters:
        - name: Name the block is stored under
        - overwrite: When True, replace an existing block that has the same name
        Returns:
        UUID identifying the saved block
        Raises:
        ValueError: A block with this name already exists and overwrite=False
        """

    @classmethod
    def load(cls, name: str) -> "Block":
        """
        Fetch a previously saved block.
        Parameters:
        - name: Name the block was saved under
        Returns:
        Block instance read back from storage
        Raises:
        ValueError: No block with the given name exists
        """

    @classmethod
    def register_type_and_schema(cls) -> None:
        """Register this block type and its schema with Prefect."""

    def dict(self, **kwargs) -> Dict[str, Any]:
        """Return the block as a dictionary."""

    def json(self, **kwargs) -> str:
        """Return the block serialized as JSON."""

    @classmethod
    def schema(cls, **kwargs) -> Dict[str, Any]:
        """Return the JSON schema of this block type."""


from prefect.blocks.core import Block
from pydantic import Field
from typing import Optional
# Custom block definition
class DatabaseConfig(Block):
    """Database connection configuration block."""

    _block_type_name = "Database Config"
    _block_type_slug = "database-config"
    _description = "Configuration for database connections"

    host: str = Field(description="Database host")
    port: int = Field(default=5432, description="Database port")
    database: str = Field(description="Database name")
    username: str = Field(description="Database username")
    # NOTE(review): kept as a plain str to preserve the field's type for
    # callers; consider pydantic's SecretStr so the value is masked - TODO confirm.
    password: str = Field(description="Database password")
    ssl_mode: Optional[str] = Field(default="prefer", description="SSL mode")

    def get_connection_string(self) -> str:
        """
        Generate a PostgreSQL connection URL from this configuration.
        The username, password and database name are percent-encoded so that
        reserved URL characters (for example "@", ":" or "/") in a credential
        cannot corrupt the URL. The sslmode query parameter is omitted when
        ssl_mode is None instead of being rendered as "sslmode=None".
        Returns:
        Connection string of the form
        postgresql://user:password@host:port/database[?sslmode=...]
        The result contains the password - do not log it.
        """
        # Function-scope import keeps this block self-contained.
        from urllib.parse import quote

        user = quote(self.username, safe="")
        secret = quote(self.password, safe="")
        db_name = quote(self.database, safe="")
        url = f"postgresql://{user}:{secret}@{self.host}:{self.port}/{db_name}"
        if self.ssl_mode is not None:
            url += f"?sslmode={self.ssl_mode}"
        return url
# Save and load blocks
def setup_database_config():
    """
    Build the example DatabaseConfig block and save it as "production-db".
    Returns:
    Whatever DatabaseConfig.save returns for the stored block
    """
    # Field values for the block instance.
    connection_settings = {
        "host": "localhost",
        "port": 5432,
        "database": "myapp",
        "username": "user",
        "password": "secret",
    }
    # Save for reuse under a fixed name so flows can load it later.
    return DatabaseConfig(**connection_settings).save(name="production-db")
# Use in flows
from prefect import flow, task
@task
def connect_to_database():
    """
    Load the saved "production-db" block and report the connection target.
    Returns:
    The string "connected"
    """
    # Load the saved block
    db_config = DatabaseConfig.load("production-db")
    # The full connection string embeds the password; it is meant for the
    # database driver only and must not be written to logs.
    connection_string = db_config.get_connection_string()
    # Use configuration: log only the non-secret parts of the target.
    print(f"Connecting to: {db_config.host}:{db_config.port}/{db_config.database}")
    return "connected"
@flow
def database_workflow():
    """Run the database connection task and return its result."""
    return connect_to_database()


# Access and management of Prefect settings and configuration profiles.
def get_settings_context() -> "SettingsContext":
    """
    Get the current settings context.
    Returns:
    SettingsContext containing current profile and settings
    """
    # The return annotation is a string because SettingsContext is defined
    # after this function; a bare name would raise NameError at definition time.
class SettingsContext:
    """
    Context object containing Prefect settings and profile information.
    Attributes:
    - profile: Current configuration profile
    - settings: Dictionary of current settings
    """

    # Quoted because Profile is declared after this class; an unquoted name
    # would be evaluated while the class body runs and raise NameError.
    profile: "Profile"
    settings: Dict[str, Any]

    def update_settings(self, **kwargs) -> None:
        """
        Update settings in the current context.
        Parameters:
        - kwargs: Settings to update, passed as keyword arguments
        """

    def get_setting(self, key: str, default: Any = None) -> Any:
        """
        Get a specific setting value.
        Parameters:
        - key: Name of the setting
        - default: Fallback value (presumably returned when the setting is
          absent - the stub does not show this; confirm against the implementation)
        """
class Profile:
    """
    A named set of Prefect settings.
    Attributes:
    - name: Name of the profile
    - settings: Settings that belong to this profile
    """

    name: str
    settings: Dict[str, Any]

    @classmethod
    def load_from_file(cls, path: str) -> "Profile":
        """
        Load a profile from a configuration file.
        Parameters:
        - path: Location of the configuration file
        """

    def save_to_file(self, path: str) -> None:
        """
        Save this profile to a configuration file.
        Parameters:
        - path: Location of the configuration file
        """


from prefect.context import get_settings_context
from prefect import flow, task
@task
def environment_aware_task():
    """
    Print the active profile name and API URL, then pick a result by profile.
    Returns:
    "production-mode-result" when the current profile is named "production",
    otherwise "development-mode-result"
    """
    # Access current settings
    settings_ctx = get_settings_context()
    print(f"Current profile: {settings_ctx.profile.name}")
    print(f"API URL: {settings_ctx.get_setting('PREFECT_API_URL')}")
    # Conditional logic based on environment
    is_production = settings_ctx.profile.name == "production"
    return "production-mode-result" if is_production else "development-mode-result"
@flow
def settings_aware_flow():
    """Print the active profile name, then run environment_aware_task and return its result."""
    active = get_settings_context()
    # Log current configuration
    print(f"Running with profile: {active.profile.name}")
    return environment_aware_task()


# Dynamic variables for runtime parameterization and configuration management.
class Variable:
    """
    Prefect variable for dynamic configuration.
    Variables provide a way to store and retrieve dynamic values
    that can be updated without changing flow code.
    """

    def __init__(
        self,
        name: str,
        value: Any = None,
        # Optional[...] spells out that None is accepted; a bare List[str]
        # with a None default is an implicit Optional that type checkers reject.
        tags: Optional[List[str]] = None,
    ):
        """
        Initialize a variable.
        Parameters:
        - name: Variable name
        - value: Variable value
        - tags: Optional tags for organization
        """

    def save(self, overwrite: bool = True) -> UUID:
        """
        Save the variable to Prefect storage.
        Parameters:
        - overwrite: Whether to overwrite existing variable
        Returns:
        UUID of the saved variable
        """

    @classmethod
    def get(
        cls,
        name: str,
        default: Any = None,
    ) -> Any:
        """
        Get a variable value by name.
        Parameters:
        - name: Variable name to retrieve
        - default: Default value if variable doesn't exist
        Returns:
        Variable value or default
        """

    @classmethod
    def set(
        cls,
        name: str,
        value: Any,
        tags: Optional[List[str]] = None,
        overwrite: bool = True,
    ) -> UUID:
        """
        Set a variable value.
        Parameters:
        - name: Variable name
        - value: Variable value
        - tags: Optional tags
        - overwrite: Whether to overwrite existing variable
        Returns:
        UUID of the variable
        """

    @classmethod
    def delete(cls, name: str) -> bool:
        """
        Delete a variable.
        Parameters:
        - name: Variable name to delete
        Returns:
        True if variable was deleted
        """


from prefect.variables import Variable
from prefect import flow, task
@task
def configurable_task():
    """
    Read batch_size and max_retries from Prefect variables and report them.
    Returns:
    Dict with keys "batch_size" and "max_retries" holding the values read
    (defaults 100 and 3 are passed to Variable.get)
    """
    # Get dynamic configuration (batch_size is read first, then max_retries)
    batch_size, max_retries = (
        Variable.get("batch_size", default=100),
        Variable.get("max_retries", default=3),
    )
    print(f"Processing with batch_size={batch_size}, max_retries={max_retries}")
    return dict(batch_size=batch_size, max_retries=max_retries)
@flow
def dynamic_workflow():
    """Set the batch_size and max_retries variables, then run configurable_task and return its result."""
    # Set runtime variables, in this order
    runtime_variables = (
        ("batch_size", 500, ["processing"]),
        ("max_retries", 5, ["reliability"]),
    )
    for var_name, var_value, var_tags in runtime_variables:
        Variable.set(var_name, var_value, tags=var_tags)
    # Use variables in task
    return configurable_task()
# Variable management
def setup_environment_variables():
    """Set the example production and development variables through Variable.set."""
    # Each entry is (name, value, tags); entries are applied top to bottom.
    variable_table = (
        # Production settings
        ("api_endpoint", "https://api.production.com", ["production", "api"]),
        ("timeout_seconds", 30, ["production", "performance"]),
        ("log_level", "INFO", ["production", "logging"]),
        # Development settings
        ("debug_mode", True, ["development"]),
        ("mock_external_apis", True, ["development", "testing"]),
    )
    for var_name, var_value, var_tags in variable_table:
        Variable.set(var_name, var_value, tags=var_tags)


# Prefect provides several built-in block types for common infrastructure and service integrations.
# System blocks
class Secret(Block):
    """Secure storage for sensitive values."""

    # The single field of this block: the secret itself, declared as str.
    value: str = Field(description="Secret value")
class JSON(Block):
    """Storage for JSON data."""

    # The stored document, declared as a dictionary with string keys.
    value: Dict[str, Any] = Field(description="JSON data")
class String(Block):
    """Storage for string values."""

    # The stored text.
    value: str = Field(description="String value")
# Notification blocks
class SlackWebhook(Block):
    """Slack webhook integration."""

    url: str = Field(description="Slack webhook URL")

    def notify(self, message: str) -> None:
        """
        Send a notification to Slack.
        Parameters:
        - message: Text of the notification
        """
class EmailServer(Block):
    """Email server configuration."""

    # SMTP endpoint
    smtp_server: str = Field(description="SMTP server host")
    smtp_port: int = Field(default=587, description="SMTP port")
    # SMTP credentials
    username: str = Field(description="SMTP username")
    password: str = Field(description="SMTP password")

    def send_email(self, to: List[str], subject: str, body: str) -> None:
        """
        Send an email notification.
        Parameters:
        - to: Recipients of the message
        - subject: Subject of the message
        - body: Body of the message
        """
# Infrastructure blocks
class DockerContainer(Block):
    """Docker container configuration."""

    image: str = Field(description="Container image")
    # Explicit None defaults: under pydantic v2 an Optional[...] annotation
    # alone does not make a field omittable, so without a default these two
    # fields would be required even though their types allow None.
    command: Optional[List[str]] = Field(default=None, description="Container command")
    env: Optional[Dict[str, str]] = Field(default=None, description="Environment variables")
class KubernetesJob(Block):
    """Kubernetes job configuration."""

    # Only the namespace has a default; image and manifest carry none.
    namespace: str = Field(default="default", description="Kubernetes namespace")
    image: str = Field(description="Container image")
    job_manifest: Dict[str, Any] = Field(description="Kubernetes job manifest")


from prefect.blocks.system import Secret, JSON
from prefect.blocks.notifications import SlackWebhook
from prefect import flow, task
# Setup blocks
def setup_infrastructure_blocks():
    """Create and save the example Secret, JSON and SlackWebhook blocks."""
    # Save API key securely
    Secret(value="sk-1234567890abcdef").save("openai-api-key")

    # Save configuration
    llm_settings = {
        "model": "gpt-4",
        "temperature": 0.7,
        "max_tokens": 1000,
    }
    JSON(value=llm_settings).save("llm-config")

    # Setup notifications
    SlackWebhook(url="https://hooks.slack.com/services/...").save("team-notifications")
@task
def ai_task():
    """
    Load the saved API key and LLM configuration and report the model in use.
    Returns:
    The string "AI processing complete"
    """
    # Load secure credentials (kept for the API call placeholder below)
    api_key = Secret.load("openai-api-key").value
    llm_config = JSON.load("llm-config").value
    # Use configuration
    model_name = llm_config['model']
    print(f"Using model: {model_name}")
    # API call logic here
    return "AI processing complete"
@flow
def ai_workflow():
    """Run ai_task, post its result to the "team-notifications" Slack block, and return the result."""
    outcome = ai_task()
    # Send notification
    SlackWebhook.load("team-notifications").notify(f"Workflow completed: {outcome}")
    return outcome


# Utilities for managing environment-specific configurations and deployment settings.
def get_current_profile() -> Profile:
    """
    Return the configuration profile that is currently active.
    Returns:
    The active Profile
    """
def set_current_profile(profile_name: str) -> None:
    """
    Make the named configuration profile the active one.
    Parameters:
    - profile_name: Name of the profile to activate
    """
def list_profiles() -> List[str]:
    """
    List every available configuration profile.
    Returns:
    The available profiles as a list of strings
    """
def create_profile(name: str, settings: Dict[str, Any], activate: bool = False) -> Profile:
    """
    Create a configuration profile.
    Parameters:
    - name: Name of the new profile
    - settings: Dictionary of settings for the profile
    - activate: When True, make the new profile the active one
    Returns:
    The Profile object that was created
    """
def delete_profile(name: str) -> bool:
    """
    Remove a configuration profile.
    Parameters:
    - name: Name of the profile to remove
    Returns:
    True if profile was deleted
    """


from prefect.profiles import (
    get_current_profile,
    create_profile,
    set_current_profile,
    list_profiles
)
# Profile management
def setup_environments():
    """
    Create the example "development" and "production" profiles.
    Returns:
    Tuple (dev_profile, prod_profile) of the values returned by create_profile
    """
    # (profile name, settings) pairs; development is created before production.
    profile_specs = (
        (
            "development",
            {
                "PREFECT_API_URL": "http://localhost:4200/api",
                "PREFECT_LOGGING_LEVEL": "DEBUG",
                "PREFECT_LOCAL_STORAGE_PATH": "./dev-storage",
            },
        ),
        (
            "production",
            {
                "PREFECT_API_URL": "https://api.prefect.cloud/api/accounts/.../workspaces/...",
                "PREFECT_API_KEY": "pnu_...",
                "PREFECT_LOGGING_LEVEL": "INFO",
            },
        ),
    )
    dev_profile, prod_profile = [
        create_profile(name=profile_name, settings=profile_settings)
        for profile_name, profile_settings in profile_specs
    ]
    return dev_profile, prod_profile
def switch_environment(env_name: str):
    """
    Activate the profile called env_name when it exists.
    Prints the newly active profile's name on success, or the list of
    available profiles when env_name is not among them.
    Parameters:
    - env_name: Name of the profile to switch to
    """
    available = list_profiles()
    # Guard clause: unknown profile, report and leave the active one untouched.
    if env_name not in available:
        print(f"Profile '{env_name}' not found. Available: {available}")
        return
    set_current_profile(env_name)
    current = get_current_profile()
    print(f"Switched to profile: {current.name}")


# Types related to configuration and blocks:
from typing import Any, Dict, List, Optional, ClassVar, Union
from uuid import UUID
from pydantic import BaseModel, Field
from datetime import datetime
class BlockDocument:
    """Record describing a block that has been saved."""

    # Identity of the document
    id: UUID
    name: str
    # Ids pointing at the block's type and schema
    block_type_id: UUID
    block_schema_id: UUID
    # Stored payload
    data: Dict[str, Any]
    # Timestamps
    created: datetime
    updated: datetime
class BlockType:
    """Registration details of a block type."""

    # Required identification fields
    id: UUID
    name: str
    slug: str
    # Optional descriptive fields
    logo_url: Optional[str]
    documentation_url: Optional[str]
    description: Optional[str]
    code_example: Optional[str]
class BlockSchema:
    """The JSON schema belonging to a block type."""

    id: UUID
    checksum: str
    # Schema content, keyed by string
    fields: Dict[str, Any]
    # Id of the block type this schema belongs to
    block_type_id: UUID
    version: str
class VariableDocument:
    """Record describing a variable that has been saved."""

    # Identity of the document
    id: UUID
    name: str
    # Stored value and its tags
    value: Any
    tags: List[str]
    # Timestamps
    created: datetime
    updated: datetime
# Configuration types
class ProfileSettings:
    """The settings held by a configuration profile."""

    # Every setting listed here is an optional string.
    PREFECT_API_URL: Optional[str]
    PREFECT_API_KEY: Optional[str]
    PREFECT_LOGGING_LEVEL: Optional[str]
    PREFECT_LOCAL_STORAGE_PATH: Optional[str]
    # ... many other settings
class DeploymentConfiguration:
    """Settings describing a deployment environment."""

    work_pool_name: str
    job_variables: Dict[str, Any]
    # Build, push and pull steps, each a list of dictionaries
    build_steps: List[Dict[str, Any]]
    push_steps: List[Dict[str, Any]]
    pull_steps: List[Dict[str, Any]]


# Install with Tessl CLI
npx tessl i tessl/pypi-prefect