or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

content-handling.md, core-client.md, engine-configuration.md, index.md, plugin-system.md, request-building.md, response-processing.md, server-sent-events.md, websocket-support.md
tile.json

docs/server-sent-events.md

Server-Sent Events

Server-Sent Events (SSE) client implementation for receiving real-time event streams from servers. SSE provides a unidirectional communication channel from server to client, ideal for live updates, notifications, and streaming data.

Capabilities

SSE Connection Functions

Functions for establishing SSE connections with event stream processing.

/**
 * Connect to an SSE endpoint and process its events inside [block].
 *
 * @param request builder lambda applied to the outgoing [HttpRequestBuilder]; defaults to an empty lambda
 * @param reconnectionTime reconnection time for this connection; defaults to `null`
 *   (NOTE(review): presumably `null` falls back to the plugin-level value — confirm)
 * @param showCommentEvents whether comment events are shown in the stream; defaults to `false`
 * @param showRetryEvents whether retry events are shown in the stream; defaults to `false`
 * @param block code executed with the [ClientSSESession] as its receiver
 * @return the value produced by [block]
 */
suspend fun <T> HttpClient.sse(
    request: HttpRequestBuilder.() -> Unit = {},
    reconnectionTime: Duration? = null,
    showCommentEvents: Boolean = false,
    showRetryEvents: Boolean = false,
    block: suspend ClientSSESession.() -> T
): T

/**
 * Connect to an SSE endpoint addressed by host, port and path, and process its events inside [block].
 *
 * @param host target host; defaults to `"localhost"`
 * @param port target port; defaults to `DEFAULT_PORT`
 * @param path request path; defaults to `"/"`
 * @param request builder lambda applied to the outgoing [HttpRequestBuilder]; defaults to an empty lambda
 * @param reconnectionTime reconnection time for this connection; defaults to `null`
 * @param showCommentEvents whether comment events are shown in the stream; defaults to `false`
 * @param showRetryEvents whether retry events are shown in the stream; defaults to `false`
 * @param block code executed with the [ClientSSESession] as its receiver
 * @return the value produced by [block]
 */
suspend fun <T> HttpClient.sse(
    host: String = "localhost",
    port: Int = DEFAULT_PORT,
    path: String = "/",
    request: HttpRequestBuilder.() -> Unit = {},
    reconnectionTime: Duration? = null,
    showCommentEvents: Boolean = false,
    showRetryEvents: Boolean = false,
    block: suspend ClientSSESession.() -> T
): T

/**
 * Connect to the SSE endpoint at [urlString] and process its events inside [block].
 *
 * @param urlString URL of the endpoint, given as a string
 * @param request builder lambda applied to the outgoing [HttpRequestBuilder]; defaults to an empty lambda
 * @param reconnectionTime reconnection time for this connection; defaults to `null`
 * @param showCommentEvents whether comment events are shown in the stream; defaults to `false`
 * @param showRetryEvents whether retry events are shown in the stream; defaults to `false`
 * @param block code executed with the [ClientSSESession] as its receiver
 * @return the value produced by [block]
 */
suspend fun <T> HttpClient.sse(
    urlString: String,
    request: HttpRequestBuilder.() -> Unit = {},
    reconnectionTime: Duration? = null,
    showCommentEvents: Boolean = false,
    showRetryEvents: Boolean = false,
    block: suspend ClientSSESession.() -> T
): T

/**
 * Connect to the SSE endpoint at [url] and process its events inside [block].
 *
 * @param url URL of the endpoint, given as a [Url]
 * @param request builder lambda applied to the outgoing [HttpRequestBuilder]; defaults to an empty lambda
 * @param reconnectionTime reconnection time for this connection; defaults to `null`
 * @param showCommentEvents whether comment events are shown in the stream; defaults to `false`
 * @param showRetryEvents whether retry events are shown in the stream; defaults to `false`
 * @param block code executed with the [ClientSSESession] as its receiver
 * @return the value produced by [block]
 */
suspend fun <T> HttpClient.sse(
    url: Url,
    request: HttpRequestBuilder.() -> Unit = {},
    reconnectionTime: Duration? = null,
    showCommentEvents: Boolean = false,
    showRetryEvents: Boolean = false,
    block: suspend ClientSSESession.() -> T
): T

/**
 * Create an SSE session without automatic connection management.
 *
 * Unlike the `sse` functions, this returns the session itself rather than running a block.
 * NOTE(review): the caller presumably has to call [ClientSSESession.close] when done — confirm.
 *
 * @param request builder lambda applied to the outgoing [HttpRequestBuilder]; defaults to an empty lambda
 * @param reconnectionTime reconnection time for this connection; defaults to `null`
 * @param showCommentEvents whether comment events are shown in the stream; defaults to `false`
 * @param showRetryEvents whether retry events are shown in the stream; defaults to `false`
 * @return the created [ClientSSESession]
 */
