Asynchronous Python client library for the Kubernetes API providing async/await support for all Kubernetes operations
—
Event streaming for monitoring resource changes in real-time. Efficiently tracks additions, modifications, deletions, and errors for any Kubernetes resource, providing a reactive interface for building controllers and monitoring applications.
Primary interface for streaming Kubernetes resource change events.
class Watch:
    """
    Primary interface for streaming Kubernetes resource change events.

    This listing is an API summary: every method is shown as a signature and
    docstring only, without its implementation.
    """

    def __init__(self, return_type=None):
        """
        Initialize watch instance for streaming resource events.

        Parameters:
        - return_type: type, expected return type for automatic deserialization
          (optional, defaults to None)
        """

    def stop(self):
        """
        Stop the watch stream and clean up resources.
        """

    def get_return_type(self, func):
        """
        Determine the return type for event objects from API function.

        Parameters:
        - func: callable, Kubernetes API function that supports watching

        Returns:
        - type: Expected return type for event objects
        """

    def get_watch_argument_name(self, func):
        """
        Get the watch parameter name for the API function.

        Parameters:
        - func: callable, Kubernetes API function

        Returns:
        - str: Parameter name for watch flag (usually 'watch')
        """

    def unmarshal_event(self, data, return_type):
        """
        Parse raw watch event data into structured event object.

        Parameters:
        - data: str, raw JSON event data from watch stream
        - return_type: type, expected object type for deserialization

        Returns:
        - dict: Parsed event with 'type' and 'object' fields
        """

    async def stream(self, func, *args, **kwargs):
        """
        Start streaming events from Kubernetes API list operation.

        The usage examples consume the result with ``async for``.

        Parameters:
        - func: callable, API function that supports watch parameter
        - *args: positional arguments passed to API function
        - **kwargs: keyword arguments passed to API function

        Yields:
        - dict: Event objects with structure:
            {
                'type': str,      # 'ADDED', 'MODIFIED', 'DELETED', 'ERROR'
                'object': object  # Kubernetes resource object or error status
            }
        """
Base class for event stream implementations.
class Stream:
    """
    Base class for event stream wrappers.

    API summary only: the methods are shown as signatures with docstrings,
    without their implementation.
    """

    def __init__(self, func, *args, **kwargs):
        """
        Base class for event stream wrappers.

        Parameters:
        - func: callable, API function for streaming
        - *args: positional arguments for API function
        - **kwargs: keyword arguments for API function
        """

    async def __aiter__(self):
        """Async iterator interface for streaming events."""
        # NOTE(review): declared `async def` here, but Python's async-iteration
        # protocol expects `__aiter__` to be a plain method that returns the
        # iterator directly -- confirm against the library source.

    async def __anext__(self):
        """Get next event from stream."""
Important constants used in watch operations.
TYPE_LIST_SUFFIX: str = "List"  # Suffix used for inferring object types from list operations

import asyncio
from kubernetes_asyncio import client, config, watch
async def watch_pods():
    """
    Print every pod event in the ``default`` namespace.

    Streaming ends as soon as a DELETED event for the pod named
    ``target-pod`` is seen. The watch is stopped and the API client closed
    on every exit path.
    """
    await config.load_config()
    core_api = client.CoreV1Api()
    pod_watch = watch.Watch()
    try:
        print("Starting to watch pods in default namespace...")
        pod_events = pod_watch.stream(core_api.list_namespaced_pod, namespace="default")
        async for change in pod_events:
            event_type, pod = change['type'], change['object']

            print(f"{event_type}: Pod {pod.metadata.name}")
            print(f" Namespace: {pod.metadata.namespace}")
            print(f" Phase: {pod.status.phase}")
            print(f" Node: {pod.spec.node_name or 'Not scheduled'}")
            print("---")

            # Example stop condition: keep streaming until the pod we care
            # about has been deleted.
            target_deleted = event_type == "DELETED" and pod.metadata.name == "target-pod"
            if not target_deleted:
                continue
            print("Target pod deleted, stopping watch")
            break
    finally:
        pod_watch.stop()
        await core_api.api_client.close()
asyncio.run(watch_pods())


