Ktor network utilities for macOS ARM64 target - provides asynchronous networking components including sockets, selectors, and connection utilities for building Kotlin multiplatform network applications
—
The selector system in Ktor Network provides platform-specific I/O event management built on Kotlin coroutines. Selectors handle asynchronous I/O operations efficiently by managing socket events and coordinating with the coroutine dispatcher.
/**
 * Manages I/O readiness events for [Selectable] resources.
 *
 * Closing the manager is the caller's responsibility (it is [Closeable]).
 */
interface SelectorManager : Closeable {
    /** The coroutine context used for I/O operations. */
    val coroutineContext: CoroutineContext

    /** Notifies the manager that [selectable] has been closed. */
    fun notifyClosed(selectable: Selectable)

    /** Suspends until the given [interest] is ready on [selectable]. */
    suspend fun select(selectable: Selectable, interest: SelectInterest)
}
Platform-specific selector manager for handling I/O events across multiple sockets.
Properties:
coroutineContext: CoroutineContext - The coroutine context used for I/O operations
Methods:
notifyClosed(selectable: Selectable) - Notifies that a selectable resource has been closed
suspend fun select(selectable: Selectable, interest: SelectInterest) - Suspends until the specified I/O interest is ready
fun SelectorManager(dispatcher: CoroutineContext = Dispatchers.IO): SelectorManager
Creates a platform-specific selector manager instance.
Parameters:
dispatcher: CoroutineContext - Coroutine dispatcher for I/O operations (default: Dispatchers.IO)
Returns: SelectorManager - Platform-optimized selector manager
/**
 * An I/O resource that can be passed to a selector manager for monitoring.
 *
 * The interface declares no members here; the concrete shape is platform-dependent.
 */
interface Selectable {
    // Platform-specific selectable resource
    // Implementation details are platform-dependent
}
Platform-specific interface representing a selectable I/O resource (typically a socket file descriptor).
/**
 * The kinds of I/O readiness a selector can be asked to wait for.
 */
enum class SelectInterest {
    /** Socket ready for reading data. */
    READ,

    /** Socket ready for writing data. */
    WRITE,

    /** Server socket ready to accept connections. */
    ACCEPT,

    /** Client socket connection completed. */
    CONNECT
}
Enumeration of different I/O interests that can be monitored by the selector.
Values:
READ - Socket ready for reading data
WRITE - Socket ready for writing data
ACCEPT - Server socket ready to accept connections
CONNECT - Client socket connection completed
import io.ktor.network.selector.*
import io.ktor.network.sockets.*
import kotlinx.coroutines.*
/**
 * Connects one TCP client socket through a default [SelectorManager] and prints
 * the selector's coroutine context.
 *
 * Both the socket and the selector are closed in `finally` blocks so neither is
 * leaked when `connect` (or anything after it) throws.
 */
suspend fun basicSelectorUsage() {
    // Create selector with default I/O dispatcher
    val selectorManager = SelectorManager()
    try {
        // Create a socket using the selector
        val socket = aSocket(selectorManager)
            .tcp()
            .connect(InetSocketAddress("example.com", 80))
        try {
            println("Socket connected using selector")
            println("Selector context: ${selectorManager.coroutineContext}")
        } finally {
            socket.close()
        }
    } finally {
        // Clean up
        selectorManager.close()
    }
}

/**
 * Creates a [SelectorManager] on a dispatcher limited to 4 parallel workers and
 * binds ten TCP server sockets (localhost ports 8001..8010) through it.
 *
 * Every socket bound so far is closed in `finally`, so a failure on the Nth
 * `bind` does not leak the first N-1 sockets or the selector.
 */
