Server-Sent Events (SSE) client implementation for receiving real-time event streams from servers. SSE provides a unidirectional communication channel from server to client, ideal for live updates, notifications, and streaming data.
Functions for establishing SSE connections with event stream processing.
/**
* Connect to SSE endpoint and process events.
*
* This overload takes no URL argument, so the target presumably has to be set
* inside [request] — TODO confirm against the implementation.
*
* @param request builder lambda applied to the [HttpRequestBuilder]; defaults to a no-op
* @param reconnectionTime optional reconnection interval; defaults to `null`
* @param showCommentEvents whether comment events are shown in the stream; defaults to `false`
* @param showRetryEvents whether retry events are shown in the stream; defaults to `false`
* @param block suspending lambda invoked with a [ClientSSESession] receiver
* @return a value of type [T], the same type [block] produces (body not shown here)
*/
suspend fun <T> HttpClient.sse(
request: HttpRequestBuilder.() -> Unit = {},
reconnectionTime: Duration? = null,
showCommentEvents: Boolean = false,
showRetryEvents: Boolean = false,
block: suspend ClientSSESession.() -> T
): T
/**
* Connect to an SSE endpoint addressed by host, port and path, and process events.
*
* @param host target host; defaults to `"localhost"`
* @param port target port; defaults to `DEFAULT_PORT`
* @param path request path; defaults to `"/"`
* @param request builder lambda applied to the [HttpRequestBuilder]; defaults to a no-op
* @param reconnectionTime optional reconnection interval; defaults to `null`
* @param showCommentEvents whether comment events are shown in the stream; defaults to `false`
* @param showRetryEvents whether retry events are shown in the stream; defaults to `false`
* @param block suspending lambda invoked with a [ClientSSESession] receiver
* @return a value of type [T], the same type [block] produces (body not shown here)
*/
suspend fun <T> HttpClient.sse(
host: String = "localhost",
port: Int = DEFAULT_PORT,
path: String = "/",
request: HttpRequestBuilder.() -> Unit = {},
reconnectionTime: Duration? = null,
showCommentEvents: Boolean = false,
showRetryEvents: Boolean = false,
block: suspend ClientSSESession.() -> T
): T
/**
* Connect to the SSE endpoint given as a URL string and process events.
*
* Because [block] is the last parameter it is the only argument that can be passed as a
* trailing lambda; [request] has to be passed inside the parentheses (for example by name).
*
* @param urlString endpoint URL as text
* @param request builder lambda applied to the [HttpRequestBuilder]; defaults to a no-op
* @param reconnectionTime optional reconnection interval; defaults to `null`
* @param showCommentEvents whether comment events are shown in the stream; defaults to `false`
* @param showRetryEvents whether retry events are shown in the stream; defaults to `false`
* @param block suspending lambda invoked with a [ClientSSESession] receiver
* @return a value of type [T], the same type [block] produces (body not shown here)
*/
suspend fun <T> HttpClient.sse(
urlString: String,
request: HttpRequestBuilder.() -> Unit = {},
reconnectionTime: Duration? = null,
showCommentEvents: Boolean = false,
showRetryEvents: Boolean = false,
block: suspend ClientSSESession.() -> T
): T
/**
* Connect to the SSE endpoint given as a [Url] and process events.
*
* @param url endpoint URL
* @param request builder lambda applied to the [HttpRequestBuilder]; defaults to a no-op
* @param reconnectionTime optional reconnection interval; defaults to `null`
* @param showCommentEvents whether comment events are shown in the stream; defaults to `false`
* @param showRetryEvents whether retry events are shown in the stream; defaults to `false`
* @param block suspending lambda invoked with a [ClientSSESession] receiver
* @return a value of type [T], the same type [block] produces (body not shown here)
*/
suspend fun <T> HttpClient.sse(
url: Url,
request: HttpRequestBuilder.() -> Unit = {},
reconnectionTime: Duration? = null,
showCommentEvents: Boolean = false,
showRetryEvents: Boolean = false,
block: suspend ClientSSESession.() -> T
): T
/**
* Create SSE session without automatic connection management.
*
* Unlike the `sse` functions there is no `block` parameter: the session itself is
* returned. The usage example for manual management calls `close()` on the returned
* session explicitly when it is done.
*
* @param request builder lambda applied to the [HttpRequestBuilder]; defaults to a no-op
* @param reconnectionTime optional reconnection interval; defaults to `null`
* @param showCommentEvents whether comment events are shown in the stream; defaults to `false`
* @param showRetryEvents whether retry events are shown in the stream; defaults to `false`
* @return the created [ClientSSESession]
*/
suspend fun HttpClient.sseSession(
request: HttpRequestBuilder.() -> Unit = {},
reconnectionTime: Duration? = null,
showCommentEvents: Boolean = false,
showRetryEvents: Boolean = false
): ClientSSESession
/**
 * Create an SSE session for the endpoint given as a URL string, without automatic
 * connection management.
 *
 * The session itself is returned; the manual-management usage example calls `close()`
 * on it explicitly when it is done.
 *
 * @param urlString endpoint URL as text
 * @param request builder lambda applied to the [HttpRequestBuilder]; defaults to a no-op
 * @param reconnectionTime optional reconnection interval; defaults to `null`
 * @param showCommentEvents whether comment events are shown in the stream; defaults to `false`
 * @param showRetryEvents whether retry events are shown in the stream; defaults to `false`
 * @return the created [ClientSSESession]
 */