async def watch_with_resource_version():
    """
    Watch pods in ``default`` starting from the current resource version.

    The pod list is fetched first and its resource version is handed to the
    watch so that the stream begins exactly where the listing ended. The
    watch is requested with a 300 second timeout.
    """
    await config.load_config()
    core_api = client.CoreV1Api()
    pod_watch = watch.Watch()
    try:
        # Take a snapshot to learn the resource version to resume from.
        snapshot = await core_api.list_namespaced_pod(namespace="default")
        resource_version = snapshot.metadata.resource_version
        print(f"Starting watch from resource version: {resource_version}")

        # Resuming from the snapshot's version avoids missing events that
        # occur between the list call and the start of the watch.
        stream_options = {
            "namespace": "default",
            "resource_version": resource_version,
            "timeout_seconds": 300,  # 5 minute timeout
        }
        async for change in pod_watch.stream(core_api.list_namespaced_pod, **stream_options):
            event_type, pod = change['type'], change['object']
            print(f"{event_type}: {pod.metadata.name} (rv: {pod.metadata.resource_version})")
    except asyncio.TimeoutError:
        print("Watch timed out")
    finally:
        pod_watch.stop()
        await core_api.api_client.close()
asyncio.run(watch_with_resource_version())


async def watch_multiple_resources():
    """
    Watch pods and deployments in the ``default`` namespace concurrently.

    Each resource type gets its own ``Watch`` and runs as a separate task.
    When either watcher fails (or this coroutine is cancelled), the other
    watcher is cancelled and awaited before the API clients are closed, so
    no task keeps streaming on a client that has already been shut down.

    Raises:
    - whatever exception the first failing watcher raised
    """
    await config.load_config()
    v1 = client.CoreV1Api()
    apps_v1 = client.AppsV1Api()

    async def watch_kind(label, list_func):
        # One watcher per resource type; `label` only prefixes the output.
        w = watch.Watch()
        try:
            async for event in w.stream(list_func, namespace="default"):
                resource = event['object']
                print(f"{label} {event['type']}: {resource.metadata.name}")
        finally:
            w.stop()

    # Watch both resource types concurrently
    watchers = [
        asyncio.create_task(watch_kind("POD", v1.list_namespaced_pod)),
        asyncio.create_task(
            watch_kind("DEPLOYMENT", apps_v1.list_namespaced_deployment)
        ),
    ]
    try:
        await asyncio.gather(*watchers)
    finally:
        # gather() propagates the first failure but leaves the sibling task
        # running, so cancel and drain both before closing the clients.
        for task in watchers:
            task.cancel()
        await asyncio.gather(*watchers, return_exceptions=True)
        try:
            await v1.api_client.close()
        finally:
            # Close the second client even if closing the first one raised.
            await apps_v1.api_client.close()
# asyncio.run(watch_multiple_resources())  # Uncomment to run


async def watch_with_selectors():
    """
    Watch only the pods in ``default`` labelled ``app=nginx,env=production``.

    Every event is printed together with the pod's labels, followed by a
    short note for ADDED, MODIFIED and DELETED events.
    """
    # Notes that do not depend on the pod itself.
    fixed_notes = {
        "ADDED": " -> New nginx production pod created",
        "DELETED": " -> Nginx production pod removed",
    }

    await config.load_config()
    core_api = client.CoreV1Api()
    pod_watch = watch.Watch()
    try:
        # The label selector restricts the stream on the server side.
        nginx_events = pod_watch.stream(
            core_api.list_namespaced_pod,
            namespace="default",
            label_selector="app=nginx,env=production",
        )
        async for change in nginx_events:
            event_type, pod = change['type'], change['object']
            print(f"{event_type}: {pod.metadata.name}")
            print(f" Labels: {pod.metadata.labels}")

            # Handle specific events
            if event_type == "MODIFIED":
                print(f" -> Pod updated, phase: {pod.status.phase}")
                continue
            note = fixed_notes.get(event_type)
            if note is not None:
                print(note)
    finally:
        pod_watch.stop()
        await core_api.api_client.close()
asyncio.run(watch_with_selectors())


