CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-activemq--activemq-all

High performance Apache 2.0 licensed Message Broker supporting JMS, AMQP, MQTT, and HTTP protocols with comprehensive enterprise features including persistence, security, clustering, and Spring integration.

Pending
Overview
Eval results
Files

persistence-storage.mddocs/

Persistence and Storage

ActiveMQ provides multiple persistence adapters for reliable message storage including high-performance KahaDB, JDBC database storage, and in-memory options for different deployment scenarios.

Capabilities

Core Persistence Interfaces

Base interfaces defining the persistence contract for message storage.

/**
 * Main persistence adapter interface for message storage
 */
public interface PersistenceAdapter extends Service {
    /** Create message store for queue destinations */
    MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException;
    
    /** Create message store for topic destinations */
    TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException;
    
    /** Create transaction store for XA transactions */
    TransactionStore createTransactionStore() throws IOException;
    
    /** Get all known destinations */
    Set<ActiveMQDestination> getDestinations();
    
    /** Remove destination and its messages */
    void removeQueueMessageStore(ActiveMQQueue destination);
    void removeTopicMessageStore(ActiveMQTopic destination);
    
    /** Get adapter size information */
    long size();
    
    /** Delete all messages */
    void deleteAllMessages() throws IOException;
    
    /** Checkpoint persistence state */
    void checkpoint(boolean sync) throws IOException;
    
    /** Set broker service reference */
    void setBrokerService(BrokerService brokerService);
    BrokerService getBrokerService();
    
    /** Set usage manager for resource limits */
    void setUsageManager(SystemUsage usageManager);
    SystemUsage getUsageManager();
    
    /** Directory management */
    void setDirectory(File dir);
    File getDirectory();
}

/**
 * Message store interface for destination-specific storage
 */
public interface MessageStore {
    /** Add message to store */
    void addMessage(ConnectionContext context, Message message) throws IOException;
    
    /** Remove message from store */
    void removeMessage(ConnectionContext context, MessageAck ack) throws IOException;
    
    /** Remove all messages from store */
    void removeAllMessages(ConnectionContext context) throws IOException;
    
    /** Recover messages from store */
    void recover(MessageRecoveryListener listener) throws Exception;
    
    /** Get message count */
    int getMessageCount() throws IOException;
    
    /** Get message size */
    long getMessageSize() throws IOException;
    
    /** Start store */
    void start() throws Exception;
    
    /** Stop store */
    void stop() throws Exception;
}

/**
 * Topic-specific message store with subscription support
 */
public interface TopicMessageStore extends MessageStore {
    /** Add subscription */
    void addSubscription(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException;
    
    /** Delete subscription */
    void deleteSubscription(String clientId, String subscriptionName) throws IOException;
    
    /** Get all subscriptions */
    SubscriptionInfo[] getAllSubscriptions() throws IOException;
    
    /** Recover subscription messages */
    void recoverSubscription(String clientId, String subscriptionName, 
                           MessageRecoveryListener listener) throws Exception;
    
    /** Remove subscription messages */
    void removeMessage(ConnectionContext context, MessageAck ack) throws IOException;
    
    /** Get subscription message count */
    int getMessageCount(String clientId, String subscriptionName) throws IOException;
}

/**
 * Transaction store interface for XA transaction support
 */
public interface TransactionStore {
    /** Prepare transaction */
    void prepare(TransactionId txid) throws IOException;
    
    /** Commit transaction */
    void commit(TransactionId txid, boolean wasPrepared, Runnable preCommit, 
               Runnable postCommit) throws IOException;
    
    /** Rollback transaction */
    void rollback(TransactionId txid) throws IOException;
    
