CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-com-typesafe-play--play-java

Java API for the Play Framework providing web application development capabilities including form handling, validation, dependency injection, and utility libraries

Pending
Overview
Eval results
Files

streaming.mddocs/

Streaming and Real-time Communication

Play Framework provides comprehensive streaming capabilities for real-time web applications including Server-Sent Events (SSE) and Comet streaming implementations. These utilities enable efficient chunked response handling with connection lifecycle management for building responsive, real-time user interfaces.

Capabilities

Server-Sent Events (SSE)

Complete Server-Sent Events implementation for real-time server-to-client communication with event lifecycle management.

/**
 * Server-Sent Events implementation for real-time server-to-client communication
 */
public abstract class EventSource extends Chunks<String> {
    /** Create new EventSource */
    public EventSource();
    
    /** Called when SSE connection is ready */
    public void onReady(Chunks.Out<String> out);
    
    /** Send event to client */
    public void send(Event event);
    
    /** Abstract method called when client connects */
    public abstract void onConnected();
    
    /** Add callback for when client disconnects */
    public void onDisconnected(F.Callback0 callback);
    
    /** Close the SSE connection */
    public void close();
    
    /** Create EventSource with connection callback */
    public static EventSource whenConnected(F.Callback<EventSource> callback);
}

/**
 * Server-Sent Event builder with flexible configuration
 */
public static class EventSource.Event {
    /** Create event with data, id, and name */
    public Event(String data, String id, String name);
    
    /** Set event name/type */
    public Event withName(String name);
    
    /** Set event ID for client-side tracking */
    public Event withId(String id);
    
    /** Get formatted SSE event string */
    public String formatted();
    
    /** Create event with string data */
    public static Event event(String data);
    
    /** Create event with JSON data */
    public static Event event(JsonNode json);
}

Usage Examples:

import play.libs.EventSource;
import play.libs.EventSource.Event;
import play.libs.F;
import play.mvc.Result;

// Real-time notification service
public class NotificationController extends Controller {
    
    public Result streamNotifications() {
        return ok(EventSource.whenConnected(eventSource -> {
            // Subscribe to notification events
            notificationService.subscribe(eventSource::onConnected);
            
            // Handle client disconnect
            eventSource.onDisconnected(() -> {
                notificationService.unsubscribe(eventSource);
                Logger.info("Client disconnected from notifications");
            });
        }));
    }
}

// Live data streaming
public class LiveDataController extends Controller {
    
    public Result streamStockPrices() {
        return ok(new EventSource() {
            private final ScheduledExecutorService scheduler = 
                Executors.newSingleThreadScheduledExecutor();
            
            @Override
            public void onConnected() {
                // Send initial data
                send(Event.event("connected").withName("system"));
                
                // Stream stock prices every second
                scheduler.scheduleAtFixedRate(() -> {
                    StockPrice price = stockService.getCurrentPrice("AAPL");
                    JsonNode priceJson = Json.toJson(price);
                    
                    Event priceEvent = Event.event(priceJson)
                        .withName("price")
                        .withId(String.valueOf(System.currentTimeMillis()));
                    
                    send(priceEvent);
                }, 0, 1, TimeUnit.SECONDS);
            }
            
            @Override
            public void onDisconnected(F.Callback0 callback) {
                super.onDisconnected(callback);
                scheduler.shutdown();
            }
        });
    }
}

// Chat application
public class ChatController extends Controller {
    
    private static final List<EventSource> chatClients = new ArrayList<>();
    
    public Result joinChat() {
        return ok(EventSource.whenConnected(eventSource -> {
            synchronized (chatClients) {
                chatClients.add(eventSource);
            }
            
            // Send welcome message
            Event welcome = Event.event("Welcome to the chat!")
                .withName("message")
                .withId(UUID.randomUUID().toString());
            eventSource.send(welcome);
            
            // Handle disconnect
            eventSource.onDisconnected(() -> {
                synchronized (chatClients) {
                    chatClients.remove(eventSource);
                }
            });
        }));
    }
    
    public Result sendMessage() {
        JsonNode json = request().body().asJson();
        String message = json.get("message").asText();
        String user = json.get("user").asText();
        
        // Broadcast to all connected clients
        Event chatEvent = Event.event(user + ": " + message)
            .withName("message")
            .withId(UUID.randomUUID().toString());
        
        synchronized (chatClients) {
            chatClients.forEach(client -> client.send(chatEvent));
        }
        
        return ok("Message sent");
    }
}

Comet Streaming

Comet streaming implementation for real-time bidirectional communication with browser compatibility features.

/**
 * Chunked stream for Comet messaging with browser compatibility
 */
