Ktor client WebSockets plugin for the Linux x64 platform - enables full-duplex communication between client and server over a TCP connection
—
WebSocket session interfaces providing frame-level control, connection lifecycle management, and extension support for real-time bidirectional communication.
Base interface for client-specific WebSocket sessions with access to the underlying HTTP call.
/**
 * Client-specific WebSocket session.
 *
 * Extends [WebSocketSession] and additionally exposes the [call] the session is associated with.
 */
interface ClientWebSocketSession : WebSocketSession {
    /** The [HttpClientCall] associated with this WebSocket session. */
    val call: HttpClientCall
}
Default implementation of client WebSocket session with full feature support.
/**
 * Default implementation of [ClientWebSocketSession].
 *
 * Implements [DefaultWebSocketSession] by delegating to [delegate], so the ping/pong
 * settings of that interface are available on this session as well.
 *
 * @param call the [HttpClientCall] associated with the session
 * @param delegate the underlying [DefaultWebSocketSession] that all session members are forwarded to
 */
class DefaultClientWebSocketSession(
    override val call: HttpClientCall,
    delegate: DefaultWebSocketSession
) : ClientWebSocketSession, DefaultWebSocketSession by delegate
Core WebSocket session interface providing frame communication and lifecycle management.
/**
 * Base WebSocket session for bidirectional, frame-level communication.
 *
 * Extends [CoroutineScope], so coroutines can be launched in the scope of the session.
 */
interface WebSocketSession : CoroutineScope {
    /** Enables or disables masking of outgoing messages with a random XOR mask. */
    var masking: Boolean

    /** Frame size limit; the connection is closed if a frame exceeds it. */
    var maxFrameSize: Long

    /** Channel for receiving incoming WebSocket frames. */
    val incoming: ReceiveChannel<Frame>

    /** Channel for sending outgoing WebSocket frames. */
    val outgoing: SendChannel<Frame>

    /** List of installed WebSocket extensions. */
    val extensions: List<WebSocketExtension<*>>
}
Usage Examples:
// Example: configuring a session and exchanging frames through the raw channels.
client.webSocket("ws://echo.websocket.org") {
    // Configure session properties
    maxFrameSize = 1024 * 1024 // 1MB limit
    masking = true
    // Send frames through outgoing channel
    outgoing.send(Frame.Text("Hello WebSocket!"))
    // Receive frames from incoming channel
    for (frame in incoming) {
        when (frame) {
            is Frame.Text -> println("Text: ${frame.readText()}")
            is Frame.Binary -> println("Binary: ${frame.data.size} bytes")
            is Frame.Close -> break
            // Frame is a sealed type, so the `when` must cover every subtype;
            // the remaining frame types need no handling in this example.
            else -> Unit
        }
    }
}
Extended WebSocket session with ping/pong functionality and timeout handling.
/**
* WebSocket session with ping/pong support.
*
* Extends [WebSocketSession] with the settings used for automatic keep-alive.
*/
interface DefaultWebSocketSession : WebSocketSession {
/** Interval between ping messages, in milliseconds. */
val pingIntervalMillis: Long
/** Timeout for a ping/pong round trip, in milliseconds. */
val timeoutMillis: Long
}
/**
 * Creates a [DefaultWebSocketSession] with the given ping/pong configuration.
 *
 * @param session the underlying WebSocket session
 * @param pingIntervalMillis ping interval in milliseconds
 * @param timeoutMillis ping timeout in milliseconds; defaults to twice [pingIntervalMillis]
 * @return the configured [DefaultWebSocketSession]
 */
fun DefaultWebSocketSession(
    session: WebSocketSession,
    pingIntervalMillis: Long,
    timeoutMillis: Long = pingIntervalMillis * 2
): DefaultWebSocketSession
Utility functions for working with WebSocket extensions.
/**
* Returns the WebSocket extension instance that corresponds to the given factory.
*
* Use [extensionOrNull] when the extension may be absent and an exception is not wanted.
*
* @param extension the extension factory to look up
* @return the extension instance
* @throws NoSuchElementException if the extension is not found
*/
fun <T : WebSocketExtension<*>> WebSocketSession.extension(
extension: WebSocketExtensionFactory<*, T>
): T
/**
* Returns the WebSocket extension instance that corresponds to the given factory,
* or `null` if the extension is not found.
*
* @param extension the extension factory to look up
* @return the extension instance, or `null` when it is not found
*/
fun <T : WebSocketExtension<*>> WebSocketSession.extensionOrNull(
extension: WebSocketExtensionFactory<*, T>
): T?
/**
* Sends a text message as a Text frame.
*
* @param content the text content to send
*/
suspend fun WebSocketSession.send(content: String)
/**
* Sends binary data as a Binary frame.
*
* @param content the binary data to send
*/
suspend fun WebSocketSession.send(content: ByteArray)
/**
 * Sends a WebSocket frame.
 *
 * @param frame the frame to send
 */
suspend fun WebSocketSession.send(frame: Frame)
Usage Examples:
// Example: looking up WebSocket extensions on a session.
client.webSocket("ws://example.com") {
    // Check if compression extension is available
    val deflate = extensionOrNull(WebSocketDeflateExtension)
    if (deflate != null) {
        println("Using compression: ${deflate.compressionLevel}")
    }
    // Get required extension (throws if not found)
    try {
        val customExt = extension(CustomWebSocketExtension)
        customExt.configure()
    } catch (e: NoSuchElementException) {
        println("Custom extension not available")
    }
}

