# Socket Pairs and Pipes

Interconnected socket pairs for bidirectional communication and NIO pipe implementations for one-way (sink-to-source) data transfer, both built on Unix Domain Sockets.

## Core Imports
5
6
```java
7
import java.io.*;
8
import java.nio.ByteBuffer;
9
import java.nio.channels.Pipe;
10
import java.nio.channels.ReadableByteChannel;
11
import java.nio.channels.WritableByteChannel;
12
import org.newsclub.net.unix.AFUNIXSocket;
13
import org.newsclub.net.unix.AFUNIXSocketPair;
14
import org.newsclub.net.unix.AFSocketPair;
15
import org.newsclub.net.unix.AFPipe;
16
```
17
18
## Capabilities
19
20
### AFUNIXSocketPair
21
22
Pair of interconnected Unix Domain Sockets for bidirectional communication, where data written to one socket can be read from the other.
23
24
```java { .api }
25
/**
26
* Pair of interconnected Unix Domain Sockets
27
*/
28
public final class AFUNIXSocketPair extends AFSocketPair<AFUNIXSocket> {
29
30
/**
31
* Creates a new pair of interconnected AFUNIXSocket instances
32
* @return New AFUNIXSocketPair instance
33
* @throws IOException if socket pair creation fails
34
*/
35
public static AFUNIXSocketPair open() throws IOException;
36
37
/**
38
* Gets the first socket in the pair
39
* @return First AFUNIXSocket instance
40
*/
41
public AFUNIXSocket getSocket1();
42
43
/**
44
* Gets the second socket in the pair
45
* @return Second AFUNIXSocket instance
46
*/
47
public AFUNIXSocket getSocket2();
48
49
/**
50
* Closes both sockets in the pair
51
* @throws IOException if closing fails
52
*/
53
public void close() throws IOException;
54
55
/**
56
* Checks if both sockets are closed
57
* @return true if both sockets are closed
58
*/
59
public boolean isClosed();
60
}
61
```
62
63
**Usage Examples:**
64
65
```java
66
import java.io.*;
67
import java.nio.charset.StandardCharsets;
68
import org.newsclub.net.unix.*;
69
70
// Basic socket pair communication
71
try (AFUNIXSocketPair socketPair = AFUNIXSocketPair.open()) {
72
AFUNIXSocket socket1 = socketPair.getSocket1();
73
AFUNIXSocket socket2 = socketPair.getSocket2();
74
75
// Thread 1: Write to socket1, read from socket2
76
Thread writer = new Thread(() -> {
77
try {
78
OutputStream os = socket1.getOutputStream();
79
os.write("Hello from socket1".getBytes(StandardCharsets.UTF_8));
80
os.flush();
81
82
// Read response
83
InputStream is = socket1.getInputStream();
84
byte[] buffer = new byte[1024];
85
int bytesRead = is.read(buffer);
86
String response = new String(buffer, 0, bytesRead, StandardCharsets.UTF_8);
87
System.out.println("Socket1 received: " + response);
88
} catch (IOException e) {
89
e.printStackTrace();
90
}
91
});
92
93
// Thread 2: Read from socket2, write response
94
Thread reader = new Thread(() -> {
95
try {
96
InputStream is = socket2.getInputStream();
97
byte[] buffer = new byte[1024];
98
int bytesRead = is.read(buffer);
99
String message = new String(buffer, 0, bytesRead, StandardCharsets.UTF_8);
100
System.out.println("Socket2 received: " + message);
101
102
// Send response
103
OutputStream os = socket2.getOutputStream();
104
os.write("Hello from socket2".getBytes(StandardCharsets.UTF_8));
105
os.flush();
106
} catch (IOException e) {
107
e.printStackTrace();
108
}
109
});
110
111
writer.start();
112
reader.start();
113
114
writer.join();
115
reader.join();
116
}
117
118
// Producer-Consumer pattern with socket pairs
119
try (AFUNIXSocketPair socketPair = AFUNIXSocketPair.open()) {
120
AFUNIXSocket producer = socketPair.getSocket1();
121
AFUNIXSocket consumer = socketPair.getSocket2();
122
123
// Producer thread
124
Thread producerThread = new Thread(() -> {
125
try (DataOutputStream dos = new DataOutputStream(producer.getOutputStream())) {
126
for (int i = 0; i < 10; i++) {
127
dos.writeInt(i);
128
dos.writeUTF("Message " + i);
129
dos.flush();
130
System.out.println("Produced: " + i);
131
Thread.sleep(100);
132
}
133
dos.writeInt(-1); // End marker
134
} catch (IOException | InterruptedException e) {
135
e.printStackTrace();
136
}
137
});
138
139
// Consumer thread
140
Thread consumerThread = new Thread(() -> {
141
try (DataInputStream dis = new DataInputStream(consumer.getInputStream())) {
142
int value;
143
while ((value = dis.readInt()) != -1) {
144
String message = dis.readUTF();
145
System.out.println("Consumed: " + value + " - " + message);
146
}
147
} catch (IOException e) {
148
e.printStackTrace();
149
}
150
});
151
152
producerThread.start();
153
consumerThread.start();
154
155
producerThread.join();
156
consumerThread.join();
157
}
158
```
159
160
### AFPipe
161
162
NIO Pipe implementation using Unix Domain Sockets, providing source and sink channels for efficient data transfer.
163
164
```java { .api }
165
/**
166
* Pipe implementation using Unix Domain Sockets
167
*/
168
public final class AFPipe extends Pipe {
169
170
/**
171
* Opens a new AFPipe
172
* @return New AFPipe instance
173
* @throws IOException if pipe creation fails
174
*/
175
public static AFPipe open() throws IOException;
176
177
/**
178
* Returns the source channel for reading from this pipe
179
* @return SourceChannel for reading
180
*/
181
public SourceChannel source();
182
183
/**
184
* Returns the sink channel for writing to this pipe
185
* @return SinkChannel for writing
186
*/
187
public SinkChannel sink();
188
189
/**
190
* Closes both source and sink channels
191
* @throws IOException if closing fails
192
*/
193
public void close() throws IOException;
194
195
/**
196
* Source channel for reading from the pipe
197
*/
198
public static class SourceChannel extends Pipe.SourceChannel {
199
public int read(ByteBuffer dst) throws IOException;
200
public long read(ByteBuffer[] dsts) throws IOException;
201
public long read(ByteBuffer[] dsts, int offset, int length) throws IOException;
202
public boolean isOpen();
203
public void close() throws IOException;
204
public SelectionKey register(Selector sel, int ops) throws ClosedChannelException;
205
}
206
207
/**
208
* Sink channel for writing to the pipe
209
*/
210
public static class SinkChannel extends Pipe.SinkChannel {
211
public int write(ByteBuffer src) throws IOException;
212
public long write(ByteBuffer[] srcs) throws IOException;
213
public long write(ByteBuffer[] srcs, int offset, int length) throws IOException;
214
public boolean isOpen();
215
public void close() throws IOException;
216
public SelectionKey register(Selector sel, int ops) throws ClosedChannelException;
217
}
218
}
219
```
220
221
**Usage Examples:**
222
223
```java
224
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import org.newsclub.net.unix.*;
229
230
// Basic pipe communication
231
try (AFPipe pipe = AFPipe.open()) {
232
ReadableByteChannel sourceChannel = pipe.source();
233
WritableByteChannel sinkChannel = pipe.sink();
234
235
// Writer thread
236
Thread writer = new Thread(() -> {
237
try {
238
String message = "Hello through pipe!";
239
ByteBuffer buffer = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
240
241
while (buffer.hasRemaining()) {
242
sinkChannel.write(buffer);
243
}
244
245
System.out.println("Message written to pipe");
246
sinkChannel.close(); // Close to signal end of data
247
} catch (IOException e) {
248
e.printStackTrace();
249
}
250
});
251
252
// Reader thread
253
Thread reader = new Thread(() -> {
254
try {
255
ByteBuffer buffer = ByteBuffer.allocate(1024);
256
int bytesRead = sourceChannel.read(buffer);
257
258
if (bytesRead > 0) {
259
buffer.flip();
260
byte[] data = new byte[buffer.remaining()];
261
buffer.get(data);
262
String message = new String(data, StandardCharsets.UTF_8);
263
System.out.println("Message read from pipe: " + message);
264
}
265
} catch (IOException e) {
266
e.printStackTrace();
267
}
268
});
269
270
writer.start();
271
reader.start();
272
273
writer.join();
274
reader.join();
275
}
276
277
// NIO-based data transfer with selector
278
try (AFPipe pipe = AFPipe.open()) {
279
AFPipe.SourceChannel source = pipe.source();
280
AFPipe.SinkChannel sink = pipe.sink();
281
282
// Configure non-blocking mode
283
source.configureBlocking(false);
284
sink.configureBlocking(false);
285
286
Selector selector = Selector.open();
287
SelectionKey sourceKey = source.register(selector, SelectionKey.OP_READ);
288
289
// Producer thread
290
Thread producer = new Thread(() -> {
291
try {
292
ByteBuffer buffer = ByteBuffer.allocate(1024);
293
for (int i = 0; i < 5; i++) {
294
buffer.clear();
295
String data = "Data chunk " + i;
296
buffer.put(data.getBytes(StandardCharsets.UTF_8));
297
buffer.flip();
298
299
while (buffer.hasRemaining()) {
300
sink.write(buffer);
301
}
302
303
System.out.println("Produced: " + data);
304
Thread.sleep(200);
305
}
306
sink.close();
307
} catch (IOException | InterruptedException e) {
308
e.printStackTrace();
309
}
310
});
311
312
// Consumer using selector
313
Thread consumer = new Thread(() -> {
314
try {
315
ByteBuffer buffer = ByteBuffer.allocate(1024);
316
317
while (selector.select() > 0) {
318
for (SelectionKey key : selector.selectedKeys()) {
319
if (key.isReadable()) {
320
ReadableByteChannel channel = (ReadableByteChannel) key.channel();
321
buffer.clear();
322
323
int bytesRead = channel.read(buffer);
324
if (bytesRead > 0) {
325
buffer.flip();
326
byte[] data = new byte[buffer.remaining()];
327
buffer.get(data);
328
String message = new String(data, StandardCharsets.UTF_8);
329
System.out.println("Consumed: " + message);
330
} else if (bytesRead == -1) {
331
// End of stream
332
key.cancel();
333
break;
334
}
335
}
336
}
337
selector.selectedKeys().clear();
338
}
339
} catch (IOException e) {
340
e.printStackTrace();
341
}
342
});
343
344
producer.start();
345
consumer.start();
346
347
producer.join();
348
consumer.join();
349
}
350
```
351
352
### Native Socket Pair Support

