Comprehensive Kubernetes integration for Apache Airflow workflow orchestration and task execution
—
Quality: Pending — a best-practices review has not yet been completed.
Impact: Pending — no eval scenarios have been run.
Run Airflow tasks on Kubernetes infrastructure with executors that manage task distribution, pod creation, and lifecycle management across Kubernetes clusters.
Execute Airflow tasks as Kubernetes pods with comprehensive cluster management, resource allocation, and monitoring.
class KubernetesExecutor(BaseExecutor):
    """
    Executes tasks in Kubernetes pods.

    This executor launches each task as a separate Kubernetes pod, providing
    isolation, scalability, and resource management for Airflow task execution.
    Configuration is typically done through airflow.cfg under the
    [kubernetes_executor] section.

    Key Features:
    - Task isolation in separate pods
    - Dynamic resource allocation
    - Multi-namespace support
    - Pod template customization
    - Automatic cleanup and monitoring
    """

    # NOTE: method bodies are elided ("...") — this listing documents the
    # public interface only.
    def __init__(self, **kwargs): ...

    def start(self) -> None:
        """
        Start the executor.

        Initializes the Kubernetes client, starts the job watcher process,
        and prepares the executor for task scheduling.
        """
        ...

    def sync(self) -> None:
        """
        Synchronize task states.

        Updates task states based on pod status, processes completed tasks,
        and handles task state transitions.
        """
        ...

    def end(self) -> None:
        """
        End executor.

        Gracefully shuts down the executor, cleans up resources,
        and terminates background processes.
        """
        ...

    def terminate(self) -> None:
        """
        Terminate executor.

        Forcefully terminates the executor and all associated processes.
        """
        ...

    def execute_async(
        self,
        key: TaskInstanceKey,
        command: CommandType,
        queue: str | None = None,
        executor_config: dict | None = None
    ) -> None:
        """
        Execute task asynchronously.

        Args:
            key: Task instance key
            command: Command to execute
            queue: Execution queue (optional)
            executor_config: Executor-specific configuration
        """
        ...

    def adopt_launched_task(self, ti: TaskInstance, last_heartbeat: datetime, session: Session) -> bool:
        """
        Adopt a launched task.

        Args:
            ti: Task instance to adopt
            last_heartbeat: Last heartbeat timestamp
            session: Database session

        Returns:
            bool: True if task was successfully adopted
        """
        ...

Hybrid executor that runs local tasks with LocalExecutor and Kubernetes tasks with KubernetesExecutor based on queue configuration.
class LocalKubernetesExecutor(BaseExecutor):
    """
    Hybrid executor running LocalExecutor for local tasks and
    KubernetesExecutor for Kubernetes tasks.

    Tasks are routed to the appropriate executor based on the queue name.
    By default, tasks in the 'kubernetes' queue are executed on Kubernetes,
    while all other tasks are executed locally.

    Configuration:
    - kubernetes_queue: Queue name for Kubernetes tasks (default: 'kubernetes')
    """

    # NOTE: method bodies are elided ("...") — interface summary only.
    def __init__(self, local_executor: BaseExecutor, kubernetes_executor: BaseExecutor, **kwargs): ...

    def start(self) -> None:
        """Start both local and Kubernetes executors."""
        ...

    def sync(self) -> None:
        """Synchronize both executors."""
        ...

    def end(self) -> None:
        """End both executors."""
        ...

    def terminate(self) -> None:
        """Terminate both executors."""
        ...

    def execute_async(
        self,
        key: TaskInstanceKey,
        command: CommandType,
        queue: str | None = None,
        executor_config: dict | None = None
    ) -> None:
        """
        Route task to the appropriate executor based on queue.

        Args:
            key: Task instance key
            command: Command to execute
            queue: Execution queue - determines executor selection
            executor_config: Executor-specific configuration
        """
        ...

    def has_task(self, ti: TaskInstance) -> bool:
        """Check if task exists in either executor."""
        ...

    def heartbeat(self) -> None:
        """Heartbeat both executors."""
        ...

