RabbitMQ sink for publishing messages to queues with configurable error handling, automatic queue setup, and robust connection management.
Main sink class that publishes messages to RabbitMQ queues with configurable error handling behavior.
/**
 * A Sink for publishing data into RabbitMQ.
 *
 * <p>Every incoming record is serialized with the configured SerializationSchema and published
 * to the queue given in the constructor. Whether a publish failure fails the job or is only
 * logged is controlled by {@link #setLogFailuresOnly(boolean)}.
 *
 * @param <IN> The type of the data to be published to RabbitMQ
 */
public class RMQSink<IN> extends RichSinkFunction<IN> {
    /**
     * Creates a new RabbitMQ sink for publishing messages to a queue.
     *
     * @param rmqConnectionConfig The RabbitMQ connection configuration
     * @param queueName The queue to publish messages to
     * @param schema A SerializationSchema for turning Java objects into bytes
     */
    public RMQSink(RMQConnectionConfig rmqConnectionConfig,
            String queueName,
            SerializationSchema<IN> schema);

    /**
     * Initializes the RabbitMQ connection and channel, and sets up the queue
     * (by calling the overridable setupQueue()).
     *
     * @param config Configuration supplied when the sink is opened
     * @throws Exception if the connection, channel or queue cannot be set up
     */
    public void open(Configuration config) throws Exception;

    /**
     * Called when new data arrives to the sink; serializes it and forwards it to RMQ.
     *
     * @param value The incoming data to publish
     * @throws RuntimeException if publishing fails and the sink is not in log-only mode
     */
    public void invoke(IN value);

    /**
     * Closes the RabbitMQ channel and connection.
     *
     * @throws RuntimeException if closing the channel or the connection fails
     */
    public void close();

    /**
     * Defines whether the producer should fail on errors, or only log them.
     * If set to true, exceptions will be only logged.
     * If set to false (the default), exceptions will be thrown and cause the streaming
     * program to fail.
     *
     * @param logFailuresOnly The flag to indicate logging-only on exceptions
     */
    public void setLogFailuresOnly(boolean logFailuresOnly);
}
Protected methods that can be overridden for custom queue setup.
/**
 * Protected methods for customizing RMQ sink behavior.
 */
public class RMQSink<IN> {
    /**
     * Sets up the queue. The default implementation just declares the queue
     * (non-durable, non-exclusive, non-auto-delete, no extra arguments).
     * Override this method for custom queue setup (e.g. binding to an exchange
     * or defining custom queue parameters).
     *
     * @throws IOException if declaring or configuring the queue fails
     */
    protected void setupQueue() throws IOException;
}
Usage Examples:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
// Basic sink configuration: a local broker, the root virtual host and guest credentials
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
        .setHost("localhost")
        .setPort(5672)
        .setVirtualHost("/")
        .setUserName("guest")
        .setPassword("guest")
        .build();

// Both sinks below publish plain strings to the same queue.
String outputQueue = "output-queue";

// Default mode: a publish error fails the job
RMQSink<String> basicSink = new RMQSink<>(connectionConfig, outputQueue, new SimpleStringSchema());

// Sink with error logging (doesn't fail on publish errors)
RMQSink<String> resilientSink = new RMQSink<>(connectionConfig, outputQueue, new SimpleStringSchema());
resilientSink.setLogFailuresOnly(true);
// Custom sink that publishes to an exchange (with a routing key) instead of directly to a queue
class CustomRMQSink extends RMQSink<String> {
    /** Exchange that every record is published to. */
    private final String exchangeName;
    /** Routing key attached to every published record. */
    private final String routingKey;

    /**
     * @param config   RabbitMQ connection configuration
     * @param exchange name of the topic exchange to declare and publish to
     * @param routing  routing key used for every message
     */
    public CustomRMQSink(RMQConnectionConfig config, String exchange, String routing) {
        // The parent's queue name is left empty: setupQueue() and invoke() are both overridden
        // below and only ever use the exchange.
        super(config, "", new SimpleStringSchema());
        routingKey = routing;
        exchangeName = exchange;
    }

    /** Declares a durable topic exchange in place of the default queue declaration. */
    @Override
    protected void setupQueue() throws IOException {
        final boolean durable = true;
        channel.exchangeDeclare(exchangeName, "topic", durable);
    }

