tessl/pypi-apache-airflow

Programmatically author, schedule and monitor data pipelines

Executors

Executors are Airflow's task execution engines: they determine how and where tasks run. This page covers the base executor interface (the starting point for custom executors) and the LocalExecutor, SequentialExecutor, CeleryExecutor, and KubernetesExecutor.

Capabilities

Base Executor

Foundation for all Airflow executors, providing the core execution interface.

class BaseExecutor:
    def __init__(self, parallelism: int = 32):
        """
        Base executor implementation.
        
        Args:
            parallelism: Maximum number of parallel tasks
        """
    
    def execute_async(
        self,
        key: TaskInstanceKey,
        command: CommandType,
        queue: Optional[str] = None,
        executor_config: Optional[Dict] = None
    ) -> None:
        """
        Execute task asynchronously.
        
        Args:
            key: Task instance key
            command: Command to execute
            queue: Execution queue
            executor_config: Executor-specific configuration
        """
    
    def sync(self) -> None:
        """Sync executor state and collect results."""
    
    def heartbeat(self) -> None:
        """Called periodically by the scheduler; triggers queued tasks and then calls sync()."""
    
    def end(self) -> None:
        """Clean shutdown of executor."""
    
    def terminate(self) -> None:
        """Force terminate executor."""
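
The interface above is what a custom executor overrides. The following is a minimal sketch of that pattern, not Airflow's implementation: so that it runs without Airflow installed, it uses a hypothetical stand-in base class (`SketchBaseExecutor`) that only mirrors the method names listed above, and a plain tuple in place of `TaskInstanceKey`. `RecordingExecutor` and its `pending`/`results` attributes are likewise invented for illustration.

```python
from typing import Any, Dict, List, Optional, Tuple

# Hypothetical stand-ins so the sketch runs without Airflow installed.
TaskKey = Tuple[str, str, str, int]  # stands in for TaskInstanceKey
CommandType = List[str]


class SketchBaseExecutor:
    """Stand-in that mirrors the BaseExecutor method names (not Airflow's class)."""

    def __init__(self, parallelism: int = 32):
        self.parallelism = parallelism

    def execute_async(
        self,
        key: TaskKey,
        command: CommandType,
        queue: Optional[str] = None,
        executor_config: Optional[Dict[str, Any]] = None,
    ) -> None:
        raise NotImplementedError

    def sync(self) -> None:
        raise NotImplementedError

    def heartbeat(self) -> None:
        # Assumption for this sketch: each heartbeat simply triggers a sync.
        self.sync()


class RecordingExecutor(SketchBaseExecutor):
    """Toy executor: records submissions, then marks them 'success' on sync()."""

    def __init__(self, parallelism: int = 32):
        super().__init__(parallelism)
        self.pending: Dict[TaskKey, CommandType] = {}
        self.results: Dict[TaskKey, str] = {}

    def execute_async(self, key, command, queue=None, executor_config=None) -> None:
        # Refuse new work once the parallelism limit is reached (0 means no limit here).
        if self.parallelism and len(self.pending) >= self.parallelism:
            raise RuntimeError("no open slots")
        self.pending[key] = command

    def sync(self) -> None:
        # A real executor would poll its backend here; the toy one finishes everything.
        for key in list(self.pending):
            del self.pending[key]
            self.results[key] = "success"


executor = RecordingExecutor(parallelism=2)
task_key = ("example_dag", "extract", "manual_run", 1)
executor.execute_async(task_key, ["echo", "hello"])
executor.heartbeat()
print(executor.results[task_key])
```

The split between `execute_async` (submit and return immediately) and `sync` (collect state later) is the part worth carrying over to a real subclass; everything else here is scaffolding.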

Local Executor

Execute tasks in separate processes on the same machine.

class LocalExecutor(BaseExecutor):
    def __init__(self, parallelism: int = 0):
        """
        Local process executor.
        
        Args:
            parallelism: Max parallel tasks (0 = unlimited)
        """
    
    def execute_async(
        self,
        key: TaskInstanceKey,
        command: CommandType,
        queue: Optional[str] = None,
        executor_config: Optional[Dict] = None
    ) -> None:
        """Execute task in local subprocess."""
    
    def sync(self) -> None:
        """Collect completed task results."""
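
To make the idea of "separate processes with a parallelism cap" concrete, here is a small standard-library sketch. It is not how LocalExecutor is implemented; `run_commands` is a hypothetical helper that launches each command as a subprocess, allows at most `parallelism` of them to run at once, and returns the exit code per task key.

```python
import subprocess
import sys
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List


def run_commands(commands: Dict[str, List[str]], parallelism: int = 4) -> Dict[str, int]:
    """Run each command in its own subprocess, at most `parallelism` at a time."""

    def run_one(command: List[str]) -> int:
        # check=False: a failing task is reported through its exit code, not an exception.
        return subprocess.run(command, check=False).returncode

    # Each worker thread blocks on one subprocess, so max_workers caps concurrency.
    with ThreadPoolExecutor(max_workers=parallelism) as pool:
        futures = {key: pool.submit(run_one, cmd) for key, cmd in commands.items()}
        return {key: future.result() for key, future in futures.items()}


commands = {
    "ok": [sys.executable, "-c", "pass"],
    "fails": [sys.executable, "-c", "raise SystemExit(3)"],
}
exit_codes = run_commands(commands, parallelism=2)
print(exit_codes)
```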

Usage example:

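# Illustrative view of the [core] settings for LocalExecutor. Airflow reads these
# from airflow.cfg or AIRFLOW__CORE__* environment variables, not from a Python dict.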
EXECUTOR_CONFIG = {
    'core': {
        'executor': 'LocalExecutor',
        'parallelism': 16,
        'max_active_runs_per_dag': 4
    }
}
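
Airflow accepts configuration through environment variables named `AIRFLOW__{SECTION}__{KEY}`. As a sketch, the nested mapping above can be flattened into that form; `to_airflow_env` is a hypothetical helper written for this example, not part of Airflow.

```python
from typing import Dict, Mapping


def to_airflow_env(config: Mapping[str, Mapping[str, object]]) -> Dict[str, str]:
    """Flatten {section: {key: value}} into AIRFLOW__SECTION__KEY variables."""
    env: Dict[str, str] = {}
    for section, options in config.items():
        for key, value in options.items():
            env[f"AIRFLOW__{section.upper()}__{key.upper()}"] = str(value)
    return env


EXECUTOR_CONFIG = {
    "core": {
        "executor": "LocalExecutor",
        "parallelism": 16,
        "max_active_runs_per_dag": 4,
    }
}

env = to_airflow_env(EXECUTOR_CONFIG)
for name, value in env.items():
    print(f"export {name}={value}")
```

Exporting the printed variables before starting the scheduler has the same effect as setting the corresponding keys in the `[core]` section of `airflow.cfg`.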

Sequential Executor

Execute tasks one at a time (for testing and development).

class SequentialExecutor(BaseExecutor):
    def __init__(self):
        """Sequential executor for single-threaded execution."""
    
    def execute_async(
        self,
        key: TaskInstanceKey,
        command: CommandType,
        queue: Optional[str] = None,
        executor_config: Optional[Dict] = None
    ) -> None:
        """Queue the task command; queued commands then run one at a time."""

Celery Executor

Distribute tasks across multiple worker nodes using Celery.

class CeleryExecutor(BaseExecutor):
    def __init__(self):
        """Celery-based distributed executor."""
    
    def execute_async(
        self,
        key: TaskInstanceKey,
        command: CommandType,
        queue: Optional[str] = 'default',
        executor_config: Optional[Dict] = None
    ) -> None:
        """Submit task to Celery worker queue."""
    
    def sync(self) -> None:
        """Check Celery task status and collect results."""
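
The `queue` argument is what lets a distributed executor send work to specific workers. The sketch below illustrates that routing idea in plain Python and does not use Celery: `QueueRouter`, `submit`, and `take` are hypothetical names, and a worker is modeled as nothing more than the list of queue names it subscribes to.

```python
from collections import defaultdict, deque
from typing import Deque, Dict, List, Optional, Tuple

Submission = Tuple[str, List[str]]  # (task key, command); simplified stand-in types


class QueueRouter:
    """Toy router: one FIFO per queue name, workers pull from the queues they subscribe to."""

    def __init__(self, default_queue: str = "default"):
        self.default_queue = default_queue
        self.queues: Dict[str, Deque[Submission]] = defaultdict(deque)

    def submit(self, key: str, command: List[str], queue: Optional[str] = None) -> None:
        # Tasks without an explicit queue land on the default queue.
        self.queues[queue or self.default_queue].append((key, command))

    def take(self, subscribed: List[str]) -> Optional[Submission]:
        # Return the oldest submission from the first subscribed queue that has work.
        for name in subscribed:
            if self.queues[name]:
                return self.queues[name].popleft()
        return None


router = QueueRouter()
router.submit("load", ["echo", "load"])
router.submit("train", ["echo", "train"], queue="gpu")

gpu_work = router.take(["gpu"])          # a worker subscribed only to "gpu"
default_work = router.take(["default"])  # a worker subscribed only to "default"
print(gpu_work, default_work)
```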

Kubernetes Executor

Execute tasks as Kubernetes pods.

class KubernetesExecutor(BaseExecutor):
    def __init__(self):
        """Kubernetes pod executor."""
    
    def execute_async(
        self,
        key: TaskInstanceKey,
        command: CommandType,
        queue: Optional[str] = None,
        executor_config: Optional[Dict] = None
    ) -> None:
        """Create Kubernetes pod for task execution."""
    
    def sync(self) -> None:
        """Monitor pod status and collect results."""
    
    def adopt_launched_task(
        self,
        kube_client: Any,
        pod: Any,
        pods: Dict[TaskInstanceKey, Any]
    ) -> None:
        """Adopt running pod for monitoring."""

Types

from typing import Optional, Dict, Any, List, Tuple
from airflow.models.taskinstance import TaskInstanceKey

CommandType = List[str]
ExecutorConfigType = Optional[Dict[str, Any]]
QueuedTaskInstanceType = Tuple[TaskInstanceKey, CommandType, Optional[str], ExecutorConfigType]
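
A short sketch of how these aliases fit together. To stay runnable without Airflow, it replaces `TaskInstanceKey` with a hypothetical `SketchTaskInstanceKey` named tuple whose fields are an assumption for illustration; the command shown is likewise only an example, and `describe` is an invented helper.

```python
from typing import Any, Dict, List, NamedTuple, Optional, Tuple


class SketchTaskInstanceKey(NamedTuple):
    """Hypothetical stand-in for airflow.models.taskinstance.TaskInstanceKey."""

    dag_id: str
    task_id: str
    run_id: str
    try_number: int = 1


CommandType = List[str]
ExecutorConfigType = Optional[Dict[str, Any]]
QueuedTaskInstanceType = Tuple[
    SketchTaskInstanceKey, CommandType, Optional[str], ExecutorConfigType
]


def describe(item: QueuedTaskInstanceType) -> str:
    """Unpack a queued entry in the order given by QueuedTaskInstanceType."""
    key, command, queue, executor_config = item
    return (
        f"{key.dag_id}.{key.task_id} (try {key.try_number}) "
        f"on queue {queue!r}: {' '.join(command)}"
    )


queued: QueuedTaskInstanceType = (
    SketchTaskInstanceKey("example_dag", "extract", "manual_run"),
    ["airflow", "tasks", "run", "example_dag", "extract", "manual_run"],
    "default",
    None,
)
summary = describe(queued)
print(summary)
```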
