Apache MINA Core is a network application framework for building high-performance, scalable network applications with event-driven, asynchronous I/O over various transports including TCP/IP and UDP/IP via Java NIO.
—
MINA Core provides a comprehensive Future-based asynchronous programming model. All I/O operations return Future objects that allow you to handle operations asynchronously without blocking threads, enabling highly scalable network applications.
/**
 * Common contract for the asynchronous operation results listed in this document.
 * Declares access to the associated session, interruptible and uninterruptible waits,
 * a completion flag, and listener registration.
 */
public interface IoFuture {
// Associated session
IoSession getSession();
// Blocking wait methods; every variant declares InterruptedException
IoFuture await() throws InterruptedException;
// NOTE(review): the boolean presumably reports whether the operation finished within the wait - confirm against the MINA Javadoc
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
boolean await(long timeoutMillis) throws InterruptedException;
// Uninterruptible wait methods: same shapes as above, without the checked InterruptedException
IoFuture awaitUninterruptibly();
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
boolean awaitUninterruptibly(long timeoutMillis);
// Completion status
boolean isDone();
// Event listeners; both methods return an IoFuture so calls can be chained
IoFuture addListener(IoFutureListener<?> listener);
IoFuture removeListener(IoFutureListener<?> listener);
}Future for connection operations:
/**
 * Result of an asynchronous connect operation.
 * Extends IoFuture with the connection outcome (connected / canceled / exception),
 * a cancel operation, and listener/wait methods whose return type is narrowed to ConnectFuture.
 */
public interface ConnectFuture extends IoFuture {
// Connection result
IoSession getSession();
boolean isConnected();
boolean isCanceled();
// Connection control
// NOTE(review): the meaning of the boolean returned by cancel() is not shown here - confirm
boolean cancel();
// Exception handling
// The examples in this document null-check this value before using it
Throwable getException();
// Typed listener methods (return ConnectFuture instead of IoFuture)
ConnectFuture addListener(IoFutureListener<? extends ConnectFuture> listener);
ConnectFuture removeListener(IoFutureListener<? extends ConnectFuture> listener);
ConnectFuture await() throws InterruptedException;
ConnectFuture awaitUninterruptibly();
}Future for write operations:
/**
 * Result of an asynchronous write operation.
 * Extends IoFuture with a written flag, the failure cause, and listener/wait methods
 * whose return type is narrowed to WriteFuture.
 */
public interface WriteFuture extends IoFuture {
// Write result
boolean isWritten();
// Exception handling
// The examples in this document null-check this value before using it
Throwable getException();
// Typed listener methods (return WriteFuture instead of IoFuture)
WriteFuture addListener(IoFutureListener<? extends WriteFuture> listener);
WriteFuture removeListener(IoFutureListener<? extends WriteFuture> listener);
WriteFuture await() throws InterruptedException;
WriteFuture awaitUninterruptibly();
}Future for read operations:
/**
 * Result of an asynchronous read operation.
 * Extends IoFuture with the received message, read/closed flags, the failure cause,
 * and listener/wait methods whose return type is narrowed to ReadFuture.
 */
public interface ReadFuture extends IoFuture {
// Read result
// The examples in this document only call getMessage() after isRead() returned true
Object getMessage();
boolean isRead();
// The examples treat isClosed() as "the session closed before a message arrived"
boolean isClosed();
// Exception handling
Throwable getException();
// Typed listener methods (return ReadFuture instead of IoFuture)
ReadFuture addListener(IoFutureListener<? extends ReadFuture> listener);
ReadFuture removeListener(IoFutureListener<? extends ReadFuture> listener);
ReadFuture await() throws InterruptedException;
ReadFuture awaitUninterruptibly();
}Future for close operations:
/**
 * Result of an asynchronous close operation.
 * Extends IoFuture with a closed flag and listener/wait methods whose return type
 * is narrowed to CloseFuture. Unlike the other futures listed here it declares no getException().
 */
