# API Integration

Connect to Kubernetes clusters with comprehensive API access supporting both synchronous and asynchronous operations. The hooks provide low-level access to Kubernetes resources and enable custom integrations.

## Capabilities

### Kubernetes Hook

Synchronous hook for interacting with the Kubernetes API, supporting cluster connections, resource management, and pod operations.

```python { .api }
class KubernetesHook(BaseHook):
    """
    Creates Kubernetes API connection with multiple configuration options.

    Args:
        conn_id (str): Airflow connection ID. Default: 'kubernetes_default'
        client_configuration (Configuration, optional): Kubernetes client configuration
        cluster_context (str, optional): Kubernetes cluster context
        config_file (str, optional): Path to kubeconfig file
        in_cluster (bool, optional): Use in-cluster configuration
        disable_verify_ssl (bool): Disable SSL verification. Default: False
        disable_tcp_keepalive (bool): Disable TCP keepalive. Default: False
        **kwargs: Additional keyword arguments accepted by the constructor
    """

    def __init__(
        self,
        conn_id: str = "kubernetes_default",
        client_configuration: Configuration | None = None,
        cluster_context: str | None = None,
        config_file: str | None = None,
        in_cluster: bool | None = None,
        disable_verify_ssl: bool = False,
        disable_tcp_keepalive: bool = False,
        **kwargs
    ): ...

    def get_conn(self) -> ApiClient:
        """Returns kubernetes api session."""
        ...

    def create_custom_object(
        self,
        group: str,
        version: str,
        plural: str,
        body: dict,
        namespace: str | None = None
    ) -> dict:
        """
        Create custom resource definition object.

        Args:
            group (str): Group of the custom resource (e.g. 'apps.example.com')
            version (str): Version of the custom resource (e.g. 'v1')
            plural (str): Plural name of the custom resource (e.g. 'myapps')
            body (dict): Definition of the object to create
            namespace (str, optional): Namespace to create the object in

        Returns:
            dict: Result of the create call
        """
        ...

    def get_pod(self, name: str, namespace: str) -> V1Pod:
        """
        Read pod object from kubernetes API.

        Args:
            name (str): Pod name
            namespace (str): Namespace of the pod

        Returns:
            V1Pod: The pod object
        """
        ...

    def create_job(self, job: V1Job, **kwargs) -> V1Job:
        """
        Run Job.

        Args:
            job (V1Job): Job object to run
            **kwargs: Additional keyword arguments

        Returns:
            V1Job: The job object
        """
        ...

    def get_pod_logs(
        self,
        pod_name: str,
        container: str | None = "",
        namespace: str | None = None,
    ) -> str:
        """
        Retrieve a container's log from the specified pod.

        Args:
            pod_name (str): Pod name
            container (str, optional): Container name. Default: ''
            namespace (str, optional): Namespace of the pod

        Returns:
            str: The container's log
        """
        ...

    def get_pod_log_stream(
        self,
        pod_name: str,
        container: str | None = "",
        namespace: str | None = None,
    ) -> tuple[watch.Watch, Generator[str, None, None]]:
        """
        Retrieve a log stream for a container in a kubernetes pod.

        Args:
            pod_name (str): Pod name
            container (str, optional): Container name. Default: ''
            namespace (str, optional): Namespace of the pod

        Returns:
            tuple[watch.Watch, Generator[str, None, None]]: The watcher and a
            generator of log lines; iterate the generator, not the tuple
        """
        ...

    def get_namespaced_pod_list(
        self,
        label_selector: str | None = "",
        namespace: str | None = None,
        **kwargs
    ) -> list[V1Pod]:
        """
        Get list of pods in a namespace.

        Args:
            label_selector (str, optional): Label selector. Default: ''
            namespace (str, optional): Namespace to list pods from
            **kwargs: Additional keyword arguments

        Returns:
            list[V1Pod]: The pods found
        """
        ...
```

### Asynchronous Kubernetes Hook

Asynchronous hook for non-blocking Kubernetes API operations, enabling efficient concurrent processing and improved performance.

