# CLI Commands

The Celery provider includes comprehensive command-line tools for managing Celery workers, monitoring with Flower, and performing queue operations. These commands extend Airflow's CLI with Celery-specific functionality.

## Capabilities

### Worker Management Commands

Commands for starting, stopping, and managing Celery workers.

```python { .api }
def worker(args):
    """
    Start a Celery worker process.

    Usage: airflow celery worker [options]

    Parameters (via args object):
    - concurrency: int, number of concurrent worker processes
    - hostname: str, worker hostname identifier
    - queues: list[str], specific queues for this worker to consume
    - loglevel: str, logging level (DEBUG, INFO, WARNING, ERROR)
    - autoscale: str, autoscaling configuration (max,min)
    - without_gossip: bool, disable gossip for worker discovery
    - without_mingle: bool, disable startup synchronization
    - without_heartbeat: bool, disable worker heartbeat
    - daemon: bool, run as daemon process
    - pidfile: str, path to PID file for daemon mode
    - logfile: str, path to log file

    Environment Setup:
    - Configures worker process name and logging
    - Sets up signal handlers for graceful shutdown
    - Initializes Celery app with Airflow configuration
    """

def stop_worker(args):
    """
    Stop a running Celery worker by PID (deprecated - use shutdown-worker instead).

    Usage: airflow celery stop [options]

    Parameters (via args object):
    - pid: str, path to PID file of the worker to stop
    - verbose: bool, enable verbose output

    Reads the PID from file and sends a termination signal to the worker process.
    """

def list_workers(args):
    """
    List all active Celery workers and their status.

    Usage: airflow celery list-workers [options]

    Parameters (via args object):
    - output: str, output format ('table', 'json', 'yaml', 'plain')

    Displays:
    - Worker hostnames and online status
    - Active task counts per worker
    - Queues each worker is consuming from
    - Load averages and resource usage
    """

def shutdown_worker(args):
    """
    Gracefully shut down a specific Celery worker.

    Usage: airflow celery shutdown-worker [options]

    Parameters (via args object):
    - celery_hostname: str, worker hostname to shut down (required)

    Sends a shutdown command and waits for the worker to finish its current tasks.
    """

def shutdown_all_workers(args):
    """
    Shut down all active Celery workers.

    Usage: airflow celery shutdown-all-workers [options]

    Parameters (via args object):
    - yes: bool, skip confirmation prompt

    Sends shutdown commands to all discovered workers and monitors
    their termination status.
    """
```
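
The graceful-shutdown commands are thin wrappers over Celery's remote control machinery. Below is a rough, illustrative sketch of those mechanics — the helper names are hypothetical, not provider functions — assuming the provider's Celery `app` is importable as in the Queue Scaling example later in this document:

```python
# Illustrative sketch: what graceful shutdown amounts to at the Celery level.
from airflow.providers.celery.executors.celery_executor_utils import app

def shutdown_one(hostname: str) -> None:
    # Broadcast the "shutdown" control command to a single worker; the worker
    # stops consuming, finishes its in-flight tasks, then exits.
    app.control.shutdown(destination=[hostname])

def shutdown_every_worker() -> None:
    # Discover responding workers via ping(), then target all of them.
    replies = app.control.inspect().ping() or {}
    if replies:
        app.control.shutdown(destination=list(replies))
```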

### Queue Management Commands

Commands for managing Celery task queues and worker queue assignments.

```python { .api }
def add_queue(args):
    """
    Add a queue to a worker's consumption list.

    Usage: airflow celery add-queue [options]

    Parameters (via args object):
    - celery_hostname: str, target worker hostname (required)
    - queues: list[str], queues to add to the worker (required)

    Dynamically adds the queue to a running worker without a restart.
    The worker will begin consuming tasks from the new queue.
    """

def remove_queue(args):
    """
    Remove a queue from a worker's consumption list.

    Usage: airflow celery remove-queue [options]

    Parameters (via args object):
    - celery_hostname: str, target worker hostname (required)
    - queues: list[str], queues to remove from the worker (required)

    Stops the worker from consuming new tasks from the specified queue.
    Tasks from that queue that are already running complete normally.
    """
```
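
These commands correspond to Celery's `add_consumer`/`cancel_consumer` control primitives. A minimal sketch of the same effect through the Celery API directly (hypothetical helper names; same `app` import assumption as above):

```python
# Illustrative sketch: dynamic queue (re)assignment without a worker restart.
from airflow.providers.celery.executors.celery_executor_utils import app

def assign_queue(hostname: str, queue: str) -> None:
    # Same effect as `airflow celery add-queue`: the worker starts
    # consuming from the queue immediately.
    app.control.add_consumer(queue, destination=[hostname])

def unassign_queue(hostname: str, queue: str) -> None:
    # Same effect as `airflow celery remove-queue`: no new tasks are taken
    # from the queue; tasks already executing run to completion.
    app.control.cancel_consumer(queue, destination=[hostname])
```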

### Monitoring Commands

Commands for monitoring and administering Celery clusters.

```python { .api }
def flower(args):
    """
    Start Flower, the web-based monitoring and administration tool.

    Usage: airflow celery flower [options]

    Parameters (via args object):
    - hostname: str, interface to bind (default: 0.0.0.0)
    - port: int, port number (default: 5555)
    - broker_api: str, broker API URL for advanced monitoring
    - url_prefix: str, URL prefix for reverse proxy setups
    - basic_auth: str, HTTP basic authentication (user:pass,user2:pass2)
    - flower_conf: str, path to Flower configuration file
    - daemon: bool, run as daemon process
    - pidfile: str, PID file path for daemon mode
    - logfile: str, log file path

    Features:
    - Real-time task monitoring and statistics
    - Worker management and control
    - Queue monitoring and manipulation
    - Task routing and execution control
    - Historical task execution data
    """
```
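
Alongside the web UI, Flower exposes an HTTP API that is convenient for scripting. A small sketch against a locally running instance without authentication (the `flower_workers` helper is hypothetical; `/api/workers` is Flower's worker-listing endpoint):

```python
# Illustrative sketch: query a running Flower instance over its HTTP API.
import json
from urllib.request import urlopen

def flower_workers(base_url: str = "http://localhost:5555") -> dict:
    # GET /api/workers returns a JSON object keyed by worker name.
    with urlopen(f"{base_url}/api/workers") as resp:
        return json.load(resp)

if __name__ == "__main__":
    for name in flower_workers():
        print(name)
```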

### Utility Functions

Supporting functions used by CLI commands.

```python { .api }
def _check_if_active_celery_worker(hostname: str) -> bool:
    """
    Check whether a Celery worker with the given hostname is active.

    Parameters:
    - hostname: str, worker hostname to check

    Returns:
    bool: True if the worker is active and responding
    """

def _serve_logs(skip_serve_logs: bool = False) -> None:
    """
    Start the log-serving process for worker logs.

    Parameters:
    - skip_serve_logs: bool, whether to skip log serving setup

    Sets up log file serving so worker logs are accessible via the web interface.
    """

def logger_setup_handler(logger, **kwargs) -> None:
    """
    Configure logging for Celery worker processes.

    Parameters:
    - logger: Logger instance to configure
    - **kwargs: additional logging configuration parameters

    Sets up appropriate log formatting, handlers, and levels for
    Celery worker processes.
    """
```
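
For comparison, a liveness check in the spirit of `_check_if_active_celery_worker` can be written against Celery's `ping` inspection. This is a hedged sketch, not the provider's implementation:

```python
# Illustrative sketch: ping-based liveness check for a single worker.
from airflow.providers.celery.executors.celery_executor_utils import app

def is_worker_active(hostname: str) -> bool:
    # ping() returns {worker_name: {"ok": "pong"}} for responders, or None
    # if no worker answers within the default timeout.
    replies = app.control.inspect(destination=[hostname]).ping()
    return bool(replies) and hostname in replies
```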

## Usage Examples

### Starting Workers

```bash
# Start a basic worker with default settings
airflow celery worker

# Start a worker with a specific concurrency
airflow celery worker --concurrency 8

# Start a worker consuming from specific queues
airflow celery worker --queues high_priority,default

# Start a worker with autoscaling (max,min)
airflow celery worker --autoscale 16,4

# Start a worker as a daemon
airflow celery worker --daemon --pid /var/run/celery-worker.pid --log-file /var/log/celery-worker.log

# Start a worker with a custom hostname
airflow celery worker --celery-hostname worker-gpu-01

# Start a worker with debug logging
airflow celery worker --loglevel DEBUG
```

### Worker Management

```bash
# List all active workers
airflow celery list-workers

# Stop a worker via its PID file
airflow celery stop --pid /var/run/celery-worker.pid

# Gracefully shut down a specific worker
airflow celery shutdown-worker --celery-hostname worker-01@hostname

# Shut down all workers
airflow celery shutdown-all-workers
```

### Queue Management

```bash
# Add a queue to a running worker
airflow celery add-queue --celery-hostname worker-01@hostname --queues ml_training

# Remove a queue from a worker
airflow celery remove-queue --celery-hostname worker-01@hostname --queues old_queue

# Check queue status (using Flower or custom scripts)
# airflow celery flower --port 5555
# Then access http://localhost:5555 for queue monitoring
```

### Monitoring with Flower

```bash
# Start Flower with default settings
airflow celery flower

# Start Flower on a custom port
airflow celery flower --port 8080

# Start Flower with basic authentication
airflow celery flower --basic-auth admin:secret,user:password

# Start Flower with a URL prefix (for reverse proxy setups)
airflow celery flower --url-prefix /flower

# Start Flower as a daemon
airflow celery flower --daemon --pid /var/run/flower.pid --log-file /var/log/flower.log

# Start Flower with broker API access
airflow celery flower --broker-api http://guest@localhost:15672/api/
```

### Production Deployment Scripts

```bash
#!/bin/bash
# Production worker startup script

# Set environment variables
export AIRFLOW_HOME=/opt/airflow
export PYTHONPATH=/opt/airflow:$PYTHONPATH

# Start worker with production settings
airflow celery worker \
    --concurrency 16 \
    --queues default,high_priority,ml_training \
    --celery-hostname "$(hostname)-worker" \
    --loglevel INFO \
    --daemon \
    --pid /var/run/airflow-worker.pid \
    --log-file /var/log/airflow-worker.log

# Start Flower monitoring
airflow celery flower \
    --port 5555 \
    --basic-auth "admin:$(cat /etc/flower-password)" \
    --daemon \
    --pid /var/run/flower.pid \
    --log-file /var/log/flower.log

echo "Celery worker and Flower started"
```

### Health Check Scripts

```bash
#!/bin/bash
# Worker health check script

# Check if the worker process is running
if ! pgrep -f "airflow celery worker" > /dev/null; then
    echo "ERROR: Celery worker not running"
    exit 1
fi

# Check if the worker is responding
WORKER_HOSTNAME=$(hostname)-worker
if ! airflow celery list-workers | grep -q "$WORKER_HOSTNAME"; then
    echo "ERROR: Worker $WORKER_HOSTNAME not responding"
    exit 1
fi

echo "Worker health check passed"
exit 0
```

### Queue Scaling Scripts

```python
#!/usr/bin/env python3
"""Dynamic queue management script."""

import subprocess

from airflow.providers.celery.executors.celery_executor_utils import app


def scale_workers_for_queue(queue_name: str, target_workers: int):
    """Add or remove workers for a specific queue based on demand."""
    # Get the current workers consuming from this queue
    inspector = app.control.inspect()
    active_queues = inspector.active_queues()

    current_workers = []
    if active_queues:
        for worker, queues in active_queues.items():
            if any(q.get('name') == queue_name for q in queues):
                current_workers.append(worker)

    current_count = len(current_workers)

    if current_count < target_workers:
        # Need to add workers
        needed = target_workers - current_count
        for i in range(needed):
            subprocess.run([
                'airflow', 'celery', 'worker',
                '--queues', queue_name,
                '--celery-hostname', f'{queue_name}-worker-{i}',
                '--daemon',
            ])
            print(f"Started worker {queue_name}-worker-{i}")

    elif current_count > target_workers:
        # Need to remove workers
        excess = current_count - target_workers
        for i in range(excess):
            worker_to_stop = current_workers[i]
            subprocess.run([
                'airflow', 'celery', 'shutdown-worker',
                '--celery-hostname', worker_to_stop,
            ])
            print(f"Stopped worker {worker_to_stop}")


if __name__ == "__main__":
    # Example: scale the ml_training queue to 4 workers
    scale_workers_for_queue('ml_training', 4)
```

### Docker Integration

```dockerfile
# Dockerfile for Celery worker
FROM apache/airflow:2.8.0

# Install additional dependencies
COPY requirements.txt /requirements.txt
RUN pip install -r /requirements.txt

# Copy DAGs and configuration
COPY dags/ /opt/airflow/dags/
COPY config/ /opt/airflow/config/

# Entrypoint script
COPY docker-entrypoint.sh /docker-entrypoint.sh
RUN chmod +x /docker-entrypoint.sh

ENTRYPOINT ["/docker-entrypoint.sh"]
```

```bash
#!/bin/bash
# docker-entrypoint.sh

# Dispatch on the first argument
if [ "$1" = "worker" ]; then
    # Start Celery worker
    exec airflow celery worker \
        --concurrency "${WORKER_CONCURRENCY:-16}" \
        --queues "${WORKER_QUEUES:-default}" \
        --celery-hostname "${HOSTNAME}-worker"
elif [ "$1" = "flower" ]; then
    # Start Flower monitoring
    exec airflow celery flower \
        --port "${FLOWER_PORT:-5555}"
else
    # Pass through other commands
    exec "$@"
fi
```

```yaml
# docker-compose.yml
version: '3.8'
services:
  worker:
    build: .
    command: worker
    environment:
      - WORKER_CONCURRENCY=8
      - WORKER_QUEUES=default,high_priority
    volumes:
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs
    depends_on:
      - redis
      - postgres

  flower:
    build: .
    command: flower
    ports:
      - "5555:5555"
    environment:
      - FLOWER_PORT=5555
    depends_on:
      - redis
```

## Configuration

CLI commands respect Airflow configuration and environment variables:

```python { .api }
# Key configuration sections used by CLI commands:

# [celery] section
WORKER_CONCURRENCY = 16
FLOWER_HOST = "0.0.0.0"
FLOWER_PORT = 5555
FLOWER_BASIC_AUTH = ""  # user:pass,user2:pass2
WORKER_AUTOSCALE = ""  # max,min
WORKER_PREFETCH_MULTIPLIER = 1

# [logging] section
LOGGING_LEVEL = "INFO"
BASE_LOG_FOLDER = "/opt/airflow/logs"

# Environment variables
AIRFLOW_HOME = "/opt/airflow"
PYTHONPATH = "/opt/airflow"
```
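
These settings can also be read programmatically through Airflow's configuration API, which is what the CLI consults. A short sketch (the option names mirror the `[celery]` and `[logging]` keys above):

```python
# Illustrative sketch: reading CLI-relevant settings via Airflow's config API.
from airflow.configuration import conf

worker_concurrency = conf.getint("celery", "worker_concurrency", fallback=16)
flower_host = conf.get("celery", "flower_host", fallback="0.0.0.0")
flower_port = conf.getint("celery", "flower_port", fallback=5555)
base_log_folder = conf.get("logging", "base_log_folder")

print(f"concurrency={worker_concurrency}, flower={flower_host}:{flower_port}")
print(f"logs under {base_log_folder}")
```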