Ktor HTTP client core library providing asynchronous HTTP client capabilities for Kotlin multiplatform applications
—
Full WebSocket client implementation with session management, message handling, connection lifecycle, and support for both text and binary messaging with automatic connection management.
Core WebSocket plugin for HTTP client.
/**
* WebSocket plugin for HTTP client
*/
class WebSockets private constructor() {
companion object Plugin : HttpClientPlugin<Config, WebSockets> {
override val key: AttributeKey<WebSockets>
}
/**
* WebSocket plugin configuration
*/
class Config {
/** Ping interval in milliseconds */
var pingInterval: Long? = null
/** Connection timeout in milliseconds */
var timeout: Long = 15000
/** Maximum frame size */
var maxFrameSize: Long = Long.MAX_VALUE
/** Content converter for message serialization */
var contentConverter: WebsocketContentConverter? = null
}
}Establish WebSocket connections with various configuration options.
/**
* Establish WebSocket connection
* @param urlString - WebSocket URL (ws:// or wss://)
* @param block - WebSocket session handling block
*/
suspend fun HttpClient.webSocket(
urlString: String,
block: suspend ClientWebSocketSession.() -> Unit
)
suspend fun HttpClient.webSocket(
method: HttpMethod = HttpMethod.Get,
host: String = "localhost",
port: Int = DEFAULT_PORT,
path: String = "/",
block: suspend ClientWebSocketSession.() -> Unit
)
suspend fun HttpClient.webSocket(
block: HttpRequestBuilder.() -> Unit,
sessionBlock: suspend ClientWebSocketSession.() -> Unit
)
/**
* Establish secure WebSocket connection (wss://)
*/
suspend fun HttpClient.wss(
urlString: String,
block: suspend ClientWebSocketSession.() -> Unit
)
/**
* Establish insecure WebSocket connection (ws://)
*/
suspend fun HttpClient.ws(
urlString: String,
block: suspend ClientWebSocketSession.() -> Unit
)Usage Examples:
import io.ktor.client.plugins.websocket.*
import io.ktor.websocket.*
val client = HttpClient {
install(WebSockets) {
pingInterval = 20000
timeout = 15000
maxFrameSize = Long.MAX_VALUE
}
}
// Simple WebSocket connection
client.webSocket("wss://echo.websocket.org") {
// Send text message
send("Hello, WebSocket!")
// Receive messages
for (frame in incoming) {
when (frame) {
is Frame.Text -> {
val text = frame.readText()
println("Received: $text")
}
is Frame.Binary -> {
val bytes = frame.readBytes()
println("Received binary: ${bytes.size} bytes")
}
is Frame.Close -> {
println("Connection closed: ${frame.readReason()}")
break
}
}
}
}
// WebSocket with custom headers
client.webSocket({
url("wss://api.example.com/socket")
header("Authorization", "Bearer token123")
parameter("room", "chat-room-1")
}) { session ->
// Handle WebSocket session
session.send("JOIN room")
for (frame in incoming) {
// Process incoming frames
}
}WebSocket session interface for client-side connections.
/**
* Client-side WebSocket session
*/
interface ClientWebSocketSession : WebSocketSession {
/** Associated HTTP client call */
val call: HttpClientCall
}
/**
* Default implementation of client WebSocket session
*/
class DefaultClientWebSocketSession(
private val call: HttpClientCall,
delegate: WebSocketSession
) : ClientWebSocketSession, WebSocketSession by delegate
/**
* Base WebSocket session interface
*/
interface WebSocketSession : CoroutineScope {
/** Incoming frames channel */
val incoming: ReceiveChannel<Frame>
/** Outgoing frames channel */
val outgoing: SendChannel<Frame>
/** WebSocket extensions */
val extensions: List<WebSocketExtension<*>>
/** Session close reason */
val closeReason: Deferred<CloseReason?>
/** Send text frame */
suspend fun send(content: String)
/** Send binary frame */
suspend fun send(content: ByteArray)
/** Send frame */
suspend fun send(frame: Frame)
/** Flush outgoing frames */
suspend fun flush()
/** Close the WebSocket connection */
suspend fun close(reason: CloseReason? = null)
/** Terminate the WebSocket connection immediately */
fun terminate()
}Handle different types of WebSocket frames.
/**
* WebSocket frame types
*/
sealed class Frame {
/** Text frame containing UTF-8 text */
class Text(
val data: ByteArray,
val fin: Boolean = true,
val rsv1: Boolean = false,
val rsv2: Boolean = false,
val rsv3: Boolean = false
) : Frame() {
/** Read frame content as text */
fun readText(): String
/** Copy frame data */
fun copy(): ByteArray
}
/** Binary frame containing raw bytes */
class Binary(
val data: ByteArray,
val fin: Boolean = true,
val rsv1: Boolean = false,
val rsv2: Boolean = false,
val rsv3: Boolean = false
) : Frame() {
/** Read frame content as bytes */
fun readBytes(): ByteArray
/** Copy frame data */
fun copy(): ByteArray
}
/** Close frame with optional reason */
class Close(
val data: ByteArray = byteArrayOf()
) : Frame() {
/** Read close reason */
fun readReason(): CloseReason?
}
/** Ping frame for keep-alive */
class Ping(
val data: ByteArray
) : Frame()
/** Pong frame in response to ping */
class Pong(
val data: ByteArray
) : Frame()
}
/**
* WebSocket close reason
*/
data class CloseReason(
val code: Short,
val message: String
) {
companion object {
val NORMAL = CloseReason(1000, "Normal closure")
val GOING_AWAY = CloseReason(1001, "Going away")
val PROTOCOL_ERROR = CloseReason(1002, "Protocol error")
val CANNOT_ACCEPT = CloseReason(1003, "Cannot accept")
val NOT_CONSISTENT = CloseReason(1007, "Not consistent")
val VIOLATED_POLICY = CloseReason(1008, "Violated policy")
val TOO_BIG = CloseReason(1009, "Too big")
val NO_EXTENSION = CloseReason(1010, "No extension")
val INTERNAL_ERROR = CloseReason(1011, "Internal error")
val SERVICE_RESTART = CloseReason(1012, "Service restart")
val TRY_AGAIN_LATER = CloseReason(1013, "Try again later")
val TLS_HANDSHAKE_FAILED = CloseReason(1015, "TLS handshake failed")
}
}Usage Examples:
client.webSocket("wss://api.example.com/chat") {
// Send different types of frames
send("Hello, chat!") // Text frame
send(byteArrayOf(1, 2, 3, 4)) // Binary frame
// Send custom frames
send(Frame.Ping(byteArrayOf()))
// Handle incoming frames
for (frame in incoming) {
when (frame) {
is Frame.Text -> {
val message = frame.readText()
println("Chat message: $message")
// Echo back
send("Echo: $message")
}
is Frame.Binary -> {
val data = frame.readBytes()
println("Binary data: ${data.size} bytes")
// Process binary data
processBinaryData(data)
}
is Frame.Close -> {
val reason = frame.readReason()
println("Connection closed: ${reason?.message}")
break
}
is Frame.Ping -> {
// Respond to ping with pong
send(Frame.Pong(frame.data))
}
is Frame.Pong -> {
println("Received pong")
}
}
}
}Advanced session management and connection handling.
/**
* Get WebSocket session from HTTP call
*/
suspend fun HttpClient.webSocketSession(
block: HttpRequestBuilder.() -> Unit = {}
): ClientWebSocketSession
suspend fun HttpClient.webSocketSession(
urlString: String,
block: HttpRequestBuilder.() -> Unit = {}
): ClientWebSocketSession
/**
* WebSocket session utilities
*/
suspend fun ClientWebSocketSession.receiveDeserialized<T>(): T
suspend fun ClientWebSocketSession.sendSerialized(data: Any)Usage Examples:
// Get session without immediate processing
val session = client.webSocketSession("wss://api.example.com/socket") {
header("Authorization", "Bearer token")
}
try {
// Use session
session.send("Hello")
val response = session.incoming.receive()
when (response) {
is Frame.Text -> println(response.readText())
else -> println("Unexpected frame type")
}
} finally {
// Close session
session.close(CloseReason.NORMAL)
}
// With content negotiation
session.sendSerialized(ChatMessage("user1", "Hello everyone!"))
val message: ChatMessage = session.receiveDeserialized()Support for WebSocket protocol extensions.
/**
* WebSocket extension interface
*/
interface WebSocketExtension<ConfigType : Any> {
/** Extension name */
val name: String
/** Extension configuration */
val config: ConfigType
/** Process outgoing frame */
fun processOutgoingFrame(frame: Frame): Frame
/** Process incoming frame */
fun processIncomingFrame(frame: Frame): Frame
}
/**
* Common WebSocket extensions
*/
object WebSocketExtensions {
/** Deflate extension for compression */
val deflate: WebSocketExtension<DeflateConfig>
/** Per-message deflate extension */
val perMessageDeflate: WebSocketExtension<PerMessageDeflateConfig>
}Handle WebSocket-specific errors and exceptions.
/**
* WebSocket exception for connection errors
*/
class WebSocketException(
message: String,
cause: Throwable? = null
) : Exception(message, cause)
/**
* Exception for upgrade failures
*/
class WebSocketUpgradeException(
val response: HttpResponse
) : WebSocketException("WebSocket upgrade failed: ${response.status}")
/**
* Exception for protocol violations
*/
class WebSocketProtocolException(
message: String
) : WebSocketException(message)Usage Examples:
try {
client.webSocket("wss://invalid-endpoint.example.com") {
send("Hello")
// Handle session
}
} catch (e: WebSocketUpgradeException) {
println("Failed to upgrade to WebSocket: ${e.response.status}")
} catch (e: WebSocketProtocolException) {
println("Protocol error: ${e.message}")
} catch (e: WebSocketException) {
println("WebSocket error: ${e.message}")
}// WebSocket session types
interface WebSocketSession : CoroutineScope {
val coroutineContext: CoroutineContext
val incoming: ReceiveChannel<Frame>
val outgoing: SendChannel<Frame>
val extensions: List<WebSocketExtension<*>>
val closeReason: Deferred<CloseReason?>
}
// Delegating session implementation
class DelegatingClientWebSocketSession(
val delegate: WebSocketSession,
override val call: HttpClientCall
) : ClientWebSocketSession, WebSocketSession by delegate
// Content conversion types
interface WebsocketContentConverter {
suspend fun serialize(
charset: Charset,
typeInfo: TypeInfo,
value: Any?
): String
suspend fun deserialize(
charset: Charset,
typeInfo: TypeInfo,
content: String
): Any?
}
// Extension configuration types
data class DeflateConfig(
val serverMaxWindowBits: Int = 15,
val clientMaxWindowBits: Int = 15,
val serverNoContextTakeover: Boolean = false,
val clientNoContextTakeover: Boolean = false
)
data class PerMessageDeflateConfig(
val serverMaxWindowBits: Int = 15,
val clientMaxWindowBits: Int = 15,
val serverNoContextTakeover: Boolean = false,
val clientNoContextTakeover: Boolean = false,
val compressIfBiggerThan: Int = 1024
)Install with Tessl CLI
npx tessl i tessl/maven-io-ktor--ktor-client-core-jvm