Java API for RESTful Web Services
—
The JAX-RS Server-Sent Events (SSE) API provides standards-based support for real-time server-to-client communication. SSE enables servers to push data to web clients over a single HTTP connection, making it ideal for live updates, notifications, and streaming data scenarios.
import javax.ws.rs.sse.Sse;
import javax.ws.rs.sse.SseEvent;
import javax.ws.rs.sse.InboundSseEvent;
import javax.ws.rs.sse.OutboundSseEvent;
import javax.ws.rs.sse.SseEventSink;
import javax.ws.rs.sse.SseEventSource;
import javax.ws.rs.sse.SseBroadcaster;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

Factory for creating SSE-related objects.
/**
 * Entry point for obtaining the server-side SSE building blocks:
 * event builders and broadcasters.
 */
public interface Sse {

    /**
     * Creates a broadcaster.
     *
     * @return a new {@link SseBroadcaster}
     */
    SseBroadcaster newBroadcaster();

    /**
     * Creates a builder for an outbound event.
     *
     * @return a new {@link OutboundSseEvent.Builder}
     */
    OutboundSseEvent.Builder newEventBuilder();
}

Base interface for Server-Sent Events.
/**
 * Accessors common to both event directions; {@code OutboundSseEvent} and
 * {@code InboundSseEvent} extend this interface.
 */
public interface SseEvent {

    /** @return the event identifier */
    String getId();

    /** @return the event name */
    String getName();

    /** @return the comment attached to the event */
    String getComment();

    // NOTE(review): the published javax.ws.rs.sse.SseEvent Javadoc does not appear to
    // declare getData() on this base type -- confirm against the API reference.
    /** @return the event data */
    String getData();

    /** @return {@code true} when a reconnect delay has been set on this event */
    boolean isReconnectDelaySet();

    /** @return the reconnect delay carried by this event */
    long getReconnectDelay();
}

Server-side outbound events.
/**
 * An event produced on the server and written to a client, assembled through
 * the nested {@link Builder}.
 */
public interface OutboundSseEvent extends SseEvent {

    /** @return the media type associated with the event */
    MediaType getMediaType();

    /**
     * Fluent builder for {@link OutboundSseEvent}; every setter returns the
     * builder so that calls can be chained, and {@link #build()} yields the event.
     */
    // Nested interfaces are implicitly public and static, so the modifiers are omitted.
    interface Builder {

        /** Sets the event name. */
        Builder name(String name);

        /** Sets the event identifier. */
        Builder id(String id);

        /** Sets a comment on the event. */
        Builder comment(String comment);

        /** Sets the reconnect delay, in milliseconds. */
        Builder reconnectDelay(long milliseconds);

        /** Sets the media type of the event. */
        Builder mediaType(MediaType mediaType);

        /** Sets the event payload from a string. */
        Builder data(String data);

        /** Sets the event payload from an arbitrary object. */
        Builder data(Object data);

        /** Sets the event payload together with its declared class. */
        Builder data(Class type, Object data);

        /** Sets the event payload together with its declared generic type. */
        Builder data(GenericType type, Object data);

        /** @return the assembled event */
        OutboundSseEvent build();
    }
}

Client-side inbound events.
/**
 * An event received on the client, whose payload can be read as a typed value.
 */
public interface InboundSseEvent extends SseEvent {

    /** @return whether the event is empty */
    boolean isEmpty();

    /**
     * Reads the payload as the given class.
     *
     * @param type target class
     * @param <T>  target type
     */
    <T> T readData(Class<T> type);

    /**
     * Reads the payload as the given class, using the supplied media type.
     *
     * @param messageType target class
     * @param mediaType   media type passed along with the read
     * @param <T>         target type
     */
    <T> T readData(Class<T> messageType, MediaType mediaType);

    /**
     * Reads the payload as the given generic type.
     *
     * @param type target generic type
     * @param <T>  target type
     */
    <T> T readData(GenericType<T> type);

    /**
     * Reads the payload as the given generic type, using the supplied media type.
     *
     * @param type      target generic type
     * @param mediaType media type passed along with the read
     * @param <T>       target type
     */
    <T> T readData(GenericType<T> type, MediaType mediaType);
}