suspend fun HttpClient.sseSession(
    request: HttpRequestBuilder.() -> Unit = {},
    reconnectionTime: Duration? = null,
    showCommentEvents: Boolean = false,
    showRetryEvents: Boolean = false
): ClientSSESession

/**
 * Create an SSE session for the endpoint at [urlString] and return it to the caller.
 *
 * @param urlString URL of the endpoint, given as a string
 * @param request builder lambda applied to the outgoing [HttpRequestBuilder]; defaults to an empty lambda
 * @param reconnectionTime reconnection time for this connection; defaults to `null`
 * @param showCommentEvents whether comment events are shown in the stream; defaults to `false`
 * @param showRetryEvents whether retry events are shown in the stream; defaults to `false`
 * @return the created [ClientSSESession]
 */
suspend fun HttpClient.sseSession(
    urlString: String,
    request: HttpRequestBuilder.() -> Unit = {},
    reconnectionTime: Duration? = null,
    showCommentEvents: Boolean = false,
    showRetryEvents: Boolean = false
): ClientSSESession

SSE Session Interface

Session interface for receiving and processing server-sent events.

/**
 * Client SSE session for receiving events.
 *
 * Exposes the stream of received events together with the HTTP call behind it,
 * and a way to close the connection.
 */
interface ClientSSESession {
    /** Incoming events channel; each element is one [ServerSentEvent]. */
    val incoming: ReceiveChannel<ServerSentEvent>
    
    /** Associated HTTP call */
    val call: HttpClientCall
    
    /** Close the SSE connection */
    suspend fun close()
}

/**
 * Default SSE session implementation.
 *
 * Receives the HTTP call and the events channel as constructor arguments and
 * exposes them through the [ClientSSESession] properties.
 */
class DefaultClientSSESession(
    override val call: HttpClientCall,
    override val incoming: ReceiveChannel<ServerSentEvent>
) : ClientSSESession {
    /**
     * Session coroutine scope.
     * NOTE(review): no initializer is shown in this listing; where the scope comes from needs confirming.
     */
    val coroutineScope: CoroutineScope
    
    /** Cancels the [incoming] channel first, then [coroutineScope]. */
    override suspend fun close() {
        incoming.cancel()
        coroutineScope.cancel()
    }
}

Server-Sent Event Structure

Data structure representing individual SSE events.

/**
 * A single event received from a server-sent event stream.
 *
 * Every field is optional; an event whose fields are all `null` is considered empty.
 */
data class ServerSentEvent(
    /** Payload carried by the event */
    val data: String?,

    /** Name of the event type */
    val event: String?,

    /** Identifier of the event, used for reconnection */
    val id: String?,

    /** Retry interval, expressed in milliseconds */
    val retry: Int?,

    /** Comment text (if showCommentEvents = true) */
    val comments: String?
) {
    /** `true` when none of the five fields carries a value */
    val isEmpty: Boolean
        get() = listOf(data, event, id, retry, comments).all { field -> field == null }

    companion object {
        /** Shared instance with every field unset */
        val Empty: ServerSentEvent = ServerSentEvent(null, null, null, null, null)
    }
}

SSE Plugin Configuration

Plugin for configuring SSE functionality and behavior.

/**
 * Server-Sent Events plugin for client configuration.
 *
 * Both type arguments of [HttpClientPlugin] are [SSEConfig].
 * NOTE(review): only [key] is listed here; any other members the plugin interface requires are not shown.
 */
object SSE : HttpClientPlugin<SSEConfig, SSEConfig> {
    /** Attribute key typed to [SSEConfig]. */
    override val key: AttributeKey<SSEConfig>
}

/**
 * SSE configuration options.
 *
 * Holds the plugin-level defaults: the reconnection time and whether comment
 * and retry events are shown in the stream.
 */
class SSEConfig {
    /**
     * Default reconnection time (three seconds).
     *
     * Expressed as a `kotlin.time.Duration`, hence `3.seconds` rather than the
     * `java.time` factory `Duration.ofSeconds(3)`.
     */
    var reconnectionTime: Duration? = 3.seconds

    /** Show comment events in the stream */
    var showCommentEvents: Boolean = false

    /** Show retry events in the stream */
    var showRetryEvents: Boolean = false
}

Event Stream Processing

Utilities for processing SSE event streams with filtering and transformation.

