CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/gradle-io-ktor--ktor-client-core-jvm

Multiplatform asynchronous HTTP client core library for JVM that provides request/response handling, plugin architecture, and extensible HTTP communication capabilities.

Pending
Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Security by Snyk

Pending

The risk profile of this skill

Overview
Eval results
Files

docs/server-sent-events.md

Server-Sent Events

Server-Sent Events (SSE) client implementation for receiving real-time event streams from servers. SSE provides a unidirectional communication channel from server to client, ideal for live updates, notifications, and streaming data.

Capabilities

SSE Connection Functions

Functions for establishing SSE connections with event stream processing.

/**
 * Connect to SSE endpoint and process events.
 *
 * This overload takes no explicit target; presumably the target is set inside [request] — TODO confirm.
 *
 * @param request configuration applied to the [HttpRequestBuilder]; defaults to an empty block
 * @param reconnectionTime optional reconnection time; defaults to `null`
 *   (presumably falls back to the plugin configuration — TODO confirm)
 * @param showCommentEvents defaults to `false`; presumably controls whether comment events reach [block]
 * @param showRetryEvents defaults to `false`; presumably controls whether retry events reach [block]
 * @param block suspending body executed with the [ClientSSESession] as receiver
 * @return a value of type [T], presumably the result of [block]
 */
suspend fun <T> HttpClient.sse(
    request: HttpRequestBuilder.() -> Unit = {},
    reconnectionTime: Duration? = null,
    showCommentEvents: Boolean = false,
    showRetryEvents: Boolean = false,
    block: suspend ClientSSESession.() -> T
): T

/**
 * Same as the overload above, with the target given as separate [host], [port] and [path] values
 * (defaults: `"localhost"`, `DEFAULT_PORT`, `"/"`).
 */
suspend fun <T> HttpClient.sse(
    host: String = "localhost",
    port: Int = DEFAULT_PORT,
    path: String = "/",
    request: HttpRequestBuilder.() -> Unit = {},
    reconnectionTime: Duration? = null,
    showCommentEvents: Boolean = false,
    showRetryEvents: Boolean = false,
    block: suspend ClientSSESession.() -> T
): T

/**
 * Same as the first overload, with the target given as a URL string in [urlString].
 */
suspend fun <T> HttpClient.sse(
    urlString: String,
    request: HttpRequestBuilder.() -> Unit = {},
    reconnectionTime: Duration? = null,
    showCommentEvents: Boolean = false,
    showRetryEvents: Boolean = false,
    block: suspend ClientSSESession.() -> T
): T

/**
 * Same as the first overload, with the target given as a [Url] value in [url].
 */
suspend fun <T> HttpClient.sse(
    url: Url,
    request: HttpRequestBuilder.() -> Unit = {},
    reconnectionTime: Duration? = null,
    showCommentEvents: Boolean = false,
    showRetryEvents: Boolean = false,
    block: suspend ClientSSESession.() -> T
): T

/**
 * Create SSE session without automatic connection management.
 *
 * Unlike the `sse` functions, no processing block is taken: the [ClientSSESession] is returned
 * to the caller (presumably the caller is then responsible for closing it — TODO confirm).
 *
 * @param request configuration applied to the [HttpRequestBuilder]; defaults to an empty block
 * @param reconnectionTime optional reconnection time; defaults to `null`
 * @param showCommentEvents defaults to `false`
 * @param showRetryEvents defaults to `false`
 * @return the created [ClientSSESession]
 */
suspend fun HttpClient.sseSession(
    request: HttpRequestBuilder.() -> Unit = {},
    reconnectionTime: Duration? = null,
    showCommentEvents: Boolean = false,
    showRetryEvents: Boolean = false
): ClientSSESession

/**
 * Same as the overload above, with the target given as a URL string in [urlString].
 */
suspend fun HttpClient.sseSession(
    urlString: String,
    request: HttpRequestBuilder.() -> Unit = {},
    reconnectionTime: Duration? = null,
    showCommentEvents: Boolean = false,
    showRetryEvents: Boolean = false
): ClientSSESession

SSE Session Interface

Session interface for receiving and processing server-sent events.

/**
 * Client SSE session for receiving events.
 *
 * Exposes the stream of received [ServerSentEvent]s, the [HttpClientCall] the session
 * belongs to, and a suspending [close] operation.
 */
