CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-kubernetes

Python client library for interacting with Kubernetes clusters through the Kubernetes API

Pending
Overview
Eval results
Files

docs/resource-watching.md

Resource Watching

Event streaming capabilities for monitoring real-time changes to Kubernetes resources. Provides efficient watch operations that can be filtered and processed for automated responses to cluster state changes, enabling reactive applications and monitoring systems.

Capabilities

Watch Events

Stream real-time events for changes to any Kubernetes resource, including creations, updates, and deletions.

class Watch:
    """Streams change events for Kubernetes resources from an API list function."""

    def stream(
        self,
        func,
        *args,
        timeout_seconds: int = None,
        resource_version: str = None,
        **kwargs
    ):
        """
        Stream events from a Kubernetes API watch endpoint.
        
        Parameters:
        - func: API function to watch (e.g., v1.list_namespaced_pod); pass
          the method itself, not the result of calling it
        - *args: Positional arguments for the API function
        - timeout_seconds: Timeout for the watch operation, in seconds
          (defaults to None)
        - resource_version: Resource version to start watching from
          (defaults to None)
        - **kwargs: Additional keyword arguments for the API function
          (e.g., namespace, label_selector)
        
        Yields:
        Event dictionaries with 'type', 'object' and 'raw_object' keys
        """
    
    def stop(self) -> None:
        """Stop the watch operation."""

Event Types

Watch events contain information about the type of change and the affected resource.

# Structure of each event yielded by Watch.stream()
{
    "type": str,  # Kind of change: ADDED, MODIFIED, DELETED, ERROR
    # The affected Kubernetes resource. The usage examples read it through
    # attributes (e.g. pod.metadata.name), so treat `dict` here as schematic.
    "object": dict,
    "raw_object": dict  # Raw object from API response
}

Usage Examples

Watching Pods

from kubernetes import client, config, watch

config.load_kube_config()
v1 = client.CoreV1Api()
w = watch.Watch()

# Watch all pod events in default namespace
print("Watching for pod events...")
for event in w.stream(v1.list_namespaced_pod, namespace="default"):
    event_type = event['type']

    # Check for ERROR before touching pod fields: an ERROR event carries an
    # error/status payload instead of a pod, so pod.metadata.name would fail.
    if event_type == "ERROR":
        print(f"Event: {event_type}")
        print(f"  Error occurred: {event['object']}")
        break

    pod = event['object']
    pod_name = pod.metadata.name

    print(f"Event: {event_type} - Pod: {pod_name}")

    if event_type == "ADDED":
        print(f"  Pod {pod_name} was created")
    elif event_type == "MODIFIED":
        print(f"  Pod {pod_name} was updated")
        print(f"  Status: {pod.status.phase}")
    elif event_type == "DELETED":
        print(f"  Pod {pod_name} was deleted")

Watching with Label Selectors

from kubernetes import client, config, watch

config.load_kube_config()
v1 = client.CoreV1Api()
w = watch.Watch()

# Restrict the stream to pods labelled app=nginx
print("Watching for app=nginx pods...")
nginx_events = w.stream(
    v1.list_namespaced_pod,
    namespace="default",
    label_selector="app=nginx",
)
for event in nginx_events:
    event_type, pod = event['type'], event['object']

    print(f"{event_type}: {pod.metadata.name}")

    # Readiness is only inspected on MODIFIED events
    if event_type != "MODIFIED":
        continue

    # Pick the first "Ready" condition, if the pod reports one
    ready_condition = next(
        (cond for cond in (pod.status.conditions or []) if cond.type == "Ready"),
        None,
    )
    if ready_condition and ready_condition.status == "True":
        print(f"  Pod {pod.metadata.name} is ready!")

Watching Deployments

from kubernetes import client, config, watch

config.load_kube_config()
apps_v1 = client.AppsV1Api()
w = watch.Watch()

