0
# Resource Watching
1
2
Event streaming capabilities for monitoring real-time changes to Kubernetes resources. Provides efficient watch operations that can be filtered and processed for automated responses to cluster state changes, enabling reactive applications and monitoring systems.
3
4
## Capabilities
5
6
### Watch Events
7
8
Stream real-time events for any Kubernetes resource changes including creation, updates, and deletions.
9
10
```python { .api }
11
class Watch:
12
def stream(
13
self,
14
func,
15
*args,
16
timeout_seconds: int = None,
17
resource_version: str = None,
18
**kwargs
19
):
20
"""
21
Stream events from a Kubernetes API watch endpoint.
22
23
Parameters:
24
- func: API function to watch (e.g., v1.list_namespaced_pod)
25
- *args: Arguments for the API function
26
- timeout_seconds: Timeout for watch operation
27
- resource_version: Resource version to start watching from
28
- **kwargs: Additional arguments for API function
29
30
Yields:
31
Event dictionaries with 'type' and 'object' keys
32
"""
33
34
def stop(self) -> None:
35
"""Stop the watch operation."""
36
```
37
38
### Event Types
39
40
Watch events contain information about the type of change and the affected resource.
41
42
```python { .api }
43
# Event structure returned by watch.stream()
44
{
45
"type": str, # ADDED, MODIFIED, DELETED, ERROR
46
"object": dict, # The Kubernetes resource object
47
"raw_object": dict # Raw object from API response
48
}
49
```
50
51
## Usage Examples
52
53
### Watching Pods
54
55
```python
56
from kubernetes import client, config, watch
57
58
config.load_kube_config()
59
v1 = client.CoreV1Api()
60
w = watch.Watch()
61
62
# Watch all pod events in default namespace
63
print("Watching for pod events...")
64
for event in w.stream(v1.list_namespaced_pod, namespace="default"):
65
event_type = event['type']
66
pod = event['object']
67
pod_name = pod.metadata.name
68
69
print(f"Event: {event_type} - Pod: {pod_name}")
70
71
if event_type == "ADDED":
72
print(f" Pod {pod_name} was created")
73
elif event_type == "MODIFIED":
74
print(f" Pod {pod_name} was updated")
75
print(f" Status: {pod.status.phase}")
76
elif event_type == "DELETED":
77
print(f" Pod {pod_name} was deleted")
78
elif event_type == "ERROR":
79
print(f" Error occurred: {pod}")
80
break
81
```
82
83
### Watching with Label Selectors
84
85
```python
86
from kubernetes import client, config, watch
87
88
config.load_kube_config()
89
v1 = client.CoreV1Api()
90
w = watch.Watch()
91
92
# Watch only pods with specific labels
93
print("Watching for app=nginx pods...")
94
for event in w.stream(
95
v1.list_namespaced_pod,
96
namespace="default",
97
label_selector="app=nginx"
98
):
99
event_type = event['type']
100
pod = event['object']
101
102
print(f"{event_type}: {pod.metadata.name}")
103
104
# Process specific events
105
if event_type == "MODIFIED":
106
# Check if pod is ready
107
conditions = pod.status.conditions or []
108
ready_condition = next(
109
(c for c in conditions if c.type == "Ready"),
110
None
111
)
112
if ready_condition and ready_condition.status == "True":
113
print(f" Pod {pod.metadata.name} is ready!")
114
```
115
116
### Watching Deployments
117
118
```python
119
from kubernetes import client, config, watch
120
121
config.load_kube_config()
122
apps_v1 = client.AppsV1Api()
123
w = watch.Watch()
124
125
# Watch deployment rollouts
126
print("Watching deployment changes...")
127
for event in w.stream(apps_v1.list_namespaced_deployment, namespace="default"):
128
event_type = event['type']
129
deployment = event['object']
130
deployment_name = deployment.metadata.name
131
132
if event_type == "MODIFIED":
133
status = deployment.status
134
if status:
135
ready_replicas = status.ready_replicas or 0
136
desired_replicas = deployment.spec.replicas or 0
137
138
print(f"Deployment {deployment_name}: {ready_replicas}/{desired_replicas} ready")
139
140
# Check if rollout is complete
141
conditions = status.conditions or []
142
progressing = next(
143
(c for c in conditions if c.type == "Progressing"),
144
None
145
)
146
if progressing and progressing.reason == "NewReplicaSetAvailable":
147
print(f" Rollout complete for {deployment_name}")
148
```
149
150
### Watching with Timeout
151
152
```python
153
from kubernetes import client, config, watch
154
import time
155
156
config.load_kube_config()
157
v1 = client.CoreV1Api()
158
w = watch.Watch()
159
160
# Watch with timeout to avoid infinite loops
161
print("Watching pods for 60 seconds...")
162
start_time = time.time()
163
164
for event in w.stream(
165
v1.list_namespaced_pod,
166
namespace="default",
167
timeout_seconds=60
168
):
169
event_type = event['type']
170
pod = event['object']
171
172
elapsed = time.time() - start_time
173
print(f"[{elapsed:.1f}s] {event_type}: {pod.metadata.name}")
174
175
# Handle timeout
176
if event_type == "ERROR":
177
error_obj = event['object']
178
if hasattr(error_obj, 'code') and error_obj.code == 410:
179
print("Watch expired, need to restart with new resource version")
180
break
181
182
print("Watch completed")
183
```
184
185
### Watching from Specific Resource Version
186
187
```python
188
from kubernetes import client, config, watch
189
190
config.load_kube_config()
191
v1 = client.CoreV1Api()
192
w = watch.Watch()
193
194
# Get current resource version
195
pod_list = v1.list_namespaced_pod(namespace="default")
196
resource_version = pod_list.metadata.resource_version
197
198
print(f"Starting watch from resource version: {resource_version}")
199
200
# Watch from specific point in time
201
for event in w.stream(
202
v1.list_namespaced_pod,
203
namespace="default",
204
resource_version=resource_version
205
):
206
event_type = event['type']
207
pod = event['object']
208
209
print(f"{event_type}: {pod.metadata.name}")
210
211
# Update resource version for next watch
212
if hasattr(pod.metadata, 'resource_version'):
213
resource_version = pod.metadata.resource_version
214
```
215
216
### Watching Custom Resources
217
218
```python
219
from kubernetes import client, config, watch, dynamic
220
221
config.load_kube_config()
222
dyn_client = dynamic.DynamicClient(client.ApiClient())
223
w = watch.Watch()
224
225
# Get custom resource definition
226
my_resource = dyn_client.resources.get(
227
api_version="mycompany.io/v1",
228
kind="MyCustomResource"
229
)
230
231
# Watch custom resource events
232
print("Watching custom resources...")
233
for event in w.stream(my_resource.get, namespace="default"):
234
event_type = event['type']
235
obj = event['object']
236
237
print(f"{event_type}: {obj.metadata.name}")
238
239
if event_type == "MODIFIED":
240
# Access custom fields
241
if hasattr(obj, 'status') and obj.status:
242
print(f" Status: {obj.status.get('phase', 'Unknown')}")
243
```
244
245
### Watching Multiple Resources
246
247
```python
248
from kubernetes import client, config, watch
249
import threading
250
import queue
251
252
config.load_kube_config()
253
v1 = client.CoreV1Api()
254
apps_v1 = client.AppsV1Api()
255
256
# Create event queue for multiple watchers
257
event_queue = queue.Queue()
258
259
def watch_pods():
260
w = watch.Watch()
261
for event in w.stream(v1.list_namespaced_pod, namespace="default"):
262
event['resource_type'] = 'Pod'
263
event_queue.put(event)
264
265
def watch_deployments():
266
w = watch.Watch()
267
for event in w.stream(apps_v1.list_namespaced_deployment, namespace="default"):
268
event['resource_type'] = 'Deployment'
269
event_queue.put(event)
270
271
# Start watchers in separate threads
272
pod_thread = threading.Thread(target=watch_pods, daemon=True)
273
deployment_thread = threading.Thread(target=watch_deployments, daemon=True)
274
275
pod_thread.start()
276
deployment_thread.start()
277
278
# Process events from queue
279
print("Watching pods and deployments...")
280
try:
281
while True:
282
event = event_queue.get(timeout=1)
283
resource_type = event['resource_type']
284
event_type = event['type']
285
obj = event['object']
286
287
print(f"{resource_type} {event_type}: {obj.metadata.name}")
288
289
except queue.Empty:
290
print("No events received")
291
except KeyboardInterrupt:
292
print("Stopping watchers...")
293
```
294
295
### Error Handling and Reconnection
296
297
```python
298
from kubernetes import client, config, watch
299
from kubernetes.client.rest import ApiException
300
import time
301
302
config.load_kube_config()
303
v1 = client.CoreV1Api()
304
305
def watch_with_reconnect():
306
"""Watch pods with automatic reconnection on errors."""
307
resource_version = None
308
309
while True:
310
try:
311
w = watch.Watch()
312
print(f"Starting watch from resource version: {resource_version}")
313
314
for event in w.stream(
315
v1.list_namespaced_pod,
316
namespace="default",
317
resource_version=resource_version,
318
timeout_seconds=300 # 5 minute timeout
319
):
320
event_type = event['type']
321
322
if event_type == "ERROR":
323
error_obj = event['object']
324
print(f"Watch error: {error_obj}")
325
326
# Handle resource version too old error
327
if hasattr(error_obj, 'code') and error_obj.code == 410:
328
print("Resource version expired, restarting watch")
329
resource_version = None
330
break
331
else:
332
raise Exception(f"Watch error: {error_obj}")
333
334
pod = event['object']
335
print(f"{event_type}: {pod.metadata.name}")
336
337
# Update resource version
338
resource_version = pod.metadata.resource_version
339
340
except ApiException as e:
341
print(f"API exception: {e}")
342
time.sleep(5) # Wait before reconnecting
343
344
except Exception as e:
345
print(f"Unexpected error: {e}")
346
time.sleep(5)
347
348
except KeyboardInterrupt:
349
print("Stopping watch...")
350
break
351
352
# Start watching with reconnection
353
watch_with_reconnect()
354
```