CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-io-ktor--ktor-network-tvosx64

Ktor network utilities - tvOS x64 target implementation of asynchronous TCP/UDP sockets and related networking functionality for the Ktor framework

Pending
Overview
Eval results
Files

docs/selector-management.md

Selector Management

Asynchronous I/O management for handling multiple socket operations concurrently using coroutine-based selection.

Capabilities

Selector Manager

Core interface for managing asynchronous socket operations and selection events.

/**
 * Lets a [Selectable] wait until a given [SelectInterest] has been selected for it.
 */
interface SelectorManager : CoroutineScope, Closeable {
    /**
     * Notifies the selector that [selectable] has been closed.
     *
     * @param selectable the selectable that has been closed
     */
    fun notifyClosed(selectable: Selectable)

    /**
     * Suspends until [interest] is selected for [selectable].
     *
     * May cause the manager to allocate and run a selector instance if one has not been created yet.
     * Only one selection is allowed per interest per selectable, but different interests may be
     * selected for the same selectable simultaneously.
     *
     * @param selectable the selectable to wait for
     * @param interest the type of I/O interest to wait for
     */
    suspend fun select(selectable: Selectable, interest: SelectInterest)
}

/**
 * Creates the selector manager for the current platform.
 *
 * @param dispatcher the [CoroutineContext] for the selector manager (default: [EmptyCoroutineContext])
 * @return a [SelectorManager] instance for the current platform
 */
fun SelectorManager(
    dispatcher: CoroutineContext = EmptyCoroutineContext
): SelectorManager

Usage Examples:

import io.ktor.network.sockets.*
import io.ktor.network.selector.*
import kotlinx.coroutines.*

// Basic selector manager usage
val selectorManager = SelectorManager()

// Use with custom dispatcher
val customDispatcher = Dispatchers.IO.limitedParallelism(4)
val customSelectorManager = SelectorManager(customDispatcher)

// Create sockets using the selector manager
val socket = aSocket(selectorManager).tcp().connect("localhost", 8080)

// Clean up when done: close the socket first, then every selector manager that was created,
// so that neither the connection nor the second manager is leaked
socket.close()
customSelectorManager.close()
selectorManager.close()

Selection Interest Types

Enumeration of I/O interest types that can be monitored by the selector.

/**
 * The kinds of I/O interest that can be selected for a selectable.
 */
enum class SelectInterest {
    /** Interest in read operations. */
    READ,
    
    /** Interest in write operations. */
    WRITE,
    
    /** Interest in accept operations (server sockets). */
    ACCEPT,
    
    /** Interest in connect operations. */
    CONNECT;

    companion object {
        /** Array containing all selection interests. */
        val AllInterests: Array<SelectInterest>
    }
}

Usage Example:

import io.ktor.network.selector.*

// Print every interest exposed through the companion array
for (interest in SelectInterest.AllInterests) {
    println("Available interest: $interest")
}

// Individual interests can also be referenced directly by name
val readInterest = SelectInterest.READ
val writeInterest = SelectInterest.WRITE
val acceptInterest = SelectInterest.ACCEPT
val connectInterest = SelectInterest.CONNECT

Selectable Interface

Interface for objects that can be selected for I/O operations.

/**
 * A selectable entity with a selectable channel and subscriptions for the operations it is
 * interested in.
 */
interface Selectable

Selection Exception Handling

Exception types related to selection operations.

/**
 * Thrown when a channel is closed during selection.
 *
 * This is a [CancellationException] whose message is always "Closed channel.".
 */
class ClosedChannelCancellationException : CancellationException("Closed channel.")

Usage Example:

import io.ktor.network.selector.*
import kotlinx.coroutines.*

try {
    // Suspend until the selectable is ready for reading; this may fail if the channel is closed
    selectorManager.select(selectable, SelectInterest.READ)
} catch (e: ClosedChannelCancellationException) {
    // NOTE(review): this exception is a CancellationException subtype; confirm whether it
    // should be rethrown after logging so that coroutine cancellation is not swallowed.
    println("Channel was closed during selection: ${e.message}")
}

Complete Selector Management Examples

Basic TCP Server with Selector Management:

import io.ktor.network.sockets.*
import io.ktor.network.selector.*
import kotlinx.coroutines.*

/**
 * Runs a TCP server bound to localhost:8080 and hands every accepted client to [handleClient]
 * in its own coroutine.
 *
 * The function suspends in the accept loop until the selector scope becomes inactive or the
 * calling coroutine is cancelled. The server socket, the selector manager and the scope are
 * always released on exit.
 */
suspend fun tcpServerWithSelector() {
    // Create selector manager with custom scope
    val selectorScope = CoroutineScope(Dispatchers.IO + SupervisorJob())
    val selectorManager = SelectorManager(selectorScope.coroutineContext)

    try {
        val serverSocket = aSocket(selectorManager).tcp().bind("localhost", 8080)
        try {
            println("Server started on port ${serverSocket.port}")

            // Handle multiple clients concurrently
            while (selectorScope.isActive) {
                try {
                    val clientSocket = serverSocket.accept()

                    // Launch coroutine for each client
                    selectorScope.launch {
                        handleClient(clientSocket)
                    }
                } catch (e: CancellationException) {
                    // The loop condition only observes selectorScope, so swallowing the caller's
                    // cancellation here would make accept() fail again immediately and spin forever.
                    throw e
                } catch (e: Exception) {
                    println("Error accepting client: ${e.message}")
                }
            }
        } finally {
            // Release the listening socket before tearing down the selector that serves it.
            serverSocket.close()
        }
    } finally {
        selectorManager.close()
        selectorScope.cancel()
    }
}

