Asynchronous Python client library for the Kubernetes API providing async/await support for all Kubernetes operations
—
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Pending
The risk profile of this skill
Asynchronous Python client library for the Kubernetes API providing async/await support for all Kubernetes operations. This package enables non-blocking interactions with Kubernetes clusters from Python applications, built using the same OpenAPI generator approach as the official Kubernetes Python client but with full asynchronous support.
pip install kubernetes_asyncio
import kubernetes_asyncio
Common imports for basic usage:
from kubernetes_asyncio import client, config
Dynamic client import:
from kubernetes_asyncio import dynamic
Streaming and watch functionality:
from kubernetes_asyncio import stream, watch
Utility functions:
from kubernetes_asyncio import utils
Leader election:
from kubernetes_asyncio.leaderelection import leaderelection, electionconfig
from kubernetes_asyncio.leaderelection.resourcelock import leaselock, configmaplock
import asyncio
from kubernetes_asyncio import client, config
async def main():
    """List the pods in the ``default`` namespace, then create a simple pod.

    Configuration is loaded first; every API call then goes through a single
    ``ApiClient`` that is closed when the ``async with`` block exits.
    """
    # Load configuration (tries kubeconfig, then in-cluster)
    await config.load_config()

    # Use context manager to ensure proper cleanup
    async with client.ApiClient() as api:
        v1 = client.CoreV1Api(api)

        # List all pods in default namespace
        pod_list = await v1.list_namespaced_pod(namespace="default")
        for pod in pod_list.items:
            print(f"Pod: {pod.metadata.name}")

        # Create a simple pod
        pod_manifest = {
            "apiVersion": "v1",
            "kind": "Pod",
            "metadata": {"name": "test-pod"},
            "spec": {
                "containers": [{
                    "name": "test-container",
                    "image": "nginx:latest",
                    "ports": [{"containerPort": 80}]
                }]
            }
        }

        # Send the manifest dict as the request body as-is. Unpacking it into
        # client.V1Pod(**pod_manifest) raises TypeError: the generated model
        # takes snake_case keyword arguments (api_version, ...), not the
        # camelCase manifest key "apiVersion".
        await v1.create_namespaced_pod(namespace="default", body=pod_manifest)
        print("Pod created successfully")
# Run the async function
asyncio.run(main())
The kubernetes_asyncio library follows a modular architecture that mirrors the official Kubernetes Python client.
All operations use Python's async/await pattern with proper resource cleanup through context managers.
Core Kubernetes API access through 64 auto-generated API classes covering all Kubernetes API groups and versions. Includes comprehensive model classes for all resource types with full async/await support.
class CoreV1Api:
    """Signature summary of the pod operations shown for the core/v1 API.

    Bodies are elided (``...``); only the call shapes are documented here.
    Every operation is a coroutine and must be awaited.
    """

    # Operations spanning all namespaces take no namespace argument.
    async def list_pod_for_all_namespaces(self, **kwargs): ...

    # Namespaced collection operations.
    async def list_namespaced_pod(self, namespace: str, **kwargs): ...
    async def create_namespaced_pod(self, namespace: str, body: "V1Pod", **kwargs): ...

    # Operations on a single named pod.
    async def read_namespaced_pod(self, name: str, namespace: str, **kwargs): ...
    async def patch_namespaced_pod(self, name: str, namespace: str, body: object, **kwargs): ...
    async def delete_namespaced_pod(self, name: str, namespace: str, **kwargs): ...
class AppsV1Api:
    """Signature summary of the deployment operations shown for the apps/v1 API.

    Bodies are elided (``...``); only the call shapes are documented here.
    Every operation is a coroutine and must be awaited.
    """

    # Namespaced collection operations.
    async def list_namespaced_deployment(self, namespace: str, **kwargs): ...
    async def create_namespaced_deployment(self, namespace: str, body: "V1Deployment", **kwargs): ...

    # Operations on a single named deployment.
    async def read_namespaced_deployment(self, name: str, namespace: str, **kwargs): ...
    async def patch_namespaced_deployment(self, name: str, namespace: str, body: object, **kwargs): ...
    async def delete_namespaced_deployment(self, name: str, namespace: str, **kwargs): ...
class ApiClient:
async def __aenter__(self): ...
async def __aexit__(self, exc_type, exc_val, exc_tb): ...
async def close(self): ...
Multiple methods for loading Kubernetes cluster credentials and configuration, supporting kubeconfig files, in-cluster authentication, and various credential providers.
async def load_config(**kwargs):
    """
    Load Kubernetes configuration from kubeconfig file or in-cluster.

    This is a coroutine and must be awaited.

    Parameters (all passed as keyword arguments):
    - config_file: str, path to kubeconfig file
    - context: str, kubeconfig context to use
    - client_configuration: Configuration object to populate
    - persist_config: bool, whether to persist config globally
    """
def load_incluster_config(**kwargs):
    """Load in-cluster configuration from service account.

    Declared as a plain function, so it is called without ``await``.
    """
async def load_kube_config(config_file=None, context=None, **kwargs):
    """Load configuration from kubeconfig file.

    This is a coroutine and must be awaited. Both ``config_file`` and
    ``context`` are optional and default to ``None``.
    """
