CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-kubernetes-asyncio

Asynchronous Python client library for the Kubernetes API providing async/await support for all Kubernetes operations

Pending
Overview
Eval results
Files

watch.mddocs/

Watch

Event streaming for monitoring resource changes in real-time. Efficiently tracks additions, modifications, deletions, and errors for any Kubernetes resource, providing a reactive interface for building controllers and monitoring applications.

Capabilities

Watch Class

Primary interface for streaming Kubernetes resource change events.

class Watch:
    def __init__(self, return_type=None):
        """
        Initialize watch instance for streaming resource events.
        
        Parameters:
        - return_type: type, expected return type for automatic deserialization
        """
    
    def stop(self):
        """
        Stop the watch stream and clean up resources.
        """
    
    def get_return_type(self, func):
        """
        Determine the return type for event objects from API function.
        
        Parameters:
        - func: callable, Kubernetes API function that supports watching
        
        Returns:
        - type: Expected return type for event objects
        """
    
    def get_watch_argument_name(self, func):
        """
        Get the watch parameter name for the API function.
        
        Parameters:
        - func: callable, Kubernetes API function
        
        Returns:
        - str: Parameter name for watch flag (usually 'watch')
        """
    
    def unmarshal_event(self, data, return_type):
        """
        Parse raw watch event data into structured event object.
        
        Parameters:
        - data: str, raw JSON event data from watch stream
        - return_type: type, expected object type for deserialization
        
        Returns:
        - dict: Parsed event with 'type' and 'object' fields
        """
    
    async def stream(self, func, *args, **kwargs):
        """
        Start streaming events from Kubernetes API list operation.
        
        Parameters:
        - func: callable, API function that supports watch parameter
        - *args: positional arguments passed to API function
        - **kwargs: keyword arguments passed to API function
        
        Yields:
        - dict: Event objects with structure:
          {
              'type': str,     # 'ADDED', 'MODIFIED', 'DELETED', 'ERROR'
              'object': object # Kubernetes resource object or error status
          }
        """

Stream Base Class

Base class for event stream implementations.

class Stream:
    def __init__(self, func, *args, **kwargs):
        """
        Base class for event stream wrappers.
        
        Parameters:
        - func: callable, API function for streaming
        - *args: positional arguments for API function
        - **kwargs: keyword arguments for API function
        """
    
    async def __aiter__(self):
        """Async iterator interface for streaming events."""
    
    async def __anext__(self):
        """Get next event from stream."""

Constants

Important constants used in watch operations.

TYPE_LIST_SUFFIX: str = "List"  # Suffix used for inferring object types from list operations

Usage Examples

Basic Resource Watching

import asyncio
from kubernetes_asyncio import client, config, watch

async def watch_pods():
    await config.load_config()
    v1 = client.CoreV1Api()
    w = watch.Watch()
    
    try:
        print("Starting to watch pods in default namespace...")
        
        async for event in w.stream(v1.list_namespaced_pod, namespace="default"):
            event_type = event['type']
            pod = event['object']
            
            print(f"{event_type}: Pod {pod.metadata.name}")
            print(f"  Namespace: {pod.metadata.namespace}")
            print(f"  Phase: {pod.status.phase}")
            print(f"  Node: {pod.spec.node_name or 'Not scheduled'}")
            print("---")
            
            # Example: Stop watching after specific condition
            if event_type == "DELETED" and pod.metadata.name == "target-pod":
                print("Target pod deleted, stopping watch")
                break
                
    finally:
        w.stop()
        await v1.api_client.close()

asyncio.run(watch_pods())

Watching with Resource Version

