Ktor network utilities - tvOS x64 target implementation of asynchronous TCP/UDP sockets and related networking functionality for the Ktor framework
—
Asynchronous I/O management for handling multiple socket operations concurrently using coroutine-based selection.
Core interface for managing asynchronous socket operations and selection events.
/**
 * Lets a [Selectable] wait until a given [SelectInterest] is selected for it.
 *
 * The manager is both a [CoroutineScope] and a [Closeable].
 */
interface SelectorManager : CoroutineScope, Closeable {
/**
 * Notifies the selector that [selectable] has been closed.
 *
 * @param selectable the selectable that has been closed
 */
fun notifyClosed(selectable: Selectable)
/**
 * Suspends until [interest] is selected for [selectable].
 *
 * May cause the manager to allocate and run a selector instance if one has not been created yet.
 * Only one selection is allowed per interest per selectable, but different interests may be
 * selected for the same selectable simultaneously.
 *
 * @param selectable the selectable to wait for
 * @param interest the type of I/O interest to wait for
 */
suspend fun select(selectable: Selectable, interest: SelectInterest)
}
/**
* Creates the selector manager for current platform.
* @param dispatcher CoroutineContext for the selector manager (default: EmptyCoroutineContext)
* @return SelectorManager instance for the current platform
*/
fun SelectorManager(
dispatcher: CoroutineContext = EmptyCoroutineContext
): SelectorManager

Usage Examples:
import io.ktor.network.sockets.*
import io.ktor.network.selector.*
import kotlinx.coroutines.*
// Basic usage: create a selector manager with the default coroutine context
val selectorManager = SelectorManager()
// Alternatively, pass a custom dispatcher (here Dispatchers.IO limited to a parallelism of 4)
val customDispatcher = Dispatchers.IO.limitedParallelism(4)
val customSelectorManager = SelectorManager(customDispatcher)
// Open a TCP connection to localhost:8080 through the selector manager
val socket = aSocket(selectorManager).tcp().connect("localhost", 8080)
// Clean up when done
selectorManager.close()

Enumeration of I/O interest types that can be monitored by the selector.
/**
* Select interest kind.
*/
enum class SelectInterest {
/** Interest in read operations */
READ,
/** Interest in write operations */
WRITE,
/** Interest in accept operations (server sockets) */
ACCEPT,
/** Interest in connect operations */
CONNECT;
companion object {
/** Array containing all selection interests */
val AllInterests: Array<SelectInterest>
}
}

Usage Example:
import io.ktor.network.selector.*
// Check all available interests
SelectInterest.AllInterests.forEach { interest ->
println("Available interest: $interest")
}
// Use specific interests
val readInterest = SelectInterest.READ
val writeInterest = SelectInterest.WRITE
val acceptInterest = SelectInterest.ACCEPT
val connectInterest = SelectInterest.CONNECT

Interface for objects that can be selected for I/O operations.
/**
* A selectable entity with selectable channel and interested operations subscriptions.
*/
interface Selectable

Exception types related to selection operations.
/**
* Exception thrown when a channel is closed during selection
*/
class ClosedChannelCancellationException : CancellationException("Closed channel.")

Usage Example:
import io.ktor.network.selector.*
import kotlinx.coroutines.*
try {
// Some selector operation that might fail
selectorManager.select(selectable, SelectInterest.READ)
} catch (e: ClosedChannelCancellationException) {
println("Channel was closed during selection: ${e.message}")
}

Basic TCP Server with Selector Management:
import io.ktor.network.sockets.*
import io.ktor.network.selector.*
import kotlinx.coroutines.*
/**
 * Runs a TCP server bound to localhost:8080 and hands every accepted client to [handleClient]
 * in its own coroutine.
 *
 * The function owns a dedicated scope (Dispatchers.IO + SupervisorJob) that backs both the
 * selector manager and the per-client coroutines. The server socket, the selector manager and
 * the scope are all released when the function exits, whether normally or exceptionally.
 */
suspend fun tcpServerWithSelector() {
    // Dedicated scope for the selector and the per-client coroutines.
    val selectorScope = CoroutineScope(Dispatchers.IO + SupervisorJob())
    val selectorManager = SelectorManager(selectorScope.coroutineContext)
    try {
        val serverSocket = aSocket(selectorManager).tcp().bind("localhost", 8080)
        try {
            println("Server started on port ${serverSocket.port}")
            // Accept clients until the scope is cancelled.
            while (selectorScope.isActive) {
                try {
                    val clientSocket = serverSocket.accept()
                    // One coroutine per client so a slow client does not block accept().
                    selectorScope.launch {
                        handleClient(clientSocket)
                    }
                } catch (e: CancellationException) {
                    // Cancellation must propagate; swallowing it would make the loop spin,
                    // because accept() keeps failing immediately once the caller is cancelled.
                    throw e
                } catch (e: Exception) {
                    println("Error accepting client: ${e.message}")
                }
            }
        } finally {
            // Release the listening socket before tearing down the selector.
            serverSocket.close()
        }
    } finally {
        selectorManager.close()
        selectorScope.cancel()
    }
}
/**
 * Echoes every UTF-8 line received on [socket] back to the peer, prefixed with "Echo: ",
 * until the input yields no more lines. The socket is always closed on exit.
 *
 * @param socket the accepted client socket; closed by this function
 */
