or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

addressing.mdcapabilities.mddatagram-sockets.mdexceptions.mdfile-descriptors.mdindex.mdnio-channels.mdrmi.mdsocket-pairs.mdunix-sockets.mdutilities.md

socket-pairs.mddocs/

0

# Socket Pairs and Pipes

1

2

Interconnected socket pairs and pipe implementations for efficient bidirectional communication between processes, providing both traditional socket pairs and NIO pipe interfaces.

3

4

## Core Imports

5

6

```java

7

import java.io.*;

8

import java.nio.ByteBuffer;

9

import java.nio.channels.Pipe;

10

import java.nio.channels.ReadableByteChannel;

11

import java.nio.channels.WritableByteChannel;

12

import org.newsclub.net.unix.AFUNIXSocket;

13

import org.newsclub.net.unix.AFUNIXSocketPair;

14

import org.newsclub.net.unix.AFSocketPair;

15

import org.newsclub.net.unix.AFPipe;

16

```

17

18

## Capabilities

19

20

### AFUNIXSocketPair

21

22

Pair of interconnected Unix Domain Sockets for bidirectional communication, where data written to one socket can be read from the other.

23

24

```java { .api }

25

/**

26

* Pair of interconnected Unix Domain Sockets

27

*/

28

public final class AFUNIXSocketPair extends AFSocketPair<AFUNIXSocket> {

29

30

/**

31

* Creates a new pair of interconnected AFUNIXSocket instances

32

* @return New AFUNIXSocketPair instance

33

* @throws IOException if socket pair creation fails

34

*/

35

public static AFUNIXSocketPair open() throws IOException;

36

37

/**

38

* Gets the first socket in the pair

39

* @return First AFUNIXSocket instance

40

*/

41

public AFUNIXSocket getSocket1();

42

43

/**

44

* Gets the second socket in the pair

45

* @return Second AFUNIXSocket instance

46

*/

47

public AFUNIXSocket getSocket2();

48

49

/**

50

* Closes both sockets in the pair

51

* @throws IOException if closing fails

52

*/

53

public void close() throws IOException;

54

55

/**

56

* Checks if both sockets are closed

57

* @return true if both sockets are closed

58

*/

59

public boolean isClosed();

60

}

61

```

62

63

**Usage Examples:**

64

65

```java

66

import java.io.*;

67

import java.nio.charset.StandardCharsets;

68

import org.newsclub.net.unix.*;

69

70

// Basic socket pair communication

71

try (AFUNIXSocketPair socketPair = AFUNIXSocketPair.open()) {

72

AFUNIXSocket socket1 = socketPair.getSocket1();

73

AFUNIXSocket socket2 = socketPair.getSocket2();

74

75

// Thread 1: Write to socket1, read from socket2

76

Thread writer = new Thread(() -> {

77

try {

78

OutputStream os = socket1.getOutputStream();

79

os.write("Hello from socket1".getBytes(StandardCharsets.UTF_8));

80

os.flush();

81

82

// Read response

83

InputStream is = socket1.getInputStream();

84

byte[] buffer = new byte[1024];

85

int bytesRead = is.read(buffer);

86

String response = new String(buffer, 0, bytesRead, StandardCharsets.UTF_8);

87

System.out.println("Socket1 received: " + response);

88

} catch (IOException e) {

89

e.printStackTrace();

90

}

91

});

92

93

// Thread 2: Read from socket2, write response

94

Thread reader = new Thread(() -> {

95

try {

96

InputStream is = socket2.getInputStream();

97

byte[] buffer = new byte[1024];

98

int bytesRead = is.read(buffer);

99

String message = new String(buffer, 0, bytesRead, StandardCharsets.UTF_8);

100

System.out.println("Socket2 received: " + message);

101

102

// Send response

103

OutputStream os = socket2.getOutputStream();

104

os.write("Hello from socket2".getBytes(StandardCharsets.UTF_8));

105

os.flush();

106

} catch (IOException e) {

107

e.printStackTrace();

108

}

109

});

110

111

writer.start();

112

reader.start();

113

114

writer.join();

115

reader.join();

116

}

117

118

// Producer-Consumer pattern with socket pairs

119

try (AFUNIXSocketPair socketPair = AFUNIXSocketPair.open()) {

120

AFUNIXSocket producer = socketPair.getSocket1();

121

AFUNIXSocket consumer = socketPair.getSocket2();

122

123

// Producer thread

124

Thread producerThread = new Thread(() -> {

125

try (DataOutputStream dos = new DataOutputStream(producer.getOutputStream())) {

126

for (int i = 0; i < 10; i++) {

127

dos.writeInt(i);

128

dos.writeUTF("Message " + i);

129

dos.flush();

130

System.out.println("Produced: " + i);

131

Thread.sleep(100);

132

}

133

dos.writeInt(-1); // End marker

134

} catch (IOException | InterruptedException e) {

135

e.printStackTrace();

136

}

137

});

138

139

// Consumer thread

140

Thread consumerThread = new Thread(() -> {

141

try (DataInputStream dis = new DataInputStream(consumer.getInputStream())) {

142

int value;

143

while ((value = dis.readInt()) != -1) {

144

String message = dis.readUTF();

145

System.out.println("Consumed: " + value + " - " + message);

146

}

147

} catch (IOException e) {

148

e.printStackTrace();

149

}

150

});

151

152

producerThread.start();

153

consumerThread.start();

154

155

producerThread.join();

156

consumerThread.join();

157

}

158

```