def new_client_from_config(**kwargs) -> ApiClient:
"""Create new ApiClient from configuration."""
Runtime API discovery and interaction with any Kubernetes resource, including custom resources. Enables working with resources not known at compile time.
class DynamicClient:
    """Client for runtime API discovery and interaction with any resource.

    Wraps an existing ``ApiClient`` and can be used as an async context
    manager. Bodies are elided; only the call shapes are shown.
    """

    def __init__(self, client: ApiClient, cache_file=None, discoverer=None): ...

    # Async context-manager protocol (``async with DynamicClient(...)``).
    async def __aenter__(self): ...
    async def __aexit__(self, exc_type, exc_val, exc_tb): ...

    # Read-only accessor for the discovered resources.
    @property
    def resources(self): ...
class Resource:
def get(self, name=None, namespace=None, **kwargs): ...
def create(self, body=None, namespace=None, **kwargs): ...
def patch(self, body=None, name=None, namespace=None, **kwargs): ...
def delete(self, name=None, namespace=None, **kwargs): ...
WebSocket-based streaming for exec sessions, port forwarding, and attach operations. Provides real-time bidirectional communication with containers.
class WsApiClient(ApiClient):
    """``ApiClient`` subclass used for WebSocket-based streaming.

    Bodies are elided; only the call shapes are shown.
    """

    def __init__(self, configuration=None, header_name=None, header_value=None,
                 cookie=None, pool_threads=1, heartbeat=None): ...

    # Coroutine; must be awaited.
    async def request(self, method, url, **kwargs): ...
# Channel constants
STDIN_CHANNEL: int = 0
STDOUT_CHANNEL: int = 1
STDERR_CHANNEL: int = 2
ERROR_CHANNEL: int = 3
RESIZE_CHANNEL: int = 4
Event streaming for monitoring resource changes in real-time. Efficiently tracks additions, modifications, deletions, and errors for any Kubernetes resource.
class Watch:
def __init__(self, return_type=None): ...
def stop(self): ...
async def stream(self, func, *args, **kwargs):
"""
Stream events from a Kubernetes API list operation.
Parameters:
- func: API function that supports watch parameter
- *args, **kwargs: Arguments passed to the API function
Yields:
- dict: Event with 'type' ('ADDED', 'MODIFIED', 'DELETED', 'ERROR') and 'object'
"""
YAML processing utilities for creating and applying Kubernetes resources from YAML files or dictionaries with full async support.
async def create_from_yaml(k8s_client: ApiClient, yaml_file: str,
                           verbose: bool = False, namespace: str = "default", **kwargs):
    """
    Create Kubernetes resources from YAML file.

    This is a coroutine and must be awaited.

    Parameters:
    - k8s_client: Kubernetes API client
    - yaml_file: Path to YAML file containing resource definitions
    - verbose: Enable verbose output (default False)
    - namespace: Default namespace for resources (default "default")
    - **kwargs: Additional arguments passed to create operations
    """
async def create_from_dict(k8s_client: ApiClient, data: dict,
                           verbose: bool = False, namespace: str = "default", **kwargs):
    """Create Kubernetes resources from dictionary.

    This is a coroutine and must be awaited. Takes the same ``verbose``,
    ``namespace`` and ``**kwargs`` options as ``create_from_yaml``, with the
    resource definitions supplied in ``data`` instead of a file path.
    """
class FailToCreateError(Exception):
"""Exception raised when resource creation fails."""
Distributed leader election for high availability applications using Kubernetes native resources (ConfigMaps or Leases) for coordination.
class LeaderElection:
    """Leader-election driver configured by an ``ElectionConfig``.

    Bodies are elided; only the call shapes are shown. ``run``, ``acquire``
    and ``renew_loop`` are coroutines and must be awaited.
    """

    def __init__(self, election_config: ElectionConfig): ...

    async def run(self): ...
    async def acquire(self): ...
    async def renew_loop(self): ...
class ElectionConfig:
    """Settings object handed to ``LeaderElection``.

    NOTE(review): ``lock`` is presumably a ``LeaseLock`` or ``ConfigMapLock``
    instance, and the two ``on*_leading`` arguments optional callbacks --
    confirm against the library before relying on this.
    """

    def __init__(self, lock, lease_duration, renew_deadline, retry_period,
                 onstarted_leading=None, onstopped_leading=None): ...
class LeaseLock:
    """Lock identified by a name, a namespace and the holder's identity."""

    def __init__(self, name: str, namespace: str, identity: str): ...
class ConfigMapLock:
def __init__(self, name: str, namespace: str, identity: str): ...
The library provides comprehensive exception handling with specific exception types for different error conditions:
class ApiException(Exception):
    """General API errors with HTTP status, reason, and response body."""

    # All three arguments are optional and default to None.
    def __init__(self, status=None, reason=None, http_resp=None): ...
class ApiTypeError(TypeError):
    """Type validation errors (a ``TypeError`` subclass)."""
class ApiValueError(ValueError):
    """Value validation errors (a ``ValueError`` subclass)."""
class ConfigException(Exception):
    """Configuration-related errors."""
class FailToCreateError(Exception):
"""Resource creation failures."""
Common error handling pattern:
try:
result = await v1.create_namespaced_pod(namespace="default", body=pod)
except client.ApiException as e:
if e.status == 409:
print("Pod already exists")
elif e.status == 403:
print("Access denied")
else:
print(f"API error: {e.status} {e.reason}")
except Exception as e:
print(f"Unexpected error: {e}")