or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

connection-pooling.md embedded-broker.md index.md jms-client.md management-monitoring.md messages-destinations.md network-clustering.md persistence-storage.md security.md spring-integration.md transport-protocols.md

docs/persistence-storage.md

0

# Persistence and Storage

1

2

ActiveMQ provides multiple persistence adapters for reliable message storage, including the high-performance KahaDB store, JDBC database storage, and an in-memory option, so that the storage backend can be matched to the deployment scenario.

3

4

## Capabilities

5

6

### Core Persistence Interfaces

7

8

Base interfaces defining the persistence contract for message storage.

9

10

```java { .api }

11

/**

12

* Main persistence adapter interface for message storage

13

*/

14

public interface PersistenceAdapter extends Service {

15

/** Create message store for queue destinations */

16

MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException;

17

18

/** Create message store for topic destinations */

19

TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException;

20

21

/** Create transaction store for XA transactions */

22

TransactionStore createTransactionStore() throws IOException;

23

24

/** Get all known destinations */

25

Set<ActiveMQDestination> getDestinations();

26

27

/** Remove destination and its messages */

28

void removeQueueMessageStore(ActiveMQQueue destination);

29

void removeTopicMessageStore(ActiveMQTopic destination);

30

31

/** Get adapter size information */

32

long size();

33

34

/** Delete all messages */

35

void deleteAllMessages() throws IOException;

36

37

/** Checkpoint persistence state */

38

void checkpoint(boolean sync) throws IOException;

39

40

/** Set broker service reference */

41

void setBrokerService(BrokerService brokerService);

42

BrokerService getBrokerService();

43

44

/** Set usage manager for resource limits */

45

void setUsageManager(SystemUsage usageManager);

46

SystemUsage getUsageManager();

47

48

/** Directory management */

49

void setDirectory(File dir);

50

File getDirectory();

51

}

52

53

/**

54

* Message store interface for destination-specific storage

55

*/

56

public interface MessageStore {

57

/** Add message to store */

58

void addMessage(ConnectionContext context, Message message) throws IOException;

59

60

/** Remove message from store */

61

void removeMessage(ConnectionContext context, MessageAck ack) throws IOException;

62

63

/** Remove all messages from store */

64

void removeAllMessages(ConnectionContext context) throws IOException;

65

66

/** Recover messages from store */

67

void recover(MessageRecoveryListener listener) throws Exception;

68

69

/** Get message count */

70

int getMessageCount() throws IOException;

71

72

/** Get message size */

73

long getMessageSize() throws IOException;

74

75

/** Start store */

76

void start() throws Exception;

77

78

/** Stop store */

79

void stop() throws Exception;

80

}

81

82

/**

83

* Topic-specific message store with subscription support

84

*/

85

public interface TopicMessageStore extends MessageStore {

86

/** Add subscription */

87

void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException;

88

89

/** Delete subscription */

90

void deleteSubscription(String clientId, String subscriptionName) throws IOException;

91

92

/** Get all subscriptions */

93

SubscriptionInfo[] getAllSubscriptions() throws IOException;

94

95

/** Recover subscription messages */

96

void recoverSubscription(String clientId, String subscriptionName,

97

MessageRecoveryListener listener) throws Exception;

98

99

/** Remove subscription messages */

100

void removeMessage(ConnectionContext context, MessageAck ack) throws IOException;

101

102

/** Get subscription message count */

103

int getMessageCount(String clientId, String subscriptionName) throws IOException;

104

}

105

106

/**

107

* Transaction store interface for XA transaction support

108

*/

109

public interface TransactionStore {

110

/** Prepare transaction */

111

void prepare(TransactionId txid) throws IOException;

112

113

/** Commit transaction */

114

void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,

115

Runnable postCommit) throws IOException;

116

117

/** Rollback transaction */

118

void rollback(TransactionId txid) throws IOException;

119

120

/** Recover prepared transactions */

121

void recover(TransactionRecoveryListener listener) throws IOException;

122

}

123

```

124

125

### KahaDB Persistence Adapter

126

127

High-performance file-based persistence using indexed journals.

128

129

