# CLI Commands

The Celery provider includes comprehensive command-line tools for managing Celery workers, monitoring with Flower, and performing queue operations. These commands extend Airflow's CLI with Celery-specific functionality.

## Capabilities

### Worker Management Commands

Commands for starting, stopping, and managing Celery workers.
```python { .api }
def worker(args):
    """
    Start a Celery worker process.

    Usage: airflow celery worker [options]

    Parameters (via args object):
    - concurrency: int, number of concurrent worker processes
    - hostname: str, worker hostname identifier
    - queues: list[str], specific queues for this worker to consume
    - loglevel: str, logging level (DEBUG, INFO, WARNING, ERROR)
    - autoscale: str, autoscaling configuration (max,min)
    - without_gossip: bool, disable gossip for worker discovery
    - without_mingle: bool, disable startup synchronization
    - without_heartbeat: bool, disable worker heartbeat
    - daemon: bool, run as daemon process
    - pidfile: str, path to PID file for daemon mode
    - logfile: str, path to log file

    Environment Setup:
    - Configures worker process name and logging
    - Sets up signal handlers for graceful shutdown
    - Initializes the Celery app with Airflow configuration
    """

def stop_worker(args):
    """
    Stop a running Celery worker by PID (deprecated; use shutdown-worker instead).

    Usage: airflow celery stop [options]

    Parameters (via args object):
    - pid: str, path to PID file of the worker to stop
    - verbose: bool, enable verbose output

    Reads the PID from the file and sends a termination signal to the worker process.
    """

def list_workers(args):
    """
    List all active Celery workers and their status.

    Usage: airflow celery list-workers [options]

    Parameters (via args object):
    - output: str, output format ('table', 'json', 'yaml', 'plain')

    Displays:
    - Worker hostnames and online status
    - Active task counts per worker
    - Queues each worker is consuming from
    - Load averages and resource usage
    """

def shutdown_worker(args):
    """
    Gracefully shut down a specific Celery worker.

    Usage: airflow celery shutdown-worker [options]

    Parameters (via args object):
    - celery_hostname: str, worker hostname to shut down (required)

    Sends a shutdown command and waits for the worker to finish its current tasks.
    """

def shutdown_all_workers(args):
    """
    Shut down all active Celery workers.

    Usage: airflow celery shutdown-all-workers [options]

    Parameters (via args object):
    - yes: bool, skip confirmation prompt

    Sends shutdown commands to all discovered workers and monitors
    their termination status.
    """
```
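The listing and shutdown commands are thin wrappers over Celery's remote-control (broadcast) API. As a minimal sketch, the same worker discovery can be done directly against the provider's Celery app; the timeout and the printed format here are illustrative assumptions, not the command's actual output:

```python
# Sketch: discover active workers the way list-workers conceptually does,
# using Celery's inspect API on the provider's Celery app.
from airflow.providers.celery.executors.celery_executor_utils import app

def discover_workers(timeout: float = 1.0) -> dict[str, int]:
    """Return {worker_hostname: active_task_count} for workers that reply."""
    inspector = app.control.inspect(timeout=timeout)
    active = inspector.active() or {}  # {hostname: [task, ...]} or None
    return {hostname: len(tasks) for hostname, tasks in active.items()}

if __name__ == "__main__":
    for hostname, count in discover_workers().items():
        print(f"{hostname}: {count} active task(s)")
```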
### Queue Management Commands

Commands for managing Celery task queues and worker queue assignments.

```python { .api }
def add_queue(args):
    """
    Add queues to a worker's consumption list.

    Usage: airflow celery add-queue [options]

    Parameters (via args object):
    - celery_hostname: str, target worker hostname (required)
    - queues: list[str], queues to add to the worker (required)

    Dynamically adds the queues to a running worker without a restart.
    The worker will begin consuming tasks from the new queues.
    """

def remove_queue(args):
    """
    Remove queues from a worker's consumption list.

    Usage: airflow celery remove-queue [options]

    Parameters (via args object):
    - celery_hostname: str, target worker hostname (required)
    - queues: list[str], queues to remove from the worker (required)

    Stops the worker from consuming new tasks from the specified queues.
    Tasks already being processed from those queues will complete normally.
    """
```
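Under the hood, these commands correspond to Celery's standard consumer-control broadcasts. A minimal sketch of the equivalent calls against the provider's Celery app (the worker hostname and queue name are placeholders):

```python
# Sketch: the Celery control primitives that add-queue / remove-queue build on.
from airflow.providers.celery.executors.celery_executor_utils import app

worker = "worker-01@hostname"  # placeholder worker hostname
queue = "ml_training"          # placeholder queue name

# Start consuming from an additional queue (roughly what add-queue does).
app.control.add_consumer(queue, destination=[worker])

# Stop consuming from a queue; tasks already picked up still run to completion
# (roughly what remove-queue does).
app.control.cancel_consumer(queue, destination=[worker])
```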
### Monitoring Commands

Commands for monitoring and administration of Celery clusters.

```python { .api }
def flower(args):
    """
    Start the Flower web-based monitoring and administration tool.

    Usage: airflow celery flower [options]

    Parameters (via args object):
    - hostname: str, interface to bind (default: 0.0.0.0)
    - port: int, port number (default: 5555)
    - broker_api: str, broker API URL for advanced monitoring
    - url_prefix: str, URL prefix for reverse proxy setups
    - basic_auth: str, HTTP basic authentication (user:pass,user2:pass2)
    - flower_conf: str, path to Flower configuration file
    - daemon: bool, run as daemon process
    - pidfile: str, PID file path for daemon mode
    - logfile: str, log file path

    Features:
    - Real-time task monitoring and statistics
    - Worker management and control
    - Queue monitoring and manipulation
    - Task routing and execution control
    - Historical task execution data
    """
```
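Flower also exposes a REST API alongside the web UI, which is convenient for scripted monitoring. A sketch that polls worker status, assuming Flower is reachable on localhost:5555 with one of the basic-auth credentials configured above (host, port, and credentials are assumptions; prepend any --url-prefix to the path):

```python
# Sketch: poll Flower's REST API for worker status.
import requests

FLOWER_URL = "http://localhost:5555"  # assumed Flower address
AUTH = ("admin", "secret")            # assumed --basic-auth credentials

resp = requests.get(f"{FLOWER_URL}/api/workers", auth=AUTH, timeout=10)
resp.raise_for_status()

for worker_name, info in resp.json().items():
    print(worker_name, "online" if info else "offline")
```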
### Utility Functions

Supporting functions used by the CLI commands.

```python { .api }
def _check_if_active_celery_worker(hostname: str) -> bool:
    """
    Check whether a Celery worker with the given hostname is active.

    Parameters:
    - hostname: str, worker hostname to check

    Returns:
    bool: True if the worker is active and responding
    """

def _serve_logs(skip_serve_logs: bool = False) -> None:
    """
    Start the log-serving process for worker logs.

    Parameters:
    - skip_serve_logs: bool, whether to skip log serving setup

    Sets up log file serving so worker logs can be accessed via the web interface.
    """

def logger_setup_handler(logger, **kwargs) -> None:
    """
    Configure logging for Celery worker processes.

    Parameters:
    - logger: Logger instance to configure
    - **kwargs: Additional logging configuration parameters

    Sets up appropriate log formatting, handlers, and levels for
    Celery worker processes.
    """
```
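A handler like logger_setup_handler is typically attached to the worker through Celery's setup_logging signal, which suppresses Celery's own logging configuration in favor of the connected receiver. A minimal sketch of that pattern (the handler and format below are illustrative, not the provider's exact configuration):

```python
# Sketch: configure worker logging via Celery's setup_logging signal.
import logging
import sys

from celery.signals import setup_logging

@setup_logging.connect
def configure_worker_logging(loglevel=None, **kwargs):
    """Attach a stderr handler at the requested level."""
    root = logging.getLogger()
    root.setLevel(loglevel or logging.INFO)
    handler = logging.StreamHandler(sys.stderr)
    handler.setFormatter(logging.Formatter("[%(asctime)s] %(levelname)s - %(message)s"))
    root.addHandler(handler)
```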
## Usage Examples

### Starting Workers

```bash
# Start a basic worker with default settings
airflow celery worker

# Start a worker with specific concurrency
airflow celery worker --concurrency 8

# Start a worker consuming from specific queues
airflow celery worker --queues high_priority,default

# Start a worker with autoscaling
airflow celery worker --autoscale 16,4

# Start a worker as a daemon
airflow celery worker --daemon --pid /var/run/celery-worker.pid --log-file /var/log/celery-worker.log

# Start a worker with a custom hostname
airflow celery worker --celery-hostname worker-gpu-01

# Start a worker with debug logging
airflow celery worker --loglevel DEBUG
```
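Conceptually, the worker command hands these options to the provider's Celery application. A rough sketch of the same thing at the Celery level, not the provider's actual code path (which also wires up logging, PID handling, and log serving):

```python
# Sketch: roughly what `airflow celery worker` boils down to -- starting a
# Celery worker on the provider's app with the requested options.
from airflow.providers.celery.executors.celery_executor_utils import app

app.worker_main(
    argv=[
        "worker",
        "--concurrency=8",
        "--queues=high_priority,default",
        "--hostname=worker-gpu-01",
        "--loglevel=INFO",
    ]
)
```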
### Worker Management

```bash
# List all active workers
airflow celery list-workers

# Stop a worker via its PID file (deprecated; prefer shutdown-worker)
airflow celery stop --pid /var/run/celery-worker.pid

# Gracefully shut down a specific worker
airflow celery shutdown-worker --celery-hostname worker-01@hostname

# Shut down all workers
airflow celery shutdown-all-workers
```
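The shutdown commands map onto Celery's warm-shutdown broadcast. A minimal sketch of shutting down a single worker programmatically (the hostname is a placeholder):

```python
# Sketch: broadcast a warm shutdown to one worker, which is essentially what
# shutdown-worker does; the worker finishes its current tasks and exits.
from airflow.providers.celery.executors.celery_executor_utils import app

app.control.shutdown(destination=["worker-01@hostname"])  # placeholder hostname
```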
### Queue Management

```bash
# Add a queue to a running worker
airflow celery add-queue --celery-hostname worker-01@hostname --queues ml_training

# Remove a queue from a worker
airflow celery remove-queue --celery-hostname worker-01@hostname --queues old_queue

# Check queue status (using Flower or custom scripts)
# airflow celery flower --port 5555
# Then access http://localhost:5555 for queue monitoring
```
### Monitoring with Flower

```bash
# Start Flower with default settings
airflow celery flower

# Start Flower on a custom port
airflow celery flower --port 8080

# Start Flower with authentication
airflow celery flower --basic-auth admin:secret,user:password

# Start Flower with a URL prefix (for reverse proxy setups)
airflow celery flower --url-prefix /flower

# Start Flower as a daemon
airflow celery flower --daemon --pid /var/run/flower.pid --log-file /var/log/flower.log

# Start Flower with broker API access
airflow celery flower --broker-api http://guest@localhost:15672/api/
```
### Production Deployment Scripts

```bash
#!/bin/bash
# Production worker startup script

# Set environment variables
export AIRFLOW_HOME=/opt/airflow
export PYTHONPATH=/opt/airflow:$PYTHONPATH

# Start worker with production settings
airflow celery worker \
    --concurrency 16 \
    --queues default,high_priority,ml_training \
    --celery-hostname "$(hostname)-worker" \
    --loglevel INFO \
    --daemon \
    --pid /var/run/airflow-worker.pid \
    --log-file /var/log/airflow-worker.log

# Start Flower monitoring
airflow celery flower \
    --port 5555 \
    --basic-auth "admin:$(cat /etc/flower-password)" \
    --daemon \
    --pid /var/run/flower.pid \
    --log-file /var/log/flower.log

echo "Celery worker and Flower started"
```
### Health Check Scripts

```bash
#!/bin/bash
# Worker health check script

# Check if the worker process is running
if ! pgrep -f "airflow celery worker" > /dev/null; then
    echo "ERROR: Celery worker not running"
    exit 1
fi

# Check if the worker is responding
WORKER_HOSTNAME=$(hostname)-worker
if ! airflow celery list-workers | grep -q "$WORKER_HOSTNAME"; then
    echo "ERROR: Worker $WORKER_HOSTNAME not responding"
    exit 1
fi

echo "Worker health check passed"
exit 0
```
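The same liveness check can be done over the Celery control channel instead of grepping CLI output. A minimal sketch using a ping broadcast (the hostname and timeout are placeholders):

```python
# Sketch: ping one worker over Celery's control channel; exit non-zero
# when the worker does not reply within the timeout.
import sys

from airflow.providers.celery.executors.celery_executor_utils import app

WORKER = "worker-01@hostname"  # placeholder hostname

replies = app.control.ping(destination=[WORKER], timeout=5) or []
if not any(WORKER in reply for reply in replies):
    print(f"ERROR: Worker {WORKER} not responding")
    sys.exit(1)
print("Worker health check passed")
```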
### Queue Scaling Scripts

```python
#!/usr/bin/env python3
"""Dynamic queue management script."""

import subprocess

from airflow.providers.celery.executors.celery_executor_utils import app

def scale_workers_for_queue(queue_name: str, target_workers: int):
    """Add or remove workers for a specific queue based on demand."""

    # Get the workers currently consuming from this queue
    inspector = app.control.inspect()
    active_queues = inspector.active_queues()

    current_workers = []
    if active_queues:
        for worker, queues in active_queues.items():
            if any(q.get('name') == queue_name for q in queues):
                current_workers.append(worker)

    current_count = len(current_workers)

    if current_count < target_workers:
        # Need to add workers
        needed = target_workers - current_count
        for i in range(needed):
            subprocess.run([
                'airflow', 'celery', 'worker',
                '--queues', queue_name,
                '--celery-hostname', f'{queue_name}-worker-{i}',
                '--daemon'
            ])
            print(f"Started worker {queue_name}-worker-{i}")

    elif current_count > target_workers:
        # Need to remove workers
        excess = current_count - target_workers
        for i in range(excess):
            worker_to_stop = current_workers[i]
            subprocess.run([
                'airflow', 'celery', 'shutdown-worker',
                '--celery-hostname', worker_to_stop
            ])
            print(f"Stopped worker {worker_to_stop}")

if __name__ == "__main__":
    # Example: scale the ml_training queue to 4 workers
    scale_workers_for_queue('ml_training', 4)
```
### Docker Integration

```dockerfile
# Dockerfile for a Celery worker
FROM apache/airflow:2.8.0

# Install additional dependencies
COPY requirements.txt /requirements.txt
RUN pip install -r /requirements.txt

# Copy DAGs and configuration
COPY dags/ /opt/airflow/dags/
COPY config/ /opt/airflow/config/

# Entrypoint script
COPY docker-entrypoint.sh /docker-entrypoint.sh
RUN chmod +x /docker-entrypoint.sh

ENTRYPOINT ["/docker-entrypoint.sh"]
```
```bash
#!/bin/bash
# docker-entrypoint.sh

# Dispatch on the first argument
if [ "$1" = "worker" ]; then
    # Start a Celery worker
    exec airflow celery worker \
        --concurrency "${WORKER_CONCURRENCY:-16}" \
        --queues "${WORKER_QUEUES:-default}" \
        --celery-hostname "${HOSTNAME}-worker"
elif [ "$1" = "flower" ]; then
    # Start Flower monitoring
    exec airflow celery flower \
        --port "${FLOWER_PORT:-5555}"
else
    # Pass through other commands
    exec "$@"
fi
```
```yaml
# docker-compose.yml
version: '3.8'
services:
  worker:
    build: .
    command: worker
    environment:
      - WORKER_CONCURRENCY=8
      - WORKER_QUEUES=default,high_priority
    volumes:
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs
    depends_on:
      - redis
      - postgres

  flower:
    build: .
    command: flower
    ports:
      - "5555:5555"
    environment:
      - FLOWER_PORT=5555
    depends_on:
      - redis
```
## Configuration

CLI commands respect Airflow configuration and environment variables. The key options live in airflow.cfg:

```ini
# Key configuration options used by the CLI commands

[celery]
worker_concurrency = 16
flower_host = 0.0.0.0
flower_port = 5555
# Format: user:pass,user2:pass2
flower_basic_auth =
# Format: max,min
worker_autoscale =
worker_prefetch_multiplier = 1

[logging]
logging_level = INFO
base_log_folder = /opt/airflow/logs

# Environment variables (shell):
#   AIRFLOW_HOME=/opt/airflow
#   PYTHONPATH=/opt/airflow
```
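Any of these options can also be overridden per environment with Airflow's standard AIRFLOW__{SECTION}__{KEY} environment variables, and read back programmatically through the configuration API. A short sketch:

```python
# Sketch: read the CLI-relevant options through Airflow's config API.
# Environment variables such as AIRFLOW__CELERY__WORKER_CONCURRENCY=32
# take precedence over the values in airflow.cfg.
from airflow.configuration import conf

worker_concurrency = conf.getint("celery", "worker_concurrency")
flower_port = conf.getint("celery", "flower_port")
logging_level = conf.get("logging", "logging_level")

print(worker_concurrency, flower_port, logging_level)
```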