CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-apache-airflow

Programmatically author, schedule and monitor data pipelines

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/configuration.md

Configuration Management

System configuration, Variables, DAG parameters, Connections, secret backends, and resource pools for workflow orchestration. Airflow provides several complementary ways to manage configuration, secrets, and runtime parameters.

Capabilities

Variables

Store and retrieve configuration values that can be accessed across DAGs and tasks.

class Variable:
    """Key/value store for configuration shared across DAGs and tasks."""

    # Private sentinel so that omitting ``default_var`` can be told apart from
    # explicitly passing ``default_var=None`` (which returns None).
    _NO_DEFAULT: Any = object()

    @classmethod
    def get(
        cls,
        key: str,
        default_var: Any = _NO_DEFAULT,
        deserialize_json: bool = False
    ) -> Any:
        """
        Get a variable value.

        Args:
            key: Variable key
            default_var: Value returned when ``key`` does not exist. If this
                argument is omitted, a missing key raises ``KeyError``; pass
                ``default_var=None`` explicitly to get ``None`` back instead.
            deserialize_json: Parse the stored value as JSON before returning it

        Returns:
            Variable value (the stored string unless ``deserialize_json`` is set)

        Raises:
            KeyError: If ``key`` does not exist and no ``default_var`` was given
        """

    @classmethod
    def set(
        cls,
        key: str,
        value: Any,
        description: Optional[str] = None,
        serialize_json: bool = False
    ) -> None:
        """
        Create or overwrite a variable.

        Args:
            key: Variable key
            value: Value to store
            description: Optional human-readable description of the variable
            serialize_json: Store ``value`` as a JSON document (use together
                with ``get(..., deserialize_json=True)`` when reading it back)
        """

    @classmethod
    def delete(cls, key: str) -> None:
        """
        Delete a variable.

        Args:
            key: Variable key to delete
        """

    @classmethod
    def setdefault(
        cls,
        key: str,
        default: Any,
        description: Optional[str] = None,
        deserialize_json: bool = False
    ) -> Any:
        """
        Return a variable's value, creating it with ``default`` if it is missing.

        Args:
            key: Variable key
            default: Value stored (and returned) when ``key`` does not exist
            description: Description attached to a newly created variable
            deserialize_json: JSON-decode an existing value, and JSON-encode
                ``default`` when it has to be stored

        Returns:
            The existing value, or ``default`` if the variable was just created

        Raises:
            ValueError: If ``key`` does not exist and ``default`` is None
        """

Usage example:

from datetime import datetime

from airflow.decorators import dag, task
from airflow.models import Variable


@dag(dag_id='variable_example', start_date=datetime(2024, 1, 1))
def variable_example():
    """Read and write Airflow Variables from a TaskFlow task."""

    @task
    def use_variables():
        """Fetch two variables, record the run time, and summarise the result."""
        # Get variables; app_config is stored as JSON and decoded here.
        api_key = Variable.get("api_key")
        config = Variable.get("app_config", deserialize_json=True)

        # Set variables
        Variable.set("last_run", datetime.now().isoformat())

        # Only the key's length is returned so the secret itself is not exposed.
        return {"api_key_length": len(api_key), "config": config}

    use_variables()


dag_instance = variable_example()

Parameters

Define typed DAG parameters, optionally validated against a JSON schema, whose values can be overridden at runtime when a run is triggered.

class Param:
    """A single, optionally schema-validated, DAG-level parameter."""

    def __init__(
        self,
        default: Any = None,
        description: Optional[str] = None,
        schema: Optional[Dict[str, Any]] = None,
        **kwargs
    ):
        """
        Declare one DAG parameter.

        Args:
            default: Value used when the run does not provide one
            description: Human-readable explanation shown for the parameter
            schema: JSON schema the parameter value is validated against
            **kwargs: Further parameter options
        """

    def resolve(self, value: Any = None) -> Any:
        """
        Produce the final, validated value of this parameter.

        Args:
            value: Candidate value supplied for the parameter

        Returns:
            The value after validation
        """

Usage example:

from datetime import datetime

from airflow.decorators import dag, task
from airflow.models import Param


