or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

client-api.mdcore-types.mdextensions.mdindex.mdresource-endpoints.mdserver-container.mdserver-sent-events.md

server-sent-events.mddocs/

0

# Server-Sent Events

1

2

The JAX-RS Server-Sent Events (SSE) API provides standards-based support for real-time server-to-client communication. SSE enables servers to push data to web clients over a single HTTP connection, making it ideal for live updates, notifications, and streaming data scenarios.

3

4

## Core Imports

5

6

```java

7

import javax.ws.rs.sse.Sse;

8

import javax.ws.rs.sse.SseEvent;

9

import javax.ws.rs.sse.InboundSseEvent;

10

import javax.ws.rs.sse.OutboundSseEvent;

11

import javax.ws.rs.sse.SseEventSink;

12

import javax.ws.rs.sse.SseEventSource;

13

import javax.ws.rs.sse.SseBroadcaster;

14

15

import javax.ws.rs.GET;

16

import javax.ws.rs.Path;

17

import javax.ws.rs.Produces;

18

import javax.ws.rs.core.Context;

19

import javax.ws.rs.core.MediaType;

20

21

import java.util.concurrent.CompletionStage;

22

import java.util.concurrent.TimeUnit;

23

import java.util.function.Consumer;

24

```

25

26

## SSE Factory and Events

27

28

### Sse Interface

29

30

Factory for creating SSE-related objects.

31

32

```java { .api }

33

public interface Sse {

34

35

OutboundSseEvent.Builder newEventBuilder();

36

SseBroadcaster newBroadcaster();

37

}

38

```

39

40

### SseEvent Interface

41

42

Base interface for Server-Sent Events.

43

44

```java { .api }

45

public interface SseEvent {

46

47

String getName();

48

String getId();

49

String getComment();

50

String getData();

51

long getReconnectDelay();

52

boolean isReconnectDelaySet();

53

}

54

```

55

56

### OutboundSseEvent Interface

57

58

Server-side outbound events.

59

60

```java { .api }

61

public interface OutboundSseEvent extends SseEvent {

62

63

MediaType getMediaType();

64

65

public static interface Builder {

66

67

Builder id(String id);

68

Builder name(String name);

69

Builder reconnectDelay(long milliseconds);

70

Builder data(Object data);

71

Builder data(String data);

72

Builder data(Class type, Object data);

73

Builder data(GenericType type, Object data);

74

Builder mediaType(MediaType mediaType);

75

Builder comment(String comment);

76

77

OutboundSseEvent build();

78

}

79

}

80

```

81

82

### InboundSseEvent Interface

83

84

Client-side inbound events.

85

86

```java { .api }

87

public interface InboundSseEvent extends SseEvent {

88

89

<T> T readData(Class<T> type);

90

<T> T readData(GenericType<T> type);

91

<T> T readData(Class<T> messageType, MediaType mediaType);

92

<T> T readData(GenericType<T> type, MediaType mediaType);

93

94

boolean isEmpty();

95

}

96

```

97

98

## Server-Side SSE

99

100

### SseEventSink Interface

101

102

Server-side sink for sending events to clients.

103

104

```java { .api }

105

public interface SseEventSink extends AutoCloseable {

106

107

CompletionStage<?> send(OutboundSseEvent event);

108

boolean isClosed();

109

void close();

110

}

111

```

112

113

**Basic Server-Side SSE Examples:**

114

115

