or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-operations.mdbuffer-management.mdfilter-chain.mdindex.mdprotocol-codecs.mdservice-abstractions.mdsession-management.mdtransport-layer.md

service-abstractions.mddocs/

0

# Service Abstractions

1

2

MINA Core provides unified service abstractions for both server (acceptor) and client (connector) applications through the `IoService`, `IoAcceptor`, and `IoConnector` interfaces. These abstractions provide consistent APIs across different transport types.

3

4

## Core Service Hierarchy

5

6

### TransportMetadata

7

8

Interface providing metadata about transport implementations:

9

10

```java { .api }

11

public interface TransportMetadata {

12

// Provider information

13

String getProviderName(); // e.g., "nio", "apr", "serial"

14

String getName(); // Transport name

15

16

// Connection model

17

ConnectionModel getConnectionModel(); // CONNECTION or CONNECTIONLESS

18

boolean isConnectionless();

19

20

// Address and session types

21

Class<? extends SocketAddress> getAddressType();

22

Class<? extends IoSession> getSessionType();

23

Class<? extends IoSessionConfig> getSessionConfigType();

24

25

// Capabilities

26

boolean hasFragmentation(); // Can messages be fragmented

27

Set<Class<? extends Object>> getEnvelopeTypes(); // Allowed message types

28

}

29

```

30

31

### ConnectionModel

32

33

Enumeration of connection models:

34

35

```java { .api }

36

public enum ConnectionModel {

37

CONNECTION, // Connection-oriented (TCP)

38

CONNECTIONLESS // Connectionless (UDP)

39

}

40

```

41

42

### IoService

43

44

The base interface for all MINA services:

45

46

```java { .api }

47

public interface IoService {

48

// Service metadata and state

49

TransportMetadata getTransportMetadata();

50

boolean isActive();

51

boolean isDisposing();

52

boolean isDisposed();

53

long getActivationTime();

54

55

// Service lifecycle

56

void dispose();

57

void dispose(boolean awaitTermination);

58

59

// Handler management

60

IoHandler getHandler();

61

void setHandler(IoHandler handler);

62

63

// Session management

64

Map<Long, IoSession> getManagedSessions();

65

int getManagedSessionCount();

66

IoSessionConfig getSessionConfig();

67

Set<WriteFuture> broadcast(Object message);

68

69

// Filter chain management

70

IoFilterChainBuilder getFilterChainBuilder();

71

void setFilterChainBuilder(IoFilterChainBuilder builder);

72

DefaultIoFilterChainBuilder getFilterChain();

73

74

// Data structure factory

75

IoSessionDataStructureFactory getSessionDataStructureFactory();

76

void setSessionDataStructureFactory(IoSessionDataStructureFactory factory);

77

78

// Statistics and monitoring

79

IoServiceStatistics getStatistics();

80

int getScheduledWriteBytes();

81

int getScheduledWriteMessages();

82

83

// Event listeners

84

void addListener(IoServiceListener listener);

85

void removeListener(IoServiceListener listener);

86

}

87

```

88

89

### IoAcceptor

90

91

Interface for server-side services that accept incoming connections:

92

93

```java { .api }

94

public interface IoAcceptor extends IoService {

95

// Binding and unbinding

96

void bind(SocketAddress localAddress) throws IOException;

97

void bind(SocketAddress... localAddresses) throws IOException;

98

void bind(Iterable<? extends SocketAddress> localAddresses) throws IOException;

99

100

void unbind();

101

void unbind(SocketAddress localAddress);

102

void unbind(SocketAddress... localAddresses);

103

void unbind(Iterable<? extends SocketAddress> localAddresses);

104

105

// Local addresses

106

Set<SocketAddress> getLocalAddresses();

107

SocketAddress getLocalAddress();

108

SocketAddress getDefaultLocalAddress();

109

void setDefaultLocalAddress(SocketAddress localAddress);

110

List<SocketAddress> getDefaultLocalAddresses();

111

void setDefaultLocalAddresses(SocketAddress firstLocalAddress, SocketAddress... otherLocalAddresses);

112

void setDefaultLocalAddresses(List<? extends SocketAddress> localAddresses);

113

114

// Additional binding methods

115

void bind() throws IOException;

116

void bind(SocketAddress firstLocalAddress, SocketAddress... addresses) throws IOException;

117

118

// Additional unbinding methods

119

void unbind(SocketAddress firstLocalAddress, SocketAddress... otherLocalAddresses);

120

121

// Close behavior configuration

122

boolean isCloseOnDeactivation();

123

void setCloseOnDeactivation(boolean closeOnDeactivation);

124

125

// Session creation for connectionless transports

126

IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress);

127

}

128

```

