tessl/pypi-apache-airflow-providers-celery

Provider package that enables distributed task execution using Celery as the task queue for Apache Airflow

docs/celery-kubernetes-executor.md

Celery Kubernetes Executor

The CeleryKubernetesExecutor is a hybrid executor that intelligently routes tasks between CeleryExecutor and KubernetesExecutor based on queue names. This allows for flexible task execution strategies within a single Airflow deployment.

Capabilities

CeleryKubernetesExecutor Class

Hybrid executor class that combines Celery and Kubernetes execution strategies.

class CeleryKubernetesExecutor(BaseExecutor):
    """
    Hybrid executor routing tasks between Celery and Kubernetes executors.
    
    Tasks assigned to the 'kubernetes' queue (configurable) are executed
    via KubernetesExecutor, while all other tasks use CeleryExecutor.
    """
    
    # Class attributes
    supports_ad_hoc_ti_run: bool = True
    supports_pickling: bool = True
    supports_sentry: bool = False
    is_local: bool = False
    is_single_threaded: bool = False
    is_production: bool = True
    serve_logs: bool = False
    callback_sink: BaseCallbackSink | None = None
    
    def __init__(
        self,
        celery_executor: CeleryExecutor | None = None,
        kubernetes_executor: KubernetesExecutor | None = None,
    ):
        """
        Initialize the hybrid executor.
        
        Parameters:
        - celery_executor: CeleryExecutor | None, optional Celery executor instance
        - kubernetes_executor: KubernetesExecutor | None, optional Kubernetes executor instance
        """
    
    @property
    def kubernetes_queue(self) -> str:
        """
        Get the queue name that routes tasks to KubernetesExecutor.
        
        Returns:
        str: Queue name (default: 'kubernetes')
        """
    
    @property
    def queued_tasks(self) -> dict[TaskInstanceKey, Any]:
        """
        Get combined queued tasks from both executors.
        
        Returns:
        dict[TaskInstanceKey, Any]: all queued tasks
        """
    
    @property
    def running(self) -> set[TaskInstanceKey]:
        """
        Get combined running tasks from both executors.
        
        Returns:
        set[TaskInstanceKey]: all running task keys
        """
    
    @property
    def job_id(self) -> int | str | None:
        """
        Get the job ID for this executor instance.
        
        Returns:
        int | str | None: executor job identifier
        """
    
    @property
    def slots_available(self) -> int:
        """
        Get total available slots across both executors.
        
        Returns:
        int: number of available execution slots
        """
    
    @property
    def slots_occupied(self) -> int:
        """
        Get total occupied slots across both executors.
        
        Returns:
        int: number of occupied execution slots
        """
    
    def start(self) -> None:
        """
        Start both underlying executors.
        
        Initializes and starts both CeleryExecutor and KubernetesExecutor
        instances for handling different types of tasks.
        """
    
    def queue_command(self, task_instance: TaskInstance, command: CommandType,
                     priority: int = 1, queue: str | None = None) -> None:
        """
        Route task to appropriate executor based on queue name.
        
        Parameters:
        - task_instance: TaskInstance to execute
        - command: CommandType containing the task execution command  
        - priority: int, task priority level
        - queue: str | None, queue name determining execution strategy
        
        Routing Logic:
        - If queue == kubernetes_queue (default: 'kubernetes') -> KubernetesExecutor
        - All other queues -> CeleryExecutor
        """
    
    def queue_task_instance(self, task_instance: TaskInstance, **kwargs) -> None:
        """
        Queue a task instance to appropriate executor based on routing logic.
        
        Parameters:
        - task_instance: TaskInstance to queue
        - **kwargs: Additional arguments passed to underlying executor
        """
    
    def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], list[str]]:
        """
        Retrieve task logs from the appropriate executor.
        
        Parameters:
        - ti: TaskInstance to get logs for
        - try_number: int, task attempt number
        
        Returns:
        tuple[list[str], list[str]]: (messages, log_lines)
        """
    
    def has_task(self, task_instance: TaskInstance) -> bool:
        """
        Check if a task instance is tracked by either executor.
        
        Parameters:
        - task_instance: TaskInstance to check
        
        Returns:
        bool: True if task is tracked by either executor
        """
    
    def heartbeat(self) -> None:
        """
        Perform heartbeat operations on both executors.
        
        Calls heartbeat() on both underlying executors to maintain
        health and perform periodic maintenance.
        """
    
    def get_event_buffer(self, dag_ids: list[str] | None = None) -> dict[TaskInstanceKey, EventBufferValueType]:
        """
        Get combined event buffer from both executors.
        
        Parameters:
        - dag_ids: list[str] | None, optional DAG IDs to filter events
        
        Returns:
        dict[TaskInstanceKey, EventBufferValueType]: combined event buffer
        """
    
    def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]:
        """
        Attempt to adopt orphaned task instances across both executors.
        
        Parameters:
        - tis: Sequence[TaskInstance], potential orphaned tasks
        
        Returns:
        Sequence[TaskInstance]: task instances that could not be adopted
        """
    
    def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
        """
        Clean up stuck queued tasks on both executors (deprecated).
        
        Parameters:
        - tis: list[TaskInstance], tasks to clean up
        
        Returns:
        list[str]: task instance keys that were cleaned up
        """
    
    def revoke_task(self, *, ti: TaskInstance) -> None:
        """
        Revoke a running task on the appropriate executor.
        
        Parameters:
        - ti: TaskInstance, task instance to revoke
        
        Routes the revocation to the correct underlying executor.
        """
    
    def end(self) -> None:
        """
        Gracefully shutdown both executors.
        
        Performs clean shutdown of both CeleryExecutor and 
        KubernetesExecutor instances.
        """
    
    def terminate(self) -> None:
        """
        Force termination of tasks on both executors.
        
        Immediately terminates tasks running on both Celery workers
        and Kubernetes pods.
        """
    
    def debug_dump(self) -> None:
        """
        Print debug information from both executors.
        
        Calls debug_dump() on both underlying executors to provide
        comprehensive debugging information.
        """
    
    def send_callback(self, request: CallbackRequest) -> None:
        """
        Send callback request to appropriate executor.
        
        Parameters:
        - request: CallbackRequest, callback to send
        
        Routes callback to the executor that handled the original task.
        """
    
    @staticmethod
    def get_cli_commands() -> list:
        """
        Get CLI commands provided by this executor.
        
        Returns:
        list: list of CLI command groups
        """
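The combined-view properties above (`queued_tasks`, `running`, `slots_available`) merge state from both underlying executors. A minimal illustration of that aggregation, using plain dicts and sets in place of real executor instances (keys stand in for `TaskInstanceKey` values, and the slot counts are made up for the example):

```python
# Illustration only: how a hybrid view can merge per-executor state.
celery_queued = {("my_dag", "celery_task", "run_1", 1): "cmd-a"}
k8s_queued = {("my_dag", "k8s_task", "run_1", 1): "cmd-b"}

celery_running = {("my_dag", "celery_task", "run_0", 1)}
k8s_running = {("my_dag", "k8s_task", "run_0", 1)}

# queued_tasks: union of both dicts; running: union of both sets
queued_tasks = {**celery_queued, **k8s_queued}
running = celery_running | k8s_running

# slots_available: sum across executors
# (e.g. Celery worker capacity + Kubernetes pod capacity)
slots_available = 16 + 32
```

The point is that callers see one executor-shaped object while each task remains tracked by exactly one backend.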

Queue Routing Logic

The executor determines execution strategy based on queue names:

# Queue routing is handled internally by the executor
# based on the kubernetes_queue configuration property

Configuration

Configuration for the hybrid executor routing behavior:

# Configuration section: [celery_kubernetes_executor]
# kubernetes_queue = kubernetes   # queue name that routes tasks to KubernetesExecutor

# Both underlying executors keep their own configuration sections:
# [celery]              - CeleryExecutor settings
# [kubernetes_executor] - KubernetesExecutor settings
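The routing queue name is ordinary INI configuration. A self-contained sketch of reading it with the standard library (in a real deployment Airflow's own config loader does this, so the snippet is illustrative only):

```python
import configparser

# Stand-in for the relevant fragment of airflow.cfg
CFG = """
[celery_kubernetes_executor]
kubernetes_queue = kubernetes
"""

parser = configparser.ConfigParser()
parser.read_string(CFG)

# Fall back to the documented default when the option is absent
queue = parser.get("celery_kubernetes_executor", "kubernetes_queue",
                   fallback="kubernetes")
```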

Usage Examples

Basic Configuration

# In airflow.cfg:
[core]
executor = airflow.providers.celery.executors.celery_kubernetes_executor.CeleryKubernetesExecutor

[celery_kubernetes_executor]
kubernetes_queue = kubernetes

[celery]
# Standard Celery configuration
broker_url = redis://redis:6379/0
result_backend = db+postgresql://postgres:airflow@postgres/airflow
worker_concurrency = 16

[kubernetes_executor]
# Standard Kubernetes configuration
namespace = airflow
worker_container_repository = airflow-workers
worker_container_tag = latest

Task Queue Assignment

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

dag = DAG('hybrid_execution', schedule=None)

# This task runs on Kubernetes (ephemeral pod)
k8s_task = PythonOperator(
    task_id='kubernetes_task',
    python_callable=lambda: print("Running on Kubernetes"),
    queue='kubernetes',  # Routes to KubernetesExecutor
    dag=dag
)

# This task runs on Celery workers (persistent workers)  
celery_task = BashOperator(
    task_id='celery_task',
    bash_command='echo "Running on Celery"',
    queue='default',  # Routes to CeleryExecutor
    dag=dag
)

# Custom queue also routes to Celery
high_memory_task = PythonOperator(
    task_id='high_memory_task',
    python_callable=lambda: print("High memory processing"),
    queue='high_memory',  # Routes to CeleryExecutor
    dag=dag
)

k8s_task >> [celery_task, high_memory_task]

Programmatic Usage

from airflow.providers.celery.executors.celery_kubernetes_executor import CeleryKubernetesExecutor
from airflow.models.taskinstancekey import TaskInstanceKey

# Initialize hybrid executor
executor = CeleryKubernetesExecutor()
executor.start()

# Task routed to Kubernetes
k8s_key = TaskInstanceKey(dag_id="my_dag", task_id="k8s_task",
                          run_id="manual_run", try_number=1)
k8s_command = ["python", "-c", "print('K8s task')"]

executor.execute_async(key=k8s_key, command=k8s_command, queue="kubernetes")

# Task routed to Celery
celery_key = TaskInstanceKey(dag_id="my_dag", task_id="celery_task",
                             run_id="manual_run", try_number=1)
celery_command = ["python", "-c", "print('Celery task')"]

executor.execute_async(key=celery_key, command=celery_command, queue="default")

# Sync states from both executors
executor.sync()

# Cleanup
executor.end()

Custom Queue Configuration

# Custom kubernetes queue name
# In airflow.cfg:
[celery_kubernetes_executor]
kubernetes_queue = special_k8s_queue

# In DAG:
special_k8s_task = PythonOperator(
    task_id='special_kubernetes_task',
    python_callable=my_function,
    queue='special_k8s_queue',  # Routes to KubernetesExecutor
    dag=dag
)

Use Cases

Resource-Based Task Routing

# Route tasks based on resource requirements

# CPU-intensive tasks on Kubernetes (auto-scaling)
cpu_intensive_task = PythonOperator(
    task_id='ml_training',
    python_callable=train_model,
    queue='kubernetes',
    executor_config={
        'pod_override': {
            'spec': {
                'containers': [{
                    'name': 'base',
                    'resources': {
                        'requests': {'cpu': '4', 'memory': '8Gi'},
                        'limits': {'cpu': '8', 'memory': '16Gi'}
                    }
                }]
            }
        }
    },
    dag=dag
)

# I/O intensive tasks on persistent Celery workers
io_intensive_task = PythonOperator(
    task_id='data_processing',
    python_callable=process_large_files,
    queue='io_intensive',  # Routes to specialized Celery workers
    dag=dag
)

Environment Isolation

# Isolate tasks with different dependency requirements

# Task requiring special libraries (isolated K8s pod)
special_deps_task = BashOperator(
    task_id='special_processing',
    bash_command='python special_script.py',
    queue='kubernetes',
    executor_config={
        'pod_override': {
            'spec': {
                'containers': [{
                    'name': 'base',
                    'image': 'my-special-image:latest'
                }]
            }
        }
    },
    dag=dag
)

# Standard task on regular Celery workers
standard_task = PythonOperator(
    task_id='standard_processing',
    python_callable=standard_function,
    queue='default',
    dag=dag
)

Monitoring and Troubleshooting

Task Tracking

# Tasks are tracked separately by each underlying executor
# Use Airflow UI to see which executor handled each task

# Celery tasks show up in Flower monitoring
# Kubernetes tasks show up in K8s dashboard/kubectl

# Log aggregation from both execution environments:
# get_task_log(ti, try_number) on the hybrid executor routes the lookup to
# whichever underlying executor ran the task, so callers never need to know
# where a task actually executed.

Performance Considerations

# Balance between Celery and Kubernetes based on:

# Celery advantages:
# - Persistent workers (faster task startup)
# - Better for high-frequency, short-duration tasks
# - Shared state and caching between tasks
# - Lower resource overhead

# Kubernetes advantages:  
# - Resource isolation per task
# - Auto-scaling based on workload
# - Better for resource-intensive tasks
# - Clean environment per execution
# - Support for different container images per task

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-celery