    /** Serializes the record and publishes it to the exchange under the configured routing key. */
    @Override
    public void invoke(String value) {
        try {
            byte[] payload = schema.serialize(value);
            channel.basicPublish(exchangeName, routingKey, null, payload);
        } catch (IOException e) {
            if (!logFailuresOnly) {
                throw new RuntimeException("Cannot send message to exchange " + exchangeName, e);
            }
            LOG.error("Cannot send message to exchange {}", exchangeName, e);
        }
    }
}
// Use in Flink pipeline: upper-case a few words and hand them to the basic sink
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
        .fromElements("Hello", "World", "from", "Flink")
        .map(word -> word.toUpperCase())
        .addSink(basicSink);
// Submit the job under this name
env.execute("RabbitMQ Sink Example");
By default, messages are published to the specified queue using the default exchange:
// Empty exchange name = the default exchange; queueName is passed as the routing key and no
// message properties (null) are set.
channel.basicPublish("", queueName, null, serializedMessage);
Override invoke() method for custom publishing behavior:
/**
 * Publishes each record as a persistent JSON message to a fixed exchange/routing key.
 *
 * <p>The failure policy matches the built-in sink: in log-only mode the error is logged and the
 * record is skipped; otherwise the job fails with a RuntimeException.
 */
@Override
public void invoke(MyDataType value) {
    // NOTE: the default setupQueue() only declares the queue; override it to declare this
    // exchange, otherwise publishing to it fails.
    String exchange = "my-exchange";
    try {
        byte[] msg = schema.serialize(value);
        // Custom message properties
        AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
                .contentType("application/json")
                .deliveryMode(2) // persistent
                .timestamp(new Date())
                .build();
        // Publish to specific exchange with routing key
        channel.basicPublish(exchange, "routing.key", props, msg);
    } catch (IOException e) {
        if (logFailuresOnly) {
            LOG.error("Cannot send message to exchange {}", exchange, e);
        } else {
            throw new RuntimeException("Cannot send message to exchange " + exchange, e);
        }
    }
}
By default, setupQueue() declares a non-durable, non-exclusive, non-auto-delete queue:
// queueDeclare(queue, durable=false, exclusive=false, autoDelete=false, arguments=null)
channel.queueDeclare(queueName, false, false, false, null);
Override setupQueue() for custom queue configuration:
/**
 * Declares a durable, bounded queue with a message TTL and binds it to a topic exchange.
 * The exchange is declared before the binding so that the bind also works when the exchange
 * does not exist yet.
 */
@Override
protected void setupQueue() throws IOException {
    Map<String, Object> args = new HashMap<>();
    args.put("x-message-ttl", 300000); // 5 minute TTL (milliseconds)
    args.put("x-max-length", 10000); // Max 10,000 messages
    args.put("x-overflow", "reject-publish"); // Reject when full
    // Declare durable queue with custom arguments
    channel.queueDeclare(queueName, true, false, false, args);
    // A queue can only be bound to an existing exchange, so declare the (durable, topic)
    // exchange first instead of assuming someone else created it.
    channel.exchangeDeclare("events-exchange", "topic", true);
    // Bind queue to exchange
    channel.queueBind(queueName, "events-exchange", "output.*");
}
For exchange-based publishing, override setupQueue() to declare exchanges:
@Override
protected void setupQueue() throws IOException {
// Declare topic exchange
channel.exchangeDeclare("events-exchange", "topic", true);
// Optionally declare queues bound to the exchange
channel.queueDeclare("event-queue", true, false, false, null);
channel.queueBind("event-queue", "events-exchange", "event.*");
}The sink supports two error handling modes via setLogFailuresOnly():
// Option 1 - fail fast (default)
RMQSink<String> sink = new RMQSink<>(config, "queue", schema);
sink.setLogFailuresOnly(false); // default behavior
// Publishing errors throw RuntimeException, causing job failure

// Option 2 - log only
RMQSink<String> sink = new RMQSink<>(config, "queue", schema);
sink.setLogFailuresOnly(true);
// Publishing errors are logged but don't cause job failure