    /** Recover prepared transactions */
    void recover(TransactionRecoveryListener listener) throws IOException;
}

KahaDB Persistence Adapter

High-performance file-based persistence using indexed journals.

/**
 * KahaDB persistence adapter - high performance file-based storage
 * Uses indexed journal files for optimal message throughput
 */
public class KahaDBPersistenceAdapter implements PersistenceAdapter, JournaledStore {
    /** Create KahaDB adapter with default settings */
    public KahaDBPersistenceAdapter();
    
    /** Directory configuration */
    public void setDirectory(File directory);
    public File getDirectory();
    
    /** Journal configuration */
    public void setJournalMaxFileLength(int journalMaxFileLength);
    public int getJournalMaxFileLength();
    public void setJournalMaxWriteBatchSize(int journalMaxWriteBatchSize);
    public int getJournalMaxWriteBatchSize();
    public void setEnableJournalDiskSyncs(boolean enableJournalDiskSyncs);
    public boolean isEnableJournalDiskSyncs();
    public void setCleanupInterval(long cleanupInterval);
    public long getCleanupInterval();
    
    /** Index configuration */
    public void setIndexCacheSize(int indexCacheSize);
    public int getIndexCacheSize();
    public void setIndexWriteBatchSize(int indexWriteBatchSize);
    public int getIndexWriteBatchSize();
    public void setIndexEnablePageCaching(boolean indexEnablePageCaching);
    public boolean isIndexEnablePageCaching();
    
    /** Checkpoint configuration */
    public void setCheckpointInterval(long checkpointInterval);
    public long getCheckpointInterval();
    public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles);
    public boolean isIgnoreMissingJournalfiles();
    
    /** Performance tuning */
    public void setConcurrentStoreAndDispatchQueues(boolean concurrentStoreAndDispatchQueues);
    public boolean isConcurrentStoreAndDispatchQueues();
    public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatchTopics);
    public boolean isConcurrentStoreAndDispatchTopics();
    
    /** Archive configuration */
    public void setArchiveDataLogs(boolean archiveDataLogs);
    public boolean isArchiveDataLogs();
    public void setDirectoryArchive(File directoryArchive);
    public File getDirectoryArchive();
    
    /** Compression */
    public void setCompressJournalStream(boolean compressJournalStream);
    public boolean isCompressJournalStream();
    
    /** Locking */
    public void setLockKeepAlivePeriod(long lockKeepAlivePeriod);
    public long getLockKeepAlivePeriod();
    
    /** Start adapter */
    public void start() throws Exception;
    
    /** Stop adapter */
    public void stop() throws Exception;
    
    /** Get storage size */
    public long size();
    
    /** Force checkpoint */
    public void checkpoint(boolean sync) throws IOException;
    
    /** Delete all messages */
    public void deleteAllMessages() throws IOException;
}

/**
 * Multi-KahaDB adapter for partitioned storage
 */
public class MultiKahaDBPersistenceAdapter implements PersistenceAdapter {
    /** Set filtered persistence adapters */
    public void setFilteredPersistenceAdapters(List<FilteredKahaDBPersistenceAdapter> adapters);
    public List<FilteredKahaDBPersistenceAdapter> getFilteredPersistenceAdapters();
    
    /** Add filtered adapter */
    public FilteredKahaDBPersistenceAdapter addFilteredKahaDBPersistenceAdapter(
        String queue, String topic, PersistenceAdapter adapter);
}

/**
 * Filtered KahaDB adapter for destination-specific storage
 */
public class FilteredKahaDBPersistenceAdapter implements PersistenceAdapter {
    /** Set destination filters */
    public void setPerDestination(boolean perDestination);
    public boolean isPerDestination();
    public void setQueue(String queue);
    public String getQueue();
    public void setTopic(String topic);
    public String getTopic();
}

Usage Examples:

// Basic KahaDB configuration
KahaDBPersistenceAdapter kahadb = new KahaDBPersistenceAdapter();
kahadb.setDirectory(new File("./activemq-data"));
kahadb.setJournalMaxFileLength(32 * 1024 * 1024); // 32MB journal files
kahadb.setIndexCacheSize(10000); // Cache 10K index pages

