or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

buffer-management.md connection-management.md content-streaming.md core-io.md index.md selector-management.md ssl-support.md

selector-management.md docs/

0

# Selector Management

1

2

Jetty IO provides sophisticated selector management for handling multiple connections efficiently using Java NIO selectors. This enables scalable, non-blocking I/O operations with a small number of threads.

3

4

## Capabilities

5

6

### SelectorManager

7

8

Abstract base class for managing multiple ManagedSelector instances, each running in its own thread.

9

10

```java { .api }

11

/**

12

* Manages multiple ManagedSelectors for non-blocking NIO

13

*/

14

abstract class SelectorManager extends ContainerLifeCycle {

15

protected SelectorManager(Executor executor, Scheduler scheduler);

16

protected SelectorManager(Executor executor, Scheduler scheduler, int selectors);

17

18

// Configuration

19

public int getSelectorCount();

20

public void setSelectorCount(int selectorCount);

21

22

public long getConnectTimeout();

23

public void setConnectTimeout(long connectTimeout);

24

25

public int getSelectorPriorityDelta();

26

public void setSelectorPriorityDelta(int selectorPriorityDelta);

27

28

// Selector access

29

public ManagedSelector getSelector(int index);

30

public Collection<ManagedSelector> getSelectors();

31

32

// Connection management

33

public void connect(SelectableChannel channel, Object attachment);

34

public void accept(SelectableChannel channel);

35

public void accept(SelectableChannel channel, Object attachment);

36

37

// Template methods for subclasses

38

protected abstract EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key);

39

protected abstract Connection newConnection(EndPoint endPoint, Object attachment) throws IOException;

40

41

// Optional customization points

42

protected void connectionOpened(Connection connection, Object attachment) {}

43

protected void connectionClosed(Connection connection, Object attachment, Throwable cause) {}

44

protected void connectionFailed(SelectableChannel channel, Throwable ex, Object attachment) {}

45

46

// Constants

47

public static final int DEFAULT_CONNECT_TIMEOUT = 15000;

48

}

49

```

50

51

**Usage Examples:**

52

53

