or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

dependency-injection.md · form-processing.md · formatting.md · index.md · routing.md · streaming.md · utilities.md · validation.md

streaming.mddocs/

0

# Streaming and Real-time Communication

1

2

Play Framework provides comprehensive streaming capabilities for real-time web applications including Server-Sent Events (SSE) and Comet streaming implementations. These utilities enable efficient chunked response handling with connection lifecycle management for building responsive, real-time user interfaces.

3

4

## Capabilities

5

6

### Server-Sent Events (SSE)

7

8

Complete Server-Sent Events implementation for real-time server-to-client communication with event lifecycle management.

9

10

```java { .api }

11

/**

12

* Server-Sent Events implementation for real-time server-to-client communication

13

*/

14

public abstract class EventSource extends Chunks<String> {

15

/** Create new EventSource */

16

public EventSource();

17

18

/** Called when SSE connection is ready */

19

public void onReady(Chunks.Out<String> out);

20

21

/** Send event to client */

22

public void send(Event event);

23

24

/** Abstract method called when client connects */

25

public abstract void onConnected();

26

27

/** Add callback for when client disconnects */

28

public void onDisconnected(F.Callback0 callback);

29

30

/** Close the SSE connection */

31

public void close();

32

33

/** Create EventSource with connection callback */

34

public static EventSource whenConnected(F.Callback<EventSource> callback);

35

}

36

37

/**
 * Server-Sent Event builder with flexible configuration.
 * Start from one of the {@code event(...)} factories, then chain {@code withName}/{@code withId}.
 */
public static class EventSource.Event {
    /** Create an event with data, id, and name. */
    public Event(String data, String id, String name);

    /** Set the event name/type. */
    public Event withName(String name);

    /** Set the event ID for client-side tracking. */
    public Event withId(String id);

    /** Get the formatted SSE event string. */
    public String formatted();

    /** Create an event carrying string data. */
    public static Event event(String data);

    /** Create an event carrying JSON data. */
    public static Event event(JsonNode json);
}

59

```

60

61

**Usage Examples:**

62

63

```java

64

import play.libs.EventSource;

65

import play.libs.EventSource.Event;

66

import play.libs.F;

67

import play.mvc.Result;

68

69

// Real-time notification service

70

public class NotificationController extends Controller {

71

72

public Result streamNotifications() {

73

return ok(EventSource.whenConnected(eventSource -> {

74

// Subscribe to notification events

75

notificationService.subscribe(eventSource::onConnected);

76

77

// Handle client disconnect

78

eventSource.onDisconnected(() -> {

79

notificationService.unsubscribe(eventSource);

80

Logger.info("Client disconnected from notifications");

81

});

82

}));

83

}

84

}

85

86

// Live data streaming

87

public class LiveDataController extends Controller {

88

89

public Result streamStockPrices() {

90

return ok(new EventSource() {

91

private final ScheduledExecutorService scheduler =

92

Executors.newSingleThreadScheduledExecutor();

93

94

@Override

95

public void onConnected() {

96

// Send initial data

97

send(Event.event("connected").withName("system"));

98

99

// Stream stock prices every second

100

scheduler.scheduleAtFixedRate(() -> {

101

StockPrice price = stockService.getCurrentPrice("AAPL");

102

JsonNode priceJson = Json.toJson(price);

103

104

Event priceEvent = Event.event(priceJson)

105

.withName("price")

106

.withId(String.valueOf(System.currentTimeMillis()));

107

108

send(priceEvent);

109

}, 0, 1, TimeUnit.SECONDS);

110

}

111

112

@Override

113

public void onDisconnected(F.Callback0 callback) {

114

super.onDisconnected(callback);

115

scheduler.shutdown();

116

}

117

});

118

}

119

}

120

121

// Chat application

122

public class ChatController extends Controller {

123

124

private static final List<EventSource> chatClients = new ArrayList<>();

125

126

public Result joinChat() {

127

return ok(EventSource.whenConnected(eventSource -> {

128

synchronized (chatClients) {

129

chatClients.add(eventSource);

130

}

131

132

// Send welcome message

133

Event welcome = Event.event("Welcome to the chat!")

134

.withName("message")

135

.withId(UUID.randomUUID().toString());

136

eventSource.send(welcome);

137

138

// Handle disconnect

139

eventSource.onDisconnected(() -> {

140

synchronized (chatClients) {

141

chatClients.remove(eventSource);

142

}

143

});

144

}));

145

}

146

147

public Result sendMessage() {

148

JsonNode json = request().body().asJson();

149

String message = json.get("message").asText();

150

String user = json.get("user").asText();

151

152

// Broadcast to all connected clients

153

Event chatEvent = Event.event(user + ": " + message)

154

.withName("message")

155

.withId(UUID.randomUUID().toString());

156

157

synchronized (chatClients) {

158

chatClients.forEach(client -> client.send(chatEvent));

159

}

160

161

return ok("Message sent");

162

}

163

}

164

```

165

166

### Comet Streaming

167

168

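Comet streaming implementation for real-time server-to-client push over a long-lived chunked HTTP response, with browser compatibility features. Each message is delivered to the page by invoking a named JavaScript callback.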

169

170

```java { .api }

171

/**

172

* Chunked stream for Comet messaging with browser compatibility

173

*/

174

public abstract class Comet extends Chunks<String> {

175

/** Create Comet with JavaScript callback method */

176

public Comet(String callbackMethod);

177

178

/** Called when Comet connection is ready */

179

public void onReady(Chunks.Out<String> out);

180

181

/** Get initial buffer for browser compatibility */

182

public String initialBuffer();

183

184

/** Send string message to client */

185

public void sendMessage(String message);

186

187

/** Send JSON message to client */

188

public void sendMessage(JsonNode message);

189

190

/** Abstract method called when client connects */

191

public abstract void onConnected();

192

193

/** Add callback for when client disconnects */

194

public void onDisconnected(Callback0 callback);

195

196

/** Close the Comet connection */

197

public void close();

198

199

/** Create Comet with JavaScript method and connection callback */

200

public static Comet whenConnected(String jsMethod, Callback<Comet> callback);

201

}

202

```

203

204

**Usage Examples:**

205

206

```java

207

import play.libs.Comet;

208

import play.libs.F.Callback;

209

import play.libs.F.Callback0;

210

211

// Real-time dashboard

212

public class DashboardController extends Controller {

213

214

public Result cometDashboard() {

215

return ok(Comet.whenConnected("updateDashboard", comet -> {

216

217

// Send initial dashboard data

218

comet.onConnected();

219

220

// Stream updates every 5 seconds

221

ActorSystem.system().scheduler().schedule(

222

Duration.create(0, TimeUnit.SECONDS),

223

Duration.create(5, TimeUnit.SECONDS),

224

() -> {

225

DashboardData data = dashboardService.getCurrentData();

226

comet.sendMessage(Json.toJson(data));

227

},

228

ActorSystem.system().dispatcher()

229

);

230

231

}));

232

}

233

}

234

235

// Live game updates

236

public class GameController extends Controller {

237

238

public Result streamGameUpdates(String gameId) {

239

return ok(new Comet("gameUpdate") {

240

private GameSubscription subscription;

241

242

@Override

243

public void onConnected() {

244

// Subscribe to game events

245

subscription = gameService.subscribe(gameId, this::handleGameEvent);

246

247

// Send current game state

248

GameState state = gameService.getGameState(gameId);

249

sendMessage(Json.toJson(state));

250

}

251

252

private void handleGameEvent(GameEvent event) {

253

sendMessage(Json.toJson(event));

254

}

255

256

@Override

257

public void onDisconnected(Callback0 callback) {

258

super.onDisconnected(callback);

259

if (subscription != null) {

260

subscription.cancel();

261

}

262

}

263

});

264

}

265

}

266

267

// Live log streaming

268

public class LogController extends Controller {

269

270

public Result streamLogs() {

271

return ok(Comet.whenConnected("logUpdate", comet -> {

272

273

LogTailer tailer = new LogTailer(new File("application.log"),

274

new LogTailerListener() {

275

@Override

276

public void handle(String line) {

277

JsonNode logEntry = Json.newObject()

278

.put("timestamp", System.currentTimeMillis())

279

.put("message", line);

280

comet.sendMessage(logEntry);

281

}

282

283

@Override

284

public void fileRotated() {

285

comet.sendMessage("Log file rotated");

286

}

287

}, 1000);

288

289

// Start tailing

290

new Thread(tailer).start();

291

292

// Stop tailing on disconnect

293

comet.onDisconnected(() -> tailer.stop());

294

}));

295

}

296

}

297

```

298

299

## Advanced Usage Patterns

300

301

### Connection Pool Management

302

303

```java

304

// Connection pool for managing multiple streaming connections

305

public class StreamingConnectionPool {

306

307

private final Map<String, Set<EventSource>> topicSubscriptions = new ConcurrentHashMap<>();

308

private final Map<EventSource, String> clientTopics = new ConcurrentHashMap<>();

309

310

public void subscribe(String topic, EventSource client) {

311

topicSubscriptions.computeIfAbsent(topic, k -> ConcurrentHashMap.newKeySet()).add(client);

312

clientTopics.put(client, topic);

313

314

// Handle disconnect

315

client.onDisconnected(() -> unsubscribe(client));

316

}

317

318

public void unsubscribe(EventSource client) {

319

String topic = clientTopics.remove(client);

320

if (topic != null) {

321

Set<EventSource> clients = topicSubscriptions.get(topic);

322

if (clients != null) {

323

clients.remove(client);

324

if (clients.isEmpty()) {

325

topicSubscriptions.remove(topic);

326

}

327

}

328

}

329

}

330

331

public void broadcast(String topic, Event event) {

332

Set<EventSource> clients = topicSubscriptions.get(topic);

333

if (clients != null) {

334

clients.forEach(client -> {

335

try {

336

client.send(event);

337

} catch (Exception e) {

338

Logger.warn("Failed to send event to client", e);

339

unsubscribe(client);

340

}

341

});

342

}

343

}

344

345

public int getSubscriberCount(String topic) {

346

Set<EventSource> clients = topicSubscriptions.get(topic);

347

return clients != null ? clients.size() : 0;

348

}

349

}

350

351

// Usage in controller

352

public class StreamingController extends Controller {

353

354

@Inject

355

private StreamingConnectionPool connectionPool;

356

357

public Result subscribeToTopic(String topic) {

358

return ok(EventSource.whenConnected(eventSource -> {

359

connectionPool.subscribe(topic, eventSource);

360

361

// Send subscription confirmation

362

Event confirmEvent = Event.event("Subscribed to " + topic)

363

.withName("subscription")

364

.withId(UUID.randomUUID().toString());

365

eventSource.send(confirmEvent);

366

}));

367

}

368

}

369

```

370

371

### Message Broadcasting Service

372

373

```java

374

// Service for broadcasting messages to multiple streaming connections

375

@Singleton

376

public class BroadcastService {

377

378

private final StreamingConnectionPool connectionPool;

379

private final ActorSystem actorSystem;

380

381

@Inject

382

public BroadcastService(StreamingConnectionPool connectionPool, ActorSystem actorSystem) {

383

this.connectionPool = connectionPool;

384

this.actorSystem = actorSystem;

385

}

386

387

public void broadcastToTopic(String topic, Object message) {

388

JsonNode messageJson = Json.toJson(message);

389

Event event = Event.event(messageJson)

390

.withName("broadcast")

391

.withId(UUID.randomUUID().toString());

392

393

connectionPool.broadcast(topic, event);

394

}

395

396

public void schedulePeriodicBroadcast(String topic, Supplier<Object> messageSupplier,

397

Duration interval) {

398

actorSystem.scheduler().schedule(

399

Duration.Zero(),

400

interval,

401

() -> broadcastToTopic(topic, messageSupplier.get()),

402

actorSystem.dispatcher()

403

);

404

}

405

406

public CompletionStage<Void> broadcastToTopicAsync(String topic, CompletionStage<Object> messageFuture) {

407

return messageFuture.thenAccept(message -> broadcastToTopic(topic, message));

408

}

409

}

410

```

411

412

### Error Handling and Resilience

413

414

```java

415

// Resilient streaming implementation with error handling

416

public abstract class ResilientEventSource extends EventSource {

417

418

private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

419

private volatile boolean isConnected = true;

420

421

@Override

422

public void send(Event event) {

423

if (isConnected) {

424

try {

425

super.send(event);

426

} catch (Exception e) {

427

Logger.warn("Failed to send event, client may have disconnected", e);

428

handleSendError(e, event);

429

}

430

}

431

}

432

433

protected void handleSendError(Exception error, Event event) {

434

// Attempt to reconnect or queue message

435

isConnected = false;

436

437

// Try to resend after delay

438

scheduler.schedule(() -> {

439

if (isConnected) {

440

try {

441

super.send(event);

442

} catch (Exception retryError) {

443

Logger.error("Failed to resend event after retry", retryError);

444

}

445

}

446

}, 5, TimeUnit.SECONDS);

447

}

448

449

@Override

450

public void onDisconnected(F.Callback0 callback) {

451

super.onDisconnected(() -> {

452

isConnected = false;

453

scheduler.shutdown();

454

callback.invoke();

455

});

456

}

457

458

protected void sendHeartbeat() {

459

scheduler.scheduleAtFixedRate(() -> {

460

if (isConnected) {

461

Event heartbeat = Event.event("ping")

462

.withName("heartbeat")

463

.withId(String.valueOf(System.currentTimeMillis()));

464

send(heartbeat);

465

}

466

}, 30, 30, TimeUnit.SECONDS);

467

}

468

}

469

```

470

471

### Authentication and Authorization

472

473

```java

474

// Authenticated streaming with token validation

475

public class AuthenticatedStreamingController extends Controller {

476

477

public Result authenticatedStream() {

478

String token = request().getQueryString("token");

479

480

if (!authService.validateToken(token)) {

481

return unauthorized("Invalid token");

482

}

483

484

String userId = authService.getUserId(token);

485

486

return ok(EventSource.whenConnected(eventSource -> {

487

// Send authentication confirmation

488

Event authEvent = Event.event("Authenticated as " + userId)

489

.withName("auth")

490

.withId(UUID.randomUUID().toString());

491

eventSource.send(authEvent);

492

493

// Subscribe to user-specific events

494

userEventService.subscribe(userId, event -> {

495

Event userEvent = Event.event(Json.toJson(event))

496

.withName("user_event")

497

.withId(event.getId());

498

eventSource.send(userEvent);

499

});

500

501

// Cleanup on disconnect

502

eventSource.onDisconnected(() -> {

503

userEventService.unsubscribe(userId, eventSource);

504

});

505

}));

506

}

507

508

public Result authorizedComet(String topic) {

509

String token = request().getQueryString("token");

510

String userId = authService.getUserId(token);

511

512

if (!authService.canAccessTopic(userId, topic)) {

513

return forbidden("Access denied to topic: " + topic);

514

}

515

516

return ok(Comet.whenConnected("topicUpdate", comet -> {

517

topicService.subscribe(topic, userId, message -> {

518

comet.sendMessage(Json.toJson(message));

519

});

520

}));

521

}

522

}

523

```