interface ClientSSESession {
    /** Incoming events channel; events are read from it, e.g. by iterating with `for (event in incoming)`. */
    val incoming: ReceiveChannel<ServerSentEvent>
    
    /** Associated HTTP call */
    val call: HttpClientCall
    
    /** Close the SSE connection */
    suspend fun close()
}

/**
 * Default SSE session implementation.
 *
 * Holds the [call] and [incoming] channel passed to the constructor and implements
 * [close] by cancelling both the channel and the session's coroutine scope.
 */
class DefaultClientSSESession(
    override val call: HttpClientCall,
    override val incoming: ReceiveChannel<ServerSentEvent>
) : ClientSSESession {
    /** Session coroutine scope */
    // NOTE(review): no initializer is shown for this property in this listing — confirm where it is assigned.
    val coroutineScope: CoroutineScope
    
    /** Closes the session: cancels the incoming channel first, then the session scope. */
    override suspend fun close() {
        incoming.cancel()
        coroutineScope.cancel()
    }
}

Server-Sent Event Structure

Data structure representing individual SSE events.

/**
 * Represents a server-sent event.
 *
 * Every field is optional and defaults to `null`, so an event can be created by naming only
 * the fields that are present, e.g. `ServerSentEvent(data = "hello", event = "message")`.
 * Positional construction with all five arguments keeps working unchanged.
 */
data class ServerSentEvent(
    /** Event data content */
    val data: String? = null,

    /** Event type identifier */
    val event: String? = null,

    /** Event ID for reconnection */
    val id: String? = null,

    /** Retry interval in milliseconds */
    val retry: Int? = null,

    /** Comment content (if showCommentEvents = true) */
    val comments: String? = null
) {
    /** `true` when every field of the event is `null`. */
    val isEmpty: Boolean
        get() = data == null && event == null && id == null && retry == null && comments == null

    companion object {
        /** Empty event instance: all fields are `null`. */
        val Empty: ServerSentEvent = ServerSentEvent()
    }
}

SSE Plugin Configuration

Plugin for configuring SSE functionality and behavior.

/**
 * Server-Sent Events plugin for client configuration.
 *
 * Declared as an [HttpClientPlugin] whose configuration type and plugin type are both [SSEConfig].
 */
object SSE : HttpClientPlugin<SSEConfig, SSEConfig> {
    /** Attribute key identifying this plugin. */
    override val key: AttributeKey<SSEConfig>
}

/**
 * SSE configuration options.
 *
 * All properties are mutable so they can be assigned inside an `install(SSE) { ... }` block.
 */
class SSEConfig {
    /** Default reconnection time; nullable, initialised to 3 seconds. */
    var reconnectionTime: Duration? = Duration.ofSeconds(3)
    
    /** Show comment events in the stream; off by default. */
    var showCommentEvents: Boolean = false
    
    /** Show retry events in the stream; off by default. */
    var showRetryEvents: Boolean = false
}

Event Stream Processing

Utilities for processing SSE event streams with filtering and transformation.

/**
 * Filter events by type.
 *
 * @param eventType the event type to keep (presumably compared with [ServerSentEvent.event] — TODO confirm)
 * @return a channel of the events that passed the filter
 */
suspend fun ReceiveChannel<ServerSentEvent>.filterByType(
    eventType: String
): ReceiveChannel<ServerSentEvent>

/**
 * Filter events by data content.
 *
 * @param predicate test applied to a non-null data string
 *   (how events whose data is `null` are treated is not shown here — TODO confirm)
 * @return a channel of the events that passed the filter
 */
suspend fun ReceiveChannel<ServerSentEvent>.filterByData(
    predicate: (String) -> Boolean
): ReceiveChannel<ServerSentEvent>

/**
 * Transform events to specific type.
 *
 * @param transform conversion from a [ServerSentEvent] to a value of type [T]
 * @return a channel of the converted values
 */
suspend fun <T> ReceiveChannel<ServerSentEvent>.mapEvents(
    transform: (ServerSentEvent) -> T
): ReceiveChannel<T>

/**
 * Collect events into a flow.
 *
 * @return a [Flow] view of this channel's events
 */
fun ReceiveChannel<ServerSentEvent>.asFlow(): Flow<ServerSentEvent>

Reconnection Management

Automatic reconnection handling for SSE connections.