```java

54

// HTTP server selector manager

55

public class HttpSelectorManager extends SelectorManager {

56

private final HttpConfiguration httpConfig;

57

private final ByteBufferPool bufferPool;

58

59

public HttpSelectorManager(Executor executor, Scheduler scheduler,

60

HttpConfiguration httpConfig, ByteBufferPool bufferPool) {

61

super(executor, scheduler, 4); // 4 selector threads

62

this.httpConfig = httpConfig;

63

this.bufferPool = bufferPool;

64

65

setConnectTimeout(30000); // 30 second connect timeout

66

setSelectorPriorityDelta(-2); // Lower priority for selector threads

67

}

68

69

@Override

70

protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key) {

71

SocketChannelEndPoint endPoint = new SocketChannelEndPoint(

72

(SocketChannel) channel, selector, key, getScheduler());

73

endPoint.setIdleTimeout(httpConfig.getIdleTimeout());

74

return endPoint;

75

}

76

77

@Override

78

protected Connection newConnection(EndPoint endPoint, Object attachment) throws IOException {

79

HttpConnection connection = new HttpConnection(httpConfig, getConnector(), endPoint);

80

return configure(connection, endPoint, attachment);

81

}

82

83

@Override

84

protected void connectionOpened(Connection connection, Object attachment) {

85

super.connectionOpened(connection, attachment);

86

System.out.println("HTTP connection opened: " + connection.getEndPoint().getRemoteSocketAddress());

87

}

88

89

@Override

90

protected void connectionClosed(Connection connection, Object attachment, Throwable cause) {

91

super.connectionClosed(connection, attachment, cause);

92

System.out.println("HTTP connection closed: " +

93

(cause != null ? cause.getMessage() : "normal closure"));

94

}

95

96

@Override

97

protected void connectionFailed(SelectableChannel channel, Throwable ex, Object attachment) {

98

super.connectionFailed(channel, ex, attachment);

99

System.err.println("Connection failed for channel: " + channel + ", error: " + ex.getMessage());

100

}

101

}

102

103

// Client selector manager

104

public class ClientSelectorManager extends SelectorManager {

105

private final ClientConnector connector;

106

107

public ClientSelectorManager(ClientConnector connector, Executor executor, Scheduler scheduler) {

108

super(executor, scheduler);

109

this.connector = connector;

110

}

111

112

@Override

113

protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key) {

114

SocketChannelEndPoint endPoint = new SocketChannelEndPoint(

115

(SocketChannel) channel, selector, key, getScheduler());

116

endPoint.setIdleTimeout(connector.getIdleTimeout());

117

return endPoint;

118

}

119

120

@Override

121

protected Connection newConnection(EndPoint endPoint, Object attachment) throws IOException {

122

@SuppressWarnings("unchecked")

123

Map<String, Object> context = (Map<String, Object>) attachment;

124

125

ClientConnectionFactory factory = (ClientConnectionFactory)

126

context.get(ClientConnectionFactory.CLIENT_CONTEXT_KEY);

127

128

return factory.newConnection(endPoint, context);

129

}

130

}

131

132

// Usage

133

Executor executor = new QueuedThreadPool("selector-manager");

134

Scheduler scheduler = new ScheduledExecutorScheduler("selector-scheduler", false);

135

HttpConfiguration httpConfig = new HttpConfiguration();

136

ByteBufferPool bufferPool = new ArrayByteBufferPool();

137

138

HttpSelectorManager selectorManager = new HttpSelectorManager(executor, scheduler, httpConfig, bufferPool);

139

selectorManager.start();

140

141

// Accept connections

142

ServerSocketChannel serverChannel = ServerSocketChannel.open();

143

serverChannel.bind(new InetSocketAddress(8080));

144

serverChannel.configureBlocking(false);

145

146

selectorManager.accept(serverChannel);

147

```

148

149

### ManagedSelector

150

151

Single-threaded selector for managing NIO operations on a set of channels.

152

153

```java { .api }

154

/**

155

* Single-threaded NIO selector management

156

*/

157

class ManagedSelector extends AbstractLifeCycle implements Dumpable {

158

protected ManagedSelector(SelectorManager selectorManager, int id);

159

160

// Selector operations

161

public Selector getSelector();

162

public int getSelectorId();

163

public SelectorManager getSelectorManager();

164

165

// Channel registration

166

public void submit(Runnable task);

167

public CompletableFuture<Void> submit(Task task);

168

169

// Statistics

170

public int getRegisteredKeys();

171

public int getSelectedKeys();

172

public long getSelectTime();

173

public long getTotalSelectTime();

174

public void resetStats();

175

176

// Lifecycle

177

protected void doStart() throws Exception;

178

protected void doStop() throws Exception;

179

180

// Task interface for selector operations

181

public interface Task {

182

void run(Selector selector) throws Exception;

183

}

184

}

185

```

186

187

**Usage Examples:**

188

189