/**
 * Filter events by type.
 *
 * NOTE(review): presumably keeps only events whose `event` field equals [eventType] — confirm.
 *
 * @param eventType event type to select
 * @return a channel of the selected events
 */
suspend fun ReceiveChannel<ServerSentEvent>.filterByType(
    eventType: String
): ReceiveChannel<ServerSentEvent>

/**
 * Filter events by data content.
 *
 * NOTE(review): [predicate] takes a non-null `String`, so events without data are
 * presumably dropped — confirm.
 *
 * @param predicate test applied to an event's data
 * @return a channel of the selected events
 */
suspend fun ReceiveChannel<ServerSentEvent>.filterByData(
    predicate: (String) -> Boolean
): ReceiveChannel<ServerSentEvent>

/**
 * Transform events to a specific type.
 *
 * @param transform conversion from a [ServerSentEvent] to a value of type [T]
 * @return a channel of the converted values
 */
suspend fun <T> ReceiveChannel<ServerSentEvent>.mapEvents(
    transform: (ServerSentEvent) -> T
): ReceiveChannel<T>

/**
 * Expose the events of this channel as a [Flow].
 *
 * Unlike the other helpers in this group, this function is not `suspend`.
 *
 * @return a flow of [ServerSentEvent]
 */
fun ReceiveChannel<ServerSentEvent>.asFlow(): Flow<ServerSentEvent>

Reconnection Management

Automatic reconnection handling for SSE connections.

/**
 * SSE reconnection configuration.
 *
 * Durations are `kotlin.time.Duration` values, so the defaults are written with
 * the `seconds` extension instead of the `java.time` factory `Duration.ofSeconds`.
 */
data class SSEReconnectionConfig(
    /** Enable automatic reconnection */
    val enabled: Boolean = true,

    /** Maximum number of reconnection attempts; `-1` means unlimited */
    val maxAttempts: Int = -1,

    /** Initial reconnection delay */
    val initialDelay: Duration = 1.seconds,

    /** Maximum reconnection delay */
    val maxDelay: Duration = 30.seconds,

    /** Backoff multiplier */
    val backoffMultiplier: Double = 1.5,

    /** Last event ID for reconnection; mutable so it can be updated as events arrive */
    var lastEventId: String? = null
)

/**
 * Create an SSE connection with automatic reconnection.
 *
 * @param url URL of the endpoint, given as a string
 * @param config reconnection settings; defaults to `SSEReconnectionConfig()`
 * @param request builder lambda applied to the outgoing [HttpRequestBuilder]; defaults to an empty lambda
 * @param block code executed with the [ClientSSESession] as its receiver
 * @return the value produced by [block]
 */
suspend fun <T> HttpClient.sseWithReconnection(
    url: String,
    config: SSEReconnectionConfig = SSEReconnectionConfig(),
    request: HttpRequestBuilder.() -> Unit = {},
    block: suspend ClientSSESession.() -> T
): T

Usage Examples:

import io.ktor.client.*
import io.ktor.client.plugins.sse.*
import io.ktor.sse.*
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Client with the SSE plugin installed.
// reconnectionTime is a kotlin.time.Duration, so it is written as `5.seconds`
// (`Duration.ofSeconds` belongs to java.time and does not exist on kotlin.time.Duration).
val client = HttpClient {
    install(SSE) {
        reconnectionTime = 5.seconds
        showCommentEvents = false
        showRetryEvents = true
    }
}

// Basic SSE connection: print every event, preferring data over retry over comments
client.sse("https://api.example.com/events") {
    for (sse in incoming) {
        val payload = sse.data
        val retryMillis = sse.retry
        val comment = sse.comments

        if (payload != null) {
            println("Event data: $payload")

            // Pick the message for the event type, then print it once
            val line = when (val type = sse.event) {
                "user-joined" -> "User joined: $payload"
                "user-left" -> "User left: $payload"
                "message" -> "New message: $payload"
                else -> "Unknown event type: $type"
            }
            println(line)
        } else if (retryMillis != null) {
            println("Server requested retry in ${retryMillis}ms")
        } else if (comment != null) {
            println("Server comment: $comment")
        }
    }
}

