
# Selectors

The selector system in Ktor Network provides platform-specific I/O event management built on Kotlin coroutines. Selectors handle asynchronous I/O operations efficiently by managing socket events and coordinating with the coroutine dispatcher.

## Core Selector Types

### SelectorManager Interface

```kotlin { .api }
interface SelectorManager : Closeable {
    val coroutineContext: CoroutineContext
    fun notifyClosed(selectable: Selectable)
    suspend fun select(selectable: Selectable, interest: SelectInterest)
}
```

Platform-specific selector manager for handling I/O events across multiple sockets.

**Properties:**
- `coroutineContext: CoroutineContext` - The coroutine context used for I/O operations

**Methods:**
- `notifyClosed(selectable: Selectable)` - Notifies the selector manager that a selectable resource has been closed
- `select(selectable: Selectable, interest: SelectInterest)` - Suspends until the selectable is ready for the specified I/O interest

### SelectorManager Factory

```kotlin { .api }
fun SelectorManager(dispatcher: CoroutineContext = Dispatchers.IO): SelectorManager
```

Creates a platform-specific selector manager instance.

**Parameters:**
- `dispatcher: CoroutineContext` - Coroutine dispatcher for I/O operations (default: `Dispatchers.IO`)

**Returns:** `SelectorManager` - Platform-optimized selector manager

## Selectable Interface

### Selectable Resource

```kotlin { .api }
interface Selectable {
    // Platform-specific selectable resource
    // Implementation details are platform-dependent
}
```

Platform-specific interface representing a selectable I/O resource (typically a socket file descriptor).

## I/O Interest Types

### SelectInterest Enum

```kotlin { .api }
enum class SelectInterest {
    READ,
    WRITE,
    ACCEPT,
    CONNECT
}
```

Enumeration of the I/O interests that a selector can monitor.

**Values:**
- `READ` - Socket ready for reading data
- `WRITE` - Socket ready for writing data
- `ACCEPT` - Server socket ready to accept connections
- `CONNECT` - Client socket connection completed
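
On the JVM, these four interests line up with the readiness operations defined by `java.nio.channels.SelectionKey`. The sketch below illustrates that correspondence with a local enum. `InterestSketch`, `nioOp`, and `interestMask` are hypothetical names introduced for illustration; this is not Ktor's declaration, and other platforms track readiness differently.

```kotlin
import java.nio.channels.SelectionKey

// Hypothetical mirror of SelectInterest, for illustration only (JVM-specific).
enum class InterestSketch(val nioOp: Int) {
    READ(SelectionKey.OP_READ),
    WRITE(SelectionKey.OP_WRITE),
    ACCEPT(SelectionKey.OP_ACCEPT),
    CONNECT(SelectionKey.OP_CONNECT)
}

// Combine several interests into a single NIO interest-ops bit mask.
fun interestMask(vararg interests: InterestSketch): Int =
    interests.fold(0) { mask, interest -> mask or interest.nioOp }
```

For example, `interestMask(InterestSketch.READ, InterestSketch.WRITE)` yields the mask a JVM selector would use to watch one channel for both reading and writing.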

## Usage Examples

### Basic Selector Setup

```kotlin
import io.ktor.network.selector.*
import io.ktor.network.sockets.*
import kotlinx.coroutines.*

suspend fun basicSelectorUsage() {
    // Create selector with default I/O dispatcher
    val selectorManager = SelectorManager()

    // Create a socket using the selector
    val socket = aSocket(selectorManager)
        .tcp()
        .connect(InetSocketAddress("example.com", 80))

    println("Socket connected using selector")
    println("Selector context: ${selectorManager.coroutineContext}")

    // Clean up
    socket.close()
    selectorManager.close()
}
```

### Custom Dispatcher Configuration

```kotlin
import io.ktor.network.selector.*
import io.ktor.network.sockets.*
import kotlinx.coroutines.*

suspend fun customDispatcherSelector() {
    // Limit I/O work to at most 4 concurrent tasks on the I/O dispatcher
    val customDispatcher = Dispatchers.IO.limitedParallelism(4)

    // Create selector with custom dispatcher
    val selectorManager = SelectorManager(customDispatcher)

    println("Custom selector created")
    println("Selector context: ${selectorManager.coroutineContext}")

    // Use selector for multiple sockets (ports 8001..8010)
    val sockets = (1..10).map { offset ->
        aSocket(selectorManager)
            .tcp()
            .bind(InetSocketAddress("localhost", 8000 + offset))
    }

    println("Created ${sockets.size} sockets with custom selector")

    // Clean up all sockets
    sockets.forEach { it.close() }
    selectorManager.close()
}
```

### Multi-Socket Server with Shared Selector

```kotlin
import io.ktor.network.selector.*
import io.ktor.network.sockets.*
import kotlinx.coroutines.*

// coroutineScope supplies the CoroutineScope that launch requires
suspend fun multiSocketServer() = coroutineScope {
    // Single selector manages all sockets
    val selectorManager = SelectorManager()

    // Create multiple server sockets
    val httpServer = aSocket(selectorManager)
        .tcp()
        .bind(InetSocketAddress("localhost", 8080))

    val httpsServer = aSocket(selectorManager)
        .tcp()
        .bind(InetSocketAddress("localhost", 8443))

    val adminServer = aSocket(selectorManager)
        .tcp()
        .bind(InetSocketAddress("localhost", 9000))

    println("Multi-socket server started:")
    println("HTTP: ${httpServer.localAddress}")
    println("HTTPS: ${httpsServer.localAddress}")
    println("Admin: ${adminServer.localAddress}")

    // Handle connections from all servers concurrently
    val jobs = listOf(
        launch { handleServer(httpServer, "HTTP") },
        launch { handleServer(httpsServer, "HTTPS") },
        launch { handleServer(adminServer, "Admin") }
    )

    // Stand-in for a real shutdown signal: run for 1 minute
    delay(60_000)

    // Graceful shutdown
    jobs.forEach { it.cancel() }
    httpServer.close()
    httpsServer.close()
    adminServer.close()
    selectorManager.close()
}

suspend fun handleServer(serverSocket: ServerSocket, name: String) = coroutineScope {
    while (!serverSocket.isClosed) {
        try {
            val client = serverSocket.accept()
            launch {
                println("$name client connected: ${client.remoteAddress}")
                // Handle client...
                delay(1000)
                client.close()
            }
        } catch (e: CancellationException) {
            throw e // Let cancellation propagate during shutdown
        } catch (e: Exception) {
            println("$name server error: ${e.message}")
            break
        }
    }
}
```

### UDP with Selector

```kotlin
import io.ktor.network.selector.*
import io.ktor.network.sockets.*
import io.ktor.utils.io.core.*
import kotlinx.coroutines.*

// coroutineScope supplies the CoroutineScope that launch requires
suspend fun udpWithSelector() = coroutineScope {
    val selectorManager = SelectorManager()

    // Multiple UDP sockets sharing selector
    val receivers = (1..3).map { i ->
        aSocket(selectorManager)
            .udp()
            .bind(InetSocketAddress("localhost", 9000 + i))
    }

    val sender = aSocket(selectorManager)
        .udp()
        .bind()

    println("UDP sockets created:")
    receivers.forEachIndexed { i, socket ->
        println("Receiver $i: ${socket.localAddress}")
    }

    // Send messages to all receivers
    receivers.forEach { receiver ->
        val message = "Hello to ${receiver.localAddress}"
        val datagram = Datagram(
            packet = ByteReadPacket(message.toByteArray()),
            address = receiver.localAddress
        )
        sender.send(datagram)
    }

    // Receive messages
    receivers.forEachIndexed { i, receiver ->
        launch {
            try {
                val datagram = receiver.receive()
                println("Receiver $i got: ${datagram.packet.readText()}")
            } catch (e: Exception) {
                println("Receiver $i error: ${e.message}")
            }
        }
    }

    // Give the receivers time to pick up their datagrams
    delay(1000)

    // Clean up
    receivers.forEach { it.close() }
    sender.close()
    selectorManager.close()
}
```

### Selector Lifecycle Management

```kotlin
import io.ktor.network.selector.*
import io.ktor.network.sockets.*
import kotlinx.coroutines.*

class NetworkService {
    private lateinit var selectorManager: SelectorManager
    private val activeSockets = mutableListOf<ASocket>()

    suspend fun start() {
        selectorManager = SelectorManager()
        println("Network service started")
        println("Selector context: ${selectorManager.coroutineContext}")
    }

    suspend fun createSocket(address: SocketAddress): Socket {
        val socket = aSocket(selectorManager)
            .tcp()
            .connect(address)

        activeSockets.add(socket)
        return socket
    }

    suspend fun createServer(address: SocketAddress): ServerSocket {
        val server = aSocket(selectorManager)
            .tcp()
            .bind(address)

        activeSockets.add(server)
        return server
    }

    suspend fun shutdown() {
        println("Shutting down network service...")

        // Close all active sockets
        activeSockets.forEach { socket ->
            try {
                socket.close()
                println("Closed socket: $socket")
            } catch (e: Exception) {
                println("Error closing socket: ${e.message}")
            }
        }

        // Close selector manager
        selectorManager.close()
        println("Network service shutdown complete")
    }
}

suspend fun useNetworkService() {
    val service = NetworkService()

    try {
        service.start()

        // Create some sockets
        val client = service.createSocket(InetSocketAddress("example.com", 80))
        val server = service.createServer(InetSocketAddress("localhost", 8080))

        println("Sockets created through service")

        // Use sockets...
        delay(5000)

    } finally {
        service.shutdown()
    }
}
```

### Performance Monitoring

```kotlin
import io.ktor.network.selector.*
import io.ktor.network.sockets.*
import kotlinx.coroutines.*
import kotlin.time.TimeSource

// coroutineScope supplies the CoroutineScope that launch requires
suspend fun monitorSelectorPerformance() = coroutineScope {
    val selectorManager = SelectorManager()

    // Track elapsed time with a monotonic, multiplatform clock
    val startMark = TimeSource.Monotonic.markNow()

    // Create many sockets to exercise the selector
    val sockets = mutableListOf<ASocket>()

    repeat(100) { i ->
        val socket = aSocket(selectorManager)
            .tcp()
            .bind(InetSocketAddress("localhost", 0)) // Any available port

        sockets.add(socket)

        // Report progress after every 20 sockets
        if ((i + 1) % 20 == 0) {
            val elapsed = startMark.elapsedNow().inWholeMilliseconds
            println("Created ${sockets.size} sockets in ${elapsed}ms")
        }
    }

    println("Selector managing ${sockets.size} sockets")

    // Time a batch of concurrent operations
    val operationMark = TimeSource.Monotonic.markNow()

    // Simulate concurrent socket operations
    val jobs = sockets.map {
        launch {
            // Placeholder for a real per-socket operation
            delay(10)
        }
    }

    jobs.joinAll()
    val operationTime = operationMark.elapsedNow().inWholeMilliseconds
    println("Concurrent operations on ${sockets.size} sockets took ${operationTime}ms")

    // Clean up
    sockets.forEach { it.close() }
    selectorManager.close()
}
```

### Error Handling with Selectors

```kotlin
import io.ktor.network.selector.*
import io.ktor.network.sockets.*
import kotlinx.coroutines.*

suspend fun selectorErrorHandling() {
    var selectorManager: SelectorManager? = null

    try {
        selectorManager = SelectorManager()

        val socket = aSocket(selectorManager)
            .tcp()
            .connect(InetSocketAddress("nonexistent.example.com", 80)) {
                // Client socket options are set in the connect block
                socketTimeout = 5000 // Short timeout for testing
            }

        socket.close()

    } catch (e: CancellationException) {
        throw e // Never swallow coroutine cancellation
    } catch (e: Exception) {
        println("Socket operation failed: ${e.message}")

    } finally {
        // Always clean up selector
        selectorManager?.close()
        println("Selector cleanup completed")
    }
}
```

## Platform Considerations

### macOS ARM64 Optimizations

The selector implementation on macOS ARM64 uses native kqueue for optimal performance:

```kotlin
import io.ktor.network.selector.*
import io.ktor.network.sockets.*
import kotlinx.coroutines.*

suspend fun macosOptimizedSelector() {
    // Selector automatically uses kqueue on macOS
    val selectorManager = SelectorManager()

    // Take advantage of macOS-specific features
    val socket = aSocket(selectorManager)
        .tcp()
        .configure {
            // macOS supports efficient port reuse
            reusePort = true
            reuseAddress = true
        }
        .bind(InetSocketAddress("localhost", 8080))

    println("macOS-optimized selector active")
    println("Using native kqueue for I/O events")

    socket.close()
    selectorManager.close()
}
```

## Best Practices

1. **Shared Selectors**: Use one selector manager for multiple sockets to optimize resource usage
2. **Proper Cleanup**: Always close selector managers in `finally` blocks or with `.use {}`
3. **Custom Dispatchers**: Configure appropriate dispatchers based on workload characteristics
4. **Lifecycle Management**: Coordinate selector lifecycle with application lifecycle
5. **Error Handling**: Handle selector-related exceptions gracefully
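
The cleanup advice in item 2 can be sketched with nested `use {}` blocks, which close resources in reverse order of creation even when the body throws. The sketch below uses only the Kotlin standard library on the JVM. `SketchSelector`, `SketchSocket`, and `runWithCleanup` are hypothetical stand-ins that record the close order; they are not Ktor types.

```kotlin
import java.io.Closeable

// Hypothetical stand-ins: they only record close order so the sketch runs without Ktor.
class SketchSelector(private val log: MutableList<String>) : Closeable {
    override fun close() { log += "selector closed" }
}

class SketchSocket(private val log: MutableList<String>) : Closeable {
    override fun close() { log += "socket closed" }
}

// The inner resource (socket) closes first, then the outer one (selector),
// whether or not the body throws.
fun runWithCleanup(log: MutableList<String>, fail: Boolean) {
    SketchSelector(log).use {
        SketchSocket(log).use {
            log += "working"
            if (fail) throw IllegalStateException("simulated failure")
        }
    }
}
```

With the real library, the same shape would apply to a selector manager and its sockets, assuming `use` is available for those types on your platform and version; this page does not confirm that.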

## Type Definitions

### Required Import Statements

```kotlin
import io.ktor.network.selector.*
import io.ktor.network.sockets.*
import kotlinx.coroutines.*
```

### Related Types

- Used by all socket builders - See [Socket Builders](./socket-builders.md)
- Manages all socket types - See [Socket Interfaces](./socket-interfaces.md)
- Coordinates UDP operations - See [Datagram Sockets](./datagram-sockets.md)
- `CoroutineContext` - From the Kotlin standard library (`kotlin.coroutines`)
- `Dispatchers` - From kotlinx-coroutines
- `Closeable` - From Kotlin standard library

### Exception Types

```kotlin { .api }
class ClosedChannelCancellationException : CancellationException
```

Selectors may throw platform-specific I/O exceptions during operations, including `ClosedChannelCancellationException` when operations are performed on closed channels. Wrap selector operations in try-catch blocks in production applications. Because `ClosedChannelCancellationException` is a `CancellationException`, rethrow it after any cleanup so that coroutine cancellation still propagates.
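
One way to structure such a try-catch is to handle I/O failures locally while letting cancellation propagate. The sketch below uses only the Kotlin standard library on the JVM. `ChannelClosedSketchException` and `runSocketOperation` are hypothetical names; the exception is a stand-in for the library's class so the example compiles without Ktor.

```kotlin
import java.io.IOException
import kotlin.coroutines.cancellation.CancellationException

// Hypothetical stand-in for the library's exception type.
class ChannelClosedSketchException : CancellationException("Closed channel.")

// Runs [block]; reports I/O failures as a message and rethrows cancellation.
fun runSocketOperation(block: () -> Unit): String =
    try {
        block()
        "ok"
    } catch (e: CancellationException) {
        // Cancellation must propagate so structured concurrency keeps working.
        throw e
    } catch (e: IOException) {
        "I/O failure: ${e.message}"
    }
```

Catching `CancellationException` before the broader handler matters: a plain `catch (e: Exception)` would also catch cancellation and could leave a cancelled coroutine running.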