or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-operations.md · buffer-management.md · filter-chain.md · index.md · protocol-codecs.md · service-abstractions.md · session-management.md · transport-layer.md

docs/async-operations.md

0

# Async Operations

1

2

MINA Core provides a comprehensive Future-based asynchronous programming model. All I/O operations return Future objects that allow you to handle operations asynchronously without blocking threads, enabling highly scalable network applications.

3

4

## IoFuture Hierarchy

5

6

### Base IoFuture Interface

7

8

```java { .api }

9

public interface IoFuture {

10

// Associated session

11

IoSession getSession();

12

13

// Blocking wait methods

14

IoFuture await() throws InterruptedException;

15

boolean await(long timeout, TimeUnit unit) throws InterruptedException;

16

boolean await(long timeoutMillis) throws InterruptedException;

17

18

// Uninterruptible wait methods

19

IoFuture awaitUninterruptibly();

20

boolean awaitUninterruptibly(long timeout, TimeUnit unit);

21

boolean awaitUninterruptibly(long timeoutMillis);

22

23

// Completion status

24

boolean isDone();

25

26

// Event listeners

27

IoFuture addListener(IoFutureListener<?> listener);

28

IoFuture removeListener(IoFutureListener<?> listener);

29

}

30

```

31

32

### Specialized Future Types

33

34

#### ConnectFuture

35

36

Future for connection operations:

37

38

```java { .api }

39

public interface ConnectFuture extends IoFuture {

40

// Connection result

41

IoSession getSession();

42

boolean isConnected();

43

boolean isCanceled();

44

45

// Connection control

46

boolean cancel();

47

48

// Exception handling

49

Throwable getException();

50

51

// Typed listener methods

52

ConnectFuture addListener(IoFutureListener<? extends ConnectFuture> listener);

53

ConnectFuture removeListener(IoFutureListener<? extends ConnectFuture> listener);

54

ConnectFuture await() throws InterruptedException;

55

ConnectFuture awaitUninterruptibly();

56

}

57

```

58

59

#### WriteFuture

60

61

Future for write operations:

62

63

```java { .api }

64

public interface WriteFuture extends IoFuture {

65

// Write result

66

boolean isWritten();

67

68

// Exception handling

69

Throwable getException();

70

71

// Typed listener methods

72

WriteFuture addListener(IoFutureListener<? extends WriteFuture> listener);

73

WriteFuture removeListener(IoFutureListener<? extends WriteFuture> listener);

74

WriteFuture await() throws InterruptedException;

75

WriteFuture awaitUninterruptibly();

76

}

77

```

78

79

#### ReadFuture

80

81

Future for read operations:

82

83

```java { .api }

84

public interface ReadFuture extends IoFuture {

85

// Read result

86

Object getMessage();

87

boolean isRead();

88

boolean isClosed();

89

90

// Exception handling

91

Throwable getException();

92

93

// Typed listener methods

94

ReadFuture addListener(IoFutureListener<? extends ReadFuture> listener);

95

ReadFuture removeListener(IoFutureListener<? extends ReadFuture> listener);

96

ReadFuture await() throws InterruptedException;

97

ReadFuture awaitUninterruptibly();

98

}

99

```

100

101

#### CloseFuture

102

103

Future for close operations:

104

105

```java { .api }

106

public interface CloseFuture extends IoFuture {

107

// Close result

108

boolean isClosed();

109

110

// Typed listener methods

111

CloseFuture addListener(IoFutureListener<? extends CloseFuture> listener);

112

CloseFuture removeListener(IoFutureListener<? extends CloseFuture> listener);

113

CloseFuture await() throws InterruptedException;

114

CloseFuture awaitUninterruptibly();

115

}

116

```

117

118

## Asynchronous Connection Handling

119

120

### Basic Async Connection

121

122

