0
# Base SSE Servlet Framework
1
2
Abstract base class providing Server-Sent Events (SSE) functionality for all Hystrix streaming servlets. This framework handles HTTP streaming, connection management, and client lifecycle.
3
4
## Capabilities
5
6
### HystrixSampleSseServlet
7
8
Abstract servlet that implements the Server-Sent Events protocol for streaming data to HTTP clients.
9
10
```java { .api }
11
/**
12
* Abstract base servlet for SSE streaming functionality.
13
* Handles HTTP streaming protocol, connection management, and client lifecycle.
14
*/
15
public abstract class HystrixSampleSseServlet extends HttpServlet {
16
17
/**
18
* Protected constructor with sample stream using default polling delay
19
* @param sampleStream Observable stream of string data to send to clients
20
*/
21
protected HystrixSampleSseServlet(Observable<String> sampleStream);
22
23
/**
24
* Protected constructor with sample stream and custom polling delay
25
* @param sampleStream Observable stream of string data to send to clients
26
* @param pausePollerThreadDelayInMs Delay between polling cycles in milliseconds
27
*/
28
protected HystrixSampleSseServlet(Observable<String> sampleStream, int pausePollerThreadDelayInMs);
29
30
/**
31
* Handle incoming GET requests - establishes SSE connection
32
* @param request HTTP request
33
* @param response HTTP response configured for text/event-stream
34
* @throws ServletException if servlet encounters difficulty
35
* @throws IOException if I/O errors occur
36
*/
37
protected void doGet(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException;
38
39
/**
40
* Static method to gracefully shutdown all servlet instances
41
* Sets shutdown flag to terminate active connections
42
*/
43
public static void shutdown();
44
45
/**
46
* Initialize servlet - resets shutdown flag
47
* @throws ServletException if initialization fails
48
*/
49
public void init() throws ServletException;
50
51
/**
52
* Clean up servlet resources - sets shutdown flag
53
*/
54
public void destroy();
55
56
// Abstract methods that must be implemented by subclasses
57
58
/**
59
* Must return maximum number of concurrent connections allowed
60
* @return Maximum concurrent connections
61
*/
62
protected abstract int getMaxNumberConcurrentConnectionsAllowed();
63
64
/**
65
* Must return current number of active connections
66
* @return Current connection count
67
*/
68
protected abstract int getNumberCurrentConnections();
69
70
/**
71
* Must atomically increment and return current concurrent connection count
72
* @return New connection count after increment
73
*/
74
protected abstract int incrementAndGetCurrentConcurrentConnections();
75
76
/**
77
* Must atomically decrement current concurrent connection count
78
*/
79
protected abstract void decrementCurrentConcurrentConnections();
80
}
81
```
82
83
### Constants
84
85
```java { .api }
86
/**
87
* Default delay between polling cycles to check client connection status
88
*/
89
public static final int DEFAULT_PAUSE_POLLER_THREAD_DELAY_IN_MS = 500;
90
91
/**
92
* Static shutdown flag shared across all servlet instances
93
*/
94
private static volatile boolean isDestroyed = false;
95
```
96
97
**Usage Examples:**
98
99
```java
100
// Implementing a custom SSE servlet
101
public class CustomMetricsServlet extends HystrixSampleSseServlet {
102
private static AtomicInteger concurrentConnections = new AtomicInteger(0);
103
private static final int MAX_CONNECTIONS = 10;
104
105
public CustomMetricsServlet() {
106
super(createCustomStream());
107
}
108
109
@Override
110
protected int getMaxNumberConcurrentConnectionsAllowed() {
111
return MAX_CONNECTIONS;
112
}
113
114
@Override
115
protected int getNumberCurrentConnections() {
116
return concurrentConnections.get();
117
}
118
119
@Override
120
protected int incrementAndGetCurrentConcurrentConnections() {
121
return concurrentConnections.incrementAndGet();
122
}
123
124
@Override
125
protected void decrementCurrentConcurrentConnections() {
126
concurrentConnections.decrementAndGet();
127
}
128
129
private static Observable<String> createCustomStream() {
130
return Observable.interval(1, TimeUnit.SECONDS)
131
.map(tick -> "data: {\"timestamp\": " + System.currentTimeMillis() + "}\n\n");
132
}
133
}
134
```
135
136
## HTTP Protocol Details
137
138
### Response Headers
139
140
The servlet automatically sets these headers for SSE compliance:
141
142
```java
143
response.setHeader("Content-Type", "text/event-stream;charset=UTF-8");
144
response.setHeader("Cache-Control", "no-cache, no-store, max-age=0, must-revalidate");
145
response.setHeader("Pragma", "no-cache");
146
```
147
148
### Data Format
149
150
All data is sent in Server-Sent Events format:
151
152
```
153
data: {"key": "value"}
154
155
ping:
156
157
data: {"key": "value"}
158
159
```
160
161
### Connection Management Flow
162
163
1. Client connects via HTTP GET request
164
2. Server checks concurrent connection limit
165
3. If under limit, establishes SSE connection
166
4. If over limit, returns HTTP 503 error
167
5. Server subscribes to data stream using RxJava
168
6. Data events are written to client as "data: JSON\n\n"
169
7. Periodic "ping: \n\n" messages maintain connection
170
8. Connection cleanup on client disconnect or servlet shutdown
171
172
## Threading Model
173
174
- **HTTP Request Thread**: Handles initial connection setup
175
- **RxJava IO Thread**: Processes stream data and writes to client (non-blocking)
176
- **Polling Thread**: Periodically checks connection status and sends ping messages
177
- **Stream Thread**: RxJava computation thread for data processing
178
179
## Error Handling
180
181
```java { .api }
182
// Error scenarios handled by base servlet:
183
184
// 1. Max connections exceeded
185
response.sendError(503, "MaxConcurrentConnections reached: " + maxNumberConnectionsAllowed);
186
187
// 2. Service shutdown
188
response.sendError(503, "Service has been shut down.");
189
190
// 3. Client disconnect detection
191
if (writer.checkError()) {
192
moreDataWillBeSent.set(false);
193
}
194
195
// 4. Stream errors
196
subscriber.onError(throwable -> moreDataWillBeSent.set(false));
197
```
198
199
## Lifecycle Management
200
201
```java
202
// Graceful shutdown pattern
203
HystrixSampleSseServlet.shutdown(); // Call before application shutdown
204
205
// WebSphere-specific shutdown hook
206
// Invoke shutdown() from another servlet's destroy() method to handle
207
// WebSphere's 60-second timeout requirement
208
```