```java { .api }

130

/**

131

* KahaDB persistence adapter - high performance file-based storage

132

* Uses indexed journal files for optimal message throughput

133

*/

134

public class KahaDBPersistenceAdapter implements PersistenceAdapter, JournaledStore {

135

/** Create KahaDB adapter with default settings */

136

public KahaDBPersistenceAdapter();

137

138

/** Directory configuration */

139

public void setDirectory(File directory);

140

public File getDirectory();

141

142

/** Journal configuration */

143

public void setJournalMaxFileLength(int journalMaxFileLength);

144

public int getJournalMaxFileLength();

145

public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize);

146

public int getJournalMaxWriteBatchSize();

147

public void setEnableJournalDiskSyncs(boolean enableJournalDiskSyncs);

148

public boolean isEnableJournalDiskSyncs();

149

public void setCleanupInterval(long cleanupInterval);

150

public long getCleanupInterval();

151

152

/** Index configuration */

153

public void setIndexCacheSize(int indexCacheSize);

154

public int getIndexCacheSize();

155

public void setIndexWriteBatchSize(int indexWriteBatchSize);

156

public int getIndexWriteBatchSize();

157

public void setIndexEnablePageCaching(boolean indexEnablePageCaching);

158

public boolean isIndexEnablePageCaching();

159

160

/** Checkpoint configuration */

161

public void setCheckpointInterval(long checkpointInterval);

162

public long getCheckpointInterval();

163

public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles);

164

public boolean isIgnoreMissingJournalfiles();

165

166

/** Performance tuning */

167

public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatchQueues);

168

public boolean isConcurrentStoreAndDispatchQueues();

169

public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatchTopics);

170

public boolean isConcurrentStoreAndDispatchTopics();

171

172

/** Archive configuration */

173

public void setArchiveDataLogs(boolean archiveDataLogs);

174

public boolean isArchiveDataLogs();

175

public void setDirectoryArchive(File directoryArchive);

176

public File getDirectoryArchive();

177

178

/** Compression */

179

public void setCompressJournalStream(boolean compressJournalStream);

180

public boolean isCompressJournalStream();

181

182

/** Locking */

183

public void setLockKeepAlivePeriod(long lockKeepAlivePeriod);

184

public long getLockKeepAlivePeriod();

185

186

/** Start adapter */

187

public void start() throws Exception;

188

189

/** Stop adapter */

190

public void stop() throws Exception;

191

192

/** Get storage size */

193

public long size();

194

195

/** Force checkpoint */

196

public void checkpoint(boolean sync) throws IOException;

197

198

/** Delete all messages */

199

public void deleteAllMessages() throws IOException;

200

}

201

202

/**

203

* Multi-KahaDB adapter for partitioned storage

204

*/

205

public class MultiKahaDBPersistenceAdapter implements PersistenceAdapter {

206

/** Set filtered persistence adapters */

207

public void setFilteredPersistenceAdapters(List<FilteredKahaDBPersistenceAdapter> adapters);

208

public List<FilteredKahaDBPersistenceAdapter> getFilteredPersistenceAdapters();

209

210

/** Add filtered adapter */

211

public FilteredKahaDBPersistenceAdapter addFilteredKahaDBPersistenceAdapter(

212

String queue, String topic, PersistenceAdapter adapter);

213

}

214

215

/**

216

* Filtered KahaDB adapter for destination-specific storage

217

*/

218

public class FilteredKahaDBPersistenceAdapter implements PersistenceAdapter {

219

/** Set destination filters */

220

public void setPerDestination(boolean perDestination);

221

public boolean isPerDestination();

222

public void setQueue(String queue);

223

public String getQueue();

224

public void setTopic(String topic);

225

public String getTopic();

226

}

227

```

228

229

**Usage Examples:**

230

231