Server-side sink for sending events to clients.
/**
 * The outbound channel to a single client: events are sent through it and it
 * can be closed when the stream is finished.
 */
public interface SseEventSink extends AutoCloseable {

    /** @return {@code true} once this sink has been closed */
    boolean isClosed();

    /**
     * Sends one event.
     *
     * @param event the event to send
     * @return a stage that completes when the send has finished
     */
    CompletionStage<?> send(OutboundSseEvent event);

    /** Closes the sink; declared without a checked exception, unlike {@link AutoCloseable#close()}. */
    @Override
    void close();
}

Basic Server-Side SSE Examples:
/**
 * Example resource exposing three SSE endpoints: a short timed stream, a JSON
 * notification feed, and a replayable stream that resumes after a given event id.
 */
@Path("/events")
public class EventResource {

    @Context
    private Sse sse;

    /**
     * Basic SSE endpoint: sends a greeting, then one update per second for
     * ids 2..10, and finally closes the sink.
     *
     * @param eventSink sink for the connecting client
     */
    @GET
    @Path("/stream")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public void getEventStream(@Context SseEventSink eventSink) {
        // Send initial event
        OutboundSseEvent event = sse.newEventBuilder()
                .name("message")
                .id("1")
                .data("Connected to event stream")
                .build();
        eventSink.send(event);

        // Simulate sending periodic updates
        CompletableFuture.runAsync(() -> {
            try {
                for (int i = 2; i <= 10; i++) {
                    Thread.sleep(1000); // Wait 1 second
                    if (eventSink.isClosed()) {
                        // The client went away; stop producing instead of sending into a closed sink.
                        break;
                    }
                    OutboundSseEvent periodicEvent = sse.newEventBuilder()
                            .name("update")
                            .id(String.valueOf(i))
                            .data("Update #" + i + " at " + new Date())
                            .build();
                    eventSink.send(periodicEvent).whenComplete((unused, throwable) -> {
                        if (throwable != null) {
                            System.err.println("Failed to send event: " + throwable.getMessage());
                        }
                    });
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                eventSink.close();
            }
        });
    }

    /**
     * SSE with JSON data: sends a welcome notification and, once that send
     * succeeded, starts the periodic notification feed.
     *
     * @param eventSink sink for the connecting client
     */
    @GET
    @Path("/notifications")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public void getNotifications(@Context SseEventSink eventSink) {
        Notification notification = new Notification(
                "SYSTEM",
                "Welcome to the notification service",
                System.currentTimeMillis());
        OutboundSseEvent event = sse.newEventBuilder()
                .name("notification")
                .id(UUID.randomUUID().toString())
                .mediaType(MediaType.APPLICATION_JSON_TYPE)
                .data(notification)
                .build();
        eventSink.send(event).whenComplete((unused, throwable) -> {
            if (throwable == null) {
                schedulePeriodicNotifications(eventSink);
            } else {
                System.err.println("Failed to send notification: " + throwable.getMessage());
                // Nothing else will ever write to this sink, so release it.
                eventSink.close();
            }
        });
    }

    /**
     * SSE with reconnection: announces a 5 second reconnect delay and replays
     * events starting after the id supplied by the client.
     *
     * @param eventSink   sink for the connecting client
     * @param lastEventId last event id the client received; {@code null} or a
     *                    non-numeric value starts the replay from id 1
     */
    // NOTE(review): this endpoint only reads the "lastEventId" query parameter; a standard
    // browser EventSource reports the id in the Last-Event-ID request header -- confirm
    // whether that header should be honored as well.
    @GET
    @Path("/reliable")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public void getReliableStream(@Context SseEventSink eventSink,
                                  @QueryParam("lastEventId") String lastEventId) {
        int startFromId = 1;
        if (lastEventId != null) {
            try {
                startFromId = Integer.parseInt(lastEventId) + 1;
            } catch (NumberFormatException ignored) {
                // Invalid last event ID, start from beginning
            }
        }

        // Send reconnection delay
        OutboundSseEvent reconnectEvent = sse.newEventBuilder()
                .reconnectDelay(5000) // 5 seconds
                .comment("Reconnect in 5 seconds if connection is lost")
                .build();
        eventSink.send(reconnectEvent);

        sendEventsFromId(eventSink, startFromId);
    }

    /**
     * Polls the notification service every 30 seconds and forwards each
     * notification to the sink until the sink is closed or a failure occurs.
     */
    private void schedulePeriodicNotifications(SseEventSink eventSink) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleAtFixedRate(() -> {
            if (eventSink.isClosed()) {
                scheduler.shutdown();
                return;
            }
            try {
                Notification notification = notificationService.getNextNotification();
                if (notification == null) {
                    return;
                }
                OutboundSseEvent event = sse.newEventBuilder()
                        .name("notification")
                        .id(notification.getId())
                        .mediaType(MediaType.APPLICATION_JSON_TYPE)
                        .data(notification)
                        .build();
                eventSink.send(event).whenComplete((unused, throwable) -> {
                    if (throwable != null) {
                        System.err.println("Failed to send notification: " + throwable.getMessage());
                        eventSink.close();
                        scheduler.shutdown();
                    }
                });
            } catch (RuntimeException e) {
                // An exception escaping a periodic task suppresses all later runs without
                // shutting the pool down, which would leak this per-connection thread.
                System.err.println("Notification feed stopped: " + e.getMessage());
                eventSink.close();
                scheduler.shutdown();
            }
        }, 0, 30, TimeUnit.SECONDS);
    }

