CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-eclipse-jetty--jetty-io

Core I/O components for Eclipse Jetty providing essential I/O utilities, buffer management, and network connection handling

Pending
Overview
Eval results
Files

selector-management.mddocs/

Selector Management

Jetty IO provides sophisticated selector management for handling multiple connections efficiently using Java NIO selectors. This enables scalable, non-blocking I/O operations with a small number of threads.

Capabilities

SelectorManager

Abstract base class for managing multiple ManagedSelector instances, each running in its own thread.

/**
 * Manages multiple ManagedSelectors for non-blocking NIO
 */
abstract class SelectorManager extends ContainerLifeCycle {
    protected SelectorManager(Executor executor, Scheduler scheduler);
    protected SelectorManager(Executor executor, Scheduler scheduler, int selectors);
    
    // Configuration
    public int getSelectorCount();
    public void setSelectorCount(int selectorCount);
    
    public long getConnectTimeout();
    public void setConnectTimeout(long connectTimeout);
    
    public int getSelectorPriorityDelta();
    public void setSelectorPriorityDelta(int selectorPriorityDelta);
    
    // Selector access
    public ManagedSelector getSelector(int index);
    public Collection<ManagedSelector> getSelectors();
    
    // Connection management
    public void connect(SelectableChannel channel, Object attachment);
    public void accept(SelectableChannel channel);
    public void accept(SelectableChannel channel, Object attachment);
    
    // Template methods for subclasses
    protected abstract EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key);
    protected abstract Connection newConnection(EndPoint endPoint, Object attachment) throws IOException;
    
    // Optional customization points
    protected void connectionOpened(Connection connection, Object attachment) {}
    protected void connectionClosed(Connection connection, Object attachment, Throwable cause) {}
    protected void connectionFailed(SelectableChannel channel, Throwable ex, Object attachment) {}
    
    // Constants
    public static final int DEFAULT_CONNECT_TIMEOUT = 15000;
}

Usage Examples:

// HTTP server selector manager
public class HttpSelectorManager extends SelectorManager {
    private final HttpConfiguration httpConfig;
    private final ByteBufferPool bufferPool;
    
    public HttpSelectorManager(Executor executor, Scheduler scheduler, 
                              HttpConfiguration httpConfig, ByteBufferPool bufferPool) {
        super(executor, scheduler, 4); // 4 selector threads
        this.httpConfig = httpConfig;
        this.bufferPool = bufferPool;
        
        setConnectTimeout(30000); // 30 second connect timeout
        setSelectorPriorityDelta(-2); // Lower priority for selector threads
    }
    
    @Override
    protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key) {
        SocketChannelEndPoint endPoint = new SocketChannelEndPoint(
            (SocketChannel) channel, selector, key, getScheduler());
        endPoint.setIdleTimeout(httpConfig.getIdleTimeout());
        return endPoint;
    }
    
    @Override
    protected Connection newConnection(EndPoint endPoint, Object attachment) throws IOException {
        HttpConnection connection = new HttpConnection(httpConfig, getConnector(), endPoint);
        return configure(connection, endPoint, attachment);
    }
    
    @Override
    protected void connectionOpened(Connection connection, Object attachment) {
        super.connectionOpened(connection, attachment);
        System.out.println("HTTP connection opened: " + connection.getEndPoint().getRemoteSocketAddress());
    }
    
    @Override
    protected void connectionClosed(Connection connection, Object attachment, Throwable cause) {
        super.connectionClosed(connection, attachment, cause);
        System.out.println("HTTP connection closed: " + 
            (cause != null ? cause.getMessage() : "normal closure"));
    }
    
    @Override
    protected void connectionFailed(SelectableChannel channel, Throwable ex, Object attachment) {
        super.connectionFailed(channel, ex, attachment);
        System.err.println("Connection failed for channel: " + channel + ", error: " + ex.getMessage());
    }
}

// Client selector manager
public class ClientSelectorManager extends SelectorManager {
    private final ClientConnector connector;
    
    public ClientSelectorManager(ClientConnector connector, Executor executor, Scheduler scheduler) {
        super(executor, scheduler);
        this.connector = connector;
    }
    
