or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.md io-operations.md selector-management.md socket-addresses.md socket-configuration.md socket-creation.md tcp-operations.md udp-operations.md

selector-management.mddocs/

0

# Selector Management

1

2

Asynchronous I/O management for handling multiple socket operations concurrently using coroutine-based selection.

3

4

## Capabilities

5

6

### Selector Manager

7

8

Core interface for managing asynchronous socket operations and selection events.

9

10

```kotlin { .api }

11

/**
 * Lets a [Selectable] wait until a given [SelectInterest] has been selected.
 *
 * A manager is both a [CoroutineScope] and a [Closeable].
 */
interface SelectorManager : CoroutineScope, Closeable {
    /**
     * Tells the selector that [selectable] has been closed.
     *
     * @param selectable the selectable that has been closed
     */
    fun notifyClosed(selectable: Selectable)

    /**
     * Suspends until [interest] is selected for [selectable].
     *
     * Calling this may make the manager allocate and run a selector instance if none
     * has been created yet. Only one selection is allowed per interest per selectable,
     * but different interests may be selected for the same selectable simultaneously.
     *
     * @param selectable the selectable to wait for
     * @param interest the type of I/O interest to wait for
     */
    suspend fun select(selectable: Selectable, interest: SelectInterest)
}

31

32

/**
 * Creates the selector manager for the current platform.
 *
 * @param dispatcher coroutine context for the selector manager (default: [EmptyCoroutineContext])
 * @return a [SelectorManager] instance for the current platform
 */
fun SelectorManager(
    dispatcher: CoroutineContext = EmptyCoroutineContext
): SelectorManager

40

```

41

42

**Usage Examples:**

43

44

```kotlin

45

import io.ktor.network.sockets.*

46

import io.ktor.network.selector.*

47

import kotlinx.coroutines.*

48

49

// Basic selector manager usage
val selectorManager = SelectorManager()

// Use with custom dispatcher
val customDispatcher = Dispatchers.IO.limitedParallelism(4)
val customSelectorManager = SelectorManager(customDispatcher)

// Create sockets using the selector manager.
// connect() is a suspending call, so this line must run inside a coroutine.
val socket = aSocket(selectorManager).tcp().connect("localhost", 8080)

// Clean up when done: close sockets first, then every selector manager that was created
socket.close()
customSelectorManager.close()
selectorManager.close()

61

```

62

63

### Selection Interest Types

64

65

Enumeration of I/O interest types that can be monitored by the selector.

66

67

```kotlin { .api }

68

/**
 * The kinds of interest a selection can be made for.
 */
enum class SelectInterest {
    /** Read operations. */
    READ,

    /** Write operations. */
    WRITE,

    /** Accept operations (server sockets). */
    ACCEPT,

    /** Connect operations. */
    CONNECT;

    companion object {
        /** Array holding every selection interest. */
        val AllInterests: Array<SelectInterest>
    }
}

89

```

90

91

**Usage Example:**

92

93

```kotlin

94

import io.ktor.network.selector.*

95

96

// Print every interest the selector knows about
for (interest in SelectInterest.AllInterests) {
    println("Available interest: $interest")
}

// Individual interests can be referenced directly
val readInterest = SelectInterest.READ
val writeInterest = SelectInterest.WRITE
val acceptInterest = SelectInterest.ACCEPT
val connectInterest = SelectInterest.CONNECT

106

```

107

108

### Selectable Interface

109

110

Interface for objects that can be selected for I/O operations.

111

112

```kotlin { .api }

113

/**
 * An entity that can be selected on: it has a selectable channel and
 * subscriptions for the operations it is interested in.
 */
interface Selectable

117

```

118

119

### Selection Exception Handling

120

121

Exception types related to selection operations.

122

123

```kotlin { .api }

124

/**
 * Thrown when a channel is closed during selection.
 *
 * This is a [CancellationException] carrying the message "Closed channel.".
 */
class ClosedChannelCancellationException : CancellationException("Closed channel.")

128

```

129

130

**Usage Example:**

131

132

