CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/gradle-io-ktor--ktor-client-core-jvm

Multiplatform asynchronous HTTP client core library for JVM that provides request/response handling, plugin architecture, and extensible HTTP communication capabilities.

Pending
Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Security by Snyk

Pending

The risk profile of this skill

Overview
Eval results
Files

docs/websocket-support.md

WebSocket Support

WebSocket client functionality with session management, frame handling, and connection lifecycle. The WebSocket implementation provides full-duplex communication with servers supporting text, binary, and control frames.

Capabilities

WebSocket Connection Functions

Functions for establishing WebSocket connections with various configuration options.

/**
 * Connect to WebSocket server and execute a block with the session.
 *
 * @param method HTTP method of the request; defaults to [HttpMethod.Get]
 * @param host target host; defaults to "localhost"
 * @param port target port; defaults to DEFAULT_PORT
 * @param path request path; defaults to "/"
 * @param request extra configuration applied to the [HttpRequestBuilder]; defaults to no-op
 * @param block code run with the [DefaultClientWebSocketSession] as its receiver
 * @return the value of type [T] produced by [block]
 */
suspend fun <T> HttpClient.webSocket(
    method: HttpMethod = HttpMethod.Get,
    host: String = "localhost",
    port: Int = DEFAULT_PORT,
    path: String = "/",
    request: HttpRequestBuilder.() -> Unit = {},
    block: suspend DefaultClientWebSocketSession.() -> T
): T

/**
 * Overload of `webSocket` that takes the target as a URL string.
 *
 * @param urlString target URL in string form
 * @param request extra configuration applied to the [HttpRequestBuilder]; defaults to no-op
 * @param block code run with the [DefaultClientWebSocketSession] as its receiver
 * @return the value of type [T] produced by [block]
 */
suspend fun <T> HttpClient.webSocket(
    urlString: String,
    request: HttpRequestBuilder.() -> Unit = {},
    block: suspend DefaultClientWebSocketSession.() -> T
): T

/**
 * Overload of `webSocket` that takes the target as a [Url].
 *
 * @param url target URL
 * @param request extra configuration applied to the [HttpRequestBuilder]; defaults to no-op
 * @param block code run with the [DefaultClientWebSocketSession] as its receiver
 * @return the value of type [T] produced by [block]
 */
suspend fun <T> HttpClient.webSocket(
    url: Url,
    request: HttpRequestBuilder.() -> Unit = {},
    block: suspend DefaultClientWebSocketSession.() -> T
): T

/**
 * Create WebSocket session without automatic connection management.
 *
 * Unlike `webSocket`, this takes no block and hands the session back to the caller.
 *
 * @param method HTTP method of the request; defaults to [HttpMethod.Get]
 * @param host target host; defaults to "localhost"
 * @param port target port; defaults to DEFAULT_PORT
 * @param path request path; defaults to "/"
 * @param request extra configuration applied to the [HttpRequestBuilder]; defaults to no-op
 * @return the created [DefaultClientWebSocketSession]
 */
suspend fun HttpClient.webSocketSession(
    method: HttpMethod = HttpMethod.Get,
    host: String = "localhost",
    port: Int = DEFAULT_PORT,
    path: String = "/",
    request: HttpRequestBuilder.() -> Unit = {}
): DefaultClientWebSocketSession

/**
 * Overload of `webSocketSession` that takes the target as a URL string.
 *
 * @param urlString target URL in string form
 * @param request extra configuration applied to the [HttpRequestBuilder]; defaults to no-op
 * @return the created [DefaultClientWebSocketSession]
 */
suspend fun HttpClient.webSocketSession(
    urlString: String,
    request: HttpRequestBuilder.() -> Unit = {}
): DefaultClientWebSocketSession

/**
 * Overload of `webSocketSession` that takes the target as a [Url].
 *
 * @param url target URL
 * @param request extra configuration applied to the [HttpRequestBuilder]; defaults to no-op
 * @return the created [DefaultClientWebSocketSession]
 */
suspend fun HttpClient.webSocketSession(
    url: Url,
    request: HttpRequestBuilder.() -> Unit = {}
): DefaultClientWebSocketSession

WebSocket Session Interface

Session interface providing frame-based communication capabilities.

