Apache MINA Core is a network application framework for building high-performance, scalable network applications with event-driven, asynchronous I/O over various transports including TCP/IP and UDP/IP via Java NIO.
—
MINA Core provides comprehensive session management through the IoSession interface and related components. Sessions represent connections between endpoints and provide lifecycle management, configuration, and attribute storage.
Interface for pluggable attribute storage implementations:
/**
 * Pluggable storage for the attributes of a session.
 *
 * Every operation takes the owning session as its first argument; the remaining
 * parameters line up with the attribute methods declared on IoSession.
 *
 * NOTE(review): the Object returned by setAttribute/removeAttribute is presumably the
 * previously stored value, and setAttributeIfAbsent presumably returns null when the key
 * was absent - confirm against the MINA Javadoc.
 */
public interface IoSessionAttributeMap {
// Lookup that takes a caller-supplied default value.
Object getAttribute(IoSession session, Object key, Object defaultValue);
Object setAttribute(IoSession session, Object key, Object value);
Object setAttributeIfAbsent(IoSession session, Object key, Object value);
// Removal by key alone.
Object removeAttribute(IoSession session, Object key);
// Conditional variants: these report their outcome as a boolean and take the value(s) to compare.
boolean removeAttribute(IoSession session, Object key, Object value);
boolean replaceAttribute(IoSession session, Object key, Object oldValue, Object newValue);
boolean containsAttribute(IoSession session, Object key);
Set<Object> getAttributeKeys(IoSession session);
// The only operation declared to throw a checked exception.
void dispose(IoSession session) throws Exception;
}Factory for creating session data structures:
/**
 * Factory for the per-session data structures: the attribute map and the write request queue.
 * Both factory methods receive the session they are creating the structure for and may
 * throw a checked Exception.
 */
public interface IoSessionDataStructureFactory {
IoSessionAttributeMap getAttributeMap(IoSession session) throws Exception;
WriteRequestQueue getWriteRequestQueue(IoSession session) throws Exception;
}Interface for session recycling in connectionless transports:
/**
 * Session recycling contract for connectionless transports.
 *
 * NOTE(review): presumably put() registers a session, recycle() looks one up for reuse and
 * remove() unregisters it - confirm against the MINA Javadoc.
 */
public interface IoSessionRecycler {
void put(IoSession session);
// Lookup is keyed by the local/remote address pair.
IoSession recycle(SocketAddress localAddress, SocketAddress remoteAddress);
void remove(IoSession session);
}Exception thrown during session initialization:
/**
 * Exception thrown during session initialization.
 * Extends RuntimeException, so it is unchecked.
 * The entries below are constructor signatures (API summary form, bodies omitted):
 * no-arg, message only, message with cause, and cause only.
 */
public class IoSessionInitializationException extends RuntimeException {
public IoSessionInitializationException();
public IoSessionInitializationException(String message);
public IoSessionInitializationException(String message, Throwable cause);
public IoSessionInitializationException(Throwable cause);
}The central interface representing a connection between two endpoints:
/**
 * The central interface representing a connection between two endpoints.
 *
 * Groups, in order: identity/metadata, configuration and state flags, addresses,
 * I/O operations, closure, attribute storage, flow control, filter chain and write queue
 * access, statistics, idle detection, and throughput calculation.
 */