Process that monitors Kubernetes job status and manages job lifecycle for the executor.
class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
    """
    Watches Kubernetes jobs for state changes.

    This process monitors pods created by the KubernetesExecutor,
    tracks their status changes, and communicates updates back
    to the main executor process.

    Args:
        namespace (str): Kubernetes namespace to watch
        multi_namespace_mode (bool): Enable multi-namespace watching
        watcher_queue (Queue): Queue for status updates
        resource_version (str, optional): Starting resource version
        worker_uuid (str): Unique identifier for this worker
        kube_config (Configuration): Kubernetes client configuration
    """

    # NOTE: method bodies are elided ("...") — interface summary only.
    def __init__(
        self,
        namespace: str,
        multi_namespace_mode: bool,
        watcher_queue: Queue,
        resource_version: str | None,
        worker_uuid: str,
        kube_config: Configuration,
        **kwargs
    ): ...

    def run(self) -> None:
        """Main process loop for watching Kubernetes jobs."""
        ...

    def _run(self) -> None:
        """Internal run method with error handling."""
        ...

Scheduler component that manages pod creation and task distribution on Kubernetes clusters.
class AirflowKubernetesScheduler(LoggingMixin):
    """
    Schedules pods on Kubernetes.

    Manages the creation, monitoring, and cleanup of Kubernetes pods
    for task execution. Handles pod templates, resource allocation,
    and namespace management.

    Args:
        kube_config (Configuration): Kubernetes client configuration
        task_queue (Queue): Queue of tasks to execute
        result_queue (Queue): Queue for execution results
        kube_client (ApiClient): Kubernetes API client
        worker_uuid (str): Unique identifier for this worker
    """

    # NOTE: method bodies are elided ("...") — interface summary only.
    def __init__(
        self,
        kube_config: Configuration,
        task_queue: Queue,
        result_queue: Queue,
        kube_client: ApiClient,
        worker_uuid: str,
        **kwargs
    ): ...

    def run_pod_async(self, pod: V1Pod, **kwargs) -> None:
        """Run pod asynchronously."""
        ...

    def delete_pod(self, pod_name: str, namespace: str) -> None:
        """Delete a pod."""
        ...

    def patch_pod_executor_done(self, pod_name: str, namespace: str) -> None:
        """Mark pod as executor done."""
        ...

    def _make_safe_pod_id(self, safe_dag_id: str, safe_task_id: str, safe_run_id: str) -> str:
        """Generate safe pod identifier."""
        ...

Singleton class that manages Kubernetes resource versions for efficient watching.
class ResourceVersion(metaclass=Singleton):
    """
    Manages Kubernetes resource versions.

    Tracks resource versions for efficient watching of Kubernetes
    resources, enabling incremental updates and reducing API load.
    """

    # NOTE: method bodies are elided ("...") — interface summary only.
    def __init__(self): ...

    def get_resource_version(self) -> str:
        """Get current resource version."""
        ...

    def set_resource_version(self, version: str) -> None:
        """Set resource version."""
        ...

Helper functions for executor operations and pod management.
def get_base_pod_from_template(
    pod_template_file: str | None,
    kube_client: ApiClient
) -> V1Pod:
    """
    Get base pod from template.

    Args:
        pod_template_file: Path to pod template file
        kube_client: Kubernetes API client

    Returns:
        V1Pod: Base pod specification
    """
    ...

The Kubernetes Executor is configured through the airflow.cfg file under the [kubernetes_executor] section:
[kubernetes_executor]
# Kubernetes namespace for pods
namespace = default
# Pod template file (optional)
pod_template_file = /path/to/pod_template.yaml
# Worker container image
worker_container_repository = apache/airflow
worker_container_tag = 2.7.0
# Delete pods after completion
delete_worker_pods = True
delete_worker_pods_on_failure = False
# In-cluster configuration
in_cluster = True
# Cluster context (for out-of-cluster)
cluster_context = my-cluster
# Config file path (for out-of-cluster)
config_file = ~/.kube/config

# Enable multi-namespace mode
multi_namespace_mode = True
# Specific namespaces (when not using cluster role)
multi_namespace_mode_namespace_list = namespace1,namespace2,namespace3