// Example: basic send/receive loop using the session helpers.
client.webSocket("ws://echo.websocket.org") {
    // Session is DefaultClientWebSocketSession
    println("Connected to: ${call.request.url}")
    println("Max frame size: $maxFrameSize")
    // Send text message
    send("Hello!")
    // Process incoming messages
    for (frame in incoming) {
        when (frame) {
            is Frame.Text -> {
                val text = frame.readText()
                println("Received: $text")
                if (text == "bye") break
            }
            is Frame.Close -> {
                println("Connection closed")
                break
            }
            // Frame is a sealed type, so the `when` must cover every subtype;
            // the remaining frame types need no handling in this example.
            else -> Unit
        }
    }
}

// Example: managing the session manually instead of using the webSocket { } block.
val session = client.webSocketSession("ws://api.example.com/ws")
try {
    // Configure session
    session.maxFrameSize = 512 * 1024
    // Send initial message
    session.outgoing.send(Frame.Text("CONNECT"))
    // Handle responses. receiveCatching() yields no value once the channel is closed,
    // so the `?: break` below is the only termination check the loop needs.
    while (true) {
        val frame = session.incoming.receiveCatching().getOrNull() ?: break
        when (frame) {
            is Frame.Text -> processMessage(frame.readText())
            is Frame.Binary -> processBinary(frame.data)
            is Frame.Ping -> session.outgoing.send(Frame.Pong(frame.data))
            is Frame.Close -> break
            // Remaining frame types need no handling; the branch keeps the `when` exhaustive.
            else -> Unit
        }
    }
} finally {
    session.close()
}

// Example: reading the ping/pong settings and monitoring connection health.
client.webSocket("ws://api.example.com/realtime") {
    println("Ping interval: ${pingIntervalMillis}ms")
    println("Timeout: ${timeoutMillis}ms")
    // Monitor connection health
    launch {
        while (isActive) {
            delay(5000) // Check every 5 seconds
            if (incoming.isClosedForReceive) {
                println("Connection lost")
                break
            }
        }
    }
    // Handle messages
    for (frame in incoming) {
        when (frame) {
            is Frame.Text -> handleMessage(frame.readText())
            is Frame.Ping -> {
                println("Received ping")
                // Pong is sent automatically
            }
            is Frame.Pong -> println("Received pong")
            is Frame.Close -> {
                val reason = frame.readReason()
                println("Connection closed: ${reason?.message}")
                break
            }
            // Remaining frame types need no handling; the branch keeps the `when` exhaustive.
            else -> Unit
        }
    }
}

// Example: sending and receiving concurrently.
client.webSocket("ws://chat.example.com") {
    // Separate coroutines for sending and receiving
    val sendJob = launch {
        while (isActive) {
            // NOTE(review): readLine() blocks the calling thread while waiting for input.
            val message = readLine() ?: break
            outgoing.send(Frame.Text(message))
        }
    }
    val receiveJob = launch {
        for (frame in incoming) {
            when (frame) {
                is Frame.Text -> println(">> ${frame.readText()}")
                is Frame.Close -> break
                // Remaining frame types need no handling; the branch keeps the `when` exhaustive.
                else -> Unit
            }
        }
    }
    // Wait for either job to complete; the result type is stated explicitly
    // so the select expression does not depend on type inference.
    select<Unit> {
        sendJob.onJoin { println("Send completed") }
        receiveJob.onJoin { println("Receive completed") }
    }
    // Cancel remaining job
    sendJob.cancel()
    receiveJob.cancel()
}

// Example: inspecting the installed extensions.
client.webSocket("ws://api.example.com") {
    // List all active extensions
    println("Active extensions:")
    extensions.forEach { ext ->
        println("- ${ext::class.simpleName}")
    }
    // Check for specific extension
    extensionOrNull(WebSocketDeflateExtension)?.let { deflate ->
        println("Compression enabled: level ${deflate.compressionLevel}")
    }
    // Session logic continues...
}

// Example: error handling around a session and around individual frames.
try {
    client.webSocket("ws://unreliable.example.com") {
        // Allow larger frames on this connection
        maxFrameSize = 2 * 1024 * 1024 // 2MB
        for (frame in incoming) {
            try {
                when (frame) {
                    is Frame.Text -> processText(frame.readText())
                    is Frame.Binary -> processBinary(frame.data)
                    // Remaining frame types need no handling; the branch keeps the `when` exhaustive.
                    else -> Unit
                }
            } catch (e: CancellationException) {
                // Cancellation must propagate so the session coroutine can stop;
                // it is rethrown instead of being logged as a processing error.
                throw e
            } catch (e: Exception) {
                println("Frame processing error: ${e.message}")
                // Continue processing other frames
            }
        }
    }
} catch (e: WebSocketException) {
    println("WebSocket error: ${e.message}")
} catch (e: ProtocolViolationException) {
    println("Protocol violation: ${e.message}")
}
Install with Tessl CLI
npx tessl i tessl/maven-io-ktor--ktor-client-websockets-linuxx64