suspend fun handleClient(socket: Socket) {
    try {
        val connection = socket.connection()
        while (true) {
            // A null line means there is nothing more to read.
            val line = connection.input.readUTF8Line() ?: break
            connection.output.writeStringUtf8("Echo: $line\n")
            connection.output.flush()
        }
    } catch (e: CancellationException) {
        // Do not report cancellation as a client error; let it propagate to the parent.
        throw e
    } catch (e: Exception) {
        println("Client error: ${e.message}")
    } finally {
        socket.close()
    }
}

Multiple Socket Management:
import io.ktor.network.sockets.*
import io.ktor.network.selector.*
import kotlinx.coroutines.*
/**
 * Opens TCP connections to several servers concurrently through one shared selector manager
 * and keeps track of the sockets that are currently open.
 *
 * NOTE(review): [activeSockets] is a plain mutable set that is touched from several coroutines;
 * confirm the callers run on a single thread or guard the set.
 */
class MultiSocketManager {
    private val selectorManager = SelectorManager()
    private val activeSockets = mutableSetOf<Socket>()

    /**
     * Connects to every `(host, port)` pair in [servers] in parallel, then runs
     * [handleConnection] on each connection that was established. Failed connections are
     * logged and skipped.
     *
     * Suspends until all handlers have finished, because the work runs inside a
     * `coroutineScope`; each socket is closed once its handler completes.
     *
     * @param servers list of `(host, port)` pairs to connect to
     */
    suspend fun connectToMultipleServers(servers: List<Pair<String, Int>>) {
        // async/launch need a CoroutineScope receiver; coroutineScope supplies one that is
        // tied to the caller, so cancellation and failures propagate structurally.
        coroutineScope {
            val connections = servers.map { (host, port) ->
                async {
                    try {
                        val socket = aSocket(selectorManager).tcp().connect(host, port)
                        activeSockets.add(socket)
                        socket to socket.connection()
                    } catch (e: CancellationException) {
                        // Never treat cancellation as a connection failure.
                        throw e
                    } catch (e: Exception) {
                        println("Failed to connect to $host:$port - ${e.message}")
                        null
                    }
                }
            }

            // Wait for all connection attempts and drop the failed ones.
            val validConnections = connections.awaitAll().filterNotNull()

            // Handle each connection in its own coroutine.
            validConnections.forEach { (socket, connection) ->
                launch {
                    try {
                        handleConnection(connection)
                    } finally {
                        activeSockets.remove(socket)
                        socket.close()
                    }
                }
            }
        }
    }

    /**
     * Sends a greeting line over [connection] and prints the first line of the reply.
     */
    private suspend fun handleConnection(connection: Connection) {
        connection.output.writeStringUtf8("Hello from client\n")
        connection.output.flush()
        val response = connection.input.readUTF8Line()
        println("Server response: $response")
    }

    /**
     * Closes every tracked socket, forgets them, and closes the selector manager.
     */
    suspend fun closeAll() {
        activeSockets.forEach { it.close() }
        activeSockets.clear()
        selectorManager.close()
    }
}

Custom Selector with Resource Management:
import io.ktor.network.sockets.*
import io.ktor.network.selector.*
import kotlinx.coroutines.*
import kotlin.time.Duration.Companion.minutes
/**
 * Owns a [SelectorManager] plus the coroutine scope it runs in, and creates sockets on it.
 *
 * Call [start] before any `create*` function and [shutdown] when done. Because [shutdown]
 * cancels the service scope, an instance is not meant to be restarted afterwards.
 */
class ManagedSelectorService {
    private var selectorManager: SelectorManager? = null
    private val serviceScope = CoroutineScope(
        Dispatchers.IO +
            SupervisorJob() +
            CoroutineName("SelectorService")
    )

    /**
     * Creates the selector manager on the service scope's context.
     * Calling it again while a manager exists is a no-op, so an earlier manager is never
     * replaced (and leaked) without being closed.
     */
    fun start() {
        if (selectorManager == null) {
            selectorManager = SelectorManager(serviceScope.coroutineContext)
        }
    }