129

130

### IoConnector

131

132

Interface for client-side services that establish outgoing connections:

133

134

```java { .api }

135

public interface IoConnector extends IoService {

136

// Connection operations

137

ConnectFuture connect(SocketAddress remoteAddress);

138

ConnectFuture connect(SocketAddress remoteAddress, SocketAddress localAddress);

139

ConnectFuture connect(SocketAddress remoteAddress, IoSessionInitializer<? extends ConnectFuture> sessionInitializer);

140

ConnectFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, IoSessionInitializer<? extends ConnectFuture> sessionInitializer);

141

142

// Remote address configuration

143

SocketAddress getDefaultRemoteAddress();

144

void setDefaultRemoteAddress(SocketAddress defaultRemoteAddress);

145

146

// Local address configuration

147

SocketAddress getDefaultLocalAddress();

148

void setDefaultLocalAddress(SocketAddress defaultLocalAddress);

149

150

// Additional connection methods

151

ConnectFuture connect();

152

ConnectFuture connect(IoSessionInitializer<? extends ConnectFuture> sessionInitializer);

153

154

// Connection timeout

155

long getConnectTimeoutMillis();

156

void setConnectTimeoutMillis(long connectTimeoutMillis);

157

158

// Deprecated timeout methods (marked for removal)

159

@Deprecated

160

int getConnectTimeout();

161

@Deprecated

162

void setConnectTimeout(int connectTimeout);

163

}

164

```

165

166

### IoHandler Interface

167

168

Complete handler interface for processing I/O events:

169

170

```java { .api }

171

public interface IoHandler {

172

// Session lifecycle events

173

void sessionCreated(IoSession session) throws Exception;

174

void sessionOpened(IoSession session) throws Exception;

175

void sessionClosed(IoSession session) throws Exception;

176

void sessionIdle(IoSession session, IdleStatus status) throws Exception;

177

178

// Message handling

179

void messageReceived(IoSession session, Object message) throws Exception;

180

void messageSent(IoSession session, Object message) throws Exception;

181

182

// Input/Output events

183

void inputClosed(IoSession session) throws Exception; // Half-duplex channel closure

184

185

// Error handling

186

void exceptionCaught(IoSession session, Throwable cause) throws Exception;

187

188

// Filter events

189

void event(IoSession session, FilterEvent event) throws Exception;

190

}

191

```

192

193

## Service Implementations

194

195

### TCP Socket Services

196

197

#### NioSocketAcceptor

198

199

TCP server implementation using NIO:

200

201

```java { .api }

202

// Basic server setup

203

NioSocketAcceptor acceptor = new NioSocketAcceptor();

204

205

// Configure the acceptor

206

acceptor.setReuseAddress(true);

207

acceptor.setBacklog(100);

208

209

// Configure session settings

210

SocketSessionConfig config = acceptor.getSessionConfig();

211

config.setSendBufferSize(64 * 1024);

212

config.setReceiveBufferSize(64 * 1024);

213

config.setTcpNoDelay(true);

214

config.setKeepAlive(true);

215

216

// Set up filter chain

217

acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(codecFactory));

218

acceptor.getFilterChain().addLast("logger", new LoggingFilter());

219

220

// Set handler

221

acceptor.setHandler(new ServerHandler());

222

223

// Bind to multiple addresses

224

acceptor.bind(new InetSocketAddress(8080));

225

acceptor.bind(new InetSocketAddress(8443));

226

227

// Check bound addresses

228

Set<SocketAddress> addresses = acceptor.getLocalAddresses();

229

System.out.println("Server listening on: " + addresses);

230

```

231

232

#### NioSocketConnector

233

234

TCP client implementation using NIO:

235

236

```java { .api }

237

// Basic client setup

238

NioSocketConnector connector = new NioSocketConnector();

239

240

// Configure connection timeout

241

connector.setConnectTimeoutMillis(30000); // 30 seconds

242

connector.setConnectTimeoutCheckInterval(1000); // Check every second

243

244

// Configure session settings

245

SocketSessionConfig config = connector.getSessionConfig();

246

config.setSendBufferSize(32 * 1024);

247

config.setReceiveBufferSize(32 * 1024);

248

config.setTcpNoDelay(true);

249

250

// Set up filter chain

251

connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(codecFactory));

252

connector.getFilterChain().addLast("logger", new LoggingFilter());

253

254

// Set handler

255

connector.setHandler(new ClientHandler());

256

257

// Connect to server

258

ConnectFuture future = connector.connect(new InetSocketAddress("localhost", 8080));

259

future.awaitUninterruptibly();

260

261

if (future.isConnected()) {

262

IoSession session = future.getSession();

263

System.out.println("Connected to server: " + session.getRemoteAddress());

264

} else {

265

System.err.println("Connection failed: " + future.getException());

266

}

267

```