suspend fun HttpClient.sseSession(
    urlString: String,
    request: HttpRequestBuilder.() -> Unit = {},
    reconnectionTime: Duration? = null,
    showCommentEvents: Boolean = false,
    showRetryEvents: Boolean = false
): ClientSSESession

// Session interface for receiving and processing server-sent events.
/**
* Client SSE session for receiving events.
*
* Exposes the stream of incoming events, the HTTP call the session belongs to,
* and a suspending [close] operation.
*/
interface ClientSSESession {
/** Incoming events channel; the usage examples consume it with `for (event in incoming)` */
val incoming: ReceiveChannel<ServerSentEvent>
/** Associated HTTP call */
val call: HttpClientCall
/** Close the SSE connection */
suspend fun close()
}
/**
 * Default SSE session implementation.
 *
 * Holds the [call] and [incoming] channel handed to the constructor and adds a
 * coroutine scope that is cancelled when the session is closed.
 *
 * @property call the HTTP call this session belongs to
 * @property incoming channel from which received events are read
 */
class DefaultClientSSESession(
    override val call: HttpClientCall,
    override val incoming: ReceiveChannel<ServerSentEvent>
) : ClientSSESession {
    /**
     * Session coroutine scope.
     * NOTE(review): listed without an initializer, so how the scope is obtained
     * is not visible here — confirm against the implementation.
     */
    val coroutineScope: CoroutineScope

    /** Cancels the [incoming] channel first, then the session's [coroutineScope]. */
    override suspend fun close() {
        incoming.cancel()
        coroutineScope.cancel()
    }
}

// Data structure representing individual SSE events.
/**
 * Represents a server-sent event.
 *
 * Every field is optional; an event whose fields are all `null` is considered empty
 * (see [isEmpty] and [Empty]).
 *
 * @property data event data content
 * @property event event type identifier
 * @property id event ID for reconnection
 * @property retry retry interval in milliseconds
 * @property comments comment content (if showCommentEvents = true)
 */
data class ServerSentEvent(
    val data: String?,
    val event: String?,
    val id: String?,
    val retry: Int?,
    val comments: String?
) {
    /** `true` when none of the five fields carries a value. */
    val isEmpty: Boolean
        get() = data == null && event == null && id == null && retry == null && comments == null

    companion object {
        /** Empty event instance: all fields are `null`. */
        val Empty: ServerSentEvent = ServerSentEvent(null, null, null, null, null)
    }
}

