0
# Server-Sent Events
1
2
The JAX-RS Server-Sent Events (SSE) API provides standards-based support for real-time server-to-client communication. SSE enables servers to push data to web clients over a single HTTP connection, making it ideal for live updates, notifications, and streaming data scenarios.
3
4
## Core Imports
5
6
```java
7
import javax.ws.rs.sse.Sse;
8
import javax.ws.rs.sse.SseEvent;
9
import javax.ws.rs.sse.InboundSseEvent;
10
import javax.ws.rs.sse.OutboundSseEvent;
11
import javax.ws.rs.sse.SseEventSink;
12
import javax.ws.rs.sse.SseEventSource;
13
import javax.ws.rs.sse.SseBroadcaster;
14
15
import javax.ws.rs.GET;
16
import javax.ws.rs.Path;
17
import javax.ws.rs.Produces;
18
import javax.ws.rs.core.Context;
19
import javax.ws.rs.core.MediaType;
20
21
import java.util.concurrent.CompletionStage;
22
import java.util.concurrent.TimeUnit;
23
import java.util.function.Consumer;
24
```
25
26
## SSE Factory and Events
27
28
### Sse Interface
29
30
Factory for creating SSE-related objects.
31
32
```java { .api }
33
public interface Sse {
34
35
OutboundSseEvent.Builder newEventBuilder();
36
SseBroadcaster newBroadcaster();
37
}
38
```
39
40
### SseEvent Interface
41
42
Base interface for Server-Sent Events.
43
44
```java { .api }
45
/**
 * Fields common to inbound and outbound Server-Sent Events.
 */
public interface SseEvent {

    /** @return the event id */
    String getId();

    /** @return the event name */
    String getName();

    /** @return the comment attached to the event */
    String getComment();

    // NOTE(review): in the published JAX-RS 2.1 API the data accessors appear to live on the
    // sub-interfaces (OutboundSseEvent / InboundSseEvent) rather than here - confirm.
    /** @return the event data as a string */
    String getData();

    /** @return the reconnect delay; see {@link #isReconnectDelaySet()} */
    long getReconnectDelay();

    /** @return whether a reconnect delay has been set on this event */
    boolean isReconnectDelaySet();
}
54
```
55
56
### OutboundSseEvent Interface
57
58
Server-side outbound events.
59
60
```java { .api }
61
public interface OutboundSseEvent extends SseEvent {
62
63
MediaType getMediaType();
64
65
public static interface Builder {
66
67
Builder id(String id);
68
Builder name(String name);
69
Builder reconnectDelay(long milliseconds);
70
Builder data(Object data);
71
Builder data(String data);
72
Builder data(Class type, Object data);
73
Builder data(GenericType type, Object data);
74
Builder mediaType(MediaType mediaType);
75
Builder comment(String comment);
76
77
OutboundSseEvent build();
78
}
79
}
80
```
81
82
### InboundSseEvent Interface
83
84
Client-side inbound events.
85
86
```java { .api }
87
public interface InboundSseEvent extends SseEvent {
88
89
<T> T readData(Class<T> type);
90
<T> T readData(GenericType<T> type);
91
<T> T readData(Class<T> messageType, MediaType mediaType);
92
<T> T readData(GenericType<T> type, MediaType mediaType);
93
94
boolean isEmpty();
95
}
96
```
97
98
## Server-Side SSE
99
100
### SseEventSink Interface
101
102
Server-side sink for sending events to clients.
103
104
```java { .api }
105
public interface SseEventSink extends AutoCloseable {
106
107
CompletionStage<?> send(OutboundSseEvent event);
108
boolean isClosed();
109
void close();
110
}
111
```
112
113
**Basic Server-Side SSE Examples:**
114
115
```java
116
@Path("/events")
117
public class EventResource {
118
119
@Context
120
private Sse sse;
121
122
// Basic SSE endpoint
123
@GET
124
@Path("/stream")
125
@Produces(MediaType.SERVER_SENT_EVENTS)
126
public void getEventStream(@Context SseEventSink eventSink) {
127
128
// Send initial event
129
OutboundSseEvent event = sse.newEventBuilder()
130
.name("message")
131
.id("1")
132
.data("Connected to event stream")
133
.build();
134
135
eventSink.send(event);
136
137
// Simulate sending periodic updates
138
CompletableFuture.runAsync(() -> {
139
try {
140
for (int i = 2; i <= 10; i++) {
141
Thread.sleep(1000); // Wait 1 second
142
143
OutboundSseEvent periodicEvent = sse.newEventBuilder()
144
.name("update")
145
.id(String.valueOf(i))
146
.data("Update #" + i + " at " + new Date())
147
.build();
148
149
CompletionStage<?> result = eventSink.send(periodicEvent);
150
151
// Handle send completion
152
result.whenComplete((unused, throwable) -> {
153
if (throwable != null) {
154
System.err.println("Failed to send event: " + throwable.getMessage());
155
}
156
});
157
}
158
} catch (InterruptedException e) {
159
Thread.currentThread().interrupt();
160
} finally {
161
eventSink.close();
162
}
163
});
164
}
165
166
// SSE with JSON data
167
@GET
168
@Path("/notifications")
169
@Produces(MediaType.SERVER_SENT_EVENTS)
170
public void getNotifications(@Context SseEventSink eventSink) {
171
172
// Send structured data as JSON
173
Notification notification = new Notification(
174
"SYSTEM",
175
"Welcome to the notification service",
176
System.currentTimeMillis()
177
);
178
179
OutboundSseEvent event = sse.newEventBuilder()
180
.name("notification")
181
.id(UUID.randomUUID().toString())
182
.mediaType(MediaType.APPLICATION_JSON_TYPE)
183
.data(notification)
184
.build();
185
186
eventSink.send(event).whenComplete((unused, throwable) -> {
187
if (throwable == null) {
188
// Schedule more notifications
189
schedulePeriodicNotifications(eventSink);
190
} else {
191
System.err.println("Failed to send notification: " + throwable.getMessage());
192
}
193
});
194
}
195
196
// SSE with reconnection
197
@GET
198
@Path("/reliable")
199
@Produces(MediaType.SERVER_SENT_EVENTS)
200
public void getReliableStream(@Context SseEventSink eventSink,
201
@QueryParam("lastEventId") String lastEventId) {
202
203
// Handle reconnection with last event ID
204
int startFromId = 1;
205
if (lastEventId != null) {
206
try {
207
startFromId = Integer.parseInt(lastEventId) + 1;
208
} catch (NumberFormatException e) {
209
// Invalid last event ID, start from beginning
210
}
211
}
212
213
// Send reconnection delay
214
OutboundSseEvent reconnectEvent = sse.newEventBuilder()
215
.reconnectDelay(5000) // 5 seconds
216
.comment("Reconnect in 5 seconds if connection is lost")
217
.build();
218
219
eventSink.send(reconnectEvent);
220
221
// Send events starting from the specified ID
222
sendEventsFromId(eventSink, startFromId);
223
}
224
225
private void schedulePeriodicNotifications(SseEventSink eventSink) {
226
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
227
228
scheduler.scheduleAtFixedRate(() -> {
229
if (eventSink.isClosed()) {
230
scheduler.shutdown();
231
return;
232
}
233
234
Notification notification = notificationService.getNextNotification();
235
if (notification != null) {
236
OutboundSseEvent event = sse.newEventBuilder()
237
.name("notification")
238
.id(notification.getId())
239
.mediaType(MediaType.APPLICATION_JSON_TYPE)
240
.data(notification)
241
.build();
242
243
eventSink.send(event);
244
}
245
}, 0, 30, TimeUnit.SECONDS);
246
}
247
248
private void sendEventsFromId(SseEventSink eventSink, int startFromId) {
249
CompletableFuture.runAsync(() -> {
250
List<EventData> events = eventService.getEventsFromId(startFromId);
251
252
for (EventData eventData : events) {
253
if (eventSink.isClosed()) {
254
break;
255
}
256
257
OutboundSseEvent event = sse.newEventBuilder()
258
.name(eventData.getType())
259
.id(String.valueOf(eventData.getId()))
260
.data(eventData.getData())
261
.build();
262
263
eventSink.send(event);
264
265
try {
266
Thread.sleep(100); // Small delay between events
267
} catch (InterruptedException e) {
268
Thread.currentThread().interrupt();
269
break;
270
}
271
}
272
});
273
}
274
}
275
276
// Supporting classes
277
/**
 * Payload of the "notification" events in the examples. Each instance receives a random
 * UUID string as its id when constructed.
 */
public class Notification {
    private String type;
    private String message;
    private long timestamp;
    private String id;

    /**
     * @param type      notification category
     * @param message   text of the notification
     * @param timestamp caller-supplied timestamp
     */
    public Notification(String type, String message, long timestamp) {
        this.id = UUID.randomUUID().toString();
        this.timestamp = timestamp;
        this.message = message;
        this.type = type;
    }

    // Getters and setters...
}
292
293
/**
 * Stored event replayed by the reliable-stream example: a numeric id, a type used as the
 * SSE event name, and the payload object.
 */
public class EventData {

    private int id;

    private String type;

    private Object data;

    // Constructors, getters and setters...
}
300
```
301
302
## Broadcasting Events
303
304
### SseBroadcaster Interface
305
306
Broadcasts events to multiple clients.
307
308
```java { .api }
309
public interface SseBroadcaster extends AutoCloseable {
310
311
void register(SseEventSink sseEventSink);
312
CompletionStage<?> broadcast(OutboundSseEvent event);
313
void close();
314
void onError(BiConsumer<SseEventSink, Throwable> onError);
315
void onClose(Consumer<SseEventSink> onClose);
316
}
317
```
318
319
**Broadcasting Examples:**
320
321
```java
322
@Path("/broadcast")
323
@Singleton // Important: broadcaster should be singleton
324
public class BroadcastResource {
325
326
@Context
327
private Sse sse;
328
329
private final SseBroadcaster broadcaster;
330
private final Set<SseEventSink> clients = ConcurrentHashMap.newKeySet();
331
332
public BroadcastResource() {
333
// Will be initialized when first client connects
334
this.broadcaster = null;
335
}
336
337
@PostConstruct
338
private void initBroadcaster() {
339
// Configure broadcaster error handling
340
broadcaster.onError((eventSink, throwable) -> {
341
System.err.println("Error sending to client: " + throwable.getMessage());
342
clients.remove(eventSink);
343
eventSink.close();
344
});
345
346
broadcaster.onClose((eventSink) -> {
347
System.out.println("Client disconnected");
348
clients.remove(eventSink);
349
});
350
}
351
352
@GET
353
@Path("/connect")
354
@Produces(MediaType.SERVER_SENT_EVENTS)
355
public void connect(@Context SseEventSink eventSink) {
356
357
// Lazy initialization of broadcaster
358
if (broadcaster == null) {
359
synchronized (this) {
360
if (broadcaster == null) {
361
broadcaster = sse.newBroadcaster();
362
initBroadcaster();
363
}
364
}
365
}
366
367
// Register new client
368
broadcaster.register(eventSink);
369
clients.add(eventSink);
370
371
// Send welcome message to new client
372
OutboundSseEvent welcomeEvent = sse.newEventBuilder()
373
.name("welcome")
374
.data("Connected to broadcast channel. " + clients.size() + " clients online.")
375
.build();
376
377
eventSink.send(welcomeEvent);
378
379
System.out.println("New client connected. Total clients: " + clients.size());
380
}
381
382
@POST
383
@Path("/announce")
384
public Response announce(String message) {
385
386
if (broadcaster == null) {
387
return Response.status(Response.Status.SERVICE_UNAVAILABLE)
388
.entity("No broadcast service available")
389
.build();
390
}
391
392
// Broadcast message to all connected clients
393
OutboundSseEvent announcement = sse.newEventBuilder()
394
.name("announcement")
395
.id(UUID.randomUUID().toString())
396
.data("ANNOUNCEMENT: " + message)
397
.build();
398
399
CompletionStage<?> result = broadcaster.broadcast(announcement);
400
401
result.whenComplete((unused, throwable) -> {
402
if (throwable != null) {
403
System.err.println("Broadcast failed: " + throwable.getMessage());
404
} else {
405
System.out.println("Broadcasted to " + clients.size() + " clients: " + message);
406
}
407
});
408
409
return Response.ok("Announcement sent to " + clients.size() + " clients").build();
410
}
411
412
@POST
413
@Path("/alert")
414
public Response sendAlert(AlertMessage alert) {
415
416
if (broadcaster == null) {
417
return Response.status(Response.Status.SERVICE_UNAVAILABLE)
418
.entity("No broadcast service available")
419
.build();
420
}
421
422
// Send structured alert
423
OutboundSseEvent alertEvent = sse.newEventBuilder()
424
.name("alert")
425
.id(alert.getId())
426
.mediaType(MediaType.APPLICATION_JSON_TYPE)
427
.data(alert)
428
.build();
429
430
broadcaster.broadcast(alertEvent);
431
432
return Response.ok("Alert broadcasted").build();
433
}
434
435
@GET
436
@Path("/status")
437
public Response getStatus() {
438
Map<String, Object> status = new HashMap<>();
439
status.put("connectedClients", clients.size());
440
status.put("broadcasterActive", broadcaster != null);
441
442
return Response.ok(status).build();
443
}
444
445
@PreDestroy
446
private void cleanup() {
447
if (broadcaster != null) {
448
broadcaster.close();
449
}
450
clients.forEach(SseEventSink::close);
451
clients.clear();
452
}
453
}
454
455
// Chat room example
456
@Path("/chat")
457
@Singleton
458
public class ChatResource {
459
460
@Context
461
private Sse sse;
462
463
private SseBroadcaster chatBroadcaster;
464
private final Map<String, SseEventSink> chatClients = new ConcurrentHashMap<>();
465
466
@PostConstruct
467
private void initChat() {
468
chatBroadcaster = sse.newBroadcaster();
469
470
chatBroadcaster.onClose(eventSink -> {
471
chatClients.values().removeIf(sink -> sink == eventSink);
472
broadcastUserCount();
473
});
474
}
475
476
@GET
477
@Path("/join")
478
@Produces(MediaType.SERVER_SENT_EVENTS)
479
public void joinChat(@QueryParam("username") String username,
480
@Context SseEventSink eventSink) {
481
482
if (username == null || username.trim().isEmpty()) {
483
eventSink.close();
484
return;
485
}
486
487
// Register user
488
chatBroadcaster.register(eventSink);
489
chatClients.put(username, eventSink);
490
491
// Notify about user joining
492
OutboundSseEvent joinEvent = sse.newEventBuilder()
493
.name("user_joined")
494
.data(username + " joined the chat")
495
.build();
496
497
chatBroadcaster.broadcast(joinEvent);
498
broadcastUserCount();
499
}
500
501
@POST
502
@Path("/message")
503
public Response sendMessage(@FormParam("username") String username,
504
@FormParam("message") String message) {
505
506
if (!chatClients.containsKey(username)) {
507
return Response.status(Response.Status.UNAUTHORIZED)
508
.entity("User not in chat")
509
.build();
510
}
511
512
ChatMessage chatMessage = new ChatMessage(username, message, System.currentTimeMillis());
513
514
OutboundSseEvent messageEvent = sse.newEventBuilder()
515
.name("chat_message")
516
.mediaType(MediaType.APPLICATION_JSON_TYPE)
517
.data(chatMessage)
518
.build();
519
520
chatBroadcaster.broadcast(messageEvent);
521
522
return Response.ok().build();
523
}
524
525
private void broadcastUserCount() {
526
OutboundSseEvent countEvent = sse.newEventBuilder()
527
.name("user_count")
528
.data("Users online: " + chatClients.size())
529
.build();
530
531
chatBroadcaster.broadcast(countEvent);
532
}
533
}
534
```
535
536
## Client-Side SSE
537
538
### SseEventSource Interface
539
540
Client-side source for consuming Server-Sent Events.
541
542
```java { .api }
543
public interface SseEventSource extends AutoCloseable {
544
545
void register(Consumer<InboundSseEvent> onEvent);
546
void register(Consumer<InboundSseEvent> onEvent,
547
Consumer<Throwable> onError);
548
void register(Consumer<InboundSseEvent> onEvent,
549
Consumer<Throwable> onError,
550
Runnable onComplete);
551
552
void open();
553
boolean isOpen();
554
void close();
555
556
public static abstract class Builder {
557
558
public static Builder newBuilder();
559
560
public abstract Builder named(String name);
561
public abstract Builder reconnectingEvery(long delay, TimeUnit unit);
562
public abstract SseEventSource build();
563
564
protected abstract Builder target(WebTarget endpoint);
565
}
566
}
567
```
568
569
**Client-Side SSE Examples:**
570
571
```java
572
public class SseClientExamples {
573
574
// Basic SSE client
575
public void basicSseClient() {
576
Client client = ClientBuilder.newClient();
577
WebTarget target = client.target("http://localhost:8080/events/stream");
578
579
SseEventSource eventSource = SseEventSource.target(target).build();
580
581
// Register event handler
582
eventSource.register(event -> {
583
System.out.println("Received event:");
584
System.out.println(" Name: " + event.getName());
585
System.out.println(" ID: " + event.getId());
586
System.out.println(" Data: " + event.readData(String.class));
587
});
588
589
// Open connection
590
eventSource.open();
591
592
// Keep running for demonstration
593
try {
594
Thread.sleep(30000); // 30 seconds
595
} catch (InterruptedException e) {
596
Thread.currentThread().interrupt();
597
} finally {
598
eventSource.close();
599
client.close();
600
}
601
}
602
603
// SSE client with error handling
604
public void sseClientWithErrorHandling() {
605
Client client = ClientBuilder.newClient();
606
WebTarget target = client.target("http://localhost:8080/events/notifications");
607
608
SseEventSource eventSource = SseEventSource.target(target)
609
.reconnectingEvery(5, TimeUnit.SECONDS)
610
.build();
611
612
// Register with error handling
613
eventSource.register(
614
event -> {
615
// Handle different event types
616
String eventName = event.getName();
617
switch (eventName) {
618
case "notification":
619
Notification notification = event.readData(Notification.class);
620
handleNotification(notification);
621
break;
622
case "alert":
623
AlertMessage alert = event.readData(AlertMessage.class);
624
handleAlert(alert);
625
break;
626
default:
627
System.out.println("Unknown event: " + eventName);
628
}
629
},
630
error -> {
631
System.err.println("SSE Error: " + error.getMessage());
632
// Could implement custom reconnection logic here
633
},
634
() -> {
635
System.out.println("SSE stream completed");
636
}
637
);
638
639
eventSource.open();
640
641
// Run until interrupted
642
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
643
eventSource.close();
644
client.close();
645
}));
646
647
try {
648
Thread.currentThread().join(); // Wait forever
649
} catch (InterruptedException e) {
650
Thread.currentThread().interrupt();
651
}
652
}
653
654
// Chat client
655
public void chatClient(String username) {
656
Client client = ClientBuilder.newClient();
657
WebTarget chatTarget = client.target("http://localhost:8080/chat/join")
658
.queryParam("username", username);
659
660
SseEventSource chatSource = SseEventSource.target(chatTarget).build();
661
662
chatSource.register(event -> {
663
String eventType = event.getName();
664
switch (eventType) {
665
case "user_joined":
666
case "user_count":
667
System.out.println("[SYSTEM] " + event.readData(String.class));
668
break;
669
case "chat_message":
670
ChatMessage message = event.readData(ChatMessage.class);
671
System.out.printf("[%s] %s%n", message.getUsername(), message.getMessage());
672
break;
673
}
674
});
675
676
chatSource.open();
677
678
// Send messages from console input
679
Scanner scanner = new Scanner(System.in);
680
WebTarget messageTarget = client.target("http://localhost:8080/chat/message");
681
682
System.out.println("Connected to chat. Type messages (or 'quit' to exit):");
683
684
String input;
685
while (!(input = scanner.nextLine()).equals("quit")) {
686
Form messageForm = new Form()
687
.param("username", username)
688
.param("message", input);
689
690
messageTarget.request()
691
.post(Entity.form(messageForm));
692
}
693
694
chatSource.close();
695
client.close();
696
}
697
698
// Resilient SSE client with custom reconnection
699
public void resilientSseClient() {
700
Client client = ClientBuilder.newClient();
701
WebTarget target = client.target("http://localhost:8080/events/reliable");
702
703
AtomicReference<String> lastEventId = new AtomicReference<>();
704
AtomicBoolean shouldReconnect = new AtomicBoolean(true);
705
706
while (shouldReconnect.get()) {
707
try {
708
WebTarget currentTarget = target;
709
if (lastEventId.get() != null) {
710
currentTarget = target.queryParam("lastEventId", lastEventId.get());
711
}
712
713
SseEventSource eventSource = SseEventSource.target(currentTarget).build();
714
715
eventSource.register(
716
event -> {
717
// Update last received event ID
718
if (event.getId() != null) {
719
lastEventId.set(event.getId());
720
}
721
722
System.out.printf("Event [%s]: %s%n",
723
event.getId(), event.readData(String.class));
724
},
725
error -> {
726
System.err.println("Connection error: " + error.getMessage());
727
// Will trigger reconnection due to error
728
}
729
);
730
731
eventSource.open();
732
733
// Wait for connection to close or error
734
Thread.sleep(Long.MAX_VALUE);
735
736
} catch (InterruptedException e) {
737
shouldReconnect.set(false);
738
Thread.currentThread().interrupt();
739
} catch (Exception e) {
740
System.err.println("Reconnecting in 5 seconds...");
741
try {
742
Thread.sleep(5000);
743
} catch (InterruptedException ie) {
744
shouldReconnect.set(false);
745
Thread.currentThread().interrupt();
746
}
747
}
748
}
749
750
client.close();
751
}
752
753
private void handleNotification(Notification notification) {
754
System.out.println("NOTIFICATION: " + notification.getMessage());
755
}
756
757
private void handleAlert(AlertMessage alert) {
758
System.err.println("ALERT: " + alert.getMessage());
759
}
760
}
761
762
// Supporting classes for examples
763
/**
 * Payload of the "alert" events in the examples: an id, a severity label, the alert text
 * and a timestamp.
 */
public class AlertMessage {

    private String id;

    private String severity;

    private String message;

    private long timestamp;

    // Constructors, getters, setters...
}
771
772
/**
 * Payload of the "chat_message" events in the chat example.
 */
public class ChatMessage {
    private String username;
    private String message;
    private long timestamp;

    /**
     * @param username  sender of the message
     * @param message   message text
     * @param timestamp caller-supplied timestamp
     */
    public ChatMessage(String username, String message, long timestamp) {
        this.timestamp = timestamp;
        this.message = message;
        this.username = username;
    }

    // Getters and setters...
}
785
```
786
787
## Media Type Constants
788
789
JAX-RS provides constants for the SSE media type, as a string and as a `MediaType` instance:
790
791
```java { .api }
792
public class MediaType {
793
794
public static final String SERVER_SENT_EVENTS = "text/event-stream";
795
public static final MediaType SERVER_SENT_EVENTS_TYPE = new MediaType("text", "event-stream");
796
}
797
```
798
799
## Best Practices
800
801
### Connection Management
802
803
```java
804
@Path("/managed-events")
805
public class ManagedEventResource {
806
807
private final Set<SseEventSink> activeSinks = ConcurrentHashMap.newKeySet();
808
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
809
810
@GET
811
@Path("/heartbeat")
812
@Produces(MediaType.SERVER_SENT_EVENTS)
813
public void startHeartbeat(@Context SseEventSink eventSink,
814
@Context Sse sse) {
815
816
activeSinks.add(eventSink);
817
818
// Send periodic heartbeat
819
ScheduledFuture<?> heartbeat = scheduler.scheduleAtFixedRate(() -> {
820
if (eventSink.isClosed()) {
821
activeSinks.remove(eventSink);
822
return;
823
}
824
825
OutboundSseEvent heartbeatEvent = sse.newEventBuilder()
826
.name("heartbeat")
827
.data("ping")
828
.build();
829
830
eventSink.send(heartbeatEvent).whenComplete((unused, throwable) -> {
831
if (throwable != null) {
832
activeSinks.remove(eventSink);
833
eventSink.close();
834
}
835
});
836
}, 0, 30, TimeUnit.SECONDS);
837
838
// Clean up on close
839
eventSink.send(sse.newEventBuilder().name("connected").build())
840
.whenComplete((unused, throwable) -> {
841
if (throwable != null) {
842
heartbeat.cancel(true);
843
activeSinks.remove(eventSink);
844
}
845
});
846
}
847
848
@PreDestroy
849
private void cleanup() {
850
activeSinks.forEach(SseEventSink::close);
851
scheduler.shutdown();
852
}
853
}