async def watch_with_resource_version():
    await config.load_config()
    v1 = client.CoreV1Api()
    w = watch.Watch()
    
    try:
        # Get current resource version
        pod_list = await v1.list_namespaced_pod(namespace="default")
        resource_version = pod_list.metadata.resource_version
        
        print(f"Starting watch from resource version: {resource_version}")
        
        # Watch from specific resource version to avoid missing events
        async for event in w.stream(
            v1.list_namespaced_pod,
            namespace="default", 
            resource_version=resource_version,
            timeout_seconds=300  # 5 minute timeout
        ):
            event_type = event['type']
            pod = event['object']
            
            print(f"{event_type}: {pod.metadata.name} (rv: {pod.metadata.resource_version})")
            
    except asyncio.TimeoutError:
        print("Watch timed out")
    finally:
        w.stop()
        await v1.api_client.close()

asyncio.run(watch_with_resource_version())

Watching Multiple Resource Types

async def watch_multiple_resources():
    await config.load_config()
    v1 = client.CoreV1Api()
    apps_v1 = client.AppsV1Api()
    
    async def watch_pods():
        w = watch.Watch()
        try:
            async for event in w.stream(v1.list_namespaced_pod, namespace="default"):
                pod = event['object']
                print(f"POD {event['type']}: {pod.metadata.name}")
        finally:
            w.stop()
    
    async def watch_deployments():
        w = watch.Watch()
        try:
            async for event in w.stream(apps_v1.list_namespaced_deployment, namespace="default"):
                deployment = event['object']
                print(f"DEPLOYMENT {event['type']}: {deployment.metadata.name}")
        finally:
            w.stop()
    
    # Watch both resource types concurrently
    try:
        await asyncio.gather(
            watch_pods(),
            watch_deployments()
        )
    finally:
        await v1.api_client.close()
        await apps_v1.api_client.close()

# asyncio.run(watch_multiple_resources())  # Uncomment to run

Watching with Label Selectors

async def watch_with_selectors():
    await config.load_config()
    v1 = client.CoreV1Api()
    w = watch.Watch()
    
    try:
        # Watch only pods with specific labels
        async for event in w.stream(
            v1.list_namespaced_pod,
            namespace="default",
            label_selector="app=nginx,env=production"
        ):
            event_type = event['type']
            pod = event['object']
            
            print(f"{event_type}: {pod.metadata.name}")
            print(f"  Labels: {pod.metadata.labels}")
            
            # Handle specific events
            if event_type == "ADDED":
                print("  -> New nginx production pod created")
            elif event_type == "MODIFIED":
                print(f"  -> Pod updated, phase: {pod.status.phase}")
            elif event_type == "DELETED":
                print("  -> Nginx production pod removed")
                
    finally:
        w.stop()
        await v1.api_client.close()

asyncio.run(watch_with_selectors())

Building a Simple Controller

async def simple_controller():
    await config.load_config()
    v1 = client.CoreV1Api()
    w = watch.Watch()
    
    try:
        print("Starting simple pod controller...")
        
        async for event in w.stream(
            v1.list_namespaced_pod,
            namespace="default",
            label_selector="managed-by=simple-controller"
        ):
            event_type = event['type']
            pod = event['object']
            
            if event_type == "ADDED":
                await handle_pod_added(v1, pod)
            elif event_type == "MODIFIED":
                await handle_pod_modified(v1, pod)
            elif event_type == "DELETED":
                await handle_pod_deleted(v1, pod)
            elif event_type == "ERROR":
                print(f"Watch error: {pod}")
                break
                
    finally:
        w.stop()
        await v1.api_client.close()

async def handle_pod_added(v1, pod):
    print(f"Controller: Managing new pod {pod.metadata.name}")
    
    # Example: Add finalizer to pod
    if not pod.metadata.finalizers:
        patch_body = {
            "metadata": {
                "finalizers": ["simple-controller/cleanup"]
            }
        }
        
        await v1.patch_namespaced_pod(
            name=pod.metadata.name,
            namespace=pod.metadata.namespace,
            body=patch_body
        )
        print(f"  -> Added finalizer to {pod.metadata.name}")