```java

232

// Basic KahaDB configuration

233

KahaDBPersistenceAdapter kahadb = new KahaDBPersistenceAdapter();

234

kahadb.setDirectory(new File("./activemq-data"));

235

kahadb.setJournalMaxFileLength(32 * 1024 * 1024); // 32MB journal files

236

kahadb.setIndexCacheSize(10000); // Cache 10K index pages

237

238

BrokerService broker = new BrokerService();

239

broker.setPersistenceAdapter(kahadb);

240

241

// High-performance configuration

242

KahaDBPersistenceAdapter highPerf = new KahaDBPersistenceAdapter();

243

highPerf.setDirectory(new File("/fast-ssd/activemq-data"));

244

highPerf.setJournalMaxFileLength(64 * 1024 * 1024); // Larger journals

245

highPerf.setJournalMaxWriteBatchSize(4 * 1024); // Batch writes

246

highPerf.setConcurrentStoreAndDispatchQueues(true); // Concurrent processing

247

highPerf.setIndexCacheSize(50000); // Larger index cache

248

highPerf.setCheckpointInterval(5000); // Checkpoint every 5000 ms (the KahaDB default); lower this value for more frequent checkpoints

249

250

// Multi-KahaDB for partitioned storage

251

MultiKahaDBPersistenceAdapter multiKaha = new MultiKahaDBPersistenceAdapter();

252

multiKaha.addFilteredKahaDBPersistenceAdapter("orders.>", null,

253

createKahaDB("orders-storage"));

254

multiKaha.addFilteredKahaDBPersistenceAdapter("events.>", null,

255

createKahaDB("events-storage"));

256

```

257

258

### JDBC Persistence Adapter

259

260

Database-based persistence for enterprise environments.

261

262

```java { .api }

263

/**

264

* JDBC persistence adapter for database storage

265

*/

266

public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements PersistenceAdapter {

267

/** DataSource configuration */

268

public void setDataSource(DataSource dataSource);

269

public DataSource getDataSource();

270

271

/** Database adapter configuration */

272

public void setAdapter(JDBCAdapter adapter);

273

public JDBCAdapter getAdapter();

274

275

/** Table configuration */

276

public void setStatements(Statements statements);

277

public Statements getStatements();

278

279

/** Transaction configuration */

280

public void setTransactionIsolation(int transactionIsolation);

281

public int getTransactionIsolation();

282

283

/** Locking configuration */

284

public void setLockKeepAlivePeriod(long lockKeepAlivePeriod);

285

public long getLockKeepAlivePeriod();

286

public void setLockAcquireSleepInterval(long lockAcquireSleepInterval);

287

public long getLockAcquireSleepInterval();

288

289

/** Cleanup configuration */

290

public void setCleanupPeriod(long cleanupPeriod);

291

public long getCleanupPeriod();

292

293

/** Lock usage and table creation on startup */

294

public void setUseLock(boolean useLock);

295

public boolean isUseLock();

296

public void setCreateTablesOnStartup(boolean createTablesOnStartup);

297

public boolean isCreateTablesOnStartup();

298

}

299

300

/**

301

* Database-specific JDBC adapters

302

*/

303

public interface JDBCAdapter {

304

/** Set SQL statements */

305

void setStatements(Statements statements);

306

307

/** Create the database tables */

308

void doCreateTables(TransactionContext c) throws SQLException, IOException;

309

310

/** Drop the database tables */

311

void doDropTables(TransactionContext c) throws SQLException, IOException;

312

313

/** Get limit statement */

314

String getLimitStatement();

315

}

316

317

/**

318

* Oracle JDBC adapter with BLOB support

319

*/

320

public class OracleJDBCAdapter extends DefaultJDBCAdapter {

321

/** Configure for Oracle-specific features */

322

public void setUseExternalMessageReferences(boolean useExternalMessageReferences);

323

public boolean isUseExternalMessageReferences();

324

}

325

326

/**

327

* SQL Server JDBC adapter

328

*/

329

public class SqlServerJDBCAdapter extends DefaultJDBCAdapter {

330

/** SQL Server specific optimizations */

331

}

332

```

333

334

**Usage Examples:**

335

336

```java

337

// Basic JDBC configuration with MySQL

338

JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();

339

340

// Configure DataSource (using connection pool)

341

BasicDataSource dataSource = new BasicDataSource();

342

dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");

343

dataSource.setUrl("jdbc:mysql://localhost/activemq");

344

dataSource.setUsername("activemq");

345

dataSource.setPassword("password");

346

dataSource.setMaxActive(20);

347

jdbc.setDataSource(dataSource);

348

349

// Configure for MySQL

350

jdbc.setAdapter(new MySqlJDBCAdapter());

351

jdbc.setCreateTablesOnStartup(true);

352

353

// Oracle configuration with BLOB support

354

JDBCPersistenceAdapter oracleJdbc = new JDBCPersistenceAdapter();

355

OracleJDBCAdapter oracleAdapter = new OracleJDBCAdapter();

356

oracleAdapter.setUseExternalMessageReferences(true);

357

oracleJdbc.setAdapter(oracleAdapter);

358

```

