WebSocket client functionality with session management, frame handling, and connection lifecycle. The WebSocket implementation provides full-duplex communication with servers supporting text, binary, and control frames.
Functions for establishing WebSocket connections with various configuration options.
/**
* Connect to WebSocket server and execute a block with the session
*/
suspend fun <T> HttpClient.webSocket(
method: HttpMethod = HttpMethod.Get,
host: String = "localhost",
port: Int = DEFAULT_PORT,
path: String = "/",
request: HttpRequestBuilder.() -> Unit = {},
block: suspend DefaultClientWebSocketSession.() -> T
): T
suspend fun <T> HttpClient.webSocket(
urlString: String,
request: HttpRequestBuilder.() -> Unit = {},
block: suspend DefaultClientWebSocketSession.() -> T
): T
suspend fun <T> HttpClient.webSocket(
url: Url,
request: HttpRequestBuilder.() -> Unit = {},
block: suspend DefaultClientWebSocketSession.() -> T
): T
/**
* Create WebSocket session without automatic connection management
*/
suspend fun HttpClient.webSocketSession(
method: HttpMethod = HttpMethod.Get,
host: String = "localhost",
port: Int = DEFAULT_PORT,
path: String = "/",
request: HttpRequestBuilder.() -> Unit = {}
): DefaultClientWebSocketSession
suspend fun HttpClient.webSocketSession(
urlString: String,
request: HttpRequestBuilder.() -> Unit = {}
): DefaultClientWebSocketSession
suspend fun HttpClient.webSocketSession(
url: Url,
request: HttpRequestBuilder.() -> Unit = {}
): DefaultClientWebSocketSessionSession interface providing frame-based communication capabilities.
/**
* Default client WebSocket session with frame handling
*/
interface DefaultClientWebSocketSession : ClientWebSocketSession {
/** Incoming frames channel */
val incoming: ReceiveChannel<Frame>
/** Outgoing frames channel */
val outgoing: SendChannel<Frame>
/** Session call information */
val call: HttpClientCall
/** Close reason when session is terminated */
val closeReason: Deferred<CloseReason?>
/**
* Send a frame to the server
*/
suspend fun send(frame: Frame)
/**
* Send text message
*/
suspend fun send(content: String)
/**
* Send binary data
*/
suspend fun send(content: ByteArray)
/**
* Close the WebSocket connection
*/
suspend fun close(reason: CloseReason = CloseReason(CloseReason.Codes.NORMAL, ""))
/**
* Flush outgoing frames
*/
suspend fun flush()
/**
* Terminate the session immediately
*/
fun terminate()
}
/**
* Base client WebSocket session interface
*/
interface ClientWebSocketSession : WebSocketSession {
/** Associated HTTP call */
val call: HttpClientCall
}
/**
* Base WebSocket session interface
*/
interface WebSocketSession {
/** Incoming frames */
val incoming: ReceiveChannel<Frame>
/** Outgoing frames */
val outgoing: SendChannel<Frame>
/** Maximum frame size */
var maxFrameSize: Long
/** Masking flag */
var masking: Boolean
/** Content converter */
val contentConverter: WebsocketContentConverter?
}Frame types for different kinds of WebSocket messages.
/**
* Base WebSocket frame
*/
sealed class Frame {
/** Frame is final (not fragmented) */
abstract val fin: Boolean
/** Reserved flags */
abstract val rsv1: Boolean
abstract val rsv2: Boolean
abstract val rsv3: Boolean
/** Frame data */
abstract val data: ByteArray
/** Dispose frame resources */
abstract fun dispose()
/**
* Text frame containing UTF-8 encoded string
*/
class Text(
override val fin: Boolean = true,
val data: ByteArray,
override val rsv1: Boolean = false,
override val rsv2: Boolean = false,
override val rsv3: Boolean = false
) : Frame() {
constructor(text: String) : this(fin = true, data = text.toByteArray(Charsets.UTF_8))
/** Get text content */
fun readText(): String = data.toString(Charsets.UTF_8)
}
/**
* Binary frame containing raw bytes
*/
class Binary(
override val fin: Boolean = true,
override val data: ByteArray,
override val rsv1: Boolean = false,
override val rsv2: Boolean = false,
override val rsv3: Boolean = false
) : Frame()
/**
* Close frame with optional reason
*/
class Close(
val reason: CloseReason? = null
) : Frame() {
override val fin: Boolean = true
override val data: ByteArray = reason?.let { /* encode reason */ } ?: ByteArray(0)
override val rsv1: Boolean = false
override val rsv2: Boolean = false
override val rsv3: Boolean = false
}
/**
* Ping frame for connection keep-alive
*/
class Ping(
override val data: ByteArray
) : Frame() {
constructor(data: String = "") : this(data.toByteArray(Charsets.UTF_8))
override val fin: Boolean = true
override val rsv1: Boolean = false
override val rsv2: Boolean = false
override val rsv3: Boolean = false
}
/**
* Pong frame in response to ping
*/
class Pong(
override val data: ByteArray,
override val rsv1: Boolean = false,
override val rsv2: Boolean = false,
override val rsv3: Boolean = false
) : Frame() {
constructor(data: String = "") : this(data.toByteArray(Charsets.UTF_8))
override val fin: Boolean = true
}
}Close reason codes and handling for WebSocket connections.
/**
* WebSocket close reason
*/
data class CloseReason(
val code: Short,
val message: String
) {
companion object Codes {
/** Normal closure */
const val NORMAL: Short = 1000
/** Going away */
const val GOING_AWAY: Short = 1001
/** Protocol error */
const val PROTOCOL_ERROR: Short = 1002
/** Cannot accept data type */
const val CANNOT_ACCEPT: Short = 1003
/** Reserved */
const val RESERVED: Short = 1004
/** No status received */
const val NO_STATUS: Short = 1005
/** Closed abnormally */
const val CLOSED_ABNORMALLY: Short = 1006
/** Invalid data */
const val NOT_CONSISTENT: Short = 1007
/** Policy violation */
const val VIOLATED_POLICY: Short = 1008
/** Message too big */
const val TOO_BIG: Short = 1009
/** Extension required */
const val NO_EXTENSION: Short = 1010
/** Internal server error */
const val INTERNAL_ERROR: Short = 1011
/** Service restart */
const val SERVICE_RESTART: Short = 1012
/** Try again later */
const val TRY_AGAIN_LATER: Short = 1013
/** Bad gateway */
const val BAD_GATEWAY: Short = 1014
/** TLS handshake failure */
const val TLS_HANDSHAKE_FAILURE: Short = 1015
}
}Plugin for configuring WebSocket functionality.
/**
* WebSocket plugin for client configuration
*/
object WebSockets : HttpClientPlugin<WebSocketConfig, WebSocketConfig> {
override val key: AttributeKey<WebSocketConfig>
}
/**
* WebSocket configuration
*/
class WebSocketConfig {
/** Maximum frame size in bytes */
var maxFrameSize: Long = Long.MAX_VALUE
/** Enable/disable frame masking */
var masking: Boolean = true
/** Ping interval for keep-alive */
var pingInterval: Duration? = null
/** Content converter for serialization */
var contentConverter: WebsocketContentConverter? = null
/** Custom extensions */
val extensions: MutableList<WebSocketExtension<*>> = mutableListOf()
}Content conversion for serializing/deserializing WebSocket messages.
/**
* WebSocket content converter interface
*/
interface WebsocketContentConverter {
/**
* Check if converter can handle the type
*/
fun isApplicable(frame: Frame): Boolean
/**
* Deserialize frame to object
*/
suspend fun deserialize(charset: Charset, typeInfo: TypeInfo, content: Frame): Any?
/**
* Serialize object to frame
*/
suspend fun serializeNullable(charset: Charset, typeInfo: TypeInfo, value: Any?): Frame?
}
/**
* Send typed data through WebSocket
*/
suspend inline fun <reified T> DefaultClientWebSocketSession.sendSerialized(data: T)
/**
* Receive typed data from WebSocket
*/
suspend inline fun <reified T> DefaultClientWebSocketSession.receiveDeserialized(): TUsage Examples:
import io.ktor.client.*
import io.ktor.client.plugins.websocket.*
import io.ktor.websocket.*
import kotlinx.coroutines.*
val client = HttpClient {
install(WebSockets) {
maxFrameSize = Long.MAX_VALUE
masking = false
pingInterval = Duration.ofSeconds(30)
}
}
// Basic WebSocket communication
client.webSocket(host = "echo.websocket.org", port = 80, path = "/") {
// Send text message
send("Hello, WebSocket!")
// Send binary data
send(byteArrayOf(1, 2, 3, 4, 5))
// Receive messages
for (frame in incoming) {
when (frame) {
is Frame.Text -> {
val text = frame.readText()
println("Received text: $text")
// Echo back
send("Echo: $text")
}
is Frame.Binary -> {
println("Received binary data: ${frame.data.size} bytes")
}
is Frame.Close -> {
val reason = frame.reason
println("Connection closed: ${reason?.code} - ${reason?.message}")
break
}
is Frame.Ping -> {
println("Received ping")
// Pong is automatically sent by the framework
}
is Frame.Pong -> {
println("Received pong")
}
}
}
}
// WebSocket with custom headers and authentication
client.webSocket("wss://api.example.com/websocket") {
headers {
append("Authorization", "Bearer your-token-here")
append("X-Client-Version", "1.0.0")
}
} {
// Connection established with custom headers
// Send JSON-like message
send("""{"type": "subscribe", "channel": "updates"}""")
// Handle incoming messages
while (true) {
val frame = incoming.receive()
if (frame is Frame.Text) {
val message = frame.readText()
println("Received: $message")
// Parse and handle different message types
when {
message.contains("\"type\":\"heartbeat\"") -> {
send("""{"type": "pong"}""")
}
message.contains("\"type\":\"data\"") -> {
// Handle data message
println("Processing data message")
}
message.contains("\"type\":\"error\"") -> {
println("Server error: $message")
break
}
}
}
}
}
// Manual session management
val session = client.webSocketSession("ws://localhost:8080/ws")
// Use session
launch {
// Sender coroutine
repeat(10) { i ->
session.send("Message $i")
delay(1000)
}
session.close(CloseReason(CloseReason.Codes.NORMAL, "Done sending"))
}
launch {
// Receiver coroutine
try {
for (frame in session.incoming) {
when (frame) {
is Frame.Text -> println("Received: ${frame.readText()}")
is Frame.Close -> {
println("Session closed")
break
}
else -> Unit
}
}
} catch (e: Exception) {
println("Error in receiver: ${e.message}")
}
}
// Wait for close reason
val closeReason = session.closeReason.await()
println("Session closed with reason: $closeReason")
// WebSocket with ping/pong handling
client.webSocket("ws://example.com/realtime") {
launch {
// Ping sender
while (true) {
delay(30000) // 30 seconds
try {
send(Frame.Ping("keep-alive".toByteArray()))
} catch (e: Exception) {
println("Failed to send ping: ${e.message}")
break
}
}
}
// Message handler
for (frame in incoming) {
when (frame) {
is Frame.Text -> {
val text = frame.readText()
println("Message: $text")
}
is Frame.Pong -> {
println("Pong received - connection alive")
}
is Frame.Close -> {
println("Connection closed by server")
break
}
else -> Unit
}
}
}
// Graceful connection handling with error recovery
suspend fun connectWithRetry(maxRetries: Int = 3) {
repeat(maxRetries) { attempt ->
try {
client.webSocket("wss://api.example.com/stream") {
println("Connected successfully (attempt ${attempt + 1})")
// Connection logic here
for (frame in incoming) {
// Handle frames
if (frame is Frame.Text) {
println("Data: ${frame.readText()}")
}
}
}
return // Success, exit retry loop
} catch (e: Exception) {
println("Connection attempt ${attempt + 1} failed: ${e.message}")
if (attempt < maxRetries - 1) {
val delay = (attempt + 1) * 1000L
println("Retrying in ${delay}ms...")
delay(delay)
} else {
println("All connection attempts failed")
throw e
}
}
}
}
// Use the retry function
connectWithRetry()
client.close()Support for WebSocket extensions like compression.
/**
* Base WebSocket extension
*/
abstract class WebSocketExtension<ConfigType : Any> {
/** Extension name */
abstract val name: String
/** Factory for creating extension instances */
abstract val factory: WebSocketExtensionFactory<ConfigType, out WebSocketExtension<ConfigType>>
/** Extension protocols */
abstract val protocols: List<WebSocketExtensionHeader>
}
/**
* WebSocket extension factory
*/
interface WebSocketExtensionFactory<ConfigType : Any, ExtensionType : WebSocketExtension<ConfigType>> {
/** Extension key */
val key: AttributeKey<ExtensionType>
/** Supported extension names */
val rsv: WebSocketRsv
/** Create extension instance */
fun install(config: ConfigType): ExtensionType
}
/**
* WebSocket extension header
*/
data class WebSocketExtensionHeader(
val name: String,
val parameters: List<WebSocketExtensionParameter> = emptyList()
)
/**
* Extension parameter
*/
data class WebSocketExtensionParameter(
val name: String,
val value: String? = null
)