0
# Persistence and Storage
1
2
ActiveMQ provides multiple persistence adapters for message storage, each suited to a different deployment scenario: the high-performance, file-based KahaDB store; JDBC database storage; and an in-memory option that does not survive broker restarts.
3
4
## Capabilities
5
6
### Core Persistence Interfaces
7
8
Base interfaces defining the persistence contract for message storage.
9
10
```java { .api }
11
/**
12
* Main persistence adapter interface for message storage
13
*/
14
public interface PersistenceAdapter extends Service {
15
/** Create message store for queue destinations */
16
MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException;
17
18
/** Create message store for topic destinations */
19
TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException;
20
21
/** Create the transaction store used to prepare, commit, roll back and recover transactions */
22
TransactionStore createTransactionStore() throws IOException;
23
24
/** Get all known destinations */
25
Set<ActiveMQDestination> getDestinations();
26
27
/** Remove destination and its messages */
28
void removeQueueMessageStore(ActiveMQQueue destination);
29
void removeTopicMessageStore(ActiveMQTopic destination);
30
31
/** Get adapter size information */
32
long size();
33
34
/** Delete all messages */
35
void deleteAllMessages() throws IOException;
36
37
/** Checkpoint persistence state */
38
void checkpoint(boolean sync) throws IOException;
39
40
/** Set broker service reference */
41
void setBrokerService(BrokerService brokerService);
42
BrokerService getBrokerService();
43
44
/** Set usage manager for resource limits */
45
void setUsageManager(SystemUsage usageManager);
46
SystemUsage getUsageManager();
47
48
/** Directory management */
49
void setDirectory(File dir);
50
File getDirectory();
51
}
52
53
/**
54
* Message store interface for destination-specific storage
55
*/
56
public interface MessageStore {
57
/** Add message to store */
58
void addMessage(ConnectionContext context, Message message) throws IOException;
59
60
/** Remove message from store */
61
void removeMessage(ConnectionContext context, MessageAck ack) throws IOException;
62
63
/** Remove all messages from store */
64
void removeAllMessages(ConnectionContext context) throws IOException;
65
66
/** Recover messages from store */
67
void recover(MessageRecoveryListener listener) throws Exception;
68
69
/** Get message count */
70
int getMessageCount() throws IOException;
71
72
/** Get message size */
73
long getMessageSize() throws IOException;
74
75
/** Start store */
76
void start() throws Exception;
77
78
/** Stop store */
79
void stop() throws Exception;
80
}
81
82
/**
83
* Topic-specific message store with subscription support
84
*/
85
public interface TopicMessageStore extends MessageStore {
86
/** Add subscription */
87
void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException;
88
89
/** Delete subscription */
90
void deleteSubscription(String clientId, String subscriptionName) throws IOException;
91
92
/** Get all subscriptions */
93
SubscriptionInfo[] getAllSubscriptions() throws IOException;
94
95
/** Recover subscription messages */
96
void recoverSubscription(String clientId, String subscriptionName,
97
MessageRecoveryListener listener) throws Exception;
98
99
/** Remove subscription messages */
100
void removeMessage(ConnectionContext context, MessageAck ack) throws IOException;
101
102
/** Get subscription message count */
103
int getMessageCount(String clientId, String subscriptionName) throws IOException;
104
}
105
106
/**
107
* Transaction store interface; commit accepts both prepared (XA) and unprepared transactions
108
*/
109
public interface TransactionStore {
110
/** Prepare transaction */
111
void prepare(TransactionId txid) throws IOException;
112
113
/** Commit transaction */
114
void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,
115
Runnable postCommit) throws IOException;
116
117
/** Rollback transaction */
118
void rollback(TransactionId txid) throws IOException;
119
120
/** Recover prepared transactions */
121
void recover(TransactionRecoveryListener listener) throws IOException;
122
}
123
```
124
125
### KahaDB Persistence Adapter
126
127
High-performance file-based persistence using indexed journals.
128
129
```java { .api }
130
/**
131
* KahaDB persistence adapter - high performance file-based storage
132
* Uses indexed journal files for optimal message throughput
133
*/
134
public class KahaDBPersistenceAdapter implements PersistenceAdapter, JournaledStore {
135
/** Create KahaDB adapter with default settings */
136
public KahaDBPersistenceAdapter();
137
138
/** Directory configuration */
139
public void setDirectory(File directory);
140
public File getDirectory();
141
142
/** Journal configuration */
143
public void setJournalMaxFileLength(int journalMaxFileLength);
144
public int getJournalMaxFileLength();
145
public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize);
146
public int getJournalMaxWriteBatchSize();
147
public void setEnableJournalDiskSyncs(boolean enableJournalDiskSyncs);
148
public boolean isEnableJournalDiskSyncs();
149
public void setCleanupInterval(long cleanupInterval);
150
public long getCleanupInterval();
151
152
/** Index configuration */
153
public void setIndexCacheSize(int indexCacheSize);
154
public int getIndexCacheSize();
155
public void setIndexWriteBatchSize(int indexWriteBatchSize);
156
public int getIndexWriteBatchSize();
157
public void setIndexEnablePageCaching(boolean indexEnablePageCaching);
158
public boolean isIndexEnablePageCaching();
159
160
/** Checkpoint and missing-journal-file handling */
161
public void setCheckpointInterval(long checkpointInterval);
162
public long getCheckpointInterval();
163
public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles);
164
public boolean isIgnoreMissingJournalfiles();
165
166
/** Performance tuning */
167
public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatchQueues);
168
public boolean isConcurrentStoreAndDispatchQueues();
169
public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatchTopics);
170
public boolean isConcurrentStoreAndDispatchTopics();
171
172
/** Archive configuration */
173
public void setArchiveDataLogs(boolean archiveDataLogs);
174
public boolean isArchiveDataLogs();
175
public void setDirectoryArchive(File directoryArchive);
176
public File getDirectoryArchive();
177
178
/** Compression */
179
public void setCompressJournalStream(boolean compressJournalStream);
180
public boolean isCompressJournalStream();
181
182
/** Locking */
183
public void setLockKeepAlivePeriod(long lockKeepAlivePeriod);
184
public long getLockKeepAlivePeriod();
185
186
/** Start adapter */
187
public void start() throws Exception;
188
189
/** Stop adapter */
190
public void stop() throws Exception;
191
192
/** Get storage size */
193
public long size();
194
195
/** Force checkpoint */
196
public void checkpoint(boolean sync) throws IOException;
197
198
/** Delete all messages */
199
public void deleteAllMessages() throws IOException;
200
}
201
202
/**
203
* Multi-KahaDB adapter for partitioned storage
204
*/
205
public class MultiKahaDBPersistenceAdapter implements PersistenceAdapter {
206
/** Set filtered persistence adapters */
207
public void setFilteredPersistenceAdapters(List<FilteredKahaDBPersistenceAdapter> adapters);
208
public List<FilteredKahaDBPersistenceAdapter> getFilteredPersistenceAdapters();
209
210
/** Add filtered adapter */
211
public FilteredKahaDBPersistenceAdapter addFilteredKahaDBPersistenceAdapter(
212
String queue, String topic, PersistenceAdapter adapter);
213
}
214
215
/**
216
* Filtered KahaDB adapter for destination-specific storage
217
*/
218
public class FilteredKahaDBPersistenceAdapter implements PersistenceAdapter {
219
/** Set destination filters */
220
public void setPerDestination(boolean perDestination);
221
public boolean isPerDestination();
222
public void setQueue(String queue);
223
public String getQueue();
224
public void setTopic(String topic);
225
public String getTopic();
226
}
227
```
228
229
**Usage Examples:**
230
231
```java
232
// Basic KahaDB configuration
233
KahaDBPersistenceAdapter kahadb = new KahaDBPersistenceAdapter();
234
kahadb.setDirectory(new File("./activemq-data"));
235
kahadb.setJournalMaxFileLength(32 * 1024 * 1024); // 32MB journal files
236
kahadb.setIndexCacheSize(10000); // Cache 10K index pages
237
238
BrokerService broker = new BrokerService();
239
broker.setPersistenceAdapter(kahadb);
240
241
// High-performance configuration
242
KahaDBPersistenceAdapter highPerf = new KahaDBPersistenceAdapter();
243
highPerf.setDirectory(new File("/fast-ssd/activemq-data"));
244
highPerf.setJournalMaxFileLength(64 * 1024 * 1024); // Larger journals
245
highPerf.setJournalMaxWriteBatchSize(4 * 1024); // Batch writes
246
highPerf.setConcurrentStoreAndDispatchQueues(true); // Concurrent processing
247
highPerf.setIndexCacheSize(50000); // Larger index cache
248
highPerf.setCheckpointInterval(5000); // Checkpoint every 5000 ms (KahaDB's documented default)
249
250
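// Multi-KahaDB for partitioned storage (createKahaDB is not part of the API listed here; presumably an application helper returning a configured adapter)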
251
MultiKahaDBPersistenceAdapter multiKaha = new MultiKahaDBPersistenceAdapter();
252
multiKaha.addFilteredKahaDBPersistenceAdapter("orders.>", null,
253
createKahaDB("orders-storage"));
254
multiKaha.addFilteredKahaDBPersistenceAdapter("events.>", null,
255
createKahaDB("events-storage"));
256
```
257
258
### JDBC Persistence Adapter
259
260
Database-based persistence for enterprise environments.
261
262
```java { .api }
263
/**
264
* JDBC persistence adapter for database storage
265
*/
266
public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements PersistenceAdapter {
267
/** DataSource configuration */
268
public void setDataSource(DataSource dataSource);
269
public DataSource getDataSource();
270
271
/** Database adapter configuration */
272
public void setAdapter(JDBCAdapter adapter);
273
public JDBCAdapter getAdapter();
274
275
/** Table configuration */
276
public void setStatements(Statements statements);
277
public Statements getStatements();
278
279
/** Transaction configuration */
280
public void setTransactionIsolation(int transactionIsolation);
281
public int getTransactionIsolation();
282
283
/** Locking configuration */
284
public void setLockKeepAlivePeriod(long lockKeepAlivePeriod);
285
public long getLockKeepAlivePeriod();
286
public void setLockAcquireSleepInterval(long lockAcquireSleepInterval);
287
public long getLockAcquireSleepInterval();
288
289
/** Cleanup configuration */
290
public void setCleanupPeriod(long cleanupPeriod);
291
public long getCleanupPeriod();
292
293
/** Lock usage and table creation at startup */
294
public void setUseLock(boolean useLock);
295
public boolean isUseLock();
296
public void setCreateTablesOnStartup(boolean createTablesOnStartup);
297
public boolean isCreateTablesOnStartup();
298
}
299
300
/**
301
* Database-specific JDBC adapters
302
*/
303
public interface JDBCAdapter {
304
/** Set SQL statements */
305
void setStatements(Statements statements);
306
307
/** Create the database tables */
308
void doCreateTables(TransactionContext c) throws SQLException, IOException;
309
310
/** Drop the database tables */
311
void doDropTables(TransactionContext c) throws SQLException, IOException;
312
313
/** Get limit statement */
314
String getLimitStatement();
315
}
316
317
/**
318
* Oracle JDBC adapter with BLOB support
319
*/
320
public class OracleJDBCAdapter extends DefaultJDBCAdapter {
321
/** Configure for Oracle-specific features */
322
public void setUseExternalMessageReferences(boolean useExternalMessageReferences);
323
public boolean isUseExternalMessageReferences();
324
}
325
326
/**
327
* SQL Server JDBC adapter
328
*/
329
public class SqlServerJDBCAdapter extends DefaultJDBCAdapter {
330
/** SQL Server specific optimizations */
331
}
332
```
333
334
**Usage Examples:**
335
336
```java
337
// Basic JDBC configuration with MySQL
338
JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
339
340
// Configure DataSource (using connection pool)
341
BasicDataSource dataSource = new BasicDataSource();
342
dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
343
dataSource.setUrl("jdbc:mysql://localhost/activemq");
344
dataSource.setUsername("activemq");
345
dataSource.setPassword("password");
346
dataSource.setMaxActive(20);
347
jdbc.setDataSource(dataSource);
348
349
// Configure for MySQL
350
jdbc.setAdapter(new MySqlJDBCAdapter());
351
jdbc.setCreateTablesOnStartup(true);
352
353
// Oracle configuration with BLOB support
354
JDBCPersistenceAdapter oracleJdbc = new JDBCPersistenceAdapter();
355
OracleJDBCAdapter oracleAdapter = new OracleJDBCAdapter();
356
oracleAdapter.setUseExternalMessageReferences(true);
357
oracleJdbc.setAdapter(oracleAdapter);
358
```
359
360
### Memory Persistence Adapter
361
362
In-memory storage for testing and temporary brokers.
363
364
```java { .api }
365
/**
366
* Memory-based persistence adapter
367
* Stores all messages in memory - no durability across restarts
368
*/
369
public class MemoryPersistenceAdapter implements PersistenceAdapter {
370
/** Create memory adapter */
371
public MemoryPersistenceAdapter();
372
373
/** Start adapter */
374
public void start() throws Exception;
375
376
/** Stop adapter */
377
public void stop() throws Exception;
378
379
/** Create message stores */
380
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException;
381
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException;
382
public TransactionStore createTransactionStore() throws IOException;
383
384
/** Get destinations */
385
public Set<ActiveMQDestination> getDestinations();
386
387
/** Memory usage */
388
public long size();
389
390
/** Clear all messages */
391
public void deleteAllMessages() throws IOException;
392
}
393
394
/**
395
* Memory-based message store
396
*/
397
public class MemoryMessageStore implements MessageStore {
398
/** Memory store operations */
399
public void addMessage(ConnectionContext context, Message message) throws IOException;
400
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException;
401
public void removeAllMessages(ConnectionContext context) throws IOException;
402
public void recover(MessageRecoveryListener listener) throws Exception;
403
404
/** Get statistics */
405
public int getMessageCount() throws IOException;
406
public long getMessageSize() throws IOException;
407
}
408
```
409
410
**Usage Examples:**
411
412
```java
413
// Memory persistence for testing
414
BrokerService testBroker = new BrokerService();
415
testBroker.setPersistenceAdapter(new MemoryPersistenceAdapter());
416
testBroker.setPersistent(false);
417
418
// Useful for unit tests and temporary brokers
419
BrokerService tempBroker = new BrokerService();
420
tempBroker.setBrokerName("temp-broker");
421
tempBroker.setPersistent(false);
422
tempBroker.addConnector("vm://temp-broker");
423
```
424
425
## Exception Handling
426
427
```java { .api }
428
public class PersistenceAdapterException extends IOException {
429
public PersistenceAdapterException(String message);
430
public PersistenceAdapterException(String message, Throwable cause);
431
}
432
433
public class RecoverableException extends IOException {
434
public RecoverableException(String message);
435
public RecoverableException(String message, Throwable cause);
436
}
437
```
438
439
## Types
440
441
```java { .api }
442
/**
443
* Message recovery listener interface
444
*/
445
public interface MessageRecoveryListener {
446
/** Recover message */
447
boolean recoverMessage(Message message) throws Exception;
448
449
/** Recover message reference */
450
boolean recoverMessageReference(MessageId messageReference) throws Exception;
451
452
/** Check if recovery should continue */
453
boolean hasSpace();
454
}
455
456
/**
457
* Transaction recovery listener interface
458
*/
459
public interface TransactionRecoveryListener {
460
/** Recover transaction */
461
void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] acks);
462
}
463
464
/**
465
* Subscription information for durable subscriptions
466
*/
467
public class SubscriptionInfo {
468
/** Subscription details */
469
public String getClientId();
470
public void setClientId(String clientId);
471
public String getSubscriptionName();
472
public void setSubscriptionName(String subscriptionName);
473
public ActiveMQDestination getDestination();
474
public void setDestination(ActiveMQDestination destination);
475
public String getSelector();
476
public void setSelector(String selector);
477
}
478
479
/**
480
* Journaled store interface for transaction log support
481
*/
482
public interface JournaledStore {
483
/** Get journal manager */
484
Journal getJournal();
485
486
/** Set journal manager */
487
void setJournal(Journal journal);
488
489
/** Force checkpoint */
490
void checkpoint(boolean sync) throws IOException;
491
}
492
```