0
# WebSocket Support
1
2
Spring WebFlux provides comprehensive reactive WebSocket support for both client and server-side WebSocket connections. The implementation supports message handling, session management, and various server implementations with full reactive streams integration.
3
4
## Capabilities
5
6
### WebSocket Session
7
8
Interface representing a WebSocket session with message sending/receiving capabilities.
9
10
```java { .api }
11
interface WebSocketSession {
12
/**
13
* Return the session id.
14
*/
15
String getId();
16
17
/**
18
* Return information from the handshake request.
19
*/
20
HandshakeInfo getHandshakeInfo();
21
22
/**
23
* Return the DataBufferFactory for the session.
24
*/
25
DataBufferFactory bufferFactory();
26
27
/**
28
* Return the map of session attributes.
29
*/
30
Map<String, Object> getAttributes();
31
32
/**
33
* Return a Flux for receiving messages.
34
*/
35
Flux<WebSocketMessage> receive();
36
37
/**
38
* Send messages and return a Mono<Void> that completes when sending is done.
39
* @param messages the messages to send
40
*/
41
Mono<Void> send(Publisher<WebSocketMessage> messages);
42
43
/**
44
* Return whether the connection is still open.
45
*/
46
boolean isOpen();
47
48
/**
49
* Close the session normally.
50
*/
51
Mono<Void> close();
52
53
/**
54
* Close the session with the given close status.
55
* @param status the close status
56
*/
57
Mono<Void> close(CloseStatus status);
58
}
59
```
60
61
**Usage Examples:**
62
63
```java
64
@Component
65
public class ChatWebSocketHandler implements WebSocketHandler {
66
67
@Override
68
public Mono<Void> handle(WebSocketSession session) {
69
// Handle incoming messages
70
Flux<WebSocketMessage> input = session.receive()
71
.doOnNext(message -> {
72
String payload = message.getPayloadAsText();
73
logger.info("Received: {}", payload);
74
});
75
76
// Send periodic messages
77
Flux<WebSocketMessage> output = Flux.interval(Duration.ofSeconds(1))
78
.map(i -> session.textMessage("Ping " + i));
79
80
// Echo messages back
81
Flux<WebSocketMessage> echo = session.receive()
82
.map(message -> session.textMessage("Echo: " + message.getPayloadAsText()));
83
84
return session.send(echo);
85
}
86
}
87
```
88
89
### WebSocket Handler
90
91
Functional interface for handling WebSocket sessions.
92
93
```java { .api }
94
@FunctionalInterface
95
interface WebSocketHandler {
96
/**
97
* Handle the WebSocket session.
98
* @param session the session to handle
99
* @return completion signal
100
*/
101
Mono<Void> handle(WebSocketSession session);
102
103
/**
104
* Return the list of supported sub-protocols.
105
* By default this returns an empty list.
106
*/
107
default List<String> getSubProtocols() {
108
return Collections.emptyList();
109
}
110
}
111
```
112
113
**Usage Examples:**
114
115
```java
116
// Simple echo handler
117
WebSocketHandler echoHandler = session ->
118
session.send(
119
session.receive()
120
.map(message -> session.textMessage("Echo: " + message.getPayloadAsText()))
121
);
122
123
// Chat room handler
124
@Component
125
public class ChatRoomHandler implements WebSocketHandler {
126
private final Sinks.Many<String> messageSink = Sinks.many().multicast().onBackpressureBuffer();
127
128
@Override
129
public Mono<Void> handle(WebSocketSession session) {
130
// Subscribe to chat messages
131
Flux<WebSocketMessage> output = messageSink.asFlux()
132
.map(session::textMessage);
133
134
// Handle incoming messages
135
Mono<Void> input = session.receive()
136
.map(WebSocketMessage::getPayloadAsText)
137
.doOnNext(messageSink::tryEmitNext)
138
.then();
139
140
return Mono.zip(input, session.send(output)).then();
141
}
142
143
@Override
144
public List<String> getSubProtocols() {
145
return List.of("chat-v1", "chat-v2");
146
}
147
}
148
```
149
150
### WebSocket Message
151
152
Class representing a WebSocket message with payload and type information.
153
154
```java { .api }
155
class WebSocketMessage {
156
/**
157
* WebSocket message types.
158
*/
159
enum Type {
160
TEXT, BINARY, PING, PONG
161
}
162
163
/**
164
* Return the message type (text, binary, ping, pong).
165
*/
166
Type getType();
167
168
/**
169
* Return the message payload.
170
*/
171
DataBuffer getPayload();
172
173
/**
174
* Return the message payload as text.
175
* Only applicable for text messages.
176
*/
177
String getPayloadAsText();
178
179
/**
180
* Return the message payload as text using the given charset.
181
* Only applicable for text messages.
182
* @param charset the charset to use
183
*/
184
String getPayloadAsText(Charset charset);
185
186
/**
187
* Retain the data buffer for the message payload.
188
*/
189
void retain();
190
191
/**
192
* Release the data buffer for the message payload.
193
* @return true if the buffer was released; false if already released
194
*/
195
boolean release();
196
}
197
```
198
199
**Usage Examples:**
200
201
```java
202
// Handle different message types
203
public Mono<Void> handle(WebSocketSession session) {
204
return session.receive()
205
.doOnNext(message -> {
206
switch (message.getType()) {
207
case TEXT:
208
String text = message.getPayloadAsText();
209
logger.info("Text message: {}", text);
210
break;
211
case BINARY:
212
DataBuffer buffer = message.getPayload();
213
logger.info("Binary message: {} bytes", buffer.readableByteCount());
214
break;
215
case PING:
216
logger.info("Ping received");
217
break;
218
case PONG:
219
logger.info("Pong received");
220
break;
221
}
222
})
223
.then();
224
}
225
226
// Create messages
227
WebSocketMessage textMsg = session.textMessage("Hello WebSocket!");
228
WebSocketMessage binaryMsg = session.binaryMessage(factory -> {
229
DataBuffer buffer = factory.allocateBuffer();
230
buffer.write("Binary data".getBytes());
231
return buffer;
232
});
233
WebSocketMessage pingMsg = session.pingMessage(factory -> factory.allocateBuffer());
234
```
235
236
### Close Status
237
238
Class representing WebSocket close status codes and reasons.
239
240
```java { .api }
241
class CloseStatus {
242
/**
243
* Create a new CloseStatus instance.
244
* @param code the status code
245
*/
246
CloseStatus(int code);
247
248
/**
249
* Create a new CloseStatus instance.
250
* @param code the status code
251
* @param reason the reason phrase
252
*/
253
CloseStatus(int code, String reason);
254
255
/**
256
* Return the status code.
257
*/
258
int getCode();
259
260
/**
261
* Return the reason phrase.
262
*/
263
String getReason();
264
265
// Standard close status constants
266
static final CloseStatus NORMAL = new CloseStatus(1000);
267
static final CloseStatus GOING_AWAY = new CloseStatus(1001);
268
static final CloseStatus PROTOCOL_ERROR = new CloseStatus(1002);
269
static final CloseStatus NOT_ACCEPTABLE = new CloseStatus(1003);
270
static final CloseStatus NO_STATUS_CODE = new CloseStatus(1005);
271
static final CloseStatus NO_CLOSE_FRAME = new CloseStatus(1006);
272
static final CloseStatus BAD_DATA = new CloseStatus(1007);
273
static final CloseStatus POLICY_VIOLATION = new CloseStatus(1008);
274
static final CloseStatus TOO_BIG_TO_PROCESS = new CloseStatus(1009);
275
static final CloseStatus REQUIRED_EXTENSION = new CloseStatus(1010);
276
static final CloseStatus SERVER_ERROR = new CloseStatus(1011);
277
static final CloseStatus SERVICE_RESTARTED = new CloseStatus(1012);
278
static final CloseStatus SERVICE_OVERLOAD = new CloseStatus(1013);
279
static final CloseStatus TLS_HANDSHAKE_FAILURE = new CloseStatus(1015);
280
}
281
```
282
283
**Usage Examples:**
284
285
```java
286
// Close with standard status
287
return session.close(CloseStatus.NORMAL);
288
289
// Close with custom status and reason
290
return session.close(new CloseStatus(4000, "Custom application error"));
291
292
// Handle close events
293
public Mono<Void> handle(WebSocketSession session) {
294
return session.receive()
295
.doOnError(ex -> logger.error("WebSocket error", ex))
296
.doOnComplete(() -> logger.info("WebSocket closed normally"))
297
.then()
298
.onErrorResume(ex -> session.close(CloseStatus.SERVER_ERROR));
299
}
300
```
301
302
### Handshake Info
303
304
Class containing information about the WebSocket handshake request.
305
306
```java { .api }
307
class HandshakeInfo {

    /**
     * Return the request URI.
     */
    URI getUri();

    /**
     * Return the request headers.
     */
    HttpHeaders getHeaders();

    /**
     * Return the authenticated principal, if any.
     */
    Mono<Principal> getPrincipal();

    /**
     * Return the negotiated sub-protocol, or {@code null} if none was negotiated.
     */
    String getSubProtocol();

    /**
     * Return the remote address, or {@code null} if it is not available.
     */
    InetSocketAddress getRemoteAddress();

    /**
     * Return handshake attributes.
     */
    Map<String, Object> getAttributes();

    /**
     * Return a log prefix for correlation purposes.
     */
    String getLogPrefix();
}
```

