CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-mina--mina-core

Apache MINA Core is a network application framework for building high-performance, scalable network applications with event-driven, asynchronous I/O over various transports including TCP/IP and UDP/IP via Java NIO.

Pending
Overview
Eval results
Files

docs/async-operations.md

Async Operations

MINA Core provides a Future-based asynchronous programming model. Connect, write, read, and close operations return IoFuture objects, so you can either register a listener that runs on completion or wait with a timeout. This lets an application keep threads free while I/O is in flight, which is what makes highly scalable network applications possible.

IoFuture Hierarchy

Base IoFuture Interface

/**
 * Base contract shared by every asynchronous I/O result in this document.
 * The specialised futures (connect, write, read, close) all extend it.
 */
public interface IoFuture {
    /** Returns the session associated with this future. */
    IoSession getSession();

    /** Reports the completion status of the operation. */
    boolean isDone();

    // Listener registration; both methods return an IoFuture so calls can be chained.
    IoFuture addListener(IoFutureListener<?> listener);
    IoFuture removeListener(IoFutureListener<?> listener);

    // Blocking waits that declare InterruptedException.
    IoFuture await() throws InterruptedException;
    boolean await(long timeoutMillis) throws InterruptedException;
    boolean await(long timeout, TimeUnit unit) throws InterruptedException;

    // Blocking waits that do not declare InterruptedException.
    IoFuture awaitUninterruptibly();
    boolean awaitUninterruptibly(long timeoutMillis);
    boolean awaitUninterruptibly(long timeout, TimeUnit unit);
}

Specialized Future Types

ConnectFuture

Future for connection operations:

/**
 * Future returned by connection attempts.
 *
 * <p>The listener methods take {@code IoFutureListener<?>}, exactly like the
 * base {@link IoFuture} methods they override; only the return type is
 * narrowed to {@code ConnectFuture} so calls can be chained without a cast.
 * (A differently parameterised listener type would share the same erasure
 * as the base method without overriding it, which does not compile.)
 */
public interface ConnectFuture extends IoFuture {
    // Connection result
    IoSession getSession();
    boolean isConnected();
    boolean isCanceled();

    // Connection control
    boolean cancel();

    // Exception handling
    Throwable getException();

    // Covariant overrides of the IoFuture methods
    ConnectFuture addListener(IoFutureListener<?> listener);
    ConnectFuture removeListener(IoFutureListener<?> listener);
    ConnectFuture await() throws InterruptedException;
    ConnectFuture awaitUninterruptibly();
}

WriteFuture

Future for write operations:

/**
 * Future returned by write operations.
 *
 * <p>The listener methods take {@code IoFutureListener<?>}, matching the base
 * {@link IoFuture} methods they override; only the return type is narrowed to
 * {@code WriteFuture}. (A differently parameterised listener type would clash
 * with the base method's erasure instead of overriding it.)
 */
public interface WriteFuture extends IoFuture {
    // Write result
    boolean isWritten();

    // Exception handling
    Throwable getException();

    // Covariant overrides of the IoFuture methods
    WriteFuture addListener(IoFutureListener<?> listener);
    WriteFuture removeListener(IoFutureListener<?> listener);
    WriteFuture await() throws InterruptedException;
    WriteFuture awaitUninterruptibly();
}

ReadFuture

Future for read operations:

/**
 * Future returned by read operations.
 *
 * <p>The listener methods take {@code IoFutureListener<?>}, matching the base
 * {@link IoFuture} methods they override; only the return type is narrowed to
 * {@code ReadFuture}. (A differently parameterised listener type would clash
 * with the base method's erasure instead of overriding it.)
 */
public interface ReadFuture extends IoFuture {
    // Read result
    Object getMessage();
    boolean isRead();
    boolean isClosed();

    // Exception handling
    Throwable getException();

    // Covariant overrides of the IoFuture methods
    ReadFuture addListener(IoFutureListener<?> listener);
    ReadFuture removeListener(IoFutureListener<?> listener);
    ReadFuture await() throws InterruptedException;
    ReadFuture awaitUninterruptibly();
}

CloseFuture

Future for close operations:

/**
 * Future returned by close operations.
 *
 * <p>The listener methods take {@code IoFutureListener<?>}, matching the base
 * {@link IoFuture} methods they override; only the return type is narrowed to
 * {@code CloseFuture}. (A differently parameterised listener type would clash
 * with the base method's erasure instead of overriding it.)
 */
