Java API for the Play Framework providing web application development capabilities including form handling, validation, dependency injection, and utility libraries
—
Play Framework provides comprehensive streaming capabilities for real-time web applications including Server-Sent Events (SSE) and Comet streaming implementations. These utilities enable efficient chunked response handling with connection lifecycle management for building responsive, real-time user interfaces.
Complete Server-Sent Events implementation for real-time server-to-client communication with event lifecycle management.
/**
* Server-Sent Events implementation for real-time server-to-client communication
*/
public abstract class EventSource extends Chunks<String> {
/** Create new EventSource */
public EventSource();
/** Called when SSE connection is ready */
public void onReady(Chunks.Out<String> out);
/** Send event to client */
public void send(Event event);
/** Abstract method called when client connects */
public abstract void onConnected();
/** Add callback for when client disconnects */
public void onDisconnected(F.Callback0 callback);
/** Close the SSE connection */
public void close();
/** Create EventSource with connection callback */
public static EventSource whenConnected(F.Callback<EventSource> callback);
}
/**
* Server-Sent Event builder with flexible configuration
*/
public static class EventSource.Event {
/** Create event with data, id, and name */
public Event(String data, String id, String name);
/** Set event name/type */
public Event withName(String name);
/** Set event ID for client-side tracking */
public Event withId(String id);
/** Get formatted SSE event string */
public String formatted();
/** Create event with string data */
public static Event event(String data);
/** Create event with JSON data */
public static Event event(JsonNode json);
}Usage Examples:
import play.libs.EventSource;
import play.libs.EventSource.Event;
import play.libs.F;
import play.mvc.Result;
// Real-time notification service
public class NotificationController extends Controller {
public Result streamNotifications() {
return ok(EventSource.whenConnected(eventSource -> {
// Subscribe to notification events
notificationService.subscribe(eventSource::onConnected);
// Handle client disconnect
eventSource.onDisconnected(() -> {
notificationService.unsubscribe(eventSource);
Logger.info("Client disconnected from notifications");
});
}));
}
}
// Live data streaming
public class LiveDataController extends Controller {
public Result streamStockPrices() {
return ok(new EventSource() {
private final ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor();
@Override
public void onConnected() {
// Send initial data
send(Event.event("connected").withName("system"));
// Stream stock prices every second
scheduler.scheduleAtFixedRate(() -> {
StockPrice price = stockService.getCurrentPrice("AAPL");
JsonNode priceJson = Json.toJson(price);
Event priceEvent = Event.event(priceJson)
.withName("price")
.withId(String.valueOf(System.currentTimeMillis()));
send(priceEvent);
}, 0, 1, TimeUnit.SECONDS);
}
@Override
public void onDisconnected(F.Callback0 callback) {
super.onDisconnected(callback);
scheduler.shutdown();
}
});
}
}
// Chat application
public class ChatController extends Controller {
private static final List<EventSource> chatClients = new ArrayList<>();
public Result joinChat() {
return ok(EventSource.whenConnected(eventSource -> {
synchronized (chatClients) {
chatClients.add(eventSource);
}
// Send welcome message
Event welcome = Event.event("Welcome to the chat!")
.withName("message")
.withId(UUID.randomUUID().toString());
eventSource.send(welcome);
// Handle disconnect
eventSource.onDisconnected(() -> {
synchronized (chatClients) {
chatClients.remove(eventSource);
}
});
}));
}
public Result sendMessage() {
JsonNode json = request().body().asJson();
String message = json.get("message").asText();
String user = json.get("user").asText();
// Broadcast to all connected clients
Event chatEvent = Event.event(user + ": " + message)
.withName("message")
.withId(UUID.randomUUID().toString());
synchronized (chatClients) {
chatClients.forEach(client -> client.send(chatEvent));
}
return ok("Message sent");
}
}Comet streaming implementation for real-time bidirectional communication with browser compatibility features.
/**
* Chunked stream for Comet messaging with browser compatibility
*/
public abstract class Comet extends Chunks<String> {
/** Create Comet with JavaScript callback method */
public Comet(String callbackMethod);
/** Called when Comet connection is ready */
public void onReady(Chunks.Out<String> out);
/** Get initial buffer for browser compatibility */
public String initialBuffer();
/** Send string message to client */
public void sendMessage(String message);
/** Send JSON message to client */
public void sendMessage(JsonNode message);
/** Abstract method called when client connects */
public abstract void onConnected();
/** Add callback for when client disconnects */
public void onDisconnected(Callback0 callback);
/** Close the Comet connection */
public void close();
/** Create Comet with JavaScript method and connection callback */
public static Comet whenConnected(String jsMethod, Callback<Comet> callback);
}Usage Examples:
import play.libs.Comet;
import play.libs.F.Callback;
import play.libs.F.Callback0;
// Real-time dashboard
public class DashboardController extends Controller {
public Result cometDashboard() {
return ok(Comet.whenConnected("updateDashboard", comet -> {
// Send initial dashboard data
comet.onConnected();
// Stream updates every 5 seconds
ActorSystem.system().scheduler().schedule(
Duration.create(0, TimeUnit.SECONDS),
Duration.create(5, TimeUnit.SECONDS),
() -> {
DashboardData data = dashboardService.getCurrentData();
comet.sendMessage(Json.toJson(data));
},
ActorSystem.system().dispatcher()
);
}));
}
}
// Live game updates
public class GameController extends Controller {
public Result streamGameUpdates(String gameId) {
return ok(new Comet("gameUpdate") {
private GameSubscription subscription;
@Override
public void onConnected() {
// Subscribe to game events
subscription = gameService.subscribe(gameId, this::handleGameEvent);
// Send current game state
GameState state = gameService.getGameState(gameId);
sendMessage(Json.toJson(state));
}
private void handleGameEvent(GameEvent event) {
sendMessage(Json.toJson(event));
}
@Override
public void onDisconnected(Callback0 callback) {
super.onDisconnected(callback);
if (subscription != null) {
subscription.cancel();
}
}
});
}
}
// Live log streaming
public class LogController extends Controller {
public Result streamLogs() {
return ok(Comet.whenConnected("logUpdate", comet -> {
LogTailer tailer = new LogTailer(new File("application.log"),
new LogTailerListener() {
@Override
public void handle(String line) {
JsonNode logEntry = Json.newObject()
.put("timestamp", System.currentTimeMillis())
.put("message", line);
comet.sendMessage(logEntry);
}
@Override
public void fileRotated() {
comet.sendMessage("Log file rotated");
}
}, 1000);
// Start tailing
new Thread(tailer).start();
// Stop tailing on disconnect
comet.onDisconnected(() -> tailer.stop());
}));
}
}// Connection pool for managing multiple streaming connections
public class StreamingConnectionPool {
private final Map<String, Set<EventSource>> topicSubscriptions = new ConcurrentHashMap<>();
private final Map<EventSource, String> clientTopics = new ConcurrentHashMap<>();
public void subscribe(String topic, EventSource client) {
topicSubscriptions.computeIfAbsent(topic, k -> ConcurrentHashMap.newKeySet()).add(client);
clientTopics.put(client, topic);
// Handle disconnect
client.onDisconnected(() -> unsubscribe(client));
}
public void unsubscribe(EventSource client) {
String topic = clientTopics.remove(client);
if (topic != null) {
Set<EventSource> clients = topicSubscriptions.get(topic);
if (clients != null) {
clients.remove(client);
if (clients.isEmpty()) {
topicSubscriptions.remove(topic);
}
}
}
}
public void broadcast(String topic, Event event) {
Set<EventSource> clients = topicSubscriptions.get(topic);
if (clients != null) {
clients.forEach(client -> {
try {
client.send(event);
} catch (Exception e) {
Logger.warn("Failed to send event to client", e);
unsubscribe(client);
}
});
}
}
public int getSubscriberCount(String topic) {
Set<EventSource> clients = topicSubscriptions.get(topic);
return clients != null ? clients.size() : 0;
}
}
// Usage in controller
public class StreamingController extends Controller {
@Inject
private StreamingConnectionPool connectionPool;
public Result subscribeToTopic(String topic) {
return ok(EventSource.whenConnected(eventSource -> {
connectionPool.subscribe(topic, eventSource);
// Send subscription confirmation
Event confirmEvent = Event.event("Subscribed to " + topic)
.withName("subscription")
.withId(UUID.randomUUID().toString());
eventSource.send(confirmEvent);
}));
}
}// Service for broadcasting messages to multiple streaming connections
@Singleton
public class BroadcastService {
private final StreamingConnectionPool connectionPool;
private final ActorSystem actorSystem;
@Inject
public BroadcastService(StreamingConnectionPool connectionPool, ActorSystem actorSystem) {
this.connectionPool = connectionPool;
this.actorSystem = actorSystem;
}
public void broadcastToTopic(String topic, Object message) {
JsonNode messageJson = Json.toJson(message);
Event event = Event.event(messageJson)
.withName("broadcast")
.withId(UUID.randomUUID().toString());
connectionPool.broadcast(topic, event);
}
public void schedulePeriodicBroadcast(String topic, Supplier<Object> messageSupplier,
Duration interval) {
actorSystem.scheduler().schedule(
Duration.Zero(),
interval,
() -> broadcastToTopic(topic, messageSupplier.get()),
actorSystem.dispatcher()
);
}
public CompletionStage<Void> broadcastToTopicAsync(String topic, CompletionStage<Object> messageFuture) {
return messageFuture.thenAccept(message -> broadcastToTopic(topic, message));
}
}// Resilient streaming implementation with error handling
public abstract class ResilientEventSource extends EventSource {
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private volatile boolean isConnected = true;
@Override
public void send(Event event) {
if (isConnected) {
try {
super.send(event);
} catch (Exception e) {
Logger.warn("Failed to send event, client may have disconnected", e);
handleSendError(e, event);
}
}
}
protected void handleSendError(Exception error, Event event) {
// Attempt to reconnect or queue message
isConnected = false;
// Try to resend after delay
scheduler.schedule(() -> {
if (isConnected) {
try {
super.send(event);
} catch (Exception retryError) {
Logger.error("Failed to resend event after retry", retryError);
}
}
}, 5, TimeUnit.SECONDS);
}
@Override
public void onDisconnected(F.Callback0 callback) {
super.onDisconnected(() -> {
isConnected = false;
scheduler.shutdown();
callback.invoke();
});
}
protected void sendHeartbeat() {
scheduler.scheduleAtFixedRate(() -> {
if (isConnected) {
Event heartbeat = Event.event("ping")
.withName("heartbeat")
.withId(String.valueOf(System.currentTimeMillis()));
send(heartbeat);
}
}, 30, 30, TimeUnit.SECONDS);
}
}// Authenticated streaming with token validation
public class AuthenticatedStreamingController extends Controller {
public Result authenticatedStream() {
String token = request().getQueryString("token");
if (!authService.validateToken(token)) {
return unauthorized("Invalid token");
}
String userId = authService.getUserId(token);
return ok(EventSource.whenConnected(eventSource -> {
// Send authentication confirmation
Event authEvent = Event.event("Authenticated as " + userId)
.withName("auth")
.withId(UUID.randomUUID().toString());
eventSource.send(authEvent);
// Subscribe to user-specific events
userEventService.subscribe(userId, event -> {
Event userEvent = Event.event(Json.toJson(event))
.withName("user_event")
.withId(event.getId());
eventSource.send(userEvent);
});
// Cleanup on disconnect
eventSource.onDisconnected(() -> {
userEventService.unsubscribe(userId, eventSource);
});
}));
}
public Result authorizedComet(String topic) {
String token = request().getQueryString("token");
String userId = authService.getUserId(token);
if (!authService.canAccessTopic(userId, topic)) {
return forbidden("Access denied to topic: " + topic);
}
return ok(Comet.whenConnected("topicUpdate", comet -> {
topicService.subscribe(topic, userId, message -> {
comet.sendMessage(Json.toJson(message));
});
}));
}
}Install with Tessl CLI
npx tessl i tessl/maven-com-typesafe-play--play-java