/**
 * Echoes every line received on [socket] back to the sender, prefixed with "Echo: ", until the
 * input yields no more lines.
 *
 * I/O failures are logged and end the session; cancellation is propagated to the caller.
 * The socket is always closed before returning.
 *
 * @param socket the connected client socket; it is closed by this function
 */
suspend fun handleClient(socket: Socket) {
    try {
        val connection = socket.connection()

        while (true) {
            // A null line means there is nothing more to read, so the session is over.
            val line = connection.input.readUTF8Line() ?: break
            connection.output.writeStringUtf8("Echo: $line\n")
            connection.output.flush()
        }
    } catch (e: CancellationException) {
        // Never swallow cancellation: let it reach the coroutine machinery.
        throw e
    } catch (e: Exception) {
        println("Client error: ${e.message}")
    } finally {
        socket.close()
    }
}

Multiple Socket Management:

import io.ktor.network.sockets.*
import io.ktor.network.selector.*
import kotlinx.coroutines.*

/**
 * Opens TCP connections to several servers through one shared selector manager and runs a
 * small request/response exchange on each of them.
 */
class MultiSocketManager {
    private val selectorManager = SelectorManager()

    // Only mutated from the coroutine that calls connectToMultipleServers()/closeAll(),
    // never from the child coroutines, so the children cannot race on it.
    private val activeSockets = mutableSetOf<Socket>()

    /**
     * Connects to every `(host, port)` pair in [servers] concurrently, then handles all
     * successful connections concurrently.
     *
     * Servers that cannot be reached are logged and skipped. The function suspends until every
     * connection has been handled; each socket is closed when its handler finishes. If one
     * handler fails, the remaining handlers are cancelled and the failure is rethrown.
     *
     * @param servers host/port pairs to connect to
     */
    suspend fun connectToMultipleServers(servers: List<Pair<String, Int>>) {
        // async/launch need a CoroutineScope receiver; coroutineScope provides one that is
        // tied to the caller, so cancelling the caller cancels all of this work.
        val sockets = coroutineScope {
            servers
                .map { (host, port) -> async { connectOrNull(host, port) } }
                .awaitAll()
                .filterNotNull()
        }

        activeSockets.addAll(sockets)
        try {
            coroutineScope {
                sockets.forEach { socket ->
                    launch {
                        try {
                            handleConnection(socket.connection())
                        } finally {
                            socket.close()
                        }
                    }
                }
            }
        } finally {
            // All handlers have finished (and closed their sockets) by the time we get here.
            activeSockets.removeAll(sockets)
        }
    }

    /**
     * Connects to [host]:[port], returning `null` (after logging) when the connection fails.
     * Cancellation is rethrown rather than reported as a failed connection.
     */
    private suspend fun connectOrNull(host: String, port: Int): Socket? =
        try {
            aSocket(selectorManager).tcp().connect(host, port)
        } catch (e: CancellationException) {
            throw e
        } catch (e: Exception) {
            println("Failed to connect to $host:$port - ${e.message}")
            null
        }

    /**
     * Sends a greeting line over [connection] and prints the first line of the reply.
     */
    private suspend fun handleConnection(connection: Connection) {
        connection.output.writeStringUtf8("Hello from client\n")
        connection.output.flush()

        val response = connection.input.readUTF8Line()
        println("Server response: $response")
    }

    /**
     * Closes every socket that is still tracked, then closes the selector manager.
     */
    suspend fun closeAll() {
        activeSockets.forEach { it.close() }
        activeSockets.clear()
        selectorManager.close()
    }
}

Custom Selector with Resource Management:

import io.ktor.network.sockets.*
import io.ktor.network.selector.*
import kotlinx.coroutines.*
import kotlin.time.Duration.Companion.minutes

/**
 * Owns a [SelectorManager] together with the coroutine scope it runs in, and creates sockets
 * on top of it.
 *
 * Call [start] before creating sockets and [shutdown] when finished. [shutdown] cancels the
 * service scope, so a later [start] would create its manager in an already cancelled context.
 */
class ManagedSelectorService {
    private var selectorManager: SelectorManager? = null
    private val serviceScope = CoroutineScope(
        Dispatchers.IO +
            SupervisorJob() +
            CoroutineName("SelectorService")
    )

    /**
     * Creates the selector manager if the service is not already started.
     * Calling it again while started is a no-op, so an existing manager is never leaked.
     */
    fun start() {
        if (selectorManager == null) {
            selectorManager = SelectorManager(serviceScope.coroutineContext)
        }
    }