public interface IoSession {
// Identity and metadata
long getId();
IoService getService();
IoHandler getHandler();
TransportMetadata getTransportMetadata();
// Configuration and state
IoSessionConfig getConfig();
boolean isConnected();
boolean isActive();
boolean isClosing();
boolean isSecured();
boolean isServer();
// Address information
SocketAddress getRemoteAddress();
SocketAddress getLocalAddress();
SocketAddress getServiceAddress();
// I/O operations
// Each operation hands back a future object rather than a direct result.
WriteFuture write(Object message);
// Overload that names an explicit destination address.
WriteFuture write(Object message, SocketAddress destination);
ReadFuture read();
// Session closure
// NOTE(review): presumably closeNow() closes immediately while closeOnFlush() waits for
// queued writes to be flushed - confirm against the MINA Javadoc.
CloseFuture closeNow();
CloseFuture closeOnFlush();
CloseFuture getCloseFuture();
// Attribute management
// Same operations as IoSessionAttributeMap, without the session argument.
// NOTE(review): the value stored by the single-argument setAttribute/setAttributeIfAbsent
// overloads is not visible here - confirm before relying on it.
Object getAttribute(Object key);
Object getAttribute(Object key, Object defaultValue);
Object setAttribute(Object key, Object value);
Object setAttribute(Object key);
Object setAttributeIfAbsent(Object key, Object value);
Object setAttributeIfAbsent(Object key);
Object removeAttribute(Object key);
boolean removeAttribute(Object key, Object value);
boolean replaceAttribute(Object key, Object oldValue, Object newValue);
boolean containsAttribute(Object key);
Set<Object> getAttributeKeys();
// Flow control
// Read and write directions are suspended, resumed and queried independently.
void suspendRead();
void suspendWrite();
void resumeRead();
void resumeWrite();
boolean isReadSuspended();
boolean isWriteSuspended();
// Filter chain and write queue access
IoFilterChain getFilterChain();
WriteRequestQueue getWriteRequestQueue();
void setCurrentWriteRequest(WriteRequest currentWriteRequest);
// Extended statistics and timing
// NOTE(review): the time getters return long values that the examples wrap in
// java.util.Date, so they are presumably epoch milliseconds - confirm.
long getCreationTime();
long getLastIoTime();
long getLastReadTime();
long getLastWriteTime();
long getReadBytes();
long getWrittenBytes();
long getReadMessages();
long getWrittenMessages();
double getReadBytesThroughput();
double getWrittenBytesThroughput();
double getReadMessagesThroughput();
double getWrittenMessagesThroughput();
// Pending (scheduled but not yet written) message count and byte count.
int getScheduledWriteMessages();
long getScheduledWriteBytes();
Object getCurrentWriteMessage();
WriteRequest getCurrentWriteRequest();
// Extended idle detection
// Each query exists in a generic form taking an IdleStatus plus reader/writer/both shortcuts.
boolean isIdle(IdleStatus status);
boolean isReaderIdle();
boolean isWriterIdle();
boolean isBothIdle();
int getIdleCount(IdleStatus status);
int getReaderIdleCount();
int getWriterIdleCount();
int getBothIdleCount();
long getLastIdleTime(IdleStatus status);
long getLastReaderIdleTime();
long getLastWriterIdleTime();
long getLastBothIdleTime();
// Throughput calculation
// Takes the current time and a flag that forces the calculation.
void updateThroughput(long currentTime, boolean force);
}Configuration interface for session-specific settings:
/**
 * Configuration interface for session-specific settings: read buffer sizing, idle times,
 * throughput calculation interval, write timeout, and the read-operation switch.
 *
 * Units: the idle-time setters name their parameter in seconds, and each int-valued
 * duration has a matching long "InMillis" getter.
 */
public interface IoSessionConfig {
// Buffer sizes
// Current size plus the minimum and maximum bounds.
int getReadBufferSize();
void setReadBufferSize(int readBufferSize);
int getMinReadBufferSize();
void setMinReadBufferSize(int minReadBufferSize);
int getMaxReadBufferSize();
void setMaxReadBufferSize(int maxReadBufferSize);
// Idle time configuration
// Generic accessors keyed by IdleStatus, followed by reader/writer/both shortcuts (seconds).
int getIdleTime(IdleStatus status);
void setIdleTime(IdleStatus status, int idleTime);
int getReaderIdleTime();
void setReaderIdleTime(int idleTimeInSeconds);
int getWriterIdleTime();
void setWriterIdleTime(int idleTimeInSeconds);
int getBothIdleTime();
void setBothIdleTime(int idleTimeInSeconds);
// Throughput calculation
// NOTE(review): the usage examples treat this interval as seconds - confirm.
int getThroughputCalculationInterval();
void setThroughputCalculationInterval(int throughputCalculationInterval);
// Write timeout
// NOTE(review): the usage examples treat the int value as seconds - confirm.
int getWriteTimeout();
void setWriteTimeout(int writeTimeout);
long getWriteTimeoutInMillis();
// Extended idle time methods
// Millisecond views of the idle times above.
long getIdleTimeInMillis(IdleStatus status);
long getReaderIdleTimeInMillis();
long getWriterIdleTimeInMillis();
long getBothIdleTimeInMillis();
// Extended throughput methods
long getThroughputCalculationIntervalInMillis();
// Read operation
// NOTE(review): presumably this switch enables IoSession.read() - confirm.
boolean isUseReadOperation();
void setUseReadOperation(boolean useReadOperation);
// Configuration copy
// Takes another IoSessionConfig as the source of the settings.
void setAll(IoSessionConfig config);
}Sessions progress through several states during their lifecycle:
/**
 * Lifecycle states of a session, declared in the order a session passes through them:
 * OPENING, then OPENED, then CLOSING.
 */
