# Transport Protocols

ActiveMQ supports multiple messaging protocols beyond standard JMS, including STOMP, AMQP, MQTT, and HTTP. This enables integration with diverse client platforms and messaging ecosystems.

## Capabilities

### STOMP Protocol Support

Simple Text Oriented Messaging Protocol for interoperability with multiple platforms.

```java { .api }
/**
 * STOMP protocol constants: command names, header names, and
 * message transformation identifiers.
 */
public interface Stomp {
    /** STOMP commands */
    String CONNECT = "CONNECT";
    String CONNECTED = "CONNECTED";
    String SEND = "SEND";
    String SUBSCRIBE = "SUBSCRIBE";
    String UNSUBSCRIBE = "UNSUBSCRIBE";
    String ACK = "ACK";
    String NACK = "NACK";
    String BEGIN = "BEGIN";
    String COMMIT = "COMMIT";
    String ABORT = "ABORT";
    String DISCONNECT = "DISCONNECT";
    String MESSAGE = "MESSAGE";
    String RECEIPT = "RECEIPT";
    String ERROR = "ERROR";

    /** STOMP headers */
    String DESTINATION = "destination";
    String MESSAGE_ID = "message-id";
    String SUBSCRIPTION = "subscription";
    String RECEIPT_REQUESTED = "receipt";
    String TRANSACTION = "transaction";
    String ACK_MODE = "ack";
    String SELECTOR = "selector";
    String USER_ID = "login";
    String PASSWORD = "passcode";
    String CLIENT_ID = "client-id";

    /** Message transformation header name and transformation identifiers */
    String TRANSFORMATION = "transformation";
    String TRANSFORMATION_JSON = "jms-json";
    String TRANSFORMATION_XML = "jms-xml";
    String TRANSFORMATION_OBJECT_JSON = "jms-object-json";
    String TRANSFORMATION_OBJECT_XML = "jms-object-xml";
    String TRANSFORMATION_MAP_JSON = "jms-map-json";
    String TRANSFORMATION_MAP_XML = "jms-map-xml";
}

53
/**
54
* STOMP transport implementation
55
*/
56
public class StompTransport extends TcpTransport {
57
public StompTransport(WireFormat wireFormat, Socket socket) throws IOException;
58
public StompTransport(WireFormat wireFormat, Socket socket, InitBuffer initBuffer) throws IOException;
59
}
60
61
/**
62
* STOMP wire format for message serialization
63
*/
64
public class StompWireFormat implements WireFormat {
65
/** Get maximum data length */
66
public int getMaxDataLength();
67
public void setMaxDataLength(int maxDataLength);
68
69
/** Get maximum frame size */
70
public long getMaxFrameSize();
71
public void setMaxFrameSize(long maxFrameSize);
72
}
73
74
/**
75
* STOMP protocol exception
76
*/
77
public class ProtocolException extends IOException {
78
public ProtocolException(String message);
79
public ProtocolException(String message, Throwable cause);
80
}
```

### AMQP Protocol Support

Advanced Message Queuing Protocol 1.0 support for enterprise messaging.

```java { .api }
88
/**
89
* AMQP transport implementation
90
*/
91
public class AmqpTransport extends TransportSupport {
92
public AmqpTransport(WireFormat wireFormat, Socket socket) throws IOException;
93
public AmqpTransport(WireFormat wireFormat, SSLSocket socket) throws IOException;
94
}
95
96
/**
97
* AMQP NIO SSL transport
98
*/
99
public class AmqpNioSslTransport extends AmqpNioTransport {
100
public AmqpNioSslTransport(WireFormat wireFormat, SocketChannel channel,
101
SSLEngine engine) throws IOException;
102
}
103
104
/**
105
* AMQP wire format implementation
106
*/
107
public class AmqpWireFormat implements WireFormat {
108
/** AMQP version constants */
109
public static final int DEFAULT_MAX_FRAME_SIZE = 1024 * 1024;
110
public static final String QUEUE_PREFIX = "queue://";
111
public static final String TOPIC_PREFIX = "topic://";
112
}
113
114
/**
115
* AMQP utilities and support functions
116
*/
117
public class AmqpSupport {
118
/** Convert JMS destination to AMQP address */
119
public static String toAddress(Destination destination);
120
121
/** Convert AMQP address to JMS destination */
122
public static Destination toDestination(String address);
123
124
/** Message property conversion */
125
public static Object convertProperty(Object value);
126
}
127
/**
 * SASL authentication mechanism interface
 */
public interface SaslMechanism {
    /** Get mechanism name */
    String getName();

    /** Create initial response */
    byte[] getInitialResponse() throws SaslException;

    /** Process challenge response */
    byte[] getChallengeResponse(byte[] challenge) throws SaslException;

    /** Check if authentication is complete */
    boolean isComplete();
}
```

### MQTT Protocol Support

Message Queuing Telemetry Transport for IoT and lightweight messaging.

