Asynchronous Python client library for the Kubernetes API providing async/await support for all Kubernetes operations
—
YAML processing utilities for creating and applying Kubernetes resources from YAML files or dictionaries with full async support. Provides convenient functions for deploying applications and managing resources declaratively.
Core functions for creating Kubernetes resources from YAML definitions.
async def create_from_yaml(k8s_client, yaml_file, verbose=False, namespace="default", **kwargs):
    """
    Create Kubernetes resources from YAML file.

    Reads YAML file containing one or more Kubernetes resource definitions
    and creates them in the cluster using the appropriate API calls.

    This listing shows the signature and contract only; the implementation
    is not reproduced here.

    Parameters:
    - k8s_client: ApiClient, authenticated Kubernetes API client
    - yaml_file: str, path to YAML file containing resource definitions
    - verbose: bool, enable verbose output showing creation details
      (defaults to False)
    - namespace: str, default namespace for resources without namespace
      specified (defaults to "default")
    - **kwargs: additional parameters passed to create operations. The two
      options below are not named in the signature, so they must be given
      as keyword arguments and travel through **kwargs:
        - dry_run: str, dry run mode ('All', 'Server', or None)
          NOTE(review): the Kubernetes API reference documents only 'All'
          as a valid dryRun value -- confirm that 'Server' is accepted.
        - field_manager: str, field manager name for server-side apply

    Returns:
    - list: Created resource objects

    Raises:
    - FailToCreateError: When resource creation fails
    - FileNotFoundError: When YAML file doesn't exist
    - yaml.YAMLError: When YAML parsing fails
    """
async def create_from_dict(k8s_client, data, verbose=False, namespace="default", **kwargs):
    """
    Create Kubernetes resources from dictionary or list of dictionaries.

    Takes Python dictionary representation(s) of Kubernetes resources
    and creates them in the cluster.

    This listing shows the signature and contract only; the implementation
    is not reproduced here.

    Parameters:
    - k8s_client: ApiClient, authenticated Kubernetes API client
    - data: dict or list, resource definition(s) as dictionaries
    - verbose: bool, enable verbose output showing creation details
      (defaults to False)
    - namespace: str, default namespace for resources without namespace
      specified (defaults to "default")
    - **kwargs: additional parameters passed to create operations. The two
      options below are not named in the signature, so they must be given
      as keyword arguments and travel through **kwargs:
        - dry_run: str, dry run mode ('All', 'Server', or None)
          NOTE(review): the Kubernetes API reference documents only 'All'
          as a valid dryRun value -- confirm that 'Server' is accepted.
        - field_manager: str, field manager name for server-side apply

    Returns:
    - list: Created resource objects

    Raises:
    - FailToCreateError: When resource creation fails
    - ValueError: When data format is invalid
    """
async def create_from_yaml_single_item(k8s_client, yaml_object, verbose=False,
                                       namespace="default", **kwargs):
    """
    Create single Kubernetes resource from YAML object.

    Helper function for creating individual resources, used internally
    by create_from_yaml and create_from_dict.

    This listing shows the signature and contract only; the implementation
    is not reproduced here.

    Parameters:
    - k8s_client: ApiClient, authenticated Kubernetes API client
    - yaml_object: dict, single resource definition
    - verbose: bool, enable verbose output (defaults to False)
    - namespace: str, default namespace if not specified in resource
      (defaults to "default")
    - **kwargs: additional parameters passed to create operation

    Returns:
    - object: Created resource object

    Raises:
    - FailToCreateError: When resource creation fails
    """

Specialized exceptions for YAML processing and resource creation errors.
class FailToCreateError(Exception):
    """
    Exception raised when Kubernetes resource creation fails.

    Provides detailed information about which resources failed
    and the reasons for failure.

    This listing shows the class interface only: ``message`` and
    ``__str__`` are given as signatures with their docstrings, and their
    implementations are not reproduced here.
    """

    def __init__(self, api_exceptions):
        """
        Initialize with list of API exceptions from failed operations.

        Parameters:
        - api_exceptions: list, API exceptions from failed create operations
        """
        # Stored as a public attribute: the usage examples iterate over
        # ``e.api_exceptions`` to report each failure individually.
        self.api_exceptions = api_exceptions

    @property
    def message(self):
        """Formatted error message describing all failures."""

    def __str__(self):
        """String representation of the error."""


