CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-io-ktor--ktor-client-core-tvosarm64

Ktor HTTP Client Core for tvOS ARM64 - multiplatform asynchronous HTTP client library with coroutines support

Pending
Overview
Eval results
Files

websockets.mddocs/

WebSocket Support

The Ktor HTTP Client Core provides comprehensive WebSocket client functionality through the WebSockets plugin. This enables establishing and managing WebSocket connections with session management, ping/pong handling, message serialization, and full integration with Ktor's coroutine-based architecture.

Core WebSocket API

WebSockets Plugin

The main plugin for WebSocket client functionality that handles connection establishment and session management.

object WebSockets : HttpClientPlugin<WebSockets.Config, WebSockets> {
    class Config {
        var pingInterval: Duration? = null
        var maxFrameSize: Long = Long.MAX_VALUE
        var masking: Boolean = true
        var extensions: MutableList<WebSocketExtension<*>> = mutableListOf()
        
        fun pingInterval(duration: Duration)
        fun maxFrameSize(size: Long)
        fun masking(enabled: Boolean)
        fun extensions(vararg extensions: WebSocketExtension<*>)
    }
}

ClientWebSocketSession

Interface representing a client-side WebSocket session with full duplex communication capabilities.

interface ClientWebSocketSession : WebSocketSession {
    val call: HttpClientCall
    
    // Inherited from WebSocketSession
    val incoming: ReceiveChannel<Frame>
    val outgoing: SendChannel<Frame>
    val closeReason: Deferred<CloseReason?>
    val extensions: List<WebSocketExtension<*>>
    
    // Send text message
    suspend fun send(data: String)
    
    // Send binary message  
    suspend fun send(data: ByteArray)
    
    // Send frame
    suspend fun send(frame: Frame)
    
    // Close connection
    suspend fun close(reason: CloseReason? = null)
}

DefaultClientWebSocketSession

Default implementation of ClientWebSocketSession with standard WebSocket protocol handling.

class DefaultClientWebSocketSession(
    private val call: HttpClientCall,
    delegate: WebSocketSession
) : ClientWebSocketSession, WebSocketSession by delegate {
    override val call: HttpClientCall get() = call
}

WebSocket Connection Builders

Basic WebSocket Connection

Extension functions for establishing WebSocket connections with various configuration options.

// Basic WebSocket connection
suspend fun HttpClient.webSocket(
    method: HttpMethod = HttpMethod.Get,
    host: String? = null,
    port: Int? = null, 
    path: String? = null,
    request: HttpRequestBuilder.() -> Unit = {},
    block: suspend ClientWebSocketSession.() -> Unit
)

suspend fun HttpClient.webSocket(
    url: String,
    request: HttpRequestBuilder.() -> Unit = {},
    block: suspend ClientWebSocketSession.() -> Unit  
)

suspend fun HttpClient.webSocket(
    url: Url,
    request: HttpRequestBuilder.() -> Unit = {},
    block: suspend ClientWebSocketSession.() -> Unit
)

// Secure WebSocket connection (WSS)
suspend fun HttpClient.wss(
    method: HttpMethod = HttpMethod.Get,
    host: String? = null,
    port: Int? = null,
    path: String? = null,
    request: HttpRequestBuilder.() -> Unit = {},
    block: suspend ClientWebSocketSession.() -> Unit
)

// WebSocket session without automatic connection handling
suspend fun HttpClient.webSocketSession(
    method: HttpMethod = HttpMethod.Get,
    host: String? = null, 
    port: Int? = null,
    path: String? = null,
    request: HttpRequestBuilder.() -> Unit = {}
): ClientWebSocketSession

suspend fun HttpClient.webSocketSession(
    url: String,
    request: HttpRequestBuilder.() -> Unit = {}
): ClientWebSocketSession

suspend fun HttpClient.ws(
    method: HttpMethod = HttpMethod.Get,
    host: String? = null,
    port: Int? = null,
    path: String? = null,
    request: HttpRequestBuilder.() -> Unit = {},
    block: suspend ClientWebSocketSession.() -> Unit
)

Frame Types

WebSocket Frame Hierarchy

sealed class Frame {
    abstract val fin: Boolean
    abstract val data: ByteArray
    
    data class Text(val data: ByteArray, override val fin: Boolean = true) : Frame() {
        constructor(text: String) : this(text.toByteArray(Charsets.UTF_8))
        fun readText(): String = data.toString(Charsets.UTF_8)
    }
    
