CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-io-ktor--ktor-client-core

Ktor HTTP Client Core - a multiplatform asynchronous HTTP client library for Kotlin providing comprehensive HTTP request/response handling with plugin architecture.

Pending
Overview
Eval results
Files

docs/server-sent-events.md

Server-Sent Events (SSE)

Server-Sent Events client implementation for real-time event streaming with automatic reconnection and event parsing.

Capabilities

SSE Connection Functions

Establish Server-Sent Events connections for real-time data streaming.

/**
 * Establish SSE connection with URL string.
 *
 * @param urlString SSE endpoint URL
 * @param block SSE session handling block; it runs with the [ClientSSESession] as its receiver
 */
suspend fun HttpClient.sse(
    urlString: String,
    block: suspend ClientSSESession.() -> Unit
)

/**
 * Establish SSE connection with detailed configuration.
 *
 * @param host SSE server host
 * @param port SSE server port; defaults to `DEFAULT_PORT` (80 for HTTP, 443 for HTTPS)
 * @param path SSE endpoint path; defaults to `"/"`
 * @param block SSE session handling block; it runs with the [ClientSSESession] as its receiver
 */
suspend fun HttpClient.sse(
    host: String,
    port: Int = DEFAULT_PORT,
    path: String = "/",
    block: suspend ClientSSESession.() -> Unit
)

/**
 * Get SSE session without automatic event handling.
 *
 * The caller manages the returned session, e.g. by calling [ClientSSESession.close] when done.
 *
 * @param urlString SSE endpoint URL
 * @return SSE session for manual management
 */
suspend fun HttpClient.sseSession(urlString: String): ClientSSESession

/**
 * Get SSE session with detailed configuration.
 *
 * All three connection parameters are required here; no defaults are declared.
 *
 * @param host SSE server host
 * @param port SSE server port
 * @param path SSE endpoint path
 * @return SSE session for manual management
 */
suspend fun HttpClient.sseSession(
    host: String,
    port: Int,
    path: String
): ClientSSESession

Usage Examples:

val client = HttpClient {
    install(SSE)
}

// Basic SSE connection
client.sse("https://api.example.com/events") {
    // Process incoming server-sent events
    for (event in incoming) {
        when (event.event) {
            "message" -> {
                println("New message: ${event.data}")
            }
            "notification" -> {
                println("Notification: ${event.data}")
                // Decode only when a payload is present: `data` is nullable, and an empty
                // string is not valid JSON, so decoding a fallback "" would throw.
                event.data?.let { payload ->
                    val notification = Json.decodeFromString<Notification>(payload)
                    handleNotification(notification)
                }
            }
            "heartbeat" -> {
                println("Server heartbeat received")
            }
            null -> {
                // Default event type (no explicit `event` field was sent)
                println("Default event: ${event.data}")
            }
        }
    }
}

// Connect by host, port and path, then dump every field of each event
client.sse(host = "localhost", port = 8080, path = "/stream") {
    incoming.consumeEach { sse ->
        listOf(
            "Event ID: ${sse.id}",
            "Event Type: ${sse.event}",
            "Event Data: ${sse.data}",
            "Retry: ${sse.retry}",
            "---",
        ).forEach { line -> println(line) }
    }
}

// Manual SSE session management
val session = client.sseSession("https://api.example.com/live-updates")
try {
    // Iterate the channel instead of calling receive() in an endless loop: the for-loop
    // finishes normally once the channel is closed, whereas receive() on a closed channel
    // throws ClosedReceiveChannelException.
    for (event in session.incoming) {
        processServerSentEvent(event)
    }
} finally {
    // Always release the connection, whether the stream ended or an error occurred.
    session.close()
}

ClientSSESession Interface

Core SSE session interface for event handling and connection management.

/**
 * Client SSE session interface.
 *
 * Extends [CoroutineScope] and exposes the incoming event channel, the associated HTTP call,
 * and a way to close the connection.
 */
interface ClientSSESession : CoroutineScope {
    /**
     * Incoming server-sent events channel; each element is a [ServerSentEvent].
     */
    val incoming: ReceiveChannel<ServerSentEvent>
    
    /**
     * HTTP call associated with this SSE session
     */
    val call: HttpClientCall
    
    /**
     * Close the SSE connection. This is a suspending function.
     */
    suspend fun close()
}