async def simple_controller():
    """
    Run a minimal controller for pods labelled ``managed-by=simple-controller``.

    ADDED, MODIFIED and DELETED events are forwarded to the matching
    ``handle_pod_*`` coroutine. An ERROR event is printed and ends the loop.
    Any other event type is ignored.
    """
    handlers = {
        "ADDED": handle_pod_added,
        "MODIFIED": handle_pod_modified,
        "DELETED": handle_pod_deleted,
    }

    await config.load_config()
    core_api = client.CoreV1Api()
    pod_watch = watch.Watch()
    try:
        print("Starting simple pod controller...")
        managed_events = pod_watch.stream(
            core_api.list_namespaced_pod,
            namespace="default",
            label_selector="managed-by=simple-controller",
        )
        async for change in managed_events:
            event_type = change['type']
            pod = change['object']

            if event_type == "ERROR":
                # For ERROR events the payload is printed as-is.
                print(f"Watch error: {pod}")
                break

            handler = handlers.get(event_type)
            if handler is not None:
                await handler(core_api, pod)
    finally:
        pod_watch.stop()
        await core_api.api_client.close()
async def handle_pod_added(v1, pod):
    """
    Start managing a newly observed pod by attaching the controller finalizer.

    The finalizer is added whenever it is not already present, including on
    pods that already carry finalizers owned by other controllers. The
    existing entries are sent along with the new one so they are kept
    whether the server merges or replaces the list.

    Parameters:
    - v1: API object providing the ``patch_namespaced_pod`` coroutine
    - pod: pod object from the watch event (reads ``metadata.name``,
      ``metadata.namespace`` and ``metadata.finalizers``)
    """
    finalizer = "simple-controller/cleanup"
    print(f"Controller: Managing new pod {pod.metadata.name}")

    # `finalizers` may be None when the pod has none.
    existing = list(pod.metadata.finalizers or [])
    if finalizer in existing:
        return

    # Example: Add finalizer to pod
    patch_body = {
        "metadata": {
            "finalizers": existing + [finalizer],
        }
    }
    await v1.patch_namespaced_pod(
        name=pod.metadata.name,
        namespace=pod.metadata.namespace,
        body=patch_body,
    )
    print(f" -> Added finalizer to {pod.metadata.name}")
async def handle_pod_modified(v1, pod):
    """
    Report a pod update and flag pods that are in the ``Failed`` phase.

    Parameters:
    - v1: API object (not used by this handler)
    - pod: pod object from the watch event (reads ``metadata.name`` and
      ``status.phase``)
    """
    print(f"Controller: Pod {pod.metadata.name} modified, phase: {pod.status.phase}")

    # Example: React to pod phase changes
    if pod.status.phase != "Failed":
        return

    # Placeholder for cleaning up or restarting the failed pod.
    print(f" -> Pod {pod.metadata.name} failed, taking corrective action")
async def handle_pod_deleted(v1, pod):
    """
    Report a pod deletion and drop the controller finalizer if still present.

    When the pod's finalizer list contains ``simple-controller/cleanup``,
    the pod is patched with the list minus that entry.

    NOTE(review): Kubernetes normally keeps an object with finalizers in
    place (surfaced as MODIFIED with a deletion timestamp) until the list is
    empty, so a DELETED event may never carry this finalizer -- confirm
    whether the removal belongs in the MODIFIED handler instead.

    Parameters:
    - v1: API object providing the ``patch_namespaced_pod`` coroutine
    - pod: pod object from the watch event (reads ``metadata.name``,
      ``metadata.namespace`` and ``metadata.finalizers``)
    """
    print(f"Controller: Pod {pod.metadata.name} deleted")

    # Example: Cleanup external resources
    current = pod.metadata.finalizers
    if not current or "simple-controller/cleanup" not in current:
        return

    print(f" -> Cleaning up resources for {pod.metadata.name}")

    # Remove finalizer after cleanup
    remaining = [entry for entry in current if entry != "simple-controller/cleanup"]
    await v1.patch_namespaced_pod(
        name=pod.metadata.name,
        namespace=pod.metadata.namespace,
        body={"metadata": {"finalizers": remaining}},
    )
# asyncio.run(simple_controller())  # Uncomment to run


