Workflow orchestration and management framework for building resilient data pipelines.
—
Prefect Variables are named, mutable JSON values that can be shared across tasks, flows, and deployments. They provide a centralized way to store and retrieve configuration values and other data that needs to be accessed dynamically during workflow execution. Variables are not encrypted, so sensitive values such as secrets and credentials are better stored in Secret blocks.
The main Variable class for creating, reading, updating, and managing variables.
from prefect.variables import Variable


class Variable(BaseModel):
    """A named, mutable JSON value that can be shared across tasks and flows."""

    # Identifier the variable is stored and looked up under.
    name: str
    # The stored value; must be JSON-serializable.
    value: StrictVariableValue
    # Optional labels used to organize variables.
    tags: Optional[List[str]]

    @classmethod
    def get(
        cls,
        name: str,
        default: StrictVariableValue = None,
    ) -> StrictVariableValue:
        """Look up a variable's value by name.

        Args:
            name: Name of the variable to retrieve.
            default: Value to return when the variable doesn't exist.

        Returns:
            The variable's value, or ``default`` if it is not found.
        """

    @classmethod
    async def aget(
        cls,
        name: str,
        default: StrictVariableValue = None,
    ) -> StrictVariableValue:
        """Look up a variable's value by name (async counterpart of ``get``).

        Args:
            name: Name of the variable to retrieve.
            default: Value to return when the variable doesn't exist.

        Returns:
            The variable's value, or ``default`` if it is not found.
        """

    @classmethod
    def set(
        cls,
        name: str,
        value: StrictVariableValue,
        tags: Optional[List[str]] = None,
        overwrite: bool = False,
    ) -> "Variable":
        """Create a variable; pass ``overwrite=True`` if it already exists.

        Args:
            name: Name of the variable.
            value: JSON-serializable value to store.
            tags: Optional tags for organization.
            overwrite: Whether to overwrite an existing variable.

        Returns:
            The newly created ``Variable`` object.
        """

    @classmethod
    async def aset(
        cls,
        name: str,
        value: StrictVariableValue,
        tags: Optional[List[str]] = None,
        overwrite: bool = False,
    ) -> "Variable":
        """Create a variable asynchronously; pass ``overwrite=True`` if it already exists.

        Args:
            name: Name of the variable.
            value: JSON-serializable value to store.
            tags: Optional tags for organization.
            overwrite: Whether to overwrite an existing variable.

        Returns:
            The newly created ``Variable`` object.
        """


from prefect.variables import Variable
# Create a variable (an existing one is only replaced when overwrite=True)
api_config = Variable.set(
    name="api_config",
    value={"base_url": "https://api.example.com", "timeout": 30},
    tags=["config", "api"],
)

# Read the stored value back
config = Variable.get("api_config")
print(config)  # {"base_url": "https://api.example.com", "timeout": 30}

# Supply a fallback for a variable that may not exist
debug_mode = Variable.get("debug_mode", default=False)

# Replace the value of an existing variable
Variable.set(
    name="api_config",
    value={"base_url": "https://api.example.com", "timeout": 60},
    overwrite=True,
)


from prefect import flow, task
from prefect.variables import Variable
@task
def fetch_data():
    """Return sample data tagged with the base URL stored in the ``api_config`` variable."""
    # Pull the API configuration from Prefect Variables.
    api_settings = Variable.get("api_config")
    base_url, timeout = api_settings["base_url"], api_settings["timeout"]

    # Use configuration in task logic
    print(f"Fetching data from {base_url} with timeout {timeout}s")
    return {"data": "sample", "source": base_url}
@task
def process_data(raw_data: dict):
    """Print the configured batch size and return a processing summary."""
    # The batch size is read from the ``batch_size`` variable; 100 is the fallback.
    batch_size = Variable.get("batch_size", default=100)
    print(f"Processing data in batches of {batch_size}")

    summary = {"processed": True, "batch_size": batch_size}
    return summary
@flow
def data_pipeline():
    """Example flow whose tasks read their configuration from variables."""
    # Fetch first, then hand the fetched payload to the processing task.
    return process_data(fetch_data())


from prefect import flow, task
from prefect.variables import Variable
import asyncio
@task
async def async_fetch_data():
    """Fetch data asynchronously and record when the task last ran.

    Reads the ``api_config`` variable, then stores the current UTC time in
    the ``last_run_time`` variable as an ISO 8601 string.

    Returns:
        A dict with the fetched payload under ``"data"`` and the loaded
        configuration under ``"config"``.
    """
    # Imported locally so the example does not depend on a file-level import.
    from datetime import datetime, timezone

    # Asynchronously get variable value
    config = await Variable.aget("api_config")

    # Store a wall-clock timestamp. The event loop's time() is a monotonic
    # clock with an arbitrary reference point, so its readings cannot be
    # compared across runs or processes.
    await Variable.aset(
        name="last_run_time",
        value=datetime.now(timezone.utc).isoformat(),
        overwrite=True,
    )
    return {"data": "fetched", "config": config}
@flow
async def async_data_pipeline():
    """Example async flow that awaits a task which reads and writes variables."""
    return await async_fetch_data()


from prefect import flow
from prefect.variables import Variable
import prefect.runtime.deployment
@flow
def environment_aware_flow():
    """Load per-environment settings from variables whose names end in the environment."""
    # The environment is read from the deployment's parameters; "prod" is the fallback.
    env = prefect.runtime.deployment.parameters.get("environment", "prod")

    # Get environment-specific configuration
    db_config = Variable.get(f"database_config_{env}")
    api_key = Variable.get(f"api_key_{env}")  # looked up for illustration; not used below

    print(f"Running in {env} environment")
    print(f"Database config: {db_config}")

    outcome = {"environment": env, "configured": True}
    return outcome


# Variables are ideal for storing configuration that needs to be shared across multiple flows:
# Set up shared configuration
Variable.set(
    "app_config",
    {
        "database_url": "postgresql://localhost/mydb",
        "redis_url": "redis://localhost:6379",
        "log_level": "INFO",
    },
)
Variable.set(
    "feature_flags",
    {
        "enable_cache": True,
        "enable_notifications": False,
        "beta_features": True,
    },
)


@task
def conditional_task():
    """Return a cached or a fresh result depending on the ``enable_cache`` feature flag."""
    # Use variables to control task behavior
    flags = Variable.get("feature_flags", default={})
    if not flags.get("enable_cache", False):
        print("Computing fresh results")
        return "fresh_result"

    print("Using cached results")
    return "cached_result"


@task
def track_progress():
    """Increase the ``pipeline_progress`` variable by one and print the new value."""
    # A missing variable counts as zero progress so far.
    new_progress = Variable.get("pipeline_progress", default=0) + 1
    Variable.set("pipeline_progress", new_progress, overwrite=True)
    print(f"Pipeline progress: {new_progress}")


# Note: Variables persist across flow runs and are accessible throughout your Prefect workspace. Use appropriate naming conventions and tags to organize variables effectively.
Install with Tessl CLI
npx tessl i tessl/pypi-prefect