Ktor HTTP Client Core - a multiplatform asynchronous HTTP client library for Kotlin providing comprehensive HTTP request/response handling with plugin architecture.
—
Comprehensive WebSocket client functionality with message handling, connection management, and frame processing.
Establish WebSocket connections with various configuration options.
/**
* Establish a WebSocket connection over HTTP and handle it in [block].
*
* @param urlString WebSocket URL (`ws://`)
* @param block WebSocket session handling block; it is invoked with a
* [DefaultClientWebSocketSession] as its receiver
*/
suspend fun HttpClient.webSocket(
urlString: String,
block: suspend DefaultClientWebSocketSession.() -> Unit
)
/**
* Establish a secure WebSocket connection (`wss://`) and handle it in [block].
*
* @param urlString Secure WebSocket URL (`wss://`)
* @param block WebSocket session handling block; it is invoked with a
* [DefaultClientWebSocketSession] as its receiver
*/
suspend fun HttpClient.wss(
urlString: String,
block: suspend DefaultClientWebSocketSession.() -> Unit
)
/**
* Establish a WebSocket connection with the target given as separate
* [host], [port] and [path] components instead of a single URL string.
*
* @param method HTTP method for the handshake; defaults to [HttpMethod.Get]
* @param host WebSocket server host
* @param port WebSocket server port
* @param path WebSocket endpoint path
* @param block WebSocket session handling block; it is invoked with a
* [DefaultClientWebSocketSession] as its receiver
*/
suspend fun HttpClient.webSocket(
method: HttpMethod = HttpMethod.Get,
host: String,
port: Int,
path: String,
block: suspend DefaultClientWebSocketSession.() -> Unit
)
/**
* Get a WebSocket session without automatic message handling.
*
* Unlike the block-taking overloads, this returns the session to the caller.
*
* @param urlString WebSocket URL
* @return WebSocket session for manual management
*/
suspend fun HttpClient.webSocketSession(urlString: String): DefaultClientWebSocketSession
/**
* Establish a WebSocket connection (alias for `webSocket`).
*
* @param urlString WebSocket URL (`ws://`)
* @param block WebSocket session handling block; it is invoked with a
* [DefaultClientWebSocketSession] as its receiver
*/
suspend fun HttpClient.ws(
urlString: String,
block: suspend DefaultClientWebSocketSession.() -> Unit
)
/**
* Get WebSocket session with detailed configuration
* @param method HTTP method for handshake
* @param host WebSocket server host
* @param port WebSocket server port
* @param path WebSocket endpoint path
* @return WebSocket session for manual management
*/
suspend fun HttpClient.webSocketSession(
method: HttpMethod = HttpMethod.Get,
host: String,
port: Int,
path: String
): DefaultClientWebSocketSession
Usage Examples:
val client = HttpClient {
    install(WebSockets)
}