/**
 * Default client WebSocket session with frame handling.
 *
 * The `incoming`, `outgoing` and `call` members are already declared by the
 * supertypes, so they are marked `override` here.
 */
interface DefaultClientWebSocketSession : ClientWebSocketSession {
    /** Incoming frames channel */
    override val incoming: ReceiveChannel<Frame>

    /** Outgoing frames channel */
    override val outgoing: SendChannel<Frame>

    /** Session call information */
    override val call: HttpClientCall

    /** Close reason when session is terminated */
    val closeReason: Deferred<CloseReason?>

    /**
     * Send a frame to the server
     *
     * @param frame frame to send
     */
    suspend fun send(frame: Frame)

    /**
     * Send text message
     *
     * @param content text to send
     */
    suspend fun send(content: String)

    /**
     * Send binary data
     *
     * @param content bytes to send
     */
    suspend fun send(content: ByteArray)

    /**
     * Close the WebSocket connection
     *
     * @param reason close reason; defaults to the NORMAL code with an empty message
     */
    suspend fun close(reason: CloseReason = CloseReason(CloseReason.Codes.NORMAL, ""))

    /**
     * Flush outgoing frames
     */
    suspend fun flush()

    /**
     * Terminate the session immediately
     */
    fun terminate()
}

/**
 * Base client WebSocket session interface.
 *
 * Extends [WebSocketSession] with the HTTP call the session belongs to.
 */
interface ClientWebSocketSession : WebSocketSession {
    /** Associated HTTP call */
    val call: HttpClientCall
}

/**
 * Base WebSocket session interface.
 *
 * Exposes the frame channels plus the mutable per-session settings
 * (`maxFrameSize`, `masking`) and an optional content converter.
 */
interface WebSocketSession {
    /** Channel of incoming frames */
    val incoming: ReceiveChannel<Frame>
    
    /** Channel of outgoing frames */
    val outgoing: SendChannel<Frame>
    
    /** Maximum frame size (mutable) */
    var maxFrameSize: Long
    
    /** Masking flag (mutable) */
    var masking: Boolean
    
    /** Content converter, or null when none is set */
    val contentConverter: WebsocketContentConverter?
}

WebSocket Frames

Frame types for different kinds of WebSocket messages.

/**
 * Base WebSocket frame.
 *
 * Every concrete frame type supplies the `fin` flag, the three reserved
 * flags and the payload bytes declared here.
 */
sealed class Frame {
    /** Frame is final (not fragmented) */
    abstract val fin: Boolean

    /** Reserved flags */
    abstract val rsv1: Boolean
    abstract val rsv2: Boolean
    abstract val rsv3: Boolean

    /** Frame data */
    abstract val data: ByteArray

    /** Dispose frame resources */
    abstract fun dispose()

    /**
     * Text frame containing UTF-8 encoded string
     */
    class Text(
        override val fin: Boolean = true,
        // `override` is required: this property implements the abstract Frame.data
        override val data: ByteArray,
        override val rsv1: Boolean = false,
        override val rsv2: Boolean = false,
        override val rsv3: Boolean = false
    ) : Frame() {
        /** Build a final text frame from [text], encoded as UTF-8 */
        constructor(text: String) : this(fin = true, data = text.toByteArray(Charsets.UTF_8))

        /** Get text content, decoding the payload as UTF-8 */
        fun readText(): String = data.toString(Charsets.UTF_8)
    }

    /**
     * Binary frame containing raw bytes
     */
    class Binary(
        override val fin: Boolean = true,
        override val data: ByteArray,
        override val rsv1: Boolean = false,
        override val rsv2: Boolean = false,
        override val rsv3: Boolean = false
    ) : Frame()

    /**
     * Close frame with optional reason
     */
    class Close(
        val reason: CloseReason? = null
    ) : Frame() {
        override val fin: Boolean = true
        // Payload is empty when no reason is given; the encoding itself is elided in this listing
        override val data: ByteArray = reason?.let { /* encode reason */ } ?: ByteArray(0)
        override val rsv1: Boolean = false
        override val rsv2: Boolean = false
        override val rsv3: Boolean = false
    }

    /**
     * Ping frame for connection keep-alive
     */
    class Ping(
        override val data: ByteArray
    ) : Frame() {
        /** Build a ping frame from a string payload (UTF-8); defaults to an empty payload */
        constructor(data: String = "") : this(data.toByteArray(Charsets.UTF_8))

        override val fin: Boolean = true
        override val rsv1: Boolean = false
        override val rsv2: Boolean = false
        override val rsv3: Boolean = false
    }