public interface CloseFuture extends IoFuture {
    // Close result
    boolean isClosed();

    // Covariant overrides of the IoFuture methods
    CloseFuture addListener(IoFutureListener<?> listener);
    CloseFuture removeListener(IoFutureListener<?> listener);
    CloseFuture await() throws InterruptedException;
    CloseFuture awaitUninterruptibly();
}

Asynchronous Connection Handling

Basic Async Connection

/**
 * Demonstrates connecting with a completion listener and connecting with a
 * bounded blocking wait.
 */
public class AsyncConnectionExample {

    /**
     * Starts a connection and reacts to its outcome in a listener, leaving the
     * calling thread free to do other work in the meantime.
     */
    public void connectAsync() {
        NioSocketConnector connector = new NioSocketConnector();
        connector.setHandler(new ClientHandler());

        // Initiate async connection
        ConnectFuture future = connector.connect(new InetSocketAddress("localhost", 8080));

        // Add completion listener
        future.addListener(new IoFutureListener<ConnectFuture>() {
            @Override
            public void operationComplete(ConnectFuture future) {
                if (future.isConnected()) {
                    System.out.println("Connected successfully!");
                    IoSession session = future.getSession();
                    session.write("Hello Server!");
                } else {
                    System.err.println("Connection failed: " + future.getException());
                    // No session was created, so the connector has nothing left to serve.
                    connector.dispose();
                }
            }
        });

        // Continue with other work while connection proceeds asynchronously
        performOtherWork();
    }

    /**
     * Connects and waits up to ten seconds for the attempt to finish.
     *
     * <p>A handler is installed before connecting because MINA services
     * require one before they can be used. The connector is disposed on every
     * path that does not yield a live session, so a failed, canceled, or
     * timed-out attempt does not leave the connector's resources behind.
     */
    public void connectWithTimeout() {
        NioSocketConnector connector = new NioSocketConnector();
        connector.setHandler(new ClientHandler());
        connector.setConnectTimeoutMillis(5000); // 5 second timeout

        ConnectFuture future = connector.connect(new InetSocketAddress("remote.example.com", 8080));

        // Wait with timeout
        boolean completed = future.awaitUninterruptibly(10000); // 10 second wait

        if (completed && future.isConnected()) {
            IoSession session = future.getSession();
            System.out.println("Connected to: " + session.getRemoteAddress());
            return; // the connector must stay alive while the session is in use
        }

        if (future.isCanceled()) {
            System.out.println("Connection was canceled");
        } else {
            System.out.println("Connection timed out or failed");
            Throwable cause = future.getException();
            if (cause != null) {
                cause.printStackTrace();
            }
        }

        // No usable session: release the connector instead of leaking it.
        connector.dispose();
    }
}

Connection Pooling with Futures

/**
 * A small session pool whose operations report their outcome through
 * {@link CompletableFuture}.
 *
 * <p>Capacity accounting covers both established sessions and connection
 * attempts that are still in flight, so concurrent callers cannot push the
 * pool past {@code maxConnections}.
 */
public class AsyncConnectionPool {
    private final NioSocketConnector connector;
    private final BlockingQueue<IoSession> availableSessions;
    private final Set<IoSession> allSessions;
    private final String host;
    private final int port;
    private final int maxConnections;

    // Connection attempts started but not yet finished. Guarded by the monitor of allSessions.
    private int pendingConnections;

    public AsyncConnectionPool(String host, int port, int maxConnections) {
        this.host = host;
        this.port = port;
        this.maxConnections = maxConnections;
        this.connector = new NioSocketConnector();
        this.availableSessions = new LinkedBlockingQueue<>();
        this.allSessions = Collections.synchronizedSet(new HashSet<>());

        connector.setHandler(new PooledSessionHandler());
    }

