# Apache Airflow Providers Celery

A provider package that enables distributed task execution using Celery as the task queue for Apache Airflow. This package provides Celery-based executors, monitoring sensors, and CLI commands for running Airflow tasks across multiple worker nodes with horizontal scalability and fault tolerance.

## Package Information

- **Package Name**: apache-airflow-providers-celery
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-celery`

## Core Imports

```python
from airflow.providers.celery.executors.celery_executor import CeleryExecutor
from airflow.providers.celery.executors.celery_kubernetes_executor import CeleryKubernetesExecutor
from airflow.providers.celery.sensors.celery_queue import CeleryQueueSensor
```

## Basic Usage

```python
# Using CeleryExecutor in airflow.cfg
# [core]
# executor = airflow.providers.celery.executors.celery_executor.CeleryExecutor

# Using CeleryQueueSensor in DAGs
from airflow import DAG
from airflow.providers.celery.sensors.celery_queue import CeleryQueueSensor
from datetime import datetime, timedelta

dag = DAG(
    'celery_monitoring',
    default_args={'start_date': datetime(2024, 1, 1)},
    schedule_interval=timedelta(hours=1),
    catchup=False
)

# Wait for a specific Celery queue to be empty
queue_sensor = CeleryQueueSensor(
    task_id='wait_for_queue_empty',
    celery_queue='high_priority',
    timeout=300,
    poke_interval=30,
    dag=dag
)

# CLI usage for starting workers and monitoring
# airflow celery worker --concurrency 16
# airflow celery flower --port 5555
```

## Architecture

The package follows Airflow's executor pattern and Celery's distributed task queue architecture (a minimal configuration sketch follows this list):

- **Executors**: Implement Airflow's BaseExecutor interface to distribute tasks via Celery
- **Task Serialization**: Airflow tasks are serialized and sent to Celery workers as jobs
- **Result Backend**: Task states and results are stored in a shared backend (database/Redis)
- **Broker**: Message queue (Redis/RabbitMQ) handles task distribution between scheduler and workers
- **Workers**: Celery worker processes execute Airflow tasks on distributed nodes
- **Monitoring**: Flower provides web-based monitoring and administration of Celery clusters
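
The broker and result backend bullets map onto two `airflow.cfg` settings. This is a minimal sketch assuming a Redis broker and a PostgreSQL result backend; the connection URLs are placeholders for this example, not defaults:

```python
# [celery]
# broker_url = redis://redis:6379/0
#   Message broker: carries serialized task commands from the scheduler to workers.
# result_backend = db+postgresql://airflow:airflow@postgres/airflow
#   Shared result backend: workers record task state and results here.
```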

## Capabilities

### Celery Executor

Primary executor for distributed task execution using Celery. Routes Airflow tasks to Celery workers running across multiple machines, providing horizontal scalability and fault tolerance.

```python { .api }
class CeleryExecutor(BaseExecutor):
    def start(self): ...
    def queue_workload(self, workload: workloads.All, session: Session | None = None): ...
    def sync(self): ...
    def end(self, synchronous: bool = False): ...
    def terminate(self): ...
```
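
With CeleryExecutor enabled, work is routed by the standard `queue` argument that every operator accepts, and workers subscribe to queues at startup. A sketch, assuming the Airflow 2.x `BashOperator` import path and the `dag` object from Basic Usage:

```python
from airflow.operators.bash import BashOperator  # import path varies by Airflow version

# This task runs only on workers started with:
#   airflow celery worker --queues high_priority
report = BashOperator(
    task_id="build_report",
    bash_command="make report",
    queue="high_priority",  # tasks without a queue fall back to [operators] default_queue
    dag=dag,
)
```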

[Celery Executor](./celery-executor.md)

### Celery Kubernetes Executor

Hybrid executor that routes tasks between CeleryExecutor and KubernetesExecutor based on queue names. Tasks in the 'kubernetes' queue run via KubernetesExecutor, others via CeleryExecutor.

```python { .api }
class CeleryKubernetesExecutor(BaseExecutor):
    def start(self): ...
    def queue_command(self, task_instance: TaskInstance, command: CommandType, priority: int = 1, queue: str | None = None): ...
    def queue_task_instance(self, task_instance: TaskInstance, **kwargs): ...
    def sync(self): ...
    def end(self): ...
```
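
Routing is declared per task: anything on the configured Kubernetes queue goes to pods, everything else to Celery workers. A sketch, again assuming the Airflow 2.x `BashOperator` import path and an existing `dag` object:

```python
from airflow.operators.bash import BashOperator  # import path varies by Airflow version

# Default queue -> executed by the embedded CeleryExecutor
extract = BashOperator(task_id="extract", bash_command="echo extract", dag=dag)

# 'kubernetes' queue -> executed by the embedded KubernetesExecutor
# (queue name is set by [celery_kubernetes_executor] kubernetes_queue)
train = BashOperator(
    task_id="train",
    bash_command="echo train",
    queue="kubernetes",
    dag=dag,
)
```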

[Celery Kubernetes Executor](./celery-kubernetes-executor.md)

### Queue Monitoring

Sensor for monitoring Celery queue states, waiting for queues to be empty or checking specific task states.

```python { .api }
class CeleryQueueSensor(BaseSensorOperator):
    def __init__(self, *, celery_queue: str, target_task_id: str | None = None, **kwargs): ...
    def poke(self, context: Context) -> bool: ...
```
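
Basic Usage shows the queue-empty pattern; the `target_task_id` parameter in the constructor above instead waits on a single Celery task. A sketch, with the task id value as a hypothetical placeholder and `dag` taken from Basic Usage:

```python
from airflow.providers.celery.sensors.celery_queue import CeleryQueueSensor

wait_for_task = CeleryQueueSensor(
    task_id="wait_for_specific_task",
    celery_queue="high_priority",
    target_task_id="9c2f8f6a-0000-0000-0000-000000000000",  # placeholder Celery task id
    poke_interval=30,
    timeout=300,
    dag=dag,
)
```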

[Queue Monitoring](./queue-monitoring.md)

### CLI Commands

Command-line tools for managing Celery workers, monitoring with Flower, and queue operations.

```python { .api }
def worker(args): ...
def flower(args): ...
def stop_worker(args): ...
def list_workers(args): ...
def shutdown_worker(args): ...
def add_queue(args): ...
def remove_queue(args): ...
def shutdown_all_workers(args): ...
```
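
These functions back the `airflow celery ...` subcommands. A few invocations in the same comment style as Basic Usage, limited to flags known from the core worker/stop/flower commands:

```python
# Start a worker consuming two named queues:
#   airflow celery worker --queues default,high_priority --concurrency 16

# Gracefully stop a locally started worker via its PID file:
#   airflow celery stop

# Launch the Flower monitoring UI:
#   airflow celery flower --port 5555
```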

[CLI Commands](./cli-commands.md)

## Configuration

The package provides extensive configuration options through Airflow's configuration system:

```python
# Key configuration sections:
# [celery] - Main Celery executor settings
# [celery_kubernetes_executor] - Hybrid executor settings
# [celery_broker_transport_options] - Broker transport configuration
```
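
One representative key per section; the option names come from the provider's configuration, while the values are examples rather than documented defaults:

```python
# [celery]
# worker_concurrency = 16          # task slots per worker process

# [celery_kubernetes_executor]
# kubernetes_queue = kubernetes    # queue name handed to KubernetesExecutor

# [celery_broker_transport_options]
# visibility_timeout = 21600       # seconds before an unacked task is redelivered (Redis brokers)
```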

[Configuration](./configuration.md)

## Types

```python { .api }
from typing import Any, Dict, List, Optional, Union
from airflow.models.taskinstance import TaskInstance
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.executors.base_executor import BaseExecutor, CommandType

# Celery-specific types
TaskTuple = tuple[TaskInstanceKey, CommandType, str, Any]
TaskInstanceInCelery = tuple[TaskInstance, CommandType]
```