# Executors

Run Airflow tasks on Kubernetes infrastructure with executors that manage task distribution, pod creation, and lifecycle management across Kubernetes clusters.

## Capabilities

### Kubernetes Executor

Execute Airflow tasks as Kubernetes pods with comprehensive cluster management, resource allocation, and monitoring.

```python { .api }
class KubernetesExecutor(BaseExecutor):
    """
    Executes tasks in Kubernetes pods.

    This executor launches each task as a separate Kubernetes pod, providing
    isolation, scalability, and resource management for Airflow task execution.

    Configuration is typically done through airflow.cfg under the
    [kubernetes_executor] section.

    Key Features:
    - Task isolation in separate pods
    - Dynamic resource allocation
    - Multi-namespace support
    - Pod template customization
    - Automatic cleanup and monitoring
    """
    def __init__(self, **kwargs): ...

    def start(self) -> None:
        """
        Start the executor.

        Initializes the Kubernetes client, starts the job watcher process,
        and prepares the executor for task scheduling.
        """
        ...

    def sync(self) -> None:
        """
        Synchronize task states.

        Updates task states based on pod status, processes completed tasks,
        and handles task state transitions.
        """
        ...

    def end(self) -> None:
        """
        End executor.

        Gracefully shuts down the executor, cleans up resources,
        and terminates background processes.
        """
        ...

    def terminate(self) -> None:
        """
        Terminate executor.

        Forcefully terminates the executor and all associated processes.
        """
        ...

    def execute_async(
        self,
        key: TaskInstanceKey,
        command: CommandType,
        queue: str | None = None,
        executor_config: dict | None = None
    ) -> None:
        """
        Execute task asynchronously.

        Args:
            key: Task instance key
            command: Command to execute
            queue: Execution queue (optional)
            executor_config: Executor-specific configuration
        """
        ...

    def adopt_launched_task(self, ti: TaskInstance, last_heartbeat: datetime, session: Session) -> bool:
        """
        Adopt a launched task.

        Args:
            ti: Task instance to adopt
            last_heartbeat: Last heartbeat timestamp
            session: Database session

        Returns:
            bool: True if task was successfully adopted
        """
        ...
```

### Local Kubernetes Executor

Hybrid executor that runs local tasks with LocalExecutor and Kubernetes tasks with KubernetesExecutor based on queue configuration.

```python { .api }
class LocalKubernetesExecutor(BaseExecutor):
    """
    Hybrid executor running LocalExecutor for local tasks and
    KubernetesExecutor for Kubernetes tasks.

    Tasks are routed to the appropriate executor based on the queue name.
    By default, tasks in the 'kubernetes' queue are executed on Kubernetes,
    while all other tasks are executed locally.

    Configuration:
    - kubernetes_queue: Queue name for Kubernetes tasks (default: 'kubernetes')
    """
    def __init__(self, local_executor: BaseExecutor, kubernetes_executor: BaseExecutor, **kwargs): ...

    def start(self) -> None:
        """Start both local and Kubernetes executors."""
        ...

    def sync(self) -> None:
        """Synchronize both executors."""
        ...

    def end(self) -> None:
        """End both executors."""
        ...

    def terminate(self) -> None:
        """Terminate both executors."""
        ...

    def execute_async(
        self,
        key: TaskInstanceKey,
        command: CommandType,
        queue: str | None = None,
        executor_config: dict | None = None
    ) -> None:
        """
        Route task to appropriate executor based on queue.

        Args:
            key: Task instance key
            command: Command to execute
            queue: Execution queue - determines executor selection
            executor_config: Executor-specific configuration
        """
        ...

    def has_task(self, ti: TaskInstance) -> bool:
        """Check if task exists in either executor."""
        ...

    def heartbeat(self) -> None:
        """Heartbeat both executors."""
        ...
```

### Kubernetes Job Watcher

Process that monitors Kubernetes job status and manages job lifecycle for the executor.

```python { .api }
class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
    """
    Watches Kubernetes jobs for state changes.

    This process monitors pods created by the KubernetesExecutor,
    tracks their status changes, and communicates updates back
    to the main executor process.

    Args:
        namespace (str): Kubernetes namespace to watch
        multi_namespace_mode (bool): Enable multi-namespace watching
        watcher_queue (Queue): Queue for status updates
        resource_version (str, optional): Starting resource version
        worker_uuid (str): Unique identifier for this worker
        kube_config (Configuration): Kubernetes client configuration
    """
    def __init__(
        self,
        namespace: str,
        multi_namespace_mode: bool,
        watcher_queue: Queue,
        resource_version: str | None,
        worker_uuid: str,
        kube_config: Configuration,
        **kwargs
    ): ...

    def run(self) -> None:
        """Main process loop for watching Kubernetes jobs."""
        ...

    def _run(self) -> None:
        """Internal run method with error handling."""
        ...
```
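The watcher is owned and launched by the executor itself; the sketch below is illustrative only, using the constructor shown above with placeholder values for the namespace, worker UUID, and `kube_config`.

```python
# Hedged sketch: how the executor might launch the watcher process.
# Only the constructor arguments documented above are used; the concrete
# values (namespace, worker UUID, kube_config) are illustrative placeholders.
from multiprocessing import Manager

manager = Manager()
watcher_queue = manager.Queue()  # pod state changes are reported through this queue

watcher = KubernetesJobWatcher(
    namespace="airflow",
    multi_namespace_mode=False,
    watcher_queue=watcher_queue,
    resource_version=None,          # start watching from the current cluster state
    worker_uuid="scheduler-job-1",
    kube_config=kube_config,        # assumed: built from airflow.cfg elsewhere
)
watcher.start()      # spawns the process and invokes run()
# ... the executor drains watcher_queue during sync() ...
watcher.terminate()  # force-stop on shutdown
```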

### Airflow Kubernetes Scheduler

Scheduler component that manages pod creation and task distribution on Kubernetes clusters.

```python { .api }
class AirflowKubernetesScheduler(LoggingMixin):
    """
    Schedules pods on Kubernetes.

    Manages the creation, monitoring, and cleanup of Kubernetes pods
    for task execution. Handles pod templates, resource allocation,
    and namespace management.

    Args:
        kube_config (Configuration): Kubernetes client configuration
        task_queue (Queue): Queue of tasks to execute
        result_queue (Queue): Queue for execution results
        kube_client (ApiClient): Kubernetes API client
        worker_uuid (str): Unique identifier for this worker
    """
    def __init__(
        self,
        kube_config: Configuration,
        task_queue: Queue,
        result_queue: Queue,
        kube_client: ApiClient,
        worker_uuid: str,
        **kwargs
    ): ...

    def run_pod_async(self, pod: V1Pod, **kwargs) -> None:
        """Run pod asynchronously."""
        ...

    def delete_pod(self, pod_name: str, namespace: str) -> None:
        """Delete a pod."""
        ...

    def patch_pod_executor_done(self, pod_name: str, namespace: str) -> None:
        """Mark pod as executor done."""
        ...

    def _make_safe_pod_id(self, safe_dag_id: str, safe_task_id: str, safe_run_id: str) -> str:
        """Generate safe pod identifier."""
        ...
```
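The executor drives this component internally and users rarely call it directly. The following hedged sketch only strings together the methods shown above; the queues, client, and pod objects are assumed placeholders created by the executor.

```python
# Hedged sketch of the pod lifecycle as driven through the methods above.
# kube_config, task_queue, result_queue, kube_client and worker_pod are
# illustrative placeholders assumed to be created by the executor.
scheduler = AirflowKubernetesScheduler(
    kube_config=kube_config,
    task_queue=task_queue,
    result_queue=result_queue,
    kube_client=kube_client,
    worker_uuid="scheduler-job-1",
)

scheduler.run_pod_async(worker_pod)                       # create the worker pod
# ... the job watcher later reports the pod as finished ...
scheduler.patch_pod_executor_done("pod-name", "airflow")  # mark it as handled
scheduler.delete_pod("pod-name", "airflow")               # clean up the pod
```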

### Resource Version Manager

Singleton class that manages Kubernetes resource versions for efficient watching.

```python { .api }
class ResourceVersion(metaclass=Singleton):
    """
    Manages Kubernetes resource versions.

    Tracks resource versions for efficient watching of Kubernetes
    resources, enabling incremental updates and reducing API load.
    """
    def __init__(self): ...

    def get_resource_version(self) -> str:
        """Get current resource version."""
        ...

    def set_resource_version(self, version: str) -> None:
        """Set resource version."""
        ...
```
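A minimal sketch, assuming only the two accessors shown above, of how a watch loop can resume from the last recorded version instead of re-listing all pods:

```python
# Hedged sketch: resuming a watch from the last recorded resource version.
# The singleton keeps the value shared across restarts of the watch loop.
rv = ResourceVersion()

last_seen = rv.get_resource_version()  # empty string or a previously stored version
# ... pass last_seen as the resource_version argument of the Kubernetes watch
# call so that only events newer than it are streamed ...

# after processing an event, remember how far we got (value is illustrative):
rv.set_resource_version("123456")
```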

### Utility Functions

Helper functions for executor operations and pod management.

```python { .api }
def get_base_pod_from_template(
    pod_template_file: str | None,
    kube_client: ApiClient
) -> V1Pod:
    """
    Get base pod from template.

    Args:
        pod_template_file: Path to pod template file
        kube_client: Kubernetes API client

    Returns:
        V1Pod: Base pod specification
    """
    ...
```
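A short usage sketch, assuming an `ApiClient` has already been created elsewhere; the template path is the same placeholder used in the configuration examples below.

```python
# Hedged sketch: load the base pod spec that worker pods start from.
# kube_client is assumed to be a kubernetes ApiClient obtained elsewhere;
# the template path is a placeholder.
base_pod = get_base_pod_from_template("/path/to/pod_template.yaml", kube_client)

# Inspect what every worker pod inherits before per-task overrides apply.
print(base_pod.metadata.name)
print(base_pod.spec.containers[0].image)
```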

## Configuration

The Kubernetes Executor is configured through the `airflow.cfg` file under the `[kubernetes_executor]` section:

### Core Configuration

```ini
[kubernetes_executor]
# Kubernetes namespace for pods
namespace = default

# Pod template file (optional)
pod_template_file = /path/to/pod_template.yaml

# Worker container image
worker_container_repository = apache/airflow
worker_container_tag = 2.7.0

# Delete pods after completion
delete_worker_pods = True
delete_worker_pods_on_failure = False

# In-cluster configuration
in_cluster = True

# Cluster context (for out-of-cluster)
cluster_context = my-cluster

# Config file path (for out-of-cluster)
config_file = ~/.kube/config
```
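The same options can also be supplied as `AIRFLOW__<SECTION>__<KEY>` environment variables, which is often more convenient when the scheduler itself runs in Kubernetes. A small illustrative sketch (in a real deployment these would normally be set in the scheduler's manifest rather than in Python):

```python
# Hedged sketch: the [kubernetes_executor] options above, expressed as
# AIRFLOW__<SECTION>__<KEY> environment variables. Values are placeholders.
import os

os.environ["AIRFLOW__CORE__EXECUTOR"] = "KubernetesExecutor"
os.environ["AIRFLOW__KUBERNETES_EXECUTOR__NAMESPACE"] = "default"
os.environ["AIRFLOW__KUBERNETES_EXECUTOR__DELETE_WORKER_PODS"] = "True"
os.environ["AIRFLOW__KUBERNETES_EXECUTOR__POD_TEMPLATE_FILE"] = "/path/to/pod_template.yaml"
```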

### Multi-Namespace Configuration

```ini
# Enable multi-namespace mode
multi_namespace_mode = True

# Specific namespaces (when not using cluster role)
multi_namespace_mode_namespace_list = namespace1,namespace2,namespace3
```

### Performance Configuration

```ini
# Number of pods to create per scheduler loop
worker_pods_creation_batch_size = 3

# Task publish retry configuration
task_publish_max_retries = 3

# API client retry configuration
api_client_retry_configuration = {"total": 3, "backoff_factor": 0.5}
```

### Resource Management

```ini
# Pod termination grace period
termination_grace_period_seconds = 30

# Delete options for pod cleanup
delete_option_kwargs = {"grace_period_seconds": 10}
```

## Usage Examples

### Basic Executor Configuration

```ini
# airflow.cfg configuration for KubernetesExecutor
[core]
executor = KubernetesExecutor

[kubernetes_executor]
namespace = airflow
worker_container_repository = my-registry/airflow
worker_container_tag = 2.7.0-custom
delete_worker_pods = True
in_cluster = True
```
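To sanity-check which executor is active after editing `airflow.cfg`, the value can be read back through Airflow's configuration API; a small illustrative sketch:

```python
# Quick check of the active executor setting resolved from airflow.cfg / env vars.
from airflow.configuration import conf

print(conf.get("core", "executor"))  # expected: KubernetesExecutor
```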

### Pod Template Configuration

```yaml
# pod_template.yaml
apiVersion: v1
kind: Pod
metadata:
  name: airflow-worker
  namespace: airflow
spec:
  serviceAccountName: airflow-worker
  securityContext:
    runAsUser: 50000
    runAsGroup: 50000
    fsGroup: 50000
  containers:
    - name: base
      image: apache/airflow:2.7.0
      resources:
        requests:
          memory: "512Mi"
          cpu: "500m"
        limits:
          memory: "1Gi"
          cpu: "1000m"
      env:
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
        - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: airflow-secrets
              key: connection-string
      volumeMounts:
        - name: airflow-dags
          mountPath: /opt/airflow/dags
        - name: airflow-logs
          mountPath: /opt/airflow/logs
  volumes:
    - name: airflow-dags
      persistentVolumeClaim:
        claimName: airflow-dags-pvc
    - name: airflow-logs
      persistentVolumeClaim:
        claimName: airflow-logs-pvc
  restartPolicy: Never
```

### Local Kubernetes Executor Setup

```ini
# airflow.cfg for LocalKubernetesExecutor
[core]
executor = LocalKubernetesExecutor

[local_kubernetes_executor]
kubernetes_queue = kubernetes

[kubernetes_executor]
namespace = airflow-k8s
worker_container_repository = my-registry/airflow
delete_worker_pods = True

# Tasks outside the 'kubernetes' queue are handled by LocalExecutor.
# To route them to Celery workers instead, use CeleryKubernetesExecutor
# and configure the [celery] section.
```

### Task with Kubernetes Executor Config

```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def my_task():
    print("Running on Kubernetes!")
    return "Task completed"

dag = DAG(
    'kubernetes_executor_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False
)

# Task with custom executor config
k8s_task = PythonOperator(
    task_id='kubernetes_task',
    python_callable=my_task,
    executor_config={
        'KubernetesExecutor': {
            'namespace': 'custom-namespace',
            'image': 'custom-image:latest',
            'request_memory': '1Gi',
            'request_cpu': '500m',
            'limit_memory': '2Gi',
            'limit_cpu': '1000m',
            'labels': {'team': 'data-engineering'},
            'annotations': {'monitoring': 'enabled'},
            'env_vars': [
                {'name': 'CUSTOM_VAR', 'value': 'custom_value'}
            ],
            'secrets': [
                {
                    'deploy_type': 'env',
                    'deploy_target': 'DB_PASSWORD',
                    'secret': 'database-secret',
                    'key': 'password'
                }
            ],
            'volumes': [
                {
                    'name': 'data-volume',
                    'persistentVolumeClaim': {
                        'claimName': 'data-pvc'
                    }
                }
            ],
            'volume_mounts': [
                {
                    'name': 'data-volume',
                    'mountPath': '/data'
                }
            ],
            'node_selector': {'disktype': 'ssd'},
            'affinity': {
                'nodeAffinity': {
                    'requiredDuringSchedulingIgnoredDuringExecution': {
                        'nodeSelectorTerms': [{
                            'matchExpressions': [{
                                'key': 'kubernetes.io/arch',
                                'operator': 'In',
                                'values': ['amd64']
                            }]
                        }]
                    }
                }
            },
            'tolerations': [
                {
                    'key': 'compute-type',
                    'operator': 'Equal',
                    'value': 'gpu',
                    'effect': 'NoSchedule'
                }
            ]
        }
    },
    dag=dag
)
```
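Recent provider versions also accept a full `V1Pod` under the `pod_override` key of `executor_config`, the form the Airflow documentation now favors over the flat option names above. A hedged equivalent of the image and resource settings shown earlier (the container name `base` matches the main container in the pod template):

```python
from kubernetes.client import models as k8s

# Hedged sketch: the same image and resource overrides expressed as a pod_override.
pod_override_task = PythonOperator(
    task_id='pod_override_task',
    python_callable=my_task,
    executor_config={
        'pod_override': k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name='base',  # must match the main container in the template
                        image='custom-image:latest',
                        resources=k8s.V1ResourceRequirements(
                            requests={'memory': '1Gi', 'cpu': '500m'},
                            limits={'memory': '2Gi', 'cpu': '1000m'},
                        ),
                    )
                ]
            )
        )
    },
    dag=dag,
)
```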

### Queue-Based Executor Routing

```python
# Tasks for LocalKubernetesExecutor with queue routing

# This task runs locally (default queue)
local_task = PythonOperator(
    task_id='local_task',
    python_callable=lambda: print("Running locally!"),
    dag=dag
)

# This task runs on Kubernetes (kubernetes queue)
k8s_task = PythonOperator(
    task_id='k8s_task',
    python_callable=lambda: print("Running on Kubernetes!"),
    queue='kubernetes',  # Routes to KubernetesExecutor
    dag=dag
)

# Custom queue name. With LocalKubernetesExecutor, only the queue named by
# kubernetes_queue in airflow.cfg is routed to KubernetesExecutor, so set
# kubernetes_queue = gpu-queue if this task should run on Kubernetes.
gpu_task = PythonOperator(
    task_id='gpu_task',
    python_callable=lambda: print("Running on GPU nodes!"),
    queue='gpu-queue',  # Custom queue for GPU nodes
    executor_config={
        'KubernetesExecutor': {
            'namespace': 'gpu-namespace',
            'node_selector': {'accelerator': 'nvidia-gpu'},
            'tolerations': [
                {
                    'key': 'nvidia.com/gpu',
                    'operator': 'Exists',
                    'effect': 'NoSchedule'
                }
            ],
            'resources': {
                'limits': {
                    'nvidia.com/gpu': '1'
                }
            }
        }
    },
    dag=dag
)
```

### High Availability Configuration

```ini
# airflow.cfg for HA Kubernetes Executor setup
[kubernetes_executor]
namespace = airflow-prod

# Multi-namespace for isolation
multi_namespace_mode = True
multi_namespace_mode_namespace_list = airflow-prod,airflow-staging

# High throughput configuration
worker_pods_creation_batch_size = 10
task_publish_max_retries = 5

# Pod cleanup configuration
delete_worker_pods = True
delete_worker_pods_on_failure = False

# Fatal container states that should fail tasks immediately
worker_pod_pending_fatal_container_state_reasons = CreateContainerConfigError,ErrImagePull,CreateContainerError,ImageInspectError,InvalidImageName

# Networking configuration
enable_tcp_keepalive = True
tcp_keep_idle = 120
tcp_keep_intvl = 30
tcp_keep_cnt = 6

# SSL configuration for production
verify_ssl = True
ssl_ca_cert = /etc/ssl/certs/ca-certificates.crt

# Resource management
delete_option_kwargs = {"grace_period_seconds": 30, "propagation_policy": "Foreground"}
```

### Monitoring and Observability

```python
# Task with enhanced monitoring configuration
monitored_task = PythonOperator(
    task_id='monitored_task',
    python_callable=my_task,
    executor_config={
        'KubernetesExecutor': {
            'labels': {
                'app': 'airflow',
                'component': 'worker',
                'version': '2.7.0',
                'team': 'data-platform'
            },
            'annotations': {
                'prometheus.io/scrape': 'true',
                'prometheus.io/port': '8080',
                'logging.coreos.com/enabled': 'true'
            }
        }
    },
    dag=dag
)
```

### Cleanup and Maintenance

```python
def cleanup_completed_pods(**context):
    """Clean up completed worker pods older than the specified age."""
    from datetime import datetime, timedelta, timezone

    from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
    from kubernetes import client as k8s_client

    hook = KubernetesHook(conn_id='kubernetes_default')
    # get_conn() returns the low-level ApiClient; wrap it for CoreV1 pod calls
    core_v1 = k8s_client.CoreV1Api(api_client=hook.get_conn())

    # Get all worker pods in the airflow namespace
    pods = core_v1.list_namespaced_pod(
        namespace='airflow',
        label_selector='airflow-worker=true'
    )

    # Timezone-aware cutoff, comparable with pod creation timestamps
    cutoff_time = datetime.now(timezone.utc) - timedelta(hours=24)

    for pod in pods.items:
        # Check if pod is completed and older than the cutoff
        if (pod.status.phase in ['Succeeded', 'Failed'] and
                pod.metadata.creation_timestamp < cutoff_time):

            core_v1.delete_namespaced_pod(
                name=pod.metadata.name,
                namespace='airflow',
                grace_period_seconds=0
            )
            print(f"Deleted pod: {pod.metadata.name}")

# The run cadence belongs on the DAG (e.g. a daily-scheduled maintenance DAG),
# not on the operator itself.
cleanup_task = PythonOperator(
    task_id='cleanup_old_pods',
    python_callable=cleanup_completed_pods,
    dag=dag
)
```