or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

content-handling.md · core-client.md · engine-configuration.md · index.md · plugin-system.md · request-building.md · response-processing.md · server-sent-events.md · websocket-support.md
tile.json

docs/websocket-support.md

WebSocket Support

The WebSocket client provides session management, frame handling, and connection lifecycle control. It offers full-duplex communication with WebSocket servers using text, binary, and control (close, ping, pong) frames.

Capabilities

WebSocket Connection Functions

Functions for establishing WebSocket connections with various configuration options.

/**
 * Connect to a WebSocket server and execute [block] with the resulting session as receiver.
 *
 * This overload addresses the server by its individual components.
 *
 * @param method HTTP method used for the handshake request (defaults to [HttpMethod.Get])
 * @param host server host name (defaults to `"localhost"`)
 * @param port server port (defaults to `DEFAULT_PORT`)
 * @param path request path (defaults to `"/"`)
 * @param request additional configuration applied to the handshake request (no-op by default)
 * @param block code run against the [DefaultClientWebSocketSession]
 * @return a value of type [T] — presumably the result of [block]; confirm against the implementation
 */
suspend fun <T> HttpClient.webSocket(
    method: HttpMethod = HttpMethod.Get,
    host: String = "localhost",
    port: Int = DEFAULT_PORT,
    path: String = "/",
    request: HttpRequestBuilder.() -> Unit = {},
    block: suspend DefaultClientWebSocketSession.() -> T
): T

/**
 * Same as above, with the server addressed by a URL string.
 *
 * @param urlString target URL given as a string
 * @param request additional configuration applied to the handshake request (no-op by default)
 * @param block code run against the [DefaultClientWebSocketSession]
 */
suspend fun <T> HttpClient.webSocket(
    urlString: String,
    request: HttpRequestBuilder.() -> Unit = {},
    block: suspend DefaultClientWebSocketSession.() -> T
): T

/**
 * Same as above, with the server addressed by a [Url] value.
 *
 * @param url target URL
 * @param request additional configuration applied to the handshake request (no-op by default)
 * @param block code run against the [DefaultClientWebSocketSession]
 */
suspend fun <T> HttpClient.webSocket(
    url: Url,
    request: HttpRequestBuilder.() -> Unit = {},
    block: suspend DefaultClientWebSocketSession.() -> T
): T

/**
 * Create a WebSocket session without automatic connection management.
 *
 * The session is returned to the caller instead of being scoped to a block.
 * NOTE(review): the caller is presumably responsible for closing the returned session — confirm.
 *
 * @param method HTTP method used for the handshake request (defaults to [HttpMethod.Get])
 * @param host server host name (defaults to `"localhost"`)
 * @param port server port (defaults to `DEFAULT_PORT`)
 * @param path request path (defaults to `"/"`)
 * @param request additional configuration applied to the handshake request (no-op by default)
 * @return the created [DefaultClientWebSocketSession]
 */
suspend fun HttpClient.webSocketSession(
    method: HttpMethod = HttpMethod.Get,
    host: String = "localhost",
    port: Int = DEFAULT_PORT,
    path: String = "/",
    request: HttpRequestBuilder.() -> Unit = {}
): DefaultClientWebSocketSession

/**
 * Same as above, with the server addressed by a URL string.
 *
 * @param urlString target URL given as a string
 * @param request additional configuration applied to the handshake request (no-op by default)
 */
suspend fun HttpClient.webSocketSession(
    urlString: String,
    request: HttpRequestBuilder.() -> Unit = {}
): DefaultClientWebSocketSession

/**
 * Same as above, with the server addressed by a [Url] value.
 *
 * @param url target URL
 * @param request additional configuration applied to the handshake request (no-op by default)
 */
suspend fun HttpClient.webSocketSession(
    url: Url,
    request: HttpRequestBuilder.() -> Unit = {}
): DefaultClientWebSocketSession

WebSocket Session Interface

Session interface providing frame-based communication capabilities.

/**
 * Default client WebSocket session with frame handling.
 *
 * Extends [ClientWebSocketSession] with convenience send functions, the close
 * reason, and explicit lifecycle control ([close], [flush], [terminate]).
 * Members that already exist on a parent interface are marked `override`, as
 * Kotlin requires when redeclaring an inherited member.
 */
interface DefaultClientWebSocketSession : ClientWebSocketSession {
    /** Incoming frames channel */
    override val incoming: ReceiveChannel<Frame>

    /** Outgoing frames channel */
    override val outgoing: SendChannel<Frame>

