High performance Apache 2.0 licensed Message Broker supporting JMS, AMQP, MQTT, and HTTP protocols with comprehensive enterprise features including persistence, security, clustering, and Spring integration.
—
The JMS Client API provides the core functionality for connecting to ActiveMQ brokers and performing messaging operations. It includes connection factories, connections, sessions, producers, consumers, and comprehensive JMS 1.1/2.0 support.
Primary entry points for creating JMS connections with various configuration options.
/**
* Main connection factory for creating ActiveMQ connections
* Supports TCP, SSL, HTTP, and other transports via broker URL
*/
public class ActiveMQConnectionFactory implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, StatsCapable, Cloneable {
/** Create factory with the default broker URL (ActiveMQConnectionFactory.DEFAULT_BROKER_URL, i.e. failover://tcp://localhost:61616 by default) */
public ActiveMQConnectionFactory();
/** Create factory with specified broker URL */
public ActiveMQConnectionFactory(String brokerURL);
/** Create factory with username/password and broker URL */
public ActiveMQConnectionFactory(String userName, String password, String brokerURL);
/** Create standard JMS connection */
public Connection createConnection() throws JMSException;
/** Create connection with credentials */
public Connection createConnection(String userName, String password) throws JMSException;
/** Create queue-specific connection */
public QueueConnection createQueueConnection() throws JMSException;
/** Create topic-specific connection */
public TopicConnection createTopicConnection() throws JMSException;
/** Set broker URL (e.g., "tcp://localhost:61616", "ssl://broker:61617") */
public void setBrokerURL(String brokerURL);
public String getBrokerURL();
/** Configure prefetch policy for performance tuning */
public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy);
public ActiveMQPrefetchPolicy getPrefetchPolicy();
/** Configure message redelivery behavior */
public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy);
public RedeliveryPolicy getRedeliveryPolicy();
/** Set/get the MessageTransformer applied to messages by producers and consumers */
public void setTransformer(MessageTransformer transformer);
public MessageTransformer getTransformer();
}
/**
* SSL-enabled connection factory with additional security configuration
*/
public class ActiveMQSslConnectionFactory extends ActiveMQConnectionFactory {
/** Set key store location for client certificates */
public void setKeyStore(String keyStore);
/** Set trust store location for server certificate validation */
public void setTrustStore(String trustStore);
/** Set key store password */
public void setKeyStorePassword(String keyStorePassword);
/** Set trust store password */
public void setTrustStorePassword(String trustStorePassword);
}
/**
* XA transaction-enabled connection factory for distributed transactions
*/
public class ActiveMQXAConnectionFactory extends ActiveMQConnectionFactory implements XAConnectionFactory {
/** Create XA connection for distributed transactions */
public XAConnection createXAConnection() throws JMSException;
/** Create XA connection with credentials */
public XAConnection createXAConnection(String userName, String password) throws JMSException;
}

Usage Examples:
// Basic TCP connection
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
// SSL connection with authentication
ActiveMQSslConnectionFactory sslFactory = new ActiveMQSslConnectionFactory();
sslFactory.setBrokerURL("ssl://broker.example.com:61617");
sslFactory.setKeyStore("/path/to/client.keystore");
sslFactory.setTrustStore("/path/to/trust.keystore");
// Connection with prefetch tuning
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
prefetch.setQueuePrefetch(1000);
prefetch.setTopicPrefetch(32766);
factory.setPrefetchPolicy(prefetch);
// XA connection for distributed transactions
ActiveMQXAConnectionFactory xaFactory = new ActiveMQXAConnectionFactory("tcp://localhost:61616");
XAConnection xaConnection = xaFactory.createXAConnection();

