A JUnit 5 testing library that provides a Kubernetes mock server for testing Kubernetes client applications
—
WebSocket-based functionality for watch operations and exec/attach commands with proper stream handling and message formatting.
Real-time resource change notifications through WebSocket connections.
/**
* WebSocket listener that serves one watch connection on the mock server.
* It receives the connection's lifecycle callbacks and sends resource events to the client.
* This listing is an API summary: method bodies are omitted.
*/
public class WatchEventsListener {
/**
* Create a new WatchEventsListener.
* @param context Server context for processing
* @param attributeSet Filter attributes for this watch
* @param watchEventListenerList Set of all active listeners
* @param logger Logger instance for this listener
* @param onOpenAction Action to execute when the WebSocket opens; it accepts a WatchEventsListener
*/
public WatchEventsListener(Context context, AttributeSet attributeSet,
Set<WatchEventsListener> watchEventListenerList,
Logger logger, Consumer<WatchEventsListener> onOpenAction);
/**
* Send a WebSocket response for a resource event.
* @param resource JSON representation of the resource
* @param action Type of action, a {@code Watcher.Action} value (for example ADDED, MODIFIED, DELETED)
*/
public void sendWebSocketResponse(String resource, Watcher.Action action);
/**
* Check whether the given attributes match this watch listener's criteria.
* @param attributes AttributeSet to check against the watch filters
* @return true if the attributes match the watch criteria
*/
public boolean attributeMatches(AttributeSet attributes);
// WebSocket lifecycle callbacks
/**
* Called when the WebSocket connection is opened.
* @param webSocket The opened WebSocket
* @param response The HTTP response that initiated the WebSocket
*/
public void onOpen(WebSocket webSocket, Response response);
/**
* Called when the WebSocket connection is closing.
* @param webSocket The closing WebSocket
* @param code Close status code
* @param reason Close reason message
*/
public void onClosing(WebSocket webSocket, int code, String reason);
/**
* Called when the WebSocket connection is closed.
* @param webSocket The closed WebSocket
* @param code Close status code
* @param reason Close reason message
*/
public void onClosed(WebSocket webSocket, int code, String reason);
/**
* Called when the WebSocket connection fails.
* @param webSocket The failed WebSocket
* @param t Exception that caused the failure
* @param response The HTTP response (may be null)
*/
public void onFailure(WebSocket webSocket, Throwable t, Response response);
}
Watch Actions:
// Watch event actions from Kubernetes client
import io.fabric8.kubernetes.client.Watcher.Action;
// Available actions:
// - Action.ADDED: Resource was created
// - Action.MODIFIED: Resource was updated
// - Action.DELETED: Resource was deleted
// - Action.ERROR: Watch error occurred
// - Action.BOOKMARK: Bookmark event for resuming watches
Usage Examples:
/**
 * Watch examples that run against the mock server in CRUD mode.
 */
@EnableKubernetesMockClient(crud = true)
class WatchOperationsTest {
    // Never assigned in this class; presumably injected by @EnableKubernetesMockClient.
    KubernetesClient client;

    /**
     * Creates, updates and deletes a pod and expects ADDED, MODIFIED and DELETED, in that order.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting for events
     */
    @Test
    void testPodWatch() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(3);
        // The watcher may be invoked off the test thread, so the shared list is synchronized.
        List<String> events = java.util.Collections.synchronizedList(new ArrayList<>());
        // Remembers an abnormal close so that a timeout below can report why events are missing.
        java.util.concurrent.atomic.AtomicReference<WatcherException> closeCause =
                new java.util.concurrent.atomic.AtomicReference<>();

        // Start watching pods in default namespace
        Watch watch = client.pods().inNamespace("default").watch(new Watcher<Pod>() {
            @Override
            public void eventReceived(Action action, Pod resource) {
                events.add(action + ":" + resource.getMetadata().getName());
                latch.countDown();
            }

            @Override
            public void onClose(WatcherException cause) {
                closeCause.set(cause);
            }
        });
        try {
            // Create a pod - triggers ADDED
            Pod pod = new PodBuilder()
                    .withNewMetadata().withName("watched-pod").endMetadata()
                    .build();
            client.pods().inNamespace("default").resource(pod).create();

            // Update the pod - triggers MODIFIED
            pod.getMetadata().setLabels(Map.of("updated", "true"));
            client.pods().inNamespace("default").resource(pod).update();

            // Delete the pod - triggers DELETED
            client.pods().inNamespace("default").withName("watched-pod").delete();

            // Wait for all events; on timeout, report what was seen and why the watch closed
            assertTrue(latch.await(10, TimeUnit.SECONDS),
                    () -> "received " + events + ", watch close cause: " + closeCause.get());
            assertEquals(
                    List.of("ADDED:watched-pod", "MODIFIED:watched-pod", "DELETED:watched-pod"),
                    new ArrayList<>(events));
        } finally {
            watch.close();
        }
    }

    /**
     * Watches only pods labelled {@code app=web} and expects a single event, for the labelled pod.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting for the event
     */
    @Test
    void testWatchWithLabelSelector() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        // The watcher may be invoked off the test thread, so the shared list is synchronized.
        List<Pod> matchedPods = java.util.Collections.synchronizedList(new ArrayList<>());

        // Watch only pods with specific label
        Watch watch = client.pods().inNamespace("default")
                .withLabel("app", "web")
                .watch(new Watcher<Pod>() {
                    @Override
                    public void eventReceived(Action action, Pod resource) {
                        matchedPods.add(resource);
                        latch.countDown();
                    }

                    @Override
                    public void onClose(WatcherException cause) {
                        // Nothing to clean up when the watch closes.
                    }
                });
        try {
            // Create pod without label - should not trigger watch
            Pod podNoLabel = new PodBuilder()
                    .withNewMetadata().withName("no-label").endMetadata()
                    .build();
            client.pods().inNamespace("default").resource(podNoLabel).create();

            // Create pod with matching label - should trigger watch
            Pod podWithLabel = new PodBuilder()
                    .withNewMetadata()
                    .withName("with-label")
                    .addToLabels("app", "web")
                    .endMetadata()
                    .build();
            client.pods().inNamespace("default").resource(podWithLabel).create();

            assertTrue(latch.await(5, TimeUnit.SECONDS));
            assertEquals(1, matchedPods.size());
            assertEquals("with-label", matchedPods.get(0).getMetadata().getName());
        } finally {
            watch.close();
        }
    }
}
WebSocket message types for exec and attach operations with proper stream multiplexing.
/**
* WebSocket message carrying standard output (stdout) data.
* Used in exec and attach operations; the usage examples emit it from an expectation via andEmit(...).
* This listing is an API summary: the constructor body is omitted.
*/
public class OutputStreamMessage extends WebSocketMessage {
/**
* Create an output stream message.
* @param body String content for stdout
*/
public OutputStreamMessage(String body);
}
/**
* WebSocket message carrying error stream (stderr) data.
* Used in exec and attach operations; the usage examples emit it from an expectation via andEmit(...).
* This listing is an API summary: the constructor body is omitted.
*/
public class ErrorStreamMessage extends WebSocketMessage {
/**
* Create an error stream message.
* @param body String content for stderr
*/
public ErrorStreamMessage(String body);
}
/**
* WebSocket message carrying status information.
* Used to indicate command completion status; the exec usage example passes a JSON document as the body.
* This listing is an API summary: the constructor body is omitted.
*/
public class StatusStreamMessage extends WebSocketMessage {
/**
* Create a status stream message.
* @param body String content for status information
*/
public StatusStreamMessage(String body);
}
/**
* Status message for exec/attach operations.
* Exposes the exit code and the termination reason.
* This listing is an API summary: method bodies are omitted and no constructor is shown.
*/
public class StatusMessage {
/**
* Get the exit status of the command.
* @return Exit code (0 for success, non-zero for failure)
*/
public int getStatus();
/**
* Get the termination reason.
* @return String describing why the command terminated
*/
public String getReason();
}
Message Format:
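The WebSocket messages follow the Kubernetes remote-command streaming format, which multiplexes several streams over one connection: the first byte of each frame identifies the channel (1 = stdout, 2 = stderr, 3 = status/error) and the remaining bytes are the payload.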
Usage Examples:
/**
 * Runs a command through a mocked exec WebSocket and checks the captured stdout and exit code.
 *
 * NOTE(review): assumes a {@code server} field (the mock server) is declared next to
 * {@code client} in the enclosing test class - it is not declared in this snippet.
 *
 * @throws Exception if the exit code does not arrive within the timeout or the wait is interrupted
 */
@Test
void testExecOperation() throws Exception {
    // Create a pod first
    Pod pod = new PodBuilder()
            .withNewMetadata().withName("exec-pod").endMetadata()
            .withNewSpec()
            .addNewContainer()
            .withName("main")
            .withImage("busybox")
            .withCommand("sleep", "3600")
            .endContainer()
            .endSpec()
            .build();
    client.pods().inNamespace("default").resource(pod).create();

    // Set up expectations for exec operation
    // NOTE(review): the client may append query parameters (command, stdout, stderr) to this
    // path - confirm the expectation matches the request issued by your client version.
    server.expect().get()
            .withPath("/api/v1/namespaces/default/pods/exec-pod/exec")
            .andUpgradeToWebSocket()
            .open()
            .waitFor(1000)
            .andEmit(new OutputStreamMessage("Hello from container\n"))
            // The status channel carries a Kubernetes Status document; "Success" is what the
            // client maps to exit code 0 (a numeric "status" field is not recognised as success).
            .andEmit(new StatusStreamMessage("{\"kind\":\"Status\",\"apiVersion\":\"v1\",\"status\":\"Success\"}"))
            .done()
            .once();

    ByteArrayOutputStream stdout = new ByteArrayOutputStream();
    ByteArrayOutputStream stderr = new ByteArrayOutputStream();

    // Execute command; try-with-resources closes the WebSocket even when an assertion fails
    try (ExecWatch execWatch = client.pods()
            .inNamespace("default")
            .withName("exec-pod")
            .writingOutput(stdout)
            .writingError(stderr)
            .exec("echo", "Hello from container")) {
        // Bounded wait: a missing status frame must fail the test instead of hanging it
        Integer exitCode = execWatch.exitCode().get(10, TimeUnit.SECONDS);

        assertEquals("Hello from container\n", stdout.toString());
        assertEquals(0, exitCode);
    }
}
/**
 * Attaches to a pod through a mocked attach WebSocket and checks that both the stdout and the
 * stderr frames reach the corresponding streams.
 *
 * NOTE(review): assumes a {@code server} field (the mock server) is declared next to
 * {@code client} in the enclosing test class - it is not declared in this snippet.
 *
 * @throws InterruptedException if the test thread is interrupted while waiting for output
 */
@Test
void testAttachOperation() throws InterruptedException {
    Pod pod = new PodBuilder()
            .withNewMetadata().withName("attach-pod").endMetadata()
            .withNewSpec()
            .addNewContainer()
            .withName("main")
            .withImage("nginx")
            .endContainer()
            .endSpec()
            .build();
    client.pods().inNamespace("default").resource(pod).create();

    // Set up expectations for attach
    server.expect().get()
            .withPath("/api/v1/namespaces/default/pods/attach-pod/attach")
            .andUpgradeToWebSocket()
            .open()
            .waitFor(500)
            .andEmit(new OutputStreamMessage("Container output\n"))
            .andEmit(new ErrorStreamMessage("Container error\n"))
            .done()
            .once();

    ByteArrayOutputStream stdout = new ByteArrayOutputStream();
    ByteArrayOutputStream stderr = new ByteArrayOutputStream();

    // Attach to running container; try-with-resources closes it even when an assertion fails
    try (ExecWatch attachWatch = client.pods()
            .inNamespace("default")
            .withName("attach-pod")
            .writingOutput(stdout)
            .writingError(stderr)
            .attach()) {
        // Poll with a deadline instead of a fixed sleep: returns as soon as both streams
        // have data and still gives up after five seconds.
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while ((stdout.size() == 0 || stderr.size() == 0) && System.nanoTime() < deadline) {
            Thread.sleep(50);
        }

        assertTrue(stdout.toString().contains("Container output"));
        assertTrue(stderr.toString().contains("Container error"));
    }
}
WebSocket-based log streaming with follow support.
/**
 * Follows a pod's log and checks that the three emitted lines arrive in order.
 *
 * NOTE(review): assumes a {@code server} field (the mock server) is declared next to
 * {@code client} in the enclosing test class - it is not declared in this snippet.
 *
 * @throws InterruptedException if the test thread is interrupted while waiting for log lines
 */
@Test
void testLogStreaming() throws InterruptedException {
    Pod pod = new PodBuilder()
            .withNewMetadata().withName("log-pod").endMetadata()
            .build();
    client.pods().inNamespace("default").resource(pod).create();

    // Set up log streaming expectation
    // NOTE(review): confirm that your client version requests exactly this path and upgrades
    // the log request to a WebSocket; otherwise this expectation will not be matched.
    server.expect().get()
            .withPath("/api/v1/namespaces/default/pods/log-pod/log?follow=true")
            .andUpgradeToWebSocket()
            .open()
            .waitFor(100)
            .andEmit(new OutputStreamMessage("Log line 1\n"))
            .waitFor(100)
            .andEmit(new OutputStreamMessage("Log line 2\n"))
            .waitFor(100)
            .andEmit(new OutputStreamMessage("Log line 3\n"))
            .done()
            .once();

    CountDownLatch latch = new CountDownLatch(3);
    ByteArrayOutputStream logOutput = new ByteArrayOutputStream();
    // The log is delivered by writing into the supplied OutputStream. Record every byte and
    // count one line per '\n' (the byte is stored before the latch is released, so the
    // buffer is complete once the latch opens).
    java.io.OutputStream lineCountingSink = new java.io.OutputStream() {
        @Override
        public void write(int b) {
            logOutput.write(b);
            if (b == '\n') {
                latch.countDown();
            }
        }
    };

    // Follow logs
    LogWatch logWatch = client.pods()
            .inNamespace("default")
            .withName("log-pod")
            .watchLog(lineCountingSink);
    try {
        assertTrue(latch.await(5, TimeUnit.SECONDS));
        String received = new String(logOutput.toByteArray(), java.nio.charset.StandardCharsets.UTF_8);
        assertEquals("Log line 1\nLog line 2\nLog line 3\n", received);
    } finally {
        logWatch.close();
    }
}
WebSocket-based port forwarding functionality.
/**
 * Opens a port forward to a pod's port 80 and checks that a local listener is active.
 *
 * NOTE(review): assumes a {@code server} field (the mock server) is declared next to
 * {@code client} in the enclosing test class - it is not declared in this snippet.
 *
 * @throws IOException if closing the port forward fails
 */
@Test
void testPortForwarding() throws java.io.IOException {
    Pod pod = new PodBuilder()
            .withNewMetadata().withName("port-forward-pod").endMetadata()
            .withNewSpec()
            .addNewContainer()
            .withName("web")
            .withImage("nginx")
            .addNewPort()
            .withContainerPort(80)
            .endPort()
            .endContainer()
            .endSpec()
            .build();
    client.pods().inNamespace("default").resource(pod).create();

    // Set up port forward expectation
    server.expect().get()
            .withPath("/api/v1/namespaces/default/pods/port-forward-pod/portforward")
            .andUpgradeToWebSocket()
            .open()
            // Port forwarding uses binary WebSocket frames
            // Mock server handles the port forwarding protocol
            .done()
            .once();

    // Start port forwarding on a free local port: a fixed port such as 8080 may already be
    // taken on the build machine, which would make the test fail for unrelated reasons.
    try (LocalPortForward portForward = client.pods()
            .inNamespace("default")
            .withName("port-forward-pod")
            .portForward(80)) {
        assertTrue(portForward.isAlive());
        assertTrue(portForward.getLocalPort() > 0);
        // In a real scenario, you could now connect to localhost:<getLocalPort()>
        // to reach the pod's port 80
    }
}
The mock server automatically handles WebSocket protocol upgrades and connection lifecycle.
Connection Features:
Error Handling:
/**
 * Starts a watch that is expected to fail and checks that the watcher is closed with an error.
 *
 * NOTE(review): the failure relies on the server having no matching expectation. With
 * {@code crud = true} the mock server may accept the watch instead, and some client versions
 * may throw from {@code watch(...)} rather than call {@code onClose} - confirm for your setup.
 *
 * @throws InterruptedException if the test thread is interrupted while waiting for the close
 */
@Test
void testWebSocketErrors() throws InterruptedException {
    CountDownLatch errorLatch = new CountDownLatch(1);
    // Written by the watcher callback and read by the test thread after the latch opens.
    java.util.concurrent.atomic.AtomicReference<WatcherException> exception =
            new java.util.concurrent.atomic.AtomicReference<>();

    // Watch for a pod that will cause an error
    Watch watch = client.pods().inNamespace("nonexistent").watch(new Watcher<Pod>() {
        @Override
        public void eventReceived(Action action, Pod resource) {
            // Should not be called
        }

        @Override
        public void onClose(WatcherException cause) {
            exception.set(cause);
            errorLatch.countDown();
        }
    });
    try {
        // Simulate server error by not setting up expectation
        // This will cause the watch to fail
        assertTrue(errorLatch.await(5, TimeUnit.SECONDS));
        assertNotNull(exception.get());
    } finally {
        // Always release the watch, including when one of the assertions above fails
        watch.close();
    }
}
Install with Tessl CLI
npx tessl i tessl/maven-io-fabric8--kubernetes-server-mock