0
# WebSocket Support
1
2
Micronaut provides comprehensive WebSocket support for full-duplex communication between clients and servers, including both server-side endpoints and client-side connections.
3
4
## Capabilities
5
6
### Server WebSockets
7
8
Create WebSocket endpoints on the server to handle client connections.
9
10
```java { .api }
11
/**
12
* WebSocket server endpoint
13
*/
14
@ServerWebSocket("/ws")
15
public class ChatWebSocket {
16
17
@OnOpen
18
public void onOpen(WebSocketSession session) {
19
// Handle connection opening
20
session.send("Welcome!", MediaType.TEXT_PLAIN_TYPE);
21
}
22
23
@OnMessage
24
public void onMessage(String message, WebSocketSession session) {
25
// Handle incoming message
26
session.sendSync(message, MediaType.TEXT_PLAIN_TYPE);
27
}
28
29
@OnClose
30
public void onClose(WebSocketSession session, CloseReason closeReason) {
31
// Handle connection closing
32
System.out.println("Connection closed: " + closeReason.getReasonPhrase());
33
}
34
35
@OnError
36
public void onError(WebSocketSession session, Throwable t) {
37
// Handle errors
38
t.printStackTrace();
39
}
40
}
41
```
42
43
### Client WebSockets
44
45
Create WebSocket clients to connect to remote WebSocket endpoints.
46
47
```java { .api }
48
/**
49
* WebSocket client
50
*/
51
@ClientWebSocket("/ws")
52
public abstract class ChatClient implements AutoCloseable {
53
54
@OnOpen
55
public void onOpen(WebSocketSession session) {
56
// Handle connection opening
57
}
58
59
@OnMessage
60
public void onMessage(String message) {
61
// Handle received message
62
}
63
64
@OnClose
65
public void onClose(CloseReason closeReason) {
66
// Handle connection closing
67
}
68
69
@OnError
70
public void onError(Throwable t) {
71
// Handle errors
72
}
73
74
public abstract void send(String message);
75
76
public abstract void sendAsync(String message);
77
}
78
```
79
80
### WebSocket Session Management
81
82
Manage WebSocket sessions and broadcast messages to multiple clients.
83
84
```java { .api }
85
/**
86
* WebSocket session operations
87
*/
88
public interface WebSocketSession extends AttributeHolder {
89
90
String getId();
91
92
Publisher<String> send(Object message, MediaType mediaType);
93
94
<T> Publisher<T> send(T message);
95
96
void sendSync(Object message, MediaType mediaType);
97
98
<T> void sendSync(T message);
99
100
boolean isOpen();
101
102
boolean isSecure();
103
104
Optional<Principal> getUserPrincipal();
105
106
Set<WebSocketSession> getOpenSessions();
107
108
void close();
109
110
void close(CloseReason closeReason);
111
}
112
```
113
114
### WebSocket Broadcasting
115
116
Broadcast messages to multiple WebSocket sessions.
117
118
```java { .api }
119
/**
120
* WebSocket broadcaster service
121
*/
122
@Singleton
123
public class WebSocketBroadcasterService {
124
125
private final WebSocketBroadcaster broadcaster;
126
127
public WebSocketBroadcasterService(WebSocketBroadcaster broadcaster) {
128
this.broadcaster = broadcaster;
129
}
130
131
public void broadcastToAll(String message) {
132
broadcaster.broadcastSync(message, MediaType.TEXT_PLAIN_TYPE);
133
}
134
135
public void broadcastToPath(String path, String message) {
136
broadcaster.broadcastSync(message, MediaType.TEXT_PLAIN_TYPE,
137
session -> session.getRequestURI().getPath().equals(path));
138
}
139
}
140
141
/**
142
* WebSocket broadcaster interface
143
*/
144
public interface WebSocketBroadcaster {
145
146
<T> Publisher<T> broadcast(T message, MediaType mediaType);
147
148
<T> Publisher<T> broadcast(T message, MediaType mediaType,
149
Predicate<WebSocketSession> filter);
150
151
<T> void broadcastSync(T message, MediaType mediaType);
152
153
<T> void broadcastSync(T message, MediaType mediaType,
154
Predicate<WebSocketSession> filter);
155
}
156
```
157
158
### Reactive WebSocket Operations
159
160
Use reactive types for WebSocket message handling.
161
162
```java { .api }
163
/**
164
* Reactive WebSocket endpoint
165
*/
166
@ServerWebSocket("/reactive")
167
public class ReactiveWebSocket {
168
169
@OnMessage
170
public Publisher<String> onMessage(String message, WebSocketSession session) {
171
return Flowable.fromArray(message.split(" "))
172
.map(String::toUpperCase);
173
}
174
175
@OnMessage
176
public Single<String> processMessage(String message) {
177
return Single.fromCallable(() -> "Processed: " + message);
178
}
179
}
180
```
181
182
## Types
183
184
```java { .api }
185
// WebSocket annotations
186
@Target({ElementType.TYPE})
187
@Retention(RetentionPolicy.RUNTIME)
188
public @interface ServerWebSocket {
189
String value() default "/";
190
String[] subprotocols() default {};
191
}
192
193
@Target({ElementType.TYPE})
194
@Retention(RetentionPolicy.RUNTIME)
195
public @interface ClientWebSocket {
196
String value() default "/";
197
String[] subprotocols() default {};
198
}
199
200
@Target({ElementType.METHOD})
201
@Retention(RetentionPolicy.RUNTIME)
202
public @interface OnOpen {
203
}
204
205
@Target({ElementType.METHOD})
206
@Retention(RetentionPolicy.RUNTIME)
207
public @interface OnMessage {
208
}
209
210
@Target({ElementType.METHOD})
211
@Retention(RetentionPolicy.RUNTIME)
212
public @interface OnClose {
213
}
214
215
@Target({ElementType.METHOD})
216
@Retention(RetentionPolicy.RUNTIME)
217
public @interface OnError {
218
}
219
220
// Core WebSocket interfaces
221
public interface WebSocketSession extends AttributeHolder {
222
String getId();
223
Publisher<String> send(Object message, MediaType mediaType);
224
<T> Publisher<T> send(T message);
225
void sendSync(Object message, MediaType mediaType);
226
<T> void sendSync(T message);
227
boolean isOpen();
228
boolean isSecure();
229
void close();
230
void close(CloseReason closeReason);
231
}
232
233
public interface WebSocketBroadcaster {
234
<T> Publisher<T> broadcast(T message, MediaType mediaType);
235
<T> Publisher<T> broadcast(T message, MediaType mediaType, Predicate<WebSocketSession> filter);
236
<T> void broadcastSync(T message, MediaType mediaType);
237
<T> void broadcastSync(T message, MediaType mediaType, Predicate<WebSocketSession> filter);
238
}
239
240
public final class CloseReason {
241
public static enum Code {
242
NORMAL_CLOSURE, GOING_AWAY, PROTOCOL_ERROR, UNSUPPORTED_DATA,
243
ABNORMAL_CLOSURE, INVALID_PAYLOAD_DATA, POLICY_VIOLATION,
244
MESSAGE_TOO_BIG, MANDATORY_EXTENSION, INTERNAL_SERVER_ERROR,
245
SERVICE_RESTART, TRY_AGAIN_LATER, BAD_GATEWAY, TLS_HANDSHAKE_FAILURE
246
}
247
248
public Code getCode();
249
public String getReasonPhrase();
250
}
251
```