0
# Selector Management
1
2
Asynchronous I/O management for handling multiple socket operations concurrently using coroutine-based selection.
3
4
## Capabilities
5
6
### Selector Manager
7
8
Core interface for managing asynchronous socket operations and selection events.
9
10
```kotlin { .api }
11
/**
12
* SelectorManager interface allows a [Selectable] to wait for a [SelectInterest].
13
*/
14
interface SelectorManager : CoroutineScope, Closeable {
15
/**
16
* Notifies the selector that selectable has been closed.
17
* @param selectable The selectable that has been closed
18
*/
19
fun notifyClosed(selectable: Selectable)
20
21
/**
22
* Suspends until interest is selected for selectable.
23
* May cause manager to allocate and run selector instance if not yet created.
24
* Only one selection is allowed per interest per selectable but you can
25
* select for different interests for the same selectable simultaneously.
26
* @param selectable The selectable to wait for
27
* @param interest The type of I/O interest to wait for
28
*/
29
suspend fun select(selectable: Selectable, interest: SelectInterest)
30
}
31
32
/**
33
* Creates the selector manager for current platform.
34
* @param dispatcher CoroutineContext for the selector manager (default: EmptyCoroutineContext)
35
* @return SelectorManager instance for the current platform
36
*/
37
fun SelectorManager(
38
dispatcher: CoroutineContext = EmptyCoroutineContext
39
): SelectorManager
40
```
41
42
**Usage Examples:**
43
44
```kotlin
45
import io.ktor.network.sockets.*
46
import io.ktor.network.selector.*
47
import kotlinx.coroutines.*
48
49
// Basic selector manager usage
val selectorManager = SelectorManager()

// Use with custom dispatcher
val customDispatcher = Dispatchers.IO.limitedParallelism(4)
val customSelectorManager = SelectorManager(customDispatcher)

// Create sockets using the selector manager
val socket = aSocket(selectorManager).tcp().connect("localhost", 8080)

// Clean up when done: close the socket first, then every selector manager created above
socket.close()
customSelectorManager.close()
selectorManager.close()
61
```
62
63
### Selection Interest Types
64
65
Enumeration of I/O interest types that can be monitored by the selector.
66
67
```kotlin { .api }
68
/**
69
* Select interest kind.
70
*/
71
enum class SelectInterest {
72
/** Interest in read operations */
73
READ,
74
75
/** Interest in write operations */
76
WRITE,
77
78
/** Interest in accept operations (server sockets) */
79
ACCEPT,
80
81
/** Interest in connect operations */
82
CONNECT;
83
84
companion object {
85
/** Array containing all selection interests */
86
val AllInterests: Array<SelectInterest>
87
}
88
}
89
```
90
91
**Usage Example:**
92
93
```kotlin
94
import io.ktor.network.selector.*
95
96
// Enumerate every interest exposed through the companion array
for (interest in SelectInterest.AllInterests) {
    println("Available interest: $interest")
}

// Individual interests can be referenced directly by name
val readInterest = SelectInterest.READ
val writeInterest = SelectInterest.WRITE
val acceptInterest = SelectInterest.ACCEPT
val connectInterest = SelectInterest.CONNECT
106
```
107
108
### Selectable Interface
109
110
Interface for objects that can be selected for I/O operations.
111
112
```kotlin { .api }
113
/**
114
* A selectable entity with selectable channel and interested operations subscriptions.
115
*/
116
interface Selectable
117
```
118
119
### Selection Exception Handling
120
121
Exception types related to selection operations.
122
123
```kotlin { .api }
124
/**
125
* Exception thrown when a channel is closed during selection
126
*/
127
class ClosedChannelCancellationException : CancellationException("Closed channel.")
128
```
129
130
**Usage Example:**
131
132
```kotlin
133
import io.ktor.network.selector.*
134
import kotlinx.coroutines.*
135
136
try {
137
// Some selector operation that might fail
138
selectorManager.select(selectable, SelectInterest.READ)
139
} catch (e: ClosedChannelCancellationException) {
140
println("Channel was closed during selection: ${e.message}")
141
}
142
```
143
144
### Complete Selector Management Examples
145
146
**Basic TCP Server with Selector Management:**
147
148
```kotlin
149
import io.ktor.network.sockets.*
150
import io.ktor.network.selector.*
151
import kotlinx.coroutines.*
152
153
/**
 * Binds a TCP server socket on localhost:8080 and hands every accepted client
 * to [handleClient] in its own coroutine.
 *
 * The selector manager, the server socket and the scope created here are all
 * released when the function exits, whether normally, by failure or by cancellation.
 */
suspend fun tcpServerWithSelector() {
    // Create selector manager with custom scope
    val selectorScope = CoroutineScope(Dispatchers.IO + SupervisorJob())
    val selectorManager = SelectorManager(selectorScope.coroutineContext)

    try {
        val serverSocket = aSocket(selectorManager).tcp().bind("localhost", 8080)
        try {
            println("Server started on port ${serverSocket.port}")

            // Handle multiple clients concurrently
            while (selectorScope.isActive) {
                try {
                    val clientSocket = serverSocket.accept()

                    // Launch coroutine for each client
                    selectorScope.launch {
                        handleClient(clientSocket)
                    }
                } catch (e: CancellationException) {
                    // Cancellation must end the accept loop instead of being logged and retried
                    throw e
                } catch (e: Exception) {
                    println("Error accepting client: ${e.message}")
                }
            }
        } finally {
            // Release the listening socket before tearing down the selector
            serverSocket.close()
        }
    } finally {
        selectorManager.close()
        selectorScope.cancel()
    }
}
182
183
/**
 * Echoes every line received on [socket] back to the sender, prefixed with "Echo: ".
 *
 * The loop ends when `readUTF8Line()` returns null. The socket is always closed
 * on exit. Cancellation is rethrown rather than reported as a client error.
 *
 * @param socket The connected client socket to serve; closed by this function
 */
suspend fun handleClient(socket: Socket) {
    try {
        val connection = socket.connection()

        while (true) {
            val line = connection.input.readUTF8Line() ?: break
            connection.output.writeStringUtf8("Echo: $line\n")
            connection.output.flush()
        }
    } catch (e: CancellationException) {
        // Propagate cancellation so the launching scope can observe it
        throw e
    } catch (e: Exception) {
        println("Client error: ${e.message}")
    } finally {
        socket.close()
    }
}
199
```
200
201
**Multiple Socket Management:**
202
203
```kotlin
204
import io.ktor.network.sockets.*
import io.ktor.network.selector.*
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

/**
 * Opens and tracks several TCP client sockets that share one selector manager.
 *
 * Connections are established and handled concurrently, so every access to the
 * set of active sockets goes through [socketsLock].
 */
class MultiSocketManager {
    private val selectorManager = SelectorManager()

    // Guards activeSockets, which is touched from several coroutines at once
    private val socketsLock = Mutex()
    private val activeSockets = mutableSetOf<Socket>()

    /**
     * Connects to every server in [servers] concurrently, then runs
     * [handleConnection] on each connection that was established.
     *
     * Servers that cannot be reached are reported and skipped. The function
     * returns once all handlers have finished, because `async`/`launch` need
     * a scope and `coroutineScope` waits for its children.
     *
     * @param servers Host/port pairs to connect to
     */
    suspend fun connectToMultipleServers(servers: List<Pair<String, Int>>) {
        coroutineScope {
            val connections = servers.map { (host, port) ->
                async {
                    try {
                        val socket = aSocket(selectorManager).tcp().connect(host, port)
                        socketsLock.withLock { activeSockets.add(socket) }
                        socket to socket.connection()
                    } catch (e: CancellationException) {
                        // Never treat cancellation as a failed connection
                        throw e
                    } catch (e: Exception) {
                        println("Failed to connect to $host:$port - ${e.message}")
                        null
                    }
                }
            }

            // Wait for all connections
            val validConnections = connections.awaitAll().filterNotNull()

            // Handle each connection
            validConnections.forEach { (socket, connection) ->
                launch {
                    try {
                        handleConnection(connection)
                    } finally {
                        // Taking the lock suspends, so shield the cleanup from cancellation
                        withContext(NonCancellable) {
                            socketsLock.withLock { activeSockets.remove(socket) }
                        }
                        socket.close()
                    }
                }
            }
        }
    }

    /**
     * Sends a greeting line over [connection] and prints the single line read back.
     */
    private suspend fun handleConnection(connection: Connection) {
        // Handle individual connection
        connection.output.writeStringUtf8("Hello from client\n")
        connection.output.flush()

        val response = connection.input.readUTF8Line()
        println("Server response: $response")
    }

    /**
     * Closes every tracked socket and then the shared selector manager.
     */
    suspend fun closeAll() {
        // Snapshot and clear under the lock; close outside it so handlers are not blocked
        val sockets = socketsLock.withLock {
            activeSockets.toList().also { activeSockets.clear() }
        }
        sockets.forEach { it.close() }
        selectorManager.close()
    }
}
257
```
258
259
**Custom Selector with Resource Management:**
260
261
```kotlin
262
import io.ktor.network.sockets.*
263
import io.ktor.network.selector.*
264
import kotlinx.coroutines.*
265
import kotlin.time.Duration.Companion.minutes
266
267
/**
 * Owns a [SelectorManager] together with the coroutine scope it runs in, and
 * creates sockets bound to that manager.
 *
 * Call [start] before any `create*` function and [shutdown] when finished.
 * [shutdown] cancels the service scope.
 */
class ManagedSelectorService {
    private var selectorManager: SelectorManager? = null
    private val serviceScope = CoroutineScope(
        Dispatchers.IO +
            SupervisorJob() +
            CoroutineName("SelectorService")
    )

    /**
     * Creates the selector manager. Calling it again while a manager already
     * exists is a no-op, so the existing manager is never leaked.
     */
    fun start() {
        if (selectorManager == null) {
            selectorManager = SelectorManager(serviceScope.coroutineContext)
        }
    }

    /**
     * Returns the current manager.
     *
     * @throws IllegalStateException with message "Service not started" if there is none
     */
    private fun requireManager(): SelectorManager =
        checkNotNull(selectorManager) { "Service not started" }

    /**
     * Opens a TCP client connection to [host]:[port] with a 30000 ms socket
     * timeout and keep-alive enabled.
     *
     * @throws IllegalStateException if the service has not been started
     */
    suspend fun createTcpConnection(host: String, port: Int): Socket {
        val manager = requireManager()

        return aSocket(manager).tcp().connect(host, port) {
            socketTimeout = 30000 // 30 seconds
            keepAlive = true
        }
    }

    /**
     * Binds a UDP socket on 0.0.0.0:[port] with address reuse enabled.
     *
     * @param port Port passed to `bind` (default: 0)
     * @throws IllegalStateException if the service has not been started
     */
    suspend fun createUdpSocket(port: Int = 0): BoundDatagramSocket {
        val manager = requireManager()

        return aSocket(manager).udp().bind("0.0.0.0", port) {
            reuseAddress = true
        }
    }

    /**
     * Binds a TCP server socket on 0.0.0.0:[port] with a backlog of 100 and
     * address reuse enabled.
     *
     * @throws IllegalStateException if the service has not been started
     */
    suspend fun createTcpServer(port: Int): ServerSocket {
        val manager = requireManager()

        return aSocket(manager).tcp().bind("0.0.0.0", port) {
            backlogSize = 100
            reuseAddress = true
        }
    }

    /**
     * Cancels all coroutines running in the service scope, waits up to one
     * minute for them to finish, then closes the selector manager and cancels
     * the scope. The final cleanup runs even on timeout.
     */
    suspend fun shutdown() {
        try {
            // Graceful shutdown with timeout
            withTimeout(1.minutes) {
                // Signal every child first so one slow child cannot delay cancelling the others
                val children = serviceScope.coroutineContext.job.children.toList()
                children.forEach { it.cancel() }
                children.joinAll()
            }
        } catch (e: TimeoutCancellationException) {
            println("Timeout during shutdown, forcing close")
        } finally {
            selectorManager?.close()
            // Drop the closed manager so later create* calls fail with "Service not started"
            selectorManager = null
            serviceScope.cancel()
        }
    }
}
322
323
// Usage
324
/**
 * Starts a [ManagedSelectorService], opens a few sockets through it and shuts
 * the service down exactly once on every exit path.
 */
suspend fun managedServiceExample() {
    val service = ManagedSelectorService()

    try {
        service.start()

        // Create multiple connections
        val socket1 = service.createTcpConnection("api1.example.com", 80)
        val socket2 = service.createTcpConnection("api2.example.com", 80)
        val udpSocket = service.createUdpSocket(8080)
        val server = service.createTcpServer(9090)

        // Use sockets...
    } catch (e: CancellationException) {
        // Let cancellation propagate; the finally block still shuts the service down
        throw e
    } catch (e: Exception) {
        println("Service error: ${e.message}")
    } finally {
        // Single shutdown for success, failure and cancellation alike
        service.shutdown()
    }
}
346
```
347
348
**Performance Monitoring:**
349
350
```kotlin
351
import io.ktor.network.sockets.*
352
import io.ktor.network.selector.*
353
import kotlinx.coroutines.*
354
import kotlin.system.measureTimeMillis
355
356
/**
 * Wraps TCP connects made through one selector manager and keeps a running
 * count and total of how long those connects took.
 */
class SelectorPerformanceMonitor {
    private val selectorManager = SelectorManager()
    private var connectionCount = 0
    private var totalConnectTime = 0L

    /**
     * Connects to [host]:[port] once, records how long the connect took and
     * prints the per-connection and average timings.
     *
     * @return The connected socket whose connect time was measured
     */
    suspend fun monitoredConnect(host: String, port: Int): Socket {
        // Assign from inside the timed block. A `return` there would exit this
        // function before any statistics were recorded.
        val socket: Socket
        val connectTime = measureTimeMillis {
            socket = aSocket(selectorManager).tcp().connect(host, port)
        }

        connectionCount++
        totalConnectTime += connectTime

        println("Connection #$connectionCount to $host:$port took ${connectTime}ms")
        println("Average connect time: ${totalConnectTime / connectionCount}ms")

        return socket
    }

    /**
     * Summarises the number of recorded connections and their average connect
     * time. The average is reported as 0 when nothing has been recorded.
     */
    fun getStats(): String {
        val average = if (connectionCount > 0) totalConnectTime / connectionCount else 0
        return "Connections: $connectionCount, Average time: ${average}ms"
    }

    /**
     * Prints the final statistics and closes the selector manager.
     */
    suspend fun close() {
        println("Final stats: ${getStats()}")
        selectorManager.close()
    }
}
384
```
385
386
## Best Practices
387
388
1. **Resource Management**: Always close selector managers when done
389
2. **Scope Management**: Use appropriate coroutine scopes for selector managers
390
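3. **Error Handling**: Catch `ClosedChannelCancellationException` where a closed channel is expected; it is a `CancellationException`, so make sure broad `catch (e: Exception)` blocks rethrow cancellation instead of swallowing it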
391
4. **Performance**: Reuse selector managers across multiple sockets when possible
392
5. **Shutdown**: Implement graceful shutdown with timeouts for production applications