public interface CloseFuture extends IoFuture {
// Close result
boolean isClosed();
// Typed listener methods (return CloseFuture instead of IoFuture)
CloseFuture addListener(IoFutureListener<? extends CloseFuture> listener);
CloseFuture removeListener(IoFutureListener<? extends CloseFuture> listener);
CloseFuture await() throws InterruptedException;
CloseFuture awaitUninterruptibly();
}public class AsyncConnectionExample {
/**
 * Starts a non-blocking connection attempt to localhost:8080 and handles the
 * outcome in a completion listener, so the calling thread is free to do other work.
 * On success a greeting is written to the new session; on failure the error is
 * printed and the connector is disposed.
 */
public void connectAsync() {
    NioSocketConnector connector = new NioSocketConnector();
    connector.setHandler(new ClientHandler());

    // Kick off the connection; the call returns before the outcome is known
    InetSocketAddress target = new InetSocketAddress("localhost", 8080);
    ConnectFuture pending = connector.connect(target);

    pending.addListener(new IoFutureListener<ConnectFuture>() {
        @Override
        public void operationComplete(ConnectFuture completed) {
            if (!completed.isConnected()) {
                System.err.println("Connection failed: " + completed.getException());
                connector.dispose();
                return;
            }
            System.out.println("Connected successfully!");
            IoSession established = completed.getSession();
            established.write("Hello Server!");
        }
    });

    // The connection proceeds in the background while this runs
    performOtherWork();
}
/**
 * Connects to remote.example.com:8080 and blocks the caller for at most 10 seconds
 * while waiting for the outcome (the connector itself is configured with a 5 second
 * connect timeout).
 *
 * On success the remote address is printed and the connector is left running, because
 * it still owns the live session. On cancellation, failure or timeout the outcome is
 * reported, a still-pending attempt is cancelled, and the connector is disposed so its
 * resources are not leaked.
 */
public void connectWithTimeout() {
    NioSocketConnector connector = new NioSocketConnector();
    // NOTE(review): connectAsync installs a ClientHandler before connecting; no handler is
    // set here - confirm whether connect() accepts a connector without one.
    connector.setConnectTimeoutMillis(5000); // 5 second timeout
    ConnectFuture future = connector.connect(new InetSocketAddress("remote.example.com", 8080));

    // Wait with timeout
    boolean connected = future.awaitUninterruptibly(10000); // 10 second wait
    if (connected && future.isConnected()) {
        IoSession session = future.getSession();
        System.out.println("Connected to: " + session.getRemoteAddress());
        return;
    }

    if (future.isCanceled()) {
        System.out.println("Connection was canceled");
    } else {
        System.out.println("Connection timed out or failed");
        Throwable cause = future.getException();
        if (cause != null) {
            cause.printStackTrace();
        }
    }

    // Nothing usable came out of this attempt: stop it if it is still in flight
    // and release the connector instead of leaving it running.
    if (!connected) {
        future.cancel();
    }
    connector.dispose();
}
}public class AsyncConnectionPool {
// Connector used to open every pooled session
private final NioSocketConnector connector;
// Idle sessions that getSession() can hand out again
private final BlockingQueue<IoSession> availableSessions;
// Every session opened by the pool, idle or in use; its size is compared against maxConnections
private final Set<IoSession> allSessions;
// Remote endpoint all pooled sessions connect to
private final String host;
private final int port;
// Upper bound on the number of tracked sessions
private final int maxConnections;
/**
 * Creates an empty pool for the given endpoint.
 *
 * @param host           remote host every pooled session connects to
 * @param port           remote port every pooled session connects to
 * @param maxConnections maximum number of sessions the pool tracks at once
 */
public AsyncConnectionPool(String host, int port, int maxConnections) {
    // Session bookkeeping: idle queue plus a synchronized set of everything opened
    this.availableSessions = new LinkedBlockingQueue<>();
    this.allSessions = Collections.synchronizedSet(new HashSet<>());

    // Target endpoint and capacity
    this.host = host;
    this.port = port;
    this.maxConnections = maxConnections;

    // Connector shared by all pooled sessions
    this.connector = new NioSocketConnector();
    this.connector.setHandler(new PooledSessionHandler());
}
/**
 * Hands out a pooled session, opening a new connection when no idle one is usable.
 *
 * Idle sessions are polled until a connected one is found. Disconnected sessions found
 * on the way are dropped from the pool's bookkeeping so they stop counting against
 * {@code maxConnections}.
 *
 * @return a future completed with a connected session, or completed exceptionally when
 *         the pool is exhausted or the connection attempt fails
 */
public CompletableFuture<IoSession> getSession() {
    CompletableFuture<IoSession> result = new CompletableFuture<>();

    // Reuse an idle session if one is still connected; evict stale ones as we go
    IoSession idle;
    while ((idle = availableSessions.poll()) != null) {
        if (idle.isConnected()) {
            result.complete(idle);
            return result;
        }
        allSessions.remove(idle);
    }

    // NOTE(review): connections that are still being established are not counted here,
    // so concurrent callers can briefly exceed maxConnections; fixing that needs a
    // pending-connection counter at class level.
    if (allSessions.size() >= maxConnections) {
        result.completeExceptionally(new RuntimeException("Connection pool exhausted"));
        return result;
    }

    ConnectFuture connectFuture = connector.connect(new InetSocketAddress(host, port));
    connectFuture.addListener(new IoFutureListener<ConnectFuture>() {
        @Override
        public void operationComplete(ConnectFuture future) {
            if (future.isConnected()) {
                IoSession newSession = future.getSession();
                allSessions.add(newSession);
                result.complete(newSession);
                return;
            }
            // completeExceptionally rejects null, so substitute a cause when none is reported
            Throwable cause = future.getException();
            result.completeExceptionally(cause != null
                    ? cause
                    : new RuntimeException("Connection attempt did not complete and reported no cause"));
        }
    });
    return result;
}
/**
 * Gives a session back to the pool.
 * A session that is no longer connected is dropped from the pool's bookkeeping;
 * a connected one is queued for reuse.
 *
 * @param session the session previously obtained from this pool
 */
public void returnSession(IoSession session) {
    boolean reusable = session.isConnected();
    if (!reusable) {
        allSessions.remove(session);
        return;
    }
    availableSessions.offer(session);
}
/**
 * Closes every session tracked by the pool and then disposes the connector.
 *
 * @return a future that completes once all close operations have finished and the
 *         connector has been disposed; it completes exceptionally if waiting for the
 *         closes fails, but the connector is disposed in either case
 */
public CompletableFuture<Void> shutdown() {
    // A synchronizedSet must be locked manually while it is traversed; copy it under
    // the lock so sessions removed concurrently cannot break the iteration.
    List<IoSession> snapshot;
    synchronized (allSessions) {
        snapshot = new ArrayList<>(allSessions);
    }

    // Close all sessions
    List<CloseFuture> closeFutures = new ArrayList<>(snapshot.size());
    for (IoSession session : snapshot) {
        closeFutures.add(session.closeNow());
    }

    CompletableFuture<Void> shutdownFuture = new CompletableFuture<>();
    // allOf() over zero futures is already complete, so an empty pool needs no special case.
    CompletableFuture.allOf(closeFutures.stream()
            .map(this::toCompletableFuture)
            .toArray(CompletableFuture[]::new))
        .whenComplete((ignored, error) -> {
            // Always release the connector and always settle the returned future
            connector.dispose();
            if (error != null) {
                shutdownFuture.completeExceptionally(error);
            } else {
                shutdownFuture.complete(null);
            }
        });
    return shutdownFuture;
}
}public class AsyncWriteExample {
/**
 * Writes a message without blocking and reports the outcome from a listener.
 * A failed write is printed and forwarded to handleWriteFailure.
 *
 * @param session session to write to
 * @param message message to send
 */
public void writeAsync(IoSession session, Object message) {
    WriteFuture pending = session.write(message);
    pending.addListener(new IoFutureListener<WriteFuture>() {
        @Override
        public void operationComplete(WriteFuture completed) {
            if (!completed.isWritten()) {
                System.err.println("Write failed: " + completed.getException());
                // Handle write failure (retry, close session, etc.)
                handleWriteFailure(session, message, completed.getException());
                return;
            }
            System.out.println("Message sent successfully");
        }
    });
}
/**
 * Writes a message and blocks for up to 5 seconds until the write is confirmed.
 * Prints a confirmation on success; otherwise prints an error and, when available,
 * the stack trace of the failure cause.
 *
 * @param session session to write to
 * @param message text to send
 */
public void writeWithConfirmation(IoSession session, String message) {
    WriteFuture pending = session.write(message);

    // Wait for write completion with a 5 second timeout
    boolean finishedInTime = pending.awaitUninterruptibly(5000);
    if (finishedInTime && pending.isWritten()) {
        System.out.println("Message confirmed sent: " + message);
        return;
    }

    System.err.println("Write timeout or failed for message: " + message);
    if (pending.getException() != null) {
        pending.getException().printStackTrace();
    }
}
/**
 * Reports a failed write according to the type of its cause.
 *
 * @param session session the write was issued on (not used by the current body)
 * @param message message that failed to send (not used by the current body)
 * @param cause   failure reported by the write future
 */
private void handleWriteFailure(IoSession session, Object message, Throwable cause) {
    if (cause instanceof WriteTimeoutException) {
        // Consider reducing write frequency or closing session
        System.err.println("Write timeout - session may be slow");
        return;
    }
    if (cause instanceof WriteToClosedSessionException) {
        // Clean up and stop writing
        System.err.println("Attempted to write to closed session");
        return;
    }
    // Log error and potentially retry
    System.err.println("Unexpected write error: " + cause.getMessage());
}
}public class BulkAsyncWrites {
/**
 * Issues a write for every message and returns a future that completes when all of
 * the converted write futures have completed.
 *
 * @param session  session to write to
 * @param messages messages to send, in order
 * @return aggregate future over all writes
 */
public CompletableFuture<Void> writeMultipleMessages(IoSession session, List<Object> messages) {
    // Phase 1: initiate every write before any conversion happens
    List<WriteFuture> pendingWrites = new ArrayList<>();
    for (Object message : messages) {
        pendingWrites.add(session.write(message));
    }

    // Phase 2: bridge each MINA future to a CompletableFuture
    CompletableFuture<?>[] bridged = new CompletableFuture<?>[pendingWrites.size()];
    for (int i = 0; i < bridged.length; i++) {
        bridged[i] = toCompletableFuture(pendingWrites.get(i));
    }
    return CompletableFuture.allOf(bridged);
}
/**
 * Sends the messages one after another by delegating to writeSequentiallyRecursive,
 * starting at index 0.
 *
 * @param session  session to write to
 * @param messages messages to send, in order
 */
public void writeSequentially(IoSession session, List<Object> messages) {
writeSequentiallyRecursive(session, messages, 0);
}
/**
 * Writes the message at {@code index} and, once that write is confirmed, continues
 * with the next index from inside the completion listener. Stops at the first failed write.
 *
 * @param session  session to write to
 * @param messages full list of messages
 * @param index    position of the message to send in this step
 */
private void writeSequentiallyRecursive(IoSession session, List<Object> messages, int index) {
    boolean nothingLeft = index >= messages.size();
    if (nothingLeft) {
        System.out.println("All messages sent sequentially");
        return;
    }

    Object current = messages.get(index);
    WriteFuture pending = session.write(current);
    pending.addListener(new IoFutureListener<WriteFuture>() {
        @Override
        public void operationComplete(WriteFuture completed) {
            if (!completed.isWritten()) {
                System.err.println("Sequential write failed at index " + index);
                return;
            }
            // Confirmed: move on to the next message
            writeSequentiallyRecursive(session, messages, index + 1);
        }
    });
}
/**
 * Splits the messages into batches via partitionList and sends the batches one after
 * another, starting with the first.
 *
 * @param session   session to write to
 * @param messages  messages to send
 * @param batchSize batch size passed to partitionList
 */
public void writeBatched(IoSession session, List<Object> messages, int batchSize) {
    final int firstBatch = 0;
    writeBatchesSequentially(session, partitionList(messages, batchSize), firstBatch);
}
/**
 * Sends every message of the batch at {@code batchIndex}, then proceeds to the next
 * batch once all converted write futures of the current batch have completed.
 *
 * @param session    session to write to
 * @param batches    all batches
 * @param batchIndex index of the batch to send in this step
 */
private void writeBatchesSequentially(IoSession session, List<List<Object>> batches, int batchIndex) {
    if (batchIndex >= batches.size()) {
        System.out.println("All batches sent");
        return;
    }

    // Fire off every write of the current batch
    List<WriteFuture> inFlight = new ArrayList<>();
    for (Object message : batches.get(batchIndex)) {
        inFlight.add(session.write(message));
    }

    // Only start the next batch after this one has completed
    CompletableFuture<?>[] completions = inFlight.stream()
            .map(this::toCompletableFuture)
            .toArray(CompletableFuture[]::new);
    CompletableFuture<Void> batchDone = CompletableFuture.allOf(completions);
    batchDone.thenRun(() -> {
        System.out.println("Batch " + batchIndex + " completed");
        writeBatchesSequentially(session, batches, batchIndex + 1);
    });
}
}public class AsyncReadExample {
/**
 * Turns on read operations for the given session by setting the useReadOperation
 * flag on its configuration.
 *
 * @param session session whose configuration is changed
 */
public void enableAsyncReads(IoSession session) {
// Enable read operations (disabled by default)
session.getConfig().setUseReadOperation(true);
}
/**
 * Requests the next message from the session and handles it in a listener.
 * After each successful read another read is requested, so the loop continues until
 * the session closes or a read fails.
 *
 * @param session session to read from
 */
public void readAsync(IoSession session) {
    session.read().addListener(new IoFutureListener<ReadFuture>() {
        @Override
        public void operationComplete(ReadFuture completed) {
            if (completed.isRead()) {
                Object received = completed.getMessage();
                System.out.println("Read message: " + received);
                // Keep the read loop going
                readAsync(session);
                return;
            }
            if (completed.isClosed()) {
                System.out.println("Session closed during read");
                return;
            }
            System.err.println("Read failed: " + completed.getException());
        }
    });
}
/**
 * Requests the next message and blocks until it arrives or the timeout elapses.
 *
 * @param session       session to read from
 * @param timeoutMillis maximum time to wait, in milliseconds
 * @return the received message
 * @throws RuntimeException if the session closed during the read, or if the read timed
 *                          out or failed; for a failure the exception reported by the
 *                          read future is attached as the cause
 */
public Object readSync(IoSession session, long timeoutMillis) {
    ReadFuture future = session.read();
    boolean completed = future.awaitUninterruptibly(timeoutMillis);
    if (completed && future.isRead()) {
        return future.getMessage();
    }
    if (future.isClosed()) {
        throw new RuntimeException("Session closed during read");
    }
    // Keep the underlying failure (null when none is reported) so callers can diagnose it.
    throw new RuntimeException("Read timeout or failed", future.getException());
}
}public class RequestResponsePattern {
/**
 * Writes a request and, once the write is confirmed, reads one message as the response.
 *
 * @param session session used for the exchange
 * @param request request message to send
 * @return future completed with the response message, or completed exceptionally with
 *         the reported cause (or a generic "Write failed" / "Read failed" error when no
 *         cause is reported)
 */
public CompletableFuture<Object> sendRequest(IoSession session, Object request) {
    CompletableFuture<Object> responseFuture = new CompletableFuture<>();

    // Step 2 (registered later): deliver the response or the read failure
    IoFutureListener<ReadFuture> onResponse = new IoFutureListener<ReadFuture>() {
        @Override
        public void operationComplete(ReadFuture readFuture) {
            if (readFuture.isRead()) {
                responseFuture.complete(readFuture.getMessage());
                return;
            }
            Throwable readFailure = readFuture.getException();
            if (readFailure == null) {
                readFailure = new RuntimeException("Read failed");
            }
            responseFuture.completeExceptionally(readFailure);
        }
    };

    // Step 1: send the request; only start reading after the write is confirmed
    session.write(request).addListener(new IoFutureListener<WriteFuture>() {
        @Override
        public void operationComplete(WriteFuture sent) {
            if (sent.isWritten()) {
                session.read().addListener(onResponse);
                return;
            }
            Throwable writeFailure = sent.getException();
            if (writeFailure == null) {
                writeFailure = new RuntimeException("Write failed");
            }
            responseFuture.completeExceptionally(writeFailure);
        }
    });
    return responseFuture;
}
/**
 * Sends every request through sendRequest and gathers the responses.
 *
 * @param session  session used for all exchanges
 * @param requests requests to send
 * @return future completed with the responses in request order once every exchange
 *         has completed
 */
public CompletableFuture<List<Object>> sendMultipleRequests(IoSession session, List<Object> requests) {
    List<CompletableFuture<Object>> inFlight = new ArrayList<>();
    requests.forEach(request -> inFlight.add(sendRequest(session, request)));

    CompletableFuture<Void> allDone =
            CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0]));
    // Every future is already complete when this runs, so join() does not block
    return allDone.thenApply(ignored -> inFlight.stream()
            .map(exchange -> exchange.join())
            .collect(Collectors.toList()));
}
}public class AsyncCloseExample {
/**
 * Requests a close via closeOnFlush() and cleans up the session's resources from the
 * completion listener.
 *
 * @param session session to close
 */
public void closeGracefully(IoSession session) {
    // Close after pending writes complete
    session.closeOnFlush().addListener(new IoFutureListener<CloseFuture>() {
        @Override
        public void operationComplete(CloseFuture closed) {
            String report = "Session closed gracefully: " + session.getId();
            System.out.println(report);
            cleanupSessionResources(session);
        }
    });
}
/**
 * Requests a close via closeNow() and cleans up the session's resources from the
 * completion listener.
 *
 * @param session session to close
 */
public void closeImmediately(IoSession session) {
    // Close immediately, discarding pending writes
    session.closeNow().addListener(new IoFutureListener<CloseFuture>() {
        @Override
        public void operationComplete(CloseFuture closed) {
            String report = "Session closed immediately: " + session.getId();
            System.out.println(report);
            cleanupSessionResources(session);
        }
    });
}
/**
 * Requests closeOnFlush() on every session and returns a future that completes when
 * all of the converted close futures have completed.
 *
 * @param sessions sessions to close
 * @return aggregate future over all close operations
 */
public CompletableFuture<Void> closeMultipleSessions(Collection<IoSession> sessions) {
    // The element type is a wildcard: the bridge shown in this document yields
    // CompletableFuture<CloseFuture>, which a List<CompletableFuture<Void>> cannot hold.
    List<CompletableFuture<?>> closeFutures = new ArrayList<>(sessions.size());
    for (IoSession session : sessions) {
        CloseFuture closeFuture = session.closeOnFlush();
        closeFutures.add(toCompletableFuture(closeFuture));
    }
    return CompletableFuture.allOf(closeFutures.toArray(new CompletableFuture<?>[0]));
}
/**
 * Blocks for at most {@code timeoutMillis} on the session's close future and prints
 * whether the wait finished in time.
 *
 * @param session       session to observe
 * @param timeoutMillis maximum time to wait, in milliseconds
 */
public void waitForClose(IoSession session, long timeoutMillis) {
    boolean closedInTime = session.getCloseFuture().awaitUninterruptibly(timeoutMillis);
    String outcome = closedInTime
            ? "Session closed within timeout"
            : "Session close timed out";
    System.out.println(outcome);
}
}public class FutureComposition {
/**
 * Bridges a MINA future to a CompletableFuture.
 * The returned future is completed with the MINA future itself when the listener fires,
 * whatever the outcome, so callers inspect the delivered future to learn the result.
 *
 * @param minaFuture MINA future to observe
 * @param <T>        concrete future type
 * @return CompletableFuture completed with {@code minaFuture}
 */
public <T extends IoFuture> CompletableFuture<T> toCompletableFuture(T minaFuture) {
    final CompletableFuture<T> bridge = new CompletableFuture<>();
    IoFutureListener<T> relay = new IoFutureListener<T>() {
        @Override
        public void operationComplete(T finished) {
            bridge.complete(finished);
        }
    };
    minaFuture.addListener(relay);
    return bridge;
}
/**
 * Connects to the given endpoint, sends an AuthRequest and waits for the AuthResponse.
 *
 * The connector created here is disposed whenever the chain fails (connection failure,
 * write failure, read failure or rejected credentials), so a failed attempt does not
 * leak it. On success the connector stays alive because it owns the returned session.
 *
 * @param host     remote host
 * @param port     remote port
 * @param username user name placed in the AuthRequest
 * @param password password placed in the AuthRequest
 * @return future completed with the authenticated session, or completed exceptionally
 *         with a RuntimeException describing the failed step (carrying the reported
 *         cause where one exists)
 */
public CompletableFuture<IoSession> connectThenAuthenticate(String host, int port, String username, String password) {
    NioSocketConnector connector = new NioSocketConnector();
    // session.read() is used below; read operations are disabled by default and must be
    // enabled on the configuration the new session is created from.
    // NOTE(review): no handler is installed on this connector - confirm that MINA accepts
    // that when read operations are enabled.
    connector.getSessionConfig().setUseReadOperation(true);

    CompletableFuture<IoSession> authenticated =
        toCompletableFuture(connector.connect(new InetSocketAddress(host, port)))
            .thenCompose(connectFuture -> {
                if (!connectFuture.isConnected()) {
                    Throwable cause = connectFuture.getException();
                    throw new RuntimeException("Connection failed: " + cause, cause);
                }
                IoSession session = connectFuture.getSession();
                // Send authentication
                AuthRequest authReq = new AuthRequest(username, password);
                return toCompletableFuture(session.write(authReq))
                    .thenCompose(writeFuture -> {
                        if (!writeFuture.isWritten()) {
                            session.closeNow();
                            throw new RuntimeException("Failed to send auth request", writeFuture.getException());
                        }
                        // Wait for auth response
                        return toCompletableFuture(session.read())
                            .thenApply(readFuture -> {
                                if (!readFuture.isRead()) {
                                    session.closeNow();
                                    throw new RuntimeException("Failed to read auth response", readFuture.getException());
                                }
                                AuthResponse response = (AuthResponse) readFuture.getMessage();
                                if (!response.isSuccess()) {
                                    session.closeNow();
                                    throw new RuntimeException("Authentication failed");
                                }
                                return session;
                            });
                    });
            });

    // Release the connector when no usable session came out of the chain.
    authenticated.whenComplete((session, error) -> {
        if (error != null) {
            connector.dispose();
        }
    });
    return authenticated;
}
/**
 * Runs the INIT, SETUP, EXECUTE and FINALIZE commands strictly one after another,
 * feeding the INIT response into the SETUP command.
 *
 * @param session session used for every command
 * @return future completed with a summary string built from the FINALIZE response
 */
public CompletableFuture<String> performComplexOperation(IoSession session) {
    CompletableFuture<String> initStage = sendCommand(session, "INIT");
    CompletableFuture<String> setupStage =
            initStage.thenCompose(initResponse -> sendCommand(session, "SETUP " + initResponse));
    CompletableFuture<String> executeStage =
            setupStage.thenCompose(setupResponse -> sendCommand(session, "EXECUTE"));
    CompletableFuture<String> finalizeStage =
            executeStage.thenCompose(executeResponse -> sendCommand(session, "FINALIZE"));
    return finalizeStage.thenApply(finalizeResponse -> "Operation completed: " + finalizeResponse);
}
/**
 * Writes a command and, once the write is confirmed, reads one message and returns
 * its string form.
 *
 * @param session session used for the exchange
 * @param command command text to send
 * @return future completed with the response text; it fails with a RuntimeException
 *         when the command cannot be sent or the response cannot be read
 */
private CompletableFuture<String> sendCommand(IoSession session, String command) {
    return toCompletableFuture(session.write(command))
        .thenCompose(writeFuture -> {
            if (!writeFuture.isWritten()) {
                throw new RuntimeException("Failed to send command: " + command);
            }
            return toCompletableFuture(session.read())
                .thenApply(readFuture -> {
                    if (!readFuture.isRead()) {
                        throw new RuntimeException("Failed to read response for: " + command);
                    }
                    Object reply = readFuture.getMessage();
                    return reply.toString();
                });
        });
}
}public class AsyncErrorHandling {
/**
 * Writes a message with retries by delegating to writeWithRetry, starting at attempt 0.
 *
 * @param session    session to write to
 * @param message    message to send
 * @param maxRetries number of retries allowed after the first attempt
 */
public void robustAsyncWrite(IoSession session, Object message, int maxRetries) {
writeWithRetry(session, message, maxRetries, 0);
}
/**
 * Writes a message and retries with exponential backoff when the failure is retryable.
 *
 * The delay before retry {@code attempt + 1} is {@code 2^attempt} seconds. It is computed
 * in {@code long} with a bounded shift, so large attempt numbers can no longer overflow
 * into a negative (immediate) delay.
 *
 * @param session    session to write to
 * @param message    message to send
 * @param maxRetries number of retries allowed after the first attempt
 * @param attempt    zero-based number of the current attempt
 */
private void writeWithRetry(IoSession session, Object message, int maxRetries, int attempt) {
    WriteFuture future = session.write(message);
    future.addListener(new IoFutureListener<WriteFuture>() {
        @Override
        public void operationComplete(WriteFuture future) {
            if (future.isWritten()) {
                System.out.println("Message sent successfully on attempt " + (attempt + 1));
                return;
            }

            Throwable cause = future.getException();
            if (attempt >= maxRetries || !isRetryableError(cause)) {
                System.err.println("Write failed after " + (attempt + 1) + " attempts: " + cause);
                handleFinalWriteFailure(session, message, cause);
                return;
            }

            System.out.println("Retrying write attempt " + (attempt + 1) + "/" + maxRetries);
            // Exponential backoff: 1s, 2s, 4s, ... The shift is capped to keep the value positive.
            long delayMillis = 1000L << Math.min(Math.max(attempt, 0), 30);
            ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
            scheduler.schedule(
                    () -> writeWithRetry(session, message, maxRetries, attempt + 1),
                    delayMillis, TimeUnit.MILLISECONDS);
            // Shut down right away: an already scheduled one-shot task still runs by
            // default, and the worker thread is released afterwards even if the retry throws.
            scheduler.shutdown();
        }
    });
}
/**
 * Decides whether a failed write is worth retrying.
 * Write timeouts are retryable, writes to a closed session are not, and any other
 * IOException is retryable.
 *
 * @param cause failure reported by the write future (may be null)
 * @return true if the write should be retried
 */
private boolean isRetryableError(Throwable cause) {
    if (cause instanceof WriteTimeoutException) {
        return true;
    }
    if (cause instanceof WriteToClosedSessionException) {
        return false;
    }
    return cause instanceof IOException;
}
/**
 * Sends a request and waits for one response, failing the returned future with a
 * TimeoutException when no outcome is available within {@code timeoutMs}.
 *
 * The timeout task and its executor are released as soon as the returned future
 * completes, whichever way that happens: response, failure, timeout, or completion
 * or cancellation by the caller.
 *
 * @param session   session used for the exchange
 * @param request   request message to send
 * @param timeoutMs overall timeout in milliseconds
 * @return future completed with the response message, or completed exceptionally with
 *         the failure or a TimeoutException
 */
public CompletableFuture<Object> robustRequestResponse(IoSession session, Object request, long timeoutMs) {
    CompletableFuture<Object> result = new CompletableFuture<>();

    // Set up timeout
    ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
    ScheduledFuture<?> timeoutTask = timeoutExecutor.schedule(() -> {
        result.completeExceptionally(new TimeoutException("Request timed out after " + timeoutMs + "ms"));
    }, timeoutMs, TimeUnit.MILLISECONDS);

    // Single cleanup point: runs on every completion path, including the timeout itself,
    // so the executor thread is never left behind.
    result.whenComplete((ignoredValue, ignoredError) -> {
        timeoutTask.cancel(false);
        timeoutExecutor.shutdown();
    });

    try {
        // Send request
        WriteFuture writeFuture = session.write(request);
        writeFuture.addListener(new IoFutureListener<WriteFuture>() {
            @Override
            public void operationComplete(WriteFuture future) {
                if (!future.isWritten()) {
                    Throwable cause = future.getException();
                    // completeExceptionally rejects null, so substitute a cause when none is reported
                    result.completeExceptionally(cause != null ? cause : new IOException("Write failed"));
                    return;
                }
                // Request sent, wait for response
                ReadFuture readFuture = session.read();
                readFuture.addListener(new IoFutureListener<ReadFuture>() {
                    @Override
                    public void operationComplete(ReadFuture readFuture) {
                        // complete*/completeExceptionally are no-ops once the timeout has won
                        if (readFuture.isRead()) {
                            result.complete(readFuture.getMessage());
                        } else if (readFuture.isClosed()) {
                            result.completeExceptionally(new IOException("Session closed during read"));
                        } else {
                            Throwable cause = readFuture.getException();
                            result.completeExceptionally(cause != null ? cause : new IOException("Read failed"));
                        }
                    }
                });
            }
        });
    } catch (RuntimeException e) {
        // Still propagate the failure to the caller, but settle the future first so the
        // timeout executor is released.
        result.completeExceptionally(e);
        throw e;
    }
    return result;
}
}

MINA Core's asynchronous programming model provides powerful tools for building scalable, non-blocking network applications. The Future-based approach allows for clean composition of async operations while maintaining high performance under load.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-mina--mina-core