CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-io-fabric8--kubernetes-server-mock

A JUnit 5 testing library that provides a Kubernetes mock server for testing Kubernetes client applications

Pending
Overview
Eval results
Files

websocket-operations.mddocs/

WebSocket Operations

WebSocket-based functionality for watch operations and exec/attach commands with proper stream handling and message formatting.

Capabilities

Watch Events

Real-time resource change notifications through WebSocket connections.

/**
 * Handles WebSocket watch events for resource monitoring
 * Manages WebSocket connections and event distribution
 */
public class WatchEventsListener {
    
    /**
     * Create a new WatchEventsListener
     * @param context Server context for processing
     * @param attributeSet Filter attributes for this watch
     * @param watchEventListenerList Set of all active listeners
     * @param logger Logger instance for this listener
     * @param onOpenAction Action to execute when WebSocket opens
     */
    public WatchEventsListener(Context context, AttributeSet attributeSet, 
                              Set<WatchEventsListener> watchEventListenerList, 
                              Logger logger, Consumer<WatchEventsListener> onOpenAction);
    
    /**
     * Send a WebSocket response for a resource event
     * @param resource JSON representation of the resource
     * @param action Type of action (ADDED, MODIFIED, DELETED)
     */
    public void sendWebSocketResponse(String resource, Watcher.Action action);
    
    /**
     * Check if attributes match this watch listener's criteria
     * @param attributes AttributeSet to check against watch filters
     * @return true if attributes match the watch criteria
     */
    public boolean attributeMatches(AttributeSet attributes);
    
    // WebSocket lifecycle callbacks
    
    /**
     * Called when WebSocket connection is opened
     * @param webSocket The opened WebSocket
     * @param response The HTTP response that initiated the WebSocket
     */
    public void onOpen(WebSocket webSocket, Response response);
    
    /**
     * Called when WebSocket connection is closing
     * @param webSocket The closing WebSocket
     * @param code Close status code
     * @param reason Close reason message
     */
    public void onClosing(WebSocket webSocket, int code, String reason);
    
    /**
     * Called when WebSocket connection is closed
     * @param webSocket The closed WebSocket
     * @param code Close status code
     * @param reason Close reason message
     */
    public void onClosed(WebSocket webSocket, int code, String reason);
    
    /**
     * Called when WebSocket connection fails
     * @param webSocket The failed WebSocket
     * @param t Exception that caused the failure
     * @param response The HTTP response (may be null)
     */
    public void onFailure(WebSocket webSocket, Throwable t, Response response);
}

Watch Actions:

// Watch event actions from Kubernetes client
import io.fabric8.kubernetes.client.Watcher.Action;

// Available actions:
// - Action.ADDED: Resource was created
// - Action.MODIFIED: Resource was updated  
// - Action.DELETED: Resource was deleted
// - Action.ERROR: Watch error occurred
// - Action.BOOKMARK: Bookmark event for resuming watches

Usage Examples:

@EnableKubernetesMockClient(crud = true)
class WatchOperationsTest {
    KubernetesClient client;
    
    @Test
    void testPodWatch() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(3);
        List<String> events = new ArrayList<>();
        
        // Start watching pods in default namespace
        Watch watch = client.pods().inNamespace("default").watch(new Watcher<Pod>() {
            @Override
            public void eventReceived(Action action, Pod resource) {
                events.add(action + ":" + resource.getMetadata().getName());
                latch.countDown();
            }
            
            @Override
            public void onClose(WatcherException cause) {
                if (cause != null) {
                    cause.printStackTrace();
                }
            }
        });
        
        try {
            // Create a pod - triggers ADDED
            Pod pod = new PodBuilder()
                .withNewMetadata().withName("watched-pod").endMetadata()
                .build();
            client.pods().inNamespace("default").resource(pod).create();
            
            // Update the pod - triggers MODIFIED
            pod.getMetadata().setLabels(Map.of("updated", "true"));
            client.pods().inNamespace("default").resource(pod).update();
            
            // Delete the pod - triggers DELETED
            client.pods().inNamespace("default").withName("watched-pod").delete();
            
            // Wait for all events
            assertTrue(latch.await(10, TimeUnit.SECONDS));
            assertEquals(3, events.size());
            assertEquals("ADDED:watched-pod", events.get(0));
            assertEquals("MODIFIED:watched-pod", events.get(1));
            assertEquals("DELETED:watched-pod", events.get(2));
        } finally {
            watch.close();
        }
    }
    
    @Test
    void testWatchWithLabelSelector() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        List<Pod> matchedPods = new ArrayList<>();
        
        // Watch only pods with specific label
        Watch watch = client.pods().inNamespace("default")
            .withLabel("app", "web")
            .watch(new Watcher<Pod>() {
                @Override
                public void eventReceived(Action action, Pod resource) {
                    matchedPods.add(resource);
                    latch.countDown();
                }
                
                @Override
                public void onClose(WatcherException cause) {}
            });
        
        try {
            // Create pod without label - should not trigger watch
            Pod podNoLabel = new PodBuilder()
                .withNewMetadata().withName("no-label").endMetadata()
                .build();
            client.pods().inNamespace("default").resource(podNoLabel).create();
            
            // Create pod with matching label - should trigger watch
            Pod podWithLabel = new PodBuilder()
                .withNewMetadata()
                    .withName("with-label")
                    .addToLabels("app", "web")
                .endMetadata()
                .build();
            client.pods().inNamespace("default").resource(podWithLabel).create();
            
            assertTrue(latch.await(5, TimeUnit.SECONDS));
            assertEquals(1, matchedPods.size());
            assertEquals("with-label", matchedPods.get(0).getMetadata().getName());
        } finally {
            watch.close();
        }
    }
}