268

269

### UDP Datagram Services

270

271

#### NioDatagramAcceptor

272

273

UDP server implementation:

274

275

```java { .api }

276

// UDP server setup

277

NioDatagramAcceptor acceptor = new NioDatagramAcceptor();

278

279

// Configure datagram-specific settings

280

DatagramSessionConfig config = acceptor.getSessionConfig();

281

config.setReceiveBufferSize(1024 * 64); // 64KB receive buffer

282

config.setSendBufferSize(1024 * 64); // 64KB send buffer

283

config.setReuseAddress(true);

284

config.setBroadcast(true); // Allow broadcast

285

286

// Set up filter chain for datagram processing

287

acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(datagramCodec));

288

acceptor.getFilterChain().addLast("logger", new LoggingFilter());

289

290

// Set handler for UDP messages

291

acceptor.setHandler(new DatagramHandler());

292

293

// Bind to UDP port

294

acceptor.bind(new InetSocketAddress(5000));

295

System.out.println("UDP server listening on port 5000");

296

```

297

298

#### NioDatagramConnector

299

300

UDP client implementation:

301

302

```java { .api }

303

// UDP client setup

304

NioDatagramConnector connector = new NioDatagramConnector();

305

306

// Configure datagram settings

307

DatagramSessionConfig config = connector.getSessionConfig();

308

config.setSendBufferSize(1024 * 32); // 32KB send buffer

309

config.setReceiveBufferSize(1024 * 32); // 32KB receive buffer

310

config.setBroadcast(true); // Allow broadcast

311

312

// Set up filter chain

313

connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(datagramCodec));

314

315

// Set handler

316

connector.setHandler(new DatagramClientHandler());

317

318

// Connect to UDP server

319

ConnectFuture future = connector.connect(new InetSocketAddress("localhost", 5000));

320

IoSession session = future.awaitUninterruptibly().getSession();

321

322

// Send UDP message

323

session.write("Hello UDP Server!");

324

```

325

326

### VM Pipe Services (In-Memory)

327

328

#### VmPipeAcceptor

329

330

In-memory server for same-JVM communication:

331

332

```java { .api }

333

// VM Pipe server setup

334

VmPipeAcceptor acceptor = new VmPipeAcceptor();

335

336

// Set up filter chain

337

acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));

338

acceptor.getFilterChain().addLast("logger", new LoggingFilter());

339

340

// Set handler

341

acceptor.setHandler(new VmPipeHandler());

342

343

// Bind to VM pipe address

344

VmPipeAddress address = new VmPipeAddress(12345);

345

acceptor.bind(address);

346

System.out.println("VM Pipe server listening on: " + address);

347

```

348

349

#### VmPipeConnector

350

351

In-memory client:

352

353

```java { .api }

354

// VM Pipe client setup

355

VmPipeConnector connector = new VmPipeConnector();

356

357

// Set up filter chain

358

connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));

359

360

// Set handler

361

connector.setHandler(new VmPipeClientHandler());

362

363

// Connect to VM pipe server

364

VmPipeAddress address = new VmPipeAddress(12345);

365

ConnectFuture future = connector.connect(address);

366

IoSession session = future.awaitUninterruptibly().getSession();

367

368

// Send object through VM pipe

369

session.write(new MyMessage("Hello VM Pipe!"));

370

```

371

372

## Service Configuration

373

374

### Multi-threaded Configuration

375

376

```java { .api }

377

// Configure acceptor with multiple I/O processors

378

public class MultiThreadedServer {

379

380

public void createOptimizedServer() {

381

// Create acceptor with 4 I/O processors

382

NioSocketAcceptor acceptor = new NioSocketAcceptor(4);

383

384

// Or create with custom processor pool

385

IoProcessor<NioSession> processor = new SimpleIoProcessorPool<>(

386

NioProcessor.class, 8); // 8 processors

387

NioSocketAcceptor customAcceptor = new NioSocketAcceptor(processor);

388

389

// Configure with custom executor

390

Executor executor = Executors.newCachedThreadPool();

391

NioSocketAcceptor executorAcceptor = new NioSocketAcceptor(executor, processor);

392

}

393

}

394

```

395

396

### Custom SelectorProvider

397

398

