RabbitMQ source for consuming messages from queues with configurable delivery guarantees, automatic message acknowledgment during checkpoints, and support for exactly-once processing semantics.
Main source class that reads messages from RabbitMQ queues with configurable processing guarantees.
/**
* RabbitMQ source (consumer) which reads from a queue and acknowledges messages on checkpoints.
* When checkpointing is enabled and messages carry unique correlation IDs,
* it provides exactly-once processing semantics.
*
* @param <OUT> The type of the data read from RabbitMQ
*/
public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OUT, String, Long>
implements ResultTypeQueryable<OUT> {
/**
* Creates a new RabbitMQ source with at-least-once message processing guarantee when
* checkpointing is enabled. No strong delivery guarantees when checkpointing is disabled.
*
* @param rmqConnectionConfig The RabbitMQ connection configuration
* @param queueName The queue to receive messages from
* @param deserializationSchema A DeserializationSchema for turning bytes into Java objects
*/
public RMQSource(RMQConnectionConfig rmqConnectionConfig,
String queueName,
DeserializationSchema<OUT> deserializationSchema);
/**
* Creates a new RabbitMQ source with configurable correlation ID usage.
* For exactly-once processing, set usesCorrelationId to true and enable checkpointing.
*
* @param rmqConnectionConfig The RabbitMQ connection configuration
* @param queueName The queue to receive messages from
* @param usesCorrelationId Whether messages have unique correlation IDs for deduplication
* @param deserializationSchema A DeserializationSchema for turning bytes into Java objects
*/
public RMQSource(RMQConnectionConfig rmqConnectionConfig,
String queueName,
boolean usesCorrelationId,
DeserializationSchema<OUT> deserializationSchema);
/** Initializes the connection to RMQ and sets up the queue */
public void open(Configuration config) throws Exception;
/** Closes the RMQ connection */
public void close() throws Exception;
/** Main processing loop that consumes messages from the queue */
public void run(SourceContext<OUT> ctx) throws Exception;
/** Cancels the source operation */
public void cancel();
/** Returns the type information for the produced output type */
public TypeInformation<OUT> getProducedType();
}
Protected methods that can be overridden for custom queue and connection setup.
/**
* Protected methods for customizing RMQ source behavior
*/
public class RMQSource<OUT> {
/**
* Initializes the connection to RMQ with a default connection factory.
* Override this method to setup and configure a custom ConnectionFactory.
*/
protected ConnectionFactory setupConnectionFactory() throws Exception;
/**
* Sets up the queue. The default implementation just declares the queue.
* Override this method for custom queue setup (e.g. binding to an exchange
* or defining custom queue parameters).
*/
protected void setupQueue() throws IOException;
/**
* Acknowledges the given session IDs after a checkpoint has completed.
* Called automatically by the framework as part of checkpointing.
*/
protected void acknowledgeSessionIDs(List<Long> sessionIds);
}
Usage Examples:
import java.io.IOException;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
// Basic at-least-once source
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setHost("localhost")
.setPort(5672)
.setVirtualHost("/")
.setUserName("guest")
.setPassword("guest")
.build();
RMQSource<String> basicSource = new RMQSource<>(
connectionConfig,
"input-queue",
new SimpleStringSchema()
);
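The at-least-once guarantee rests on acknowledging messages only after a checkpoint has completed, so that unacknowledged messages are redelivered after a failure. The following is a minimal, self-contained sketch of that bookkeeping and not the connector's actual code; the class `CheckpointAckSketch` and its method names are hypothetical, and the `acked` list stands in for acknowledgments sent to the broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical model of "acknowledge on checkpoint completion".
final class CheckpointAckSketch {
    // Delivery tags received since the last snapshot.
    private List<Long> current = new ArrayList<>();
    // Delivery tags grouped by the checkpoint that captured them.
    private final TreeMap<Long, List<Long>> pendingByCheckpoint = new TreeMap<>();
    // Stand-in for acknowledgments sent to the broker.
    private final List<Long> acked = new ArrayList<>();

    void onMessage(long deliveryTag) {
        current.add(deliveryTag);
    }

    // A checkpoint starts: freeze the tags seen so far under its ID.
    void snapshot(long checkpointId) {
        pendingByCheckpoint.put(checkpointId, current);
        current = new ArrayList<>();
    }

    // A checkpoint completed: acknowledge everything it (and earlier ones) captured.
    void checkpointComplete(long checkpointId) {
        NavigableMap<Long, List<Long>> done = pendingByCheckpoint.headMap(checkpointId, true);
        for (List<Long> tags : done.values()) {
            acked.addAll(tags);
        }
        done.clear(); // also removes the entries from pendingByCheckpoint
    }

    List<Long> acked() {
        return new ArrayList<>(acked);
    }
}
```

In this model a message received after a snapshot stays unacknowledged until the next checkpoint completes, which is why a crash can lead to redelivery and therefore duplicates unless they are filtered.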
// Exactly-once source with correlation IDs
RMQSource<String> exactlyOnceSource = new RMQSource<>(
connectionConfig,
"input-queue",
true, // use correlation IDs
new SimpleStringSchema()
);
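Exactly-once processing depends on each message carrying a unique correlation ID, so that a message redelivered after a failure can be recognized and dropped. The sketch below illustrates that idea in isolation, assuming an in-memory set of seen IDs; `CorrelationDedupSketch` and its methods are hypothetical names, and the choice of `IllegalArgumentException` for a missing ID is this sketch's own, not necessarily what the connector throws.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of deduplication by correlation ID.
final class CorrelationDedupSketch {
    private final Set<String> seenIds = new HashSet<>();
    private final List<String> emitted = new ArrayList<>();

    // Returns true if the payload was emitted, false if it was a duplicate.
    boolean deliver(String correlationId, String payload) {
        if (correlationId == null) {
            // With correlation IDs switched on, a message without one cannot be deduplicated.
            throw new IllegalArgumentException("message has no correlation ID");
        }
        if (!seenIds.add(correlationId)) {
            return false; // already processed: drop the redelivery
        }
        emitted.add(payload);
        return true;
    }

    List<String> emitted() {
        return new ArrayList<>(emitted);
    }
}
```

The producer is responsible for making the IDs unique; if two distinct messages share an ID, the second one is silently dropped in this model.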
// Custom source with queue binding
class CustomRMQSource extends RMQSource<String> {
public CustomRMQSource(RMQConnectionConfig config, String queueName) {
super(config, queueName, new SimpleStringSchema());
}
@Override
protected void setupQueue() throws IOException {
// Declare queue with custom parameters
channel.queueDeclare(queueName, true, false, false, null);
// Bind queue to exchange
channel.queueBind(queueName, "events-exchange", "user.*");
}
}
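The binding above uses the routing pattern "user.*". Assuming "events-exchange" is a topic exchange (the example does not say so), `*` matches exactly one dot-separated word and `#` matches zero or more words. The following sketch reimplements that matching rule locally to show which routing keys such a binding would receive; `TopicPatternSketch` is a hypothetical helper and not part of the RabbitMQ client.

```java
// Hypothetical local reimplementation of topic-exchange pattern matching.
final class TopicPatternSketch {

    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern exhausted: key must be exhausted too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words of the key
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still expects a word, key has none left
        }
        // '*' matches any single word; otherwise the words must be equal
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1);
        }
        return false;
    }
}
```

Under this rule "user.*" accepts "user.created" but rejects both "user" and "user.created.v2"; a binding of "user.#" would accept all three.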
// Use in Flink pipeline
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // Enable checkpointing for delivery guarantees
env.addSource(exactlyOnceSource)
.map(s -> processMessage(s)) // processMessage is a placeholder for application logic
.print();
env.execute("RabbitMQ Source Example");
The RMQSource supports three different processing modes:
Exactly-once: set usesCorrelationId to true and enable Flink checkpointing.
// Enable checkpointing in Flink
env.enableCheckpointing(5000);
// Create source with correlation ID support
RMQSource<String> source = new RMQSource<>(
connectionConfig,
"queue-name",
true, // enables exactly-once with correlation IDs
new SimpleStringSchema()
);
At-least-once: set usesCorrelationId to false or use the constructor that omits the usesCorrelationId parameter, and enable Flink checkpointing.
// Enable checkpointing in Flink
env.enableCheckpointing(5000);
// Create source without correlation IDs
RMQSource<String> source = new RMQSource<>(
connectionConfig,
"queue-name",
new SimpleStringSchema() // at-least-once processing
);
No strong delivery guarantees: leave checkpointing disabled.
// No checkpointing enabled
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
RMQSource<String> source = new RMQSource<>(
connectionConfig,
"queue-name",
new SimpleStringSchema()
);
By default, setupQueue() declares a durable, non-exclusive, non-auto-delete queue:
channel.queueDeclare(queueName, true, false, false, null);
Override setupQueue() for custom queue configuration:
@Override
protected void setupQueue() throws IOException {
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 60000); // 60 second TTL
args.put("x-max-length", 1000); // Max 1000 messages
// Declare queue with custom arguments
channel.queueDeclare(queueName, true, false, false, args);
// Bind to exchange with routing key
channel.queueBind(queueName, "my-exchange", "routing.key");
}
Connection failures during open() throw RuntimeException with descriptive messages:
try {
source.open(config);
} catch (RuntimeException e) {
// Handle connection failures
logger.error("Failed to connect to RabbitMQ", e); // pass the exception to keep the stack trace
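}
Connection failures throw RuntimeException.
Messages without a correlation ID when usesCorrelationId=true throw exceptions.
Deserialization is delegated to the supplied DeserializationSchema, which is where deserialization errors originate.
When automatic recovery is enabled in RMQConnectionConfig: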