tessl/pypi-apache-airflow-providers-cncf-kubernetes

Comprehensive Kubernetes integration for Apache Airflow workflow orchestration and task execution

- **Workspace**: tessl
- **Visibility**: Public
- **Describes**: pypipkg:pypi/apache-airflow-providers-cncf-kubernetes@10.7.x

To install, run

npx @tessl/cli install tessl/pypi-apache-airflow-providers-cncf-kubernetes@10.7.0

# Apache Airflow Providers CNCF Kubernetes

A comprehensive Kubernetes integration provider for Apache Airflow that enables running workflows and tasks on Kubernetes clusters. This provider offers operators for managing pods and jobs, hooks for API interactions, sensors for monitoring workloads, triggers for asynchronous operations, decorators for task creation, and executors for running Airflow tasks as Kubernetes pods.

## Package Information

- **Package Name**: apache-airflow-providers-cncf-kubernetes
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-cncf-kubernetes`
- **Minimum Airflow Version**: 2.10.0+

## Core Imports

```python
# Main provider package
import airflow.providers.cncf.kubernetes

# Core operators
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.providers.cncf.kubernetes.operators.job import KubernetesJobOperator

# Hooks for API interactions
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook, AsyncKubernetesHook

# Sensors for monitoring
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor

# Decorators for task creation
from airflow.providers.cncf.kubernetes.decorators.kubernetes import kubernetes_task
from airflow.providers.cncf.kubernetes.decorators.kubernetes_cmd import kubernetes_cmd_task

# Executors
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubernetesExecutor
```

## Basic Usage

### Running a Task in a Kubernetes Pod

```python
from datetime import datetime
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

# Define the DAG
dag = DAG(
    'kubernetes_pod_example',
    start_date=datetime(2023, 1, 1),
    schedule=None,  # `schedule` replaces the deprecated `schedule_interval`
    catchup=False
)

# Run a simple task in a Kubernetes pod
task = KubernetesPodOperator(
    task_id='hello_kubernetes',
    name='hello-pod',
    namespace='default',
    image='python:3.9-slim',
    cmds=['python', '-c'],
    arguments=['print("Hello from Kubernetes!")'],
    dag=dag
)
```

### Using Kubernetes Task Decorator

```python
from airflow.providers.cncf.kubernetes.decorators.kubernetes import kubernetes_task

@kubernetes_task(
    image='python:3.9-slim',
    namespace='default'
)
def process_data():
    # The body runs inside the pod, so import only what the image provides
    # (python:3.9-slim does not ship third-party packages such as pandas).
    # Task logic here
    return "Processing complete"

# Use in DAG (call inside a DAG definition)
result = process_data()
```

### Managing Kubernetes Jobs

```python
from airflow.providers.cncf.kubernetes.operators.job import KubernetesJobOperator

# Reuses the `dag` object defined in the pod example above
job_task = KubernetesJobOperator(
    task_id='data_processing_job',
    name='data-job',
    namespace='default',
    image='data-processor:latest',
    cmds=['python', 'process.py'],
    dag=dag
)
```

## Architecture

The provider is organized around several key components:

- **Operators**: Execute tasks on Kubernetes (pods, jobs, resources, Spark applications)
- **Hooks**: Provide low-level API access to Kubernetes clusters (sync and async)
- **Sensors**: Monitor Kubernetes workloads for completion or state changes
- **Triggers**: Enable asynchronous monitoring of pods and jobs
- **Decorators**: Simplify task creation with Python decorators
- **Executors**: Run entire Airflow task execution on Kubernetes infrastructure
- **Utilities**: Support pod lifecycle management, resource conversion, and configuration

## Capabilities

### Pod Operations

Execute individual tasks in Kubernetes pods with full lifecycle management, including creation, monitoring, log retrieval, and cleanup.

```python { .api }
class KubernetesPodOperator(BaseOperator):
    def __init__(
        self,
        image: str,
        name: str | None = None,
        namespace: str = "default",
        cmds: list[str] | None = None,
        arguments: list[str] | None = None,
        **kwargs
    ): ...

    def execute(self, context: Context) -> Any: ...
```

[Pod Operations](./pod-operations.md)

### Job Management

Create, monitor, and manage Kubernetes Jobs with support for batch processing, parallel execution, and job lifecycle operations.

```python { .api }
class KubernetesJobOperator(KubernetesPodOperator):
    def __init__(
        self,
        name: str,
        image: str,
        namespace: str = "default",
        **kwargs
    ): ...
```

[Job Management](./job-management.md)

### Kubernetes API Integration

Connect to Kubernetes clusters with comprehensive API access, supporting both synchronous and asynchronous operations.

```python { .api }
class KubernetesHook(BaseHook):
    def get_conn(self): ...
    def create_custom_object(self, group: str, version: str, plural: str, body: dict, namespace: str | None = None): ...
    def get_pod(self, name: str, namespace: str): ...
    def create_job(self, job: V1Job): ...
```

[API Integration](./api-integration.md)

### Resource Management

Create, update, and delete Kubernetes resources from YAML manifests or programmatic definitions.

```python { .api }
class KubernetesCreateResourceOperator(KubernetesResourceBaseOperator):
    def __init__(
        self,
        yaml_conf: str | None = None,
        custom_resource_definition: dict | None = None,
        **kwargs
    ): ...
```

[Resource Management](./resource-management.md)
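
As a rough illustration of the YAML-manifest route, the sketch below builds a manifest string of the kind `yaml_conf` accepts. The `pvc_manifest` helper, the claim name, and the storage size are hypothetical and not part of the provider; only the standard library is used, so the snippet runs without Airflow or a cluster.

```python
from textwrap import dedent


def pvc_manifest(name: str, storage: str = "1Gi") -> str:
    """Return a PersistentVolumeClaim manifest as a YAML string.

    Hypothetical helper for illustration; not part of the provider.
    """
    return dedent(
        f"""\
        apiVersion: v1
        kind: PersistentVolumeClaim
        metadata:
          name: {name}
        spec:
          accessModes:
            - ReadWriteOnce
          resources:
            requests:
              storage: {storage}
        """
    )


manifest = pvc_manifest("scratch-claim", storage="5Gi")
print(manifest)

# Assumed usage inside a DAG (requires Airflow and cluster access):
# KubernetesCreateResourceOperator(task_id="create_pvc", yaml_conf=manifest)
```

Generating the manifest in code keeps names and sizes parameterizable; a static YAML file read from disk would serve equally well.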

### Spark on Kubernetes

Deploy and manage Spark applications on Kubernetes clusters with custom resource definitions and monitoring.

```python { .api }
class SparkKubernetesOperator(KubernetesPodOperator):
    def __init__(
        self,
        application_file: str,
        namespace: str = "default",
        **kwargs
    ): ...
```

[Spark Integration](./spark-integration.md)

### Task Decorators

Create Kubernetes tasks using Python decorators with automatic pod configuration and execution.

```python { .api }
def kubernetes_task(
    image: str,
    namespace: str = "default",
    name: str | None = None,
    **kwargs
): ...

def kubernetes_cmd_task(
    image: str,
    cmds: list[str],
    namespace: str = "default",
    **kwargs
): ...
```

[Task Decorators](./decorators.md)

### Monitoring and Sensors

Monitor Kubernetes workloads with sensors that check application status and wait for completion conditions.

```python { .api }
class SparkKubernetesSensor(BaseSensorOperator):
    def __init__(
        self,
        application_name: str,
        namespace: str = "default",
        **kwargs
    ): ...
```

[Monitoring](./monitoring.md)

### Executors

Run Airflow tasks on Kubernetes infrastructure with the KubernetesExecutor and hybrid LocalKubernetesExecutor.

```python { .api }
class KubernetesExecutor(BaseExecutor):
    def start(self): ...
    def sync(self): ...
    def end(self): ...
```

[Executors](./executors.md)
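
Selecting an executor is a deployment setting rather than DAG code. The following is a minimal sketch, assuming Airflow's `AIRFLOW__{SECTION}__{KEY}` environment-variable convention and a `[kubernetes_executor]` configuration section. The `airflow_env` helper and the namespace value are hypothetical, and option names should be checked against the configuration reference for the installed version.

```python
def airflow_env(section: str, key: str) -> str:
    """Build the environment variable name Airflow reads for a config option."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


# Settings a scheduler deployment might export (values are illustrative).
executor_settings = {
    airflow_env("core", "executor"): "KubernetesExecutor",
    airflow_env("kubernetes_executor", "namespace"): "airflow-workers",
}

for name, value in sorted(executor_settings.items()):
    print(f"{name}={value}")
```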

## Types

```python { .api }
# Pod event enumeration
class PodEventType(Enum):
    """Type of Events emitted by kubernetes pod."""
    WARNING = "Warning"
    NORMAL = "Normal"

# Container state enumeration
class ContainerState(str, Enum):
    """
    Possible container states.

    See https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-phase.
    """
    WAITING = "waiting"
    RUNNING = "running"
    TERMINATED = "terminated"
    FAILED = "failed"
    UNDEFINED = "undefined"

# Execution mode for callbacks
class ExecutionMode(str, Enum):
    """Enum class for execution mode."""
    SYNC = "sync"
    ASYNC = "async"

# Pod phase constants
class PodPhase:
    """Pod phase constants for lifecycle management."""
    PENDING = "Pending"
    RUNNING = "Running"
    SUCCEEDED = "Succeeded"
    FAILED = "Failed"
    UNKNOWN = "Unknown"

# Actions to take when pod finishes
class OnFinishAction(str, Enum):
    """Actions to take when pod finishes execution."""
    DELETE_POD = "delete_pod"
    KEEP_POD = "keep_pod"
```
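
Because several of these enums mix in `str`, their members compare equal to plain strings and can be built directly from configuration values. The sketch below redefines two of the types locally, copied from the listing above, so it runs without Airflow installed. `TERMINAL_PHASES` and `should_delete_pod` are hypothetical names introduced for this example, not provider functions.

```python
from enum import Enum


# Local copies of the definitions listed above, so the example is standalone.
class OnFinishAction(str, Enum):
    DELETE_POD = "delete_pod"
    KEEP_POD = "keep_pod"


class PodPhase:
    PENDING = "Pending"
    RUNNING = "Running"
    SUCCEEDED = "Succeeded"
    FAILED = "Failed"
    UNKNOWN = "Unknown"


# Assumption for this example: a pod is finished once it succeeded or failed.
TERMINAL_PHASES = {PodPhase.SUCCEEDED, PodPhase.FAILED}


def should_delete_pod(phase: str, action: str) -> bool:
    """Hypothetical helper: delete only finished pods, and only if configured to."""
    finished = phase in TERMINAL_PHASES
    return finished and OnFinishAction(action) is OnFinishAction.DELETE_POD


# str-mixin members compare equal to plain strings.
print(OnFinishAction.DELETE_POD == "delete_pod")
print(should_delete_pod("Succeeded", "delete_pod"))
print(should_delete_pod("Running", "delete_pod"))
```

Passing an unrecognized string to `OnFinishAction(...)` raises `ValueError`, which surfaces configuration typos early.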

## Exception Types

```python { .api }
# Pod operation exceptions
class PodMutationHookException(AirflowException):
    """Raised when exception happens during Pod Mutation Hook execution."""
    ...

class PodReconciliationError(AirflowException):
    """Raised when an error is encountered while trying to merge pod configs."""
    ...

class PodReattachFailure(AirflowException):
    """When we expect to be able to find a pod but cannot."""
    ...

class PodCredentialsExpiredFailure(AirflowException):
    """When pod fails to refresh credentials."""
    ...

# Pod manager exceptions
class PodLaunchFailedException(AirflowException):
    """When pod launching fails in KubernetesPodOperator."""
    ...

class PodLaunchTimeoutException(AirflowException):
    """When pod does not leave the Pending phase within specified timeout."""
    ...

class PodNotFoundException(AirflowException):
    """Expected pod does not exist in kube-api."""
    ...

# Resource operation exceptions
class FailToDeleteError(Exception):
    """For handling error if an error occurred when handling a yaml file during deletion of the resource."""
    ...
```
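
One detail of this hierarchy is easy to miss: `FailToDeleteError` derives from `Exception`, not `AirflowException`, so a handler written for `AirflowException` does not catch it. The sketch below uses local stand-in classes that mirror the listing above so it runs without Airflow; `classify` is a hypothetical helper used only to show which handler fires.

```python
class AirflowException(Exception):
    """Stand-in for airflow.exceptions.AirflowException."""


class PodLaunchTimeoutException(AirflowException):
    """Stand-in mirroring the provider class of the same name."""


class FailToDeleteError(Exception):
    """Stand-in mirroring the provider class of the same name."""


def classify(error: Exception) -> str:
    """Hypothetical helper: report which handler would catch the error."""
    try:
        raise error
    except AirflowException:
        return "airflow"
    except Exception:
        return "other"


print(classify(PodLaunchTimeoutException("pod stuck in Pending")))
print(classify(FailToDeleteError("could not delete resource")))
```

Code that cleans up resources may therefore need a separate `except FailToDeleteError` clause alongside any `AirflowException` handling.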