# Number of pods to create per scheduler loop
worker_pods_creation_batch_size = 3
# Task publish retry configuration
task_publish_max_retries = 3
# API client retry configuration
api_client_retry_configuration = {"total": 3, "backoff_factor": 0.5}

# Pod termination grace period
termination_grace_period_seconds = 30
# Delete options for pod cleanup
delete_option_kwargs = {"grace_period_seconds": 10}

# airflow.cfg configuration for KubernetesExecutor
[core]
executor = KubernetesExecutor
[kubernetes_executor]
namespace = airflow
worker_container_repository = my-registry/airflow
worker_container_tag = 2.7.0-custom
delete_worker_pods = True
in_cluster = True

# pod_template.yaml
apiVersion: v1
kind: Pod
metadata:
  name: airflow-worker
  namespace: airflow
spec:
  serviceAccountName: airflow-worker
  securityContext:
    runAsUser: 50000
    runAsGroup: 50000
    fsGroup: 50000
  containers:
    - name: base
      image: apache/airflow:2.7.0
      resources:
        requests:
          memory: "512Mi"
          cpu: "500m"
        limits:
          memory: "1Gi"
          cpu: "1000m"
      env:
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
        - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: airflow-secrets
              key: connection-string
      volumeMounts:
        - name: airflow-dags
          mountPath: /opt/airflow/dags
        - name: airflow-logs
          mountPath: /opt/airflow/logs
  volumes:
    - name: airflow-dags
      persistentVolumeClaim:
        claimName: airflow-dags-pvc
    - name: airflow-logs
      persistentVolumeClaim:
        claimName: airflow-logs-pvc
  restartPolicy: Never

# airflow.cfg for LocalKubernetesExecutor
[core]
executor = LocalKubernetesExecutor
[local_kubernetes_executor]
kubernetes_queue = kubernetes
[kubernetes_executor]
namespace = airflow-k8s
worker_container_repository = my-registry/airflow
delete_worker_pods = True
[celery]
# Local executor falls back to SequentialExecutor for local tasks
# Configure if you want CeleryExecutor for local tasks instead

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime


def my_task():
    # Runs inside the worker pod; the return value is pushed to XCom.
    print("Running on Kubernetes!")
    return "Task completed"


dag = DAG(
    'kubernetes_executor_example',
    start_date=datetime(2023, 1, 1),
    # NOTE(review): schedule_interval is deprecated in favor of `schedule`
    # since Airflow 2.4 — confirm the targeted Airflow version.
    schedule_interval=None,
    catchup=False
)

# Task with custom executor config
k8s_task = PythonOperator(
    task_id='kubernetes_task',
    python_callable=my_task,
    # Per-task pod customization consumed by the KubernetesExecutor.
    executor_config={
        'KubernetesExecutor': {
            'namespace': 'custom-namespace',
            'image': 'custom-image:latest',
            'request_memory': '1Gi',
            'request_cpu': '500m',
            'limit_memory': '2Gi',
            'limit_cpu': '1000m',
            'labels': {'team': 'data-engineering'},
            'annotations': {'monitoring': 'enabled'},
            'env_vars': [
                {'name': 'CUSTOM_VAR', 'value': 'custom_value'}
            ],
            'secrets': [
                {
                    'deploy_type': 'env',
                    'deploy_target': 'DB_PASSWORD',
                    'secret': 'database-secret',
                    'key': 'password'
                }
            ],
            'volumes': [
                {
                    'name': 'data-volume',
                    'persistentVolumeClaim': {
                        'claimName': 'data-pvc'
                    }
                }
            ],
            'volume_mounts': [
                {
                    'name': 'data-volume',
                    'mountPath': '/data'
                }
            ],
            'node_selector': {'disktype': 'ssd'},
            'affinity': {
                'nodeAffinity': {
                    'requiredDuringSchedulingIgnoredDuringExecution': {
                        'nodeSelectorTerms': [{
                            'matchExpressions': [{
                                'key': 'kubernetes.io/arch',
                                'operator': 'In',
                                'values': ['amd64']
                            }]
                        }]
                    }
                }
            },
            'tolerations': [
                {
                    'key': 'compute-type',
                    'operator': 'Equal',
                    'value': 'gpu',
                    'effect': 'NoSchedule'
                }
            ]
        }
    },
    dag=dag
)