```java

190

// Custom selector task

191

ManagedSelector.Task registerChannelTask = new ManagedSelector.Task() {

192

@Override

193

public void run(Selector selector) throws Exception {

194

SocketChannel channel = SocketChannel.open();

195

channel.configureBlocking(false);

196

channel.connect(new InetSocketAddress("example.com", 80));

197

198

SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT);

199

key.attach(connectionContext);

200

201

System.out.println("Channel registered with selector: " + selector);

202

}

203

};

204

205

// Submit task to specific selector

206

ManagedSelector selector = selectorManager.getSelector(0);

207

CompletableFuture<Void> taskFuture = selector.submit(registerChannelTask);

208

taskFuture.thenRun(() -> {

209

System.out.println("Channel registration completed");

210

}).exceptionally(throwable -> {

211

System.err.println("Channel registration failed: " + throwable.getMessage());

212

return null;

213

});

214

215

// Monitor selector statistics

216

Timer statsTimer = new Timer();

217

statsTimer.scheduleAtFixedRate(new TimerTask() {

218

@Override

219

public void run() {

220

for (ManagedSelector selector : selectorManager.getSelectors()) {

221

System.out.printf("Selector %d: registered=%d, selected=%d, selectTime=%dms%n",

222

selector.getSelectorId(),

223

selector.getRegisteredKeys(),

224

selector.getSelectedKeys(),

225

selector.getSelectTime());

226

}

227

}

228

}, 0, 10000); // Every 10 seconds

229

230

// Custom selector operations

231

ManagedSelector.Task customTask = selector -> {

232

// Perform custom operations on selector

233

Set<SelectionKey> keys = selector.selectedKeys();

234

Iterator<SelectionKey> iterator = keys.iterator();

235

236

while (iterator.hasNext()) {

237

SelectionKey key = iterator.next();

238

iterator.remove();

239

240

if (key.isValid()) {

241

if (key.isConnectable()) {

242

handleConnect(key);

243

}

244

if (key.isReadable()) {

245

handleRead(key);

246

}

247

if (key.isWritable()) {

248

handleWrite(key);

249

}

250

}

251

}

252

};

253

254

// Submit custom task

255

selector.submit(customTask);

256

```

257

258

### Selector-based EndPoints

259

260

#### SocketChannelEndPoint

261

262

EndPoint implementation for SocketChannel with selector integration.

263

264

```java { .api }

265

/**

266

* EndPoint implementation for SocketChannel

267

*/

268

class SocketChannelEndPoint extends AbstractEndPoint {

269

public SocketChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler);

270

271

// Channel access

272

public SocketChannel getSocketChannel();

273

public SelectionKey getSelectionKey();

274

public ManagedSelector getSelector();

275

276

// I/O operations

277

public int fill(ByteBuffer buffer) throws IOException;

278

public boolean flush(ByteBuffer... buffers) throws IOException;

279

public void fillInterested(Callback callback) throws ReadPendingException;

280

public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException;

281

282

// Configuration

283

public void setTrafficClass(int trafficClass) throws IOException;

284

public int getTrafficClass() throws IOException;

285

286

// Socket options

287

public void setSoLingerTime(int lingerTime);

288

public int getSoLingerTime();

289

290

// Network addresses

291

public SocketAddress getLocalSocketAddress();

292

public SocketAddress getRemoteSocketAddress();

293

294

// SSL information (if applicable)

295

public boolean isSecure();

296

public Object getSslSessionData();

297

298

// Interest operations

299

protected void needsFillInterest();

300

protected void onIncompleteFlush();

301

}

302

```

303

304

#### DatagramChannelEndPoint

305

306

EndPoint implementation for DatagramChannel supporting connectionless protocols.

307

308

```java { .api }

309

/**

310

* EndPoint implementation for DatagramChannel

311

*/

312

class DatagramChannelEndPoint extends AbstractEndPoint {

313

public DatagramChannelEndPoint(DatagramChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler);

314

315

// Channel access

316

public DatagramChannel getDatagramChannel();

317

318

// UDP-specific operations

319

public SocketAddress receive(ByteBuffer buffer) throws IOException;

320

public boolean send(SocketAddress address, ByteBuffer... buffers) throws IOException;

321

322

// Multicast support

323

public void join(InetAddress group) throws IOException;

324

public void join(InetAddress group, NetworkInterface networkInterface) throws IOException;

325

public void leave(InetAddress group) throws IOException;

326

public void leave(InetAddress group, NetworkInterface networkInterface) throws IOException;

327

}

328

```

329

330

**EndPoint Usage Examples:**

331

332