JMS connection implementations providing the main communication channel to the broker.
/**
* Main JMS Connection implementation with ActiveMQ extensions
*/
public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, TransportListener, EnhancedConnection {
/** Start message delivery for all consumers on this connection */
public void start() throws JMSException;
/** Stop message delivery temporarily */
public void stop() throws JMSException;
/** Close connection and release resources */
public void close() throws JMSException;
/** Create session with transaction and acknowledgment settings */
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException;
/** Get connection metadata */
public ConnectionMetaData getMetaData() throws JMSException;
/** Set/get client ID for durable subscriptions */
public void setClientID(String clientID) throws JMSException;
public String getClientID() throws JMSException;
/** Set exception listener for asynchronous error handling */
public void setExceptionListener(ExceptionListener listener) throws JMSException;
public ExceptionListener getExceptionListener() throws JMSException;
/** Add connection-level consumer event listener */
public void addConsumerEventListener(ConsumerEventListener listener);
public void removeConsumerEventListener(ConsumerEventListener listener);
/** Get transport connector information */
public String getTransportConnectionInfo();
public ActiveMQConnectionStatistics getStatistics();
}
/**
* XA-enabled connection for distributed transactions
*/
public class ActiveMQXAConnection extends ActiveMQConnection implements XATopicConnection, XAQueueConnection, XAConnection {
/** Create XA session for distributed transactions */
public XASession createXASession() throws JMSException;
/** Create XA queue session */
public XAQueueSession createXAQueueSession() throws JMSException;
/** Create XA topic session */
public XATopicSession createXATopicSession() throws JMSException;
}

JMS session implementations for message operations and transaction management.
/**
* Main JMS Session implementation
*/
public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher {
/** Create message producer for specified destination */
public MessageProducer createProducer(Destination destination) throws JMSException;
/** Create message consumer for specified destination */
public MessageConsumer createConsumer(Destination destination) throws JMSException;
/** Create consumer with message selector */
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException;
/** Create consumer with selector and local delivery option */
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException;
/** Create durable topic subscriber */
public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException;
public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException;
/** Create queue browser for examining messages without consuming */
public QueueBrowser createBrowser(Queue queue) throws JMSException;
public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException;
/** Create destinations */
public Queue createQueue(String queueName) throws JMSException;
public Topic createTopic(String topicName) throws JMSException;
public TemporaryQueue createTemporaryQueue() throws JMSException;
public TemporaryTopic createTemporaryTopic() throws JMSException;
/** Create messages */
public Message createMessage() throws JMSException;
public TextMessage createTextMessage() throws JMSException;
public TextMessage createTextMessage(String text) throws JMSException;
public BytesMessage createBytesMessage() throws JMSException;
public MapMessage createMapMessage() throws JMSException;
public ObjectMessage createObjectMessage() throws JMSException;
public ObjectMessage createObjectMessage(Serializable object) throws JMSException;
public StreamMessage createStreamMessage() throws JMSException;
/** Transaction control */
public void commit() throws JMSException;
public void rollback() throws JMSException;
public void recover() throws JMSException;
/** Session properties */
public boolean getTransacted() throws JMSException;
public int getAcknowledgeMode() throws JMSException;
/** Close session and release resources */
public void close() throws JMSException;
/** Unsubscribe from durable subscription */
public void unsubscribe(String name) throws JMSException;
}
/**
* XA-enabled session for distributed transactions
*/
public class ActiveMQXASession extends ActiveMQSession implements XASession {
/** Get XA resource for transaction coordination */
public XAResource getXAResource();
/** Get underlying JMS session */
public Session getSession() throws JMSException;
/** Commit XA transaction branch */
public void commit() throws JMSException;
/** Rollback XA transaction branch */
public void rollback() throws JMSException;
}

Implementations for sending messages to destinations.
/**
* Main message producer implementation
*/
public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport implements StatsCapable, Disposable {
/** Send message to producer's destination */
public void send(Message message) throws JMSException;
/** Send message with delivery options */
public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException;
/** Send message to specific destination */
public void send(Destination destination, Message message) throws JMSException;
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException;
/** Get producer's destination */
public Destination getDestination() throws JMSException;
/** Set/get delivery mode (persistent/non-persistent) */
public void setDeliveryMode(int deliveryMode) throws JMSException;
public int getDeliveryMode() throws JMSException;
/** Set/get message priority */
public void setPriority(int defaultPriority) throws JMSException;
public int getPriority() throws JMSException;
/** Set/get time-to-live for messages */
public void setTimeToLive(long timeToLive) throws JMSException;
public long getTimeToLive() throws JMSException;
/** Enable/disable message IDs */
public void setDisableMessageID(boolean value) throws JMSException;
public boolean getDisableMessageID() throws JMSException;
/** Enable/disable message timestamps */
public void setDisableMessageTimestamp(boolean value) throws JMSException;
public boolean getDisableMessageTimestamp() throws JMSException;
/** Close producer */
public void close() throws JMSException;
}
/**
* Queue-specific message sender
*/
public class ActiveMQQueueSender extends ActiveMQMessageProducer implements QueueSender {
/** Get associated queue */
public Queue getQueue() throws JMSException;
/** Send message to queue */
public void send(Message message) throws JMSException;
public void send(Queue queue, Message message) throws JMSException;
}
/**
* Topic-specific message publisher
*/
public class ActiveMQTopicPublisher extends ActiveMQMessageProducer implements TopicPublisher {
/** Get associated topic */
public Topic getTopic() throws JMSException;
/** Publish message to topic */
public void publish(Message message) throws JMSException;
public void publish(Topic topic, Message message) throws JMSException;
}