```java

116

@Path("/events")

117

public class EventResource {

118

119

@Context

120

private Sse sse;

121

122

// Basic SSE endpoint

123

@GET

124

@Path("/stream")

125

@Produces(MediaType.SERVER_SENT_EVENTS)

126

public void getEventStream(@Context SseEventSink eventSink) {

127

128

// Send initial event

129

OutboundSseEvent event = sse.newEventBuilder()

130

.name("message")

131

.id("1")

132

.data("Connected to event stream")

133

.build();

134

135

eventSink.send(event);

136

137

// Simulate sending periodic updates

138

CompletableFuture.runAsync(() -> {

139

try {

140

for (int i = 2; i <= 10; i++) {

141

Thread.sleep(1000); // Wait 1 second

142

143

OutboundSseEvent periodicEvent = sse.newEventBuilder()

144

.name("update")

145

.id(String.valueOf(i))

146

.data("Update #" + i + " at " + new Date())

147

.build();

148

149

CompletionStage<?> result = eventSink.send(periodicEvent);

150

151

// Handle send completion

152

result.whenComplete((unused, throwable) -> {

153

if (throwable != null) {

154

System.err.println("Failed to send event: " + throwable.getMessage());

155

}

156

});

157

}

158

} catch (InterruptedException e) {

159

Thread.currentThread().interrupt();

160

} finally {

161

eventSink.close();

162

}

163

});

164

}

165

166

// SSE with JSON data

167

@GET

168

@Path("/notifications")

169

@Produces(MediaType.SERVER_SENT_EVENTS)

170

public void getNotifications(@Context SseEventSink eventSink) {

171

172

// Send structured data as JSON

173

Notification notification = new Notification(

174

"SYSTEM",

175

"Welcome to the notification service",

176

System.currentTimeMillis()

177

);

178

179

OutboundSseEvent event = sse.newEventBuilder()

180

.name("notification")

181

.id(UUID.randomUUID().toString())

182

.mediaType(MediaType.APPLICATION_JSON_TYPE)

183

.data(notification)

184

.build();

185

186

eventSink.send(event).whenComplete((unused, throwable) -> {

187

if (throwable == null) {

188

// Schedule more notifications

189

schedulePeriodicNotifications(eventSink);

190

} else {

191

System.err.println("Failed to send notification: " + throwable.getMessage());

192

}

193

});

194

}

195

196

// SSE with reconnection

197

@GET

198

@Path("/reliable")

199

@Produces(MediaType.SERVER_SENT_EVENTS)

200

public void getReliableStream(@Context SseEventSink eventSink,

201

@QueryParam("lastEventId") String lastEventId) {

202

203

// Handle reconnection with last event ID

204

int startFromId = 1;

205

if (lastEventId != null) {

206

try {

207

startFromId = Integer.parseInt(lastEventId) + 1;

208

} catch (NumberFormatException e) {

209

// Invalid last event ID, start from beginning

210

}

211

}

212

213

// Send reconnection delay

214

OutboundSseEvent reconnectEvent = sse.newEventBuilder()

215

.reconnectDelay(5000) // 5 seconds

216

.comment("Reconnect in 5 seconds if connection is lost")

217

.build();

218

219

eventSink.send(reconnectEvent);

220

221

// Send events starting from the specified ID

222

sendEventsFromId(eventSink, startFromId);

223

}

224

225

private void schedulePeriodicNotifications(SseEventSink eventSink) {

226

ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

227

228

scheduler.scheduleAtFixedRate(() -> {

229

if (eventSink.isClosed()) {

230

scheduler.shutdown();

231

return;

232

}

233

234

Notification notification = notificationService.getNextNotification();

235

if (notification != null) {

236

OutboundSseEvent event = sse.newEventBuilder()

237

.name("notification")

238

.id(notification.getId())

239

.mediaType(MediaType.APPLICATION_JSON_TYPE)

240

.data(notification)

241

.build();

242

243

eventSink.send(event);

244

}

245

}, 0, 30, TimeUnit.SECONDS);

246

}

247

248

private void sendEventsFromId(SseEventSink eventSink, int startFromId) {

249

CompletableFuture.runAsync(() -> {

250

List<EventData> events = eventService.getEventsFromId(startFromId);

251

252

for (EventData eventData : events) {

253

if (eventSink.isClosed()) {

254

break;

255

}

256

257

OutboundSseEvent event = sse.newEventBuilder()

258

.name(eventData.getType())

259

.id(String.valueOf(eventData.getId()))

260

.data(eventData.getData())

261

.build();

262

263

eventSink.send(event);

264

265

try {

266

Thread.sleep(100); // Small delay between events

267

} catch (InterruptedException e) {

268

Thread.currentThread().interrupt();

269

break;

270

}

271

}

272

});

273

}

274

}

275

276

// Supporting classes

277

public class Notification {

278

private String type;

279

private String message;

280

private long timestamp;

281

private String id;

282

283

public Notification(String type, String message, long timestamp) {

284

this.type = type;

285

this.message = message;

286

this.timestamp = timestamp;

287

this.id = UUID.randomUUID().toString();

288

}

289

290

// Getters and setters...

291

}

292

293

public class EventData {

294

private int id;

295

private String type;

296

private Object data;

297

298

// Constructors, getters and setters...

299

}

300

```