    /**
     * Hands out an idle connected session if one exists, otherwise opens a new
     * connection while the pool is below its limit.
     *
     * @return a future completed with a session, or completed exceptionally
     *         when the pool is exhausted or the connection attempt fails
     */
    public CompletableFuture<IoSession> getSession() {
        CompletableFuture<IoSession> result = new CompletableFuture<>();

        // Reuse an idle session, discarding stale ones so they stop counting against the limit.
        IoSession idle;
        while ((idle = availableSessions.poll()) != null) {
            if (idle.isConnected()) {
                result.complete(idle);
                return result;
            }
            allSessions.remove(idle);
        }

        // Reserve a slot atomically; in-flight connects count towards the limit.
        synchronized (allSessions) {
            if (allSessions.size() + pendingConnections >= maxConnections) {
                result.completeExceptionally(new RuntimeException("Connection pool exhausted"));
                return result;
            }
            pendingConnections++;
        }

        ConnectFuture connectFuture;
        try {
            connectFuture = connector.connect(new InetSocketAddress(host, port));
        } catch (RuntimeException e) {
            // connect() itself failed (for example after the connector was disposed): free the slot.
            finishPendingConnection(null);
            result.completeExceptionally(e);
            return result;
        }

        connectFuture.addListener(new IoFutureListener<ConnectFuture>() {
            @Override
            public void operationComplete(ConnectFuture future) {
                if (future.isConnected()) {
                    IoSession newSession = future.getSession();
                    finishPendingConnection(newSession);
                    result.complete(newSession);
                } else {
                    finishPendingConnection(null);
                    Throwable cause = future.getException();
                    // A canceled attempt carries no exception; completeExceptionally(null) would throw.
                    result.completeExceptionally(cause != null
                        ? cause
                        : new RuntimeException("Connection failed or was canceled"));
                }
            }
        });

        return result;
    }

    /**
     * Gives a session back to the pool. Disconnected sessions are forgotten,
     * and sessions that this pool does not own are ignored.
     */
    public void returnSession(IoSession session) {
        if (!session.isConnected()) {
            allSessions.remove(session);
        } else if (allSessions.contains(session)) {
            availableSessions.offer(session);
        }
    }

    /**
     * Closes every pooled session and disposes the connector once all of them
     * have finished closing.
     *
     * @return a future completed after the connector has been disposed
     */
    public CompletableFuture<Void> shutdown() {
        CompletableFuture<Void> shutdownFuture = new CompletableFuture<>();

        // Iterating a synchronized set requires holding its lock; work on a snapshot instead.
        List<IoSession> snapshot;
        synchronized (allSessions) {
            snapshot = new ArrayList<>(allSessions);
        }
        availableSessions.clear();

        List<CompletableFuture<Void>> closeFutures = new ArrayList<>();
        for (IoSession session : snapshot) {
            closeFutures.add(toCompletableFuture(session.closeNow()));
        }

        // allOf() over an empty array is already complete, so no special case is needed.
        CompletableFuture.allOf(closeFutures.toArray(new CompletableFuture[0]))
            .thenRun(() -> {
                connector.dispose();
                shutdownFuture.complete(null);
            });

        return shutdownFuture;
    }

    /** Releases a reserved slot and, when the connect succeeded, starts tracking the session. */
    private void finishPendingConnection(IoSession established) {
        synchronized (allSessions) {
            pendingConnections--;
            if (established != null) {
                allSessions.add(established);
            }
        }
        if (established != null) {
            // Forget the session as soon as it closes, even if it is never returned.
            established.getCloseFuture().addListener(new IoFutureListener<CloseFuture>() {
                @Override
                public void operationComplete(CloseFuture future) {
                    allSessions.remove(established);
                    availableSessions.remove(established);
                }
            });
        }
    }

    /** Adapts a close future to a CompletableFuture that completes when the close finishes. */
    private CompletableFuture<Void> toCompletableFuture(CloseFuture closeFuture) {
        CompletableFuture<Void> adapted = new CompletableFuture<>();
        closeFuture.addListener(new IoFutureListener<CloseFuture>() {
            @Override
            public void operationComplete(CloseFuture future) {
                adapted.complete(null);
            }
        });
        return adapted;
    }
}

Asynchronous Write Operations

Basic Async Writes

/**
 * Shows a listener-driven write and a write that blocks until it is confirmed.
 */
public class AsyncWriteExample {

    /**
     * Writes a message and reports the outcome from a completion listener.
     */
    public void writeAsync(IoSession session, Object message) {
        WriteFuture pending = session.write(message);

        IoFutureListener<WriteFuture> onComplete = new IoFutureListener<WriteFuture>() {
            @Override
            public void operationComplete(WriteFuture completed) {
                if (!completed.isWritten()) {
                    System.err.println("Write failed: " + completed.getException());
                    // Handle write failure (retry, close session, etc.)
                    handleWriteFailure(session, message, completed.getException());
                    return;
                }
                System.out.println("Message sent successfully");
            }
        };
        pending.addListener(onComplete);
    }