// Plugin for configuring SSE functionality and behavior.
/**
* Server-Sent Events plugin for client configuration.
*
* The usage examples install it with `install(SSE) { ... }`. Both type arguments of
* [HttpClientPlugin] are [SSEConfig].
*/
object SSE : HttpClientPlugin<SSEConfig, SSEConfig> {
/** Attribute key of the plugin. NOTE(review): listed without an initializer — the implementation is not shown here. */
override val key: AttributeKey<SSEConfig>
}
/**
 * SSE configuration options.
 *
 * All three properties are mutable so they can be assigned inside the
 * `install(SSE) { ... }` block, as the usage examples do.
 *
 * NOTE(review): `Duration.ofSeconds` is the `java.time.Duration` factory; confirm that
 * this is the Duration type the client API expects.
 */
class SSEConfig {
    /** Default reconnection time; 3 seconds unless overridden, `null` is allowed. */
    var reconnectionTime: Duration? = Duration.ofSeconds(3)

    /** Show comment events in the stream; off by default. */
    var showCommentEvents: Boolean = false

    /** Show retry events in the stream; off by default. */
    var showRetryEvents: Boolean = false
}

// Utilities for processing SSE event streams with filtering and transformation.
/**
* Filter events by type.
*
* @param eventType event type to keep; presumably compared with `ServerSentEvent.event` — TODO confirm, body not shown
* @return a channel of [ServerSentEvent] containing the filtered events
*/
suspend fun ReceiveChannel<ServerSentEvent>.filterByType(
eventType: String
): ReceiveChannel<ServerSentEvent>
/**
* Filter events by data content.
*
* @param predicate test applied to a non-null data string; how events whose `data` is
*   `null` are treated is not visible here — TODO confirm
* @return a channel of [ServerSentEvent] containing the filtered events
*/
suspend fun ReceiveChannel<ServerSentEvent>.filterByData(
predicate: (String) -> Boolean
): ReceiveChannel<ServerSentEvent>
/**
* Transform events to specific type.
*
* @param transform conversion from a [ServerSentEvent] to a value of type [T]
* @return a channel of the converted values
*/
suspend fun <T> ReceiveChannel<ServerSentEvent>.mapEvents(
transform: (ServerSentEvent) -> T
): ReceiveChannel<T>
/**
 * Collect events into a flow.
 *
 * Lets the `Flow` operators (`filter`, `map`, `collect`, as in the filtering usage
 * example) be applied to a session's incoming events.
 *
 * @return a [Flow] of the events read from this channel
 */
fun ReceiveChannel<ServerSentEvent>.asFlow(): Flow<ServerSentEvent>

// Automatic reconnection handling for SSE connections.
/**
 * Settings that govern automatic reconnection of an SSE connection.
 *
 * @property enabled turns automatic reconnection on or off; on by default
 * @property maxAttempts upper bound on reconnection attempts; the default `-1` means unlimited
 * @property initialDelay delay before the first reconnection; 1 second by default
 * @property maxDelay ceiling for the reconnection delay; 30 seconds by default
 * @property backoffMultiplier backoff multiplier; `1.5` by default
 * @property lastEventId last event ID for reconnection; mutable, `null` by default
 */
data class SSEReconnectionConfig(
    val enabled: Boolean = true,
    val maxAttempts: Int = -1,
    val initialDelay: Duration = Duration.ofSeconds(1),
    val maxDelay: Duration = Duration.ofSeconds(30),
    val backoffMultiplier: Double = 1.5,
    var lastEventId: String? = null
)
/**
 * Create SSE connection with automatic reconnection.
 *
 * @param url endpoint URL as text
 * @param config reconnection settings; defaults to `SSEReconnectionConfig()`
 * @param request builder lambda applied to the [HttpRequestBuilder]; defaults to a no-op
 * @param block suspending lambda invoked with a [ClientSSESession] receiver
 * @return a value of type [T], the same type [block] produces (body not shown here)
 */