    /** Session call information */
    override val call: HttpClientCall

    /** Close reason when the session is terminated; the awaited value may be `null` */
    val closeReason: Deferred<CloseReason?>

    /**
     * Send a frame to the server
     *
     * @param frame frame to send
     */
    suspend fun send(frame: Frame)

    /**
     * Send text message
     *
     * @param content text to send
     */
    suspend fun send(content: String)

    /**
     * Send binary data
     *
     * @param content bytes to send
     */
    suspend fun send(content: ByteArray)

    /**
     * Close the WebSocket connection
     *
     * @param reason close reason reported to the peer; defaults to a normal closure with an empty message
     */
    suspend fun close(reason: CloseReason = CloseReason(CloseReason.Codes.NORMAL, ""))

    /**
     * Flush outgoing frames
     */
    suspend fun flush()

    /**
     * Terminate the session immediately
     */
    fun terminate()
}

/**
 * Base client WebSocket session interface.
 *
 * A [WebSocketSession] that additionally exposes the HTTP [call] it is associated with.
 */
interface ClientWebSocketSession : WebSocketSession {
    /** Associated HTTP call */
    val call: HttpClientCall
}

/**
 * Base WebSocket session interface.
 *
 * Exposes the frame channels of a connection together with its mutable
 * per-session settings.
 */
interface WebSocketSession {
    /** Channel from which incoming frames are received */
    val incoming: ReceiveChannel<Frame>
    
    /** Channel to which outgoing frames are sent */
    val outgoing: SendChannel<Frame>
    
    /** Maximum frame size (presumably in bytes, as for `WebSocketConfig.maxFrameSize` — confirm) */
    var maxFrameSize: Long
    
    /** Masking flag (presumably whether outgoing frames are masked — confirm) */
    var masking: Boolean
    
    /** Content converter, or `null` when none is set */
    val contentConverter: WebsocketContentConverter?
}

WebSocket Frames

Frame types for different kinds of WebSocket messages.

/**
 * Base WebSocket frame.
 *
 * Each nested class models one frame kind: [Text], [Binary], [Close], [Ping] and [Pong].
 */
sealed class Frame {
    /** Frame is final (not fragmented) */
    abstract val fin: Boolean

    /** Reserved flags (RSV1–RSV3 header bits) */
    abstract val rsv1: Boolean
    abstract val rsv2: Boolean
    abstract val rsv3: Boolean

    /** Frame data (raw payload bytes) */
    abstract val data: ByteArray

    /**
     * Dispose frame resources.
     *
     * No-op by default: the frame kinds declared here hold nothing besides
     * [data], so they do not need to override it.
     */
    open fun dispose() {}

    /**
     * Text frame containing UTF-8 encoded string
     */
    class Text(
        override val fin: Boolean = true,
        // `override` is required: this property implements the abstract [Frame.data].
        override val data: ByteArray,
        override val rsv1: Boolean = false,
        override val rsv2: Boolean = false,
        override val rsv3: Boolean = false
    ) : Frame() {
        /** Create a final text frame whose payload is [text] encoded as UTF-8. */
        constructor(text: String) : this(fin = true, data = text.toByteArray(Charsets.UTF_8))

        /** Get text content by decoding [data] as UTF-8 */
        fun readText(): String = data.toString(Charsets.UTF_8)
    }

    /**
     * Binary frame containing raw bytes
     */
    class Binary(
        override val fin: Boolean = true,
        override val data: ByteArray,
        override val rsv1: Boolean = false,
        override val rsv2: Boolean = false,
        override val rsv3: Boolean = false
    ) : Frame()

    /**
     * Close frame with optional reason
     *
     * @param reason close reason carried by the frame, or `null` for a close frame with an empty payload
     */
    class Close(
        val reason: CloseReason? = null
    ) : Frame() {
        override val fin: Boolean = true

        // Close payload layout (RFC 6455, section 5.5.1): a 2-byte big-endian
        // status code followed by the UTF-8 encoded reason text.
        override val data: ByteArray = reason?.let { closeReason ->
            val code = closeReason.code.toInt()
            byteArrayOf((code shr 8).toByte(), code.toByte()) +
                closeReason.message.toByteArray(Charsets.UTF_8)
        } ?: ByteArray(0)
        override val rsv1: Boolean = false
        override val rsv2: Boolean = false
        override val rsv3: Boolean = false
    }