    /**
     * Replays the stored events from {@code startFromId} onwards on a background
     * task, pausing 100 ms between events, and closes the sink when done.
     */
    private void sendEventsFromId(SseEventSink eventSink, int startFromId) {
        CompletableFuture.runAsync(() -> {
            try {
                List<EventData> events = eventService.getEventsFromId(startFromId);
                for (EventData eventData : events) {
                    if (eventSink.isClosed()) {
                        break;
                    }
                    OutboundSseEvent event = sse.newEventBuilder()
                            .name(eventData.getType())
                            .id(String.valueOf(eventData.getId()))
                            .data(eventData.getData())
                            .build();
                    eventSink.send(event);
                    try {
                        Thread.sleep(100); // Small delay between events
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            } finally {
                // No other code holds this sink, so leaving it open would strand the connection.
                eventSink.close();
            }
        });
    }
}
// Supporting classes
/**
 * Payload of a notification event: a type, a message, a timestamp and an
 * identifier generated at construction time.
 */
public class Notification {
    private String type;
    private String message;
    private long timestamp;
    private String id;

    /**
     * Creates a notification and assigns it a fresh random UUID as its id.
     *
     * @param type      notification type
     * @param message   notification text
     * @param timestamp timestamp supplied by the caller
     */
    public Notification(String type, String message, long timestamp) {
        final String generatedId = UUID.randomUUID().toString();
        this.id = generatedId;
        this.timestamp = timestamp;
        this.message = message;
        this.type = type;
    }
    // Getters and setters...
}
/**
 * A stored event that can be replayed to a client: numeric id, event type
 * and an arbitrary payload.
 */
public class EventData {
    /** Numeric identifier of the event. */
    private int id;
    /** Event type; the replay code uses it as the SSE event name. */
    private String type;
    /** Payload sent as the event data. */
    private Object data;
    // Constructors, getters and setters...
}

Broadcasts events to multiple clients.
/**
 * Fans a single outbound event out to every registered sink and reports
 * per-sink errors and closures through callbacks.
 */
public interface SseBroadcaster extends AutoCloseable {

    /**
     * Registers a callback for sink closures.
     *
     * @param onClose receives the sink that was closed
     */
    void onClose(Consumer<SseEventSink> onClose);

    /**
     * Registers a callback for per-sink failures.
     *
     * @param onError receives the affected sink and the failure
     */
    void onError(BiConsumer<SseEventSink, Throwable> onError);

    /**
     * Adds a sink to the set of broadcast recipients.
     *
     * @param sseEventSink the sink to register
     */
    void register(SseEventSink sseEventSink);

    /**
     * Broadcasts one event.
     *
     * @param event the event to broadcast
     * @return a stage that completes when the broadcast has finished
     */
    CompletionStage<?> broadcast(OutboundSseEvent event);

    /** Closes the broadcaster; declared without a checked exception. */
    @Override
    void close();
}

Broadcasting Examples:
/**
 * Example singleton resource that pushes announcements and alerts to every
 * connected client through one shared {@link SseBroadcaster}.
 *
 * <p>The broadcaster is created lazily when the first client connects.
 */
@Path("/broadcast")
@Singleton // Important: broadcaster should be singleton
public class BroadcastResource {

    @Context
    private Sse sse;

    // Not final: it is assigned on first connect. Volatile so the unsynchronized
    // null checks in the request methods see a fully configured broadcaster.
    private volatile SseBroadcaster broadcaster;

    private final Set<SseEventSink> clients = ConcurrentHashMap.newKeySet();

    public BroadcastResource() {
        // The broadcaster is initialized when the first client connects.
    }

    /**
     * Returns the shared broadcaster, creating and configuring it on first use.
     * The field is published only after the callbacks are installed.
     */
    private SseBroadcaster getOrCreateBroadcaster() {
        SseBroadcaster current = broadcaster;
        if (current == null) {
            synchronized (this) {
                current = broadcaster;
                if (current == null) {
                    current = sse.newBroadcaster();
                    configureBroadcaster(current);
                    broadcaster = current;
                }
            }
        }
        return current;
    }

    /**
     * Installs the error and close callbacks that keep {@code clients} in sync.
     * Deliberately not a {@code @PostConstruct} hook: at construction time there
     * is no broadcaster yet to configure.
     */
    private void configureBroadcaster(SseBroadcaster target) {
        target.onError((eventSink, throwable) -> {
            System.err.println("Error sending to client: " + throwable.getMessage());
            clients.remove(eventSink);
            eventSink.close();
        });
        target.onClose(eventSink -> {
            System.out.println("Client disconnected");
            clients.remove(eventSink);
        });
    }

    /**
     * Registers a new client with the broadcaster and greets it.
     *
     * @param eventSink sink for the connecting client
     */
    @GET
    @Path("/connect")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public void connect(@Context SseEventSink eventSink) {
        // Register new client
        getOrCreateBroadcaster().register(eventSink);
        clients.add(eventSink);

        // Send welcome message to new client
        OutboundSseEvent welcomeEvent = sse.newEventBuilder()
                .name("welcome")
                .data("Connected to broadcast channel. " + clients.size() + " clients online.")
                .build();
        eventSink.send(welcomeEvent);
        System.out.println("New client connected. Total clients: " + clients.size());
    }

    /**
     * Broadcasts a plain-text announcement to all connected clients.
     *
     * @param message announcement text
     * @return 503 when no client has connected yet, otherwise 200
     */
    @POST
    @Path("/announce")
    public Response announce(String message) {
        SseBroadcaster active = broadcaster;
        if (active == null) {
            return Response.status(Response.Status.SERVICE_UNAVAILABLE)
                    .entity("No broadcast service available")
                    .build();
        }
        OutboundSseEvent announcement = sse.newEventBuilder()
                .name("announcement")
                .id(UUID.randomUUID().toString())
                .data("ANNOUNCEMENT: " + message)
                .build();
        active.broadcast(announcement).whenComplete((unused, throwable) -> {
            if (throwable != null) {
                System.err.println("Broadcast failed: " + throwable.getMessage());
            } else {
                System.out.println("Broadcasted to " + clients.size() + " clients: " + message);
            }
        });
        return Response.ok("Announcement sent to " + clients.size() + " clients").build();
    }

    /**
     * Broadcasts a structured alert as JSON to all connected clients.
     *
     * @param alert alert to send
     * @return 503 when no client has connected yet, otherwise 200
     */
    @POST
    @Path("/alert")
    public Response sendAlert(AlertMessage alert) {
        SseBroadcaster active = broadcaster;
        if (active == null) {
            return Response.status(Response.Status.SERVICE_UNAVAILABLE)
                    .entity("No broadcast service available")
                    .build();
        }
        OutboundSseEvent alertEvent = sse.newEventBuilder()
                .name("alert")
                .id(alert.getId())
                .mediaType(MediaType.APPLICATION_JSON_TYPE)
                .data(alert)
                .build();
        active.broadcast(alertEvent);
        return Response.ok("Alert broadcasted").build();
    }

    /**
     * Reports the number of tracked clients and whether the broadcaster exists.
     */
    @GET
    @Path("/status")
    public Response getStatus() {
        Map<String, Object> status = new HashMap<>();
        status.put("connectedClients", clients.size());
        status.put("broadcasterActive", broadcaster != null);
        return Response.ok(status).build();
    }

    /** Closes the broadcaster (if one was created) and every tracked sink. */
    @PreDestroy
    private void cleanup() {
        SseBroadcaster active = broadcaster;
        if (active != null) {
            active.close();
        }
        clients.forEach(SseEventSink::close);
        clients.clear();
    }
}
// Chat room example
/**
 * Example chat room: clients join over SSE under a username and post messages
 * that are broadcast to everyone in the room.
 */
@Path("/chat")
@Singleton
public class ChatResource {

    @Context
    private Sse sse;

    private SseBroadcaster chatBroadcaster;

    // username -> sink of that user's current connection
    private final Map<String, SseEventSink> chatClients = new ConcurrentHashMap<>();

    /** Creates the broadcaster and drops a user's entry when their sink closes. */
    @PostConstruct
    private void initChat() {
        chatBroadcaster = sse.newBroadcaster();
        chatBroadcaster.onClose(eventSink -> {
            chatClients.values().removeIf(sink -> sink == eventSink);
            broadcastUserCount();
        });
    }

    /**
     * Adds the caller to the chat and announces the join to everyone.
     * A missing or blank username closes the sink immediately.
     *
     * @param username  name to join under
     * @param eventSink sink for the connecting client
     */
    @GET
    @Path("/join")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public void joinChat(@QueryParam("username") String username,
                         @Context SseEventSink eventSink) {
        if (username == null || username.trim().isEmpty()) {
            eventSink.close();
            return;
        }

        // Register user
        chatBroadcaster.register(eventSink);
        SseEventSink replaced = chatClients.put(username, eventSink);
        if (replaced != null && replaced != eventSink) {
            // Same username joined again (e.g. a reconnect): the old connection is no
            // longer reachable through the map, so close it rather than leave it orphaned.
            replaced.close();
        }

        // Notify about user joining
        OutboundSseEvent joinEvent = sse.newEventBuilder()
                .name("user_joined")
                .data(username + " joined the chat")
                .build();
        chatBroadcaster.broadcast(joinEvent);
        broadcastUserCount();
    }

    /**
     * Broadcasts a chat message from a joined user.
     *
     * @param username sender; must be a user currently in the chat
     * @param message  message text
     * @return 401 when the sender is unknown or missing, 400 when the message
     *         is missing, otherwise 200
     */
    @POST
    @Path("/message")
    public Response sendMessage(@FormParam("username") String username,
                                @FormParam("message") String message) {
        // ConcurrentHashMap rejects null keys with a NullPointerException, so a request
        // without the username form field must be turned away before the lookup.
        if (username == null || !chatClients.containsKey(username)) {
            return Response.status(Response.Status.UNAUTHORIZED)
                    .entity("User not in chat")
                    .build();
        }
        if (message == null) {
            return Response.status(Response.Status.BAD_REQUEST)
                    .entity("Message is required")
                    .build();
        }

        ChatMessage chatMessage = new ChatMessage(username, message, System.currentTimeMillis());
        OutboundSseEvent messageEvent = sse.newEventBuilder()
                .name("chat_message")
                .mediaType(MediaType.APPLICATION_JSON_TYPE)
                .data(chatMessage)
                .build();
        chatBroadcaster.broadcast(messageEvent);
        return Response.ok().build();
    }

    /** Broadcasts the current number of joined users. */
    private void broadcastUserCount() {
        OutboundSseEvent countEvent = sse.newEventBuilder()
                .name("user_count")
                .data("Users online: " + chatClients.size())
                .build();
        chatBroadcaster.broadcast(countEvent);
    }
}

Client-side source for consuming Server-Sent Events.
/**
 * Client-side handle on an SSE stream: register callbacks, open the
 * connection, and close it when done.
 *
 * <p>This is a signature summary; method bodies are omitted.
 */
public interface SseEventSource extends AutoCloseable {

    /**
     * Starts building an event source for the given endpoint. This is the
     * entry point used by the client examples
     * ({@code SseEventSource.target(target).build()}).
     *
     * @param endpoint the SSE endpoint to connect to
     * @return a builder bound to {@code endpoint}
     */
    static Builder target(WebTarget endpoint);

    /**
     * Registers an event callback.
     *
     * @param onEvent invoked for each inbound event
     */
    void register(Consumer<InboundSseEvent> onEvent);

    /**
     * Registers event and error callbacks.
     *
     * @param onEvent invoked for each inbound event
     * @param onError invoked with the failure when an error occurs
     */
    void register(Consumer<InboundSseEvent> onEvent,
                  Consumer<Throwable> onError);

    /**
     * Registers event, error and completion callbacks.
     *
     * @param onEvent    invoked for each inbound event
     * @param onError    invoked with the failure when an error occurs
     * @param onComplete invoked when the stream completes
     */
    void register(Consumer<InboundSseEvent> onEvent,
                  Consumer<Throwable> onError,
                  Runnable onComplete);

    /** Opens the connection to the endpoint. */
    void open();

    /** @return {@code true} while the source is open */
    boolean isOpen();

    /** Closes the source; declared without a checked exception. */
    @Override
    void close();

    /** Builder for {@link SseEventSource} instances. */
    public static abstract class Builder {

        /** @return a new builder instance */
        public static Builder newBuilder();

        /** Sets the endpoint the built source connects to. */
        protected abstract Builder target(WebTarget endpoint);

        /** Assigns a name to the event source. */
        public abstract Builder named(String name);

        /** Sets the delay between reconnect attempts. */
        public abstract Builder reconnectingEvery(long delay, TimeUnit unit);

        /** @return the configured event source */
        public abstract SseEventSource build();
    }
}

Client-Side SSE Examples:
/**
 * Client-side examples for consuming the SSE endpoints shown above.
 */
public class SseClientExamples {

    /**
     * Basic SSE client: prints every event received from {@code /events/stream}
     * for 30 seconds, then disconnects.
     */
    public void basicSseClient() {
        Client client = ClientBuilder.newClient();
        WebTarget target = client.target("http://localhost:8080/events/stream");
        SseEventSource eventSource = SseEventSource.target(target).build();

        // Register event handler
        eventSource.register(event -> {
            System.out.println("Received event:");
            System.out.println(" Name: " + event.getName());
            System.out.println(" ID: " + event.getId());
            System.out.println(" Data: " + event.readData(String.class));
        });

        // Open connection
        eventSource.open();

        // Keep running for demonstration
        try {
            Thread.sleep(30000); // 30 seconds
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            eventSource.close();
            client.close();
        }
    }

    /**
     * SSE client with error handling: dispatches on the event name and runs
     * until the JVM shuts down.
     */
    public void sseClientWithErrorHandling() {
        Client client = ClientBuilder.newClient();
        WebTarget target = client.target("http://localhost:8080/events/notifications");
        SseEventSource eventSource = SseEventSource.target(target)
                .reconnectingEvery(5, TimeUnit.SECONDS)
                .build();

        eventSource.register(
                event -> {
                    String eventName = event.getName();
                    if (eventName == null) {
                        // A switch on a null String throws NullPointerException, so
                        // unnamed events are reported here instead.
                        System.out.println("Unknown event: " + eventName);
                        return;
                    }
                    switch (eventName) {
                        case "notification":
                            Notification notification = event.readData(Notification.class);
                            handleNotification(notification);
                            break;
                        case "alert":
                            AlertMessage alert = event.readData(AlertMessage.class);
                            handleAlert(alert);
                            break;
                        default:
                            System.out.println("Unknown event: " + eventName);
                    }
                },
                error -> {
                    System.err.println("SSE Error: " + error.getMessage());
                    // Could implement custom reconnection logic here
                },
                () -> System.out.println("SSE stream completed"));
        eventSource.open();

        // Release the connection when the JVM exits
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            eventSource.close();
            client.close();
        }));

