0
# Selectors
1
2
The selector system in Ktor Network provides platform-specific I/O event management built on Kotlin coroutines. Selectors handle asynchronous I/O operations efficiently by managing socket events and coordinating with the coroutine dispatcher.
3
4
## Core Selector Types
5
6
### SelectorManager Interface
7
8
```kotlin { .api }
9
interface SelectorManager : Closeable {
10
val coroutineContext: CoroutineContext
11
fun notifyClosed(selectable: Selectable)
12
suspend fun select(selectable: Selectable, interest: SelectInterest)
13
}
14
```
15
16
Platform-specific selector manager for handling I/O events across multiple sockets.
17
18
**Properties:**
19
- `coroutineContext: CoroutineContext` - The coroutine context used for I/O operations
20
21
**Methods:**
22
- `notifyClosed(selectable: Selectable)` - Notifies that a selectable resource has been closed
23
- `suspend fun select(selectable: Selectable, interest: SelectInterest)` - Suspends until the specified I/O interest is ready
24
25
### SelectorManager Factory
26
27
```kotlin { .api }
28
fun SelectorManager(dispatcher: CoroutineContext = Dispatchers.IO): SelectorManager
29
```
30
31
Creates a platform-specific selector manager instance.
32
33
**Parameters:**
34
- `dispatcher: CoroutineContext` - Coroutine dispatcher for I/O operations (default: Dispatchers.IO)
35
36
**Returns:** `SelectorManager` - Platform-optimized selector manager
37
38
## Selectable Interface
39
40
### Selectable Resource
41
42
```kotlin { .api }
43
interface Selectable {
44
// Platform-specific selectable resource
45
// Implementation details are platform-dependent
46
}
47
```
48
49
Platform-specific interface representing a selectable I/O resource (typically a socket file descriptor).
50
51
## I/O Interest Types
52
53
### SelectInterest Enum
54
55
```kotlin { .api }
56
enum class SelectInterest {
57
READ,
58
WRITE,
59
ACCEPT,
60
CONNECT
61
}
62
```
63
64
Enumeration of different I/O interests that can be monitored by the selector.
65
66
**Values:**
67
- `READ` - Socket ready for reading data
68
- `WRITE` - Socket ready for writing data
69
- `ACCEPT` - Server socket ready to accept connections
70
- `CONNECT` - Client socket connection completed
71
72
## Usage Examples
73
74
### Basic Selector Setup
75
76
```kotlin
77
import io.ktor.network.selector.*
78
import io.ktor.network.sockets.*
79
import kotlinx.coroutines.*
80
81
suspend fun basicSelectorUsage() {
82
// Create selector with default I/O dispatcher
83
val selectorManager = SelectorManager()
84
85
// Create a socket using the selector
86
val socket = aSocket(selectorManager)
87
.tcp()
88
.connect(InetSocketAddress("example.com", 80))
89
90
println("Socket connected using selector")
91
println("Selector context: ${selectorManager.coroutineContext}")
92
93
// Clean up
94
socket.close()
95
selectorManager.close()
96
}
97
```
98
99
### Custom Dispatcher Configuration
100
101
```kotlin
102
suspend fun customDispatcherSelector() {
103
// Create custom dispatcher for I/O operations
104
val customDispatcher = Dispatchers.IO.limitedParallelism(4)
105
106
// Create selector with custom dispatcher
107
val selectorManager = SelectorManager(customDispatcher)
108
109
println("Custom selector created")
110
println("Selector context: ${selectorManager.coroutineContext}")
111
112
// Use selector for multiple sockets
113
val sockets = (1..10).map { port ->
114
aSocket(selectorManager)
115
.tcp()
116
.bind(InetSocketAddress("localhost", 8000 + port))
117
}
118
119
println("Created ${sockets.size} sockets with custom selector")
120
121
// Clean up all sockets
122
sockets.forEach { it.close() }
123
selectorManager.close()
124
}
125
```
126
127
### Multi-Socket Server with Shared Selector
128
129
```kotlin
130
suspend fun multiSocketServer() {
131
// Single selector manages all sockets
132
val selectorManager = SelectorManager()
133
134
// Create multiple server sockets
135
val httpServer = aSocket(selectorManager)
136
.tcp()
137
.bind(InetSocketAddress("localhost", 8080))
138
139
val httpsServer = aSocket(selectorManager)
140
.tcp()
141
.bind(InetSocketAddress("localhost", 8443))
142
143
val adminServer = aSocket(selectorManager)
144
.tcp()
145
.bind(InetSocketAddress("localhost", 9000))
146
147
println("Multi-socket server started:")
148
println("HTTP: ${httpServer.localAddress}")
149
println("HTTPS: ${httpsServer.localAddress}")
150
println("Admin: ${adminServer.localAddress}")
151
152
// Handle connections from all servers concurrently
153
val jobs = listOf(
154
launch { handleServer(httpServer, "HTTP") },
155
launch { handleServer(httpsServer, "HTTPS") },
156
launch { handleServer(adminServer, "Admin") }
157
)
158
159
// Wait for shutdown signal
160
delay(60000) // Run for 1 minute
161
162
// Graceful shutdown
163
jobs.forEach { it.cancel() }
164
httpServer.close()
165
httpsServer.close()
166
adminServer.close()
167
selectorManager.close()
168
}
169
170
suspend fun handleServer(serverSocket: ServerSocket, name: String) {
171
while (!serverSocket.isClosed) {
172
try {
173
val client = serverSocket.accept()
174
launch {
175
println("$name client connected: ${client.remoteAddress}")
176
// Handle client...
177
delay(1000)
178
client.close()
179
}
180
} catch (e: Exception) {
181
println("$name server error: ${e.message}")
182
break
183
}
184
}
185
}
186
```
187
188
### UDP with Selector
189
190
```kotlin
191
suspend fun udpWithSelector() {
192
val selectorManager = SelectorManager()
193
194
// Multiple UDP sockets sharing selector
195
val receivers = (1..3).map { i ->
196
aSocket(selectorManager)
197
.udp()
198
.bind(InetSocketAddress("localhost", 9000 + i))
199
}
200
201
val sender = aSocket(selectorManager)
202
.udp()
203
.bind()
204
205
println("UDP sockets created:")
206
receivers.forEachIndexed { i, socket ->
207
println("Receiver $i: ${socket.localAddress}")
208
}
209
210
// Send messages to all receivers
211
receivers.forEach { receiver ->
212
val message = "Hello to ${receiver.localAddress}"
213
val datagram = Datagram(
214
packet = ByteReadPacket(message.toByteArray()),
215
address = receiver.localAddress
216
)
217
sender.send(datagram)
218
}
219
220
// Receive messages
221
receivers.forEachIndexed { i, receiver ->
222
launch {
223
try {
224
val datagram = receiver.receive()
225
println("Receiver $i got: ${datagram.packet.readText()}")
226
} catch (e: Exception) {
227
println("Receiver $i error: ${e.message}")
228
}
229
}
230
}
231
232
delay(1000)
233
234
// Clean up
235
receivers.forEach { it.close() }
236
sender.close()
237
selectorManager.close()
238
}
239
```
240
241
### Selector Lifecycle Management
242
243
```kotlin
244
class NetworkService {
245
private lateinit var selectorManager: SelectorManager
246
private val activeSockets = mutableListOf<ASocket>()
247
248
suspend fun start() {
249
selectorManager = SelectorManager()
250
println("Network service started")
251
println("Selector context: ${selectorManager.coroutineContext}")
252
}
253
254
suspend fun createSocket(address: SocketAddress): Socket {
255
val socket = aSocket(selectorManager)
256
.tcp()
257
.connect(address)
258
259
activeSockets.add(socket)
260
return socket
261
}
262
263
suspend fun createServer(address: SocketAddress): ServerSocket {
264
val server = aSocket(selectorManager)
265
.tcp()
266
.bind(address)
267
268
activeSockets.add(server)
269
return server
270
}
271
272
suspend fun shutdown() {
273
println("Shutting down network service...")
274
275
// Close all active sockets
276
activeSockets.forEach { socket ->
277
try {
278
socket.close()
279
println("Closed socket: ${socket}")
280
} catch (e: Exception) {
281
println("Error closing socket: ${e.message}")
282
}
283
}
284
285
// Close selector manager
286
selectorManager.close()
287
println("Network service shutdown complete")
288
}
289
}
290
291
suspend fun useNetworkService() {
292
val service = NetworkService()
293
294
try {
295
service.start()
296
297
// Create some sockets
298
val client = service.createSocket(InetSocketAddress("example.com", 80))
299
val server = service.createServer(InetSocketAddress("localhost", 8080))
300
301
println("Sockets created through service")
302
303
// Use sockets...
304
delay(5000)
305
306
} finally {
307
service.shutdown()
308
}
309
}
310
```
311
312
### Performance Monitoring
313
314
```kotlin
315
suspend fun monitorSelectorPerformance() {
316
val selectorManager = SelectorManager()
317
318
// Track selector usage
319
var socketCount = 0
320
val startTime = System.currentTimeMillis()
321
322
// Create many sockets to test selector efficiency
323
val sockets = mutableListOf<ASocket>()
324
325
repeat(100) { i ->
326
val socket = aSocket(selectorManager)
327
.tcp()
328
.bind(InetSocketAddress("localhost", 0)) // Any available port
329
330
sockets.add(socket)
331
socketCount++
332
333
if (i % 20 == 0) {
334
val elapsed = System.currentTimeMillis() - startTime
335
println("Created $socketCount sockets in ${elapsed}ms")
336
println("Selector handling ${sockets.size} active sockets")
337
}
338
}
339
340
println("Selector managing ${sockets.size} sockets efficiently")
341
342
// Test concurrent operations
343
val operationStart = System.currentTimeMillis()
344
345
// Simulate concurrent socket operations
346
val jobs = sockets.map { socket ->
347
launch {
348
// Each socket performs some operation
349
delay(10)
350
}
351
}
352
353
jobs.joinAll()
354
val operationTime = System.currentTimeMillis() - operationStart
355
println("Concurrent operations on ${sockets.size} sockets took ${operationTime}ms")
356
357
// Clean up
358
sockets.forEach { it.close() }
359
selectorManager.close()
360
}
361
```
362
363
### Error Handling with Selectors
364
365
```kotlin
366
suspend fun selectorErrorHandling() {
367
var selectorManager: SelectorManager? = null
368
369
try {
370
selectorManager = SelectorManager()
371
372
val socket = aSocket(selectorManager)
373
.tcp()
374
.configure {
375
socketTimeout = 5000 // Short timeout for testing
376
}
377
.connect(InetSocketAddress("nonexistent.example.com", 80))
378
379
socket.close()
380
381
} catch (e: Exception) {
382
println("Socket operation failed: ${e.message}")
383
384
} finally {
385
// Always clean up selector
386
selectorManager?.close()
387
println("Selector cleanup completed")
388
}
389
}
390
```
391
392
## Platform Considerations
393
394
### macOS ARM64 Optimizations
395
396
The selector implementation on macOS ARM64 uses native kqueue for optimal performance:
397
398
```kotlin
399
suspend fun macosOptimizedSelector() {
400
// Selector automatically uses kqueue on macOS
401
val selectorManager = SelectorManager()
402
403
// Take advantage of macOS-specific features
404
val socket = aSocket(selectorManager)
405
.tcp()
406
.configure {
407
// macOS supports efficient port reuse
408
reusePort = true
409
reuseAddress = true
410
}
411
.bind(InetSocketAddress("localhost", 8080))
412
413
println("macOS-optimized selector active")
414
println("Using native kqueue for I/O events")
415
416
socket.close()
417
selectorManager.close()
418
}
419
```
420
421
## Best Practices
422
423
1. **Shared Selectors**: Use one selector manager for multiple sockets to optimize resource usage
424
2. **Proper Cleanup**: Always close selector managers in finally blocks or use .use {}
425
3. **Custom Dispatchers**: Configure appropriate dispatchers based on workload characteristics
426
4. **Lifecycle Management**: Coordinate selector lifecycle with application lifecycle
427
5. **Error Handling**: Handle selector-related exceptions gracefully
428
429
## Type Definitions
430
431
### Required Import Statements
432
433
```kotlin
434
import io.ktor.network.selector.*
435
import io.ktor.network.sockets.*
436
import kotlinx.coroutines.*
437
```
438
439
### Related Types
440
441
- Used by all socket builders - See [Socket Builders](./socket-builders.md)
442
- Manages all socket types - See [Socket Interfaces](./socket-interfaces.md)
443
- Coordinates UDP operations - See [Datagram Sockets](./datagram-sockets.md)
444
- `CoroutineContext`, `Dispatchers` - From kotlinx-coroutines
445
- `Closeable` - From Kotlin standard library
446
447
### Exception Types
448
449
```kotlin { .api }
450
class ClosedChannelCancellationException : CancellationException
451
```
452
453
Selectors may throw platform-specific I/O exceptions during operations, including `ClosedChannelCancellationException` when operations are performed on closed channels. Always wrap selector operations in appropriate try-catch blocks for production applications.