301

302

## Broadcasting Events

303

304

### SseBroadcaster Interface

305

306

Broadcasts events to multiple clients.

307

308

```java { .api }

309

public interface SseBroadcaster extends AutoCloseable {

310

311

void register(SseEventSink sseEventSink);

312

CompletionStage<?> broadcast(OutboundSseEvent event);

313

void close();

314

void onError(BiConsumer<SseEventSink, Throwable> onError);

315

void onClose(Consumer<SseEventSink> onClose);

316

}

317

```

318

319

**Broadcasting Examples:**

320

321

```java

322

@Path("/broadcast")

323

@Singleton // Important: broadcaster should be singleton

324

public class BroadcastResource {

325

326

@Context

327

private Sse sse;

328

329

private final SseBroadcaster broadcaster;

330

private final Set<SseEventSink> clients = ConcurrentHashMap.newKeySet();

331

332

public BroadcastResource() {

333

// Will be initialized when first client connects

334

this.broadcaster = null;

335

}

336

337

@PostConstruct

338

private void initBroadcaster() {

339

// Configure broadcaster error handling

340

broadcaster.onError((eventSink, throwable) -> {

341

System.err.println("Error sending to client: " + throwable.getMessage());

342

clients.remove(eventSink);

343

eventSink.close();

344

});

345

346

broadcaster.onClose((eventSink) -> {

347

System.out.println("Client disconnected");

348

clients.remove(eventSink);

349

});

350

}

351

352

@GET

353

@Path("/connect")

354

@Produces(MediaType.SERVER_SENT_EVENTS)

355

public void connect(@Context SseEventSink eventSink) {

356

357

// Lazy initialization of broadcaster

358

if (broadcaster == null) {

359

synchronized (this) {

360

if (broadcaster == null) {

361

broadcaster = sse.newBroadcaster();

362

initBroadcaster();

363

}

364

}

365

}

366

367

// Register new client

368

broadcaster.register(eventSink);

369

clients.add(eventSink);

370

371

// Send welcome message to new client

372

OutboundSseEvent welcomeEvent = sse.newEventBuilder()

373

.name("welcome")

374

.data("Connected to broadcast channel. " + clients.size() + " clients online.")

375

.build();

376

377

eventSink.send(welcomeEvent);

378

379

System.out.println("New client connected. Total clients: " + clients.size());

380

}

381

382

@POST

383

@Path("/announce")

384

public Response announce(String message) {

385

386

if (broadcaster == null) {

387

return Response.status(Response.Status.SERVICE_UNAVAILABLE)

388

.entity("No broadcast service available")

389

.build();

390

}

391

392

// Broadcast message to all connected clients

393

OutboundSseEvent announcement = sse.newEventBuilder()

394

.name("announcement")

395

.id(UUID.randomUUID().toString())

396

.data("ANNOUNCEMENT: " + message)

397

.build();

398

399

CompletionStage<?> result = broadcaster.broadcast(announcement);

400

401

result.whenComplete((unused, throwable) -> {

402

if (throwable != null) {

403

System.err.println("Broadcast failed: " + throwable.getMessage());

404

} else {

405

System.out.println("Broadcasted to " + clients.size() + " clients: " + message);

406

}

407

});

408

409

return Response.ok("Announcement sent to " + clients.size() + " clients").build();

410

}

411

412

@POST

413

@Path("/alert")

414

public Response sendAlert(AlertMessage alert) {

415

416

if (broadcaster == null) {

417

return Response.status(Response.Status.SERVICE_UNAVAILABLE)

418

.entity("No broadcast service available")

419

.build();

420

}

421

422

// Send structured alert

423

OutboundSseEvent alertEvent = sse.newEventBuilder()

424

.name("alert")

425

.id(alert.getId())

426

.mediaType(MediaType.APPLICATION_JSON_TYPE)

427

.data(alert)

428

.build();

429

430

broadcaster.broadcast(alertEvent);

431

432

return Response.ok("Alert broadcasted").build();

433

}

434

435

@GET

436

@Path("/status")

437

public Response getStatus() {

438

Map<String, Object> status = new HashMap<>();

439

status.put("connectedClients", clients.size());

440

status.put("broadcasterActive", broadcaster != null);

441

442

return Response.ok(status).build();

443

}

444

445

@PreDestroy

446

private void cleanup() {

447

if (broadcaster != null) {

448

broadcaster.close();

449

}

450

clients.forEach(SseEventSink::close);

451

clients.clear();

452

}

453

}

454

455

// Chat room example

456

@Path("/chat")

457

@Singleton

458

public class ChatResource {

459

460

@Context

461

private Sse sse;

462

463

private SseBroadcaster chatBroadcaster;

464

private final Map<String, SseEventSink> chatClients = new ConcurrentHashMap<>();

465

466

@PostConstruct

467

private void initChat() {

468

chatBroadcaster = sse.newBroadcaster();

469

470

chatBroadcaster.onClose(eventSink -> {

471

chatClients.values().removeIf(sink -> sink == eventSink);

472

broadcastUserCount();

473

});

474

}

475

476

@GET

477

@Path("/join")

478

@Produces(MediaType.SERVER_SENT_EVENTS)

479

public void joinChat(@QueryParam("username") String username,

480

@Context SseEventSink eventSink) {

481

482

if (username == null || username.trim().isEmpty()) {

483

eventSink.close();

484

return;

485

}

486

487

// Register user

488

chatBroadcaster.register(eventSink);

489

chatClients.put(username, eventSink);

490

491

// Notify about user joining

492

OutboundSseEvent joinEvent = sse.newEventBuilder()

493

.name("user_joined")

494

.data(username + " joined the chat")

495

.build();

496

497

chatBroadcaster.broadcast(joinEvent);

498

broadcastUserCount();

499

}

500

501

@POST

502

@Path("/message")

503

public Response sendMessage(@FormParam("username") String username,

504

@FormParam("message") String message) {

505

506

if (!chatClients.containsKey(username)) {

507

return Response.status(Response.Status.UNAUTHORIZED)

508

.entity("User not in chat")

509

.build();

510

}

511

512

ChatMessage chatMessage = new ChatMessage(username, message, System.currentTimeMillis());

513

514

OutboundSseEvent messageEvent = sse.newEventBuilder()

515

.name("chat_message")

516

.mediaType(MediaType.APPLICATION_JSON_TYPE)

517

.data(chatMessage)

518

.build();

519

520

chatBroadcaster.broadcast(messageEvent);

521

522

return Response.ok().build();

523

}

524

525

private void broadcastUserCount() {

526

OutboundSseEvent countEvent = sse.newEventBuilder()

527

.name("user_count")

528

.data("Users online: " + chatClients.size())

529

.build();

530

531

chatBroadcaster.broadcast(countEvent);

532

}

533

}

534

```

