Programmatically author, schedule, and monitor data pipelines.
Quality: Pending — it has not yet been assessed whether this package follows best practices.
Impact: Pending — no eval scenarios have been run.
Task execution engines including LocalExecutor, CeleryExecutor, KubernetesExecutor, and custom executor development. Executors determine how and where tasks are executed in Airflow.
Foundation for all Airflow executors providing core execution interface.
class BaseExecutor:
    """Foundation for all Airflow executors, providing the core execution interface.

    Concrete executors (local, Celery, Kubernetes, ...) implement these
    methods; this stub documents the contract only.
    """

    def __init__(self, parallelism: int = 32):
        """
        Base executor implementation.

        Args:
            parallelism: Maximum number of parallel tasks.
        """

    def execute_async(
        self,
        key: TaskInstanceKey,
        command: CommandType,
        queue: Optional[str] = None,
        executor_config: Optional[Dict] = None,
    ) -> None:
        """
        Execute a task asynchronously.

        Args:
            key: Task instance key.
            command: Command to execute.
            queue: Execution queue.
            executor_config: Executor-specific configuration.
        """

    def sync(self) -> None:
        """Sync executor state and collect results."""

    def heartbeat(self) -> None:
        """Heartbeat to maintain executor health."""

    def end(self) -> None:
        """Clean shutdown of executor."""

    def terminate(self) -> None:
        """Force terminate executor."""

# Execute tasks in separate processes on the same machine.
class LocalExecutor(BaseExecutor):
    """Run each task in a separate subprocess on the same machine."""

    def __init__(self, parallelism: int = 0):
        """
        Local process executor.

        Args:
            parallelism: Max parallel tasks (0 = unlimited).
        """

    def execute_async(
        self,
        key: TaskInstanceKey,
        command: CommandType,
        queue: Optional[str] = None,
        executor_config: Optional[Dict] = None,
    ) -> None:
        """Execute task in local subprocess."""

    def sync(self) -> None:
        """Collect completed task results."""

# Usage example:
# Configuration for LocalExecutor
EXECUTOR_CONFIG = {
    'core': {
        'executor': 'LocalExecutor',
        'parallelism': 16,
        'max_active_runs_per_dag': 4,
    },
}

# Execute tasks one at a time (for testing and development).
class SequentialExecutor(BaseExecutor):
    """Run tasks one at a time in the current process (testing/development)."""

    def __init__(self):
        """Sequential executor for single-threaded execution."""

    def execute_async(
        self,
        key: TaskInstanceKey,
        command: CommandType,
        queue: Optional[str] = None,
        executor_config: Optional[Dict] = None,
    ) -> None:
        """Execute task immediately in current process."""

# Distribute tasks across multiple worker nodes using Celery.
class CeleryExecutor(BaseExecutor):
    """Distribute tasks across multiple worker nodes using Celery."""

    def __init__(self):
        """Celery-based distributed executor."""

    def execute_async(
        self,
        key: TaskInstanceKey,
        command: CommandType,
        queue: Optional[str] = 'default',
        executor_config: Optional[Dict] = None,
    ) -> None:
        """Submit task to Celery worker queue."""

    def sync(self) -> None:
        """Check Celery task status and collect results."""

# Execute tasks as Kubernetes pods.
class KubernetesExecutor(BaseExecutor):
    """Execute each task as a Kubernetes pod."""

    def __init__(self):
        """Kubernetes pod executor."""

    def execute_async(
        self,
        key: TaskInstanceKey,
        command: CommandType,
        queue: Optional[str] = None,
        executor_config: Optional[Dict] = None,
    ) -> None:
        """Create Kubernetes pod for task execution."""

    def sync(self) -> None:
        """Monitor pod status and collect results."""

    def adopt_launched_task(
        self,
        kube_client: Any,
        pod: Any,
        pods: Dict[TaskInstanceKey, Any],
    ) -> None:
        """Adopt a running pod for monitoring."""

from typing import Optional, Dict, Any, List, Tuple
from airflow.models.taskinstance import TaskInstanceKey

# Type aliases used throughout the executor interfaces.
CommandType = List[str]
ExecutorConfigType = Optional[Dict[str, Any]]
QueuedTaskInstanceType = Tuple[TaskInstanceKey, CommandType, Optional[str], ExecutorConfigType]

# Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow