# Message Source

RabbitMQ source for consuming messages from queues with configurable delivery guarantees, automatic message acknowledgment during checkpoints, and support for exactly-once processing semantics.

## Capabilities

### RMQ Source Class

Main source class that reads messages from RabbitMQ queues with configurable processing guarantees.

```java { .api }
/**
 * RabbitMQ source (consumer) which reads from a queue and acknowledges messages on checkpoints.
 * When checkpointing is enabled, it guarantees exactly-once processing semantics.
 *
 * @param <OUT> The type of the data read from RabbitMQ
 */
public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OUT, String, Long>
        implements ResultTypeQueryable<OUT> {

    /**
     * Creates a new RabbitMQ source with at-least-once message processing guarantee when
     * checkpointing is enabled. No strong delivery guarantees when checkpointing is disabled.
     *
     * @param rmqConnectionConfig The RabbitMQ connection configuration
     * @param queueName The queue to receive messages from
     * @param deserializationSchema A DeserializationSchema for turning bytes into Java objects
     */
    public RMQSource(RMQConnectionConfig rmqConnectionConfig,
                     String queueName,
                     DeserializationSchema<OUT> deserializationSchema);

    /**
     * Creates a new RabbitMQ source with configurable correlation ID usage.
     * For exactly-once processing, set usesCorrelationId to true and enable checkpointing.
     *
     * @param rmqConnectionConfig The RabbitMQ connection configuration
     * @param queueName The queue to receive messages from
     * @param usesCorrelationId Whether messages have unique correlation IDs for deduplication
     * @param deserializationSchema A DeserializationSchema for turning bytes into Java objects
     */
    public RMQSource(RMQConnectionConfig rmqConnectionConfig,
                     String queueName,
                     boolean usesCorrelationId,
                     DeserializationSchema<OUT> deserializationSchema);

    /** Initializes the connection to RMQ and sets up the queue */
    public void open(Configuration config) throws Exception;

    /** Closes the RMQ connection */
    public void close() throws Exception;

    /** Main processing loop that consumes messages from the queue */
    public void run(SourceContext<OUT> ctx) throws Exception;

    /** Cancels the source operation */
    public void cancel();

    /** Returns the type information for the produced output type */
    public TypeInformation<OUT> getProducedType();
}
```

### Customization Methods

Protected methods that can be overridden for custom queue and connection setup.

```java { .api }
/**
 * Protected methods for customizing RMQ source behavior
 */
public class RMQSource<OUT> {

    /**
     * Initializes the connection to RMQ with a default connection factory.
     * Override this method to set up and configure a custom ConnectionFactory.
     */
    protected ConnectionFactory setupConnectionFactory() throws Exception;

    /**
     * Sets up the queue. The default implementation just declares the queue.
     * Override this method for custom queue setup (e.g. binding to an exchange
     * or defining custom queue parameters).
     */
    protected void setupQueue() throws IOException;

    /**
     * Acknowledges session IDs once a checkpoint has completed.
     * Called automatically by the framework during checkpointing.
     */
    protected void acknowledgeSessionIDs(List<Long> sessionIds);
}
```

**Usage Examples:**

```java
import java.io.IOException;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

// Shared connection settings
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
    .setHost("localhost")
    .setPort(5672)
    .setVirtualHost("/")
    .setUserName("guest")
    .setPassword("guest")
    .build();

// Basic source: at-least-once when checkpointing is enabled
RMQSource<String> basicSource = new RMQSource<>(
    connectionConfig,
    "input-queue",
    new SimpleStringSchema()
);

// Exactly-once source with correlation IDs
RMQSource<String> exactlyOnceSource = new RMQSource<>(
    connectionConfig,
    "input-queue",
    true, // use correlation IDs
    new SimpleStringSchema()
);

// Custom source with queue binding
class CustomRMQSource extends RMQSource<String> {
    public CustomRMQSource(RMQConnectionConfig config, String queueName) {
        super(config, queueName, new SimpleStringSchema());
    }

    @Override
    protected void setupQueue() throws IOException {
        // Declare a durable, non-exclusive, non-auto-delete queue
        channel.queueDeclare(queueName, true, false, false, null);

        // Bind queue to exchange
        channel.queueBind(queueName, "events-exchange", "user.*");
    }
}

// Use in Flink pipeline
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // Enable checkpointing (every 5000 ms) for delivery guarantees

env.addSource(exactlyOnceSource)
    .map(s -> processMessage(s)) // processMessage is a placeholder for application logic
    .print();

env.execute("RabbitMQ Source Example");
```

## Processing Semantics

`RMQSource` supports three processing modes:

### Exactly-Once Processing

- **Requirements**: Checkpointing enabled + correlation IDs + RabbitMQ transactions
- **Usage**: Set `usesCorrelationId` to `true` and enable Flink checkpointing
- **Behavior**: Messages are acknowledged only after a checkpoint succeeds; correlation IDs let the source discard redelivered messages, preventing duplicate processing
- **Producer Requirement**: The producer must set a unique correlation ID on every message

```java
// Enable checkpointing in Flink
env.enableCheckpointing(5000);

// Create source with correlation ID support
RMQSource<String> source = new RMQSource<>(
    connectionConfig,
    "queue-name",
    true, // enables exactly-once with correlation IDs
    new SimpleStringSchema()
);
```
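As a rough illustration of why unique correlation IDs matter, the sketch below models deduplication with a plain `Set`. It is a simplified, JDK-only model and not the connector's actual implementation; the class name `CorrelationIdDedupSketch` and the method `shouldEmit` are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

/** Simplified model of correlation-ID deduplication (hypothetical, not connector code). */
final class CorrelationIdDedupSketch {

    // IDs of messages that have already been emitted downstream.
    private final Set<String> seenIds = new HashSet<>();

    /** Returns true if the message is new, false if it is a redelivery of a known ID. */
    boolean shouldEmit(String correlationId) {
        if (correlationId == null) {
            throw new IllegalArgumentException("correlation ID is required for deduplication");
        }
        // Set.add returns false when the ID is already present.
        return seenIds.add(correlationId);
    }

    public static void main(String[] args) {
        CorrelationIdDedupSketch dedup = new CorrelationIdDedupSketch();
        // "id-2" appears twice, as it would after a redelivery following a failure.
        String[] deliveries = {"id-1", "id-2", "id-2", "id-3"};
        for (String id : deliveries) {
            System.out.println(id + " emit=" + dedup.shouldEmit(id));
        }
    }
}
```

If two distinct messages shared a correlation ID, the second would be treated as a duplicate and dropped, which is why the producer has to guarantee uniqueness.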
175
176
### At-Least-Once Processing
177
178
- **Requirements**: Checkpointing enabled + RabbitMQ transactions (no correlation IDs)
179
- **Usage**: Enable Flink checkpointing, set `usesCorrelationId` to `false` or use single-parameter constructor
180
- **Behavior**: Messages acknowledged during checkpoints; may process duplicates after failures
181
182
```java
183
// Enable checkpointing in Flink
184
env.enableCheckpointing(5000);
185
186
// Create source without correlation IDs
187
RMQSource<String> source = new RMQSource<>(
188
connectionConfig,
189
"queue-name",
190
new SimpleStringSchema() // at-least-once processing
191
);
192
```
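The acknowledge-on-checkpoint behavior described above can be modeled in a few lines. This is a minimal JDK-only sketch under simplifying assumptions: it acknowledges everything pending at once, whereas a real implementation would need to track which messages belong to which checkpoint. `AckOnCheckpointSketch` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified model of acknowledging messages only when a checkpoint completes. */
final class AckOnCheckpointSketch {

    private final List<Long> pendingTags = new ArrayList<>(); // emitted, not yet acknowledged
    private final List<Long> ackedTags = new ArrayList<>();   // acknowledged to the broker

    /** Records the delivery tag of a message that was emitted downstream. */
    void onMessage(long deliveryTag) {
        pendingTags.add(deliveryTag);
    }

    /** On checkpoint completion, everything pending is acknowledged. */
    void onCheckpointComplete() {
        ackedTags.addAll(pendingTags);
        pendingTags.clear();
    }

    /** On failure, the broker redelivers whatever was never acknowledged. */
    List<Long> onFailure() {
        List<Long> redelivered = new ArrayList<>(pendingTags);
        pendingTags.clear();
        return redelivered;
    }

    /** Returns a copy of the tags acknowledged so far. */
    List<Long> acknowledged() {
        return new ArrayList<>(ackedTags);
    }

    public static void main(String[] args) {
        AckOnCheckpointSketch source = new AckOnCheckpointSketch();
        source.onMessage(1L);
        source.onMessage(2L);
        source.onCheckpointComplete();
        source.onMessage(3L); // processed, but the job fails before the next checkpoint
        System.out.println("acknowledged: " + source.acknowledged());
        System.out.println("redelivered:  " + source.onFailure());
    }
}
```

The message that was processed but not yet covered by a completed checkpoint comes back after the failure, which is the source of duplicates in this mode.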

### No Delivery Guarantees

- **Requirements**: No checkpointing
- **Usage**: Leave Flink checkpointing disabled
- **Behavior**: The source runs in auto-acknowledgment mode; messages may be lost on failure, but there is no transaction overhead

```java
// No checkpointing enabled
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

RMQSource<String> source = new RMQSource<>(
    connectionConfig,
    "queue-name",
    new SimpleStringSchema()
);
```
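The three modes above reduce to a decision on two settings. The following sketch summarizes that mapping as described in this section; `ProcessingModeSketch`, `Mode`, and `resolve` are hypothetical names used only for illustration and are not part of the connector API.

```java
/** Summarizes how the two settings map to a processing mode (illustrative only). */
final class ProcessingModeSketch {

    enum Mode { EXACTLY_ONCE, AT_LEAST_ONCE, NO_GUARANTEE }

    /** Maps the checkpointing and correlation-ID settings to the resulting mode. */
    static Mode resolve(boolean checkpointingEnabled, boolean usesCorrelationId) {
        if (!checkpointingEnabled) {
            // Without checkpointing the source auto-acknowledges, whatever the ID setting.
            return Mode.NO_GUARANTEE;
        }
        return usesCorrelationId ? Mode.EXACTLY_ONCE : Mode.AT_LEAST_ONCE;
    }

    public static void main(String[] args) {
        System.out.println(resolve(true, true));
        System.out.println(resolve(true, false));
        System.out.println(resolve(false, true));
    }
}
```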

## Queue Configuration

### Default Queue Setup

By default, `setupQueue()` declares a durable, non-exclusive, non-auto-delete queue:

```java
channel.queueDeclare(queueName, true, false, false, null);
```

### Custom Queue Setup

Override `setupQueue()` for custom queue configuration:

```java
// Requires java.io.IOException, java.util.HashMap and java.util.Map
@Override
protected void setupQueue() throws IOException {
    Map<String, Object> args = new HashMap<>();
    args.put("x-message-ttl", 60000); // 60 second TTL (milliseconds)
    args.put("x-max-length", 1000);   // Max 1000 messages

    // Declare queue with custom arguments
    channel.queueDeclare(queueName, true, false, false, args);

    // Bind to exchange with routing key
    channel.queueBind(queueName, "my-exchange", "routing.key");
}
```

## Error Handling

### Connection Failures

Connection failures during `open()` throw a `RuntimeException` with a descriptive message. Flink calls `open()` itself when the task starts, so in a running job the exception surfaces as a task failure; calling it directly, as below, is mainly relevant in tests:

```java
try {
    source.open(config);
} catch (RuntimeException e) {
    // Handle connection failures (logger is assumed to be defined by the caller)
    logger.error("Failed to connect to RabbitMQ: " + e.getMessage());
}
```

### Message Processing Failures

- **Exactly-once mode**: Failed acknowledgments during checkpointing throw `RuntimeException`
- **Correlation ID violations**: Missing correlation IDs when `usesCorrelationId=true` throw exceptions
- **Deserialization failures**: Handled by the provided `DeserializationSchema`

### Recovery Behavior

When automatic recovery is enabled in `RMQConnectionConfig`:
- Network failures trigger automatic reconnection
- Topology recovery re-declares queues and bindings
- The network recovery interval controls the delay between reconnection attempts
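
The recovery settings listed above are configured on the connection builder. The fragment below is a sketch that assumes the `setAutomaticRecovery`, `setTopologyRecoveryEnabled`, and `setNetworkRecoveryInterval` setters of `RMQConnectionConfig.Builder`; the host, credentials, and the 5000 ms interval are placeholder values, and the wrapper class `RecoveryConfigSketch` is hypothetical.

```java
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;

/** Builds a connection config with automatic recovery enabled (placeholder values). */
final class RecoveryConfigSketch {

    static RMQConnectionConfig build() {
        return new RMQConnectionConfig.Builder()
            .setHost("localhost")
            .setPort(5672)
            .setVirtualHost("/")
            .setUserName("guest")
            .setPassword("guest")
            .setAutomaticRecovery(true)       // reconnect after network failures
            .setTopologyRecoveryEnabled(true) // re-declare queues and bindings
            .setNetworkRecoveryInterval(5000) // delay between attempts, in milliseconds
            .build();
    }
}
```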