Stream Messages

WebSocket message types for exec and attach operations with proper stream multiplexing.

/**
 * WebSocket message for standard output stream data
 * Used in exec and attach operations
 */
public class OutputStreamMessage extends WebSocketMessage {
    /**
     * Create output stream message
     * @param body String content for stdout
     */
    public OutputStreamMessage(String body);
}

/**
 * WebSocket message for error stream data  
 * Used in exec and attach operations
 */
public class ErrorStreamMessage extends WebSocketMessage {
    /**
     * Create error stream message
     * @param body String content for stderr
     */
    public ErrorStreamMessage(String body);
}

/**
 * WebSocket message for status information
 * Used to indicate command completion status
 */
public class StatusStreamMessage extends WebSocketMessage {
    /**
     * Create status stream message
     * @param body String content for status information
     */
    public StatusStreamMessage(String body);
}

/**
 * Status message for exec/attach operations
 * Contains exit code and termination reason
 */
public class StatusMessage {
    /**
     * Get the exit status of the command
     * @return Exit code (0 for success, non-zero for failure)
     */
    public int getStatus();
    
    /**
     * Get the termination reason
     * @return String describing why the command terminated
     */
    public String getReason();
}

Message Format:

The WebSocket messages follow Kubernetes' SPDY protocol format with stream multiplexing:

  • Stream 0: Reserved for error stream
  • Stream 1: Standard output
  • Stream 2: Standard error
  • Stream 3: Status/control messages

Usage Examples:

@Test
void testExecOperation() {
    // Create a pod first
    Pod pod = new PodBuilder()
        .withNewMetadata().withName("exec-pod").endMetadata()
        .withNewSpec()
            .addNewContainer()
                .withName("main")
                .withImage("busybox")
                .withCommand("sleep", "3600")
            .endContainer()
        .endSpec()
        .build();
    client.pods().inNamespace("default").resource(pod).create();
    
    // Set up expectations for exec operation
    server.expect().get()
        .withPath("/api/v1/namespaces/default/pods/exec-pod/exec")
        .andUpgradeToWebSocket()
        .open()
        .waitFor(1000)
        .andEmit(new OutputStreamMessage("Hello from container\n"))
        .andEmit(new StatusStreamMessage("{\"status\":0,\"reason\":\"Completed\"}"))
        .done()
        .once();
    
    // Execute command
    ByteArrayOutputStream stdout = new ByteArrayOutputStream();
    ByteArrayOutputStream stderr = new ByteArrayOutputStream();
    
    ExecWatch execWatch = client.pods()
        .inNamespace("default")
        .withName("exec-pod")
        .writingOutput(stdout)
        .writingError(stderr)
        .exec("echo", "Hello from container");
    
    // Wait for completion
    execWatch.exitCode().join();
    
    assertEquals("Hello from container\n", stdout.toString());
    assertEquals(0, execWatch.exitCode().getNow(-1));
}

@Test  
void testAttachOperation() {
    Pod pod = new PodBuilder()
        .withNewMetadata().withName("attach-pod").endMetadata()
        .withNewSpec()
            .addNewContainer()
                .withName("main")
                .withImage("nginx")
            .endContainer()
        .endSpec()
        .build();
    client.pods().inNamespace("default").resource(pod).create();
    
    // Set up expectations for attach
    server.expect().get()
        .withPath("/api/v1/namespaces/default/pods/attach-pod/attach")
        .andUpgradeToWebSocket()
        .open()
        .waitFor(500)
        .andEmit(new OutputStreamMessage("Container output\n"))
        .andEmit(new ErrorStreamMessage("Container error\n"))
        .done()
        .once();
    
    ByteArrayOutputStream stdout = new ByteArrayOutputStream();
    ByteArrayOutputStream stderr = new ByteArrayOutputStream();
    
    // Attach to running container
    ExecWatch attachWatch = client.pods()
        .inNamespace("default")
        .withName("attach-pod")
        .writingOutput(stdout)
        .writingError(stderr)
        .attach();
    
    // Wait for some output
    Thread.sleep(1000);
    attachWatch.close();
    
    assertTrue(stdout.toString().contains("Container output"));
    assertTrue(stderr.toString().contains("Container error"));
}