    data class Binary(override val data: ByteArray, override val fin: Boolean = true) : Frame()
    
    data class Close(val data: ByteArray) : Frame() {
        constructor(reason: CloseReason) : this(reason.toByteArray())
        fun readReason(): CloseReason? = CloseReason.parse(data)
    }
    
    data class Ping(override val data: ByteArray) : Frame()
    data class Pong(override val data: ByteArray) : Frame()
}

data class CloseReason(val code: Short, val message: String) {
    companion object {
        val NORMAL = CloseReason(1000, "Normal Closure")
        val GOING_AWAY = CloseReason(1001, "Going Away")
        val PROTOCOL_ERROR = CloseReason(1002, "Protocol Error")
        val CANNOT_ACCEPT = CloseReason(1003, "Cannot Accept")
        val NOT_CONSISTENT = CloseReason(1007, "Not Consistent")
        val VIOLATED_POLICY = CloseReason(1008, "Violated Policy")
        val TOO_BIG = CloseReason(1009, "Too Big")
        val NO_EXTENSION = CloseReason(1010, "No Extension")
        val INTERNAL_ERROR = CloseReason(1011, "Internal Error")
        val SERVICE_RESTART = CloseReason(1012, "Service Restart")
        val TRY_AGAIN_LATER = CloseReason(1013, "Try Again Later")
    }
}

Basic Usage

Simple WebSocket Communication

val client = HttpClient {
    install(WebSockets)
}

client.webSocket("wss://echo.websocket.org") {
    // Send a message
    send("Hello WebSocket!")
    
    // Receive messages
    for (frame in incoming) {
        when (frame) {
            is Frame.Text -> {
                val receivedText = frame.readText()
                println("Received: $receivedText")
                
                if (receivedText == "Hello WebSocket!") {
                    send("Thanks for the echo!")
                }
            }
            is Frame.Close -> {
                println("Connection closed: ${frame.readReason()}")
                break
            }
            else -> {
                // Handle other frame types
            }
        }
    }
}

client.close()

Bidirectional Communication

val client = HttpClient {
    install(WebSockets) {
        pingInterval = 20.seconds
        maxFrameSize = 1024 * 1024 // 1MB
    }
}

client.webSocket("ws://localhost:8080/chat") {
    // Launch coroutine for sending messages
    launch {
        repeat(10) { i ->
            send("Message $i")
            delay(1000)
        }
        close(CloseReason.NORMAL)
    }
    
    // Receive messages on main coroutine
    for (frame in incoming) {
        when (frame) {
            is Frame.Text -> {
                println("Server: ${frame.readText()}")
            }
            is Frame.Binary -> {
                println("Received binary data: ${frame.data.size} bytes")
            }
            is Frame.Close -> {
                println("Connection closed by server")
                break
            }
            is Frame.Ping -> {
                // Pong is sent automatically
                println("Received ping")
            }
            is Frame.Pong -> {
                println("Received pong")
            }
        }
    }
}

Secure WebSocket (WSS)

val client = HttpClient {
    install(WebSockets)
}

client.wss(
    host = "secure-chat.example.com",
    port = 443,
    path = "/websocket"
) {
    // Configure headers for authentication
    request {
        header("Authorization", "Bearer $accessToken")
        header("X-Client-Version", "1.0")
    }
    
    block = {
        // WebSocket communication
        send("Secure message")
        
        for (frame in incoming) {
            when (frame) {
                is Frame.Text -> println("Secure: ${frame.readText()}")
                is Frame.Close -> break
                else -> { /* handle other frames */ }
            }
        }
    }
}

Advanced WebSocket Features

Custom WebSocket Extensions

class CompressionExtension : WebSocketExtension<CompressionExtension.Config> {
    class Config {
        var compressionLevel: Int = 6
        var windowBits: Int = 15
    }
    
    override val factory: WebSocketExtensionFactory<Config, CompressionExtension>
        get() = TODO("Implement extension factory")
    
    override val protocols: List<WebSocketExtensionHeader>
        get() = listOf(WebSocketExtensionHeader("deflate-frame"))
}

val client = HttpClient {
    install(WebSockets) {
        extensions(CompressionExtension())
    }
}

Connection Management

class WebSocketManager {
    private val client = HttpClient {
        install(WebSockets) {
            pingInterval = 30.seconds
        }
    }
    