359

360

### Memory Persistence Adapter

361

362

In-memory storage for testing and temporary brokers.

363

364

```java { .api }

365

/**

366

* Memory-based persistence adapter

367

* Stores all messages in memory - no durability across restarts

368

*/

369

public class MemoryPersistenceAdapter implements PersistenceAdapter {

370

/** Create memory adapter */

371

public MemoryPersistenceAdapter();

372

373

/** Start adapter */

374

public void start() throws Exception;

375

376

/** Stop adapter */

377

public void stop() throws Exception;

378

379

/** Create message stores */

380

public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException;

381

public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException;

382

public TransactionStore createTransactionStore() throws IOException;

383

384

/** Get destinations */

385

public Set<ActiveMQDestination> getDestinations();

386

387

/** Memory usage */

388

public long size();

389

390

/** Clear all messages */

391

public void deleteAllMessages() throws IOException;

392

}

393

394

/**

395

* Memory-based message store

396

*/

397

public class MemoryMessageStore implements MessageStore {

398

/** Memory store operations */

399

public void addMessage(ConnectionContext context, Message message) throws IOException;

400

public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException;

401

public void removeAllMessages(ConnectionContext context) throws IOException;

402

public void recover(MessageRecoveryListener listener) throws Exception;

403

404

/** Get statistics */

405

public int getMessageCount() throws IOException;

406

public long getMessageSize() throws IOException;

407

}

408

```

409

410

**Usage Examples:**

411

412

```java

413

// Memory persistence for testing

414

BrokerService testBroker = new BrokerService();

415

testBroker.setPersistenceAdapter(new MemoryPersistenceAdapter());

416

testBroker.setPersistent(false);

417

418

// Useful for unit tests and temporary brokers

419

BrokerService tempBroker = new BrokerService();

420

tempBroker.setBrokerName("temp-broker");

421

tempBroker.setPersistent(false);

422

tempBroker.addConnector("vm://temp-broker");

423

```

424

425

## Exception Handling

426

427

```java { .api }

428

public class PersistenceAdapterException extends IOException {

429

public PersistenceAdapterException(String message);

430

public PersistenceAdapterException(String message, Throwable cause);

431

}

432

433

public class RecoverableException extends IOException {

434

public RecoverableException(String message);

435

public RecoverableException(String message, Throwable cause);

436

}

437

```

438

439

## Types

440

441

```java { .api }

442

/**

443

* Message recovery listener interface

444

*/

445

public interface MessageRecoveryListener {

446

/** Recover message */

447

boolean recoverMessage(Message message) throws Exception;

448

449

/** Recover message reference */

450

boolean recoverMessageReference(MessageId messageReference) throws Exception;

451

452

/** Check if recovery should continue */

453

boolean hasSpace();

454

}

455

456

/**

457

* Transaction recovery listener interface

458

*/

459

public interface TransactionRecoveryListener {

460

/** Recover transaction */

461

void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] acks);

462

}

463

464

/**

465

* Subscription information for durable subscriptions

466

*/

467

public class SubscriptionInfo {

468

/** Subscription details */

469

public String getClientId();

470

public void setClientId(String clientId);

471

public String getSubscriptionName();

472

public void setSubscriptionName(String subscriptionName);

473

public ActiveMQDestination getDestination();

474

public void setDestination(ActiveMQDestination destination);

475

public String getSelector();

476

public void setSelector(String selector);

477

}

478

479

/**

480

* Journaled store interface for transaction log support

481

*/

482

public interface JournaledStore {

483

/** Get journal manager */

484

Journal getJournal();

485

486

/** Set journal manager */

487

void setJournal(Journal journal);

488

489

/** Force journal sync */

490

void checkpoint(boolean sync) throws IOException;

491

}

492

```