```kotlin

133

import io.ktor.network.selector.*

134

import kotlinx.coroutines.*

135

136

try {
    // Some selector operation that might fail
    selectorManager.select(selectable, SelectInterest.READ)
} catch (e: ClosedChannelCancellationException) {
    println("Channel was closed during selection: ${e.message}")
    // ClosedChannelCancellationException is a CancellationException. If the surrounding
    // coroutine itself was cancelled, keep propagating cancellation instead of carrying on.
    currentCoroutineContext().ensureActive()
}

142

```

143

144

### Complete Selector Management Examples

145

146

**Basic TCP Server with Selector Management:**

147

148

```kotlin

149

import io.ktor.network.sockets.*

150

import io.ktor.network.selector.*

151

import kotlinx.coroutines.*

152

153

/**
 * Runs a TCP server on localhost:8080 and handles every accepted client in its own coroutine.
 *
 * The selector manager and the client handlers share a dedicated scope built on
 * [Dispatchers.IO] with a [SupervisorJob]. The server socket, the selector manager and the
 * scope are all released when the accept loop ends, whether normally, by failure or by
 * cancellation.
 */
suspend fun tcpServerWithSelector() {
    // Create selector manager with custom scope
    val selectorScope = CoroutineScope(Dispatchers.IO + SupervisorJob())
    val selectorManager = SelectorManager(selectorScope.coroutineContext)

    try {
        val serverSocket = aSocket(selectorManager).tcp().bind("localhost", 8080)
        try {
            println("Server started on port ${serverSocket.port}")

            // Handle multiple clients concurrently
            while (selectorScope.isActive) {
                try {
                    val clientSocket = serverSocket.accept()

                    // Launch coroutine for each client
                    selectorScope.launch {
                        handleClient(clientSocket)
                    }
                } catch (e: CancellationException) {
                    // Cancellation must end the loop. Swallowing it here would make the loop
                    // retry accept() forever while the caller is already cancelled.
                    throw e
                } catch (e: Exception) {
                    println("Error accepting client: ${e.message}")
                }
            }
        } finally {
            // Release the listening port before tearing down the selector
            serverSocket.close()
        }
    } finally {
        selectorManager.close()
        selectorScope.cancel()
    }
}

182

183

/**
 * Echoes every line received on [socket] back to the sender, prefixed with "Echo: ",
 * until the peer closes the stream. The socket is always closed on exit.
 *
 * Note: `readUTF8Line` and `writeStringUtf8` are extension functions from `io.ktor.utils.io`,
 * so the snippet also needs `import io.ktor.utils.io.*`.
 *
 * @param socket the accepted client socket; this function takes ownership and closes it
 */
suspend fun handleClient(socket: Socket) {
    try {
        val connection = socket.connection()

        while (true) {
            // A null line means the peer closed the stream
            val line = connection.input.readUTF8Line() ?: break
            connection.output.writeStringUtf8("Echo: $line\n")
            connection.output.flush()
        }
    } catch (e: CancellationException) {
        // Never swallow cancellation; the finally block still closes the socket
        throw e
    } catch (e: Exception) {
        println("Client error: ${e.message}")
    } finally {
        socket.close()
    }
}

199

```

200

201

**Multiple Socket Management:**

202

203

