CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-com-kohlschutter-junixsocket--junixsocket-core

Java Unix Domain Socket implementation providing AF_UNIX, AF_TIPC, AF_VSOCK, and AF_SYSTEM socket support with traditional and NIO APIs

Pending
Overview
Eval results
Files

docs/socket-pairs.md

Socket Pairs and Pipes

Interconnected socket pairs and pipe implementations for efficient bidirectional communication between processes, providing both traditional socket pairs and NIO pipe interfaces.

Core Imports

import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import org.newsclub.net.unix.AFUNIXSocket;
import org.newsclub.net.unix.AFUNIXSocketPair;
import org.newsclub.net.unix.AFSocketPair;
import org.newsclub.net.unix.AFPipe;

Capabilities

AFUNIXSocketPair

Pair of interconnected Unix Domain Sockets for bidirectional communication, where data written to one socket can be read from the other.

/**
 * Pair of interconnected Unix Domain Sockets.
 * <p>
 * Data written to one socket of the pair can be read from the other, which makes the
 * pair usable for bidirectional communication. The class specializes
 * {@code AFSocketPair} for {@code AFUNIXSocket}, so both accessors return
 * {@code AFUNIXSocket} instances.
 */
public final class AFUNIXSocketPair extends AFSocketPair<AFUNIXSocket> {
    
    /**
     * Creates a new pair of interconnected AFUNIXSocket instances.
     *
     * @return New AFUNIXSocketPair instance
     * @throws IOException if socket pair creation fails
     */
    public static AFUNIXSocketPair open() throws IOException;
    
    /**
     * Gets the first socket in the pair.
     *
     * @return First AFUNIXSocket instance
     */
    public AFUNIXSocket getSocket1();
    
    /**
     * Gets the second socket in the pair.
     *
     * @return Second AFUNIXSocket instance
     */
    public AFUNIXSocket getSocket2();
    
    /**
     * Closes both sockets in the pair.
     *
     * @throws IOException if closing fails
     */
    public void close() throws IOException;
    
    /**
     * Checks if both sockets are closed.
     *
     * @return true if both sockets are closed
     */
    public boolean isClosed();
}

Usage Examples:

import java.io.*;
import java.nio.charset.StandardCharsets;
import org.newsclub.net.unix.*;

// Basic socket pair communication: socket1 sends a greeting, socket2 answers it.
try (AFUNIXSocketPair socketPair = AFUNIXSocketPair.open()) {
    AFUNIXSocket socket1 = socketPair.getSocket1();
    AFUNIXSocket socket2 = socketPair.getSocket2();

    // Thread 1: write a message to socket1, then read the reply from socket1
    // (whatever socket2 writes arrives on socket1's input stream).
    Thread writer = new Thread(() -> {
        try {
            OutputStream os = socket1.getOutputStream();
            os.write("Hello from socket1".getBytes(StandardCharsets.UTF_8));
            os.flush();

            // Read response
            InputStream is = socket1.getInputStream();
            byte[] buffer = new byte[1024];
            int bytesRead = is.read(buffer);
            if (bytesRead == -1) {
                // read() returns -1 at end of stream; new String(buffer, 0, -1, ...) would throw
                System.out.println("Socket1: peer closed the connection without replying");
                return;
            }
            String response = new String(buffer, 0, bytesRead, StandardCharsets.UTF_8);
            System.out.println("Socket1 received: " + response);
        } catch (IOException e) {
            e.printStackTrace();
        }
    });

    // Thread 2: read the message from socket2, then write the reply to socket2
    Thread reader = new Thread(() -> {
        try {
            InputStream is = socket2.getInputStream();
            byte[] buffer = new byte[1024];
            int bytesRead = is.read(buffer);
            if (bytesRead == -1) {
                // Nothing was received before the peer closed; there is nothing to answer
                System.out.println("Socket2: peer closed the connection without sending");
                return;
            }
            String message = new String(buffer, 0, bytesRead, StandardCharsets.UTF_8);
            System.out.println("Socket2 received: " + message);

            // Send response
            OutputStream os = socket2.getOutputStream();
            os.write("Hello from socket2".getBytes(StandardCharsets.UTF_8));
            os.flush();
        } catch (IOException e) {
            e.printStackTrace();
        }
    });

    writer.start();
    reader.start();

    // Wait for both sides before try-with-resources closes the pair
    writer.join();
    reader.join();
}