/**
 * SSE reconnection configuration.
 *
 * Defaults: reconnection enabled, unlimited attempts, delay starting at 1 second,
 * capped at 30 seconds, with a backoff multiplier of 1.5.
 */
data class SSEReconnectionConfig(
    /** Enable automatic reconnection */
    val enabled: Boolean = true,
    
    /** Maximum number of reconnection attempts; `-1` means no limit. */
    val maxAttempts: Int = -1, // Unlimited
    
    /** Initial reconnection delay */
    val initialDelay: Duration = Duration.ofSeconds(1),
    
    /** Maximum reconnection delay */
    val maxDelay: Duration = Duration.ofSeconds(30),
    
    /** Backoff multiplier (presumably applied to the delay after each failed attempt — TODO confirm) */
    val backoffMultiplier: Double = 1.5,
    
    /** Last event ID for reconnection; the only mutable property, `null` until set. */
    var lastEventId: String? = null
)

/**
 * Create SSE connection with automatic reconnection.
 *
 * @param url target URL as a string
 * @param config reconnection settings; defaults to `SSEReconnectionConfig()`
 * @param request configuration applied to the [HttpRequestBuilder]; defaults to an empty block
 * @param block suspending body executed with the [ClientSSESession] as receiver
 * @return a value of type [T], presumably the result of [block]
 */
suspend fun <T> HttpClient.sseWithReconnection(
    url: String,
    config: SSEReconnectionConfig = SSEReconnectionConfig(),
    request: HttpRequestBuilder.() -> Unit = {},
    block: suspend ClientSSESession.() -> T
): T

Usage Examples:

import io.ktor.client.*
import io.ktor.client.plugins.sse.*
import io.ktor.sse.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Client shared by all examples below, with the SSE plugin installed.
val client = HttpClient {
    install(SSE) {
        // Reconnection time of 5 seconds for this client.
        reconnectionTime = Duration.ofSeconds(5)
        // Comment events are not shown; retry events are.
        showCommentEvents = false
        showRetryEvents = true
    }
}

// Basic SSE connection: print every event according to which field it carries.
client.sse("https://api.example.com/events") {
    for (event in incoming) {
        val payload = event.data
        val retryMillis = event.retry
        val comment = event.comments

        if (payload != null) {
            println("Event data: $payload")

            // Build the type-specific line first, then print it once.
            val description = when (event.event) {
                "user-joined" -> "User joined: $payload"
                "user-left" -> "User left: $payload"
                "message" -> "New message: $payload"
                else -> "Unknown event type: ${event.event}"
            }
            println(description)
        } else if (retryMillis != null) {
            println("Server requested retry in ${retryMillis}ms")
        } else if (comment != null) {
            println("Server comment: $comment")
        }
    }
}

// SSE with custom headers and authentication.
// The request customisation is passed as the named `request` argument: a Kotlin call accepts
// only one trailing lambda, and that lambda is bound to the last parameter (`block`).
client.sse(
    "https://api.example.com/live-feed",
    request = {
        headers {
            append("Authorization", "Bearer your-token-here")
            append("Accept", "text/event-stream")
            append("Cache-Control", "no-cache")
        }

        parameter("channel", "notifications")
        parameter("filter", "important")
    }
) {
    println("Connected to live feed")

    for (event in incoming) {
        // Bind the payload to a local so it is known to be non-null below;
        // events without data are skipped.
        val payload = event.data ?: continue

        // Parse JSON event data
        try {
            val notification = Json.decodeFromString<NotificationEvent>(payload)
            when (notification.type) {
                "alert" -> handleAlert(notification.payload)
                "update" -> handleUpdate(notification.payload)
                "status" -> handleStatus(notification.payload)
            }
        } catch (e: CancellationException) {
            // Never swallow coroutine cancellation.
            throw e
        } catch (e: Exception) {
            println("Failed to parse event: ${e.message}")
        }
    }
}

// SSE session with manual management
val session = client.sseSession("https://api.example.com/stream") {
    headers {
        append("X-Client-ID", "client-123")
    }
}

// Process events in background
val eventJob = launch {
    try {
        for (event in session.incoming) {
            if (event.data != null) {
                println("Received: ${event.data}")

                // Store last event ID for potential reconnection
                event.id?.let { eventId ->
                    saveLastEventId(eventId)
                }
            }
        }
    } catch (e: CancellationException) {
        // eventJob.cancel() below ends this loop through cancellation; that is a normal
        // shutdown, not a processing error, so let it propagate.
        throw e
    } catch (e: Exception) {
        println("Event processing error: ${e.message}")
    }
}