Log Streaming

WebSocket-based log streaming with follow support.

@Test
void testLogStreaming() throws InterruptedException {
    Pod pod = new PodBuilder()
        .withNewMetadata().withName("log-pod").endMetadata()
        .build();
    client.pods().inNamespace("default").resource(pod).create();
    
    // Set up log streaming expectation
    server.expect().get()
        .withPath("/api/v1/namespaces/default/pods/log-pod/log?follow=true")
        .andUpgradeToWebSocket()
        .open()
        .waitFor(100)
        .andEmit(new OutputStreamMessage("Log line 1\n"))
        .waitFor(100)
        .andEmit(new OutputStreamMessage("Log line 2\n"))
        .waitFor(100)
        .andEmit(new OutputStreamMessage("Log line 3\n"))
        .done()
        .once();
    
    CountDownLatch latch = new CountDownLatch(3);
    List<String> logLines = new ArrayList<>();
    
    // Follow logs
    LogWatch logWatch = client.pods()
        .inNamespace("default")
        .withName("log-pod")
        .watchLog(new InputStream() {
            // Custom input stream that captures log lines
            private final List<String> lines = Arrays.asList("Log line 1\n", "Log line 2\n", "Log line 3\n");
            private int index = 0;
            
            @Override
            public int read() {
                if (index >= lines.size()) return -1;
                String line = lines.get(index++);
                logLines.add(line);
                latch.countDown();
                return line.charAt(0);
            }
        });
    
    try {
        assertTrue(latch.await(5, TimeUnit.SECONDS));
        assertEquals(3, logLines.size());
    } finally {
        logWatch.close();
    }
}

Port Forwarding

WebSocket-based port forwarding functionality.

@Test
void testPortForwarding() {
    Pod pod = new PodBuilder()
        .withNewMetadata().withName("port-forward-pod").endMetadata()
        .withNewSpec()
            .addNewContainer()
                .withName("web")
                .withImage("nginx")
                .addNewPort()
                    .withContainerPort(80)
                .endPort()
            .endContainer()
        .endSpec()
        .build();
    client.pods().inNamespace("default").resource(pod).create();
    
    // Set up port forward expectation
    server.expect().get()
        .withPath("/api/v1/namespaces/default/pods/port-forward-pod/portforward")
        .andUpgradeToWebSocket()
        .open()
        // Port forwarding uses binary WebSocket frames
        // Mock server handles the port forwarding protocol
        .done()
        .once();
    
    // Start port forwarding
    LocalPortForward portForward = client.pods()
        .inNamespace("default")
        .withName("port-forward-pod")
        .portForward(80, 8080);
    
    try {
        // Port forward is now active on local port 8080
        assertTrue(portForward.isAlive());
        assertEquals(8080, portForward.getLocalPort());
        
        // In a real scenario, you could now connect to localhost:8080
        // to reach the pod's port 80
    } finally {
        portForward.close();
    }
}

WebSocket Connection Management

The mock server automatically handles WebSocket protocol upgrades and connection lifecycle.

Connection Features:

  • Automatic protocol upgrade from HTTP to WebSocket
  • Proper handshake handling
  • Binary and text frame support
  • Connection keep-alive and cleanup
  • Multiple concurrent connections
  • Stream multiplexing for exec/attach operations

Error Handling:

@Test
void testWebSocketErrors() throws InterruptedException {
    CountDownLatch errorLatch = new CountDownLatch(1);
    WatcherException[] exception = new WatcherException[1];
    
    // Watch for a pod that will cause an error
    Watch watch = client.pods().inNamespace("nonexistent").watch(new Watcher<Pod>() {
        @Override
        public void eventReceived(Action action, Pod resource) {
            // Should not be called
        }
        
        @Override
        public void onClose(WatcherException cause) {
            exception[0] = cause;
            errorLatch.countDown();
        }
    });
    
    // Simulate server error by not setting up expectation
    // This will cause the watch to fail
    
    assertTrue(errorLatch.await(5, TimeUnit.SECONDS));
    assertNotNull(exception[0]);
    watch.close();
}

Install with Tessl CLI

npx tessl i tessl/maven-io-fabric8--kubernetes-server-mock

docs

attribute-extraction.md

crud-handlers.md

crud-operations.md

custom-resources.md

index.md

junit-integration.md

mock-server-management.md

websocket-operations.md

tile.json