or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

connection-config.md index.md sink.md source.md

sink.mddocs/

0

# Message Sink

1

2

RabbitMQ sink for publishing messages to queues with configurable error handling, automatic queue setup, and robust connection management.

3

4

## Capabilities

5

6

### RMQ Sink Class

7

8

Main sink class that publishes messages to RabbitMQ queues with configurable error handling behavior.

9

10

```java { .api }

11

/**

12

* A Sink for publishing data into RabbitMQ

13

*

14

* @param <IN> The type of the data to be published to RabbitMQ

15

*/

16

public class RMQSink<IN> extends RichSinkFunction<IN> {

17

18

/**

19

* Creates a new RabbitMQ sink for publishing messages to a queue

20

*

21

* @param rmqConnectionConfig The RabbitMQ connection configuration

22

* @param queueName The queue to publish messages to

23

* @param schema A SerializationSchema for turning Java objects into bytes

24

*/

25

public RMQSink(RMQConnectionConfig rmqConnectionConfig,

26

String queueName,

27

SerializationSchema<IN> schema);

28

29

/** Initializes the RabbitMQ connection and channel, and sets up the queue */

30

public void open(Configuration config) throws Exception;

31

32

/**

33

* Called when new data arrives to the sink, and forwards it to RMQ

34

*

35

* @param value The incoming data to publish

36

*/

37

public void invoke(IN value);

38

39

/** Closes the RabbitMQ connection and channel */

40

public void close();

41

42

/**

43

* Defines whether the producer should fail on errors, or only log them.

44

* If set to true, exceptions will only be logged.

45

* If set to false, exceptions will be thrown and cause the streaming program to fail.

46

*

47

* @param logFailuresOnly The flag to indicate logging-only on exceptions

48

*/

49

public void setLogFailuresOnly(boolean logFailuresOnly);

50

}

51

```

52

53

### Customization Methods

54

55

Protected methods that can be overridden for custom queue setup.

56

57

```java { .api }

58

/**

59

* Protected methods for customizing RMQ sink behavior

60

*/

61

public class RMQSink<IN> {

62

63

/**

64

* Sets up the queue. The default implementation just declares the queue.

65

* Override this method for custom queue setup (e.g. binding to an exchange

66

* or defining custom queue parameters).

67

*/

68

protected void setupQueue() throws IOException;

69

}

70

```

71

72

**Usage Examples:**

73

74

```java

75

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

76

import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;

77

import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;

78

import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

79

80

// Basic sink configuration

81

RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()

82

.setHost("localhost")

83

.setPort(5672)

84

.setVirtualHost("/")

85

.setUserName("guest")

86

.setPassword("guest")

87

.build();

88

89

RMQSink<String> basicSink = new RMQSink<>(

90

connectionConfig,

91

"output-queue",

92

new SimpleStringSchema()

93

);

94

95

// Sink with error logging (doesn't fail on publish errors)

96

RMQSink<String> resilientSink = new RMQSink<>(

97

connectionConfig,

98

"output-queue",

99

new SimpleStringSchema()

100

);

101

resilientSink.setLogFailuresOnly(true);

102

103

// Custom sink with exchange publishing

104

class CustomRMQSink extends RMQSink<String> {

105

private final String exchangeName;

106

private final String routingKey;

107

108

public CustomRMQSink(RMQConnectionConfig config, String exchange, String routing) {

109

super(config, "", new SimpleStringSchema()); // queue name not used

110

this.exchangeName = exchange;

111

this.routingKey = routing;

112

}

113

114

@Override

115

protected void setupQueue() throws IOException {

116

// Declare exchange instead of queue

117

channel.exchangeDeclare(exchangeName, "topic", true);

118

}

119

120

@Override

121

public void invoke(String value) {

122

try {

123

byte[] msg = schema.serialize(value);

124

// Publish to exchange with routing key instead of queue

125

channel.basicPublish(exchangeName, routingKey, null, msg);

126

} catch (IOException e) {

127

if (logFailuresOnly) {

128

LOG.error("Cannot send message to exchange {}", exchangeName, e);

129

} else {

130

throw new RuntimeException("Cannot send message to exchange " + exchangeName, e);

131

}

132

}

133

}

134

}

135

136

// Use in Flink pipeline

137

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

138

139

env.fromElements("Hello", "World", "from", "Flink")

140

.map(s -> s.toUpperCase())

141

.addSink(basicSink);

142

143

env.execute("RabbitMQ Sink Example");

144

```

145

146

## Publishing Behavior

147

148

### Default Publishing

149

150

By default, messages are published to the specified queue using the default exchange:

151

152

```java

153

channel.basicPublish("", queueName, null, serializedMessage);

154

```

155

156

- **Exchange**: "" (default exchange)

157

- **Routing Key**: Queue name

158

- **Properties**: null (no special message properties)

159

- **Body**: Serialized message bytes

160

161

### Custom Publishing

162

163

Override the `invoke()` method for custom publishing behavior:

164

165

```java

166

@Override

167

public void invoke(MyDataType value) {

168

try {

169

byte[] msg = schema.serialize(value);

170

171

// Custom message properties

172

AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()

173

.contentType("application/json")

174

.deliveryMode(2) // persistent

175

.timestamp(new Date())

176

.build();

177

178

// Publish to specific exchange with routing key

179

channel.basicPublish("my-exchange", "routing.key", props, msg);

180

181

} catch (IOException e) {

182

handlePublishError(e);

183

}

184

}

185

```

186

187

## Queue Configuration

188

189

### Default Queue Setup

190

191

By default, `setupQueue()` declares a non-durable, non-exclusive, non-auto-delete queue:

192

193

```java

194

channel.queueDeclare(queueName, false, false, false, null);

195

```

196

197

### Custom Queue Setup

198

199

Override `setupQueue()` for custom queue configuration:

200

201

```java

202

@Override

203

protected void setupQueue() throws IOException {

204

Map<String, Object> args = new HashMap<>();

205

args.put("x-message-ttl", 300000); // 5 minute TTL

206

args.put("x-max-length", 10000); // Max 10,000 messages

207

args.put("x-overflow", "reject-publish"); // Reject when full

208

209

// Declare durable queue with custom arguments

210

channel.queueDeclare(queueName, true, false, false, args);

211

212

// Bind queue to exchange

213

channel.queueBind(queueName, "events-exchange", "output.*");

214

}

215

```

216

217

### Exchange Setup

218

219

For exchange-based publishing, override `setupQueue()` to declare exchanges:

220

221

```java

222

@Override

223

protected void setupQueue() throws IOException {

224

// Declare topic exchange

225

channel.exchangeDeclare("events-exchange", "topic", true);

226

227

// Optionally declare queues bound to the exchange

228

channel.queueDeclare("event-queue", true, false, false, null);

229

channel.queueBind("event-queue", "events-exchange", "event.*");

230

}

231

```

232

233

## Error Handling

234

235

### Configuration Options

236

237

The sink supports two error handling modes via `setLogFailuresOnly()`:

238

239

#### Fail-Fast Mode (default)

240

241

```java

242

RMQSink<String> sink = new RMQSink<>(config, "queue", schema);

243

sink.setLogFailuresOnly(false); // default behavior

244

245

// Publishing errors throw RuntimeException, causing job failure

246

```

247

248

#### Resilient Mode

249

250

```java

251

RMQSink<String> sink = new RMQSink<>(config, "queue", schema);

252

sink.setLogFailuresOnly(true);

253

254

// Publishing errors are logged but don't cause job failure

255

```

256

257

### Error Types

258

259

#### Connection Errors

260

261

An exception is thrown from `open()` when connection setup fails; the example below catches it as a `RuntimeException`:

262

263

```java

264

try {

265

sink.open(config);

266

} catch (RuntimeException e) {

267

// Handle connection setup failures

268

logger.error("Failed to connect to RabbitMQ: " + e.getMessage());

269

}

270

```

271

272

#### Publishing Errors

273

274

Thrown during `invoke()` when message publishing fails:

275

276

- **Network issues**: Connection lost during publish

277

- **Queue full**: When queue reaches maximum capacity

278

- **Authentication**: Invalid credentials or permissions

279

- **Serialization**: Schema serialization failures

280

281

```java

282

// Error handling in custom sink

283

@Override

284

public void invoke(MyType value) {

285

try {

286

byte[] msg = schema.serialize(value);

287

channel.basicPublish("", queueName, null, msg);

288

} catch (IOException e) {

289

if (logFailuresOnly) {

290

LOG.error("Failed to publish message", e);

291

// Continue processing other messages

292

} else {

293

throw new RuntimeException("Publishing failed", e);

294

// Job will restart from last checkpoint

295

}

296

}

297

}

298

```

299

300

#### Connection Cleanup Errors

301

302

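A `RuntimeException` is thrown from `close()` when connection cleanup fails. The implementation always attempts to close both the channel and the connection. If only one of them fails, that exception becomes the cause of the `RuntimeException`; if both fail, the channel exception is logged as a warning and the connection exception becomes the cause: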

303

304

```java

305

@Override

306

public void close() {

307

IOException t = null;

308

try {

309

channel.close();

310

} catch (IOException e) {

311

t = e;

312

}

313

314

try {

315

connection.close();

316

} catch (IOException e) {

317

if(t != null) {

318

LOG.warn("Both channel and connection closing failed. Logging channel exception and failing with connection exception", t);

319

}

320

t = e;

321

}

322

if(t != null) {

323

throw new RuntimeException("Error while closing RMQ connection with " + queueName

324

+ " at " + rmqConnectionConfig.getHost(), t);

325

}

326

}

327

```

328

329

## Performance Considerations

330

331

### Batching

332

333

For high-throughput scenarios, consider implementing custom batching:

334

335

```java

336

class BatchingRMQSink extends RMQSink<String> {

337

private final List<String> batch = new ArrayList<>();

338

private final int batchSize = 100;

339

340

@Override

341

public void invoke(String value) {

342

synchronized (batch) {

343

batch.add(value);

344

if (batch.size() >= batchSize) {

345

flushBatch();

346

}

347

}

348

}

349

350

private void flushBatch() {

351

// Publish each buffered message individually via the parent sink, then clear the buffer

352

for (String item : batch) {

353

super.invoke(item);

354

}

355

batch.clear();

356

}

357

}

358

```

359

360

### Connection Pooling

361

362

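For multiple sinks, consider sharing a single connection configuration (this shares settings only; each sink still initializes its own connection and channel in `open()`):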

363

364

```java

365

// Shared connection config

366

RMQConnectionConfig sharedConfig = new RMQConnectionConfig.Builder()

367

.setHost("rabbitmq-cluster")

368

.setAutomaticRecovery(true)

369

.setNetworkRecoveryInterval(5000)

370

.build();

371

372

// Multiple sinks with shared config

373

RMQSink<String> sink1 = new RMQSink<>(sharedConfig, "queue1", schema);

374

RMQSink<String> sink2 = new RMQSink<>(sharedConfig, "queue2", schema);

375

```