```python { .api }
class AsyncKubernetesHook(KubernetesHook):
    """
    Asynchronous Kubernetes API hook for non-blocking operations.

    Args:
        config_dict (dict, optional): Configuration dictionary
        All other parameters inherited from KubernetesHook
    """

    def __init__(
        self,
        config_dict: dict | None = None,
        *args,
        **kwargs
    ): ...

    async def get_conn(self) -> AsyncApiClient:
        """Returns asynchronous kubernetes api session."""
        ...

    async def get_pod(self, name: str, namespace: str) -> V1Pod:
        """
        Get pod's object asynchronously.

        Args:
            name (str): Pod name
            namespace (str): Namespace of the pod

        Returns:
            V1Pod: The pod object
        """
        ...

    # NOTE(review): the methods below are synchronous, and the usage examples in
    # this document call them on a KubernetesHook instance. Confirm which class
    # actually declares them.

    def get_custom_object(
        self,
        group: str,
        version: str,
        plural: str,
        name: str,
        namespace: str | None = None
    ) -> dict:
        """
        Get custom resource definition object.

        Args:
            group (str): Group of the custom resource
            version (str): Version of the custom resource
            plural (str): Plural name of the custom resource
            name (str): Name of the object
            namespace (str, optional): Namespace of the object

        Returns:
            dict: The custom object
        """
        ...

    def delete_custom_object(
        self,
        group: str,
        version: str,
        plural: str,
        name: str,
        namespace: str | None = None,
        body: V1DeleteOptions | None = None
    ) -> dict:
        """
        Delete custom resource definition object.

        Args:
            group (str): Group of the custom resource
            version (str): Version of the custom resource
            plural (str): Plural name of the custom resource
            name (str): Name of the object
            namespace (str, optional): Namespace of the object
            body (V1DeleteOptions, optional): Delete options

        Returns:
            dict: Result of the delete call
        """
        ...

    def get_namespace(self) -> str:
        """Return the namespace defined in connection."""
        ...

    def get_pod_log_stream(
        self,
        pod_name: str,
        container: str | None = None,
        namespace: str | None = None,
        **kwargs
    ):
        """
        Retrieve log stream for container in pod.

        Args:
            pod_name (str): Pod name
            container (str, optional): Container name
            namespace (str, optional): Namespace of the pod
            **kwargs: Additional keyword arguments
        """
        ...

    def get_pod_logs(
        self,
        pod_name: str,
        container: str | None = None,
        namespace: str | None = None,
        **kwargs
    ) -> str:
        """
        Retrieve container's log from pod.

        Args:
            pod_name (str): Pod name
            container (str, optional): Container name
            namespace (str, optional): Namespace of the pod
            **kwargs: Additional keyword arguments

        Returns:
            str: The container's log
        """
        ...

    def create_job(self, job: V1Job) -> V1Job:
        """Run Kubernetes Job."""
        ...

    def get_job(self, job_name: str, namespace: str) -> V1Job:
        """Get Job of specified name and namespace."""
        ...

    def get_job_status(self, job_name: str, namespace: str) -> V1JobStatus:
        """Get job status."""
        ...

    def wait_until_job_complete(
        self,
        job_name: str,
        namespace: str,
        timeout: int = 3600
    ) -> V1Job:
        """
        Block until job is complete or failed.

        Args:
            job_name (str): Job name
            namespace (str): Namespace of the job
            timeout (int): Maximum wait. Default: 3600 (presumably seconds; confirm)

        Returns:
            V1Job: The job object
        """
        ...

    def is_job_complete(self, job: V1Job) -> bool:
        """Check if job is complete."""
        ...

    def is_job_failed(self, job: V1Job) -> bool:
        """Check if job failed."""
        ...

    def is_job_successful(self, job: V1Job) -> bool:
        """Check if job completed successfully."""
        ...

    def apply_from_yaml_file(
        self,
        yaml_file: str | None = None,
        yaml_objects: list[dict] | None = None,
        verbose: bool = False,
        namespace: str = "default"
    ) -> list[dict]:
        """
        Perform action from yaml file.

        Args:
            yaml_file (str, optional): Path to a YAML file
            yaml_objects (list[dict], optional): YAML objects given as dictionaries
            verbose (bool): Verbose output. Default: False
            namespace (str): Namespace to use. Default: 'default'

        Returns:
            list[dict]: Result of the action
        """
        ...

    def test_connection(self) -> tuple[bool, str]:
        """
        Test the connection.

        Returns:
            tuple[bool, str]: Connection status and a message
        """
        ...
```

### Asynchronous Kubernetes Hook

Asynchronous hook for non-blocking Kubernetes operations, supporting concurrent resource management and monitoring.