```kotlin

204

import io.ktor.network.sockets.*

205

import io.ktor.network.selector.*

206

import kotlinx.coroutines.*

207

208

/**
 * Opens TCP connections to several servers through one shared [SelectorManager] and
 * keeps track of the sockets that are currently open.
 *
 * Note: `readUTF8Line` and `writeStringUtf8` are extension functions from `io.ktor.utils.io`,
 * so the snippet also needs `import io.ktor.utils.io.*`.
 */
class MultiSocketManager {
    private val selectorManager = SelectorManager()

    // Sockets are added and removed from several coroutines at once, so the set must be
    // thread-safe (JVM concurrent set, referenced by its fully-qualified name).
    private val activeSockets: MutableSet<Socket> =
        java.util.concurrent.ConcurrentHashMap.newKeySet<Socket>()

    /**
     * Connects to every server in [servers] in parallel, then handles each successful
     * connection concurrently. Servers that cannot be reached are reported and skipped.
     *
     * The function suspends until all connection handlers have finished.
     *
     * @param servers host/port pairs to connect to
     */
    suspend fun connectToMultipleServers(servers: List<Pair<String, Int>>) {
        // async/launch need a CoroutineScope; coroutineScope ties them to the caller
        coroutineScope {
            val pending = servers.map { (host, port) ->
                async {
                    try {
                        val socket = aSocket(selectorManager).tcp().connect(host, port)
                        activeSockets.add(socket)
                        socket to socket.connection()
                    } catch (e: CancellationException) {
                        throw e
                    } catch (e: Exception) {
                        println("Failed to connect to $host:$port - ${e.message}")
                        null
                    }
                }
            }

            // Wait for all connections
            val validConnections = pending.awaitAll().filterNotNull()

            // Handle each connection
            validConnections.forEach { (socket, connection) ->
                launch {
                    try {
                        handleConnection(connection)
                    } finally {
                        activeSockets.remove(socket)
                        socket.close()
                    }
                }
            }
        }
    }

    /** Sends a greeting over [connection] and prints the single line the server answers with. */
    private suspend fun handleConnection(connection: Connection) {
        connection.output.writeStringUtf8("Hello from client\n")
        connection.output.flush()

        val response = connection.input.readUTF8Line()
        println("Server response: $response")
    }

    /** Closes every tracked socket, forgets them, and closes the selector manager. */
    suspend fun closeAll() {
        activeSockets.forEach { it.close() }
        activeSockets.clear()
        selectorManager.close()
    }
}

257

```

258

259

**Custom Selector with Resource Management:**

260

261

```kotlin

262

import io.ktor.network.sockets.*

263

import io.ktor.network.selector.*

264

import kotlinx.coroutines.*

265

import kotlin.time.Duration.Companion.minutes

266

267

/**
 * Owns one [SelectorManager] together with the coroutine scope it runs in, and creates
 * sockets through that manager.
 *
 * Call [start] before creating sockets and [shutdown] when finished. [shutdown] cancels
 * the service scope, so an instance is not meant to be started again afterwards.
 */
class ManagedSelectorService {
    private var selectorManager: SelectorManager? = null
    private val serviceScope = CoroutineScope(
        Dispatchers.IO +
            SupervisorJob() +
            CoroutineName("SelectorService")
    )

    /**
     * Creates the selector manager on the service scope.
     * Calling it again while a manager exists is a no-op, so the existing manager is not leaked.
     */
    fun start() {
        if (selectorManager == null) {
            selectorManager = SelectorManager(serviceScope.coroutineContext)
        }
    }

    /**
     * Opens a TCP connection to [host]:[port] with a socket timeout and keep-alive enabled.
     *
     * @throws IllegalStateException if the service has not been started
     */
    suspend fun createTcpConnection(host: String, port: Int): Socket =
        aSocket(requireManager()).tcp().connect(host, port) {
            socketTimeout = 30000 // 30 seconds
            keepAlive = true
        }

    /**
     * Binds a UDP socket on all interfaces at [port] (0 by default) with address reuse enabled.
     *
     * @throws IllegalStateException if the service has not been started
     */
    suspend fun createUdpSocket(port: Int = 0): BoundDatagramSocket =
        // The UDP builder's bind takes a SocketAddress rather than host/port
        // NOTE(review): confirm against the Ktor version in use
        aSocket(requireManager()).udp().bind(InetSocketAddress("0.0.0.0", port)) {
            reuseAddress = true
        }

    /**
     * Binds a TCP server socket on all interfaces at [port] with a backlog of 100 and
     * address reuse enabled.
     *
     * @throws IllegalStateException if the service has not been started
     */
    suspend fun createTcpServer(port: Int): ServerSocket =
        aSocket(requireManager()).tcp().bind("0.0.0.0", port) {
            backlogSize = 100
            reuseAddress = true
        }

    /**
     * Cancels all coroutines of the service scope and waits up to one minute for them to
     * finish, then closes the selector manager and cancels the scope. The cleanup runs
     * even if the wait times out or the caller is cancelled.
     */
    suspend fun shutdown() {
        try {
            // Graceful shutdown with timeout
            val finishedInTime = withTimeoutOrNull(1.minutes) {
                val children = serviceScope.coroutineContext.job.children.toList()
                // Signal every child first, then wait, so they wind down in parallel
                children.forEach { it.cancel() }
                children.joinAll()
            }
            if (finishedInTime == null) {
                println("Timeout during shutdown, forcing close")
            }
        } finally {
            selectorManager?.close()
            // Drop the closed manager so later create* calls fail fast with "Service not started"
            selectorManager = null
            serviceScope.cancel()
        }
    }

    /** Returns the running selector manager or fails if [start] has not been called. */
    private fun requireManager(): SelectorManager =
        checkNotNull(selectorManager) { "Service not started" }
}

322

323

// Usage

324

/**
 * Demonstrates the lifecycle of a [ManagedSelectorService]: start it, create a few
 * sockets, use them, then release everything.
 *
 * Sockets are closed and the service is shut down exactly once in `finally` blocks, so
 * cleanup also happens when socket creation fails part-way or the coroutine is cancelled.
 */
suspend fun managedServiceExample() {
    val service = ManagedSelectorService()
    // Everything opened so far, so that a failure part-way still closes what exists
    val opened = mutableListOf<ASocket>()

    try {
        service.start()

        // Create multiple connections
        val socket1 = service.createTcpConnection("api1.example.com", 80).also { opened += it }
        val socket2 = service.createTcpConnection("api2.example.com", 80).also { opened += it }
        val udpSocket = service.createUdpSocket(8080).also { opened += it }
        val server = service.createTcpServer(9090).also { opened += it }

        // Use sockets...
    } catch (e: CancellationException) {
        // Let cancellation propagate; cleanup still runs in finally
        throw e
    } catch (e: Exception) {
        println("Service error: ${e.message}")
    } finally {
        try {
            // Close in reverse order of creation
            opened.asReversed().forEach { it.close() }
        } finally {
            // Clean shutdown
            service.shutdown()
        }
    }
}

346

```

