Python client library for interacting with Kubernetes clusters through the Kubernetes API
—
Event streaming capabilities for monitoring real-time changes to Kubernetes resources. Provides efficient watch operations that can be filtered and processed for automated responses to cluster state changes, enabling reactive applications and monitoring systems.
Stream real-time events for any Kubernetes resource changes including creation, updates, and deletions.
class Watch:
def stream(
self,
func,
*args,
timeout_seconds: int = None,
resource_version: str = None,
**kwargs
):
"""
Stream events from a Kubernetes API watch endpoint.
Parameters:
- func: API function to watch (e.g., v1.list_namespaced_pod)
- *args: Arguments for the API function
- timeout_seconds: Timeout for watch operation
- resource_version: Resource version to start watching from
- **kwargs: Additional arguments for API function
Yields:
Event dictionaries with 'type' and 'object' keys
"""
def stop(self) -> None:
"""Stop the watch operation."""Watch events contain information about the type of change and the affected resource.
# Event structure returned by watch.stream()
{
"type": str, # ADDED, MODIFIED, DELETED, ERROR
"object": dict, # The Kubernetes resource object
"raw_object": dict # Raw object from API response
}from kubernetes import client, config, watch
config.load_kube_config()
v1 = client.CoreV1Api()
w = watch.Watch()
# Watch all pod events in default namespace
print("Watching for pod events...")
for event in w.stream(v1.list_namespaced_pod, namespace="default"):
event_type = event['type']
pod = event['object']
pod_name = pod.metadata.name
print(f"Event: {event_type} - Pod: {pod_name}")
if event_type == "ADDED":
print(f" Pod {pod_name} was created")
elif event_type == "MODIFIED":
print(f" Pod {pod_name} was updated")
print(f" Status: {pod.status.phase}")
elif event_type == "DELETED":
print(f" Pod {pod_name} was deleted")
elif event_type == "ERROR":
print(f" Error occurred: {pod}")
breakfrom kubernetes import client, config, watch
config.load_kube_config()
v1 = client.CoreV1Api()
w = watch.Watch()
# Watch only pods with specific labels
print("Watching for app=nginx pods...")
for event in w.stream(
v1.list_namespaced_pod,
namespace="default",
label_selector="app=nginx"
):
event_type = event['type']
pod = event['object']
print(f"{event_type}: {pod.metadata.name}")
# Process specific events
if event_type == "MODIFIED":
# Check if pod is ready
conditions = pod.status.conditions or []
ready_condition = next(
(c for c in conditions if c.type == "Ready"),
None
)
if ready_condition and ready_condition.status == "True":
print(f" Pod {pod.metadata.name} is ready!")from kubernetes import client, config, watch
config.load_kube_config()
apps_v1 = client.AppsV1Api()
w = watch.Watch()
# Watch deployment rollouts
print("Watching deployment changes...")
for event in w.stream(apps_v1.list_namespaced_deployment, namespace="default"):
event_type = event['type']
deployment = event['object']
deployment_name = deployment.metadata.name
if event_type == "MODIFIED":
status = deployment.status
if status:
ready_replicas = status.ready_replicas or 0
desired_replicas = deployment.spec.replicas or 0
print(f"Deployment {deployment_name}: {ready_replicas}/{desired_replicas} ready")
# Check if rollout is complete
conditions = status.conditions or []
progressing = next(
(c for c in conditions if c.type == "Progressing"),
None
)
if progressing and progressing.reason == "NewReplicaSetAvailable":
print(f" Rollout complete for {deployment_name}")from kubernetes import client, config, watch
import time
config.load_kube_config()
v1 = client.CoreV1Api()
w = watch.Watch()
# Watch with timeout to avoid infinite loops
print("Watching pods for 60 seconds...")
start_time = time.time()
for event in w.stream(
v1.list_namespaced_pod,
namespace="default",
timeout_seconds=60
):
event_type = event['type']
pod = event['object']
elapsed = time.time() - start_time
print(f"[{elapsed:.1f}s] {event_type}: {pod.metadata.name}")
# Handle timeout
if event_type == "ERROR":
error_obj = event['object']
if hasattr(error_obj, 'code') and error_obj.code == 410:
print("Watch expired, need to restart with new resource version")
break
print("Watch completed")from kubernetes import client, config, watch
config.load_kube_config()
v1 = client.CoreV1Api()
w = watch.Watch()
# Get current resource version
pod_list = v1.list_namespaced_pod(namespace="default")
resource_version = pod_list.metadata.resource_version
print(f"Starting watch from resource version: {resource_version}")
# Watch from specific point in time
for event in w.stream(
v1.list_namespaced_pod,
namespace="default",
resource_version=resource_version
):
event_type = event['type']
pod = event['object']
print(f"{event_type}: {pod.metadata.name}")
# Update resource version for next watch
if hasattr(pod.metadata, 'resource_version'):
resource_version = pod.metadata.resource_versionfrom kubernetes import client, config, watch, dynamic
config.load_kube_config()
dyn_client = dynamic.DynamicClient(client.ApiClient())
w = watch.Watch()
# Get custom resource definition
my_resource = dyn_client.resources.get(
api_version="mycompany.io/v1",
kind="MyCustomResource"
)
# Watch custom resource events
print("Watching custom resources...")
for event in w.stream(my_resource.get, namespace="default"):
event_type = event['type']
obj = event['object']
print(f"{event_type}: {obj.metadata.name}")
if event_type == "MODIFIED":
# Access custom fields
if hasattr(obj, 'status') and obj.status:
print(f" Status: {obj.status.get('phase', 'Unknown')}")from kubernetes import client, config, watch
import threading
import queue
config.load_kube_config()
v1 = client.CoreV1Api()
apps_v1 = client.AppsV1Api()
# Create event queue for multiple watchers
event_queue = queue.Queue()
def watch_pods():
w = watch.Watch()
for event in w.stream(v1.list_namespaced_pod, namespace="default"):
event['resource_type'] = 'Pod'
event_queue.put(event)
def watch_deployments():
w = watch.Watch()
for event in w.stream(apps_v1.list_namespaced_deployment, namespace="default"):
event['resource_type'] = 'Deployment'
event_queue.put(event)
# Start watchers in separate threads
pod_thread = threading.Thread(target=watch_pods, daemon=True)
deployment_thread = threading.Thread(target=watch_deployments, daemon=True)
pod_thread.start()
deployment_thread.start()
# Process events from queue
print("Watching pods and deployments...")
try:
while True:
event = event_queue.get(timeout=1)
resource_type = event['resource_type']
event_type = event['type']
obj = event['object']
print(f"{resource_type} {event_type}: {obj.metadata.name}")
except queue.Empty:
print("No events received")
except KeyboardInterrupt:
print("Stopping watchers...")from kubernetes import client, config, watch
from kubernetes.client.rest import ApiException
import time
config.load_kube_config()
v1 = client.CoreV1Api()
def watch_with_reconnect():
"""Watch pods with automatic reconnection on errors."""
resource_version = None
while True:
try:
w = watch.Watch()
print(f"Starting watch from resource version: {resource_version}")
for event in w.stream(
v1.list_namespaced_pod,
namespace="default",
resource_version=resource_version,
timeout_seconds=300 # 5 minute timeout
):
event_type = event['type']
if event_type == "ERROR":
error_obj = event['object']
print(f"Watch error: {error_obj}")
# Handle resource version too old error
if hasattr(error_obj, 'code') and error_obj.code == 410:
print("Resource version expired, restarting watch")
resource_version = None
break
else:
raise Exception(f"Watch error: {error_obj}")
pod = event['object']
print(f"{event_type}: {pod.metadata.name}")
# Update resource version
resource_version = pod.metadata.resource_version
except ApiException as e:
print(f"API exception: {e}")
time.sleep(5) # Wait before reconnecting
except Exception as e:
print(f"Unexpected error: {e}")
time.sleep(5)
except KeyboardInterrupt:
print("Stopping watch...")
break
# Start watching with reconnection
watch_with_reconnect()Install with Tessl CLI
npx tessl i tessl/pypi-kubernetes