# Report rollout progress whenever a deployment is updated
print("Watching deployment changes...")
for event in w.stream(apps_v1.list_namespaced_deployment, namespace="default"):
    event_type = event['type']
    deployment = event['object']
    deployment_name = deployment.metadata.name

    # Only MODIFIED events are reported
    if event_type != "MODIFIED":
        continue

    status = deployment.status
    if not status:
        continue

    # Unset replica counts are reported as 0
    ready = status.ready_replicas or 0
    desired = deployment.spec.replicas or 0
    print(f"Deployment {deployment_name}: {ready}/{desired} ready")

    # The rollout counts as complete once the first "Progressing" condition
    # reports the NewReplicaSetAvailable reason
    progressing = next(
        (cond for cond in (status.conditions or []) if cond.type == "Progressing"),
        None,
    )
    if progressing and progressing.reason == "NewReplicaSetAvailable":
        print(f"  Rollout complete for {deployment_name}")

Watching with Timeout

from kubernetes import client, config, watch
import time

config.load_kube_config()
v1 = client.CoreV1Api()
w = watch.Watch()

# Bound the watch with timeout_seconds so the loop ends on its own
print("Watching pods for 60 seconds...")
# A monotonic clock keeps the elapsed time correct across wall-clock changes
start_time = time.monotonic()

for event in w.stream(
    v1.list_namespaced_pod,
    namespace="default",
    timeout_seconds=60
):
    event_type = event['type']
    elapsed = time.monotonic() - start_time

    # Check for ERROR before touching pod fields: an ERROR event carries an
    # error/status payload instead of a pod. This is not the timeout path.
    if event_type == "ERROR":
        error_obj = event['object']
        # The payload may be a plain dict or an object; read `code` either way
        if isinstance(error_obj, dict):
            code = error_obj.get('code')
        else:
            code = getattr(error_obj, 'code', None)

        if code == 410:
            print("Watch expired, need to restart with new resource version")
            break

        print(f"[{elapsed:.1f}s] {event_type}: {error_obj}")
        continue

    pod = event['object']
    print(f"[{elapsed:.1f}s] {event_type}: {pod.metadata.name}")

print("Watch completed")

Watching from Specific Resource Version

from kubernetes import client, config, watch

config.load_kube_config()
v1 = client.CoreV1Api()
w = watch.Watch()

# List the pods once to learn the list's current resource version
pod_list = v1.list_namespaced_pod(namespace="default")
resource_version = pod_list.metadata.resource_version

print(f"Starting watch from resource version: {resource_version}")

# Start the stream at that resource version
pod_events = w.stream(
    v1.list_namespaced_pod,
    namespace="default",
    resource_version=resource_version,
)
for event in pod_events:
    event_type, pod = event['type'], event['object']

    print(f"{event_type}: {pod.metadata.name}")

    # Remember the version of the latest pod seen, for use by a later watch
    metadata = pod.metadata
    if hasattr(metadata, 'resource_version'):
        resource_version = metadata.resource_version

Watching Custom Resources

from kubernetes import client, config, watch, dynamic

config.load_kube_config()
dyn_client = dynamic.DynamicClient(client.ApiClient())

# Look up the API resource that serves the custom kind
my_resource = dyn_client.resources.get(
    api_version="mycompany.io/v1",
    kind="MyCustomResource"
)

# Watch custom resource events.
# Use the dynamic client's own watch() rather than watch.Watch().stream() on
# my_resource.get: the dynamic watch wraps every event object so that fields
# can be read as attributes (obj.metadata.name).
print("Watching custom resources...")
for event in my_resource.watch(namespace="default"):
    event_type = event['type']
    obj = event['object']

    print(f"{event_type}: {obj.metadata.name}")

    if event_type == "MODIFIED":
        # Access custom fields; status may be absent on a custom resource
        status = getattr(obj, 'status', None)
        if status:
            print(f"  Status: {status.get('phase', 'Unknown')}")

Watching Multiple Resources

from kubernetes import client, config, watch
import threading
import queue

config.load_kube_config()
v1 = client.CoreV1Api()
apps_v1 = client.AppsV1Api()

# One queue shared by all watcher threads and drained by the main thread
event_queue = queue.Queue()

def watch_pods():
    """Forward every pod event in the default namespace to event_queue."""
    w = watch.Watch()
    for event in w.stream(v1.list_namespaced_pod, namespace="default"):
        # Tag the event so the consumer knows which watcher produced it
        event['resource_type'] = 'Pod'
        event_queue.put(event)