```java

333

// TCP socket endpoint configuration

334

SocketChannelEndPoint tcpEndPoint = new SocketChannelEndPoint(

335

socketChannel, selector, selectionKey, scheduler);

336

337

// Configure TCP socket options

338

tcpEndPoint.setSoLingerTime(30); // 30 second linger time

339

tcpEndPoint.setTrafficClass(0x08); // High throughput traffic class

340

341

// Async read with callback

342

tcpEndPoint.fillInterested(new Callback() {

343

@Override

344

public void succeeded() {

345

// Data is available for reading

346

ByteBuffer buffer = ByteBuffer.allocate(8192);

347

try {

348

int bytesRead = tcpEndPoint.fill(buffer);

349

if (bytesRead > 0) {

350

buffer.flip();

351

processData(buffer);

352

}

353

} catch (IOException e) {

354

failed(e);

355

}

356

}

357

358

@Override

359

public void failed(Throwable x) {

360

System.err.println("Read failed: " + x.getMessage());

361

tcpEndPoint.close(x);

362

}

363

});

364

365

// Async write with callback

366

ByteBuffer responseBuffer = ByteBuffer.wrap("HTTP/1.1 200 OK\r\n\r\n".getBytes());

367

tcpEndPoint.write(new Callback() {

368

@Override

369

public void succeeded() {

370

System.out.println("Response sent successfully");

371

}

372

373

@Override

374

public void failed(Throwable x) {

375

System.err.println("Write failed: " + x.getMessage());

376

tcpEndPoint.close(x);

377

}

378

}, responseBuffer);

379

380

// UDP datagram endpoint

381

DatagramChannelEndPoint udpEndPoint = new DatagramChannelEndPoint(

382

datagramChannel, selector, selectionKey, scheduler);

383

384

// UDP receive

385

ByteBuffer receiveBuffer = ByteBuffer.allocate(1500); // MTU size

386

SocketAddress senderAddress = udpEndPoint.receive(receiveBuffer);

387

if (senderAddress != null) {

388

receiveBuffer.flip();

389

System.out.println("Received UDP packet from: " + senderAddress);

390

processUDPData(receiveBuffer);

391

}

392

393

// UDP send

394

ByteBuffer sendBuffer = ByteBuffer.wrap("Hello UDP".getBytes());

395

SocketAddress targetAddress = new InetSocketAddress("192.168.1.100", 9999);

396

boolean sent = udpEndPoint.send(targetAddress, sendBuffer);

397

if (sent) {

398

System.out.println("UDP packet sent to: " + targetAddress);

399

}

400

401

// Multicast support

402

InetAddress multicastGroup = InetAddress.getByName("224.0.0.1");

403

udpEndPoint.join(multicastGroup);

404

System.out.println("Joined multicast group: " + multicastGroup);

405

```

406

407

### Selector Performance Optimization

408

409

#### Selector Tuning

410

411

```java { .api }

412

/**

413

* Performance optimization configurations

414

*/

415

class SelectorOptimization {

416

public static SelectorManager createOptimizedSelectorManager(

417

Executor executor, Scheduler scheduler, int expectedConnections) {

418

419

// Calculate optimal selector count based on CPU cores and expected load

420

int cores = Runtime.getRuntime().availableProcessors();

421

int selectorCount = Math.min(cores, Math.max(1, expectedConnections / 1000));

422

423

SelectorManager manager = new CustomSelectorManager(executor, scheduler, selectorCount) {

424

@Override

425

protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key) {

426

SocketChannelEndPoint endPoint = new SocketChannelEndPoint(

427

(SocketChannel) channel, selector, key, getScheduler());

428

429

// Optimize for high throughput

430

try {

431

Socket socket = ((SocketChannel) channel).socket();

432

socket.setTcpNoDelay(true); // Disable Nagle's algorithm

433

socket.setSendBufferSize(64 * 1024); // 64KB send buffer

434

socket.setReceiveBufferSize(64 * 1024); // 64KB receive buffer

435

socket.setKeepAlive(true); // Enable keep-alive

436

} catch (IOException e) {

437

System.err.println("Failed to optimize socket: " + e.getMessage());

438

}

439

440

return endPoint;

441

}

442

};

443

444

// Configure selector manager

445

manager.setSelectorPriorityDelta(-1); // Slightly lower priority

446

manager.setConnectTimeout(10000); // 10 second timeout

447

448

return manager;

449

}

450

451

public static void configureHighPerformanceSelector(ManagedSelector selector) {

452

// Submit optimization task

453

selector.submit(new ManagedSelector.Task() {

454

@Override

455

public void run(Selector selector) throws Exception {

456

// Configure selector for high performance

457

System.setProperty("java.nio.channels.spi.SelectorProvider",

458

"sun.nio.ch.EPollSelectorProvider"); // Linux epoll

459

460

// Tune selector behavior

461

selector.wakeup(); // Ensure selector is responsive

462

}

463

});

464

}

465

}

466

```