    /**
     * Writes a message and waits up to five seconds for it to be sent.
     */
    public void writeWithConfirmation(IoSession session, String message) {
        WriteFuture pending = session.write(message);

        // Bounded wait: five seconds
        boolean finishedInTime = pending.awaitUninterruptibly(5000);

        if (finishedInTime && pending.isWritten()) {
            System.out.println("Message confirmed sent: " + message);
            return;
        }

        System.err.println("Write timeout or failed for message: " + message);
        if (pending.getException() != null) {
            pending.getException().printStackTrace();
        }
    }

    /**
     * Prints a diagnostic chosen by the type of the write failure.
     */
    private void handleWriteFailure(IoSession session, Object message, Throwable cause) {
        String diagnostic;
        if (cause instanceof WriteTimeoutException) {
            // Consider reducing write frequency or closing session
            diagnostic = "Write timeout - session may be slow";
        } else if (cause instanceof WriteToClosedSessionException) {
            // Clean up and stop writing
            diagnostic = "Attempted to write to closed session";
        } else {
            // Log error and potentially retry
            diagnostic = "Unexpected write error: " + cause.getMessage();
        }
        System.err.println(diagnostic);
    }
}

Bulk Write Operations

/**
 * Strategies for sending many messages: all at once, strictly one after
 * another, or in fixed-size batches.
 *
 * <p>Write failures are propagated: the aggregate futures complete
 * exceptionally when any write fails, and batched sending stops at the first
 * failed batch instead of reporting it as completed.
 */
public class BulkAsyncWrites {

    /**
     * Starts every write immediately.
     *
     * @return a future that completes when all writes succeed, or completes
     *         exceptionally when any of them fails
     */
    public CompletableFuture<Void> writeMultipleMessages(IoSession session, List<Object> messages) {
        return writeAll(session, messages);
    }

    /**
     * Sends the messages one at a time, starting each write only after the
     * previous one has been written.
     */
    public void writeSequentially(IoSession session, List<Object> messages) {
        writeSequentiallyRecursive(session, messages, 0);
    }

    private void writeSequentiallyRecursive(IoSession session, List<Object> messages, int index) {
        if (index >= messages.size()) {
            System.out.println("All messages sent sequentially");
            return;
        }

        WriteFuture future = session.write(messages.get(index));
        future.addListener(new IoFutureListener<WriteFuture>() {
            @Override
            public void operationComplete(WriteFuture future) {
                if (future.isWritten()) {
                    // Write next message
                    writeSequentiallyRecursive(session, messages, index + 1);
                } else {
                    System.err.println("Sequential write failed at index " + index);
                }
            }
        });
    }

    /**
     * Sends the messages in batches of {@code batchSize}, starting a batch only
     * after the previous one has been written completely.
     *
     * @throws IllegalArgumentException if {@code batchSize} is not positive
     */
    public void writeBatched(IoSession session, List<Object> messages, int batchSize) {
        List<List<Object>> batches = partitionList(messages, batchSize);
        writeBatchesSequentially(session, batches, 0);
    }

    private void writeBatchesSequentially(IoSession session, List<List<Object>> batches, int batchIndex) {
        if (batchIndex >= batches.size()) {
            System.out.println("All batches sent");
            return;
        }

        // Send the whole batch, then continue only if every write in it succeeded.
        writeAll(session, batches.get(batchIndex)).whenComplete((ignored, failure) -> {
            if (failure != null) {
                System.err.println("Batch " + batchIndex + " failed: " + failure);
                return;
            }
            System.out.println("Batch " + batchIndex + " completed");
            writeBatchesSequentially(session, batches, batchIndex + 1);
        });
    }

    /** Starts a write per message and aggregates the outcomes into one future. */
    private CompletableFuture<Void> writeAll(IoSession session, List<Object> messages) {
        List<CompletableFuture<WriteFuture>> pending = new ArrayList<>();
        for (Object message : messages) {
            pending.add(toCompletableFuture(session.write(message)));
        }
        return CompletableFuture.allOf(pending.toArray(new CompletableFuture[0]));
    }