/**
 * Default SSE session implementation.
 *
 * NOTE(review): only the constructor is shown here; the `close()` member required by
 * [ClientSSESession] is not part of this listing.
 *
 * @property call HTTP call associated with this SSE session
 * @property coroutineContext coroutine context of this session's [CoroutineScope]
 * @property incoming incoming server-sent events channel
 */
class DefaultClientSSESession(
    override val call: HttpClientCall,
    override val coroutineContext: CoroutineContext,
    override val incoming: ReceiveChannel<ServerSentEvent>
) : ClientSSESession

ServerSentEvent Data Class

Representation of individual server-sent events with all SSE fields.

/**
 * Server-sent event representation.
 *
 * Every field is optional and defaults to `null`, so an event can be built with only the
 * fields that are present, e.g. `ServerSentEvent(data = "hello")`.
 */
data class ServerSentEvent(
    /** Event data payload, or `null` when the event carries no data */
    val data: String? = null,

    /** Event type identifier, or `null` when the server did not name the event */
    val event: String? = null,

    /** Event ID for last-event-id tracking */
    val id: String? = null,

    /** Retry timeout in milliseconds */
    val retry: Long? = null,

    /** Comments from the event stream */
    val comments: String? = null,
) {
    companion object {
        /**
         * Create empty SSE event.
         *
         * @return a [ServerSentEvent] whose fields are all `null`
         */
        fun empty(): ServerSentEvent = ServerSentEvent()
    }
}

Usage Examples:

client.sse("https://api.example.com/events") {
    for (event in incoming) {
        // Access the event fields; `data` stays nullable so a missing payload is never
        // mistaken for real content.
        val payload = event.data
        val eventType = event.event ?: "message" // Default event type
        val displayData = payload ?: "No data"

        println("Event: $eventType")
        println("Data: $displayData")

        // Use event ID for tracking
        event.id?.let { eventId -> saveLastEventId(eventId) }

        // Handle retry timing
        event.retry?.let { retryTime ->
            println("Server suggests retry after ${retryTime}ms")
        }

        // Nothing to decode without a payload: passing the "No data" placeholder to the
        // JSON decoder would throw, so skip to the next event instead.
        if (payload == null) continue

        // Process based on event type
        when (eventType) {
            "user-joined" -> {
                val user = Json.decodeFromString<User>(payload)
                handleUserJoined(user)
            }
            "user-left" -> {
                val user = Json.decodeFromString<User>(payload)
                handleUserLeft(user)
            }
            "chat-message" -> {
                val message = Json.decodeFromString<ChatMessage>(payload)
                displayMessage(message)
            }
        }
    }
}

SSE Plugin Configuration

SSE plugin installation and configuration options.

/**
 * SSE plugin object, installed on a client with `install(SSE)`.
 *
 * Both type arguments of [HttpClientPlugin] are [SSEConfig] in this declaration.
 */
object SSE : HttpClientPlugin<SSEConfig, SSEConfig> {
    /** Attribute key for this plugin, typed by [SSEConfig]. */
    override val key: AttributeKey<SSEConfig>
    
    /** Takes the [SSEConfig] configuration [block] and returns an [SSEConfig]. */
    override fun prepare(block: SSEConfig.() -> Unit): SSEConfig
    /** Receives the prepared [plugin] configuration and the [HttpClient] [scope] to install into. */
    override fun install(plugin: SSEConfig, scope: HttpClient)
}

/**
 * SSE configuration; its properties are set inside the `install(SSE) { ... }` block.
 */
class SSEConfig {
    /** Whether to show comment lines in events; `false` by default */
    var showCommentLines: Boolean = false
    
    /** Whether to show retry directive in events; `false` by default */
    var showRetryDirective: Boolean = false
    
    /** Custom reconnection strategy; `null` by default */
    var reconnectionStrategy: SSEReconnectionStrategy? = null
}

/**
 * SSE reconnection strategy interface.
 *
 * Both members are suspending functions.
 */
interface SSEReconnectionStrategy {
    /**
     * Determine if reconnection should be attempted.
     *
     * @param attempt Reconnection attempt number (starting from 1)
     * @param lastError Last connection error, or `null`
     * @return `true` if should reconnect, `false` otherwise
     */
    suspend fun shouldReconnect(attempt: Int, lastError: Throwable?): Boolean
    
    /**
     * Calculate delay before reconnection attempt.
     *
     * @param attempt Reconnection attempt number
     * @return Delay in milliseconds
     */
    suspend fun delayBeforeReconnect(attempt: Int): Long
}

