tessl/pypi-kubernetes-asyncio

Asynchronous Python client library for the Kubernetes API providing async/await support for all Kubernetes operations

Workspace: tessl
Visibility: Public
Describes: pkg:pypi/kubernetes-asyncio@33.3.x

To install, run

npx @tessl/cli install tessl/pypi-kubernetes-asyncio@33.3.0


Kubernetes Asyncio

kubernetes_asyncio is an asynchronous Python client library for the Kubernetes API, with async/await support for all Kubernetes operations. It enables non-blocking interaction with Kubernetes clusters from Python applications and is built with the same OpenAPI generator approach as the official Kubernetes Python client.

Package Information

  • Package Name: kubernetes_asyncio
  • Language: Python
  • Installation: pip install kubernetes_asyncio
  • OpenAPI Version: v1.33.3
  • Package Version: 33.3.0

Core Imports

import kubernetes_asyncio

Common imports for basic usage:

from kubernetes_asyncio import client, config

Dynamic client import:

from kubernetes_asyncio import dynamic

Streaming and watch functionality:

from kubernetes_asyncio import stream, watch

Utility functions:

from kubernetes_asyncio import utils

Leader election:

from kubernetes_asyncio.leaderelection import leaderelection, electionconfig
from kubernetes_asyncio.leaderelection.resourcelock import leaselock, configmaplock

Basic Usage

import asyncio
from kubernetes_asyncio import client, config

async def main():
    # Load configuration (tries kubeconfig, then in-cluster)
    await config.load_config()
    
    # Use context manager to ensure proper cleanup
    async with client.ApiClient() as api:
        v1 = client.CoreV1Api(api)
        
        # List all pods in default namespace
        pod_list = await v1.list_namespaced_pod(namespace="default")
        for pod in pod_list.items:
            print(f"Pod: {pod.metadata.name}")
        
        # Create a simple pod
        pod_manifest = {
            "apiVersion": "v1",
            "kind": "Pod",
            "metadata": {"name": "test-pod"},
            "spec": {
                "containers": [{
                    "name": "test-container",
                    "image": "nginx:latest",
                    "ports": [{"containerPort": 80}]
                }]
            }
        }
        
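        # Pass the manifest dict directly as the request body. client.V1Pod takes
        # snake_case keyword arguments (api_version=...), so V1Pod(**pod_manifest)
        # would raise a TypeError on the camelCase "apiVersion" key.
        await v1.create_namespaced_pod(namespace="default", body=pod_manifest)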
        print("Pod created successfully")

# Run the async function
asyncio.run(main())

Architecture

The kubernetes_asyncio library follows a modular architecture that mirrors the official Kubernetes Python client:

  • Client Module: Auto-generated API classes (64 total) corresponding to all Kubernetes API groups and versions, plus 694 model classes representing all Kubernetes resource types
  • Configuration Module: Multiple authentication methods including kubeconfig files, in-cluster configuration, exec providers, and OIDC
  • Dynamic Client: Runtime API discovery allowing interaction with any Kubernetes resource, including custom resources
  • Streaming: WebSocket support for exec sessions and port forwarding operations
  • Watch: Event streaming for monitoring resource changes in real-time
  • Utils: YAML processing utilities for applying manifests from files or dictionaries
  • Leader Election: Distributed coordination using ConfigMaps or Lease resources

API calls are coroutines that must be awaited, and clients such as ApiClient are async context managers that release their connections on exit.

Capabilities

Client APIs

Core Kubernetes API access through 64 auto-generated API classes covering all Kubernetes API groups and versions. Includes comprehensive model classes for all resource types with full async/await support.

class CoreV1Api:
    async def list_pod_for_all_namespaces(self, **kwargs): ...
    async def list_namespaced_pod(self, namespace: str, **kwargs): ...
    async def create_namespaced_pod(self, namespace: str, body: V1Pod, **kwargs): ...
    async def read_namespaced_pod(self, name: str, namespace: str, **kwargs): ...
    async def patch_namespaced_pod(self, name: str, namespace: str, body: object, **kwargs): ...
    async def delete_namespaced_pod(self, name: str, namespace: str, **kwargs): ...

class AppsV1Api:
    async def list_namespaced_deployment(self, namespace: str, **kwargs): ...
    async def create_namespaced_deployment(self, namespace: str, body: V1Deployment, **kwargs): ...
    async def read_namespaced_deployment(self, name: str, namespace: str, **kwargs): ...
    async def patch_namespaced_deployment(self, name: str, namespace: str, body: object, **kwargs): ...
    async def delete_namespaced_deployment(self, name: str, namespace: str, **kwargs): ...

class ApiClient:
    async def __aenter__(self): ...
    async def __aexit__(self, exc_type, exc_val, exc_tb): ...
    async def close(self): ...
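
Because each API call is awaitable, independent requests can run concurrently. The following is a minimal sketch rather than part of the library: pod_names_by_namespace is a hypothetical helper, and v1 is assumed to be a CoreV1Api instance (or any object with an awaitable list_namespaced_pod).

```python
import asyncio


async def pod_names_by_namespace(v1, namespaces):
    """Return {namespace: [pod names]} for the given namespaces.

    Hypothetical helper: `v1` is assumed to behave like CoreV1Api.
    """
    namespaces = list(namespaces)
    # Start one list call per namespace and wait for all of them together.
    # asyncio.gather returns results in the same order as its arguments.
    pod_lists = await asyncio.gather(
        *(v1.list_namespaced_pod(namespace=ns) for ns in namespaces)
    )
    return {
        ns: [pod.metadata.name for pod in pod_list.items]
        for ns, pod_list in zip(namespaces, pod_lists)
    }
```

Inside an `async with client.ApiClient() as api:` block, this could be called as `await pod_names_by_namespace(client.CoreV1Api(api), ["default", "kube-system"])`. By default asyncio.gather propagates the first exception raised, so one failing namespace fails the whole call.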


Configuration

Multiple methods for loading Kubernetes cluster credentials and configuration, supporting kubeconfig files, in-cluster authentication, and various credential providers.

async def load_config(**kwargs):
    """
    Load Kubernetes configuration from kubeconfig file or in-cluster.
    
    Parameters:
    - config_file: str, path to kubeconfig file
    - context: str, kubeconfig context to use
    - client_configuration: Configuration object to populate
    - persist_config: bool, whether to persist config globally
    """

def load_incluster_config(**kwargs):
    """Load in-cluster configuration from service account."""

async def load_kube_config(config_file=None, context=None, **kwargs):
    """Load configuration from kubeconfig file."""

async def new_client_from_config(**kwargs) -> ApiClient:
    """Create new ApiClient from configuration."""
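
The signatures above mix coroutines (load_config, load_kube_config) with a plain function (load_incluster_config), which is easy to get wrong. The sketch below makes the fallback order explicit. load_cluster_config is a hypothetical helper, config is assumed to be the kubernetes_asyncio.config module, and it is an assumption that an unusable kubeconfig surfaces as config.ConfigException.

```python
async def load_cluster_config(config, config_file=None, context=None):
    """Try a kubeconfig first, then fall back to in-cluster credentials.

    Hypothetical helper. `config` is assumed to be the
    kubernetes_asyncio.config module (passed in so it can be stubbed).
    Returns the name of the source that was loaded.
    """
    try:
        # load_kube_config is a coroutine, so it must be awaited.
        await config.load_kube_config(config_file=config_file, context=context)
        return "kubeconfig"
    except config.ConfigException:
        # Assumption: a missing or invalid kubeconfig raises ConfigException.
        # load_incluster_config is a plain function; do not await it.
        config.load_incluster_config()
        return "in-cluster"
```

A caller would run `source = await load_cluster_config(config)` once at startup, before constructing any ApiClient.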


Dynamic Client

Runtime API discovery and interaction with any Kubernetes resource, including custom resources. Enables working with resource types that are not known when the code is written.

class DynamicClient:
    def __init__(self, client: ApiClient, cache_file=None, discoverer=None): ...
    async def __aenter__(self): ...
    async def __aexit__(self, exc_type, exc_val, exc_tb): ...
    @property
    def resources(self): ...
    
class Resource:
    def get(self, name=None, namespace=None, **kwargs): ...
    def create(self, body=None, namespace=None, **kwargs): ...
    def patch(self, body=None, name=None, namespace=None, **kwargs): ...
    def delete(self, name=None, namespace=None, **kwargs): ...
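
A rough sketch of how discovery and a list call might be combined. list_resource_names is a hypothetical helper and dyn is assumed to be an initialized DynamicClient. It is also an assumption that both the discovery lookup (dyn.resources.get) and the resource call (resource.get) return awaitables in the async client, so check the installed version before relying on this.

```python
async def list_resource_names(dyn, api_version, kind, namespace=None):
    """Return the names of all objects of one kind, found via discovery.

    Hypothetical helper: `dyn` is assumed to behave like DynamicClient.
    """
    # Assumption: the discovery lookup is awaitable in the async client.
    resource = await dyn.resources.get(api_version=api_version, kind=kind)
    # Assumption: get() without a name returns a list object with `.items`.
    result = await resource.get(namespace=namespace)
    return [item.metadata.name for item in result.items]
```

For a custom resource this might be called as `await list_resource_names(dyn, "example.com/v1", "Widget", namespace="default")`, where the group and kind are placeholders.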


Streaming

WebSocket-based streaming for exec sessions, port forwarding, and attach operations. Provides real-time bidirectional communication with containers.

class WsApiClient(ApiClient):
    def __init__(self, configuration=None, header_name=None, header_value=None, 
                 cookie=None, pool_threads=1, heartbeat=None): ...
    async def request(self, method, url, **kwargs): ...

# Channel constants
STDIN_CHANNEL: int = 0
STDOUT_CHANNEL: int = 1  
STDERR_CHANNEL: int = 2
ERROR_CHANNEL: int = 3
RESIZE_CHANNEL: int = 4
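
To illustrate what the channel constants mean: in the binary form of the Kubernetes channel protocol, the first byte of each WebSocket message carries the channel number and the rest is the payload. The sketch below assumes raw frames are already available as bytes. How WsApiClient exposes them is not shown here, and split_frame and demux_frames are hypothetical helpers, not library functions.

```python
from typing import Dict, Iterable, Tuple

# Mirrors the channel constants listed above.
STDIN_CHANNEL = 0
STDOUT_CHANNEL = 1
STDERR_CHANNEL = 2
ERROR_CHANNEL = 3
RESIZE_CHANNEL = 4


def split_frame(frame: bytes) -> Tuple[int, bytes]:
    """Split one raw frame into (channel number, payload)."""
    if not frame:
        raise ValueError("empty frame: no channel byte")
    # Indexing bytes yields an int, which is the channel number.
    return frame[0], frame[1:]


def demux_frames(frames: Iterable[bytes]) -> Dict[int, bytes]:
    """Concatenate payloads per output channel, ignoring other channels."""
    buffers = {STDOUT_CHANNEL: b"", STDERR_CHANNEL: b"", ERROR_CHANNEL: b""}
    for frame in frames:
        channel, payload = split_frame(frame)
        if channel in buffers:
            buffers[channel] += payload
    return buffers
```

Keeping stdout and stderr in separate buffers preserves the distinction that a plain text stream would lose.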


Watch

Event streaming for monitoring resource changes in real-time. Efficiently tracks additions, modifications, deletions, and errors for any Kubernetes resource.

class Watch:
    def __init__(self, return_type=None): ...
    def stop(self): ...
    async def stream(self, func, *args, **kwargs):
        """
        Stream events from a Kubernetes API list operation.
        
        Parameters:
        - func: API function that supports watch parameter
        - *args, **kwargs: Arguments passed to the API function
        
        Yields:
        - dict: Event with 'type' ('ADDED', 'MODIFIED', 'DELETED', 'ERROR') and 'object'
        """
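
A minimal sketch of consuming the stream with `async for` and stopping after a fixed number of events. collect_events is a hypothetical helper, w is assumed to be a Watch instance, and list_func an API list method such as v1.list_namespaced_pod.

```python
async def collect_events(w, list_func, limit, **kwargs):
    """Collect up to `limit` (event type, object name) pairs from a watch.

    Hypothetical helper: `w` is assumed to behave like watch.Watch.
    """
    events = []
    if limit <= 0:
        return events
    async for event in w.stream(list_func, **kwargs):
        obj = event["object"]
        # Guard the lookup: the object may not be a model with metadata.
        name = getattr(getattr(obj, "metadata", None), "name", None)
        events.append((event["type"], name))
        if len(events) >= limit:
            # Ask the watch to stop, then leave the loop.
            w.stop()
            break
    return events
```

With a real client this might be called as `await collect_events(watch.Watch(), v1.list_namespaced_pod, 5, namespace="default")`.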


Utils

YAML processing utilities for creating and applying Kubernetes resources from YAML files or dictionaries with full async support.

async def create_from_yaml(k8s_client: ApiClient, yaml_file: str, 
                          verbose: bool = False, namespace: str = "default", **kwargs):
    """
    Create Kubernetes resources from YAML file.
    
    Parameters:
    - k8s_client: Kubernetes API client
    - yaml_file: Path to YAML file containing resource definitions
    - verbose: Enable verbose output
    - namespace: Default namespace for resources
    - **kwargs: Additional arguments passed to create operations
    """

async def create_from_dict(k8s_client: ApiClient, data: dict,
                          verbose: bool = False, namespace: str = "default", **kwargs):
    """Create Kubernetes resources from dictionary."""

class FailToCreateError(Exception):
    """Exception raised when resource creation fails."""
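
A small sketch of applying several manifests while recording failures instead of aborting on the first one. create_many is a hypothetical helper, and utils is assumed to be the kubernetes_asyncio.utils module, passed in so the function can also be exercised against a stub.

```python
async def create_many(utils, api_client, manifests, namespace="default"):
    """Create each manifest dict; return (created labels, failures).

    Hypothetical helper. `utils` is assumed to expose create_from_dict
    and FailToCreateError as listed above.
    """
    created, failed = [], []
    for manifest in manifests:
        label = f'{manifest.get("kind")}/{manifest.get("metadata", {}).get("name")}'
        try:
            await utils.create_from_dict(api_client, manifest, namespace=namespace)
            created.append(label)
        except utils.FailToCreateError as exc:
            # Keep going so one bad manifest does not block the rest.
            failed.append((label, str(exc)))
    return created, failed
```

Whether to continue after a failure is a design choice. For manifests that depend on each other, stopping at the first error may be safer.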


Leader Election

Distributed leader election for high availability applications using Kubernetes native resources (ConfigMaps or Leases) for coordination.

class LeaderElection:
    def __init__(self, election_config: ElectionConfig): ...
    async def run(self): ...
    async def acquire(self): ...
    async def renew_loop(self): ...

class ElectionConfig:
    def __init__(self, lock, lease_duration, renew_deadline, retry_period, 
                 onstarted_leading=None, onstopped_leading=None): ...

class LeaseLock:
    def __init__(self, name: str, namespace: str, identity: str): ...

class ConfigMapLock:
    def __init__(self, name: str, namespace: str, identity: str): ...
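
The three timing parameters of ElectionConfig are related. As a sketch, the check below applies the ordering used by Kubernetes leader election in client-go: the lease must outlast the renew deadline, and the renew deadline must exceed the jittered retry period. Whether ElectionConfig enforces the same rules itself is not stated here, so treat check_election_timings and JITTER_FACTOR as illustrative assumptions.

```python
JITTER_FACTOR = 1.2  # Assumption: the same jitter factor as client-go.


def check_election_timings(lease_duration, renew_deadline, retry_period):
    """Return a list of problems with the timing values (empty if none)."""
    problems = []
    values = (
        ("lease_duration", lease_duration),
        ("renew_deadline", renew_deadline),
        ("retry_period", retry_period),
    )
    for name, value in values:
        if value <= 0:
            problems.append(f"{name} must be positive")
    # The lease has to outlive the window in which the leader renews it.
    if lease_duration <= renew_deadline:
        problems.append("lease_duration must be greater than renew_deadline")
    # The renew window has to leave room for at least one jittered retry.
    if renew_deadline <= JITTER_FACTOR * retry_period:
        problems.append("renew_deadline must be greater than JITTER_FACTOR * retry_period")
    return problems
```

For example, a 15 second lease with a 10 second renew deadline and a 2 second retry period passes both checks.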


Error Handling

The library raises specific exception types for different error conditions:

class ApiException(Exception):
    """General API errors with HTTP status, reason, and response body."""
    def __init__(self, status=None, reason=None, http_resp=None): ...

class ApiTypeError(TypeError):
    """Type validation errors."""

class ApiValueError(ValueError):
    """Value validation errors."""

class ConfigException(Exception):
    """Configuration-related errors."""

class FailToCreateError(Exception):
    """Resource creation failures."""

Common error handling pattern:

# Assumes v1 (a CoreV1Api) and pod_manifest from Basic Usage are in scope.
try:
    result = await v1.create_namespaced_pod(namespace="default", body=pod_manifest)
except client.ApiException as e:
    if e.status == 409:
        print("Pod already exists")
    elif e.status == 403:
        print("Access denied")
    else:
        print(f"API error: {e.status} {e.reason}")
except Exception as e:
    print(f"Unexpected error: {e}")
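
The 409 branch in the pattern above can be wrapped into a "create if absent" helper. This is a sketch under stated assumptions: ensure_created is a hypothetical name, create is any zero-argument callable returning an awaitable, and api_exception is the exception class to catch (client.ApiException in real use), passed in so the helper does not import the library itself.

```python
async def ensure_created(create, api_exception):
    """Run `create()`; treat HTTP 409 (already exists) as success.

    Hypothetical helper. Returns True if the resource was created and
    False if it already existed. Any other API error is re-raised.
    """
    try:
        await create()
        return True
    except api_exception as exc:
        if exc.status == 409:
            return False
        raise
```

A call might look like `await ensure_created(lambda: v1.create_namespaced_pod(namespace="default", body=pod_manifest), client.ApiException)`. The lambda defers the request so that it is issued inside the try block.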