535

536

## Client-Side SSE

537

538

### SseEventSource Interface

539

540

Client-side source for consuming Server-Sent Events.

541

542

```java { .api }

543

public interface SseEventSource extends AutoCloseable {

544

545

void register(Consumer<InboundSseEvent> onEvent);

546

void register(Consumer<InboundSseEvent> onEvent,

547

Consumer<Throwable> onError);

548

void register(Consumer<InboundSseEvent> onEvent,

549

Consumer<Throwable> onError,

550

Runnable onComplete);

551

552

void open();

553

boolean isOpen();

554

void close();

555

556

public static abstract class Builder {

557

558

public static Builder newBuilder();

559

560

public abstract Builder named(String name);

561

public abstract Builder reconnectingEvery(long delay, TimeUnit unit);

562

public abstract SseEventSource build();

563

564

protected abstract Builder target(WebTarget endpoint);

565

}

566

}

567

```

568

569

**Client-Side SSE Examples:**

570

571

```java

572

public class SseClientExamples {

573

574

// Basic SSE client

575

public void basicSseClient() {

576

Client client = ClientBuilder.newClient();

577

WebTarget target = client.target("http://localhost:8080/events/stream");

578

579

SseEventSource eventSource = SseEventSource.target(target).build();

580

581

// Register event handler

582

eventSource.register(event -> {

583

System.out.println("Received event:");

584

System.out.println(" Name: " + event.getName());

585

System.out.println(" ID: " + event.getId());

586

System.out.println(" Data: " + event.readData(String.class));

587

});

588

589

// Open connection

590

eventSource.open();

591

592

// Keep running for demonstration

593

try {

594

Thread.sleep(30000); // 30 seconds

595

} catch (InterruptedException e) {

596

Thread.currentThread().interrupt();

597

} finally {

598

eventSource.close();

599

client.close();

600

}

601

}

602

603

// SSE client with error handling

604

public void sseClientWithErrorHandling() {

605

Client client = ClientBuilder.newClient();

606

WebTarget target = client.target("http://localhost:8080/events/notifications");

607

608

SseEventSource eventSource = SseEventSource.target(target)

609

.reconnectingEvery(5, TimeUnit.SECONDS)

610

.build();

611

612

// Register with error handling

613

eventSource.register(

614

event -> {

615

// Handle different event types

616

String eventName = event.getName();

617

switch (eventName) {

618

case "notification":

619

Notification notification = event.readData(Notification.class);

620

handleNotification(notification);

621

break;

622

case "alert":

623

AlertMessage alert = event.readData(AlertMessage.class);

624

handleAlert(alert);

625

break;

626

default:

627

System.out.println("Unknown event: " + eventName);

628

}

629

},

630

error -> {

631

System.err.println("SSE Error: " + error.getMessage());

632

// Could implement custom reconnection logic here

633

},

634

() -> {

635

System.out.println("SSE stream completed");

636

}

637

);

638

639

eventSource.open();

640

641

// Run until interrupted

642

Runtime.getRuntime().addShutdownHook(new Thread(() -> {

643

eventSource.close();

644

client.close();

645

}));

646

647

try {

648

Thread.currentThread().join(); // Wait forever

649

} catch (InterruptedException e) {

650

Thread.currentThread().interrupt();

651

}

652

}

653

654

// Chat client

655

public void chatClient(String username) {

656

Client client = ClientBuilder.newClient();

657

WebTarget chatTarget = client.target("http://localhost:8080/chat/join")

658

.queryParam("username", username);

659

660

SseEventSource chatSource = SseEventSource.target(chatTarget).build();

661

662

chatSource.register(event -> {

663

String eventType = event.getName();

664

switch (eventType) {

665

case "user_joined":

666

case "user_count":

667

System.out.println("[SYSTEM] " + event.readData(String.class));

668

break;

669

case "chat_message":

670

ChatMessage message = event.readData(ChatMessage.class);

671

System.out.printf("[%s] %s%n", message.getUsername(), message.getMessage());

672

break;

673

}

674

});

675

676

chatSource.open();

677

678

// Send messages from console input

679

Scanner scanner = new Scanner(System.in);

680

WebTarget messageTarget = client.target("http://localhost:8080/chat/message");

681

682

System.out.println("Connected to chat. Type messages (or 'quit' to exit):");

683

684

String input;

685

while (!(input = scanner.nextLine()).equals("quit")) {

686

Form messageForm = new Form()

687

.param("username", username)

688

.param("message", input);

689

690

messageTarget.request()

691

.post(Entity.form(messageForm));

692

}

693

694

chatSource.close();

695

client.close();

696

}

697

698

// Resilient SSE client with custom reconnection

699

public void resilientSseClient() {

700

Client client = ClientBuilder.newClient();

701

WebTarget target = client.target("http://localhost:8080/events/reliable");

702

703

AtomicReference<String> lastEventId = new AtomicReference<>();

704

AtomicBoolean shouldReconnect = new AtomicBoolean(true);

705

706

while (shouldReconnect.get()) {

707

try {

708

WebTarget currentTarget = target;

709

if (lastEventId.get() != null) {

710

currentTarget = target.queryParam("lastEventId", lastEventId.get());

711

}

712

713

SseEventSource eventSource = SseEventSource.target(currentTarget).build();

714

715

eventSource.register(

716

event -> {

717

// Update last received event ID

718

if (event.getId() != null) {

719

lastEventId.set(event.getId());

720

}

721

722

System.out.printf("Event [%s]: %s%n",

723

event.getId(), event.readData(String.class));

724

},

725

error -> {

726

System.err.println("Connection error: " + error.getMessage());

727

// Will trigger reconnection due to error

728

}

729

);

730

731

eventSource.open();

732

733

// Wait for connection to close or error

734

Thread.sleep(Long.MAX_VALUE);

735

736

} catch (InterruptedException e) {

737

shouldReconnect.set(false);

738

Thread.currentThread().interrupt();

739

} catch (Exception e) {

740

System.err.println("Reconnecting in 5 seconds...");

741

try {

742

Thread.sleep(5000);

743

} catch (InterruptedException ie) {

744

shouldReconnect.set(false);

745

Thread.currentThread().interrupt();

746

}

747

}

748

}

749

750

client.close();

751

}

752

753

private void handleNotification(Notification notification) {

754

System.out.println("NOTIFICATION: " + notification.getMessage());

755

}

756

757

private void handleAlert(AlertMessage alert) {

758

System.err.println("ALERT: " + alert.getMessage());

759

}

760

}

761

762

// Supporting classes for examples

763

public class AlertMessage {

764

private String id;

765

private String severity;

766

private String message;

767

private long timestamp;

768

769

// Constructors, getters, setters...

770

}

771

772

public class ChatMessage {

773

private String username;

774

private String message;

775

private long timestamp;

776

777

public ChatMessage(String username, String message, long timestamp) {

778

this.username = username;

779

this.message = message;

780

this.timestamp = timestamp;

781

}

782

783

// Getters and setters...

784

}

785

```

