CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-kubernetes-asyncio

Asynchronous Python client library for the Kubernetes API providing async/await support for all Kubernetes operations

Pending
Overview
Eval results
Files

dynamic-client.mddocs/

Dynamic Client

Runtime API discovery and interaction with any Kubernetes resource, including custom resources defined by Custom Resource Definitions (CRDs). The dynamic client enables working with resources not known at compile time and provides a flexible interface for any Kubernetes API.

Capabilities

Dynamic Client

The main entry point for dynamic Kubernetes API interactions with automatic resource discovery.

class DynamicClient:
    def __init__(self, client, cache_file=None, discoverer=None):
        """
        Initialize dynamic client with API discovery.
        
        Parameters:
        - client: ApiClient, authenticated Kubernetes API client
        - cache_file: str, path to cache discovered API resources (optional)
        - discoverer: Discoverer, custom discovery implementation (optional)
        """
    
    async def __aenter__(self):
        """Async context manager entry."""
        
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit with cleanup."""
    
    @property
    def resources(self):
        """
        Access to discovered resources.
        
        Returns:
        - ResourceRegistry: Registry of all discovered API resources
        """
    
    @property  
    def version(self):
        """
        Kubernetes API server version information.
        
        Returns:
        - dict: Version details including major, minor, git version, etc.
        """

Resource Access

Interface for accessing and manipulating any Kubernetes resource type dynamically.

class Resource:
    def __init__(self, api_version, kind, client, **kwargs):
        """
        Represents a Kubernetes API resource type.
        
        Parameters:
        - api_version: str, API version (e.g., "v1", "apps/v1")
        - kind: str, resource kind (e.g., "Pod", "Deployment")
        - client: DynamicClient, parent dynamic client
        """
    
    def get(self, name=None, namespace=None, label_selector=None, field_selector=None, **kwargs):
        """
        Get resource(s) by name or selectors.
        
        Parameters:
        - name: str, specific resource name (for single resource)
        - namespace: str, namespace to search (for namespaced resources)
        - label_selector: str, label selector query
        - field_selector: str, field selector query
        - **kwargs: Additional query parameters
        
        Returns:
        - ResourceInstance or ResourceList: Single resource or list of resources
        """
    
    def create(self, body=None, namespace=None, **kwargs):
        """
        Create a new resource.
        
        Parameters:
        - body: dict, resource definition
        - namespace: str, target namespace (for namespaced resources)
        - **kwargs: Additional creation parameters
        
        Returns:
        - ResourceInstance: Created resource
        """
    
    def patch(self, body=None, name=None, namespace=None, **kwargs):
        """
        Partially update a resource.
        
        Parameters:
        - body: dict, patch data
        - name: str, resource name to patch
        - namespace: str, resource namespace (for namespaced resources)
        - **kwargs: Additional patch parameters
        
        Returns:
        - ResourceInstance: Updated resource
        """
    
    def replace(self, body=None, name=None, namespace=None, **kwargs):
        """
        Replace entire resource.
        
        Parameters:
        - body: dict, complete resource definition
        - name: str, resource name to replace
        - namespace: str, resource namespace (for namespaced resources)
        - **kwargs: Additional replace parameters
        
        Returns:
        - ResourceInstance: Replaced resource
        """
    
    def delete(self, name=None, namespace=None, label_selector=None, field_selector=None, **kwargs):
        """
        Delete resource(s).
        
        Parameters:
        - name: str, specific resource name
        - namespace: str, resource namespace (for namespaced resources)
        - label_selector: str, label selector for bulk deletion
        - field_selector: str, field selector for bulk deletion
        - **kwargs: Additional deletion parameters
        
        Returns:
        - ResourceInstance or ResourceList: Deleted resource(s) or status
        """
    
    def watch(self, namespace=None, name=None, label_selector=None, field_selector=None, 
              resource_version=None, timeout_seconds=None, **kwargs):
        """
        Watch for resource changes.
        
        Parameters:
        - namespace: str, namespace to watch (for namespaced resources)
        - name: str, specific resource name to watch
        - label_selector: str, label selector for filtering
        - field_selector: str, field selector for filtering
        - resource_version: str, resource version to start watching from
        - timeout_seconds: int, watch timeout
        - **kwargs: Additional watch parameters
        
        Returns:
        - generator: Stream of watch events
        """
    
    @property
    def subresources(self):
        """
        Access to resource subresources (scale, status, etc.).
        
        Returns:
        - dict: Mapping of subresource names to Subresource objects
        """

Resource Objects

Wrappers for individual Kubernetes resource instances with convenient access patterns.

class ResourceInstance:
    def __init__(self, resource, instance):
        """
        Wrapper for individual Kubernetes resource.
        
        Parameters:
        - resource: Resource, parent resource type
        - instance: dict, resource data from API
        """
    
    @property
    def metadata(self):
        """Resource metadata (name, namespace, labels, annotations, etc.)."""
    
    @property
    def spec(self):
        """Resource specification."""
    
    @property
    def status(self):
        """Resource status."""
    
    def to_dict(self):
        """Convert to dictionary representation."""
    
    def patch(self, body=None, **kwargs):
        """Patch this specific resource instance."""
    
    def replace(self, body=None, **kwargs):
        """Replace this specific resource instance."""
    
    def delete(self, **kwargs):
        """Delete this specific resource instance."""

class ResourceList:
    def __init__(self, resource, items):
        """
        List of Kubernetes resources.
        
        Parameters:
        - resource: Resource, parent resource type
        - items: list, resource instances
        """
    
    @property
    def items(self):
        """List of ResourceInstance objects."""
    
    @property
    def metadata(self):
        """List metadata (resource version, continue token, etc.)."""
    
    def to_dict(self):
        """Convert entire list to dictionary representation."""

Subresources

Access to Kubernetes subresources like scale, status, and custom subresources.

class Subresource:
    def __init__(self, parent_resource, name):
        """
        Kubernetes resource subresource.
        
        Parameters:
        - parent_resource: Resource, parent resource type
        - name: str, subresource name (e.g., "scale", "status")
        """
    
    def get(self, name, namespace=None, **kwargs):
        """Get subresource for specific parent resource."""
    
    def patch(self, name, body, namespace=None, **kwargs):
        """Patch subresource for specific parent resource."""
    
    def replace(self, name, body, namespace=None, **kwargs):
        """Replace subresource for specific parent resource."""

Discovery System

API discovery implementations for loading Kubernetes API resources at runtime.

class EagerDiscoverer:
    def __init__(self, client, cache_file=None):
        """
        Pre-loads all API resource information at initialization.
        
        Parameters:
        - client: ApiClient, authenticated Kubernetes client
        - cache_file: str, path to cache file for discovered resources
        """
    
    def get_resources_for_api_version(self, prefix, group_version):
        """Get all resources for specific API version."""

class LazyDiscoverer:
    def __init__(self, client, cache_file=None):
        """
        Loads API resources on-demand as they are accessed (default).
        
        Parameters:
        - client: ApiClient, authenticated Kubernetes client  
        - cache_file: str, path to cache file for discovered resources
        """
    
    def get_resources_for_api_version(self, prefix, group_version):
        """Get resources for API version, loading if not cached."""

Exception Classes

Dynamic client specific exceptions for error handling.

class KubernetesValidateMissing(Exception):
    """Raised when kubernetes-validate library is not available for client-side validation."""

def api_exception(e):
    """
    Wrapper function that converts API exceptions to appropriate types.
    
    Parameters:
    - e: Exception, original API exception
    
    Returns:
    - Exception: Appropriately typed exception
    """

Usage Examples

Basic Dynamic Client Usage

import asyncio
from kubernetes_asyncio import client, config, dynamic

async def basic_dynamic():
    await config.load_config()
    api_client = client.ApiClient()
    
    async with dynamic.DynamicClient(api_client) as dyn_client:
        # Access any Kubernetes resource
        pods = dyn_client.resources.get(api_version="v1", kind="Pod")
        
        # List all pods across all namespaces
        pod_list = pods.get()
        print(f"Found {len(pod_list.items)} pods")
        
        # List pods in specific namespace
        default_pods = pods.get(namespace="default")
        for pod in default_pods.items:
            print(f"Pod: {pod.metadata.name} in {pod.metadata.namespace}")

asyncio.run(basic_dynamic())

Working with Custom Resources

async def custom_resources():
    await config.load_config()
    api_client = client.ApiClient()
    
    async with dynamic.DynamicClient(api_client) as dyn_client:
        # Access custom resource (assuming CRD exists)
        custom_resource = dyn_client.resources.get(
            api_version="example.com/v1", 
            kind="MyCustomResource"
        )
        
        # Create custom resource
        custom_obj = {
            "apiVersion": "example.com/v1",
            "kind": "MyCustomResource", 
            "metadata": {
                "name": "my-custom-object",
                "namespace": "default"
            },
            "spec": {
                "replicas": 3,
                "template": "nginx"
            }
        }
        
        created = custom_resource.create(body=custom_obj, namespace="default")
        print(f"Created custom resource: {created.metadata.name}")
        
        # List custom resources
        custom_list = custom_resource.get(namespace="default")
        for item in custom_list.items:
            print(f"Custom resource: {item.metadata.name}")

asyncio.run(custom_resources())

Resource Discovery and Exploration

async def explore_resources():
    await config.load_config()
    api_client = client.ApiClient()
    
    async with dynamic.DynamicClient(api_client) as dyn_client:
        # Explore available API resources
        print(f"Kubernetes version: {dyn_client.version}")
        
        # Get all available resources
        all_resources = dyn_client.resources.search()
        
        print("Available API resources:")
        for resource in all_resources:
            print(f"  {resource.api_version}/{resource.kind} - {resource.name}")
        
        # Find resources by kind
        deployment_resources = dyn_client.resources.search(kind="Deployment")
        for res in deployment_resources:
            print(f"Deployment resource: {res.api_version}/{res.kind}")

asyncio.run(explore_resources())

Watching Resources

async def watch_resources():
    await config.load_config()
    api_client = client.ApiClient()
    
    async with dynamic.DynamicClient(api_client) as dyn_client:
        pods = dyn_client.resources.get(api_version="v1", kind="Pod")
        
        # Watch for pod changes in default namespace
        print("Watching for pod changes...")
        
        async for event in pods.watch(namespace="default", timeout_seconds=300):
            event_type = event['type']  # ADDED, MODIFIED, DELETED
            pod = event['object']
            print(f"{event_type}: Pod {pod.metadata.name} - {pod.status.phase}")

asyncio.run(watch_resources())

Scaling Deployments

async def scale_deployment():
    await config.load_config()
    api_client = client.ApiClient()
    
    async with dynamic.DynamicClient(api_client) as dyn_client:
        deployments = dyn_client.resources.get(api_version="apps/v1", kind="Deployment")
        
        # Get current deployment
        deployment = deployments.get(name="nginx-deployment", namespace="default")
        print(f"Current replicas: {deployment.spec.replicas}")
        
        # Scale using the scale subresource
        scale_resource = deployments.subresources["scale"]
        current_scale = scale_resource.get(name="nginx-deployment", namespace="default")
        
        # Update replica count
        new_scale = {
            "apiVersion": "autoscaling/v1",
            "kind": "Scale",
            "metadata": current_scale.metadata.to_dict(),
            "spec": {"replicas": 5}
        }
        
        updated_scale = scale_resource.replace(
            name="nginx-deployment", 
            namespace="default", 
            body=new_scale
        )
        print(f"Scaled to {updated_scale.spec.replicas} replicas")

asyncio.run(scale_deployment())

Bulk Operations with Label Selectors

async def bulk_operations():
    await config.load_config()
    api_client = client.ApiClient()
    
    async with dynamic.DynamicClient(api_client) as dyn_client:
        pods = dyn_client.resources.get(api_version="v1", kind="Pod")
        
        # List pods with specific labels
        labeled_pods = pods.get(
            namespace="default",
            label_selector="app=nginx,env=production"
        )
        
        print(f"Found {len(labeled_pods.items)} nginx production pods")
        
        # Patch all matching pods
        for pod in labeled_pods.items:
            patch_body = {
                "metadata": {
                    "annotations": {
                        "last-updated": "2024-01-15T10:00:00Z"
                    }
                }
            }
            
            patched = pods.patch(
                name=pod.metadata.name,
                namespace=pod.metadata.namespace,
                body=patch_body
            )
            print(f"Patched pod: {patched.metadata.name}")

asyncio.run(bulk_operations())

Install with Tessl CLI

npx tessl i tessl/pypi-kubernetes-asyncio

docs

client-apis.md

configuration.md

dynamic-client.md

index.md

leader-election.md

streaming.md

utils.md

watch.md

tile.json