```java { .api }
151
/**
152
* MQTT transport implementation
153
*/
154
public class MQTTTransport extends TcpTransport {
155
public MQTTTransport(WireFormat wireFormat, Socket socket) throws IOException;
156
public MQTTTransport(WireFormat wireFormat, Socket socket, InitBuffer initBuffer) throws IOException;
157
}
158
159
/**
160
* MQTT NIO SSL transport
161
*/
162
public class MQTTNIOSSLTransport extends MQTTNIOTransport {
163
public MQTTNIOSSLTransport(WireFormat wireFormat, SocketChannel channel,
164
SSLEngine engine) throws IOException;
165
}
166
167
/**
168
* MQTT wire format for message serialization
169
*/
170
public class MQTTWireFormat implements WireFormat {
171
/** MQTT protocol version */
172
public void setVersion(int version);
173
public int getVersion();
174
175
/** Keep alive interval */
176
public void setKeepAlive(int keepAlive);
177
public int getKeepAlive();
178
}
179
180
/**
181
* MQTT subscription management
182
*/
183
public class MQTTSubscription {
184
/** Get topic filter */
185
public String getTopicFilter();
186
public void setTopicFilter(String topicFilter);
187
188
/** Get quality of service level */
189
public QoS getQoS();
190
public void setQoS(QoS qos);
191
192
/** MQTT QoS levels */
193
public enum QoS {
194
AT_MOST_ONCE(0),
195
AT_LEAST_ONCE(1),
196
EXACTLY_ONCE(2);
197
198
private final int value;
199
QoS(int value) { this.value = value; }
200
public int getValue() { return value; }
201
}
202
}
203
204
/**
205
* MQTT protocol exception
206
*/
207
public class MQTTProtocolException extends IOException {
208
public MQTTProtocolException(String message);
209
public MQTTProtocolException(String message, Throwable cause);
210
}
```

### HTTP Transport Support

HTTP tunneling and WebSocket support for web-based messaging.

```java { .api }
218
/**
219
* HTTP tunnel servlet for JMS over HTTP
220
*/
221
public class HttpTunnelServlet extends HttpServlet {
222
/** Handle HTTP GET requests */
223
protected void doGet(HttpServletRequest request, HttpServletResponse response)
224
throws ServletException, IOException;
225
226
/** Handle HTTP POST requests */
227
protected void doPost(HttpServletRequest request, HttpServletResponse response)
228
throws ServletException, IOException;
229
230
/** Configure tunnel parameters */
231
public void setMaxReadTimeoutMillis(long maxReadTimeoutMillis);
232
public long getMaxReadTimeoutMillis();
233
}
234
235
/**
236
* HTTP client transport implementation
237
*/
238
public class HttpClientTransport extends HttpTransportSupport {
239
public HttpClientTransport(TextWireFormat wireFormat, URI uri) throws IOException;
240
public HttpClientTransport(TextWireFormat wireFormat, URI uri, HttpClient httpClient) throws IOException;
241
}
242
243
/**
244
* HTTP transport server
245
*/
246
public class HttpTransportServer extends TransportServerSupport {
247
/** Set HTTP server configuration */
248
public void setHttpServer(Server httpServer);
249
public Server getHttpServer();
250
251
/** Bind to address */
252
public void bind() throws Exception;
253
254
/** Start HTTP server */
255
public void start() throws Exception;
256
257
/** Stop HTTP server */
258
public void stop() throws Exception;
259
}
260
261
/**
262
* HTTP discovery agent for broker discovery
263
*/
264
public class HTTPDiscoveryAgent extends DiscoveryAgentSupport {
265
/** Set registry URL */
266
public void setRegistryURL(String registryURL);
267
public String getRegistryURL();
268
269
/** Start discovery */
270
public void start() throws Exception;
271
272
/** Stop discovery */
273
public void stop() throws Exception;
274
275
/** Advertise service */
276
public void serviceFailed(DiscoveryEvent event) throws IOException;
277
public void serviceAdded(DiscoveryEvent event);
278
public void serviceRemoved(DiscoveryEvent event);
279
}
```

**Usage Examples:**

```java
285
// STOMP transport configuration
286
BrokerService broker = new BrokerService();
287
broker.addConnector("stomp://localhost:61613");
288
289
// AMQP transport
290
broker.addConnector("amqp://localhost:5672");
291
292
// MQTT transport
293
broker.addConnector("mqtt://localhost:1883");
294
295
// HTTP transport
296
broker.addConnector("http://localhost:8080");
297
298
// Multiple transports with SSL
299
broker.addConnector("stomp+ssl://localhost:61614");
300
broker.addConnector("mqtt+ssl://localhost:8883");
301
302
// Auto-detection transport (detects protocol automatically)
303
broker.addConnector("auto://localhost:5555");
```

## Types

```java { .api }
309
/**
310
* Wire format interface for protocol serialization
311
*/
312
public interface WireFormat {
313
/** Marshal object to byte array */
314
ByteSequence marshal(Object command) throws IOException;
315
316
/** Unmarshal byte array to object */
317
Object unmarshal(ByteSequence packet) throws IOException;
318
319
/** Set version */
320
void setVersion(int version);
321
int getVersion();
322
}
323
324
/**
325
* Text-based wire format for HTTP/WebSocket
326
*/
327
public interface TextWireFormat extends WireFormat {
328
/** Marshal to text */
329
String marshalText(Object command) throws IOException;
330
331
/** Unmarshal from text */
332
Object unmarshalText(String text) throws IOException;
333
}
334
335
/**
336
* Discovery event for service discovery
337
*/
338
public class DiscoveryEvent {
339
/** Get service name */
340
public String getServiceName();
341
342
/** Get service URI */
343
public URI getUri();
344
345
/** Check if service failed */
346
public boolean isFailed();
347
}
```