    private var session: ClientWebSocketSession? = null
    private var reconnectJob: Job? = null
    
    suspend fun connect(url: String): Boolean {
        return try {
            session = client.webSocketSession(url) {
                header("Authorization", "Bearer $token")
            }
            
            // Start message handling
            launch {
                handleIncomingMessages()
            }
            
            true
        } catch (e: Exception) {
            println("Connection failed: ${e.message}")
            scheduleReconnect(url)
            false
        }
    }
    
    private suspend fun handleIncomingMessages() {
        try {
            session?.let { session ->
                for (frame in session.incoming) {
                    when (frame) {
                        is Frame.Text -> processMessage(frame.readText())
                        is Frame.Close -> {
                            println("Connection closed: ${frame.readReason()}")
                            break
                        }
                        else -> { /* handle other frames */ }
                    }
                }
            }
        } catch (e: Exception) {
            println("Message handling error: ${e.message}")
        }
    }
    
    private fun scheduleReconnect(url: String) {
        reconnectJob?.cancel()
        reconnectJob = CoroutineScope(Dispatchers.IO).launch {
            delay(5000) // Wait 5 seconds before reconnecting
            connect(url)
        }
    }
    
    suspend fun sendMessage(text: String): Boolean {
        return try {
            session?.send(text)
            true
        } catch (e: Exception) {
            println("Send failed: ${e.message}")
            false
        }
    }
    
    suspend fun disconnect() {
        reconnectJob?.cancel()
        session?.close(CloseReason.NORMAL)
        session = null
        client.close()
    }
    
    private fun processMessage(message: String) {
        // Process incoming message
        println("Received: $message")
    }
}

Message Serialization

// JSON message serialization
suspend fun ClientWebSocketSession.sendJson(data: Any) {
    val json = Json.encodeToString(data)
    send(json)
}

suspend inline fun <reified T> ClientWebSocketSession.receiveJson(): T? {
    for (frame in incoming) {
        when (frame) {
            is Frame.Text -> {
                return Json.decodeFromString<T>(frame.readText())
            }
            is Frame.Close -> return null
            else -> continue
        }
    }
    return null
}

// Usage
data class ChatMessage(val user: String, val text: String, val timestamp: Long)

client.webSocket("ws://chat.example.com") {
    // Send JSON message
    sendJson(ChatMessage("Alice", "Hello!", System.currentTimeMillis()))
    
    // Receive JSON message
    val message: ChatMessage? = receiveJson()
    message?.let { println("${it.user}: ${it.text}") }
}

Binary Data Handling

client.webSocket("ws://binary-service.example.com") {
    // Send binary data
    val imageData = File("image.png").readBytes()
    send(imageData)
    
    // Receive binary data
    for (frame in incoming) {
        when (frame) {
            is Frame.Binary -> {
                val data = frame.data
                File("received-${System.currentTimeMillis()}.bin").writeBytes(data)
                println("Received ${data.size} bytes of binary data")
            }
            is Frame.Close -> break
            else -> { /* handle other frames */ }
        }
    }
}

Error Handling and Recovery

Connection Error Handling

val client = HttpClient {
    install(WebSockets) {
        pingInterval = 10.seconds
    }
}

suspend fun connectWithRetry(url: String, maxRetries: Int = 3) {
    repeat(maxRetries) { attempt ->
        try {
            client.webSocket(url) {
                println("Connected successfully on attempt ${attempt + 1}")
                
                // Handle connection
                for (frame in incoming) {
                    when (frame) {
                        is Frame.Text -> println(frame.readText())
                        is Frame.Close -> {
                            println("Connection closed normally")
                            return@webSocket
                        }
                        else -> { /* handle other frames */ }
                    }
                }
            }
            return // Success, exit retry loop
        } catch (e: ConnectTimeoutException) {
            println("Connection timeout on attempt ${attempt + 1}")
            if (attempt == maxRetries - 1) throw e
            delay(2000 * (attempt + 1)) // Exponential backoff
        } catch (e: Exception) {
            println("Connection error on attempt ${attempt + 1}: ${e.message}")
            if (attempt == maxRetries - 1) throw e
            delay(1000)
        }
    }
}

Graceful Disconnection

