or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration-properties.md · connection-configuration.md · error-handling.md · index.md · listener-configuration.md · message-operations.md · queue-exchange-management.md · stream-processing.md

stream-processing.mddocs/

0

# Stream Processing Support

1

2

Configuration and usage of RabbitMQ Streams for high-throughput message processing scenarios, including stream templates, listeners, and advanced stream features.

3

4

## RabbitMQ Streams Overview

5

6

RabbitMQ Streams provide a high-throughput, persistent messaging solution designed for scenarios requiring:

7

- High message throughput (millions of messages per second)

8

- Message replay capability

9

- Persistent message storage

10

- Offset-based consumption

11

12

## Basic Stream Configuration

13

14

### Stream Properties Configuration

15

16

```yaml { .api }

17

# Stream plugin connection settings. Every key below is nested under
# spring.rabbitmq.stream; the indentation is significant.
spring:
  rabbitmq:
    stream:
      name: my-stream    # Default stream name
      host: localhost    # Stream host (default: localhost)
      port: 5552         # Stream port (default: 5552)
      username: guest    # Stream username (uses spring.rabbitmq.username if not set)
      password: guest    # Stream password (uses spring.rabbitmq.password if not set)

25

```

26

27

### Stream Auto-Configuration

28

29

```java { .api }

30

@Configuration

31

@EnableRabbitStreaming

32

public class StreamConfig {

33

34

@Bean

35

public Environment streamEnvironment() {

36

return Environment.builder()

37

.host("localhost")

38

.port(5552)

39

.username("guest")

40

.password("guest")

41

.build();

42

}

43

44

@Bean

45

public RabbitStreamTemplate streamTemplate(Environment environment) {

46

return new RabbitStreamTemplate(environment, "my-stream");

47

}

48

}

49

```

50

51

## Stream Template Operations

52

53

### RabbitStreamTemplate Usage

54

55

```java { .api }

56

@Service

57

public class StreamPublisher {

58

59

@Autowired

60

private RabbitStreamTemplate streamTemplate;

61

62

// Basic message publishing

63

public void publishMessage(String message) {

64

streamTemplate.convertAndSend(message);

65

}

66

67

// Publish with custom properties

68

public void publishWithProperties(Object message) {

69

streamTemplate.convertAndSend(message, messageBuilder -> {

70

return messageBuilder

71

.properties()

72

.messageId("msg-" + UUID.randomUUID())

73

.correlationId("corr-" + System.currentTimeMillis())

74

.timestamp(new Date())

75

.messageBuilder();

76

});

77

}

78

79

// Batch publishing for high throughput

80

public void publishBatch(List<String> messages) {

81

List<com.rabbitmq.stream.Message> streamMessages = messages.stream()

82

.map(msg -> streamTemplate.messageBuilder().addData(msg.getBytes()).build())

83

.collect(Collectors.toList());

84

85

streamTemplate.send(streamMessages);

86

}

87

88

// Publish with confirmation

89

public void publishWithConfirmation(String message) {

90

ConfirmationHandler confirmationHandler = confirmationStatus -> {

91

if (confirmationStatus.isConfirmed()) {

92

log.info("Message confirmed");

93

} else {

94

log.error("Message not confirmed: {}", confirmationStatus.getCode());

95

}

96

};

97

98

streamTemplate.convertAndSend(message, confirmationHandler);

99

}

100

}

101

```

102

103

### Stream Template Configuration

104

105

```java { .api }

106

@Configuration

107

public class StreamTemplateConfig {

108

109

@Bean

110

public RabbitStreamTemplate customStreamTemplate(Environment environment) {

111

RabbitStreamTemplate template = new RabbitStreamTemplate(environment, "custom-stream");

112

113

// Configure message converter

114

template.setStreamMessageConverter(new SimpleStreamMessageConverter());

115

116

// Configure producer properties

117

template.setProducerCustomizer(builder -> {

118

builder.name("custom-producer")

119

.maxUnconfirmedMessages(1000)

120

.confirmTimeout(Duration.ofSeconds(5))

121

.batchSize(100)

122

.batchPublishingDelay(Duration.ofMillis(100));

123

});

124

125

return template;

126

}

127

128

@Bean

129

public ProducerCustomizer producerCustomizer() {

130

return builder -> {

131

builder.maxUnconfirmedMessages(10000)

132

.confirmTimeout(Duration.ofSeconds(10))

133

.batchSize(1000);

134

};

135

}

136

}

137

```