BrokerService broker = new BrokerService();
broker.setPersistenceAdapter(kahadb);

// High-performance configuration
KahaDBPersistenceAdapter highPerf = new KahaDBPersistenceAdapter();
highPerf.setDirectory(new File("/fast-ssd/activemq-data"));
highPerf.setJournalMaxFileLength(64 * 1024 * 1024); // Larger journals
highPerf.setJournalMaxWriteBatchSize(4 * 1024); // Batch writes
highPerf.setConcurrentStoreAndDispatchQueues(true); // Concurrent processing
highPerf.setIndexCacheSize(50000); // Larger index cache
highPerf.setCheckpointInterval(5000); // More frequent checkpoints

// Multi-KahaDB for partitioned storage
MultiKahaDBPersistenceAdapter multiKaha = new MultiKahaDBPersistenceAdapter();
multiKaha.addFilteredKahaDBPersistenceAdapter("orders.>", null, 
    createKahaDB("orders-storage"));
multiKaha.addFilteredKahaDBPersistenceAdapter("events.>", null, 
    createKahaDB("events-storage"));

JDBC Persistence Adapter

Database-based persistence for enterprise environments.

/**
 * JDBC persistence adapter for database storage
 */
public class JDBCPersistenceAdapter extends DataSourceServiceSupport implements PersistenceAdapter {
    /** DataSource configuration */
    public void setDataSource(DataSource dataSource);
    public DataSource getDataSource();
    
    /** Database adapter configuration */
    public void setAdapter(JDBCAdapter adapter);
    public JDBCAdapter getAdapter();
    
    /** Table configuration */
    public void setStatements(Statements statements);
    public Statements getStatements();
    
    /** Transaction configuration */
    public void setTransactionIsolation(int transactionIsolation);
    public int getTransactionIsolation();
    
    /** Locking configuration */
    public void setLockKeepAlivePeriod(long lockKeepAlivePeriod);
    public long getLockKeepAlivePeriod();
    public void setLockAcquireSleepInterval(long lockAcquireSleepInterval);
    public long getLockAcquireSleepInterval();
    
    /** Cleanup configuration */
    public void setCleanupPeriod(long cleanupPeriod);
    public long getCleanupPeriod();
    
    /** Performance tuning */
    public void setUseLock(boolean useLock);
    public boolean isUseLock();
    public void setCreateTablesOnStartup(boolean createTablesOnStartup);
    public boolean isCreateTablesOnStartup();
}

/**
 * Database-specific JDBC adapters
 */
public interface JDBCAdapter {
    /** Set SQL statements */
    void setStatements(Statements statements);
    
    /** Initialize adapter */
    void doCreateTables(TransactionContext c) throws SQLException, IOException;
    
    /** Clean up old messages */
    void doDropTables(TransactionContext c) throws SQLException, IOException;
    
    /** Get lock statement */
    String getLimitStatement();
}

/**
 * Oracle JDBC adapter with BLOB support
 */
public class OracleJDBCAdapter extends DefaultJDBCAdapter {
    /** Configure for Oracle-specific features */
    public void setUseExternalMessageReferences(boolean useExternalMessageReferences);
    public boolean isUseExternalMessageReferences();
}

/**
 * SQL Server JDBC adapter
 */
public class SqlServerJDBCAdapter extends DefaultJDBCAdapter {
    /** SQL Server specific optimizations */
}

Usage Examples:

// Basic JDBC configuration with MySQL
JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();

// Configure DataSource (using connection pool)
BasicDataSource dataSource = new BasicDataSource();
dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
dataSource.setUrl("jdbc:mysql://localhost/activemq");
dataSource.setUsername("activemq");
dataSource.setPassword("password");
dataSource.setMaxActive(20);
jdbc.setDataSource(dataSource);

