Java Unix Domain Socket implementation providing AF_UNIX, AF_TIPC, AF_VSOCK, and AF_SYSTEM socket support with traditional and NIO APIs
—
Interconnected socket pairs and pipe implementations for efficient bidirectional communication between processes, providing both traditional socket pairs and NIO pipe interfaces.
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import org.newsclub.net.unix.AFUNIXSocket;
import org.newsclub.net.unix.AFUNIXSocketPair;
import org.newsclub.net.unix.AFSocketPair;
import org.newsclub.net.unix.AFPipe;

Pair of interconnected Unix Domain Sockets for bidirectional communication, where data written to one socket can be read from the other.
/**
* Pair of interconnected Unix Domain Sockets for bidirectional communication:
* data written to one socket of the pair can be read from the other.
* <p>
* This listing is an API summary, so method bodies are omitted. The pair
* declares {@link #close()}, and the usage examples open it in a
* try-with-resources statement so that both ends are released together.
*/
public final class AFUNIXSocketPair extends AFSocketPair<AFUNIXSocket> {
/**
* Creates a new pair of interconnected AFUNIXSocket instances.
* @return New AFUNIXSocketPair instance
* @throws IOException if socket pair creation fails
*/
public static AFUNIXSocketPair open() throws IOException;
/**
* Gets the first socket in the pair. Data written to this socket is read
* from the socket returned by {@link #getSocket2()}.
* @return First AFUNIXSocket instance
*/
public AFUNIXSocket getSocket1();
/**
* Gets the second socket in the pair. Data written to this socket is read
* from the socket returned by {@link #getSocket1()}.
* @return Second AFUNIXSocket instance
*/
public AFUNIXSocket getSocket2();
/**
* Closes both sockets in the pair.
* @throws IOException if closing fails
*/
public void close() throws IOException;
/**
* Checks if both sockets are closed.
* NOTE(review): presumably returns {@code false} while only one of the two
* sockets is closed - confirm against the implementation.
* @return true if both sockets are closed
*/
public boolean isClosed();
}

