CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-prefect

Workflow orchestration and management framework for building resilient data pipelines.

Pending
Overview
Eval results
Files

variables.mddocs/

Variables Management

Prefect Variables are named, mutable JSON values that can be shared across tasks, flows, and deployments. They provide a centralized way to store and retrieve configuration values, secrets, and other data that needs to be accessed dynamically during workflow execution.

Capabilities

Variable Class

The main Variable class for creating, reading, updating, and managing variables.

from prefect.variables import Variable
class Variable(BaseModel):
    """Variables are named, mutable JSON values that can be shared across tasks and flows."""
    
    name: str                           # Variable name identifier
    value: StrictVariableValue         # Variable value (JSON-serializable)
    tags: Optional[List[str]]          # Optional tags for organization
    
    @classmethod
    def get(
        cls,
        name: str,
        default: StrictVariableValue = None,
    ) -> StrictVariableValue:
        """
        Get a variable's value by name.
        
        Args:
        - name: Name of the variable to retrieve
        - default: Default value if variable doesn't exist
        
        Returns:
        Variable value or default if not found
        """
    
    @classmethod
    async def aget(
        cls,
        name: str,
        default: StrictVariableValue = None,
    ) -> StrictVariableValue:
        """
        Asynchronously get a variable's value by name.
        
        Args:
        - name: Name of the variable to retrieve
        - default: Default value if variable doesn't exist
        
        Returns:
        Variable value or default if not found
        """
    
    @classmethod
    def set(
        cls,
        name: str,
        value: StrictVariableValue,
        tags: Optional[List[str]] = None,
        overwrite: bool = False,
    ) -> "Variable":
        """
        Set a new variable. Must pass overwrite=True if variable exists.
        
        Args:
        - name: Name of the variable
        - value: JSON-serializable value to store
        - tags: Optional tags for organization
        - overwrite: Whether to overwrite existing variable
        
        Returns:
        The newly created Variable object
        """
    
    @classmethod
    async def aset(
        cls,
        name: str,
        value: StrictVariableValue,
        tags: Optional[List[str]] = None,
        overwrite: bool = False,
    ) -> "Variable":
        """
        Asynchronously set a new variable. Must pass overwrite=True if variable exists.
        
        Args:
        - name: Name of the variable
        - value: JSON-serializable value to store
        - tags: Optional tags for organization
        - overwrite: Whether to overwrite existing variable
        
        Returns:
        The newly created Variable object
        """

Usage Examples

Basic Variable Operations

from prefect.variables import Variable

# Set a variable
api_config = Variable.set(
    name="api_config",
    value={"base_url": "https://api.example.com", "timeout": 30},
    tags=["config", "api"]
)

# Get a variable value
config = Variable.get("api_config")
print(config)  # {"base_url": "https://api.example.com", "timeout": 30}

# Get with default value
debug_mode = Variable.get("debug_mode", default=False)

# Update an existing variable
Variable.set(
    name="api_config", 
    value={"base_url": "https://api.example.com", "timeout": 60},
    overwrite=True
)

Using Variables in Flows and Tasks

from prefect import flow, task
from prefect.variables import Variable

@task
def fetch_data():
    # Get API configuration from variables
    config = Variable.get("api_config")
    base_url = config["base_url"]
    timeout = config["timeout"]
    
    # Use configuration in task logic
    print(f"Fetching data from {base_url} with timeout {timeout}s")
    return {"data": "sample", "source": base_url}

@task
def process_data(raw_data: dict):
    # Get processing settings from variables
    batch_size = Variable.get("batch_size", default=100)
    
    print(f"Processing data in batches of {batch_size}")
    return {"processed": True, "batch_size": batch_size}

@flow
def data_pipeline():
    """Example flow using variables for configuration."""
    raw_data = fetch_data()
    result = process_data(raw_data)
    return result

Async Variable Operations

from prefect import flow, task
from prefect.variables import Variable
import asyncio

@task
async def async_fetch_data():
    # Asynchronously get variable value
    config = await Variable.aget("api_config")
    
    # Set a runtime variable asynchronously
    await Variable.aset(
        name="last_run_time",
        value=str(asyncio.get_event_loop().time()),
        overwrite=True
    )
    
    return {"data": "fetched", "config": config}

@flow
async def async_data_pipeline():
    """Example async flow using variables."""
    result = await async_fetch_data()
    return result

Environment-Specific Configuration

from prefect import flow
from prefect.variables import Variable
import prefect.runtime.deployment

@flow 
def environment_aware_flow():
    """Flow that adapts behavior based on environment variables."""
    
    # Get environment from deployment parameters or variables
    env = prefect.runtime.deployment.parameters.get("environment", "prod")
    
    # Get environment-specific configuration
    db_config = Variable.get(f"database_config_{env}")
    api_key = Variable.get(f"api_key_{env}")
    
    print(f"Running in {env} environment")
    print(f"Database config: {db_config}")
    
    return {"environment": env, "configured": True}

Common Patterns

Configuration Management

Variables are ideal for storing configuration that needs to be shared across multiple flows:

# Set up shared configuration
Variable.set("app_config", {
    "database_url": "postgresql://localhost/mydb",
    "redis_url": "redis://localhost:6379",
    "log_level": "INFO"
})

Variable.set("feature_flags", {
    "enable_cache": True,
    "enable_notifications": False,
    "beta_features": True
})

Dynamic Behavior Control

@task
def conditional_task():
    # Use variables to control task behavior
    feature_flags = Variable.get("feature_flags", default={})
    
    if feature_flags.get("enable_cache", False):
        print("Using cached results")
        return "cached_result"
    else:
        print("Computing fresh results")
        return "fresh_result"

Runtime State Management

@task
def track_progress():
    # Update progress tracking variable
    current_progress = Variable.get("pipeline_progress", default=0)
    new_progress = current_progress + 1
    
    Variable.set("pipeline_progress", new_progress, overwrite=True)
    print(f"Pipeline progress: {new_progress}")

Note: Variables persist across flow runs and are accessible throughout your Prefect workspace. Use appropriate naming conventions and tags to organize variables effectively.

Install with Tessl CLI

npx tessl i tessl/pypi-prefect

docs

client-api.md

configuration.md

context-utilities.md

core-workflows.md

deployments.md

index.md

runtime-context.md

state-management.md

variables.md

tile.json