    /**
     * Opens a TCP connection to [host]:[port] with keep-alive enabled and a socket timeout set.
     *
     * @throws IllegalStateException if the service has not been started
     */
    suspend fun createTcpConnection(host: String, port: Int): Socket =
        aSocket(requireManager()).tcp().connect(host, port) {
            socketTimeout = 30000 // 30 seconds
            keepAlive = true
        }

    /**
     * Binds a UDP socket on all interfaces at [port] with address reuse enabled.
     *
     * @param port the local port to bind (default: 0)
     * @throws IllegalStateException if the service has not been started
     */
    suspend fun createUdpSocket(port: Int = 0): BoundDatagramSocket =
        aSocket(requireManager()).udp().bind("0.0.0.0", port) {
            reuseAddress = true
        }

    /**
     * Binds a TCP server socket on all interfaces at [port] with a backlog of 100 and address
     * reuse enabled.
     *
     * @throws IllegalStateException if the service has not been started
     */
    suspend fun createTcpServer(port: Int): ServerSocket =
        aSocket(requireManager()).tcp().bind("0.0.0.0", port) {
            backlogSize = 100
            reuseAddress = true
        }

    /**
     * Cancels all coroutines running in the service scope and waits up to one minute for them
     * to finish, then closes the selector manager and cancels the scope.
     *
     * After this call the service counts as "not started" again, so socket creation fails
     * fast instead of using a closed manager.
     */
    suspend fun shutdown() {
        try {
            // Graceful shutdown with timeout
            withTimeout(1.minutes) {
                // Cancel every child first so they wind down in parallel, then wait for all.
                val children = serviceScope.coroutineContext.job.children.toList()
                children.forEach { it.cancel() }
                children.joinAll()
            }
        } catch (e: TimeoutCancellationException) {
            println("Timeout during shutdown, forcing close")
        } finally {
            selectorManager?.close()
            selectorManager = null
            serviceScope.cancel()
        }
    }

    /** Returns the active manager or fails with the "not started" error. */
    private fun requireManager(): SelectorManager =
        checkNotNull(selectorManager) { "Service not started" }
}

// Usage
/**
 * Starts a [ManagedSelectorService], creates a few sockets through it and shuts it down.
 *
 * Shutdown runs exactly once on every exit path. Failures are logged; cancellation is
 * propagated to the caller.
 */
suspend fun managedServiceExample() {
    val service = ManagedSelectorService()

    try {
        service.start()

        // Create multiple connections
        val socket1 = service.createTcpConnection("api1.example.com", 80)
        val socket2 = service.createTcpConnection("api2.example.com", 80)
        val udpSocket = service.createUdpSocket(8080)
        val server = service.createTcpServer(9090)

        // Use sockets... (close each one when finished with it)
    } catch (e: CancellationException) {
        throw e
    } catch (e: Exception) {
        println("Service error: ${e.message}")
    } finally {
        // Clean shutdown, whether the body succeeded, failed or was cancelled
        service.shutdown()
    }
}

Performance Monitoring:

import io.ktor.network.sockets.*
import io.ktor.network.selector.*
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

/**
 * Opens TCP connections through a shared selector manager while recording how long each
 * connect call takes.
 */
class SelectorPerformanceMonitor {
    private val selectorManager = SelectorManager()
    private var connectionCount = 0
    private var totalConnectTime = 0L

    /**
     * Connects to [host]:[port], records the elapsed time and prints the running statistics.
     *
     * @return the socket that was opened and timed
     */
    suspend fun monitoredConnect(host: String, port: Int): Socket {
        // Assign inside the measured block instead of returning from it: a `return` inside the
        // inline lambda would leave the whole function before any statistics are recorded.
        val socket: Socket
        val connectTime = measureTimeMillis {
            socket = aSocket(selectorManager).tcp().connect(host, port)
        }

        connectionCount++
        totalConnectTime += connectTime

        println("Connection #$connectionCount to $host:$port took ${connectTime}ms")
        println("Average connect time: ${averageConnectTime()}ms")

        return socket
    }

    /**
     * Returns a one-line summary of the connection count and the average connect time.
     */
    fun getStats(): String =
        "Connections: $connectionCount, Average time: ${averageConnectTime()}ms"

    /**
     * Prints the final statistics and closes the selector manager.
     */
    suspend fun close() {
        println("Final stats: ${getStats()}")
        selectorManager.close()
    }

    /** Average connect time in milliseconds, or 0 when nothing has been recorded yet. */
    private fun averageConnectTime(): Long =
        if (connectionCount > 0) totalConnectTime / connectionCount else 0L
}

Best Practices

  1. Resource Management: Always close selector managers when done
  2. Scope Management: Use appropriate coroutine scopes for selector managers
  3. Error Handling: Handle ClosedChannelCancellationException appropriately
  4. Performance: Reuse selector managers across multiple sockets when possible
  5. Shutdown: Implement graceful shutdown with timeouts for production applications

Install with Tessl CLI

npx tessl i tessl/maven-io-ktor--ktor-network-tvosx64

docs

index.md

io-operations.md

selector-management.md

socket-addresses.md

socket-configuration.md

socket-creation.md

tcp-operations.md

udp-operations.md

tile.json