0
# Service Abstractions
1
2
MINA Core provides unified service abstractions for both server (acceptor) and client (connector) applications through the `IoService`, `IoAcceptor`, and `IoConnector` interfaces. These abstractions provide consistent APIs across different transport types.
3
4
## Core Service Hierarchy
5
6
### TransportMetadata
7
8
Interface providing metadata about transport implementations:
9
10
```java { .api }
11
public interface TransportMetadata {
12
// Provider information
13
String getProviderName(); // e.g., "nio", "apr", "serial"
14
String getName(); // Transport name
15
16
// Connection model
17
ConnectionModel getConnectionModel(); // CONNECTION or CONNECTIONLESS
18
boolean isConnectionless();
19
20
// Address and session types
21
Class<? extends SocketAddress> getAddressType();
22
Class<? extends IoSession> getSessionType();
23
Class<? extends IoSessionConfig> getSessionConfigType();
24
25
// Capabilities
26
boolean hasFragmentation(); // Can messages be fragmented
27
Set<Class<? extends Object>> getEnvelopeTypes(); // Allowed message types
28
}
29
```
30
31
### ConnectionModel
32
33
Enumeration of connection models:
34
35
```java { .api }
36
public enum ConnectionModel {
37
CONNECTION, // Connection-oriented (TCP)
38
CONNECTIONLESS // Connectionless (UDP)
39
}
40
```
41
42
### IoService
43
44
The base interface for all MINA services:
45
46
```java { .api }
47
public interface IoService {
48
// Service metadata and state
49
TransportMetadata getTransportMetadata();
50
boolean isActive();
51
boolean isDisposing();
52
boolean isDisposed();
53
long getActivationTime();
54
55
// Service lifecycle
56
void dispose();
57
void dispose(boolean awaitTermination);
58
59
// Handler management
60
IoHandler getHandler();
61
void setHandler(IoHandler handler);
62
63
// Session management
64
Map<Long, IoSession> getManagedSessions();
65
int getManagedSessionCount();
66
IoSessionConfig getSessionConfig();
67
Set<WriteFuture> broadcast(Object message);
68
69
// Filter chain management
70
IoFilterChainBuilder getFilterChainBuilder();
71
void setFilterChainBuilder(IoFilterChainBuilder builder);
72
DefaultIoFilterChainBuilder getFilterChain();
73
74
// Data structure factory
75
IoSessionDataStructureFactory getSessionDataStructureFactory();
76
void setSessionDataStructureFactory(IoSessionDataStructureFactory factory);
77
78
// Statistics and monitoring
79
IoServiceStatistics getStatistics();
80
int getScheduledWriteBytes();
81
int getScheduledWriteMessages();
82
83
// Event listeners
84
void addListener(IoServiceListener listener);
85
void removeListener(IoServiceListener listener);
86
}
87
```
88
89
### IoAcceptor
90
91
Interface for server-side services that accept incoming connections:
92
93
```java { .api }
94
public interface IoAcceptor extends IoService {
95
// Binding and unbinding
96
void bind(SocketAddress localAddress) throws IOException;
97
void bind(SocketAddress... localAddresses) throws IOException;
98
void bind(Iterable<? extends SocketAddress> localAddresses) throws IOException;
99
100
void unbind();
101
void unbind(SocketAddress localAddress);
102
void unbind(SocketAddress... localAddresses);
103
void unbind(Iterable<? extends SocketAddress> localAddresses);
104
105
// Local addresses
106
Set<SocketAddress> getLocalAddresses();
107
SocketAddress getLocalAddress();
108
SocketAddress getDefaultLocalAddress();
109
void setDefaultLocalAddress(SocketAddress localAddress);
110
List<SocketAddress> getDefaultLocalAddresses();
111
void setDefaultLocalAddresses(SocketAddress firstLocalAddress, SocketAddress... otherLocalAddresses);
112
void setDefaultLocalAddresses(List<? extends SocketAddress> localAddresses);
113
114
// Additional binding methods
115
void bind() throws IOException;
116
void bind(SocketAddress firstLocalAddress, SocketAddress... addresses) throws IOException;
117
118
// Additional unbinding methods
119
void unbind(SocketAddress firstLocalAddress, SocketAddress... otherLocalAddresses);
120
121
// Close behavior configuration
122
boolean isCloseOnDeactivation();
123
void setCloseOnDeactivation(boolean closeOnDeactivation);
124
125
// Session creation for connectionless transports
126
IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress);
127
}
128
```
129
130
### IoConnector
131
132
Interface for client-side services that establish outgoing connections:
133
134
```java { .api }
135
public interface IoConnector extends IoService {
136
// Connection operations
137
ConnectFuture connect(SocketAddress remoteAddress);
138
ConnectFuture connect(SocketAddress remoteAddress, SocketAddress localAddress);
139
ConnectFuture connect(SocketAddress remoteAddress, IoSessionInitializer<? extends ConnectFuture> sessionInitializer);
140
ConnectFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, IoSessionInitializer<? extends ConnectFuture> sessionInitializer);
141
142
// Remote address configuration
143
SocketAddress getDefaultRemoteAddress();
144
void setDefaultRemoteAddress(SocketAddress defaultRemoteAddress);
145
146
// Local address configuration
147
SocketAddress getDefaultLocalAddress();
148
void setDefaultLocalAddress(SocketAddress defaultLocalAddress);
149
150
// Additional connection methods
151
ConnectFuture connect();
152
ConnectFuture connect(IoSessionInitializer<? extends ConnectFuture> sessionInitializer);
153
154
// Connection timeout
155
long getConnectTimeoutMillis();
156
void setConnectTimeoutMillis(long connectTimeoutMillis);
157
158
// Deprecated timeout methods (marked for removal)
159
@Deprecated
160
int getConnectTimeout();
161
@Deprecated
162
void setConnectTimeout(int connectTimeout);
163
}
164
```
165
166
### IoHandler Interface
167
168
Complete handler interface for processing I/O events:
169
170
```java { .api }
171
public interface IoHandler {
172
// Session lifecycle events
173
void sessionCreated(IoSession session) throws Exception;
174
void sessionOpened(IoSession session) throws Exception;
175
void sessionClosed(IoSession session) throws Exception;
176
void sessionIdle(IoSession session, IdleStatus status) throws Exception;
177
178
// Message handling
179
void messageReceived(IoSession session, Object message) throws Exception;
180
void messageSent(IoSession session, Object message) throws Exception;
181
182
// Input/Output events
183
void inputClosed(IoSession session) throws Exception; // Half-duplex channel closure
184
185
// Error handling
186
void exceptionCaught(IoSession session, Throwable cause) throws Exception;
187
188
// Filter events
189
void event(IoSession session, FilterEvent event) throws Exception;
190
}
191
```
192
193
## Service Implementations
194
195
### TCP Socket Services
196
197
#### NioSocketAcceptor
198
199
TCP server implementation using NIO:
200
201
```java { .api }
202
// Basic server setup
203
NioSocketAcceptor acceptor = new NioSocketAcceptor();
204
205
// Configure the acceptor
206
acceptor.setReuseAddress(true);
207
acceptor.setBacklog(100);
208
209
// Configure session settings
210
SocketSessionConfig config = acceptor.getSessionConfig();
211
config.setSendBufferSize(64 * 1024);
212
config.setReceiveBufferSize(64 * 1024);
213
config.setTcpNoDelay(true);
214
config.setKeepAlive(true);
215
216
// Set up filter chain
217
acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(codecFactory));
218
acceptor.getFilterChain().addLast("logger", new LoggingFilter());
219
220
// Set handler
221
acceptor.setHandler(new ServerHandler());
222
223
// Bind to multiple addresses
224
acceptor.bind(new InetSocketAddress(8080));
225
acceptor.bind(new InetSocketAddress(8443));
226
227
// Check bound addresses
228
Set<SocketAddress> addresses = acceptor.getLocalAddresses();
229
System.out.println("Server listening on: " + addresses);
230
```
231
232
#### NioSocketConnector
233
234
TCP client implementation using NIO:
235
236
```java { .api }
237
// Basic client setup
238
NioSocketConnector connector = new NioSocketConnector();
239
240
// Configure connection timeout
241
connector.setConnectTimeoutMillis(30000); // 30 seconds
242
connector.setConnectTimeoutCheckInterval(1000); // Check every second
243
244
// Configure session settings
245
SocketSessionConfig config = connector.getSessionConfig();
246
config.setSendBufferSize(32 * 1024);
247
config.setReceiveBufferSize(32 * 1024);
248
config.setTcpNoDelay(true);
249
250
// Set up filter chain
251
connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(codecFactory));
252
connector.getFilterChain().addLast("logger", new LoggingFilter());
253
254
// Set handler
255
connector.setHandler(new ClientHandler());
256
257
// Connect to server
258
ConnectFuture future = connector.connect(new InetSocketAddress("localhost", 8080));
259
future.awaitUninterruptibly();
260
261
if (future.isConnected()) {
262
IoSession session = future.getSession();
263
System.out.println("Connected to server: " + session.getRemoteAddress());
264
} else {
265
System.err.println("Connection failed: " + future.getException());
266
}
267
```
268
269
### UDP Datagram Services
270
271
#### NioDatagramAcceptor
272
273
UDP server implementation:
274
275
```java { .api }
276
// UDP server setup
277
NioDatagramAcceptor acceptor = new NioDatagramAcceptor();
278
279
// Configure datagram-specific settings
280
DatagramSessionConfig config = acceptor.getSessionConfig();
281
config.setReceiveBufferSize(1024 * 64); // 64KB receive buffer
282
config.setSendBufferSize(1024 * 64); // 64KB send buffer
283
config.setReuseAddress(true);
284
config.setBroadcast(true); // Allow broadcast
285
286
// Set up filter chain for datagram processing
287
acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(datagramCodec));
288
acceptor.getFilterChain().addLast("logger", new LoggingFilter());
289
290
// Set handler for UDP messages
291
acceptor.setHandler(new DatagramHandler());
292
293
// Bind to UDP port
294
acceptor.bind(new InetSocketAddress(5000));
295
System.out.println("UDP server listening on port 5000");
296
```
297
298
#### NioDatagramConnector
299
300
UDP client implementation:
301
302
```java { .api }
303
// UDP client setup
304
NioDatagramConnector connector = new NioDatagramConnector();
305
306
// Configure datagram settings
307
DatagramSessionConfig config = connector.getSessionConfig();
308
config.setSendBufferSize(1024 * 32); // 32KB send buffer
309
config.setReceiveBufferSize(1024 * 32); // 32KB receive buffer
310
config.setBroadcast(true); // Allow broadcast
311
312
// Set up filter chain
313
connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(datagramCodec));
314
315
// Set handler
316
connector.setHandler(new DatagramClientHandler());
317
318
// Connect to UDP server
319
ConnectFuture future = connector.connect(new InetSocketAddress("localhost", 5000));
320
IoSession session = future.awaitUninterruptibly().getSession();
321
322
// Send UDP message
323
session.write("Hello UDP Server!");
324
```
325
326
### VM Pipe Services (In-Memory)
327
328
#### VmPipeAcceptor
329
330
In-memory server for same-JVM communication:
331
332
```java { .api }
333
// VM Pipe server setup
334
VmPipeAcceptor acceptor = new VmPipeAcceptor();
335
336
// Set up filter chain
337
acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
338
acceptor.getFilterChain().addLast("logger", new LoggingFilter());
339
340
// Set handler
341
acceptor.setHandler(new VmPipeHandler());
342
343
// Bind to VM pipe address
344
VmPipeAddress address = new VmPipeAddress(12345);
345
acceptor.bind(address);
346
System.out.println("VM Pipe server listening on: " + address);
347
```
348
349
#### VmPipeConnector
350
351
In-memory client:
352
353
```java { .api }
354
// VM Pipe client setup
355
VmPipeConnector connector = new VmPipeConnector();
356
357
// Set up filter chain
358
connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));
359
360
// Set handler
361
connector.setHandler(new VmPipeClientHandler());
362
363
// Connect to VM pipe server
364
VmPipeAddress address = new VmPipeAddress(12345);
365
ConnectFuture future = connector.connect(address);
366
IoSession session = future.awaitUninterruptibly().getSession();
367
368
// Send object through VM pipe
369
session.write(new MyMessage("Hello VM Pipe!"));
370
```
371
372
## Service Configuration
373
374
### Multi-threaded Configuration
375
376
```java { .api }
377
// Configure acceptor with multiple I/O processors
378
public class MultiThreadedServer {
379
380
public void createOptimizedServer() {
381
// Create acceptor with 4 I/O processors
382
NioSocketAcceptor acceptor = new NioSocketAcceptor(4);
383
384
// Or create with custom processor pool
385
IoProcessor<NioSession> processor = new SimpleIoProcessorPool<>(
386
NioProcessor.class, 8); // 8 processors
387
NioSocketAcceptor customAcceptor = new NioSocketAcceptor(processor);
388
389
// Configure with custom executor
390
Executor executor = Executors.newCachedThreadPool();
391
NioSocketAcceptor executorAcceptor = new NioSocketAcceptor(executor, processor);
392
}
393
}
394
```
395
396
### Custom SelectorProvider
397
398
```java { .api }
399
// Using custom SelectorProvider for advanced NIO configuration
400
public void createServerWithCustomSelector() throws IOException {
401
// Create custom selector provider
402
SelectorProvider provider = SelectorProvider.provider();
403
404
// Create acceptor with custom provider
405
NioSocketAcceptor acceptor = new NioSocketAcceptor(4, provider);
406
407
// Configure and start server
408
acceptor.setHandler(new MyHandler());
409
acceptor.bind(new InetSocketAddress(8080));
410
}
411
```
412
413
## Service Lifecycle Management
414
415
### Service Listeners
416
417
```java { .api }
418
public class ServiceLifecycleListener implements IoServiceListener {
419
420
@Override
421
public void serviceActivated(IoService service) throws Exception {
422
System.out.println("Service activated: " + service.getClass().getSimpleName());
423
424
// Initialize service-specific resources
425
initializeServiceResources(service);
426
}
427
428
@Override
429
public void serviceDeactivated(IoService service) throws Exception {
430
System.out.println("Service deactivated: " + service.getClass().getSimpleName());
431
432
// Cleanup service resources
433
cleanupServiceResources(service);
434
}
435
436
@Override
437
public void serviceIdle(IoService service, IdleStatus idleStatus) throws Exception {
438
System.out.println("Service idle: " + service.getClass().getSimpleName() +
439
" (" + idleStatus + ")");
440
}
441
442
@Override
443
public void sessionCreated(IoSession session) throws Exception {
444
System.out.println("Session created: " + session.getId());
445
}
446
447
@Override
448
public void sessionClosed(IoSession session) throws Exception {
449
System.out.println("Session closed: " + session.getId());
450
}
451
452
@Override
453
public void sessionDestroyed(IoSession session) throws Exception {
454
System.out.println("Session destroyed: " + session.getId());
455
}
456
}
457
458
// Using service listener
459
NioSocketAcceptor acceptor = new NioSocketAcceptor();
460
acceptor.addListener(new ServiceLifecycleListener());
461
```
462
463
### Graceful Service Shutdown
464
465
```java { .api }
466
public class GracefulShutdown {
467
468
public void shutdownServer(IoAcceptor acceptor) {
469
System.out.println("Initiating server shutdown...");
470
471
// Stop accepting new connections
472
acceptor.unbind();
473
474
// Wait for existing sessions to complete
475
while (acceptor.getManagedSessionCount() > 0) {
476
try {
477
Thread.sleep(1000);
478
System.out.println("Waiting for " + acceptor.getManagedSessionCount() +
479
" sessions to close...");
480
} catch (InterruptedException e) {
481
Thread.currentThread().interrupt();
482
break;
483
}
484
}
485
486
// Dispose the service
487
acceptor.dispose(true); // Wait for termination
488
System.out.println("Server shutdown complete");
489
}
490
491
public void forceShutdown(IoService service) {
492
// Close all sessions immediately
493
for (IoSession session : service.getManagedSessions().values()) {
494
session.closeNow();
495
}
496
497
// Dispose service without waiting
498
service.dispose(false);
499
}
500
}
501
```
502
503
## Service Statistics and Monitoring
504
505
### IoServiceStatistics Interface
506
507
Complete statistics interface for monitoring service performance:
508
509
```java { .api }
510
public interface IoServiceStatistics {
511
// Basic session statistics
512
long getCumulativeManagedSessionCount();
513
long getLargestManagedSessionCount();
514
long getLastIoTime();
515
516
// Message transfer statistics
517
long getReadBytes();
518
long getWrittenBytes();
519
long getReadMessages();
520
long getWrittenMessages();
521
522
// Throughput statistics (current)
523
double getReadBytesThroughput();
524
double getWrittenBytesThroughput();
525
double getReadMessagesThroughput();
526
double getWrittenMessagesThroughput();
527
528
// Peak throughput statistics
529
double getLargestReadBytesThroughput();
530
double getLargestWrittenBytesThroughput();
531
double getLargestReadMessagesThroughput();
532
double getLargestWrittenMessagesThroughput();
533
534
// Timing information
535
long getLastReadTime();
536
long getLastWriteTime();
537
538
// Throughput calculation configuration
539
int getThroughputCalculationInterval();
540
long getThroughputCalculationIntervalInMillis();
541
void setThroughputCalculationInterval(int throughputCalculationInterval);
542
}
543
```
544
545
### IoServiceStatistics.Config
546
547
Configuration class for enabling/disabling specific statistics calculations:
548
549
```java { .api }
550
public static class Config {
551
// Enable/disable specific statistics
552
void setThroughputStatisticsEnabled(boolean enabled);
553
boolean isThroughputStatisticsEnabled();
554
555
void setPeakThroughputStatisticsEnabled(boolean enabled);
556
boolean isPeakThroughputStatisticsEnabled();
557
558
void setTimingStatisticsEnabled(boolean enabled);
559
boolean isTimingStatisticsEnabled();
560
}
561
```
562
563
### Service Statistics Usage
564
565
```java { .api }
566
public class ServiceMonitor {
567
568
public void printServiceStatistics(IoService service) {
569
IoServiceStatistics stats = service.getStatistics();
570
571
System.out.println("=== Service Statistics ===");
572
System.out.println("Service Type: " + service.getClass().getSimpleName());
573
System.out.println("Transport: " + service.getTransportMetadata().getProviderName());
574
System.out.println("Active: " + service.isActive());
575
System.out.println("Activation Time: " + new Date(service.getActivationTime()));
576
577
// Session statistics
578
System.out.println("Current Sessions: " + service.getManagedSessionCount());
579
System.out.println("Total Sessions: " + stats.getCumulativeManagedSessionCount());
580
581
// Message statistics
582
System.out.println("Messages Read: " + stats.getReadMessages());
583
System.out.println("Messages Written: " + stats.getWrittenMessages());
584
System.out.println("Bytes Read: " + formatBytes(stats.getReadBytes()));
585
System.out.println("Bytes Written: " + formatBytes(stats.getWrittenBytes()));
586
587
// Throughput statistics
588
System.out.printf("Read Throughput: %.2f KB/s%n",
589
stats.getReadBytesThroughput() / 1024.0);
590
System.out.printf("Write Throughput: %.2f KB/s%n",
591
stats.getWrittenBytesThroughput() / 1024.0);
592
System.out.printf("Message Read Rate: %.2f msgs/s%n",
593
stats.getReadMessagesThroughput());
594
System.out.printf("Message Write Rate: %.2f msgs/s%n",
595
stats.getWrittenMessagesThroughput());
596
597
// Queue statistics
598
System.out.println("Scheduled Write Messages: " + service.getScheduledWriteMessages());
599
System.out.println("Scheduled Write Bytes: " + formatBytes(service.getScheduledWriteBytes()));
600
}
601
602
private String formatBytes(long bytes) {
603
if (bytes < 1024) return bytes + " B";
604
if (bytes < 1024 * 1024) return String.format("%.1f KB", bytes / 1024.0);
605
if (bytes < 1024 * 1024 * 1024) return String.format("%.1f MB", bytes / (1024.0 * 1024));
606
return String.format("%.1f GB", bytes / (1024.0 * 1024 * 1024));
607
}
608
}
609
```
610
611
### Session Broadcasting
612
613
```java { .api }
614
public class BroadcastingService {
615
616
public void broadcastMessage(IoService service, Object message) {
617
// Broadcast to all managed sessions
618
Set<WriteFuture> futures = service.broadcast(message);
619
620
// Wait for all broadcasts to complete
621
for (WriteFuture future : futures) {
622
future.awaitUninterruptibly();
623
if (!future.isWritten()) {
624
System.err.println("Broadcast failed: " + future.getException());
625
}
626
}
627
}
628
629
public void selectiveBroadcast(IoService service, Object message,
630
Predicate<IoSession> filter) {
631
// Broadcast to filtered sessions
632
for (IoSession session : service.getManagedSessions().values()) {
633
if (filter.test(session)) {
634
session.write(message);
635
}
636
}
637
}
638
639
// Broadcast to authenticated users only
640
public void broadcastToAuthenticatedUsers(IoService service, Object message) {
641
selectiveBroadcast(service, message, session ->
642
session.getAttribute("authenticated", false));
643
}
644
}
645
```
646
647
## Custom Service Implementation
648
649
### Creating Custom Service
650
651
```java { .api }
652
public class CustomProtocolAcceptor extends AbstractIoAcceptor {
653
654
public CustomProtocolAcceptor() {
655
super(new CustomSessionConfig(), CustomProcessor.class);
656
}
657
658
@Override
659
protected void init() throws Exception {
660
// Initialize custom transport
661
}
662
663
@Override
664
protected void destroy() throws Exception {
665
// Cleanup custom transport
666
}
667
668
@Override
669
public TransportMetadata getTransportMetadata() {
670
return CustomTransportMetadata.INSTANCE;
671
}
672
673
// Implement other abstract methods...
674
}
675
```
676
677
Service abstractions in MINA Core provide a unified and powerful foundation for building both server and client applications across different transport protocols, with comprehensive lifecycle management, statistics, and configuration options.