0
# Resource Management
1
2
Create and delete Kubernetes resources from YAML manifests or programmatic definitions. The resource management operators share a common base class and provide lifecycle control (creation and teardown) over Kubernetes objects.
3
4
## Capabilities
5
6
### Resource Base Operations
7
8
Base class providing common functionality for Kubernetes resource operations.
9
10
```python { .api }
11
class KubernetesResourceBaseOperator(BaseOperator):
    """
    Base class for Kubernetes resource operations.

    Declares the cluster-connection arguments that the resource create
    and delete operators inherit.

    Args:
        namespace (str): Kubernetes namespace. Defaults to 'default'
        cluster_context (str, optional): Kubernetes cluster context
        config_file (str, optional): Path to kubeconfig file
        in_cluster (bool, optional): Use in-cluster configuration
        conn_id (str): Airflow connection ID. Default: 'kubernetes_default'
        **kwargs: Additional keyword arguments (presumably passed on to
            BaseOperator; the stub does not show the body)
    """

    def __init__(
        self,
        namespace: str = "default",
        cluster_context: str | None = None,
        config_file: str | None = None,
        in_cluster: bool | None = None,
        conn_id: str = "kubernetes_default",
        **kwargs
    ): ...
31
```
32
33
### Resource Creation
34
35
Create Kubernetes resources from a YAML string, a YAML file, a custom resource definition, or a list of object dictionaries.
36
37
```python { .api }
38
class KubernetesCreateResourceOperator(KubernetesResourceBaseOperator):
    """
    Create Kubernetes resources from YAML.

    Args:
        yaml_conf (str, optional): YAML configuration as string
        yaml_conf_file (str, optional): Path to YAML configuration file
        custom_resource_definition (dict, optional): Custom resource definition as dict
        custom_objects (list[dict], optional): List of custom objects to create
        namespaced (bool): Whether resources are namespaced. Default: True
        wait_until_ready (bool): Wait for resources to be ready. Default: False
        wait_timeout (int): Timeout for waiting in seconds. Default: 300
        **kwargs: Remaining keyword arguments. ``namespace`` (str, target
            namespace, defaults to 'default') is not an explicit parameter
            of this class; it is declared on KubernetesResourceBaseOperator.
    """

    def __init__(
        self,
        yaml_conf: str | None = None,
        yaml_conf_file: str | None = None,
        custom_resource_definition: dict | None = None,
        custom_objects: list[dict] | None = None,
        namespaced: bool = True,
        wait_until_ready: bool = False,
        wait_timeout: int = 300,
        **kwargs
    ): ...

    def execute(self, context: Context) -> Any:
        """Create the Kubernetes resources."""
        ...
67
```
68
69
### Resource Deletion
70
71
Delete Kubernetes resources by name, by label or field selector, or from YAML specifications and custom resource definitions.
72
73
```python { .api }
74
class KubernetesDeleteResourceOperator(KubernetesResourceBaseOperator):
    """
    Delete Kubernetes resources.

    Args:
        yaml_conf (str, optional): YAML configuration for resources to delete
        yaml_conf_file (str, optional): Path to YAML file with resources to delete
        custom_resource_definition (dict, optional): Custom resource definition to delete
        custom_objects (list[dict], optional): List of custom objects to delete
        api_version (str, optional): API version of resources to delete
        kind (str, optional): Kind of resources to delete
        name (str, optional): Name of specific resource to delete
        label_selector (str, optional): Label selector for bulk deletion
        field_selector (str, optional): Field selector for resource selection
        grace_period_seconds (int, optional): Grace period for deletion
        propagation_policy (str): Deletion propagation policy, e.g.
            'Background' or 'Foreground'. Default: 'Background'
        wait_for_completion (bool): Wait for deletion to complete. Default: True
        wait_timeout (int): Timeout for waiting in seconds. Default: 300
        **kwargs: Remaining keyword arguments, including those declared on
            KubernetesResourceBaseOperator (such as ``namespace``)
    """

    def __init__(
        self,
        yaml_conf: str | None = None,
        yaml_conf_file: str | None = None,
        custom_resource_definition: dict | None = None,
        custom_objects: list[dict] | None = None,
        api_version: str | None = None,
        kind: str | None = None,
        name: str | None = None,
        label_selector: str | None = None,
        field_selector: str | None = None,
        grace_period_seconds: int | None = None,
        propagation_policy: str = "Background",
        wait_for_completion: bool = True,
        wait_timeout: int = 300,
        **kwargs
    ): ...

    def execute(self, context: Context) -> Any:
        """Delete the Kubernetes resources."""
        ...
114
```
115
116
## Usage Examples
117
118
### Creating Resources from YAML File
119
120
```python
121
from airflow.providers.cncf.kubernetes.operators.resource import KubernetesCreateResourceOperator

# Create resources from YAML file and block until they report ready
create_from_file = KubernetesCreateResourceOperator(
    task_id='create_resources',
    yaml_conf_file='/path/to/resources.yaml',
    namespace='my-namespace',
    wait_until_ready=True,
    wait_timeout=300,
    dag=dag
)
132
```
133
134
### Creating Resources from YAML String
135
136
```python
137
# YAML configuration as string: two documents (a ConfigMap and the
# Deployment that consumes it) separated by '---'
yaml_config = """
apiVersion: v1
kind: ConfigMap
metadata:
  name: my-config
  namespace: default
data:
  database_url: "postgresql://localhost:5432/mydb"
  debug: "false"
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: my-app
  namespace: default
spec:
  replicas: 3
  selector:
    matchLabels:
      app: my-app
  template:
    metadata:
      labels:
        app: my-app
    spec:
      containers:
      - name: app
        image: my-app:latest
        ports:
        - containerPort: 8080
        envFrom:
        - configMapRef:
            name: my-config
"""

create_from_yaml = KubernetesCreateResourceOperator(
    task_id='create_from_yaml',
    yaml_conf=yaml_config,
    namespace='default',
    dag=dag
)
179
```
180
181
### Creating Custom Resources
182
183
```python
184
# Custom resource definition (an Argo CD Application) expressed as a dict
custom_resource = {
    'apiVersion': 'argoproj.io/v1alpha1',
    'kind': 'Application',
    'metadata': {
        'name': 'my-application',
        'namespace': 'argocd'
    },
    'spec': {
        'source': {
            'repoURL': 'https://github.com/my-org/my-app',
            'path': 'k8s',
            'targetRevision': 'HEAD'
        },
        'destination': {
            'server': 'https://kubernetes.default.svc',
            'namespace': 'default'
        },
        'syncPolicy': {
            'automated': {
                'prune': True,
                'selfHeal': True
            }
        }
    }
}

create_custom_resource = KubernetesCreateResourceOperator(
    task_id='create_argocd_app',
    custom_resource_definition=custom_resource,
    namespace='argocd',
    dag=dag
)
217
```
218
219
### Creating Multiple Custom Objects
220
221
```python
222
# Multiple objects (a Secret and a Service) created by a single task
custom_objects = [
    {
        'apiVersion': 'v1',
        'kind': 'Secret',
        'metadata': {
            'name': 'db-credentials',
            'namespace': 'default'
        },
        'type': 'Opaque',
        'data': {
            'username': 'dXNlcm5hbWU=',  # base64 encoded
            'password': 'cGFzc3dvcmQ='   # base64 encoded
        }
    },
    {
        'apiVersion': 'v1',
        'kind': 'Service',
        'metadata': {
            'name': 'my-service',
            'namespace': 'default'
        },
        'spec': {
            'selector': {
                'app': 'my-app'
            },
            'ports': [
                {
                    'port': 80,
                    'targetPort': 8080
                }
            ]
        }
    }
]

create_multiple = KubernetesCreateResourceOperator(
    task_id='create_multiple_objects',
    custom_objects=custom_objects,
    namespace='default',
    dag=dag
)
264
```
265
266
### Deleting Resources by Name
267
268
```python
269
from airflow.providers.cncf.kubernetes.operators.resource import KubernetesDeleteResourceOperator

# Delete specific resource by name (api_version + kind + name identify it)
delete_deployment = KubernetesDeleteResourceOperator(
    task_id='delete_deployment',
    api_version='apps/v1',
    kind='Deployment',
    name='my-app',
    namespace='default',
    wait_for_completion=True,
    dag=dag
)
281
```
282
283
### Deleting Resources by Label Selector
284
285
```python
286
# Delete resources by label selector (comma-separated key=value pairs)
delete_by_labels = KubernetesDeleteResourceOperator(
    task_id='delete_by_labels',
    api_version='v1',
    kind='Pod',
    label_selector='app=my-app,environment=staging',
    namespace='default',
    propagation_policy='Foreground',
    dag=dag
)
296
```
297
298
### Deleting Resources from YAML
299
300
```python
301
# Delete resources defined in YAML file, with a 30-second grace period
delete_from_yaml = KubernetesDeleteResourceOperator(
    task_id='delete_from_yaml',
    yaml_conf_file='/path/to/resources-to-delete.yaml',
    namespace='default',
    grace_period_seconds=30,
    dag=dag
)
309
```
310
311
### Deleting Custom Resources
312
313
```python
314
# Delete custom resource (identified by apiVersion, kind and metadata)
delete_custom = KubernetesDeleteResourceOperator(
    task_id='delete_custom_resource',
    custom_resource_definition={
        'apiVersion': 'argoproj.io/v1alpha1',
        'kind': 'Application',
        'metadata': {
            'name': 'my-application',
            'namespace': 'argocd'
        }
    },
    namespace='argocd',
    dag=dag
)
328
```
329
330
### Resource Cleanup with Dependencies
331
332
```python
333
# Complete resource lifecycle example
from datetime import datetime

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.resource import (
    KubernetesCreateResourceOperator,
    KubernetesDeleteResourceOperator,
)

dag = DAG(
    'resource_lifecycle',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False
)

# Create resources
create_resources = KubernetesCreateResourceOperator(
    task_id='create_app_resources',
    yaml_conf_file='/path/to/app-resources.yaml',
    namespace='production',
    wait_until_ready=True,
    wait_timeout=600,
    dag=dag
)

# ... application processing tasks ...

# Clean up resources (same manifest file that was used for creation)
cleanup_resources = KubernetesDeleteResourceOperator(
    task_id='cleanup_resources',
    yaml_conf_file='/path/to/app-resources.yaml',
    namespace='production',
    wait_for_completion=True,
    dag=dag
)

# Set dependencies
create_resources >> cleanup_resources
367
```
368
369
### Advanced Resource Creation with Waiting
370
371
```python
372
# Create resources and wait for readiness
create_with_wait = KubernetesCreateResourceOperator(
    task_id='create_and_wait',
    # The manifest is kept at column 0 inside the string so the YAML
    # nesting is unambiguous regardless of the surrounding Python indent.
    yaml_conf="""
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: database
  namespace: default
spec:
  serviceName: database
  replicas: 3
  selector:
    matchLabels:
      app: database
  template:
    metadata:
      labels:
        app: database
    spec:
      containers:
      - name: postgres
        image: postgres:13
        env:
        - name: POSTGRES_PASSWORD
          valueFrom:
            secretKeyRef:
              name: db-credentials
              key: password
        volumeMounts:
        - name: data
          mountPath: /var/lib/postgresql/data
  volumeClaimTemplates:
  - metadata:
      name: data
    spec:
      accessModes: ["ReadWriteOnce"]
      resources:
        requests:
          storage: 10Gi
""",
    namespace='default',
    wait_until_ready=True,
    wait_timeout=900,  # Wait up to 15 minutes for StatefulSet to be ready
    dag=dag
)
418
```
419
420
### Field Selector Deletion
421
422
```python
423
# Delete resources using field selector (here: every Pod in the Failed phase)
delete_failed_pods = KubernetesDeleteResourceOperator(
    task_id='delete_failed_pods',
    api_version='v1',
    kind='Pod',
    field_selector='status.phase=Failed',
    namespace='default',
    propagation_policy='Background',
    dag=dag
)
433
```