import asyncio
from kubernetes_asyncio import client, config, utils
async def apply_yaml_file():
    """Create every resource defined in ``deployment.yaml`` and report the outcome.

    Loads the client configuration, applies the file into the "default"
    namespace with verbose output, and prints either the created resources
    or the individual API failures. The API client is always closed.
    """
    await config.load_config()
    api_client = client.ApiClient()

    # Collected up front so the call site below stays on one line.
    create_options = {
        "k8s_client": api_client,
        "yaml_file": "deployment.yaml",
        "namespace": "default",
        "verbose": True,
    }

    try:
        # Apply resources from YAML file
        created = await utils.create_from_yaml(**create_options)

        print(f"Successfully created {len(created)} resources:")
        for item in created:
            print(f" - {item.kind}: {item.metadata.name}")
    except utils.FailToCreateError as err:
        # One aggregate error carries every individual API failure.
        print(f"Failed to create resources: {err}")
        for failure in err.api_exceptions:
            print(f" - {failure}")
    except FileNotFoundError:
        print("YAML file not found")
    finally:
        # Release the client's connections on success and on failure alike.
        await api_client.close()
asyncio.run(apply_yaml_file())

# Example: multi-document.yaml
"""
apiVersion: v1
kind: Namespace
metadata:
  name: myapp
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: myapp-deployment
  namespace: myapp
spec:
  replicas: 3
  selector:
    matchLabels:
      app: myapp
  template:
    metadata:
      labels:
        app: myapp
    spec:
      containers:
      - name: myapp
        image: nginx:latest
        ports:
        - containerPort: 80
---
apiVersion: v1
kind: Service
metadata:
  name: myapp-service
  namespace: myapp
spec:
  selector:
    app: myapp
  ports:
  - port: 80
    targetPort: 80
  type: ClusterIP
"""


async def apply_multi_document():
    """Apply every document in ``multi-document.yaml`` and list what was created.

    Loads the client configuration, creates all resources found in the
    multi-document YAML file with verbose output, and prints each created
    resource together with its namespace. Resources without a namespace are
    reported as 'cluster-wide'. If creation fails, each API exception is
    printed instead. The API client is always closed.
    """
    await config.load_config()
    api_client = client.ApiClient()
    try:
        # Apply multi-document YAML file
        resources = await utils.create_from_yaml(
            k8s_client=api_client,
            yaml_file="multi-document.yaml",
            verbose=True
        )
        print("Created resources:")
        for resource in resources:
            # A getattr() default only applies when the attribute is missing.
            # Metadata objects that expose ``namespace`` but leave it as None
            # (cluster-scoped resources such as the Namespace above) would
            # otherwise be printed as "in None", so fall back on any falsy
            # value as well.
            namespace = getattr(resource.metadata, 'namespace', None) or 'cluster-wide'
            print(f" - {resource.kind}/{resource.metadata.name} in {namespace}")
    except utils.FailToCreateError as e:
        print("Some resources failed to create:")
        for exception in e.api_exceptions:
            print(f" - Error: {exception}")
    finally:
        # Release the client's connections on success and on failure alike.
        await api_client.close()
asyncio.run(apply_multi_document())


