CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-javax-ws-rs--javax-ws-rs-api

Java API for RESTful Web Services

Pending
Overview
Eval results
Files

docs/server-sent-events.md

Server-Sent Events

The JAX-RS Server-Sent Events (SSE) API provides standards-based support for real-time server-to-client communication. SSE enables servers to push data to web clients over a single HTTP connection, making it ideal for live updates, notifications, and streaming data scenarios.

Core Imports

import javax.ws.rs.sse.Sse;
import javax.ws.rs.sse.SseEvent;
import javax.ws.rs.sse.InboundSseEvent;
import javax.ws.rs.sse.OutboundSseEvent;
import javax.ws.rs.sse.SseEventSink;
import javax.ws.rs.sse.SseEventSource;
import javax.ws.rs.sse.SseBroadcaster;

import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;

import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

SSE Factory and Events

Sse Interface

Factory for creating SSE-related objects.

/**
 * Server-side entry point of the SSE API: creates outbound events and broadcasters.
 *
 * <p>An instance is obtained by injection ({@code @Context Sse sse}) into a resource
 * field or resource-method parameter.
 */
public interface Sse {

    /**
     * Returns a new builder for assembling an {@link OutboundSseEvent}.
     *
     * @return a fresh event builder
     */
    OutboundSseEvent.Builder newEventBuilder();

    /**
     * Creates an event that carries only a {@code String} payload.
     *
     * @param data event payload; must not be {@code null}
     * @return the built event
     * @throws IllegalArgumentException if {@code data} is {@code null}
     */
    default OutboundSseEvent newEvent(String data) {
        if (data == null) {
            throw new IllegalArgumentException("Parameter 'data' must not be null.");
        }
        return newEventBuilder().data(String.class, data).build();
    }

    /**
     * Creates a named event that carries a {@code String} payload.
     *
     * @param name event name; must not be {@code null}
     * @param data event payload; must not be {@code null}
     * @return the built event
     * @throws IllegalArgumentException if {@code name} or {@code data} is {@code null}
     */
    default OutboundSseEvent newEvent(String name, String data) {
        if (data == null) {
            throw new IllegalArgumentException("Parameter 'data' must not be null.");
        }
        if (name == null) {
            throw new IllegalArgumentException("Parameter 'name' must not be null.");
        }
        return newEventBuilder().data(String.class, data).name(name).build();
    }

    /**
     * Returns a new broadcaster for fanning events out to several registered sinks.
     *
     * @return a fresh broadcaster
     */
    SseBroadcaster newBroadcaster();
}

SseEvent Interface

Base interface for Server-Sent Events.

/**
 * Fields common to inbound and outbound Server-Sent Events.
 */
public interface SseEvent {

    /** Value reported by {@link #getReconnectDelay()} when no reconnect delay was set. */
    long RECONNECT_NOT_SET = -1;

    /**
     * @return the value of the SSE {@code event} field, or {@code null} if the event is unnamed
     */
    String getName();

    /**
     * @return the value of the SSE {@code id} field, or {@code null} if none was set
     */
    String getId();

    /**
     * @return the comment attached to the event, or {@code null} if none was set
     */
    String getComment();

    /**
     * NOTE(review): in the JAX-RS 2.1 API the payload accessors appear to be declared on the
     * sub-interfaces ({@code OutboundSseEvent#getData()} returning {@code Object} and
     * {@code InboundSseEvent#readData(...)}) rather than here. Confirm against the API
     * Javadoc before relying on this method.
     *
     * @return the event payload as text
     */
    String getData();

    /**
     * @return the reconnect delay in milliseconds, or {@link #RECONNECT_NOT_SET} when unset
     */
    long getReconnectDelay();

    /**
     * @return {@code true} if a reconnect delay was set on this event
     */
    boolean isReconnectDelaySet();
}

OutboundSseEvent Interface

Server-side outbound events.

/**
 * An event that the server sends to a client. Instances are assembled with a
 * {@link Builder}, which is obtained from {@code Sse#newEventBuilder()}.
 */
public interface OutboundSseEvent extends SseEvent {

    /**
     * @return the media type used to serialize the event payload
     */
    MediaType getMediaType();

    /**
     * Fluent builder for {@link OutboundSseEvent}; every setter returns the builder itself.
     */
    interface Builder {

        /** Sets the event name (SSE {@code event} field). */
        Builder name(String name);

        /** Sets the event identifier (SSE {@code id} field). */
        Builder id(String id);

        /** Attaches a comment to the event. */
        Builder comment(String comment);

        /** Sets the reconnect delay, in milliseconds, advertised to the client. */
        Builder reconnectDelay(long milliseconds);

        /** Sets the media type used to serialize the payload. */
        Builder mediaType(MediaType mediaType);

        /** Sets a textual payload. */
        Builder data(String data);

