CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-io-ktor--ktor-client-websockets-linuxx64

Ktor client WebSockets plugin for Linux x64 platform - enables full-duplex communication between client and server over TCP connection

Pending
Overview
Eval results
Files

session-management.mddocs/

Session Management

WebSocket session interfaces providing frame-level control, connection lifecycle management, and extension support for real-time bidirectional communication.

Capabilities

Client WebSocket Session Interface

Base interface for client-specific WebSocket sessions with access to the underlying HTTP call.

/**
 * Client specific WebSocket session interface
 * Extends WebSocketSession with client-specific functionality
 */
interface ClientWebSocketSession : WebSocketSession {
    /** HttpClientCall associated with this WebSocket session */
    val call: HttpClientCall
}

Default Client WebSocket Session

Default implementation of client WebSocket session with full feature support.

/**
 * Default implementation of ClientWebSocketSession
 * Provides complete WebSocket functionality with ping/pong support
 * @param call HttpClientCall associated with session
 * @param delegate Underlying DefaultWebSocketSession implementation
 */
class DefaultClientWebSocketSession(
    override val call: HttpClientCall,
    delegate: DefaultWebSocketSession
) : ClientWebSocketSession, DefaultWebSocketSession by delegate

Base WebSocket Session Interface

Core WebSocket session interface providing frame communication and lifecycle management.

/**
 * Base WebSocket session interface for bidirectional communication
 * Extends CoroutineScope for structured concurrency support
 */
interface WebSocketSession : CoroutineScope {
    /** Enable/disable masking of outgoing messages with random XOR mask */
    var masking: Boolean
    
    /** Frame size limit - connection closed if exceeded */
    var maxFrameSize: Long
    
    /** Channel for receiving incoming WebSocket frames */
    val incoming: ReceiveChannel<Frame>
    
    /** Channel for sending outgoing WebSocket frames */
    val outgoing: SendChannel<Frame>
    
    /** List of installed WebSocket extensions */
    val extensions: List<WebSocketExtension<*>>
}

Usage Examples:

client.webSocket("ws://echo.websocket.org") {
    // Configure session properties
    maxFrameSize = 1024 * 1024 // 1MB limit
    masking = true
    
    // Send frames through outgoing channel
    outgoing.send(Frame.Text("Hello WebSocket!"))
    
    // Receive frames from incoming channel
    for (frame in incoming) {
        when (frame) {
            is Frame.Text -> println("Text: ${frame.readText()}")
            is Frame.Binary -> println("Binary: ${frame.data.size} bytes")
            is Frame.Close -> break
        }
    }
}

Default WebSocket Session Interface

Extended WebSocket session with ping/pong functionality and timeout handling.

/**
 * Default WebSocket session with ping/pong support
 * Provides automatic keep-alive functionality
 */
interface DefaultWebSocketSession : WebSocketSession {
    /** Interval between ping messages in milliseconds */
    val pingIntervalMillis: Long
    
    /** Timeout for ping/pong roundtrip in milliseconds */
    val timeoutMillis: Long
}

/**
 * Create DefaultWebSocketSession with ping/pong configuration
 * @param session Underlying WebSocket session
 * @param pingIntervalMillis Ping interval in milliseconds  
 * @param timeoutMillis Ping timeout in milliseconds
 * @return Configured DefaultWebSocketSession
 */
fun DefaultWebSocketSession(
    session: WebSocketSession,
    pingIntervalMillis: Long,
    timeoutMillis: Long = pingIntervalMillis * 2
): DefaultWebSocketSession

Session Extension Functions

Utility functions for working with WebSocket extensions.

/**
 * Get WebSocket extension instance by factory
 * @param extension Extension factory to look up
 * @return Extension instance
 * @throws NoSuchElementException if extension not found
 */
fun <T : WebSocketExtension<*>> WebSocketSession.extension(
    extension: WebSocketExtensionFactory<*, T>
): T

/**
 * Get WebSocket extension instance by factory, or null if not found
 * @param extension Extension factory to look up
 * @return Extension instance or null
 */
fun <T : WebSocketExtension<*>> WebSocketSession.extensionOrNull(
    extension: WebSocketExtensionFactory<*, T>
): T?

/**
 * Send text message as Text frame
 * @param content Text content to send
 */
suspend fun WebSocketSession.send(content: String)

/**
 * Send binary data as Binary frame
 * @param content Binary data to send
 */
suspend fun WebSocketSession.send(content: ByteArray)

/**
 * Send WebSocket frame
 * @param frame Frame to send
 */
suspend fun WebSocketSession.send(frame: Frame)

Usage Examples:

