0
# WebSocket Support
1
2
Comprehensive WebSocket client functionality with message handling, connection management, and frame processing.
3
4
## Capabilities
5
6
### WebSocket Connection Functions
7
8
Establish WebSocket connections with various configuration options.
9
10
```kotlin { .api }
11
/**
12
* Establish WebSocket connection over HTTP
13
* @param urlString WebSocket URL (ws://)
14
* @param block WebSocket session handling block
15
*/
16
suspend fun HttpClient.webSocket(
17
urlString: String,
18
block: suspend DefaultClientWebSocketSession.() -> Unit
19
)
20
21
/**
22
* Establish secure WebSocket connection over HTTPS
23
* @param urlString Secure WebSocket URL (wss://)
24
* @param block WebSocket session handling block
25
*/
26
suspend fun HttpClient.wss(
27
urlString: String,
28
block: suspend DefaultClientWebSocketSession.() -> Unit
29
)
30
31
/**
32
* Establish WebSocket connection with detailed configuration
33
* @param method HTTP method for handshake (usually GET)
34
* @param host WebSocket server host
35
* @param port WebSocket server port
36
* @param path WebSocket endpoint path
37
* @param block WebSocket session handling block
38
*/
39
suspend fun HttpClient.webSocket(
40
method: HttpMethod = HttpMethod.Get,
41
host: String,
42
port: Int,
43
path: String,
44
block: suspend DefaultClientWebSocketSession.() -> Unit
45
)
46
47
/**
48
* Get WebSocket session without automatic message handling
49
* @param urlString WebSocket URL
50
* @returns WebSocket session for manual management
51
*/
52
suspend fun HttpClient.webSocketSession(urlString: String): DefaultClientWebSocketSession
53
54
/**
55
* Establish WebSocket connection (alias for webSocket)
56
* @param urlString WebSocket URL (ws://)
57
* @param block WebSocket session handling block
58
*/
59
suspend fun HttpClient.ws(
60
urlString: String,
61
block: suspend DefaultClientWebSocketSession.() -> Unit
62
)
63
64
/**
65
* Get WebSocket session with detailed configuration
66
* @param method HTTP method for handshake
67
* @param host WebSocket server host
68
* @param port WebSocket server port
69
* @param path WebSocket endpoint path
70
* @returns WebSocket session for manual management
71
*/
72
suspend fun HttpClient.webSocketSession(
73
method: HttpMethod = HttpMethod.Get,
74
host: String,
75
port: Int,
76
path: String
77
): DefaultClientWebSocketSession
78
```
79
80
**Usage Examples:**
81
82
```kotlin
83
val client = HttpClient {
84
install(WebSockets)
85
}
86
87
// Basic WebSocket connection
88
client.webSocket("ws://echo.websocket.org") {
89
// Send text message
90
send(Frame.Text("Hello, WebSocket!"))
91
92
// Receive and process messages
93
for (frame in incoming) {
94
when (frame) {
95
is Frame.Text -> {
96
val message = frame.readText()
97
println("Received: $message")
98
}
99
is Frame.Binary -> {
100
val data = frame.readBytes()
101
println("Received binary data: ${data.size} bytes")
102
}
103
is Frame.Close -> {
104
println("Connection closed")
105
break
106
}
107
}
108
}
109
}
110
111
// Secure WebSocket connection
112
client.wss("wss://secure-websocket.example.com/chat") {
113
// Send JSON message
114
val jsonMessage = """{"type": "join", "room": "general"}"""
115
send(Frame.Text(jsonMessage))
116
117
// Handle incoming messages
118
while (true) {
119
val frame = incoming.receive()
120
if (frame is Frame.Text) {
121
val response = frame.readText()
122
println("Server response: $response")
123
}
124
if (frame is Frame.Close) break
125
}
126
}
127
128
// WebSocket with custom headers
129
client.webSocket("ws://localhost:8080/websocket") {
130
// The connection is established with custom headers if needed
131
send(Frame.Text("Connected with custom configuration"))
132
133
// Process messages in a loop
134
incoming.consumeEach { frame ->
135
when (frame) {
136
is Frame.Text -> handleTextMessage(frame.readText())
137
is Frame.Binary -> handleBinaryMessage(frame.readBytes())
138
is Frame.Pong -> println("Received pong")
139
is Frame.Close -> println("WebSocket closed")
140
}
141
}
142
}
143
```
144
145
### WebSocket Session Interface
146
147
Core WebSocket session interface for message handling.
148
149
```kotlin { .api }
150
/**
151
* WebSocket session interface
152
*/
153
interface ClientWebSocketSession : WebSocketSession {
154
/**
155
* Incoming frames channel
156
*/
157
val incoming: ReceiveChannel<Frame>
158
159
/**
160
* Outgoing frames channel
161
*/
162
val outgoing: SendChannel<Frame>
163
164
/**
165
* Extensions negotiated during handshake
166
*/
167
val extensions: List<WebSocketExtension<*>>
168
169
/**
170
* Send a frame to the server
171
* @param frame Frame to send
172
*/
173
suspend fun send(frame: Frame)
174
175
/**
176
* Flush outgoing frames
177
*/
178
suspend fun flush()
179
180
/**
181
* Close the WebSocket connection
182
* @param reason Close reason
183
*/
184
suspend fun close(reason: CloseReason? = null)
185
186
/**
187
* Ping the server
188
* @param data Optional ping data
189
*/
190
suspend fun ping(data: ByteArray)
191
192
/**
193
* Send pong response
194
* @param data Pong data
195
*/
196
suspend fun pong(data: ByteArray)
197
198
/**
199
* Get close reason if connection is closed
200
* @returns Close reason or null
201
*/
202
suspend fun closeReason(): CloseReason?
203
}
204
205
/**
206
* Default WebSocket session implementation
207
*/
208
class DefaultClientWebSocketSession(
209
override val call: HttpClientCall,
210
override val coroutineContext: CoroutineContext
211
) : ClientWebSocketSession, CoroutineScope
212
```
213
214
**Usage Examples:**
215
216
```kotlin
217
// Manual session management
218
val session = client.webSocketSession("ws://example.com/chat")
219
220
// Send different types of frames
221
session.send(Frame.Text("Hello"))
222
session.send(Frame.Binary(byteArrayOf(1, 2, 3, 4)))
223
session.ping(byteArrayOf())
224
225
// Receive frames manually
226
val frame = session.incoming.receive()
227
when (frame) {
228
is Frame.Text -> println("Text: ${frame.readText()}")
229
is Frame.Binary -> println("Binary: ${frame.readBytes().size} bytes")
230
is Frame.Ping -> {
231
println("Received ping")
232
session.pong(frame.data)
233
}
234
is Frame.Pong -> println("Received pong")
235
is Frame.Close -> {
236
println("Connection closed: ${frame.readReason()}")
237
session.close()
238
}
239
}
240
241
// Flush outgoing frames
242
session.flush()
243
244
// Close with reason
245
session.close(CloseReason(CloseReason.Codes.NORMAL, "Goodbye"))
246
```
247
248
### WebSocket Frames
249
250
Frame types for different WebSocket message formats.
251
252
```kotlin { .api }
253
/**
254
* WebSocket frame sealed class
255
*/
256
sealed class Frame {
257
/** Frame data as byte array */
258
abstract val data: ByteArray
259
260
/** Whether this is a final frame */
261
abstract val fin: Boolean
262
263
/**
264
* Text frame for string messages
265
*/
266
data class Text(
267
val text: String,
268
override val fin: Boolean = true
269
) : Frame() {
270
override val data: ByteArray get() = text.encodeToByteArray()
271
272
/**
273
* Read frame content as text
274
* @returns Frame text content
275
*/
276
fun readText(): String = text
277
}
278
279
/**
280
* Binary frame for byte data
281
*/
282
data class Binary(
283
override val data: ByteArray,
284
override val fin: Boolean = true
285
) : Frame() {
286
/**
287
* Read frame content as bytes
288
* @returns Frame binary content
289
*/
290
fun readBytes(): ByteArray = data
291
}
292
293
/**
294
* Close frame for connection termination
295
*/
296
data class Close(
297
val reason: CloseReason? = null
298
) : Frame() {
299
override val data: ByteArray = reason?.let {
300
ByteBuffer.allocate(2 + it.message.length)
301
.putShort(it.code.toShort())
302
.put(it.message.encodeToByteArray())
303
.array()
304
} ?: byteArrayOf()
305
override val fin: Boolean = true
306
307
/**
308
* Read close reason from frame
309
* @returns Close reason
310
*/
311
fun readReason(): CloseReason? = reason
312
}
313
314
/**
315
* Ping frame for connection testing
316
*/
317
data class Ping(
318
override val data: ByteArray
319
) : Frame() {
320
override val fin: Boolean = true
321
}
322
323
/**
324
* Pong frame for ping responses
325
*/
326
data class Pong(
327
override val data: ByteArray
328
) : Frame() {
329
override val fin: Boolean = true
330
}
331
}
332
```
333
334
**Usage Examples:**
335
336
```kotlin
337
client.webSocket("ws://example.com") {
338
// Send different frame types
339
send(Frame.Text("Hello, World!"))
340
send(Frame.Binary(byteArrayOf(0x01, 0x02, 0x03)))
341
send(Frame.Ping(byteArrayOf()))
342
343
// Process incoming frames
344
for (frame in incoming) {
345
when (frame) {
346
is Frame.Text -> {
347
val message = frame.readText()
348
println("Received text: $message")
349
350
// Echo back
351
send(Frame.Text("Echo: $message"))
352
}
353
is Frame.Binary -> {
354
val bytes = frame.readBytes()
355
println("Received ${bytes.size} bytes")
356
357
// Process binary data
358
processBinaryData(bytes)
359
}
360
is Frame.Ping -> {
361
println("Received ping")
362
send(Frame.Pong(frame.data))
363
}
364
is Frame.Pong -> {
365
println("Received pong response")
366
}
367
is Frame.Close -> {
368
val closeReason = frame.readReason()
369
println("Connection closed: ${closeReason?.message}")
370
break
371
}
372
}
373
}
374
}
375
```
376
377
### WebSocket Plugin Configuration
378
379
WebSocket plugin installation and configuration.
380
381
```kotlin { .api }
382
/**
383
* WebSocket plugin object
384
*/
385
object WebSockets : HttpClientPlugin<WebSocketConfig, WebSocketConfig> {
386
override val key: AttributeKey<WebSocketConfig>
387
388
override fun prepare(block: WebSocketConfig.() -> Unit): WebSocketConfig
389
override fun install(plugin: WebSocketConfig, scope: HttpClient)
390
}
391
392
/**
393
* WebSocket configuration
394
*/
395
class WebSocketConfig {
396
/** Maximum frame size in bytes */
397
var maxFrameSize: Long = Long.MAX_VALUE
398
399
/** Ping interval in milliseconds */
400
var pingInterval: Long? = null
401
402
/** Extensions to negotiate */
403
var extensions: MutableList<WebSocketExtensionConfig<*>> = mutableListOf()
404
405
/** Content converter for automatic serialization */
406
var contentConverter: WebSocketContentConverter? = null
407
408
/**
409
* Add WebSocket extension
410
* @param extension Extension configuration
411
*/
412
fun <T : Any> extensions(extension: WebSocketExtensionConfig<T>)
413
}
414
```
415
416
**Usage Examples:**
417
418
```kotlin
419
val client = HttpClient {
420
install(WebSockets) {
421
maxFrameSize = 1024 * 1024 // 1MB max frame size
422
pingInterval = 30_000 // Ping every 30 seconds
423
424
// Add extensions if needed
425
// extensions(SomeWebSocketExtension())
426
}
427
}
428
429
// Use configured WebSocket client
430
client.webSocket("ws://example.com/realtime") {
431
// WebSocket communication with configured settings
432
send(Frame.Text("Message within size limits"))
433
434
// Automatic ping/pong handling based on pingInterval
435
for (frame in incoming) {
436
when (frame) {
437
is Frame.Text -> handleMessage(frame.readText())
438
is Frame.Close -> break
439
// Ping/Pong frames handled automatically
440
}
441
}
442
}
443
```
444
445
### Close Reason and Status Codes
446
447
WebSocket connection close handling with standard status codes.
448
449
```kotlin { .api }
450
/**
451
* WebSocket close reason
452
*/
453
data class CloseReason(
454
val code: Short,
455
val message: String
456
) {
457
constructor(knownReason: Codes, message: String) : this(knownReason.code, message)
458
459
/**
460
* Standard WebSocket close codes
461
*/
462
enum class Codes(val code: Short) {
463
NORMAL(1000),
464
GOING_AWAY(1001),
465
PROTOCOL_ERROR(1002),
466
CANNOT_ACCEPT(1003),
467
NOT_CONSISTENT(1007),
468
VIOLATED_POLICY(1008),
469
TOO_BIG(1009),
470
NO_EXTENSION(1010),
471
INTERNAL_ERROR(1011),
472
SERVICE_RESTART(1012),
473
TRY_AGAIN_LATER(1013),
474
BAD_GATEWAY(1014)
475
}
476
}
477
```
478
479
**Usage Examples:**
480
481
```kotlin
482
client.webSocket("ws://example.com") {
483
try {
484
// WebSocket communication
485
send(Frame.Text("Hello"))
486
487
for (frame in incoming) {
488
when (frame) {
489
is Frame.Close -> {
490
val reason = frame.readReason()
491
when (reason?.code) {
492
CloseReason.Codes.NORMAL.code ->
493
println("Normal closure")
494
CloseReason.Codes.GOING_AWAY.code ->
495
println("Server going away")
496
CloseReason.Codes.PROTOCOL_ERROR.code ->
497
println("Protocol error occurred")
498
else ->
499
println("Closed with code: ${reason?.code}, message: ${reason?.message}")
500
}
501
break
502
}
503
// Handle other frame types
504
}
505
}
506
} catch (e: Exception) {
507
// Close with error status
508
close(CloseReason(CloseReason.Codes.INTERNAL_ERROR, "Client error: ${e.message}"))
509
}
510
}
511
512
// Graceful shutdown
513
client.webSocket("ws://example.com") {
514
// Do work...
515
516
// Close normally when done
517
close(CloseReason(CloseReason.Codes.NORMAL, "Work completed"))
518
}
519
```
520
521
## Types
522
523
### WebSocket Types
524
525
```kotlin { .api }
526
/**
527
* WebSocket session base interface
528
*/
529
interface WebSocketSession : CoroutineScope {
530
val call: HttpClientCall
531
val incoming: ReceiveChannel<Frame>
532
val outgoing: SendChannel<Frame>
533
val extensions: List<WebSocketExtension<*>>
534
535
suspend fun flush()
536
suspend fun close(reason: CloseReason? = null)
537
}
538
539
/**
540
* WebSocket extension interface
541
*/
542
interface WebSocketExtension<out ConfigType : Any> {
543
val factory: WebSocketExtensionFactory<ConfigType, out WebSocketExtension<ConfigType>>
544
val protocols: List<String>
545
546
fun processOutgoingFrame(frame: Frame): Frame
547
fun processIncomingFrame(frame: Frame): Frame
548
}
549
550
/**
551
* WebSocket content converter for automatic serialization
552
*/
553
interface WebSocketContentConverter {
554
suspend fun serialize(value: Any): Frame
555
suspend fun deserialize(frame: Frame, type: TypeInfo): Any?
556
}
557
```
558
559
### Channel Types
560
561
```kotlin { .api }
562
/**
563
* Channel for receiving frames
564
*/
565
interface ReceiveChannel<out E> {
566
val isClosedForReceive: Boolean
567
val isEmpty: Boolean
568
569
suspend fun receive(): E
570
suspend fun receiveCatching(): ChannelResult<E>
571
fun tryReceive(): ChannelResult<E>
572
573
suspend fun consumeEach(action: suspend (E) -> Unit)
574
operator fun iterator(): ChannelIterator<E>
575
}
576
577
/**
578
* Channel for sending frames
579
*/
580
interface SendChannel<in E> {
581
val isClosedForSend: Boolean
582
583
suspend fun send(element: E)
584
fun trySend(element: E): ChannelResult<Unit>
585
fun close(cause: Throwable? = null): Boolean
586
587
suspend fun invokeOnClose(handler: (cause: Throwable?) -> Unit)
588
}
589
```