Ktor HTTP Client Core for tvOS ARM64 - multiplatform asynchronous HTTP client library with coroutines support
—
The Ktor HTTP Client Core provides comprehensive WebSocket client functionality through the WebSockets plugin. This enables establishing and managing WebSocket connections with session management, ping/pong handling, message serialization, and full integration with Ktor's coroutine-based architecture.
The main plugin for WebSocket client functionality that handles connection establishment and session management.
/**
 * Client plugin object for WebSocket support; the examples install it with `install(WebSockets)`.
 */
object WebSockets : HttpClientPlugin<WebSockets.Config, WebSockets> {
    /** Settings available inside the `install(WebSockets) { ... }` block. */
    class Config {
        /** Ping interval; `null` by default. */
        var pingInterval: Duration? = null

        /** Maximum frame size; defaults to [Long.MAX_VALUE]. */
        var maxFrameSize: Long = Long.MAX_VALUE

        /** Masking flag; defaults to `true`. */
        var masking: Boolean = true

        /** Extensions registered with the plugin; empty by default. */
        var extensions: MutableList<WebSocketExtension<*>> = mutableListOf()

        // Function-style counterparts of the properties above (signatures only).
        // NOTE(review): presumably each sets the same-named property — confirm against the library.
        fun pingInterval(duration: Duration)
        fun maxFrameSize(size: Long)
        fun masking(enabled: Boolean)
        fun extensions(vararg extensions: WebSocketExtension<*>)
    }
}

Interface representing a client-side WebSocket session with full duplex communication capabilities.
/**
 * Client-side WebSocket session: a [WebSocketSession] that also exposes the [call] it belongs to.
 */
interface ClientWebSocketSession : WebSocketSession {
    /** HTTP call associated with this session. */
    val call: HttpClientCall

    // Inherited from WebSocketSession
    val incoming: ReceiveChannel<Frame>
    val outgoing: SendChannel<Frame>
    val closeReason: Deferred<CloseReason?>
    val extensions: List<WebSocketExtension<*>>

    // Send text message
    suspend fun send(data: String)

    // Send binary message
    suspend fun send(data: ByteArray)

    // Send frame
    suspend fun send(frame: Frame)

    // Close connection
    suspend fun close(reason: CloseReason? = null)
}

Default implementation of ClientWebSocketSession with standard WebSocket protocol handling.
/**
 * [ClientWebSocketSession] that forwards every [WebSocketSession] member to [delegate]
 * and supplies [call] itself.
 *
 * @param call HTTP call this session belongs to.
 * @param delegate session that performs the actual WebSocket work.
 */
class DefaultClientWebSocketSession(
    // Declared once as the overriding property: the earlier form declared `call` both as a
    // private constructor property and as an override whose getter returned itself.
    override val call: HttpClientCall,
    delegate: WebSocketSession
) : ClientWebSocketSession, WebSocketSession by delegate

