Apache MINA Core is a network application framework for building high-performance, scalable network applications with event-driven, asynchronous I/O over various transports including TCP/IP and UDP/IP via Java NIO.
—
MINA Core provides unified service abstractions for both server (acceptor) and client (connector) applications through the IoService, IoAcceptor, and IoConnector interfaces. These abstractions provide consistent APIs across different transport types.
Interface providing metadata about transport implementations:
public interface TransportMetadata {
// Provider information
String getProviderName(); // e.g., "nio", "apr", "serial"
String getName(); // Transport name
// Connection model
ConnectionModel getConnectionModel(); // CONNECTION or CONNECTIONLESS
boolean isConnectionless();
// Address and session types
Class<? extends SocketAddress> getAddressType();
Class<? extends IoSession> getSessionType();
Class<? extends IoSessionConfig> getSessionConfigType();
// Capabilities
boolean hasFragmentation(); // Can messages be fragmented
Set<Class<? extends Object>> getEnvelopeTypes(); // Allowed message types
}Enumeration of connection models:
public enum ConnectionModel {
CONNECTION, // Connection-oriented (TCP)
CONNECTIONLESS // Connectionless (UDP)
}The base interface for all MINA services:
public interface IoService {
// Service metadata and state
TransportMetadata getTransportMetadata();
boolean isActive();
boolean isDisposing();
boolean isDisposed();
long getActivationTime();
// Service lifecycle
void dispose();
void dispose(boolean awaitTermination);
// Handler management
IoHandler getHandler();
void setHandler(IoHandler handler);
// Session management
Map<Long, IoSession> getManagedSessions();
int getManagedSessionCount();
IoSessionConfig getSessionConfig();
Set<WriteFuture> broadcast(Object message);
// Filter chain management
IoFilterChainBuilder getFilterChainBuilder();
void setFilterChainBuilder(IoFilterChainBuilder builder);
DefaultIoFilterChainBuilder getFilterChain();
// Data structure factory
IoSessionDataStructureFactory getSessionDataStructureFactory();
void setSessionDataStructureFactory(IoSessionDataStructureFactory factory);
// Statistics and monitoring
IoServiceStatistics getStatistics();
int getScheduledWriteBytes();
int getScheduledWriteMessages();
// Event listeners
void addListener(IoServiceListener listener);
void removeListener(IoServiceListener listener);
}Interface for server-side services that accept incoming connections:
public interface IoAcceptor extends IoService {
// Binding and unbinding
void bind(SocketAddress localAddress) throws IOException;
void bind(SocketAddress... localAddresses) throws IOException;
void bind(Iterable<? extends SocketAddress> localAddresses) throws IOException;
void unbind();
void unbind(SocketAddress localAddress);
void unbind(SocketAddress... localAddresses);
void unbind(Iterable<? extends SocketAddress> localAddresses);
// Local addresses
Set<SocketAddress> getLocalAddresses();
SocketAddress getLocalAddress();
SocketAddress getDefaultLocalAddress();
void setDefaultLocalAddress(SocketAddress localAddress);
List<SocketAddress> getDefaultLocalAddresses();
void setDefaultLocalAddresses(SocketAddress firstLocalAddress, SocketAddress... otherLocalAddresses);
void setDefaultLocalAddresses(List<? extends SocketAddress> localAddresses);
// Additional binding methods
void bind() throws IOException;
void bind(SocketAddress firstLocalAddress, SocketAddress... addresses) throws IOException;
// Additional unbinding methods
void unbind(SocketAddress firstLocalAddress, SocketAddress... otherLocalAddresses);
// Close behavior configuration
boolean isCloseOnDeactivation();
void setCloseOnDeactivation(boolean closeOnDeactivation);
// Session creation for connectionless transports
IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress);
}Interface for client-side services that establish outgoing connections:
public interface IoConnector extends IoService {
// Connection operations
ConnectFuture connect(SocketAddress remoteAddress);
ConnectFuture connect(SocketAddress remoteAddress, SocketAddress localAddress);
ConnectFuture connect(SocketAddress remoteAddress, IoSessionInitializer<? extends ConnectFuture> sessionInitializer);
ConnectFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, IoSessionInitializer<? extends ConnectFuture> sessionInitializer);
// Remote address configuration
SocketAddress getDefaultRemoteAddress();
void setDefaultRemoteAddress(SocketAddress defaultRemoteAddress);
// Local address configuration
SocketAddress getDefaultLocalAddress();
void setDefaultLocalAddress(SocketAddress defaultLocalAddress);
// Additional connection methods
ConnectFuture connect();
ConnectFuture connect(IoSessionInitializer<? extends ConnectFuture> sessionInitializer);
// Connection timeout
long getConnectTimeoutMillis();
void setConnectTimeoutMillis(long connectTimeoutMillis);
// Deprecated timeout methods (marked for removal)
@Deprecated
int getConnectTimeout();
@Deprecated
void setConnectTimeout(int connectTimeout);
}Complete handler interface for processing I/O events:
public interface IoHandler {
// Session lifecycle events
void sessionCreated(IoSession session) throws Exception;
void sessionOpened(IoSession session) throws Exception;
void sessionClosed(IoSession session) throws Exception;
void sessionIdle(IoSession session, IdleStatus status) throws Exception;
// Message handling
void messageReceived(IoSession session, Object message) throws Exception;
void messageSent(IoSession session, Object message) throws Exception;
// Input/Output events
void inputClosed(IoSession session) throws Exception; // Half-duplex channel closure
// Error handling
void exceptionCaught(IoSession session, Throwable cause) throws Exception;
// Filter events
void event(IoSession session, FilterEvent event) throws Exception;
}TCP server implementation using NIO:
// Basic server setup
NioSocketAcceptor acceptor = new NioSocketAcceptor();
// Configure the acceptor
acceptor.setReuseAddress(true);
acceptor.setBacklog(100);
// Configure session settings
SocketSessionConfig config = acceptor.getSessionConfig();
config.setSendBufferSize(64 * 1024);
config.setReceiveBufferSize(64 * 1024);
config.setTcpNoDelay(true);
config.setKeepAlive(true);
// Set up filter chain
acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(codecFactory));
acceptor.getFilterChain().addLast("logger", new LoggingFilter());
// Set handler
acceptor.setHandler(new ServerHandler());
// Bind to multiple addresses
acceptor.bind(new InetSocketAddress(8080));
acceptor.bind(new InetSocketAddress(8443));
// Check bound addresses
Set<SocketAddress> addresses = acceptor.getLocalAddresses();
System.out.println("Server listening on: " + addresses);TCP client implementation using NIO:
// Basic client setup
NioSocketConnector connector = new NioSocketConnector();
// Configure connection timeout
connector.setConnectTimeoutMillis(30000); // 30 seconds
connector.setConnectTimeoutCheckInterval(1000); // Check every second
// Configure session settings
SocketSessionConfig config = connector.getSessionConfig();
config.setSendBufferSize(32 * 1024);
config.setReceiveBufferSize(32 * 1024);
config.setTcpNoDelay(true);
// Set up filter chain
connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(codecFactory));
connector.getFilterChain().addLast("logger", new LoggingFilter());
// Set handler
connector.setHandler(new ClientHandler());
// Connect to server
ConnectFuture future = connector.connect(new InetSocketAddress("localhost", 8080));
future.awaitUninterruptibly();
if (future.isConnected()) {
IoSession session = future.getSession();
System.out.println("Connected to server: " + session.getRemoteAddress());
} else {
System.err.println("Connection failed: " + future.getException());
}UDP server implementation:
// UDP server setup
NioDatagramAcceptor acceptor = new NioDatagramAcceptor();
// Configure datagram-specific settings
DatagramSessionConfig config = acceptor.getSessionConfig();
config.setReceiveBufferSize(1024 * 64); // 64KB receive buffer
config.setSendBufferSize(1024 * 64); // 64KB send buffer
config.setReuseAddress(true);
config.setBroadcast(true); // Allow broadcast
// Set up filter chain for datagram processing
acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(datagramCodec));
acceptor.getFilterChain().addLast("logger", new LoggingFilter());
// Set handler for UDP messages
acceptor.setHandler(new DatagramHandler());
// Bind to UDP port
acceptor.bind(new InetSocketAddress(5000));
System.out.println("UDP server listening on port 5000");UDP client implementation:
// UDP client setup
NioDatagramConnector connector = new NioDatagramConnector();
// Configure datagram settings
DatagramSessionConfig config = connector.getSessionConfig();
config.setSendBufferSize(1024 * 32); // 32KB send buffer
config.setReceiveBufferSize(1024 * 32); // 32KB receive buffer
config.setBroadcast(true); // Allow broadcast
// Set up filter chain
connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(datagramCodec));
// Set handler
connector.setHandler(new DatagramClientHandler());
// Connect to UDP server
ConnectFuture future = connector.connect(new InetSocketAddress("localhost", 5000));
IoSession session = future.awaitUninterruptibly().getSession();
// Send UDP message
session.write("Hello UDP Server!");In-memory server for same-JVM communication:
// VM Pipe server setup
VmPipeAcceptor acceptor = new VmPipeAcceptor();
// Set up filter chain
acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
acceptor.getFilterChain().addLast("logger", new LoggingFilter());
// Set handler
acceptor.setHandler(new VmPipeHandler());
// Bind to VM pipe address
VmPipeAddress address = new VmPipeAddress(12345);
acceptor.bind(address);
System.out.println("VM Pipe server listening on: " + address);In-memory client:
// VM Pipe client setup
VmPipeConnector connector = new VmPipeConnector();
// Set up filter chain
connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
// Set handler
connector.setHandler(new VmPipeClientHandler());
// Connect to VM pipe server
VmPipeAddress address = new VmPipeAddress(12345);
ConnectFuture future = connector.connect(address);
IoSession session = future.awaitUninterruptibly().getSession();
// Send object through VM pipe
session.write(new MyMessage("Hello VM Pipe!"));// Configure acceptor with multiple I/O processors
public class MultiThreadedServer {
public void createOptimizedServer() {
// Create acceptor with 4 I/O processors
NioSocketAcceptor acceptor = new NioSocketAcceptor(4);
// Or create with custom processor pool
IoProcessor<NioSession> processor = new SimpleIoProcessorPool<>(
NioProcessor.class, 8); // 8 processors
NioSocketAcceptor customAcceptor = new NioSocketAcceptor(processor);
// Configure with custom executor
Executor executor = Executors.newCachedThreadPool();
NioSocketAcceptor executorAcceptor = new NioSocketAcceptor(executor, processor);
}
}// Using custom SelectorProvider for advanced NIO configuration
public void createServerWithCustomSelector() throws IOException {
// Create custom selector provider
SelectorProvider provider = SelectorProvider.provider();
// Create acceptor with custom provider
NioSocketAcceptor acceptor = new NioSocketAcceptor(4, provider);
// Configure and start server
acceptor.setHandler(new MyHandler());
acceptor.bind(new InetSocketAddress(8080));
}public class ServiceLifecycleListener implements IoServiceListener {
@Override
public void serviceActivated(IoService service) throws Exception {
System.out.println("Service activated: " + service.getClass().getSimpleName());
// Initialize service-specific resources
initializeServiceResources(service);
}
@Override
public void serviceDeactivated(IoService service) throws Exception {
System.out.println("Service deactivated: " + service.getClass().getSimpleName());
// Cleanup service resources
cleanupServiceResources(service);
}
@Override
public void serviceIdle(IoService service, IdleStatus idleStatus) throws Exception {
System.out.println("Service idle: " + service.getClass().getSimpleName() +
" (" + idleStatus + ")");
}
@Override
public void sessionCreated(IoSession session) throws Exception {
System.out.println("Session created: " + session.getId());
}
@Override
public void sessionClosed(IoSession session) throws Exception {
System.out.println("Session closed: " + session.getId());
}
@Override
public void sessionDestroyed(IoSession session) throws Exception {
System.out.println("Session destroyed: " + session.getId());
}
}
// Using service listener
NioSocketAcceptor acceptor = new NioSocketAcceptor();
acceptor.addListener(new ServiceLifecycleListener());public class GracefulShutdown {
public void shutdownServer(IoAcceptor acceptor) {
System.out.println("Initiating server shutdown...");
// Stop accepting new connections
acceptor.unbind();
// Wait for existing sessions to complete
while (acceptor.getManagedSessionCount() > 0) {
try {
Thread.sleep(1000);
System.out.println("Waiting for " + acceptor.getManagedSessionCount() +
" sessions to close...");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
// Dispose the service
acceptor.dispose(true); // Wait for termination
System.out.println("Server shutdown complete");
}
public void forceShutdown(IoService service) {
// Close all sessions immediately
for (IoSession session : service.getManagedSessions().values()) {
session.closeNow();
}
// Dispose service without waiting
service.dispose(false);
}
}Complete statistics interface for monitoring service performance:
public interface IoServiceStatistics {
// Basic session statistics
long getCumulativeManagedSessionCount();
long getLargestManagedSessionCount();
long getLastIoTime();
// Message transfer statistics
long getReadBytes();
long getWrittenBytes();
long getReadMessages();
long getWrittenMessages();
// Throughput statistics (current)
double getReadBytesThroughput();
double getWrittenBytesThroughput();
double getReadMessagesThroughput();
double getWrittenMessagesThroughput();
// Peak throughput statistics
double getLargestReadBytesThroughput();
double getLargestWrittenBytesThroughput();
double getLargestReadMessagesThroughput();
double getLargestWrittenMessagesThroughput();
// Timing information
long getLastReadTime();
long getLastWriteTime();
// Throughput calculation configuration
int getThroughputCalculationInterval();
long getThroughputCalculationIntervalInMillis();
void setThroughputCalculationInterval(int throughputCalculationInterval);
}Configuration class for enabling/disabling specific statistics calculations:
public static class Config {
// Enable/disable specific statistics
void setThroughputStatisticsEnabled(boolean enabled);
boolean isThroughputStatisticsEnabled();
void setPeakThroughputStatisticsEnabled(boolean enabled);
boolean isPeakThroughputStatisticsEnabled();
void setTimingStatisticsEnabled(boolean enabled);
boolean isTimingStatisticsEnabled();
}public class ServiceMonitor {
public void printServiceStatistics(IoService service) {
IoServiceStatistics stats = service.getStatistics();
System.out.println("=== Service Statistics ===");
System.out.println("Service Type: " + service.getClass().getSimpleName());
System.out.println("Transport: " + service.getTransportMetadata().getProviderName());
System.out.println("Active: " + service.isActive());
System.out.println("Activation Time: " + new Date(service.getActivationTime()));
// Session statistics
System.out.println("Current Sessions: " + service.getManagedSessionCount());
System.out.println("Total Sessions: " + stats.getCumulativeManagedSessionCount());
// Message statistics
System.out.println("Messages Read: " + stats.getReadMessages());
System.out.println("Messages Written: " + stats.getWrittenMessages());
System.out.println("Bytes Read: " + formatBytes(stats.getReadBytes()));
System.out.println("Bytes Written: " + formatBytes(stats.getWrittenBytes()));
// Throughput statistics
System.out.printf("Read Throughput: %.2f KB/s%n",
stats.getReadBytesThroughput() / 1024.0);
System.out.printf("Write Throughput: %.2f KB/s%n",
stats.getWrittenBytesThroughput() / 1024.0);
System.out.printf("Message Read Rate: %.2f msgs/s%n",
stats.getReadMessagesThroughput());
System.out.printf("Message Write Rate: %.2f msgs/s%n",
stats.getWrittenMessagesThroughput());
// Queue statistics
System.out.println("Scheduled Write Messages: " + service.getScheduledWriteMessages());
System.out.println("Scheduled Write Bytes: " + formatBytes(service.getScheduledWriteBytes()));
}
private String formatBytes(long bytes) {
if (bytes < 1024) return bytes + " B";
if (bytes < 1024 * 1024) return String.format("%.1f KB", bytes / 1024.0);
if (bytes < 1024 * 1024 * 1024) return String.format("%.1f MB", bytes / (1024.0 * 1024));
return String.format("%.1f GB", bytes / (1024.0 * 1024 * 1024));
}
}public class BroadcastingService {
public void broadcastMessage(IoService service, Object message) {
// Broadcast to all managed sessions
Set<WriteFuture> futures = service.broadcast(message);
// Wait for all broadcasts to complete
for (WriteFuture future : futures) {
future.awaitUninterruptibly();
if (!future.isWritten()) {
System.err.println("Broadcast failed: " + future.getException());
}
}
}
public void selectiveBroadcast(IoService service, Object message,
Predicate<IoSession> filter) {
// Broadcast to filtered sessions
for (IoSession session : service.getManagedSessions().values()) {
if (filter.test(session)) {
session.write(message);
}
}
}
// Broadcast to authenticated users only
public void broadcastToAuthenticatedUsers(IoService service, Object message) {
selectiveBroadcast(service, message, session ->
session.getAttribute("authenticated", false));
}
}public class CustomProtocolAcceptor extends AbstractIoAcceptor {
public CustomProtocolAcceptor() {
super(new CustomSessionConfig(), CustomProcessor.class);
}
@Override
protected void init() throws Exception {
// Initialize custom transport
}
@Override
protected void destroy() throws Exception {
// Cleanup custom transport
}
@Override
public TransportMetadata getTransportMetadata() {
return CustomTransportMetadata.INSTANCE;
}
// Implement other abstract methods...
}Service abstractions in MINA Core provide a unified and powerful foundation for building both server and client applications across different transport protocols, with comprehensive lifecycle management, statistics, and configuration options.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-mina--mina-core