Usage Examples:

// Configure SSE plugin
val client = HttpClient {
    install(SSE) {
        showCommentLines = true
        showRetryDirective = true

        // Custom reconnection strategy
        reconnectionStrategy = object : SSEReconnectionStrategy {
            override suspend fun shouldReconnect(attempt: Int, lastError: Throwable?): Boolean {
                return attempt <= 5 // Max 5 reconnection attempts
            }

            override suspend fun delayBeforeReconnect(attempt: Int): Long {
                // Exponential backoff: 1s, 2s, 4s, 8s, 16s, then capped at 30s.
                // Attempts start at 1; clamping the exponent keeps the shift safe for
                // any attempt value, and the arithmetic is done in Long throughout.
                val exponent = (attempt - 1).coerceIn(0, 5)
                return (1_000L shl exponent).coerceAtMost(30_000L)
            }
        }
    }
}

// Consume events with the client configured above
client.sse("https://api.example.com/events") {
    for (sse in incoming) {
        // Comments and retry directives are present only when enabled in the config
        sse.comments?.let { comment -> println("Server comment: $comment") }
        sse.retry?.let { retryMillis -> println("Retry directive: ${retryMillis}ms") }

        // Hand the payload over only when the event actually carries one
        val payload = sse.data
        if (payload != null) {
            processEventData(payload, sse.event)
        }
    }
}

SSE Request Configuration

Configure SSE requests with headers, authentication, and parameters.

/**
 * Configure SSE request with custom headers and parameters.
 *
 * [requestBuilder] is the last parameter, so a trailing lambda at the call site binds to it;
 * [block] is then passed inside the parentheses.
 *
 * @param urlString SSE endpoint URL
 * @param block SSE session handling block; it runs with the [ClientSSESession] as its receiver
 * @param requestBuilder request configuration applied to an [HttpRequestBuilder]; defaults to an empty block
 */
suspend fun HttpClient.sse(
    urlString: String,
    block: suspend ClientSSESession.() -> Unit,
    requestBuilder: HttpRequestBuilder.() -> Unit = {}
)

Usage Examples:

// Authenticated SSE request with extra headers and query parameters
client.sse("https://api.example.com/private-events", {
    // Session handler: forward every event as it arrives
    for (privateEvent in incoming) {
        handlePrivateEvent(privateEvent)
    }
}) {
    // Request setup
    bearerAuth("your-jwt-token")
    header("X-Client-Version", "1.0")
    parameter("channel", "notifications")
    parameter("user_id", "12345")

    // Resume from the last stored event ID, when there is one
    getStoredLastEventId()?.let { storedId ->
        header("Last-Event-ID", storedId)
    }
}

// Narrow the stream through query parameters and dispatch by event type
client.sse("https://api.example.com/events", {
    for (marketEvent in incoming) {
        val payload = marketEvent.data
        when (marketEvent.event) {
            "price-update" -> handlePriceUpdate(payload)
            "trade-executed" -> handleTradeExecution(payload)
        }
    }
}) {
    // Request setup: which symbols and event types to receive
    parameter("symbols", "AAPL,GOOGL,MSFT")
    parameter("events", "price-update,trade-executed")
    header("Accept", "text/event-stream")
}

Error Handling and Reconnection

Handle SSE connection errors and implement custom reconnection logic.

/**
 * SSE connection exception.
 *
 * @param message description of the failure
 * @param cause underlying error; defaults to `null`
 */
class SSEConnectionException(
    message: String,
    cause: Throwable? = null
) : Exception(message, cause)

/**
 * Handle SSE with error recovery.
 *
 * @param urlString SSE endpoint URL
 * @param maxRetries retry limit; defaults to 3
 * @param retryDelay delay between retries; defaults to 1000 (presumably milliseconds — TODO confirm)
 * @param block SSE session handling block producing a value of type [T]
 * @return a value of type [T]; presumably the result of [block] — TODO confirm
 */
suspend fun <T> HttpClient.sseWithRetry(
    urlString: String,
    maxRetries: Int = 3,
    retryDelay: Long = 1000,
    block: suspend ClientSSESession.() -> T
): T

Usage Examples:

