tessl/pypi-apache-airflow-providers-celery

Describes: pkg:pypi/apache-airflow-providers-celery@3.12.x (pypi)

To install, run

npx @tessl/cli install tessl/pypi-apache-airflow-providers-celery@3.12.0


Apache Airflow Providers Celery

A provider package that enables distributed task execution using Celery as the task queue for Apache Airflow. This package provides Celery-based executors, monitoring sensors, and CLI commands for running Airflow tasks across multiple worker nodes with horizontal scalability and fault tolerance.

Package Information

  • Package Name: apache-airflow-providers-celery
  • Language: Python
  • Installation: pip install apache-airflow-providers-celery
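
The provider also ships behind the celery extra of the main Airflow distribution, so an equivalent installation is:

pip install 'apache-airflow[celery]'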

Core Imports

from airflow.providers.celery.executors.celery_executor import CeleryExecutor
from airflow.providers.celery.executors.celery_kubernetes_executor import CeleryKubernetesExecutor
from airflow.providers.celery.sensors.celery_queue import CeleryQueueSensor

Basic Usage

# Using CeleryExecutor in airflow.cfg
# [core]
# executor = airflow.providers.celery.executors.celery_executor.CeleryExecutor

# Using CeleryQueueSensor in DAGs
from airflow import DAG
from airflow.providers.celery.sensors.celery_queue import CeleryQueueSensor
from datetime import datetime, timedelta

dag = DAG(
    'celery_monitoring',
    default_args={'start_date': datetime(2024, 1, 1)},
    schedule=timedelta(hours=1),  # 'schedule' supersedes the deprecated 'schedule_interval'
    catchup=False
)

# Wait for a specific Celery queue to be empty
queue_sensor = CeleryQueueSensor(
    task_id='wait_for_queue_empty',
    celery_queue='high_priority',
    timeout=300,
    poke_interval=30,
    dag=dag
)

# CLI usage for starting workers and monitoring
# airflow celery worker --concurrency 16
# airflow celery flower --port 5555

Architecture

The package follows Airflow's executor pattern and Celery's distributed task queue architecture:

  • Executors: Implement Airflow's BaseExecutor interface to distribute tasks via Celery
  • Task Serialization: Airflow tasks are serialized and sent to Celery workers as jobs
  • Result Backend: Task states and results are stored in a shared backend (database/Redis)
  • Broker: Message queue (Redis/RabbitMQ) handles task distribution between scheduler and workers
  • Workers: Celery worker processes execute Airflow tasks on distributed nodes
  • Monitoring: Flower provides web-based monitoring and administration of Celery clusters
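
A minimal sketch of how these pieces are wired together in airflow.cfg, assuming a Redis broker and a Postgres result backend (hostnames and credentials are placeholders):

[core]
executor = CeleryExecutor

[celery]
# Message broker that carries tasks from the scheduler to the workers
broker_url = redis://redis-host:6379/0
# Shared backend where workers record task states and results
result_backend = db+postgresql://airflow:airflow@db-host/airflow
# Task slots per worker process
worker_concurrency = 16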

Capabilities

Celery Executor

Primary executor for distributed task execution using Celery. Routes Airflow tasks to Celery workers running across multiple machines, providing horizontal scalability and fault tolerance.

class CeleryExecutor(BaseExecutor):
    def start(self): ...
    def queue_workload(self, workload: workloads.All, session: Session | None = None): ...
    def sync(self): ...
    def end(self, synchronous: bool = False): ...
    def terminate(self): ...
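
To put the executor to work, point Airflow at it and start a worker on every node that should execute tasks; a minimal sketch using environment-variable configuration (the broker URL is a placeholder):

export AIRFLOW__CORE__EXECUTOR=CeleryExecutor
export AIRFLOW__CELERY__BROKER_URL=redis://redis-host:6379/0

# Run on each worker node
airflow celery worker --concurrency 16 --queues default,high_priority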

See docs/celery-executor.md for details.

Celery Kubernetes Executor

Hybrid executor that routes tasks between CeleryExecutor and KubernetesExecutor based on queue names. Tasks in the 'kubernetes' queue run via KubernetesExecutor, others via CeleryExecutor.

class CeleryKubernetesExecutor(BaseExecutor):
    def start(self): ...
    def queue_command(self, task_instance: TaskInstance, command: CommandType, priority: int = 1, queue: str | None = None): ...
    def queue_task_instance(self, task_instance: TaskInstance, **kwargs): ...
    def sync(self): ...
    def end(self): ...
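
Routing is driven by each task's queue attribute; the queue that diverts work to Kubernetes defaults to 'kubernetes' and can be changed via kubernetes_queue in the [celery_kubernetes_executor] section. A minimal sketch (the task definitions are illustrative and reuse the dag from Basic Usage):

from airflow.operators.bash import BashOperator

# Executed by a Celery worker (default queue)
on_celery = BashOperator(task_id='on_celery', bash_command='echo celery', dag=dag)

# Diverted to the KubernetesExecutor because of its queue name
on_k8s = BashOperator(
    task_id='on_k8s',
    bash_command='echo kubernetes',
    queue='kubernetes',
    dag=dag,
)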

See docs/celery-kubernetes-executor.md for details.

Queue Monitoring

Sensor that monitors a Celery queue: it succeeds once the queue is empty, or, when target_task_id is set, once that task's Celery result is ready.

class CeleryQueueSensor(BaseSensorOperator):
    def __init__(self, *, celery_queue: str, target_task_id: str | None = None, **kwargs): ...
    def poke(self, context: Context) -> bool: ...
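
Without target_task_id the sensor waits for the queue to drain; with it, the sensor instead checks whether the Celery result pushed to XCom by that task is ready. A short sketch with illustrative task IDs, reusing the dag from Basic Usage:

# Succeeds once the Celery result pushed by 'transform_data' is ready
result_sensor = CeleryQueueSensor(
    task_id='wait_for_transform',
    celery_queue='default',
    target_task_id='transform_data',
    dag=dag,
)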

See docs/queue-monitoring.md for details.

CLI Commands

Command-line tools for managing Celery workers, monitoring with Flower, and queue operations.

def worker(args): ...
def flower(args): ...
def stop_worker(args): ...
def list_workers(args): ...
def shutdown_worker(args): ...
def shutdown_all_workers(args): ...
def add_queue(args): ...
def remove_queue(args): ...
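
Assuming these functions back the airflow celery subcommands one-to-one (as their names suggest), typical invocations look like:

# Start a worker serving two queues
airflow celery worker --queues default,high_priority

# Launch the Flower monitoring UI
airflow celery flower --port 5555

# Gracefully stop a worker started on this host
airflow celery stop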

See docs/cli-commands.md for details.

Configuration

The package provides extensive configuration options through Airflow's configuration system:

# Key configuration sections:
# [celery] - Main Celery executor settings
# [celery_kubernetes_executor] - Hybrid executor settings  
# [celery_broker_transport_options] - Broker transport configuration
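
A sketch of commonly tuned keys from the latter two sections (the values are illustrative, not recommendations):

[celery_kubernetes_executor]
# Queue name whose tasks are diverted to the KubernetesExecutor
kubernetes_queue = kubernetes

[celery_broker_transport_options]
# Seconds the broker waits before redelivering an unacknowledged task
visibility_timeout = 21600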

See docs/configuration.md for details.

Types

from typing import Any
from airflow.models.taskinstance import TaskInstance
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.executors.base_executor import BaseExecutor, CommandType

# Celery-specific types
TaskTuple = tuple[TaskInstanceKey, CommandType, str, Any]
TaskInstanceInCelery = tuple[TaskInstance, CommandType]