async def resilient_watch():
    """
    Watch pods in ``default`` and restart the watch when it fails.

    The last seen resource version is remembered so a restarted watch
    resumes where the previous one stopped. An expired resource version
    (HTTP 410) clears it and restarts the watch without using up a retry.
    Any other failure is retried with exponential backoff (capped at 30
    seconds) and the function gives up after 5 consecutive failures. A
    successfully processed event resets the failure counter. The API client
    is closed on every exit path, including cancellation.
    """

    def status_field(status, field):
        # The error payload may be a plain dict or an attribute-style object.
        if isinstance(status, dict):
            return status.get(field)
        return getattr(status, field, None)

    await config.load_config()
    v1 = client.CoreV1Api()
    retry_count = 0
    max_retries = 5
    resource_version = None
    try:
        while retry_count < max_retries:
            w = watch.Watch()
            try:
                print(f"Starting watch (attempt {retry_count + 1})")
                # Start from last known resource version to avoid duplicates
                watch_kwargs = {"namespace": "default"}
                if resource_version:
                    watch_kwargs["resource_version"] = resource_version
                async for event in w.stream(v1.list_namespaced_pod, **watch_kwargs):
                    event_type = event['type']
                    if event_type == "ERROR":
                        error_status = event['object']
                        print(f"Watch error: {error_status}")
                        # Handle specific error types
                        if status_field(error_status, "code") == 410:
                            # Gone - resource version too old
                            print("Resource version expired, restarting watch")
                            resource_version = None
                            break
                        raise RuntimeError(
                            f"Watch error: {status_field(error_status, 'message')}"
                        )
                    pod = event['object']
                    resource_version = pod.metadata.resource_version
                    print(f"{event_type}: {pod.metadata.name} (rv: {resource_version})")
                    # Reset retry count on successful events
                    retry_count = 0
            except Exception as e:
                # NOTE(review): the client may raise the 410 as an exception
                # carrying a `status` attribute instead of yielding an ERROR
                # event -- confirm against the library version in use.
                # Retrying with the same stale version could never succeed,
                # so drop it. Only do this while a version is set, so a
                # repeated 410 falls through to the counted retry below.
                if getattr(e, "status", None) == 410 and resource_version is not None:
                    print("Resource version expired, restarting watch")
                    resource_version = None
                    continue
                print(f"Watch failed: {e}")
                retry_count += 1
                if retry_count < max_retries:
                    wait_time = min(2 ** retry_count, 30)  # Exponential backoff, max 30s
                    print(f"Retrying in {wait_time} seconds...")
                    await asyncio.sleep(wait_time)
                else:
                    print("Max retries exceeded, giving up")
                    break
            finally:
                w.stop()
    finally:
        await v1.api_client.close()
asyncio.run(resilient_watch())


async def watch_cluster_wide():
    """
    Watch pod events across all namespaces.

    Every event is printed as ``<type>: <namespace>/<name>``. Events in
    namespaces starting with ``kube-`` get an extra system-pod note. For
    ADDED events the resource requests and limits are printed: the
    pod-level values when the pod spec carries them, followed by the values
    of each container that declares resources.
    """
    await config.load_config()
    v1 = client.CoreV1Api()
    w = watch.Watch()
    try:
        # Watch all pods across all namespaces
        print("Watching all pods cluster-wide...")
        async for event in w.stream(v1.list_pod_for_all_namespaces):
            event_type = event['type']
            pod = event['object']
            print(f"{event_type}: {pod.metadata.namespace}/{pod.metadata.name}")

            # Example: Monitor pods in system namespaces
            if pod.metadata.namespace.startswith("kube-"):
                print(f" -> System pod event in {pod.metadata.namespace}")

            # Example: Track resource usage patterns
            if event_type != "ADDED":
                continue

            # NOTE(review): pod-level `resources` is presumably missing from
            # older client models, so read it defensively -- confirm against
            # the client version in use.
            pod_resources = getattr(pod.spec, "resources", None)
            if pod_resources:
                requests = pod_resources.requests or {}
                limits = pod_resources.limits or {}
                print(f" -> Resource requests: {requests}")
                print(f" -> Resource limits: {limits}")

            # Requests and limits are normally declared per container.
            for container in pod.spec.containers or []:
                container_resources = container.resources
                if not container_resources:
                    continue
                requests = container_resources.requests or {}
                limits = container_resources.limits or {}
                print(f" -> Container {container.name} resource requests: {requests}")
                print(f" -> Container {container.name} resource limits: {limits}")
    finally:
        w.stop()
        await v1.api_client.close()
asyncio.run(watch_cluster_wide())

Install with Tessl CLI
npx tessl i tessl/pypi-kubernetes-asyncio