786

787

## Media Type Constants

788

789

JAX-RS provides a constant for SSE media type:

790

791

```java { .api }

792

public class MediaType {

793

794

public static final String SERVER_SENT_EVENTS = "text/event-stream";

795

public static final MediaType SERVER_SENT_EVENTS_TYPE = new MediaType("text", "event-stream");

796

}

797

```

798

799

## Best Practices

800

801

### Connection Management

802

803

```java

804

@Path("/managed-events")

805

public class ManagedEventResource {

806

807

private final Set<SseEventSink> activeSinks = ConcurrentHashMap.newKeySet();

808

private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

809

810

@GET

811

@Path("/heartbeat")

812

@Produces(MediaType.SERVER_SENT_EVENTS)

813

public void startHeartbeat(@Context SseEventSink eventSink,

814

@Context Sse sse) {

815

816

activeSinks.add(eventSink);

817

818

// Send periodic heartbeat

819

ScheduledFuture<?> heartbeat = scheduler.scheduleAtFixedRate(() -> {

820

if (eventSink.isClosed()) {

821

activeSinks.remove(eventSink);

822

return;

823

}

824

825

OutboundSseEvent heartbeatEvent = sse.newEventBuilder()

826

.name("heartbeat")

827

.data("ping")

828

.build();

829

830

eventSink.send(heartbeatEvent).whenComplete((unused, throwable) -> {

831

if (throwable != null) {

832

activeSinks.remove(eventSink);

833

eventSink.close();

834

}

835

});

836

}, 0, 30, TimeUnit.SECONDS);

837

838

// Clean up on close

839

eventSink.send(sse.newEventBuilder().name("connected").build())

840

.whenComplete((unused, throwable) -> {

841

if (throwable != null) {

842

heartbeat.cancel(true);

843

activeSinks.remove(eventSink);

844

}

845

});

846

}

847

848

@PreDestroy

849

private void cleanup() {

850

activeSinks.forEach(SseEventSink::close);

851

scheduler.shutdown();

852

}

853

}