Usage Examples:
import java.io.*;
import java.nio.charset.StandardCharsets;
import org.newsclub.net.unix.*;
// Basic socket pair communication: each thread owns one end of the pair.
// Whatever is written to socket1 is read from socket2, and vice versa.
try (AFUNIXSocketPair socketPair = AFUNIXSocketPair.open()) {
    AFUNIXSocket socket1 = socketPair.getSocket1();
    AFUNIXSocket socket2 = socketPair.getSocket2();

    // Thread 1: write a greeting to socket1, then read the reply from socket1
    Thread writer = new Thread(() -> {
        try {
            OutputStream os = socket1.getOutputStream();
            os.write("Hello from socket1".getBytes(StandardCharsets.UTF_8));
            os.flush();

            // Read response
            InputStream is = socket1.getInputStream();
            byte[] buffer = new byte[1024];
            int bytesRead = is.read(buffer);
            if (bytesRead == -1) {
                // read() reports end of stream as -1; passing that length to
                // new String(...) would throw StringIndexOutOfBoundsException.
                System.err.println("Socket1: peer closed the connection before replying");
                return;
            }
            String response = new String(buffer, 0, bytesRead, StandardCharsets.UTF_8);
            System.out.println("Socket1 received: " + response);
        } catch (IOException e) {
            e.printStackTrace();
        }
    });

    // Thread 2: read the greeting from socket2, then write the reply to socket2
    Thread reader = new Thread(() -> {
        try {
            InputStream is = socket2.getInputStream();
            byte[] buffer = new byte[1024];
            int bytesRead = is.read(buffer);
            if (bytesRead == -1) {
                // Nothing arrived before the peer closed; there is nothing to answer.
                System.err.println("Socket2: peer closed the connection before sending");
                return;
            }
            String message = new String(buffer, 0, bytesRead, StandardCharsets.UTF_8);
            System.out.println("Socket2 received: " + message);

            // Send response
            OutputStream os = socket2.getOutputStream();
            os.write("Hello from socket2".getBytes(StandardCharsets.UTF_8));
            os.flush();
        } catch (IOException e) {
            e.printStackTrace();
        }
    });

    writer.start();
    reader.start();
    // Wait for both sides before try-with-resources closes the pair.
    writer.join();
    reader.join();
}
// Producer-Consumer pattern with socket pairs.
// Protocol: a sequence of (int value, UTF string) records, terminated by the int -1.
try (AFUNIXSocketPair socketPair = AFUNIXSocketPair.open()) {
    AFUNIXSocket producer = socketPair.getSocket1();
    AFUNIXSocket consumer = socketPair.getSocket2();

    // Producer thread
    Thread producerThread = new Thread(() -> {
        try (DataOutputStream dos = new DataOutputStream(producer.getOutputStream())) {
            for (int i = 0; i < 10; i++) {
                dos.writeInt(i);
                dos.writeUTF("Message " + i);
                dos.flush();
                System.out.println("Produced: " + i);
                Thread.sleep(100);
            }
            dos.writeInt(-1); // End marker
            dos.flush();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can see it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    });

    // Consumer thread
    Thread consumerThread = new Thread(() -> {
        try (DataInputStream dis = new DataInputStream(consumer.getInputStream())) {
            int value;
            while ((value = dis.readInt()) != -1) {
                String message = dis.readUTF();
                System.out.println("Consumed: " + value + " - " + message);
            }
        } catch (EOFException e) {
            // The producer closed its end without sending the -1 end marker
            // (for example because it was interrupted or failed mid-record).
            System.err.println("Consumer: stream ended before the end marker was received");
        } catch (IOException e) {
            e.printStackTrace();
        }
    });

    producerThread.start();
    consumerThread.start();
    // Wait for both sides before try-with-resources closes the pair.
    producerThread.join();
    consumerThread.join();
}

NIO Pipe implementation using Unix Domain Sockets, providing source and sink channels for efficient data transfer.
/**
* Pipe implementation using Unix Domain Sockets.
* <p>
* Bytes written to the {@link #sink()} channel are read from the
* {@link #source()} channel. This listing is an API summary, so method
* bodies are omitted.
*/
public final class AFPipe extends Pipe {
/**
* Opens a new AFPipe.
* @return New AFPipe instance
* @throws IOException if pipe creation fails
*/
public static AFPipe open() throws IOException;
/**
* Returns the source channel for reading from this pipe.
* @return SourceChannel for reading
*/
public SourceChannel source();
/**
* Returns the sink channel for writing to this pipe.
* @return SinkChannel for writing
*/
public SinkChannel sink();
/**
* Closes both source and sink channels.
* @throws IOException if closing fails
*/
public void close() throws IOException;
/**
* Source channel for reading from the pipe (the readable end).
*/
public static class SourceChannel extends Pipe.SourceChannel {
/**
* Reads bytes from the pipe into the given buffer.
* @param dst buffer that receives the bytes
* @return number of bytes read; the usage examples treat -1 as end of stream
* @throws IOException if reading fails
*/
public int read(ByteBuffer dst) throws IOException;
/**
* Reads bytes from the pipe into the given sequence of buffers.
* @param dsts buffers that receive the bytes
* @return number of bytes read
* @throws IOException if reading fails
*/
public long read(ByteBuffer[] dsts) throws IOException;
/**
* Reads bytes from the pipe into a subsequence of the given buffers.
* @param dsts buffers that receive the bytes
* @param offset index of the first buffer to use
* @param length number of buffers to use
* @return number of bytes read
* @throws IOException if reading fails
*/
public long read(ByteBuffer[] dsts, int offset, int length) throws IOException;
/**
* Tells whether this channel is open.
* @return true if the channel is open
*/
public boolean isOpen();
/**
* Closes this channel.
* @throws IOException if closing fails
*/
public void close() throws IOException;
/**
* Registers this channel with a selector, as done in the selector usage
* example with {@code SelectionKey.OP_READ}.
* @param sel selector to register with
* @param ops interest set for the resulting key
* @return key representing the registration
* @throws ClosedChannelException if this channel is closed
*/
public SelectionKey register(Selector sel, int ops) throws ClosedChannelException;
}
/**
* Sink channel for writing to the pipe (the writable end).
*/
public static class SinkChannel extends Pipe.SinkChannel {
/**
* Writes bytes from the given buffer to the pipe. The usage examples call
* this in a loop until the buffer has no remaining bytes.
* @param src buffer holding the bytes to write
* @return number of bytes written
* @throws IOException if writing fails
*/
public int write(ByteBuffer src) throws IOException;
/**
* Writes bytes from the given sequence of buffers to the pipe.
* @param srcs buffers holding the bytes to write
* @return number of bytes written
* @throws IOException if writing fails
*/
public long write(ByteBuffer[] srcs) throws IOException;
/**
* Writes bytes from a subsequence of the given buffers to the pipe.
* @param srcs buffers holding the bytes to write
* @param offset index of the first buffer to use
* @param length number of buffers to use
* @return number of bytes written
* @throws IOException if writing fails
*/
public long write(ByteBuffer[] srcs, int offset, int length) throws IOException;
/**
* Tells whether this channel is open.
* @return true if the channel is open
*/
public boolean isOpen();
/**
* Closes this channel. The usage examples close the sink to signal end of
* data to the reading side.
* @throws IOException if closing fails
*/
public void close() throws IOException;
/**
* Registers this channel with a selector.
* @param sel selector to register with
* @param ops interest set for the resulting key
* @return key representing the registration
* @throws ClosedChannelException if this channel is closed
*/
public SelectionKey register(Selector sel, int ops) throws ClosedChannelException;
}
}

Usage Examples:
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import org.newsclub.net.unix.*;
// Basic pipe communication: the writer sends one message and closes the sink,
// the reader collects bytes until it sees end of stream.
try (AFPipe pipe = AFPipe.open()) {
    ReadableByteChannel sourceChannel = pipe.source();
    WritableByteChannel sinkChannel = pipe.sink();

    // Writer thread
    Thread writer = new Thread(() -> {
        // try-with-resources closes the sink even if a write fails, so the
        // reader always sees end of stream instead of blocking forever.
        try (WritableByteChannel out = sinkChannel) {
            String message = "Hello through pipe!";
            ByteBuffer buffer = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
            // write() may accept only part of the buffer; loop until it is drained.
            while (buffer.hasRemaining()) {
                out.write(buffer);
            }
            System.out.println("Message written to pipe");
        } catch (IOException e) {
            e.printStackTrace();
        }
    });

    // Reader thread
    Thread reader = new Thread(() -> {
        try {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            // A single read() may deliver only part of the message, so keep
            // reading until end of stream (-1) or until the buffer is full.
            while (buffer.hasRemaining() && sourceChannel.read(buffer) != -1) {
                // read() has already advanced the buffer position
            }
            buffer.flip();
            if (buffer.hasRemaining()) {
                String message = StandardCharsets.UTF_8.decode(buffer).toString();
                System.out.println("Message read from pipe: " + message);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    });

    writer.start();
    reader.start();
    // Wait for both sides before try-with-resources closes the pipe.
    writer.join();
    reader.join();
}
// NIO-based data transfer with selector
try (AFPipe pipe = AFPipe.open()) {
AFPipe.SourceChannel source = pipe.source();
AFPipe.SinkChannel sink = pipe.sink();
// Configure non-blocking mode
source.configureBlocking(false);
sink.configureBlocking(false);
Selector selector = Selector.open();
SelectionKey sourceKey = source.register(selector, SelectionKey.OP_READ);
// Producer thread
Thread producer = new Thread(() -> {
try {
ByteBuffer buffer = ByteBuffer.allocate(1024);
for (int i = 0; i < 5; i++) {
buffer.clear();
String data = "Data chunk " + i;
buffer.put(data.getBytes(StandardCharsets.UTF_8));
buffer.flip();
while (buffer.hasRemaining()) {
sink.write(buffer);
}
System.out.println("Produced: " + data);
Thread.sleep(200);
}
sink.close();
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
});
// Consumer using selector
Thread consumer = new Thread(() -> {
try {
ByteBuffer buffer = ByteBuffer.allocate(1024);
while (selector.select() > 0) {
for (SelectionKey key : selector.selectedKeys()) {
if (key.isReadable()) {
ReadableByteChannel channel = (ReadableByteChannel) key.channel();
buffer.clear();
int bytesRead = channel.read(buffer);
if (bytesRead > 0) {
buffer.flip();
byte[] data = new byte[buffer.remaining()];
buffer.get(data);
String message = new String(data, StandardCharsets.UTF_8);
System.out.println("Consumed: " + message);
} else if (bytesRead == -1) {
// End of stream
key.cancel();
break;
}
}
}
selector.selectedKeys().clear();
}
} catch (IOException e) {
e.printStackTrace();
}
});
producer.start();
consumer.start();
producer.join();
consumer.join();
}

junixsocket supports the native socketpair() system call for efficient socket pair creation:
/**
* Base class for socket pairs.
* <p>
* A pair holds two sockets of type {@code T} and is {@link Closeable}, so it
* can be managed with try-with-resources. This listing is an API summary,
* so method bodies are omitted.
*
* @param <T> socket type held by the pair
*/
public abstract class AFSocketPair<T extends AFSocket> implements Closeable {
/**
* Checks if native socketpair support is available.
* @return true if native socketpair is supported
*/
public static boolean isSupported();
/**
* Gets the first socket in the pair.
* @return First socket instance
*/
public abstract T getSocket1();
/**
* Gets the second socket in the pair.
* @return Second socket instance
*/
public abstract T getSocket2();
}

Capability Testing:
import org.newsclub.net.unix.*;
// Check if native socketpair is supported
// NOTE(review): junixsocket capability checks are usually written as
// AFSocket.supports(AFSocketCapability.CAPABILITY_NATIVE_SOCKETPAIR) - confirm
// that isSupported() exists on the capability constant in the targeted version.
if (AFSocketCapability.CAPABILITY_NATIVE_SOCKETPAIR.isSupported()) {
    System.out.println("Native socketpair available - using optimized implementation");
    // try-with-resources releases both sockets of the pair when done.
    try (AFUNIXSocketPair pair = AFUNIXSocketPair.open()) {
        // Use socket pair...
    }
} else {
    System.out.println("Using fallback socket pair implementation");
    // Fallback to alternative IPC mechanism
}

Install with Tessl CLI:
npx tessl i tessl/maven-com-kohlschutter-junixsocket--junixsocket-core