```python { .api }
class AsyncKubernetesHook(KubernetesHook):
    """
    Hook to use Kubernetes SDK asynchronously.

    Inherits all configuration from KubernetesHook with async operation support.
    """

    def __init__(self, **kwargs): ...

    @asynccontextmanager
    async def get_conn(self) -> AsyncGenerator[ApiClient, None]:
        """Async context manager for API client; use with ``async with``."""
        ...

    async def get_pod(self, name: str, namespace: str) -> V1Pod:
        """
        Get pod's object asynchronously.

        Args:
            name (str): Pod name
            namespace (str): Namespace of the pod

        Returns:
            V1Pod: The pod object
        """
        ...

    async def delete_pod(self, name: str, namespace: str) -> V1Status:
        """
        Delete pod's object asynchronously.

        Args:
            name (str): Pod name
            namespace (str): Namespace of the pod

        Returns:
            V1Status: Status of the delete call
        """
        ...

    async def read_logs(
        self,
        name: str,
        namespace: str,
        container: str | None = None,
        follow: bool = False,
        **kwargs
    ) -> str:
        """
        Read logs inside the pod.

        Args:
            name (str): Pod name
            namespace (str): Namespace of the pod
            container (str, optional): Container name
            follow (bool): Follow the log. Default: False
            **kwargs: Additional keyword arguments

        Returns:
            str: The logs
        """
        ...

    async def get_job_status(self, job_name: str, namespace: str) -> V1JobStatus:
        """Get job's status object asynchronously."""
        ...

    async def wait_until_job_complete(
        self,
        job_name: str,
        namespace: str,
        timeout: int = 3600,
        check_interval: int = 10
    ) -> V1Job:
        """
        Wait until job complete asynchronously.

        Args:
            job_name (str): Job name
            namespace (str): Namespace of the job
            timeout (int): Maximum wait. Default: 3600 (presumably seconds; confirm)
            check_interval (int): Interval between checks. Default: 10
                (presumably seconds; confirm)

        Returns:
            V1Job: The job object
        """
        ...

    async def wait_until_container_complete(
        self,
        pod_name: str,
        namespace: str,
        container_name: str | None = None,
        timeout: int = 3600
    ) -> None:
        """
        Wait for container completion.

        Args:
            pod_name (str): Pod name
            namespace (str): Namespace of the pod
            container_name (str, optional): Container name
            timeout (int): Maximum wait. Default: 3600 (presumably seconds; confirm)
        """
        ...

    async def wait_until_container_started(
        self,
        pod_name: str,
        namespace: str,
        container_name: str | None = None,
        timeout: int = 120
    ) -> None:
        """
        Wait for container to start.

        Args:
            pod_name (str): Pod name
            namespace (str): Namespace of the pod
            container_name (str, optional): Container name
            timeout (int): Maximum wait. Default: 120 (presumably seconds; confirm)
        """
        ...
```

### Hook Constants

Constants used by the Kubernetes hooks for configuration and status checking.

```python { .api }
# Resource loading constant
LOADING_KUBE_CONFIG_FILE_RESOURCE: str

# Job status condition types
JOB_FINAL_STATUS_CONDITION_TYPES: list[str]
JOB_STATUS_CONDITION_TYPES: list[str]
```

## Usage Examples

### Basic Hook Usage

```python
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook

# Connect to Kubernetes cluster
hook = KubernetesHook(
    conn_id='kubernetes_default',
    in_cluster=False,
    config_file='/path/to/kubeconfig'
)

# Get API client
client = hook.get_conn()

# Test connection; returns a (status, message) pair
is_connected, message = hook.test_connection()
print(f"Connection status: {is_connected}, Message: {message}")
```

### Pod Operations with Hook

```python
# Get pod information
pod = hook.get_pod(name='my-pod', namespace='default')
print(f"Pod status: {pod.status.phase}")

# Get pod logs
logs = hook.get_pod_logs(
    pod_name='my-pod',
    namespace='default',
    container='main-container'
)
print(logs)

# Stream pod logs
# get_pod_log_stream returns a (watcher, generator) tuple, so unpack it and
# iterate the generator to receive log lines.
watcher, log_stream = hook.get_pod_log_stream(
    pod_name='my-pod',
    namespace='default'
)
for log_line in log_stream:
    print(log_line)
```

### Job Management with Hook