// Error handling with manual retry
/**
 * Connects to the SSE stream, retrying on [SSEConnectionException] with exponential backoff.
 *
 * Makes at most five connection attempts. Returns once a session completes without a
 * connection error; after the final failed attempt the last exception is rethrown.
 *
 * @throws SSEConnectionException when every attempt has failed
 */
suspend fun connectToSSEWithRetry() {
    val maxAttempts = 5

    for (attempt in 1..maxAttempts) {
        try {
            client.sse("https://api.example.com/events") {
                println("Connected to SSE stream (attempt $attempt)")

                for (event in incoming) {
                    processEvent(event)
                }
            }
            return // Success, stop retrying
        } catch (e: SSEConnectionException) {
            println("SSE connection failed (attempt $attempt): ${e.message}")

            if (attempt == maxAttempts) {
                println("Max retry attempts reached, giving up")
                throw e
            }

            // Exponential backoff: 2s, 4s, 8s, 16s. Named so it does not shadow delay().
            val backoffMillis = 2_000L shl (attempt - 1)
            println("Retrying in ${backoffMillis}ms...")
            delay(backoffMillis)
        }
    }
}

// Graceful error handling within SSE session
client.sse("https://api.example.com/events") {
    try {
        for (event in incoming) {
            try {
                processEvent(event)
            } catch (e: kotlin.coroutines.cancellation.CancellationException) {
                // Cancellation is how coroutines are stopped; swallowing it here would
                // keep the loop running after the session has been cancelled.
                throw e
            } catch (e: Exception) {
                println("Error processing event: ${e.message}")
                // Continue processing other events
            }
        }
    } catch (e: ChannelClosedException) {
        println("SSE channel closed: ${e.message}")
    } catch (e: Exception) {
        // Log and propagate: session-level failures are not recoverable here.
        println("SSE session error: ${e.message}")
        throw e
    }
}

Types

SSE Types

/**
 * SSE content wrapper for pipeline processing.
 *
 * An [OutgoingContent.NoContent] subclass whose content type is `ContentType.Text.EventStream`.
 */
class SSEClientContent : OutgoingContent.NoContent() {
    // Fixed to the event-stream content type.
    override val contentType: ContentType = ContentType.Text.EventStream
}

/**
 * Channel result for SSE operations.
 *
 * A sealed hierarchy with three cases: [Success], [Failure] and [Closed].
 */
sealed class ChannelResult<out T> {
    /** Result carrying a [value]. */
    data class Success<T>(val value: T) : ChannelResult<T>()
    /** Result carrying the [exception] that occurred. */
    data class Failure(val exception: Throwable) : ChannelResult<Nothing>()
    /** Result for a closed channel; carries no value. */
    object Closed : ChannelResult<Nothing>()
    
    // State flags; presumably each one corresponds to the case of the same name — TODO confirm.
    val isSuccess: Boolean
    val isFailure: Boolean  
    val isClosed: Boolean
    
    /** Returns a value of type [T] or `null`; presumably the [Success] value when present — TODO confirm. */
    fun getOrNull(): T?
    /** Returns a [Throwable] or `null`; presumably the [Failure] exception when present — TODO confirm. */
    fun exceptionOrNull(): Throwable?
}

/**
 * Channel closed exception.
 *
 * @param message optional description; defaults to `null`
 * @param cause optional underlying error; defaults to `null`
 */
class ChannelClosedException(
    message: String? = null,
    cause: Throwable? = null
) : Exception(message, cause)

Content Types

/**
 * SSE-specific content types.
 */
object ContentType {
    /** Content types under the `text` top-level type. */
    object Text {
        /** Content type for Server-Sent Events (`text/event-stream`) */
        val EventStream: ContentType = ContentType("text", "event-stream")
    }
}

/**
 * SSE-specific HTTP headers, exposed as compile-time string constants.
 */
object SSEHeaders {
    /** Name of the `Last-Event-ID` header. */
    const val LastEventId = "Last-Event-ID"
    /** Name of the `Cache-Control` header. */
    const val CacheControl = "Cache-Control"
    /** Name of the `Connection` header. */
    const val Connection = "Connection"
    /** Name of the `Accept` header. */
    const val Accept = "Accept"
}

Install with Tessl CLI

npx tessl i tessl/maven-io-ktor--ktor-client-core

docs

client-configuration.md

cookie-management.md

forms-and-uploads.md

http-caching.md

http-requests.md

index.md

plugin-system.md

response-handling.md

server-sent-events.md

websockets.md

tile.json