// Basic WebSocket connection
client.webSocket("ws://echo.websocket.org") {
    // Send text message
    send(Frame.Text("Hello, WebSocket!"))

    // Receive and process messages
    for (frame in incoming) {
        when (frame) {
            is Frame.Text -> {
                val message = frame.readText()
                println("Received: $message")
            }
            is Frame.Binary -> {
                val data = frame.readBytes()
                println("Received binary data: ${data.size} bytes")
            }
            is Frame.Close -> {
                println("Connection closed")
                break
            }
            // Frame is sealed, so a `when` statement over it has to cover every
            // subtype; control frames need no handling in this example.
            is Frame.Ping, is Frame.Pong -> Unit
        }
    }
}
// Secure WebSocket connection
client.wss("wss://secure-websocket.example.com/chat") {
    // Send JSON message
    val jsonMessage = """{"type": "join", "room": "general"}"""
    send(Frame.Text(jsonMessage))

    // Handle incoming messages.
    // Iterating the channel ends the loop when the channel is closed, whereas
    // calling receive() on a closed channel throws ClosedReceiveChannelException.
    for (frame in incoming) {
        when (frame) {
            is Frame.Text -> {
                val response = frame.readText()
                println("Server response: $response")
            }
            is Frame.Close -> break
            // Remaining frame types are ignored here, as in the original loop.
            is Frame.Binary, is Frame.Ping, is Frame.Pong -> Unit
        }
    }
}
// WebSocket with custom headers
client.webSocket("ws://localhost:8080/websocket") {
// NOTE: this snippet does not set any custom headers itself
send(Frame.Text("Connected with custom configuration"))
// Process messages in a loop
incoming.consumeEach { frame ->
    when (frame) {
        is Frame.Text -> handleTextMessage(frame.readText())
        is Frame.Binary -> handleBinaryMessage(frame.readBytes())
        // Listed explicitly so the `when` over the sealed Frame type is exhaustive.
        is Frame.Ping -> Unit
        is Frame.Pong -> println("Received pong")
        is Frame.Close -> println("WebSocket closed")
    }
}
}
Core WebSocket session interface for message handling.
/**
* Client-side WebSocket session: exposes the frame channels plus
* suspending helpers for sending, flushing, pinging and closing.
*/
interface ClientWebSocketSession : WebSocketSession {
/**
* Incoming frames channel
*/
val incoming: ReceiveChannel<Frame>
/**
* Outgoing frames channel
*/
val outgoing: SendChannel<Frame>
/**
* Extensions negotiated during handshake
*/
val extensions: List<WebSocketExtension<*>>
/**
* Send a frame to the server
* @param frame Frame to send
*/
suspend fun send(frame: Frame)
/**
* Flush outgoing frames
*/
suspend fun flush()
/**
* Close the WebSocket connection
* @param reason Close reason; defaults to null (no reason supplied)
*/
suspend fun close(reason: CloseReason? = null)
/**
* Ping the server
* @param data Ping payload; required by this signature (pass an empty array for no payload)
*/
suspend fun ping(data: ByteArray)
/**
* Send pong response
* @param data Pong data
*/
suspend fun pong(data: ByteArray)
/**
* Get close reason if connection is closed
* @return Close reason or null
*/
suspend fun closeReason(): CloseReason?
}
/**
* Default WebSocket session implementation
*/
class DefaultClientWebSocketSession(
override val call: HttpClientCall,
override val coroutineContext: CoroutineContext
) : ClientWebSocketSession, CoroutineScope
Usage Examples:
// Manual session management
val session = client.webSocketSession("ws://example.com/chat")
// Send different types of frames
session.send(Frame.Text("Hello"))
session.send(Frame.Binary(byteArrayOf(1, 2, 3, 4)))
session.ping(byteArrayOf())
// Receive frames manually
val frame = session.incoming.receive()
when (frame) {
is Frame.Text -> println("Text: ${frame.readText()}")
is Frame.Binary -> println("Binary: ${frame.readBytes().size} bytes")
is Frame.Ping -> {
println("Received ping")
session.pong(frame.data)
}
is Frame.Pong -> println("Received pong")
is Frame.Close -> {
println("Connection closed: ${frame.readReason()}")
session.close()
}
}
// Flush outgoing frames
session.flush()
// Close with reason
session.close(CloseReason(CloseReason.Codes.NORMAL, "Goodbye"))
Frame types for different WebSocket message formats.
/**
* WebSocket frame sealed class
*/
sealed class Frame {
/** Frame data as byte array */
abstract val data: ByteArray
/** Whether this is a final frame */
abstract val fin: Boolean
/**
* Text frame for string messages
*
* [data] is not stored: it is computed on every access by encoding [text]
* with `encodeToByteArray()` (UTF-8).
*/
data class Text(
val text: String,
override val fin: Boolean = true
) : Frame() {
override val data: ByteArray get() = text.encodeToByteArray()
/**
* Read frame content as text
* @return Frame text content
*/
fun readText(): String = text
}
/**
* Binary frame for byte data
*
* NOTE(review): [data] is an array inside a data class, so the generated
* `equals`/`hashCode` compare it by reference, not by content.
*/
data class Binary(
override val data: ByteArray,
override val fin: Boolean = true
) : Frame() {
/**
* Read frame content as bytes
* @return Frame binary content; this is the same array instance as [data], not a copy
*/
fun readBytes(): ByteArray = data
}
/**
 * Close frame for connection termination.
 *
 * When [reason] is non-null, [data] holds the 2-byte close code followed by
 * the UTF-8 encoded message; when it is null, [data] is empty.
 */
data class Close(
    val reason: CloseReason? = null
) : Frame() {
    override val data: ByteArray = reason?.let {
        // Size the buffer from the encoded bytes, not the character count:
        // a non-ASCII message encodes to more bytes than `message.length`,
        // which would overflow a buffer sized by characters.
        val messageBytes = it.message.encodeToByteArray()
        ByteBuffer.allocate(2 + messageBytes.size)
            .putShort(it.code)
            .put(messageBytes)
            .array()
    } ?: byteArrayOf()

    override val fin: Boolean = true

    /**
     * Read close reason from frame
     * @return Close reason, or null when the frame carries none
     */
    fun readReason(): CloseReason? = reason
}
/**
* Ping frame for connection testing
*/
data class Ping(
override val data: ByteArray
) : Frame() {
override val fin: Boolean = true
}
/**
* Pong frame for ping responses
*/
data class Pong(
override val data: ByteArray
) : Frame() {
override val fin: Boolean = true
}
}
Usage Examples:
client.webSocket("ws://example.com") {
// Send different frame types
send(Frame.Text("Hello, World!"))
send(Frame.Binary(byteArrayOf(0x01, 0x02, 0x03)))
send(Frame.Ping(byteArrayOf()))
// Process incoming frames
for (frame in incoming) {
when (frame) {
is Frame.Text -> {
val message = frame.readText()
println("Received text: $message")
// Echo back
send(Frame.Text("Echo: $message"))
}
is Frame.Binary -> {
val bytes = frame.readBytes()
println("Received ${bytes.size} bytes")
// Process binary data
processBinaryData(bytes)
}
is Frame.Ping -> {
println("Received ping")
send(Frame.Pong(frame.data))
}
is Frame.Pong -> {
println("Received pong response")
}
is Frame.Close -> {
val closeReason = frame.readReason()
println("Connection closed: ${closeReason?.message}")
break
}
}
}
}
WebSocket plugin installation and configuration.
/**
* WebSocket plugin object.
*
* Implements [HttpClientPlugin] with [WebSocketConfig] as both the
* configuration type and the installed plugin type.
*/
object WebSockets : HttpClientPlugin<WebSocketConfig, WebSocketConfig> {
// Attribute key typed to the plugin's WebSocketConfig
override val key: AttributeKey<WebSocketConfig>
// Takes a WebSocketConfig configuration block and returns a WebSocketConfig
override fun prepare(block: WebSocketConfig.() -> Unit): WebSocketConfig
// Receives the prepared config and the HttpClient it is installed into
override fun install(plugin: WebSocketConfig, scope: HttpClient)
}
/**
* WebSocket configuration
*/
class WebSocketConfig {
/** Maximum frame size in bytes */
var maxFrameSize: Long = Long.MAX_VALUE
/** Ping interval in milliseconds */
var pingInterval: Long? = null
/** Extensions to negotiate */
var extensions: MutableList<WebSocketExtensionConfig<*>> = mutableListOf()
/** Content converter for automatic serialization */
var contentConverter: WebSocketContentConverter? = null
/**
* Add WebSocket extension
* @param extension Extension configuration
*/
fun <T : Any> extensions(extension: WebSocketExtensionConfig<T>)
}
Usage Examples:
val client = HttpClient {
install(WebSockets) {
maxFrameSize = 1024 * 1024 // 1MB max frame size
pingInterval = 30_000 // Ping every 30 seconds
// Add extensions if needed
// extensions(SomeWebSocketExtension())
}
}
// Use configured WebSocket client
client.webSocket("ws://example.com/realtime") {
// WebSocket communication with configured settings
send(Frame.Text("Message within size limits"))
// Automatic ping/pong handling based on pingInterval
for (frame in incoming) {
    when (frame) {
        is Frame.Text -> handleMessage(frame.readText())
        is Frame.Close -> break
        // Ping/Pong frames handled automatically; the branch is still listed
        // so the `when` over the sealed Frame type is exhaustive.
        is Frame.Binary, is Frame.Ping, is Frame.Pong -> Unit
    }
}
}
WebSocket connection close handling with standard status codes.
/**
* WebSocket close reason
*/
data class CloseReason(
val code: Short,
val message: String
) {
constructor(knownReason: Codes, message: String) : this(knownReason.code, message)
/**
* Standard WebSocket close codes
*/
enum class Codes(val code: Short) {
NORMAL(1000),
GOING_AWAY(1001),
PROTOCOL_ERROR(1002),
CANNOT_ACCEPT(1003),
NOT_CONSISTENT(1007),
VIOLATED_POLICY(1008),
TOO_BIG(1009),
NO_EXTENSION(1010),
INTERNAL_ERROR(1011),
SERVICE_RESTART(1012),
TRY_AGAIN_LATER(1013),
BAD_GATEWAY(1014)
}
}
Usage Examples:
client.webSocket("ws://example.com") {
    try {
        // WebSocket communication
        send(Frame.Text("Hello"))

        for (frame in incoming) {
            when (frame) {
                is Frame.Close -> {
                    val reason = frame.readReason()
                    when (reason?.code) {
                        CloseReason.Codes.NORMAL.code ->
                            println("Normal closure")
                        CloseReason.Codes.GOING_AWAY.code ->
                            println("Server going away")
                        CloseReason.Codes.PROTOCOL_ERROR.code ->
                            println("Protocol error occurred")
                        else ->
                            println("Closed with code: ${reason?.code}, message: ${reason?.message}")
                    }
                    break
                }
                // Handle other frame types; listed explicitly so the `when`
                // over the sealed Frame type is exhaustive.
                is Frame.Text, is Frame.Binary, is Frame.Ping, is Frame.Pong -> Unit
            }
        }
    } catch (e: kotlin.coroutines.cancellation.CancellationException) {
        // Cancellation is not an error: rethrow so the coroutine is cancelled
        // instead of being reported as a client failure.
        throw e
    } catch (e: Exception) {
        // Close with error status
        close(CloseReason(CloseReason.Codes.INTERNAL_ERROR, "Client error: ${e.message}"))
    }
}
// Graceful shutdown
client.webSocket("ws://example.com") {
// Do work...
// Close normally when done
close(CloseReason(CloseReason.Codes.NORMAL, "Work completed"))
}
/**
* WebSocket session base interface
*/
interface WebSocketSession : CoroutineScope {
val call: HttpClientCall
val incoming: ReceiveChannel<Frame>
val outgoing: SendChannel<Frame>
val extensions: List<WebSocketExtension<*>>
suspend fun flush()
suspend fun close(reason: CloseReason? = null)
}
/**
* WebSocket extension interface
*/
interface WebSocketExtension<out ConfigType : Any> {
val factory: WebSocketExtensionFactory<ConfigType, out WebSocketExtension<ConfigType>>
val protocols: List<String>
fun processOutgoingFrame(frame: Frame): Frame
fun processIncomingFrame(frame: Frame): Frame
}
/**
* WebSocket content converter for automatic serialization
*/
interface WebSocketContentConverter {
suspend fun serialize(value: Any): Frame
suspend fun deserialize(frame: Frame, type: TypeInfo): Any?
}
/**
* Channel for receiving frames
*/
interface ReceiveChannel<out E> {
val isClosedForReceive: Boolean
val isEmpty: Boolean
suspend fun receive(): E
suspend fun receiveCatching(): ChannelResult<E>
fun tryReceive(): ChannelResult<E>
suspend fun consumeEach(action: suspend (E) -> Unit)
operator fun iterator(): ChannelIterator<E>
}
/**
* Channel for sending frames
*/
interface SendChannel<in E> {
val isClosedForSend: Boolean
suspend fun send(element: E)
fun trySend(element: E): ChannelResult<Unit>
fun close(cause: Throwable? = null): Boolean
suspend fun invokeOnClose(handler: (cause: Throwable?) -> Unit)
}
Install with Tessl CLI
npx tessl i tessl/maven-io-ktor--ktor-client-core