// Do other work...
delay(60000) // Process events for 1 minute

// Clean shutdown: stop the consumer first, then close the session.
eventJob.cancel()
session.close()

// SSE with event filtering and transformation.
// `showCommentEvents` is a parameter of `sse` itself (not a property of the request builder),
// so it is passed as a named argument; the single trailing lambda is the session block.
client.sse("https://api.example.com/activity-stream", showCommentEvents = true) {
    incoming
        .asFlow()
        // Keep only the two event types handled below.
        .filter { it.event == "user-activity" || it.event == "system-alert" }
        .map { event ->
            ActivityEvent(
                type = event.event ?: "unknown",
                data = event.data ?: "",
                timestamp = System.currentTimeMillis(),
                id = event.id
            )
        }
        .collect { activity ->
            println("Activity: ${activity.type} - ${activity.data}")

            // Handle specific activities
            when (activity.type) {
                "system-alert" -> handleSystemAlert(activity)
                "user-activity" -> handleUserActivity(activity)
            }
        }
}

// SSE with reconnection handling
/**
 * Connects to the events endpoint and reconnects with exponential backoff when the
 * connection fails.
 *
 * The attempt counter is reset every time a connection is established, so [maxAttempts]
 * limits consecutive failures rather than total reconnections. The ID of the last event
 * received is sent back in the `Last-Event-ID` header on reconnection.
 *
 * @param maxAttempts number of consecutive failed attempts after which the last error is rethrown
 * @throws Exception the last connection error once [maxAttempts] consecutive attempts have failed
 */
suspend fun connectWithReconnection(maxAttempts: Int = 5) {
    var attempt = 0
    var lastEventId: String? = null

    while (attempt < maxAttempts) {
        try {
            // The request customisation is passed as the named `request` argument: a Kotlin
            // call accepts only one trailing lambda, which is bound to `block`.
            client.sse(
                "https://api.example.com/events",
                request = {
                    // Include Last-Event-ID for reconnection
                    lastEventId?.let { id ->
                        headers {
                            append("Last-Event-ID", id)
                        }
                    }
                }
            ) {
                println("Connected (attempt ${attempt + 1})")
                attempt = 0 // Reset on successful connection

                for (event in incoming) {
                    if (event.data != null) {
                        println("Event: ${event.data}")

                        // Track last event ID
                        event.id?.let { lastEventId = it }
                    }
                }
            }

            // If we get here, connection ended normally
            break
        } catch (e: CancellationException) {
            // Cancellation is not a connection failure: do not count it or retry.
            throw e
        } catch (e: Exception) {
            attempt++
            println("Connection failed (attempt $attempt): ${e.message}")

            if (attempt < maxAttempts) {
                // Exponential backoff capped at 30 seconds. The exponent is clamped to 5
                // (1000 * 2^5 already exceeds the cap) so the shift cannot wrap around for
                // large attempt counts.
                val backoffMillis = minOf(1000L * (1L shl attempt.coerceAtMost(5)), 30000L)
                println("Reconnecting in ${backoffMillis}ms...")
                delay(backoffMillis)
            } else {
                println("Max reconnection attempts reached")
                throw e
            }
        }
    }
}

// Use reconnection function
connectWithReconnection()

// SSE for real-time dashboard updates.
// The query parameters are passed through the named `request` argument: a Kotlin call accepts
// only one trailing lambda, which is bound to `block`.
client.sse(
    "https://dashboard.example.com/metrics-stream",
    request = {
        parameter("metrics", "cpu,memory,disk")
        parameter("interval", "1000") // 1 second updates
    }
) {
    for (event in incoming) {
        when (event.event) {
            "metric-update" -> {
                // Skip events that carry no payload instead of failing with a NullPointerException.
                val payload = event.data ?: continue
                val metrics = Json.decodeFromString<SystemMetrics>(payload)
                updateDashboard(metrics)
            }
            "alert" -> {
                val payload = event.data ?: continue
                val alert = Json.decodeFromString<Alert>(payload)
                showAlert(alert)
            }
            "heartbeat" -> {
                println("Server heartbeat - connection alive")
            }
        }
    }
}