138

139

## Stream Listeners

140

141

### Basic Stream Listeners

142

143

```java { .api }

144

@Component

145

public class StreamListeners {

146

147

@RabbitStreamListener(id = "stream-listener-1", queues = "my-stream")

148

public void handleStreamMessage(String message) {

149

log.info("Received stream message: {}", message);

150

processMessage(message);

151

}

152

153

@RabbitStreamListener(id = "stream-listener-2", queues = "my-stream",

154

consumerCustomizer = "customConsumerCustomizer")

155

public void handleWithCustomConsumer(String message, Context context) {

156

log.info("Received message at offset {}: {}", context.offset(), message);

157

// Process message with offset information

158

}

159

160

@RabbitStreamListener(id = "native-listener", queues = "my-stream")

161

public void handleNativeMessage(com.rabbitmq.stream.Message message, Context context) {

162

// Handle native stream message

163

byte[] data = message.getBodyAsBinary();

164

long offset = context.offset();

165

long timestamp = context.timestamp();

166

167

log.info("Native message at offset {} with timestamp {}: {}",

168

offset, timestamp, new String(data));

169

}

170

}

171

```

172

173

### Consumer Customization

174

175

```java { .api }

176

@Configuration

177

public class StreamConsumerConfig {

178

179

@Bean("customConsumerCustomizer")

180

public ConsumerCustomizer consumerCustomizer() {

181

return builder -> {

182

builder.name("custom-consumer")

183

.offset(OffsetSpecification.first())

184

.manualTracker()

185

.autoTracker(); // For automatic offset tracking

186

};

187

}

188

189

@Bean("offsetConsumerCustomizer")

190

public ConsumerCustomizer offsetConsumerCustomizer() {

191

return builder -> {

192

// Start from specific offset

193

builder.offset(OffsetSpecification.offset(1000L));

194

};

195

}

196

197

@Bean("timestampConsumerCustomizer")

198

public ConsumerCustomizer timestampConsumerCustomizer() {

199

return builder -> {

200

// Start from specific timestamp

201

Date startTime = Date.from(Instant.now().minus(1, ChronoUnit.HOURS));

202

builder.offset(OffsetSpecification.timestamp(startTime));

203

};

204

}

205

206

@Bean("lastConsumedConsumerCustomizer")

207

public ConsumerCustomizer lastConsumedConsumerCustomizer() {

208

return builder -> {

209

// Resume from last consumed offset

210

builder.offset(OffsetSpecification.next());

211

};

212

}

213

}

214

```

215

216

### Advanced Stream Listeners

217

218

```java { .api }

219

@Component

220

public class AdvancedStreamListeners {

221

222

@RabbitStreamListener(id = "batch-listener", queues = "batch-stream")

223

public void handleBatch(@Payload List<String> messages,

224

@Header(AmqpHeaders.BATCH_SIZE) int batchSize) {

225

log.info("Received batch of {} messages", batchSize);

226

messages.forEach(this::processMessage);

227

}

228

229

@RabbitStreamListener(id = "filtered-listener", queues = "filtered-stream")

230

public void handleFiltered(String message,

231

@Header("messageType") String messageType,

232

Context context) {

233

if ("IMPORTANT".equals(messageType)) {

234

processImportantMessage(message, context.offset());

235

}

236

}

237

238

@RabbitStreamListener(id = "manual-ack-listener", queues = "manual-stream")

239

public void handleWithManualAck(String message, Context context) {

240

try {

241

processMessage(message);

242

// Manual offset tracking

243

context.storeOffset();

244

} catch (Exception e) {

245

log.error("Failed to process message at offset {}: {}",

246

context.offset(), message, e);

247

// Don't store offset - message will be reprocessed

248

}

249

}

250

}

251

```

252