```java { .api }

123

public class AsyncConnectionExample {

124

125

public void connectAsync() {

126

NioSocketConnector connector = new NioSocketConnector();

127

connector.setHandler(new ClientHandler());

128

129

// Initiate async connection

130

ConnectFuture future = connector.connect(new InetSocketAddress("localhost", 8080));

131

132

// Add completion listener

133

future.addListener(new IoFutureListener<ConnectFuture>() {

134

@Override

135

public void operationComplete(ConnectFuture future) {

136

if (future.isConnected()) {

137

System.out.println("Connected successfully!");

138

IoSession session = future.getSession();

139

session.write("Hello Server!");

140

} else {

141

System.err.println("Connection failed: " + future.getException());

142

connector.dispose();

143

}

144

}

145

});

146

147

// Continue with other work while connection proceeds asynchronously

148

performOtherWork();

149

}

150

151

public void connectWithTimeout() {

152

NioSocketConnector connector = new NioSocketConnector();

153

connector.setConnectTimeoutMillis(5000); // 5 second timeout

154

155

ConnectFuture future = connector.connect(new InetSocketAddress("remote.example.com", 8080));

156

157

// Wait with timeout

158

boolean connected = future.awaitUninterruptibly(10000); // 10 second wait

159

160

if (connected && future.isConnected()) {

161

IoSession session = future.getSession();

162

System.out.println("Connected to: " + session.getRemoteAddress());

163

} else if (future.isCanceled()) {

164

System.out.println("Connection was canceled");

165

} else {

166

System.out.println("Connection timed out or failed");

167

Throwable cause = future.getException();

168

if (cause != null) {

169

cause.printStackTrace();

170

}

171

}

172

}

173

}

174

```

175

176

### Connection Pooling with Futures

177

178

```java { .api }

179

public class AsyncConnectionPool {

180

private final NioSocketConnector connector;

181

private final BlockingQueue<IoSession> availableSessions;

182

private final Set<IoSession> allSessions;

183

private final String host;

184

private final int port;

185

private final int maxConnections;

186

187

public AsyncConnectionPool(String host, int port, int maxConnections) {

188

this.host = host;

189

this.port = port;

190

this.maxConnections = maxConnections;

191

this.connector = new NioSocketConnector();

192

this.availableSessions = new LinkedBlockingQueue<>();

193

this.allSessions = Collections.synchronizedSet(new HashSet<>());

194

195

connector.setHandler(new PooledSessionHandler());

196

}

197

198

public CompletableFuture<IoSession> getSession() {

199

CompletableFuture<IoSession> result = new CompletableFuture<>();

200

201

// Try to get existing session

202

IoSession session = availableSessions.poll();

203

if (session != null && session.isConnected()) {

204

result.complete(session);

205

return result;

206

}

207

208

// Create new connection if under limit

209

if (allSessions.size() < maxConnections) {

210

ConnectFuture connectFuture = connector.connect(new InetSocketAddress(host, port));

211

connectFuture.addListener(new IoFutureListener<ConnectFuture>() {

212

@Override

213

public void operationComplete(ConnectFuture future) {

214

if (future.isConnected()) {

215

IoSession newSession = future.getSession();

216

allSessions.add(newSession);

217

result.complete(newSession);

218

} else {

219

result.completeExceptionally(future.getException());

220

}

221

}

222

});

223

} else {

224

result.completeExceptionally(new RuntimeException("Connection pool exhausted"));

225

}

226

227

return result;

228

}

229

230

public void returnSession(IoSession session) {

231

if (session.isConnected()) {

232

availableSessions.offer(session);

233

} else {

234

allSessions.remove(session);

235

}

236

}

237

238

public CompletableFuture<Void> shutdown() {

239

CompletableFuture<Void> shutdownFuture = new CompletableFuture<>();

240

List<CloseFuture> closeFutures = new ArrayList<>();

241

242

// Close all sessions

243

for (IoSession session : allSessions) {

244

closeFutures.add(session.closeNow());

245

}

246

247

// Wait for all closes to complete

248

if (closeFutures.isEmpty()) {

249

connector.dispose();

250

shutdownFuture.complete(null);

251

} else {

252

CompletableFuture.allOf(closeFutures.stream()

253

.map(this::toCompletableFuture)

254

.toArray(CompletableFuture[]::new))

255

.thenRun(() -> {

256

connector.dispose();

257

shutdownFuture.complete(null);

258

});

259

}

260

261

return shutdownFuture;

262

}

263

}

264

```

