Programmatically author, schedule and monitor data pipelines
Quality: Pending — a best-practices review has not yet been performed.
Impact: Pending — no eval scenarios have been run.
System configuration, variables, parameters, and connection management for workflow orchestration. Airflow provides multiple ways to manage configuration, secrets, and runtime parameters.
Store and retrieve configuration values that can be accessed across DAGs and tasks.
class Variable:
    """Store and retrieve configuration values shared across DAGs and tasks.

    All accessors are classmethods; values live in the Airflow metadata
    database (this reference shows signatures only — bodies are stubs).
    """

    @classmethod
    def get(
        cls,
        key: str,
        default_var: Any = None,
        deserialize_json: bool = False
    ) -> Any:
        """
        Get a variable value.

        Args:
            key: Variable key
            default_var: Default value if key not found
            deserialize_json: Whether to deserialize JSON values

        Returns:
            Variable value
        """

    @classmethod
    def set(
        cls,
        key: str,
        value: Any,
        description: Optional[str] = None,
        serialize_json: bool = False
    ) -> None:
        """
        Set a variable value.

        Args:
            key: Variable key
            value: Variable value
            description: Variable description
            serialize_json: Whether to serialize value as JSON
        """

    @classmethod
    def delete(cls, key: str) -> None:
        """
        Delete a variable.

        Args:
            key: Variable key to delete
        """

    @classmethod
    def setdefault(cls, key: str, default: Any) -> Any:
        """
        Set variable if it doesn't exist, otherwise return existing value.

        Args:
            key: Variable key
            default: Default value to set

        Returns:
            Variable value
        """


# Usage example:
# NOTE(review): the original snippet used datetime(2024, 1, 1) without
# importing it — added the missing import so the example actually runs.
from datetime import datetime

from airflow.models import Variable
from airflow.decorators import dag, task


@dag(dag_id='variable_example', start_date=datetime(2024, 1, 1))
def variable_example():
    """Example DAG: read and write Airflow Variables inside a task."""

    @task
    def use_variables():
        # Get variables
        api_key = Variable.get("api_key")
        config = Variable.get("app_config", deserialize_json=True)
        # Set variables
        Variable.set("last_run", datetime.now().isoformat())
        return {"api_key_length": len(api_key), "config": config}

    use_variables()


dag_instance = variable_example()

# Define typed parameters for DAGs that can be set at runtime.
class Param:
    """A typed DAG parameter with an optional JSON-schema constraint.

    Declared in a DAG's ``params`` mapping; values may be overridden when
    the DAG is triggered (signatures only — bodies are stubs).
    """

    def __init__(
        self,
        default: Any = None,
        description: Optional[str] = None,
        schema: Optional[Dict[str, Any]] = None,
        **kwargs
    ):
        """
        Define a DAG parameter.

        Args:
            default: Default parameter value
            description: Parameter description
            schema: JSON schema for validation
            **kwargs: Additional parameter options
        """

    def resolve(self, value: Any = None) -> Any:
        """
        Resolve parameter value with validation.

        Args:
            value: Input value to resolve

        Returns:
            Resolved and validated value
        """


# Usage example:
# NOTE(review): the original snippet used datetime(2024, 1, 1) without
# importing it — added the missing import so the example actually runs.
from datetime import datetime

from airflow.models import Param
from airflow.decorators import dag, task


@dag(
    dag_id='parameterized_dag',
    start_date=datetime(2024, 1, 1),
    params={
        'environment': Param(
            default='dev',
            description='Environment to deploy to',
            schema={'type': 'string', 'enum': ['dev', 'staging', 'prod']}
        ),
        'batch_size': Param(
            default=100,
            description='Number of records to process',
            schema={'type': 'integer', 'minimum': 1, 'maximum': 1000}
        ),
        'debug_mode': Param(
            default=False,
            description='Enable debug logging',
            schema={'type': 'boolean'}
        )
    }
)
def parameterized_dag():
    """Example DAG: declare validated params and read them from the task context."""

    @task
    def process_with_params(**context):
        params = context['params']
        environment = params['environment']
        batch_size = params['batch_size']
        debug_mode = params['debug_mode']
        return f"Processing {batch_size} records in {environment} with debug={debug_mode}"

    process_with_params()


dag_instance = parameterized_dag()

# Manage external system connections with credentials and configuration.
class Connection:
    """Connection settings (credentials, host, extras) for an external system.

    Signatures only — bodies are stubs.
    """

    def __init__(
        self,
        conn_id: str,
        conn_type: str,
        description: Optional[str] = None,
        host: Optional[str] = None,
        login: Optional[str] = None,
        password: Optional[str] = None,
        schema: Optional[str] = None,
        port: Optional[int] = None,
        extra: Optional[Union[str, Dict]] = None,
        uri: Optional[str] = None
    ):
        """
        Create a connection configuration.

        Args:
            conn_id: Unique connection identifier
            conn_type: Connection type (postgres, mysql, http, etc.)
            description: Connection description
            host: Host address
            login: Username
            password: Password
            schema: Database schema
            port: Port number
            extra: Additional connection parameters (JSON string or dict)
            uri: Complete connection URI
        """

    @property
    def conn_id(self) -> str:
        """Connection ID."""

    @property
    def conn_type(self) -> str:
        """Connection type."""

    def get_uri(self) -> str:
        """Get complete connection URI."""

    def get_password(self) -> Optional[str]:
        """Get connection password."""

    def get_extra(self) -> Dict[str, Any]:
        """Get extra connection parameters as dictionary."""