suspend fun customDispatcherSelector() {
    // Create custom dispatcher for I/O operations
    val customDispatcher = Dispatchers.IO.limitedParallelism(4)
    // Create selector with custom dispatcher
    val selectorManager = SelectorManager(customDispatcher)
    val sockets = mutableListOf<ServerSocket>()
    try {
        println("Custom selector created")
        println("Selector context: ${selectorManager.coroutineContext}")
        // Use selector for multiple sockets
        for (portOffset in 1..10) {
            sockets += aSocket(selectorManager)
                .tcp()
                .bind(InetSocketAddress("localhost", 8000 + portOffset))
        }
        println("Created ${sockets.size} sockets with custom selector")
    } finally {
        // Clean up all sockets
        sockets.forEach { it.close() }
        selectorManager.close()
    }
}

/**
 * Binds three TCP server sockets (HTTP 8080, HTTPS 8443, Admin 9000) on one
 * shared [SelectorManager], serves them concurrently via `handleServer` for one
 * minute, then shuts everything down.
 *
 * The accept loops are launched inside [coroutineScope]: a plain `suspend fun`
 * has no `CoroutineScope` receiver, so a bare `launch` would not compile, and the
 * scope guarantees the loops have finished before the sockets are closed.
 */
suspend fun multiSocketServer() {
    // Single selector manages all sockets
    val selectorManager = SelectorManager()
    // Insertion-ordered, so the servers are reported and started in declaration order
    val servers = mutableMapOf<String, ServerSocket>()
    try {
        // Create multiple server sockets
        for ((name, port) in listOf("HTTP" to 8080, "HTTPS" to 8443, "Admin" to 9000)) {
            servers[name] = aSocket(selectorManager)
                .tcp()
                .bind(InetSocketAddress("localhost", port))
        }
        println("Multi-socket server started:")
        servers.forEach { (name, server) -> println("$name: ${server.localAddress}") }

        coroutineScope {
            // Handle connections from all servers concurrently
            val jobs = servers.map { (name, server) ->
                launch { handleServer(server, name) }
            }
            // Wait for shutdown signal
            delay(60000) // Run for 1 minute
            // Graceful shutdown: cancel the accept loops; the scope then waits for them
            jobs.forEach { it.cancel() }
        }
    } finally {
        servers.values.forEach { it.close() }
        selectorManager.close()
    }
}
/**
 * Accepts clients on [serverSocket] until it is closed or `accept` fails, handling
 * each client in its own child coroutine.
 *
 * @param serverSocket the bound server socket to accept from
 * @param name label used as a prefix in the printed log lines
 *
 * The loop runs inside [coroutineScope] so that `launch` has a scope to run in and
 * so that this function only returns once all client handlers have finished.
 * Cancellation is rethrown rather than reported as a server error.
 */
suspend fun handleServer(serverSocket: ServerSocket, name: String) {
    coroutineScope {
        while (!serverSocket.isClosed) {
            val client = try {
                serverSocket.accept()
            } catch (e: CancellationException) {
                throw e // cooperative cancellation must propagate
            } catch (e: Exception) {
                println("$name server error: ${e.message}")
                break
            }
            launch {
                try {
                    println("$name client connected: ${client.remoteAddress}")
                    // Handle client...
                    delay(1000)
                } finally {
                    // Close the client even when the handler is cancelled mid-delay
                    client.close()
                }
            }
        }
    }
}

/**
 * Binds three UDP receiver sockets (localhost ports 9001..9003) and one sender on a
 * shared [SelectorManager], sends one greeting datagram to each receiver and prints
 * what each receiver gets.
 *
 * Each receiver waits at most one second for its datagram; all sockets and the
 * selector are closed in `finally`.
 */