suspend fun <T> HttpClient.sseWithReconnection(
    url: String,
    config: SSEReconnectionConfig = SSEReconnectionConfig(),
    request: HttpRequestBuilder.() -> Unit = {},
    block: suspend ClientSSESession.() -> T
): T

// Usage Examples:
import io.ktor.client.*
import io.ktor.client.plugins.sse.*
import io.ktor.sse.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
// Client shared by all examples below, with the SSE plugin installed.
// The install block assigns the three plugin-level options declared on SSEConfig.
// NOTE(review): no import for Duration is listed above — confirm which Duration type is intended.
val client = HttpClient {
install(SSE) {
reconnectionTime = Duration.ofSeconds(5)
showCommentEvents = false
showRetryEvents = true
}
}
// Basic SSE connection
client.sse("https://api.example.com/events") {
    for (event in incoming) {
        // The first matching category wins: data, then retry, then comments.
        if (event.data != null) {
            println("Event data: ${event.data}")
            // Pick the line to print from the event type
            val description = when (event.event) {
                "user-joined" -> "User joined: ${event.data}"
                "user-left" -> "User left: ${event.data}"
                "message" -> "New message: ${event.data}"
                else -> "Unknown event type: ${event.event}"
            }
            println(description)
        } else if (event.retry != null) {
            println("Server requested retry in ${event.retry}ms")
        } else if (event.comments != null) {
            println("Server comment: ${event.comments}")
        }
    }
}
// SSE with custom headers and authentication
client.sse(
    urlString = "https://api.example.com/live-feed",
    // Kotlin allows a single trailing lambda, and that one is `block`;
    // the request builder therefore has to be passed by name.
    request = {
        headers {
            append("Authorization", "Bearer your-token-here")
            append("Accept", "text/event-stream")
            append("Cache-Control", "no-cache")
        }
        parameter("channel", "notifications")
        parameter("filter", "important")
    }
) {
    println("Connected to live feed")
    for (event in incoming) {
        // Bind the payload once: data-less events are skipped and the decoder
        // receives a non-null String.
        val payload = event.data ?: continue
        // Parse JSON event data
        try {
            val eventData = Json.decodeFromString<NotificationEvent>(payload)
            when (eventData.type) {
                "alert" -> handleAlert(eventData.payload)
                "update" -> handleUpdate(eventData.payload)
                "status" -> handleStatus(eventData.payload)
            }
        } catch (e: CancellationException) {
            // Cancellation is not a parse failure; let it propagate.
            throw e
        } catch (e: Exception) {
            println("Failed to parse event: ${e.message}")
        }
    }
}
// SSE session with manual management
// NOTE(review): passing the request builder as a trailing lambda requires it to be the
// LAST parameter of sseSession; the signature listing in this document puts `request`
// second — confirm against the Ktor version in use.
val session = client.sseSession("https://api.example.com/stream") {
    headers {
        append("X-Client-ID", "client-123")
    }
}
// Process events in background
val eventJob = launch {
    try {
        for (event in session.incoming) {
            if (event.data != null) {
                println("Received: ${event.data}")
                // Store last event ID for potential reconnection
                event.id?.let { eventId ->
                    // Save for reconnection
                    saveLastEventId(eventId)
                }
            }
        }
    } catch (e: CancellationException) {
        // eventJob.cancel() below ends this loop through cancellation; that is the
        // normal shutdown path, not a processing error, so it must be rethrown.
        throw e
    } catch (e: Exception) {
        println("Event processing error: ${e.message}")
    }
}
// Do other work...
delay(60000) // Process events for 1 minute
// Clean shutdown: stop the consumer first, then close the connection
eventJob.cancel()
session.close()
// SSE with event filtering and transformation
client.sse(
    urlString = "https://api.example.com/activity-stream",
    // showCommentEvents is a parameter of sse(), not a property of the request
    // builder, so it is passed as an argument rather than set inside a lambda.
    showCommentEvents = true
) {
    incoming
        .asFlow()
        // Keep only the two event types handled below
        .filter { it.event == "user-activity" || it.event == "system-alert" }
        .map { event ->
            ActivityEvent(
                type = event.event ?: "unknown",
                data = event.data ?: "",
                timestamp = System.currentTimeMillis(),
                id = event.id
            )
        }
        .collect { activity ->
            println("Activity: ${activity.type} - ${activity.data}")
            // Handle specific activities
            when (activity.type) {
                "system-alert" -> handleSystemAlert(activity)
                "user-activity" -> handleUserActivity(activity)
            }
        }
}
// SSE with reconnection handling
/**
 * Connects to the events endpoint and reconnects with exponential backoff on failure.
 *
 * The attempt counter is reset every time a connection is established, so
 * [maxAttempts] bounds consecutive failures. The ID of the last event that carried
 * data is sent back as `Last-Event-ID` on the next connection.
 *
 * @param maxAttempts number of consecutive failed attempts after which the last
 *   exception is rethrown
 * @throws Exception the failure of the final attempt once [maxAttempts] is reached
 */
