CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-com-netflix-hystrix--hystrix-metrics-event-stream

A Netflix Hystrix module that exposes circuit breaker and thread pool metrics in Server-Sent Events format for real-time monitoring and dashboard integration

Pending
Overview
Eval results
Files

base-sse-servlet.mddocs/

Base SSE Servlet Framework

Abstract base class providing Server-Sent Events (SSE) functionality for all Hystrix streaming servlets. This framework handles HTTP streaming, connection management, and client lifecycle.

Capabilities

HystrixSampleSseServlet

Abstract servlet that implements the Server-Sent Events protocol for streaming data to HTTP clients.

/**
 * Abstract base servlet for SSE streaming functionality.
 * Handles HTTP streaming protocol, connection management, and client lifecycle.
 */
public abstract class HystrixSampleSseServlet extends HttpServlet {
    
    /**
     * Protected constructor with sample stream using default polling delay
     * @param sampleStream Observable stream of string data to send to clients
     */
    protected HystrixSampleSseServlet(Observable<String> sampleStream);
    
    /**
     * Protected constructor with sample stream and custom polling delay
     * @param sampleStream Observable stream of string data to send to clients
     * @param pausePollerThreadDelayInMs Delay between polling cycles in milliseconds
     */
    protected HystrixSampleSseServlet(Observable<String> sampleStream, int pausePollerThreadDelayInMs);
    
    /**
     * Handle incoming GET requests - establishes SSE connection
     * @param request HTTP request
     * @param response HTTP response configured for text/event-stream
     * @throws ServletException if servlet encounters difficulty
     * @throws IOException if I/O errors occur
     */
    protected void doGet(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException;
    
    /**
     * Static method to gracefully shutdown all servlet instances
     * Sets shutdown flag to terminate active connections
     */
    public static void shutdown();
    
    /**
     * Initialize servlet - resets shutdown flag
     * @throws ServletException if initialization fails
     */
    public void init() throws ServletException;
    
    /**
     * Clean up servlet resources - sets shutdown flag
     */
    public void destroy();
    
    // Abstract methods that must be implemented by subclasses
    
    /**
     * Must return maximum number of concurrent connections allowed
     * @return Maximum concurrent connections
     */
    protected abstract int getMaxNumberConcurrentConnectionsAllowed();
    
    /**
     * Must return current number of active connections
     * @return Current connection count
     */
    protected abstract int getNumberCurrentConnections();
    
    /**
     * Must atomically increment and return current concurrent connection count
     * @return New connection count after increment
     */
    protected abstract int incrementAndGetCurrentConcurrentConnections();
    
    /**
     * Must atomically decrement current concurrent connection count
     */
    protected abstract void decrementCurrentConcurrentConnections();
}

Constants

/**
 * Default delay between polling cycles to check client connection status
 */
public static final int DEFAULT_PAUSE_POLLER_THREAD_DELAY_IN_MS = 500;

/**
 * Static shutdown flag shared across all servlet instances
 */
private static volatile boolean isDestroyed = false;

Usage Examples:

// Implementing a custom SSE servlet
public class CustomMetricsServlet extends HystrixSampleSseServlet {
    private static AtomicInteger concurrentConnections = new AtomicInteger(0);
    private static final int MAX_CONNECTIONS = 10;
    
    public CustomMetricsServlet() {
        super(createCustomStream());
    }
    
    @Override
    protected int getMaxNumberConcurrentConnectionsAllowed() {
        return MAX_CONNECTIONS;
    }
    
    @Override
    protected int getNumberCurrentConnections() {
        return concurrentConnections.get();
    }
    
    @Override
    protected int incrementAndGetCurrentConcurrentConnections() {
        return concurrentConnections.incrementAndGet();
    }
    
    @Override
    protected void decrementCurrentConcurrentConnections() {
        concurrentConnections.decrementAndGet();
    }
    
    private static Observable<String> createCustomStream() {
        return Observable.interval(1, TimeUnit.SECONDS)
            .map(tick -> "data: {\"timestamp\": " + System.currentTimeMillis() + "}\n\n");
    }
}

HTTP Protocol Details

Response Headers

The servlet automatically sets these headers for SSE compliance:

response.setHeader("Content-Type", "text/event-stream;charset=UTF-8");
response.setHeader("Cache-Control", "no-cache, no-store, max-age=0, must-revalidate");
response.setHeader("Pragma", "no-cache");

Data Format

All data is sent in Server-Sent Events format:

data: {"key": "value"}

ping: 

data: {"key": "value"}

Connection Management Flow

  1. Client connects via HTTP GET request
  2. Server checks concurrent connection limit
  3. If under limit, establishes SSE connection
  4. If over limit, returns HTTP 503 error
  5. Server subscribes to data stream using RxJava
  6. Data events are written to client as "data: JSON\n\n"
  7. Periodic "ping: \n\n" messages maintain connection
  8. Connection cleanup on client disconnect or servlet shutdown

Threading Model

  • HTTP Request Thread: Handles initial connection setup
  • RxJava IO Thread: Processes stream data and writes to client (non-blocking)
  • Polling Thread: Periodically checks connection status and sends ping messages
  • Stream Thread: RxJava computation thread for data processing

Error Handling

// Error scenarios handled by base servlet:

// 1. Max connections exceeded
response.sendError(503, "MaxConcurrentConnections reached: " + maxNumberConnectionsAllowed);

// 2. Service shutdown
response.sendError(503, "Service has been shut down.");

// 3. Client disconnect detection
if (writer.checkError()) {
    moreDataWillBeSent.set(false);
}

// 4. Stream errors
subscriber.onError(throwable -> moreDataWillBeSent.set(false));

Lifecycle Management

// Graceful shutdown pattern
HystrixSampleSseServlet.shutdown(); // Call before application shutdown

// WebSphere-specific shutdown hook
// Invoke shutdown() from another servlet's destroy() method to handle
// WebSphere's 60-second timeout requirement

Install with Tessl CLI

npx tessl i tessl/maven-com-netflix-hystrix--hystrix-metrics-event-stream

docs

base-sse-servlet.md

configuration-streaming.md

index.md

legacy-metrics-polling.md

metrics-streaming.md

request-events-streaming.md

utilization-streaming.md

tile.json