client.webSocket("ws://example.com") {
    // Check if compression extension is available
    val deflate = extensionOrNull(WebSocketDeflateExtension)
    if (deflate != null) {
        println("Using compression: ${deflate.compressionLevel}")
    }
    
    // Get required extension (throws if not found)
    try {
        val customExt = extension(CustomWebSocketExtension)
        customExt.configure()
    } catch (e: NoSuchElementException) {
        println("Custom extension not available")
    }
}

Session Management Examples

Basic Session Usage

client.webSocket("ws://echo.websocket.org") {
    // Session is DefaultClientWebSocketSession
    println("Connected to: ${call.request.url}")
    println("Max frame size: $maxFrameSize")
    
    // Send text message
    send("Hello!")
    
    // Process incoming messages
    for (frame in incoming) {
        when (frame) {
            is Frame.Text -> {
                val text = frame.readText()
                println("Received: $text")
                if (text == "bye") break
            }
            is Frame.Close -> {
                println("Connection closed")
                break
            }
        }
    }
}

Manual Session Management

val session = client.webSocketSession("ws://api.example.com/ws")

try {
    // Configure session
    session.maxFrameSize = 512 * 1024
    
    // Send initial message
    session.outgoing.send(Frame.Text("CONNECT"))
    
    // Handle responses
    while (!session.incoming.isClosedForReceive) {
        val frame = session.incoming.receiveCatching().getOrNull() ?: break
        
        when (frame) {
            is Frame.Text -> processMessage(frame.readText())
            is Frame.Binary -> processBinary(frame.data)
            is Frame.Ping -> session.outgoing.send(Frame.Pong(frame.data))
            is Frame.Close -> break
        }
    }
} finally {
    session.close()
}

Session with Ping/Pong Monitoring

client.webSocket("ws://api.example.com/realtime") {
    println("Ping interval: ${pingIntervalMillis}ms")
    println("Timeout: ${timeoutMillis}ms")
    
    // Monitor connection health
    launch {
        while (isActive) {
            delay(5000) // Check every 5 seconds
            if (incoming.isClosedForReceive) {
                println("Connection lost")
                break
            }
        }
    }
    
    // Handle messages
    for (frame in incoming) {
        when (frame) {
            is Frame.Text -> handleMessage(frame.readText())
            is Frame.Ping -> {
                println("Received ping")
                // Pong is sent automatically
            }
            is Frame.Pong -> println("Received pong")
            is Frame.Close -> {
                val reason = frame.readReason()
                println("Connection closed: ${reason?.message}")
                break
            }
        }
    }
}

Concurrent Send and Receive

client.webSocket("ws://chat.example.com") {
    // Separate coroutines for sending and receiving
    val sendJob = launch {
        while (isActive) {
            val message = readLine() ?: break
            outgoing.send(Frame.Text(message))
        }
    }
    
    val receiveJob = launch {
        for (frame in incoming) {
            when (frame) {
                is Frame.Text -> println(">> ${frame.readText()}")
                is Frame.Close -> break
            }
        }
    }
    
    // Wait for either job to complete
    select {
        sendJob.onJoin { println("Send completed") }
        receiveJob.onJoin { println("Receive completed") }
    }
    
    // Cancel remaining job
    sendJob.cancel()
    receiveJob.cancel()
}

Extension Management

client.webSocket("ws://api.example.com") {
    // List all active extensions
    println("Active extensions:")
    extensions.forEach { ext ->
        println("- ${ext::class.simpleName}")
    }
    
    // Check for specific extension
    extensionOrNull(WebSocketDeflateExtension)?.let { deflate ->
        println("Compression enabled: level ${deflate.compressionLevel}")
    }
    
    // Session logic continues...
}

Error Handling in Sessions

try {
    client.webSocket("ws://unreliable.example.com") {
        // Configure larger timeout for unreliable connection
        maxFrameSize = 2 * 1024 * 1024 // 2MB
        
        for (frame in incoming) {
            try {
                when (frame) {
                    is Frame.Text -> processText(frame.readText())
                    is Frame.Binary -> processBinary(frame.data)
                }
            } catch (e: Exception) {
                println("Frame processing error: ${e.message}")
                // Continue processing other frames
            }
        }
    }
} catch (e: WebSocketException) {
    println("WebSocket error: ${e.message}")
} catch (e: ProtocolViolationException) {
    println("Protocol violation: ${e.message}")
}

Install with Tessl CLI

npx tessl i tessl/maven-io-ktor--ktor-client-websockets-linuxx64

docs

frame-operations.md

index.md

plugin-configuration.md

serialization-support.md

session-management.md

websocket-connections.md

tile.json