def watch_deployments():
    """Forward every deployment event in the default namespace to event_queue."""
    w = watch.Watch()
    for event in w.stream(apps_v1.list_namespaced_deployment, namespace="default"):
        event['resource_type'] = 'Deployment'
        event_queue.put(event)

# Start watchers in separate daemon threads so they do not block interpreter exit
pod_thread = threading.Thread(target=watch_pods, daemon=True)
deployment_thread = threading.Thread(target=watch_deployments, daemon=True)

pod_thread.start()
deployment_thread.start()

# Process events from queue
print("Watching pods and deployments...")
try:
    while True:
        try:
            # The timeout keeps the loop responsive to Ctrl+C and lets it
            # notice when the watcher threads have exited
            event = event_queue.get(timeout=1)
        except queue.Empty:
            # A quiet second is normal for a watch, so keep waiting. Only
            # give up once no watcher thread is left to produce events.
            if not (pod_thread.is_alive() or deployment_thread.is_alive()):
                print("All watchers have stopped")
                break
            continue

        resource_type = event['resource_type']
        event_type = event['type']
        obj = event['object']

        print(f"{resource_type} {event_type}: {obj.metadata.name}")

except KeyboardInterrupt:
    print("Stopping watchers...")

Error Handling and Reconnection

from kubernetes import client, config, watch
from kubernetes.client.rest import ApiException
import time

config.load_kube_config()
v1 = client.CoreV1Api()

def _error_code(error_obj):
    """Return the `code` of a watch error payload given as a dict or an object."""
    if isinstance(error_obj, dict):
        return error_obj.get('code')
    return getattr(error_obj, 'code', None)


def watch_with_reconnect(namespace="default", timeout_seconds=300, retry_delay=5):
    """Watch pods with automatic reconnection on errors.

    Runs until interrupted with Ctrl+C. Each watch resumes from the resource
    version of the last pod event seen. When that version has expired
    (HTTP 410), the next watch starts without one.

    Parameters:
    - namespace: Namespace whose pods are watched
    - timeout_seconds: Timeout passed to each watch; a new watch is started
      after it ends (default 300, i.e. 5 minutes)
    - retry_delay: Seconds to wait before reconnecting after an error
    """
    resource_version = None

    while True:
        w = watch.Watch()
        try:
            print(f"Starting watch from resource version: {resource_version}")

            for event in w.stream(
                v1.list_namespaced_pod,
                namespace=namespace,
                resource_version=resource_version,
                timeout_seconds=timeout_seconds,
            ):
                event_type = event['type']

                if event_type == "ERROR":
                    # ERROR events carry an error payload, not a pod
                    error_obj = event['object']
                    print(f"Watch error: {error_obj}")

                    # Handle resource version too old error
                    if _error_code(error_obj) == 410:
                        print("Resource version expired, restarting watch")
                        resource_version = None
                        break

                    # Any other error goes to the generic handler below,
                    # which waits and reconnects
                    raise Exception(f"Watch error: {error_obj}")

                pod = event['object']
                print(f"{event_type}: {pod.metadata.name}")

                # Update resource version so a reconnect resumes from here
                resource_version = pod.metadata.resource_version

        except ApiException as e:
            print(f"API exception: {e}")
            # A 410 can also arrive as an exception. Drop the stale version,
            # otherwise every retry fails the same way.
            if e.status == 410:
                print("Resource version expired, restarting watch")
                resource_version = None
            time.sleep(retry_delay)  # Wait before reconnecting

        except KeyboardInterrupt:
            print("Stopping watch...")
            break

        except Exception as e:
            print(f"Unexpected error: {e}")
            time.sleep(retry_delay)

        finally:
            # Mark this watcher as stopped; each iteration creates a new one
            w.stop()

# Start watching with reconnection
watch_with_reconnect()

Install with Tessl CLI

npx tessl i tessl/pypi-kubernetes

docs

application-workloads.md

autoscaling.md

configuration.md

core-resources.md

custom-resources.md

dynamic-client.md

index.md

leader-election.md

networking.md

rbac-security.md

resource-watching.md

storage.md

streaming-operations.md

utilities.md

tile.json