265

266

## Asynchronous Write Operations

267

268

### Basic Async Writes

269

270

```java { .api }

271

public class AsyncWriteExample {

272

273

public void writeAsync(IoSession session, Object message) {

274

WriteFuture future = session.write(message);

275

276

// Add completion listener

277

future.addListener(new IoFutureListener<WriteFuture>() {

278

@Override

279

public void operationComplete(WriteFuture future) {

280

if (future.isWritten()) {

281

System.out.println("Message sent successfully");

282

} else {

283

System.err.println("Write failed: " + future.getException());

284

// Handle write failure (retry, close session, etc.)

285

handleWriteFailure(session, message, future.getException());

286

}

287

}

288

});

289

}

290

291

public void writeWithConfirmation(IoSession session, String message) {

292

WriteFuture future = session.write(message);

293

294

// Wait for write completion with timeout

295

boolean written = future.awaitUninterruptibly(5000); // 5 second timeout

296

297

if (written && future.isWritten()) {

298

System.out.println("Message confirmed sent: " + message);

299

} else {

300

System.err.println("Write timeout or failed for message: " + message);

301

if (future.getException() != null) {

302

future.getException().printStackTrace();

303

}

304

}

305

}

306

307

private void handleWriteFailure(IoSession session, Object message, Throwable cause) {

308

if (cause instanceof WriteTimeoutException) {

309

System.err.println("Write timeout - session may be slow");

310

// Consider reducing write frequency or closing session

311

} else if (cause instanceof WriteToClosedSessionException) {

312

System.err.println("Attempted to write to closed session");

313

// Clean up and stop writing

314

} else {

315

System.err.println("Unexpected write error: " + cause.getMessage());

316

// Log error and potentially retry

317

}

318

}

319

}

320

```

321

322

### Bulk Write Operations

323

324

```java { .api }

325

public class BulkAsyncWrites {

326

327

public CompletableFuture<Void> writeMultipleMessages(IoSession session, List<Object> messages) {

328

List<WriteFuture> writeFutures = new ArrayList<>();

329

330

// Initiate all writes

331

for (Object message : messages) {

332

WriteFuture future = session.write(message);

333

writeFutures.add(future);

334

}

335

336

// Convert MINA futures to CompletableFuture

337

CompletableFuture<Void> result = CompletableFuture.allOf(

338

writeFutures.stream()

339

.map(this::toCompletableFuture)

340

.toArray(CompletableFuture[]::new)

341

);

342

343

return result;

344

}

345

346

public void writeSequentially(IoSession session, List<Object> messages) {

347

writeSequentiallyRecursive(session, messages, 0);

348

}

349

350

private void writeSequentiallyRecursive(IoSession session, List<Object> messages, int index) {

351

if (index >= messages.size()) {

352

System.out.println("All messages sent sequentially");

353

return;

354

}

355

356

WriteFuture future = session.write(messages.get(index));

357

future.addListener(new IoFutureListener<WriteFuture>() {

358

@Override

359

public void operationComplete(WriteFuture future) {

360

if (future.isWritten()) {

361

// Write next message

362

writeSequentiallyRecursive(session, messages, index + 1);

363

} else {

364

System.err.println("Sequential write failed at index " + index);

365

}

366

}

367

});

368

}

369

370

public void writeBatched(IoSession session, List<Object> messages, int batchSize) {

371

List<List<Object>> batches = partitionList(messages, batchSize);

372

writeBatchesSequentially(session, batches, 0);

373

}

374

375

private void writeBatchesSequentially(IoSession session, List<List<Object>> batches, int batchIndex) {

376

if (batchIndex >= batches.size()) {

377

System.out.println("All batches sent");

378

return;

379

}

380

381

List<Object> currentBatch = batches.get(batchIndex);

382

List<WriteFuture> batchFutures = new ArrayList<>();

383

384

// Send all messages in current batch

385

for (Object message : currentBatch) {

386

batchFutures.add(session.write(message));

387

}

388

389

// Wait for batch completion before sending next batch

390

CompletableFuture.allOf(

391

batchFutures.stream()

392

.map(this::toCompletableFuture)

393

.toArray(CompletableFuture[]::new)

394

).thenRun(() -> {

395

System.out.println("Batch " + batchIndex + " completed");

396

writeBatchesSequentially(session, batches, batchIndex + 1);

397

});

398

}

399

}

400

```