// SSE with custom headers and authentication
client.sse(
    "https://api.example.com/live-feed",
    // The request builder is passed by name: Kotlin accepts a single trailing lambda,
    // and that one is the session block below.
    request = {
        headers {
            append("Authorization", "Bearer your-token-here")
            append("Accept", "text/event-stream")
            append("Cache-Control", "no-cache")
        }

        parameter("channel", "notifications")
        parameter("filter", "important")
    }
) {
    println("Connected to live feed")

    for (event in incoming) {
        // Bind the nullable payload once so it can be used as a non-null String;
        // events without data are skipped.
        val payload = event.data ?: continue

        // Parse JSON event data
        try {
            val eventData = Json.decodeFromString<NotificationEvent>(payload)
            when (eventData.type) {
                "alert" -> handleAlert(eventData.payload)
                "update" -> handleUpdate(eventData.payload)
                "status" -> handleStatus(eventData.payload)
            }
        } catch (e: CancellationException) {
            // Coroutine cancellation must propagate, not be reported as a parse failure.
            throw e
        } catch (e: Exception) {
            println("Failed to parse event: ${e.message}")
        }
    }
}

// SSE session with manual management
val session = client.sseSession("https://api.example.com/stream") {
    headers {
        append("X-Client-ID", "client-123")
    }
}

// Process events in background
val eventJob = launch {
    try {
        for (event in session.incoming) {
            // Events without data are skipped.
            val payload = event.data ?: continue
            println("Received: $payload")

            // Store last event ID for potential reconnection
            event.id?.let { eventId ->
                saveLastEventId(eventId)
            }
        }
    } catch (e: CancellationException) {
        // eventJob.cancel() below is the normal shutdown path; let cancellation
        // propagate instead of reporting it as a processing error.
        throw e
    } catch (e: Exception) {
        println("Event processing error: ${e.message}")
    }
}

// Do other work...
delay(60000) // Process events for 1 minute

// Clean shutdown: stop the consumer first, then close the connection
eventJob.cancel()
session.close()

// SSE with event filtering and transformation
client.sse(
    "https://api.example.com/activity-stream",
    // showCommentEvents is a parameter of sse(), not a property of the request builder.
    showCommentEvents = true
) {
    incoming
        .asFlow()
        .filter { it.event == "user-activity" || it.event == "system-alert" }
        .map { event ->
            ActivityEvent(
                type = event.event ?: "unknown",
                data = event.data ?: "",
                timestamp = System.currentTimeMillis(),
                id = event.id
            )
        }
        .collect { activity ->
            println("Activity: ${activity.type} - ${activity.data}")

            // Handle specific activities
            when (activity.type) {
                "system-alert" -> handleSystemAlert(activity)
                "user-activity" -> handleUserActivity(activity)
            }
        }
}

// SSE with reconnection handling
/**
 * Connects to the events endpoint and reconnects with exponential backoff when the
 * connection fails.
 *
 * The attempt counter is reset once a connection is established, and the ID of the last
 * event that carried data is sent as `Last-Event-ID` on the next connection. The function
 * returns when the stream ends normally.
 *
 * @param maxAttempts number of consecutive failures after which the last error is rethrown
 * @throws Exception the last connection error once [maxAttempts] is reached
 */
suspend fun connectWithReconnection(maxAttempts: Int = 5) {
    var attempt = 0
    var lastEventId: String? = null

    while (attempt < maxAttempts) {
        try {
            client.sse(
                "https://api.example.com/events",
                // Passed by name: Kotlin accepts a single trailing lambda, which is the session block.
                request = {
                    // Include Last-Event-ID for reconnection
                    lastEventId?.let { id ->
                        headers {
                            append("Last-Event-ID", id)
                        }
                    }
                }
            ) {
                println("Connected (attempt ${attempt + 1})")
                attempt = 0 // Reset on successful connection

                for (event in incoming) {
                    if (event.data != null) {
                        println("Event: ${event.data}")

                        // Track last event ID
                        event.id?.let { lastEventId = it }
                    }
                }
            }

            // If we get here, connection ended normally
            break
        } catch (e: CancellationException) {
            // Cancellation is not a connection failure; never retry after it.
            throw e
        } catch (e: Exception) {
            attempt++
            println("Connection failed (attempt $attempt): ${e.message}")

            if (attempt < maxAttempts) {
                // Exponential backoff capped at 30 s. The shift is clamped (2^5 s already
                // exceeds the cap) so a large maxAttempts cannot overflow it.
                val backoffMillis = minOf(1000L * (1L shl minOf(attempt, 5)), 30000L)
                println("Reconnecting in ${backoffMillis}ms...")
                delay(backoffMillis)
            } else {
                println("Max reconnection attempts reached")
                throw e
            }
        }
    }
}

// Use reconnection function
connectWithReconnection()

