High performance Apache 2.0 licensed Message Broker supporting JMS, AMQP, MQTT, and HTTP protocols with comprehensive enterprise features including persistence, security, clustering, and Spring integration.
—
ActiveMQ provides multiple persistence adapters for reliable message storage including high-performance KahaDB, JDBC database storage, and in-memory options for different deployment scenarios.
Base interfaces defining the persistence contract for message storage.
/**
* Main persistence adapter interface for message storage
*/
public interface PersistenceAdapter extends Service {
/** Create message store for queue destinations */
MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException;
/** Create message store for topic destinations */
TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException;
/** Create transaction store for XA transactions */
TransactionStore createTransactionStore() throws IOException;
/** Get all known destinations */
Set<ActiveMQDestination> getDestinations();
/** Remove destination and its messages */
void removeQueueMessageStore(ActiveMQQueue destination);
void removeTopicMessageStore(ActiveMQTopic destination);
/** Get adapter size information */
long size();
/** Delete all messages */
void deleteAllMessages() throws IOException;
/** Checkpoint persistence state */
void checkpoint(boolean sync) throws IOException;
/** Set broker service reference */
void setBrokerService(BrokerService brokerService);
BrokerService getBrokerService();
/** Set usage manager for resource limits */
void setUsageManager(SystemUsage usageManager);
SystemUsage getUsageManager();
/** Directory management */
void setDirectory(File dir);
File getDirectory();
}
/**
* Message store interface for destination-specific storage
*/
public interface MessageStore {
/** Add message to store */
void addMessage(ConnectionContext context, Message message) throws IOException;
/** Remove message from store */
void removeMessage(ConnectionContext context, MessageAck ack) throws IOException;
/** Remove all messages from store */
void removeAllMessages(ConnectionContext context) throws IOException;
/** Recover messages from store */
void recover(MessageRecoveryListener listener) throws Exception;
/** Get message count */
int getMessageCount() throws IOException;
/** Get message size */
long getMessageSize() throws IOException;
/** Start store */
void start() throws Exception;
/** Stop store */
void stop() throws Exception;
}
/**
* Topic-specific message store with subscription support
*/
public interface TopicMessageStore extends MessageStore {
/** Add subscription */
void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException;
/** Delete subscription */
void deleteSubscription(String clientId, String subscriptionName) throws IOException;
/** Get all subscriptions */
SubscriptionInfo[] getAllSubscriptions() throws IOException;
/** Recover subscription messages */
void recoverSubscription(String clientId, String subscriptionName,
MessageRecoveryListener listener) throws Exception;
/** Remove subscription messages */
void removeMessage(ConnectionContext context, MessageAck ack) throws IOException;
/** Get subscription message count */
int getMessageCount(String clientId, String subscriptionName) throws IOException;
}
/**
* Transaction store interface for XA transaction support
*/
public interface TransactionStore {
/** Prepare transaction */
void prepare(TransactionId txid) throws IOException;
/** Commit transaction */
void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit,
Runnable postCommit) throws IOException;
/** Rollback transaction */
void rollback(TransactionId txid) throws IOException;
/** Recover prepared transactions */
void recover(TransactionRecoveryListener listener) throws IOException;
}High-performance file-based persistence using indexed journals.
/**
* KahaDB persistence adapter - high performance file-based storage
* Uses indexed journal files for optimal message throughput
*/
public class KahaDBPersistenceAdapter implements PersistenceAdapter, JournaledStore {
/** Create KahaDB adapter with default settings */
public KahaDBPersistenceAdapter();
/** Directory configuration */
public void setDirectory(File directory);
public File getDirectory();
/** Journal configuration */
public void setJournalMaxFileLength(int journalMaxFileLength);
public int getJournalMaxFileLength();
public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize);
public int getJournalMaxWriteBatchSize();
public void setEnableJournalDiskSyncs(boolean enableJournalDiskSyncs);
public boolean isEnableJournalDiskSyncs();
public void setCleanupInterval(long cleanupInterval);
public long getCleanupInterval();
/** Index configuration */
public void setIndexCacheSize(int indexCacheSize);
public int getIndexCacheSize();
public void setIndexWriteBatchSize(int indexWriteBatchSize);
public int getIndexWriteBatchSize();
public void setIndexEnablePageCaching(boolean indexEnablePageCaching);
public boolean isIndexEnablePageCaching();
/** Checkpoint configuration */
public void setCheckpointInterval(long checkpointInterval);
public long getCheckpointInterval();
public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles);
public boolean isIgnoreMissingJournalfiles();
/** Performance tuning */
public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatchQueues);
public boolean isConcurrentStoreAndDispatchQueues();
public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatchTopics);
public boolean isConcurrentStoreAndDispatchTopics();
/** Archive configuration */
public void setArchiveDataLogs(boolean archiveDataLogs);
public boolean isArchiveDataLogs();
public void setDirectoryArchive(File directoryArchive);
public File getDirectoryArchive();
/** Compression */
public void setCompressJournalStream(boolean compressJournalStream);
public boolean isCompressJournalStream();
/** Locking */
public void setLockKeepAlivePeriod(long lockKeepAlivePeriod);
public long getLockKeepAlivePeriod();
/** Start adapter */
public void start() throws Exception;
/** Stop adapter */
public void stop() throws Exception;
/** Get storage size */
public long size();
/** Force checkpoint */
public void checkpoint(boolean sync) throws IOException;
/** Delete all messages */
public void deleteAllMessages() throws IOException;
}
/**
* Multi-KahaDB adapter for partitioned storage
*/
public class MultiKahaDBPersistenceAdapter implements PersistenceAdapter {
/** Set filtered persistence adapters */
public void setFilteredPersistenceAdapters(List<FilteredKahaDBPersistenceAdapter> adapters);
public List<FilteredKahaDBPersistenceAdapter> getFilteredPersistenceAdapters();
/** Add filtered adapter */
public FilteredKahaDBPersistenceAdapter addFilteredKahaDBPersistenceAdapter(
String queue, String topic, PersistenceAdapter adapter);
}
/**
* Filtered KahaDB adapter for destination-specific storage
*/
public class FilteredKahaDBPersistenceAdapter implements PersistenceAdapter {
/** Set destination filters */
public void setPerDestination(boolean perDestination);
public boolean isPerDestination();
public void setQueue(String queue);
public String getQueue();
public void setTopic(String topic);
public String getTopic();
}Usage Examples:
// Basic KahaDB configuration
KahaDBPersistenceAdapter kahadb = new KahaDBPersistenceAdapter();
kahadb.setDirectory(new File("./activemq-data"));
kahadb.setJournalMaxFileLength(32 * 1024 * 1024); // 32MB journal files
kahadb.setIndexCacheSize(10000); // Cache 10K index pages
BrokerService broker = new BrokerService();
broker.setPersistenceAdapter(kahadb);
// High-performance configuration
KahaDBPersistenceAdapter highPerf = new KahaDBPersistenceAdapter();
highPerf.setDirectory(new File("/fast-ssd/activemq-data"));
highPerf.setJournalMaxFileLength(64 * 1024 * 1024); // Larger journals
highPerf.setJournalMaxWriteBatchSize(4 * 1024); // Batch writes
highPerf.setConcurrentStoreAndDispatchQueues(true); // Concurrent processing
highPerf.setIndexCacheSize(50000); // Larger index cache
highPerf.setCheckpointInterval(5000); // More frequent checkpoints
// Multi-KahaDB for partitioned storage
MultiKahaDBPersistenceAdapter multiKaha = new MultiKahaDBPersistenceAdapter();
multiKaha.addFilteredKahaDBPersistenceAdapter("orders.>", null,
createKahaDB("orders-storage"));
multiKaha.addFilteredKahaDBPersistenceAdapter("events.>", null,
createKahaDB("events-storage"));Database-based persistence for enterprise environments.
/**
* JDBC persistence adapter for database storage
*/
public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements PersistenceAdapter {
/** DataSource configuration */
public void setDataSource(DataSource dataSource);
public DataSource getDataSource();
/** Database adapter configuration */
public void setAdapter(JDBCAdapter adapter);
public JDBCAdapter getAdapter();
/** Table configuration */
public void setStatements(Statements statements);
public Statements getStatements();
/** Transaction configuration */
public void setTransactionIsolation(int transactionIsolation);
public int getTransactionIsolation();
/** Locking configuration */
public void setLockKeepAlivePeriod(long lockKeepAlivePeriod);
public long getLockKeepAlivePeriod();
public void setLockAcquireSleepInterval(long lockAcquireSleepInterval);
public long getLockAcquireSleepInterval();
/** Cleanup configuration */
public void setCleanupPeriod(long cleanupPeriod);
public long getCleanupPeriod();
/** Performance tuning */
public void setUseLock(boolean useLock);
public boolean isUseLock();
public void setCreateTablesOnStartup(boolean createTablesOnStartup);
public boolean isCreateTablesOnStartup();
}
/**
* Database-specific JDBC adapters
*/
public interface JDBCAdapter {
/** Set SQL statements */
void setStatements(Statements statements);
/** Initialize adapter */
void doCreateTables(TransactionContext c) throws SQLException, IOException;
/** Clean up old messages */
void doDropTables(TransactionContext c) throws SQLException, IOException;
/** Get lock statement */
String getLimitStatement();
}
/**
* Oracle JDBC adapter with BLOB support
*/
public class OracleJDBCAdapter extends DefaultJDBCAdapter {
/** Configure for Oracle-specific features */
public void setUseExternalMessageReferences(boolean useExternalMessageReferences);
public boolean isUseExternalMessageReferences();
}
/**
* SQL Server JDBC adapter
*/
public class SqlServerJDBCAdapter extends DefaultJDBCAdapter {
/** SQL Server specific optimizations */
}Usage Examples:
// Basic JDBC configuration with MySQL
JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
// Configure DataSource (using connection pool)
BasicDataSource dataSource = new BasicDataSource();
dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
dataSource.setUrl("jdbc:mysql://localhost/activemq");
dataSource.setUsername("activemq");
dataSource.setPassword("password");
dataSource.setMaxActive(20);
jdbc.setDataSource(dataSource);
// Configure for MySQL
jdbc.setAdapter(new MySqlJDBCAdapter());
jdbc.setCreateTablesOnStartup(true);
// Oracle configuration with BLOB support
JDBCPersistenceAdapter oracleJdbc = new JDBCPersistenceAdapter();
OracleJDBCAdapter oracleAdapter = new OracleJDBCAdapter();
oracleAdapter.setUseExternalMessageReferences(true);
oracleJdbc.setAdapter(oracleAdapter);In-memory storage for testing and temporary brokers.
/**
* Memory-based persistence adapter
* Stores all messages in memory - no durability across restarts
*/
public class MemoryPersistenceAdapter implements PersistenceAdapter {
/** Create memory adapter */
public MemoryPersistenceAdapter();
/** Start adapter */
public void start() throws Exception;
/** Stop adapter */
public void stop() throws Exception;
/** Create message stores */
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException;
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException;
public TransactionStore createTransactionStore() throws IOException;
/** Get destinations */
public Set<ActiveMQDestination> getDestinations();
/** Memory usage */
public long size();
/** Clear all messages */
public void deleteAllMessages() throws IOException;
}
/**
* Memory-based message store
*/
public class MemoryMessageStore implements MessageStore {
/** Memory store operations */
public void addMessage(ConnectionContext context, Message message) throws IOException;
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException;
public void removeAllMessages(ConnectionContext context) throws IOException;
public void recover(MessageRecoveryListener listener) throws Exception;
/** Get statistics */
public int getMessageCount() throws IOException;
public long getMessageSize() throws IOException;
}Usage Examples:
// Memory persistence for testing
BrokerService testBroker = new BrokerService();
testBroker.setPersistenceAdapter(new MemoryPersistenceAdapter());
testBroker.setPersistent(false);
// Useful for unit tests and temporary brokers
BrokerService tempBroker = new BrokerService();
tempBroker.setBrokerName("temp-broker");
tempBroker.setPersistent(false);
tempBroker.addConnector("vm://temp-broker");public class PersistenceAdapterException extends IOException {
public PersistenceAdapterException(String message);
public PersistenceAdapterException(String message, Throwable cause);
}
public class RecoverableException extends IOException {
public RecoverableException(String message);
public RecoverableException(String message, Throwable cause);
}/**
* Message recovery listener interface
*/
public interface MessageRecoveryListener {
/** Recover message */
boolean recoverMessage(Message message) throws Exception;
/** Recover message reference */
boolean recoverMessageReference(MessageId messageReference) throws Exception;
/** Check if recovery should continue */
boolean hasSpace();
}
/**
* Transaction recovery listener interface
*/
public interface TransactionRecoveryListener {
/** Recover transaction */
void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] acks);
}
/**
* Subscription information for durable subscriptions
*/
public class SubscriptionInfo {
/** Subscription details */
public String getClientId();
public void setClientId(String clientId);
public String getSubscriptionName();
public void setSubscriptionName(String subscriptionName);
public ActiveMQDestination getDestination();
public void setDestination(ActiveMQDestination destination);
public String getSelector();
public void setSelector(String selector);
}
/**
* Journaled store interface for transaction log support
*/
public interface JournaledStore {
/** Get journal manager */
Journal getJournal();
/** Set journal manager */
void setJournal(Journal journal);
/** Force journal sync */
void checkpoint(boolean sync) throws IOException;
}Install with Tessl CLI
npx tessl i tessl/maven-org-apache-activemq--activemq-all