client.webSocket("ws://example.com/socket") {
    try {
        // WebSocket communication
        for (frame in incoming) {
            when (frame) {
                is Frame.Text -> {
                    val message = frame.readText()
                    if (message == "shutdown") {
                        send("Acknowledged shutdown")
                        close(CloseReason.NORMAL)
                        break
                    }
                    // Process other messages
                }
                is Frame.Close -> {
                    println("Server closed connection: ${frame.readReason()}")
                    break
                }
                else -> { /* handle other frames */ }
            }
        }
    } catch (e: Exception) {
        println("WebSocket error: ${e.message}")
        // Attempt graceful close
        try {
            close(CloseReason(1011, "Internal Error"))
        } catch (closeException: Exception) {
            println("Failed to close gracefully: ${closeException.message}")
        }
    } finally {
        println("WebSocket session ended")
    }
}

WebSocket Testing

Mock WebSocket Server

class MockWebSocketServer {
    private val responses = mutableListOf<String>()
    
    fun addResponse(response: String) {
        responses.add(response)
    }
    
    suspend fun simulate(session: ClientWebSocketSession) {
        // Simulate server responses
        responses.forEach { response ->
            session.send(response)
            delay(100)
        }
        session.close(CloseReason.NORMAL)
    }
}

// Test WebSocket client
suspend fun testWebSocketClient() {
    val client = HttpClient {
        install(WebSockets)
    }
    
    // In actual tests, this would be a real test server
    client.webSocket("ws://test-server:8080/test") {
        send("ping")
        
        val response = incoming.receive()
        if (response is Frame.Text) {
            assertEquals("pong", response.readText())
        }
    }
    
    client.close()
}

Performance Optimization

Connection Pooling

class WebSocketPool(private val maxConnections: Int = 10) {
    private val availableConnections = Channel<ClientWebSocketSession>(maxConnections)
    private val client = HttpClient {
        install(WebSockets) {
            pingInterval = 30.seconds
        }
    }
    
    suspend fun borrowConnection(url: String): ClientWebSocketSession {
        return availableConnections.tryReceive().getOrNull()
            ?: client.webSocketSession(url)
    }
    
    suspend fun returnConnection(session: ClientWebSocketSession) {
        if (!session.closeReason.isCompleted) {
            availableConnections.trySend(session)
        }
    }
    
    fun close() {
        client.close()
    }
}

Batched Message Sending

class BatchedWebSocketSender(private val session: ClientWebSocketSession) {
    private val messageQueue = Channel<String>(Channel.UNLIMITED)
    private val batchSize = 10
    private val flushInterval = 100L // milliseconds
    
    init {
        CoroutineScope(Dispatchers.IO).launch {
            processBatches()
        }
    }
    
    suspend fun sendMessage(message: String) {
        messageQueue.send(message)
    }
    
    private suspend fun processBatches() {
        val batch = mutableListOf<String>()
        
        while (!messageQueue.isClosedForReceive) {
            // Collect messages for batch
            val timeout = withTimeoutOrNull(flushInterval) {
                repeat(batchSize) {
                    batch.add(messageQueue.receive())
                }
            }
            
            // Send batch if we have messages
            if (batch.isNotEmpty()) {
                val combinedMessage = batch.joinToString("\n")
                session.send(combinedMessage)
                batch.clear()
            }
        }
    }
}

Best Practices

  1. Handle all frame types: Always process Text, Binary, Close, Ping, and Pong frames appropriately
  2. Implement reconnection logic: Network connections can be unreliable, implement automatic reconnection
  3. Use heartbeats: Configure ping intervals to detect connection issues early
  4. Graceful shutdown: Always close connections properly with appropriate close reasons
  5. Error handling: Wrap WebSocket operations in try-catch blocks for robust error handling
  6. Message size limits: Be aware of frame size limits and implement chunking for large messages
  7. Authentication: Include proper authentication headers when establishing connections
  8. Resource cleanup: Always close clients and sessions to prevent resource leaks
  9. Concurrent access: Be careful with concurrent access to WebSocket sessions, they are not thread-safe
  10. Protocol negotiation: Use WebSocket subprotocols for structured communication protocols

Install with Tessl CLI

npx tessl i tessl/maven-io-ktor--ktor-client-core-tvosarm64

docs

builtin-plugins.md

caching.md

cookies.md

engine-configuration.md

forms.md

http-client.md

index.md

plugin-system.md

request-building.md

response-handling.md

response-observation.md

utilities.md

websockets.md

tile.json