401

402

## Asynchronous Read Operations

403

404

### Enabling and Using Read Operations

405

406

```java { .api }

407

public class AsyncReadExample {

408

409

public void enableAsyncReads(IoSession session) {

410

// Enable read operations (disabled by default)

411

session.getConfig().setUseReadOperation(true);

412

}

413

414

public void readAsync(IoSession session) {

415

ReadFuture future = session.read();

416

417

future.addListener(new IoFutureListener<ReadFuture>() {

418

@Override

419

public void operationComplete(ReadFuture future) {

420

if (future.isRead()) {

421

Object message = future.getMessage();

422

System.out.println("Read message: " + message);

423

424

// Continue reading

425

readAsync(session);

426

} else if (future.isClosed()) {

427

System.out.println("Session closed during read");

428

} else {

429

System.err.println("Read failed: " + future.getException());

430

}

431

}

432

});

433

}

434

435

public Object readSync(IoSession session, long timeoutMillis) {

436

ReadFuture future = session.read();

437

438

boolean completed = future.awaitUninterruptibly(timeoutMillis);

439

440

if (completed && future.isRead()) {

441

return future.getMessage();

442

} else if (future.isClosed()) {

443

throw new RuntimeException("Session closed during read");

444

} else {

445

throw new RuntimeException("Read timeout or failed");

446

}

447

}

448

}

449

```

450

451

### Request-Response Pattern

452

453

```java { .api }

454

public class RequestResponsePattern {

455

456

public CompletableFuture<Object> sendRequest(IoSession session, Object request) {

457

CompletableFuture<Object> responseFuture = new CompletableFuture<>();

458

459

// Send request

460

WriteFuture writeFuture = session.write(request);

461

462

writeFuture.addListener(new IoFutureListener<WriteFuture>() {

463

@Override

464

public void operationComplete(WriteFuture future) {

465

if (future.isWritten()) {

466

// Request sent, now wait for response

467

ReadFuture readFuture = session.read();

468

readFuture.addListener(new IoFutureListener<ReadFuture>() {

469

@Override

470

public void operationComplete(ReadFuture readFuture) {

471

if (readFuture.isRead()) {

472

responseFuture.complete(readFuture.getMessage());

473

} else {

474

responseFuture.completeExceptionally(

475

readFuture.getException() != null ?

476

readFuture.getException() :

477

new RuntimeException("Read failed")

478

);

479

}

480

}

481

});

482

} else {

483

responseFuture.completeExceptionally(

484

future.getException() != null ?

485

future.getException() :

486

new RuntimeException("Write failed")

487

);

488

}

489

}

490

});

491

492

return responseFuture;

493

}

494

495

public CompletableFuture<List<Object>> sendMultipleRequests(IoSession session, List<Object> requests) {

496

List<CompletableFuture<Object>> requestFutures = new ArrayList<>();

497

498

for (Object request : requests) {

499

requestFutures.add(sendRequest(session, request));

500

}

501

502

return CompletableFuture.allOf(requestFutures.toArray(new CompletableFuture[0]))

503

.thenApply(v -> requestFutures.stream()

504

.map(CompletableFuture::join)

505

.collect(Collectors.toList())

506

);

507

}

508

}

509

```

510

511

## Session Close Handling

512

513

### Async Session Closure

514

515