253

## Stream Management

254

255

### Stream Declaration

256

257

```java { .api }

258

@Configuration

259

public class StreamDeclarationConfig {

260

261

@Bean

262

public Declarables streamTopology() {

263

return new Declarables(

264

new Stream("user-events", 100_000), // Max age in seconds

265

new Stream("order-events", Duration.ofHours(24)), // Max age as duration

266

new Stream("analytics-stream", ByteCapacity.GB(10)) // Max size

267

);

268

}

269

270

@Bean

271

public Stream customStream() {

272

Map<String, Object> arguments = new HashMap<>();

273

arguments.put("max-length-bytes", 1_000_000_000L); // 1GB

274

arguments.put("max-age", "24h");

275

arguments.put("stream-max-segment-size-bytes", 500_000_000L); // 500MB segments

276

277

return new Stream("custom-stream", arguments);

278

}

279

}

280

```

281

282

### Programmatic Stream Management

283

284

```java { .api }

285

@Service

286

public class StreamManagementService {

287

288

@Autowired

289

private Environment streamEnvironment;

290

291

public void createStream(String streamName, long maxAge) {

292

try {

293

streamEnvironment.streamCreator()

294

.stream(streamName)

295

.maxAge(Duration.ofSeconds(maxAge))

296

.create();

297

log.info("Stream '{}' created with max age {} seconds", streamName, maxAge);

298

} catch (StreamException e) {

299

if (e.getCode() == Constants.RESPONSE_CODE_STREAM_ALREADY_EXISTS) {

300

log.info("Stream '{}' already exists", streamName);

301

} else {

302

throw new RuntimeException("Failed to create stream: " + streamName, e);

303

}

304

}

305

}

306

307

public void deleteStream(String streamName) {

308

try {

309

streamEnvironment.deleteStream(streamName);

310

log.info("Stream '{}' deleted", streamName);

311

} catch (StreamException e) {

312

log.error("Failed to delete stream: {}", streamName, e);

313

throw new RuntimeException("Failed to delete stream: " + streamName, e);

314

}

315

}

316

317

public StreamStats getStreamStats(String streamName) {

318

try {

319

return streamEnvironment.queryStreamStats(streamName);

320

} catch (StreamException e) {

321

log.error("Failed to get stats for stream: {}", streamName, e);

322

throw new RuntimeException("Failed to get stream stats: " + streamName, e);

323

}

324

}

325

326

public void createStreamWithReplicas(String streamName, int replicas) {

327

streamEnvironment.streamCreator()

328

.stream(streamName)

329

.maxAge(Duration.ofDays(7))

330

.maxLengthBytes(ByteCapacity.GB(1))

331

.leaderLocator(LeaderLocator.leastLeaders())

332

.create();

333

}

334

}

335

```

336

337

## Super Streams (Partitioned Streams)

338

339

### Super Stream Configuration

340

341

```java { .api }

342

@Configuration

343

public class SuperStreamConfig {

344

345

@Bean

346

public Declarables superStreamTopology() {

347

return new Declarables(

348

new SuperStream("partitioned-events", 3), // 3 partitions

349

new SuperStream("user-activities", 5) // 5 partitions

350

);

351

}

352

353

@Bean

354

public RabbitStreamTemplate superStreamTemplate(Environment environment) {

355

RabbitStreamTemplate template = new RabbitStreamTemplate(environment, "partitioned-events");

356

357

// Configure routing strategy for partitioning

358

template.setSuperStreamRoutingStrategy(message -> {

359

// Route based on user ID for even distribution

360

String userId = (String) message.getApplicationProperties().get("userId");

361

return userId != null ? userId : "default";

362

});

363

364

return template;

365

}

366

}

367

```

368

369

### Super Stream Publishing

370

371