suspend fun udpWithSelector() {
    val selectorManager = SelectorManager()
    // Every socket that was successfully bound, in creation order, for cleanup
    val opened = mutableListOf<ASocket>()
    try {
        // Multiple UDP sockets sharing selector
        val receivers = (1..3).map { i ->
            aSocket(selectorManager)
                .udp()
                .bind(InetSocketAddress("localhost", 9000 + i))
                .also { opened += it }
        }
        val sender = aSocket(selectorManager)
            .udp()
            .bind()
            .also { opened += it }
        println("UDP sockets created:")
        receivers.forEachIndexed { i, socket ->
            println("Receiver $i: ${socket.localAddress}")
        }
        // Send messages to all receivers
        receivers.forEach { receiver ->
            val message = "Hello to ${receiver.localAddress}"
            val datagram = Datagram(
                // encodeToByteArray() is common stdlib; String.toByteArray() is not available on native
                packet = ByteReadPacket(message.encodeToByteArray()),
                address = receiver.localAddress
            )
            sender.send(datagram)
        }
        // Receive messages; the scope returns once every receiver has finished or timed out
        coroutineScope {
            receivers.forEachIndexed { i, receiver ->
                launch {
                    try {
                        val datagram = withTimeoutOrNull(1000) { receiver.receive() }
                        if (datagram != null) {
                            println("Receiver $i got: ${datagram.packet.readText()}")
                        } else {
                            println("Receiver $i error: timed out waiting for datagram")
                        }
                    } catch (e: CancellationException) {
                        throw e
                    } catch (e: Exception) {
                        println("Receiver $i error: ${e.message}")
                    }
                }
            }
        }
    } finally {
        // Clean up
        opened.forEach { it.close() }
        selectorManager.close()
    }
}

/**
 * Owns one [SelectorManager] and tracks every socket created through it so that
 * [shutdown] can close them all.
 *
 * Call [start] before [createSocket] / [createServer]. [shutdown] is safe to call
 * when the service was never started and may be called more than once.
 * The class does no synchronisation of its own.
 */
class NetworkService {
    // null until start() succeeds and again after shutdown()
    private var selectorManager: SelectorManager? = null
    private val activeSockets = mutableListOf<ASocket>()

    /** Returns the live selector or fails with a clear message when the service is not started. */
    private fun requireSelector(): SelectorManager =
        checkNotNull(selectorManager) { "NetworkService.start() must be called before creating sockets" }

    /** Creates the selector manager. Does nothing when the service is already started. */
    suspend fun start() {
        if (selectorManager != null) return // avoid leaking the previous selector
        val manager = SelectorManager()
        selectorManager = manager
        println("Network service started")
        println("Selector context: ${manager.coroutineContext}")
    }

    /**
     * Connects a TCP client socket to [address] and registers it for cleanup.
     *
     * @throws IllegalStateException when the service has not been started
     */
    suspend fun createSocket(address: SocketAddress): Socket {
        val socket = aSocket(requireSelector())
            .tcp()
            .connect(address)
        activeSockets.add(socket)
        return socket
    }

    /**
     * Binds a TCP server socket to [address] and registers it for cleanup.
     *
     * @throws IllegalStateException when the service has not been started
     */
    suspend fun createServer(address: SocketAddress): ServerSocket {
        val server = aSocket(requireSelector())
            .tcp()
            .bind(address)
        activeSockets.add(server)
        return server
    }

    /** Closes every tracked socket (best effort), then the selector manager. */
    suspend fun shutdown() {
        println("Shutting down network service...")
        // Close all active sockets
        activeSockets.forEach { socket ->
            try {
                socket.close()
                println("Closed socket: $socket")
            } catch (e: Exception) {
                println("Error closing socket: ${e.message}")
            }
        }
        // Forget closed sockets so a second shutdown() does not close them again
        activeSockets.clear()
        // Close selector manager
        selectorManager?.close()
        selectorManager = null
        println("Network service shutdown complete")
    }
}
/**
 * Starts a [NetworkService], creates one client and one server socket through it,
 * waits five seconds and shuts the service down.
 *
 * `start()` is called before the `try` so that `shutdown()` only runs for a
 * service that actually started.
 */
suspend fun useNetworkService() {
    val service = NetworkService()
    service.start()
    try {
        // Create some sockets
        val client = service.createSocket(InetSocketAddress("example.com", 80))
        val server = service.createServer(InetSocketAddress("localhost", 8080))
        println("Sockets created through service")
        // Use sockets...
        delay(5000)
    } finally {
        service.shutdown()
    }
}