@dag(
    dag_id='parameterized_dag',
    start_date=datetime(2024, 1, 1),
    params={
        # Each Param carries a JSON schema that its value is validated against.
        'environment': Param(
            default='dev',
            description='Environment to deploy to',
            schema={'type': 'string', 'enum': ['dev', 'staging', 'prod']}
        ),
        'batch_size': Param(
            default=100,
            description='Number of records to process',
            schema={'type': 'integer', 'minimum': 1, 'maximum': 1000}
        ),
        'debug_mode': Param(
            default=False,
            description='Enable debug logging',
            schema={'type': 'boolean'}
        )
    }
)
def parameterized_dag():
    """DAG whose behaviour is driven by three typed run parameters."""

    @task
    def process_with_params(**context):
        """Read the run's parameter values from the task context."""
        params = context['params']
        environment = params['environment']
        batch_size = params['batch_size']
        debug_mode = params['debug_mode']

        return f"Processing {batch_size} records in {environment} with debug={debug_mode}"

    process_with_params()


dag_instance = parameterized_dag()

Connections

Manage external system connections with credentials and configuration.

class Connection:
    """Credentials and settings needed to reach an external system."""

    def __init__(
        self,
        conn_id: Optional[str] = None,
        conn_type: Optional[str] = None,
        description: Optional[str] = None,
        host: Optional[str] = None,
        login: Optional[str] = None,
        password: Optional[str] = None,
        schema: Optional[str] = None,
        port: Optional[int] = None,
        extra: Optional[Union[str, Dict]] = None,
        uri: Optional[str] = None
    ):
        """
        Create a connection configuration.

        The fields may be given individually, or all at once through ``uri``.
        ``conn_id`` and ``conn_type`` are optional so that a URI alone can
        describe the connection.

        Args:
            conn_id: Unique connection identifier
            conn_type: Connection type (postgres, mysql, http, etc.)
            description: Connection description
            host: Host address
            login: Username
            password: Password
            schema: Database schema
            port: Port number
            extra: Additional connection parameters (JSON string or dict)
            uri: Complete connection URI
        """

    @property
    def conn_id(self) -> Optional[str]:
        """Connection ID."""

    @property
    def conn_type(self) -> Optional[str]:
        """Connection type."""

    def get_uri(self) -> str:
        """Get complete connection URI."""

    def get_password(self) -> Optional[str]:
        """Get connection password."""

    def get_extra(self) -> Optional[str]:
        """
        Get the extra connection parameters as stored: a JSON string.

        Use ``extra_dejson`` for the parsed dictionary.
        """

    @property
    def extra_dejson(self) -> Dict[str, Any]:
        """Extra connection parameters parsed from JSON into a dictionary."""

# Connection retrieval
def get_connection(conn_id: str) -> Connection:
    """
    Look up a connection by its identifier.

    NOTE(review): upstream Airflow exposes this lookup as
    ``BaseHook.get_connection(conn_id)``; confirm the import path before use.

    Args:
        conn_id: Identifier of the connection to fetch

    Returns:
        The matching Connection
    """

Configuration System

Access and manage Airflow configuration settings.

from airflow import configuration

class AirflowConfigParser:
    """Typed accessors for Airflow's sectioned configuration."""

    def get(
        self,
        section: str,
        key: str,
        fallback: Any = None
    ) -> Optional[str]:
        """
        Get configuration value.

        Args:
            section: Configuration section
            key: Configuration key
            fallback: Value returned when the option is not set

        Returns:
            Configuration value as a string, or ``fallback`` (None by default)
            when the option is not set
        """

    def getboolean(
        self,
        section: str,
        key: str,
        fallback: bool = False
    ) -> bool:
        """Get a configuration value converted to ``bool``."""

    def getint(
        self,
        section: str,
        key: str,
        fallback: int = 0
    ) -> int:
        """Get a configuration value converted to ``int``."""

    def getfloat(
        self,
        section: str,
        key: str,
        fallback: float = 0.0
    ) -> float:
        """Get a configuration value converted to ``float``."""

    def set(self, section: str, key: str, value: str) -> None:
        """Set a configuration value in ``section``."""

# Global configuration instance
conf: AirflowConfigParser

Usage example:

from datetime import datetime

from airflow import configuration
from airflow.decorators import dag, task