    /** Adapts a write future; the result fails when the message was not written. */
    private CompletableFuture<WriteFuture> toCompletableFuture(WriteFuture writeFuture) {
        CompletableFuture<WriteFuture> adapted = new CompletableFuture<>();
        writeFuture.addListener(new IoFutureListener<WriteFuture>() {
            @Override
            public void operationComplete(WriteFuture future) {
                if (future.isWritten()) {
                    adapted.complete(future);
                } else {
                    Throwable cause = future.getException();
                    adapted.completeExceptionally(cause != null ? cause : new RuntimeException("Write failed"));
                }
            }
        });
        return adapted;
    }

    /** Splits a list into consecutive chunks of at most {@code batchSize} elements. */
    private static List<List<Object>> partitionList(List<Object> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive: " + batchSize);
        }
        List<List<Object>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            // Compute the chunk length from the remainder so start + batchSize cannot overflow.
            int length = Math.min(batchSize, items.size() - start);
            batches.add(new ArrayList<>(items.subList(start, start + length)));
        }
        return batches;
    }
}

Asynchronous Read Operations

Enabling and Using Read Operations

/**
 * Shows how to switch a session to explicit read operations and consume
 * messages either through a listener or with a bounded blocking wait.
 */
public class AsyncReadExample {

    /**
     * Turns on explicit read operations for the session. Call this before
     * using {@link #readAsync} or {@link #readSync}.
     */
    public void enableAsyncReads(IoSession session) {
        // Enable read operations (disabled by default)
        session.getConfig().setUseReadOperation(true);
    }

    /**
     * Reads messages one after another: each completed read prints its message
     * and immediately requests the next one, until the session closes or a
     * read fails.
     */
    public void readAsync(IoSession session) {
        ReadFuture future = session.read();

        future.addListener(new IoFutureListener<ReadFuture>() {
            @Override
            public void operationComplete(ReadFuture future) {
                if (future.isRead()) {
                    Object message = future.getMessage();
                    System.out.println("Read message: " + message);

                    // Continue reading
                    readAsync(session);
                } else if (future.isClosed()) {
                    System.out.println("Session closed during read");
                } else {
                    System.err.println("Read failed: " + future.getException());
                }
            }
        });
    }

    /**
     * Reads a single message, waiting at most {@code timeoutMillis}.
     *
     * @return the message that was read
     * @throws RuntimeException if the session closed, the wait timed out, or
     *         the read failed; in the failure case the underlying exception,
     *         when there is one, is attached as the cause
     */
    public Object readSync(IoSession session, long timeoutMillis) {
        ReadFuture future = session.read();

        boolean completed = future.awaitUninterruptibly(timeoutMillis);

        if (completed && future.isRead()) {
            return future.getMessage();
        }
        if (future.isClosed()) {
            throw new RuntimeException("Session closed during read");
        }
        // Keep the original failure visible to the caller; the cause is null on a plain timeout.
        throw new RuntimeException("Read timeout or failed", future.getException());
    }
}

Request-Response Pattern

/**
 * Pairs a write with the read that follows it, exposing the exchange as a
 * single {@link CompletableFuture}.
 */
public class RequestResponsePattern {

    /**
     * Writes the request and, once it has been written, reads one message as
     * the response.
     *
     * @return a future completed with the message that was read, or completed
     *         exceptionally when the write or the read does not succeed
     */
    public CompletableFuture<Object> sendRequest(IoSession session, Object request) {
        final CompletableFuture<Object> reply = new CompletableFuture<>();

        // Second stage: deliver the response, or the reason there is none.
        final IoFutureListener<ReadFuture> onRead = new IoFutureListener<ReadFuture>() {
            @Override
            public void operationComplete(ReadFuture readFuture) {
                if (!readFuture.isRead()) {
                    reply.completeExceptionally(causeOrDefault(readFuture.getException(), "Read failed"));
                    return;
                }
                reply.complete(readFuture.getMessage());
            }
        };

        // First stage: start reading only after the request went out.
        final IoFutureListener<WriteFuture> onWritten = new IoFutureListener<WriteFuture>() {
            @Override
            public void operationComplete(WriteFuture writeFuture) {
                if (!writeFuture.isWritten()) {
                    reply.completeExceptionally(causeOrDefault(writeFuture.getException(), "Write failed"));
                    return;
                }
                ReadFuture pendingRead = session.read();
                pendingRead.addListener(onRead);
            }
        };

        WriteFuture pendingWrite = session.write(request);
        pendingWrite.addListener(onWritten);

        return reply;
    }