// Configure for MySQL
jdbc.setAdapter(new MySqlJDBCAdapter());
jdbc.setCreateTablesOnStartup(true);

// Oracle configuration with BLOB support
JDBCPersistenceAdapter oracleJdbc = new JDBCPersistenceAdapter();
OracleJDBCAdapter oracleAdapter = new OracleJDBCAdapter();
oracleAdapter.setUseExternalMessageReferences(true);
oracleJdbc.setAdapter(oracleAdapter);

Memory Persistence Adapter

In-memory storage for testing and temporary brokers.

/**
 * Memory-based persistence adapter
 * Stores all messages in memory - no durability across restarts
 */
public class MemoryPersistenceAdapter implements PersistenceAdapter {
    /** Create memory adapter */
    public MemoryPersistenceAdapter();
    
    /** Start adapter */
    public void start() throws Exception;
    
    /** Stop adapter */
    public void stop() throws Exception;
    
    /** Create message stores */
    public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException;
    public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException;
    public TransactionStore createTransactionStore() throws IOException;
    
    /** Get destinations */
    public Set<ActiveMQDestination> getDestinations();
    
    /** Memory usage */
    public long size();
    
    /** Clear all messages */
    public void deleteAllMessages() throws IOException;
}

/**
 * Memory-based message store
 */
public class MemoryMessageStore implements MessageStore {
    /** Memory store operations */
    public void addMessage(ConnectionContext context, Message message) throws IOException;
    public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException;
    public void removeAllMessages(ConnectionContext context) throws IOException;
    public void recover(MessageRecoveryListener listener) throws Exception;
    
    /** Get statistics */
    public int getMessageCount() throws IOException;
    public long getMessageSize() throws IOException;
}

Usage Examples:

// Memory persistence for testing
BrokerService testBroker = new BrokerService();
testBroker.setPersistenceAdapter(new MemoryPersistenceAdapter());
testBroker.setPersistent(false);

// Useful for unit tests and temporary brokers
BrokerService tempBroker = new BrokerService();
tempBroker.setBrokerName("temp-broker");
tempBroker.setPersistent(false);
tempBroker.addConnector("vm://temp-broker");

Exception Handling

public class PersistenceAdapterException extends IOException {
    public PersistenceAdapterException(String message);
    public PersistenceAdapterException(String message, Throwable cause);
}

public class RecoverableException extends IOException {
    public RecoverableException(String message);
    public RecoverableException(String message, Throwable cause);
}

Types

/**
 * Message recovery listener interface
 */
public interface MessageRecoveryListener {
    /** Recover message */
    boolean recoverMessage(Message message) throws Exception;
    
    /** Recover message reference */
    boolean recoverMessageReference(MessageId messageReference) throws Exception;
    
    /** Check if recovery should continue */
    boolean hasSpace();
}

/**
 * Transaction recovery listener interface
 */
public interface TransactionRecoveryListener {
    /** Recover transaction */
    void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] acks);
}

/**
 * Subscription information for durable subscriptions
 */
public class SubscriptionInfo {
    /** Subscription details */
    public String getClientId();
    public void setClientId(String clientId);
    public String getSubscriptionName();
    public void setSubscriptionName(String subscriptionName);
    public ActiveMQDestination getDestination();
    public void setDestination(ActiveMQDestination destination);
    public String getSelector();
    public void setSelector(String selector);
}

/**
 * Journaled store interface for transaction log support
 */
public interface JournaledStore {
    /** Get journal manager */
    Journal getJournal();
    
    /** Set journal manager */
    void setJournal(Journal journal);
    
    /** Force journal sync */
    void checkpoint(boolean sync) throws IOException;
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-activemq--activemq-all

docs

connection-pooling.md

embedded-broker.md

index.md

jms-client.md

management-monitoring.md

messages-destinations.md

network-clustering.md

persistence-storage.md

security.md

spring-integration.md

transport-protocols.md

tile.json