Extension functions for establishing WebSocket connections with various configuration options.
// Basic WebSocket connection
// Overload addressed by optional host/port/path; `request` customises the request and
// `block` runs with the ClientWebSocketSession as its receiver.
suspend fun HttpClient.webSocket(
method: HttpMethod = HttpMethod.Get,
host: String? = null,
port: Int? = null,
path: String? = null,
request: HttpRequestBuilder.() -> Unit = {},
block: suspend ClientWebSocketSession.() -> Unit
)
// Overload taking the target as a URL string.
suspend fun HttpClient.webSocket(
url: String,
request: HttpRequestBuilder.() -> Unit = {},
block: suspend ClientWebSocketSession.() -> Unit
)
// Overload taking the target as a Url value.
suspend fun HttpClient.webSocket(
url: Url,
request: HttpRequestBuilder.() -> Unit = {},
block: suspend ClientWebSocketSession.() -> Unit
)
// Secure WebSocket connection (WSS)
// Same parameter list as the host/port/path webSocket overload.
suspend fun HttpClient.wss(
method: HttpMethod = HttpMethod.Get,
host: String? = null,
port: Int? = null,
path: String? = null,
request: HttpRequestBuilder.() -> Unit = {},
block: suspend ClientWebSocketSession.() -> Unit
)
// WebSocket session without automatic connection handling
// These overloads take no block and return the ClientWebSocketSession to the caller.
suspend fun HttpClient.webSocketSession(
method: HttpMethod = HttpMethod.Get,
host: String? = null,
port: Int? = null,
path: String? = null,
request: HttpRequestBuilder.() -> Unit = {}
): ClientWebSocketSession
// URL-string variant of webSocketSession.
suspend fun HttpClient.webSocketSession(
url: String,
request: HttpRequestBuilder.() -> Unit = {}
): ClientWebSocketSession
// Same parameter list as the host/port/path `webSocket` overload.
suspend fun HttpClient.ws(
    method: HttpMethod = HttpMethod.Get,
    host: String? = null,
    port: Int? = null,
    path: String? = null,
    request: HttpRequestBuilder.() -> Unit = {},
    block: suspend ClientWebSocketSession.() -> Unit
)

/**
 * A single WebSocket frame.
 *
 * Every subtype overrides both abstract members. Control frames ([Close], [Ping], [Pong])
 * report `fin = true` because RFC 6455 forbids fragmenting control frames.
 */
sealed class Frame {
    /** Whether this is the final fragment of a message. */
    abstract val fin: Boolean

    /** Raw payload bytes. */
    abstract val data: ByteArray

    /** Text frame; the payload is UTF-8 encoded text. */
    data class Text(override val data: ByteArray, override val fin: Boolean = true) : Frame() {
        // encodeToByteArray()/decodeToString() are the common-stdlib UTF-8 conversions, so the
        // sketch does not depend on the JVM-only `Charsets` object.
        constructor(text: String) : this(text.encodeToByteArray())

        /** Decodes the payload as UTF-8 text. */
        fun readText(): String = data.decodeToString()
    }

    /** Binary frame carrying arbitrary bytes. */
    data class Binary(override val data: ByteArray, override val fin: Boolean = true) : Frame()

    /** Close frame; the payload encodes a [CloseReason]. */
    data class Close(override val data: ByteArray) : Frame() {
        override val fin: Boolean get() = true

        // NOTE(review): `CloseReason.toByteArray()` and `CloseReason.parse()` are not declared
        // in the CloseReason sketch in this document — confirm where they come from.
        constructor(reason: CloseReason) : this(reason.toByteArray())

        /** Parses the payload back into a [CloseReason], or `null` if none can be read. */
        fun readReason(): CloseReason? = CloseReason.parse(data)
    }

    /** Ping control frame. */
    data class Ping(override val data: ByteArray) : Frame() {
        override val fin: Boolean get() = true
    }

    /** Pong control frame. */
    data class Pong(override val data: ByteArray) : Frame() {
        override val fin: Boolean get() = true
    }
}
/**
 * Reason attached to a WebSocket close: a numeric status [code] plus a human-readable [message].
 */
data class CloseReason(val code: Short, val message: String) {
    /** Ready-made reasons for status codes 1000–1003 and 1007–1013. */
    companion object {
        val NORMAL = CloseReason(1000, "Normal Closure")
        val GOING_AWAY = CloseReason(1001, "Going Away")
        val PROTOCOL_ERROR = CloseReason(1002, "Protocol Error")
        val CANNOT_ACCEPT = CloseReason(1003, "Cannot Accept")
        val NOT_CONSISTENT = CloseReason(1007, "Not Consistent")
        val VIOLATED_POLICY = CloseReason(1008, "Violated Policy")
        val TOO_BIG = CloseReason(1009, "Too Big")
        val NO_EXTENSION = CloseReason(1010, "No Extension")
        val INTERNAL_ERROR = CloseReason(1011, "Internal Error")
        val SERVICE_RESTART = CloseReason(1012, "Service Restart")
        val TRY_AGAIN_LATER = CloseReason(1013, "Try Again Later")
    }
}