    /**
     * Ping frame for connection keep-alive
     */
    class Ping(
        override val data: ByteArray
    ) : Frame() {
        /** Create a ping whose payload is [data] encoded as UTF-8 (empty by default). */
        constructor(data: String = "") : this(data.toByteArray(Charsets.UTF_8))

        override val fin: Boolean = true
        override val rsv1: Boolean = false
        override val rsv2: Boolean = false
        override val rsv3: Boolean = false
    }

    /**
     * Pong frame in response to ping
     */
    class Pong(
        override val data: ByteArray,
        override val rsv1: Boolean = false,
        override val rsv2: Boolean = false,
        override val rsv3: Boolean = false
    ) : Frame() {
        /** Create a pong whose payload is [data] encoded as UTF-8 (empty by default). */
        constructor(data: String = "") : this(data.toByteArray(Charsets.UTF_8))

        override val fin: Boolean = true
    }
}

Close Reasons

Close reason codes and handling for WebSocket connections.

/**
 * WebSocket close reason.
 *
 * @property code numeric status code; the well-known values are declared in [Codes]
 * @property message textual explanation accompanying the code
 */
data class CloseReason(
    val code: Short,
    val message: String
) {
    /** Well-known close status codes (values 1000–1015). */
    companion object Codes {
        /** Normal closure */
        const val NORMAL: Short = 1000
        
        /** Going away */
        const val GOING_AWAY: Short = 1001
        
        /** Protocol error */
        const val PROTOCOL_ERROR: Short = 1002
        
        /** Cannot accept the received data type */
        const val CANNOT_ACCEPT: Short = 1003
        
        /** Reserved */
        const val RESERVED: Short = 1004
        
        /** No status received */
        const val NO_STATUS: Short = 1005
        
        /** Closed abnormally */
        const val CLOSED_ABNORMALLY: Short = 1006
        
        /** Invalid data (payload not consistent with the message type) */
        const val NOT_CONSISTENT: Short = 1007
        
        /** Policy violation */
        const val VIOLATED_POLICY: Short = 1008
        
        /** Message too big */
        const val TOO_BIG: Short = 1009
        
        /** Extension required */
        const val NO_EXTENSION: Short = 1010
        
        /** Internal server error */
        const val INTERNAL_ERROR: Short = 1011
        
        /** Service restart */
        const val SERVICE_RESTART: Short = 1012
        
        /** Try again later */
        const val TRY_AGAIN_LATER: Short = 1013
        
        /** Bad gateway */
        const val BAD_GATEWAY: Short = 1014
        
        /** TLS handshake failure */
        const val TLS_HANDSHAKE_FAILURE: Short = 1015
    }
}

WebSocket Plugin

Plugin for configuring WebSocket functionality.

/**
 * WebSocket plugin for client configuration.
 *
 * Installed on a client with `install(WebSockets) { ... }`, where the lambda
 * configures a [WebSocketConfig].
 */
object WebSockets : HttpClientPlugin<WebSocketConfig, WebSocketConfig> {
    /** Attribute key identifying this plugin */
    override val key: AttributeKey<WebSocketConfig>
}

/**
 * WebSocket configuration.
 *
 * Mutable settings applied when the [WebSockets] plugin is installed.
 */
class WebSocketConfig {
    /** Maximum frame size in bytes; defaults to [Long.MAX_VALUE] */
    var maxFrameSize: Long = Long.MAX_VALUE
    
    /** Enable/disable frame masking; enabled by default */
    var masking: Boolean = true
    
    /** Ping interval for keep-alive; `null` by default (presumably meaning no automatic pings — confirm) */
    var pingInterval: Duration? = null
    
    /** Content converter for serialization; `null` by default */
    var contentConverter: WebsocketContentConverter? = null
    
    /** Custom extensions; empty by default */
    val extensions: MutableList<WebSocketExtension<*>> = mutableListOf()
}

Content Conversion

Content conversion for serializing/deserializing WebSocket messages.

/**
 * WebSocket content converter interface.
 *
 * Converts between typed values and WebSocket frames.
 */
interface WebsocketContentConverter {
    /**
     * Check if the converter can handle the given frame
     *
     * @param frame frame to examine
     * @return `true` when this converter is applicable to [frame]
     */
    fun isApplicable(frame: Frame): Boolean
    
    /**
     * Deserialize frame to object
     *
     * @param charset character set used for the conversion
     * @param typeInfo description of the requested result type
     * @param content frame to deserialize
     * @return the deserialized value, which may be `null`
     */
    suspend fun deserialize(charset: Charset, typeInfo: TypeInfo, content: Frame): Any?
    