Implementations for receiving messages from destinations.
/**
* Main message consumer implementation
*/
public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsCapable, ActiveMQDispatcher {
/** Receive message (blocks until available) */
public Message receive() throws JMSException;
/** Receive message with timeout (milliseconds) */
public Message receive(long timeout) throws JMSException;
/** Receive message without blocking */
public Message receiveNoWait() throws JMSException;
/** Set/get message listener for asynchronous delivery */
public void setMessageListener(MessageListener listener) throws JMSException;
public MessageListener getMessageListener() throws JMSException;
/** Get message selector */
public String getMessageSelector() throws JMSException;
/** Close consumer */
public void close() throws JMSException;
/** Set available message listener (ActiveMQ extension) */
public void setAvailableListener(MessageAvailableListener availableListener);
public MessageAvailableListener getAvailableListener();
}
/**
* Queue-specific message receiver
*/
public class ActiveMQQueueReceiver extends ActiveMQMessageConsumer implements QueueReceiver {
/** Get associated queue */
public Queue getQueue() throws JMSException;
}
/**
* Topic-specific message subscriber
*/
public class ActiveMQTopicSubscriber extends ActiveMQMessageConsumer implements TopicSubscriber {
/** Get associated topic */
public Topic getTopic() throws JMSException;
/** Check if no-local delivery is enabled */
public boolean getNoLocal() throws JMSException;
}
/**
* Queue browser for examining messages without consuming
*/
public class ActiveMQQueueBrowser implements QueueBrowser, Enumeration {
/** Get queue being browsed */
public Queue getQueue() throws JMSException;
/** Get message selector */
public String getMessageSelector() throws JMSException;
/** Get enumeration of messages */
public Enumeration getEnumeration() throws JMSException;
/** Close browser */
public void close() throws JMSException;
/** Enumeration methods */
public boolean hasMoreElements();
public Object nextElement();
}