```java { .api }

372

@Service

373

public class SuperStreamPublisher {

374

375

@Autowired

376

private RabbitStreamTemplate superStreamTemplate;

377

378

public void publishToPartition(String userId, String eventData) {

379

superStreamTemplate.convertAndSend(eventData, messageBuilder -> {

380

return messageBuilder

381

.applicationProperties()

382

.entry("userId", userId)

383

.messageBuilder();

384

});

385

}

386

387

public void publishWithCustomRouting(String message, String routingKey) {

388

superStreamTemplate.send(

389

superStreamTemplate.messageBuilder()

390

.addData(message.getBytes())

391

.applicationProperties()

392

.entry("routingKey", routingKey)

393

.messageBuilder(),

394

routingKey // Explicit routing key

395

);

396

}

397

}

398

```

399

400

### Super Stream Consumers

401

402

```java { .api }

403

@Component

404

public class SuperStreamConsumers {

405

406

@RabbitStreamListener(id = "super-stream-consumer", queues = "partitioned-events")

407

public void handlePartitionedMessage(String message, Context context) {

408

String partition = context.stream(); // Get partition name

409

log.info("Received message from partition {}: {}", partition, message);

410

}

411

412

@RabbitStreamListener(id = "single-active-consumer",

413

queues = "partitioned-events",

414

consumerCustomizer = "singleActiveConsumerCustomizer")

415

public void handleWithSingleActiveConsumer(String message) {

416

// Only one consumer will be active per partition

417

log.info("Single active consumer received: {}", message);

418

}

419

}

420

421

@Configuration

422

public class SuperStreamConsumerConfig {

423

424

@Bean("singleActiveConsumerCustomizer")

425

public ConsumerCustomizer singleActiveConsumerCustomizer() {

426

return builder -> {

427

builder.singleActiveConsumer()

428

.consumerUpdate(updateListener -> {

429

// Handle consumer updates (active/inactive)

430

log.info("Consumer update: active = {}", updateListener.isActive());

431

});

432

};

433

}

434

}

435

```

436

437

## Performance Optimization

438

439

### High-Throughput Configuration

440

441

```java { .api }

442

@Configuration

443

public class HighThroughputStreamConfig {

444

445

@Bean

446

public Environment highThroughputEnvironment() {

447

return Environment.builder()

448

.host("localhost")

449

.port(5552)

450

.requestedMaxFrameSize(1048576) // 1MB frame size

451

.requestedHeartbeat(Duration.ofSeconds(10))

452

.build();

453

}

454

455

@Bean

456

public RabbitStreamTemplate highThroughputTemplate(Environment environment) {

457

RabbitStreamTemplate template = new RabbitStreamTemplate(environment, "high-throughput-stream");

458

459

template.setProducerCustomizer(builder -> {

460

builder.maxUnconfirmedMessages(20000)

461

.confirmTimeout(Duration.ofSeconds(30))

462

.batchSize(1000)

463

.batchPublishingDelay(Duration.ofMillis(10))

464

.compression(Compression.GZIP);

465

});

466

467

return template;

468

}

469

470

@Bean("highThroughputConsumerCustomizer")

471

public ConsumerCustomizer highThroughputConsumerCustomizer() {

472

return builder -> {

473

builder.offset(OffsetSpecification.next())

474

.creditOnProcessedMessageCount(1000) // Credit every 1000 messages

475

.initialCredits(10000); // Initial credit window

476

};

477

}

478

}

479

```

480

481

### Batch Processing Configuration

482

483

```java { .api }

484

@Configuration

485

public class BatchStreamConfig {

486

487

@Bean

488

public SimpleRabbitListenerContainerFactory streamBatchContainerFactory(

489

Environment environment) {

490

491

SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();

492

factory.setNativeListener(true);

493

factory.setBatchListener(true);

494

factory.setBatchSize(100);

495

factory.setConsumerBatchEnabled(true);

496

497

return factory;

498

}

499

}

500

501

@Component

502

public class BatchStreamProcessor {

503

504

@RabbitStreamListener(id = "batch-processor",

505

queues = "batch-stream",

506

containerFactory = "streamBatchContainerFactory")

507

public void processBatch(List<com.rabbitmq.stream.Message> messages,

508

@Header(AmqpHeaders.BATCH_SIZE) int batchSize) {

509

510

log.info("Processing batch of {} messages", batchSize);

511

512

List<ProcessedMessage> processedMessages = messages.stream()

513

.map(this::processStreamMessage)

514

.collect(Collectors.toList());

515

516

// Bulk process

517

bulkProcess(processedMessages);

518

}

519

520

private ProcessedMessage processStreamMessage(com.rabbitmq.stream.Message message) {

521

// Convert and process stream message

522

return new ProcessedMessage(new String(message.getBodyAsBinary()));

523

}

524

525

private void bulkProcess(List<ProcessedMessage> messages) {

526

// Bulk processing logic

527

}

528

}

529

```

