Asynchronous Python client library for the Kubernetes API providing async/await support for all Kubernetes operations
npx @tessl/cli install tessl/pypi-kubernetes-asyncio@33.3.0

Asynchronous Python client library for the Kubernetes API providing async/await support for all Kubernetes operations. This package enables non-blocking interactions with Kubernetes clusters from Python applications, built using the same OpenAPI generator approach as the official Kubernetes Python client but with full asynchronous support.
pip install kubernetes_asyncio

import kubernetes_asyncio

Common imports for basic usage:

from kubernetes_asyncio import client, config

Dynamic client import:

from kubernetes_asyncio import dynamic

Streaming and watch functionality:

from kubernetes_asyncio import stream, watch

Utility functions:

from kubernetes_asyncio import utils

Leader election:

from kubernetes_asyncio.leaderelection import leaderelection, electionconfig
from kubernetes_asyncio.leaderelection.resourcelock import leaselock, configmaplock

import asyncio
from kubernetes_asyncio import client, config
async def main():
    """List the pods in the ``default`` namespace, then create one nginx pod.

    Loads the cluster configuration, opens an ``ApiClient`` as an async
    context manager so it is closed on exit, prints the name of every pod
    returned for the ``default`` namespace, and finally submits a pod
    manifest named ``test-pod``.
    """
    # Load configuration (tries kubeconfig, then in-cluster)
    await config.load_config()

    # Use context manager to ensure proper cleanup
    async with client.ApiClient() as api:
        v1 = client.CoreV1Api(api)

        # List all pods in default namespace
        pod_list = await v1.list_namespaced_pod(namespace="default")
        for pod in pod_list.items:
            print(f"Pod: {pod.metadata.name}")

        # Create a simple pod
        pod_manifest = {
            "apiVersion": "v1",
            "kind": "Pod",
            "metadata": {"name": "test-pod"},
            "spec": {
                "containers": [{
                    "name": "test-container",
                    "image": "nginx:latest",
                    "ports": [{"containerPort": 80}],
                }],
            },
        }
        # Submit the manifest dict as-is. Its keys are the JSON wire names
        # (e.g. "apiVersion"), which are not valid keyword arguments for the
        # V1Pod constructor (it expects api_version=...), so unpacking the
        # dict into V1Pod(**pod_manifest) would raise TypeError.
        await v1.create_namespaced_pod(namespace="default", body=pod_manifest)
        print("Pod created successfully")
# Run the async function
asyncio.run(main())

The kubernetes_asyncio library follows a modular architecture that mirrors the official Kubernetes Python client:
All operations use Python's async/await pattern with proper resource cleanup through context managers.
Core Kubernetes API access through 64 auto-generated API classes covering all Kubernetes API groups and versions. Includes comprehensive model classes for all resource types with full async/await support.
class CoreV1Api:
async def list_pod_for_all_namespaces(**kwargs): ...
async def list_namespaced_pod(namespace: str, **kwargs): ...
async def create_namespaced_pod(namespace: str, body: V1Pod, **kwargs): ...
async def read_namespaced_pod(name: str, namespace: str, **kwargs): ...
async def patch_namespaced_pod(name: str, namespace: str, body: object, **kwargs): ...
async def delete_namespaced_pod(name: str, namespace: str, **kwargs): ...
class AppsV1Api:
async def list_namespaced_deployment(namespace: str, **kwargs): ...
async def create_namespaced_deployment(namespace: str, body: V1Deployment, **kwargs): ...
async def read_namespaced_deployment(name: str, namespace: str, **kwargs): ...
async def patch_namespaced_deployment(name: str, namespace: str, body: object, **kwargs): ...
async def delete_namespaced_deployment(name: str, namespace: str, **kwargs): ...
class ApiClient:
async def __aenter__(self): ...
async def __aexit__(self, exc_type, exc_val, exc_tb): ...
async def close(self): ...

