# Celery Kubernetes Executor

The `CeleryKubernetesExecutor` is a hybrid executor that routes tasks between the CeleryExecutor and the KubernetesExecutor based on each task's queue name, allowing flexible execution strategies within a single Airflow deployment.

## Capabilities

### CeleryKubernetesExecutor Class

Hybrid executor class that combines Celery and Kubernetes execution strategies.

```python { .api }
class CeleryKubernetesExecutor(BaseExecutor):
    """
    Hybrid executor routing tasks between Celery and Kubernetes executors.

    Tasks assigned to the 'kubernetes' queue (configurable) are executed
    via KubernetesExecutor, while all other tasks use CeleryExecutor.
    """

    # Class attributes
    supports_ad_hoc_ti_run: bool = True
    supports_pickling: bool = True
    supports_sentry: bool = False
    is_local: bool = False
    is_single_threaded: bool = False
    is_production: bool = True
    serve_logs: bool = False
    callback_sink: BaseCallbackSink | None = None

    def __init__(self, celery_executor: CeleryExecutor | None = None, kubernetes_executor: KubernetesExecutor | None = None):
        """
        Initialize the hybrid executor.

        Parameters:
        - celery_executor: CeleryExecutor | None, optional Celery executor instance
        - kubernetes_executor: KubernetesExecutor | None, optional Kubernetes executor instance
        """

    @property
    def kubernetes_queue(self) -> str:
        """
        Get the queue name that routes tasks to KubernetesExecutor.

        Returns:
            str: Queue name (default: 'kubernetes')
        """

    @property
    def queued_tasks(self) -> dict[TaskInstanceKey, Any]:
        """
        Get combined queued tasks from both executors.

        Returns:
            dict[TaskInstanceKey, Any]: all queued tasks
        """

    @property
    def running(self) -> set[TaskInstanceKey]:
        """
        Get combined running tasks from both executors.

        Returns:
            set[TaskInstanceKey]: all running task keys
        """

    @property
    def job_id(self) -> int | str | None:
        """
        Get the job ID for this executor instance.

        Returns:
            int | str | None: executor job identifier
        """

    @property
    def slots_available(self) -> int:
        """
        Get total available slots across both executors.

        Returns:
            int: number of available execution slots
        """

    @property
    def slots_occupied(self) -> int:
        """
        Get total occupied slots across both executors.

        Returns:
            int: number of occupied execution slots
        """

    def start(self) -> None:
        """
        Start both underlying executors.

        Initializes and starts both CeleryExecutor and KubernetesExecutor
        instances for handling different types of tasks.
        """

    def queue_command(self, task_instance: TaskInstance, command: CommandType,
                      priority: int = 1, queue: str | None = None) -> None:
        """
        Route task to the appropriate executor based on queue name.

        Parameters:
        - task_instance: TaskInstance to execute
        - command: CommandType containing the task execution command
        - priority: int, task priority level
        - queue: str | None, queue name determining execution strategy

        Routing Logic:
        - If queue == kubernetes_queue (default: 'kubernetes') -> KubernetesExecutor
        - All other queues -> CeleryExecutor
        """

    def queue_task_instance(self, task_instance: TaskInstance, **kwargs) -> None:
        """
        Queue a task instance to the appropriate executor based on routing logic.

        Parameters:
        - task_instance: TaskInstance to queue
        - **kwargs: additional arguments passed to the underlying executor
        """

    def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], list[str]]:
        """
        Retrieve task logs from the appropriate executor.

        Parameters:
        - ti: TaskInstance to get logs for
        - try_number: int, task attempt number

        Returns:
            tuple[list[str], list[str]]: (stdout_lines, stderr_lines)
        """

    def has_task(self, task_instance: TaskInstance) -> bool:
        """
        Check whether a task instance is tracked by either executor.

        Parameters:
        - task_instance: TaskInstance to check

        Returns:
            bool: True if the task is tracked by either executor
        """

    def heartbeat(self) -> None:
        """
        Perform heartbeat operations on both executors.

        Calls heartbeat() on both underlying executors to maintain
        health and perform periodic maintenance.
        """

    def get_event_buffer(self, dag_ids: list[str] | None = None) -> dict[TaskInstanceKey, EventBufferValueType]:
        """
        Get the combined event buffer from both executors.

        Parameters:
        - dag_ids: list[str] | None, optional DAG IDs to filter events

        Returns:
            dict[TaskInstanceKey, EventBufferValueType]: combined event buffer
        """

    def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]:
        """
        Attempt to adopt orphaned task instances across both executors.

        Parameters:
        - tis: Sequence[TaskInstance], potentially orphaned tasks

        Returns:
            Sequence[TaskInstance]: task instances that could not be adopted
        """

    def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
        """
        Clean up stuck queued tasks on both executors (deprecated).

        Parameters:
        - tis: list[TaskInstance], tasks to clean up

        Returns:
            list[str]: task instance keys that were cleaned up
        """

    def revoke_task(self, *, ti: TaskInstance) -> None:
        """
        Revoke a running task on the appropriate executor.

        Parameters:
        - ti: TaskInstance, task instance to revoke

        Routes the revocation to the correct underlying executor.
        """

    def end(self) -> None:
        """
        Gracefully shut down both executors.

        Performs a clean shutdown of both the CeleryExecutor and
        KubernetesExecutor instances.
        """

    def terminate(self) -> None:
        """
        Force termination of tasks on both executors.

        Immediately terminates tasks running on both Celery workers
        and Kubernetes pods.
        """

    def debug_dump(self) -> None:
        """
        Print debug information from both executors.

        Calls debug_dump() on both underlying executors to provide
        comprehensive debugging information.
        """

    def send_callback(self, request: CallbackRequest) -> None:
        """
        Send a callback request to the appropriate executor.

        Parameters:
        - request: CallbackRequest, callback to send

        Routes the callback to the executor that handled the original task.
        """

    @staticmethod
    def get_cli_commands() -> list:
        """
        Get CLI commands provided by this executor.

        Returns:
            list: list of CLI command groups
        """
```

### Queue Routing Logic

The executor determines the execution strategy from each task's queue name:

```python { .api }
# Queue routing is handled internally by the executor
# based on the kubernetes_queue configuration property
```
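
The block above only notes that routing happens internally. As a concrete illustration, here is a minimal sketch of the decision, assuming the executor keeps `celery_executor` and `kubernetes_executor` attributes (the actual logic lives inside `queue_command` and related methods):

```python
# Illustrative sketch only; not the provider's actual implementation.
def _route(self, queue: str | None):
    # Tasks on the configured Kubernetes queue go to KubernetesExecutor;
    # every other queue (including the default) goes to CeleryExecutor.
    if queue == self.kubernetes_queue:
        return self.kubernetes_executor
    return self.celery_executor
```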

## Configuration

Configuration for the hybrid executor's routing behavior:

```python { .api }
# Configuration section: [celery_kubernetes_executor]

KUBERNETES_QUEUE = "kubernetes"  # Queue name for Kubernetes execution

# Both underlying executors use their respective configuration sections:
# [celery] - for CeleryExecutor settings
# [kubernetes_executor] - for KubernetesExecutor settings
```
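
To check the active routing queue at runtime, Airflow's standard configuration API can be used; a small sketch, assuming the default section and option names shown above:

```python
from airflow.configuration import conf

# Read the queue name that routes tasks to KubernetesExecutor
# ('kubernetes' in a default setup).
k8s_queue = conf.get("celery_kubernetes_executor", "kubernetes_queue")
print(k8s_queue)
```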

## Usage Examples

### Basic Configuration

```ini
# In airflow.cfg:
[core]
executor = airflow.providers.celery.executors.celery_kubernetes_executor.CeleryKubernetesExecutor

[celery_kubernetes_executor]
kubernetes_queue = kubernetes

[celery]
# Standard Celery configuration
broker_url = redis://redis:6379/0
result_backend = db+postgresql://postgres:airflow@postgres/airflow
worker_concurrency = 16

[kubernetes_executor]
# Standard Kubernetes configuration
namespace = airflow
worker_container_repository = airflow-workers
worker_container_tag = latest
```

### Task Queue Assignment

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

dag = DAG('hybrid_execution', schedule_interval=None, start_date=datetime(2024, 1, 1))

# This task runs on Kubernetes (ephemeral pod)
k8s_task = PythonOperator(
    task_id='kubernetes_task',
    python_callable=lambda: print("Running on Kubernetes"),
    queue='kubernetes',  # Routes to KubernetesExecutor
    dag=dag,
)

# This task runs on Celery workers (persistent workers)
celery_task = BashOperator(
    task_id='celery_task',
    bash_command='echo "Running on Celery"',
    queue='default',  # Routes to CeleryExecutor
    dag=dag,
)

# A custom queue also routes to Celery
high_memory_task = PythonOperator(
    task_id='high_memory_task',
    python_callable=lambda: print("High memory processing"),
    queue='high_memory',  # Routes to CeleryExecutor
    dag=dag,
)

k8s_task >> [celery_task, high_memory_task]
```
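
Tasks that omit `queue` fall back to Airflow's default queue (typically `default_queue` under `[operators]` in `airflow.cfg`); since that name does not match the Kubernetes queue, such tasks also route to Celery.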

### Programmatic Usage

```python
from airflow.providers.celery.executors.celery_kubernetes_executor import CeleryKubernetesExecutor
from airflow.models.taskinstancekey import TaskInstanceKey

# Initialize the hybrid executor
executor = CeleryKubernetesExecutor()
executor.start()

# Task routed to Kubernetes
k8s_key = TaskInstanceKey(dag_id="my_dag", task_id="k8s_task",
                          run_id="manual_run", try_number=1)
k8s_command = ["python", "-c", "print('K8s task')"]

executor.execute_async(key=k8s_key, command=k8s_command, queue="kubernetes")

# Task routed to Celery
celery_key = TaskInstanceKey(dag_id="my_dag", task_id="celery_task",
                             run_id="manual_run", try_number=1)
celery_command = ["python", "-c", "print('Celery task')"]

executor.execute_async(key=celery_key, command=celery_command, queue="default")

# Sync states from both executors
executor.sync()

# Cleanup
executor.end()
```
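
Before calling `end()`, the combined state can be inspected through the aggregate properties and event buffer documented above. A brief sketch; the `(state, info)` tuple shape is an assumption based on the standard executor event buffer:

```python
# Free capacity aggregated across Celery and Kubernetes
print(executor.slots_available)

# Combined event buffer from both executors;
# values are assumed to be (state, info) pairs.
for key, (state, info) in executor.get_event_buffer().items():
    print(key, state)
```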

### Custom Queue Configuration

```ini
# Custom Kubernetes queue name, in airflow.cfg:
[celery_kubernetes_executor]
kubernetes_queue = special_k8s_queue
```

```python
# In the DAG:
special_k8s_task = PythonOperator(
    task_id='special_kubernetes_task',
    python_callable=my_function,
    queue='special_k8s_queue',  # Routes to KubernetesExecutor
    dag=dag,
)
```

## Use Cases

### Resource-Based Task Routing

```python
# Route tasks based on resource requirements

# CPU-intensive tasks on Kubernetes (auto-scaling)
cpu_intensive_task = PythonOperator(
    task_id='ml_training',
    python_callable=train_model,
    queue='kubernetes',
    executor_config={
        'pod_override': {
            'spec': {
                'containers': [{
                    'name': 'base',
                    'resources': {
                        'requests': {'cpu': '4', 'memory': '8Gi'},
                        'limits': {'cpu': '8', 'memory': '16Gi'},
                    },
                }],
            },
        },
    },
    dag=dag,
)

# I/O-intensive tasks on persistent Celery workers
io_intensive_task = PythonOperator(
    task_id='data_processing',
    python_callable=process_large_files,
    queue='io_intensive',  # Routes to specialized Celery workers
    dag=dag,
)
```

### Environment Isolation

```python
# Isolate tasks with different dependency requirements

# Task requiring special libraries (isolated K8s pod)
special_deps_task = BashOperator(
    task_id='special_processing',
    bash_command='python special_script.py',
    queue='kubernetes',
    executor_config={
        'pod_override': {
            'spec': {
                'containers': [{
                    'name': 'base',
                    'image': 'my-special-image:latest',
                }],
            },
        },
    },
    dag=dag,
)

# Standard task on regular Celery workers
standard_task = PythonOperator(
    task_id='standard_processing',
    python_callable=standard_function,
    queue='default',
    dag=dag,
)
```

## Monitoring and Troubleshooting

### Task Tracking

```python
# Tasks are tracked separately by each underlying executor.
# Use the Airflow UI to see which executor handled each task.

# Celery tasks show up in Flower monitoring;
# Kubernetes tasks show up in the K8s dashboard / kubectl.

# Log aggregation from both execution environments
def get_task_logs(dag_id: str, task_id: str, execution_date: str, try_number: int):
    """
    Retrieve logs from either Celery or Kubernetes based on the task queue.

    The hybrid executor automatically determines the correct source
    for log retrieval based on where the task was executed.
    """
```
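
For programmatic retrieval, the executor's documented `get_task_log` method can be called directly; a small sketch, assuming `ti` is a `TaskInstance` for a completed attempt:

```python
# Returns (stdout_lines, stderr_lines) per the API above;
# the executor picks Celery or Kubernetes as the log source.
stdout_lines, stderr_lines = executor.get_task_log(ti, try_number=1)
print("\n".join(stdout_lines))
```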

### Performance Considerations

Balance work between Celery and Kubernetes based on their strengths.

Celery advantages:

- Persistent workers (faster task startup)
- Better for high-frequency, short-duration tasks
- Shared state and caching between tasks
- Lower resource overhead

Kubernetes advantages:

- Resource isolation per task
- Auto-scaling based on workload
- Better for resource-intensive tasks
- Clean environment per execution
- Support for different container images per task