// SSE for real-time dashboard updates
client.sse(
    "https://dashboard.example.com/metrics-stream",
    // Passed by name: Kotlin accepts a single trailing lambda, which is the session block.
    request = {
        parameter("metrics", "cpu,memory,disk")
        parameter("interval", "1000") // 1 second updates
    }
) {
    for (event in incoming) {
        when (event.event) {
            // Events that arrive without data are ignored rather than crashing on `!!`.
            "metric-update" -> event.data?.let { payload ->
                updateDashboard(Json.decodeFromString<SystemMetrics>(payload))
            }
            "alert" -> event.data?.let { payload ->
                showAlert(Json.decodeFromString<Alert>(payload))
            }
            "heartbeat" -> {
                println("Server heartbeat - connection alive")
            }
        }
    }
}

// SSE for chat application
// SSE runs over HTTP(S); `wss://` is a WebSocket scheme and is not valid here.
client.sse(
    "https://chat.example.com/room/123/events",
    // Passed by name: Kotlin accepts a single trailing lambda, which is the session block.
    request = {
        headers {
            append("Authorization", "Bearer $chatToken")
        }
    }
) {
    for (event in incoming) {
        // Every handled event type carries a JSON payload; skip events without one
        // instead of crashing on `!!`.
        val payload = event.data ?: continue

        when (event.event) {
            "message" -> displayMessage(Json.decodeFromString<ChatMessage>(payload))
            "user-typing" -> showTypingIndicator(Json.decodeFromString<TypingIndicator>(payload))
            "user-joined" -> addUserToRoom(Json.decodeFromString<User>(payload))
            "user-left" -> removeUserFromRoom(Json.decodeFromString<User>(payload))
        }
    }
}

client.close()

// Data classes for examples

/** Notification envelope: a type tag plus a JSON payload. */
@Serializable
data class NotificationEvent(
    val type: String,
    val payload: JsonObject
)

/** Activity record built from an event in the filtering example. */
@Serializable
data class ActivityEvent(
    val type: String,
    val data: String,
    val timestamp: Long,
    val id: String?
)

/** Metrics payload with cpu, memory and disk values. */
@Serializable
data class SystemMetrics(
    val cpu: Double,
    val memory: Double,
    val disk: Double
)

/** Alert payload with a level, a message and a timestamp. */
@Serializable
data class Alert(
    val level: String,
    val message: String,
    val timestamp: Long
)

/** Chat message payload. */
@Serializable
data class ChatMessage(
    val id: String,
    val user: String,
    val text: String,
    val timestamp: Long
)

/** Typing indicator payload. */
@Serializable
data class TypingIndicator(
    val user: String,
    val isTyping: Boolean
)

/** Chat room user payload; the avatar is optional. */
@Serializable
data class User(
    val id: String,
    val name: String,
    val avatar: String?
)

Event Stream Parsing

Low-level SSE parsing utilities for custom implementations.

/**
 * Parse an SSE event from raw text lines.
 *
 * @param lines raw text lines (NOTE(review): presumably the lines of one event block — confirm)
 * @return the parsed [ServerSentEvent]
 */
fun parseServerSentEvent(lines: List<String>): ServerSentEvent

/**
 * SSE event parser state.
 *
 * Lines are fed one at a time with [parseLine]; the event built so far is read with
 * [getCurrentEvent].
 */
class SSEEventParser {
    /** Parse a line and update internal state */
    fun parseLine(line: String)
    
    /**
     * Get current event and reset parser.
     * The return type is nullable (NOTE(review): presumably `null` when nothing has been parsed yet — confirm).
     */
    fun getCurrentEvent(): ServerSentEvent?
    
    /** Reset parser state */
    fun reset()
}

/**
 * Create an SSE event parser.
 *
 * @return an [SSEEventParser]
 */
fun createSSEParser(): SSEEventParser

Error Handling

SSE-specific error handling and connection management.

/**
 * SSE connection exception; base type of the SSE-specific exceptions.
 *
 * Declared `open` because Kotlin classes are final by default and this class is
 * extended by more specific SSE exceptions.
 *
 * @param message description of the failure
 * @param cause underlying error, if any
 */
open class SSEException(
    message: String,
    cause: Throwable? = null
) : Exception(message, cause)

/**
 * SSE connection closed exception.
 *
 * @param message description of the failure; defaults to "SSE connection was closed"
 */
class SSEConnectionClosedException(
    message: String = "SSE connection was closed"
) : SSEException(message)

/**
 * SSE parsing exception.
 *
 * @param message description of the failure
 * @property rawData raw data attached to the exception
 *   (NOTE(review): presumably the text that could not be parsed — confirm)
 * @param cause underlying error, if any
 */
class SSEParsingException(
    message: String,
    val rawData: String,
    cause: Throwable? = null
) : SSEException(message, cause)