Thrown during open() when connection setup fails:
try {
    // open() takes the function Configuration, not the RMQConnectionConfig given to the constructor
    sink.open(new Configuration());
} catch (RuntimeException e) {
    // Handle connection setup failures; keep the exception so the stack trace is logged
    logger.error("Failed to connect to RabbitMQ", e);
} catch (Exception e) {
    // open() is declared 'throws Exception', so other checked failures must be handled as well
    logger.error("Failed to open RabbitMQ sink", e);
}
Thrown during invoke() when message publishing fails:
// Error handling in custom sink: publish to the queue via the default exchange and apply the
// sink's failure policy when publishing throws.
@Override
public void invoke(MyType value) {
    try {
        byte[] msg = schema.serialize(value);
        channel.basicPublish("", queueName, null, msg);
    } catch (IOException e) {
        if (!logFailuresOnly) {
            // Fails the job; whether it is restarted (and from where) depends on the job's
            // restart/checkpoint configuration.
            throw new RuntimeException("Publishing failed", e);
        }
        // Log-only mode: the failed message is not retried; later messages are still processed
        LOG.error("Failed to publish message", e);
    }
}
Thrown during close() when connection cleanup fails. The implementation attempts to close both the channel and the connection. If only one of them fails, that exception is rethrown wrapped in a RuntimeException; if both fail, the channel exception is logged and the connection exception is rethrown:
/**
 * Closes the channel and then the connection, attempting both even if the first fails.
 *
 * <p>Either may still be null if close() runs after open() failed before creating it; such a
 * null is skipped, so that no NullPointerException replaces the original failure.
 *
 * @throws RuntimeException wrapping the connection exception, or the channel exception if only
 *     the channel failed to close
 */
@Override
public void close() {
    IOException t = null;
    try {
        if (channel != null) {
            channel.close();
        }
    } catch (IOException e) {
        t = e;
    }
    try {
        if (connection != null) {
            connection.close();
        }
    } catch (IOException e) {
        if (t != null) {
            // Only one exception can be the cause; keep the channel failure visible in the log
            LOG.warn("Both channel and connection closing failed. Logging channel exception and failing with connection exception", t);
        }
        t = e;
    }
    if (t != null) {
        throw new RuntimeException("Error while closing RMQ connection with " + queueName
                + " at " + rmqConnectionConfig.getHost(), t);
    }
}
To buffer records before publishing, consider implementing custom batching (the sketch below defers publishing but still publishes each record individually):
/**
 * Sink that buffers records and publishes them through {@link RMQSink} once a batch is full.
 * The remaining partial batch is published on {@link #close()}.
 */
class BatchingRMQSink extends RMQSink<String> {
    /** Records buffered since the last flush; also used as the lock object. */
    private final List<String> batch = new ArrayList<>();
    /** Number of buffered records that triggers a flush. */
    private final int batchSize;

    /**
     * @param config    RabbitMQ connection configuration
     * @param queueName queue the records are published to
     * @param batchSize number of records to buffer before publishing; must be positive
     * @throws IllegalArgumentException if batchSize is not positive
     */
    public BatchingRMQSink(RMQConnectionConfig config, String queueName, int batchSize) {
        super(config, queueName, new SimpleStringSchema());
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive: " + batchSize);
        }
        this.batchSize = batchSize;
    }

    /** Creates a sink using the original fixed batch size of 100. */
    public BatchingRMQSink(RMQConnectionConfig config, String queueName) {
        this(config, queueName, 100);
    }

    @Override
    public void invoke(String value) {
        synchronized (batch) {
            batch.add(value);
            if (batch.size() >= batchSize) {
                flushBatch();
            }
        }
    }

    /** Publishes any records still buffered, then closes the underlying channel and connection. */
    @Override
    public void close() {
        try {
            // Without this flush, the last partial batch would be silently dropped.
            synchronized (batch) {
                flushBatch();
            }
        } finally {
            super.close();
        }
    }

    /** Publishes every buffered record and empties the buffer; callers hold the lock on batch. */
    private void flushBatch() {
        // Each record is still a separate publish call; buffering only defers them.
        for (String item : batch) {
            super.invoke(item);
        }
        batch.clear();
    }
}
For multiple sinks, consider sharing connection configuration:
// Shared connection config. Set the same connection fields as the basic example (port,
// virtual host and credentials, not just the host); NOTE(review): RMQConnectionConfig's
// build() is expected to reject a config missing them - verify against the Builder docs.
RMQConnectionConfig sharedConfig = new RMQConnectionConfig.Builder()
        .setHost("rabbitmq-cluster")
        .setPort(5672)
        .setVirtualHost("/")
        .setUserName("guest")
        .setPassword("guest")
        .setAutomaticRecovery(true)
        .setNetworkRecoveryInterval(5000) // presumably milliseconds - confirm
        .build();
// Multiple sinks with shared config
RMQSink<String> sink1 = new RMQSink<>(sharedConfig, "queue1", schema);
RMQSink<String> sink2 = new RMQSink<>(sharedConfig, "queue2", schema);