    /**
     * Pong frame in response to ping
     */
    class Pong(
        override val data: ByteArray,
        override val rsv1: Boolean = false,
        override val rsv2: Boolean = false,
        override val rsv3: Boolean = false
    ) : Frame() {
        /** Build a pong frame from a string payload (UTF-8); defaults to an empty payload */
        constructor(data: String = "") : this(data.toByteArray(Charsets.UTF_8))

        override val fin: Boolean = true
    }
}

Close Reasons

Close reason codes and handling for WebSocket connections.

/**
 * Reason attached to the closing of a WebSocket connection.
 *
 * @property code numeric close code (see [Codes] for the named values)
 * @property message text accompanying the code
 */
data class CloseReason(val code: Short, val message: String) {
    /** Named close codes, 1000 through 1015. */
    companion object Codes {
        /** 1000: normal closure */
        const val NORMAL: Short = 1000

        /** 1001: going away */
        const val GOING_AWAY: Short = 1001

        /** 1002: protocol error */
        const val PROTOCOL_ERROR: Short = 1002

        /** 1003: cannot accept data type */
        const val CANNOT_ACCEPT: Short = 1003

        /** 1004: reserved */
        const val RESERVED: Short = 1004

        /** 1005: no status received */
        const val NO_STATUS: Short = 1005

        /** 1006: closed abnormally */
        const val CLOSED_ABNORMALLY: Short = 1006

        /** 1007: invalid data */
        const val NOT_CONSISTENT: Short = 1007

        /** 1008: policy violation */
        const val VIOLATED_POLICY: Short = 1008

        /** 1009: message too big */
        const val TOO_BIG: Short = 1009

        /** 1010: extension required */
        const val NO_EXTENSION: Short = 1010

        /** 1011: internal server error */
        const val INTERNAL_ERROR: Short = 1011

        /** 1012: service restart */
        const val SERVICE_RESTART: Short = 1012

        /** 1013: try again later */
        const val TRY_AGAIN_LATER: Short = 1013

        /** 1014: bad gateway */
        const val BAD_GATEWAY: Short = 1014

        /** 1015: TLS handshake failure */
        const val TLS_HANDSHAKE_FAILURE: Short = 1015
    }
}

WebSocket Plugin

Plugin for configuring WebSocket functionality.

/**
 * WebSocket plugin for client configuration.
 *
 * Installed on an `HttpClient`; both type arguments of [HttpClientPlugin] are
 * [WebSocketConfig] in this listing.
 */
object WebSockets : HttpClientPlugin<WebSocketConfig, WebSocketConfig> {
    /** Attribute key identifying this plugin */
    override val key: AttributeKey<WebSocketConfig>
}

/**
 * WebSocket configuration.
 *
 * Mutable settings holder; each property below shows its default.
 */
class WebSocketConfig {
    /** Maximum frame size in bytes; defaults to Long.MAX_VALUE */
    var maxFrameSize: Long = Long.MAX_VALUE
    
    /** Enable/disable frame masking; enabled by default */
    var masking: Boolean = true
    
    /** Ping interval for keep-alive; null by default */
    var pingInterval: Duration? = null
    
    /** Content converter for serialization; null by default */
    var contentConverter: WebsocketContentConverter? = null
    
    /** Custom extensions; starts empty */
    val extensions: MutableList<WebSocketExtension<*>> = mutableListOf()
}

Content Conversion

Content conversion for serializing/deserializing WebSocket messages.

/**
 * WebSocket content converter interface.
 *
 * Converts between frames and typed values.
 */
interface WebsocketContentConverter {
    /**
     * Check whether this converter can handle the given frame
     *
     * @param frame frame to inspect
     */
    fun isApplicable(frame: Frame): Boolean
    
    /**
     * Deserialize frame to object
     *
     * @param charset charset used for the conversion
     * @param typeInfo description of the target type
     * @param content frame to deserialize
     * @return the deserialized value, which may be null
     */
    suspend fun deserialize(charset: Charset, typeInfo: TypeInfo, content: Frame): Any?
    
