0
# Streaming and Real-time Communication
1
2
Play Framework provides comprehensive streaming capabilities for real-time web applications including Server-Sent Events (SSE) and Comet streaming implementations. These utilities enable efficient chunked response handling with connection lifecycle management for building responsive, real-time user interfaces.
3
4
## Capabilities
5
6
### Server-Sent Events (SSE)
7
8
Complete Server-Sent Events implementation for real-time server-to-client communication with event lifecycle management.
9
10
```java { .api }
11
/**
12
* Server-Sent Events implementation for real-time server-to-client communication
13
*/
14
public abstract class EventSource extends Chunks<String> {
15
/** Create new EventSource */
16
public EventSource();
17
18
/** Called when SSE connection is ready */
19
public void onReady(Chunks.Out<String> out);
20
21
/** Send event to client */
22
public void send(Event event);
23
24
/** Abstract method called when client connects */
25
public abstract void onConnected();
26
27
/** Add callback for when client disconnects */
28
public void onDisconnected(F.Callback0 callback);
29
30
/** Close the SSE connection */
31
public void close();
32
33
/** Create EventSource with connection callback */
34
public static EventSource whenConnected(F.Callback<EventSource> callback);
35
}
36
37
/**
38
* Server-Sent Event builder with flexible configuration
39
*/
40
public static class EventSource.Event {
41
/** Create event with data, id, and name */
42
public Event(String data, String id, String name);
43
44
/** Set event name/type */
45
public Event withName(String name);
46
47
/** Set event ID for client-side tracking */
48
public Event withId(String id);
49
50
/** Get formatted SSE event string */
51
public String formatted();
52
53
/** Create event with string data */
54
public static Event event(String data);
55
56
/** Create event with JSON data */
57
public static Event event(JsonNode json);
58
}
59
```
60
61
**Usage Examples:**
62
63
```java
64
import play.libs.EventSource;
65
import play.libs.EventSource.Event;
66
import play.libs.F;
67
import play.mvc.Result;
68
69
// Real-time notification service
70
public class NotificationController extends Controller {
71
72
public Result streamNotifications() {
73
return ok(EventSource.whenConnected(eventSource -> {
74
// Subscribe to notification events
75
notificationService.subscribe(eventSource::onConnected);
76
77
// Handle client disconnect
78
eventSource.onDisconnected(() -> {
79
notificationService.unsubscribe(eventSource);
80
Logger.info("Client disconnected from notifications");
81
});
82
}));
83
}
84
}
85
86
// Live data streaming
87
public class LiveDataController extends Controller {
88
89
public Result streamStockPrices() {
90
return ok(new EventSource() {
91
private final ScheduledExecutorService scheduler =
92
Executors.newSingleThreadScheduledExecutor();
93
94
@Override
95
public void onConnected() {
96
// Send initial data
97
send(Event.event("connected").withName("system"));
98
99
// Stream stock prices every second
100
scheduler.scheduleAtFixedRate(() -> {
101
StockPrice price = stockService.getCurrentPrice("AAPL");
102
JsonNode priceJson = Json.toJson(price);
103
104
Event priceEvent = Event.event(priceJson)
105
.withName("price")
106
.withId(String.valueOf(System.currentTimeMillis()));
107
108
send(priceEvent);
109
}, 0, 1, TimeUnit.SECONDS);
110
}
111
112
@Override
113
public void onDisconnected(F.Callback0 callback) {
114
super.onDisconnected(callback);
115
scheduler.shutdown();
116
}
117
});
118
}
119
}
120
121
// Chat application
122
public class ChatController extends Controller {
123
124
private static final List<EventSource> chatClients = new ArrayList<>();
125
126
public Result joinChat() {
127
return ok(EventSource.whenConnected(eventSource -> {
128
synchronized (chatClients) {
129
chatClients.add(eventSource);
130
}
131
132
// Send welcome message
133
Event welcome = Event.event("Welcome to the chat!")
134
.withName("message")
135
.withId(UUID.randomUUID().toString());
136
eventSource.send(welcome);
137
138
// Handle disconnect
139
eventSource.onDisconnected(() -> {
140
synchronized (chatClients) {
141
chatClients.remove(eventSource);
142
}
143
});
144
}));
145
}
146
147
public Result sendMessage() {
148
JsonNode json = request().body().asJson();
149
String message = json.get("message").asText();
150
String user = json.get("user").asText();
151
152
// Broadcast to all connected clients
153
Event chatEvent = Event.event(user + ": " + message)
154
.withName("message")
155
.withId(UUID.randomUUID().toString());
156
157
synchronized (chatClients) {
158
chatClients.forEach(client -> client.send(chatEvent));
159
}
160
161
return ok("Message sent");
162
}
163
}
164
```
165
166
### Comet Streaming
167
168
Comet streaming implementation for real-time bidirectional communication with browser compatibility features.
169
170
```java { .api }
171
/**
172
* Chunked stream for Comet messaging with browser compatibility
173
*/
174
public abstract class Comet extends Chunks<String> {
175
/** Create Comet with JavaScript callback method */
176
public Comet(String callbackMethod);
177
178
/** Called when Comet connection is ready */
179
public void onReady(Chunks.Out<String> out);
180
181
/** Get initial buffer for browser compatibility */
182
public String initialBuffer();
183
184
/** Send string message to client */
185
public void sendMessage(String message);
186
187
/** Send JSON message to client */
188
public void sendMessage(JsonNode message);
189
190
/** Abstract method called when client connects */
191
public abstract void onConnected();
192
193
/** Add callback for when client disconnects */
194
public void onDisconnected(Callback0 callback);
195
196
/** Close the Comet connection */
197
public void close();
198
199
/** Create Comet with JavaScript method and connection callback */
200
public static Comet whenConnected(String jsMethod, Callback<Comet> callback);
201
}
202
```
203
204
**Usage Examples:**
205
206
```java
207
import play.libs.Comet;
208
import play.libs.F.Callback;
209
import play.libs.F.Callback0;
210
211
// Real-time dashboard
212
public class DashboardController extends Controller {
213
214
public Result cometDashboard() {
215
return ok(Comet.whenConnected("updateDashboard", comet -> {
216
217
// Send initial dashboard data
218
comet.onConnected();
219
220
// Stream updates every 5 seconds
221
ActorSystem.system().scheduler().schedule(
222
Duration.create(0, TimeUnit.SECONDS),
223
Duration.create(5, TimeUnit.SECONDS),
224
() -> {
225
DashboardData data = dashboardService.getCurrentData();
226
comet.sendMessage(Json.toJson(data));
227
},
228
ActorSystem.system().dispatcher()
229
);
230
231
}));
232
}
233
}
234
235
// Live game updates
236
public class GameController extends Controller {
237
238
public Result streamGameUpdates(String gameId) {
239
return ok(new Comet("gameUpdate") {
240
private GameSubscription subscription;
241
242
@Override
243
public void onConnected() {
244
// Subscribe to game events
245
subscription = gameService.subscribe(gameId, this::handleGameEvent);
246
247
// Send current game state
248
GameState state = gameService.getGameState(gameId);
249
sendMessage(Json.toJson(state));
250
}
251
252
private void handleGameEvent(GameEvent event) {
253
sendMessage(Json.toJson(event));
254
}
255
256
@Override
257
public void onDisconnected(Callback0 callback) {
258
super.onDisconnected(callback);
259
if (subscription != null) {
260
subscription.cancel();
261
}
262
}
263
});
264
}
265
}
266
267
// Live log streaming
268
public class LogController extends Controller {
269
270
public Result streamLogs() {
271
return ok(Comet.whenConnected("logUpdate", comet -> {
272
273
LogTailer tailer = new LogTailer(new File("application.log"),
274
new LogTailerListener() {
275
@Override
276
public void handle(String line) {
277
JsonNode logEntry = Json.newObject()
278
.put("timestamp", System.currentTimeMillis())
279
.put("message", line);
280
comet.sendMessage(logEntry);
281
}
282
283
@Override
284
public void fileRotated() {
285
comet.sendMessage("Log file rotated");
286
}
287
}, 1000);
288
289
// Start tailing
290
new Thread(tailer).start();
291
292
// Stop tailing on disconnect
293
comet.onDisconnected(() -> tailer.stop());
294
}));
295
}
296
}
297
```
298
299
## Advanced Usage Patterns
300
301
### Connection Pool Management
302
303
```java
304
// Connection pool for managing multiple streaming connections
305
public class StreamingConnectionPool {
306
307
private final Map<String, Set<EventSource>> topicSubscriptions = new ConcurrentHashMap<>();
308
private final Map<EventSource, String> clientTopics = new ConcurrentHashMap<>();
309
310
public void subscribe(String topic, EventSource client) {
311
topicSubscriptions.computeIfAbsent(topic, k -> ConcurrentHashMap.newKeySet()).add(client);
312
clientTopics.put(client, topic);
313
314
// Handle disconnect
315
client.onDisconnected(() -> unsubscribe(client));
316
}
317
318
public void unsubscribe(EventSource client) {
319
String topic = clientTopics.remove(client);
320
if (topic != null) {
321
Set<EventSource> clients = topicSubscriptions.get(topic);
322
if (clients != null) {
323
clients.remove(client);
324
if (clients.isEmpty()) {
325
topicSubscriptions.remove(topic);
326
}
327
}
328
}
329
}
330
331
public void broadcast(String topic, Event event) {
332
Set<EventSource> clients = topicSubscriptions.get(topic);
333
if (clients != null) {
334
clients.forEach(client -> {
335
try {
336
client.send(event);
337
} catch (Exception e) {
338
Logger.warn("Failed to send event to client", e);
339
unsubscribe(client);
340
}
341
});
342
}
343
}
344
345
public int getSubscriberCount(String topic) {
346
Set<EventSource> clients = topicSubscriptions.get(topic);
347
return clients != null ? clients.size() : 0;
348
}
349
}
350
351
// Usage in controller
352
public class StreamingController extends Controller {
353
354
@Inject
355
private StreamingConnectionPool connectionPool;
356
357
public Result subscribeToTopic(String topic) {
358
return ok(EventSource.whenConnected(eventSource -> {
359
connectionPool.subscribe(topic, eventSource);
360
361
// Send subscription confirmation
362
Event confirmEvent = Event.event("Subscribed to " + topic)
363
.withName("subscription")
364
.withId(UUID.randomUUID().toString());
365
eventSource.send(confirmEvent);
366
}));
367
}
368
}
369
```
370
371
### Message Broadcasting Service
372
373
```java
374
// Service for broadcasting messages to multiple streaming connections
375
@Singleton
376
public class BroadcastService {
377
378
private final StreamingConnectionPool connectionPool;
379
private final ActorSystem actorSystem;
380
381
@Inject
382
public BroadcastService(StreamingConnectionPool connectionPool, ActorSystem actorSystem) {
383
this.connectionPool = connectionPool;
384
this.actorSystem = actorSystem;
385
}
386
387
public void broadcastToTopic(String topic, Object message) {
388
JsonNode messageJson = Json.toJson(message);
389
Event event = Event.event(messageJson)
390
.withName("broadcast")
391
.withId(UUID.randomUUID().toString());
392
393
connectionPool.broadcast(topic, event);
394
}
395
396
public void schedulePeriodicBroadcast(String topic, Supplier<Object> messageSupplier,
397
Duration interval) {
398
actorSystem.scheduler().schedule(
399
Duration.Zero(),
400
interval,
401
() -> broadcastToTopic(topic, messageSupplier.get()),
402
actorSystem.dispatcher()
403
);
404
}
405
406
public CompletionStage<Void> broadcastToTopicAsync(String topic, CompletionStage<Object> messageFuture) {
407
return messageFuture.thenAccept(message -> broadcastToTopic(topic, message));
408
}
409
}
410
```
411
412
### Error Handling and Resilience
413
414
```java
415
// Resilient streaming implementation with error handling
416
public abstract class ResilientEventSource extends EventSource {
417
418
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
419
private volatile boolean isConnected = true;
420
421
@Override
422
public void send(Event event) {
423
if (isConnected) {
424
try {
425
super.send(event);
426
} catch (Exception e) {
427
Logger.warn("Failed to send event, client may have disconnected", e);
428
handleSendError(e, event);
429
}
430
}
431
}
432
433
protected void handleSendError(Exception error, Event event) {
434
// Attempt to reconnect or queue message
435
isConnected = false;
436
437
// Try to resend after delay
438
scheduler.schedule(() -> {
439
if (isConnected) {
440
try {
441
super.send(event);
442
} catch (Exception retryError) {
443
Logger.error("Failed to resend event after retry", retryError);
444
}
445
}
446
}, 5, TimeUnit.SECONDS);
447
}
448
449
@Override
450
public void onDisconnected(F.Callback0 callback) {
451
super.onDisconnected(() -> {
452
isConnected = false;
453
scheduler.shutdown();
454
callback.invoke();
455
});
456
}
457
458
protected void sendHeartbeat() {
459
scheduler.scheduleAtFixedRate(() -> {
460
if (isConnected) {
461
Event heartbeat = Event.event("ping")
462
.withName("heartbeat")
463
.withId(String.valueOf(System.currentTimeMillis()));
464
send(heartbeat);
465
}
466
}, 30, 30, TimeUnit.SECONDS);
467
}
468
}
469
```
470
471
### Authentication and Authorization
472
473
```java
474
// Authenticated streaming with token validation
475
public class AuthenticatedStreamingController extends Controller {
476
477
public Result authenticatedStream() {
478
String token = request().getQueryString("token");
479
480
if (!authService.validateToken(token)) {
481
return unauthorized("Invalid token");
482
}
483
484
String userId = authService.getUserId(token);
485
486
return ok(EventSource.whenConnected(eventSource -> {
487
// Send authentication confirmation
488
Event authEvent = Event.event("Authenticated as " + userId)
489
.withName("auth")
490
.withId(UUID.randomUUID().toString());
491
eventSource.send(authEvent);
492
493
// Subscribe to user-specific events
494
userEventService.subscribe(userId, event -> {
495
Event userEvent = Event.event(Json.toJson(event))
496
.withName("user_event")
497
.withId(event.getId());
498
eventSource.send(userEvent);
499
});
500
501
// Cleanup on disconnect
502
eventSource.onDisconnected(() -> {
503
userEventService.unsubscribe(userId, eventSource);
504
});
505
}));
506
}
507
508
public Result authorizedComet(String topic) {
509
String token = request().getQueryString("token");
510
String userId = authService.getUserId(token);
511
512
if (!authService.canAccessTopic(userId, topic)) {
513
return forbidden("Access denied to topic: " + topic);
514
}
515
516
return ok(Comet.whenConnected("topicUpdate", comet -> {
517
topicService.subscribe(topic, userId, message -> {
518
comet.sendMessage(Json.toJson(message));
519
});
520
}));
521
}
522
}
523
```