// Example: client with the WebSockets plugin installed using default settings.
val client = HttpClient {
    install(WebSockets)
}
// Echo example: send one message, then print every text frame until a close frame arrives.
client.webSocket("wss://echo.websocket.org") {
    // Send a message
    send("Hello WebSocket!")

    // Receive messages
    for (frame in incoming) {
        when (frame) {
            is Frame.Text -> {
                val receivedText = frame.readText()
                println("Received: $receivedText")
                if (receivedText == "Hello WebSocket!") {
                    send("Thanks for the echo!")
                }
            }
            is Frame.Close -> {
                println("Connection closed: ${frame.readReason()}")
                break
            }
            else -> {
                // Handle other frame types
            }
        }
    }
}
client.close()

// Example: client with a custom ping interval and frame-size limit.
val client = HttpClient {
    install(WebSockets) {
        pingInterval = 20.seconds
        // `maxFrameSize` is a Long; the L suffix keeps the product from being typed as Int.
        maxFrameSize = 1024L * 1024 // 1MB
    }
}
// Chat example: one coroutine sends ten messages and then closes the session while the
// session's own coroutine prints whatever arrives.
client.webSocket("ws://localhost:8080/chat") {
    // Launch coroutine for sending messages
    launch {
        repeat(10) { i ->
            send("Message $i")
            delay(1000)
        }
        close(CloseReason.NORMAL)
    }

    // Receive messages on main coroutine
    for (frame in incoming) {
        when (frame) {
            is Frame.Text -> {
                println("Server: ${frame.readText()}")
            }
            is Frame.Binary -> {
                println("Received binary data: ${frame.data.size} bytes")
            }
            is Frame.Close -> {
                println("Connection closed by server")
                break
            }
            is Frame.Ping -> {
                // Pong is sent automatically
                println("Received ping")
            }
            is Frame.Pong -> {
                println("Received pong")
            }
        }
    }
}

// Example: client used by the secure (wss) connection below.
val client = HttpClient {
    install(WebSockets)
}
// Secure connection example. `request` is an ordinary parameter of `wss`, so the handshake
// headers are passed as a named argument; the trailing lambda is the session `block` itself.
// `accessToken` is expected to be defined by the surrounding code.
client.wss(
    host = "secure-chat.example.com",
    port = 443,
    path = "/websocket",
    request = {
        // Configure headers for authentication
        header("Authorization", "Bearer $accessToken")
        header("X-Client-Version", "1.0")
    }
) {
    // WebSocket communication
    send("Secure message")

    for (frame in incoming) {
        when (frame) {
            is Frame.Text -> println("Secure: ${frame.readText()}")
            is Frame.Close -> break
            else -> { /* handle other frames */ }
        }
    }
}

/**
 * Skeleton of a custom extension that advertises the `deflate-frame` protocol.
 * The factory is deliberately left unimplemented.
 */
class CompressionExtension : WebSocketExtension<CompressionExtension.Config> {
    /** Tunables for the extension. */
    class Config {
        var compressionLevel: Int = 6
        var windowBits: Int = 15
    }

    // Accessing this property throws until a real factory is supplied.
    override val factory: WebSocketExtensionFactory<Config, CompressionExtension>
        get() = TODO("Implement extension factory")

    override val protocols: List<WebSocketExtensionHeader>
        get() = listOf(WebSocketExtensionHeader("deflate-frame"))
}
// Example: installing the custom extension.
val client = HttpClient {
    install(WebSockets) {
        extensions(CompressionExtension())
    }
}

/**
 * Keeps a single WebSocket session on its own [HttpClient], reads it in the background and
 * schedules another attempt five seconds after a failed [connect].
 *
 * After [disconnect] the instance cannot be reused, because the client is closed.
 *
 * @param token optional bearer token; when present it is sent in the `Authorization`
 *   header of the handshake request.
 */