    @Override
    protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key) {
        SocketChannelEndPoint endPoint = new SocketChannelEndPoint(
            (SocketChannel) channel, selector, key, getScheduler());
        endPoint.setIdleTimeout(connector.getIdleTimeout());
        return endPoint;
    }
    
    @Override
    protected Connection newConnection(EndPoint endPoint, Object attachment) throws IOException {
        @SuppressWarnings("unchecked")
        Map<String, Object> context = (Map<String, Object>) attachment;
        
        ClientConnectionFactory factory = (ClientConnectionFactory) 
            context.get(ClientConnectionFactory.CLIENT_CONTEXT_KEY);
        
        return factory.newConnection(endPoint, context);
    }
}

// Usage
Executor executor = new QueuedThreadPool("selector-manager");
Scheduler scheduler = new ScheduledExecutorScheduler("selector-scheduler", false);
HttpConfiguration httpConfig = new HttpConfiguration();
ByteBufferPool bufferPool = new ArrayByteBufferPool();

HttpSelectorManager selectorManager = new HttpSelectorManager(executor, scheduler, httpConfig, bufferPool);
selectorManager.start();

// Accept connections
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.bind(new InetSocketAddress(8080));
serverChannel.configureBlocking(false);

selectorManager.accept(serverChannel);

ManagedSelector

Single-threaded selector for managing NIO operations on a set of channels.

/**
 * Single-threaded NIO selector management
 */
class ManagedSelector extends AbstractLifeCycle implements Dumpable {
    protected ManagedSelector(SelectorManager selectorManager, int id);
    
    // Selector operations
    public Selector getSelector();
    public int getSelectorId();
    public SelectorManager getSelectorManager();
    
    // Channel registration
    public void submit(Runnable task);
    public CompletableFuture<Void> submit(Task task);
    
    // Statistics
    public int getRegisteredKeys();
    public int getSelectedKeys();
    public long getSelectTime();
    public long getTotalSelectTime();
    public void resetStats();
    
    // Lifecycle
    protected void doStart() throws Exception;
    protected void doStop() throws Exception;
    
    // Task interface for selector operations
    public interface Task {
        void run(Selector selector) throws Exception;
    }
}

Usage Examples:

// Custom selector task
ManagedSelector.Task registerChannelTask = new ManagedSelector.Task() {
    @Override
    public void run(Selector selector) throws Exception {
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false);
        channel.connect(new InetSocketAddress("example.com", 80));
        
        SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT);
        key.attach(connectionContext);
        
        System.out.println("Channel registered with selector: " + selector);
    }
};

// Submit task to specific selector
ManagedSelector selector = selectorManager.getSelector(0);
CompletableFuture<Void> taskFuture = selector.submit(registerChannelTask);
taskFuture.thenRun(() -> {
    System.out.println("Channel registration completed");
}).exceptionally(throwable -> {
    System.err.println("Channel registration failed: " + throwable.getMessage());
    return null;
});

// Monitor selector statistics
Timer statsTimer = new Timer();
statsTimer.scheduleAtFixedRate(new TimerTask() {
    @Override
    public void run() {
        for (ManagedSelector selector : selectorManager.getSelectors()) {
            System.out.printf("Selector %d: registered=%d, selected=%d, selectTime=%dms%n",
                selector.getSelectorId(),
                selector.getRegisteredKeys(),
                selector.getSelectedKeys(),
                selector.getSelectTime());
        }
    }
}, 0, 10000); // Every 10 seconds

// Custom selector operations
ManagedSelector.Task customTask = selector -> {
    // Perform custom operations on selector
    Set<SelectionKey> keys = selector.selectedKeys();
    Iterator<SelectionKey> iterator = keys.iterator();
    
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        iterator.remove();
        
        if (key.isValid()) {
            if (key.isConnectable()) {
                handleConnect(key);
            }
            if (key.isReadable()) {
                handleRead(key);
            }
            if (key.isWritable()) {
                handleWrite(key);
            }
        }
    }
};

// Submit custom task
selector.submit(customTask);

Selector-based EndPoints

SocketChannelEndPoint

EndPoint implementation for SocketChannel with selector integration.

/**
 * EndPoint implementation for SocketChannel
 */
class SocketChannelEndPoint extends AbstractEndPoint {
    public SocketChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler);
    
    // Channel access
    public SocketChannel getSocketChannel();
    public SelectionKey getSelectionKey();
    public ManagedSelector getSelector();
    
    // I/O operations
    public int fill(ByteBuffer buffer) throws IOException;
    public boolean flush(ByteBuffer... buffers) throws IOException;
    public void fillInterested(Callback callback) throws ReadPendingException;
    public void write(Callback callback, ByteBuffer... buffers) throws WritePendingException;
    
    // Configuration
    public void setTrafficClass(int trafficClass) throws IOException;
    public int getTrafficClass() throws IOException;
    
    // Socket options
    public void setSoLingerTime(int lingerTime);
    public int getSoLingerTime();
    
    // Network addresses
    public SocketAddress getLocalSocketAddress();
    public SocketAddress getRemoteSocketAddress();
    
    // SSL information (if applicable)
    public boolean isSecure();
    public Object getSslSessionData();
    
    // Interest operations
    protected void needsFillInterest();
    protected void onIncompleteFlush();
}

