0
# Stream Processing Support
1
2
This guide covers the configuration and usage of RabbitMQ Streams for high-throughput message processing, including stream templates, listeners, and advanced stream features.
3
4
## RabbitMQ Streams Overview
5
6
RabbitMQ Streams provide a high-throughput, persistent messaging solution designed for scenarios requiring:
7
- High message throughput (millions of messages per second)
8
- Message replay capability
9
- Persistent message storage
10
- Offset-based consumption
11
12
## Basic Stream Configuration
13
14
### Stream Properties Configuration
15
16
```yaml { .api }
17
# Stream connection settings; every key below lives under spring.rabbitmq.stream.
spring:
  rabbitmq:
    stream:
      name: my-stream     # Default stream name
      host: localhost     # Stream host (default: localhost)
      port: 5552          # Stream port (default: 5552)
      username: guest     # Stream username (uses rabbitmq.username if not set)
      password: guest     # Stream password (uses rabbitmq.password if not set)
25
```
26
27
### Stream Auto-Configuration
28
29
```java { .api }
30
@Configuration
31
@EnableRabbitStreaming
32
public class StreamConfig {
33
34
@Bean
35
public Environment streamEnvironment() {
36
return Environment.builder()
37
.host("localhost")
38
.port(5552)
39
.username("guest")
40
.password("guest")
41
.build();
42
}
43
44
@Bean
45
public RabbitStreamTemplate streamTemplate(Environment environment) {
46
return new RabbitStreamTemplate(environment, "my-stream");
47
}
48
}
49
```
50
51
## Stream Template Operations
52
53
### RabbitStreamTemplate Usage
54
55
```java { .api }
56
@Service
57
public class StreamPublisher {
58
59
@Autowired
60
private RabbitStreamTemplate streamTemplate;
61
62
// Basic message publishing
63
public void publishMessage(String message) {
64
streamTemplate.convertAndSend(message);
65
}
66
67
// Publish with custom properties
68
public void publishWithProperties(Object message) {
69
streamTemplate.convertAndSend(message, messageBuilder -> {
70
return messageBuilder
71
.properties()
72
.messageId("msg-" + UUID.randomUUID())
73
.correlationId("corr-" + System.currentTimeMillis())
74
.timestamp(new Date())
75
.messageBuilder();
76
});
77
}
78
79
// Batch publishing for high throughput
80
public void publishBatch(List<String> messages) {
81
List<com.rabbitmq.stream.Message> streamMessages = messages.stream()
82
.map(msg -> streamTemplate.messageBuilder().addData(msg.getBytes()).build())
83
.collect(Collectors.toList());
84
85
streamTemplate.send(streamMessages);
86
}
87
88
// Publish with confirmation
89
public void publishWithConfirmation(String message) {
90
ConfirmationHandler confirmationHandler = confirmationStatus -> {
91
if (confirmationStatus.isConfirmed()) {
92
log.info("Message confirmed");
93
} else {
94
log.error("Message not confirmed: {}", confirmationStatus.getCode());
95
}
96
};
97
98
streamTemplate.convertAndSend(message, confirmationHandler);
99
}
100
}
101
```
102
103
### Stream Template Configuration
104
105
```java { .api }
106
@Configuration
107
public class StreamTemplateConfig {
108
109
@Bean
110
public RabbitStreamTemplate customStreamTemplate(Environment environment) {
111
RabbitStreamTemplate template = new RabbitStreamTemplate(environment, "custom-stream");
112
113
// Configure message converter
114
template.setStreamMessageConverter(new SimpleStreamMessageConverter());
115
116
// Configure producer properties
117
template.setProducerCustomizer(builder -> {
118
builder.name("custom-producer")
119
.maxUnconfirmedMessages(1000)
120
.confirmTimeout(Duration.ofSeconds(5))
121
.batchSize(100)
122
.batchPublishingDelay(Duration.ofMillis(100));
123
});
124
125
return template;
126
}
127
128
@Bean
129
public ProducerCustomizer producerCustomizer() {
130
return builder -> {
131
builder.maxUnconfirmedMessages(10000)
132
.confirmTimeout(Duration.ofSeconds(10))
133
.batchSize(1000);
134
};
135
}
136
}
137
```
138
139
## Stream Listeners
140
141
### Basic Stream Listeners
142
143
```java { .api }
144
@Component
145
public class StreamListeners {
146
147
@RabbitStreamListener(id = "stream-listener-1", queues = "my-stream")
148
public void handleStreamMessage(String message) {
149
log.info("Received stream message: {}", message);
150
processMessage(message);
151
}
152
153
@RabbitStreamListener(id = "stream-listener-2", queues = "my-stream",
154
consumerCustomizer = "customConsumerCustomizer")
155
public void handleWithCustomConsumer(String message, Context context) {
156
log.info("Received message at offset {}: {}", context.offset(), message);
157
// Process message with offset information
158
}
159
160
@RabbitStreamListener(id = "native-listener", queues = "my-stream")
161
public void handleNativeMessage(com.rabbitmq.stream.Message message, Context context) {
162
// Handle native stream message
163
byte[] data = message.getBodyAsBinary();
164
long offset = context.offset();
165
long timestamp = context.timestamp();
166
167
log.info("Native message at offset {} with timestamp {}: {}",
168
offset, timestamp, new String(data));
169
}
170
}
171
```
172
173
### Consumer Customization
174
175
```java { .api }
176
@Configuration
177
public class StreamConsumerConfig {
178
179
@Bean("customConsumerCustomizer")
180
public ConsumerCustomizer consumerCustomizer() {
181
return builder -> {
182
builder.name("custom-consumer")
183
.offset(OffsetSpecification.first())
184
.manualTracker()
185
.autoTracker(); // For automatic offset tracking
186
};
187
}
188
189
@Bean("offsetConsumerCustomizer")
190
public ConsumerCustomizer offsetConsumerCustomizer() {
191
return builder -> {
192
// Start from specific offset
193
builder.offset(OffsetSpecification.offset(1000L));
194
};
195
}
196
197
@Bean("timestampConsumerCustomizer")
198
public ConsumerCustomizer timestampConsumerCustomizer() {
199
return builder -> {
200
// Start from specific timestamp
201
Date startTime = Date.from(Instant.now().minus(1, ChronoUnit.HOURS));
202
builder.offset(OffsetSpecification.timestamp(startTime));
203
};
204
}
205
206
@Bean("lastConsumedConsumerCustomizer")
207
public ConsumerCustomizer lastConsumedConsumerCustomizer() {
208
return builder -> {
209
// Resume from last consumed offset
210
builder.offset(OffsetSpecification.next());
211
};
212
}
213
}
214
```
215
216
### Advanced Stream Listeners
217
218
```java { .api }
219
@Component
220
public class AdvancedStreamListeners {
221
222
@RabbitStreamListener(id = "batch-listener", queues = "batch-stream")
223
public void handleBatch(@Payload List<String> messages,
224
@Header(AmqpHeaders.BATCH_SIZE) int batchSize) {
225
log.info("Received batch of {} messages", batchSize);
226
messages.forEach(this::processMessage);
227
}
228
229
@RabbitStreamListener(id = "filtered-listener", queues = "filtered-stream")
230
public void handleFiltered(String message,
231
@Header("messageType") String messageType,
232
Context context) {
233
if ("IMPORTANT".equals(messageType)) {
234
processImportantMessage(message, context.offset());
235
}
236
}
237
238
@RabbitStreamListener(id = "manual-ack-listener", queues = "manual-stream")
239
public void handleWithManualAck(String message, Context context) {
240
try {
241
processMessage(message);
242
// Manual offset tracking
243
context.storeOffset();
244
} catch (Exception e) {
245
log.error("Failed to process message at offset {}: {}",
246
context.offset(), message, e);
247
// Don't store offset - message will be reprocessed
248
}
249
}
250
}
251
```
252
253
## Stream Management
254
255
### Stream Declaration
256
257
```java { .api }
258
@Configuration
259
public class StreamDeclarationConfig {
260
261
@Bean
262
public Declarables streamTopology() {
263
return new Declarables(
264
new Stream("user-events", 100_000), // Max age in seconds
265
new Stream("order-events", Duration.ofHours(24)), // Max age as duration
266
new Stream("analytics-stream", ByteCapacity.GB(10)) // Max size
267
);
268
}
269
270
@Bean
271
public Stream customStream() {
272
Map<String, Object> arguments = new HashMap<>();
273
arguments.put("max-length-bytes", 1_000_000_000L); // 1GB
274
arguments.put("max-age", "24h");
275
arguments.put("stream-max-segment-size-bytes", 500_000_000L); // 500MB segments
276
277
return new Stream("custom-stream", arguments);
278
}
279
}
280
```
281
282
### Programmatic Stream Management
283
284
```java { .api }
285
@Service
286
public class StreamManagementService {
287
288
@Autowired
289
private Environment streamEnvironment;
290
291
public void createStream(String streamName, long maxAge) {
292
try {
293
streamEnvironment.streamCreator()
294
.stream(streamName)
295
.maxAge(Duration.ofSeconds(maxAge))
296
.create();
297
log.info("Stream '{}' created with max age {} seconds", streamName, maxAge);
298
} catch (StreamException e) {
299
if (e.getCode() == Constants.RESPONSE_CODE_STREAM_ALREADY_EXISTS) {
300
log.info("Stream '{}' already exists", streamName);
301
} else {
302
throw new RuntimeException("Failed to create stream: " + streamName, e);
303
}
304
}
305
}
306
307
public void deleteStream(String streamName) {
308
try {
309
streamEnvironment.deleteStream(streamName);
310
log.info("Stream '{}' deleted", streamName);
311
} catch (StreamException e) {
312
log.error("Failed to delete stream: {}", streamName, e);
313
throw new RuntimeException("Failed to delete stream: " + streamName, e);
314
}
315
}
316
317
public StreamStats getStreamStats(String streamName) {
318
try {
319
return streamEnvironment.queryStreamStats(streamName);
320
} catch (StreamException e) {
321
log.error("Failed to get stats for stream: {}", streamName, e);
322
throw new RuntimeException("Failed to get stream stats: " + streamName, e);
323
}
324
}
325
326
public void createStreamWithReplicas(String streamName, int replicas) {
327
streamEnvironment.streamCreator()
328
.stream(streamName)
329
.maxAge(Duration.ofDays(7))
330
.maxLengthBytes(ByteCapacity.GB(1))
331
.leaderLocator(LeaderLocator.leastLeaders())
332
.create();
333
}
334
}
335
```
336
337
## Super Streams (Partitioned Streams)
338
339
### Super Stream Configuration
340
341
```java { .api }
342
@Configuration
343
public class SuperStreamConfig {
344
345
@Bean
346
public Declarables superStreamTopology() {
347
return new Declarables(
348
new SuperStream("partitioned-events", 3), // 3 partitions
349
new SuperStream("user-activities", 5) // 5 partitions
350
);
351
}
352
353
@Bean
354
public RabbitStreamTemplate superStreamTemplate(Environment environment) {
355
RabbitStreamTemplate template = new RabbitStreamTemplate(environment, "partitioned-events");
356
357
// Configure routing strategy for partitioning
358
template.setSuperStreamRoutingStrategy(message -> {
359
// Route based on user ID for even distribution
360
String userId = (String) message.getApplicationProperties().get("userId");
361
return userId != null ? userId : "default";
362
});
363
364
return template;
365
}
366
}
367
```
368
369
### Super Stream Publishing
370
371
```java { .api }
372
@Service
373
public class SuperStreamPublisher {
374
375
@Autowired
376
private RabbitStreamTemplate superStreamTemplate;
377
378
public void publishToPartition(String userId, String eventData) {
379
superStreamTemplate.convertAndSend(eventData, messageBuilder -> {
380
return messageBuilder
381
.applicationProperties()
382
.entry("userId", userId)
383
.messageBuilder();
384
});
385
}
386
387
public void publishWithCustomRouting(String message, String routingKey) {
388
superStreamTemplate.send(
389
superStreamTemplate.messageBuilder()
390
.addData(message.getBytes())
391
.applicationProperties()
392
.entry("routingKey", routingKey)
393
.messageBuilder(),
394
routingKey // Explicit routing key
395
);
396
}
397
}
398
```
399
400
### Super Stream Consumers
401
402
```java { .api }
403
@Component
404
public class SuperStreamConsumers {
405
406
@RabbitStreamListener(id = "super-stream-consumer", queues = "partitioned-events")
407
public void handlePartitionedMessage(String message, Context context) {
408
String partition = context.stream(); // Get partition name
409
log.info("Received message from partition {}: {}", partition, message);
410
}
411
412
@RabbitStreamListener(id = "single-active-consumer",
413
queues = "partitioned-events",
414
consumerCustomizer = "singleActiveConsumerCustomizer")
415
public void handleWithSingleActiveConsumer(String message) {
416
// Only one consumer will be active per partition
417
log.info("Single active consumer received: {}", message);
418
}
419
}
420
421
@Configuration
422
public class SuperStreamConsumerConfig {
423
424
@Bean("singleActiveConsumerCustomizer")
425
public ConsumerCustomizer singleActiveConsumerCustomizer() {
426
return builder -> {
427
builder.singleActiveConsumer()
428
.consumerUpdate(updateListener -> {
429
// Handle consumer updates (active/inactive)
430
log.info("Consumer update: active = {}", updateListener.isActive());
431
});
432
};
433
}
434
}
435
```
436
437
## Performance Optimization
438
439
### High-Throughput Configuration
440
441
```java { .api }
442
@Configuration
443
public class HighThroughputStreamConfig {
444
445
@Bean
446
public Environment highThroughputEnvironment() {
447
return Environment.builder()
448
.host("localhost")
449
.port(5552)
450
.requestedMaxFrameSize(1048576) // 1MB frame size
451
.requestedHeartbeat(Duration.ofSeconds(10))
452
.build();
453
}
454
455
@Bean
456
public RabbitStreamTemplate highThroughputTemplate(Environment environment) {
457
RabbitStreamTemplate template = new RabbitStreamTemplate(environment, "high-throughput-stream");
458
459
template.setProducerCustomizer(builder -> {
460
builder.maxUnconfirmedMessages(20000)
461
.confirmTimeout(Duration.ofSeconds(30))
462
.batchSize(1000)
463
.batchPublishingDelay(Duration.ofMillis(10))
464
.compression(Compression.GZIP);
465
});
466
467
return template;
468
}
469
470
@Bean("highThroughputConsumerCustomizer")
471
public ConsumerCustomizer highThroughputConsumerCustomizer() {
472
return builder -> {
473
builder.offset(OffsetSpecification.next())
474
.creditOnProcessedMessageCount(1000) // Credit every 1000 messages
475
.initialCredits(10000); // Initial credit window
476
};
477
}
478
}
479
```
480
481
### Batch Processing Configuration
482
483
```java { .api }
484
@Configuration
485
public class BatchStreamConfig {
486
487
@Bean
488
public SimpleRabbitListenerContainerFactory streamBatchContainerFactory(
489
Environment environment) {
490
491
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
492
factory.setNativeListener(true);
493
factory.setBatchListener(true);
494
factory.setBatchSize(100);
495
factory.setConsumerBatchEnabled(true);
496
497
return factory;
498
}
499
}
500
501
@Component
502
public class BatchStreamProcessor {
503
504
@RabbitStreamListener(id = "batch-processor",
505
queues = "batch-stream",
506
containerFactory = "streamBatchContainerFactory")
507
public void processBatch(List<com.rabbitmq.stream.Message> messages,
508
@Header(AmqpHeaders.BATCH_SIZE) int batchSize) {
509
510
log.info("Processing batch of {} messages", batchSize);
511
512
List<ProcessedMessage> processedMessages = messages.stream()
513
.map(this::processStreamMessage)
514
.collect(Collectors.toList());
515
516
// Bulk process
517
bulkProcess(processedMessages);
518
}
519
520
private ProcessedMessage processStreamMessage(com.rabbitmq.stream.Message message) {
521
// Convert and process stream message
522
return new ProcessedMessage(new String(message.getBodyAsBinary()));
523
}
524
525
private void bulkProcess(List<ProcessedMessage> messages) {
526
// Bulk processing logic
527
}
528
}
529
```
530
531
## Stream Monitoring and Management
532
533
### Stream Health and Metrics
534
535
```java { .api }
536
@Component
537
public class StreamMonitoring {
538
539
@Autowired
540
private Environment streamEnvironment;
541
542
@Autowired
543
private MeterRegistry meterRegistry;
544
545
@EventListener
546
public void onStreamStats(StreamStatsEvent event) {
547
StreamStats stats = event.getStats();
548
String streamName = event.getStreamName();
549
550
// Register metrics
551
Gauge.builder("stream.committed.offset")
552
.description("Stream committed offset")
553
.register(meterRegistry, stats, s -> s.getCommittedChunkId());
554
555
Gauge.builder("stream.first.offset")
556
.description("Stream first offset")
557
.register(meterRegistry, stats, s -> s.getFirstOffset());
558
559
Gauge.builder("stream.last.offset")
560
.description("Stream last offset")
561
.register(meterRegistry, stats, s -> s.getLastOffset());
562
}
563
564
@Scheduled(fixedRate = 30000) // Every 30 seconds
565
public void collectStreamMetrics() {
566
List<String> streams = getActiveStreams();
567
568
for (String streamName : streams) {
569
try {
570
StreamStats stats = streamEnvironment.queryStreamStats(streamName);
571
publishStreamMetrics(streamName, stats);
572
} catch (Exception e) {
573
log.error("Failed to collect metrics for stream: {}", streamName, e);
574
}
575
}
576
}
577
578
private List<String> getActiveStreams() {
579
// Return list of active streams to monitor
580
return Arrays.asList("user-events", "order-events", "analytics-stream");
581
}
582
583
private void publishStreamMetrics(String streamName, StreamStats stats) {
584
// Publish metrics to monitoring system
585
meterRegistry.gauge("stream.size.bytes", Tags.of("stream", streamName), stats.getByteSize());
586
meterRegistry.gauge("stream.message.count", Tags.of("stream", streamName),
587
stats.getLastOffset() - stats.getFirstOffset());
588
}
589
}
590
```
591
592
### Stream Administration
593
594
```java { .api }
595
@RestController
596
@RequestMapping("/admin/streams")
597
public class StreamAdminController {
598
599
@Autowired
600
private StreamManagementService streamManagementService;
601
602
@PostMapping("/{streamName}")
603
public ResponseEntity<String> createStream(@PathVariable String streamName,
604
@RequestParam(defaultValue = "86400") long maxAgeSeconds) {
605
try {
606
streamManagementService.createStream(streamName, maxAgeSeconds);
607
return ResponseEntity.ok("Stream created successfully");
608
} catch (Exception e) {
609
return ResponseEntity.status(500).body("Failed to create stream: " + e.getMessage());
610
}
611
}
612
613
@DeleteMapping("/{streamName}")
614
public ResponseEntity<String> deleteStream(@PathVariable String streamName) {
615
try {
616
streamManagementService.deleteStream(streamName);
617
return ResponseEntity.ok("Stream deleted successfully");
618
} catch (Exception e) {
619
return ResponseEntity.status(500).body("Failed to delete stream: " + e.getMessage());
620
}
621
}
622
623
@GetMapping("/{streamName}/stats")
624
public ResponseEntity<StreamStatsResponse> getStreamStats(@PathVariable String streamName) {
625
try {
626
StreamStats stats = streamManagementService.getStreamStats(streamName);
627
StreamStatsResponse response = new StreamStatsResponse(stats);
628
return ResponseEntity.ok(response);
629
} catch (Exception e) {
630
return ResponseEntity.status(500).build();
631
}
632
}
633
}
634
```