suspend fun connectWithReconnection(maxAttempts: Int = 5) {
    var attempt = 0
    var lastEventId: String? = null
    while (attempt < maxAttempts) {
        try {
            client.sse(
                urlString = "https://api.example.com/events",
                // Only `block` can be a trailing lambda; the request builder is passed by name.
                request = {
                    // Include Last-Event-ID for reconnection
                    lastEventId?.let { id ->
                        headers {
                            append("Last-Event-ID", id)
                        }
                    }
                }
            ) {
                println("Connected (attempt ${attempt + 1})")
                attempt = 0 // Reset on successful connection
                for (event in incoming) {
                    if (event.data != null) {
                        println("Event: ${event.data}")
                        // Track last event ID
                        event.id?.let { lastEventId = it }
                    }
                }
            }
            // If we get here, connection ended normally
            break
        } catch (e: CancellationException) {
            // The caller cancelled us: not a connection failure, never retry.
            throw e
        } catch (e: Exception) {
            attempt++
            println("Connection failed (attempt $attempt): ${e.message}")
            if (attempt < maxAttempts) {
                // Exponential backoff, capped at 30 s. The exponent is clamped because an
                // unbounded Int shift wraps around for large attempt counts and would
                // produce a negative delay; 1000 shl 5 already exceeds the cap.
                val backoffMillis = minOf(1000L shl minOf(attempt, 5), 30000L)
                println("Reconnecting in ${backoffMillis}ms...")
                delay(backoffMillis)
            } else {
                println("Max reconnection attempts reached")
                throw e
            }
        }
    }
}
// Use reconnection function
connectWithReconnection()
// SSE for real-time dashboard updates
client.sse(
    urlString = "https://dashboard.example.com/metrics-stream",
    // Only `block` can be a trailing lambda; the request builder is passed by name.
    request = {
        parameter("metrics", "cpu,memory,disk")
        parameter("interval", "1000") // 1 second updates
    }
) {
    for (event in incoming) {
        when (event.event) {
            // A typed event without data is skipped instead of failing on `!!`
            "metric-update" -> event.data?.let { payload ->
                updateDashboard(Json.decodeFromString<SystemMetrics>(payload))
            }
            "alert" -> event.data?.let { payload ->
                showAlert(Json.decodeFromString<Alert>(payload))
            }
            "heartbeat" -> println("Server heartbeat - connection alive")
        }
    }
}
// SSE for chat application
client.sse(
    // SSE is carried over HTTP(S); `wss://` is the WebSocket scheme and does not apply here.
    urlString = "https://chat.example.com/room/123/events",
    // Only `block` can be a trailing lambda; the request builder is passed by name.
    request = {
        headers {
            append("Authorization", "Bearer $chatToken")
        }
    }
) {
    for (event in incoming) {
        // Every handled event type needs a payload; data-less events are skipped
        // instead of failing on `!!`.
        val payload = event.data ?: continue
        when (event.event) {
            "message" -> displayMessage(Json.decodeFromString<ChatMessage>(payload))
            "user-typing" -> showTypingIndicator(Json.decodeFromString<TypingIndicator>(payload))
            "user-joined" -> addUserToRoom(Json.decodeFromString<User>(payload))
            "user-left" -> removeUserFromRoom(Json.decodeFromString<User>(payload))
        }
    }
}
// All examples are done: close the shared client
client.close()
// Data classes for examples
/** Envelope decoded from `event.data` in the live-feed example; `type` selects the handler that receives `payload`. */
@Serializable
data class NotificationEvent(val type: String, val payload: JsonObject)
/** Value built from a received event in the filtering example: event type, data, creation timestamp and optional event ID. */
@Serializable
data class ActivityEvent(val type: String, val data: String, val timestamp: Long, val id: String?)
/** Payload decoded for "metric-update" events in the dashboard example. */
@Serializable
data class SystemMetrics(val cpu: Double, val memory: Double, val disk: Double)
/** Payload decoded for "alert" events in the dashboard example. */
@Serializable
data class Alert(val level: String, val message: String, val timestamp: Long)
/** Payload decoded for "message" events in the chat example. */
@Serializable
data class ChatMessage(val id: String, val user: String, val text: String, val timestamp: Long)
/** Payload decoded for "user-typing" events in the chat example. */
@Serializable
data class TypingIndicator(val user: String, val isTyping: Boolean)
/** Payload decoded for "user-joined" and "user-left" events in the chat example; `avatar` is optional. */
@Serializable
data class User(val id: String, val name: String, val avatar: String?)