    /**
     * Issues {@link #sendRequest} for every request, in list order, and
     * gathers the responses in that same order.
     */
    public CompletableFuture<List<Object>> sendMultipleRequests(IoSession session, List<Object> requests) {
        final List<CompletableFuture<Object>> inFlight = new ArrayList<>();
        requests.forEach(request -> inFlight.add(sendRequest(session, request)));

        CompletableFuture<Void> allDone =
            CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0]));

        return allDone.thenApply(ignored -> {
            List<Object> responses = new ArrayList<>();
            for (CompletableFuture<Object> exchange : inFlight) {
                responses.add(exchange.join());
            }
            return responses;
        });
    }

    /** Returns the given cause, or a RuntimeException with the fallback message when it is null. */
    private static Throwable causeOrDefault(Throwable cause, String fallbackMessage) {
        return cause != null ? cause : new RuntimeException(fallbackMessage);
    }
}

Session Close Handling

Async Session Closure

/**
 * Ways to close sessions: after pending writes, immediately, in bulk, and
 * waiting for a close that happens elsewhere.
 */
public class AsyncCloseExample {

    /**
     * Closes the session after its pending writes complete, then releases the
     * resources associated with it.
     */
    public void closeGracefully(IoSession session) {
        // Close after pending writes complete
        cleanupWhenClosed(session.closeOnFlush(), session, "Session closed gracefully: ");
    }

    /**
     * Closes the session immediately, discarding pending writes, then releases
     * the resources associated with it.
     */
    public void closeImmediately(IoSession session) {
        // Close immediately, discarding pending writes
        cleanupWhenClosed(session.closeNow(), session, "Session closed immediately: ");
    }

    /**
     * Requests a close-on-flush for every session.
     *
     * @return a future that completes once all of the close operations have finished
     */
    public CompletableFuture<Void> closeMultipleSessions(Collection<IoSession> sessions) {
        List<CompletableFuture<Void>> closeFutures = new ArrayList<>();

        for (IoSession session : sessions) {
            // Bridge each CloseFuture to a CompletableFuture<Void> so the list's element type matches.
            CompletableFuture<Void> closed = new CompletableFuture<>();
            session.closeOnFlush().addListener(new IoFutureListener<CloseFuture>() {
                @Override
                public void operationComplete(CloseFuture future) {
                    closed.complete(null);
                }
            });
            closeFutures.add(closed);
        }

        return CompletableFuture.allOf(closeFutures.toArray(new CompletableFuture[0]));
    }

    /**
     * Waits up to {@code timeoutMillis} for the session to close and prints
     * whether it did.
     */
    public void waitForClose(IoSession session, long timeoutMillis) {
        CloseFuture closeFuture = session.getCloseFuture();

        boolean closed = closeFuture.awaitUninterruptibly(timeoutMillis);

        if (closed) {
            System.out.println("Session closed within timeout");
        } else {
            System.out.println("Session close timed out");
        }
    }

    /** Logs the close with the given prefix and cleans up once the close has completed. */
    private void cleanupWhenClosed(CloseFuture closeFuture, IoSession session, String logPrefix) {
        closeFuture.addListener(new IoFutureListener<CloseFuture>() {
            @Override
            public void operationComplete(CloseFuture future) {
                System.out.println(logPrefix + session.getId());
                cleanupSessionResources(session);
            }
        });
    }
}

Future Composition and Chaining

Converting MINA Futures to CompletableFuture

/**
 * Bridges MINA futures to {@link CompletableFuture} so multi-step exchanges
 * can be composed with {@code thenCompose}/{@code thenApply}.
 */
public class FutureComposition {

    /**
     * Adapts any MINA future to a CompletableFuture.
     *
     * @return a future that completes with {@code minaFuture} itself once the
     *         MINA operation is done; it always completes normally, so callers
     *         must inspect the MINA future for success or failure
     */
    public <T extends IoFuture> CompletableFuture<T> toCompletableFuture(T minaFuture) {
        CompletableFuture<T> completableFuture = new CompletableFuture<>();

        minaFuture.addListener(new IoFutureListener<T>() {
            @Override
            public void operationComplete(T future) {
                completableFuture.complete(future);
            }
        });

        return completableFuture;
    }