        try {
            Thread.currentThread().join(); // Joining itself blocks until interrupted
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Chat client: prints chat events and posts each console line as a message
     * until the user types {@code quit} or the input ends.
     *
     * @param username name to join the chat under
     */
    public void chatClient(String username) {
        Client client = ClientBuilder.newClient();
        WebTarget chatTarget = client.target("http://localhost:8080/chat/join")
                .queryParam("username", username);
        SseEventSource chatSource = SseEventSource.target(chatTarget).build();
        try {
            chatSource.register(event -> {
                String eventType = event.getName();
                if (eventType == null) {
                    return; // unnamed events carry nothing this client displays
                }
                switch (eventType) {
                    case "user_joined":
                    case "user_count":
                        System.out.println("[SYSTEM] " + event.readData(String.class));
                        break;
                    case "chat_message":
                        ChatMessage message = event.readData(ChatMessage.class);
                        System.out.printf("[%s] %s%n", message.getUsername(), message.getMessage());
                        break;
                    default:
                        break;
                }
            });
            chatSource.open();

            // Send messages from console input
            Scanner scanner = new Scanner(System.in);
            WebTarget messageTarget = client.target("http://localhost:8080/chat/message");
            System.out.println("Connected to chat. Type messages (or 'quit' to exit):");
            // hasNextLine() ends the loop on end-of-input instead of throwing.
            while (scanner.hasNextLine()) {
                String input = scanner.nextLine();
                if (input.equals("quit")) {
                    break;
                }
                Form messageForm = new Form()
                        .param("username", username)
                        .param("message", input);
                // Close the response so its connection is released.
                messageTarget.request()
                        .post(Entity.form(messageForm))
                        .close();
            }
        } finally {
            chatSource.close();
            client.close();
        }
    }

    /**
     * Resilient SSE client with custom reconnection: remembers the last event
     * id and, whenever the connection fails or closes, reconnects after
     * 5 seconds asking the server to resume after that id.
     */
    public void resilientSseClient() {
        Client client = ClientBuilder.newClient();
        WebTarget target = client.target("http://localhost:8080/events/reliable");
        AtomicReference<String> lastEventId = new AtomicReference<>();
        AtomicBoolean shouldReconnect = new AtomicBoolean(true);

        while (shouldReconnect.get()) {
            WebTarget currentTarget = target;
            if (lastEventId.get() != null) {
                currentTarget = target.queryParam("lastEventId", lastEventId.get());
            }
            AtomicBoolean connectionFailed = new AtomicBoolean(false);
            SseEventSource eventSource = null;
            try {
                eventSource = SseEventSource.target(currentTarget).build();
                eventSource.register(
                        event -> {
                            // Update last received event ID
                            if (event.getId() != null) {
                                lastEventId.set(event.getId());
                            }
                            System.out.printf("Event [%s]: %s%n",
                                    event.getId(), event.readData(String.class));
                        },
                        error -> {
                            System.err.println("Connection error: " + error.getMessage());
                            // Wakes the wait loop below so the outer loop reconnects.
                            connectionFailed.set(true);
                        });
                eventSource.open();

                // Wait for the connection to close or fail
                while (eventSource.isOpen() && !connectionFailed.get()) {
                    Thread.sleep(500);
                }
            } catch (InterruptedException e) {
                shouldReconnect.set(false);
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                System.err.println("Connection attempt failed: " + e.getMessage());
            } finally {
                if (eventSource != null) {
                    eventSource.close();
                }
            }

            if (shouldReconnect.get()) {
                System.err.println("Reconnecting in 5 seconds...");
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException ie) {
                    shouldReconnect.set(false);
                    Thread.currentThread().interrupt();
                }
            }
        }
        client.close();
    }