async def handle_pod_modified(v1, pod):
    print(f"Controller: Pod {pod.metadata.name} modified, phase: {pod.status.phase}")
    
    # Example: React to pod phase changes
    if pod.status.phase == "Failed":
        # Clean up or restart failed pod
        print(f"  -> Pod {pod.metadata.name} failed, taking corrective action")

async def handle_pod_deleted(v1, pod):
    print(f"Controller: Pod {pod.metadata.name} deleted")
    
    # Example: Cleanup external resources
    if pod.metadata.finalizers and "simple-controller/cleanup" in pod.metadata.finalizers:
        print(f"  -> Cleaning up resources for {pod.metadata.name}")
        
        # Remove finalizer after cleanup
        patch_body = {
            "metadata": {
                "finalizers": [f for f in pod.metadata.finalizers 
                              if f != "simple-controller/cleanup"]
            }
        }
        
        await v1.patch_namespaced_pod(
            name=pod.metadata.name,
            namespace=pod.metadata.namespace,
            body=patch_body
        )

# asyncio.run(simple_controller())  # Uncomment to run

Error Handling and Reconnection

async def resilient_watch():
    await config.load_config()
    v1 = client.CoreV1Api()
    
    retry_count = 0
    max_retries = 5
    resource_version = None
    
    while retry_count < max_retries:
        w = watch.Watch()
        
        try:
            print(f"Starting watch (attempt {retry_count + 1})")
            
            # Start from last known resource version to avoid duplicates
            watch_kwargs = {"namespace": "default"}
            if resource_version:
                watch_kwargs["resource_version"] = resource_version
            
            async for event in w.stream(v1.list_namespaced_pod, **watch_kwargs):
                event_type = event['type']
                
                if event_type == "ERROR":
                    error_status = event['object']
                    print(f"Watch error: {error_status}")
                    
                    # Handle specific error types
                    if error_status.code == 410:  # Gone - resource version too old
                        print("Resource version expired, restarting watch")
                        resource_version = None
                        break
                    else:
                        raise Exception(f"Watch error: {error_status.message}")
                
                else:
                    pod = event['object']
                    resource_version = pod.metadata.resource_version
                    print(f"{event_type}: {pod.metadata.name} (rv: {resource_version})")
                    
                    # Reset retry count on successful events
                    retry_count = 0
            
        except Exception as e:
            print(f"Watch failed: {e}")
            retry_count += 1
            
            if retry_count < max_retries:
                wait_time = min(2 ** retry_count, 30)  # Exponential backoff, max 30s
                print(f"Retrying in {wait_time} seconds...")
                await asyncio.sleep(wait_time)
            else:
                print("Max retries exceeded, giving up")
                break
        
        finally:
            w.stop()
    
    await v1.api_client.close()

asyncio.run(resilient_watch())

Watching Cluster-Wide Resources

async def watch_cluster_wide():
    await config.load_config()
    v1 = client.CoreV1Api()
    w = watch.Watch()
    
    try:
        # Watch all pods across all namespaces
        print("Watching all pods cluster-wide...")
        
        async for event in w.stream(v1.list_pod_for_all_namespaces):
            event_type = event['type']
            pod = event['object']
            
            print(f"{event_type}: {pod.metadata.namespace}/{pod.metadata.name}")
            
            # Example: Monitor pods in system namespaces
            if pod.metadata.namespace.startswith("kube-"):
                print(f"  -> System pod event in {pod.metadata.namespace}")
            
            # Example: Track resource usage patterns
            if event_type == "ADDED" and pod.spec.resources:
                requests = pod.spec.resources.requests or {}
                limits = pod.spec.resources.limits or {}
                print(f"  -> Resource requests: {requests}")
                print(f"  -> Resource limits: {limits}")
                
    finally:
        w.stop()
        await v1.api_client.close()

asyncio.run(watch_cluster_wide())

Install with Tessl CLI

npx tessl i tessl/pypi-kubernetes-asyncio

docs

client-apis.md

configuration.md

dynamic-client.md

index.md

leader-election.md

streaming.md

utils.md

watch.md

tile.json