Provider package that enables distributed task execution using Celery as the task queue for Apache Airflow
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
The CeleryKubernetesExecutor is a hybrid executor that intelligently routes tasks between CeleryExecutor and KubernetesExecutor based on queue names. This allows for flexible task execution strategies within a single Airflow deployment.
Hybrid executor class that combines Celery and Kubernetes execution strategies.
class CeleryKubernetesExecutor(BaseExecutor):
    """
    Hybrid executor routing tasks between Celery and Kubernetes executors.

    Tasks assigned to the 'kubernetes' queue (configurable) are executed
    via KubernetesExecutor, while all other tasks use CeleryExecutor.
    """

    # Capability flags advertised to the Airflow scheduler.
    supports_ad_hoc_ti_run: bool = True
    supports_pickling: bool = True
    supports_sentry: bool = False
    is_local: bool = False
    is_single_threaded: bool = False
    is_production: bool = True
    serve_logs: bool = False
    # Optional sink receiving callback requests (set externally).
    callback_sink: BaseCallbackSink | None = None

    def __init__(
        self,
        celery_executor: CeleryExecutor | None = None,
        kubernetes_executor: KubernetesExecutor | None = None,
    ):
        """
        Initialize the hybrid executor.

        Parameters:
        - celery_executor: CeleryExecutor | None, optional Celery executor instance
        - kubernetes_executor: KubernetesExecutor | None, optional Kubernetes executor instance
        """

    @property
    def kubernetes_queue(self) -> str:
        """
        Get the queue name that routes tasks to KubernetesExecutor.

        Returns:
        str: Queue name (default: 'kubernetes')
        """

    @property
    def queued_tasks(self) -> dict[TaskInstanceKey, Any]:
        """
        Get combined queued tasks from both executors.

        Returns:
        dict[TaskInstanceKey, Any]: all queued tasks
        """

    @property
    def running(self) -> set[TaskInstanceKey]:
        """
        Get combined running tasks from both executors.

        Returns:
        set[TaskInstanceKey]: all running task keys
        """

    @property
    def job_id(self) -> int | str | None:
        """
        Get the job ID for this executor instance.

        Returns:
        int | str | None: executor job identifier
        """

    @property
    def slots_available(self) -> int:
        """
        Get total available slots across both executors.

        Returns:
        int: number of available execution slots
        """

    @property
    def slots_occupied(self) -> int:
        """
        Get total occupied slots across both executors.

        Returns:
        int: number of occupied execution slots
        """

    def start(self) -> None:
        """
        Start both underlying executors.

        Initializes and starts both CeleryExecutor and KubernetesExecutor
        instances for handling different types of tasks.
        """

    def queue_command(self, task_instance: TaskInstance, command: CommandType,
                      priority: int = 1, queue: str | None = None) -> None:
        """
        Route task to appropriate executor based on queue name.

        Parameters:
        - task_instance: TaskInstance to execute
        - command: CommandType containing the task execution command
        - priority: int, task priority level
        - queue: str | None, queue name determining execution strategy

        Routing Logic:
        - If queue == kubernetes_queue (default: 'kubernetes') -> KubernetesExecutor
        - All other queues -> CeleryExecutor
        """

    def queue_task_instance(self, task_instance: TaskInstance, **kwargs) -> None:
        """
        Queue a task instance to appropriate executor based on routing logic.

        Parameters:
        - task_instance: TaskInstance to queue
        - **kwargs: Additional arguments passed to underlying executor
        """

    def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], list[str]]:
        """
        Retrieve task logs from the appropriate executor.

        Parameters:
        - ti: TaskInstance to get logs for
        - try_number: int, task attempt number

        Returns:
        tuple[list[str], list[str]]: (stdout_lines, stderr_lines)
        """

    def has_task(self, task_instance: TaskInstance) -> bool:
        """
        Check if a task instance is tracked by either executor.

        Parameters:
        - task_instance: TaskInstance to check

        Returns:
        bool: True if task is tracked by either executor
        """

    def heartbeat(self) -> None:
        """
        Perform heartbeat operations on both executors.

        Calls heartbeat() on both underlying executors to maintain
        health and perform periodic maintenance.
        """

    def get_event_buffer(self, dag_ids: list[str] | None = None) -> dict[TaskInstanceKey, EventBufferValueType]:
        """
        Get combined event buffer from both executors.

        Parameters:
        - dag_ids: list[str] | None, optional DAG IDs to filter events

        Returns:
        dict[TaskInstanceKey, EventBufferValueType]: combined event buffer
        """

    def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]:
        """
        Attempt to adopt orphaned task instances across both executors.

        Parameters:
        - tis: Sequence[TaskInstance], potential orphaned tasks

        Returns:
        Sequence[TaskInstance]: task instances that could not be adopted
        """

    def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
        """
        Clean up stuck queued tasks on both executors (deprecated).

        Parameters:
        - tis: list[TaskInstance], tasks to clean up

        Returns:
        list[str]: task instance keys that were cleaned up
        """

    def revoke_task(self, *, ti: TaskInstance) -> None:
        """
        Revoke a running task on the appropriate executor.

        Parameters:
        - ti: TaskInstance, task instance to revoke

        Routes the revocation to the correct underlying executor.
        """

    def end(self) -> None:
        """
        Gracefully shutdown both executors.

        Performs clean shutdown of both CeleryExecutor and
        KubernetesExecutor instances.
        """

    def terminate(self) -> None:
        """
        Force termination of tasks on both executors.

        Immediately terminates tasks running on both Celery workers
        and Kubernetes pods.
        """

    def debug_dump(self) -> None:
        """
        Print debug information from both executors.

        Calls debug_dump() on both underlying executors to provide
        comprehensive debugging information.
        """

    def send_callback(self, request: CallbackRequest) -> None:
        """
        Send callback request to appropriate executor.

        Parameters:
        - request: CallbackRequest, callback to send

        Routes callback to the executor that handled the original task.
        """

    @staticmethod
    def get_cli_commands() -> list:
        """
        Get CLI commands provided by this executor.

        Returns:
        list: list of CLI command groups
        """