159

160

### AFPipe

161

162

NIO Pipe implementation using Unix Domain Sockets, providing source and sink channels for efficient data transfer.

163

164

```java { .api }

165

/**

166

* Pipe implementation using Unix Domain Sockets

167

*/

168

public final class AFPipe extends Pipe {

169

170

/**

171

* Opens a new AFPipe

172

* @return New AFPipe instance

173

* @throws IOException if pipe creation fails

174

*/

175

public static AFPipe open() throws IOException;

176

177

/**

178

* Returns the source channel for reading from this pipe

179

* @return SourceChannel for reading

180

*/

181

public SourceChannel source();

182

183

/**

184

* Returns the sink channel for writing to this pipe

185

* @return SinkChannel for writing

186

*/

187

public SinkChannel sink();

188

189

/**

190

* Closes both source and sink channels

191

* @throws IOException if closing fails

192

*/

193

public void close() throws IOException;

194

195

/**

196

* Source channel for reading from the pipe

197

*/

198

public static class SourceChannel extends Pipe.SourceChannel {

199

public int read(ByteBuffer dst) throws IOException;

200

public long read(ByteBuffer[] dsts) throws IOException;

201

public long read(ByteBuffer[] dsts, int offset, int length) throws IOException;

202

public boolean isOpen();

203

public void close() throws IOException;

204

public SelectionKey register(Selector sel, int ops) throws ClosedChannelException;

205

}

206

207

/**

208

* Sink channel for writing to the pipe

209

*/

210

public static class SinkChannel extends Pipe.SinkChannel {

211

public int write(ByteBuffer src) throws IOException;

212

public long write(ByteBuffer[] srcs) throws IOException;

213

public long write(ByteBuffer[] srcs, int offset, int length) throws IOException;

214

public boolean isOpen();

215

public void close() throws IOException;

216

public SelectionKey register(Selector sel, int ops) throws ClosedChannelException;

217

}

218

}

219

```

220

221

**Usage Examples:**

222

223