// Low-level SSE parsing utilities for custom implementations.
/**
* Parse SSE event from raw text lines.
*
* @param lines the raw text lines making up one event
* @return the parsed [ServerSentEvent]; the return type is non-null, so how malformed
*   input is reported is not visible here — TODO confirm
*/
fun parseServerSentEvent(lines: List<String>): ServerSentEvent
/**
* SSE event parser state.
*
* As far as the signatures show: lines are fed in with [parseLine], the accumulated
* event is obtained with [getCurrentEvent], and [reset] clears the state. Method
* bodies are not shown here.
*/
class SSEEventParser {
/** Parse a line and update internal state */
fun parseLine(line: String)
/** Get current event and reset parser; nullable — presumably `null` when nothing has been accumulated, TODO confirm */
fun getCurrentEvent(): ServerSentEvent?
/** Reset parser state */
fun reset()
}
/**
 * Create SSE event parser.
 *
 * @return an [SSEEventParser] instance
 */
fun createSSEParser(): SSEEventParser

// SSE-specific error handling and connection management.
/**
 * SSE connection exception.
 *
 * Declared `open` because it is the base class of the more specific SSE exceptions;
 * Kotlin classes are final by default and a final class cannot be inherited from.
 *
 * @param message description of the failure
 * @param cause underlying exception, if any
 */
open class SSEException(
    message: String,
    cause: Throwable? = null
) : Exception(message, cause)
/**
* SSE connection closed exception.
*
* Passes only [message] to [SSEException], so no cause is attached.
* Subclassing requires [SSEException] to be declared `open`.
*
* @param message description of the failure; defaults to "SSE connection was closed"
*/
class SSEConnectionClosedException(
message: String = "SSE connection was closed"
) : SSEException(message)
/**
* SSE parsing exception.
*
* Subclassing requires [SSEException] to be declared `open`.
*
* @param message description of the failure
* @property rawData raw text associated with the failure, kept on the exception
* @param cause underlying exception, if any; forwarded to [SSEException]
*/
class SSEParsingException(
message: String,
val rawData: String,
cause: Throwable? = null
) : SSEException(message, cause)