```java { .api }

516

public class AsyncCloseExample {

517

518

public void closeGracefully(IoSession session) {

519

// Close after pending writes complete

520

CloseFuture future = session.closeOnFlush();

521

522

future.addListener(new IoFutureListener<CloseFuture>() {

523

@Override

524

public void operationComplete(CloseFuture future) {

525

System.out.println("Session closed gracefully: " + session.getId());

526

cleanupSessionResources(session);

527

}

528

});

529

}

530

531

public void closeImmediately(IoSession session) {

532

// Close immediately, discarding pending writes

533

CloseFuture future = session.closeNow();

534

535

future.addListener(new IoFutureListener<CloseFuture>() {

536

@Override

537

public void operationComplete(CloseFuture future) {

538

System.out.println("Session closed immediately: " + session.getId());

539

cleanupSessionResources(session);

540

}

541

});

542

}

543

544

public CompletableFuture<Void> closeMultipleSessions(Collection<IoSession> sessions) {

545

List<CompletableFuture<Void>> closeFutures = new ArrayList<>();

546

547

for (IoSession session : sessions) {

548

CloseFuture closeFuture = session.closeOnFlush();

549

closeFutures.add(toCompletableFuture(closeFuture));

550

}

551

552

return CompletableFuture.allOf(closeFutures.toArray(new CompletableFuture[0]));

553

}

554

555

public void waitForClose(IoSession session, long timeoutMillis) {

556

CloseFuture closeFuture = session.getCloseFuture();

557

558

boolean closed = closeFuture.awaitUninterruptibly(timeoutMillis);

559

560

if (closed) {

561

System.out.println("Session closed within timeout");

562

} else {

563

System.out.println("Session close timed out");

564

}

565

}

566

}

567

```

568

569

## Future Composition and Chaining

570

571

### Converting MINA Futures to CompletableFuture

572

573

```java { .api }

574

public class FutureComposition {

575

576

public <T extends IoFuture> CompletableFuture<T> toCompletableFuture(T minaFuture) {

577

CompletableFuture<T> completableFuture = new CompletableFuture<>();

578

579

minaFuture.addListener(new IoFutureListener<T>() {

580

@Override

581

public void operationComplete(T future) {

582

completableFuture.complete(future);

583

}

584

});

585

586

return completableFuture;

587

}

588

589

public CompletableFuture<IoSession> connectThenAuthenticate(String host, int port, String username, String password) {

590

NioSocketConnector connector = new NioSocketConnector();

591

592

return toCompletableFuture(connector.connect(new InetSocketAddress(host, port)))

593

.thenCompose(connectFuture -> {

594

if (connectFuture.isConnected()) {

595

IoSession session = connectFuture.getSession();

596

597

// Send authentication

598

AuthRequest authReq = new AuthRequest(username, password);

599

return toCompletableFuture(session.write(authReq))

600

.thenCompose(writeFuture -> {

601

if (writeFuture.isWritten()) {

602

// Wait for auth response

603

return toCompletableFuture(session.read())

604

.thenApply(readFuture -> {

605

if (readFuture.isRead()) {

606

AuthResponse response = (AuthResponse) readFuture.getMessage();

607

if (response.isSuccess()) {

608

return session;

609

} else {

610

session.closeNow();

611

throw new RuntimeException("Authentication failed");

612

}

613

} else {

614

session.closeNow();

615

throw new RuntimeException("Failed to read auth response");

616

}

617

});

618

} else {

619

session.closeNow();

620

throw new RuntimeException("Failed to send auth request");

621

}

622

});

623

} else {

624

throw new RuntimeException("Connection failed: " + connectFuture.getException());

625

}

626

});

627

}

628

629

public CompletableFuture<String> performComplexOperation(IoSession session) {

630

return sendCommand(session, "INIT")

631

.thenCompose(initResponse -> sendCommand(session, "SETUP " + initResponse))

632

.thenCompose(setupResponse -> sendCommand(session, "EXECUTE"))

633

.thenCompose(executeResponse -> sendCommand(session, "FINALIZE"))

634

.thenApply(finalizeResponse -> "Operation completed: " + finalizeResponse);

635

}

636

637

private CompletableFuture<String> sendCommand(IoSession session, String command) {

638

return toCompletableFuture(session.write(command))

639

.thenCompose(writeFuture -> {

640

if (writeFuture.isWritten()) {

641

return toCompletableFuture(session.read())

642

.thenApply(readFuture -> {

643

if (readFuture.isRead()) {

644

return readFuture.getMessage().toString();

645

} else {

646

throw new RuntimeException("Failed to read response for: " + command);

647

}

648

});

649

} else {

650

throw new RuntimeException("Failed to send command: " + command);

651

}

652

});

653

}

654

}

655

```