```java

224

import java.nio.ByteBuffer;

225

import java.nio.channels.ReadableByteChannel;

226

import java.nio.channels.WritableByteChannel;

227

import java.nio.charset.StandardCharsets;

228

import org.newsclub.net.unix.*;

229

230

// Basic pipe communication

231

try (AFPipe pipe = AFPipe.open()) {

232

ReadableByteChannel sourceChannel = pipe.source();

233

WritableByteChannel sinkChannel = pipe.sink();

234

235

// Writer thread

236

Thread writer = new Thread(() -> {

237

try {

238

String message = "Hello through pipe!";

239

ByteBuffer buffer = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));

240

241

while (buffer.hasRemaining()) {

242

sinkChannel.write(buffer);

243

}

244

245

System.out.println("Message written to pipe");

246

sinkChannel.close(); // Close to signal end of data

247

} catch (IOException e) {

248

e.printStackTrace();

249

}

250

});

251

252

// Reader thread

253

Thread reader = new Thread(() -> {

254

try {

255

ByteBuffer buffer = ByteBuffer.allocate(1024);

256

int bytesRead = sourceChannel.read(buffer);

257

258

if (bytesRead > 0) {

259

buffer.flip();

260

byte[] data = new byte[buffer.remaining()];

261

buffer.get(data);

262

String message = new String(data, StandardCharsets.UTF_8);

263

System.out.println("Message read from pipe: " + message);

264

}

265

} catch (IOException e) {

266

e.printStackTrace();

267

}

268

});

269

270

writer.start();

271

reader.start();

272

273

writer.join();

274

reader.join();

275

}

276

277

// NIO-based data transfer with selector

278

try (AFPipe pipe = AFPipe.open()) {

279

AFPipe.SourceChannel source = pipe.source();

280

AFPipe.SinkChannel sink = pipe.sink();

281

282

// Configure non-blocking mode

283

source.configureBlocking(false);

284

sink.configureBlocking(false);

285

286

Selector selector = Selector.open();

287

SelectionKey sourceKey = source.register(selector, SelectionKey.OP_READ);

288

289

// Producer thread

290

Thread producer = new Thread(() -> {

291

try {

292

ByteBuffer buffer = ByteBuffer.allocate(1024);

293

for (int i = 0; i < 5; i++) {

294

buffer.clear();

295

String data = "Data chunk " + i;

296

buffer.put(data.getBytes(StandardCharsets.UTF_8));

297

buffer.flip();

298

299

while (buffer.hasRemaining()) {

300

sink.write(buffer);

301

}

302

303

System.out.println("Produced: " + data);

304

Thread.sleep(200);

305

}

306

sink.close();

307

} catch (IOException | InterruptedException e) {

308

e.printStackTrace();

309

}

310

});

311

312

// Consumer using selector

313

Thread consumer = new Thread(() -> {

314

try {

315

ByteBuffer buffer = ByteBuffer.allocate(1024);

316

317

while (selector.select() > 0) {

318

for (SelectionKey key : selector.selectedKeys()) {

319

if (key.isReadable()) {

320

ReadableByteChannel channel = (ReadableByteChannel) key.channel();

321

buffer.clear();

322

323

int bytesRead = channel.read(buffer);

324

if (bytesRead > 0) {

325

buffer.flip();

326

byte[] data = new byte[buffer.remaining()];

327

buffer.get(data);

328

String message = new String(data, StandardCharsets.UTF_8);

329

System.out.println("Consumed: " + message);

330

} else if (bytesRead == -1) {

331

// End of stream

332

key.cancel();

333

break;

334

}

335

}

336

}

337

selector.selectedKeys().clear();

338

}

339

} catch (IOException e) {

340

e.printStackTrace();

341

}

342

});

343

344

producer.start();

345

consumer.start();

346

347

producer.join();

348

consumer.join();

349

}

350

```

351

352

### Native Socket Pair Support

353

354

junixsocket supports native socketpair() system call for efficient socket pair creation:

355

356

```java { .api }

357

/**

358

* Base class for socket pairs

359

*/

360

public abstract class AFSocketPair<T extends AFSocket> implements Closeable {

361

362

/**

363

* Checks if native socketpair support is available

364

* @return true if native socketpair is supported

365

*/

366

public static boolean isSupported();

367

368

/**

369

* Gets the first socket in the pair

370

* @return First socket instance

371

*/

372

public abstract T getSocket1();

373

374

/**

375

* Gets the second socket in the pair

376

* @return Second socket instance

377

*/

378

public abstract T getSocket2();

379

}

380

```

381

382

**Capability Testing:**

383

384

```java

385

import org.newsclub.net.unix.*;

386

387

// Check if native socketpair is supported

388

if (AFSocketCapability.CAPABILITY_NATIVE_SOCKETPAIR.isSupported()) {

389

System.out.println("Native socketpair available - using optimized implementation");

390

AFUNIXSocketPair pair = AFUNIXSocketPair.open();

391

// Use socket pair...

392

} else {

393

System.out.println("Using fallback socket pair implementation");

394

// Fallback to alternative IPC mechanism

395

}

396

```