public abstract class Comet extends Chunks<String> {
    /** Create Comet with JavaScript callback method */
    public Comet(String callbackMethod);
    
    /** Called when Comet connection is ready */
    public void onReady(Chunks.Out<String> out);
    
    /** Get initial buffer for browser compatibility */
    public String initialBuffer();
    
    /** Send string message to client */
    public void sendMessage(String message);
    
    /** Send JSON message to client */
    public void sendMessage(JsonNode message);
    
    /** Abstract method called when client connects */
    public abstract void onConnected();
    
    /** Add callback for when client disconnects */
    public void onDisconnected(Callback0 callback);
    
    /** Close the Comet connection */
    public void close();
    
    /** Create Comet with JavaScript method and connection callback */
    public static Comet whenConnected(String jsMethod, Callback<Comet> callback);
}

Usage Examples:

import play.libs.Comet;
import play.libs.F.Callback;
import play.libs.F.Callback0;

// Real-time dashboard
public class DashboardController extends Controller {
    
    public Result cometDashboard() {
        return ok(Comet.whenConnected("updateDashboard", comet -> {
            
            // Send initial dashboard data
            comet.onConnected();
            
            // Stream updates every 5 seconds
            ActorSystem.system().scheduler().schedule(
                Duration.create(0, TimeUnit.SECONDS),
                Duration.create(5, TimeUnit.SECONDS),
                () -> {
                    DashboardData data = dashboardService.getCurrentData();
                    comet.sendMessage(Json.toJson(data));
                },
                ActorSystem.system().dispatcher()
            );
            
        }));
    }
}

// Live game updates
public class GameController extends Controller {
    
    public Result streamGameUpdates(String gameId) {
        return ok(new Comet("gameUpdate") {
            private GameSubscription subscription;
            
            @Override
            public void onConnected() {
                // Subscribe to game events
                subscription = gameService.subscribe(gameId, this::handleGameEvent);
                
                // Send current game state
                GameState state = gameService.getGameState(gameId);
                sendMessage(Json.toJson(state));
            }
            
            private void handleGameEvent(GameEvent event) {
                sendMessage(Json.toJson(event));
            }
            
            @Override
            public void onDisconnected(Callback0 callback) {
                super.onDisconnected(callback);
                if (subscription != null) {
                    subscription.cancel();
                }
            }
        });
    }
}

// Live log streaming
public class LogController extends Controller {
    
    public Result streamLogs() {
        return ok(Comet.whenConnected("logUpdate", comet -> {
            
            LogTailer tailer = new LogTailer(new File("application.log"), 
                new LogTailerListener() {
                    @Override
                    public void handle(String line) {
                        JsonNode logEntry = Json.newObject()
                            .put("timestamp", System.currentTimeMillis())
                            .put("message", line);
                        comet.sendMessage(logEntry);
                    }
                    
                    @Override
                    public void fileRotated() {
                        comet.sendMessage("Log file rotated");
                    }
                }, 1000);
            
            // Start tailing
            new Thread(tailer).start();
            
            // Stop tailing on disconnect
            comet.onDisconnected(() -> tailer.stop());
        }));
    }
}

Advanced Usage Patterns

Connection Pool Management

// Connection pool for managing multiple streaming connections
public class StreamingConnectionPool {
    
    private final Map<String, Set<EventSource>> topicSubscriptions = new ConcurrentHashMap<>();
    private final Map<EventSource, String> clientTopics = new ConcurrentHashMap<>();
    
    public void subscribe(String topic, EventSource client) {
        topicSubscriptions.computeIfAbsent(topic, k -> ConcurrentHashMap.newKeySet()).add(client);
        clientTopics.put(client, topic);
        
        // Handle disconnect
        client.onDisconnected(() -> unsubscribe(client));
    }
    
    public void unsubscribe(EventSource client) {
        String topic = clientTopics.remove(client);
        if (topic != null) {
            Set<EventSource> clients = topicSubscriptions.get(topic);
            if (clients != null) {
                clients.remove(client);
                if (clients.isEmpty()) {
                    topicSubscriptions.remove(topic);
                }
            }
        }
    }
    
    public void broadcast(String topic, Event event) {
        Set<EventSource> clients = topicSubscriptions.get(topic);
        if (clients != null) {
            clients.forEach(client -> {
                try {
                    client.send(event);
                } catch (Exception e) {
                    Logger.warn("Failed to send event to client", e);
                    unsubscribe(client);
                }
            });
        }
    }
    
    public int getSubscriberCount(String topic) {
        Set<EventSource> clients = topicSubscriptions.get(topic);
        return clients != null ? clients.size() : 0;
    }
}

// Usage in controller
public class StreamingController extends Controller {
    
    @Inject
    private StreamingConnectionPool connectionPool;
    