    /**
     * Serialize object to frame
     *
     * @param charset charset used for the conversion
     * @param typeInfo description of the value's type
     * @param value value to serialize; may be null
     * @return the resulting frame, which may be null
     */
    suspend fun serializeNullable(charset: Charset, typeInfo: TypeInfo, value: Any?): Frame?
}

/**
 * Send typed data through WebSocket.
 *
 * @param data value of the reified type [T] to send
 */
suspend inline fun <reified T> DefaultClientWebSocketSession.sendSerialized(data: T)

/**
 * Receive typed data from WebSocket.
 *
 * @return the received value as the reified type [T]
 */
suspend inline fun <reified T> DefaultClientWebSocketSession.receiveDeserialized(): T

Usage Examples:

import io.ktor.client.*
import io.ktor.client.plugins.websocket.*
import io.ktor.client.request.*
import io.ktor.websocket.*
import kotlin.time.Duration.Companion.seconds
import kotlinx.coroutines.*

// Shared client with the WebSockets plugin installed
val client = HttpClient {
    install(WebSockets) {
        maxFrameSize = Long.MAX_VALUE
        masking = false
        // Assumes pingInterval is a kotlin.time.Duration (confirm against the Ktor version in use);
        // `seconds` comes from kotlin.time.Duration.Companion
        pingInterval = 30.seconds
    }
}

// Basic WebSocket communication
client.webSocket(host = "echo.websocket.org", port = 80, path = "/") {
    // Outbound: one text message, then one binary message
    send("Hello, WebSocket!")
    send(byteArrayOf(1, 2, 3, 4, 5))

    // Inbound: consume frames until a Close frame arrives or the channel ends
    for (incomingFrame in incoming) {
        when (incomingFrame) {
            is Frame.Text -> {
                val received = incomingFrame.readText()
                println("Received text: $received")

                // Reply with the same text, prefixed
                send("Echo: $received")
            }
            is Frame.Binary -> println("Received binary data: ${incomingFrame.data.size} bytes")
            is Frame.Close -> {
                val why = incomingFrame.reason
                println("Connection closed: ${why?.code} - ${why?.message}")
                break
            }
            // Pong is automatically sent by the framework
            is Frame.Ping -> println("Received ping")
            is Frame.Pong -> println("Received pong")
        }
    }
}

// WebSocket with custom headers and authentication.
// Only one lambda may trail the call, so `request` is passed as a named
// argument and the session block is the single trailing lambda.
client.webSocket(
    urlString = "wss://api.example.com/websocket",
    request = {
        headers {
            append("Authorization", "Bearer your-token-here")
            append("X-Client-Version", "1.0.0")
        }
    },
) {
    // Connection established with custom headers

    // Send JSON-like message
    send("""{"type": "subscribe", "channel": "updates"}""")

    // Handle incoming messages. Iterating the channel ends the loop when the
    // channel is closed, instead of throwing from receive().
    for (frame in incoming) {
        if (frame !is Frame.Text) continue

        val message = frame.readText()
        println("Received: $message")

        // Parse and handle different message types
        when {
            message.contains("\"type\":\"heartbeat\"") -> {
                send("""{"type": "pong"}""")
            }
            message.contains("\"type\":\"data\"") -> {
                // Handle data message
                println("Processing data message")
            }
            message.contains("\"type\":\"error\"") -> {
                println("Server error: $message")
                break
            }
        }
    }
}

// Manual session management: the caller owns the session returned here
val session = client.webSocketSession("ws://localhost:8080/ws")

// Sender coroutine: ten messages one second apart, then a normal close
launch {
    repeat(10) { i ->
        session.send("Message $i")
        delay(1000)
    }
    session.close(CloseReason(CloseReason.Codes.NORMAL, "Done sending"))
}

// Receiver coroutine: print text frames until a Close frame or the channel ends
launch {
    try {
        for (frame in session.incoming) {
            when (frame) {
                is Frame.Text -> println("Received: ${frame.readText()}")
                is Frame.Close -> {
                    println("Session closed")
                    break
                }
                else -> Unit
            }
        }
    } catch (e: CancellationException) {
        // Cancellation is not an error; let it propagate
        throw e
    } catch (e: Exception) {
        println("Error in receiver: ${e.message}")
    }
}

// Wait for close reason
val closeReason = session.closeReason.await()
println("Session closed with reason: $closeReason")