// Producer-Consumer pattern with socket pairs
try (AFUNIXSocketPair socketPair = AFUNIXSocketPair.open()) {
    AFUNIXSocket producer = socketPair.getSocket1();
    AFUNIXSocket consumer = socketPair.getSocket2();

    // Producer thread: sends ten (int, UTF string) records, then -1 as end marker
    Thread producerThread = new Thread(() -> {
        try (DataOutputStream dos = new DataOutputStream(producer.getOutputStream())) {
            for (int i = 0; i < 10; i++) {
                dos.writeInt(i);
                dos.writeUTF("Message " + i);
                dos.flush();
                System.out.println("Produced: " + i);
                Thread.sleep(100);
            }
            dos.writeInt(-1); // End marker
            dos.flush();      // Make sure the consumer sees the marker right away
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe it
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    });

    // Consumer thread: reads records until the end marker arrives
    Thread consumerThread = new Thread(() -> {
        try (DataInputStream dis = new DataInputStream(consumer.getInputStream())) {
            int value;
            while ((value = dis.readInt()) != -1) {
                String message = dis.readUTF();
                System.out.println("Consumed: " + value + " - " + message);
            }
        } catch (IOException e) {
            // Also reached (as EOFException) if the producer stops before sending the marker
            e.printStackTrace();
        }
    });

    producerThread.start();
    consumerThread.start();

    // Wait for both sides before try-with-resources closes the pair
    producerThread.join();
    consumerThread.join();
}

AFPipe

NIO Pipe implementation using Unix Domain Sockets, providing source and sink channels for efficient data transfer.

/**
 * Pipe implementation using Unix Domain Sockets.
 * <p>
 * Data written to the {@link #sink() sink} channel can be read from the
 * {@link #source() source} channel. The class is {@link Closeable}, so an instance can
 * be managed with try-with-resources; {@link #close()} closes both channels.
 */
public final class AFPipe extends Pipe implements Closeable {
    
    /**
     * Opens a new AFPipe.
     *
     * @return New AFPipe instance
     * @throws IOException if pipe creation fails
     */
    public static AFPipe open() throws IOException;
    
    /**
     * Returns the source channel for reading from this pipe.
     *
     * @return SourceChannel for reading
     */
    public SourceChannel source();
    
    /**
     * Returns the sink channel for writing to this pipe.
     *
     * @return SinkChannel for writing
     */
    public SinkChannel sink();
    
    /**
     * Closes both source and sink channels.
     *
     * @throws IOException if closing fails
     */
    @Override
    public void close() throws IOException;
    
    /**
     * Source channel for reading from the pipe.
     * <p>
     * Follows the {@link Pipe.SourceChannel} contract: a readable, selectable channel.
     */
    public static class SourceChannel extends Pipe.SourceChannel {
        /** Reads bytes from the pipe into {@code dst}; see {@code ReadableByteChannel#read}. */
        public int read(ByteBuffer dst) throws IOException;
        /** Scattering read into all of {@code dsts}. */
        public long read(ByteBuffer[] dsts) throws IOException;
        /** Scattering read into {@code length} buffers of {@code dsts}, starting at {@code offset}. */
        public long read(ByteBuffer[] dsts, int offset, int length) throws IOException;
        /** Tells whether this channel is still open. */
        public boolean isOpen();
        /** Closes this channel. */
        public void close() throws IOException;
        /** Registers this channel with {@code sel} for the interest set {@code ops}. */
        public SelectionKey register(Selector sel, int ops) throws ClosedChannelException;
    }
    
    /**
     * Sink channel for writing to the pipe.
     * <p>
     * Follows the {@link Pipe.SinkChannel} contract: a writable, selectable channel.
     */
    public static class SinkChannel extends Pipe.SinkChannel {
        /** Writes the remaining bytes of {@code src} to the pipe; see {@code WritableByteChannel#write}. */
        public int write(ByteBuffer src) throws IOException;
        /** Gathering write from all of {@code srcs}. */
        public long write(ByteBuffer[] srcs) throws IOException;
        /** Gathering write from {@code length} buffers of {@code srcs}, starting at {@code offset}. */
        public long write(ByteBuffer[] srcs, int offset, int length) throws IOException;
        /** Tells whether this channel is still open. */
        public boolean isOpen();
        /** Closes this channel. */
        public void close() throws IOException;
        /** Registers this channel with {@code sel} for the interest set {@code ops}. */
        public SelectionKey register(Selector sel, int ops) throws ClosedChannelException;
    }
}

Usage Examples:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import org.newsclub.net.unix.*;

// Basic pipe communication
try (AFPipe pipe = AFPipe.open()) {
    ReadableByteChannel sourceChannel = pipe.source();
    WritableByteChannel sinkChannel = pipe.sink();

    // Writer thread: sends one message, then closes the sink to signal end of data
    Thread writer = new Thread(() -> {
        try {
            String message = "Hello through pipe!";
            ByteBuffer buffer = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));

            // A single write() may transfer only part of the buffer
            while (buffer.hasRemaining()) {
                sinkChannel.write(buffer);
            }

            System.out.println("Message written to pipe");
            sinkChannel.close(); // Close to signal end of data
        } catch (IOException e) {
            e.printStackTrace();
        }
    });

    // Reader thread: collects everything the writer sent
    Thread reader = new Thread(() -> {
        try {
            ByteBuffer buffer = ByteBuffer.allocate(1024);

            // A single read() may return only part of the message, so keep reading
            // until the writer closes the sink (read() returns -1) or the buffer is full.
            while (buffer.hasRemaining() && sourceChannel.read(buffer) != -1) {
                // read() already advanced the buffer position; nothing else to do
            }

            buffer.flip();
            if (buffer.hasRemaining()) {
                byte[] data = new byte[buffer.remaining()];
                buffer.get(data);
                String message = new String(data, StandardCharsets.UTF_8);
                System.out.println("Message read from pipe: " + message);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    });

    writer.start();
    reader.start();

    // Wait for both sides before try-with-resources closes the pipe
    writer.join();
    reader.join();
}

// NIO-based data transfer with selector
try (AFPipe pipe = AFPipe.open()) {
    AFPipe.SourceChannel source = pipe.source();
    AFPipe.SinkChannel sink = pipe.sink();

    // Only the source is registered with a selector, so only it has to be non-blocking.
    // The sink stays in blocking mode so the producer's write loop does not busy-spin.
    source.configureBlocking(false);

    // A channel can only be registered with a selector created by its own provider,
    // so ask the channel's provider instead of using the platform default Selector.open().
    try (Selector selector = source.provider().openSelector()) {
        source.register(selector, SelectionKey.OP_READ);

        // Producer thread: writes five chunks, then closes the sink to signal end of stream
        Thread producer = new Thread(() -> {
            // try-with-resources closes the sink even on failure, so the consumer always sees EOF
            try (AFPipe.SinkChannel out = sink) {
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                for (int i = 0; i < 5; i++) {
                    buffer.clear();
                    String data = "Data chunk " + i;
                    buffer.put(data.getBytes(StandardCharsets.UTF_8));
                    buffer.flip();

                    while (buffer.hasRemaining()) {
                        out.write(buffer);
                    }

                    System.out.println("Produced: " + data);
                    Thread.sleep(200);
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers further up can still observe it
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        });

        // Consumer using selector: runs until the source reports end of stream
        Thread consumer = new Thread(() -> {
            try {
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                boolean endOfStream = false;

                // Without the endOfStream flag the loop would call select() again after the
                // only key was cancelled and block forever.
                while (!endOfStream && selector.select() > 0) {
                    for (SelectionKey key : selector.selectedKeys()) {
                        if (key.isReadable()) {
                            ReadableByteChannel channel = (ReadableByteChannel) key.channel();
                            buffer.clear();

                            int bytesRead = channel.read(buffer);
                            if (bytesRead > 0) {
                                buffer.flip();
                                byte[] data = new byte[buffer.remaining()];
                                buffer.get(data);
                                String message = new String(data, StandardCharsets.UTF_8);
                                System.out.println("Consumed: " + message);
                            } else if (bytesRead == -1) {
                                // End of stream
                                key.cancel();
                                endOfStream = true;
                                break;
                            }
                        }
                    }
                    selector.selectedKeys().clear();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        });

        producer.start();
        consumer.start();

        // Wait for both threads before the selector and the pipe are closed
        producer.join();
        consumer.join();
    }
}

Native Socket Pair Support

junixsocket supports the native socketpair() system call for efficient socket pair creation:

/**
 * Base class for socket pairs.
 * <p>
 * A pair exposes its two sockets through {@link #getSocket1()} and {@link #getSocket2()};
 * the type parameter {@code T} is the concrete socket type returned by both accessors.
 * Being {@link Closeable}, a pair can be managed with try-with-resources.
 */
public abstract class AFSocketPair<T extends AFSocket> implements Closeable {
    
    /**
     * Checks if native socketpair support is available.
     * <p>
     * NOTE(review): the capability-testing example on this page queries
     * {@code AFSocketCapability.CAPABILITY_NATIVE_SOCKETPAIR} instead of calling this
     * method; confirm against the library's API reference which check is available.
     *
     * @return true if native socketpair is supported
     */
    public static boolean isSupported();
    
    /**
     * Gets the first socket in the pair.
     *
     * @return First socket instance
     */
    public abstract T getSocket1();
    
    /**
     * Gets the second socket in the pair.
     *
     * @return Second socket instance
     */
    public abstract T getSocket2();
}

Capability Testing:

import org.newsclub.net.unix.*;

// Check if native socketpair is supported.
// AFSocketCapability is a plain enum; the runtime check is AFSocket.supports(capability).
if (AFSocket.supports(AFSocketCapability.CAPABILITY_NATIVE_SOCKETPAIR)) {
    System.out.println("Native socketpair available - using optimized implementation");
    // try-with-resources closes both sockets of the pair when done
    try (AFUNIXSocketPair pair = AFUNIXSocketPair.open()) {
        // Use socket pair...
    }
} else {
    System.out.println("Using fallback socket pair implementation");
    // Fallback to alternative IPC mechanism
}

Install with Tessl CLI

npx tessl i tessl/maven-com-kohlschutter-junixsocket--junixsocket-core

docs

addressing.md

capabilities.md

datagram-sockets.md

exceptions.md

file-descriptors.md

index.md

nio-channels.md

rmi.md

socket-pairs.md

unix-sockets.md

utilities.md

tile.json