    /**
     * Connects, sends an authentication request, and waits for the reply.
     *
     * <p>The connector gets a no-op handler because MINA requires a handler
     * before connecting, and read operations are enabled on the new session
     * because {@code session.read()} is only usable when they are. The
     * connector is disposed whenever the returned future fails, so failed
     * attempts do not leak it.
     *
     * @return a future completed with the authenticated session, or completed
     *         exceptionally when connecting or authenticating fails
     */
    public CompletableFuture<IoSession> connectThenAuthenticate(String host, int port, String username, String password) {
        NioSocketConnector connector = new NioSocketConnector();
        // Responses are consumed through session.read(), so the handler has nothing to do.
        connector.setHandler(new IoHandlerAdapter());

        CompletableFuture<ConnectFuture> connected;
        try {
            connected = toCompletableFuture(connector.connect(new InetSocketAddress(host, port)));
        } catch (RuntimeException e) {
            // connect() rejected the request synchronously: report it through the future.
            connector.dispose();
            CompletableFuture<IoSession> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }

        return connected
            .thenCompose(connectFuture -> {
                if (!connectFuture.isConnected()) {
                    Throwable cause = connectFuture.getException();
                    throw new RuntimeException("Connection failed: " + cause, cause);
                }
                IoSession session = connectFuture.getSession();
                // Must be enabled before the reply arrives, otherwise session.read() cannot be used.
                session.getConfig().setUseReadOperation(true);
                return authenticate(session, new AuthRequest(username, password));
            })
            .whenComplete((session, failure) -> {
                if (failure != null) {
                    connector.dispose();
                }
            });
    }

    /** Sends the auth request and resolves to the session when the server accepts it. */
    private CompletableFuture<IoSession> authenticate(IoSession session, AuthRequest authReq) {
        return toCompletableFuture(session.write(authReq))
            .thenCompose(writeFuture -> {
                if (!writeFuture.isWritten()) {
                    session.closeNow();
                    throw new RuntimeException("Failed to send auth request", writeFuture.getException());
                }
                // Wait for auth response
                return toCompletableFuture(session.read())
                    .thenApply(readFuture -> {
                        if (!readFuture.isRead()) {
                            session.closeNow();
                            throw new RuntimeException("Failed to read auth response", readFuture.getException());
                        }
                        AuthResponse response = (AuthResponse) readFuture.getMessage();
                        if (!response.isSuccess()) {
                            session.closeNow();
                            throw new RuntimeException("Authentication failed");
                        }
                        return session;
                    });
            });
    }

    /**
     * Runs the INIT, SETUP, EXECUTE, and FINALIZE commands strictly in order,
     * each one only after the previous response has been read.
     *
     * <p>The session must already have read operations enabled.
     */
    public CompletableFuture<String> performComplexOperation(IoSession session) {
        return sendCommand(session, "INIT")
            .thenCompose(initResponse -> sendCommand(session, "SETUP " + initResponse))
            .thenCompose(setupResponse -> sendCommand(session, "EXECUTE"))
            .thenCompose(executeResponse -> sendCommand(session, "FINALIZE"))
            .thenApply(finalizeResponse -> "Operation completed: " + finalizeResponse);
    }

    /** Writes one command and resolves to the string form of the next message read. */
    private CompletableFuture<String> sendCommand(IoSession session, String command) {
        return toCompletableFuture(session.write(command))
            .thenCompose(writeFuture -> {
                if (!writeFuture.isWritten()) {
                    throw new RuntimeException("Failed to send command: " + command, writeFuture.getException());
                }
                return toCompletableFuture(session.read())
                    .thenApply(readFuture -> {
                        if (!readFuture.isRead()) {
                            throw new RuntimeException("Failed to read response for: " + command, readFuture.getException());
                        }
                        return readFuture.getMessage().toString();
                    });
            });
    }
}

Error Handling in Async Operations

Comprehensive Error Handling

/**
 * Retry and timeout handling for asynchronous writes and request/response
 * exchanges.
 *
 * <p>All delayed work runs on one shared daemon scheduler, so neither retries
 * nor timeouts create executors that could be left running.
 */
public class AsyncErrorHandling {

    // One timer thread for every retry and timeout; daemon so it never blocks JVM exit.
    private static final ScheduledExecutorService SCHEDULER =
        Executors.newSingleThreadScheduledExecutor(task -> {
            Thread timer = new Thread(task, "async-error-handling-timer");
            timer.setDaemon(true);
            return timer;
        });