# Tasks for LocalKubernetesExecutor with queue routing
# This task runs locally (default queue)
local_task = PythonOperator(
    task_id='local_task',
    python_callable=lambda: print("Running locally!"),
    dag=dag
)

# This task runs on Kubernetes (kubernetes queue)
k8s_task = PythonOperator(
    task_id='k8s_task',
    python_callable=lambda: print("Running on Kubernetes!"),
    queue='kubernetes',  # Routes to KubernetesExecutor
    dag=dag
)

# Custom queue name (configure in airflow.cfg)
gpu_task = PythonOperator(
    task_id='gpu_task',
    python_callable=lambda: print("Running on GPU nodes!"),
    queue='gpu-queue',  # Custom queue for GPU nodes
    executor_config={
        'KubernetesExecutor': {
            'namespace': 'gpu-namespace',
            # Schedule onto GPU nodes and tolerate their taint.
            'node_selector': {'accelerator': 'nvidia-gpu'},
            'tolerations': [
                {
                    'key': 'nvidia.com/gpu',
                    'operator': 'Exists',
                    'effect': 'NoSchedule'
                }
            ],
            'resources': {
                'limits': {
                    'nvidia.com/gpu': '1'
                }
            }
        }
    },
    dag=dag
)

# airflow.cfg for HA Kubernetes Executor setup
[kubernetes_executor]
namespace = airflow-prod
# Multi-namespace for isolation
multi_namespace_mode = True
multi_namespace_mode_namespace_list = airflow-prod,airflow-staging
# High throughput configuration
worker_pods_creation_batch_size = 10
task_publish_max_retries = 5
# Pod cleanup configuration
delete_worker_pods = True
delete_worker_pods_on_failure = False
# Fatal container states that should fail tasks immediately
worker_pod_pending_fatal_container_state_reasons = CreateContainerConfigError,ErrImagePull,CreateContainerError,ImageInspectError,InvalidImageName
# Networking configuration
enable_tcp_keepalive = True
tcp_keep_idle = 120
tcp_keep_intvl = 30
tcp_keep_cnt = 6
# SSL configuration for production
verify_ssl = True
ssl_ca_cert = /etc/ssl/certs/ca-certificates.crt
# Resource management
delete_option_kwargs = {"grace_period_seconds": 30, "propagation_policy": "Foreground"}

# Task with enhanced monitoring configuration
monitored_task = PythonOperator(
    task_id='monitored_task',
    python_callable=my_task,
    executor_config={
        'KubernetesExecutor': {
            # Pod labels for selection/grouping in monitoring dashboards.
            'labels': {
                'app': 'airflow',
                'component': 'worker',
                'version': '2.7.0',
                'team': 'data-platform'
            },
            # Prometheus scrape hints and log-forwarding annotation.
            'annotations': {
                'prometheus.io/scrape': 'true',
                'prometheus.io/port': '8080',
                'logging.coreos.com/enabled': 'true'
            }
        }
    },
    dag=dag
)


def cleanup_completed_pods(**context):
"""Clean up completed pods older than specified age."""
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
from datetime import datetime, timedelta
hook = KubernetesHook(conn_id='kubernetes_default')
client = hook.get_conn()
# Get all pods in airflow namespace
pods = client.list_namespaced_pod(
namespace='airflow',
label_selector='airflow-worker=true'
)
cutoff_time = datetime.utcnow() - timedelta(hours=24)
for pod in pods.items:
# Check if pod is completed and old
if (pod.status.phase in ['Succeeded', 'Failed'] and
pod.metadata.creation_timestamp < cutoff_time):
client.delete_namespaced_pod(
name=pod.metadata.name,
namespace='airflow',
grace_period_seconds=0
)
print(f"Deleted pod: {pod.metadata.name}")
cleanup_task = PythonOperator(
task_id='cleanup_old_pods',
python_callable=cleanup_completed_pods,
schedule_interval='@daily',
dag=dag
)

Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-cncf-kubernetes