530

531

## Stream Monitoring and Management

532

533

### Stream Health and Metrics

534

535

```java { .api }

536

@Component

537

public class StreamMonitoring {

538

539

@Autowired

540

private Environment streamEnvironment;

541

542

@Autowired

543

private MeterRegistry meterRegistry;

544

545

@EventListener

546

public void onStreamStats(StreamStatsEvent event) {

547

StreamStats stats = event.getStats();

548

String streamName = event.getStreamName();

549

550

// Register metrics

551

Gauge.builder("stream.committed.offset")

552

.description("Stream committed offset")

553

.register(meterRegistry, stats, s -> s.getCommittedChunkId());

554

555

Gauge.builder("stream.first.offset")

556

.description("Stream first offset")

557

.register(meterRegistry, stats, s -> s.getFirstOffset());

558

559

Gauge.builder("stream.last.offset")

560

.description("Stream last offset")

561

.register(meterRegistry, stats, s -> s.getLastOffset());

562

}

563

564

@Scheduled(fixedRate = 30000) // Every 30 seconds

565

public void collectStreamMetrics() {

566

List<String> streams = getActiveStreams();

567

568

for (String streamName : streams) {

569

try {

570

StreamStats stats = streamEnvironment.queryStreamStats(streamName);

571

publishStreamMetrics(streamName, stats);

572

} catch (Exception e) {

573

log.error("Failed to collect metrics for stream: {}", streamName, e);

574

}

575

}

576

}

577

578

private List<String> getActiveStreams() {

579

// Return list of active streams to monitor

580

return Arrays.asList("user-events", "order-events", "analytics-stream");

581

}

582

583

private void publishStreamMetrics(String streamName, StreamStats stats) {

584

// Publish metrics to monitoring system

585

meterRegistry.gauge("stream.size.bytes", Tags.of("stream", streamName), stats.getByteSize());

586

meterRegistry.gauge("stream.message.count", Tags.of("stream", streamName),

587

stats.getLastOffset() - stats.getFirstOffset());

588

}

589

}

590

```

591

592

### Stream Administration

593

594

```java { .api }

595

@RestController

596

@RequestMapping("/admin/streams")

597

public class StreamAdminController {

598

599

@Autowired

600

private StreamManagementService streamManagementService;

601

602

@PostMapping("/{streamName}")

603

public ResponseEntity<String> createStream(@PathVariable String streamName,

604

@RequestParam(defaultValue = "86400") long maxAgeSeconds) {

605

try {

606

streamManagementService.createStream(streamName, maxAgeSeconds);

607

return ResponseEntity.ok("Stream created successfully");

608

} catch (Exception e) {

609

return ResponseEntity.status(500).body("Failed to create stream: " + e.getMessage());

610

}

611

}

612

613

@DeleteMapping("/{streamName}")

614

public ResponseEntity<String> deleteStream(@PathVariable String streamName) {

615

try {

616

streamManagementService.deleteStream(streamName);

617

return ResponseEntity.ok("Stream deleted successfully");

618

} catch (Exception e) {

619

return ResponseEntity.status(500).body("Failed to delete stream: " + e.getMessage());

620

}

621

}

622

623

@GetMapping("/{streamName}/stats")

624

public ResponseEntity<StreamStatsResponse> getStreamStats(@PathVariable String streamName) {

625

try {

626

StreamStats stats = streamManagementService.getStreamStats(streamName);

627

StreamStatsResponse response = new StreamStatsResponse(stats);

628

return ResponseEntity.ok(response);

629

} catch (Exception e) {

630

return ResponseEntity.status(500).build();

631

}

632

}

633

}

634

```