```java { .api }

399

// Using custom SelectorProvider for advanced NIO configuration

400

public void createServerWithCustomSelector() throws IOException {

401

// Create custom selector provider

402

SelectorProvider provider = SelectorProvider.provider();

403

404

// Create acceptor with custom provider

405

NioSocketAcceptor acceptor = new NioSocketAcceptor(4, provider);

406

407

// Configure and start server

408

acceptor.setHandler(new MyHandler());

409

acceptor.bind(new InetSocketAddress(8080));

410

}

411

```

412

413

## Service Lifecycle Management

414

415

### Service Listeners

416

417

```java { .api }

418

public class ServiceLifecycleListener implements IoServiceListener {

419

420

@Override

421

public void serviceActivated(IoService service) throws Exception {

422

System.out.println("Service activated: " + service.getClass().getSimpleName());

423

424

// Initialize service-specific resources

425

initializeServiceResources(service);

426

}

427

428

@Override

429

public void serviceDeactivated(IoService service) throws Exception {

430

System.out.println("Service deactivated: " + service.getClass().getSimpleName());

431

432

// Cleanup service resources

433

cleanupServiceResources(service);

434

}

435

436

@Override

437

public void serviceIdle(IoService service, IdleStatus idleStatus) throws Exception {

438

System.out.println("Service idle: " + service.getClass().getSimpleName() +

439

" (" + idleStatus + ")");

440

}

441

442

@Override

443

public void sessionCreated(IoSession session) throws Exception {

444

System.out.println("Session created: " + session.getId());

445

}

446

447

@Override

448

public void sessionClosed(IoSession session) throws Exception {

449

System.out.println("Session closed: " + session.getId());

450

}

451

452

@Override

453

public void sessionDestroyed(IoSession session) throws Exception {

454

System.out.println("Session destroyed: " + session.getId());

455

}

456

}

457

458

// Using service listener

459

NioSocketAcceptor acceptor = new NioSocketAcceptor();

460

acceptor.addListener(new ServiceLifecycleListener());

461

```

462

463

### Graceful Service Shutdown

464

465

```java { .api }

466

public class GracefulShutdown {

467

468

public void shutdownServer(IoAcceptor acceptor) {

469

System.out.println("Initiating server shutdown...");

470

471

// Stop accepting new connections

472

acceptor.unbind();

473

474

// Wait for existing sessions to complete

475

while (acceptor.getManagedSessionCount() > 0) {

476

try {

477

Thread.sleep(1000);

478

System.out.println("Waiting for " + acceptor.getManagedSessionCount() +

479

" sessions to close...");

480

} catch (InterruptedException e) {

481

Thread.currentThread().interrupt();

482

break;

483

}

484

}

485

486

// Dispose the service

487

acceptor.dispose(true); // Wait for termination

488

System.out.println("Server shutdown complete");

489

}

490

491

public void forceShutdown(IoService service) {

492

// Close all sessions immediately

493

for (IoSession session : service.getManagedSessions().values()) {

494

session.closeNow();

495

}

496

497

// Dispose service without waiting

498

service.dispose(false);

499

}

500

}

501

```

502

503

## Service Statistics and Monitoring

504

505

### IoServiceStatistics Interface

506

507

Complete statistics interface for monitoring service performance:

508

509

```java { .api }

510

public interface IoServiceStatistics {

511

// Basic session statistics

512

long getCumulativeManagedSessionCount();

513

long getLargestManagedSessionCount();

514

long getLastIoTime();

515

516

// Message transfer statistics

517

long getReadBytes();

518

long getWrittenBytes();

519

long getReadMessages();

520

long getWrittenMessages();

521

522

// Throughput statistics (current)

523

double getReadBytesThroughput();

524

double getWrittenBytesThroughput();

525

double getReadMessagesThroughput();

526

double getWrittenMessagesThroughput();

527

528

// Peak throughput statistics

529

double getLargestReadBytesThroughput();

530

double getLargestWrittenBytesThroughput();

531

double getLargestReadMessagesThroughput();

532

double getLargestWrittenMessagesThroughput();

533

534

// Timing information

535

long getLastReadTime();

536

long getLastWriteTime();

537

538

// Throughput calculation configuration

539

int getThroughputCalculationInterval();

540

long getThroughputCalculationIntervalInMillis();

541

void setThroughputCalculationInterval(int throughputCalculationInterval);

542

}

543

```

544

545

### IoServiceStatistics.Config

546

547

Configuration class for enabling/disabling specific statistics calculations:

548

549