async def create_from_dict_example():
    """Create a ConfigMap, a Secret and a Deployment from plain dictionaries.

    The three resources are built as separate dictionaries, passed together
    to ``utils.create_from_dict`` for the "default" namespace, and the
    number of created resources is printed. A creation failure is reported
    instead, and the API client is always closed.
    """
    await config.load_config()
    api_client = client.ApiClient()

    # Define resources as Python dictionaries
    config_map = {
        "apiVersion": "v1",
        "kind": "ConfigMap",
        "metadata": {"name": "app-config", "namespace": "default"},
        "data": {
            "database_url": "postgresql://localhost:5432/mydb",
            "debug": "true",
        },
    }

    secret = {
        "apiVersion": "v1",
        "kind": "Secret",
        "metadata": {"name": "app-secrets", "namespace": "default"},
        "type": "Opaque",
        "data": {
            "password": "cGFzc3dvcmQ=",  # base64 encoded "password"
            "api_key": "YWJjZGVmZw==",  # base64 encoded "abcdefg"
        },
    }

    # Environment of the application container: one value is read from the
    # ConfigMap above, the other from the Secret above.
    container_env = [
        {
            "name": "DATABASE_URL",
            "valueFrom": {
                "configMapKeyRef": {"name": "app-config", "key": "database_url"}
            },
        },
        {
            "name": "API_KEY",
            "valueFrom": {
                "secretKeyRef": {"name": "app-secrets", "key": "api_key"}
            },
        },
    ]

    deployment = {
        "apiVersion": "apps/v1",
        "kind": "Deployment",
        "metadata": {"name": "myapp", "namespace": "default"},
        "spec": {
            "replicas": 2,
            "selector": {"matchLabels": {"app": "myapp"}},
            "template": {
                "metadata": {"labels": {"app": "myapp"}},
                "spec": {
                    "containers": [
                        {
                            "name": "myapp",
                            "image": "myapp:latest",
                            "env": container_env,
                        }
                    ]
                },
            },
        },
    }

    # Same order as the resources are meant to be submitted.
    resources = [config_map, secret, deployment]

    try:
        created = await utils.create_from_dict(
            k8s_client=api_client,
            data=resources,
            namespace="default",
            verbose=True,
        )
        print(f"Successfully created {len(created)} resources")
    except utils.FailToCreateError as err:
        print(f"Creation failed: {err}")
    finally:
        # Release the client's connections on success and on failure alike.
        await api_client.close()
asyncio.run(create_from_dict_example())


async def dry_run_example():
    """Validate a Deployment manifest with a dry run, without creating it.

    Writes an inline Deployment manifest to a temporary YAML file, submits
    it through ``utils.create_from_yaml`` with ``dry_run="All"``, and prints
    either the resources that would be created or the API errors that were
    reported. The temporary file is removed and the API client is closed
    even when one of the steps fails.
    """
    # Function-scope imports keep this example self-contained.
    import os
    import tempfile

    await config.load_config()

    # The literal starts at column 0 on purpose: it is written to disk
    # verbatim and the nesting below is significant to the YAML parser.
    deployment_yaml = """
apiVersion: apps/v1
kind: Deployment
metadata:
  name: test-deployment
  namespace: default
spec:
  replicas: 3
  selector:
    matchLabels:
      app: test
  template:
    metadata:
      labels:
        app: test
    spec:
      containers:
      - name: test
        image: nginx:latest
"""

    # Save to temporary file. delete=False keeps the file on disk after the
    # with-block closes it, so it can be re-opened by path below.
    with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as f:
        f.write(deployment_yaml)
        temp_file = f.name

    try:
        api_client = client.ApiClient()
        try:
            # Perform dry run to validate without creating
            resources = await utils.create_from_yaml(
                k8s_client=api_client,
                yaml_file=temp_file,
                dry_run="All",  # Server-side dry run: processed by the API server, nothing is persisted
                verbose=True
            )
            print("Dry run successful - resources would be created:")
            for resource in resources:
                print(f" - {resource.kind}: {resource.metadata.name}")
        except utils.FailToCreateError as e:
            print("Dry run failed - issues found:")
            for exception in e.api_exceptions:
                print(f" - {exception}")
        finally:
            await api_client.close()
    finally:
        # Outer finally: the temporary file is removed even if constructing
        # or closing the client raises.
        os.unlink(temp_file)
asyncio.run(dry_run_example())