        /** Sets an arbitrary payload object. */
        Builder data(Object data);

        /** Sets the payload together with the class it should be serialized as. */
        Builder data(Class type, Object data);

        /** Sets the payload together with the generic type it should be serialized as. */
        Builder data(GenericType type, Object data);

        /**
         * @return the event assembled from the values supplied so far
         */
        OutboundSseEvent build();
    }
}

InboundSseEvent Interface

Client-side inbound events.

/**
 * An event received by a client. The payload is read on demand and converted to the
 * requested Java type.
 */
public interface InboundSseEvent extends SseEvent {

    /**
     * @return {@code true} if the event carries no data
     */
    boolean isEmpty();

    /**
     * Reads the payload as an instance of the given class.
     *
     * @param type target Java type
     * @param <T>  target type
     * @return the converted payload
     */
    <T> T readData(Class<T> type);

    /**
     * Reads the payload as an instance of the given class, using an explicit media type.
     *
     * @param messageType target Java type
     * @param mediaType   media type to use when converting the payload
     * @param <T>         target type
     * @return the converted payload
     */
    <T> T readData(Class<T> messageType, MediaType mediaType);

    /**
     * Reads the payload as an instance of the given generic type.
     *
     * @param type target generic type
     * @param <T>  target type
     * @return the converted payload
     */
    <T> T readData(GenericType<T> type);

    /**
     * Reads the payload as an instance of the given generic type, using an explicit media type.
     *
     * @param type      target generic type
     * @param mediaType media type to use when converting the payload
     * @param <T>       target type
     * @return the converted payload
     */
    <T> T readData(GenericType<T> type, MediaType mediaType);
}

Server-Side SSE

SseEventSink Interface

Server-side sink for sending events to clients.

/**
 * Server-side handle on one client's event stream. It is injected into a resource method
 * with {@code @Context} and used to push events to that client.
 */
public interface SseEventSink extends AutoCloseable {

    /**
     * @return {@code true} once the sink has been closed
     */
    boolean isClosed();

    /**
     * Sends one event to the client.
     *
     * @param event event to send
     * @return a stage that completes when the send has finished, exceptionally on failure
     */
    CompletionStage<?> send(OutboundSseEvent event);

    /**
     * Closes the sink and releases the underlying connection. Declares no checked exception,
     * unlike {@link AutoCloseable#close()}.
     */
    @Override
    void close();
}

Basic Server-Side SSE Examples:

/**
 * Example resource exposing three SSE endpoints: a short periodic stream, a JSON
 * notification stream, and a replayable stream that resumes after a given event id.
 *
 * <p>NOTE(review): {@code notificationService} and {@code eventService} are used below but
 * are not declared in this example; they are assumed to be collaborators supplied elsewhere.
 */
@Path("/events")
public class EventResource {

    @Context
    private Sse sse;