class WebSocketManager(private val token: String? = null) {
    private val client = HttpClient {
        install(WebSockets) {
            pingInterval = 30.seconds
        }
    }

    // One scope owns both the receive loop and the reconnect timer, so disconnect() can stop
    // them together. SupervisorJob keeps a failure in one from cancelling the other.
    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)

    private var session: ClientWebSocketSession? = null
    private var reconnectJob: Job? = null

    /**
     * Opens a session to [url] and starts reading it.
     *
     * @return `true` when the session was opened; `false` when opening failed, in which case
     *   a reconnect has been scheduled.
     */
    suspend fun connect(url: String): Boolean {
        return try {
            session = client.webSocketSession(url) {
                if (token != null) header("Authorization", "Bearer $token")
            }
            // Start message handling
            scope.launch {
                handleIncomingMessages()
            }
            true
        } catch (e: CancellationException) {
            // Cancellation is not a connection failure: do not log it or schedule a retry.
            throw e
        } catch (e: Exception) {
            println("Connection failed: ${e.message}")
            scheduleReconnect(url)
            false
        }
    }

    /** Reads frames from the current session until it ends or a close frame arrives. */
    private suspend fun handleIncomingMessages() {
        val active = session ?: return
        try {
            for (frame in active.incoming) {
                when (frame) {
                    is Frame.Text -> processMessage(frame.readText())
                    is Frame.Close -> {
                        println("Connection closed: ${frame.readReason()}")
                        break
                    }
                    else -> { /* handle other frames */ }
                }
            }
        } catch (e: CancellationException) {
            throw e
        } catch (e: Exception) {
            println("Message handling error: ${e.message}")
        }
    }

    /** Replaces any pending reconnect with a new one that fires in five seconds. */
    private fun scheduleReconnect(url: String) {
        reconnectJob?.cancel()
        reconnectJob = scope.launch {
            delay(5000) // Wait 5 seconds before reconnecting
            connect(url)
        }
    }

    /**
     * Sends [text] on the current session.
     *
     * @return `true` only when a session exists and the send completed; `false` when there
     *   is no session or the send threw.
     */
    suspend fun sendMessage(text: String): Boolean {
        // Without a session nothing is sent, so this must not report success.
        val active = session ?: return false
        return try {
            active.send(text)
            true
        } catch (e: CancellationException) {
            throw e
        } catch (e: Exception) {
            println("Send failed: ${e.message}")
            false
        }
    }

    /** Cancels background work, closes the session with a normal reason and closes the client. */
    suspend fun disconnect() {
        reconnectJob?.cancel()
        try {
            session?.close(CloseReason.NORMAL)
        } finally {
            // Release the client and background work even when the close handshake throws.
            session = null
            scope.cancel()
            client.close()
        }
    }

    private fun processMessage(message: String) {
        // Process incoming message
        println("Received: $message")
    }
}
// JSON message serialization
/**
 * Encodes [data] to JSON and sends it as a text message.
 *
 * The type parameter is reified so the serializer is resolved from the argument's static
 * type. With a parameter typed `Any`, the serializer for the real class cannot be found.
 * Existing calls such as `sendJson(ChatMessage(...))` compile unchanged.
 */
suspend inline fun <reified T> ClientWebSocketSession.sendJson(data: T) {
    send(Json.encodeToString(data))
}
/**
 * Waits for the next text frame and decodes it as [T].
 *
 * Frames that are neither text nor close are skipped.
 *
 * @return the decoded value, or `null` when a close frame arrives or `incoming` ends first.
 */
suspend inline fun <reified T> ClientWebSocketSession.receiveJson(): T? {
    for (frame in incoming) {
        if (frame is Frame.Text) {
            val payload = frame.readText()
            return Json.decodeFromString<T>(payload)
        }
        if (frame is Frame.Close) {
            return null
        }
    }
    return null
}
// Usage
// The class must be annotated so that the Json encode/decode calls can find a serializer for it.
@Serializable
data class ChatMessage(val user: String, val text: String, val timestamp: Long)