public enum SessionState {
OPENING, // Session is being established
OPENED, // Session is active and ready for I/O
CLOSING // Session is being closed
}Handle session lifecycle events through IoHandler:
/**
 * Example handler that reacts to every session lifecycle event: creation, opening,
 * closing and idleness.
 */
public class SessionLifecycleHandler extends IoHandlerAdapter {
    /**
     * Called when the session is first created (before opening).
     * Seeds the per-session attributes used by the other callbacks.
     */
    @Override
    public void sessionCreated(IoSession session) throws Exception {
        System.out.println("Session created: " + session.getId());
        // Initialize session attributes
        session.setAttribute("createTime", System.currentTimeMillis());
        session.setAttribute("requestCount", 0);
    }

    /**
     * Called when the session is opened and ready for I/O.
     * Applies session-specific settings and greets the peer.
     */
    @Override
    public void sessionOpened(IoSession session) throws Exception {
        System.out.println("Session opened: " + session.getRemoteAddress());
        // Configure session-specific settings
        IoSessionConfig config = session.getConfig();
        config.setReadBufferSize(4096);
        config.setIdleTime(IdleStatus.BOTH_IDLE, 300); // 5 minutes
        config.setWriteTimeout(30); // 30 seconds
        // Send welcome message
        session.write("Welcome to the server!");
    }

    /**
     * Called when the session is closed. Logs the session duration and releases resources.
     *
     * <p>The "createTime" attribute may be missing (for example if it was removed or never
     * stored), so it is checked instead of being unboxed blindly; the session's own creation
     * time is used as the fallback. Cleanup runs in a finally block so that a failure while
     * reporting can never skip it.
     */
    @Override
    public void sessionClosed(IoSession session) throws Exception {
        try {
            Object stored = session.getAttribute("createTime");
            long createTime = (stored instanceof Long) ? (Long) stored : session.getCreationTime();
            long duration = System.currentTimeMillis() - createTime;
            System.out.println("Session closed after " + duration + "ms");
        } finally {
            // Cleanup resources
            cleanupSessionResources(session);
        }
    }

    /**
     * Called when the session becomes idle.
     * A fully idle session is closed; a writer-idle session gets a keep-alive message.
     */
    @Override
    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
        System.out.println("Session idle: " + status);
        if (status == IdleStatus.BOTH_IDLE) {
            // Close inactive sessions
            session.closeNow();
        } else if (status == IdleStatus.WRITER_IDLE) {
            // Send keep-alive message
            session.write("PING");
        }
    }
}// Configure session settings during initialization
/**
 * Applies the example's baseline settings to a session: read buffer sizing, idle
 * thresholds, write timeout and throughput sampling interval.
 *
 * @param session the session whose configuration is adjusted
 */
public void configureSession(IoSession session) {
    final IoSessionConfig settings = session.getConfig();

    // Read buffer: 8KB to start with, bounded between 1KB and 64KB
    settings.setReadBufferSize(8 * 1024);
    settings.setMinReadBufferSize(1024);
    settings.setMaxReadBufferSize(64 * 1024);

    // Idle thresholds, all in seconds (these are idle times, not I/O timeouts)
    settings.setIdleTime(IdleStatus.READER_IDLE, 60); // nothing read for 1 minute
    settings.setIdleTime(IdleStatus.WRITER_IDLE, 30); // nothing written for 30 seconds
    settings.setIdleTime(IdleStatus.BOTH_IDLE, 90);   // no traffic either way for 90 seconds

    // Write timeout, in seconds
    settings.setWriteTimeout(10);

    // Throughput is recalculated on this interval, in seconds
    settings.setThroughputCalculationInterval(3);
}// TCP Socket configuration
/**
 * Applies TCP socket tuning to a session, if the session is socket based.
 *
 * <p>The check is made on the configuration object itself rather than on the transport
 * provider name: the provider name "nio" is shared by the NIO datagram (UDP) transport,
 * whose configuration is not a {@code SocketSessionConfig}, so a name-based check would
 * let the cast below fail with a ClassCastException. Sessions without socket
 * configuration are left untouched.
 *
 * @param session the session to tune
 */
