Asynchronous Python client library for the Kubernetes API, providing async/await support for all Kubernetes operations.
—
Runtime API discovery and interaction with any Kubernetes resource, including custom resources defined by Custom Resource Definitions (CRDs). The dynamic client enables working with resources not known at compile time and provides a flexible interface for any Kubernetes API.
The main entry point for dynamic Kubernetes API interactions with automatic resource discovery.
class DynamicClient:
    """Main entry point for dynamic Kubernetes API interactions.

    API signature stub: method bodies are intentionally empty and only
    document the interface.
    """

    def __init__(self, client, cache_file=None, discoverer=None):
        """
        Initialize dynamic client with API discovery.

        Parameters:
        - client: ApiClient, authenticated Kubernetes API client
        - cache_file: str, path to cache discovered API resources (optional)
        - discoverer: Discoverer, custom discovery implementation (optional)
        """

    async def __aenter__(self):
        """Async context manager entry."""

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit with cleanup."""

    @property
    def resources(self):
        """
        Access to discovered resources.

        Returns:
        - ResourceRegistry: Registry of all discovered API resources
        """

    @property
    def version(self):
        """
        Kubernetes API server version information.

        Returns:
        - dict: Version details including major, minor, git version, etc.
        """


# Interface for accessing and manipulating any Kubernetes resource type dynamically.
class Resource:
    """Kubernetes API resource type addressed by API version and kind.

    API signature stub: method bodies are intentionally empty and only
    document the interface.

    NOTE(review): the usage examples for this client run inside coroutines;
    the request methods below are presumably meant to be awaited -- confirm
    against the kubernetes_asyncio implementation.
    """

    def __init__(self, api_version, kind, client, **kwargs):
        """
        Represents a Kubernetes API resource type.

        Parameters:
        - api_version: str, API version (e.g., "v1", "apps/v1")
        - kind: str, resource kind (e.g., "Pod", "Deployment")
        - client: DynamicClient, parent dynamic client
        """

    def get(self, name=None, namespace=None, label_selector=None, field_selector=None, **kwargs):
        """
        Get resource(s) by name or selectors.

        Parameters:
        - name: str, specific resource name (for single resource)
        - namespace: str, namespace to search (for namespaced resources)
        - label_selector: str, label selector query
        - field_selector: str, field selector query
        - **kwargs: Additional query parameters

        Returns:
        - ResourceInstance or ResourceList: Single resource or list of resources
        """

    def create(self, body=None, namespace=None, **kwargs):
        """
        Create a new resource.

        Parameters:
        - body: dict, resource definition
        - namespace: str, target namespace (for namespaced resources)
        - **kwargs: Additional creation parameters

        Returns:
        - ResourceInstance: Created resource
        """

    def patch(self, body=None, name=None, namespace=None, **kwargs):
        """
        Partially update a resource.

        Parameters:
        - body: dict, patch data
        - name: str, resource name to patch
        - namespace: str, resource namespace (for namespaced resources)
        - **kwargs: Additional patch parameters

        Returns:
        - ResourceInstance: Updated resource
        """

    def replace(self, body=None, name=None, namespace=None, **kwargs):
        """
        Replace entire resource.

        Parameters:
        - body: dict, complete resource definition
        - name: str, resource name to replace
        - namespace: str, resource namespace (for namespaced resources)
        - **kwargs: Additional replace parameters

        Returns:
        - ResourceInstance: Replaced resource
        """

    def delete(self, name=None, namespace=None, label_selector=None, field_selector=None, **kwargs):
        """
        Delete resource(s).

        Parameters:
        - name: str, specific resource name
        - namespace: str, resource namespace (for namespaced resources)
        - label_selector: str, label selector for bulk deletion
        - field_selector: str, field selector for bulk deletion
        - **kwargs: Additional deletion parameters

        Returns:
        - ResourceInstance or ResourceList: Deleted resource(s) or status
        """

    def watch(self, namespace=None, name=None, label_selector=None, field_selector=None,
              resource_version=None, timeout_seconds=None, **kwargs):
        """
        Watch for resource changes.

        Parameters:
        - namespace: str, namespace to watch (for namespaced resources)
        - name: str, specific resource name to watch
        - label_selector: str, label selector for filtering
        - field_selector: str, field selector for filtering
        - resource_version: str, resource version to start watching from
        - timeout_seconds: int, watch timeout
        - **kwargs: Additional watch parameters

        Returns:
        - async generator: Stream of watch events; consume with ``async for``
        """

    @property
    def subresources(self):
        """
        Access to resource subresources (scale, status, etc.).

        Returns:
        - dict: Mapping of subresource names to Subresource objects
        """


# Wrappers for individual Kubernetes resource instances with convenient access patterns.
class ResourceInstance:
    """Signature stub for the wrapper around one Kubernetes resource object."""

    def __init__(self, resource, instance):
        """
        Wrap a single Kubernetes resource.

        Parameters:
        - resource: Resource, the resource type this object belongs to
        - instance: dict, raw resource data as returned by the API
        """

    @property
    def metadata(self):
        """Metadata of the resource: name, namespace, labels, annotations, etc."""

    @property
    def spec(self):
        """Specification section of the resource."""

    @property
    def status(self):
        """Status section of the resource."""

    def to_dict(self):
        """Return the resource as a dictionary."""

    def patch(self, body=None, **kwargs):
        """Apply a patch to this particular resource instance."""

    def replace(self, body=None, **kwargs):
        """Replace this particular resource instance."""

    def delete(self, **kwargs):
        """Delete this particular resource instance."""
class ResourceList:
    """Collection wrapper for a list of Kubernetes resources.

    API signature stub: method bodies are intentionally empty and only
    document the interface.
    """

    def __init__(self, resource, items):
        """
        List of Kubernetes resources.

        Parameters:
        - resource: Resource, parent resource type
        - items: list, resource instances
        """

    @property
    def items(self):
        """List of ResourceInstance objects."""

    @property
    def metadata(self):
        """List metadata (resource version, continue token, etc.)."""

    def to_dict(self):
        """Convert entire list to dictionary representation."""


# Access to Kubernetes subresources like scale, status, and custom subresources.
class Subresource:
    """Subresource (e.g. "scale", "status") of a parent resource type.

    API signature stub: method bodies are intentionally empty and only
    document the interface.
    """

    def __init__(self, parent_resource, name):
        """
        Kubernetes resource subresource.

        Parameters:
        - parent_resource: Resource, parent resource type
        - name: str, subresource name (e.g., "scale", "status")
        """

    def get(self, name, namespace=None, **kwargs):
        """Get subresource for specific parent resource."""

    def patch(self, name, body, namespace=None, **kwargs):
        """Patch subresource for specific parent resource."""

    def replace(self, name, body, namespace=None, **kwargs):
        """Replace subresource for specific parent resource."""


# API discovery implementations for loading Kubernetes API resources at runtime.
class EagerDiscoverer:
    """Signature stub for the discoverer that loads everything up front."""

    def __init__(self, client, cache_file=None):
        """
        Load all API resource information during initialization.

        Parameters:
        - client: ApiClient, authenticated Kubernetes client
        - cache_file: str, location of the cache file for discovered resources
        """

    def get_resources_for_api_version(self, prefix, group_version):
        """Return every resource belonging to the given API version."""
class LazyDiscoverer:
    """Discoverer that loads API resources on demand (the default).

    API signature stub: method bodies are intentionally empty and only
    document the interface.
    """

    def __init__(self, client, cache_file=None):
        """
        Loads API resources on-demand as they are accessed (default).

        Parameters:
        - client: ApiClient, authenticated Kubernetes client
        - cache_file: str, path to cache file for discovered resources
        """

    def get_resources_for_api_version(self, prefix, group_version):
        """Get resources for API version, loading if not cached."""


# Dynamic client specific exceptions for error handling.
class KubernetesValidateMissing(Exception):
    """Signals that client-side validation was requested but the
    kubernetes-validate library is not available."""
def api_exception(e):
    """
    Wrapper function that converts API exceptions to appropriate types.

    API signature stub: the body is intentionally empty and only documents
    the interface.

    Parameters:
    - e: Exception, original API exception

    Returns:
    - Exception: Appropriately typed exception
    """


import asyncio
from kubernetes_asyncio import client, config, dynamic
async def basic_dynamic():
    """List pods across all namespaces and in "default" via the dynamic client."""
    await config.load_config()
    # Enter the ApiClient as a context manager too, so its HTTP session is
    # closed when the example finishes.
    async with client.ApiClient() as api_client, \
            dynamic.DynamicClient(api_client) as dyn_client:
        # Access any Kubernetes resource (resource lookup is a coroutine)
        pods = await dyn_client.resources.get(api_version="v1", kind="Pod")

        # List all pods across all namespaces
        pod_list = await pods.get()
        print(f"Found {len(pod_list.items)} pods")

        # List pods in specific namespace
        default_pods = await pods.get(namespace="default")
        for pod in default_pods.items:
            print(f"Pod: {pod.metadata.name} in {pod.metadata.namespace}")
asyncio.run(basic_dynamic())


async def custom_resources():
    """Create and list instances of a custom resource (the CRD must already exist)."""
    await config.load_config()
    # Enter the ApiClient as a context manager too, so its HTTP session is
    # closed when the example finishes.
    async with client.ApiClient() as api_client, \
            dynamic.DynamicClient(api_client) as dyn_client:
        # Access custom resource (assuming CRD exists)
        custom_resource = await dyn_client.resources.get(
            api_version="example.com/v1",
            kind="MyCustomResource"
        )

        # Create custom resource
        custom_obj = {
            "apiVersion": "example.com/v1",
            "kind": "MyCustomResource",
            "metadata": {
                "name": "my-custom-object",
                "namespace": "default"
            },
            "spec": {
                "replicas": 3,
                "template": "nginx"
            }
        }
        created = await custom_resource.create(body=custom_obj, namespace="default")
        print(f"Created custom resource: {created.metadata.name}")

        # List custom resources
        custom_list = await custom_resource.get(namespace="default")
        for item in custom_list.items:
            print(f"Custom resource: {item.metadata.name}")
asyncio.run(custom_resources())


async def explore_resources():
    """Print the server version and enumerate the discovered API resources."""
    await config.load_config()
    # Enter the ApiClient as a context manager too, so its HTTP session is
    # closed when the example finishes.
    async with client.ApiClient() as api_client, \
            dynamic.DynamicClient(api_client) as dyn_client:
        # Explore available API resources
        print(f"Kubernetes version: {dyn_client.version}")

        # Get all available resources (discovery searches are coroutines)
        all_resources = await dyn_client.resources.search()
        print("Available API resources:")
        for resource in all_resources:
            print(f"  {resource.api_version}/{resource.kind} - {resource.name}")

        # Find resources by kind
        deployment_resources = await dyn_client.resources.search(kind="Deployment")
        for res in deployment_resources:
            print(f"Deployment resource: {res.api_version}/{res.kind}")
asyncio.run(explore_resources())


async def watch_resources():
    """Stream pod change events from the "default" namespace and print them."""
    await config.load_config()
    # Enter the ApiClient as a context manager too, so its HTTP session is
    # closed when the example finishes.
    async with client.ApiClient() as api_client, \
            dynamic.DynamicClient(api_client) as dyn_client:
        pods = await dyn_client.resources.get(api_version="v1", kind="Pod")

        # Watch for pod changes in default namespace
        print("Watching for pod changes...")
        # watch() is iterated directly with `async for`; it is not awaited.
        async for event in pods.watch(namespace="default", timeout_seconds=300):
            event_type = event['type']  # ADDED, MODIFIED, DELETED
            pod = event['object']
            print(f"{event_type}: Pod {pod.metadata.name} - {pod.status.phase}")
asyncio.run(watch_resources())


async def scale_deployment():
    """Scale the "nginx-deployment" Deployment to 5 replicas via its scale subresource."""
    await config.load_config()
    # Enter the ApiClient as a context manager too, so its HTTP session is
    # closed when the example finishes.
    async with client.ApiClient() as api_client, \
            dynamic.DynamicClient(api_client) as dyn_client:
        deployments = await dyn_client.resources.get(api_version="apps/v1", kind="Deployment")

        # Get current deployment
        deployment = await deployments.get(name="nginx-deployment", namespace="default")
        print(f"Current replicas: {deployment.spec.replicas}")

        # Scale using the scale subresource
        scale_resource = deployments.subresources["scale"]
        current_scale = await scale_resource.get(name="nginx-deployment", namespace="default")

        # Update replica count; reuse the fetched metadata for the replacement
        new_scale = {
            "apiVersion": "autoscaling/v1",
            "kind": "Scale",
            "metadata": current_scale.metadata.to_dict(),
            "spec": {"replicas": 5}
        }
        updated_scale = await scale_resource.replace(
            name="nginx-deployment",
            namespace="default",
            body=new_scale
        )
        print(f"Scaled to {updated_scale.spec.replicas} replicas")
asyncio.run(scale_deployment())


async def bulk_operations():
    """Annotate every pod in "default" that matches app=nginx,env=production."""
    await config.load_config()
    # Enter the ApiClient as a context manager too, so its HTTP session is
    # closed when the example finishes.
    async with client.ApiClient() as api_client, \
            dynamic.DynamicClient(api_client) as dyn_client:
        pods = await dyn_client.resources.get(api_version="v1", kind="Pod")

        # List pods with specific labels
        labeled_pods = await pods.get(
            namespace="default",
            label_selector="app=nginx,env=production"
        )
        print(f"Found {len(labeled_pods.items)} nginx production pods")

        # The patch is identical for every pod, so build it once outside the loop.
        patch_body = {
            "metadata": {
                "annotations": {
                    "last-updated": "2024-01-15T10:00:00Z"
                }
            }
        }

        # Patch all matching pods
        for pod in labeled_pods.items:
            patched = await pods.patch(
                name=pod.metadata.name,
                namespace=pod.metadata.namespace,
                body=patch_body
            )
            print(f"Patched pod: {patched.metadata.name}")


asyncio.run(bulk_operations())

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-kubernetes-asyncio