0
# Server-Sent Events (SSE)
1
2
Server-Sent Events client implementation for real-time event streaming with automatic reconnection and event parsing.
3
4
## Capabilities
5
6
### SSE Connection Functions
7
8
Establish Server-Sent Events connections for real-time data streaming.
9
10
```kotlin { .api }
11
/**
12
* Establish SSE connection with URL string
13
* @param urlString SSE endpoint URL
14
* @param block SSE session handling block
15
*/
16
suspend fun HttpClient.sse(
17
urlString: String,
18
block: suspend ClientSSESession.() -> Unit
19
)
20
21
/**
22
* Establish SSE connection with detailed configuration
23
* @param host SSE server host
24
* @param port SSE server port (default: 80 for HTTP, 443 for HTTPS)
25
* @param path SSE endpoint path
26
* @param block SSE session handling block
27
*/
28
suspend fun HttpClient.sse(
29
host: String,
30
port: Int = DEFAULT_PORT,
31
path: String = "/",
32
block: suspend ClientSSESession.() -> Unit
33
)
34
35
/**
36
* Get SSE session without automatic event handling
37
* @param urlString SSE endpoint URL
38
* @returns SSE session for manual management
39
*/
40
suspend fun HttpClient.sseSession(urlString: String): ClientSSESession
41
42
/**
43
* Get SSE session with detailed configuration
44
* @param host SSE server host
45
* @param port SSE server port
46
* @param path SSE endpoint path
47
* @returns SSE session for manual management
48
*/
49
suspend fun HttpClient.sseSession(
50
host: String,
51
port: Int,
52
path: String
53
): ClientSSESession
54
```
55
56
**Usage Examples:**
57
58
```kotlin
59
val client = HttpClient {
60
install(SSE)
61
}
62
63
// Basic SSE connection
64
client.sse("https://api.example.com/events") {
65
// Process incoming server-sent events
66
for (event in incoming) {
67
when (event.event) {
68
"message" -> {
69
println("New message: ${event.data}")
70
}
71
"notification" -> {
72
println("Notification: ${event.data}")
73
// Parse JSON data if needed
74
val notification = Json.decodeFromString<Notification>(event.data ?: "")
75
handleNotification(notification)
76
}
77
"heartbeat" -> {
78
println("Server heartbeat received")
79
}
80
null -> {
81
// Default event type
82
println("Default event: ${event.data}")
83
}
84
}
85
}
86
}
87
88
// SSE with custom host and port
89
client.sse("localhost", 8080, "/stream") {
90
incoming.consumeEach { event ->
91
println("Event ID: ${event.id}")
92
println("Event Type: ${event.event}")
93
println("Event Data: ${event.data}")
94
println("Retry: ${event.retry}")
95
println("---")
96
}
97
}
98
99
// Manual SSE session management
100
val session = client.sseSession("https://api.example.com/live-updates")
101
try {
102
while (true) {
103
val event = session.incoming.receive()
104
processServerSentEvent(event)
105
}
106
} finally {
107
session.close()
108
}
109
```
110
111
### ClientSSESession Interface
112
113
Core SSE session interface for event handling and connection management.
114
115
```kotlin { .api }
116
/**
117
* Client SSE session interface
118
*/
119
interface ClientSSESession : CoroutineScope {
120
/**
121
* Incoming server-sent events channel
122
*/
123
val incoming: ReceiveChannel<ServerSentEvent>
124
125
/**
126
* HTTP call associated with this SSE session
127
*/
128
val call: HttpClientCall
129
130
/**
131
* Close the SSE connection
132
*/
133
suspend fun close()
134
}
135
136
/**
137
* Default SSE session implementation
138
*/
139
class DefaultClientSSESession(
140
override val call: HttpClientCall,
141
override val coroutineContext: CoroutineContext,
142
override val incoming: ReceiveChannel<ServerSentEvent>
143
) : ClientSSESession
144
```
145
146
### ServerSentEvent Data Class
147
148
Representation of individual server-sent events with all SSE fields.
149
150
```kotlin { .api }
151
/**
152
* Server-sent event representation
153
*/
154
data class ServerSentEvent(
155
/** Event data payload */
156
val data: String?,
157
158
/** Event type identifier */
159
val event: String?,
160
161
/** Event ID for last-event-id tracking */
162
val id: String?,
163
164
/** Retry timeout in milliseconds */
165
val retry: Long?,
166
167
/** Comments from the event stream */
168
val comments: String?
169
) {
170
companion object {
171
/**
172
* Create empty SSE event
173
* @returns Empty ServerSentEvent
174
*/
175
fun empty(): ServerSentEvent = ServerSentEvent(null, null, null, null, null)
176
}
177
}
178
```
179
180
**Usage Examples:**
181
182
```kotlin
183
client.sse("https://api.example.com/events") {
184
for (event in incoming) {
185
// Access all event fields
186
val eventData = event.data ?: "No data"
187
val eventType = event.event ?: "message" // Default event type
188
val eventId = event.id
189
val retryTime = event.retry
190
val comments = event.comments
191
192
println("Event: $eventType")
193
println("Data: $eventData")
194
195
// Use event ID for tracking
196
if (eventId != null) {
197
saveLastEventId(eventId)
198
}
199
200
// Handle retry timing
201
if (retryTime != null) {
202
println("Server suggests retry after ${retryTime}ms")
203
}
204
205
// Process based on event type
206
when (eventType) {
207
"user-joined" -> {
208
val user = Json.decodeFromString<User>(eventData)
209
handleUserJoined(user)
210
}
211
"user-left" -> {
212
val user = Json.decodeFromString<User>(eventData)
213
handleUserLeft(user)
214
}
215
"chat-message" -> {
216
val message = Json.decodeFromString<ChatMessage>(eventData)
217
displayMessage(message)
218
}
219
}
220
}
221
}
222
```
223
224
### SSE Plugin Configuration
225
226
SSE plugin installation and configuration options.
227
228
```kotlin { .api }
229
/**
230
* SSE plugin object
231
*/
232
object SSE : HttpClientPlugin<SSEConfig, SSEConfig> {
233
override val key: AttributeKey<SSEConfig>
234
235
override fun prepare(block: SSEConfig.() -> Unit): SSEConfig
236
override fun install(plugin: SSEConfig, scope: HttpClient)
237
}
238
239
/**
240
* SSE configuration
241
*/
242
class SSEConfig {
243
/** Whether to show comment lines in events */
244
var showCommentLines: Boolean = false
245
246
/** Whether to show retry directive in events */
247
var showRetryDirective: Boolean = false
248
249
/** Custom reconnection strategy */
250
var reconnectionStrategy: SSEReconnectionStrategy? = null
251
}
252
253
/**
254
* SSE reconnection strategy interface
255
*/
256
interface SSEReconnectionStrategy {
257
/**
258
* Determine if reconnection should be attempted
259
* @param attempt Reconnection attempt number (starting from 1)
260
* @param lastError Last connection error
261
* @returns True if should reconnect, false otherwise
262
*/
263
suspend fun shouldReconnect(attempt: Int, lastError: Throwable?): Boolean
264
265
/**
266
* Calculate delay before reconnection attempt
267
* @param attempt Reconnection attempt number
268
* @returns Delay in milliseconds
269
*/
270
suspend fun delayBeforeReconnect(attempt: Int): Long
271
}
272
```
273
274
**Usage Examples:**
275
276
```kotlin
277
// Configure SSE plugin
278
val client = HttpClient {
279
install(SSE) {
280
showCommentLines = true
281
showRetryDirective = true
282
283
// Custom reconnection strategy
284
reconnectionStrategy = object : SSEReconnectionStrategy {
285
override suspend fun shouldReconnect(attempt: Int, lastError: Throwable?): Boolean {
286
return attempt <= 5 // Max 5 reconnection attempts
287
}
288
289
override suspend fun delayBeforeReconnect(attempt: Int): Long {
290
return (attempt * 1000).toLong() // Exponential backoff
291
}
292
}
293
}
294
}
295
296
// Use configured SSE client
297
client.sse("https://api.example.com/events") {
298
for (event in incoming) {
299
// Comments and retry directives included based on config
300
if (event.comments != null) {
301
println("Server comment: ${event.comments}")
302
}
303
304
if (event.retry != null) {
305
println("Retry directive: ${event.retry}ms")
306
}
307
308
// Process event data
309
event.data?.let { data ->
310
processEventData(data, event.event)
311
}
312
}
313
}
314
```
315
316
### SSE Request Configuration
317
318
Configure SSE requests with headers, authentication, and parameters.
319
320
```kotlin { .api }
321
/**
322
* Configure SSE request with custom headers and parameters
323
*/
324
suspend fun HttpClient.sse(
325
urlString: String,
326
block: suspend ClientSSESession.() -> Unit,
327
requestBuilder: HttpRequestBuilder.() -> Unit = {}
328
)
329
```
330
331
**Usage Examples:**
332
333
```kotlin
334
// SSE with authentication and custom headers
335
client.sse("https://api.example.com/private-events", {
336
// SSE session handling
337
for (event in incoming) {
338
handlePrivateEvent(event)
339
}
340
}) {
341
// Request configuration
342
bearerAuth("your-jwt-token")
343
header("X-Client-Version", "1.0")
344
parameter("channel", "notifications")
345
parameter("user_id", "12345")
346
347
// Set Last-Event-ID for resuming
348
val lastEventId = getStoredLastEventId()
349
if (lastEventId != null) {
350
header("Last-Event-ID", lastEventId)
351
}
352
}
353
354
// SSE with query parameters for filtering
355
client.sse("https://api.example.com/events", {
356
for (event in incoming) {
357
when (event.event) {
358
"price-update" -> handlePriceUpdate(event.data)
359
"trade-executed" -> handleTradeExecution(event.data)
360
}
361
}
362
}) {
363
parameter("symbols", "AAPL,GOOGL,MSFT")
364
parameter("events", "price-update,trade-executed")
365
header("Accept", "text/event-stream")
366
}
367
```
368
369
### Error Handling and Reconnection
370
371
Handle SSE connection errors and implement custom reconnection logic.
372
373
```kotlin { .api }
374
/**
375
* SSE connection exception
376
*/
377
class SSEConnectionException(
378
message: String,
379
cause: Throwable? = null
380
) : Exception(message, cause)
381
382
/**
383
* Handle SSE with error recovery
384
*/
385
suspend fun <T> HttpClient.sseWithRetry(
386
urlString: String,
387
maxRetries: Int = 3,
388
retryDelay: Long = 1000,
389
block: suspend ClientSSESession.() -> T
390
): T
391
```
392
393
**Usage Examples:**
394
395
```kotlin
396
// Error handling with manual retry
397
suspend fun connectToSSEWithRetry() {
398
var attempt = 0
399
val maxAttempts = 5
400
401
while (attempt < maxAttempts) {
402
try {
403
client.sse("https://api.example.com/events") {
404
println("Connected to SSE stream (attempt ${attempt + 1})")
405
406
for (event in incoming) {
407
processEvent(event)
408
}
409
}
410
break // Success, exit retry loop
411
} catch (e: SSEConnectionException) {
412
attempt++
413
println("SSE connection failed (attempt $attempt): ${e.message}")
414
415
if (attempt < maxAttempts) {
416
val delay = attempt * 2000L // Exponential backoff
417
println("Retrying in ${delay}ms...")
418
delay(delay)
419
} else {
420
println("Max retry attempts reached, giving up")
421
throw e
422
}
423
}
424
}
425
}
426
427
// Graceful error handling within SSE session
428
client.sse("https://api.example.com/events") {
429
try {
430
for (event in incoming) {
431
try {
432
processEvent(event)
433
} catch (e: Exception) {
434
println("Error processing event: ${e.message}")
435
// Continue processing other events
436
}
437
}
438
} catch (e: ChannelClosedException) {
439
println("SSE channel closed: ${e.message}")
440
} catch (e: Exception) {
441
println("SSE session error: ${e.message}")
442
throw e
443
}
444
}
445
```
446
447
## Types
448
449
### SSE Types
450
451
```kotlin { .api }
452
/**
453
* SSE content wrapper for pipeline processing
454
*/
455
class SSEClientContent : OutgoingContent.NoContent() {
456
override val contentType: ContentType = ContentType.Text.EventStream
457
}
458
459
/**
460
* Channel result for SSE operations
461
*/
462
sealed class ChannelResult<out T> {
463
data class Success<T>(val value: T) : ChannelResult<T>()
464
data class Failure(val exception: Throwable) : ChannelResult<Nothing>()
465
object Closed : ChannelResult<Nothing>()
466
467
val isSuccess: Boolean
468
val isFailure: Boolean
469
val isClosed: Boolean
470
471
fun getOrNull(): T?
472
fun exceptionOrNull(): Throwable?
473
}
474
475
/**
476
* Channel closed exception
477
*/
478
class ChannelClosedException(
479
message: String? = null,
480
cause: Throwable? = null
481
) : Exception(message, cause)
482
```
483
484
### Content Types
485
486
```kotlin { .api }
487
/**
488
* SSE-specific content types
489
*/
490
object ContentType {
491
object Text {
492
/** Content type for Server-Sent Events */
493
val EventStream: ContentType = ContentType("text", "event-stream")
494
}
495
}
496
497
/**
498
* SSE-specific HTTP headers
499
*/
500
object SSEHeaders {
501
const val LastEventId = "Last-Event-ID"
502
const val CacheControl = "Cache-Control"
503
const val Connection = "Connection"
504
const val Accept = "Accept"
505
}
506
```