/**
 * Binds 100 TCP server sockets on ephemeral ports through one [SelectorManager],
 * printing progress after every 20 sockets, then times 100 trivial concurrent
 * coroutines.
 *
 * Timing uses `kotlin.time.TimeSource.Monotonic`, which is available on every
 * Kotlin target (`System.currentTimeMillis()` exists only on the JVM).
 */
suspend fun monitorSelectorPerformance() {
    val selectorManager = SelectorManager()
    val sockets = mutableListOf<ASocket>()
    try {
        // Track selector usage
        val creationMark = kotlin.time.TimeSource.Monotonic.markNow()
        // Create many sockets to test selector efficiency
        repeat(100) { i ->
            sockets += aSocket(selectorManager)
                .tcp()
                .bind(InetSocketAddress("localhost", 0)) // Any available port
            // Report after 20, 40, ... sockets (i is zero-based)
            if ((i + 1) % 20 == 0) {
                val elapsed = creationMark.elapsedNow().inWholeMilliseconds
                println("Created ${sockets.size} sockets in ${elapsed}ms")
                println("Selector handling ${sockets.size} active sockets")
            }
        }
        println("Selector managing ${sockets.size} sockets efficiently")
        // Test concurrent operations
        val operationMark = kotlin.time.TimeSource.Monotonic.markNow()
        // Simulate concurrent socket operations; the scope returns once all children finish
        coroutineScope {
            repeat(sockets.size) {
                launch {
                    // Each socket performs some operation
                    delay(10)
                }
            }
        }
        val operationTime = operationMark.elapsedNow().inWholeMilliseconds
        println("Concurrent operations on ${sockets.size} sockets took ${operationTime}ms")
    } finally {
        // Clean up
        sockets.forEach { it.close() }
        selectorManager.close()
    }
}

/**
 * Attempts a TCP connection to a host that is expected not to resolve, printing
 * the failure and always closing the [SelectorManager].
 *
 * `socketTimeout` is set in the `connect` options lambda rather than in
 * `configure {}` on the TCP builder. Cancellation is rethrown instead of being
 * reported as a socket failure.
 */
suspend fun selectorErrorHandling() {
    val selectorManager = SelectorManager()
    try {
        val socket = aSocket(selectorManager)
            .tcp()
            .connect(InetSocketAddress("nonexistent.example.com", 80)) {
                socketTimeout = 5000 // Short timeout for testing
            }
        socket.close()
    } catch (e: CancellationException) {
        throw e
    } catch (e: Exception) {
        println("Socket operation failed: ${e.message}")
    } finally {
        // Always clean up selector
        selectorManager.close()
        println("Selector cleanup completed")
    }
}
The selector implementation on macOS ARM64 uses native kqueue for optimal performance:
/**
 * Binds one TCP server socket on localhost:8080 with `reusePort` and
 * `reuseAddress` enabled, prints two status lines and closes everything.
 *
 * The selector is closed in `finally` so it is not leaked when `bind` throws
 * (for example when the port is already in use).
 */
suspend fun macosOptimizedSelector() {
    // Selector automatically uses kqueue on macOS
    val selectorManager = SelectorManager()
    try {
        // Take advantage of macOS-specific features
        val socket = aSocket(selectorManager)
            .tcp()
            .configure {
                // macOS supports efficient port reuse
                reusePort = true
                reuseAddress = true
            }
            .bind(InetSocketAddress("localhost", 8080))
        try {
            println("macOS-optimized selector active")
            println("Using native kqueue for I/O events")
        } finally {
            socket.close()
        }
    } finally {
        selectorManager.close()
    }
}
import io.ktor.network.selector.*
import io.ktor.network.sockets.*
import kotlinx.coroutines.*
CoroutineContext, Dispatchers - From kotlinx-coroutines
Closeable - From Kotlin standard library
class ClosedChannelCancellationException : CancellationException
Selectors may throw platform-specific I/O exceptions during operations, including ClosedChannelCancellationException when operations are performed on closed channels. Always wrap selector operations in appropriate try-catch blocks for production applications.
Install with Tessl CLI
npx tessl i tessl/maven-io-ktor--ktor-network-macosarm64