Apache Flink RabbitMQ connector that provides source and sink functionality for streaming data to and from RabbitMQ message queues, with exactly-once processing semantics when checkpointing is enabled and correlation IDs are used
npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-rabbitmq_2-10@1.3.00
# Flink Connector RabbitMQ

Apache Flink RabbitMQ connector that provides source and sink functionality for streaming data to and from RabbitMQ message queues. The connector supports exactly-once processing semantics when checkpointing is enabled and correlation IDs are used, at-least-once semantics when checkpointing is enabled without correlation IDs, and no delivery guarantees when checkpointing is disabled.

## Package Information

- **Package Name**: flink-connector-rabbitmq_2.10
- **Package Type**: maven
- **Language**: Java
- **Group ID**: org.apache.flink
- **Artifact ID**: flink-connector-rabbitmq_2.10
- **Version**: 1.3.3
- **Installation**: Add dependency to your Maven POM file

```xml
15
<dependency>
16
<groupId>org.apache.flink</groupId>
17
<artifactId>flink-connector-rabbitmq_2.10</artifactId>
18
<version>1.3.3</version>
19
</dependency>
20
```
21
22
## Core Imports
23
24
```java
25
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
26
import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
27
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
28
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
29
import org.apache.flink.streaming.util.serialization.SerializationSchema;
30
```
31
32
## Basic Usage
33
34
```java
35
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
36
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
37
import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
38
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
39
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
40
41
// Set up execution environment
42
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
43
44
// Configure RabbitMQ connection
45
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
46
.setHost("localhost")
47
.setPort(5672)
48
.setVirtualHost("/")
49
.setUserName("guest")
50
.setPassword("guest")
51
.build();
52
53
// Create RabbitMQ source
54
RMQSource<String> rmqSource = new RMQSource<>(
55
connectionConfig,
56
"input-queue",
57
true, // use correlation IDs for exactly-once processing
58
new SimpleStringSchema()
59
);
60
61
// Create RabbitMQ sink
62
RMQSink<String> rmqSink = new RMQSink<>(
63
connectionConfig,
64
"output-queue",
65
new SimpleStringSchema()
66
);
67
68
// Build streaming pipeline
69
env.addSource(rmqSource)
70
.map(s -> s.toUpperCase())
71
.addSink(rmqSink);
72
73
env.execute("RabbitMQ Pipeline");
74
```
75
76
## Architecture

The Flink RabbitMQ connector is built around three key components:

- **Connection Management**: `RMQConnectionConfig` handles all connection parameters, timeouts, and recovery settings
- **Source Component**: `RMQSource` provides message consumption with configurable delivery guarantees
- **Sink Component**: `RMQSink` handles message publishing with error handling and queue setup

The connector supports different processing semantics based on configuration:
- **Exactly-once**: Enabled via checkpointing + correlation IDs + transactions
- **At-least-once**: Enabled via checkpointing + transactions (without correlation IDs)
- **No guarantees**: Auto-acknowledge (auto-ack) mode when checkpointing is disabled

## Capabilities
90
91
### Connection Configuration

Comprehensive connection configuration supporting both individual parameters and URI-based setup, with full control over timeouts, recovery settings, and SSL options.

```java { .api }
96
public class RMQConnectionConfig implements Serializable {
97
public String getHost();
98
public int getPort();
99
public String getVirtualHost();
100
public String getUsername();
101
public String getPassword();
102
public String getUri();
103
public ConnectionFactory getConnectionFactory();
104
}
105
106
public static class Builder {
107
public Builder setHost(String host);
108
public Builder setPort(int port);
109
public Builder setVirtualHost(String virtualHost);
110
public Builder setUserName(String username);
111
public Builder setPassword(String password);
112
public Builder setUri(String uri);
113
public RMQConnectionConfig build();
114
}
115
```
116
117
[Connection Configuration](./connection-config.md)
118
119
### Message Source

RabbitMQ source for consuming messages from queues, with configurable delivery guarantees and automatic message acknowledgment when a checkpoint completes.

```java { .api }
124
public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OUT, String, Long>
125
implements ResultTypeQueryable<OUT> {
126
127
public RMQSource(RMQConnectionConfig rmqConnectionConfig,
128
String queueName,
129
DeserializationSchema<OUT> deserializationSchema);
130
131
public RMQSource(RMQConnectionConfig rmqConnectionConfig,
132
String queueName,
133
boolean usesCorrelationId,
134
DeserializationSchema<OUT> deserializationSchema);
135
}
136
```
137
138
[Message Source](./source.md)
139
140
### Message Sink

RabbitMQ sink for publishing messages to queues, with configurable error handling and automatic queue setup.

```java { .api }
145
public class RMQSink<IN> extends RichSinkFunction<IN> {
146
public RMQSink(RMQConnectionConfig rmqConnectionConfig,
147
String queueName,
148
SerializationSchema<IN> schema);
149
150
public void setLogFailuresOnly(boolean logFailuresOnly);
151
}
152
```
153
154
[Message Sink](./sink.md)
155
156
## Core Types
157
158
```java { .api }
159
// Required Flink interfaces for serialization
160
interface DeserializationSchema<T> {
161
T deserialize(byte[] message);
162
boolean isEndOfStream(T nextElement);
163
TypeInformation<T> getProducedType();
164
}
165
166
interface SerializationSchema<T> {
167
byte[] serialize(T element);
168
}
169
170
// RabbitMQ client types (from com.rabbitmq.client package)
171
class ConnectionFactory {
172
Connection newConnection() throws IOException, TimeoutException;
173
}
174
175
class Connection {
176
Channel createChannel() throws IOException;
177
void close() throws IOException;
178
}
179
180
class Channel {
181
void queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
182
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
183
void basicAck(long deliveryTag, boolean multiple) throws IOException;
184
void basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
185
void txSelect() throws IOException;
186
void txCommit() throws IOException;
187
void close() throws IOException;
188
}
189
190
// RabbitMQ consumer classes (from com.rabbitmq.client package)
191
class QueueingConsumer {
192
QueueingConsumer(Channel channel);
193
Delivery nextDelivery() throws InterruptedException;
194
195
static class Delivery {
196
byte[] getBody();
197
Envelope getEnvelope();
198
BasicProperties getProperties();
199
}
200
}
201
202
class Envelope {
203
long getDeliveryTag();
204
String getExchange();
205
String getRoutingKey();
206
}
207
208
class BasicProperties {
209
String getCorrelationId();
210
Integer getDeliveryMode();
211
String getContentType();
212
}
213
```