public void configureTcpSession(IoSession session) {
    if (!(session.getConfig() instanceof SocketSessionConfig)) {
        return; // not a TCP socket session: nothing to configure
    }
    SocketSessionConfig config = (SocketSessionConfig) session.getConfig();
    config.setSendBufferSize(64 * 1024); // 64KB send buffer
    config.setReceiveBufferSize(64 * 1024); // 64KB receive buffer
    config.setTcpNoDelay(true); // Disable Nagle algorithm
    config.setKeepAlive(true); // Enable keep-alive
    config.setSoLinger(0); // Immediate close (unsent data may be discarded on close)
    config.setReuseAddress(true); // Allow address reuse
}Use AttributeKey for type-safe session attributes:
// Define attribute keys
/**
 * Central definition of the attribute keys shared by the session handlers.
 *
 * <p>The first constructor argument of {@code AttributeKey} identifies the class that
 * declares the key and is used to namespace the key name; it is NOT the type of the stored
 * value. All keys are therefore created with {@code SessionAttributes.class}. The expected
 * value type is documented on each constant and must still be enforced by the caller's cast.
 */
public class SessionAttributes {
    /** Value type: {@code UserInfo}. Handlers treat a missing value as "not authenticated". */
    public static final AttributeKey USER_INFO = new AttributeKey(SessionAttributes.class, "userInfo");
    /** Value type: {@code Long}. */
    public static final AttributeKey LOGIN_TIME = new AttributeKey(SessionAttributes.class, "loginTime");
    /** Value type: {@code Integer}. */
    public static final AttributeKey REQUEST_COUNT = new AttributeKey(SessionAttributes.class, "requestCount");
    /** Value type: {@code Map}. */
    public static final AttributeKey SESSION_DATA = new AttributeKey(SessionAttributes.class, "sessionData");
}
// Using type-safe attributes
/**
 * Example handler that stores and reads its per-session state through the keys declared
 * in {@code SessionAttributes}.
 */
public class TypeSafeSessionHandler extends IoHandlerAdapter {
    /** Seeds the login time, the request counter and an empty data map on the new session. */
    @Override
    public void sessionOpened(IoSession session) throws Exception {
        final long openedAt = System.currentTimeMillis();
        session.setAttribute(SessionAttributes.LOGIN_TIME, openedAt);
        session.setAttribute(SessionAttributes.REQUEST_COUNT, 0);
        session.setAttribute(SessionAttributes.SESSION_DATA, new HashMap<String, Object>());
    }

    /**
     * Counts the incoming message, then either processes it for the authenticated user or
     * asks the peer to authenticate.
     */
    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        // Bump the per-session request counter
        final Integer seenSoFar = (Integer) session.getAttribute(SessionAttributes.REQUEST_COUNT);
        final int updatedCount = seenSoFar + 1;
        session.setAttribute(SessionAttributes.REQUEST_COUNT, updatedCount);

