0
# Selector Management
1
2
Jetty IO provides sophisticated selector management for handling multiple connections efficiently using Java NIO selectors. This enables scalable, non-blocking I/O operations with a small number of threads.
3
4
## Capabilities
5
6
### SelectorManager
7
8
Abstract base class for managing multiple ManagedSelector instances, each running in its own thread.
9
10
```java { .api }
11
/**
12
* Manages multiple ManagedSelectors for non-blocking NIO
13
*/
14
abstract class SelectorManager extends ContainerLifeCycle {
15
protected SelectorManager(Executor executor, Scheduler scheduler);
16
protected SelectorManager(Executor executor, Scheduler scheduler, int selectors);
17
18
// Configuration
19
public int getSelectorCount();
20
public void setSelectorCount(int selectorCount);
21
22
public long getConnectTimeout();
23
public void setConnectTimeout(long connectTimeout);
24
25
public int getSelectorPriorityDelta();
26
public void setSelectorPriorityDelta(int selectorPriorityDelta);
27
28
// Selector access
29
public ManagedSelector getSelector(int index);
30
public Collection<ManagedSelector> getSelectors();
31
32
// Connection management
33
public void connect(SelectableChannel channel, Object attachment);
34
public void accept(SelectableChannel channel);
35
public void accept(SelectableChannel channel, Object attachment);
36
37
// Template methods for subclasses
38
protected abstract EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key);
39
protected abstract Connection newConnection(EndPoint endPoint, Object attachment) throws IOException;
40
41
// Optional customization points
42
protected void connectionOpened(Connection connection, Object attachment) {}
43
protected void connectionClosed(Connection connection, Object attachment, Throwable cause) {}
44
protected void connectionFailed(SelectableChannel channel, Throwable ex, Object attachment) {}
45
46
// Constants
47
public static final int DEFAULT_CONNECT_TIMEOUT = 15000;
48
}
49
```
50
51
**Usage Examples:**
52
53
```java
54
// HTTP server selector manager
55
public class HttpSelectorManager extends SelectorManager {
56
private final HttpConfiguration httpConfig;
57
private final ByteBufferPool bufferPool;
58
59
public HttpSelectorManager(Executor executor, Scheduler scheduler,
60
HttpConfiguration httpConfig, ByteBufferPool bufferPool) {
61
super(executor, scheduler, 4); // 4 selector threads
62
this.httpConfig = httpConfig;
63
this.bufferPool = bufferPool;
64
65
setConnectTimeout(30000); // 30 second connect timeout
66
setSelectorPriorityDelta(-2); // Lower priority for selector threads
67
}
68
69
@Override
70
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key) {
71
SocketChannelEndPoint endPoint = new SocketChannelEndPoint(
72
(SocketChannel) channel, selector, key, getScheduler());
73
endPoint.setIdleTimeout(httpConfig.getIdleTimeout());
74
return endPoint;
75
}
76
77
@Override
78
protected Connection newConnection(EndPoint endPoint, Object attachment) throws IOException {
79
HttpConnection connection = new HttpConnection(httpConfig, getConnector(), endPoint);
80
return configure(connection, endPoint, attachment);
81
}
82
83
@Override
84
protected void connectionOpened(Connection connection, Object attachment) {
85
super.connectionOpened(connection, attachment);
86
System.out.println("HTTP connection opened: " + connection.getEndPoint().getRemoteSocketAddress());
87
}
88
89
@Override
90
protected void connectionClosed(Connection connection, Object attachment, Throwable cause) {
91
super.connectionClosed(connection, attachment, cause);
92
System.out.println("HTTP connection closed: " +
93
(cause != null ? cause.getMessage() : "normal closure"));
94
}
95
96
@Override
97
protected void connectionFailed(SelectableChannel channel, Throwable ex, Object attachment) {
98
super.connectionFailed(channel, ex, attachment);
99
System.err.println("Connection failed for channel: " + channel + ", error: " + ex.getMessage());
100
}
101
}
102
103
// Client selector manager
104
public class ClientSelectorManager extends SelectorManager {
105
private final ClientConnector connector;
106
107
public ClientSelectorManager(ClientConnector connector, Executor executor, Scheduler scheduler) {
108
super(executor, scheduler);
109
this.connector = connector;
110
}
111
112
@Override
113
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key) {
114
SocketChannelEndPoint endPoint = new SocketChannelEndPoint(
115
(SocketChannel) channel, selector, key, getScheduler());
116
endPoint.setIdleTimeout(connector.getIdleTimeout());
117
return endPoint;
118
}
119
120
@Override
121
protected Connection newConnection(EndPoint endPoint, Object attachment) throws IOException {
122
@SuppressWarnings("unchecked")
123
Map<String, Object> context = (Map<String, Object>) attachment;
124
125
ClientConnectionFactory factory = (ClientConnectionFactory)
126
context.get(ClientConnectionFactory.CLIENT_CONTEXT_KEY);
127
128
return factory.newConnection(endPoint, context);
129
}
130
}
131
132
// Usage
133
Executor executor = new QueuedThreadPool("selector-manager");
134
Scheduler scheduler = new ScheduledExecutorScheduler("selector-scheduler", false);
135
HttpConfiguration httpConfig = new HttpConfiguration();
136
ByteBufferPool bufferPool = new ArrayByteBufferPool();
137
138
HttpSelectorManager selectorManager = new HttpSelectorManager(executor, scheduler, httpConfig, bufferPool);
139
selectorManager.start();
140
141
// Accept connections
142
ServerSocketChannel serverChannel = ServerSocketChannel.open();
143
serverChannel.bind(new InetSocketAddress(8080));
144
serverChannel.configureBlocking(false);
145
146
selectorManager.accept(serverChannel);
147
```
148
149
### ManagedSelector
150
151
Single-threaded selector for managing NIO operations on a set of channels.
152
153
```java { .api }
154
/**
155
* Single-threaded NIO selector management
156
*/
157
class ManagedSelector extends AbstractLifeCycle implements Dumpable {
158
protected ManagedSelector(SelectorManager selectorManager, int id);
159
160
// Selector operations
161
public Selector getSelector();
162
public int getSelectorId();
163
public SelectorManager getSelectorManager();
164
165
// Channel registration
166
public void submit(Runnable task);
167
public CompletableFuture<Void> submit(Task task);
168
169
// Statistics
170
public int getRegisteredKeys();
171
public int getSelectedKeys();
172
public long getSelectTime();
173
public long getTotalSelectTime();
174
public void resetStats();
175
176
// Lifecycle
177
protected void doStart() throws Exception;
178
protected void doStop() throws Exception;
179
180
// Task interface for selector operations
181
public interface Task {
182
void run(Selector selector) throws Exception;
183
}
184
}
185
```
186
187
**Usage Examples:**
188
189
```java
190
// Custom selector task
191
ManagedSelector.Task registerChannelTask = new ManagedSelector.Task() {
192
@Override
193
public void run(Selector selector) throws Exception {
194
SocketChannel channel = SocketChannel.open();
195
channel.configureBlocking(false);
196
channel.connect(new InetSocketAddress("example.com", 80));
197
198
SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT);
199
key.attach(connectionContext);
200
201
System.out.println("Channel registered with selector: " + selector);
202
}
203
};
204
205
// Submit task to specific selector
206
ManagedSelector selector = selectorManager.getSelector(0);
207
CompletableFuture<Void> taskFuture = selector.submit(registerChannelTask);
208
taskFuture.thenRun(() -> {
209
System.out.println("Channel registration completed");
210
}).exceptionally(throwable -> {
211
System.err.println("Channel registration failed: " + throwable.getMessage());
212
return null;
213
});
214
215
// Monitor selector statistics
216
Timer statsTimer = new Timer();
217
statsTimer.scheduleAtFixedRate(new TimerTask() {
218
@Override
219
public void run() {
220
for (ManagedSelector selector : selectorManager.getSelectors()) {
221
System.out.printf("Selector %d: registered=%d, selected=%d, selectTime=%dms%n",
222
selector.getSelectorId(),
223
selector.getRegisteredKeys(),
224
selector.getSelectedKeys(),
225
selector.getSelectTime());
226
}
227
}
228
}, 0, 10000); // Every 10 seconds
229
230
// Custom selector operations
231
ManagedSelector.Task customTask = selector -> {
232
// Perform custom operations on selector
233
Set<SelectionKey> keys = selector.selectedKeys();
234
Iterator<SelectionKey> iterator = keys.iterator();
235
236
while (iterator.hasNext()) {
237
SelectionKey key = iterator.next();
238
iterator.remove();
239
240
if (key.isValid()) {
241
if (key.isConnectable()) {
242
handleConnect(key);
243
}
244
if (key.isReadable()) {
245
handleRead(key);
246
}
247
if (key.isWritable()) {
248
handleWrite(key);
249
}
250
}
251
}
252
};
253
254
// Submit custom task
255
selector.submit(customTask);
256
```
257
258
### Selector-based EndPoints
259
260
#### SocketChannelEndPoint
261
262
EndPoint implementation for SocketChannel with selector integration.
263
264
```java { .api }
265
/**
266
* EndPoint implementation for SocketChannel
267
*/
268
class SocketChannelEndPoint extends AbstractEndPoint {
269
public SocketChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler);
270
271
// Channel access
272
public SocketChannel getSocketChannel();
273
public SelectionKey getSelectionKey();
274
public ManagedSelector getSelector();
275
276
// I/O operations
277
public int fill(ByteBuffer buffer) throws IOException;
278
public boolean flush(ByteBuffer... buffers) throws IOException;
279
public void fillInterested(Callback callback) throws ReadPendingException;
280
public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException;
281
282
// Configuration
283
public void setTrafficClass(int trafficClass) throws IOException;
284
public int getTrafficClass() throws IOException;
285
286
// Socket options
287
public void setSoLingerTime(int lingerTime);
288
public int getSoLingerTime();
289
290
// Network addresses
291
public SocketAddress getLocalSocketAddress();
292
public SocketAddress getRemoteSocketAddress();
293
294
// SSL information (if applicable)
295
public boolean isSecure();
296
public Object getSslSessionData();
297
298
// Interest operations
299
protected void needsFillInterest();
300
protected void onIncompleteFlush();
301
}
302
```
303
304
#### DatagramChannelEndPoint
305
306
EndPoint implementation for DatagramChannel supporting connectionless protocols.
307
308
```java { .api }
309
/**
310
* EndPoint implementation for DatagramChannel
311
*/
312
class DatagramChannelEndPoint extends AbstractEndPoint {
313
public DatagramChannelEndPoint(DatagramChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler);
314
315
// Channel access
316
public DatagramChannel getDatagramChannel();
317
318
// UDP-specific operations
319
public SocketAddress receive(ByteBuffer buffer) throws IOException;
320
public boolean send(SocketAddress address, ByteBuffer... buffers) throws IOException;
321
322
// Multicast support
323
public void join(InetAddress group) throws IOException;
324
public void join(InetAddress group, NetworkInterface networkInterface) throws IOException;
325
public void leave(InetAddress group) throws IOException;
326
public void leave(InetAddress group, NetworkInterface networkInterface) throws IOException;
327
}
328
```
329
330
**EndPoint Usage Examples:**
331
332
```java
333
// TCP socket endpoint configuration
334
SocketChannelEndPoint tcpEndPoint = new SocketChannelEndPoint(
335
socketChannel, selector, selectionKey, scheduler);
336
337
// Configure TCP socket options
338
tcpEndPoint.setSoLingerTime(30); // 30 second linger time
339
tcpEndPoint.setTrafficClass(0x08); // High throughput traffic class
340
341
// Async read with callback
342
tcpEndPoint.fillInterested(new Callback() {
343
@Override
344
public void succeeded() {
345
// Data is available for reading
346
ByteBuffer buffer = ByteBuffer.allocate(8192);
347
try {
348
int bytesRead = tcpEndPoint.fill(buffer);
349
if (bytesRead > 0) {
350
buffer.flip();
351
processData(buffer);
352
}
353
} catch (IOException e) {
354
failed(e);
355
}
356
}
357
358
@Override
359
public void failed(Throwable x) {
360
System.err.println("Read failed: " + x.getMessage());
361
tcpEndPoint.close(x);
362
}
363
});
364
365
// Async write with callback
366
ByteBuffer responseBuffer = ByteBuffer.wrap("HTTP/1.1 200 OK\r\n\r\n".getBytes());
367
tcpEndPoint.write(new Callback() {
368
@Override
369
public void succeeded() {
370
System.out.println("Response sent successfully");
371
}
372
373
@Override
374
public void failed(Throwable x) {
375
System.err.println("Write failed: " + x.getMessage());
376
tcpEndPoint.close(x);
377
}
378
}, responseBuffer);
379
380
// UDP datagram endpoint
381
DatagramChannelEndPoint udpEndPoint = new DatagramChannelEndPoint(
382
datagramChannel, selector, selectionKey, scheduler);
383
384
// UDP receive
385
ByteBuffer receiveBuffer = ByteBuffer.allocate(1500); // MTU size
386
SocketAddress senderAddress = udpEndPoint.receive(receiveBuffer);
387
if (senderAddress != null) {
388
receiveBuffer.flip();
389
System.out.println("Received UDP packet from: " + senderAddress);
390
processUDPData(receiveBuffer);
391
}
392
393
// UDP send
394
ByteBuffer sendBuffer = ByteBuffer.wrap("Hello UDP".getBytes());
395
SocketAddress targetAddress = new InetSocketAddress("192.168.1.100", 9999);
396
boolean sent = udpEndPoint.send(targetAddress, sendBuffer);
397
if (sent) {
398
System.out.println("UDP packet sent to: " + targetAddress);
399
}
400
401
// Multicast support
402
InetAddress multicastGroup = InetAddress.getByName("224.0.0.1");
403
udpEndPoint.join(multicastGroup);
404
System.out.println("Joined multicast group: " + multicastGroup);
405
```
406
407
### Selector Performance Optimization
408
409
#### Selector Tuning
410
411
```java { .api }
412
/**
413
* Performance optimization configurations
414
*/
415
class SelectorOptimization {
416
public static SelectorManager createOptimizedSelectorManager(
417
Executor executor, Scheduler scheduler, int expectedConnections) {
418
419
// Calculate optimal selector count based on CPU cores and expected load
420
int cores = Runtime.getRuntime().availableProcessors();
421
int selectorCount = Math.min(cores, Math.max(1, expectedConnections / 1000));
422
423
SelectorManager manager = new CustomSelectorManager(executor, scheduler, selectorCount) {
424
@Override
425
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key) {
426
SocketChannelEndPoint endPoint = new SocketChannelEndPoint(
427
(SocketChannel) channel, selector, key, getScheduler());
428
429
// Optimize for high throughput
430
try {
431
Socket socket = ((SocketChannel) channel).socket();
432
socket.setTcpNoDelay(true); // Disable Nagle's algorithm
433
socket.setSendBufferSize(64 * 1024); // 64KB send buffer
434
socket.setReceiveBufferSize(64 * 1024); // 64KB receive buffer
435
socket.setKeepAlive(true); // Enable keep-alive
436
} catch (IOException e) {
437
System.err.println("Failed to optimize socket: " + e.getMessage());
438
}
439
440
return endPoint;
441
}
442
};
443
444
// Configure selector manager
445
manager.setSelectorPriorityDelta(-1); // Slightly lower priority
446
manager.setConnectTimeout(10000); // 10 second timeout
447
448
return manager;
449
}
450
451
public static void configureHighPerformanceSelector(ManagedSelector selector) {
452
// Submit optimization task
453
selector.submit(new ManagedSelector.Task() {
454
@Override
455
public void run(Selector selector) throws Exception {
456
// Configure selector for high performance
457
System.setProperty("java.nio.channels.spi.SelectorProvider",
458
"sun.nio.ch.EPollSelectorProvider"); // Linux epoll
459
460
// Tune selector behavior
461
selector.wakeup(); // Ensure selector is responsive
462
}
463
});
464
}
465
}
466
```
467
468
#### Connection Load Balancing
469
470
```java { .api }
471
/**
472
* Load balancing connections across selectors
473
*/
474
class LoadBalancedSelectorManager extends SelectorManager {
475
private final AtomicInteger selectorIndex = new AtomicInteger(0);
476
477
public LoadBalancedSelectorManager(Executor executor, Scheduler scheduler) {
478
super(executor, scheduler);
479
}
480
481
@Override
482
public void accept(SelectableChannel channel, Object attachment) {
483
// Distribute connections across selectors using round-robin
484
int index = selectorIndex.getAndIncrement() % getSelectorCount();
485
ManagedSelector selector = getSelector(index);
486
487
selector.submit(() -> {
488
try {
489
SelectionKey key = channel.register(selector.getSelector(),
490
SelectionKey.OP_READ, attachment);
491
EndPoint endPoint = newEndPoint(channel, selector, key);
492
Connection connection = newConnection(endPoint, attachment);
493
endPoint.setConnection(connection);
494
connection.onOpen();
495
} catch (Exception e) {
496
connectionFailed(channel, e, attachment);
497
}
498
});
499
}
500
501
@Override
502
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key) {
503
return new SocketChannelEndPoint((SocketChannel) channel, selector, key, getScheduler());
504
}
505
506
@Override
507
protected Connection newConnection(EndPoint endPoint, Object attachment) throws IOException {
508
return new EchoConnection(endPoint, getExecutor());
509
}
510
}
511
```
512
513
**Performance Usage Examples:**
514
515
```java
516
// High-performance server setup
517
Executor executor = new QueuedThreadPool("server", 200, 8); // Min 8, max 200 threads
518
Scheduler scheduler = new ScheduledExecutorScheduler("scheduler", false, 2); // 2 scheduler threads
519
520
LoadBalancedSelectorManager selectorManager = new LoadBalancedSelectorManager(executor, scheduler);
521
selectorManager.start();
522
523
// Configure JVM for optimal NIO performance
524
System.setProperty("java.nio.channels.DefaultThreadPool.threadFactory", "custom");
525
System.setProperty("java.nio.channels.DefaultThreadPool.initialSize", "8");
526
527
// Monitor selector performance
528
for (ManagedSelector selector : selectorManager.getSelectors()) {
529
SelectorOptimization.configureHighPerformanceSelector(selector);
530
531
// Schedule performance monitoring
532
scheduler.schedule(() -> {
533
System.out.printf("Selector %d performance: registered=%d, selectTime=%dms%n",
534
selector.getSelectorId(),
535
selector.getRegisteredKeys(),
536
selector.getSelectTime());
537
}, 5, TimeUnit.SECONDS);
538
}
539
540
// Server channel setup
541
ServerSocketChannel serverChannel = ServerSocketChannel.open();
542
serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
543
serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 128 * 1024); // 128KB receive buffer
544
serverChannel.bind(new InetSocketAddress(8080), 1024); // 1024 connection backlog
545
serverChannel.configureBlocking(false);
546
547
// Accept connections with load balancing
548
selectorManager.accept(serverChannel);
549
```