0
# Async Operations
1
2
MINA Core provides a comprehensive Future-based asynchronous programming model. All I/O operations return Future objects that allow you to handle operations asynchronously without blocking threads, enabling highly scalable network applications.
3
4
## IoFuture Hierarchy
5
6
### Base IoFuture Interface
7
8
```java { .api }
9
public interface IoFuture {
10
// Associated session
11
IoSession getSession();
12
13
// Blocking wait methods
14
IoFuture await() throws InterruptedException;
15
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
16
boolean await(long timeoutMillis) throws InterruptedException;
17
18
// Uninterruptible wait methods
19
IoFuture awaitUninterruptibly();
20
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
21
boolean awaitUninterruptibly(long timeoutMillis);
22
23
// Completion status
24
boolean isDone();
25
26
// Event listeners
27
IoFuture addListener(IoFutureListener<?> listener);
28
IoFuture removeListener(IoFutureListener<?> listener);
29
}
30
```
31
32
### Specialized Future Types
33
34
#### ConnectFuture
35
36
Future for connection operations:
37
38
```java { .api }
39
public interface ConnectFuture extends IoFuture {
40
// Connection result
41
IoSession getSession();
42
boolean isConnected();
43
boolean isCanceled();
44
45
// Connection control
46
boolean cancel();
47
48
// Exception handling
49
Throwable getException();
50
51
// Typed listener methods
52
ConnectFuture addListener(IoFutureListener<? extends ConnectFuture> listener);
53
ConnectFuture removeListener(IoFutureListener<? extends ConnectFuture> listener);
54
ConnectFuture await() throws InterruptedException;
55
ConnectFuture awaitUninterruptibly();
56
}
57
```
58
59
#### WriteFuture
60
61
Future for write operations:
62
63
```java { .api }
64
public interface WriteFuture extends IoFuture {
65
// Write result
66
boolean isWritten();
67
68
// Exception handling
69
Throwable getException();
70
71
// Typed listener methods
72
WriteFuture addListener(IoFutureListener<? extends WriteFuture> listener);
73
WriteFuture removeListener(IoFutureListener<? extends WriteFuture> listener);
74
WriteFuture await() throws InterruptedException;
75
WriteFuture awaitUninterruptibly();
76
}
77
```
78
79
#### ReadFuture
80
81
Future for read operations:
82
83
```java { .api }
84
public interface ReadFuture extends IoFuture {
85
// Read result
86
Object getMessage();
87
boolean isRead();
88
boolean isClosed();
89
90
// Exception handling
91
Throwable getException();
92
93
// Typed listener methods
94
ReadFuture addListener(IoFutureListener<? extends ReadFuture> listener);
95
ReadFuture removeListener(IoFutureListener<? extends ReadFuture> listener);
96
ReadFuture await() throws InterruptedException;
97
ReadFuture awaitUninterruptibly();
98
}
99
```
100
101
#### CloseFuture
102
103
Future for close operations:
104
105
```java { .api }
106
public interface CloseFuture extends IoFuture {
107
// Close result
108
boolean isClosed();
109
110
// Typed listener methods
111
CloseFuture addListener(IoFutureListener<? extends CloseFuture> listener);
112
CloseFuture removeListener(IoFutureListener<? extends CloseFuture> listener);
113
CloseFuture await() throws InterruptedException;
114
CloseFuture awaitUninterruptibly();
115
}
116
```
117
118
## Asynchronous Connection Handling
119
120
### Basic Async Connection
121
122
```java { .api }
123
public class AsyncConnectionExample {
124
125
public void connectAsync() {
126
NioSocketConnector connector = new NioSocketConnector();
127
connector.setHandler(new ClientHandler());
128
129
// Initiate async connection
130
ConnectFuture future = connector.connect(new InetSocketAddress("localhost", 8080));
131
132
// Add completion listener
133
future.addListener(new IoFutureListener<ConnectFuture>() {
134
@Override
135
public void operationComplete(ConnectFuture future) {
136
if (future.isConnected()) {
137
System.out.println("Connected successfully!");
138
IoSession session = future.getSession();
139
session.write("Hello Server!");
140
} else {
141
System.err.println("Connection failed: " + future.getException());
142
connector.dispose();
143
}
144
}
145
});
146
147
// Continue with other work while connection proceeds asynchronously
148
performOtherWork();
149
}
150
151
public void connectWithTimeout() {
152
NioSocketConnector connector = new NioSocketConnector();
153
connector.setConnectTimeoutMillis(5000); // 5 second timeout
154
155
ConnectFuture future = connector.connect(new InetSocketAddress("remote.example.com", 8080));
156
157
// Wait with timeout
158
boolean connected = future.awaitUninterruptibly(10000); // 10 second wait
159
160
if (connected && future.isConnected()) {
161
IoSession session = future.getSession();
162
System.out.println("Connected to: " + session.getRemoteAddress());
163
} else if (future.isCanceled()) {
164
System.out.println("Connection was canceled");
165
} else {
166
System.out.println("Connection timed out or failed");
167
Throwable cause = future.getException();
168
if (cause != null) {
169
cause.printStackTrace();
170
}
171
}
172
}
173
}
174
```
175
176
### Connection Pooling with Futures
177
178
```java { .api }
179
public class AsyncConnectionPool {
180
private final NioSocketConnector connector;
181
private final BlockingQueue<IoSession> availableSessions;
182
private final Set<IoSession> allSessions;
183
private final String host;
184
private final int port;
185
private final int maxConnections;
186
187
public AsyncConnectionPool(String host, int port, int maxConnections) {
188
this.host = host;
189
this.port = port;
190
this.maxConnections = maxConnections;
191
this.connector = new NioSocketConnector();
192
this.availableSessions = new LinkedBlockingQueue<>();
193
this.allSessions = Collections.synchronizedSet(new HashSet<>());
194
195
connector.setHandler(new PooledSessionHandler());
196
}
197
198
public CompletableFuture<IoSession> getSession() {
199
CompletableFuture<IoSession> result = new CompletableFuture<>();
200
201
// Try to get existing session
202
IoSession session = availableSessions.poll();
203
if (session != null && session.isConnected()) {
204
result.complete(session);
205
return result;
206
}
207
208
// Create new connection if under limit
209
if (allSessions.size() < maxConnections) {
210
ConnectFuture connectFuture = connector.connect(new InetSocketAddress(host, port));
211
connectFuture.addListener(new IoFutureListener<ConnectFuture>() {
212
@Override
213
public void operationComplete(ConnectFuture future) {
214
if (future.isConnected()) {
215
IoSession newSession = future.getSession();
216
allSessions.add(newSession);
217
result.complete(newSession);
218
} else {
219
result.completeExceptionally(future.getException());
220
}
221
}
222
});
223
} else {
224
result.completeExceptionally(new RuntimeException("Connection pool exhausted"));
225
}
226
227
return result;
228
}
229
230
public void returnSession(IoSession session) {
231
if (session.isConnected()) {
232
availableSessions.offer(session);
233
} else {
234
allSessions.remove(session);
235
}
236
}
237
238
public CompletableFuture<Void> shutdown() {
239
CompletableFuture<Void> shutdownFuture = new CompletableFuture<>();
240
List<CloseFuture> closeFutures = new ArrayList<>();
241
242
// Close all sessions
243
for (IoSession session : allSessions) {
244
closeFutures.add(session.closeNow());
245
}
246
247
// Wait for all closes to complete
248
if (closeFutures.isEmpty()) {
249
connector.dispose();
250
shutdownFuture.complete(null);
251
} else {
252
CompletableFuture.allOf(closeFutures.stream()
253
.map(this::toCompletableFuture)
254
.toArray(CompletableFuture[]::new))
255
.thenRun(() -> {
256
connector.dispose();
257
shutdownFuture.complete(null);
258
});
259
}
260
261
return shutdownFuture;
262
}
263
}
264
```
265
266
## Asynchronous Write Operations
267
268
### Basic Async Writes
269
270
```java { .api }
271
public class AsyncWriteExample {
272
273
public void writeAsync(IoSession session, Object message) {
274
WriteFuture future = session.write(message);
275
276
// Add completion listener
277
future.addListener(new IoFutureListener<WriteFuture>() {
278
@Override
279
public void operationComplete(WriteFuture future) {
280
if (future.isWritten()) {
281
System.out.println("Message sent successfully");
282
} else {
283
System.err.println("Write failed: " + future.getException());
284
// Handle write failure (retry, close session, etc.)
285
handleWriteFailure(session, message, future.getException());
286
}
287
}
288
});
289
}
290
291
public void writeWithConfirmation(IoSession session, String message) {
292
WriteFuture future = session.write(message);
293
294
// Wait for write completion with timeout
295
boolean written = future.awaitUninterruptibly(5000); // 5 second timeout
296
297
if (written && future.isWritten()) {
298
System.out.println("Message confirmed sent: " + message);
299
} else {
300
System.err.println("Write timeout or failed for message: " + message);
301
if (future.getException() != null) {
302
future.getException().printStackTrace();
303
}
304
}
305
}
306
307
private void handleWriteFailure(IoSession session, Object message, Throwable cause) {
308
if (cause instanceof WriteTimeoutException) {
309
System.err.println("Write timeout - session may be slow");
310
// Consider reducing write frequency or closing session
311
} else if (cause instanceof WriteToClosedSessionException) {
312
System.err.println("Attempted to write to closed session");
313
// Clean up and stop writing
314
} else {
315
System.err.println("Unexpected write error: " + cause.getMessage());
316
// Log error and potentially retry
317
}
318
}
319
}
320
```
321
322
### Bulk Write Operations
323
324
```java { .api }
325
public class BulkAsyncWrites {
326
327
public CompletableFuture<Void> writeMultipleMessages(IoSession session, List<Object> messages) {
328
List<WriteFuture> writeFutures = new ArrayList<>();
329
330
// Initiate all writes
331
for (Object message : messages) {
332
WriteFuture future = session.write(message);
333
writeFutures.add(future);
334
}
335
336
// Convert MINA futures to CompletableFuture
337
CompletableFuture<Void> result = CompletableFuture.allOf(
338
writeFutures.stream()
339
.map(this::toCompletableFuture)
340
.toArray(CompletableFuture[]::new)
341
);
342
343
return result;
344
}
345
346
public void writeSequentially(IoSession session, List<Object> messages) {
347
writeSequentiallyRecursive(session, messages, 0);
348
}
349
350
private void writeSequentiallyRecursive(IoSession session, List<Object> messages, int index) {
351
if (index >= messages.size()) {
352
System.out.println("All messages sent sequentially");
353
return;
354
}
355
356
WriteFuture future = session.write(messages.get(index));
357
future.addListener(new IoFutureListener<WriteFuture>() {
358
@Override
359
public void operationComplete(WriteFuture future) {
360
if (future.isWritten()) {
361
// Write next message
362
writeSequentiallyRecursive(session, messages, index + 1);
363
} else {
364
System.err.println("Sequential write failed at index " + index);
365
}
366
}
367
});
368
}
369
370
public void writeBatched(IoSession session, List<Object> messages, int batchSize) {
371
List<List<Object>> batches = partitionList(messages, batchSize);
372
writeBatchesSequentially(session, batches, 0);
373
}
374
375
private void writeBatchesSequentially(IoSession session, List<List<Object>> batches, int batchIndex) {
376
if (batchIndex >= batches.size()) {
377
System.out.println("All batches sent");
378
return;
379
}
380
381
List<Object> currentBatch = batches.get(batchIndex);
382
List<WriteFuture> batchFutures = new ArrayList<>();
383
384
// Send all messages in current batch
385
for (Object message : currentBatch) {
386
batchFutures.add(session.write(message));
387
}
388
389
// Wait for batch completion before sending next batch
390
CompletableFuture.allOf(
391
batchFutures.stream()
392
.map(this::toCompletableFuture)
393
.toArray(CompletableFuture[]::new)
394
).thenRun(() -> {
395
System.out.println("Batch " + batchIndex + " completed");
396
writeBatchesSequentially(session, batches, batchIndex + 1);
397
});
398
}
399
}
400
```
401
402
## Asynchronous Read Operations
403
404
### Enabling and Using Read Operations
405
406
```java { .api }
407
public class AsyncReadExample {
408
409
public void enableAsyncReads(IoSession session) {
410
// Enable read operations (disabled by default)
411
session.getConfig().setUseReadOperation(true);
412
}
413
414
public void readAsync(IoSession session) {
415
ReadFuture future = session.read();
416
417
future.addListener(new IoFutureListener<ReadFuture>() {
418
@Override
419
public void operationComplete(ReadFuture future) {
420
if (future.isRead()) {
421
Object message = future.getMessage();
422
System.out.println("Read message: " + message);
423
424
// Continue reading
425
readAsync(session);
426
} else if (future.isClosed()) {
427
System.out.println("Session closed during read");
428
} else {
429
System.err.println("Read failed: " + future.getException());
430
}
431
}
432
});
433
}
434
435
public Object readSync(IoSession session, long timeoutMillis) {
436
ReadFuture future = session.read();
437
438
boolean completed = future.awaitUninterruptibly(timeoutMillis);
439
440
if (completed && future.isRead()) {
441
return future.getMessage();
442
} else if (future.isClosed()) {
443
throw new RuntimeException("Session closed during read");
444
} else {
445
throw new RuntimeException("Read timeout or failed");
446
}
447
}
448
}
449
```
450
451
### Request-Response Pattern
452
453
```java { .api }
454
public class RequestResponsePattern {
455
456
public CompletableFuture<Object> sendRequest(IoSession session, Object request) {
457
CompletableFuture<Object> responseFuture = new CompletableFuture<>();
458
459
// Send request
460
WriteFuture writeFuture = session.write(request);
461
462
writeFuture.addListener(new IoFutureListener<WriteFuture>() {
463
@Override
464
public void operationComplete(WriteFuture future) {
465
if (future.isWritten()) {
466
// Request sent, now wait for response
467
ReadFuture readFuture = session.read();
468
readFuture.addListener(new IoFutureListener<ReadFuture>() {
469
@Override
470
public void operationComplete(ReadFuture readFuture) {
471
if (readFuture.isRead()) {
472
responseFuture.complete(readFuture.getMessage());
473
} else {
474
responseFuture.completeExceptionally(
475
readFuture.getException() != null ?
476
readFuture.getException() :
477
new RuntimeException("Read failed")
478
);
479
}
480
}
481
});
482
} else {
483
responseFuture.completeExceptionally(
484
future.getException() != null ?
485
future.getException() :
486
new RuntimeException("Write failed")
487
);
488
}
489
}
490
});
491
492
return responseFuture;
493
}
494
495
public CompletableFuture<List<Object>> sendMultipleRequests(IoSession session, List<Object> requests) {
496
List<CompletableFuture<Object>> requestFutures = new ArrayList<>();
497
498
for (Object request : requests) {
499
requestFutures.add(sendRequest(session, request));
500
}
501
502
return CompletableFuture.allOf(requestFutures.toArray(new CompletableFuture[0]))
503
.thenApply(v -> requestFutures.stream()
504
.map(CompletableFuture::join)
505
.collect(Collectors.toList())
506
);
507
}
508
}
509
```
510
511
## Session Close Handling
512
513
### Async Session Closure
514
515
```java { .api }
516
public class AsyncCloseExample {
517
518
public void closeGracefully(IoSession session) {
519
// Close after pending writes complete
520
CloseFuture future = session.closeOnFlush();
521
522
future.addListener(new IoFutureListener<CloseFuture>() {
523
@Override
524
public void operationComplete(CloseFuture future) {
525
System.out.println("Session closed gracefully: " + session.getId());
526
cleanupSessionResources(session);
527
}
528
});
529
}
530
531
public void closeImmediately(IoSession session) {
532
// Close immediately, discarding pending writes
533
CloseFuture future = session.closeNow();
534
535
future.addListener(new IoFutureListener<CloseFuture>() {
536
@Override
537
public void operationComplete(CloseFuture future) {
538
System.out.println("Session closed immediately: " + session.getId());
539
cleanupSessionResources(session);
540
}
541
});
542
}
543
544
public CompletableFuture<Void> closeMultipleSessions(Collection<IoSession> sessions) {
545
List<CompletableFuture<Void>> closeFutures = new ArrayList<>();
546
547
for (IoSession session : sessions) {
548
CloseFuture closeFuture = session.closeOnFlush();
549
closeFutures.add(toCompletableFuture(closeFuture));
550
}
551
552
return CompletableFuture.allOf(closeFutures.toArray(new CompletableFuture[0]));
553
}
554
555
public void waitForClose(IoSession session, long timeoutMillis) {
556
CloseFuture closeFuture = session.getCloseFuture();
557
558
boolean closed = closeFuture.awaitUninterruptibly(timeoutMillis);
559
560
if (closed) {
561
System.out.println("Session closed within timeout");
562
} else {
563
System.out.println("Session close timed out");
564
}
565
}
566
}
567
```
568
569
## Future Composition and Chaining
570
571
### Converting MINA Futures to CompletableFuture
572
573
```java { .api }
574
public class FutureComposition {
575
576
public <T extends IoFuture> CompletableFuture<T> toCompletableFuture(T minaFuture) {
577
CompletableFuture<T> completableFuture = new CompletableFuture<>();
578
579
minaFuture.addListener(new IoFutureListener<T>() {
580
@Override
581
public void operationComplete(T future) {
582
completableFuture.complete(future);
583
}
584
});
585
586
return completableFuture;
587
}
588
589
public CompletableFuture<IoSession> connectThenAuthenticate(String host, int port, String username, String password) {
590
NioSocketConnector connector = new NioSocketConnector();
591
592
return toCompletableFuture(connector.connect(new InetSocketAddress(host, port)))
593
.thenCompose(connectFuture -> {
594
if (connectFuture.isConnected()) {
595
IoSession session = connectFuture.getSession();
596
597
// Send authentication
598
AuthRequest authReq = new AuthRequest(username, password);
599
return toCompletableFuture(session.write(authReq))
600
.thenCompose(writeFuture -> {
601
if (writeFuture.isWritten()) {
602
// Wait for auth response
603
return toCompletableFuture(session.read())
604
.thenApply(readFuture -> {
605
if (readFuture.isRead()) {
606
AuthResponse response = (AuthResponse) readFuture.getMessage();
607
if (response.isSuccess()) {
608
return session;
609
} else {
610
session.closeNow();
611
throw new RuntimeException("Authentication failed");
612
}
613
} else {
614
session.closeNow();
615
throw new RuntimeException("Failed to read auth response");
616
}
617
});
618
} else {
619
session.closeNow();
620
throw new RuntimeException("Failed to send auth request");
621
}
622
});
623
} else {
624
throw new RuntimeException("Connection failed: " + connectFuture.getException());
625
}
626
});
627
}
628
629
public CompletableFuture<String> performComplexOperation(IoSession session) {
630
return sendCommand(session, "INIT")
631
.thenCompose(initResponse -> sendCommand(session, "SETUP " + initResponse))
632
.thenCompose(setupResponse -> sendCommand(session, "EXECUTE"))
633
.thenCompose(executeResponse -> sendCommand(session, "FINALIZE"))
634
.thenApply(finalizeResponse -> "Operation completed: " + finalizeResponse);
635
}
636
637
private CompletableFuture<String> sendCommand(IoSession session, String command) {
638
return toCompletableFuture(session.write(command))
639
.thenCompose(writeFuture -> {
640
if (writeFuture.isWritten()) {
641
return toCompletableFuture(session.read())
642
.thenApply(readFuture -> {
643
if (readFuture.isRead()) {
644
return readFuture.getMessage().toString();
645
} else {
646
throw new RuntimeException("Failed to read response for: " + command);
647
}
648
});
649
} else {
650
throw new RuntimeException("Failed to send command: " + command);
651
}
652
});
653
}
654
}
655
```
656
657
## Error Handling in Async Operations
658
659
### Comprehensive Error Handling
660
661
```java { .api }
662
public class AsyncErrorHandling {
663
664
public void robustAsyncWrite(IoSession session, Object message, int maxRetries) {
665
writeWithRetry(session, message, maxRetries, 0);
666
}
667
668
private void writeWithRetry(IoSession session, Object message, int maxRetries, int attempt) {
669
WriteFuture future = session.write(message);
670
671
future.addListener(new IoFutureListener<WriteFuture>() {
672
@Override
673
public void operationComplete(WriteFuture future) {
674
if (future.isWritten()) {
675
System.out.println("Message sent successfully on attempt " + (attempt + 1));
676
} else {
677
Throwable cause = future.getException();
678
679
if (attempt < maxRetries && isRetryableError(cause)) {
680
System.out.println("Retrying write attempt " + (attempt + 1) + "/" + maxRetries);
681
682
// Exponential backoff
683
int delay = (int) Math.pow(2, attempt) * 1000;
684
685
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
686
scheduler.schedule(() -> {
687
writeWithRetry(session, message, maxRetries, attempt + 1);
688
scheduler.shutdown();
689
}, delay, TimeUnit.MILLISECONDS);
690
} else {
691
System.err.println("Write failed after " + (attempt + 1) + " attempts: " + cause);
692
handleFinalWriteFailure(session, message, cause);
693
}
694
}
695
}
696
});
697
}
698
699
private boolean isRetryableError(Throwable cause) {
700
return cause instanceof WriteTimeoutException ||
701
(cause instanceof IOException && !(cause instanceof WriteToClosedSessionException));
702
}
703
704
public CompletableFuture<Object> robustRequestResponse(IoSession session, Object request, long timeoutMs) {
705
CompletableFuture<Object> result = new CompletableFuture<>();
706
707
// Set up timeout
708
ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
709
ScheduledFuture<?> timeoutTask = timeoutExecutor.schedule(() -> {
710
result.completeExceptionally(new TimeoutException("Request timed out after " + timeoutMs + "ms"));
711
}, timeoutMs, TimeUnit.MILLISECONDS);
712
713
// Send request
714
WriteFuture writeFuture = session.write(request);
715
716
writeFuture.addListener(new IoFutureListener<WriteFuture>() {
717
@Override
718
public void operationComplete(WriteFuture future) {
719
if (future.isWritten()) {
720
// Request sent, wait for response
721
ReadFuture readFuture = session.read();
722
723
readFuture.addListener(new IoFutureListener<ReadFuture>() {
724
@Override
725
public void operationComplete(ReadFuture readFuture) {
726
if (!result.isDone()) { // Check if not already timed out
727
timeoutTask.cancel(false);
728
timeoutExecutor.shutdown();
729
730
if (readFuture.isRead()) {
731
result.complete(readFuture.getMessage());
732
} else if (readFuture.isClosed()) {
733
result.completeExceptionally(new IOException("Session closed during read"));
734
} else {
735
result.completeExceptionally(readFuture.getException());
736
}
737
}
738
}
739
});
740
} else {
741
if (!result.isDone()) {
742
timeoutTask.cancel(false);
743
timeoutExecutor.shutdown();
744
result.completeExceptionally(future.getException());
745
}
746
}
747
}
748
});
749
750
return result;
751
}
752
}
753
```
754
755
MINA Core's asynchronous programming model provides powerful tools for building scalable, non-blocking network applications. The Future-based approach allows for clean composition of async operations while maintaining high performance under load.