junixsocket supports the native `socketpair()` system call for efficient socket pair creation:

```java { .api }
357
/**
358
* Base class for socket pairs
359
*/
360
public abstract class AFSocketPair<T extends AFSocket> implements Closeable {
361
362
/**
363
* Checks if native socketpair support is available
364
* @return true if native socketpair is supported
365
*/
366
public static boolean isSupported();
367
368
/**
369
* Gets the first socket in the pair
370
* @return First socket instance
371
*/
372
public abstract T getSocket1();
373
374
/**
375
* Gets the second socket in the pair
376
* @return Second socket instance
377
*/
378
public abstract T getSocket2();
379
}
380
```
381
382
**Capability Testing:**
383
384
```java
385
import org.newsclub.net.unix.*;
386
387
// Check if native socketpair is supported
if (AFSocket.supports(AFSocketCapability.CAPABILITY_NATIVE_SOCKETPAIR)) {
    System.out.println("Native socketpair available - using optimized implementation");
    // try-with-resources closes both sockets of the pair once it is no longer needed.
    try (AFUNIXSocketPair pair = AFUNIXSocketPair.open()) {
        // Use socket pair...
    }
} else {
    System.out.println("Using fallback socket pair implementation");
    // Fallback to alternative IPC mechanism
}
396
```