
# Base SSE Servlet Framework

Abstract base class providing Server-Sent Events (SSE) functionality for all Hystrix streaming servlets. This framework handles HTTP streaming, connection management, and client lifecycle.

## Capabilities

### HystrixSampleSseServlet

Abstract servlet that implements the Server-Sent Events protocol for streaming data to HTTP clients.

```java { .api }
/**
 * Abstract base servlet for SSE streaming functionality.
 * Handles HTTP streaming protocol, connection management, and client lifecycle.
 */
public abstract class HystrixSampleSseServlet extends HttpServlet {

    /**
     * Protected constructor with sample stream using default polling delay
     * @param sampleStream Observable stream of string data to send to clients
     */
    protected HystrixSampleSseServlet(Observable<String> sampleStream);

    /**
     * Protected constructor with sample stream and custom polling delay
     * @param sampleStream Observable stream of string data to send to clients
     * @param pausePollerThreadDelayInMs Delay between polling cycles in milliseconds
     */
    protected HystrixSampleSseServlet(Observable<String> sampleStream, int pausePollerThreadDelayInMs);

    /**
     * Handle incoming GET requests - establishes SSE connection
     * @param request HTTP request
     * @param response HTTP response configured for text/event-stream
     * @throws ServletException if servlet encounters difficulty
     * @throws IOException if I/O errors occur
     */
    protected void doGet(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException;

    /**
     * Static method to gracefully shutdown all servlet instances
     * Sets shutdown flag to terminate active connections
     */
    public static void shutdown();

    /**
     * Initialize servlet - resets shutdown flag
     * @throws ServletException if initialization fails
     */
    public void init() throws ServletException;

    /**
     * Clean up servlet resources - sets shutdown flag
     */
    public void destroy();

    // Abstract methods that must be implemented by subclasses

    /**
     * Must return maximum number of concurrent connections allowed
     * @return Maximum concurrent connections
     */
    protected abstract int getMaxNumberConcurrentConnectionsAllowed();

    /**
     * Must return current number of active connections
     * @return Current connection count
     */
    protected abstract int getNumberCurrentConnections();

    /**
     * Must atomically increment and return current concurrent connection count
     * @return New connection count after increment
     */
    protected abstract int incrementAndGetCurrentConcurrentConnections();

    /**
     * Must atomically decrement current concurrent connection count
     */
    protected abstract void decrementCurrentConcurrentConnections();
}
```

### Constants

```java { .api }
/**
 * Default delay between polling cycles to check client connection status
 */
public static final int DEFAULT_PAUSE_POLLER_THREAD_DELAY_IN_MS = 500;

/**
 * Static shutdown flag shared across all servlet instances
 */
private static volatile boolean isDestroyed = false;
```

**Usage Examples:**

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import com.netflix.hystrix.contrib.sample.stream.HystrixSampleSseServlet;
import rx.Observable;

// Implementing a custom SSE servlet
public class CustomMetricsServlet extends HystrixSampleSseServlet {
    // Shared across all instances of this servlet class
    private static final AtomicInteger concurrentConnections = new AtomicInteger(0);
    private static final int MAX_CONNECTIONS = 10;

    public CustomMetricsServlet() {
        super(createCustomStream());
    }

    @Override
    protected int getMaxNumberConcurrentConnectionsAllowed() {
        return MAX_CONNECTIONS;
    }

    @Override
    protected int getNumberCurrentConnections() {
        return concurrentConnections.get();
    }

    @Override
    protected int incrementAndGetCurrentConcurrentConnections() {
        return concurrentConnections.incrementAndGet();
    }

    @Override
    protected void decrementCurrentConcurrentConnections() {
        concurrentConnections.decrementAndGet();
    }

    private static Observable<String> createCustomStream() {
        // Emit only the JSON payload; the base servlet writes the
        // "data: " prefix and the trailing blank line for each event.
        return Observable.interval(1, TimeUnit.SECONDS)
                .map(tick -> "{\"timestamp\": " + System.currentTimeMillis() + "}");
    }
}
```

## HTTP Protocol Details

### Response Headers

The servlet automatically sets these headers for SSE compliance:

```java
response.setHeader("Content-Type", "text/event-stream;charset=UTF-8");
response.setHeader("Cache-Control", "no-cache, no-store, max-age=0, must-revalidate");
response.setHeader("Pragma", "no-cache");
```

### Data Format

All data is sent in Server-Sent Events format:

```
data: {"key": "value"}

ping:

data: {"key": "value"}

```
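
To illustrate how a consumer might read this format, here is a minimal sketch that pulls the payloads out of raw stream text and skips the `ping:` keep-alive lines. `SseFrameParser` and `dataPayloads` are hypothetical names, not part of Hystrix, and the sketch assumes each event carries a single `data:` line, as in the format shown above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Hystrix: extracts payloads from raw SSE text.
final class SseFrameParser {

    /** Returns the payload of every "data:" line; "ping:" lines and blank separators are skipped. */
    static List<String> dataPayloads(String rawStream) {
        List<String> payloads = new ArrayList<>();
        for (String line : rawStream.split("\n")) {
            if (line.startsWith("data:")) {
                // Drop the field name and surrounding whitespace, keep the JSON text
                payloads.add(line.substring("data:".length()).trim());
            }
        }
        return payloads;
    }

    public static void main(String[] args) {
        String raw = "data: {\"key\": \"value\"}\n\nping: \n\ndata: {\"key\": \"other\"}\n\n";
        for (String payload : dataPayloads(raw)) {
            System.out.println(payload);
        }
    }
}
```

A full SSE client would also join multi-line `data:` fields into one event; that is left out here because the servlet's output does not appear to need it.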

### Connection Management Flow

1. Client connects via HTTP GET request
2. Server checks concurrent connection limit
3. If under limit, establishes SSE connection
4. If over limit, returns HTTP 503 error
5. Server subscribes to data stream using RxJava
6. Data events are written to client as "data: JSON\n\n"
7. Periodic "ping: \n\n" messages maintain connection
8. Connection cleanup on client disconnect or servlet shutdown
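
The admission check in steps 2 to 4 and the cleanup in step 8 can be sketched without any servlet API. `ConnectionGate` is a hypothetical stand-in, not a Hystrix class; it assumes the same atomic-counter approach as the abstract methods listed earlier, and a caller would translate a rejected attempt into an HTTP 503.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the servlet's admission logic; not a Hystrix class.
final class ConnectionGate {
    private final AtomicInteger current = new AtomicInteger(0);
    private final int maxAllowed;

    ConnectionGate(int maxAllowed) {
        this.maxAllowed = maxAllowed;
    }

    /** Returns true if the connection is admitted, false if the limit is already reached. */
    boolean tryAcquire() {
        if (current.incrementAndGet() > maxAllowed) {
            current.decrementAndGet(); // roll back the rejected attempt
            return false;
        }
        return true;
    }

    /** Call once for every admitted connection when it closes. */
    void release() {
        current.decrementAndGet();
    }

    int active() {
        return current.get();
    }

    public static void main(String[] args) {
        ConnectionGate gate = new ConnectionGate(2);
        System.out.println(gate.tryAcquire()); // admitted
        System.out.println(gate.tryAcquire()); // admitted
        System.out.println(gate.tryAcquire()); // rejected: would map to HTTP 503
        gate.release();                        // one client disconnects
        System.out.println(gate.tryAcquire()); // admitted again
    }
}
```

Incrementing first and comparing afterwards keeps the check race-free: two simultaneous requests can never both see a stale count below the limit.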

## Threading Model

- **HTTP Request Thread**: Handles initial connection setup
- **RxJava IO Thread**: Processes stream data and writes to client (non-blocking)
- **Polling Thread**: Periodically checks connection status and sends ping messages
- **Stream Thread**: RxJava computation thread for data processing
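
The polling role described above can be pictured with a small, self-contained loop. This is a sketch under assumptions, not the servlet's actual code: `PingLoopSketch`, `pingUntilStopped`, and the `maxPings` bound are hypothetical and exist only to keep the example finite. It relies on `PrintWriter.checkError()`, which flushes the writer and reports whether an earlier write failed.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of a keep-alive loop; not the servlet's actual implementation.
final class PingLoopSketch {

    /**
     * Writes a ping every delayMs until the flag clears, a write fails,
     * or maxPings is reached. Returns the number of pings written successfully.
     */
    static int pingUntilStopped(PrintWriter writer, AtomicBoolean moreDataWillBeSent,
                                long delayMs, int maxPings) {
        int pings = 0;
        while (moreDataWillBeSent.get() && pings < maxPings) {
            try {
                Thread.sleep(delayMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                moreDataWillBeSent.set(false);
                break;
            }
            writer.print("ping: \n\n");
            if (writer.checkError()) {
                // A failed write is treated as a client disconnect
                moreDataWillBeSent.set(false);
            } else {
                pings++;
            }
        }
        return pings;
    }

    public static void main(String[] args) {
        StringWriter sink = new StringWriter();
        AtomicBoolean flag = new AtomicBoolean(true);
        int sent = pingUntilStopped(new PrintWriter(sink), flag, 10, 3);
        System.out.println("pings written: " + sent);
    }
}
```

Any other thread that clears the shared flag, for example a stream subscriber that hit an error, ends the loop on its next iteration.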

## Error Handling

```java { .api }
// Error scenarios handled by base servlet:

// 1. Max connections exceeded
response.sendError(503, "MaxConcurrentConnections reached: " + maxNumberConnectionsAllowed);

// 2. Service shutdown
response.sendError(503, "Service has been shut down.");

// 3. Client disconnect detection: a failed write stops the connection loop
if (writer.checkError()) {
    moreDataWillBeSent.set(false);
}

// 4. Stream errors: the subscriber's onError callback stops the connection loop
@Override
public void onError(Throwable e) {
    moreDataWillBeSent.set(false);
}
```

## Lifecycle Management

```java
// Graceful shutdown pattern
HystrixSampleSseServlet.shutdown(); // Call before application shutdown

// WebSphere-specific shutdown hook
// Invoke shutdown() from another servlet's destroy() method to handle
// WebSphere's 60-second timeout requirement
```
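
The interplay between `init()`, `shutdown()`, and new requests can be modelled with a shared static flag. The class below is a hypothetical model based only on the behaviour described in this document (the flag is reset on init, set on shutdown, and requests receive 503 once it is set); `ShutdownFlagSketch` and `statusForNewRequest` are illustrative names, not Hystrix API.

```java
// Hypothetical model of the shared shutdown flag; not the Hystrix implementation.
final class ShutdownFlagSketch {
    // volatile so a shutdown() call on one thread is visible to request threads
    private static volatile boolean isDestroyed = false;

    /** Mirrors init(): resets the flag so the servlet can serve again. */
    static void init() {
        isDestroyed = false;
    }

    /** Mirrors shutdown()/destroy(): signals every instance to stop. */
    static void shutdown() {
        isDestroyed = true;
    }

    /** Status a new GET request would receive in the current state. */
    static int statusForNewRequest() {
        return isDestroyed ? 503 : 200;
    }

    public static void main(String[] args) {
        init();
        System.out.println(statusForNewRequest());
        shutdown();
        System.out.println(statusForNewRequest());
    }
}
```

Because the flag is static, one `shutdown()` call affects every streaming servlet that shares the base class, which is why a redeploy needs `init()` to reset it.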