    /**
     * Sends a greeting event followed by nine updates at one-second intervals, then closes
     * the stream.
     *
     * @param eventSink sink bound to the requesting client
     */
    @GET
    @Path("/stream")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public void getEventStream(@Context SseEventSink eventSink) {

        // Send initial event
        OutboundSseEvent event = sse.newEventBuilder()
            .name("message")
            .id("1")
            .data("Connected to event stream")
            .build();

        eventSink.send(event);

        // Simulate sending periodic updates
        CompletableFuture.runAsync(() -> {
            try {
                for (int i = 2; i <= 10; i++) {
                    Thread.sleep(1000); // Wait 1 second

                    // The client may have disconnected while we slept; stop instead of
                    // sending to a sink that is already closed.
                    if (eventSink.isClosed()) {
                        break;
                    }

                    OutboundSseEvent periodicEvent = sse.newEventBuilder()
                        .name("update")
                        .id(String.valueOf(i))
                        .data("Update #" + i + " at " + new Date())
                        .build();

                    CompletionStage<?> result = eventSink.send(periodicEvent);

                    // Handle send completion
                    result.whenComplete((unused, throwable) -> {
                        if (throwable != null) {
                            System.err.println("Failed to send event: " + throwable.getMessage());
                        }
                    });
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                eventSink.close();
            }
        });
    }

    /**
     * Sends a welcome notification as JSON and, if that succeeds, starts periodic
     * notifications. The sink is closed if the first send fails.
     *
     * @param eventSink sink bound to the requesting client
     */
    @GET
    @Path("/notifications")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public void getNotifications(@Context SseEventSink eventSink) {

        // Send structured data as JSON
        Notification notification = new Notification(
            "SYSTEM",
            "Welcome to the notification service",
            System.currentTimeMillis()
        );

        OutboundSseEvent event = sse.newEventBuilder()
            .name("notification")
            .id(UUID.randomUUID().toString())
            .mediaType(MediaType.APPLICATION_JSON_TYPE)
            .data(notification)
            .build();

        eventSink.send(event).whenComplete((unused, throwable) -> {
            if (throwable == null) {
                // Schedule more notifications
                schedulePeriodicNotifications(eventSink);
            } else {
                System.err.println("Failed to send notification: " + throwable.getMessage());
                // Nothing else will ever write to this sink, so release the connection.
                eventSink.close();
            }
        });
    }

    /**
     * Advertises a 5-second reconnect delay, then replays stored events that follow
     * {@code lastEventId}. A missing or non-numeric id replays from event 1.
     *
     * @param eventSink   sink bound to the requesting client
     * @param lastEventId id of the last event the client received, or {@code null}
     */
    @GET
    @Path("/reliable")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public void getReliableStream(@Context SseEventSink eventSink,
                                 @QueryParam("lastEventId") String lastEventId) {

        // Handle reconnection with last event ID
        int startFromId = 1;
        if (lastEventId != null) {
            try {
                startFromId = Integer.parseInt(lastEventId) + 1;
            } catch (NumberFormatException e) {
                // Invalid last event ID, start from beginning
            }
        }

        // Send reconnection delay
        OutboundSseEvent reconnectEvent = sse.newEventBuilder()
            .reconnectDelay(5000) // 5 seconds
            .comment("Reconnect in 5 seconds if connection is lost")
            .build();

        eventSink.send(reconnectEvent);

        // Send events starting from the specified ID
        sendEventsFromId(eventSink, startFromId);
    }

    /**
     * Polls the notification service every 30 seconds and forwards what it returns. The
     * per-connection scheduler is shut down when the sink is closed or a task fails.
     */
    private void schedulePeriodicNotifications(SseEventSink eventSink) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

        scheduler.scheduleAtFixedRate(() -> {
            if (eventSink.isClosed()) {
                scheduler.shutdown();
                return;
            }

            try {
                Notification notification = notificationService.getNextNotification();
                if (notification != null) {
                    OutboundSseEvent event = sse.newEventBuilder()
                        .name("notification")
                        .id(notification.getId())
                        .mediaType(MediaType.APPLICATION_JSON_TYPE)
                        .data(notification)
                        .build();

                    eventSink.send(event).whenComplete((unused, throwable) -> {
                        if (throwable != null) {
                            System.err.println("Failed to send notification: " + throwable.getMessage());
                            // The next run sees the closed sink and shuts the scheduler down.
                            eventSink.close();
                        }
                    });
                }
            } catch (RuntimeException e) {
                // An exception escaping a fixed-rate task silently cancels future runs but
                // leaves the executor thread alive, so clean up explicitly.
                System.err.println("Failed to send notification: " + e.getMessage());
                eventSink.close();
                scheduler.shutdown();
            }
        }, 0, 30, TimeUnit.SECONDS);
    }