    /** Prints the notification message to standard output. */
    private void handleNotification(Notification notification) {
        System.out.println("NOTIFICATION: " + notification.getMessage());
    }

    /** Prints the alert message to standard error. */
    private void handleAlert(AlertMessage alert) {
        System.err.println("ALERT: " + alert.getMessage());
    }
}
// Supporting classes for examples
/**
 * Payload of an alert event. The broadcast example sends it as JSON and uses
 * its id as the SSE event id.
 */
public class AlertMessage {
// Identifier of the alert
private String id;
// Severity label of the alert
private String severity;
// Alert text
private String message;
// Timestamp of the alert -- NOTE(review): unit not shown here; presumably epoch milliseconds, confirm
private long timestamp;
// Constructors, getters, setters...
}
/**
 * Payload of a chat message event: who sent it, what was said, and when.
 */
public class ChatMessage {
    private String username;
    private String message;
    private long timestamp;

    /**
     * @param username  sender of the message
     * @param message   message text
     * @param timestamp timestamp supplied by the caller
     */
    public ChatMessage(String username, String message, long timestamp) {
        this.timestamp = timestamp;
        this.message = message;
        this.username = username;
    }
    // Getters and setters...
}

JAX-RS provides a constant for SSE media type:
/**
 * Excerpt of {@code MediaType} showing only the Server-Sent Events constants.
 */