    /**
     * Serialize object to frame
     *
     * @param charset character set used for the conversion
     * @param typeInfo description of the type of [value]
     * @param value value to serialize; may be `null`
     * @return the resulting frame, or `null` (the condition for `null` is not visible here — confirm)
     */
    suspend fun serializeNullable(charset: Charset, typeInfo: TypeInfo, value: Any?): Frame?
}

/**
 * Send typed data through WebSocket.
 *
 * NOTE(review): presumably serializes [data] with the session's content converter — confirm.
 *
 * @param data value of type [T] to send
 */
suspend inline fun <reified T> DefaultClientWebSocketSession.sendSerialized(data: T)

/**
 * Receive typed data from WebSocket.
 *
 * NOTE(review): presumably deserializes an incoming frame with the session's content converter — confirm.
 *
 * @return the received value as [T]
 */
suspend inline fun <reified T> DefaultClientWebSocketSession.receiveDeserialized(): T

Usage Examples:

import io.ktor.client.*
import io.ktor.client.plugins.websocket.*
import io.ktor.websocket.*
import kotlin.time.Duration.Companion.seconds
import kotlinx.coroutines.*

// Client with the WebSockets plugin installed and configured
val client = HttpClient {
    install(WebSockets) {
        maxFrameSize = Long.MAX_VALUE
        masking = false
        // kotlin.time duration; `Duration.ofSeconds` belongs to java.time and is not imported here
        pingInterval = 30.seconds
    }
}

// Basic WebSocket communication: send one text and one binary message,
// then react to every frame that arrives until a Close frame is seen.
client.webSocket(host = "echo.websocket.org", port = 80, path = "/") {
    send("Hello, WebSocket!")           // text message
    send(byteArrayOf(1, 2, 3, 4, 5))    // binary message

    for (received in incoming) {
        when (received) {
            is Frame.Close -> {
                val closeReason = received.reason
                println("Connection closed: ${closeReason?.code} - ${closeReason?.message}")
                break
            }
            is Frame.Text -> {
                val payload = received.readText()
                println("Received text: $payload")
                send("Echo: $payload")  // echo back
            }
            is Frame.Binary -> println("Received binary data: ${received.data.size} bytes")
            // Pong is automatically sent by the framework
            is Frame.Ping -> println("Received ping")
            is Frame.Pong -> println("Received pong")
        }
    }
}

// WebSocket with custom headers and authentication
client.webSocket(
    urlString = "wss://api.example.com/websocket",
    // The request builder is passed as a regular (named) argument: only the
    // last lambda — the session block — may use trailing-lambda syntax.
    request = {
        headers {
            append("Authorization", "Bearer your-token-here")
            append("X-Client-Version", "1.0.0")
        }
    }
) {
    // Connection established with custom headers

    // Send JSON-like message
    send("""{"type": "subscribe", "channel": "updates"}""")

    // Handle incoming messages. Iterating the channel ends the loop normally
    // once it is closed, whereas `incoming.receive()` would throw
    // ClosedReceiveChannelException at that point.
    for (frame in incoming) {
        if (frame !is Frame.Text) continue

        val message = frame.readText()
        println("Received: $message")

        // Parse and handle different message types
        when {
            message.contains("\"type\":\"heartbeat\"") -> {
                send("""{"type": "pong"}""")
            }
            message.contains("\"type\":\"data\"") -> {
                // Handle data message
                println("Processing data message")
            }
            message.contains("\"type\":\"error\"") -> {
                println("Server error: $message")
                break
            }
        }
    }
}

// Manual session management: the session is obtained directly instead of
// being scoped to a block, so it is closed explicitly by the sender below.
val session = client.webSocketSession("ws://localhost:8080/ws")

// Use session
launch {
    // Sender coroutine: ten messages, one per second, then a normal close
    repeat(10) { i ->
        session.send("Message $i")
        delay(1000)
    }
    session.close(CloseReason(CloseReason.Codes.NORMAL, "Done sending"))
}

launch {
    // Receiver coroutine
    try {
        for (frame in session.incoming) {
            when (frame) {
                is Frame.Text -> println("Received: ${frame.readText()}")
                is Frame.Close -> {
                    println("Session closed")
                    break
                }
                else -> Unit
            }
        }
    } catch (e: CancellationException) {
        // Cancellation is not a receive error: rethrow so the coroutine
        // completes as cancelled instead of reporting a failure.
        throw e
    } catch (e: Exception) {
        println("Error in receiver: ${e.message}")
    }
}