656

657

## Error Handling in Async Operations

658

659

### Comprehensive Error Handling

660

661

```java { .api }

662

public class AsyncErrorHandling {

663

664

public void robustAsyncWrite(IoSession session, Object message, int maxRetries) {

665

writeWithRetry(session, message, maxRetries, 0);

666

}

667

668

private void writeWithRetry(IoSession session, Object message, int maxRetries, int attempt) {

669

WriteFuture future = session.write(message);

670

671

future.addListener(new IoFutureListener<WriteFuture>() {

672

@Override

673

public void operationComplete(WriteFuture future) {

674

if (future.isWritten()) {

675

System.out.println("Message sent successfully on attempt " + (attempt + 1));

676

} else {

677

Throwable cause = future.getException();

678

679

if (attempt < maxRetries && isRetryableError(cause)) {

680

System.out.println("Retrying write attempt " + (attempt + 1) + "/" + maxRetries);

681

682

// Exponential backoff

683

int delay = (int) Math.pow(2, attempt) * 1000;

684

685

ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

686

scheduler.schedule(() -> {

687

writeWithRetry(session, message, maxRetries, attempt + 1);

688

scheduler.shutdown();

689

}, delay, TimeUnit.MILLISECONDS);

690

} else {

691

System.err.println("Write failed after " + (attempt + 1) + " attempts: " + cause);

692

handleFinalWriteFailure(session, message, cause);

693

}

694

}

695

}

696

});

697

}

698

699

private boolean isRetryableError(Throwable cause) {

700

return cause instanceof WriteTimeoutException ||

701

(cause instanceof IOException && !(cause instanceof WriteToClosedSessionException));

702

}

703

704

public CompletableFuture<Object> robustRequestResponse(IoSession session, Object request, long timeoutMs) {

705

CompletableFuture<Object> result = new CompletableFuture<>();

706

707

// Set up timeout

708

ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();

709

ScheduledFuture<?> timeoutTask = timeoutExecutor.schedule(() -> {

710

result.completeExceptionally(new TimeoutException("Request timed out after " + timeoutMs + "ms"));

711

}, timeoutMs, TimeUnit.MILLISECONDS);

712

713

// Send request

714

WriteFuture writeFuture = session.write(request);

715

716

writeFuture.addListener(new IoFutureListener<WriteFuture>() {

717

@Override

718

public void operationComplete(WriteFuture future) {

719

if (future.isWritten()) {

720

// Request sent, wait for response

721

ReadFuture readFuture = session.read();

722

723

readFuture.addListener(new IoFutureListener<ReadFuture>() {

724

@Override

725

public void operationComplete(ReadFuture readFuture) {

726

if (!result.isDone()) { // Check if not already timed out

727

timeoutTask.cancel(false);

728

timeoutExecutor.shutdown();

729

730

if (readFuture.isRead()) {

731

result.complete(readFuture.getMessage());

732

} else if (readFuture.isClosed()) {

733

result.completeExceptionally(new IOException("Session closed during read"));

734

} else {

735

result.completeExceptionally(readFuture.getException());

736

}

737

}

738

}

739

});

740

} else {

741

if (!result.isDone()) {

742

timeoutTask.cancel(false);

743

timeoutExecutor.shutdown();

744

result.completeExceptionally(future.getException());

745

}

746

}

747

}

748

});

749

750

return result;

751

}

752

}

753

```

754

755

MINA Core's asynchronous programming model provides powerful tools for building scalable, non-blocking network applications. The Future-based approach allows for clean composition of async operations while maintaining high performance under load.