
tessl/maven-io-ktor--ktor-network-macosarm64

Ktor network utilities for macOS ARM64 target - provides asynchronous networking components including sockets, selectors, and connection utilities for building Kotlin multiplatform network applications


Selectors

The selector system in Ktor Network provides platform-specific I/O event management built on Kotlin coroutines. A selector manager tracks readiness events on sockets (read, write, accept, connect) and resumes the coroutine that is suspended waiting for them, so many sockets can be served without dedicating a blocked thread to each one.

Core Selector Types

SelectorManager Interface

interface SelectorManager : CoroutineScope, Closeable {
    override val coroutineContext: CoroutineContext // inherited from CoroutineScope
    fun notifyClosed(selectable: Selectable)
    suspend fun select(selectable: Selectable, interest: SelectInterest)
}

Platform-specific selector manager for handling I/O events across multiple sockets.

Properties:

  • coroutineContext: CoroutineContext - The coroutine context used for I/O operations

Methods:

  • notifyClosed(selectable: Selectable) - Notifies that a selectable resource has been closed
  • suspend fun select(selectable: Selectable, interest: SelectInterest) - Suspends until the specified I/O interest is ready

SelectorManager Factory

fun SelectorManager(dispatcher: CoroutineContext = EmptyCoroutineContext): SelectorManager

Creates a platform-specific selector manager instance.

Parameters:

  • dispatcher: CoroutineContext - Coroutine context for I/O operations (default: EmptyCoroutineContext)

Returns: SelectorManager - Platform-specific selector manager

Selectable Interface

Selectable Resource

interface Selectable {
    // Platform-specific selectable resource
    // Implementation details are platform-dependent
}

Platform-specific interface representing a selectable I/O resource (typically a socket file descriptor).

I/O Interest Types

SelectInterest Enum

enum class SelectInterest {
    READ,
    WRITE, 
    ACCEPT,
    CONNECT
}

Enumeration of different I/O interests that can be monitored by the selector.

Values:

  • READ - Socket ready for reading data
  • WRITE - Socket ready for writing data
  • ACCEPT - Server socket ready to accept connections
  • CONNECT - Client socket connection completed
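
As a small illustration of how the four interests map to socket states, the sketch below uses a hypothetical helper, describeInterest, which is not part of Ktor. It relies only on the enum values listed above and assumes the enum has exactly those four entries.

```kotlin
import io.ktor.network.selector.SelectInterest

// Hypothetical helper (not part of Ktor): maps each interest to the
// readiness condition described in the list above.
fun describeInterest(interest: SelectInterest): String = when (interest) {
    SelectInterest.READ -> "socket ready for reading data"
    SelectInterest.WRITE -> "socket ready for writing data"
    SelectInterest.ACCEPT -> "server socket ready to accept connections"
    SelectInterest.CONNECT -> "client socket connection completed"
}

fun main() {
    // The `when` above is exhaustive, so it stops compiling if an
    // enum value is ever left unhandled.
    listOf(
        SelectInterest.READ,
        SelectInterest.WRITE,
        SelectInterest.ACCEPT,
        SelectInterest.CONNECT
    ).forEach { println("$it: ${describeInterest(it)}") }
}
```

Application code rarely passes these values around directly; the socket implementations choose the interest for each operation.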

Usage Examples

Basic Selector Setup

import io.ktor.network.selector.*
import io.ktor.network.sockets.*
import kotlinx.coroutines.*

suspend fun basicSelectorUsage() {
    // Create a selector manager with the default coroutine context
    val selectorManager = SelectorManager()
    
    // Create a socket using the selector
    val socket = aSocket(selectorManager)
        .tcp()
        .connect(InetSocketAddress("example.com", 80))
    
    println("Socket connected using selector")
    println("Selector context: ${selectorManager.coroutineContext}")
    
    // Clean up
    socket.close()
    selectorManager.close()
}

Custom Dispatcher Configuration

suspend fun customDispatcherSelector() {
    // Create custom dispatcher for I/O operations
    val customDispatcher = Dispatchers.IO.limitedParallelism(4)
    
    // Create selector with custom dispatcher
    val selectorManager = SelectorManager(customDispatcher)
    
    println("Custom selector created")
    println("Selector context: ${selectorManager.coroutineContext}")
    
    // Use selector for multiple sockets
    val sockets = (1..10).map { offset ->
        // Binds ports 8001 through 8010
        aSocket(selectorManager)
            .tcp()
            .bind(InetSocketAddress("localhost", 8000 + offset))
    }
    
    println("Created ${sockets.size} sockets with custom selector")
    
    // Clean up all sockets
    sockets.forEach { it.close() }
    selectorManager.close()
}

Multi-Socket Server with Shared Selector

// coroutineScope supplies the CoroutineScope that launch requires
suspend fun multiSocketServer() = coroutineScope {
    // Single selector manages all sockets
    val selectorManager = SelectorManager()
    
    // Create multiple server sockets
    val httpServer = aSocket(selectorManager)
        .tcp()
        .bind(InetSocketAddress("localhost", 8080))
    
    val httpsServer = aSocket(selectorManager)
        .tcp()
        .bind(InetSocketAddress("localhost", 8443))
    
    val adminServer = aSocket(selectorManager)
        .tcp()
        .bind(InetSocketAddress("localhost", 9000))
    
    println("Multi-socket server started:")
    println("HTTP: ${httpServer.localAddress}")
    println("HTTPS: ${httpsServer.localAddress}")
    println("Admin: ${adminServer.localAddress}")
    
    // Handle connections from all servers concurrently
    val jobs = listOf(
        launch { handleServer(httpServer, "HTTP") },
        launch { handleServer(httpsServer, "HTTPS") },
        launch { handleServer(adminServer, "Admin") }
    )
    
    // Wait for shutdown signal
    delay(60000) // Run for 1 minute
    
    // Graceful shutdown
    jobs.forEach { it.cancel() }
    httpServer.close()
    httpsServer.close()
    adminServer.close()
    selectorManager.close()
}

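// coroutineScope supplies the CoroutineScope that launch requires
suspend fun handleServer(serverSocket: ServerSocket, name: String) = coroutineScope {
    while (!serverSocket.isClosed) {
        try {
            val client = serverSocket.accept()
            launch {
                println("$name client connected: ${client.remoteAddress}")
                // Handle client...
                delay(1000)
                client.close()
            }
        } catch (e: CancellationException) {
            throw e // Let cancellation propagate during shutdown
        } catch (e: Exception) {
            println("$name server error: ${e.message}")
            break
        }
    }
}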

UDP with Selector

// coroutineScope supplies the CoroutineScope that launch requires
suspend fun udpWithSelector() = coroutineScope {
    val selectorManager = SelectorManager()
    
    // Multiple UDP sockets sharing selector
    val receivers = (1..3).map { i ->
        aSocket(selectorManager)
            .udp()
            .bind(InetSocketAddress("localhost", 9000 + i))
    }
    
    val sender = aSocket(selectorManager)
        .udp()
        .bind()
    
    println("UDP sockets created:")
    receivers.forEachIndexed { i, socket ->
        println("Receiver $i: ${socket.localAddress}")
    }
    
    // Send messages to all receivers
    receivers.forEach { receiver ->
        val message = "Hello to ${receiver.localAddress}"
        val datagram = Datagram(
            // encodeToByteArray() is available on all Kotlin targets
            packet = ByteReadPacket(message.encodeToByteArray()),
            address = receiver.localAddress
        )
        sender.send(datagram)
    }
    
    // Receive messages
    receivers.forEachIndexed { i, receiver ->
        launch {
            try {
                val datagram = receiver.receive()
                println("Receiver $i got: ${datagram.packet.readText()}")
            } catch (e: Exception) {
                println("Receiver $i error: ${e.message}")
            }
        }
    }
    
    delay(1000)
    
    // Clean up
    receivers.forEach { it.close() }
    sender.close()
    selectorManager.close()
}

Selector Lifecycle Management

class NetworkService {
    private lateinit var selectorManager: SelectorManager
    private val activeSockets = mutableListOf<ASocket>()
    
    suspend fun start() {
        selectorManager = SelectorManager()
        println("Network service started")
        println("Selector context: ${selectorManager.coroutineContext}")
    }
    
    suspend fun createSocket(address: SocketAddress): Socket {
        val socket = aSocket(selectorManager)
            .tcp()
            .connect(address)
        
        activeSockets.add(socket)
        return socket
    }
    
    suspend fun createServer(address: SocketAddress): ServerSocket {
        val server = aSocket(selectorManager)
            .tcp()
            .bind(address)
        
        activeSockets.add(server)
        return server
    }
    
    suspend fun shutdown() {
        println("Shutting down network service...")
        
        // Close all active sockets
        activeSockets.forEach { socket ->
            try {
                socket.close()
                println("Closed socket: ${socket}")
            } catch (e: Exception) {
                println("Error closing socket: ${e.message}")
            }
        }
        
        // Close selector manager
        selectorManager.close()
        println("Network service shutdown complete")
    }
}

suspend fun useNetworkService() {
    val service = NetworkService()
    
    try {
        service.start()
        
        // Create some sockets
        val client = service.createSocket(InetSocketAddress("example.com", 80))
        val server = service.createServer(InetSocketAddress("localhost", 8080))
        
        println("Sockets created through service")
        
        // Use sockets...
        delay(5000)
        
    } finally {
        service.shutdown()
    }
}

Performance Monitoring

// coroutineScope supplies the CoroutineScope that launch requires
suspend fun monitorSelectorPerformance() = coroutineScope {
    val selectorManager = SelectorManager()
    
    // Monotonic clock from kotlin.time; System.currentTimeMillis() is JVM-only
    val startMark = kotlin.time.TimeSource.Monotonic.markNow()
    
    // Create many sockets that all share one selector manager
    val sockets = mutableListOf<ASocket>()
    
    repeat(100) { i ->
        val socket = aSocket(selectorManager)
            .tcp()
            .bind(InetSocketAddress("localhost", 0)) // Any available port
        
        sockets.add(socket)
        
        if (i % 20 == 0) {
            val elapsed = startMark.elapsedNow().inWholeMilliseconds
            println("Created ${sockets.size} sockets in ${elapsed}ms")
        }
    }
    
    println("Selector managing ${sockets.size} sockets")
    
    // Time a batch of concurrent placeholder operations
    val operationMark = kotlin.time.TimeSource.Monotonic.markNow()
    
    val jobs = sockets.map { _ ->
        launch {
            // Placeholder for a real per-socket operation
            delay(10)
        }
    }
    
    jobs.joinAll()
    val operationTime = operationMark.elapsedNow().inWholeMilliseconds
    println("Concurrent operations on ${sockets.size} sockets took ${operationTime}ms")
    
    // Clean up
    sockets.forEach { it.close() }
    selectorManager.close()
}

Error Handling with Selectors

suspend fun selectorErrorHandling() {
    var selectorManager: SelectorManager? = null
    
    try {
        selectorManager = SelectorManager()
        
        val socket = aSocket(selectorManager)
            .tcp()
            .connect(InetSocketAddress("nonexistent.example.com", 80)) {
                // TCP client options are set in connect's configuration block
                socketTimeout = 5000 // Short timeout (ms) for testing
            }
        
        socket.close()
        
    } catch (e: Exception) {
        println("Socket operation failed: ${e.message}")
        
    } finally {
        // Always clean up selector
        selectorManager?.close()
        println("Selector cleanup completed")
    }
}

Platform Considerations

macOS ARM64 Optimizations

The selector implementation on macOS ARM64 is the Kotlin/Native one, built on the platform's POSIX socket APIs. The readiness mechanism it uses internally is an implementation detail, so application code looks the same as on other targets:

suspend fun macosOptimizedSelector() {
    // The platform-specific selector is chosen automatically
    val selectorManager = SelectorManager()
    
    val socket = aSocket(selectorManager)
        .tcp()
        .configure {
            // Enable port and address reuse
            reusePort = true
            reuseAddress = true
        }
        .bind(InetSocketAddress("localhost", 8080))
    
    println("Server socket bound at ${socket.localAddress}")
    
    socket.close()
    selectorManager.close()
}

Best Practices

  1. Shared Selectors: Use one selector manager for multiple sockets to optimize resource usage
  2. Proper Cleanup: Always close selector managers in finally blocks or use .use {}
  3. Custom Dispatchers: Configure appropriate dispatchers based on workload characteristics
  4. Lifecycle Management: Coordinate selector lifecycle with application lifecycle
  5. Error Handling: Handle selector-related exceptions gracefully
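
Practices 1 and 2 can be combined in a small wrapper. The sketch below is one possible shape, not an API that Ktor provides: withSelectorManager and bindTwoServers are hypothetical names, and port 0 is used on the assumption that the operating system then picks a free port.

```kotlin
import io.ktor.network.selector.SelectorManager
import io.ktor.network.sockets.InetSocketAddress
import io.ktor.network.sockets.aSocket

// Hypothetical helper (not part of Ktor): creates a SelectorManager, runs
// `block`, and closes the manager even if `block` throws or is cancelled.
suspend fun <T> withSelectorManager(block: suspend (SelectorManager) -> T): T {
    val selectorManager = SelectorManager()
    try {
        return block(selectorManager)
    } finally {
        selectorManager.close()
    }
}

// Hypothetical usage: two server sockets share one selector manager.
suspend fun bindTwoServers() = withSelectorManager { selector ->
    val first = aSocket(selector).tcp().bind(InetSocketAddress("localhost", 0))
    try {
        val second = aSocket(selector).tcp().bind(InetSocketAddress("localhost", 0))
        try {
            println("Bound ${first.localAddress} and ${second.localAddress}")
        } finally {
            second.close()
        }
    } finally {
        // Sockets are closed before the selector manager that serves them.
        first.close()
    }
}
```

The nested try/finally blocks keep the close order explicit: each socket is closed first, and the selector manager is closed last by the wrapper.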

Type Definitions

Required Import Statements

import io.ktor.network.selector.*
import io.ktor.network.sockets.*
import io.ktor.utils.io.core.* // ByteReadPacket, readText (used by the UDP example)
import kotlinx.coroutines.*

Related Types

  • Used by all socket builders - See Socket Builders
  • Manages all socket types - See Socket Interfaces
  • Coordinates UDP operations - See Datagram Sockets
  • CoroutineContext - From the Kotlin standard library (kotlin.coroutines)
  • Dispatchers - From kotlinx-coroutines
  • Closeable - From Ktor I/O (io.ktor.utils.io.core)

Exception Types

class ClosedChannelCancellationException : CancellationException

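Selectors may throw platform-specific I/O exceptions during operations, including ClosedChannelCancellationException when an operation is performed on a closed channel. Because ClosedChannelCancellationException is a CancellationException, a catch block for Exception also intercepts it; rethrow cancellation exceptions unless the closed-channel case is handled deliberately. Wrap selector operations in try-catch blocks in production code.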
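
The following sketch shows one way to order the catch blocks so that cancellation is not swallowed. It assumes only the declaration above, namely that ClosedChannelCancellationException extends CancellationException. tryConnect is a hypothetical helper, and its string results are chosen purely for illustration.

```kotlin
import io.ktor.network.selector.SelectorManager
import io.ktor.network.sockets.InetSocketAddress
import io.ktor.network.sockets.aSocket
import kotlinx.coroutines.CancellationException

// Hypothetical helper: attempts a TCP connection and reports the outcome.
suspend fun tryConnect(selector: SelectorManager, host: String, port: Int): String =
    try {
        val socket = aSocket(selector).tcp().connect(InetSocketAddress(host, port))
        socket.close()
        "connected"
    } catch (e: CancellationException) {
        // Covers ClosedChannelCancellationException too, since it is a
        // CancellationException; rethrowing keeps coroutine cancellation intact.
        throw e
    } catch (e: Exception) {
        // Any other failure, for example a refused connection.
        "failed: ${e.message}"
    }
```

Placing the CancellationException branch first matters: with only a catch for Exception, a cancelled coroutine would report a failure string instead of stopping.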

Install with Tessl CLI

npx tessl i tessl/maven-io-ktor--ktor-network-macosarm64