public class MediaType {

    /** The {@code text/event-stream} media type as a {@code MediaType} instance. */
    public static final MediaType SERVER_SENT_EVENTS_TYPE = new MediaType("text", "event-stream");

    /** The {@code text/event-stream} media type as a string, for use in {@code @Produces}. */
    public static final String SERVER_SENT_EVENTS = "text/event-stream";
}

@Path("/managed-events")
/**
 * Example resource that tracks its open sinks and sends each client a
 * heartbeat every 30 seconds on a shared scheduler.
 */
public class ManagedEventResource {

    private final Set<SseEventSink> activeSinks = ConcurrentHashMap.newKeySet();
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

    /**
     * Sends a "connected" event and starts the periodic heartbeat for the client.
     * The heartbeat stops itself once the sink is closed or a send fails.
     *
     * @param eventSink sink for the connecting client
     * @param sse       SSE factory used to build the events
     */
    @GET
    @Path("/heartbeat")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public void startHeartbeat(@Context SseEventSink eventSink,
                               @Context Sse sse) {
        activeSinks.add(eventSink);
        HeartbeatTask heartbeat = new HeartbeatTask(eventSink, sse);

        // An outbound event needs data or a comment to be buildable, so the
        // greeting carries a small payload.
        OutboundSseEvent connected = sse.newEventBuilder()
                .name("connected")
                .data("connected")
                .build();
        eventSink.send(connected).whenComplete((unused, throwable) -> {
            if (throwable != null) {
                heartbeat.stop();
            }
        });

        // Send periodic heartbeat
        heartbeat.handle = scheduler.scheduleAtFixedRate(heartbeat, 0, 30, TimeUnit.SECONDS);
    }