```python
from kubernetes.client import (
    V1Container,
    V1Job,
    V1JobSpec,
    V1ObjectMeta,
    V1PodSpec,
    V1PodTemplateSpec,
)

# Create job specification
# Metadata is a typed V1ObjectMeta rather than a plain dict so that attribute
# access such as job.metadata.name / job.metadata.namespace works.
job_spec = V1Job(
    api_version='batch/v1',
    kind='Job',
    metadata=V1ObjectMeta(name='my-job', namespace='default'),
    spec=V1JobSpec(
        template=V1PodTemplateSpec(
            spec=V1PodSpec(
                containers=[
                    V1Container(
                        name='worker',
                        image='busybox:latest',
                        command=['sh', '-c', 'echo "Job completed"']
                    )
                ],
                restart_policy='Never'
            )
        )
    )
)

# Create and monitor job
created_job = hook.create_job(job_spec)
print(f"Job created: {created_job.metadata.name}")

# Wait for completion
completed_job = hook.wait_until_job_complete(
    job_name='my-job',
    namespace='default',
    timeout=600
)

# Check job status
if hook.is_job_successful(completed_job):
    print("Job completed successfully")
elif hook.is_job_failed(completed_job):
    print("Job failed")
```

### Custom Resource Operations

```python
# Define the custom resource
custom_resource = {
    'apiVersion': 'apps.example.com/v1',
    'kind': 'MyApp',
    'metadata': {
        'name': 'my-app-instance',
        'namespace': 'default'
    },
    'spec': {
        'replicas': 3,
        'image': 'my-app:latest'
    }
}

# Create the custom resource
created_cr = hook.create_custom_object(
    group='apps.example.com',
    version='v1',
    plural='myapps',
    body=custom_resource,
    namespace='default'
)

# Get custom resource
cr = hook.get_custom_object(
    group='apps.example.com',
    version='v1',
    plural='myapps',
    name='my-app-instance',
    namespace='default'
)

# Delete custom resource
hook.delete_custom_object(
    group='apps.example.com',
    version='v1',
    plural='myapps',
    name='my-app-instance',
    namespace='default'
)
```

### YAML Resource Management

```python
# Apply resources from YAML file
applied_resources = hook.apply_from_yaml_file(
    yaml_file='/path/to/resources.yaml',
    namespace='default',
    verbose=True
)

# Apply resources from YAML objects
yaml_objects = [
    {
        'apiVersion': 'v1',
        'kind': 'ConfigMap',
        'metadata': {'name': 'my-config', 'namespace': 'default'},
        'data': {'key': 'value'}
    }
]

hook.apply_from_yaml_file(
    yaml_objects=yaml_objects,
    namespace='default'
)
```

### Asynchronous Operations

```python
import asyncio
from airflow.providers.cncf.kubernetes.hooks.kubernetes import AsyncKubernetesHook

async def async_operations():
    # Create async hook
    async_hook = AsyncKubernetesHook(
        conn_id='kubernetes_default',
        in_cluster=True
    )

    # Use async context manager for connection
    async with async_hook.get_conn() as client:
        # Get pod asynchronously
        pod = await async_hook.get_pod(
            name='my-pod',
            namespace='default'
        )

        # Read logs asynchronously
        logs = await async_hook.read_logs(
            name='my-pod',
            namespace='default',
            container='main',
            follow=False
        )

        # Monitor job completion
        completed_job = await async_hook.wait_until_job_complete(
            job_name='my-job',
            namespace='default',
            timeout=600,
            check_interval=5
        )

        # Wait for container to start
        await async_hook.wait_until_container_started(
            pod_name='my-pod',
            namespace='default',
            container_name='main',
            timeout=120
        )

# Run async operations
asyncio.run(async_operations())
```

### Multi-cluster Configuration

```python
# Connect to different clusters by selecting a context from the same kubeconfig
prod_hook = KubernetesHook(
    conn_id='k8s_prod',
    cluster_context='production-cluster',
    config_file='/home/user/.kube/config'
)

staging_hook = KubernetesHook(
    conn_id='k8s_staging',
    cluster_context='staging-cluster',
    config_file='/home/user/.kube/config'
)

# In-cluster configuration
in_cluster_hook = KubernetesHook(
    conn_id='k8s_in_cluster',
    in_cluster=True
)
```

### Connection Testing and Debugging

```python
# Test connection with detailed output
is_connected, message = hook.test_connection()
if not is_connected:
    print(f"Connection failed: {message}")
else:
    print("Successfully connected to Kubernetes cluster")

# Get namespace from connection
namespace = hook.get_namespace()
print(f"Default namespace: {namespace}")

# Disable SSL verification for testing
test_hook = KubernetesHook(
    conn_id='k8s_test',
    disable_verify_ssl=True,
    disable_tcp_keepalive=True
)
```