or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

tessl/maven-org-apache-flink--flink-connector-rabbitmq_2-10

Apache Flink RabbitMQ connector that provides source and sink functionality for streaming data to and from RabbitMQ message queues, with exactly-once source semantics when checkpointing is enabled and messages carry unique correlation IDs

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
mavenpkg:maven/org.apache.flink/flink-connector-rabbitmq_2.10@1.3.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-rabbitmq_2-10@1.3.0

0

# Flink Connector RabbitMQ

1

2

Apache Flink RabbitMQ connector that provides source and sink functionality for streaming data to and from RabbitMQ message queues. The source supports exactly-once processing semantics when checkpointing is enabled and messages carry unique correlation IDs, at-least-once semantics when checkpointing is enabled without correlation IDs, and no delivery guarantees when checkpointing is disabled.

3

4

## Package Information

5

6

- **Package Name**: flink-connector-rabbitmq_2.10

7

- **Package Type**: maven

8

- **Language**: Java

9

- **Group ID**: org.apache.flink

10

- **Artifact ID**: flink-connector-rabbitmq_2.10

11

- **Version**: 1.3.3

12

- **Installation**: Add the following dependency to your Maven POM file:

13

14

```xml

15

<dependency>

16

<groupId>org.apache.flink</groupId>

17

<artifactId>flink-connector-rabbitmq_2.10</artifactId>

18

<version>1.3.3</version>

19

</dependency>

20

```

21

22

## Core Imports

23

24

```java

25

import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;

26

import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;

27

import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;

28

import org.apache.flink.streaming.util.serialization.DeserializationSchema;

29

import org.apache.flink.streaming.util.serialization.SerializationSchema;

30

```

31

32

## Basic Usage

33

34

```java

35

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

36

import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;

37

import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;

38

import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;

39

import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

40

41

// Set up execution environment

42

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

43

44

// Configure RabbitMQ connection

45

RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()

46

.setHost("localhost")

47

.setPort(5672)

48

.setVirtualHost("/")

49

.setUserName("guest")

50

.setPassword("guest")

51

.build();

52

53

// Create RabbitMQ source

54

RMQSource<String> rmqSource = new RMQSource<>(

55

connectionConfig,

56

"input-queue",

57

true, // use correlation IDs for exactly-once processing

58

new SimpleStringSchema()

59

);

60

61

// Create RabbitMQ sink

62

RMQSink<String> rmqSink = new RMQSink<>(

63

connectionConfig,

64

"output-queue",

65

new SimpleStringSchema()

66

);

67

68

// Build streaming pipeline

69

env.addSource(rmqSource)

70

.map(s -> s.toUpperCase())

71

.addSink(rmqSink);

72

73

env.execute("RabbitMQ Pipeline");

74

```

75

76

## Architecture

77

78

The Flink RabbitMQ connector is built around three key components:

79

80

- **Connection Management**: `RMQConnectionConfig` handles all connection parameters, timeouts, and recovery settings

81

- **Source Component**: `RMQSource` provides message consumption with configurable delivery guarantees

82

- **Sink Component**: `RMQSink` handles message publishing with error handling and queue setup

83

84

The connector supports different processing semantics based on configuration:

85

- **Exactly-once**: Enabled via checkpointing + correlation IDs + transactions

86

- **At-least-once**: Enabled via checkpointing + transactions (without correlation IDs)

87

- **No guarantees**: RabbitMQ auto-commit (auto-acknowledge) mode, used when checkpointing is disabled

88

89

## Capabilities

90

91

### Connection Configuration

92

93

Comprehensive connection configuration supporting both individual parameters and URI-based setup, with full control over timeouts, recovery settings, and SSL options.

94

95

```java { .api }

96

public class RMQConnectionConfig implements Serializable {

97

public String getHost();

98

public int getPort();

99

public String getVirtualHost();

100

public String getUsername();

101

public String getPassword();

102

public String getUri();

103

public ConnectionFactory getConnectionFactory();

104

}

105

106

public static class Builder {

107

public Builder setHost(String host);

108

public Builder setPort(int port);

109

public Builder setVirtualHost(String virtualHost);

110

public Builder setUserName(String username);

111

public Builder setPassword(String password);

112

public Builder setUri(String uri);

113

public RMQConnectionConfig build();

114

}

115

```

116

117

[Connection Configuration](./connection-config.md)

118

119

### Message Source

120

121

RabbitMQ source for consuming messages from queues with configurable delivery guarantees. When checkpointing is enabled, messages are acknowledged automatically once a checkpoint completes.

122

123

```java { .api }

124

public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OUT, String, Long>

125

implements ResultTypeQueryable<OUT> {

126

127

public RMQSource(RMQConnectionConfig rmqConnectionConfig,

128

String queueName,

129

DeserializationSchema<OUT> deserializationSchema);

130

131

public RMQSource(RMQConnectionConfig rmqConnectionConfig,

132

String queueName,

133

boolean usesCorrelationId,

134

DeserializationSchema<OUT> deserializationSchema);

135

}

136

```

137

138

[Message Source](./source.md)

139

140

### Message Sink

141

142

RabbitMQ sink for publishing messages to queues with configurable error handling and automatic queue setup.

143

144

```java { .api }

145

public class RMQSink<IN> extends RichSinkFunction<IN> {

146

public RMQSink(RMQConnectionConfig rmqConnectionConfig,

147

String queueName,

148

SerializationSchema<IN> schema);

149

150

public void setLogFailuresOnly(boolean logFailuresOnly);

151

}

152

```

153

154

[Message Sink](./sink.md)

155

156

## Core Types

157

158

```java { .api }

159

// Required Flink interfaces for serialization

160

interface DeserializationSchema<T> {

161

T deserialize(byte[] message);

162

boolean isEndOfStream(T nextElement);

163

TypeInformation<T> getProducedType();

164

}

165

166

interface SerializationSchema<T> {

167

byte[] serialize(T element);

168

}

169

170

// RabbitMQ client types (from com.rabbitmq.client package), shown in simplified form; Connection and Channel are interfaces in the actual client library

171

class ConnectionFactory {

172

Connection newConnection() throws IOException, TimeoutException;

173

}

174

175

class Connection {

176

Channel createChannel() throws IOException;

177

void close() throws IOException;

178

}

179

180

class Channel {

181

void queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;

182

void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

183

void basicAck(long deliveryTag, boolean multiple) throws IOException;

184

void basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

185

void txSelect() throws IOException;

186

void txCommit() throws IOException;

187

void close() throws IOException;

188

}

189

190

// RabbitMQ consumer classes (from com.rabbitmq.client package)

191

class QueueingConsumer {

192

QueueingConsumer(Channel channel);

193

Delivery nextDelivery() throws InterruptedException;

194

195

static class Delivery {

196

byte[] getBody();

197

Envelope getEnvelope();

198

BasicProperties getProperties();

199

}

200

}

201

202

class Envelope {

203

long getDeliveryTag();

204

String getExchange();

205

String getRoutingKey();

206

}

207

208

class BasicProperties {

209

String getCorrelationId();

210

Integer getDeliveryMode();

211

String getContentType();

212

}

213

```