async def error_handling_example():
    """Submit a mix of valid and invalid resources and summarise the failures.

    Three resource dictionaries are passed to ``utils.create_from_dict``:
    one valid Pod, one Pod in a namespace that does not exist, and one
    Deployment without a selector. When ``FailToCreateError`` is raised,
    every collected API exception is printed, followed by a summary of how
    many resources succeeded and failed. The API client is always closed.
    """
    await config.load_config()
    api_client = client.ApiClient()

    # Resource dictionaries with intentional errors for demonstration
    problematic_resources = [
        {
            "apiVersion": "v1",
            "kind": "Pod",
            "metadata": {
                "name": "valid-pod",
                "namespace": "default"
            },
            "spec": {
                "containers": [{
                    "name": "nginx",
                    "image": "nginx:latest"
                }]
            }
        },
        {
            "apiVersion": "v1",
            "kind": "Pod",
            "metadata": {
                "name": "invalid-pod",
                "namespace": "nonexistent-namespace"  # This will fail
            },
            "spec": {
                "containers": [{
                    "name": "nginx",
                    "image": "nginx:latest"
                }]
            }
        },
        {
            "apiVersion": "apps/v1",
            "kind": "Deployment",
            "metadata": {
                "name": "invalid-deployment",
                "namespace": "default"
            },
            "spec": {
                # Missing required selector - this will fail
                "replicas": 1,
                "template": {
                    "metadata": {
                        "labels": {"app": "test"}
                    },
                    "spec": {
                        "containers": [{
                            "name": "nginx",
                            "image": "nginx:latest"
                        }]
                    }
                }
            }
        }
    ]

    try:
        created = await utils.create_from_dict(
            k8s_client=api_client,
            data=problematic_resources,
            verbose=True
        )
        print(f"Created {len(created)} resources successfully")
    except utils.FailToCreateError as e:
        print("Some resources failed to create:")

        # api_exceptions is documented as the list of API exceptions from
        # the failed create operations only. It has no entry for resources
        # that succeeded, so a position in this list is the ordinal of a
        # failure, not the index of a resource. None entries are skipped
        # defensively.
        failures = [exc for exc in e.api_exceptions if exc is not None]
        for i, exception in enumerate(failures, start=1):
            print(f" ✗ Failure {i}: {exception}")

        # Assumes one exception per failed resource -- TODO confirm against
        # the library; clamped so the summary can never go negative.
        succeeded = max(len(problematic_resources) - len(failures), 0)
        print(f"\nSummary: {succeeded} succeeded, {len(failures)} failed")

        # Nothing is rolled back here: anything created before the error
        # was raised has to be cleaned up by the caller if needed.
        if succeeded:
            print("Successfully created resources are still in the cluster")
    finally:
        await api_client.close()
asyncio.run(error_handling_example())


async def advanced_usage():
    """Create a Deployment with resource limits under a custom field manager.

    Builds a single Deployment dictionary for the "production" namespace,
    submits it through ``utils.create_from_dict`` with
    ``field_manager="my-deployment-tool"``, and prints the name, replica
    count and container image of the first created object. A creation
    failure is reported instead, and the API client is always closed.
    """
    await config.load_config()
    api_client = client.ApiClient()

    # Requests and limits of the single application container.
    container_resources = {
        "requests": {"memory": "256Mi", "cpu": "200m"},
        "limits": {"memory": "512Mi", "cpu": "500m"},
    }
    app_container = {
        "name": "app",
        "image": "myapp:v2.0.0",
        "resources": container_resources,
    }

    deployment_config = {
        "apiVersion": "apps/v1",
        "kind": "Deployment",
        "metadata": {"name": "advanced-app", "namespace": "production"},
        "spec": {
            "replicas": 5,
            "selector": {"matchLabels": {"app": "advanced-app"}},
            "template": {
                "metadata": {"labels": {"app": "advanced-app"}},
                "spec": {"containers": [app_container]},
            },
        },
    }

    try:
        # Create with custom field manager for server-side apply tracking
        created = await utils.create_from_dict(
            k8s_client=api_client,
            data=deployment_config,
            namespace="production",
            field_manager="my-deployment-tool",
            verbose=True,
        )

        # Only one resource was submitted, so the first entry is the Deployment.
        first = created[0]
        first_container = first.spec.template.spec.containers[0]
        print(f"Created deployment: {first.metadata.name}")
        print(f" Replicas: {first.spec.replicas}")
        print(f" Image: {first_container.image}")
    except utils.FailToCreateError as err:
        print(f"Deployment failed: {err}")
    finally:
        # Release the client's connections on success and on failure alike.
        await api_client.close()
asyncio.run(advanced_usage())

Install with Tessl CLI:

npx tessl i tessl/pypi-kubernetes-asyncio