# The executor determines execution strategy based on queue names:
# Queue routing is handled internally by the executor
# based on the kubernetes_queue configuration property

Configuration for the hybrid executor routing behavior:
# Configuration section: [celery_kubernetes_executor]
KUBERNETES_QUEUE = "kubernetes" # Queue name for Kubernetes execution
# Both underlying executors use their respective configuration sections:
# [celery] - for CeleryExecutor settings
# [kubernetes_executor] - for KubernetesExecutor settings

# In airflow.cfg:
[core]
executor = airflow.providers.celery.executors.celery_kubernetes_executor.CeleryKubernetesExecutor
[celery_kubernetes_executor]
kubernetes_queue = kubernetes
[celery]
# Standard Celery configuration
broker_url = redis://redis:6379/0
result_backend = db+postgresql://postgres:airflow@postgres/airflow
worker_concurrency = 16
[kubernetes_executor]
# Standard Kubernetes configuration
namespace = airflow
worker_container_repository = airflow-workers
worker_container_tag = latest

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
dag = DAG('hybrid_execution', schedule_interval=None)
# This task runs on Kubernetes (ephemeral pod)
k8s_task = PythonOperator(
task_id='kubernetes_task',
python_callable=lambda: print("Running on Kubernetes"),
queue='kubernetes', # Routes to KubernetesExecutor
dag=dag
)
# This task runs on Celery workers (persistent workers)
celery_task = BashOperator(
task_id='celery_task',
bash_command='echo "Running on Celery"',
queue='default', # Routes to CeleryExecutor
dag=dag
)
# Custom queue also routes to Celery
high_memory_task = PythonOperator(
task_id='high_memory_task',
python_callable=lambda: print("High memory processing"),
queue='high_memory', # Routes to CeleryExecutor
dag=dag
)
k8s_task >> [celery_task, high_memory_task]

from airflow.providers.celery.executors.celery_kubernetes_executor import CeleryKubernetesExecutor
from airflow.models.taskinstancekey import TaskInstanceKey
# Initialize hybrid executor
executor = CeleryKubernetesExecutor()
executor.start()
# Task routed to Kubernetes
k8s_key = TaskInstanceKey(dag_id="my_dag", task_id="k8s_task",
run_id="manual_run", try_number=1)
k8s_command = ["python", "-c", "print('K8s task')"]
executor.execute_async(key=k8s_key, command=k8s_command, queue="kubernetes")
# Task routed to Celery
celery_key = TaskInstanceKey(dag_id="my_dag", task_id="celery_task",
run_id="manual_run", try_number=1)
celery_command = ["python", "-c", "print('Celery task')"]
executor.execute_async(key=celery_key, command=celery_command, queue="default")
# Sync states from both executors
executor.sync()
# Cleanup
executor.end()

# Custom kubernetes queue name
# In airflow.cfg:
[celery_kubernetes_executor]
kubernetes_queue = special_k8s_queue
# In DAG:
special_k8s_task = PythonOperator(
task_id='special_kubernetes_task',
python_callable=my_function,
queue='special_k8s_queue', # Routes to KubernetesExecutor
dag=dag
)

# Route tasks based on resource requirements
# CPU-intensive tasks on Kubernetes (auto-scaling)
cpu_intensive_task = PythonOperator(
task_id='ml_training',
python_callable=train_model,
queue='kubernetes',
executor_config={
'pod_override': {
'spec': {
'containers': [{
'name': 'base',
'resources': {
'requests': {'cpu': '4', 'memory': '8Gi'},
'limits': {'cpu': '8', 'memory': '16Gi'}
}
}]
}
}
},
dag=dag
)
# I/O intensive tasks on persistent Celery workers
io_intensive_task = PythonOperator(
task_id='data_processing',
python_callable=process_large_files,
queue='io_intensive', # Routes to specialized Celery workers
dag=dag
)

# Isolate tasks with different dependency requirements
# Task requiring special libraries (isolated K8s pod)
special_deps_task = BashOperator(
task_id='special_processing',
bash_command='python special_script.py',
queue='kubernetes',
executor_config={
'pod_override': {
'spec': {
'containers': [{
'name': 'base',
'image': 'my-special-image:latest'
}]
}
}
},
dag=dag
)
# Standard task on regular Celery workers
standard_task = PythonOperator(
task_id='standard_processing',
python_callable=standard_function,
queue='default',
dag=dag
)

# Tasks are tracked separately by each underlying executor
# Use Airflow UI to see which executor handled each task
# Celery tasks show up in Flower monitoring
# Kubernetes tasks show up in K8s dashboard/kubectl
# Log aggregation from both execution environments
def get_task_logs(dag_id: str, task_id: str, execution_date: str, try_number: int):
"""
Retrieve logs from either Celery or Kubernetes based on task queue.
The hybrid executor automatically determines the correct source
for log retrieval based on where the task was executed.
"""# Balance between Celery and Kubernetes based on:
# Celery advantages:
# - Persistent workers (faster task startup)
# - Better for high-frequency, short-duration tasks
# - Shared state and caching between tasks
# - Lower resource overhead
# Kubernetes advantages:
# - Resource isolation per task
# - Auto-scaling based on workload
# - Better for resource-intensive tasks
# - Clean environment per execution
# - Support for different container images per task

Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-celery