Apache Flink RabbitMQ connector that provides source and sink functionality for streaming data to and from RabbitMQ message queues with exactly-once processing semantics when checkpointing is enabled
npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-rabbitmq_2-10@1.3.0Apache Flink RabbitMQ connector that provides source and sink functionality for streaming data to and from RabbitMQ message queues. This connector supports exactly-once processing semantics when checkpointing is enabled, at-least-once when checkpointing is enabled without correlation IDs, and no delivery guarantees when checkpointing is disabled.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-rabbitmq_2.10</artifactId>
<version>1.3.3</version>
</dependency>import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.SerializationSchema;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
// Set up execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Configure RabbitMQ connection
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setHost("localhost")
.setPort(5672)
.setVirtualHost("/")
.setUserName("guest")
.setPassword("guest")
.build();
// Create RabbitMQ source
RMQSource<String> rmqSource = new RMQSource<>(
connectionConfig,
"input-queue",
true, // use correlation IDs for exactly-once processing
new SimpleStringSchema()
);
// Create RabbitMQ sink
RMQSink<String> rmqSink = new RMQSink<>(
connectionConfig,
"output-queue",
new SimpleStringSchema()
);
// Build streaming pipeline
env.addSource(rmqSource)
.map(s -> s.toUpperCase())
.addSink(rmqSink);
env.execute("RabbitMQ Pipeline");The Flink RabbitMQ connector is built around three key components:
RMQConnectionConfig handles all connection parameters, timeouts, and recovery settingsRMQSource provides message consumption with configurable delivery guaranteesRMQSink handles message publishing with error handling and queue setupThe connector supports different processing semantics based on configuration:
Comprehensive connection configuration supporting both individual parameters and URI-based setup, with full control over timeouts, recovery settings, and SSL options.
public class RMQConnectionConfig implements Serializable {
public String getHost();
public int getPort();
public String getVirtualHost();
public String getUsername();
public String getPassword();
public String getUri();
public ConnectionFactory getConnectionFactory();
}
public static class Builder {
public Builder setHost(String host);
public Builder setPort(int port);
public Builder setVirtualHost(String virtualHost);
public Builder setUserName(String username);
public Builder setPassword(String password);
public Builder setUri(String uri);
public RMQConnectionConfig build();
}RabbitMQ source for consuming messages from queues with configurable delivery guarantees and automatic message acknowledgment during checkpoints.
public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OUT, String, Long>
implements ResultTypeQueryable<OUT> {
public RMQSource(RMQConnectionConfig rmqConnectionConfig,
String queueName,
DeserializationSchema<OUT> deserializationSchema);
public RMQSource(RMQConnectionConfig rmqConnectionConfig,
String queueName,
boolean usesCorrelationId,
DeserializationSchema<OUT> deserializationSchema);
}RabbitMQ sink for publishing messages to queues with configurable error handling and automatic queue setup.
public class RMQSink<IN> extends RichSinkFunction<IN> {
public RMQSink(RMQConnectionConfig rmqConnectionConfig,
String queueName,
SerializationSchema<IN> schema);
public void setLogFailuresOnly(boolean logFailuresOnly);
}// Required Flink interfaces for serialization
interface DeserializationSchema<T> {
T deserialize(byte[] message);
boolean isEndOfStream(T nextElement);
TypeInformation<T> getProducedType();
}
interface SerializationSchema<T> {
byte[] serialize(T element);
}
// RabbitMQ client types (from com.rabbitmq.client package)
class ConnectionFactory {
Connection newConnection() throws IOException, TimeoutException;
}
class Connection {
Channel createChannel() throws IOException;
void close() throws IOException;
}
class Channel {
void queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
void basicAck(long deliveryTag, boolean multiple) throws IOException;
void basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
void txSelect() throws IOException;
void txCommit() throws IOException;
void close() throws IOException;
}
// RabbitMQ consumer classes (from com.rabbitmq.client package)
class QueueingConsumer {
QueueingConsumer(Channel channel);
Delivery nextDelivery() throws InterruptedException;
static class Delivery {
byte[] getBody();
Envelope getEnvelope();
BasicProperties getProperties();
}
}
class Envelope {
long getDeliveryTag();
String getExchange();
String getRoutingKey();
}
class BasicProperties {
String getCorrelationId();
Integer getDeliveryMode();
String getContentType();
}