0
# Watch
1
2
Event streaming for monitoring resource changes in real time. The watch interface efficiently tracks additions, modifications, deletions, and errors for any Kubernetes resource, providing a reactive interface for building controllers and monitoring applications.
3
4
## Capabilities
5
6
### Watch Class
7
8
Primary interface for streaming Kubernetes resource change events.
9
10
```python { .api }
11
class Watch:
12
def __init__(self, return_type=None):
13
"""
14
Initialize watch instance for streaming resource events.
15
16
Parameters:
17
- return_type: type, expected return type for automatic deserialization
18
"""
19
20
def stop(self):
21
"""
22
Stop the watch stream and clean up resources.
23
"""
24
25
def get_return_type(self, func):
26
"""
27
Determine the return type for event objects from the API function.
28
29
Parameters:
30
- func: callable, Kubernetes API function that supports watching
31
32
Returns:
33
- type: Expected return type for event objects
34
"""
35
36
def get_watch_argument_name(self, func):
37
"""
38
Get the watch parameter name for the API function.
39
40
Parameters:
41
- func: callable, Kubernetes API function
42
43
Returns:
44
- str: Parameter name for watch flag (usually 'watch')
45
"""
46
47
def unmarshal_event(self, data, return_type):
48
"""
49
Parse raw watch event data into structured event object.
50
51
Parameters:
52
- data: str, raw JSON event data from watch stream
53
- return_type: type, expected object type for deserialization
54
55
Returns:
56
- dict: Parsed event with 'type' and 'object' fields
57
"""
58
59
async def stream(self, func, *args, **kwargs):
60
"""
61
Start streaming events from a Kubernetes API list operation.
62
63
Parameters:
64
- func: callable, API function that supports watch parameter
65
- *args: positional arguments passed to API function
66
- **kwargs: keyword arguments passed to API function
67
68
Yields:
69
- dict: Event objects with structure:
70
{
71
'type': str, # 'ADDED', 'MODIFIED', 'DELETED', 'ERROR'
72
'object': object # Kubernetes resource object or error status
73
}
74
"""
75
```
76
77
### Stream Base Class
78
79
Base class for event stream implementations.
80
81
```python { .api }
82
class Stream:
83
def __init__(self, func, *args, **kwargs):
84
"""
85
Base class for event stream wrappers.
86
87
Parameters:
88
- func: callable, API function for streaming
89
- *args: positional arguments for API function
90
- **kwargs: keyword arguments for API function
91
"""
92
93
def __aiter__(self):
94
"""Async iterator interface for streaming events."""
95
96
async def __anext__(self):
97
"""Get next event from stream."""
98
```
99
100
### Constants
101
102
Important constants used in watch operations.
103
104
```python { .api }
105
TYPE_LIST_SUFFIX: str = "List" # Suffix used for inferring object types from list operations
106
```
107
108
## Usage Examples
109
110
### Basic Resource Watching
111
112
```python
113
import asyncio
114
from kubernetes_asyncio import client, config, watch
115
116
async def watch_pods():
117
await config.load_config()
118
v1 = client.CoreV1Api()
119
w = watch.Watch()
120
121
try:
122
print("Starting to watch pods in default namespace...")
123
124
async for event in w.stream(v1.list_namespaced_pod, namespace="default"):
125
event_type = event['type']
126
pod = event['object']
127
128
print(f"{event_type}: Pod {pod.metadata.name}")
129
print(f" Namespace: {pod.metadata.namespace}")
130
print(f" Phase: {pod.status.phase}")
131
print(f" Node: {pod.spec.node_name or 'Not scheduled'}")
132
print("---")
133
134
# Example: Stop watching after specific condition
135
if event_type == "DELETED" and pod.metadata.name == "target-pod":
136
print("Target pod deleted, stopping watch")
137
break
138
139
finally:
140
w.stop()
141
await v1.api_client.close()
142
143
asyncio.run(watch_pods())
144
```
145
146
### Watching with Resource Version
147
148
```python
149
async def watch_with_resource_version():
150
await config.load_config()
151
v1 = client.CoreV1Api()
152
w = watch.Watch()
153
154
try:
155
# Get current resource version
156
pod_list = await v1.list_namespaced_pod(namespace="default")
157
resource_version = pod_list.metadata.resource_version
158
159
print(f"Starting watch from resource version: {resource_version}")
160
161
# Watch from specific resource version to avoid missing events
162
async for event in w.stream(
163
v1.list_namespaced_pod,
164
namespace="default",
165
resource_version=resource_version,
166
timeout_seconds=300 # 5 minute timeout
167
):
168
event_type = event['type']
169
pod = event['object']
170
171
print(f"{event_type}: {pod.metadata.name} (rv: {pod.metadata.resource_version})")
172
173
except asyncio.TimeoutError:
174
print("Watch timed out")
175
finally:
176
w.stop()
177
await v1.api_client.close()
178
179
asyncio.run(watch_with_resource_version())
180
```
181
182
### Watching Multiple Resource Types
183
184
```python
185
async def watch_multiple_resources():
186
await config.load_config()
187
v1 = client.CoreV1Api()
188
apps_v1 = client.AppsV1Api()
189
190
async def watch_pods():
191
w = watch.Watch()
192
try:
193
async for event in w.stream(v1.list_namespaced_pod, namespace="default"):
194
pod = event['object']
195
print(f"POD {event['type']}: {pod.metadata.name}")
196
finally:
197
w.stop()
198
199
async def watch_deployments():
200
w = watch.Watch()
201
try:
202
async for event in w.stream(apps_v1.list_namespaced_deployment, namespace="default"):
203
deployment = event['object']
204
print(f"DEPLOYMENT {event['type']}: {deployment.metadata.name}")
205
finally:
206
w.stop()
207
208
# Watch both resource types concurrently
209
try:
210
await asyncio.gather(
211
watch_pods(),
212
watch_deployments()
213
)
214
finally:
215
await v1.api_client.close()
216
await apps_v1.api_client.close()
217
218
# asyncio.run(watch_multiple_resources()) # Uncomment to run
219
```
220
221
### Watching with Label Selectors
222
223
```python
224
async def watch_with_selectors():
225
await config.load_config()
226
v1 = client.CoreV1Api()
227
w = watch.Watch()
228
229
try:
230
# Watch only pods with specific labels
231
async for event in w.stream(
232
v1.list_namespaced_pod,
233
namespace="default",
234
label_selector="app=nginx,env=production"
235
):
236
event_type = event['type']
237
pod = event['object']
238
239
print(f"{event_type}: {pod.metadata.name}")
240
print(f" Labels: {pod.metadata.labels}")
241
242
# Handle specific events
243
if event_type == "ADDED":
244
print(" -> New nginx production pod created")
245
elif event_type == "MODIFIED":
246
print(f" -> Pod updated, phase: {pod.status.phase}")
247
elif event_type == "DELETED":
248
print(" -> Nginx production pod removed")
249
250
finally:
251
w.stop()
252
await v1.api_client.close()
253
254
asyncio.run(watch_with_selectors())
255
```
256
257
### Building a Simple Controller
258
259
```python
260
async def simple_controller():
261
await config.load_config()
262
v1 = client.CoreV1Api()
263
w = watch.Watch()
264
265
try:
266
print("Starting simple pod controller...")
267
268
async for event in w.stream(
269
v1.list_namespaced_pod,
270
namespace="default",
271
label_selector="managed-by=simple-controller"
272
):
273
event_type = event['type']
274
pod = event['object']
275
276
if event_type == "ADDED":
277
await handle_pod_added(v1, pod)
278
elif event_type == "MODIFIED":
279
await handle_pod_modified(v1, pod)
280
elif event_type == "DELETED":
281
await handle_pod_deleted(v1, pod)
282
elif event_type == "ERROR":
283
print(f"Watch error: {pod}")
284
break
285
286
finally:
287
w.stop()
288
await v1.api_client.close()
289
290
async def handle_pod_added(v1, pod):
291
print(f"Controller: Managing new pod {pod.metadata.name}")
292
293
# Example: Add finalizer to pod
294
if not pod.metadata.finalizers:
295
patch_body = {
296
"metadata": {
297
"finalizers": ["simple-controller/cleanup"]
298
}
299
}
300
301
await v1.patch_namespaced_pod(
302
name=pod.metadata.name,
303
namespace=pod.metadata.namespace,
304
body=patch_body
305
)
306
print(f" -> Added finalizer to {pod.metadata.name}")
307
308
async def handle_pod_modified(v1, pod):
309
print(f"Controller: Pod {pod.metadata.name} modified, phase: {pod.status.phase}")
310
311
# Example: React to pod phase changes
312
if pod.status.phase == "Failed":
313
# Clean up or restart failed pod
314
print(f" -> Pod {pod.metadata.name} failed, taking corrective action")
315
316
async def handle_pod_deleted(v1, pod):
317
print(f"Controller: Pod {pod.metadata.name} deleted")
318
319
# Example: Cleanup external resources
320
if pod.metadata.finalizers and "simple-controller/cleanup" in pod.metadata.finalizers:
321
print(f" -> Cleaning up resources for {pod.metadata.name}")
322
323
# Remove finalizer after cleanup
324
patch_body = {
325
"metadata": {
326
"finalizers": [f for f in pod.metadata.finalizers
327
if f != "simple-controller/cleanup"]
328
}
329
}
330
331
await v1.patch_namespaced_pod(
332
name=pod.metadata.name,
333
namespace=pod.metadata.namespace,
334
body=patch_body
335
)
336
337
# asyncio.run(simple_controller()) # Uncomment to run
338
```
339
340
### Error Handling and Reconnection
341
342
```python
343
async def resilient_watch():
344
await config.load_config()
345
v1 = client.CoreV1Api()
346
347
retry_count = 0
348
max_retries = 5
349
resource_version = None
350
351
while retry_count < max_retries:
352
w = watch.Watch()
353
354
try:
355
print(f"Starting watch (attempt {retry_count + 1})")
356
357
# Start from last known resource version to avoid duplicates
358
watch_kwargs = {"namespace": "default"}
359
if resource_version:
360
watch_kwargs["resource_version"] = resource_version
361
362
async for event in w.stream(v1.list_namespaced_pod, **watch_kwargs):
363
event_type = event['type']
364
365
if event_type == "ERROR":
366
error_status = event['object']
367
print(f"Watch error: {error_status}")
368
369
# Handle specific error types
370
if error_status.code == 410: # Gone - resource version too old
371
print("Resource version expired, restarting watch")
372
resource_version = None
373
break
374
else:
375
raise Exception(f"Watch error: {error_status.message}")
376
377
else:
378
pod = event['object']
379
resource_version = pod.metadata.resource_version
380
print(f"{event_type}: {pod.metadata.name} (rv: {resource_version})")
381
382
# Reset retry count on successful events
383
retry_count = 0
384
385
except Exception as e:
386
print(f"Watch failed: {e}")
387
retry_count += 1
388
389
if retry_count < max_retries:
390
wait_time = min(2 ** retry_count, 30) # Exponential backoff, max 30s
391
print(f"Retrying in {wait_time} seconds...")
392
await asyncio.sleep(wait_time)
393
else:
394
print("Max retries exceeded, giving up")
395
break
396
397
finally:
398
w.stop()
399
400
await v1.api_client.close()
401
402
asyncio.run(resilient_watch())
403
```
404
405
### Watching Cluster-Wide Resources
406
407
```python
408
async def watch_cluster_wide():
409
await config.load_config()
410
v1 = client.CoreV1Api()
411
w = watch.Watch()
412
413
try:
414
# Watch all pods across all namespaces
415
print("Watching all pods cluster-wide...")
416
417
async for event in w.stream(v1.list_pod_for_all_namespaces):
418
event_type = event['type']
419
pod = event['object']
420
421
print(f"{event_type}: {pod.metadata.namespace}/{pod.metadata.name}")
422
423
# Example: Monitor pods in system namespaces
424
if pod.metadata.namespace.startswith("kube-"):
425
print(f" -> System pod event in {pod.metadata.namespace}")
426
427
# Example: Track resource usage patterns
428
if event_type == "ADDED" and pod.spec.resources:
429
requests = pod.spec.resources.requests or {}
430
limits = pod.spec.resources.limits or {}
431
print(f" -> Resource requests: {requests}")
432
print(f" -> Resource limits: {limits}")
433
434
finally:
435
w.stop()
436
await v1.api_client.close()
437
438
asyncio.run(watch_cluster_wide())
439
```