@dag(dag_id='config_example', start_date=datetime(2024, 1, 1))
def config_example():
    """Read typed values from the Airflow configuration inside a task."""

    @task
    def use_config():
        """Collect a few configuration values without exposing secrets."""
        # Get configuration values
        webserver_port = configuration.conf.getint('webserver', 'web_server_port')
        sql_alchemy_conn = configuration.conf.get('database', 'sql_alchemy_conn')
        parallelism = configuration.conf.getint('core', 'parallelism')

        # The task's return value is pushed to XCom, so expose only the URI
        # scheme. Any fixed-length prefix of the connection string (e.g.
        # "mysql://root:passwor") can still contain the user name or password.
        db_scheme = sql_alchemy_conn.split('://', 1)[0]

        return {
            'webserver_port': webserver_port,
            'db_conn': db_scheme + '://...',
            'parallelism': parallelism,
        }

    use_config()


dag_instance = config_example()

Settings and Environment

Global settings and environment configuration.

from airflow import settings

# Metadata-database session, re-exported from airflow.settings.
Session = settings.Session

# Installation paths and the metadata-database connection string
# (declared here by type only).
AIRFLOW_HOME: str
DAGS_FOLDER: str
PLUGINS_FOLDER: str
SQL_ALCHEMY_CONN: str


def configure_logging() -> None:
    """Set up Airflow's logging configuration."""


def initialize_airflow() -> None:
    """
    Initialise Airflow's configuration and metadata database.

    NOTE(review): upstream ``airflow.settings`` exposes this step as
    ``settings.initialize()``; confirm this name before relying on it.
    """

Secret Backends

Integrate with external secret management systems.

class BaseSecretsBackend:
    """Base class for sources of connections, variables and config values."""

    def __init__(self, **kwargs):
        """
        Create the backend.

        Args:
            **kwargs: Backend-specific options
        """

    def get_connection(self, conn_id: str) -> Optional["Connection"]:
        """
        Get connection from secret backend.

        Args:
            conn_id: Connection identifier

        Returns:
            Connection instance, or None if this backend does not hold it
        """

    def get_variable(self, key: str) -> Optional[str]:
        """
        Get variable from secret backend.

        Args:
            key: Variable key

        Returns:
            Variable value, or None if this backend does not hold it
        """

    def get_config(self, key: str) -> Optional[str]:
        """
        Get an Airflow configuration value from secret backend.

        Args:
            key: Configuration key

        Returns:
            Configuration value, or None if this backend does not hold it
        """

# Common secret backends
class EnvironmentVariablesBackend(BaseSecretsBackend):
    """Use environment variables as secret backend."""


class LocalFilesystemBackend(BaseSecretsBackend):
    """Use local files as secret backend."""

Resource Pools

Limit how many tasks use a shared resource at the same time by assigning them to pools with a fixed number of slots.

class Pool:
    """A named set of slots that caps concurrent use of a resource."""

    def __init__(
        self,
        pool: str,
        slots: int,
        description: str = ""
    ):
        """
        Describe a resource pool.

        Args:
            pool: Name of the pool
            slots: How many slots the pool provides
            description: Free-text explanation of the pool's purpose
        """

    @property
    def pool(self) -> str:
        """Name of the pool."""

    @property
    def slots(self) -> int:
        """Number of slots the pool provides."""

    @property
    def description(self) -> str:
        """Free-text description of the pool."""

    @classmethod
    def get_pool(cls, pool_name: str) -> Optional['Pool']:
        """Look up a pool by name; None when no such pool exists."""

    @classmethod
    def create_pool(
        cls,
        pool_name: str,
        slots: int,
        description: str = ""
    ) -> 'Pool':
        """
        Register a new pool and return it.

        NOTE(review): the upstream Pool model names this operation
        ``create_or_update_pool``; confirm before relying on this name.
        """

Types

from typing import Union, Optional, Dict, Any
from datetime import datetime

# Types that typed configuration getters (get/getint/getfloat/getboolean) return.
ConfigValue = Union[str, int, float, bool]
# Values a DAG Param may hold; None is included because a Param's default is None.
ParamValue = Union[str, int, float, bool, list, dict, None]

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow

docs

assets-scheduling.md

cli-utilities.md

configuration.md

dag-management.md

database-models.md

exceptions.md

executors.md

extensions.md

index.md

task-operators.md

xcom.md

tile.json