    public Result subscribeToTopic(String topic) {
        return ok(EventSource.whenConnected(eventSource -> {
            connectionPool.subscribe(topic, eventSource);
            
            // Send subscription confirmation
            Event confirmEvent = Event.event("Subscribed to " + topic)
                .withName("subscription")
                .withId(UUID.randomUUID().toString());
            eventSource.send(confirmEvent);
        }));
    }
}

Message Broadcasting Service

// Service for broadcasting messages to multiple streaming connections
@Singleton
public class BroadcastService {
    
    private final StreamingConnectionPool connectionPool;
    private final ActorSystem actorSystem;
    
    @Inject
    public BroadcastService(StreamingConnectionPool connectionPool, ActorSystem actorSystem) {
        this.connectionPool = connectionPool;
        this.actorSystem = actorSystem;
    }
    
    public void broadcastToTopic(String topic, Object message) {
        JsonNode messageJson = Json.toJson(message);
        Event event = Event.event(messageJson)
            .withName("broadcast")
            .withId(UUID.randomUUID().toString());
        
        connectionPool.broadcast(topic, event);
    }
    
    public void schedulePeriodicBroadcast(String topic, Supplier<Object> messageSupplier, 
                                         Duration interval) {
        actorSystem.scheduler().schedule(
            Duration.Zero(),
            interval,
            () -> broadcastToTopic(topic, messageSupplier.get()),
            actorSystem.dispatcher()
        );
    }
    
    public CompletionStage<Void> broadcastToTopicAsync(String topic, CompletionStage<Object> messageFuture) {
        return messageFuture.thenAccept(message -> broadcastToTopic(topic, message));
    }
}

Error Handling and Resilience

// Resilient streaming implementation with error handling
public abstract class ResilientEventSource extends EventSource {
    
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private volatile boolean isConnected = true;
    
    @Override
    public void send(Event event) {
        if (isConnected) {
            try {
                super.send(event);
            } catch (Exception e) {
                Logger.warn("Failed to send event, client may have disconnected", e);
                handleSendError(e, event);
            }
        }
    }
    
    protected void handleSendError(Exception error, Event event) {
        // Attempt to reconnect or queue message
        isConnected = false;
        
        // Try to resend after delay
        scheduler.schedule(() -> {
            if (isConnected) {
                try {
                    super.send(event);
                } catch (Exception retryError) {
                    Logger.error("Failed to resend event after retry", retryError);
                }
            }
        }, 5, TimeUnit.SECONDS);
    }
    
    @Override
    public void onDisconnected(F.Callback0 callback) {
        super.onDisconnected(() -> {
            isConnected = false;
            scheduler.shutdown();
            callback.invoke();
        });
    }
    
    protected void sendHeartbeat() {
        scheduler.scheduleAtFixedRate(() -> {
            if (isConnected) {
                Event heartbeat = Event.event("ping")
                    .withName("heartbeat")
                    .withId(String.valueOf(System.currentTimeMillis()));
                send(heartbeat);
            }
        }, 30, 30, TimeUnit.SECONDS);
    }
}

Authentication and Authorization

// Authenticated streaming with token validation
public class AuthenticatedStreamingController extends Controller {
    
    public Result authenticatedStream() {
        String token = request().getQueryString("token");
        
        if (!authService.validateToken(token)) {
            return unauthorized("Invalid token");
        }
        
        String userId = authService.getUserId(token);
        
        return ok(EventSource.whenConnected(eventSource -> {
            // Send authentication confirmation
            Event authEvent = Event.event("Authenticated as " + userId)
                .withName("auth")
                .withId(UUID.randomUUID().toString());
            eventSource.send(authEvent);
            
            // Subscribe to user-specific events
            userEventService.subscribe(userId, event -> {
                Event userEvent = Event.event(Json.toJson(event))
                    .withName("user_event")
                    .withId(event.getId());
                eventSource.send(userEvent);
            });
            
            // Cleanup on disconnect
            eventSource.onDisconnected(() -> {
                userEventService.unsubscribe(userId, eventSource);
            });
        }));
    }
    
    public Result authorizedComet(String topic) {
        String token = request().getQueryString("token");
        String userId = authService.getUserId(token);
        
        if (!authService.canAccessTopic(userId, topic)) {
            return forbidden("Access denied to topic: " + topic);
        }
        
        return ok(Comet.whenConnected("topicUpdate", comet -> {
            topicService.subscribe(topic, userId, message -> {
                comet.sendMessage(Json.toJson(message));
            });
        }));
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-com-typesafe-play--play-java

docs

dependency-injection.md

form-processing.md

formatting.md

index.md

routing.md

streaming.md

utilities.md

validation.md

tile.json