```java { .api }

550

public static class Config {

551

// Enable/disable specific statistics

552

void setThroughputStatisticsEnabled(boolean enabled);

553

boolean isThroughputStatisticsEnabled();

554

555

void setPeakThroughputStatisticsEnabled(boolean enabled);

556

boolean isPeakThroughputStatisticsEnabled();

557

558

void setTimingStatisticsEnabled(boolean enabled);

559

boolean isTimingStatisticsEnabled();

560

}

561

```

562

563

### Service Statistics Usage

564

565

```java { .api }

566

public class ServiceMonitor {

567

568

public void printServiceStatistics(IoService service) {

569

IoServiceStatistics stats = service.getStatistics();

570

571

System.out.println("=== Service Statistics ===");

572

System.out.println("Service Type: " + service.getClass().getSimpleName());

573

System.out.println("Transport: " + service.getTransportMetadata().getProviderName());

574

System.out.println("Active: " + service.isActive());

575

System.out.println("Activation Time: " + new Date(service.getActivationTime()));

576

577

// Session statistics

578

System.out.println("Current Sessions: " + service.getManagedSessionCount());

579

System.out.println("Total Sessions: " + stats.getCumulativeManagedSessionCount());

580

581

// Message statistics

582

System.out.println("Messages Read: " + stats.getReadMessages());

583

System.out.println("Messages Written: " + stats.getWrittenMessages());

584

System.out.println("Bytes Read: " + formatBytes(stats.getReadBytes()));

585

System.out.println("Bytes Written: " + formatBytes(stats.getWrittenBytes()));

586

587

// Throughput statistics

588

System.out.printf("Read Throughput: %.2f KB/s%n",

589

stats.getReadBytesThroughput() / 1024.0);

590

System.out.printf("Write Throughput: %.2f KB/s%n",

591

stats.getWrittenBytesThroughput() / 1024.0);

592

System.out.printf("Message Read Rate: %.2f msgs/s%n",

593

stats.getReadMessagesThroughput());

594

System.out.printf("Message Write Rate: %.2f msgs/s%n",

595

stats.getWrittenMessagesThroughput());

596

597

// Queue statistics

598

System.out.println("Scheduled Write Messages: " + service.getScheduledWriteMessages());

599

System.out.println("Scheduled Write Bytes: " + formatBytes(service.getScheduledWriteBytes()));

600

}

601

602

private String formatBytes(long bytes) {

603

if (bytes < 1024) return bytes + " B";

604

if (bytes < 1024 * 1024) return String.format("%.1f KB", bytes / 1024.0);

605

if (bytes < 1024 * 1024 * 1024) return String.format("%.1f MB", bytes / (1024.0 * 1024));

606

return String.format("%.1f GB", bytes / (1024.0 * 1024 * 1024));

607

}

608

}

609

```

610

611

### Session Broadcasting

612

613

```java { .api }

614

public class BroadcastingService {

615

616

public void broadcastMessage(IoService service, Object message) {

617

// Broadcast to all managed sessions

618

Set<WriteFuture> futures = service.broadcast(message);

619

620

// Wait for all broadcasts to complete

621

for (WriteFuture future : futures) {

622

future.awaitUninterruptibly();

623

if (!future.isWritten()) {

624

System.err.println("Broadcast failed: " + future.getException());

625

}

626

}

627

}

628

629

public void selectiveBroadcast(IoService service, Object message,

630

Predicate<IoSession> filter) {

631

// Broadcast to filtered sessions

632

for (IoSession session : service.getManagedSessions().values()) {

633

if (filter.test(session)) {

634

session.write(message);

635

}

636

}

637

}

638

639

// Broadcast to authenticated users only

640

public void broadcastToAuthenticatedUsers(IoService service, Object message) {

641

selectiveBroadcast(service, message, session ->

642

session.getAttribute("authenticated", false));

643

}

644

}

645

```

646

647

## Custom Service Implementation

648

649

### Creating Custom Service

650

651

```java { .api }

652

public class CustomProtocolAcceptor extends AbstractIoAcceptor {

653

654

public CustomProtocolAcceptor() {

655

super(new CustomSessionConfig(), CustomProcessor.class);

656

}

657

658

@Override

659

protected void init() throws Exception {

660

// Initialize custom transport

661

}

662

663

@Override

664

protected void destroy() throws Exception {

665

// Cleanup custom transport

666

}

667

668

@Override

669

public TransportMetadata getTransportMetadata() {

670

return CustomTransportMetadata.INSTANCE;

671

}

672

673

// Implement other abstract methods...

674

}

675

```

676

677

Service abstractions in MINA Core provide a unified and powerful foundation for building both server and client applications across different transport protocols, with comprehensive lifecycle management, statistics, and configuration options.