# Message Sink

RabbitMQ sink for publishing messages to queues, with configurable error handling, automatic queue declaration, and cleanup of the channel and connection on close.

## Capabilities

### RMQ Sink Class

Main sink class that publishes messages to RabbitMQ queues with configurable error handling behavior.

```java { .api }
/**
 * A Sink for publishing data into RabbitMQ
 *
 * @param <IN> The type of the data to be published to RabbitMQ
 */
public class RMQSink<IN> extends RichSinkFunction<IN> {

    /**
     * Creates a new RabbitMQ sink for publishing messages to a queue
     *
     * @param rmqConnectionConfig The RabbitMQ connection configuration
     * @param queueName The queue to publish messages to
     * @param schema A SerializationSchema for turning Java objects into bytes
     */
    public RMQSink(RMQConnectionConfig rmqConnectionConfig,
                   String queueName,
                   SerializationSchema<IN> schema);

    /** Initializes the RabbitMQ connection and channel, and sets up the queue */
    public void open(Configuration config) throws Exception;

    /**
     * Called when new data arrives to the sink, and forwards it to RMQ
     *
     * @param value The incoming data to publish
     */
    public void invoke(IN value);

    /** Closes the RabbitMQ connection and channel */
    public void close();

    /**
     * Defines whether the producer should fail on errors, or only log them.
     * If set to true, exceptions will be only logged.
     * If set to false, exceptions will be thrown and cause the streaming program to fail.
     *
     * @param logFailuresOnly The flag to indicate logging-only on exceptions
     */
    public void setLogFailuresOnly(boolean logFailuresOnly);
}
```

### Customization Methods

Protected methods that can be overridden for custom queue setup. Subclasses can also use the protected fields `channel`, `schema`, and `queueName`. The `logFailuresOnly` flag and the sink's logger are private to `RMQSink`.

```java { .api }
/**
 * Protected methods for customizing RMQ sink behavior
 */
public class RMQSink<IN> {

    /**
     * Sets up the queue. The default implementation just declares the queue.
     * Override this method for custom queue setup (e.g. binding to an exchange
     * or defining custom queue parameters).
     */
    protected void setupQueue() throws IOException;
}
```
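
The following minimal JDK-only sketch shows the `open()`, `setupQueue()`, `invoke()`, `close()` contract without a running broker. It is not Flink or RabbitMQ code. `FakeChannel`, `SketchSink`, and `ExchangeSetupSink` are hypothetical stand-ins that record calls instead of sending them. The sketch assumes the default behavior described in this document: declare the queue, then publish to the default exchange with the queue name as routing key. It also shows that overriding only the hook changes setup without affecting publishing.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class SinkLifecycleSketch {

    /** Hypothetical stand-in for a RabbitMQ channel: records calls instead of sending them. */
    static class FakeChannel {
        final List<String> calls = new ArrayList<>();

        void queueDeclare(String queue) {
            calls.add("queueDeclare " + queue);
        }

        void exchangeDeclare(String exchange) {
            calls.add("exchangeDeclare " + exchange);
        }

        void basicPublish(String exchange, String routingKey, byte[] body) {
            calls.add("publish [" + exchange + "] " + routingKey + " "
                + new String(body, StandardCharsets.UTF_8));
        }

        void close() {
            calls.add("close");
        }
    }

    /** Mirrors the shape of RMQSink: open() delegates to the setupQueue() hook. */
    static class SketchSink {
        protected final String queueName;
        protected FakeChannel channel;

        SketchSink(String queueName) {
            this.queueName = queueName;
        }

        void open(FakeChannel channel) {
            this.channel = channel;
            setupQueue(); // overridable hook, called once per sink instance
        }

        protected void setupQueue() {
            channel.queueDeclare(queueName); // default: declare the target queue
        }

        void invoke(String value) {
            // Default exchange (""), routing key = queue name
            channel.basicPublish("", queueName, value.getBytes(StandardCharsets.UTF_8));
        }

        void close() {
            channel.close();
        }
    }

    /** Overrides only the hook: declares an exchange instead of a queue. */
    static class ExchangeSetupSink extends SketchSink {
        ExchangeSetupSink() {
            super("");
        }

        @Override
        protected void setupQueue() {
            channel.exchangeDeclare("events-exchange");
        }
    }

    /** Runs open, then invoke for each value, then close; returns the recorded calls. */
    static List<String> run(SketchSink sink, String... values) {
        FakeChannel channel = new FakeChannel();
        sink.open(channel);
        for (String value : values) {
            sink.invoke(value);
        }
        sink.close();
        return channel.calls;
    }

    public static void main(String[] args) {
        System.out.println(run(new SketchSink("output-queue"), "a", "b"));
        System.out.println(run(new ExchangeSetupSink()));
    }
}
```

Because `setupQueue()` is called from `open()`, an override runs once per parallel sink instance, when its task starts.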

**Usage Examples:**

```java
import java.io.IOException;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Basic sink configuration
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
    .setHost("localhost")
    .setPort(5672)
    .setVirtualHost("/")
    .setUserName("guest")
    .setPassword("guest")
    .build();

RMQSink<String> basicSink = new RMQSink<>(
    connectionConfig,
    "output-queue",
    new SimpleStringSchema()
);

// Sink with error logging (doesn't fail on publish errors)
RMQSink<String> resilientSink = new RMQSink<>(
    connectionConfig,
    "output-queue",
    new SimpleStringSchema()
);
resilientSink.setLogFailuresOnly(true);

// Custom sink with exchange publishing
// (declare it as a top-level or static nested class so it can be serialized)
class CustomRMQSink extends RMQSink<String> {
    // RMQSink's logger and logFailuresOnly flag are private, so keep our own copies
    private static final Logger LOG = LoggerFactory.getLogger(CustomRMQSink.class);

    private final String exchangeName;
    private final String routingKey;
    private boolean logFailures = false;

    public CustomRMQSink(RMQConnectionConfig config, String exchange, String routing) {
        super(config, "", new SimpleStringSchema()); // queue name not used
        this.exchangeName = exchange;
        this.routingKey = routing;
    }

    @Override
    public void setLogFailuresOnly(boolean logFailuresOnly) {
        super.setLogFailuresOnly(logFailuresOnly);
        this.logFailures = logFailuresOnly;
    }

    @Override
    protected void setupQueue() throws IOException {
        // Declare a durable topic exchange instead of a queue
        channel.exchangeDeclare(exchangeName, "topic", true);
    }

    @Override
    public void invoke(String value) {
        try {
            byte[] msg = schema.serialize(value);
            // Publish to the exchange with a routing key instead of to a queue
            channel.basicPublish(exchangeName, routingKey, null, msg);
        } catch (IOException e) {
            if (logFailures) {
                LOG.error("Cannot send message to exchange {}", exchangeName, e);
            } else {
                throw new RuntimeException("Cannot send message to exchange " + exchangeName, e);
            }
        }
    }
}

// Use in Flink pipeline
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.fromElements("Hello", "World", "from", "Flink")
    .map(s -> s.toUpperCase())
    .addSink(basicSink);

env.execute("RabbitMQ Sink Example");
```

## Publishing Behavior

### Default Publishing

By default, each message is published to the default exchange, using the queue name as the routing key:

```java
channel.basicPublish("", queueName, null, serializedMessage);
```

- **Exchange**: `""` (the default exchange)
- **Routing Key**: the queue name
- **Properties**: `null` (no message properties)
- **Body**: the serialized message bytes
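
In AMQP 0-9-1, the default exchange is a direct exchange, and every queue is implicitly bound to it under its own name. That is why the queue name works as the routing key. The sketch below models that rule in plain Java. The `DefaultExchangeModel` class is hypothetical, and no broker is involved. The model also covers a message whose routing key names no queue. The broker drops such a message unless the publish sets the `mandatory` flag, and the four-argument `basicPublish` call above does not set it.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

class DefaultExchangeModel {
    // Declared queues, keyed by name
    private final Map<String, Deque<String>> queues = new HashMap<>();

    void queueDeclare(String name) {
        queues.putIfAbsent(name, new ArrayDeque<>());
    }

    /** Publishes via the default exchange: routes to the queue named by routingKey. */
    boolean publishToDefaultExchange(String routingKey, String body) {
        Deque<String> queue = queues.get(routingKey);
        if (queue == null) {
            return false; // unroutable: dropped, since no mandatory flag is modeled
        }
        queue.addLast(body);
        return true;
    }

    int depth(String name) {
        Deque<String> queue = queues.get(name);
        return queue == null ? 0 : queue.size();
    }

    public static void main(String[] args) {
        DefaultExchangeModel broker = new DefaultExchangeModel();
        broker.queueDeclare("output-queue");
        System.out.println(broker.publishToDefaultExchange("output-queue", "HELLO"));
        System.out.println(broker.publishToDefaultExchange("missing-queue", "HELLO"));
        System.out.println(broker.depth("output-queue"));
    }
}
```

This is also why the default `setupQueue()` declares the queue before anything is published to it.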

### Custom Publishing

Override the `invoke()` method for custom publishing behavior, such as setting message properties or publishing to a different exchange:

```java
import com.rabbitmq.client.AMQP;
import java.io.IOException;
import java.util.Date;

// Inside a subclass of RMQSink<MyDataType>
@Override
public void invoke(MyDataType value) {
    try {
        byte[] msg = schema.serialize(value);

        // Custom message properties
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
            .contentType("application/json")
            .deliveryMode(2) // 2 = persistent, 1 = non-persistent
            .timestamp(new Date())
            .build();

        // Publish to a specific exchange with a routing key
        // ("my-exchange" must already exist, e.g. declared in setupQueue())
        channel.basicPublish("my-exchange", "routing.key", props, msg);

    } catch (IOException e) {
        // RMQSink's logFailuresOnly flag is private, so decide here how to react
        throw new RuntimeException("Cannot publish message to my-exchange", e);
    }
}
```

## Queue Configuration

### Default Queue Setup

By default, `setupQueue()` declares a non-durable, non-exclusive, non-auto-delete queue with no extra arguments:

```java
// queueDeclare(queue, durable, exclusive, autoDelete, arguments)
channel.queueDeclare(queueName, false, false, false, null);
```

### Custom Queue Setup

Override `setupQueue()` for custom queue configuration. If the queue already exists with different settings, the broker rejects the declaration, so the settings must match any existing queue:

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

@Override
protected void setupQueue() throws IOException {
    Map<String, Object> args = new HashMap<>();
    args.put("x-message-ttl", 300000);        // 5 minute TTL (milliseconds)
    args.put("x-max-length", 10000);          // Max 10,000 messages
    args.put("x-overflow", "reject-publish"); // Reject new messages when full

    // Declare durable queue with custom arguments
    channel.queueDeclare(queueName, true, false, false, args);

    // Bind queue to exchange (the exchange must already exist)
    channel.queueBind(queueName, "events-exchange", "output.*");
}
```
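
Once `x-max-length` is reached, `x-overflow` decides which message is lost. RabbitMQ's default, `drop-head`, discards the oldest message, while `reject-publish` refuses the new one. The plain-Java model below contrasts the two policies. `BoundedQueueModel` is hypothetical, and TTL and other arguments are ignored. Under either policy the sink's `basicPublish` call does not throw. With `reject-publish`, the rejection is only visible to publishers that enable publisher confirms.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class BoundedQueueModel {
    enum Overflow { DROP_HEAD, REJECT_PUBLISH }

    private final Deque<String> messages = new ArrayDeque<>();
    private final int maxLength;
    private final Overflow overflow;

    BoundedQueueModel(int maxLength, Overflow overflow) {
        this.maxLength = maxLength;
        this.overflow = overflow;
    }

    /** Returns false if the message was refused (reject-publish on a full queue). */
    boolean publish(String body) {
        if (messages.size() >= maxLength) {
            if (overflow == Overflow.REJECT_PUBLISH) {
                return false; // newest message refused; the queue is unchanged
            }
            messages.pollFirst(); // drop-head: discard the oldest message
        }
        messages.addLast(body);
        return true;
    }

    String head() {
        return messages.peekFirst();
    }

    int size() {
        return messages.size();
    }

    public static void main(String[] args) {
        BoundedQueueModel dropHead = new BoundedQueueModel(2, Overflow.DROP_HEAD);
        BoundedQueueModel reject = new BoundedQueueModel(2, Overflow.REJECT_PUBLISH);
        for (String m : new String[] {"m1", "m2", "m3"}) {
            dropHead.publish(m);
            reject.publish(m);
        }
        // drop-head keeps [m2, m3]; reject-publish keeps [m1, m2]
        System.out.println(dropHead.head() + " " + reject.head());
    }
}
```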

### Exchange Setup

For exchange-based publishing, override `setupQueue()` to declare exchanges, and optionally queues bound to them. The default `invoke()` still publishes to the default exchange, so exchange-based publishing also requires overriding `invoke()`:

```java
@Override
protected void setupQueue() throws IOException {
    // Declare a durable topic exchange
    channel.exchangeDeclare("events-exchange", "topic", true);

    // Optionally declare queues bound to the exchange
    channel.queueDeclare("event-queue", true, false, false, null);
    channel.queueBind("event-queue", "events-exchange", "event.*");
}
```
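
Topic exchanges compare binding keys and routing keys word by word, with words separated by `.`. In a binding key, `*` matches exactly one word and `#` matches zero or more words. The recursive matcher below is a JDK-only illustration of those rules and can help when checking binding keys such as `event.*`. `TopicMatcher` is a hypothetical helper, not RabbitMQ's own implementation.

```java
class TopicMatcher {

    /** True if routingKey matches bindingKey under topic-exchange rules. */
    static boolean matches(String bindingKey, String routingKey) {
        String[] pattern = bindingKey.split("\\.", -1);
        String[] words = routingKey.split("\\.", -1);
        return match(pattern, 0, words, 0);
    }

    private static boolean match(String[] p, int i, String[] w, int j) {
        if (i == p.length) {
            return j == w.length; // pattern exhausted: every word must be consumed
        }
        if (p[i].equals("#")) {
            // '#' consumes zero or more words; try each possible split
            for (int k = j; k <= w.length; k++) {
                if (match(p, i + 1, w, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == w.length) {
            return false; // pattern needs another word but none are left
        }
        // '*' matches any single word; otherwise the words must be equal
        return (p[i].equals("*") || p[i].equals(w[j])) && match(p, i + 1, w, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("event.*", "event.created"));
        System.out.println(matches("event.*", "event.order.created"));
        System.out.println(matches("event.#", "event"));
    }
}
```

For example, `event.*` matches `event.created` but not `event.order.created`. `event.#` matches both, and plain `event` as well.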

## Error Handling

### Configuration Options

The sink supports two error handling modes via `setLogFailuresOnly()`. Both apply only to `IOException`s raised while publishing in `invoke()`:

#### Fail-Fast Mode (default)

```java
RMQSink<String> sink = new RMQSink<>(config, "queue", schema);
sink.setLogFailuresOnly(false); // default behavior

// Publishing errors are rethrown as RuntimeException, causing job failure
```

#### Resilient Mode

```java
RMQSink<String> sink = new RMQSink<>(config, "queue", schema);
sink.setLogFailuresOnly(true);

// Publishing errors are logged and the failed message is dropped; the job keeps running
```
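
Both modes reduce to one branch around the publish call. The sketch below reproduces that branch in plain Java so the two modes can be compared directly. It assumes a hypothetical `Publisher` interface in place of the RabbitMQ channel and uses a counter in place of the logger. As in the sink, only `IOException`s reach the branch, and unchecked exceptions escape in either mode.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

class FailureModeSketch {

    /** Hypothetical stand-in for Channel.basicPublish. */
    interface Publisher {
        void publish(String queue, byte[] body) throws IOException;
    }

    private final Publisher publisher;
    private final String queueName;
    private final boolean logFailuresOnly;
    int loggedFailures = 0; // stands in for LOG.error(...) calls

    FailureModeSketch(Publisher publisher, String queueName, boolean logFailuresOnly) {
        this.publisher = publisher;
        this.queueName = queueName;
        this.logFailuresOnly = logFailuresOnly;
    }

    void invoke(String value) {
        try {
            publisher.publish(queueName, value.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            if (logFailuresOnly) {
                loggedFailures++; // resilient mode: record the failure and drop the message
            } else {
                // fail-fast mode: wrap and rethrow so the task fails
                throw new RuntimeException("Cannot send RMQ message " + queueName, e);
            }
        }
    }

    public static void main(String[] args) {
        Publisher broken = (queue, body) -> { throw new IOException("connection lost"); };

        FailureModeSketch resilient = new FailureModeSketch(broken, "queue", true);
        resilient.invoke("a");
        System.out.println("logged: " + resilient.loggedFailures);

        FailureModeSketch failFast = new FailureModeSketch(broken, "queue", false);
        try {
            failFast.invoke("a");
        } catch (RuntimeException e) {
            System.out.println("failed: " + e.getCause().getMessage());
        }
    }
}
```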
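
### Error Types

#### Connection Errors

These errors are thrown during `open()` when connection setup fails, for example when the host is unreachable or the credentials are invalid. The Flink runtime calls `open()` before any record is processed. An `IOException` while creating the connection or channel is wrapped in a `RuntimeException`, which fails the task. Calling `open()` directly, for example in a test, shows how such failures surface:

```java
import org.apache.flink.configuration.Configuration;

try {
    sink.open(new Configuration());
} catch (RuntimeException e) {
    // Connection or channel setup failed (IOException wrapped in RuntimeException)
    logger.error("Failed to connect to RabbitMQ", e);
} catch (Exception e) {
    // open() is declared to throw Exception, so other checked exceptions must be handled too
    logger.error("Unexpected error while opening the RabbitMQ sink", e);
}
```
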
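#### Publishing Errors

These errors are thrown during `invoke()` when `basicPublish` fails with an `IOException`. The sink handles them according to `setLogFailuresOnly()`. Some problems are not reported on this path:

- **Network issues**: a connection lost during a publish surfaces as an `IOException`.
- **Queue full**: `basicPublish` does not wait for the broker, so a full queue is not reported here, for example when `x-overflow` is set to `reject-publish`. Detecting rejected messages requires publisher confirms.
- **Authentication and permissions**: invalid credentials are detected when the connection is opened in `open()`. If the broker refuses a publish, for example because of missing permissions or a nonexistent exchange, it closes the channel asynchronously. Later calls on the closed channel then throw an unchecked exception.
- **Serialization**: exceptions thrown by `schema.serialize()` are not `IOException`s, so they propagate regardless of `setLogFailuresOnly()`.

```java
// Error handling in a custom sink. RMQSink's logFailuresOnly flag and LOG are
// private, so the subclass keeps its own logger and its own copy of the flag.
@Override
public void invoke(MyType value) {
    try {
        byte[] msg = schema.serialize(value);
        channel.basicPublish("", queueName, null, msg);
    } catch (IOException e) {
        if (logFailures) {
            // Log, drop this message, and continue processing other messages
            LOG.error("Failed to publish message", e);
        } else {
            // Fail the task; depending on the restart strategy, the job may
            // restart from the last checkpoint
            throw new RuntimeException("Publishing failed", e);
        }
    }
}
```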

#### Connection Cleanup Errors

These errors are thrown during `close()` when connection cleanup fails. The implementation always attempts to close both the channel and the connection. If only one of them fails, that exception is wrapped in a `RuntimeException` and thrown. If both fail, the channel exception is logged and the connection exception is thrown:

```java
@Override
public void close() {
    IOException t = null;
    try {
        channel.close();
    } catch (IOException e) {
        t = e;
    }

    try {
        connection.close();
    } catch (IOException e) {
        if (t != null) {
            LOG.warn("Both channel and connection closing failed. Logging channel exception and failing with connection exception", t);
        }
        t = e;
    }
    if (t != null) {
        throw new RuntimeException("Error while closing RMQ connection with " + queueName
            + " at " + rmqConnectionConfig.getHost(), t);
    }
}
```
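
The close-both-then-report policy works for any pair of `AutoCloseable` resources. The helper below is a hypothetical, JDK-only sketch and is not part of the connector. It records the logged exception in a list, standing in for `LOG.warn`, so the behavior is easy to check. A common alternative would be to attach the first failure to the thrown exception with `Throwable.addSuppressed` instead of logging it.

```java
import java.util.ArrayList;
import java.util.List;

class CloseBothSketch {
    static final List<Exception> logged = new ArrayList<>(); // stands in for LOG.warn

    /** Closes both resources; if both fail, logs the first and rethrows the second. */
    static void closeBoth(AutoCloseable channel, AutoCloseable connection) {
        Exception t = null;
        try {
            channel.close();
        } catch (Exception e) {
            t = e;
        }
        try {
            connection.close(); // attempted even if closing the channel failed
        } catch (Exception e) {
            if (t != null) {
                logged.add(t);
            }
            t = e;
        }
        if (t != null) {
            throw new RuntimeException("Error while closing RMQ connection", t);
        }
    }

    public static void main(String[] args) {
        AutoCloseable failingChannel = () -> { throw new Exception("channel"); };
        AutoCloseable failingConnection = () -> { throw new Exception("connection"); };
        try {
            closeBoth(failingChannel, failingConnection);
        } catch (RuntimeException e) {
            System.out.println("thrown: " + e.getCause().getMessage()
                + ", logged: " + logged.get(0).getMessage());
        }
    }
}
```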
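
## Performance Considerations

### Batching

For high-throughput scenarios, you might buffer records in a custom subclass. This version still issues one `basicPublish` per record, and `basicPublish` already returns without waiting for the broker, so measure before assuming a gain. Records still in the buffer are lost if the task fails before they are flushed:

```java
import java.util.ArrayList;
import java.util.List;

class BatchingRMQSink extends RMQSink<String> {
    private final List<String> batch = new ArrayList<>();
    private final int batchSize = 100;

    BatchingRMQSink(RMQConnectionConfig config, String queueName) {
        super(config, queueName, new SimpleStringSchema());
    }

    @Override
    public void invoke(String value) {
        // invoke() is called from a single task thread, so no locking is needed
        batch.add(value);
        if (batch.size() >= batchSize) {
            flushBatch();
        }
    }

    @Override
    public void close() {
        try {
            // Publish the last partial batch before the channel is closed
            flushBatch();
        } finally {
            super.close();
        }
    }

    private void flushBatch() {
        // Still one basicPublish per record (via RMQSink.invoke)
        for (String item : batch) {
            super.invoke(item);
        }
        batch.clear();
    }
}
```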
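
The buffering part of the batching idea can be sketched independently of RabbitMQ. In this JDK-only sketch, `BatchBuffer` and its `flusher` callback are hypothetical. In a sink, the callback would publish each element. The buffer flushes when it reaches `batchSize`, and once more on `close()` so the final partial batch is not dropped. It does not survive failures. A production version would also flush when a checkpoint is taken, for example by implementing Flink's `CheckpointedFunction`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class BatchBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> flusher; // e.g. publishes each element
    private final List<T> buffer = new ArrayList<>();

    BatchBuffer(int batchSize, Consumer<List<T>> flusher) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.flusher = flusher;
    }

    void add(T value) {
        buffer.add(value);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Hands a copy of the buffered elements to the flusher, then clears the buffer. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        flusher.accept(new ArrayList<>(buffer));
        buffer.clear();
    }

    /** Call from the sink's close() so the last partial batch is not dropped. */
    void close() {
        flush();
    }

    public static void main(String[] args) {
        List<Integer> batchSizes = new ArrayList<>();
        BatchBuffer<String> buf = new BatchBuffer<>(100, batch -> batchSizes.add(batch.size()));
        for (int i = 0; i < 250; i++) {
            buf.add("msg-" + i);
        }
        buf.close();
        System.out.println(batchSizes);
    }
}
```

Handing the flusher a copy keeps the callback safe from the `clear()` that follows, at the cost of one extra list allocation per batch.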
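
### Shared Connection Configuration

For multiple sinks, consider sharing one connection configuration. This shares settings only, not connections: each parallel sink instance opens its own connection and channel in `open()`.

```java
// Shared connection config (host, port, virtual host and credentials must be set
// unless a URI is used)
RMQConnectionConfig sharedConfig = new RMQConnectionConfig.Builder()
    .setHost("rabbitmq-cluster")
    .setPort(5672)
    .setVirtualHost("/")
    .setUserName("guest")
    .setPassword("guest")
    .setAutomaticRecovery(true)
    .setNetworkRecoveryInterval(5000) // milliseconds
    .build();

SerializationSchema<String> schema = new SimpleStringSchema();

// Multiple sinks with shared config
RMQSink<String> sink1 = new RMQSink<>(sharedConfig, "queue1", schema);
RMQSink<String> sink2 = new RMQSink<>(sharedConfig, "queue2", schema);
```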