// Wait for close reason
val closeReason = session.closeReason.await()
println("Session closed with reason: $closeReason")

// WebSocket with ping/pong handling
client.webSocket("ws://example.com/realtime") {
    // Ping sender: keep a handle on the job so it can be stopped as soon as
    // the message loop below finishes.
    val pinger = launch {
        while (true) {
            delay(30000) // 30 seconds
            try {
                send(Frame.Ping("keep-alive".toByteArray()))
            } catch (e: CancellationException) {
                // Propagate cancellation instead of reporting it as a failed ping
                throw e
            } catch (e: Exception) {
                println("Failed to send ping: ${e.message}")
                break
            }
        }
    }

    // Message handler
    try {
        for (frame in incoming) {
            when (frame) {
                is Frame.Text -> {
                    val text = frame.readText()
                    println("Message: $text")
                }
                is Frame.Pong -> {
                    println("Pong received - connection alive")
                }
                is Frame.Close -> {
                    println("Connection closed by server")
                    break
                }
                else -> Unit
            }
        }
    } finally {
        // Without this the ping coroutine would stay suspended in delay()
        // after the connection is no longer being read.
        pinger.cancel()
    }
}

// Graceful connection handling with error recovery

/**
 * Connect to the stream endpoint, retrying with a linearly growing delay on failure.
 *
 * Returns as soon as one session has run to completion without throwing.
 * Cancellation is never treated as a failed attempt: it is rethrown immediately.
 *
 * @param maxRetries maximum number of connection attempts (defaults to 3)
 * @throws Exception the failure of the last attempt, once all attempts are used up
 */
suspend fun connectWithRetry(maxRetries: Int = 3) {
    repeat(maxRetries) { attempt ->
        try {
            client.webSocket("wss://api.example.com/stream") {
                println("Connected successfully (attempt ${attempt + 1})")

                // Connection logic here
                for (frame in incoming) {
                    // Handle frames
                    if (frame is Frame.Text) {
                        println("Data: ${frame.readText()}")
                    }
                }
            }
            return // Success, exit retry loop
        } catch (e: CancellationException) {
            // A cancelled caller must not trigger another connection attempt
            throw e
        } catch (e: Exception) {
            println("Connection attempt ${attempt + 1} failed: ${e.message}")
            if (attempt == maxRetries - 1) {
                println("All connection attempts failed")
                throw e
            }
            // Linear backoff: 1 s after the first failure, 2 s after the second, ...
            // (named so that it does not shadow the delay() function)
            val backoffMillis = (attempt + 1) * 1000L
            println("Retrying in ${backoffMillis}ms...")
            delay(backoffMillis)
        }
    }
}

// Use the retry function
connectWithRetry()

// Close the client once it is no longer needed
client.close()

WebSocket Extensions

Support for WebSocket extensions like compression.

/**
 * Base WebSocket extension.
 *
 * @param ConfigType type of the configuration object the extension is created from
 */
abstract class WebSocketExtension<ConfigType : Any> {
    /** Extension name */
    abstract val name: String
    
    /** Factory for creating extension instances */
    abstract val factory: WebSocketExtensionFactory<ConfigType, out WebSocketExtension<ConfigType>>
    
    /** Extension protocols, expressed as a list of extension headers */
    abstract val protocols: List<WebSocketExtensionHeader>
}

/**
 * WebSocket extension factory.
 *
 * @param ConfigType type of the configuration object passed to [install]
 * @param ExtensionType type of the extension produced by [install]
 */
interface WebSocketExtensionFactory<ConfigType : Any, ExtensionType : WebSocketExtension<ConfigType>> {
    /** Extension key */
    val key: AttributeKey<ExtensionType>
    
    /** NOTE(review): presumably the reserved (RSV) frame bits this extension uses, judging by the `WebSocketRsv` type — confirm */
    val rsv: WebSocketRsv
    
    /**
     * Create extension instance
     *
     * @param config configuration for the new extension
     * @return the created extension
     */
    fun install(config: ConfigType): ExtensionType
}

/**
 * WebSocket extension header.
 *
 * @property name extension name
 * @property parameters parameters attached to the extension; empty by default
 */
data class WebSocketExtensionHeader(
    val name: String,
    val parameters: List<WebSocketExtensionParameter> = emptyList()
)

/**
 * Extension parameter.
 *
 * @property name parameter name
 * @property value parameter value, or `null` (the default) when the parameter has no value
 */
data class WebSocketExtensionParameter(
    val name: String,
    val value: String? = null
)