0
# Messages and Destinations
1
2
ActiveMQ provides implementations of all JMS message types, extended with scheduling and routing capabilities, along with destination support for queues, topics, temporary destinations, and composite destinations.
3
4
## Capabilities
5
6
### Base Message Implementation
7
8
Core message implementation with ActiveMQ scheduling extensions.
9
10
```java { .api }
11
/**
12
* Base ActiveMQ message implementation with scheduling support
13
* Extends JMS Message with delayed delivery and redelivery features
14
*/
15
public class ActiveMQMessage implements Message, ScheduledMessage {
16
/** Set or get the time at which the message is scheduled for delivery */
17
public void setScheduledDeliveryTime(long scheduledDeliveryTime);
18
public long getScheduledDeliveryTime();
19
20
/** Set or get the delay applied before the message is redelivered */
21
public void setRedeliveryDelay(long redeliveryDelay);
22
public long getRedeliveryDelay();
23
24
/** Set periodic delivery schedule */
25
public void setPeriod(long period);
26
public long getPeriod();
27
28
/** Set number of times to repeat delivery */
29
public void setRepeat(int repeat);
30
public int getRepeat();
31
32
/** Set CRON expression for complex scheduling */
33
public void setCronEntry(String cronEntry);
34
public String getCronEntry();
35
36
/** Standard JMS message properties */
37
public String getJMSMessageID() throws JMSException;
38
public void setJMSMessageID(String id) throws JMSException;
39
public long getJMSTimestamp() throws JMSException;
40
public void setJMSTimestamp(long timestamp) throws JMSException;
41
public byte[] getJMSCorrelationIDAsBytes() throws JMSException;
42
public void setJMSCorrelationIDAsBytes(byte[] correlationID) throws JMSException;
43
public String getJMSCorrelationID() throws JMSException;
44
public void setJMSCorrelationID(String correlationID) throws JMSException;
45
public Destination getJMSReplyTo() throws JMSException;
46
public void setJMSReplyTo(Destination replyTo) throws JMSException;
47
public Destination getJMSDestination() throws JMSException;
48
public void setJMSDestination(Destination destination) throws JMSException;
49
public int getJMSDeliveryMode() throws JMSException;
50
public void setJMSDeliveryMode(int deliveryMode) throws JMSException;
51
public boolean getJMSRedelivered() throws JMSException;
52
public void setJMSRedelivered(boolean redelivered) throws JMSException;
53
public String getJMSType() throws JMSException;
54
public void setJMSType(String type) throws JMSException;
55
public long getJMSExpiration() throws JMSException;
56
public void setJMSExpiration(long expiration) throws JMSException;
57
public int getJMSPriority() throws JMSException;
58
public void setJMSPriority(int priority) throws JMSException;
59
60
/** Message properties */
61
public void clearProperties() throws JMSException;
62
public boolean propertyExists(String name) throws JMSException;
63
public boolean getBooleanProperty(String name) throws JMSException;
64
public byte getByteProperty(String name) throws JMSException;
65
public short getShortProperty(String name) throws JMSException;
66
public int getIntProperty(String name) throws JMSException;
67
public long getLongProperty(String name) throws JMSException;
68
public float getFloatProperty(String name) throws JMSException;
69
public double getDoubleProperty(String name) throws JMSException;
70
public String getStringProperty(String name) throws JMSException;
71
public Object getObjectProperty(String name) throws JMSException;
72
public Enumeration getPropertyNames() throws JMSException;
73
public void setBooleanProperty(String name, boolean value) throws JMSException;
74
public void setByteProperty(String name, byte value) throws JMSException;
75
public void setShortProperty(String name, short value) throws JMSException;
76
public void setIntProperty(String name, int value) throws JMSException;
77
public void setLongProperty(String name, long value) throws JMSException;
78
public void setFloatProperty(String name, float value) throws JMSException;
79
public void setDoubleProperty(String name, double value) throws JMSException;
80
public void setStringProperty(String name, String value) throws JMSException;
81
public void setObjectProperty(String name, Object value) throws JMSException;
82
83
/** Message acknowledgment */
84
public void acknowledge() throws JMSException;
85
86
/** Clear message body */
87
public void clearBody() throws JMSException;
88
}
89
```
90
91
**Usage Examples:**
92
93
```java
94
// Schedule message for future delivery
95
TextMessage message = session.createTextMessage("Delayed message");
96
message.setScheduledDeliveryTime(System.currentTimeMillis() + 60000); // 1 minute delay
97
98
// Periodic message delivery
99
TextMessage periodicMessage = session.createTextMessage("Periodic update");
100
periodicMessage.setPeriod(30000); // Every 30 seconds
101
periodicMessage.setRepeat(10); // Repeat 10 times
102
103
// CRON-based scheduling
104
TextMessage cronMessage = session.createTextMessage("Daily report");
105
cronMessage.setCronEntry("0 0 9 * * ?"); // Every day at 9 AM
106
```
107
108
### Text Messages
109
110
Messages containing string payloads.
111
112
```java { .api }
113
/**
114
* Text message implementation for string content
115
*/
116
public class ActiveMQTextMessage extends ActiveMQMessage implements TextMessage {
117
/** Set text content */
118
public void setText(String string) throws JMSException;
119
120
/** Get text content */
121
public String getText() throws JMSException;
122
}
123
```
124
125
### Bytes Messages
126
127
Messages containing binary data with stream-like access.
128
129
```java { .api }
130
/**
131
* Bytes message implementation for binary content
132
*/
133
public class ActiveMQBytesMessage extends ActiveMQMessage implements BytesMessage {
134
/** Get body length in bytes */
135
public long getBodyLength() throws JMSException;
136
137
/** Read bytes from message body */
138
public int readBytes(byte[] value) throws JMSException;
139
public int readBytes(byte[] value, int length) throws JMSException;
140
141
/** Write bytes to message body */
142
public void writeBytes(byte[] value) throws JMSException;
143
public void writeBytes(byte[] value, int offset, int length) throws JMSException;
144
145
/** Read typed values */
146
public boolean readBoolean() throws JMSException;
147
public byte readByte() throws JMSException;
148
public int readUnsignedByte() throws JMSException;
149
public short readShort() throws JMSException;
150
public int readUnsignedShort() throws JMSException;
151
public char readChar() throws JMSException;
152
public int readInt() throws JMSException;
153
public long readLong() throws JMSException;
154
public float readFloat() throws JMSException;
155
public double readDouble() throws JMSException;
156
public String readUTF() throws JMSException;
157
158
/** Write typed values */
159
public void writeBoolean(boolean value) throws JMSException;
160
public void writeByte(byte value) throws JMSException;
161
public void writeShort(short value) throws JMSException;
162
public void writeChar(char value) throws JMSException;
163
public void writeInt(int value) throws JMSException;
164
public void writeLong(long value) throws JMSException;
165
public void writeFloat(float value) throws JMSException;
166
public void writeDouble(double value) throws JMSException;
167
public void writeUTF(String value) throws JMSException;
168
169
/** Reset message for reading */
170
public void reset() throws JMSException;
171
}
172
```
173
174
### Map Messages
175
176
Messages containing name-value pairs.
177
178
```java { .api }
179
/**
180
* Map message implementation for key-value content
181
*/
182
public class ActiveMQMapMessage extends ActiveMQMessage implements MapMessage {
183
/** Get value by name */
184
public boolean getBoolean(String name) throws JMSException;
185
public byte getByte(String name) throws JMSException;
186
public short getShort(String name) throws JMSException;
187
public char getChar(String name) throws JMSException;
188
public int getInt(String name) throws JMSException;
189
public long getLong(String name) throws JMSException;
190
public float getFloat(String name) throws JMSException;
191
public double getDouble(String name) throws JMSException;
192
public String getString(String name) throws JMSException;
193
public byte[] getBytes(String name) throws JMSException;
194
public Object getObject(String name) throws JMSException;
195
196
/** Set value by name */
197
public void setBoolean(String name, boolean value) throws JMSException;
198
public void setByte(String name, byte value) throws JMSException;
199
public void setShort(String name, short value) throws JMSException;
200
public void setChar(String name, char value) throws JMSException;
201
public void setInt(String name, int value) throws JMSException;
202
public void setLong(String name, long value) throws JMSException;
203
public void setFloat(String name, float value) throws JMSException;
204
public void setDouble(String name, double value) throws JMSException;
205
public void setString(String name, String value) throws JMSException;
206
public void setBytes(String name, byte[] value) throws JMSException;
207
public void setBytes(String name, byte[] value, int offset, int length) throws JMSException;
208
public void setObject(String name, Object value) throws JMSException;
209
210
/** Check if item exists */
211
public boolean itemExists(String name) throws JMSException;
212
213
/** Get all map names */
214
public Enumeration getMapNames() throws JMSException;
215
}
216
```
217
218
### Object Messages
219
220
Messages containing serializable Java objects.
221
222
```java { .api }
223
/**
224
* Object message implementation for serializable Java objects
225
*/
226
public class ActiveMQObjectMessage extends ActiveMQMessage implements ObjectMessage, TransientInitializer {
227
/** Set object payload */
228
public void setObject(Serializable object) throws JMSException;
229
230
/** Get object payload */
231
public Serializable getObject() throws JMSException;
232
}
233
```
234
235
### Stream Messages
236
237
Messages containing a stream of typed primitive values.
238
239
```java { .api }
240
/**
241
* Stream message implementation for sequential primitive values
242
*/
243
public class ActiveMQStreamMessage extends ActiveMQMessage implements StreamMessage {
244
/** Read typed values in sequence */
245
public boolean readBoolean() throws JMSException;
246
public byte readByte() throws JMSException;
247
public short readShort() throws JMSException;
248
public char readChar() throws JMSException;
249
public int readInt() throws JMSException;
250
public long readLong() throws JMSException;
251
public float readFloat() throws JMSException;
252
public double readDouble() throws JMSException;
253
public String readString() throws JMSException;
254
public int readBytes(byte[] value) throws JMSException;
255
public Object readObject() throws JMSException;
256
257
/** Write typed values in sequence */
258
public void writeBoolean(boolean value) throws JMSException;
259
public void writeByte(byte value) throws JMSException;
260
public void writeShort(short value) throws JMSException;
261
public void writeChar(char value) throws JMSException;
262
public void writeInt(int value) throws JMSException;
263
public void writeLong(long value) throws JMSException;
264
public void writeFloat(float value) throws JMSException;
265
public void writeDouble(double value) throws JMSException;
266
public void writeString(String value) throws JMSException;
267
public void writeBytes(byte[] value) throws JMSException;
268
public void writeBytes(byte[] value, int offset, int length) throws JMSException;
269
public void writeObject(Object value) throws JMSException;
270
271
/** Reset stream for reading */
272
public void reset() throws JMSException;
273
}
274
```
275
276
### BLOB Messages
277
278
Messages for large binary objects stored externally.
279
280
```java { .api }
281
/**
282
* BLOB message implementation for large binary objects
283
* Stores large content externally to avoid memory issues
284
*/
285
public class ActiveMQBlobMessage extends ActiveMQMessage implements BlobMessage {
286
/** Set BLOB downloader for retrieving content */
287
public void setBlobDownloader(BlobDownloader blobDownloader);
288
public BlobDownloader getBlobDownloader();
289
290
/** Get input stream for reading BLOB content */
291
public InputStream getInputStream() throws IOException, JMSException;
292
293
/** Get BLOB URL */
294
public URL getURL() throws JMSException;
295
296
/** Set BLOB URL */
297
public void setURL(URL url);
298
299
/** Get BLOB name */
300
public String getName() throws JMSException;
301
302
/** Set BLOB name */
303
public void setName(String name) throws JMSException;
304
305
/** Check or set whether the BLOB is deleted by the broker */
306
public boolean isDeletedByBroker();
307
public void setDeletedByBroker(boolean deletedByBroker);
308
}
309
310
/**
311
* Interface for BLOB message support
312
*/
313
public interface BlobMessage extends Message {
314
public InputStream getInputStream() throws IOException, JMSException;
315
public URL getURL() throws JMSException;
316
public String getName() throws JMSException;
317
public void setName(String name) throws JMSException;
318
}
319
```
320
321
### Destinations
322
323
Destination implementations for message routing.
324
325
```java { .api }
326
/**
327
* Base destination implementation
328
*/
329
public abstract class ActiveMQDestination implements Destination, Serializable, Comparable<ActiveMQDestination> {
330
/** Get destination's physical name */
331
public String getPhysicalName();
332
333
/** Set destination's physical name */
334
public void setPhysicalName(String physicalName);
335
336
/** Check destination type */
337
public boolean isQueue();
338
public boolean isTopic();
339
public boolean isTemporary();
340
341
/** Create destination from string */
342
public static ActiveMQDestination createDestination(String name, byte defaultType);
343
public static ActiveMQDestination[] createDestinations(String names, byte defaultType);
344
345
/** Destination options */
346
public void setOptions(Map<String, String> options);
347
public Map<String, String> getOptions();
348
}
349
350
/**
351
* Queue destination for point-to-point messaging
352
*/
353
public class ActiveMQQueue extends ActiveMQDestination implements Queue {
354
/** Create queue with name */
355
public ActiveMQQueue(String name);
356
357
/** Get queue name */
358
public String getQueueName() throws JMSException;
359
360
/** Create queue from a name */
361
public static ActiveMQQueue createQueue(String name);
362
}
363
364
/**
365
* Topic destination for publish-subscribe messaging
366
*/
367
public class ActiveMQTopic extends ActiveMQDestination implements Topic {
368
/** Create topic with name */
369
public ActiveMQTopic(String name);
370
371
/** Get topic name */
372
public String getTopicName() throws JMSException;
373
374
/** Create topic from a name */
375
public static ActiveMQTopic createTopic(String name);
376
}
377
378
/**
379
* Temporary queue destination
380
*/
381
public class ActiveMQTempQueue extends ActiveMQTempDestination implements TemporaryQueue {
382
/** Get queue name */
383
public String getQueueName() throws JMSException;
384
385
/** Delete temporary queue */
386
public void delete() throws JMSException;
387
}
388
389
/**
390
* Temporary topic destination
391
*/
392
public class ActiveMQTempTopic extends ActiveMQTempDestination implements TemporaryTopic {
393
/** Get topic name */
394
public String getTopicName() throws JMSException;
395
396
/** Delete temporary topic */
397
public void delete() throws JMSException;
398
}
399
400
/**
401
* Base class for temporary destinations
402
*/
403
public abstract class ActiveMQTempDestination extends ActiveMQDestination {
404
/** Delete temporary destination */
405
public abstract void delete() throws JMSException;
406
407
/** Get connection ID */
408
public String getConnectionId();
409
410
/** Set connection ID */
411
public void setConnectionId(String connectionId);
412
}
413
```
414
415
**Usage Examples:**
416
417
```java
418
// Creating destinations
419
ActiveMQQueue queue = new ActiveMQQueue("orders.processing");
420
ActiveMQTopic topic = new ActiveMQTopic("news.updates");
421
422
// Queue with options
423
ActiveMQQueue priorityQueue = new ActiveMQQueue("high.priority?prioritizedMessages=true");
424
425
// Topic with destination options
426
ActiveMQTopic filteredTopic = new ActiveMQTopic("events.filtered?retroactive=true");
427
428
// Temporary destinations
429
TemporaryQueue tempQueue = session.createTemporaryQueue();
430
TemporaryTopic tempTopic = session.createTemporaryTopic();
431
432
// Clean up temporary destinations
433
tempQueue.delete();
434
tempTopic.delete();
435
```
436
437
### Composite Destinations
438
439
Composite destinations let a single send be delivered to multiple destinations.
440
441
```java { .api }
442
/**
443
* Composite destination for sending to multiple destinations
444
*/
445
public class ActiveMQCompositeDestination extends ActiveMQDestination {
446
/** Create composite destination from destination array */
447
public ActiveMQCompositeDestination(ActiveMQDestination[] destinations);
448
449
/** Get constituent destinations */
450
public ActiveMQDestination[] getDestinations();
451
452
/** Set constituent destinations */
453
public void setDestinations(ActiveMQDestination[] destinations);
454
}
455
```
456
457
**Usage Examples:**
458
459
```java
460
// Send message to multiple destinations
461
ActiveMQQueue queue1 = new ActiveMQQueue("orders.processing");
462
ActiveMQQueue queue2 = new ActiveMQQueue("orders.audit");
463
ActiveMQCompositeDestination composite = new ActiveMQCompositeDestination(
464
new ActiveMQDestination[]{queue1, queue2}
465
);
466
467
MessageProducer producer = session.createProducer(composite);
468
producer.send(session.createTextMessage("Order processed"));
469
```
470
471
## Types
472
473
```java { .api }
474
/**
475
* Interface for scheduled message delivery
476
*/
477
public interface ScheduledMessage {
478
/** Constants for scheduling properties */
479
String AMQ_SCHEDULED_DELAY = "AMQ_SCHEDULED_DELAY";
480
String AMQ_SCHEDULED_PERIOD = "AMQ_SCHEDULED_PERIOD";
481
String AMQ_SCHEDULED_REPEAT = "AMQ_SCHEDULED_REPEAT";
482
String AMQ_SCHEDULED_CRON = "AMQ_SCHEDULED_CRON";
483
484
/** Set scheduled delivery time */
485
void setScheduledDeliveryTime(long scheduledDeliveryTime);
486
long getScheduledDeliveryTime();
487
}
488
489
/**
490
* BLOB download strategy interface
491
*/
492
public interface BlobDownloadStrategy {
493
/** Download BLOB content */
494
InputStream getInputStream(ActiveMQBlobMessage message) throws IOException, JMSException;
495
496
/** Delete BLOB after download */
497
void deleteFile(ActiveMQBlobMessage message) throws IOException, JMSException;
498
}
499
500
/**
501
* BLOB upload strategy interface
502
*/
503
public interface BlobUploadStrategy {
504
/** Upload BLOB content */
505
BlobUploadStrategy uploadFile(ActiveMQBlobMessage message, File file) throws JMSException, IOException;
506
507
/** Upload BLOB from input stream */
508
BlobUploadStrategy uploadStream(ActiveMQBlobMessage message, InputStream in) throws JMSException, IOException;
509
}
510
```