0
# WebSocket Support
1
2
The Ktor HTTP Client Core provides comprehensive WebSocket client functionality through the `WebSockets` plugin. This enables establishing and managing WebSocket connections with session management, ping/pong handling, message serialization, and full integration with Ktor's coroutine-based architecture.
3
4
## Core WebSocket API
5
6
### WebSockets Plugin
7
8
The main plugin for WebSocket client functionality that handles connection establishment and session management.
9
10
```kotlin { .api }
11
/**
 * Client plugin that adds WebSocket support to an `HttpClient`.
 *
 * Install it with `install(WebSockets) { ... }`; the lambda receives a [Config].
 */
object WebSockets : HttpClientPlugin<WebSockets.Config, WebSockets> {

    /** Settings the plugin is installed with. */
    class Config {
        /** Interval between pings; `null` by default. */
        var pingInterval: Duration? = null

        /** Upper bound for a single frame's size; `Long.MAX_VALUE` by default. */
        var maxFrameSize: Long = Long.MAX_VALUE

        /** Whether frame masking is enabled; `true` by default. */
        var masking: Boolean = true

        /** Extensions registered for the session; empty by default. */
        var extensions: MutableList<WebSocketExtension<*>> = mutableListOf()

        // Function-style counterparts of the properties above.

        /** Takes the ping interval as a [Duration]. */
        fun pingInterval(duration: Duration)

        /** Takes the maximum frame size. */
        fun maxFrameSize(size: Long)

        /** Takes the masking flag. */
        fun masking(enabled: Boolean)

        /** Takes one or more WebSocket extensions. */
        fun extensions(vararg extensions: WebSocketExtension<*>)
    }
}
24
```
25
26
### ClientWebSocketSession
27
28
Interface representing a client-side WebSocket session with full duplex communication capabilities.
29
30
```kotlin { .api }
31
/**
 * Client-side WebSocket session: a [WebSocketSession] that additionally exposes
 * the [HttpClientCall] it belongs to.
 */
interface ClientWebSocketSession : WebSocketSession {
    /** The client call associated with this session. */
    val call: HttpClientCall

    // ---- Inherited from WebSocketSession ----

    /** Channel of frames arriving from the peer. */
    val incoming: ReceiveChannel<Frame>

    /** Channel of frames to be sent to the peer. */
    val outgoing: SendChannel<Frame>

    /** Deferred close reason for this session. */
    val closeReason: Deferred<CloseReason?>

    /** Extensions attached to this session. */
    val extensions: List<WebSocketExtension<*>>

    // ---- Sending ----

    /** Sends a text message. */
    suspend fun send(data: String)

    /** Sends a binary message. */
    suspend fun send(data: ByteArray)

    /** Sends a raw [Frame]. */
    suspend fun send(frame: Frame)

    // ---- Closing ----

    /** Closes the connection, optionally with a [reason]. */
    suspend fun close(reason: CloseReason? = null)
}
52
```
53
54
### DefaultClientWebSocketSession
55
56
Default implementation of `ClientWebSocketSession` with standard WebSocket protocol handling.
57
58
```kotlin { .api }
59
/**
 * Default [ClientWebSocketSession]: pairs the originating [HttpClientCall] with a
 * [WebSocketSession] and forwards every session member to that delegate.
 *
 * `call` is declared once, as an overriding constructor property. Declaring it both
 * as a private constructor property and as an overriding member produces conflicting
 * declarations, and a `get() = call` accessor would refer to itself.
 *
 * @param call the client call this session was created for.
 * @param delegate the session that implements the WebSocket members.
 */
class DefaultClientWebSocketSession(
    override val call: HttpClientCall,
    delegate: WebSocketSession
) : ClientWebSocketSession, WebSocketSession by delegate
65
```
66
67
## WebSocket Connection Builders
68
69
### Basic WebSocket Connection
70
71
Extension functions for establishing WebSocket connections with various configuration options.
72
73
```kotlin { .api }
74
// ---------------------------------------------------------------------------
// Block-style connections: `block` runs with the session as its receiver.
// ---------------------------------------------------------------------------

/** Opens a WebSocket described by individual request parts and runs [block] on it. */
suspend fun HttpClient.webSocket(
    method: HttpMethod = HttpMethod.Get,
    host: String? = null,
    port: Int? = null,
    path: String? = null,
    request: HttpRequestBuilder.() -> Unit = {},
    block: suspend ClientWebSocketSession.() -> Unit
)

/** Same as above, with the target given as a URL string. */
suspend fun HttpClient.webSocket(
    url: String,
    request: HttpRequestBuilder.() -> Unit = {},
    block: suspend ClientWebSocketSession.() -> Unit
)

/** Same as above, with the target given as a [Url]. */
suspend fun HttpClient.webSocket(
    url: Url,
    request: HttpRequestBuilder.() -> Unit = {},
    block: suspend ClientWebSocketSession.() -> Unit
)

/** Secure WebSocket connection (WSS). */
suspend fun HttpClient.wss(
    method: HttpMethod = HttpMethod.Get,
    host: String? = null,
    port: Int? = null,
    path: String? = null,
    request: HttpRequestBuilder.() -> Unit = {},
    block: suspend ClientWebSocketSession.() -> Unit
)

// ---------------------------------------------------------------------------
// Session-style connections: the session is returned instead of being passed
// to a block (no automatic connection handling).
// ---------------------------------------------------------------------------

/** Opens a WebSocket described by individual request parts and returns its session. */
suspend fun HttpClient.webSocketSession(
    method: HttpMethod = HttpMethod.Get,
    host: String? = null,
    port: Int? = null,
    path: String? = null,
    request: HttpRequestBuilder.() -> Unit = {}
): ClientWebSocketSession

/** Same as above, with the target given as a URL string. */
suspend fun HttpClient.webSocketSession(
    url: String,
    request: HttpRequestBuilder.() -> Unit = {}
): ClientWebSocketSession

/** Block-style builder with the same parameter list as the first `webSocket` overload. */
suspend fun HttpClient.ws(
    method: HttpMethod = HttpMethod.Get,
    host: String? = null,
    port: Int? = null,
    path: String? = null,
    request: HttpRequestBuilder.() -> Unit = {},
    block: suspend ClientWebSocketSession.() -> Unit
)
128
```
129
130
## Frame Types
131
132
### WebSocket Frame Hierarchy
133
134
```kotlin { .api }
135
/**
 * A single WebSocket frame.
 *
 * Every subtype must implement both abstract members, so each one marks `data`
 * with `override` and supplies a value for `fin`.
 *
 * Note: the `data class` subtypes hold a [ByteArray], so the generated `equals` /
 * `hashCode` compare the array by reference, not by content.
 */
sealed class Frame {
    /** `true` when this frame is the final fragment of a message. */
    abstract val fin: Boolean

    /** Raw payload bytes. */
    abstract val data: ByteArray

    /** Text frame; the payload is UTF-8 encoded text. */
    data class Text(override val data: ByteArray, override val fin: Boolean = true) : Frame() {
        constructor(text: String) : this(text.toByteArray(Charsets.UTF_8))

        /** Decodes the payload as UTF-8. */
        fun readText(): String = data.toString(Charsets.UTF_8)
    }

    /** Binary frame carrying arbitrary bytes. */
    data class Binary(override val data: ByteArray, override val fin: Boolean = true) : Frame()

    /** Close frame; the payload encodes a [CloseReason]. */
    data class Close(override val data: ByteArray) : Frame() {
        // Control frames are never fragmented (RFC 6455, section 5.5), so `fin` is fixed.
        override val fin: Boolean get() = true

        constructor(reason: CloseReason) : this(reason.toByteArray())

        /** Parses the payload back into a [CloseReason]; may be `null`. */
        fun readReason(): CloseReason? = CloseReason.parse(data)
    }

    /** Ping control frame. */
    data class Ping(override val data: ByteArray) : Frame() {
        override val fin: Boolean get() = true
    }

    /** Pong control frame. */
    data class Pong(override val data: ByteArray) : Frame() {
        override val fin: Boolean get() = true
    }
}
154
155
/**
 * Reason attached to a WebSocket close: a numeric status [code] plus a
 * human-readable [message].
 */
data class CloseReason(val code: Short, val message: String) {

    /** Predefined reasons for the standard close status codes. */
    companion object {
        val NORMAL = CloseReason(code = 1000, message = "Normal Closure")
        val GOING_AWAY = CloseReason(code = 1001, message = "Going Away")
        val PROTOCOL_ERROR = CloseReason(code = 1002, message = "Protocol Error")
        val CANNOT_ACCEPT = CloseReason(code = 1003, message = "Cannot Accept")

        val NOT_CONSISTENT = CloseReason(code = 1007, message = "Not Consistent")
        val VIOLATED_POLICY = CloseReason(code = 1008, message = "Violated Policy")
        val TOO_BIG = CloseReason(code = 1009, message = "Too Big")
        val NO_EXTENSION = CloseReason(code = 1010, message = "No Extension")

        val INTERNAL_ERROR = CloseReason(code = 1011, message = "Internal Error")
        val SERVICE_RESTART = CloseReason(code = 1012, message = "Service Restart")
        val TRY_AGAIN_LATER = CloseReason(code = 1013, message = "Try Again Later")
    }
}
170
```
171
172
## Basic Usage
173
174
### Simple WebSocket Communication
175
176
```kotlin
177
// Client with the WebSockets plugin installed.
val client = HttpClient { install(WebSockets) }

client.webSocket("wss://echo.websocket.org") {
    // Send a greeting first.
    send("Hello WebSocket!")

    // Then read frames until the channel ends or a Close frame is seen.
    for (frame in incoming) {
        if (frame is Frame.Close) {
            println("Connection closed: ${frame.readReason()}")
            break
        }

        // Frames other than Text need no handling in this example.
        if (frame !is Frame.Text) continue

        val receivedText = frame.readText()
        println("Received: $receivedText")

        // Answer the echo of our own greeting.
        if (receivedText == "Hello WebSocket!") send("Thanks for the echo!")
    }
}

client.close()
208
```
209
210
### Bidirectional Communication
211
212
```kotlin
213
val client = HttpClient {
    install(WebSockets) {
        pingInterval = 20.seconds
        maxFrameSize = 1024 * 1024 // 1MB
    }
}

client.webSocket("ws://localhost:8080/chat") {
    // Sender: runs concurrently with the receive loop below.
    launch {
        for (i in 0 until 10) {
            send("Message $i")
            delay(1000)
        }
        close(CloseReason.NORMAL)
    }

    // Receiver: every frame type maps to one log line; Close also ends the loop.
    for (frame in incoming) {
        val logLine = when (frame) {
            is Frame.Close -> {
                println("Connection closed by server")
                break
            }
            is Frame.Text -> "Server: ${frame.readText()}"
            is Frame.Binary -> "Received binary data: ${frame.data.size} bytes"
            is Frame.Ping -> "Received ping" // Pong is sent automatically
            is Frame.Pong -> "Received pong"
        }
        println(logLine)
    }
}
253
```
254
255
### Secure WebSocket (WSS)
256
257
```kotlin
258
val client = HttpClient {
    install(WebSockets)
}

// `request` and `block` are two separate parameters of `wss`. The trailing lambda
// IS `block`, so request customisation (headers etc.) has to be passed through the
// named `request` argument; it cannot be nested inside the session lambda.
client.wss(
    host = "secure-chat.example.com",
    port = 443,
    path = "/websocket",
    request = {
        // Headers used for authentication; `accessToken` is supplied by the caller.
        header("Authorization", "Bearer $accessToken")
        header("X-Client-Version", "1.0")
    }
) {
    // WebSocket communication
    send("Secure message")

    for (frame in incoming) {
        when (frame) {
            is Frame.Text -> println("Secure: ${frame.readText()}")
            is Frame.Close -> break
            else -> { /* handle other frames */ }
        }
    }
}
286
```
287
288
## Advanced WebSocket Features
289
290
### Custom WebSocket Extensions
291
292
```kotlin
293
/**
 * Skeleton of a custom WebSocket extension.
 *
 * The factory is intentionally left unimplemented (`TODO`).
 * NOTE(review): confirm against the `WebSocketExtension` interface which further
 * members an implementation has to provide.
 */
class CompressionExtension : WebSocketExtension<CompressionExtension.Config> {

    /** Tunables for the extension. */
    class Config {
        var compressionLevel: Int = 6
        var windowBits: Int = 15
    }

    override val factory: WebSocketExtensionFactory<Config, CompressionExtension>
        get() {
            return TODO("Implement extension factory")
        }

    override val protocols: List<WebSocketExtensionHeader>
        get() {
            val header = WebSocketExtensionHeader("deflate-frame")
            return listOf(header)
        }
}

// Register the extension when installing the plugin.
val client = HttpClient {
    install(WebSockets) { extensions(CompressionExtension()) }
}
311
```
312
313
### Connection Management
314
315
```kotlin
316
/**
 * Owns one WebSocket session and the background work around it: reading incoming
 * frames and scheduling a reconnect after a failed connection attempt.
 *
 * All background coroutines run in a single scope owned by this class, so
 * [disconnect] can stop them. `token` used for the Authorization header is expected
 * to be provided by the surrounding code.
 */
class WebSocketManager {
    private val client = HttpClient {
        install(WebSockets) {
            pingInterval = 30.seconds
        }
    }

    // One scope for the reader and reconnect coroutines. SupervisorJob keeps a failure
    // in one of them from cancelling the other.
    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)

    private var session: ClientWebSocketSession? = null
    private var reconnectJob: Job? = null

    /**
     * Opens a session to [url] and starts reading from it.
     *
     * @return `true` when the session was opened; `false` when it failed, in which
     *   case a reconnect attempt has been scheduled.
     */
    suspend fun connect(url: String): Boolean {
        return try {
            val opened = client.webSocketSession(url) {
                header("Authorization", "Bearer $token")
            }
            session = opened

            // `launch` needs a CoroutineScope receiver; a suspend function has none.
            scope.launch {
                handleIncomingMessages(opened)
            }

            true
        } catch (e: CancellationException) {
            throw e // never turn cancellation into a "failed, retry" result
        } catch (e: Exception) {
            println("Connection failed: ${e.message}")
            scheduleReconnect(url)
            false
        }
    }

    /** Reads frames from [active] until the channel ends or a Close frame arrives. */
    private suspend fun handleIncomingMessages(active: ClientWebSocketSession) {
        try {
            for (frame in active.incoming) {
                when (frame) {
                    is Frame.Text -> processMessage(frame.readText())
                    is Frame.Close -> {
                        println("Connection closed: ${frame.readReason()}")
                        break
                    }
                    else -> { /* handle other frames */ }
                }
            }
        } catch (e: CancellationException) {
            throw e
        } catch (e: Exception) {
            println("Message handling error: ${e.message}")
        }
    }

    /** Replaces any pending reconnect with a new attempt five seconds from now. */
    private fun scheduleReconnect(url: String) {
        reconnectJob?.cancel()
        reconnectJob = scope.launch {
            delay(5000) // Wait 5 seconds before reconnecting
            connect(url)
        }
    }

    /**
     * Sends [text] over the current session.
     *
     * @return `true` only if a session exists and the send completed; `false` when
     *   there is no session or the send failed.
     */
    suspend fun sendMessage(text: String): Boolean {
        // Without this guard `session?.send(text)` is a no-op and "success" is reported.
        val active = session ?: return false
        return try {
            active.send(text)
            true
        } catch (e: CancellationException) {
            throw e
        } catch (e: Exception) {
            println("Send failed: ${e.message}")
            false
        }
    }

    /** Closes the session and releases everything this manager owns. */
    suspend fun disconnect() {
        reconnectJob?.cancel()
        reconnectJob = null
        try {
            session?.close(CloseReason.NORMAL)
        } finally {
            // Release resources even if the close handshake throws.
            session = null
            scope.cancel()
            client.close()
        }
    }

    private fun processMessage(message: String) {
        // Process incoming message
        println("Received: $message")
    }
}
394
```
395
396
### Message Serialization
397
398
```kotlin
399
// JSON message serialization
400
/**
 * Serializes [data] to JSON and sends it as a text message.
 *
 * The function is `inline` with a `reified` type parameter so that
 * `Json.encodeToString` resolves the serializer of the concrete type `T`. With a
 * parameter typed `Any`, the serializer for `Any` is looked up instead, and
 * encoding an ordinary `@Serializable` class fails at runtime.
 */
suspend inline fun <reified T> ClientWebSocketSession.sendJson(data: T) {
    val json = Json.encodeToString(data)
    send(json)
}
404
405
/**
 * Waits for the next text frame and decodes it from JSON as [T].
 *
 * Returns `null` if a Close frame arrives first or the incoming channel ends;
 * frames of any other type are skipped.
 */
suspend inline fun <reified T> ClientWebSocketSession.receiveJson(): T? {
    for (frame in incoming) {
        if (frame is Frame.Close) return null
        if (frame is Frame.Text) return Json.decodeFromString<T>(frame.readText())
        // Anything else: keep waiting for a text frame.
    }
    return null
}
417
418
// Usage
419
// kotlinx.serialization only generates a serializer for classes marked @Serializable;
// without it the JSON encode/decode calls below cannot find one.
@Serializable
data class ChatMessage(val user: String, val text: String, val timestamp: Long)

client.webSocket("ws://chat.example.com") {
    // Send JSON message
    sendJson(ChatMessage("Alice", "Hello!", System.currentTimeMillis()))

    // Receive JSON message
    val message: ChatMessage? = receiveJson()
    message?.let { println("${it.user}: ${it.text}") }
}
429
```
430
431
### Binary Data Handling
432
433
```kotlin
434
client.webSocket("ws://binary-service.example.com") {
    // Upload: read the file into memory and send it as one binary message.
    send(File("image.png").readBytes())

    // Download: store each binary frame in its own timestamped file.
    for (frame in incoming) {
        if (frame is Frame.Close) break
        if (frame !is Frame.Binary) continue // other frame types are ignored here

        val payload = frame.data
        val target = File("received-${System.currentTimeMillis()}.bin")
        target.writeBytes(payload)
        println("Received ${payload.size} bytes of binary data")
    }
}
452
```
453
454
## Error Handling and Recovery
455
456
### Connection Error Handling
457
458
```kotlin
459
val client = HttpClient {
    install(WebSockets) {
        pingInterval = 10.seconds
    }
}

/**
 * Connects to [url] and prints incoming text frames, retrying failed attempts.
 *
 * Returns once a session has run to completion. After [maxRetries] failed attempts
 * the last exception is rethrown. Connect timeouts back off exponentially
 * (2 s, 4 s, 8 s, ...); every other error waits a fixed 1 s before the next attempt.
 * Cancellation is never retried.
 */
suspend fun connectWithRetry(url: String, maxRetries: Int = 3) {
    repeat(maxRetries) { attempt ->
        val isLastAttempt = attempt == maxRetries - 1
        try {
            client.webSocket(url) {
                println("Connected successfully on attempt ${attempt + 1}")

                // Handle connection
                for (frame in incoming) {
                    when (frame) {
                        is Frame.Text -> println(frame.readText())
                        is Frame.Close -> {
                            println("Connection closed normally")
                            return@webSocket
                        }
                        else -> { /* handle other frames */ }
                    }
                }
            }
            return // Success, exit retry loop
        } catch (e: ConnectTimeoutException) {
            println("Connection timeout on attempt ${attempt + 1}")
            if (isLastAttempt) throw e
            // Exponential backoff. `delay` takes a Long, so the arithmetic is done in
            // Long; the shift is capped to keep the wait bounded for large retry counts.
            delay(2000L shl attempt.coerceAtMost(5))
        } catch (e: CancellationException) {
            throw e // the caller cancelled us: stop instead of retrying
        } catch (e: Exception) {
            println("Connection error on attempt ${attempt + 1}: ${e.message}")
            if (isLastAttempt) throw e
            delay(1000)
        }
    }
}
495
```
496
497
### Graceful Disconnection
498
499
```kotlin
500
client.webSocket("ws://example.com/socket") {
    try {
        // WebSocket communication
        for (frame in incoming) {
            when (frame) {
                is Frame.Text -> {
                    val message = frame.readText()
                    if (message == "shutdown") {
                        send("Acknowledged shutdown")
                        close(CloseReason.NORMAL)
                        break
                    }
                    // Process other messages
                }
                is Frame.Close -> {
                    println("Server closed connection: ${frame.readReason()}")
                    break
                }
                else -> { /* handle other frames */ }
            }
        }
    } catch (e: CancellationException) {
        // Cancellation is not an error: let it propagate instead of reporting it
        // and attempting a close handshake on a cancelled coroutine.
        throw e
    } catch (e: Exception) {
        println("WebSocket error: ${e.message}")
        // Attempt graceful close
        try {
            close(CloseReason.INTERNAL_ERROR)
        } catch (closeException: Exception) {
            println("Failed to close gracefully: ${closeException.message}")
        }
    } finally {
        println("WebSocket session ended")
    }
}
533
```
534
535
## WebSocket Testing
536
537
### Mock WebSocket Server
538
539
```kotlin
540
/**
 * Test helper that replays a scripted list of messages over a session and then
 * closes it.
 */
class MockWebSocketServer {
    private val responses = mutableListOf<String>()

    /** Appends [response] to the script. */
    fun addResponse(response: String) {
        responses += response
    }

    /**
     * Sends every scripted response over [session], pausing 100 ms after each one,
     * then closes the session with a normal close reason.
     */
    suspend fun simulate(session: ClientWebSocketSession) {
        for (scripted in responses) {
            session.send(scripted)
            delay(100)
        }
        session.close(CloseReason.NORMAL)
    }
}
556
557
// Test WebSocket client
558
/**
 * Sends "ping" and expects the first incoming frame to be the text "pong".
 *
 * The test fails when the first frame is not a text frame; it no longer skips the
 * assertion in that case. The client is closed even when an assertion or the
 * connection fails.
 */
suspend fun testWebSocketClient() {
    val client = HttpClient {
        install(WebSockets)
    }

    try {
        // In actual tests, this would be a real test server
        client.webSocket("ws://test-server:8080/test") {
            send("ping")

            val response = incoming.receive()
            // A non-text frame yields null here, which makes the assertion fail.
            val text = (response as? Frame.Text)?.readText()
            assertEquals("pong", text)
        }
    } finally {
        client.close()
    }
}
575
```
576
577
## Performance Optimization
578
579
### Connection Pooling
580
581
```kotlin
582
/**
 * Keeps up to [maxConnections] idle WebSocket sessions for reuse.
 *
 * NOTE(review): idle sessions are not tracked per URL. [borrowConnection] may hand
 * out a session that was opened for a different URL than the one requested, so use
 * one pool per endpoint unless that is changed.
 */
class WebSocketPool(private val maxConnections: Int = 10) {
    private val availableConnections = Channel<ClientWebSocketSession>(maxConnections)
    private val client = HttpClient {
        install(WebSockets) {
            pingInterval = 30.seconds
        }
    }

    /**
     * Returns an idle session that is still open, or opens a new one to [url] when
     * none is available. Idle sessions that have closed in the meantime are dropped.
     */
    suspend fun borrowConnection(url: String): ClientWebSocketSession {
        while (true) {
            val pooled = availableConnections.tryReceive().getOrNull() ?: break
            if (!pooled.closeReason.isCompleted) return pooled
        }
        return client.webSocketSession(url)
    }

    /**
     * Hands [session] back to the pool. Already-closed sessions are ignored. When
     * the pool is full (or has been closed) the session is closed so it is not leaked.
     */
    suspend fun returnConnection(session: ClientWebSocketSession) {
        if (session.closeReason.isCompleted) return
        val pooled = availableConnections.trySend(session).isSuccess
        if (!pooled) {
            session.close(CloseReason.NORMAL)
        }
    }

    /** Stops pooling, releases every idle session and closes the client. */
    fun close() {
        // Closing the channel rejects later returns; buffered sessions can still be drained.
        availableConnections.close()
        while (true) {
            val pooled = availableConnections.tryReceive().getOrNull() ?: break
            // Not a suspend context, so cancel the session's scope instead of performing
            // a close handshake. Assumes the session is a CoroutineScope, as in Ktor.
            pooled.cancel()
        }
        client.close()
    }
}
605
```
606
607
### Batched Message Sending
608
609
```kotlin
610
/**
 * Queues outgoing messages and sends them over [session] in batches: up to
 * `batchSize` messages joined with newlines, flushed at the latest `flushInterval`
 * milliseconds after the first message of the batch was taken from the queue.
 *
 * Call [close] when done; messages already queued are still sent before the
 * worker stops.
 */
class BatchedWebSocketSender(private val session: ClientWebSocketSession) {
    private val messageQueue = Channel<String>(Channel.UNLIMITED)
    private val batchSize = 10
    private val flushInterval = 100L // milliseconds

    // The worker runs in a scope held by this instance, not an anonymous one.
    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)

    init {
        scope.launch {
            processBatches()
        }
    }

    /** Enqueues [message] for the next batch. */
    suspend fun sendMessage(message: String) {
        messageQueue.send(message)
    }

    /** Stops accepting messages; the worker drains the queue and then finishes. */
    fun close() {
        messageQueue.close()
    }

    private suspend fun processBatches() {
        val batch = mutableListOf<String>()

        while (true) {
            // Wait without a timeout for the first message, so an idle sender does not
            // wake up every flush interval. A null result means the queue was closed
            // and is fully drained.
            val first = messageQueue.receiveCatching().getOrNull() ?: break
            batch.add(first)

            // Top the batch up until it is full, the interval elapses, or the queue closes.
            var queueClosed = false
            withTimeoutOrNull(flushInterval) {
                while (batch.size < batchSize) {
                    val next = messageQueue.receiveCatching().getOrNull()
                    if (next == null) {
                        queueClosed = true
                        break
                    }
                    batch.add(next)
                }
            }

            val combinedMessage = batch.joinToString("\n")
            session.send(combinedMessage)
            batch.clear()

            if (queueClosed) break
        }
    }
}
645
```
646
647
## Best Practices
648
649
1. **Handle all frame types**: Always process Text, Binary, Close, Ping, and Pong frames appropriately
650
2. **Implement reconnection logic**: Network connections can be unreliable, so implement automatic reconnection
651
3. **Use heartbeats**: Configure ping intervals to detect connection issues early
652
4. **Graceful shutdown**: Always close connections properly with appropriate close reasons
653
5. **Error handling**: Wrap WebSocket operations in try-catch blocks for robust error handling
654
6. **Message size limits**: Be aware of frame size limits and implement chunking for large messages
655
7. **Authentication**: Include proper authentication headers when establishing connections
656
8. **Resource cleanup**: Always close clients and sessions to prevent resource leaks
657
9. **Concurrent access**: Be careful with concurrent access to WebSocket sessions; they are not thread-safe
658
10. **Protocol negotiation**: Use WebSocket subprotocols for structured communication protocols