0
# Watch and Informers
1
2
The Fabric8 Kubernetes Client provides powerful mechanisms for real-time monitoring of Kubernetes resources through watch operations and the informer pattern. These features enable efficient tracking of resource changes with caching and event-driven updates.
3
4
## Watch Operations
5
6
### Basic Watch Interface
7
8
Direct watch operations for monitoring resource changes in real time.
9
10
```java { .api }
11
public interface Watch extends Closeable {
12
void close();
13
}
14
15
public interface Watcher<T> {
16
void eventReceived(Action action, T resource);
17
default void onClose() {}
18
void onClose(WatcherException cause);
19
default boolean reconnecting() { return false; }
20
21
enum Action {
22
ADDED, MODIFIED, DELETED, ERROR, BOOKMARK
23
}
24
}
25
```
26
27
### Resource Watch Methods
28
29
Both single-resource handles (`Resource`) and collection-level operations (`NonNamespaceOperation`) expose `watch` methods for observing changes.
30
31
```java { .api }
32
// From Resource interface
33
public interface Resource<T extends HasMetadata> {
34
Watch watch(Watcher<T> watcher);
35
}
36
37
// From NonNamespaceOperation interface
38
public interface NonNamespaceOperation<T extends HasMetadata, L extends KubernetesResourceList<T>, R extends Resource<T>> {
39
Watch watch(Watcher<T> watcher);
40
Watch watch(String resourceVersion, Watcher<T> watcher);
41
Watch watch(ListOptions listOptions, Watcher<T> watcher);
42
}
43
```
44
45
## Informer Framework
46
47
### SharedInformerFactory
48
49
Factory for creating and managing informers with efficient resource caching.
50
51
```java { .api }
52
public interface SharedInformerFactory {
53
// Create informers
54
<T extends HasMetadata> SharedIndexInformer<T> sharedIndexInformerFor(
55
Class<T> apiTypeClass, long resyncPeriodInMillis);
56
57
58
// Get existing informers
59
<T extends HasMetadata> SharedIndexInformer<T> getExistingSharedIndexInformer(Class<T> apiTypeClass);
60
61
// Lifecycle management
62
void startAllRegisteredInformers();
63
void stopAllRegisteredInformers();
64
65
// Event listeners
66
void addSharedInformerEventListener(SharedInformerEventListener event);
67
}
68
69
// Access via client
70
public interface KubernetesClient {
71
SharedInformerFactory informers();
72
}
73
```
74
75
### SharedIndexInformer
76
77
Individual informer for a specific resource type with caching and indexing.
78
79
```java { .api }
80
public interface SharedIndexInformer<T> extends AutoCloseable {
81
// Event handlers
82
SharedIndexInformer<T> addEventHandler(ResourceEventHandler<? super T> handler);
83
SharedIndexInformer<T> addEventHandlerWithResyncPeriod(ResourceEventHandler<? super T> handler, long resyncPeriod);
84
SharedIndexInformer<T> removeEventHandler(ResourceEventHandler<? super T> handler);
85
86
// Indexing and caching
87
Indexer<T> getIndexer();
88
SharedIndexInformer<T> addIndexers(Map<String, Function<T, List<String>>> indexers);
89
SharedIndexInformer<T> removeIndexer(String name);
90
SharedIndexInformer<T> removeNamespaceIndex();
91
92
// Lifecycle
93
SharedIndexInformer<T> run();
94
void stop();
95
void close();
96
boolean hasSynced();
97
String lastSyncResourceVersion();
98
}
99
```
100
101
### ResourceEventHandler
102
103
Handler interface for processing resource change events.
104
105
```java { .api }
106
/**
 * Callbacks invoked by an informer when a watched resource changes.
 *
 * <p>Each callback is declared exactly once. An interface cannot declare both an
 * abstract and a {@code default} method with the same signature, so handlers
 * must implement all three methods.
 *
 * @param <T> the resource type delivered to the callbacks
 */
public interface ResourceEventHandler<T> {
    /**
     * Called when a resource is added.
     *
     * @param obj the added resource
     */
    void onAdd(T obj);

    /**
     * Called when a resource is updated.
     *
     * @param oldObj the previously known state of the resource
     * @param newObj the new state of the resource
     */
    void onUpdate(T oldObj, T newObj);

    /**
     * Called when a resource is deleted.
     *
     * @param obj the deleted resource
     * @param deletedFinalStateUnknown {@code true} when the delete event may have been
     *        missed, so {@code obj} may not reflect the final state
     */
    void onDelete(T obj, boolean deletedFinalStateUnknown);
}
116
```
117
118
### Cache and Indexer
119
120
Caching and indexing interfaces for efficient resource lookups.
121
122
```java { .api }
123
public interface Store<T> {
124
void add(T obj);
125
void update(T obj);
126
void delete(T obj);
127
List<T> list();
128
List<String> listKeys();
129
T get(T obj);
130
T getByKey(String key);
131
void replace(List<T> objects, String resourceVersion);
132
}
133
134
public interface Indexer<T> extends Store<T> {
135
List<String> index(String indexName, T obj);
136
List<String> indexKeys(String indexName, String indexKey);
137
List<T> getByIndex(String indexName, String indexKey);
138
void addIndexers(Map<String, Function<T, List<String>>> indexers);
139
}
140
```
141
142
## Usage Examples
143
144
### Basic Watch Operations
145
146
```java
147
// Watch pods in current namespace
148
Watch podWatch = client.pods().watch(new Watcher<Pod>() {
149
@Override
150
public void eventReceived(Action action, Pod pod) {
151
switch (action) {
152
case ADDED:
153
System.out.println("Pod added: " + pod.getMetadata().getName());
154
break;
155
case MODIFIED:
156
System.out.println("Pod modified: " + pod.getMetadata().getName() +
157
" phase: " + pod.getStatus().getPhase());
158
break;
159
case DELETED:
160
System.out.println("Pod deleted: " + pod.getMetadata().getName());
161
break;
162
case ERROR:
163
System.out.println("Watch error for pod: " + pod);
164
break;
165
}
166
}
167
168
@Override
169
public void onClose(WatcherException cause) {
170
if (cause != null) {
171
System.out.println("Pod watch closed with error: " + cause.getMessage());
172
if (cause.isHttpGone()) {
173
System.out.println("Resource version too old, need to relist");
174
}
175
} else {
176
System.out.println("Pod watch closed normally");
177
}
178
}
179
});
180
181
// Watch with label selector
182
Watch labelWatch = client.pods()
183
.withLabel("app", "my-app")
184
.watch(new Watcher<Pod>() {
185
@Override
186
public void eventReceived(Action action, Pod pod) {
187
System.out.println(action + " pod with app=my-app: " + pod.getMetadata().getName());
188
}
189
190
@Override
191
public void onClose(WatcherException cause) {
192
System.out.println("Labeled pod watch closed");
193
}
194
});
195
196
// Remember to close watches
197
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
198
podWatch.close();
199
labelWatch.close();
200
}));
201
```
202
203
### Watch with ListOptions
204
205
```java
206
// Watch from specific resource version
207
ListOptions listOptions = new ListOptionsBuilder()
208
.withResourceVersion("12345")
209
.withTimeoutSeconds(300L)
210
.build();
211
212
Watch versionWatch = client.services().watch(listOptions, new Watcher<Service>() {
213
@Override
214
public void eventReceived(Action action, Service service) {
215
System.out.println("Service " + action.name().toLowerCase() + ": " +
216
service.getMetadata().getName());
217
}
218
219
@Override
220
public void onClose(WatcherException cause) {
221
System.out.println("Service watch closed");
222
}
223
});
224
```
225
226
### Using Informers
227
228
```java
229
// Create informer for pods with 30 second resync period
230
SharedIndexInformer<Pod> podInformer = client.informers()
231
.sharedIndexInformerFor(Pod.class, 30 * 1000);
232
233
// Add event handler
234
podInformer.addEventHandler(new ResourceEventHandler<Pod>() {
235
@Override
236
public void onAdd(Pod pod) {
237
System.out.println("Informer - Pod added: " + pod.getMetadata().getName());
238
239
// Access cached data
240
List<Pod> allPods = podInformer.getIndexer().list();
241
System.out.println("Total cached pods: " + allPods.size());
242
}
243
244
@Override
245
public void onUpdate(Pod oldPod, Pod newPod) {
246
String name = newPod.getMetadata().getName();
247
String oldPhase = oldPod.getStatus() != null ? oldPod.getStatus().getPhase() : "Unknown";
248
String newPhase = newPod.getStatus() != null ? newPod.getStatus().getPhase() : "Unknown";
249
250
if (!oldPhase.equals(newPhase)) {
251
System.out.println("Pod " + name + " phase changed: " + oldPhase + " -> " + newPhase);
252
}
253
}
254
255
@Override
256
public void onDelete(Pod pod, boolean deletedFinalStateUnknown) {
257
System.out.println("Informer - Pod deleted: " + pod.getMetadata().getName());
258
if (deletedFinalStateUnknown) {
259
System.out.println("Delete event may have been missed");
260
}
261
}
262
});
263
264
// Add another handler with different resync period
265
podInformer.addEventHandlerWithResyncPeriod(new ResourceEventHandler<Pod>() {
    @Override
    public void onAdd(Pod pod) {
        System.out.println("Resync handler - Pod added: " + pod.getMetadata().getName());
    }

    @Override
    public void onUpdate(Pod oldPod, Pod newPod) {
        // Periodic resync events are delivered here, even if the pod hasn't changed
        System.out.println("Resync handler - Pod: " + newPod.getMetadata().getName());
    }

    @Override
    public void onDelete(Pod pod, boolean deletedFinalStateUnknown) {
        // Nothing to do on delete for this handler
    }
}, 60 * 1000); // 60 second resync
272
273
// Start all informers
274
client.informers().startAllRegisteredInformers();
275
276
// Wait for initial sync
277
CompletableFuture<Void> syncFuture = client.informers().startAllRegisteredInformers(true);
278
syncFuture.get(30, TimeUnit.SECONDS); // Wait up to 30 seconds for sync
279
280
// Check sync status
281
if (podInformer.hasSynced()) {
282
System.out.println("Pod informer synced, cache is ready");
283
284
// Get cached pods
285
List<Pod> cachedPods = podInformer.getIndexer().list();
286
System.out.println("Cached pods: " + cachedPods.size());
287
288
// Get specific pod from cache
289
Pod cachedPod = podInformer.getIndexer().getByKey("default/my-pod");
290
if (cachedPod != null) {
291
System.out.println("Found cached pod: " + cachedPod.getMetadata().getName());
292
}
293
}
294
```
295
296
### Multiple Resource Informers
297
298
```java
299
// Create informers for multiple resource types
300
SharedIndexInformer<Pod> podInformer =
301
client.informers().sharedIndexInformerFor(Pod.class, 30 * 1000);
302
303
SharedIndexInformer<Service> serviceInformer =
304
client.informers().sharedIndexInformerFor(Service.class, 30 * 1000);
305
306
SharedIndexInformer<Deployment> deploymentInformer =
307
client.informers().sharedIndexInformerFor(Deployment.class, 30 * 1000);
308
309
// Add handlers
310
podInformer.addEventHandler(new ResourceEventHandler<Pod>() {
311
@Override
312
public void onAdd(Pod pod) {
313
System.out.println("New pod: " + pod.getMetadata().getName());
314
}
315
});
316
317
serviceInformer.addEventHandler(new ResourceEventHandler<Service>() {
318
@Override
319
public void onAdd(Service service) {
320
System.out.println("New service: " + service.getMetadata().getName());
321
}
322
});
323
324
deploymentInformer.addEventHandler(new ResourceEventHandler<Deployment>() {
325
@Override
326
public void onUpdate(Deployment oldDeployment, Deployment newDeployment) {
327
Integer oldReplicas = oldDeployment.getSpec().getReplicas();
328
Integer newReplicas = newDeployment.getSpec().getReplicas();
329
330
if (!Objects.equals(oldReplicas, newReplicas)) {
331
System.out.println("Deployment " + newDeployment.getMetadata().getName() +
332
" scaled: " + oldReplicas + " -> " + newReplicas);
333
}
334
}
335
});
336
337
// Start all informers at once
338
client.informers().startAllRegisteredInformers();
339
```
340
341
### Custom Resource Informers
342
343
```java
344
// Assuming you have a custom Database resource class
345
SharedIndexInformer<Database> databaseInformer = client.informers()
346
.sharedIndexInformerForCustomResource(Database.class, 30 * 1000);
347
348
databaseInformer.addEventHandler(new ResourceEventHandler<Database>() {
349
@Override
350
public void onAdd(Database database) {
351
System.out.println("New database: " + database.getMetadata().getName() +
352
" version: " + database.getSpec().getVersion());
353
}
354
355
@Override
356
public void onUpdate(Database oldDatabase, Database newDatabase) {
357
DatabaseStatus oldStatus = oldDatabase.getStatus();
358
DatabaseStatus newStatus = newDatabase.getStatus();
359
360
String oldPhase = oldStatus != null ? oldStatus.getPhase() : "Unknown";
361
String newPhase = newStatus != null ? newStatus.getPhase() : "Unknown";
362
363
if (!oldPhase.equals(newPhase)) {
364
System.out.println("Database " + newDatabase.getMetadata().getName() +
365
" status: " + oldPhase + " -> " + newPhase);
366
}
367
}
368
});
369
370
client.informers().startAllRegisteredInformers();
371
```
372
373
### Namespace-Scoped Informers
374
375
```java
376
// Create informer for specific namespace
377
SharedIndexInformer<Pod> namespacedInformer = client.informers()
378
.sharedIndexInformerFor(Pod.class, 30 * 1000, "production");
379
380
namespacedInformer.addEventHandler(new ResourceEventHandler<Pod>() {
381
@Override
382
public void onAdd(Pod pod) {
383
System.out.println("Production pod added: " + pod.getMetadata().getName());
384
}
385
});
386
387
client.informers().startAllRegisteredInformers();
388
```
389
390
### Advanced Cache Operations
391
392
```java
393
SharedIndexInformer<Pod> podInformer =
394
client.informers().sharedIndexInformerFor(Pod.class, 30 * 1000);
395
396
// Add custom indexers
397
Map<String, Function<Pod, List<String>>> indexers = new HashMap<>();
398
399
// Index pods by node name
400
indexers.put("byNode", pod -> {
401
String nodeName = pod.getSpec().getNodeName();
402
return nodeName != null ? Collections.singletonList(nodeName) : Collections.emptyList();
403
});
404
405
// Index pods by owner reference
406
indexers.put("byOwner", pod -> {
407
return pod.getMetadata().getOwnerReferences().stream()
408
.map(ref -> ref.getKind() + "/" + ref.getName())
409
.collect(Collectors.toList());
410
});
411
412
podInformer.getIndexer().addIndexers(indexers);
413
414
podInformer.addEventHandler(new ResourceEventHandler<Pod>() {
415
@Override
416
public void onAdd(Pod pod) {
417
Indexer<Pod> indexer = podInformer.getIndexer();
418
419
// Get all pods on the same node
420
String nodeName = pod.getSpec().getNodeName();
421
if (nodeName != null) {
422
List<Pod> podsOnNode = indexer.getByIndex("byNode", nodeName);
423
System.out.println("Pods on node " + nodeName + ": " + podsOnNode.size());
424
}
425
426
// Get pods with same owner
427
pod.getMetadata().getOwnerReferences().forEach(owner -> {
428
String ownerKey = owner.getKind() + "/" + owner.getName();
429
List<Pod> podsWithSameOwner = indexer.getByIndex("byOwner", ownerKey);
430
System.out.println("Pods owned by " + ownerKey + ": " + podsWithSameOwner.size());
431
});
432
}
433
});
434
435
client.informers().startAllRegisteredInformers();
436
```
437
438
### Error Handling and Recovery
439
440
```java
441
SharedIndexInformer<Pod> podInformer =
442
client.informers().sharedIndexInformerFor(Pod.class, 30 * 1000);
443
444
// Add event listener to be notified when an informer encounters an exception
client.informers().addSharedInformerEventListener(new SharedInformerEventListener() {
    @Override
    public void onException(Exception exception) {
        System.err.println("Informer error: " + exception.getMessage());
    }
});
461
462
podInformer.addEventHandler(new ResourceEventHandler<Pod>() {
463
@Override
464
public void onAdd(Pod pod) {
465
// Check if informer is still running
466
if (!podInformer.isRunning()) {
467
System.out.println("Warning: Informer is not running");
468
return;
469
}
470
471
// Check sync status
472
if (!podInformer.hasSynced()) {
473
System.out.println("Warning: Informer not yet synced");
474
return;
475
}
476
477
System.out.println("Pod added: " + pod.getMetadata().getName());
478
}
479
});
480
481
// Graceful shutdown
482
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
483
System.out.println("Shutting down informers...");
484
client.informers().stopAllRegisteredInformers();
485
}));
486
487
client.informers().startAllRegisteredInformers();
488
```
489
490
## Best Practices
491
492
### Resource Management
493
494
```java
495
// Always close watches
496
try (Watch watch = client.pods().watch(watcher)) {
497
// Watch is automatically closed when leaving try block
498
Thread.sleep(60000); // Watch for 1 minute
499
}
500
501
// Use informers for long-running applications
502
SharedInformerFactory informers = client.informers();
503
// Informers are more efficient than watches for long-term monitoring
504
505
// Set appropriate resync periods
506
// Too short: unnecessary CPU usage
507
// Too long: delayed detection of missed events
508
SharedIndexInformer<Pod> informer =
509
informers.sharedIndexInformerFor(Pod.class, 5 * 60 * 1000); // 5 minutes
510
```
511
512
### Error Handling
513
514
```java
515
Watch watch = client.pods().watch(new Watcher<Pod>() {
516
@Override
517
public void eventReceived(Action action, Pod pod) {
518
try {
519
// Process event
520
processEvent(action, pod);
521
} catch (Exception e) {
522
System.err.println("Error processing pod event: " + e.getMessage());
523
}
524
}
525
526
@Override
527
public void onClose(WatcherException cause) {
528
if (cause != null) {
529
if (cause.isHttpGone()) {
530
// Resource version too old, need to restart watch
531
System.out.println("Restarting watch due to HTTP 410 Gone");
532
restartWatch();
533
} else {
534
System.err.println("Watch error: " + cause.getMessage());
535
}
536
}
537
}
538
});
539
```