// NOTE(review): System.currentTimeMillis() below and File in the next example are JVM APIs;
// confirm the intended equivalents for the tvOS target.
client.webSocket("ws://chat.example.com") {
    // Send JSON message
    sendJson(ChatMessage("Alice", "Hello!", System.currentTimeMillis()))

    // Receive JSON message
    val message: ChatMessage? = receiveJson()
    message?.let { println("${it.user}: ${it.text}") }
}

// Binary example: upload one file, then store every binary frame received.
client.webSocket("ws://binary-service.example.com") {
    // Send binary data
    val imageData = File("image.png").readBytes()
    send(imageData)

    // Receive binary data
    for (frame in incoming) {
        when (frame) {
            is Frame.Binary -> {
                val data = frame.data
                File("received-${System.currentTimeMillis()}.bin").writeBytes(data)
                println("Received ${data.size} bytes of binary data")
            }
            is Frame.Close -> break
            else -> { /* handle other frames */ }
        }
    }
}

// Example: client used by the retry helper below.
val client = HttpClient {
    install(WebSockets) {
        pingInterval = 10.seconds
    }
}
/**
 * Connects to [url] and prints incoming text frames, retrying failed attempts.
 *
 * A connect timeout waits 2 s × attempt number before the next try; any other error
 * waits 1 s. The exception from the last attempt is rethrown. Cancellation is never
 * retried.
 *
 * @param url WebSocket URL to connect to.
 * @param maxRetries total number of attempts; nothing happens when it is zero or negative.
 */
suspend fun connectWithRetry(url: String, maxRetries: Int = 3) {
    repeat(maxRetries) { attempt ->
        try {
            client.webSocket(url) {
                println("Connected successfully on attempt ${attempt + 1}")

                // Handle connection
                for (frame in incoming) {
                    when (frame) {
                        is Frame.Text -> println(frame.readText())
                        is Frame.Close -> {
                            println("Connection closed normally")
                            return@webSocket
                        }
                        else -> { /* handle other frames */ }
                    }
                }
            }
            return // Success, exit retry loop
        } catch (e: ConnectTimeoutException) {
            println("Connection timeout on attempt ${attempt + 1}")
            if (attempt == maxRetries - 1) throw e
            // delay() takes a Long, so the product is computed as Long.
            delay(2000L * (attempt + 1)) // Linear backoff: 2 s, 4 s, ...
        } catch (e: CancellationException) {
            // The caller gave up; retrying would ignore the cancellation.
            throw e
        } catch (e: Exception) {
            println("Connection error on attempt ${attempt + 1}: ${e.message}")
            if (attempt == maxRetries - 1) throw e
            delay(1000)
        }
    }
}

// Graceful-shutdown example: acknowledge a "shutdown" message, close normally, and attempt
// an error close if anything throws.
client.webSocket("ws://example.com/socket") {
    try {
        // WebSocket communication
        for (frame in incoming) {
            when (frame) {
                is Frame.Text -> {
                    val message = frame.readText()
                    if (message == "shutdown") {
                        send("Acknowledged shutdown")
                        close(CloseReason.NORMAL)
                        break
                    }
                    // Process other messages
                }
                is Frame.Close -> {
                    println("Server closed connection: ${frame.readReason()}")
                    break
                }
                else -> { /* handle other frames */ }
            }
        }
    } catch (e: Exception) {
        println("WebSocket error: ${e.message}")
        // Attempt graceful close
        try {
            close(CloseReason(1011, "Internal Error"))
        } catch (closeException: Exception) {
            println("Failed to close gracefully: ${closeException.message}")
        }
    } finally {
        println("WebSocket session ended")
    }
}