# Connection retrieval
def get_connection(conn_id: str) -> Connection:
    """
    Get connection by ID.

    Args:
        conn_id: Connection identifier

    Returns:
        Connection instance
    """


# Access and manage Airflow configuration settings.
from airflow import configuration
class AirflowConfigParser:
    """Reader for airflow.cfg-style configuration with typed accessors.

    Signatures only — bodies are stubs.
    """

    def get(
        self,
        section: str,
        key: str,
        fallback: Any = None
    ) -> str:
        """
        Get configuration value.

        Args:
            section: Configuration section
            key: Configuration key
            fallback: Fallback value

        Returns:
            Configuration value
        """

    def getboolean(
        self,
        section: str,
        key: str,
        fallback: bool = False
    ) -> bool:
        """Get boolean configuration value."""

    def getint(
        self,
        section: str,
        key: str,
        fallback: int = 0
    ) -> int:
        """Get integer configuration value."""

    def getfloat(
        self,
        section: str,
        key: str,
        fallback: float = 0.0
    ) -> float:
        """Get float configuration value."""

    def set(self, section: str, key: str, value: str) -> None:
        """Set configuration value."""


# Global configuration instance
conf: AirflowConfigParser

# Usage example:
# NOTE(review): the original snippet used datetime(2024, 1, 1) without
# importing it — added the missing import so the example actually runs.
from datetime import datetime

from airflow import configuration
from airflow.decorators import dag, task


@dag(dag_id='config_example', start_date=datetime(2024, 1, 1))
def config_example():
    """Example DAG: read Airflow configuration values inside a task."""

    @task
    def use_config():
        # Get configuration values
        webserver_port = configuration.conf.getint('webserver', 'web_server_port')
        sql_alchemy_conn = configuration.conf.get('database', 'sql_alchemy_conn')
        parallelism = configuration.conf.getint('core', 'parallelism')
        return {
            'webserver_port': webserver_port,
            'db_conn': sql_alchemy_conn[:20] + '...',  # Don't log full connection
            'parallelism': parallelism
        }

    use_config()


dag_instance = config_example()

# Global settings and environment configuration.
from airflow import settings

# Database session factory bound to the Airflow metadata database.
Session = settings.Session

# Configuration paths (resolved from the environment at startup).
AIRFLOW_HOME: str
DAGS_FOLDER: str
PLUGINS_FOLDER: str
SQL_ALCHEMY_CONN: str


def configure_logging() -> None:
    """Configure Airflow logging."""


def initialize_airflow() -> None:
    """Initialize Airflow configuration and database."""


# Integrate with external secret management systems.
class BaseSecretsBackend:
    """Base class for secret backends; subclasses fetch connections/variables
    from an external store (signatures only — bodies are stubs)."""

    def __init__(self, **kwargs):
        """Base class for secret backends."""

    def get_connection(self, conn_id: str) -> Optional[Connection]:
        """
        Get connection from secret backend.

        Args:
            conn_id: Connection identifier

        Returns:
            Connection instance or None
        """

    def get_variable(self, key: str) -> Optional[str]:
        """
        Get variable from secret backend.

        Args:
            key: Variable key

        Returns:
            Variable value or None
        """


# Common secret backends

class EnvironmentVariablesBackend(BaseSecretsBackend):
    """Use environment variables as secret backend."""


class LocalFilesystemBackend(BaseSecretsBackend):
    """Use local files as secret backend."""


# Manage resource allocation for task execution.
class Pool:
    """Named resource pool limiting how many task slots may run concurrently.

    Signatures only — bodies are stubs.
    """

    def __init__(
        self,
        pool: str,
        slots: int,
        description: str = ""
    ):
        """
        Define a resource pool.

        Args:
            pool: Pool name
            slots: Number of available slots
            description: Pool description
        """

    @property
    def pool(self) -> str:
        """Pool name."""

    @property
    def slots(self) -> int:
        """Available slots."""

    @property
    def description(self) -> str:
        """Pool description."""

    @classmethod
    def get_pool(cls, pool_name: str) -> Optional['Pool']:
        """Get pool by name."""

    @classmethod
    def create_pool(
        cls,
        pool_name: str,
        slots: int,
        description: str = ""
    ) -> 'Pool':
        """Create a new pool."""


from typing import Union, Optional, Dict, Any
from datetime import datetime

# Convenience type aliases used in the signatures above.
ConfigValue = Union[str, int, float, bool]
ParamValue = Union[str, int, float, bool, list, dict]

# Install with Tessl CLI:
npx tessl i tessl/pypi-apache-airflow