    /**
     * Opens a TCP connection to [host]:[port] with a 30 000 ms socket timeout and keep-alive on.
     *
     * @throws IllegalStateException if the service has not been started
     */
    suspend fun createTcpConnection(host: String, port: Int): Socket {
        val manager = activeManager()
        return aSocket(manager).tcp().connect(host, port) {
            socketTimeout = 30000 // 30 seconds
            keepAlive = true
        }
    }

    /**
     * Binds a UDP socket on 0.0.0.0:[port] with address reuse enabled.
     *
     * @param port local port to bind; defaults to 0
     * @throws IllegalStateException if the service has not been started
     */
    suspend fun createUdpSocket(port: Int = 0): BoundDatagramSocket {
        val manager = activeManager()
        return aSocket(manager).udp().bind("0.0.0.0", port) {
            reuseAddress = true
        }
    }

    /**
     * Binds a TCP server socket on 0.0.0.0:[port] with a backlog of 100 and address reuse enabled.
     *
     * @throws IllegalStateException if the service has not been started
     */
    suspend fun createTcpServer(port: Int): ServerSocket {
        val manager = activeManager()
        return aSocket(manager).tcp().bind("0.0.0.0", port) {
            backlogSize = 100
            reuseAddress = true
        }
    }

    /**
     * Cancels and joins the children of the service scope (waiting at most one minute), then
     * closes the selector manager and cancels the scope. The manager reference is cleared so
     * later `create*` calls fail with "Service not started" instead of using a closed selector.
     */
    suspend fun shutdown() {
        try {
            // Graceful shutdown with timeout
            withTimeout(1.minutes) {
                serviceScope.coroutineContext.job.children.forEach { child ->
                    child.cancelAndJoin()
                }
            }
        } catch (e: TimeoutCancellationException) {
            println("Timeout during shutdown, forcing close")
        } finally {
            selectorManager?.close()
            selectorManager = null
            serviceScope.cancel()
        }
    }

    /** Returns the running selector manager or fails if [start] has not been called. */
    private fun activeManager(): SelectorManager =
        checkNotNull(selectorManager) { "Service not started" }
}
// Usage
/**
 * Demonstrates the lifecycle of [ManagedSelectorService]: start it, create a few sockets,
 * and always shut the service down exactly once, whether or not an error occurred.
 */
suspend fun managedServiceExample() {
    val service = ManagedSelectorService()
    try {
        service.start()
        // Create multiple connections
        val socket1 = service.createTcpConnection("api1.example.com", 80)
        val socket2 = service.createTcpConnection("api2.example.com", 80)
        val udpSocket = service.createUdpSocket(8080)
        val server = service.createTcpServer(9090)
        // Use sockets...
    } catch (e: CancellationException) {
        // Cancellation is not a service error; let it propagate after cleanup.
        throw e
    } catch (e: Exception) {
        println("Service error: ${e.message}")
    } finally {
        // Single shutdown point: runs on success, failure and cancellation alike, and is
        // never invoked a second time if shutdown itself fails.
        service.shutdown()
    }
}

Performance Monitoring:
import io.ktor.network.sockets.*
import io.ktor.network.selector.*
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis
/**
 * Opens TCP connections through a shared selector manager while recording how long each
 * connect call takes.
 */
class SelectorPerformanceMonitor {
    private val selectorManager = SelectorManager()
    private var connectionCount = 0
    private var totalConnectTime = 0L

    /**
     * Connects to [host]:[port], records the elapsed time, prints per-connection and average
     * timings, and returns the connected socket.
     *
     * @return the socket produced by the single, timed connect call
     */
    suspend fun monitoredConnect(host: String, port: Int): Socket {
        // Assign inside the timed block rather than `return`-ing from it: a `return` inside the
        // inline lambda would exit this function immediately and skip the bookkeeping below.
        val socket: Socket
        val connectTime = measureTimeMillis {
            socket = aSocket(selectorManager).tcp().connect(host, port)
        }
        connectionCount++
        totalConnectTime += connectTime
        println("Connection #$connectionCount to $host:$port took ${connectTime}ms")
        println("Average connect time: ${totalConnectTime / connectionCount}ms")
        return socket
    }

    /**
     * Returns a one-line summary of the number of connections and their average connect time
     * (0 when no connection has been made yet).
     */
    fun getStats(): String {
        val averageTime = if (connectionCount > 0) totalConnectTime / connectionCount else 0L
        return "Connections: $connectionCount, Average time: ${averageTime}ms"
    }

    /** Prints the final statistics and closes the selector manager. */
    suspend fun close() {
        println("Final stats: ${getStats()}")
        selectorManager.close()
    }
}

Install with Tessl CLI
npx tessl i tessl/maven-io-ktor--ktor-network-tvosx64