Multiple methods for loading Kubernetes cluster credentials and configuration, supporting kubeconfig files, in-cluster authentication, and various credential providers.
async def load_config(**kwargs):
"""
Load Kubernetes configuration from kubeconfig file or in-cluster.
Parameters:
- config_file: str, path to kubeconfig file
- context: str, kubeconfig context to use
- client_configuration: Configuration object to populate
- persist_config: bool, whether to persist config globally
"""
def load_incluster_config(**kwargs):
"""Load in-cluster configuration from service account."""
async def load_kube_config(config_file=None, context=None, **kwargs):
"""Load configuration from kubeconfig file."""
def new_client_from_config(**kwargs) -> ApiClient:
"""Create new ApiClient from configuration."""Runtime API discovery and interaction with any Kubernetes resource, including custom resources. Enables working with resources not known at compile time.
class DynamicClient:
def __init__(self, client: ApiClient, cache_file=None, discoverer=None): ...
async def __aenter__(self): ...
async def __aexit__(self, exc_type, exc_val, exc_tb): ...
@property
def resources(self): ...
class Resource:
def get(self, name=None, namespace=None, **kwargs): ...
def create(self, body=None, namespace=None, **kwargs): ...
def patch(self, body=None, name=None, namespace=None, **kwargs): ...
def delete(self, name=None, namespace=None, **kwargs): ...

WebSocket-based streaming for exec sessions, port forwarding, and attach operations. Provides real-time bidirectional communication with containers.
class WsApiClient(ApiClient):
def __init__(self, configuration=None, header_name=None, header_value=None,
cookie=None, pool_threads=1, heartbeat=None): ...
async def request(self, method, url, **kwargs): ...
# Channel constants
STDIN_CHANNEL: int = 0
STDOUT_CHANNEL: int = 1
STDERR_CHANNEL: int = 2
ERROR_CHANNEL: int = 3
RESIZE_CHANNEL: int = 4

Event streaming for monitoring resource changes in real-time. Efficiently tracks additions, modifications, deletions, and errors for any Kubernetes resource.
class Watch:
def __init__(self, return_type=None): ...
def stop(self): ...
async def stream(self, func, *args, **kwargs):
"""
Stream events from a Kubernetes API list operation.
Parameters:
- func: API function that supports watch parameter
- *args, **kwargs: Arguments passed to the API function
Yields:
- dict: Event with 'type' ('ADDED', 'MODIFIED', 'DELETED', 'ERROR') and 'object'
"""YAML processing utilities for creating and applying Kubernetes resources from YAML files or dictionaries with full async support.
async def create_from_yaml(k8s_client: ApiClient, yaml_file: str,
verbose: bool = False, namespace: str = "default", **kwargs):
"""
Create Kubernetes resources from YAML file.
Parameters:
- k8s_client: Kubernetes API client
- yaml_file: Path to YAML file containing resource definitions
- verbose: Enable verbose output
- namespace: Default namespace for resources
- **kwargs: Additional arguments passed to create operations
"""
async def create_from_dict(k8s_client: ApiClient, data: dict,
verbose: bool = False, namespace: str = "default", **kwargs):
"""Create Kubernetes resources from dictionary."""
class FailToCreateError(Exception):
"""Exception raised when resource creation fails."""Distributed leader election for high availability applications using Kubernetes native resources (ConfigMaps or Leases) for coordination.
class LeaderElection:
def __init__(self, election_config: ElectionConfig): ...
async def run(self): ...
async def acquire(self): ...
async def renew_loop(self): ...
class ElectionConfig:
def __init__(self, lock, lease_duration, renew_deadline, retry_period,
onstarted_leading=None, onstopped_leading=None): ...
class LeaseLock:
def __init__(self, name: str, namespace: str, identity: str): ...
class ConfigMapLock:
def __init__(self, name: str, namespace: str, identity: str): ...

The library provides comprehensive exception handling with specific exception types for different error conditions:
class ApiException(Exception):
"""General API errors with HTTP status, reason, and response body."""
def __init__(self, status=None, reason=None, http_resp=None): ...
class ApiTypeError(TypeError):
"""Type validation errors."""
class ApiValueError(ValueError):
"""Value validation errors."""
class ConfigException(Exception):
"""Configuration-related errors."""
class FailToCreateError(Exception):
"""Resource creation failures."""Common error handling pattern:
try:
result = await v1.create_namespaced_pod(namespace="default", body=pod)
except client.ApiException as e:
if e.status == 409:
print("Pod already exists")
elif e.status == 403:
print("Access denied")
else:
print(f"API error: {e.status} {e.reason}")
except Exception as e:
print(f"Unexpected error: {e}")