467

468

#### Connection Load Balancing

469

470

```java { .api }

471

/**

472

* Load balancing connections across selectors

473

*/

474

class LoadBalancedSelectorManager extends SelectorManager {

475

private final AtomicInteger selectorIndex = new AtomicInteger(0);

476

477

public LoadBalancedSelectorManager(Executor executor, Scheduler scheduler) {

478

super(executor, scheduler);

479

}

480

481

@Override

482

public void accept(SelectableChannel channel, Object attachment) {

483

// Distribute connections across selectors using round-robin

484

int index = Math.floorMod(selectorIndex.getAndIncrement(), getSelectorCount()); // floorMod stays non-negative after the counter overflows

485

ManagedSelector selector = getSelector(index);

486

487

selector.submit(() -> {

488

try {

489

SelectionKey key = channel.register(selector.getSelector(),

490

SelectionKey.OP_READ, attachment);

491

EndPoint endPoint = newEndPoint(channel, selector, key);

492

Connection connection = newConnection(endPoint, attachment);

493

endPoint.setConnection(connection);

494

connection.onOpen();

495

} catch (Exception e) {

496

connectionFailed(channel, e, attachment);

497

}

498

});

499

}

500

501

@Override

502

protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key) {

503

return new SocketChannelEndPoint((SocketChannel) channel, selector, key, getScheduler());

504

}

505

506

@Override

507

protected Connection newConnection(EndPoint endPoint, Object attachment) throws IOException {

508

return new EchoConnection(endPoint, getExecutor());

509

}

510

}

511

```

512

513

**Performance Usage Examples:**

514

515

```java

516

// High-performance server setup

517

Executor executor = new QueuedThreadPool("server", 200, 8); // Min 8, max 200 threads

518

Scheduler scheduler = new ScheduledExecutorScheduler("scheduler", false, 2); // 2 scheduler threads

519

520

LoadBalancedSelectorManager selectorManager = new LoadBalancedSelectorManager(executor, scheduler);

521

selectorManager.start();

522

523

// Configure JVM for optimal NIO performance

524

System.setProperty("java.nio.channels.DefaultThreadPool.threadFactory", "custom");

525

System.setProperty("java.nio.channels.DefaultThreadPool.initialSize", "8");

526

527

// Monitor selector performance

528

for (ManagedSelector selector : selectorManager.getSelectors()) {

529

SelectorOptimization.configureHighPerformanceSelector(selector);

530

531

// Schedule performance monitoring

532

scheduler.schedule(() -> {

533

System.out.printf("Selector %d performance: registered=%d, selectTime=%dms%n",

534

selector.getSelectorId(),

535

selector.getRegisteredKeys(),

536

selector.getSelectTime());

537

}, 5, TimeUnit.SECONDS);

538

}

539

540

// Server channel setup

541

ServerSocketChannel serverChannel = ServerSocketChannel.open();

542

serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);

543

serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 128 * 1024); // 128KB receive buffer

544

serverChannel.bind(new InetSocketAddress(8080), 1024); // 1024 connection backlog

545

serverChannel.configureBlocking(false);

546

547

// Accept connections with load balancing

548

selectorManager.accept(serverChannel);

549

```