0
# Utils
1
2
YAML processing utilities for creating and applying Kubernetes resources from YAML files or dictionaries with full async support. Provides convenient functions for deploying applications and managing resources declaratively.
3
4
## Capabilities
5
6
### YAML Processing Functions
7
8
Core functions for creating Kubernetes resources from YAML definitions.
9
10
```python { .api }
11
async def create_from_yaml(k8s_client, yaml_file, verbose=False, namespace="default", **kwargs):
12
"""
13
Create Kubernetes resources from YAML file.
14
15
Reads YAML file containing one or more Kubernetes resource definitions
16
and creates them in the cluster using the appropriate API calls.
17
18
Parameters:
19
- k8s_client: ApiClient, authenticated Kubernetes API client
20
- yaml_file: str, path to YAML file containing resource definitions
21
- verbose: bool, enable verbose output showing creation details
22
- namespace: str, default namespace for resources without namespace specified
23
- dry_run: str, dry run mode ('All' to have the API server process the request without persisting it, or None to create normally)
24
- field_manager: str, field manager name for server-side apply
25
- **kwargs: additional parameters passed to create operations
26
27
Returns:
28
- list: Created resource objects
29
30
Raises:
31
- FailToCreateError: When resource creation fails
32
- FileNotFoundError: When YAML file doesn't exist
33
- yaml.YAMLError: When YAML parsing fails
34
"""
35
36
async def create_from_dict(k8s_client, data, verbose=False, namespace="default", **kwargs):
37
"""
38
Create Kubernetes resources from dictionary or list of dictionaries.
39
40
Takes Python dictionary representation(s) of Kubernetes resources
41
and creates them in the cluster.
42
43
Parameters:
44
- k8s_client: ApiClient, authenticated Kubernetes API client
45
- data: dict or list, resource definition(s) as dictionaries
46
- verbose: bool, enable verbose output showing creation details
47
- namespace: str, default namespace for resources without namespace specified
48
- dry_run: str, dry run mode ('All' to have the API server process the request without persisting it, or None to create normally)
49
- field_manager: str, field manager name for server-side apply
50
- **kwargs: additional parameters passed to create operations
51
52
Returns:
53
- list: Created resource objects
54
55
Raises:
56
- FailToCreateError: When resource creation fails
57
- ValueError: When data format is invalid
58
"""
59
60
async def create_from_yaml_single_item(k8s_client, yaml_object, verbose=False,
61
namespace="default", **kwargs):
62
"""
63
Create single Kubernetes resource from YAML object.
64
65
Helper function for creating individual resources, used internally
66
by create_from_yaml and create_from_dict.
67
68
Parameters:
69
- k8s_client: ApiClient, authenticated Kubernetes API client
70
- yaml_object: dict, single resource definition
71
- verbose: bool, enable verbose output
72
- namespace: str, default namespace if not specified in resource
73
- **kwargs: additional parameters passed to create operation
74
75
Returns:
76
- object: Created resource object
77
78
Raises:
79
- FailToCreateError: When resource creation fails
80
"""
81
```
82
83
### Exception Classes
84
85
Specialized exceptions for YAML processing and resource creation errors.
86
87
```python { .api }
88
class FailToCreateError(Exception):
89
"""
90
Exception raised when Kubernetes resource creation fails.
91
92
Provides detailed information about which resources failed
93
and the reasons for failure.
94
"""
95
96
def __init__(self, api_exceptions):
97
"""
98
Initialize with list of API exceptions from failed operations.
99
100
Parameters:
101
- api_exceptions: list, API exceptions from failed create operations
102
"""
103
self.api_exceptions = api_exceptions
104
105
@property
106
def message(self):
107
"""Formatted error message describing all failures."""
108
109
def __str__(self):
110
"""String representation of the error."""
111
```
112
113
## Usage Examples
114
115
### Basic YAML File Application
116
117
```python
118
import asyncio
119
from kubernetes_asyncio import client, config, utils
120
121
async def apply_yaml_file():
122
await config.load_config()
123
api_client = client.ApiClient()
124
125
try:
126
# Apply resources from YAML file
127
created_resources = await utils.create_from_yaml(
128
k8s_client=api_client,
129
yaml_file="deployment.yaml",
130
namespace="default",
131
verbose=True
132
)
133
134
print(f"Successfully created {len(created_resources)} resources:")
135
for resource in created_resources:
136
print(f" - {resource.kind}: {resource.metadata.name}")
137
138
except utils.FailToCreateError as e:
139
print(f"Failed to create resources: {e}")
140
for api_exception in e.api_exceptions:
141
print(f" - {api_exception}")
142
except FileNotFoundError:
143
print("YAML file not found")
144
finally:
145
await api_client.close()
146
147
asyncio.run(apply_yaml_file())
148
```
149
150
### Multi-Document YAML File
151
152
```python
153
# Example: multi-document.yaml
154
"""
155
apiVersion: v1
156
kind: Namespace
157
metadata:
158
name: myapp
159
---
160
apiVersion: apps/v1
161
kind: Deployment
162
metadata:
163
name: myapp-deployment
164
namespace: myapp
165
spec:
166
replicas: 3
167
selector:
168
matchLabels:
169
app: myapp
170
template:
171
metadata:
172
labels:
173
app: myapp
174
spec:
175
containers:
176
- name: myapp
177
image: nginx:latest
178
ports:
179
- containerPort: 80
180
---
181
apiVersion: v1
182
kind: Service
183
metadata:
184
name: myapp-service
185
namespace: myapp
186
spec:
187
selector:
188
app: myapp
189
ports:
190
- port: 80
191
targetPort: 80
192
type: ClusterIP
193
"""
194
195
async def apply_multi_document():
196
await config.load_config()
197
api_client = client.ApiClient()
198
199
try:
200
# Apply multi-document YAML file
201
resources = await utils.create_from_yaml(
202
k8s_client=api_client,
203
yaml_file="multi-document.yaml",
204
verbose=True
205
)
206
207
print("Created resources:")
208
for resource in resources:
209
namespace = getattr(resource.metadata, 'namespace', 'cluster-wide')
210
print(f" - {resource.kind}/{resource.metadata.name} in {namespace}")
211
212
except utils.FailToCreateError as e:
213
print("Some resources failed to create:")
214
for exception in e.api_exceptions:
215
print(f" - Error: {exception}")
216
finally:
217
await api_client.close()
218
219
asyncio.run(apply_multi_document())
220
```
221
222
### Creating from Dictionary
223
224
```python
225
async def create_from_dict_example():
226
await config.load_config()
227
api_client = client.ApiClient()
228
229
# Define resources as Python dictionaries
230
resources = [
231
{
232
"apiVersion": "v1",
233
"kind": "ConfigMap",
234
"metadata": {
235
"name": "app-config",
236
"namespace": "default"
237
},
238
"data": {
239
"database_url": "postgresql://localhost:5432/mydb",
240
"debug": "true"
241
}
242
},
243
{
244
"apiVersion": "v1",
245
"kind": "Secret",
246
"metadata": {
247
"name": "app-secrets",
248
"namespace": "default"
249
},
250
"type": "Opaque",
251
"data": {
252
"password": "cGFzc3dvcmQ=", # base64 encoded "password"
253
"api_key": "YWJjZGVmZw==" # base64 encoded "abcdefg"
254
}
255
},
256
{
257
"apiVersion": "apps/v1",
258
"kind": "Deployment",
259
"metadata": {
260
"name": "myapp",
261
"namespace": "default"
262
},
263
"spec": {
264
"replicas": 2,
265
"selector": {
266
"matchLabels": {"app": "myapp"}
267
},
268
"template": {
269
"metadata": {
270
"labels": {"app": "myapp"}
271
},
272
"spec": {
273
"containers": [{
274
"name": "myapp",
275
"image": "myapp:latest",
276
"env": [
277
{
278
"name": "DATABASE_URL",
279
"valueFrom": {
280
"configMapKeyRef": {
281
"name": "app-config",
282
"key": "database_url"
283
}
284
}
285
},
286
{
287
"name": "API_KEY",
288
"valueFrom": {
289
"secretKeyRef": {
290
"name": "app-secrets",
291
"key": "api_key"
292
}
293
}
294
}
295
]
296
}]
297
}
298
}
299
}
300
}
301
]
302
303
try:
304
created = await utils.create_from_dict(
305
k8s_client=api_client,
306
data=resources,
307
namespace="default",
308
verbose=True
309
)
310
311
print(f"Successfully created {len(created)} resources")
312
313
except utils.FailToCreateError as e:
314
print(f"Creation failed: {e}")
315
finally:
316
await api_client.close()
317
318
asyncio.run(create_from_dict_example())
319
```
320
321
### Dry Run Operations
322
323
```python
324
async def dry_run_example():
325
await config.load_config()
326
api_client = client.ApiClient()
327
328
deployment_yaml = """
329
apiVersion: apps/v1
330
kind: Deployment
331
metadata:
332
name: test-deployment
333
namespace: default
334
spec:
335
replicas: 3
336
selector:
337
matchLabels:
338
app: test
339
template:
340
metadata:
341
labels:
342
app: test
343
spec:
344
containers:
345
- name: test
346
image: nginx:latest
347
"""
348
349
# Save to temporary file
350
import tempfile
351
with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
352
f.write(deployment_yaml)
353
temp_file = f.name
354
355
try:
356
# Perform dry run to validate without creating
357
resources = await utils.create_from_yaml(
358
k8s_client=api_client,
359
yaml_file=temp_file,
360
dry_run="All", # Server-side dry run: the request is processed by the API server but nothing is persisted
361
verbose=True
362
)
363
364
print("Dry run successful - resources would be created:")
365
for resource in resources:
366
print(f" - {resource.kind}: {resource.metadata.name}")
367
368
except utils.FailToCreateError as e:
369
print("Dry run failed - issues found:")
370
for exception in e.api_exceptions:
371
print(f" - {exception}")
372
finally:
373
import os
374
os.unlink(temp_file)
375
await api_client.close()
376
377
asyncio.run(dry_run_example())
378
```
379
380
### Error Handling and Recovery
381
382
```python
383
async def error_handling_example():
384
await config.load_config()
385
api_client = client.ApiClient()
386
387
# Resource dictionaries with intentional errors for demonstration
388
problematic_resources = [
389
{
390
"apiVersion": "v1",
391
"kind": "Pod",
392
"metadata": {
393
"name": "valid-pod",
394
"namespace": "default"
395
},
396
"spec": {
397
"containers": [{
398
"name": "nginx",
399
"image": "nginx:latest"
400
}]
401
}
402
},
403
{
404
"apiVersion": "v1",
405
"kind": "Pod",
406
"metadata": {
407
"name": "invalid-pod",
408
"namespace": "nonexistent-namespace" # This will fail
409
},
410
"spec": {
411
"containers": [{
412
"name": "nginx",
413
"image": "nginx:latest"
414
}]
415
}
416
},
417
{
418
"apiVersion": "apps/v1",
419
"kind": "Deployment",
420
"metadata": {
421
"name": "invalid-deployment",
422
"namespace": "default"
423
},
424
"spec": {
425
# Missing required selector - this will fail
426
"replicas": 1,
427
"template": {
428
"metadata": {
429
"labels": {"app": "test"}
430
},
431
"spec": {
432
"containers": [{
433
"name": "nginx",
434
"image": "nginx:latest"
435
}]
436
}
437
}
438
}
439
}
440
]
441
442
try:
443
created = await utils.create_from_dict(
444
k8s_client=api_client,
445
data=problematic_resources,
446
verbose=True
447
)
448
449
print(f"Created {len(created)} resources successfully")
450
451
except utils.FailToCreateError as e:
452
print("Some resources failed to create:")
453
454
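# Analyze failures (api_exceptions is documented as holding only the exceptions from failed create operations, so the `is None` branch below is purely defensive)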
455
successful_resources = []
456
failed_resources = []
457
458
for i, exception in enumerate(e.api_exceptions):
459
if exception is None:
460
successful_resources.append(i)
461
print(f" ✓ Resource {i} created successfully")
462
else:
463
failed_resources.append((i, exception))
464
print(f" ✗ Resource {i} failed: {exception}")
465
466
print(f"\nSummary: {len(successful_resources)} succeeded, {len(failed_resources)} failed")
467
468
# Attempt to clean up successfully created resources if needed
469
if successful_resources:
470
print("Successfully created resources are still in the cluster")
471
472
finally:
473
await api_client.close()
474
475
asyncio.run(error_handling_example())
476
```
477
478
### Advanced Usage with Custom Parameters
479
480
```python
481
async def advanced_usage():
482
await config.load_config()
483
api_client = client.ApiClient()
484
485
deployment_config = {
486
"apiVersion": "apps/v1",
487
"kind": "Deployment",
488
"metadata": {
489
"name": "advanced-app",
490
"namespace": "production"
491
},
492
"spec": {
493
"replicas": 5,
494
"selector": {
495
"matchLabels": {"app": "advanced-app"}
496
},
497
"template": {
498
"metadata": {
499
"labels": {"app": "advanced-app"}
500
},
501
"spec": {
502
"containers": [{
503
"name": "app",
504
"image": "myapp:v2.0.0",
505
"resources": {
506
"requests": {
507
"memory": "256Mi",
508
"cpu": "200m"
509
},
510
"limits": {
511
"memory": "512Mi",
512
"cpu": "500m"
513
}
514
}
515
}]
516
}
517
}
518
}
519
}
520
521
try:
522
# Create with custom field manager for server-side apply tracking
523
created = await utils.create_from_dict(
524
k8s_client=api_client,
525
data=deployment_config,
526
namespace="production",
527
field_manager="my-deployment-tool",
528
verbose=True
529
)
530
531
deployment = created[0]
532
print(f"Created deployment: {deployment.metadata.name}")
533
print(f" Replicas: {deployment.spec.replicas}")
534
print(f" Image: {deployment.spec.template.spec.containers[0].image}")
535
536
except utils.FailToCreateError as e:
537
print(f"Deployment failed: {e}")
538
finally:
539
await api_client.close()
540
541
asyncio.run(advanced_usage())
542
```