// SSE for chat application.
// SSE is carried over HTTP(S), so the URL uses `https://` rather than the WebSocket `wss://` scheme.
// The headers are passed through the named `request` argument: a Kotlin call accepts only one
// trailing lambda, which is bound to `block`.
client.sse(
    "https://chat.example.com/room/123/events",
    request = {
        headers {
            append("Authorization", "Bearer $chatToken")
        }
    }
) {
    for (event in incoming) {
        // Every handled event type needs a payload; skip events without one instead of
        // failing with a NullPointerException.
        val payload = event.data ?: continue

        when (event.event) {
            "message" -> {
                val message = Json.decodeFromString<ChatMessage>(payload)
                displayMessage(message)
            }
            "user-typing" -> {
                val typing = Json.decodeFromString<TypingIndicator>(payload)
                showTypingIndicator(typing)
            }
            "user-joined" -> {
                val user = Json.decodeFromString<User>(payload)
                addUserToRoom(user)
            }
            "user-left" -> {
                val user = Json.decodeFromString<User>(payload)
                removeUserFromRoom(user)
            }
        }
    }
}

client.close()

// Data classes for examples

/** Payload decoded in the custom-headers example; [type] selects the handler that receives [payload]. */
@Serializable
data class NotificationEvent(val type: String, val payload: JsonObject)

/** Value built from a [ServerSentEvent] in the filtering example; [timestamp] is taken from `System.currentTimeMillis()`. */
@Serializable  
data class ActivityEvent(val type: String, val data: String, val timestamp: Long, val id: String?)

/** Payload of a `metric-update` event in the dashboard example. */
@Serializable
data class SystemMetrics(val cpu: Double, val memory: Double, val disk: Double)

/** Payload of an `alert` event in the dashboard example. */
@Serializable
data class Alert(val level: String, val message: String, val timestamp: Long)

/** Payload of a `message` event in the chat example. */
@Serializable
data class ChatMessage(val id: String, val user: String, val text: String, val timestamp: Long)

/** Payload of a `user-typing` event in the chat example. */
@Serializable
data class TypingIndicator(val user: String, val isTyping: Boolean)

/** Payload of the `user-joined` and `user-left` events in the chat example. */
@Serializable
data class User(val id: String, val name: String, val avatar: String?)

Event Stream Parsing

Low-level SSE parsing utilities for custom implementations.

/**
 * Parse SSE event from raw text lines.
 *
 * @param lines the raw lines making up one event
 * @return the parsed [ServerSentEvent]
 */
fun parseServerSentEvent(lines: List<String>): ServerSentEvent

/**
 * SSE event parser state.
 *
 * Lines are fed one at a time with [parseLine]; the accumulated event is obtained
 * with [getCurrentEvent].
 */
class SSEEventParser {
    /** Parse a line and update internal state */
    fun parseLine(line: String)
    
    /** Get current event and reset parser; the result is nullable (presumably `null` when nothing was accumulated — TODO confirm). */
    fun getCurrentEvent(): ServerSentEvent?
    
    /** Reset parser state */
    fun reset()
}

/**
 * Create SSE event parser.
 *
 * @return a new [SSEEventParser]
 */
fun createSSEParser(): SSEEventParser

Error Handling

SSE-specific error handling and connection management.

/**
 * SSE connection exception.
 *
 * Base type of the SSE-specific exceptions; declared `open` so that
 * [SSEConnectionClosedException] and [SSEParsingException] can extend it.
 *
 * @param message description of the failure
 * @param cause underlying error, if any
 */
open class SSEException(
    message: String,
    cause: Throwable? = null
) : Exception(message, cause)

/**
 * SSE connection closed exception.
 *
 * @param message description of the failure; defaults to "SSE connection was closed"
 */
class SSEConnectionClosedException(
    message: String = "SSE connection was closed"
) : SSEException(message)

/**
 * SSE parsing exception.
 *
 * @param message description of the failure
 * @property rawData the raw text that could not be parsed
 * @param cause underlying error, if any
 */
class SSEParsingException(
    message: String,
    val rawData: String,
    cause: Throwable? = null
) : SSEException(message, cause)

docs

content-handling.md

core-client.md

engine-configuration.md

index.md

plugin-system.md

request-building.md

response-processing.md

server-sent-events.md

websocket-support.md

tile.json