Modern, JVM-based framework for building modular, easily testable microservice and serverless applications with compile-time DI and fast startup.
—
Micronaut provides comprehensive WebSocket support for full-duplex communication between clients and servers, including both server-side endpoints and client-side connections.
Create WebSocket endpoints on the server to handle client connections.
/**
* WebSocket server endpoint
*/
@ServerWebSocket("/ws")
public class ChatWebSocket {
@OnOpen
public void onOpen(WebSocketSession session) {
// Handle connection opening
session.send("Welcome!", MediaType.TEXT_PLAIN_TYPE);
}
@OnMessage
public void onMessage(String message, WebSocketSession session) {
// Handle incoming message
session.sendSync(message, MediaType.TEXT_PLAIN_TYPE);
}
@OnClose
public void onClose(WebSocketSession session, CloseReason closeReason) {
// Handle connection closing
System.out.println("Connection closed: " + closeReason.getReasonPhrase());
}
@OnError
public void onError(WebSocketSession session, Throwable t) {
// Handle errors
t.printStackTrace();
}
}Create WebSocket clients to connect to remote WebSocket endpoints.
/**
* WebSocket client
*/
@ClientWebSocket("/ws")
public abstract class ChatClient implements AutoCloseable {
@OnOpen
public void onOpen(WebSocketSession session) {
// Handle connection opening
}
@OnMessage
public void onMessage(String message) {
// Handle received message
}
@OnClose
public void onClose(CloseReason closeReason) {
// Handle connection closing
}
@OnError
public void onError(Throwable t) {
// Handle errors
}
public abstract void send(String message);
public abstract void sendAsync(String message);
}Manage WebSocket sessions and broadcast messages to multiple clients.
/**
* WebSocket session operations
*/
public interface WebSocketSession extends AttributeHolder {
String getId();
Publisher<String> send(Object message, MediaType mediaType);
<T> Publisher<T> send(T message);
void sendSync(Object message, MediaType mediaType);
<T> void sendSync(T message);
boolean isOpen();
boolean isSecure();
Optional<Principal> getUserPrincipal();
Set<WebSocketSession> getOpenSessions();
void close();
void close(CloseReason closeReason);
}Broadcast messages to multiple WebSocket sessions.
/**
* WebSocket broadcaster service
*/
@Singleton
public class WebSocketBroadcasterService {
private final WebSocketBroadcaster broadcaster;
public WebSocketBroadcasterService(WebSocketBroadcaster broadcaster) {
this.broadcaster = broadcaster;
}
public void broadcastToAll(String message) {
broadcaster.broadcastSync(message, MediaType.TEXT_PLAIN_TYPE);
}
public void broadcastToPath(String path, String message) {
broadcaster.broadcastSync(message, MediaType.TEXT_PLAIN_TYPE,
session -> session.getRequestURI().getPath().equals(path));
}
}
/**
* WebSocket broadcaster interface
*/
public interface WebSocketBroadcaster {
<T> Publisher<T> broadcast(T message, MediaType mediaType);
<T> Publisher<T> broadcast(T message, MediaType mediaType,
Predicate<WebSocketSession> filter);
<T> void broadcastSync(T message, MediaType mediaType);
<T> void broadcastSync(T message, MediaType mediaType,
Predicate<WebSocketSession> filter);
}Use reactive types for WebSocket message handling.
/**
* Reactive WebSocket endpoint
*/
@ServerWebSocket("/reactive")
public class ReactiveWebSocket {
@OnMessage
public Publisher<String> onMessage(String message, WebSocketSession session) {
return Flowable.fromArray(message.split(" "))
.map(String::toUpperCase);
}
@OnMessage
public Single<String> processMessage(String message) {
return Single.fromCallable(() -> "Processed: " + message);
}
}// WebSocket annotations
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface ServerWebSocket {
String value() default "/";
String[] subprotocols() default {};
}
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface ClientWebSocket {
String value() default "/";
String[] subprotocols() default {};
}
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface OnOpen {
}
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface OnMessage {
}
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface OnClose {
}
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface OnError {
}
// Core WebSocket interfaces
public interface WebSocketSession extends AttributeHolder {
String getId();
Publisher<String> send(Object message, MediaType mediaType);
<T> Publisher<T> send(T message);
void sendSync(Object message, MediaType mediaType);
<T> void sendSync(T message);
boolean isOpen();
boolean isSecure();
void close();
void close(CloseReason closeReason);
}
public interface WebSocketBroadcaster {
<T> Publisher<T> broadcast(T message, MediaType mediaType);
<T> Publisher<T> broadcast(T message, MediaType mediaType, Predicate<WebSocketSession> filter);
<T> void broadcastSync(T message, MediaType mediaType);
<T> void broadcastSync(T message, MediaType mediaType, Predicate<WebSocketSession> filter);
}
public final class CloseReason {
public static enum Code {
NORMAL_CLOSURE, GOING_AWAY, PROTOCOL_ERROR, UNSUPPORTED_DATA,
ABNORMAL_CLOSURE, INVALID_PAYLOAD_DATA, POLICY_VIOLATION,
MESSAGE_TOO_BIG, MANDATORY_EXTENSION, INTERNAL_SERVER_ERROR,
SERVICE_RESTART, TRY_AGAIN_LATER, BAD_GATEWAY, TLS_HANDSHAKE_FAILURE
}
public Code getCode();
public String getReasonPhrase();
}Install with Tessl CLI
npx tessl i tessl/maven-micronaut