// WebSocket with ping/pong handling
client.webSocket("ws://example.com/realtime") {
    // Ping sender: one ping every 30 seconds until a send fails or the job is cancelled
    val pinger = launch {
        while (true) {
            delay(30000) // 30 seconds
            try {
                send(Frame.Ping("keep-alive".toByteArray()))
            } catch (e: CancellationException) {
                // Cancellation is not a send failure; let it propagate
                throw e
            } catch (e: Exception) {
                println("Failed to send ping: ${e.message}")
                break
            }
        }
    }

    // Message handler
    try {
        for (frame in incoming) {
            when (frame) {
                is Frame.Text -> {
                    val text = frame.readText()
                    println("Message: $text")
                }
                is Frame.Pong -> {
                    println("Pong received - connection alive")
                }
                is Frame.Close -> {
                    println("Connection closed by server")
                    break
                }
                else -> Unit
            }
        }
    } finally {
        // Stop pinging once the message loop is over, however it ended
        pinger.cancel()
    }
}

// Graceful connection handling with error recovery
/**
 * Connects to the stream endpoint, retrying on failure.
 *
 * Waits (attempt number x 1000) ms between attempts. Returns when a session
 * ends without an exception. Cancellation is never retried.
 *
 * @param maxRetries maximum number of connection attempts
 * @throws Exception the failure of the last attempt, once all attempts are used up
 */
suspend fun connectWithRetry(maxRetries: Int = 3) {
    repeat(maxRetries) { attempt ->
        try {
            client.webSocket("wss://api.example.com/stream") {
                println("Connected successfully (attempt ${attempt + 1})")

                // Connection logic here
                for (frame in incoming) {
                    // Handle frames
                    if (frame is Frame.Text) {
                        println("Data: ${frame.readText()}")
                    }
                }
            }
            return // Success, exit retry loop
        } catch (e: CancellationException) {
            // The caller cancelled us: propagate instead of reconnecting
            throw e
        } catch (e: Exception) {
            println("Connection attempt ${attempt + 1} failed: ${e.message}")
            if (attempt < maxRetries - 1) {
                // Named so it does not shadow the delay() function
                val backoffMillis = (attempt + 1) * 1000L
                println("Retrying in ${backoffMillis}ms...")
                delay(backoffMillis)
            } else {
                println("All connection attempts failed")
                throw e
            }
        }
    }
}

// Use the retry function (default of 3 attempts)
connectWithRetry()

// Close the client once the examples are done with it
client.close()

WebSocket Extensions

Support for WebSocket extensions like compression.

/**
 * Base WebSocket extension.
 *
 * @param ConfigType configuration type of the extension
 */
abstract class WebSocketExtension<ConfigType : Any> {
    /** Extension name */
    abstract val name: String
    
    /** Factory for creating extension instances */
    abstract val factory: WebSocketExtensionFactory<ConfigType, out WebSocketExtension<ConfigType>>
    
    /** Extension protocols, expressed as extension headers */
    abstract val protocols: List<WebSocketExtensionHeader>
}

/**
 * WebSocket extension factory.
 *
 * @param ConfigType configuration type accepted by [install]
 * @param ExtensionType extension type produced by [install]
 */
interface WebSocketExtensionFactory<ConfigType : Any, ExtensionType : WebSocketExtension<ConfigType>> {
    /** Extension key */
    val key: AttributeKey<ExtensionType>
    
    /** Reserved (RSV) bits associated with the extension. NOTE(review): confirm against WebSocketRsv */
    val rsv: WebSocketRsv
    
    /** Create extension instance from the given configuration */
    fun install(config: ConfigType): ExtensionType
}

/**
 * WebSocket extension header.
 *
 * @property name extension name
 * @property parameters parameters attached to the extension; empty by default
 */
data class WebSocketExtensionHeader(
    val name: String,
    val parameters: List<WebSocketExtensionParameter> = emptyList()
)

/**
 * A single parameter of a WebSocket extension.
 *
 * @property name parameter name
 * @property value parameter value, or null (the default) when the parameter has none
 */
data class WebSocketExtensionParameter(val name: String, val value: String? = null)

docs

content-handling.md

core-client.md

engine-configuration.md

index.md

plugin-system.md

request-building.md

response-processing.md

server-sent-events.md

websocket-support.md

tile.json