Simplified JMS 2.0 API for easier development.
/**
* JMS 2.0 Context implementation providing simplified API
*/
public class ActiveMQContext implements JMSContext {
/** Create producer */
public JMSProducer createProducer();
/** Create consumer */
public JMSConsumer createConsumer(Destination destination);
public JMSConsumer createConsumer(Destination destination, String messageSelector);
public JMSConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal);
/** Create shared consumer */
public JMSConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName);
public JMSConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName, String messageSelector);
/** Create durable consumer */
public JMSConsumer createDurableConsumer(Topic topic, String name);
public JMSConsumer createDurableConsumer(Topic topic, String name, String messageSelector, boolean noLocal);
/** Create shared durable consumer */
public JMSConsumer createSharedDurableConsumer(Topic topic, String name);
public JMSConsumer createSharedDurableConsumer(Topic topic, String name, String messageSelector);
/** Create queue browser */
public QueueBrowser createBrowser(Queue queue);
public QueueBrowser createBrowser(Queue queue, String messageSelector);
/** Create destinations */
public Queue createQueue(String queueName);
public Topic createTopic(String topicName);
public TemporaryQueue createTemporaryQueue();
public TemporaryTopic createTemporaryTopic();
/** Create messages */
public Message createMessage();
public BytesMessage createBytesMessage();
public MapMessage createMapMessage();
public TextMessage createTextMessage();
public TextMessage createTextMessage(String text);
public ObjectMessage createObjectMessage();
public ObjectMessage createObjectMessage(Serializable object);
public StreamMessage createStreamMessage();
/** Session management */
public void start();
public void stop();
public void close();
public void commit();
public void rollback();
public void recover();
public void acknowledge();
/** Context properties */
public boolean getAutoStart();
public void setAutoStart(boolean autoStart);
public String getClientID();
public void setClientID(String clientID);
public int getSessionMode();
/** Exception handling */
public void setExceptionListener(ExceptionListener listener);
public ExceptionListener getExceptionListener();
/** Unsubscribe from durable subscription */
public void unsubscribe(String name);
}
/**
* JMS 2.0 Producer implementation
*/
public class ActiveMQProducer implements JMSProducer {
/** Send message */
public JMSProducer send(Destination destination, Message message);
public JMSProducer send(Destination destination, String body);
public JMSProducer send(Destination destination, Map<String,Object> body);
public JMSProducer send(Destination destination, byte[] body);
public JMSProducer send(Destination destination, Serializable body);
/** Set message properties */
public JMSProducer setProperty(String name, boolean value);
public JMSProducer setProperty(String name, byte value);
public JMSProducer setProperty(String name, short value);
public JMSProducer setProperty(String name, int value);
public JMSProducer setProperty(String name, long value);
public JMSProducer setProperty(String name, float value);
public JMSProducer setProperty(String name, double value);
public JMSProducer setProperty(String name, String value);
public JMSProducer setProperty(String name, Object value);
/** Clear properties */
public JMSProducer clearProperties();
/** Set delivery options */
public JMSProducer setDeliveryMode(int deliveryMode);
public JMSProducer setPriority(int priority);
public JMSProducer setTimeToLive(long timeToLive);
public JMSProducer setDeliveryDelay(long deliveryDelay);
/** Disable message features */
public JMSProducer setDisableMessageID(boolean value);
public JMSProducer setDisableMessageTimestamp(boolean value);
/** Set correlation ID and reply-to */
public JMSProducer setJMSCorrelationID(String correlationID);
public JMSProducer setJMSCorrelationIDAsBytes(byte[] correlationID);
public JMSProducer setJMSReplyTo(Destination replyTo);
public JMSProducer setJMSType(String type);
}

Usage Examples:
// JMS 2.0 Context API usage
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
try (JMSContext context = factory.createContext()) {
// Create destinations
Queue queue = context.createQueue("example.queue");
Topic topic = context.createTopic("example.topic");
// Send messages
context.createProducer().send(queue, "Hello Queue!");
context.createProducer()
.setProperty("priority", "high")
.setDeliveryMode(DeliveryMode.PERSISTENT)
.send(topic, "Important Topic Message");
// Receive messages
JMSConsumer consumer = context.createConsumer(queue);
String message = consumer.receiveBody(String.class, 1000);
// Durable subscription
JMSConsumer durableConsumer = context.createDurableConsumer(topic, "subscription1");
Message topicMessage = durableConsumer.receive(1000);
}

public class AlreadyClosedException extends JMSException {
public AlreadyClosedException();
public AlreadyClosedException(String reason);
}
public class ConnectionClosedException extends IllegalStateException {
public ConnectionClosedException();
public ConnectionClosedException(String message);
}
public class ConfigurationException extends JMSException {
public ConfigurationException(String reason);
public ConfigurationException(String reason, String errorCode);
}

public class ActiveMQPrefetchPolicy implements Serializable {
public void setQueuePrefetch(int queuePrefetch);
public int getQueuePrefetch();
public void setTopicPrefetch(int topicPrefetch);
public int getTopicPrefetch();
public void setDurableTopicPrefetch(int durableTopicPrefetch);
public int getDurableTopicPrefetch();
public void setOptimizeAcknowledgeTimeOut(long optimizeAcknowledgeTimeOut);
public long getOptimizeAcknowledgeTimeOut();
}
public class RedeliveryPolicy extends DestinationMapEntry implements Cloneable, Serializable {
public void setMaximumRedeliveries(int maximumRedeliveries);
public int getMaximumRedeliveries();
public void setInitialRedeliveryDelay(long initialRedeliveryDelay);
public long getInitialRedeliveryDelay();
public void setRedeliveryDelay(long redeliveryDelay);
public long getRedeliveryDelay();
public void setUseExponentialBackOff(boolean useExponentialBackOff);
public boolean isUseExponentialBackOff();
public void setBackOffMultiplier(double backOffMultiplier);
public double getBackOffMultiplier();
}
public interface MessageTransformer {
Message producerTransform(Session session, MessageProducer producer, Message message) throws JMSException;
Message consumerTransform(Session session, MessageConsumer consumer, Message message) throws JMSException;
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-activemq--activemq-all