    /**
     * Writes a message, retrying retryable failures up to {@code maxRetries}
     * times with exponential backoff (1s, 2s, 4s, ...).
     */
    public void robustAsyncWrite(IoSession session, Object message, int maxRetries) {
        writeWithRetry(session, message, maxRetries, 0);
    }

    private void writeWithRetry(IoSession session, Object message, int maxRetries, int attempt) {
        WriteFuture future = session.write(message);

        future.addListener(new IoFutureListener<WriteFuture>() {
            @Override
            public void operationComplete(WriteFuture future) {
                if (future.isWritten()) {
                    System.out.println("Message sent successfully on attempt " + (attempt + 1));
                    return;
                }

                Throwable cause = future.getException();
                if (attempt < maxRetries && isRetryableError(cause)) {
                    System.out.println("Retrying write attempt " + (attempt + 1) + "/" + maxRetries);
                    SCHEDULER.schedule(
                        () -> writeWithRetry(session, message, maxRetries, attempt + 1),
                        backoffDelayMillis(attempt),
                        TimeUnit.MILLISECONDS);
                } else {
                    System.err.println("Write failed after " + (attempt + 1) + " attempts: " + cause);
                    handleFinalWriteFailure(session, message, cause);
                }
            }
        });
    }

    /**
     * Exponential backoff: 1000ms doubled per attempt. Computed in long
     * arithmetic with a bounded shift so large attempt counts cannot overflow.
     */
    private static long backoffDelayMillis(int attempt) {
        return 1000L << Math.min(Math.max(attempt, 0), 30);
    }

    private boolean isRetryableError(Throwable cause) {
        return cause instanceof WriteTimeoutException ||
               (cause instanceof IOException && !(cause instanceof WriteToClosedSessionException));
    }

    /**
     * Sends a request and reads one message as its response, failing the
     * result with a {@link TimeoutException} when no outcome is reached within
     * {@code timeoutMs}.
     *
     * @return a future completed with the response, or completed exceptionally
     *         on timeout, write failure, read failure, or session close
     */
    public CompletableFuture<Object> robustRequestResponse(IoSession session, Object request, long timeoutMs) {
        CompletableFuture<Object> result = new CompletableFuture<>();

        // Set up timeout
        ScheduledFuture<?> timeoutTask = SCHEDULER.schedule(() -> {
            result.completeExceptionally(new TimeoutException("Request timed out after " + timeoutMs + "ms"));
        }, timeoutMs, TimeUnit.MILLISECONDS);

        // Whatever completes the result first, the timer is no longer needed.
        result.whenComplete((value, failure) -> timeoutTask.cancel(false));

        // Send request
        WriteFuture writeFuture = session.write(request);

        writeFuture.addListener(new IoFutureListener<WriteFuture>() {
            @Override
            public void operationComplete(WriteFuture future) {
                if (!future.isWritten()) {
                    result.completeExceptionally(causeOrDefault(future.getException(), "Write failed"));
                    return;
                }
                if (result.isDone()) {
                    // Already timed out: do not consume a message nobody is waiting for.
                    return;
                }

                // Request sent, wait for response
                ReadFuture readFuture = session.read();
                readFuture.addListener(new IoFutureListener<ReadFuture>() {
                    @Override
                    public void operationComplete(ReadFuture readFuture) {
                        // complete*() is a no-op once the result is done, so a late reply is ignored.
                        if (readFuture.isRead()) {
                            result.complete(readFuture.getMessage());
                        } else if (readFuture.isClosed()) {
                            result.completeExceptionally(new IOException("Session closed during read"));
                        } else {
                            result.completeExceptionally(causeOrDefault(readFuture.getException(), "Read failed"));
                        }
                    }
                });
            }
        });

        return result;
    }

    /** completeExceptionally rejects null, so substitute a generic failure when no cause is known. */
    private static Throwable causeOrDefault(Throwable cause, String fallbackMessage) {
        return cause != null ? cause : new IOException(fallbackMessage);
    }
}

MINA Core's asynchronous programming model provides powerful tools for building scalable, non-blocking network applications. The Future-based approach allows for clean composition of async operations while maintaining high performance under load.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-mina--mina-core

docs

async-operations.md

buffer-management.md

filter-chain.md

index.md

protocol-codecs.md

service-abstractions.md

session-management.md

transport-layer.md

tile.json