0
# JMS Client API
1
2
The JMS Client API provides the core functionality for connecting to ActiveMQ brokers and performing messaging operations. It includes connection factories, connections, sessions, producers, consumers, and comprehensive JMS 1.1/2.0 support.
3
4
## Capabilities
5
6
### Connection Factories
7
8
Primary entry points for creating JMS connections with various configuration options.
9
10
```java { .api }
11
/**
12
* Main connection factory for creating ActiveMQ connections
13
* Supports TCP, SSL, HTTP, and other transports via broker URL
14
*/
15
public class ActiveMQConnectionFactory implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory, StatsCapable, Cloneable {
16
/** Create factory with default broker URL (vm://localhost) */
17
public ActiveMQConnectionFactory();
18
19
/** Create factory with specified broker URL */
20
public ActiveMQConnectionFactory(String brokerURL);
21
22
/** Create factory with username/password and broker URL */
23
public ActiveMQConnectionFactory(String userName, String password, String brokerURL);
24
25
/** Create standard JMS connection */
26
public Connection createConnection() throws JMSException;
27
28
/** Create connection with credentials */
29
public Connection createConnection(String userName, String password) throws JMSException;
30
31
/** Create queue-specific connection */
32
public QueueConnection createQueueConnection() throws JMSException;
33
34
/** Create topic-specific connection */
35
public TopicConnection createTopicConnection() throws JMSException;
36
37
/** Set broker URL (e.g., "tcp://localhost:61616", "ssl://broker:61617") */
38
public void setBrokerURL(String brokerURL);
39
public String getBrokerURL();
40
41
/** Configure prefetch policy for performance tuning */
42
public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy);
43
public ActiveMQPrefetchPolicy getPrefetchPolicy();
44
45
/** Configure message redelivery behavior */
46
public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy);
47
public RedeliveryPolicy getRedeliveryPolicy();
48
49
/** Enable/disable message transformation */
50
public void setTransformer(MessageTransformer transformer);
51
public MessageTransformer getTransformer();
52
}
53
54
/**
55
* SSL-enabled connection factory with additional security configuration
56
*/
57
public class ActiveMQSslConnectionFactory extends ActiveMQConnectionFactory {
58
/** Set key store location for client certificates */
59
public void setKeyStore(String keyStore);
60
61
/** Set trust store location for server certificate validation */
62
public void setTrustStore(String trustStore);
63
64
/** Set key store password */
65
public void setKeyStorePassword(String keyStorePassword);
66
67
/** Set trust store password */
68
public void setTrustStorePassword(String trustStorePassword);
69
}
70
71
/**
72
* XA transaction-enabled connection factory for distributed transactions
73
*/
74
public class ActiveMQXAConnectionFactory extends ActiveMQConnectionFactory implements XAConnectionFactory {
75
/** Create XA connection for distributed transactions */
76
public XAConnection createXAConnection() throws JMSException;
77
78
/** Create XA connection with credentials */
79
public XAConnection createXAConnection(String userName, String password) throws JMSException;
80
}
81
```
82
83
**Usage Examples:**
84
85
```java
86
// Basic TCP connection
87
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
88
89
// SSL connection with authentication
90
ActiveMQSslConnectionFactory sslFactory = new ActiveMQSslConnectionFactory();
91
sslFactory.setBrokerURL("ssl://broker.example.com:61617");
92
sslFactory.setKeyStore("/path/to/client.keystore");
93
sslFactory.setTrustStore("/path/to/trust.keystore");
94
95
// Connection with prefetch tuning
96
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
97
ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
98
prefetch.setQueuePrefetch(1000);
99
prefetch.setTopicPrefetch(32766);
100
factory.setPrefetchPolicy(prefetch);
101
102
// XA connection for distributed transactions
103
ActiveMQXAConnectionFactory xaFactory = new ActiveMQXAConnectionFactory("tcp://localhost:61616");
104
XAConnection xaConnection = xaFactory.createXAConnection();
105
```
106
107
### Connections
108
109
JMS connection implementations providing the main communication channel to the broker.
110
111
```java { .api }
112
/**
113
* Main JMS Connection implementation with ActiveMQ extensions
114
*/
115
public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, TransportListener, EnhancedConnection {
116
/** Start message delivery for all consumers on this connection */
117
public void start() throws JMSException;
118
119
/** Stop message delivery temporarily */
120
public void stop() throws JMSException;
121
122
/** Close connection and release resources */
123
public void close() throws JMSException;
124
125
/** Create session with transaction and acknowledgment settings */
126
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException;
127
128
/** Get connection metadata */
129
public ConnectionMetaData getMetaData() throws JMSException;
130
131
/** Set/get client ID for durable subscriptions */
132
public void setClientID(String clientID) throws JMSException;
133
public String getClientID() throws JMSException;
134
135
/** Set exception listener for asynchronous error handling */
136
public void setExceptionListener(ExceptionListener listener) throws JMSException;
137
public ExceptionListener getExceptionListener() throws JMSException;
138
139
/** Add connection-level consumer event listener */
140
public void addConsumerEventListener(ConsumerEventListener listener);
141
public void removeConsumerEventListener(ConsumerEventListener listener);
142
143
/** Get transport connector information */
144
public String getTransportConnectionInfo();
145
public ActiveMQConnectionStatistics getStatistics();
146
}
147
148
/**
149
* XA-enabled connection for distributed transactions
150
*/
151
public class ActiveMQXAConnection extends ActiveMQConnection implements XATopicConnection, XAQueueConnection, XAConnection {
152
/** Create XA session for distributed transactions */
153
public XASession createXASession() throws JMSException;
154
155
/** Create XA queue session */
156
public XAQueueSession createXAQueueSession() throws JMSException;
157
158
/** Create XA topic session */
159
public XATopicSession createXATopicSession() throws JMSException;
160
}
161
```
162
163
### Sessions
164
165
JMS session implementations for message operations and transaction management.
166
167
```java { .api }
168
/**
169
* Main JMS Session implementation
170
*/
171
public class ActiveMQSession implements Session, QueueSession, TopicSession, StatsCapable, ActiveMQDispatcher {
172
/** Create message producer for specified destination */
173
public MessageProducer createProducer(Destination destination) throws JMSException;
174
175
/** Create message consumer for specified destination */
176
public MessageConsumer createConsumer(Destination destination) throws JMSException;
177
178
/** Create consumer with message selector */
179
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException;
180
181
/** Create consumer with selector and local delivery option */
182
public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException;
183
184
/** Create durable topic subscriber */
185
public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException;
186
public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException;
187
188
/** Create queue browser for examining messages without consuming */
189
public QueueBrowser createBrowser(Queue queue) throws JMSException;
190
public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException;
191
192
/** Create destinations */
193
public Queue createQueue(String queueName) throws JMSException;
194
public Topic createTopic(String topicName) throws JMSException;
195
public TemporaryQueue createTemporaryQueue() throws JMSException;
196
public TemporaryTopic createTemporaryTopic() throws JMSException;
197
198
/** Create messages */
199
public Message createMessage() throws JMSException;
200
public TextMessage createTextMessage() throws JMSException;
201
public TextMessage createTextMessage(String text) throws JMSException;
202
public BytesMessage createBytesMessage() throws JMSException;
203
public MapMessage createMapMessage() throws JMSException;
204
public ObjectMessage createObjectMessage() throws JMSException;
205
public ObjectMessage createObjectMessage(Serializable object) throws JMSException;
206
public StreamMessage createStreamMessage() throws JMSException;
207
208
/** Transaction control */
209
public void commit() throws JMSException;
210
public void rollback() throws JMSException;
211
public void recover() throws JMSException;
212
213
/** Session properties */
214
public boolean getTransacted() throws JMSException;
215
public int getAcknowledgeMode() throws JMSException;
216
217
/** Close session and release resources */
218
public void close() throws JMSException;
219
220
/** Unsubscribe from durable subscription */
221
public void unsubscribe(String name) throws JMSException;
222
}
223
224
/**
225
* XA-enabled session for distributed transactions
226
*/
227
public class ActiveMQXASession extends ActiveMQSession implements XASession {
228
/** Get XA resource for transaction coordination */
229
public XAResource getXAResource();
230
231
/** Get underlying JMS session */
232
public Session getSession() throws JMSException;
233
234
/** Commit XA transaction branch */
235
public void commit() throws JMSException;
236
237
/** Rollback XA transaction branch */
238
public void rollback() throws JMSException;
239
}
240
```
241
242
### Message Producers
243
244
Implementations for sending messages to destinations.
245
246
```java { .api }
247
/**
248
* Main message producer implementation
249
*/
250
public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport implements StatsCapable, Disposable {
251
/** Send message to producer's destination */
252
public void send(Message message) throws JMSException;
253
254
/** Send message with delivery options */
255
public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException;
256
257
/** Send message to specific destination */
258
public void send(Destination destination, Message message) throws JMSException;
259
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException;
260
261
/** Get producer's destination */
262
public Destination getDestination() throws JMSException;
263
264
/** Set/get delivery mode (persistent/non-persistent) */
265
public void setDeliveryMode(int deliveryMode) throws JMSException;
266
public int getDeliveryMode() throws JMSException;
267
268
/** Set/get message priority */
269
public void setPriority(int defaultPriority) throws JMSException;
270
public int getPriority() throws JMSException;
271
272
/** Set/get time-to-live for messages */
273
public void setTimeToLive(long timeToLive) throws JMSException;
274
public long getTimeToLive() throws JMSException;
275
276
/** Enable/disable message IDs */
277
public void setDisableMessageID(boolean value) throws JMSException;
278
public boolean getDisableMessageID() throws JMSException;
279
280
/** Enable/disable message timestamps */
281
public void setDisableMessageTimestamp(boolean value) throws JMSException;
282
public boolean getDisableMessageTimestamp() throws JMSException;
283
284
/** Close producer */
285
public void close() throws JMSException;
286
}
287
288
/**
289
* Queue-specific message sender
290
*/
291
public class ActiveMQQueueSender extends ActiveMQMessageProducer implements QueueSender {
292
/** Get associated queue */
293
public Queue getQueue() throws JMSException;
294
295
/** Send message to queue */
296
public void send(Message message) throws JMSException;
297
public void send(Queue queue, Message message) throws JMSException;
298
}
299
300
/**
301
* Topic-specific message publisher
302
*/
303
public class ActiveMQTopicPublisher extends ActiveMQMessageProducer implements TopicPublisher {
304
/** Get associated topic */
305
public Topic getTopic() throws JMSException;
306
307
/** Publish message to topic */
308
public void publish(Message message) throws JMSException;
309
public void publish(Topic topic, Message message) throws JMSException;
310
}
311
```
312
313
### Message Consumers
314
315
Implementations for receiving messages from destinations.
316
317
```java { .api }
318
/**
319
* Main message consumer implementation
320
*/
321
public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsCapable, ActiveMQDispatcher {
322
/** Receive message (blocks until available) */
323
public Message receive() throws JMSException;
324
325
/** Receive message with timeout (milliseconds) */
326
public Message receive(long timeout) throws JMSException;
327
328
/** Receive message without blocking */
329
public Message receiveNoWait() throws JMSException;
330
331
/** Set/get message listener for asynchronous delivery */
332
public void setMessageListener(MessageListener listener) throws JMSException;
333
public MessageListener getMessageListener() throws JMSException;
334
335
/** Get message selector */
336
public String getMessageSelector() throws JMSException;
337
338
/** Close consumer */
339
public void close() throws JMSException;
340
341
/** Set available message listener (ActiveMQ extension) */
342
public void setAvailableListener(MessageAvailableListener availableListener);
343
public MessageAvailableListener getAvailableListener();
344
}
345
346
/**
347
* Queue-specific message receiver
348
*/
349
public class ActiveMQQueueReceiver extends ActiveMQMessageConsumer implements QueueReceiver {
350
/** Get associated queue */
351
public Queue getQueue() throws JMSException;
352
}
353
354
/**
355
* Topic-specific message subscriber
356
*/
357
public class ActiveMQTopicSubscriber extends ActiveMQMessageConsumer implements TopicSubscriber {
358
/** Get associated topic */
359
public Topic getTopic() throws JMSException;
360
361
/** Check if no-local delivery is enabled */
362
public boolean getNoLocal() throws JMSException;
363
}
364
365
/**
366
* Queue browser for examining messages without consuming
367
*/
368
public class ActiveMQQueueBrowser implements QueueBrowser, Enumeration {
369
/** Get queue being browsed */
370
public Queue getQueue() throws JMSException;
371
372
/** Get message selector */
373
public String getMessageSelector() throws JMSException;
374
375
/** Get enumeration of messages */
376
public Enumeration getEnumeration() throws JMSException;
377
378
/** Close browser */
379
public void close() throws JMSException;
380
381
/** Enumeration methods */
382
public boolean hasMoreElements();
383
public Object nextElement();
384
}
385
```
386
387
### JMS 2.0 Context API
388
389
Simplified JMS 2.0 API for easier development.
390
391
```java { .api }
392
/**
393
* JMS 2.0 Context implementation providing simplified API
394
*/
395
public class ActiveMQContext implements JMSContext {
396
/** Create producer */
397
public JMSProducer createProducer();
398
399
/** Create consumer */
400
public JMSConsumer createConsumer(Destination destination);
401
public JMSConsumer createConsumer(Destination destination, String messageSelector);
402
public JMSConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal);
403
404
/** Create shared consumer */
405
public JMSConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName);
406
public JMSConsumer createSharedConsumer(Topic topic, String sharedSubscriptionName, String messageSelector);
407
408
/** Create durable consumer */
409
public JMSConsumer createDurableConsumer(Topic topic, String name);
410
public JMSConsumer createDurableConsumer(Topic topic, String name, String messageSelector, boolean noLocal);
411
412
/** Create shared durable consumer */
413
public JMSConsumer createSharedDurableConsumer(Topic topic, String name);
414
public JMSConsumer createSharedDurableConsumer(Topic topic, String name, String messageSelector);
415
416
/** Create queue browser */
417
public QueueBrowser createBrowser(Queue queue);
418
public QueueBrowser createBrowser(Queue queue, String messageSelector);
419
420
/** Create destinations */
421
public Queue createQueue(String queueName);
422
public Topic createTopic(String topicName);
423
public TemporaryQueue createTemporaryQueue();
424
public TemporaryTopic createTemporaryTopic();
425
426
/** Create messages */
427
public Message createMessage();
428
public BytesMessage createBytesMessage();
429
public MapMessage createMapMessage();
430
public TextMessage createTextMessage();
431
public TextMessage createTextMessage(String text);
432
public ObjectMessage createObjectMessage();
433
public ObjectMessage createObjectMessage(Serializable object);
434
public StreamMessage createStreamMessage();
435
436
/** Session management */
437
public void start();
438
public void stop();
439
public void close();
440
public void commit();
441
public void rollback();
442
public void recover();
443
public void acknowledge();
444
445
/** Context properties */
446
public boolean getAutoStart();
447
public void setAutoStart(boolean autoStart);
448
public String getClientID();
449
public void setClientID(String clientID);
450
public int getSessionMode();
451
452
/** Exception handling */
453
public void setExceptionListener(ExceptionListener listener);
454
public ExceptionListener getExceptionListener();
455
456
/** Unsubscribe from durable subscription */
457
public void unsubscribe(String name);
458
}
459
460
/**
461
* JMS 2.0 Producer implementation
462
*/
463
public class ActiveMQProducer implements JMSProducer {
464
/** Send message */
465
public JMSProducer send(Destination destination, Message message);
466
public JMSProducer send(Destination destination, String body);
467
public JMSProducer send(Destination destination, Map<String,Object> body);
468
public JMSProducer send(Destination destination, byte[] body);
469
public JMSProducer send(Destination destination, Serializable body);
470
471
/** Set message properties */
472
public JMSProducer setProperty(String name, boolean value);
473
public JMSProducer setProperty(String name, byte value);
474
public JMSProducer setProperty(String name, short value);
475
public JMSProducer setProperty(String name, int value);
476
public JMSProducer setProperty(String name, long value);
477
public JMSProducer setProperty(String name, float value);
478
public JMSProducer setProperty(String name, double value);
479
public JMSProducer setProperty(String name, String value);
480
public JMSProducer setProperty(String name, Object value);
481
482
/** Clear properties */
483
public JMSProducer clearProperties();
484
485
/** Set delivery options */
486
public JMSProducer setDeliveryMode(int deliveryMode);
487
public JMSProducer setPriority(int priority);
488
public JMSProducer setTimeToLive(long timeToLive);
489
public JMSProducer setDeliveryDelay(long deliveryDelay);
490
491
/** Disable message features */
492
public JMSProducer setDisableMessageID(boolean value);
493
public JMSProducer setDisableMessageTimestamp(boolean value);
494
495
/** Set correlation ID and reply-to */
496
public JMSProducer setJMSCorrelationID(String correlationID);
497
public JMSProducer setJMSCorrelationIDAsBytes(byte[] correlationID);
498
public JMSProducer setJMSReplyTo(Destination replyTo);
499
public JMSProducer setJMSType(String type);
500
}
501
```
502
503
**Usage Examples:**
504
505
```java
506
// JMS 2.0 Context API usage
507
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
508
try (JMSContext context = factory.createContext()) {
509
// Create destinations
510
Queue queue = context.createQueue("example.queue");
511
Topic topic = context.createTopic("example.topic");
512
513
// Send messages
514
context.createProducer().send(queue, "Hello Queue!");
515
context.createProducer()
516
.setProperty("priority", "high")
517
.setDeliveryMode(DeliveryMode.PERSISTENT)
518
.send(topic, "Important Topic Message");
519
520
// Receive messages
521
JMSConsumer consumer = context.createConsumer(queue);
522
String message = consumer.receiveBody(String.class, 1000);
523
524
// Durable subscription
525
JMSConsumer durableConsumer = context.createDurableConsumer(topic, "subscription1");
526
Message topicMessage = durableConsumer.receive(1000);
527
}
528
```
529
530
## Exception Handling
531
532
```java { .api }
533
public class AlreadyClosedException extends JMSException {
534
public AlreadyClosedException();
535
public AlreadyClosedException(String reason);
536
}
537
538
public class ConnectionClosedException extends IllegalStateException {
539
public ConnectionClosedException();
540
public ConnectionClosedException(String message);
541
}
542
543
public class ConfigurationException extends JMSException {
544
public ConfigurationException(String reason);
545
public ConfigurationException(String reason, String errorCode);
546
}
547
```
548
549
## Types
550
551
```java { .api }
552
public class ActiveMQPrefetchPolicy implements Serializable {
553
public void setQueuePrefetch(int queuePrefetch);
554
public int getQueuePrefetch();
555
public void setTopicPrefetch(int topicPrefetch);
556
public int getTopicPrefetch();
557
public void setDurableTopicPrefetch(int durableTopicPrefetch);
558
public int getDurableTopicPrefetch();
559
public void setOptimizeAcknowledgeTimeOut(long optimizeAcknowledgeTimeOut);
560
public long getOptimizeAcknowledgeTimeOut();
561
}
562
563
public class RedeliveryPolicy extends DestinationMapEntry implements Cloneable, Serializable {
564
public void setMaximumRedeliveries(int maximumRedeliveries);
565
public int getMaximumRedeliveries();
566
public void setInitialRedeliveryDelay(long initialRedeliveryDelay);
567
public long getInitialRedeliveryDelay();
568
public void setRedeliveryDelay(long redeliveryDelay);
569
public long getRedeliveryDelay();
570
public void setUseExponentialBackOff(boolean useExponentialBackOff);
571
public boolean isUseExponentialBackOff();
572
public void setBackOffMultiplier(double backOffMultiplier);
573
public double getBackOffMultiplier();
574
}
575
576
public interface MessageTransformer {
577
Message producerTransform(Session session, MessageProducer producer, Message message) throws JMSException;
578
Message consumerTransform(Session session, MessageConsumer consumer, Message message) throws JMSException;
579
}
580
```