0
# WebSocket Operations
1
2
WebSocket-based functionality for watch operations and exec/attach commands with proper stream handling and message formatting.
3
4
## Capabilities
5
6
### Watch Events
7
8
Real-time resource change notifications through WebSocket connections.
9
10
```java { .api }
11
/**
12
* Handles WebSocket watch events for resource monitoring
13
* Manages WebSocket connections and event distribution
14
*/
15
public class WatchEventsListener {
16
17
/**
18
* Create a new WatchEventsListener
19
* @param context Server context for processing
20
* @param attributeSet Filter attributes for this watch
21
* @param watchEventListenerList Set of all active listeners
22
* @param logger Logger instance for this listener
23
* @param onOpenAction Action to execute when WebSocket opens
24
*/
25
public WatchEventsListener(Context context, AttributeSet attributeSet,
26
Set<WatchEventsListener> watchEventListenerList,
27
Logger logger, Consumer<WatchEventsListener> onOpenAction);
28
29
/**
30
* Send a WebSocket response for a resource event
31
* @param resource JSON representation of the resource
32
* @param action Type of action (ADDED, MODIFIED, DELETED)
33
*/
34
public void sendWebSocketResponse(String resource, Watcher.Action action);
35
36
/**
37
* Check if attributes match this watch listener's criteria
38
* @param attributes AttributeSet to check against watch filters
39
* @return true if attributes match the watch criteria
40
*/
41
public boolean attributeMatches(AttributeSet attributes);
42
43
// WebSocket lifecycle callbacks
44
45
/**
46
* Called when WebSocket connection is opened
47
* @param webSocket The opened WebSocket
48
* @param response The HTTP response that initiated the WebSocket
49
*/
50
public void onOpen(WebSocket webSocket, Response response);
51
52
/**
53
* Called when WebSocket connection is closing
54
* @param webSocket The closing WebSocket
55
* @param code Close status code
56
* @param reason Close reason message
57
*/
58
public void onClosing(WebSocket webSocket, int code, String reason);
59
60
/**
61
* Called when WebSocket connection is closed
62
* @param webSocket The closed WebSocket
63
* @param code Close status code
64
* @param reason Close reason message
65
*/
66
public void onClosed(WebSocket webSocket, int code, String reason);
67
68
/**
69
* Called when WebSocket connection fails
70
* @param webSocket The failed WebSocket
71
* @param t Exception that caused the failure
72
* @param response The HTTP response (may be null)
73
*/
74
public void onFailure(WebSocket webSocket, Throwable t, Response response);
75
}
76
```
77
78
**Watch Actions:**
79
80
```java { .api }
81
// Watch event actions from Kubernetes client
82
import io.fabric8.kubernetes.client.Watcher.Action;
83
84
// Available actions:
85
// - Action.ADDED: Resource was created
86
// - Action.MODIFIED: Resource was updated
87
// - Action.DELETED: Resource was deleted
88
// - Action.ERROR: Watch error occurred
89
// - Action.BOOKMARK: Bookmark event for resuming watches
90
```
91
92
**Usage Examples:**
93
94
```java
95
@EnableKubernetesMockClient(crud = true)
96
class WatchOperationsTest {
97
KubernetesClient client;
98
99
@Test
100
void testPodWatch() throws InterruptedException {
101
CountDownLatch latch = new CountDownLatch(3);
102
List<String> events = new ArrayList<>();
103
104
// Start watching pods in default namespace
105
Watch watch = client.pods().inNamespace("default").watch(new Watcher<Pod>() {
106
@Override
107
public void eventReceived(Action action, Pod resource) {
108
events.add(action + ":" + resource.getMetadata().getName());
109
latch.countDown();
110
}
111
112
@Override
113
public void onClose(WatcherException cause) {
114
if (cause != null) {
115
cause.printStackTrace();
116
}
117
}
118
});
119
120
try {
121
// Create a pod - triggers ADDED
122
Pod pod = new PodBuilder()
123
.withNewMetadata().withName("watched-pod").endMetadata()
124
.build();
125
client.pods().inNamespace("default").resource(pod).create();
126
127
// Update the pod - triggers MODIFIED
128
pod.getMetadata().setLabels(Map.of("updated", "true"));
129
client.pods().inNamespace("default").resource(pod).update();
130
131
// Delete the pod - triggers DELETED
132
client.pods().inNamespace("default").withName("watched-pod").delete();
133
134
// Wait for all events
135
assertTrue(latch.await(10, TimeUnit.SECONDS));
136
assertEquals(3, events.size());
137
assertEquals("ADDED:watched-pod", events.get(0));
138
assertEquals("MODIFIED:watched-pod", events.get(1));
139
assertEquals("DELETED:watched-pod", events.get(2));
140
} finally {
141
watch.close();
142
}
143
}
144
145
@Test
146
void testWatchWithLabelSelector() throws InterruptedException {
147
CountDownLatch latch = new CountDownLatch(1);
148
List<Pod> matchedPods = new ArrayList<>();
149
150
// Watch only pods with specific label
151
Watch watch = client.pods().inNamespace("default")
152
.withLabel("app", "web")
153
.watch(new Watcher<Pod>() {
154
@Override
155
public void eventReceived(Action action, Pod resource) {
156
matchedPods.add(resource);
157
latch.countDown();
158
}
159
160
@Override
161
public void onClose(WatcherException cause) {}
162
});
163
164
try {
165
// Create pod without label - should not trigger watch
166
Pod podNoLabel = new PodBuilder()
167
.withNewMetadata().withName("no-label").endMetadata()
168
.build();
169
client.pods().inNamespace("default").resource(podNoLabel).create();
170
171
// Create pod with matching label - should trigger watch
172
Pod podWithLabel = new PodBuilder()
173
.withNewMetadata()
174
.withName("with-label")
175
.addToLabels("app", "web")
176
.endMetadata()
177
.build();
178
client.pods().inNamespace("default").resource(podWithLabel).create();
179
180
assertTrue(latch.await(5, TimeUnit.SECONDS));
181
assertEquals(1, matchedPods.size());
182
assertEquals("with-label", matchedPods.get(0).getMetadata().getName());
183
} finally {
184
watch.close();
185
}
186
}
187
}
188
```
189
190
### Stream Messages
191
192
WebSocket message types for exec and attach operations with proper stream multiplexing.
193
194
```java { .api }
195
/**
196
* WebSocket message for standard output stream data
197
* Used in exec and attach operations
198
*/
199
public class OutputStreamMessage extends WebSocketMessage {
200
/**
201
* Create output stream message
202
* @param body String content for stdout
203
*/
204
public OutputStreamMessage(String body);
205
}
206
207
/**
208
* WebSocket message for error stream data
209
* Used in exec and attach operations
210
*/
211
public class ErrorStreamMessage extends WebSocketMessage {
212
/**
213
* Create error stream message
214
* @param body String content for stderr
215
*/
216
public ErrorStreamMessage(String body);
217
}
218
219
/**
220
* WebSocket message for status information
221
* Used to indicate command completion status
222
*/
223
public class StatusStreamMessage extends WebSocketMessage {
224
/**
225
* Create status stream message
226
* @param body String content for status information
227
*/
228
public StatusStreamMessage(String body);
229
}
230
231
/**
232
* Status message for exec/attach operations
233
* Contains exit code and termination reason
234
*/
235
public class StatusMessage {
236
/**
237
* Get the exit status of the command
238
* @return Exit code (0 for success, non-zero for failure)
239
*/
240
public int getStatus();
241
242
/**
243
* Get the termination reason
244
* @return String describing why the command terminated
245
*/
246
public String getReason();
247
}
248
```
249
250
**Message Format:**
251
252
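The WebSocket messages follow Kubernetes' channel-based streaming protocol (the WebSocket counterpart of the SPDY exec/attach transport), in which the first byte of each frame identifies the stream: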
253
- Stream 0: Standard input (stdin, client to container)
254
- Stream 1: Standard output
255
- Stream 2: Standard error
256
- Stream 3: Status/control messages
257
258
**Usage Examples:**
259
260
```java
261
@Test
262
void testExecOperation() {
263
// Create a pod first
264
Pod pod = new PodBuilder()
265
.withNewMetadata().withName("exec-pod").endMetadata()
266
.withNewSpec()
267
.addNewContainer()
268
.withName("main")
269
.withImage("busybox")
270
.withCommand("sleep", "3600")
271
.endContainer()
272
.endSpec()
273
.build();
274
client.pods().inNamespace("default").resource(pod).create();
275
276
// Set up expectations for exec operation
277
server.expect().get()
278
.withPath("/api/v1/namespaces/default/pods/exec-pod/exec")
279
.andUpgradeToWebSocket()
280
.open()
281
.waitFor(1000)
282
.andEmit(new OutputStreamMessage("Hello from container\n"))
283
.andEmit(new StatusStreamMessage("{\"status\":0,\"reason\":\"Completed\"}"))
284
.done()
285
.once();
286
287
// Execute command
288
ByteArrayOutputStream stdout = new ByteArrayOutputStream();
289
ByteArrayOutputStream stderr = new ByteArrayOutputStream();
290
291
ExecWatch execWatch = client.pods()
292
.inNamespace("default")
293
.withName("exec-pod")
294
.writingOutput(stdout)
295
.writingError(stderr)
296
.exec("echo", "Hello from container");
297
298
// Wait for completion
299
execWatch.exitCode().join();
300
301
assertEquals("Hello from container\n", stdout.toString());
302
assertEquals(0, execWatch.exitCode().getNow(-1));
303
}
304
305
@Test
306
void testAttachOperation() throws InterruptedException {
307
Pod pod = new PodBuilder()
308
.withNewMetadata().withName("attach-pod").endMetadata()
309
.withNewSpec()
310
.addNewContainer()
311
.withName("main")
312
.withImage("nginx")
313
.endContainer()
314
.endSpec()
315
.build();
316
client.pods().inNamespace("default").resource(pod).create();
317
318
// Set up expectations for attach
319
server.expect().get()
320
.withPath("/api/v1/namespaces/default/pods/attach-pod/attach")
321
.andUpgradeToWebSocket()
322
.open()
323
.waitFor(500)
324
.andEmit(new OutputStreamMessage("Container output\n"))
325
.andEmit(new ErrorStreamMessage("Container error\n"))
326
.done()
327
.once();
328
329
ByteArrayOutputStream stdout = new ByteArrayOutputStream();
330
ByteArrayOutputStream stderr = new ByteArrayOutputStream();
331
332
// Attach to running container
333
ExecWatch attachWatch = client.pods()
334
.inNamespace("default")
335
.withName("attach-pod")
336
.writingOutput(stdout)
337
.writingError(stderr)
338
.attach();
339
340
// Wait for some output
341
Thread.sleep(1000);
342
attachWatch.close();
343
344
assertTrue(stdout.toString().contains("Container output"));
345
assertTrue(stderr.toString().contains("Container error"));
346
}
347
```
348
349
### Log Streaming
350
351
WebSocket-based log streaming with follow support.
352
353
```java
354
@Test
355
void testLogStreaming() throws InterruptedException {
356
Pod pod = new PodBuilder()
357
.withNewMetadata().withName("log-pod").endMetadata()
358
.build();
359
client.pods().inNamespace("default").resource(pod).create();
360
361
// Set up log streaming expectation
362
server.expect().get()
363
.withPath("/api/v1/namespaces/default/pods/log-pod/log?follow=true")
364
.andUpgradeToWebSocket()
365
.open()
366
.waitFor(100)
367
.andEmit(new OutputStreamMessage("Log line 1\n"))
368
.waitFor(100)
369
.andEmit(new OutputStreamMessage("Log line 2\n"))
370
.waitFor(100)
371
.andEmit(new OutputStreamMessage("Log line 3\n"))
372
.done()
373
.once();
374
375
CountDownLatch latch = new CountDownLatch(3);
376
List<String> logLines = new ArrayList<>();
377
378
// Follow logs
379
LogWatch logWatch = client.pods()
380
.inNamespace("default")
381
.withName("log-pod")
382
.watchLog(new InputStream() {
383
// Custom input stream that captures log lines
384
private final List<String> lines = Arrays.asList("Log line 1\n", "Log line 2\n", "Log line 3\n");
385
private int index = 0;
386
387
@Override
388
public int read() {
389
if (index >= lines.size()) return -1;
390
String line = lines.get(index++);
391
logLines.add(line);
392
latch.countDown();
393
return line.charAt(0);
394
}
395
});
396
397
try {
398
assertTrue(latch.await(5, TimeUnit.SECONDS));
399
assertEquals(3, logLines.size());
400
} finally {
401
logWatch.close();
402
}
403
}
404
```
405
406
### Port Forwarding
407
408
WebSocket-based port forwarding functionality.
409
410
```java
411
@Test
412
void testPortForwarding() {
413
Pod pod = new PodBuilder()
414
.withNewMetadata().withName("port-forward-pod").endMetadata()
415
.withNewSpec()
416
.addNewContainer()
417
.withName("web")
418
.withImage("nginx")
419
.addNewPort()
420
.withContainerPort(80)
421
.endPort()
422
.endContainer()
423
.endSpec()
424
.build();
425
client.pods().inNamespace("default").resource(pod).create();
426
427
// Set up port forward expectation
428
server.expect().get()
429
.withPath("/api/v1/namespaces/default/pods/port-forward-pod/portforward")
430
.andUpgradeToWebSocket()
431
.open()
432
// Port forwarding uses binary WebSocket frames
433
// Mock server handles the port forwarding protocol
434
.done()
435
.once();
436
437
// Start port forwarding
438
LocalPortForward portForward = client.pods()
439
.inNamespace("default")
440
.withName("port-forward-pod")
441
.portForward(80, 8080);
442
443
try {
444
// Port forward is now active on local port 8080
445
assertTrue(portForward.isAlive());
446
assertEquals(8080, portForward.getLocalPort());
447
448
// In a real scenario, you could now connect to localhost:8080
449
// to reach the pod's port 80
450
} finally {
451
portForward.close();
452
}
453
}
454
```
455
456
### WebSocket Connection Management
457
458
The mock server automatically handles WebSocket protocol upgrades and connection lifecycle.
459
460
**Connection Features:**
461
- Automatic protocol upgrade from HTTP to WebSocket
462
- Proper handshake handling
463
- Binary and text frame support
464
- Connection keep-alive and cleanup
465
- Multiple concurrent connections
466
- Stream multiplexing for exec/attach operations
467
468
**Error Handling:**
469
470
```java
471
@Test
472
void testWebSocketErrors() throws InterruptedException {
473
CountDownLatch errorLatch = new CountDownLatch(1);
474
WatcherException[] exception = new WatcherException[1];
475
476
// Watch for a pod that will cause an error
477
Watch watch = client.pods().inNamespace("nonexistent").watch(new Watcher<Pod>() {
478
@Override
479
public void eventReceived(Action action, Pod resource) {
480
// Should not be called
481
}
482
483
@Override
484
public void onClose(WatcherException cause) {
485
exception[0] = cause;
486
errorLatch.countDown();
487
}
488
});
489
490
// Simulate server error by not setting up expectation
491
// This will cause the watch to fail
492
493
assertTrue(errorLatch.await(5, TimeUnit.SECONDS));
494
assertNotNull(exception[0]);
495
watch.close();
496
}
497
```