Ktor HTTP Client Core - a multiplatform asynchronous HTTP client library for Kotlin providing comprehensive HTTP request/response handling with plugin architecture.
—
Server-Sent Events client implementation for real-time event streaming with automatic reconnection and event parsing.
Establish Server-Sent Events connections for real-time data streaming.
/**
* Establish SSE connection with URL string
* @param urlString SSE endpoint URL
* @param block SSE session handling block
*/
suspend fun HttpClient.sse(
urlString: String,
block: suspend ClientSSESession.() -> Unit
)
/**
* Establish SSE connection with detailed configuration
* @param host SSE server host
* @param port SSE server port (default: 80 for HTTP, 443 for HTTPS)
* @param path SSE endpoint path
* @param block SSE session handling block
*/
suspend fun HttpClient.sse(
host: String,
port: Int = DEFAULT_PORT,
path: String = "/",
block: suspend ClientSSESession.() -> Unit
)
/**
* Get SSE session without automatic event handling
* @param urlString SSE endpoint URL
* @returns SSE session for manual management
*/
suspend fun HttpClient.sseSession(urlString: String): ClientSSESession
/**
* Get SSE session with detailed configuration
* @param host SSE server host
* @param port SSE server port
* @param path SSE endpoint path
* @returns SSE session for manual management
*/
suspend fun HttpClient.sseSession(
host: String,
port: Int,
path: String
): ClientSSESessionUsage Examples:
val client = HttpClient {
install(SSE)
}
// Basic SSE connection
client.sse("https://api.example.com/events") {
// Process incoming server-sent events
for (event in incoming) {
when (event.event) {
"message" -> {
println("New message: ${event.data}")
}
"notification" -> {
println("Notification: ${event.data}")
// Parse JSON data if needed
val notification = Json.decodeFromString<Notification>(event.data ?: "")
handleNotification(notification)
}
"heartbeat" -> {
println("Server heartbeat received")
}
null -> {
// Default event type
println("Default event: ${event.data}")
}
}
}
}
// SSE with custom host and port
client.sse("localhost", 8080, "/stream") {
incoming.consumeEach { event ->
println("Event ID: ${event.id}")
println("Event Type: ${event.event}")
println("Event Data: ${event.data}")
println("Retry: ${event.retry}")
println("---")
}
}
// Manual SSE session management
val session = client.sseSession("https://api.example.com/live-updates")
try {
while (true) {
val event = session.incoming.receive()
processServerSentEvent(event)
}
} finally {
session.close()
}Core SSE session interface for event handling and connection management.
/**
* Client SSE session interface
*/
interface ClientSSESession : CoroutineScope {
/**
* Incoming server-sent events channel
*/
val incoming: ReceiveChannel<ServerSentEvent>
/**
* HTTP call associated with this SSE session
*/
val call: HttpClientCall
/**
* Close the SSE connection
*/
suspend fun close()
}
/**
* Default SSE session implementation
*/
class DefaultClientSSESession(
override val call: HttpClientCall,
override val coroutineContext: CoroutineContext,
override val incoming: ReceiveChannel<ServerSentEvent>
) : ClientSSESessionRepresentation of individual server-sent events with all SSE fields.
/**
* Server-sent event representation
*/
data class ServerSentEvent(
/** Event data payload */
val data: String?,
/** Event type identifier */
val event: String?,
/** Event ID for last-event-id tracking */
val id: String?,
/** Retry timeout in milliseconds */
val retry: Long?,
/** Comments from the event stream */
val comments: String?
) {
companion object {
/**
* Create empty SSE event
* @returns Empty ServerSentEvent
*/
fun empty(): ServerSentEvent = ServerSentEvent(null, null, null, null, null)
}
}Usage Examples:
client.sse("https://api.example.com/events") {
for (event in incoming) {
// Access all event fields
val eventData = event.data ?: "No data"
val eventType = event.event ?: "message" // Default event type
val eventId = event.id
val retryTime = event.retry
val comments = event.comments
println("Event: $eventType")
println("Data: $eventData")
// Use event ID for tracking
if (eventId != null) {
saveLastEventId(eventId)
}
// Handle retry timing
if (retryTime != null) {
println("Server suggests retry after ${retryTime}ms")
}
// Process based on event type
when (eventType) {
"user-joined" -> {
val user = Json.decodeFromString<User>(eventData)
handleUserJoined(user)
}
"user-left" -> {
val user = Json.decodeFromString<User>(eventData)
handleUserLeft(user)
}
"chat-message" -> {
val message = Json.decodeFromString<ChatMessage>(eventData)
displayMessage(message)
}
}
}
}SSE plugin installation and configuration options.
/**
* SSE plugin object
*/
object SSE : HttpClientPlugin<SSEConfig, SSEConfig> {
override val key: AttributeKey<SSEConfig>
override fun prepare(block: SSEConfig.() -> Unit): SSEConfig
override fun install(plugin: SSEConfig, scope: HttpClient)
}
/**
* SSE configuration
*/
class SSEConfig {
/** Whether to show comment lines in events */
var showCommentLines: Boolean = false
/** Whether to show retry directive in events */
var showRetryDirective: Boolean = false
/** Custom reconnection strategy */
var reconnectionStrategy: SSEReconnectionStrategy? = null
}
/**
* SSE reconnection strategy interface
*/
interface SSEReconnectionStrategy {
/**
* Determine if reconnection should be attempted
* @param attempt Reconnection attempt number (starting from 1)
* @param lastError Last connection error
* @returns True if should reconnect, false otherwise
*/
suspend fun shouldReconnect(attempt: Int, lastError: Throwable?): Boolean
/**
* Calculate delay before reconnection attempt
* @param attempt Reconnection attempt number
* @returns Delay in milliseconds
*/
suspend fun delayBeforeReconnect(attempt: Int): Long
}Usage Examples:
// Configure SSE plugin
val client = HttpClient {
install(SSE) {
showCommentLines = true
showRetryDirective = true
// Custom reconnection strategy
reconnectionStrategy = object : SSEReconnectionStrategy {
override suspend fun shouldReconnect(attempt: Int, lastError: Throwable?): Boolean {
return attempt <= 5 // Max 5 reconnection attempts
}
override suspend fun delayBeforeReconnect(attempt: Int): Long {
return (attempt * 1000).toLong() // Exponential backoff
}
}
}
}
// Use configured SSE client
client.sse("https://api.example.com/events") {
for (event in incoming) {
// Comments and retry directives included based on config
if (event.comments != null) {
println("Server comment: ${event.comments}")
}
if (event.retry != null) {
println("Retry directive: ${event.retry}ms")
}
// Process event data
event.data?.let { data ->
processEventData(data, event.event)
}
}
}Configure SSE requests with headers, authentication, and parameters.
/**
* Configure SSE request with custom headers and parameters
*/
suspend fun HttpClient.sse(
urlString: String,
block: suspend ClientSSESession.() -> Unit,
requestBuilder: HttpRequestBuilder.() -> Unit = {}
)Usage Examples:
// SSE with authentication and custom headers
client.sse("https://api.example.com/private-events", {
// SSE session handling
for (event in incoming) {
handlePrivateEvent(event)
}
}) {
// Request configuration
bearerAuth("your-jwt-token")
header("X-Client-Version", "1.0")
parameter("channel", "notifications")
parameter("user_id", "12345")
// Set Last-Event-ID for resuming
val lastEventId = getStoredLastEventId()
if (lastEventId != null) {
header("Last-Event-ID", lastEventId)
}
}
// SSE with query parameters for filtering
client.sse("https://api.example.com/events", {
for (event in incoming) {
when (event.event) {
"price-update" -> handlePriceUpdate(event.data)
"trade-executed" -> handleTradeExecution(event.data)
}
}
}) {
parameter("symbols", "AAPL,GOOGL,MSFT")
parameter("events", "price-update,trade-executed")
header("Accept", "text/event-stream")
}Handle SSE connection errors and implement custom reconnection logic.
/**
* SSE connection exception
*/
class SSEConnectionException(
message: String,
cause: Throwable? = null
) : Exception(message, cause)
/**
* Handle SSE with error recovery
*/
suspend fun <T> HttpClient.sseWithRetry(
urlString: String,
maxRetries: Int = 3,
retryDelay: Long = 1000,
block: suspend ClientSSESession.() -> T
): TUsage Examples:
// Error handling with manual retry
suspend fun connectToSSEWithRetry() {
var attempt = 0
val maxAttempts = 5
while (attempt < maxAttempts) {
try {
client.sse("https://api.example.com/events") {
println("Connected to SSE stream (attempt ${attempt + 1})")
for (event in incoming) {
processEvent(event)
}
}
break // Success, exit retry loop
} catch (e: SSEConnectionException) {
attempt++
println("SSE connection failed (attempt $attempt): ${e.message}")
if (attempt < maxAttempts) {
val delay = attempt * 2000L // Exponential backoff
println("Retrying in ${delay}ms...")
delay(delay)
} else {
println("Max retry attempts reached, giving up")
throw e
}
}
}
}
// Graceful error handling within SSE session
client.sse("https://api.example.com/events") {
try {
for (event in incoming) {
try {
processEvent(event)
} catch (e: Exception) {
println("Error processing event: ${e.message}")
// Continue processing other events
}
}
} catch (e: ChannelClosedException) {
println("SSE channel closed: ${e.message}")
} catch (e: Exception) {
println("SSE session error: ${e.message}")
throw e
}
}/**
* SSE content wrapper for pipeline processing
*/
class SSEClientContent : OutgoingContent.NoContent() {
override val contentType: ContentType = ContentType.Text.EventStream
}
/**
* Channel result for SSE operations
*/
sealed class ChannelResult<out T> {
data class Success<T>(val value: T) : ChannelResult<T>()
data class Failure(val exception: Throwable) : ChannelResult<Nothing>()
object Closed : ChannelResult<Nothing>()
val isSuccess: Boolean
val isFailure: Boolean
val isClosed: Boolean
fun getOrNull(): T?
fun exceptionOrNull(): Throwable?
}
/**
* Channel closed exception
*/
class ChannelClosedException(
message: String? = null,
cause: Throwable? = null
) : Exception(message, cause)/**
* SSE-specific content types
*/
object ContentType {
object Text {
/** Content type for Server-Sent Events */
val EventStream: ContentType = ContentType("text", "event-stream")
}
}
/**
* SSE-specific HTTP headers
*/
object SSEHeaders {
const val LastEventId = "Last-Event-ID"
const val CacheControl = "Cache-Control"
const val Connection = "Connection"
const val Accept = "Accept"
}Install with Tessl CLI
npx tessl i tessl/maven-io-ktor--ktor-client-core