0
# Core Messaging
1
2
Foundation messaging interfaces and classes providing the basic abstractions for message handling, channels, and core operations in the Spring Messaging framework.
3
4
## Capabilities
5
6
### Message Interface
7
8
The core message abstraction representing a generic message with headers and typed payload.
9
10
```java { .api }
11
/**
12
* A generic message representation with headers and body.
13
* @param <T> the payload type
14
*/
15
public interface Message<T> {
16
/**
17
* Return the message payload.
18
*/
19
T getPayload();
20
21
/**
22
* Return message headers for the message (never null but may be empty).
23
*/
24
MessageHeaders getHeaders();
25
}
26
```
27
28
### Message Channels
29
30
Channel abstractions for sending and receiving messages with different semantic models.
31
32
```java { .api }
33
/**
34
* Defines methods for sending messages.
35
*/
36
@FunctionalInterface
37
public interface MessageChannel {
38
/** Constant for sending a message without a prescribed timeout */
39
long INDEFINITE_TIMEOUT = -1;
40
41
/**
42
* Send a message, blocking indefinitely if necessary.
43
*/
44
boolean send(Message<?> message);
45
46
/**
47
* Send a message, blocking until either the message is accepted or the timeout elapses.
48
*/
49
boolean send(Message<?> message, long timeout);
50
}
51
52
/**
53
* A MessageChannel that maintains a registry of subscribers and invokes them to handle messages.
54
*/
55
public interface SubscribableChannel extends MessageChannel {
56
/**
57
* Register a message handler with this channel.
58
*/
59
boolean subscribe(MessageHandler handler);
60
61
/**
62
* Un-register a message handler from this channel.
63
*/
64
boolean unsubscribe(MessageHandler handler);
65
}
66
67
/**
68
* A MessageChannel from which messages may be actively received through polling.
69
*/
70
public interface PollableChannel extends MessageChannel {
71
/**
72
* Receive a message from this channel, blocking indefinitely if necessary.
73
*/
74
Message<?> receive();
75
76
/**
77
* Receive a message from this channel, blocking until either message is available or timeout elapses.
78
*/
79
Message<?> receive(long timeout);
80
}
81
```
82
83
### Message Handlers
84
85
Contract for handling messages with both synchronous and reactive implementations.
86
87
```java { .api }
88
/**
89
* Contract for handling a Message.
90
*/
91
@FunctionalInterface
92
public interface MessageHandler {
93
/**
94
* Handle the given message.
95
* @param message the message to be handled
96
* @throws MessagingException if the handler failed to process the message
97
*/
98
void handleMessage(Message<?> message) throws MessagingException;
99
}
100
101
/**
102
* A contract for handling a Message reactively.
103
*/
104
public interface ReactiveMessageHandler {
105
/**
106
* Handle the given message reactively.
107
* @param message the message to be handled
108
* @return a Mono that completes when handling is done
109
*/
110
Mono<Void> handleMessage(Message<?> message);
111
}
112
```
113
114
### Message Headers
115
116
Immutable message headers implementation providing access to message metadata.
117
118
```java { .api }
119
/**
120
* The headers for a Message. Immutable implementation of Map.
121
*/
122
public final class MessageHeaders implements Map<String, Object>, Serializable {
123
/** Standard header name for message ID */
124
public static final String ID = "id";
125
126
/** Standard header name for timestamp */
127
public static final String TIMESTAMP = "timestamp";
128
129
/** Standard header name for content type */
130
public static final String CONTENT_TYPE = "contentType";
131
132
/** Standard header name for reply channel */
133
public static final String REPLY_CHANNEL = "replyChannel";
134
135
/** Standard header name for error channel */
136
public static final String ERROR_CHANNEL = "errorChannel";
137
138
/**
139
* Construct MessageHeaders with the given headers Map.
140
*/
141
public MessageHeaders(@Nullable Map<String, Object> headers);
142
143
// Map interface implementation
144
public Object get(Object key);
145
public boolean containsKey(Object key);
146
public Set<String> keySet();
147
public Collection<Object> values();
148
public Set<Map.Entry<String, Object>> entrySet();
149
public int size();
150
public boolean isEmpty();
151
152
/**
153
* Get the ID header value.
154
*/
155
@Nullable
156
public UUID getId();
157
158
/**
159
* Get the timestamp header value.
160
*/
161
@Nullable
162
public Long getTimestamp();
163
164
/**
165
* Get the reply channel header value.
166
*/
167
@Nullable
168
public Object getReplyChannel();
169
170
/**
171
* Get the error channel header value.
172
*/
173
@Nullable
174
public Object getErrorChannel();
175
}
176
```
177
178
### Message Building
179
180
Utility class for creating messages with builder pattern and static factory methods.
181
182
```java { .api }
183
/**
184
* A builder for creating messages with fluent API.
185
*/
186
public final class MessageBuilder<T> {
187
188
/**
189
* Create a new builder for a message with the given payload.
190
*/
191
public static <T> MessageBuilder<T> withPayload(T payload);
192
193
/**
194
* Create a new builder from an existing message, copying payload and headers.
195
*/
196
public static <T> MessageBuilder<T> fromMessage(Message<T> message);
197
198
/**
199
* Create a message directly with the given payload and headers.
200
*/
201
public static <T> Message<T> createMessage(T payload, MessageHeaders messageHeaders);
202
203
/**
204
* Set a header value.
205
*/
206
public MessageBuilder<T> setHeader(String headerName, Object headerValue);
207
208
/**
209
* Set a header value only if not already present.
210
*/
211
public MessageBuilder<T> setHeaderIfAbsent(String headerName, Object headerValue);
212
213
/**
214
* Copy headers from a map.
215
*/
216
public MessageBuilder<T> copyHeaders(@Nullable Map<String, ?> headersToCopy);
217
218
/**
219
* Copy headers from a map if not already present.
220
*/
221
public MessageBuilder<T> copyHeadersIfAbsent(@Nullable Map<String, ?> headersToCopy);
222
223
/**
224
* Remove a header.
225
*/
226
public MessageBuilder<T> removeHeader(String headerName);
227
228
/**
229
* Remove headers matching patterns.
230
*/
231
public MessageBuilder<T> removeHeaders(String... headerPatterns);
232
233
/**
234
* Build the final message.
235
*/
236
public Message<T> build();
237
}
238
```
239
240
### Core Exceptions
241
242
Base messaging exceptions for error handling.
243
244
```java { .api }
245
/**
246
* Base exception for failures that occur during messaging.
247
*/
248
public class MessagingException extends RuntimeException {
249
250
public MessagingException(String description);
251
252
public MessagingException(String description, Throwable cause);
253
254
public MessagingException(Message<?> failedMessage);
255
256
public MessagingException(Message<?> failedMessage, Throwable cause);
257
258
public MessagingException(Message<?> failedMessage, String description);
259
260
public MessagingException(Message<?> failedMessage, String description, Throwable cause);
261
262
/**
263
* Return the Message for which delivery failed, if available.
264
*/
265
@Nullable
266
public Message<?> getFailedMessage();
267
}
268
269
/**
270
* Exception that indicates an error occurred during message handling.
271
*/
272
public class MessageHandlingException extends MessagingException {
273
// Inherits all constructors from MessagingException
274
}
275
276
/**
277
* Exception that indicates an error occurred during message delivery.
278
*/
279
public class MessageDeliveryException extends MessagingException {
280
// Inherits all constructors from MessagingException
281
}
282
```
283
284
**Usage Examples:**
285
286
```java
287
import org.springframework.messaging.*;
288
import org.springframework.messaging.support.MessageBuilder;
289
import org.springframework.messaging.support.GenericMessage;
290
291
// Create messages using different methods
292
Message<String> simpleMessage = new GenericMessage<>("Hello World");
293
294
// Using MessageBuilder static factory method
295
Message<String> messageWithHeaders = MessageBuilder
296
.withPayload("Hello with headers")
297
.setHeader("type", "greeting")
298
.setHeader("priority", "high")
299
.build();
300
301
// Create from existing message
302
Message<String> copiedMessage = MessageBuilder
303
.fromMessage(simpleMessage)
304
.setHeader("copied", true)
305
.build();
306
307
// Create message directly with headers
308
Map<String, Object> headers = Map.of("urgent", true, "sender", "system");
309
MessageHeaders messageHeaders = new MessageHeaders(headers);
310
Message<String> directMessage = MessageBuilder.createMessage("Direct message", messageHeaders);
311
312
// Handle messages
313
MessageHandler handler = message -> {
314
System.out.println("Received: " + message.getPayload());
315
MessageHeaders headers = message.getHeaders();
316
System.out.println("Message ID: " + headers.getId());
317
System.out.println("Timestamp: " + headers.getTimestamp());
318
};
319
320
// Channel operations
321
MessageChannel channel = getChannel(); // obtain from context
322
boolean success = channel.send(messageWithHeaders);
323
324
// Subscribable channel
325
SubscribableChannel subscribableChannel = getSubscribableChannel();
326
subscribableChannel.subscribe(handler);
327
328
// Pollable channel
329
PollableChannel pollableChannel = getPollableChannel();
330
Message<?> received = pollableChannel.receive(5000); // 5 second timeout
331
```