DatagramChannelEndPoint

EndPoint implementation for DatagramChannel supporting connectionless protocols.

/**
 * EndPoint implementation for DatagramChannel
 */
class DatagramChannelEndPoint extends AbstractEndPoint {
    public DatagramChannelEndPoint(DatagramChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler);
    
    // Channel access
    public DatagramChannel getDatagramChannel();
    
    // UDP-specific operations
    public SocketAddress receive(ByteBuffer buffer) throws IOException;
    public boolean send(SocketAddress address, ByteBuffer... buffers) throws IOException;
    
    // Multicast support
    public void join(InetAddress group) throws IOException;
    public void join(InetAddress group, NetworkInterface networkInterface) throws IOException;
    public void leave(InetAddress group) throws IOException;
    public void leave(InetAddress group, NetworkInterface networkInterface) throws IOException;
}

EndPoint Usage Examples:

// TCP socket endpoint configuration
SocketChannelEndPoint tcpEndPoint = new SocketChannelEndPoint(
    socketChannel, selector, selectionKey, scheduler);

// Configure TCP socket options
tcpEndPoint.setSoLingerTime(30); // 30 second linger time
tcpEndPoint.setTrafficClass(0x08); // High throughput traffic class

// Async read with callback
tcpEndPoint.fillInterested(new Callback() {
    @Override
    public void succeeded() {
        // Data is available for reading
        ByteBuffer buffer = ByteBuffer.allocate(8192);
        try {
            int bytesRead = tcpEndPoint.fill(buffer);
            if (bytesRead > 0) {
                buffer.flip();
                processData(buffer);
            }
        } catch (IOException e) {
            failed(e);
        }
    }
    
    @Override
    public void failed(Throwable x) {
        System.err.println("Read failed: " + x.getMessage());
        tcpEndPoint.close(x);
    }
});

// Async write with callback
ByteBuffer responseBuffer = ByteBuffer.wrap("HTTP/1.1 200 OK\r\n\r\n".getBytes());
tcpEndPoint.write(new Callback() {
    @Override
    public void succeeded() {
        System.out.println("Response sent successfully");
    }
    
    @Override
    public void failed(Throwable x) {
        System.err.println("Write failed: " + x.getMessage());
        tcpEndPoint.close(x);
    }
}, responseBuffer);

// UDP datagram endpoint
DatagramChannelEndPoint udpEndPoint = new DatagramChannelEndPoint(
    datagramChannel, selector, selectionKey, scheduler);

// UDP receive
ByteBuffer receiveBuffer = ByteBuffer.allocate(1500); // MTU size
SocketAddress senderAddress = udpEndPoint.receive(receiveBuffer);
if (senderAddress != null) {
    receiveBuffer.flip();
    System.out.println("Received UDP packet from: " + senderAddress);
    processUDPData(receiveBuffer);
}

// UDP send
ByteBuffer sendBuffer = ByteBuffer.wrap("Hello UDP".getBytes());
SocketAddress targetAddress = new InetSocketAddress("192.168.1.100", 9999);
boolean sent = udpEndPoint.send(targetAddress, sendBuffer);
if (sent) {
    System.out.println("UDP packet sent to: " + targetAddress);
}

// Multicast support
InetAddress multicastGroup = InetAddress.getByName("224.0.0.1");
udpEndPoint.join(multicastGroup);
System.out.println("Joined multicast group: " + multicastGroup);

Selector Performance Optimization

Selector Tuning

/**
 * Performance optimization configurations
 */