    /** Stops the scheduler and closes every sink that is still tracked. */
    @PreDestroy
    private void cleanup() {
        scheduler.shutdown();
        activeSinks.forEach(SseEventSink::close);
        activeSinks.clear();
    }

    /**
     * Periodic task for one client. It keeps a reference to its own schedule
     * handle so it can cancel itself instead of running forever after the
     * client has gone.
     */
    private final class HeartbeatTask implements Runnable {
        private final SseEventSink sink;
        private final Sse sse;
        // Assigned right after scheduling; may still be null during the very first run.
        private volatile ScheduledFuture<?> handle;
        private volatile boolean stopped;

        HeartbeatTask(SseEventSink sink, Sse sse) {
            this.sink = sink;
            this.sse = sse;
        }

        @Override
        public void run() {
            if (stopped || sink.isClosed()) {
                stop();
                return;
            }
            OutboundSseEvent heartbeatEvent = sse.newEventBuilder()
                    .name("heartbeat")
                    .data("ping")
                    .build();
            sink.send(heartbeatEvent).whenComplete((unused, throwable) -> {
                if (throwable != null) {
                    stop();
                }
            });
        }

        /** Untracks and closes the sink and cancels further heartbeats. */
        void stop() {
            stopped = true;
            activeSinks.remove(sink);
            sink.close();
            ScheduledFuture<?> scheduled = handle;
            if (scheduled != null) {
                // If the handle is not visible yet, the next run sees 'stopped' and cancels then.
                scheduled.cancel(false);
            }
        }
    }
}

Install with Tessl CLI
npx tessl i tessl/maven-javax-ws-rs--javax-ws-rs-api