**Usage Examples:**

```java
public Mono<Void> handle(WebSocketSession session) {
    HandshakeInfo handshake = session.getHandshakeInfo();

    logger.info("WebSocket connection from {} to {}",
        handshake.getRemoteAddress(),
        handshake.getUri());

    // Check sub-protocol; the constant-first equals() is safe when none was negotiated
    String protocol = handshake.getSubProtocol();
    if ("chat-v2".equals(protocol)) {
        return handleChatV2(session);
    } else {
        return handleChatV1(session);
    }
}
363
```
364
365
### WebSocket Client
366
367
Interface for establishing WebSocket connections from client side.
368
369
```java { .api }
370
interface WebSocketClient {
371
/**
372
* Execute a handshake request to the given URL and handle the session.
373
* @param url the handshake URL
374
* @param handler the handler for the WebSocket session
375
* @return completion signal
376
*/
377
Mono<Void> execute(URI url, WebSocketHandler handler);
378
379
/**
380
* Execute a handshake request with headers.
381
* @param url the handshake URL
382
* @param headers headers to send with the handshake request
383
* @param handler the handler for the WebSocket session
384
* @return completion signal
385
*/
386
Mono<Void> execute(URI url, HttpHeaders headers, WebSocketHandler handler);
387
388
/**
389
* Execute a handshake request for the given sub-protocol.
390
* @param url the handshake URL
391
* @param subProtocolType the sub-protocol type
392
* @param handler the handler for the WebSocket session
393
* @return completion signal
394
*/
395
Mono<Void> execute(URI url, Class<?> subProtocolType, WebSocketHandler handler);
396
}
397
```
398
399
**Client Implementations:**
400
401
```java { .api }
402
// Reactor Netty implementation
403
class ReactorNettyWebSocketClient implements WebSocketClient {
404
ReactorNettyWebSocketClient();
405
ReactorNettyWebSocketClient(HttpClient httpClient);
406
}
407
408
// Reactor Netty 2 implementation
409
class ReactorNetty2WebSocketClient implements WebSocketClient {
410
ReactorNetty2WebSocketClient();
411
ReactorNetty2WebSocketClient(HttpClient httpClient);
412
}
413
414
// Jetty implementation
415
class JettyWebSocketClient implements WebSocketClient {
416
JettyWebSocketClient();
417
JettyWebSocketClient(WebSocketClient jettyClient);
418
}
419
420
// JSR-356 standard implementation
421
class StandardWebSocketClient implements WebSocketClient {
422
StandardWebSocketClient();
423
StandardWebSocketClient(WebSocketContainer container);
424
}
425
426
// Tomcat implementation
427
class TomcatWebSocketClient implements WebSocketClient {
428
TomcatWebSocketClient();
429
TomcatWebSocketClient(WebSocketContainer container);
430
}
431
432
// Undertow implementation
433
class UndertowWebSocketClient implements WebSocketClient {
434
UndertowWebSocketClient();
435
UndertowWebSocketClient(WebSocketClient undertowClient);
436
}
437
```
438
439
**Usage Examples:**
440
441
```java
442
// Basic client connection
443
WebSocketClient client = new ReactorNettyWebSocketClient();

WebSocketHandler clientHandler = session -> {
    // Send initial message
    Mono<Void> send = session.send(
        Mono.just(session.textMessage("Hello Server!"))
    );

    // Receive messages
    Mono<Void> receive = session.receive()
        .map(WebSocketMessage::getPayloadAsText)
        .doOnNext(message -> logger.info("Received: {}", message))
        .then();

    // Mono.when waits for BOTH flows. Mono.zip would complete as soon as the
    // one-shot send finishes and cancel the receive side.
    return Mono.when(send, receive);
};

// Connect to WebSocket server (nothing happens until the Mono is subscribed)
Mono<Void> connection = client.execute(
    URI.create("ws://localhost:8080/websocket"),
    clientHandler
);

// Connect with custom headers
HttpHeaders headers = new HttpHeaders();
headers.setBearerAuth(token);

Mono<Void> authenticatedConnection = client.execute(
    URI.create("ws://localhost:8080/websocket"),
    headers,
    clientHandler
);
475
```
476
477
### WebSocket Service
478
479
Interface for handling WebSocket upgrade requests on the server side.
480
481
```java { .api }
482
interface WebSocketService {
483
/**
484
* Handle a WebSocket handshake request with the given handler.
485
* @param exchange the current exchange
486
* @param handler the handler for the WebSocket session
487
* @return completion signal
488
*/
489
Mono<Void> handleRequest(ServerWebExchange exchange, WebSocketHandler handler);
490
}
491
```
492
493
### Request Upgrade Strategy
494
495
Interface for upgrading HTTP requests to WebSocket connections.
496
497
```java { .api }
498
interface RequestUpgradeStrategy {
499
/**
500
* Upgrade the HTTP request to a WebSocket connection.
501
* @param exchange the current exchange
502
* @param handler the WebSocket handler
503
* @param subProtocol the negotiated sub-protocol
504
* @param handshakeInfoFactory factory for creating handshake info
505
* @return completion signal
506
*/
507
Mono<Void> upgrade(
508
ServerWebExchange exchange,
509
WebSocketHandler handler,
510
String subProtocol,
511
Supplier<HandshakeInfo> handshakeInfoFactory
512
);
513
}
514
```
515
516
### Server Support Classes
517
518
Support classes for integrating WebSocket handlers with Spring WebFlux infrastructure.
519
520
```java { .api }
521
class HandshakeWebSocketService implements WebSocketService {
522
/**
523
* Default constructor.
524
*/
525
HandshakeWebSocketService();
526
527
/**
528
* Constructor with a specific upgrade strategy.
529
* @param upgradeStrategy the upgrade strategy to use
530
*/
531
HandshakeWebSocketService(RequestUpgradeStrategy upgradeStrategy);
532
533
@Override
534
Mono<Void> handleRequest(ServerWebExchange exchange, WebSocketHandler handler);
535
}
536
537
class WebSocketHandlerAdapter implements HandlerAdapter {
538
/**
539
* Check if the handler is a WebSocketHandler.
540
* @param handler the handler object
541
* @return true if supported
542
*/
543
@Override
544
boolean supports(Object handler);
545
546
/**
547
* Handle the WebSocket request.
548
* @param exchange the current exchange
549
* @param handler the WebSocket handler
550
* @return the handler result
551
*/
552
@Override
553
Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler);
554
}
555
556
class WebSocketUpgradeHandlerPredicate implements RequestPredicate {
557
/**
558
* Test if the request is a WebSocket upgrade request.
559
* @param request the server request
560
* @return true if it's a WebSocket upgrade request
561
*/
562
@Override
563
boolean test(ServerRequest request);
564
}
565
```
566
567
**Usage Examples:**
568
569
```java
570
// Configure WebSocket in functional routes
571
@Configuration
572
public class WebSocketConfig {
573
574
@Bean
575
public RouterFunction<ServerResponse> webSocketRoutes(
576
ChatWebSocketHandler chatHandler,
577
EchoWebSocketHandler echoHandler) {
578
579
return RouterFunctions.route()
580
.GET("/websocket/chat", accept(TEXT_PLAIN), chatHandler)
581
.GET("/websocket/echo", echoHandler)
582
.build();
583
}
584
585
@Bean
586
public WebSocketHandlerAdapter handlerAdapter() {
587
return new WebSocketHandlerAdapter();
588
}
589
}
590
591
// Using in annotation-based controller
592
@RestController
593
public class WebSocketController {
594
595
@Autowired
596
private ChatWebSocketHandler chatHandler;
597
598
@GetMapping("/chat")
599
public Mono<Void> chat(ServerWebExchange exchange) {
600
return new HandshakeWebSocketService()
601
.handleRequest(exchange, chatHandler);
602
}
603
}
604
```