class MockWebSocketServer {
private val responses = mutableListOf<String>()
/** Queues [response] to be sent by a later call to `simulate`. */
fun addResponse(response: String) {
    responses += response
}
/**
 * Sends every queued response over [session], pausing 100 ms after each one, then closes
 * the session with a normal reason.
 */
suspend fun simulate(session: ClientWebSocketSession) {
    // Simulate server responses
    for (queued in responses) {
        session.send(queued)
        delay(100)
    }
    session.close(CloseReason.NORMAL)
}
}
// Test WebSocket client
/**
 * Sends "ping" and expects the first frame back to be the text "pong".
 *
 * Fails when the first frame is not a text frame, and closes the client even when an
 * assertion fails.
 */
suspend fun testWebSocketClient() {
    val client = HttpClient {
        install(WebSockets)
    }

    try {
        // In actual tests, this would be a real test server
        client.webSocket("ws://test-server:8080/test") {
            send("ping")

            val response = incoming.receive()
            // A non-text reply is a failure, not something to skip silently.
            check(response is Frame.Text) { "Expected a text frame but received $response" }
            assertEquals("pong", response.readText())
        }
    } finally {
        client.close()
    }
}

class WebSocketPool(private val maxConnections: Int = 10) {
private val availableConnections = Channel<ClientWebSocketSession>(maxConnections)
private val client = HttpClient {
install(WebSockets) {
pingInterval = 30.seconds
}
}
/**
 * Returns an idle pooled session if one is still open, otherwise opens a new session to [url].
 *
 * Sessions that closed while idle are discarded instead of being handed out.
 */
suspend fun borrowConnection(url: String): ClientWebSocketSession {
    // NOTE(review): pooled sessions are not keyed by URL, so a reused session may have been
    // opened for a different `url` — confirm the pool is only used with a single endpoint.
    while (true) {
        val pooled = availableConnections.tryReceive().getOrNull() ?: break
        if (!pooled.closeReason.isCompleted) return pooled
    }
    return client.webSocketSession(url)
}
/**
 * Hands [session] back to the pool.
 *
 * A session that is already closed is ignored. An open session that cannot be stored
 * (the pool is full or has been closed) is closed with a normal reason, so it is not
 * left open with nothing referencing it.
 */
suspend fun returnConnection(session: ClientWebSocketSession) {
    if (session.closeReason.isCompleted) return
    if (availableConnections.trySend(session).isFailure) {
        session.close(CloseReason.NORMAL)
    }
}
/** Closes the HTTP client that every pooled session was created from. */
fun close() {
    client.close()
}
}

class BatchedWebSocketSender(private val session: ClientWebSocketSession) {
private val messageQueue = Channel<String>(Channel.UNLIMITED)
private val batchSize = 10
private val flushInterval = 100L // milliseconds
init {
CoroutineScope(Dispatchers.IO).launch {
processBatches()
}
}
/** Adds [message] to the queue that the background batching loop reads from. */
suspend fun sendMessage(message: String) = messageQueue.send(message)
/**
 * Drains the queue in batches: collects up to `batchSize` messages or until `flushInterval`
 * milliseconds pass, then sends whatever was collected as one newline-joined text message.
 *
 * The loop ends normally once the queue is closed and drained.
 */
private suspend fun processBatches() {
    val batch = mutableListOf<String>()
    var queueOpen = true
    while (queueOpen) {
        // Collect messages for batch; on timeout the partial batch is kept and flushed below.
        withTimeoutOrNull(flushInterval) {
            while (batch.size < batchSize) {
                // receiveCatching() yields no value once the queue is closed, instead of throwing.
                val next = messageQueue.receiveCatching().getOrNull()
                if (next == null) {
                    queueOpen = false
                    break
                }
                batch.add(next)
            }
        }

        // Send batch if we have messages
        if (batch.isNotEmpty()) {
            val combinedMessage = batch.joinToString("\n")
            session.send(combinedMessage)
            batch.clear()
        }
    }
}
}

Install with Tessl CLI:
npx tessl i tessl/maven-io-ktor--ktor-client-core-tvosarm64