class SelectorOptimization {
    public static SelectorManager createOptimizedSelectorManager(
            Executor executor, Scheduler scheduler, int expectedConnections) {
        
        // Calculate optimal selector count based on CPU cores and expected load
        int cores = Runtime.getRuntime().availableProcessors();
        int selectorCount = Math.min(cores, Math.max(1, expectedConnections / 1000));
        
        SelectorManager manager = new CustomSelectorManager(executor, scheduler, selectorCount) {
            @Override
            protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key) {
                SocketChannelEndPoint endPoint = new SocketChannelEndPoint(
                    (SocketChannel) channel, selector, key, getScheduler());
                
                // Optimize for high throughput
                try {
                    Socket socket = ((SocketChannel) channel).socket();
                    socket.setTcpNoDelay(true); // Disable Nagle's algorithm
                    socket.setSendBufferSize(64 * 1024); // 64KB send buffer
                    socket.setReceiveBufferSize(64 * 1024); // 64KB receive buffer
                    socket.setKeepAlive(true); // Enable keep-alive
                } catch (IOException e) {
                    System.err.println("Failed to optimize socket: " + e.getMessage());
                }
                
                return endPoint;
            }
        };
        
        // Configure selector manager
        manager.setSelectorPriorityDelta(-1); // Slightly lower priority
        manager.setConnectTimeout(10000); // 10 second timeout
        
        return manager;
    }
    
    public static void configureHighPerformanceSelector(ManagedSelector selector) {
        // Submit optimization task
        selector.submit(new ManagedSelector.Task() {
            @Override
            public void run(Selector selector) throws Exception {
                // Configure selector for high performance
                System.setProperty("java.nio.channels.spi.SelectorProvider", 
                    "sun.nio.ch.EPollSelectorProvider"); // Linux epoll
                
                // Tune selector behavior
                selector.wakeup(); // Ensure selector is responsive
            }
        });
    }
}

Connection Load Balancing

/**
 * Load balancing connections across selectors
 */
class LoadBalancedSelectorManager extends SelectorManager {
    private final AtomicInteger selectorIndex = new AtomicInteger(0);
    
    public LoadBalancedSelectorManager(Executor executor, Scheduler scheduler) {
        super(executor, scheduler);
    }
    
    @Override
    public void accept(SelectableChannel channel, Object attachment) {
        // Distribute connections across selectors using round-robin
        int index = selectorIndex.getAndIncrement() % getSelectorCount();
        ManagedSelector selector = getSelector(index);
        
        selector.submit(() -> {
            try {
                SelectionKey key = channel.register(selector.getSelector(), 
                    SelectionKey.OP_READ, attachment);
                EndPoint endPoint = newEndPoint(channel, selector, key);
                Connection connection = newConnection(endPoint, attachment);
                endPoint.setConnection(connection);
                connection.onOpen();
            } catch (Exception e) {
                connectionFailed(channel, e, attachment);
            }
        });
    }
    
    @Override
    protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key) {
        return new SocketChannelEndPoint((SocketChannel) channel, selector, key, getScheduler());
    }
    
    @Override
    protected Connection newConnection(EndPoint endPoint, Object attachment) throws IOException {
        return new EchoConnection(endPoint, getExecutor());
    }
}

Performance Usage Examples:

// High-performance server setup
Executor executor = new QueuedThreadPool("server", 200, 8); // Min 8, max 200 threads
Scheduler scheduler = new ScheduledExecutorScheduler("scheduler", false, 2); // 2 scheduler threads

LoadBalancedSelectorManager selectorManager = new LoadBalancedSelectorManager(executor, scheduler);
selectorManager.start();

// Configure JVM for optimal NIO performance
System.setProperty("java.nio.channels.DefaultThreadPool.threadFactory", "custom");
System.setProperty("java.nio.channels.DefaultThreadPool.initialSize", "8");

// Monitor selector performance
for (ManagedSelector selector : selectorManager.getSelectors()) {
    SelectorOptimization.configureHighPerformanceSelector(selector);
    
    // Schedule performance monitoring
    scheduler.schedule(() -> {
        System.out.printf("Selector %d performance: registered=%d, selectTime=%dms%n",
            selector.getSelectorId(), 
            selector.getRegisteredKeys(),
            selector.getSelectTime());
    }, 5, TimeUnit.SECONDS);
}

// Server channel setup
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
serverChannel.setOption(StandardSocketOptions.SO_RCVBUF, 128 * 1024); // 128KB receive buffer
serverChannel.bind(new InetSocketAddress(8080), 1024); // 1024 connection backlog
serverChannel.configureBlocking(false);

// Accept connections with load balancing
selectorManager.accept(serverChannel);

Install with Tessl CLI

npx tessl i tessl/maven-org-eclipse-jetty--jetty-io

docs

buffer-management.md

connection-management.md

content-streaming.md

core-io.md

index.md

selector-management.md

ssl-support.md

tile.json