347

348

**Performance Monitoring:**

349

350

```kotlin

351

import io.ktor.network.sockets.*

352

import io.ktor.network.selector.*

353

import kotlinx.coroutines.*

354

import kotlin.system.measureTimeMillis

355

356

/**
 * Opens TCP connections through a shared [SelectorManager] and records how long each
 * connect takes.
 *
 * The counters are plain fields with no synchronization.
 * NOTE(review): confine calls to one coroutine at a time, or confirm whether concurrent use is expected.
 */
class SelectorPerformanceMonitor {
    private val selectorManager = SelectorManager()
    private var connectionCount = 0
    private var totalConnectTime = 0L

    /**
     * Connects to [host]:[port], records the elapsed time and prints running statistics.
     *
     * @return the connected socket; the caller is responsible for closing it
     */
    suspend fun monitoredConnect(host: String, port: Int): Socket {
        // Assign inside the timed block instead of `return`-ing from it: a return inside
        // the inline lambda would exit this function before any statistics are recorded.
        val socket: Socket
        val connectTime = measureTimeMillis {
            socket = aSocket(selectorManager).tcp().connect(host, port)
        }

        connectionCount++
        totalConnectTime += connectTime

        println("Connection #$connectionCount to $host:$port took ${connectTime}ms")
        println("Average connect time: ${averageConnectTime()}ms")

        // Return the socket that was measured; do not open a second connection
        return socket
    }

    /** Returns a one-line summary of the connection count and the average connect time. */
    fun getStats(): String =
        "Connections: $connectionCount, Average time: ${averageConnectTime()}ms"

    /** Prints the final statistics and closes the selector manager. */
    suspend fun close() {
        println("Final stats: ${getStats()}")
        selectorManager.close()
    }

    /** Average connect time in milliseconds, or 0 when nothing has been recorded yet. */
    private fun averageConnectTime(): Long =
        if (connectionCount > 0) totalConnectTime / connectionCount else 0L
}

384

```

385

386

## Best Practices

387

388

1. **Resource Management**: Always close selector managers when done

389

2. **Scope Management**: Use appropriate coroutine scopes for selector managers

390

3. **Error Handling**: Handle ClosedChannelCancellationException appropriately

391

4. **Performance**: Reuse selector managers across multiple sockets when possible

392

5. **Shutdown**: Implement graceful shutdown with timeouts for production applications