        // Only authenticated sessions carry a UserInfo
        final UserInfo caller = (UserInfo) session.getAttribute(SessionAttributes.USER_INFO);
        if (caller != null) {
            processRequest(session, caller, message);
        } else {
            session.write("Please authenticate first");
        }
    }
}// Atomic attribute operations
public class AttributeOperations {
/**
 * Atomically increments the Integer counter stored under {@code key}, treating a missing
 * counter as 0.
 *
 * <p>A plain "get, add one, set" sequence can lose updates when two threads interleave.
 * This version only publishes a new value through the conditional operations
 * ({@code setAttributeIfAbsent} / {@code replaceAttribute}) and retries when another
 * thread changed the counter in between.
 *
 * @param session the session holding the counter
 * @param key     the key of the Integer counter attribute
 */
public void atomicIncrement(IoSession session, AttributeKey key) {
    while (true) {
        Object current = session.getAttribute(key);
        if (current == null) {
            // No counter yet: install 1 only if nobody else created it meanwhile.
            if (session.setAttributeIfAbsent(key, 1) == null) {
                return;
            }
        } else if (session.replaceAttribute(key, current, ((Integer) current) + 1)) {
            // The value was still the one we read, so the increment has been applied.
            return;
        }
        // Another thread won the race; re-read and try again.
    }
}

/**
 * Stores {@code value} under {@code key} only if the key has no value yet.
 *
 * @return true if {@code setAttributeIfAbsent} returned null, which this helper treats
 *         as "the value was stored"
 */
public boolean setIfAbsent(IoSession session, AttributeKey key, Object value) {
    return session.setAttributeIfAbsent(key, value) == null;
}

/**
 * Delegates to {@code replaceAttribute}, passing the expected old value and the new value.
 *
 * @return the boolean reported by {@code replaceAttribute}
 */
public boolean replaceValue(IoSession session, AttributeKey key,
        Object oldValue, Object newValue) {
    return session.replaceAttribute(key, oldValue, newValue);
}

/**
 * Delegates to the two-argument {@code removeAttribute}, passing the expected value.
 *
 * @return the boolean reported by {@code removeAttribute}
 */
public boolean removeValue(IoSession session, AttributeKey key, Object value) {
    return session.removeAttribute(key, value);
}
}public class IdleStatus {
// The three predefined instances. The constructor below is private, so code outside this
// class cannot create further instances.
// Because the enclosing type is declared as a class rather than an enum, these constants
// cannot serve as switch case labels; compare them with == or equals instead.
public static final IdleStatus READER_IDLE = new IdleStatus("READER_IDLE");
public static final IdleStatus WRITER_IDLE = new IdleStatus("WRITER_IDLE");
public static final IdleStatus BOTH_IDLE = new IdleStatus("BOTH_IDLE");
// Display name; identical to the constant's identifier for the instances above.
private final String name;
// Private: only the constants declared in this class are constructed.
private IdleStatus(String name) {
this.name = name;
}
// Returns the name given at construction.
@Override
public String toString() {
return name;
}
}public class IdleDetectionHandler extends IoHandlerAdapter {
/**
 * Arms idle detection for the freshly opened session.
 * Thresholds are in seconds: reader 30, writer 60, both 120.
 */
@Override
public void sessionOpened(IoSession session) throws Exception {
    final int readerIdleSeconds = 30;
    final int writerIdleSeconds = 60;  // 1 minute
    final int bothIdleSeconds = 120;   // 2 minutes

    session.getConfig().setIdleTime(IdleStatus.READER_IDLE, readerIdleSeconds);
    session.getConfig().setIdleTime(IdleStatus.WRITER_IDLE, writerIdleSeconds);
    session.getConfig().setIdleTime(IdleStatus.BOTH_IDLE, bothIdleSeconds);
}
/**
 * Reacts to an idle notification according to which direction went idle.
 *
 * <p>{@code IdleStatus} is a class with constant instances, not an enum, so it cannot be
 * the selector of a {@code switch} statement; the constants are compared by identity
 * instead.
 *
 * @param session the idle session
 * @param status  which kind of idleness was detected
 */
@Override
public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
    int idleCount = session.getIdleCount(status);
    if (status == IdleStatus.READER_IDLE) {
        if (idleCount >= 3) {
            // Close session after 3 consecutive reader idle events
            session.closeNow();
        } else {
            // Send keep-alive request
            session.write("PING");
        }
    } else if (status == IdleStatus.WRITER_IDLE) {
        // Send heartbeat to keep connection alive
        session.write("HEARTBEAT");
    } else if (status == IdleStatus.BOTH_IDLE) {
        // Close completely idle sessions
        System.out.println("Closing idle session: " + session.getId());
        session.closeNow();
    }
}
}public class FlowControlExample {
/**
 * Applies backpressure when too much outbound data is waiting: if more than 1000 messages
 * or more than 1MB are scheduled for writing, reading is suspended and a resume is
 * scheduled with a delay argument of 1000.
 *
 * @param session the session to inspect
 */
public void handleBackpressure(IoSession session) {
    final int pendingMessages = session.getScheduledWriteMessages();
    final long pendingBytes = session.getScheduledWriteBytes();

    final boolean overloaded = pendingMessages > 1000 || pendingBytes > 1024 * 1024; // 1MB
    if (!overloaded) {
        return;
    }

    // Stop reading so no further work piles up, then arrange for reading to resume
    session.suspendRead();
    scheduleResumeRead(session, 1000); // 1 second delay
}
/**
 * Suspends writing to a client whose written-bytes throughput is below 1024 (1KB/s) and
 * flags the session with the "suspended" attribute.
 *
 * @param session the session to inspect
 */
public void suspendSlowClient(IoSession session) {
    final double bytesPerSecond = session.getWrittenBytesThroughput();
    final boolean tooSlow = bytesPerSecond < 1024; // Less than 1KB/s
    if (!tooSlow) {
        return;
    }

    // Pause writes for the slow client and mark the session for potential cleanup
    session.suspendWrite();
    session.setAttribute("suspended", true);
}
/**
 * Resumes whichever directions are currently suspended. When writing is resumed, the
 * "suspended" marker attribute is removed as well.
 *
 * @param session the session to resume
 */
public void resumeOperations(IoSession session) {
    final boolean readPaused = session.isReadSuspended();
    if (readPaused) {
        session.resumeRead();
    }

    final boolean writePaused = session.isWriteSuspended();
    if (!writePaused) {
        return;
    }
    session.resumeWrite();
    session.removeAttribute("suspended");
}
}public class SessionStatistics {
/**
 * Prints a report of the session's statistics to standard output: identity and timing,
 * transfer totals, throughput rates and pending write queue figures.
 *
 * @param session the session to report on
 */
public void printSessionStats(IoSession session) {
    // Refresh the throughput figures before reading them (force = true)
    final long now = System.currentTimeMillis();
    session.updateThroughput(now, true);

    // Identity and timing
    final long sessionId = session.getId();
    System.out.println("Session ID: " + sessionId);
    final Date createdAt = new Date(session.getCreationTime());
    System.out.println("Creation Time: " + createdAt);
    final Date lastIoAt = new Date(session.getLastIoTime());
    System.out.println("Last I/O Time: " + lastIoAt);

    // Transfer totals
    final long bytesIn = session.getReadBytes();
    System.out.println("Bytes Read: " + bytesIn);
    final long bytesOut = session.getWrittenBytes();
    System.out.println("Bytes Written: " + bytesOut);
    final long messagesIn = session.getReadMessages();
    System.out.println("Messages Read: " + messagesIn);
    final long messagesOut = session.getWrittenMessages();
    System.out.println("Messages Written: " + messagesOut);

    // Throughput rates
    System.out.format("Read Throughput: %.2f bytes/sec%n", session.getReadBytesThroughput());
    System.out.format("Write Throughput: %.2f bytes/sec%n", session.getWrittenBytesThroughput());
    System.out.format("Read Message Rate: %.2f msgs/sec%n", session.getReadMessagesThroughput());
    System.out.format("Write Message Rate: %.2f msgs/sec%n", session.getWrittenMessagesThroughput());

    // Pending writes
    final int queuedMessages = session.getScheduledWriteMessages();
    System.out.println("Queued Write Messages: " + queuedMessages);
    final long queuedBytes = session.getScheduledWriteBytes();
    System.out.println("Queued Write Bytes: " + queuedBytes);
}
}public class CustomSessionInitializer implements IoSessionInitializer<ConnectFuture> {
/**
 * Prepares a session while it is being connected: tunes its configuration, records the
 * initial attributes and installs the session-specific filter.
 *
 * @param session the session being initialized
 * @param future  the connect future associated with the session (not used here)
 */
@Override
public void initializeSession(IoSession session, ConnectFuture future) {
    // Configuration: 8192-byte read buffer, both-idle threshold of 60
    final int readBufferBytes = 8192;
    final int bothIdleSeconds = 60;
    session.getConfig().setReadBufferSize(readBufferBytes);
    session.getConfig().setIdleTime(IdleStatus.BOTH_IDLE, bothIdleSeconds);

    // Initial attributes
    final long connectedAt = System.currentTimeMillis();
    session.setAttribute("connectionTime", connectedAt);
    session.setAttribute("protocol", "custom-v1.0");

    // The authentication filter goes to the front of this session's chain
    session.getFilterChain().addFirst("auth", new AuthenticationFilter());
}
}
// Using session initializer with connector
// NOTE(review): `address` is not defined in this snippet; presumably it is the remote
// endpoint's SocketAddress and must be supplied by the surrounding code - confirm.
IoConnector connector = new NioSocketConnector();
CustomSessionInitializer initializer = new CustomSessionInitializer();
// The initializer is handed to connect() together with the address.
ConnectFuture future = connector.connect(address, initializer);Session management in MINA Core provides comprehensive control over connection lifecycle, configuration, attributes, and statistics, enabling robust and scalable network applications.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-mina--mina-core