# Message Source

RabbitMQ source for consuming messages from queues with configurable delivery guarantees, automatic message acknowledgment during checkpoints, and support for exactly-once processing semantics.

## Capabilities

### RMQ Source Class

Main source class that reads messages from RabbitMQ queues with configurable processing guarantees.

```java { .api }
/**
 * RabbitMQ source (consumer) which reads from a queue and acknowledges messages on checkpoints.
 * When checkpointing is enabled, it guarantees exactly-once processing semantics.
 *
 * @param <OUT> The type of the data read from RabbitMQ
 */
public class RMQSource<OUT> extends MultipleIdsMessageAcknowledgingSourceBase<OUT, String, Long>
        implements ResultTypeQueryable<OUT> {

    /**
     * Creates a new RabbitMQ source with at-least-once message processing guarantee when
     * checkpointing is enabled. No strong delivery guarantees when checkpointing is disabled.
     *
     * @param rmqConnectionConfig The RabbitMQ connection configuration
     * @param queueName The queue to receive messages from
     * @param deserializationSchema A DeserializationSchema for turning bytes into Java objects
     */
    public RMQSource(RMQConnectionConfig rmqConnectionConfig,
                     String queueName,
                     DeserializationSchema<OUT> deserializationSchema);

    /**
     * Creates a new RabbitMQ source with configurable correlation ID usage.
     * For exactly-once processing, set usesCorrelationId to true and enable checkpointing.
     *
     * @param rmqConnectionConfig The RabbitMQ connection configuration
     * @param queueName The queue to receive messages from
     * @param usesCorrelationId Whether messages have unique correlation IDs for deduplication
     * @param deserializationSchema A DeserializationSchema for turning bytes into Java objects
     */
    public RMQSource(RMQConnectionConfig rmqConnectionConfig,
                     String queueName,
                     boolean usesCorrelationId,
                     DeserializationSchema<OUT> deserializationSchema);

    /** Initializes the connection to RMQ and sets up the queue */
    public void open(Configuration config) throws Exception;

    /** Closes the RMQ connection */
    public void close() throws Exception;

    /** Main processing loop that consumes messages from the queue */
    public void run(SourceContext<OUT> ctx) throws Exception;

    /** Cancels the source operation */
    public void cancel();

    /** Returns the type information for the produced output type */
    public TypeInformation<OUT> getProducedType();
}
```

### Customization Methods

Protected methods that can be overridden for custom queue and connection setup.

```java { .api }
/**
 * Protected methods for customizing RMQ source behavior
 */
public class RMQSource<OUT> {

    /**
     * Initializes the connection to RMQ with a default connection factory.
     * Override this method to set up and configure a custom ConnectionFactory.
     */
    protected ConnectionFactory setupConnectionFactory() throws Exception;

    /**
     * Sets up the queue. The default implementation just declares the queue.
     * Override this method for custom queue setup (e.g. binding to an exchange
     * or defining custom queue parameters).
     */
    protected void setupQueue() throws IOException;

    /**
     * Acknowledges session IDs once a checkpoint has completed.
     * Called automatically by the framework as part of checkpointing.
     */
    protected void acknowledgeSessionIDs(List<Long> sessionIds);
}
```

**Usage Examples:**

```java
import java.io.IOException;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

// Basic at-least-once source
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
    .setHost("localhost")
    .setPort(5672)
    .setVirtualHost("/")
    .setUserName("guest")
    .setPassword("guest")
    .build();

RMQSource<String> basicSource = new RMQSource<>(
    connectionConfig,
    "input-queue",
    new SimpleStringSchema()
);

// Exactly-once source with correlation IDs
RMQSource<String> exactlyOnceSource = new RMQSource<>(
    connectionConfig,
    "input-queue",
    true, // use correlation IDs
    new SimpleStringSchema()
);

// Custom source with queue binding
class CustomRMQSource extends RMQSource<String> {
    public CustomRMQSource(RMQConnectionConfig config, String queueName) {
        super(config, queueName, new SimpleStringSchema());
    }

    @Override
    protected void setupQueue() throws IOException {
        // Declare queue with custom parameters
        channel.queueDeclare(queueName, true, false, false, null);

        // Bind queue to exchange
        channel.queueBind(queueName, "events-exchange", "user.*");
    }
}

// Use in Flink pipeline
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // Enable checkpointing for delivery guarantees

env.addSource(exactlyOnceSource)
    .setParallelism(1) // exactly-once requires a non-parallel source
    .map(s -> processMessage(s)) // processMessage is a placeholder for application logic
    .print();

env.execute("RabbitMQ Source Example");
```

## Processing Semantics

`RMQSource` supports three processing modes:

### Exactly-Once Processing

- **Requirements**: Checkpointing enabled + correlation IDs + RabbitMQ transactions + a non-parallel source (parallelism 1)
- **Usage**: Set `usesCorrelationId` to `true` and enable Flink checkpointing
- **Behavior**: Messages are acknowledged only when a checkpoint completes successfully; correlation IDs are used to discard duplicates that are redelivered after a failure
- **Producer Requirement**: Producers must set a unique correlation ID on every message

```java
// Enable checkpointing in Flink
env.enableCheckpointing(5000);

// Create source with correlation ID support
RMQSource<String> source = new RMQSource<>(
    connectionConfig,
    "queue-name",
    true, // enables exactly-once with correlation IDs
    new SimpleStringSchema()
);
```
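
The behavior described above (acknowledge only on checkpoint, skip messages whose correlation ID was already seen) can be illustrated with a small broker-free model. This is a simplified sketch, not the connector's implementation: `ExactlyOnceModel` and its methods are hypothetical names, and the choice to queue duplicates for acknowledgment is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Broker-free model: emit each correlation ID once, acknowledge only on checkpoint. */
class ExactlyOnceModel {
    // Correlation IDs that have already been emitted downstream.
    private final Set<String> seenCorrelationIds = new HashSet<>();
    // Delivery tags received since the last completed checkpoint.
    private final List<Long> pendingDeliveryTags = new ArrayList<>();

    /** Returns true if the message should be emitted, false if its ID was seen before. */
    boolean onMessage(long deliveryTag, String correlationId) {
        if (correlationId == null) {
            throw new IllegalArgumentException("message has no correlation ID");
        }
        // Assumption of this sketch: duplicates are still queued for acknowledgment.
        pendingDeliveryTags.add(deliveryTag);
        return seenCorrelationIds.add(correlationId);
    }

    /** Simulates a completed checkpoint: returns the tags that would now be acknowledged. */
    List<Long> onCheckpointComplete() {
        List<Long> acknowledged = new ArrayList<>(pendingDeliveryTags);
        pendingDeliveryTags.clear();
        return acknowledged;
    }

    /** Number of deliveries the broker would redeliver if the job failed right now. */
    int unacknowledgedCount() {
        return pendingDeliveryTags.size();
    }

    public static void main(String[] args) {
        ExactlyOnceModel model = new ExactlyOnceModel();
        System.out.println(model.onMessage(1L, "order-1")); // true: first delivery
        System.out.println(model.onMessage(2L, "order-2")); // true: first delivery
        System.out.println(model.onMessage(3L, "order-1")); // false: duplicate ID
        System.out.println(model.unacknowledgedCount());    // nothing acknowledged yet
        System.out.println(model.onCheckpointComplete());   // tags acknowledged together
        System.out.println(model.unacknowledgedCount());
    }
}
```

The model shows why unique correlation IDs matter: if two distinct messages shared an ID, the second would be treated as a duplicate and dropped.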

### At-Least-Once Processing

- **Requirements**: Checkpointing enabled + RabbitMQ transactions (no correlation IDs)
- **Usage**: Enable Flink checkpointing and either set `usesCorrelationId` to `false` or use the three-argument constructor, which omits that parameter
- **Behavior**: Messages are acknowledged when checkpoints complete; duplicates may be processed after a failure

```java
// Enable checkpointing in Flink
env.enableCheckpointing(5000);

// Create source without correlation IDs
RMQSource<String> source = new RMQSource<>(
    connectionConfig,
    "queue-name",
    new SimpleStringSchema() // at-least-once processing
);
```

### No Delivery Guarantees

- **Requirements**: No checkpointing
- **Usage**: Leave checkpointing disabled (the default)
- **Behavior**: Auto-acknowledgment mode; messages may be lost on failure, but there is no transaction overhead

```java
// No checkpointing enabled
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

RMQSource<String> source = new RMQSource<>(
    connectionConfig,
    "queue-name",
    new SimpleStringSchema()
);
```
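
The three modes above are determined by two switches: whether checkpointing is enabled and whether `usesCorrelationId` is set. The following sketch summarizes that decision as a plain function. `DeliveryGuarantee` and `GuaranteeSelector` are hypothetical names used only for illustration and are not part of the connector API. The sketch also does not check the other requirements, such as producers setting unique correlation IDs.

```java
/** Hypothetical summary of the modes described in this section. */
enum DeliveryGuarantee { EXACTLY_ONCE, AT_LEAST_ONCE, NONE }

class GuaranteeSelector {

    /** Maps the two settings to the processing mode they select. */
    static DeliveryGuarantee resolve(boolean checkpointingEnabled, boolean usesCorrelationId) {
        if (!checkpointingEnabled) {
            // Without checkpointing the source auto-acknowledges, whatever the ID setting.
            return DeliveryGuarantee.NONE;
        }
        return usesCorrelationId
                ? DeliveryGuarantee.EXACTLY_ONCE
                : DeliveryGuarantee.AT_LEAST_ONCE;
    }

    public static void main(String[] args) {
        // Print the full decision table for both switches.
        for (boolean checkpointing : new boolean[] {true, false}) {
            for (boolean correlationIds : new boolean[] {true, false}) {
                System.out.printf("checkpointing=%b, usesCorrelationId=%b -> %s%n",
                        checkpointing, correlationIds, resolve(checkpointing, correlationIds));
            }
        }
    }
}
```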

## Queue Configuration

### Default Queue Setup

By default, `setupQueue()` declares a durable, non-exclusive, non-auto-delete queue:

```java
channel.queueDeclare(queueName, true, false, false, null);
```

### Custom Queue Setup

Override `setupQueue()` for custom queue configuration:

```java
@Override
protected void setupQueue() throws IOException {
    Map<String, Object> args = new HashMap<>();
    args.put("x-message-ttl", 60000); // 60 second TTL
    args.put("x-max-length", 1000); // Max 1000 messages

    // Declare queue with custom arguments
    channel.queueDeclare(queueName, true, false, false, args);

    // Bind to exchange with routing key
    channel.queueBind(queueName, "my-exchange", "routing.key");
}
```

## Error Handling

### Connection Failures

If the connection cannot be established, `open()` throws a `RuntimeException` with a descriptive message:

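```java
// open() is normally invoked by the Flink runtime; a direct call like this is
// mainly useful in tests. source, config, and logger are assumed to exist.
try {
    source.open(config);
} catch (RuntimeException e) {
    // Handle connection failures; pass the exception so the stack trace is logged
    logger.error("Failed to connect to RabbitMQ", e);
}
```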

### Message Processing Failures

- **Exactly-once mode**: A failed acknowledgment during checkpointing throws a `RuntimeException`
- **Correlation ID violations**: A message without a correlation ID causes an exception when `usesCorrelationId` is `true`
- **Deserialization failures**: Handled by the provided `DeserializationSchema`

### Recovery Behavior

When automatic recovery is enabled in `RMQConnectionConfig`:
- Network failures trigger automatic reconnection
- Topology recovery re-declares queues and bindings
- The recovery interval controls the delay between attempts
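
The recovery settings listed above are configured on the connection config builder. The fragment below is a sketch that assumes your connector version's `RMQConnectionConfig.Builder` exposes `setAutomaticRecovery`, `setTopologyRecoveryEnabled`, and `setNetworkRecoveryInterval`. The host, credentials, and the 5000 ms interval are placeholder values.

```java
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;

// Connection config with automatic recovery enabled (placeholder values)
RMQConnectionConfig recoveringConfig = new RMQConnectionConfig.Builder()
    .setHost("localhost")
    .setPort(5672)
    .setVirtualHost("/")
    .setUserName("guest")
    .setPassword("guest")
    .setAutomaticRecovery(true)        // reconnect after network failures
    .setTopologyRecoveryEnabled(true)  // re-declare queues and bindings on recovery
    .setNetworkRecoveryInterval(5000)  // delay between attempts, in milliseconds
    .build();
```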