0
# NIO Channels
1
2
High-performance, non-blocking I/O implementation with full Java NIO integration for scalable socket operations and event-driven programming using Unix Domain Sockets.
3
4
## Capabilities
5
6
### AFUNIXSocketChannel
7
8
NIO socket channel implementation for non-blocking Unix Domain Socket client connections.
9
10
```java { .api }
11
/**
12
* Unix Domain Socket NIO channel implementation
13
*/
14
public final class AFUNIXSocketChannel extends AFSocketChannel {
15
16
/**
17
* Opens a new AFUNIXSocketChannel
18
* @return New AFUNIXSocketChannel instance
19
* @throws IOException if channel creation fails
20
*/
21
public static AFUNIXSocketChannel open() throws IOException;
22
23
/**
24
* Opens and connects a new AFUNIXSocketChannel to the specified address
25
* @param remote The address to connect to
26
* @return Connected AFUNIXSocketChannel instance
27
* @throws IOException if connection fails
28
*/
29
public static AFUNIXSocketChannel open(AFUNIXSocketAddress remote) throws IOException;
30
31
// Connection operations
32
public boolean connect(SocketAddress remote) throws IOException;
33
public boolean finishConnect() throws IOException;
34
public boolean isConnectionPending();
35
36
// Channel configuration
37
public AFUNIXSocketChannel configureBlocking(boolean block) throws IOException;
38
public boolean isBlocking();
39
public SelectionKey register(Selector sel, int ops) throws ClosedChannelException;
40
public SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException;
41
42
// I/O operations
43
public int read(ByteBuffer dst) throws IOException;
44
public int write(ByteBuffer src) throws IOException;
45
public long read(ByteBuffer[] dsts) throws IOException;
46
public long write(ByteBuffer[] srcs) throws IOException;
47
48
// Socket properties
49
public AFUNIXSocket socket();
50
public AFUNIXSocketAddress getLocalAddress() throws IOException;
51
public AFUNIXSocketAddress getRemoteAddress() throws IOException;
52
53
// Channel state
54
public boolean isOpen();
55
public boolean isConnected();
56
public void close() throws IOException;
57
}
58
```
59
60
**Usage Examples:**
61
62
```java
63
import java.nio.ByteBuffer;
64
import java.nio.channels.SelectionKey;
65
import java.nio.channels.Selector;
66
import java.nio.charset.StandardCharsets;
67
import org.newsclub.net.unix.*;
68
69
// Basic non-blocking client
70
AFUNIXSocketAddress serverAddr = AFUNIXSocketAddress.of(new File("/tmp/server.sock"));
71
try (AFUNIXSocketChannel channel = AFUNIXSocketChannel.open()) {
72
channel.configureBlocking(false);
73
74
// Initiate connection
75
boolean connected = channel.connect(serverAddr);
76
if (!connected) {
77
// Connection pending, wait for completion
78
while (!channel.finishConnect()) {
79
Thread.sleep(10);
80
}
81
}
82
83
// Send data
84
String message = "Hello from NIO client";
85
ByteBuffer buffer = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
86
while (buffer.hasRemaining()) {
87
channel.write(buffer);
88
}
89
90
// Read response
91
ByteBuffer responseBuffer = ByteBuffer.allocate(1024);
92
int bytesRead = channel.read(responseBuffer);
93
responseBuffer.flip();
94
95
byte[] responseData = new byte[bytesRead];
96
responseBuffer.get(responseData);
97
String response = new String(responseData, StandardCharsets.UTF_8);
98
System.out.println("Server response: " + response);
99
}
100
101
// Blocking channel for simpler usage
102
try (AFUNIXSocketChannel channel = AFUNIXSocketChannel.open(serverAddr)) {
103
// Channel is blocking by default and already connected
104
105
ByteBuffer writeBuffer = ByteBuffer.wrap("Hello Server".getBytes(StandardCharsets.UTF_8));
106
channel.write(writeBuffer);
107
108
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
109
int bytesRead = channel.read(readBuffer);
110
111
readBuffer.flip();
112
String response = StandardCharsets.UTF_8.decode(readBuffer).toString();
113
System.out.println("Response: " + response);
114
}
115
```
116
117
### AFUNIXServerSocketChannel
118
119
NIO server socket channel implementation for accepting Unix Domain Socket connections in non-blocking mode.
120
121
```java { .api }
122
/**
123
* Unix Domain Socket NIO server channel implementation
124
*/
125
public final class AFUNIXServerSocketChannel extends AFServerSocketChannel {
126
127
/**
128
* Opens a new AFUNIXServerSocketChannel
129
* @return New AFUNIXServerSocketChannel instance
130
* @throws IOException if channel creation fails
131
*/
132
public static AFUNIXServerSocketChannel open() throws IOException;
133
134
// Server operations
135
public AFUNIXServerSocket socket();
136
public AFUNIXSocketChannel accept() throws IOException;
137
public AFUNIXServerSocketChannel bind(SocketAddress local) throws IOException;
138
public AFUNIXServerSocketChannel bind(SocketAddress local, int backlog) throws IOException;
139
140
// Channel configuration
141
public AFUNIXServerSocketChannel configureBlocking(boolean block) throws IOException;
142
public boolean isBlocking();
143
public SelectionKey register(Selector sel, int ops) throws ClosedChannelException;
144
public SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException;
145
146
// Server properties
147
public AFUNIXSocketAddress getLocalAddress() throws IOException;
148
public boolean isOpen();
149
public void close() throws IOException;
150
}
151
```
152
153
**Usage Examples:**
154
155
```java
156
import java.nio.ByteBuffer;
157
import java.nio.channels.SelectionKey;
158
import java.nio.channels.Selector;
159
import java.util.Iterator;
160
import java.util.Set;
161
import org.newsclub.net.unix.*;
162
163
// Basic server with selector
164
AFUNIXSocketAddress serverAddr = AFUNIXSocketAddress.of(new File("/tmp/nio-server.sock"));
165
try (AFUNIXServerSocketChannel serverChannel = AFUNIXServerSocketChannel.open();
166
Selector selector = Selector.open()) {
167
168
serverChannel.configureBlocking(false);
169
serverChannel.bind(serverAddr);
170
171
// Register server channel for accept operations
172
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
173
174
System.out.println("NIO Server listening on: " + serverAddr.getFile());
175
176
while (!Thread.interrupted()) {
177
int readyChannels = selector.select(1000); // 1 second timeout
178
if (readyChannels == 0) continue;
179
180
Set<SelectionKey> selectedKeys = selector.selectedKeys();
181
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
182
183
while (keyIterator.hasNext()) {
184
SelectionKey key = keyIterator.next();
185
keyIterator.remove();
186
187
if (key.isAcceptable()) {
188
handleAccept(serverChannel, selector);
189
} else if (key.isReadable()) {
190
handleRead(key);
191
}
192
}
193
}
194
}
195
196
// Helper methods for server operations
197
private static void handleAccept(AFUNIXServerSocketChannel serverChannel, Selector selector) throws IOException {
198
AFUNIXSocketChannel clientChannel = serverChannel.accept();
199
if (clientChannel != null) {
200
clientChannel.configureBlocking(false);
201
clientChannel.register(selector, SelectionKey.OP_READ);
202
System.out.println("Accepted connection from: " + clientChannel.getRemoteAddress());
203
}
204
}
205
206
private static void handleRead(SelectionKey key) throws IOException {
207
AFUNIXSocketChannel clientChannel = (AFUNIXSocketChannel) key.channel();
208
ByteBuffer buffer = ByteBuffer.allocate(1024);
209
210
try {
211
int bytesRead = clientChannel.read(buffer);
212
if (bytesRead > 0) {
213
buffer.flip();
214
215
// Echo back the data
216
while (buffer.hasRemaining()) {
217
clientChannel.write(buffer);
218
}
219
} else if (bytesRead < 0) {
220
// Client closed connection
221
clientChannel.close();
222
key.cancel();
223
}
224
} catch (IOException e) {
225
clientChannel.close();
226
key.cancel();
227
}
228
}
229
```
230
231
### AFUNIXDatagramChannel
232
233
NIO datagram channel implementation for non-blocking Unix Domain Socket datagram communication.
234
235
```java { .api }
236
/**
237
* Unix Domain Socket NIO datagram channel implementation
238
*/
239
public final class AFUNIXDatagramChannel extends AFDatagramChannel {
240
241
/**
242
* Opens a new AFUNIXDatagramChannel
243
* @return New AFUNIXDatagramChannel instance
244
* @throws IOException if channel creation fails
245
*/
246
public static AFUNIXDatagramChannel open() throws IOException;
247
248
// Datagram operations
249
public int send(ByteBuffer src, SocketAddress target) throws IOException;
250
public SocketAddress receive(ByteBuffer dst) throws IOException;
251
public AFUNIXDatagramChannel connect(SocketAddress remote) throws IOException;
252
public AFUNIXDatagramChannel disconnect() throws IOException;
253
254
// Channel configuration
255
public AFUNIXDatagramChannel configureBlocking(boolean block) throws IOException;
256
public boolean isBlocking();
257
public AFUNIXDatagramChannel bind(SocketAddress local) throws IOException;
258
259
// Channel properties
260
public AFUNIXDatagramSocket socket();
261
public AFUNIXSocketAddress getLocalAddress() throws IOException;
262
public AFUNIXSocketAddress getRemoteAddress() throws IOException;
263
public boolean isConnected();
264
265
// I/O operations (when connected)
266
public int read(ByteBuffer dst) throws IOException;
267
public int write(ByteBuffer src) throws IOException;
268
}
269
```
270
271
**Usage Examples:**
272
273
```java
274
import java.nio.ByteBuffer;
275
import java.net.SocketAddress;
276
import java.nio.charset.StandardCharsets;
277
import org.newsclub.net.unix.*;
278
279
// Datagram sender
280
AFUNIXSocketAddress targetAddr = AFUNIXSocketAddress.of(new File("/tmp/datagram-target.sock"));
281
try (AFUNIXDatagramChannel senderChannel = AFUNIXDatagramChannel.open()) {
282
String message = "Hello via NIO datagram";
283
ByteBuffer buffer = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
284
285
int bytesSent = senderChannel.send(buffer, targetAddr);
286
System.out.println("Sent " + bytesSent + " bytes");
287
}
288
289
// Datagram receiver
290
AFUNIXSocketAddress receiverAddr = AFUNIXSocketAddress.of(new File("/tmp/datagram-receiver.sock"));
291
try (AFUNIXDatagramChannel receiverChannel = AFUNIXDatagramChannel.open()) {
292
receiverChannel.bind(receiverAddr);
293
receiverChannel.configureBlocking(false);
294
295
ByteBuffer buffer = ByteBuffer.allocate(1024);
296
SocketAddress senderAddr = receiverChannel.receive(buffer);
297
298
if (senderAddr != null) {
299
buffer.flip();
300
String message = StandardCharsets.UTF_8.decode(buffer).toString();
301
System.out.println("Received from " + senderAddr + ": " + message);
302
}
303
}
304
305
// Connected datagram channel
306
try (AFUNIXDatagramChannel channel = AFUNIXDatagramChannel.open()) {
307
channel.connect(targetAddr);
308
309
// Can use read/write methods when connected
310
ByteBuffer writeBuffer = ByteBuffer.wrap("Connected message".getBytes(StandardCharsets.UTF_8));
311
channel.write(writeBuffer);
312
313
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
314
int bytesRead = channel.read(readBuffer);
315
316
if (bytesRead > 0) {
317
readBuffer.flip();
318
String response = StandardCharsets.UTF_8.decode(readBuffer).toString();
319
System.out.println("Response: " + response);
320
}
321
}
322
```
323
324
### AFUNIXSelectorProvider
325
326
Custom selector provider for Unix Domain Socket channels enabling efficient event-driven programming.
327
328
```java { .api }
329
/**
330
* Unix Domain Socket selector provider
331
*/
332
public final class AFUNIXSelectorProvider extends AFSelectorProvider {
333
334
/**
335
* Gets the system-wide default AFUNIXSelectorProvider
336
* @return AFUNIXSelectorProvider instance
337
*/
338
public static AFUNIXSelectorProvider provider();
339
340
/**
341
* Gets a new AFUNIXSelectorProvider instance
342
* @return New AFUNIXSelectorProvider instance
343
*/
344
public static AFUNIXSelectorProvider getInstance();
345
346
// Channel creation
347
public AFUNIXSocketChannel openSocketChannel() throws IOException;
348
public AFUNIXServerSocketChannel openServerSocketChannel() throws IOException;
349
public AFUNIXDatagramChannel openDatagramChannel() throws IOException;
350
351
// Selector creation
352
public AbstractSelector openSelector() throws IOException;
353
public Pipe openPipe() throws IOException;
354
}
355
```
356
357
**Usage Examples:**
358
359
```java
360
import java.nio.channels.Selector;
361
import java.nio.channels.Pipe;
362
import org.newsclub.net.unix.*;
363
364
// Using custom selector provider
365
AFUNIXSelectorProvider provider = AFUNIXSelectorProvider.getInstance();
366
367
// Create channels through provider
368
try (AFUNIXSocketChannel socketChannel = provider.openSocketChannel();
369
AFUNIXServerSocketChannel serverChannel = provider.openServerSocketChannel();
370
AFUNIXDatagramChannel datagramChannel = provider.openDatagramChannel()) {
371
372
// Configure channels
373
socketChannel.configureBlocking(false);
374
serverChannel.configureBlocking(false);
375
datagramChannel.configureBlocking(false);
376
377
// Create selector through provider
378
try (Selector selector = provider.openSelector()) {
379
// Register channels with selector
380
socketChannel.register(selector, SelectionKey.OP_CONNECT);
381
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
382
datagramChannel.register(selector, SelectionKey.OP_READ);
383
384
// Event loop
385
while (!Thread.interrupted()) {
386
int readyChannels = selector.select();
387
if (readyChannels > 0) {
388
// Process selected keys
389
processSelectedKeys(selector);
390
}
391
}
392
}
393
}
394
395
// Pipe creation for inter-thread communication
396
try (Pipe pipe = provider.openPipe()) {
397
Pipe.SourceChannel sourceChannel = pipe.source();
398
Pipe.SinkChannel sinkChannel = pipe.sink();
399
400
// Configure pipe channels
401
sourceChannel.configureBlocking(false);
402
sinkChannel.configureBlocking(false);
403
404
// Use pipe for thread communication
405
// ... pipe operations
406
}
407
```
408
409
## Advanced NIO Features
410
411
### Channel Selectors and Event Handling
412
413
```java { .api }
414
// Selection key operations
415
public SelectionKey register(Selector sel, int ops) throws ClosedChannelException;
416
public SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException;
417
418
// Interest operations
419
public static final int OP_READ = 1;
420
public static final int OP_WRITE = 4;
421
public static final int OP_CONNECT = 8;
422
public static final int OP_ACCEPT = 16;
423
```
424
425
**Advanced Selector Example:**
426
427
```java
428
import java.nio.channels.SelectionKey;
429
import java.nio.channels.Selector;
430
import java.util.concurrent.ConcurrentHashMap;
431
import java.util.Map;
432
433
// Multi-client NIO server with client tracking
434
public class AdvancedNIOServer {
435
private final Map<SelectionKey, ClientHandler> clients = new ConcurrentHashMap<>();
436
437
public void runServer(AFUNIXSocketAddress serverAddr) throws IOException {
438
try (AFUNIXServerSocketChannel serverChannel = AFUNIXServerSocketChannel.open();
439
Selector selector = Selector.open()) {
440
441
serverChannel.configureBlocking(false);
442
serverChannel.bind(serverAddr);
443
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
444
445
while (!Thread.interrupted()) {
446
if (selector.select(1000) > 0) {
447
processKeys(selector, serverChannel);
448
}
449
}
450
}
451
}
452
453
private void processKeys(Selector selector, AFUNIXServerSocketChannel serverChannel) throws IOException {
454
Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
455
456
while (keyIterator.hasNext()) {
457
SelectionKey key = keyIterator.next();
458
keyIterator.remove();
459
460
try {
461
if (key.isAcceptable()) {
462
acceptClient(serverChannel, selector);
463
} else if (key.isReadable()) {
464
readFromClient(key);
465
} else if (key.isWritable()) {
466
writeToClient(key);
467
}
468
} catch (IOException e) {
469
closeClient(key);
470
}
471
}
472
}
473
474
private void acceptClient(AFUNIXServerSocketChannel serverChannel, Selector selector) throws IOException {
475
AFUNIXSocketChannel clientChannel = serverChannel.accept();
476
if (clientChannel != null) {
477
clientChannel.configureBlocking(false);
478
SelectionKey clientKey = clientChannel.register(selector, SelectionKey.OP_READ);
479
480
ClientHandler handler = new ClientHandler(clientChannel);
481
clients.put(clientKey, handler);
482
clientKey.attach(handler);
483
}
484
}
485
486
private void readFromClient(SelectionKey key) throws IOException {
487
ClientHandler handler = clients.get(key);
488
if (handler != null) {
489
boolean keepOpen = handler.handleRead();
490
if (!keepOpen) {
491
closeClient(key);
492
}
493
}
494
}
495
496
private void writeToClient(SelectionKey key) throws IOException {
497
ClientHandler handler = clients.get(key);
498
if (handler != null) {
499
handler.handleWrite();
500
}
501
}
502
503
private void closeClient(SelectionKey key) {
504
clients.remove(key);
505
try {
506
key.channel().close();
507
} catch (IOException e) {
508
// Log error
509
}
510
key.cancel();
511
}
512
513
private static class ClientHandler {
514
private final AFUNIXSocketChannel channel;
515
private final ByteBuffer readBuffer = ByteBuffer.allocate(1024);
516
private final ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
517
518
public ClientHandler(AFUNIXSocketChannel channel) {
519
this.channel = channel;
520
}
521
522
public boolean handleRead() throws IOException {
523
readBuffer.clear();
524
int bytesRead = channel.read(readBuffer);
525
526
if (bytesRead < 0) {
527
return false; // Client closed connection
528
}
529
530
if (bytesRead > 0) {
531
readBuffer.flip();
532
// Process read data and prepare response
533
prepareResponse();
534
}
535
536
return true;
537
}
538
539
public void handleWrite() throws IOException {
540
if (writeBuffer.hasRemaining()) {
541
channel.write(writeBuffer);
542
}
543
}
544
545
private void prepareResponse() {
546
// Echo the data back
547
writeBuffer.clear();
548
writeBuffer.put(readBuffer);
549
writeBuffer.flip();
550
}
551
}
552
}
553
```
554
555
### Buffer Management and Performance
556
557
```java
558
// Efficient buffer operations
559
ByteBuffer directBuffer = ByteBuffer.allocateDirect(8192);
560
ByteBuffer heapBuffer = ByteBuffer.allocate(8192);
561
562
// Scatter-gather I/O
563
ByteBuffer[] buffers = new ByteBuffer[3];
564
buffers[0] = ByteBuffer.allocate(1024);
565
buffers[1] = ByteBuffer.allocate(2048);
566
buffers[2] = ByteBuffer.allocate(1024);
567
568
long totalBytesRead = channel.read(buffers);
569
long totalBytesWritten = channel.write(buffers);
570
```
571
572
## Error Handling
573
574
NIO-specific exception handling patterns:
575
576
```java { .api }
577
// Channel exceptions
578
public class ClosedChannelException extends IOException;
579
public class NotYetConnectedException extends IllegalStateException;
580
public class AlreadyConnectedException extends IllegalStateException;
581
public class ConnectionPendingException extends IllegalStateException;
582
public class NoConnectionPendingException extends IllegalStateException;
583
```
584
585
**Error Handling Examples:**
586
587
```java
588
try (AFUNIXSocketChannel channel = AFUNIXSocketChannel.open()) {
589
channel.configureBlocking(false);
590
591
if (!channel.connect(serverAddr)) {
592
// Connection pending
593
while (!channel.finishConnect()) {
594
Thread.sleep(10);
595
}
596
}
597
598
ByteBuffer buffer = ByteBuffer.allocate(1024);
599
int bytesRead = channel.read(buffer);
600
601
} catch (ClosedChannelException e) {
602
System.err.println("Channel was closed: " + e.getMessage());
603
} catch (NotYetConnectedException e) {
604
System.err.println("Channel not connected: " + e.getMessage());
605
} catch (AlreadyConnectedException e) {
606
System.err.println("Channel already connected: " + e.getMessage());
607
} catch (ConnectionPendingException e) {
608
System.err.println("Connection already pending: " + e.getMessage());
609
} catch (IOException e) {
610
System.err.println("I/O error: " + e.getMessage());
611
}
612
```