    /**
     * Replays stored events with id {@code >= startFromId} on a background thread, pausing
     * 100 ms between events, and closes the sink when the replay ends for any reason.
     */
    private void sendEventsFromId(SseEventSink eventSink, int startFromId) {
        CompletableFuture.runAsync(() -> {
            try {
                List<EventData> events = eventService.getEventsFromId(startFromId);

                for (EventData eventData : events) {
                    if (eventSink.isClosed()) {
                        break;
                    }

                    OutboundSseEvent event = sse.newEventBuilder()
                        .name(eventData.getType())
                        .id(String.valueOf(eventData.getId()))
                        .data(eventData.getData())
                        .build();

                    eventSink.send(event);

                    try {
                        Thread.sleep(100); // Small delay between events
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            } finally {
                // No other code writes to this sink after the replay; without closing it
                // the connection would stay open indefinitely.
                eventSink.close();
            }
        });
    }
}

// Supporting classes
/**
 * JSON-serializable notification payload used by the SSE examples.
 */
public class Notification {
    private String type;
    private String message;
    private long timestamp;
    private String id;

    /** No-arg constructor so JSON providers can instantiate the class when reading events. */
    public Notification() {
    }

    /**
     * Creates a notification with a freshly generated random id.
     *
     * @param type      category of the notification, e.g. {@code "SYSTEM"}
     * @param message   human-readable text
     * @param timestamp creation time in epoch milliseconds
     */
    public Notification(String type, String message, long timestamp) {
        this.type = type;
        this.message = message;
        this.timestamp = timestamp;
        this.id = UUID.randomUUID().toString();
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }
}

/**
 * A stored event that can be replayed to reconnecting SSE clients.
 */
public class EventData {
    private int id;
    private String type;
    private Object data;

    /** No-arg constructor for frameworks that populate the fields through setters. */
    public EventData() {
    }

    /**
     * @param id   sequential event id, used as the SSE {@code id} field
     * @param type event type, used as the SSE event name
     * @param data event payload
     */
    public EventData(int id, String type, Object data) {
        this.id = id;
        this.type = type;
        this.data = data;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getType() {
        return type;
    }

    public void setType(String type) {
        this.type = type;
    }

    public Object getData() {
        return data;
    }

    public void setData(Object data) {
        this.data = data;
    }
}

Broadcasting Events

SseBroadcaster Interface

Broadcasts events to multiple clients.

/**
 * Fans a single outbound event out to every registered {@link SseEventSink}.
 */
public interface SseBroadcaster extends AutoCloseable {

    /**
     * Registers a listener invoked with the affected sink and the failure whenever
     * delivering to that sink fails.
     */
    void onError(BiConsumer<SseEventSink, Throwable> onError);

    /**
     * Registers a listener invoked with the affected sink when that sink is closed.
     */
    void onClose(Consumer<SseEventSink> onClose);

    /**
     * Adds a sink to the set of broadcast recipients.
     */
    void register(SseEventSink sseEventSink);

    /**
     * Sends the event to all registered sinks.
     *
     * @return a stage that completes when the broadcast has finished
     */
    CompletionStage<?> broadcast(OutboundSseEvent event);

    /**
     * Closes the broadcaster. Declares no checked exception, unlike
     * {@link AutoCloseable#close()}.
     */
    @Override
    void close();
}

Broadcasting Examples:

/**
 * Example resource that broadcasts announcements and alerts to all connected SSE clients.
 *
 * <p>The broadcaster is created lazily when the first client connects. Until then the
 * POST endpoints answer {@code 503 Service Unavailable}.
 */
@Path("/broadcast")
@Singleton  // Important: broadcaster should be singleton
public class BroadcastResource {

    @Context
    private Sse sse;

    // volatile: written once under the lock in getOrCreateBroadcaster(), read without
    // locking by every request thread.
    private volatile SseBroadcaster broadcaster;
    private final Set<SseEventSink> clients = ConcurrentHashMap.newKeySet();

    public BroadcastResource() {
        // The broadcaster is created when the first client connects.
    }

    /**
     * Returns the shared broadcaster, creating and configuring it on first use.
     */
    private SseBroadcaster getOrCreateBroadcaster() {
        SseBroadcaster current = broadcaster;
        if (current == null) {
            synchronized (this) {
                current = broadcaster;
                if (current == null) {
                    current = sse.newBroadcaster();
                    // Attach the listeners before publishing the instance so that no other
                    // thread can use a broadcaster without them.
                    initBroadcaster(current);
                    broadcaster = current;
                }
            }
        }
        return current;
    }

    /**
     * Installs the error and close listeners that keep {@link #clients} in sync.
     */
    private void initBroadcaster(SseBroadcaster target) {
        // Configure broadcaster error handling
        target.onError((eventSink, throwable) -> {
            System.err.println("Error sending to client: " + throwable.getMessage());
            clients.remove(eventSink);
            eventSink.close();
        });

        target.onClose((eventSink) -> {
            System.out.println("Client disconnected");
            clients.remove(eventSink);
        });
    }

    /**
     * Registers the caller as a broadcast recipient and sends it a welcome event.
     *
     * @param eventSink sink bound to the connecting client
     */
    @GET
    @Path("/connect")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public void connect(@Context SseEventSink eventSink) {

        // Register new client
        getOrCreateBroadcaster().register(eventSink);
        clients.add(eventSink);

        // Send welcome message to new client
        OutboundSseEvent welcomeEvent = sse.newEventBuilder()
            .name("welcome")
            .data("Connected to broadcast channel. " + clients.size() + " clients online.")
            .build();

        eventSink.send(welcomeEvent);

        System.out.println("New client connected. Total clients: " + clients.size());
    }

    /**
     * Broadcasts a plain-text announcement to all connected clients.
     *
     * @param message announcement text taken from the request body
     * @return 200 once the broadcast has been started, or 503 if no client has connected yet
     */
    @POST
    @Path("/announce")
    public Response announce(String message) {

        SseBroadcaster active = broadcaster;
        if (active == null) {
            return Response.status(Response.Status.SERVICE_UNAVAILABLE)
                          .entity("No broadcast service available")
                          .build();
        }

        // Broadcast message to all connected clients
        OutboundSseEvent announcement = sse.newEventBuilder()
            .name("announcement")
            .id(UUID.randomUUID().toString())
            .data("ANNOUNCEMENT: " + message)
            .build();

        CompletionStage<?> result = active.broadcast(announcement);

        result.whenComplete((unused, throwable) -> {
            if (throwable != null) {
                System.err.println("Broadcast failed: " + throwable.getMessage());
            } else {
                System.out.println("Broadcasted to " + clients.size() + " clients: " + message);
            }
        });

        return Response.ok("Announcement sent to " + clients.size() + " clients").build();
    }

    /**
     * Broadcasts a structured alert as JSON to all connected clients.
     *
     * @param alert alert taken from the request body
     * @return 200 once the broadcast has been started, 400 if the body is missing, or 503
     *         if no client has connected yet
     */
    @POST
    @Path("/alert")
    public Response sendAlert(AlertMessage alert) {

        SseBroadcaster active = broadcaster;
        if (active == null) {
            return Response.status(Response.Status.SERVICE_UNAVAILABLE)
                          .entity("No broadcast service available")
                          .build();
        }

        if (alert == null) {
            return Response.status(Response.Status.BAD_REQUEST)
                          .entity("Alert body is required")
                          .build();
        }

        // Send structured alert
        OutboundSseEvent alertEvent = sse.newEventBuilder()
            .name("alert")
            .id(alert.getId())
            .mediaType(MediaType.APPLICATION_JSON_TYPE)
            .data(alert)
            .build();

        active.broadcast(alertEvent);

        return Response.ok("Alert broadcasted").build();
    }

    /**
     * Reports the number of tracked clients and whether the broadcaster exists.
     */
    @GET
    @Path("/status")
    public Response getStatus() {
        Map<String, Object> status = new HashMap<>();
        status.put("connectedClients", clients.size());
        status.put("broadcasterActive", broadcaster != null);

        return Response.ok(status).build();
    }

    /**
     * Closes the broadcaster (if it was created) and every tracked sink.
     */
    @PreDestroy
    private void cleanup() {
        SseBroadcaster active = broadcaster;
        if (active != null) {
            active.close();
        }
        clients.forEach(SseEventSink::close);
        clients.clear();
    }
}

// Chat room example
/**
 * Chat room example: clients join over SSE and post messages through a form endpoint.
 * Every message is broadcast to all joined clients.
 */
@Path("/chat")
@Singleton
public class ChatResource {

    @Context
    private Sse sse;

    private SseBroadcaster chatBroadcaster;
    private final Map<String, SseEventSink> chatClients = new ConcurrentHashMap<>();

    /**
     * Creates the broadcaster and keeps {@link #chatClients} in sync when a sink closes.
     */
    @PostConstruct
    private void initChat() {
        chatBroadcaster = sse.newBroadcaster();

        chatBroadcaster.onClose(eventSink -> {
            // Identity comparison: only the entry holding this exact sink is removed, so a
            // user who rejoined with a new sink is not dropped.
            chatClients.values().removeIf(sink -> sink == eventSink);
            broadcastUserCount();
        });
    }

    /**
     * Adds the caller to the chat and announces the join to everyone.
     *
     * @param username  display name; the sink is closed immediately if it is blank
     * @param eventSink sink bound to the joining client
     */
    @GET
    @Path("/join")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public void joinChat(@QueryParam("username") String username,
                        @Context SseEventSink eventSink) {

        if (username == null || username.trim().isEmpty()) {
            eventSink.close();
            return;
        }

        // Register user
        chatBroadcaster.register(eventSink);
        SseEventSink previous = chatClients.put(username, eventSink);
        if (previous != null && previous != eventSink) {
            // The same username joined again: the old connection is no longer reachable
            // through the map, so close it instead of leaking it.
            previous.close();
        }

        // Notify about user joining
        OutboundSseEvent joinEvent = sse.newEventBuilder()
            .name("user_joined")
            .data(username + " joined the chat")
            .build();

        chatBroadcaster.broadcast(joinEvent);
        broadcastUserCount();
    }

    /**
     * Broadcasts a chat message on behalf of a joined user.
     *
     * @param username sender; must have joined through {@code /chat/join}
     * @param message  message text
     * @return 200 on success, 400 if the message is missing, 401 if the sender is unknown
     */
    @POST
    @Path("/message")
    public Response sendMessage(@FormParam("username") String username,
                               @FormParam("message") String message) {

        // ConcurrentHashMap rejects null keys with a NullPointerException, so a missing
        // form parameter must be checked before the lookup.
        if (username == null || !chatClients.containsKey(username)) {
            return Response.status(Response.Status.UNAUTHORIZED)
                          .entity("User not in chat")
                          .build();
        }

        if (message == null) {
            return Response.status(Response.Status.BAD_REQUEST)
                          .entity("Message is required")
                          .build();
        }

        ChatMessage chatMessage = new ChatMessage(username, message, System.currentTimeMillis());

        OutboundSseEvent messageEvent = sse.newEventBuilder()
            .name("chat_message")
            .mediaType(MediaType.APPLICATION_JSON_TYPE)
            .data(chatMessage)
            .build();

        chatBroadcaster.broadcast(messageEvent);

        return Response.ok().build();
    }

    /**
     * Broadcasts the current number of joined users.
     */
    private void broadcastUserCount() {
        OutboundSseEvent countEvent = sse.newEventBuilder()
            .name("user_count")
            .data("Users online: " + chatClients.size())
            .build();

        chatBroadcaster.broadcast(countEvent);
    }
}

Client-Side SSE

SseEventSource Interface

Client-side source for consuming Server-Sent Events.

/**
 * Client-side handle on an SSE stream. Build one with {@link #target(WebTarget)}, register
 * listeners, then call {@link #open()}.
 */
public interface SseEventSource extends AutoCloseable {

    /**
     * Registers a listener for incoming events.
     */
    void register(Consumer<InboundSseEvent> onEvent);

    /**
     * Registers listeners for incoming events and for errors.
     */
    void register(Consumer<InboundSseEvent> onEvent,
                 Consumer<Throwable> onError);

    /**
     * Registers listeners for incoming events, for errors, and for stream completion.
     */
    void register(Consumer<InboundSseEvent> onEvent,
                 Consumer<Throwable> onError,
                 Runnable onComplete);

    /**
     * Starts building an event source for the given endpoint. This is the factory used by
     * the client examples, as {@code SseEventSource.target(webTarget).build()}.
     *
     * @param endpoint SSE endpoint to connect to
     * @return a builder bound to {@code endpoint}
     */
    static Builder target(WebTarget endpoint) {
        return Builder.newBuilder().target(endpoint);
    }

    /** Opens the connection and starts delivering events to the registered listeners. */
    void open();

    /**
     * @return {@code true} while the event source is open
     */
    boolean isOpen();

    /**
     * Closes the event source, waiting up to 5 seconds for it to shut down.
     */
    @Override
    default void close() {
        close(5, TimeUnit.SECONDS);
    }

    /**
     * Closes the event source, waiting up to the given timeout for it to shut down.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return {@code true} if the source closed within the timeout
     */
    boolean close(long timeout, TimeUnit unit);

    /**
     * Builder for {@link SseEventSource}. Applications normally obtain it through
     * {@link SseEventSource#target(WebTarget)}.
     */
    public static abstract class Builder {

        // Signature only: the JAX-RS API supplies the body, which locates the builder
        // implementation provided by the runtime.
        public static Builder newBuilder();

        // NOTE(review): named(String) does not appear in the final JAX-RS 2.1 Javadoc for
        // SseEventSource.Builder; confirm before relying on it.
        public abstract Builder named(String name);

        /** Sets the delay before the client tries to reconnect after losing the connection. */
        public abstract Builder reconnectingEvery(long delay, TimeUnit unit);

        /**
         * @return the configured event source, not yet opened
         */
        public abstract SseEventSource build();

        /** Binds the builder to the SSE endpoint; called by {@link SseEventSource#target}. */
        protected abstract Builder target(WebTarget endpoint);
    }
}

Client-Side SSE Examples:

/**
 * Client-side usage examples for {@code SseEventSource}: a basic listener, a listener with
 * error handling, an interactive chat client, and a client that reconnects by itself and
 * resumes after the last received event id.
 */
public class SseClientExamples {

    /**
     * Connects to {@code /events/stream}, prints every event for 30 seconds, then
     * disconnects.
     */
    public void basicSseClient() {
        Client client = ClientBuilder.newClient();
        try {
            WebTarget target = client.target("http://localhost:8080/events/stream");

            // try-with-resources closes the event source even if registration or open() fails.
            try (SseEventSource eventSource = SseEventSource.target(target).build()) {

                // Register event handler
                eventSource.register(event -> {
                    System.out.println("Received event:");
                    System.out.println("  Name: " + event.getName());
                    System.out.println("  ID: " + event.getId());
                    System.out.println("  Data: " + event.readData(String.class));
                });

                // Open connection
                eventSource.open();

                // Keep running for demonstration
                Thread.sleep(30000); // 30 seconds
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        } finally {
            client.close();
        }
    }

    /**
     * Connects to {@code /events/notifications} and dispatches events by name until the
     * calling thread is interrupted. Resources are released from a JVM shutdown hook.
     */
    public void sseClientWithErrorHandling() {
        Client client = ClientBuilder.newClient();
        WebTarget target = client.target("http://localhost:8080/events/notifications");

        SseEventSource eventSource = SseEventSource.target(target)
            .reconnectingEvery(5, TimeUnit.SECONDS)
            .build();

        // Register with error handling
        eventSource.register(
            event -> {
                // Handle different event types
                String eventName = event.getName();
                if (eventName == null) {
                    // Events sent without a name would make switch(String) throw a
                    // NullPointerException; report them like any other unknown event.
                    System.out.println("Unknown event: " + eventName);
                    return;
                }
                switch (eventName) {
                    case "notification":
                        Notification notification = event.readData(Notification.class);
                        handleNotification(notification);
                        break;
                    case "alert":
                        AlertMessage alert = event.readData(AlertMessage.class);
                        handleAlert(alert);
                        break;
                    default:
                        System.out.println("Unknown event: " + eventName);
                }
            },
            error -> {
                System.err.println("SSE Error: " + error.getMessage());
                // Could implement custom reconnection logic here
            },
            () -> {
                System.out.println("SSE stream completed");
            }
        );

        eventSource.open();

        // Run until interrupted
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            eventSource.close();
            client.close();
        }));

        try {
            // A thread joining itself never returns normally, so this blocks until the
            // thread is interrupted.
            Thread.currentThread().join(); // Wait forever
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Joins the chat as {@code username}, prints incoming events, and posts each console
     * line as a message until the user types {@code quit} or standard input ends.
     *
     * @param username name to join the chat with
     */
    public void chatClient(String username) {
        Client client = ClientBuilder.newClient();
        WebTarget chatTarget = client.target("http://localhost:8080/chat/join")
                                    .queryParam("username", username);

        SseEventSource chatSource = SseEventSource.target(chatTarget).build();

        try {
            chatSource.register(event -> {
                String eventType = event.getName();
                if (eventType == null) {
                    // Unnamed events carry nothing this client understands, and
                    // switch(String) does not accept null.
                    return;
                }
                switch (eventType) {
                    case "user_joined":
                    case "user_count":
                        System.out.println("[SYSTEM] " + event.readData(String.class));
                        break;
                    case "chat_message":
                        ChatMessage message = event.readData(ChatMessage.class);
                        System.out.printf("[%s] %s%n", message.getUsername(), message.getMessage());
                        break;
                }
            });

            chatSource.open();

            // Send messages from console input.
            // The Scanner is deliberately not closed: closing it would close System.in.
            Scanner scanner = new Scanner(System.in);
            WebTarget messageTarget = client.target("http://localhost:8080/chat/message");

            System.out.println("Connected to chat. Type messages (or 'quit' to exit):");

            // hasNextLine() ends the loop cleanly at end of input; nextLine() alone would
            // throw NoSuchElementException there.
            while (scanner.hasNextLine()) {
                String input = scanner.nextLine();
                if (input.equals("quit")) {
                    break;
                }

                Form messageForm = new Form()
                    .param("username", username)
                    .param("message", input);

                // Close the response so its connection is released after every post.
                messageTarget.request()
                            .post(Entity.form(messageForm))
                            .close();
            }
        } finally {
            chatSource.close();
            client.close();
        }
    }

    /**
     * Connects to {@code /events/reliable} and reconnects after a 5-second pause whenever
     * the stream fails or ends. Each reconnect passes the last received event id, so the
     * server can resume after it. Runs until the calling thread is interrupted.
     */
    public void resilientSseClient() {
        Client client = ClientBuilder.newClient();
        WebTarget target = client.target("http://localhost:8080/events/reliable");

        AtomicReference<String> lastEventId = new AtomicReference<>();
        AtomicBoolean shouldReconnect = new AtomicBoolean(true);

        try {
            while (shouldReconnect.get()) {
                WebTarget currentTarget = target;
                if (lastEventId.get() != null) {
                    currentTarget = target.queryParam("lastEventId", lastEventId.get());
                }

                // Set by the error/completion listeners so the wait loop below can leave.
                AtomicBoolean streamEnded = new AtomicBoolean(false);

                try (SseEventSource eventSource = SseEventSource.target(currentTarget).build()) {

                    eventSource.register(
                        event -> {
                            // Update last received event ID
                            if (event.getId() != null) {
                                lastEventId.set(event.getId());
                            }

                            System.out.printf("Event [%s]: %s%n",
                                            event.getId(), event.readData(String.class));
                        },
                        error -> {
                            System.err.println("Connection error: " + error.getMessage());
                            streamEnded.set(true);
                        },
                        () -> streamEnded.set(true)
                    );

                    eventSource.open();

                    // Wait for connection to close or error. An unbounded sleep would never
                    // notice either, so poll the state.
                    while (eventSource.isOpen() && !streamEnded.get()) {
                        Thread.sleep(500);
                    }

                } catch (InterruptedException e) {
                    shouldReconnect.set(false);
                    Thread.currentThread().interrupt();
                } catch (Exception e) {
                    System.err.println("Connection error: " + e.getMessage());
                }

                if (shouldReconnect.get()) {
                    System.err.println("Reconnecting in 5 seconds...");
                    try {
                        Thread.sleep(5000);
                    } catch (InterruptedException ie) {
                        shouldReconnect.set(false);
                        Thread.currentThread().interrupt();
                    }
                }
            }
        } finally {
            client.close();
        }
    }

    private void handleNotification(Notification notification) {
        System.out.println("NOTIFICATION: " + notification.getMessage());
    }

    private void handleAlert(AlertMessage alert) {
        System.err.println("ALERT: " + alert.getMessage());
    }
}

// Supporting classes for examples
public class AlertMessage {
    private String id;
    private String severity;
    private String message;
    private long timestamp;
    
    // Constructors, getters, setters...
}

/**
 * JSON-serializable chat message exchanged between the chat resource and its clients.
 */
public class ChatMessage {
    private String username;
    private String message;
    private long timestamp;

    /** No-arg constructor so JSON providers can instantiate the class when reading events. */
    public ChatMessage() {
    }

    /**
     * @param username  sender of the message
     * @param message   message text
     * @param timestamp send time in epoch milliseconds
     */
    public ChatMessage(String username, String message, long timestamp) {
        this.username = username;
        this.message = message;
        this.timestamp = timestamp;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }
}

Media Type Constants

JAX-RS provides two constants for the SSE media type, one as a `String` and one as a `MediaType` instance:

// Excerpt: only the SSE-related constants of the media type class are listed here.
public class MediaType {
    
    // String form, for annotation values such as @Produces(MediaType.SERVER_SENT_EVENTS).
    public static final String SERVER_SENT_EVENTS = "text/event-stream";
    // Object form of the same media type ("text" / "event-stream").
    public static final MediaType SERVER_SENT_EVENTS_TYPE = new MediaType("text", "event-stream");
}

Best Practices

Connection Management

/**
 * Example resource that keeps SSE connections alive with a heartbeat every 30 seconds and
 * stops tracking each connection as soon as it closes or a send fails.
 *
 * <p>NOTE(review): the sink set and the scheduler are instance fields, so they are shared
 * across requests only if the resource is singleton-scoped. The sibling broadcast and chat
 * examples are annotated with {@code @Singleton}; confirm the intended scope here.
 */
@Path("/managed-events")
public class ManagedEventResource {

    private final Set<SseEventSink> activeSinks = ConcurrentHashMap.newKeySet();
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);

    /**
     * Sends a "connected" event and then a "heartbeat" event every 30 seconds for as long
     * as the sink stays open.
     *
     * @param eventSink sink bound to the requesting client
     * @param sse       factory for outbound events
     */
    @GET
    @Path("/heartbeat")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public void startHeartbeat(@Context SseEventSink eventSink,
                              @Context Sse sse) {

        activeSinks.add(eventSink);

        // An event needs data or a comment in addition to its name: per the JAX-RS API
        // documentation, build() rejects an event that has neither.
        OutboundSseEvent connectedEvent = sse.newEventBuilder()
            .name("connected")
            .data("connected")
            .build();

        eventSink.send(connectedEvent).whenComplete((unused, throwable) -> {
            if (throwable != null) {
                release(eventSink);
            } else {
                // Send periodic heartbeat, starting immediately
                scheduleHeartbeat(eventSink, sse, 0);
            }
        });
    }

    /**
     * Schedules one heartbeat after the given delay. The next one is scheduled only after
     * this one was sent successfully, so nothing keeps running for a closed sink.
     */
    private void scheduleHeartbeat(SseEventSink eventSink, Sse sse, long delaySeconds) {
        try {
            scheduler.schedule(() -> sendHeartbeat(eventSink, sse), delaySeconds, TimeUnit.SECONDS);
        } catch (RuntimeException e) {
            // The scheduler refuses new tasks once it has been shut down in cleanup().
            release(eventSink);
        }
    }

    /**
     * Sends a single heartbeat, then either reschedules itself or releases the sink.
     */
    private void sendHeartbeat(SseEventSink eventSink, Sse sse) {
        if (eventSink.isClosed()) {
            activeSinks.remove(eventSink);
            return;
        }

        try {
            OutboundSseEvent heartbeatEvent = sse.newEventBuilder()
                .name("heartbeat")
                .data("ping")
                .build();

            eventSink.send(heartbeatEvent).whenComplete((unused, throwable) -> {
                if (throwable != null) {
                    release(eventSink);
                } else {
                    scheduleHeartbeat(eventSink, sse, 30);
                }
            });
        } catch (RuntimeException e) {
            // The sink may have been closed between the isClosed() check and send().
            release(eventSink);
        }
    }

    /**
     * Stops tracking the sink and closes it.
     */
    private void release(SseEventSink eventSink) {
        activeSinks.remove(eventSink);
        eventSink.close();
    }

    /**
     * Closes every tracked sink and stops the scheduler, discarding pending heartbeats.
     */
    @PreDestroy
    private void cleanup() {
        activeSinks.forEach(SseEventSink::close);
        activeSinks.clear();
        // shutdownNow() also drops heartbeats that are still waiting for their delay;
        // shutdown() would, by default, let them run.
        scheduler.shutdownNow();
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-javax-ws-rs--javax-ws-rs-api

docs

client-api.md

core-types.md

extensions.md

index.md

resource-endpoints.md

server-container.md

server-sent-events.md

tile.json