0
# Stream Support
1
2
Auto-configuration for RabbitMQ Streams, providing high-throughput persistent messaging with replay capabilities for building event-driven architectures and streaming data pipelines.
3
4
## Capabilities
5
6
### RabbitMQ Streams Overview
7
8
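RabbitMQ Streams are a persistent, replicated, append-only log data structure designed for high-throughput scenarios where message replay and persistence are important. Unlike traditional AMQP queues, reading from a stream is non-destructive: messages remain in the stream after they are consumed (subject to the stream's retention policy), so consumers can attach at any point in the stream and re-read messages.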
9
10
```java { .api }
11
/**
12
* Stream auto-configuration
13
*/
14
@Configuration(proxyBeanMethods = false)
15
@ConditionalOnClass(RabbitStreamTemplate.class)
16
public class RabbitStreamConfiguration {
17
18
/** Configure stream template */
19
@Bean
20
@ConditionalOnMissingBean
21
public RabbitStreamTemplateConfigurer rabbitStreamTemplateConfigurer(RabbitProperties properties);
22
23
/** Create stream template */
24
@Bean
25
@ConditionalOnMissingBean
26
public RabbitStreamTemplate rabbitStreamTemplate(Environment rabbitStreamEnvironment,
27
RabbitStreamTemplateConfigurer configurer);
28
}
29
```
30
31
### Stream Template
32
33
Primary interface for producing messages to RabbitMQ streams with support for message routing and producer confirmation.
34
35
```java { .api }
36
/**
37
* Template for RabbitMQ Stream operations
38
*/
39
public class RabbitStreamTemplate implements RabbitStreamOperations {
40
41
/** Send message to stream */
42
public MessageBuilder send(Message message);
43
44
/** Send message with routing key */
45
public MessageBuilder send(Message message, String routingKey);
46
47
/** Convert and send object */
48
public MessageBuilder convertAndSend(Object message);
49
50
/** Convert and send with routing key */
51
public MessageBuilder convertAndSend(Object message, String routingKey);
52
53
/** Convert and send with properties */
54
public MessageBuilder convertAndSend(Object message, MessageProperties properties);
55
56
/** Message builder for fluent API */
57
public interface MessageBuilder {
58
/** Set routing key */
59
MessageBuilder to(String routingKey);
60
61
/** Set message properties */
62
MessageBuilder withProperties(MessageProperties properties);
63
64
/** Send synchronously */
65
void send();
66
67
/** Send asynchronously */
68
CompletableFuture<Void> sendAsync();
69
}
70
}
71
```
72
73
**Stream Producer Example:**
74
75
```java
76
import org.springframework.amqp.rabbit.stream.producer.RabbitStreamTemplate;
77
import org.springframework.beans.factory.annotation.Autowired;
78
import org.springframework.stereotype.Service;
79
80
@Service
81
public class StreamProducerService {
82
83
@Autowired
84
private RabbitStreamTemplate streamTemplate;
85
86
public void publishEvent(String streamName, Object event) {
87
// Send to stream with routing key
88
streamTemplate.convertAndSend(event, streamName);
89
}
90
91
public CompletableFuture<Void> publishEventAsync(String streamName, Object event) {
92
// Asynchronous send
93
return streamTemplate.convertAndSend(event)
94
.to(streamName)
95
.sendAsync();
96
}
97
98
public void publishWithCustomProperties(String streamName, Object event, String correlationId) {
99
MessageProperties properties = new MessageProperties();
100
properties.setCorrelationId(correlationId);
101
properties.setTimestamp(new Date());
102
103
streamTemplate.convertAndSend(event, properties)
104
.to(streamName)
105
.send();
106
}
107
}
108
```
109
110
### Stream Consumers
111
112
Consumer configuration for reading messages from RabbitMQ streams with offset management and replay capabilities.
113
114
```java { .api }
115
/**
 * Marks a method as a listener for RabbitMQ stream messages.
 *
 * <p>Every attribute is optional and defaults to an empty value.
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface RabbitStreamListener {

    /** Names of the streams to consume from. */
    String[] queues() default {};

    /** Consumer group used for offset management. */
    String group() default "";

    /** Offset specification: first, last, timestamp or offset. */
    String offset() default "";

    /** Container factory to use for this listener. */
    String containerFactory() default "";

    /** Auto-startup setting, expressed as a string. */
    String autoStartup() default "";

    /** Concurrency setting, expressed as a string. */
    String concurrency() default "";
}
140
141
/**
142
* Stream message context for manual offset management
143
*/
144
public class StreamMessageContext {
145
146
/** Get message offset */
147
public long getOffset();
148
149
/** Get stream name */
150
public String getStream();
151
152
/** Get timestamp */
153
public long getTimestamp();
154
155
/** Manual acknowledgment */
156
public void ack();
157
}
158
```
159
160
**Stream Consumer Examples:**
161
162
```java
163
import org.springframework.amqp.rabbit.annotation.RabbitStreamListener;
164
import org.springframework.stereotype.Component;
165
166
@Component
167
public class StreamConsumers {
168
169
// Basic stream consumer
170
@RabbitStreamListener(queues = "events.stream")
171
public void handleStreamEvent(String message) {
172
System.out.println("Received stream message: " + message);
173
}
174
175
// Consumer with group (for offset management)
176
@RabbitStreamListener(queues = "user.events", group = "analytics-service")
177
public void handleUserEvents(UserEvent event) {
178
// Process user event
179
analyticsService.processUserEvent(event);
180
}
181
182
// Consumer starting from beginning
183
@RabbitStreamListener(queues = "audit.stream", offset = "first")
184
public void replayAuditEvents(AuditEvent event) {
185
// Replay all audit events from beginning
186
auditProcessor.reprocess(event);
187
}
188
189
// Consumer starting from specific timestamp
190
@RabbitStreamListener(queues = "transactions.stream",
191
offset = "timestamp:2023-01-01T00:00:00Z")
192
public void processTransactionsFrom(TransactionEvent transaction) {
193
// Process transactions from specific date
194
transactionProcessor.process(transaction);
195
}
196
197
// Manual offset management
198
@RabbitStreamListener(queues = "critical.stream")
199
public void handleCriticalEvents(String message, StreamMessageContext context) {
200
try {
201
processCriticalMessage(message);
202
// Manually acknowledge after successful processing
203
context.ack();
204
} catch (Exception e) {
205
// Don't ack on error - message will be redelivered to other consumers
206
log.error("Failed to process message at offset {}", context.getOffset(), e);
207
}
208
}
209
}
210
```
211
212
### Stream Configuration Properties
213
214
Configuration properties specific to RabbitMQ Streams under the `spring.rabbitmq.stream` prefix.
215
216
```java { .api }
217
/**
218
* Stream-specific configuration properties
219
*/
220
public static class Stream {
221
222
/** Stream host (defaults to main RabbitMQ host) */
223
private String host;
224
225
/** Stream port (default: 5552) */
226
private int port = 5552;
227
228
/** Stream username */
229
private String username;
230
231
/** Stream password */
232
private String password;
233
234
/** Stream name pattern */
235
private String name;
236
237
/** Stream environment configuration */
238
private final Environment environment = new Environment();
239
240
/**
241
* Stream environment configuration
242
*/
243
public static class Environment {
244
245
/** Maximum frame size */
246
private DataSize maxFrameSize;
247
248
/** Heartbeat interval */
249
private Duration heartbeat;
250
251
/** Connection timeout */
252
private Duration connectionTimeout;
253
254
/** Recovery back-off delay */
255
private Duration recoveryBackOffDelay;
256
257
/** Topology recovery enabled */
258
private Boolean topologyRecovery;
259
}
260
}
261
```
262
263
**Stream Configuration Example:**
264
265
```yaml
266
spring:
  rabbitmq:
    # Main RabbitMQ connection
    host: rabbitmq.example.com
    username: myapp
    password: ${RABBITMQ_PASSWORD}

    # Stream-specific configuration
    stream:
      host: ${spring.rabbitmq.host} # Use same host
      port: 5552
      username: ${spring.rabbitmq.username}
      password: ${spring.rabbitmq.password}
      environment:
        max-frame-size: 1MB
        heartbeat: 60s
        connection-timeout: 30s
        recovery-back-off-delay: 5s
        topology-recovery: true
285
```
286
287
### Stream Container Factory
288
289
Factory configuration for stream listener containers with performance tuning options.
290
291
```java { .api }
292
/**
293
* Stream container factory configuration
294
*/
295
public static class StreamContainer {
296
297
/** Auto startup */
298
private boolean autoStartup = true;
299
300
/** Native listener */
301
private boolean nativeListener;
302
303
/** Retry configuration */
304
private final ListenerRetry retry = new ListenerRetry();
305
}
306
307
/**
308
* Stream listener container factory
309
*/
310
@Bean
311
@ConditionalOnMissingBean
312
public StreamRabbitListenerContainerFactory streamRabbitListenerContainerFactory(
313
Environment rabbitStreamEnvironment,
314
StreamRabbitListenerContainerFactoryConfigurer configurer) {
315
316
StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory();
317
configurer.configure(factory, rabbitStreamEnvironment);
318
return factory;
319
}
320
```
321
322
### Stream Administrative Operations
323
324
Administrative operations for managing streams, including creation, deletion, and metadata queries.
325
326
```java { .api }
327
/**
328
* Stream administrative operations
329
*/
330
public interface StreamAdmin {
331
332
/** Create a stream */
333
void createStream(String name, StreamCreationOptions options);
334
335
/** Delete a stream */
336
void deleteStream(String name);
337
338
/** Check if stream exists */
339
boolean streamExists(String name);
340
341
/** Get stream metadata */
342
StreamMetadata getStreamMetadata(String name);
343
344
/** Get stream statistics */
345
StreamStatistics getStreamStatistics(String name);
346
}
347
348
/**
349
* Stream creation options
350
*/
351
public class StreamCreationOptions {
352
353
/** Maximum age of messages */
354
private Duration maxAge;
355
356
/** Maximum size of stream */
357
private DataSize maxLength;
358
359
/** Maximum segment size */
360
private DataSize maxSegmentSize;
361
362
/** Leader locator strategy */
363
private String leaderLocator;
364
365
/** Initial cluster size */
366
private int initialClusterSize;
367
}
368
```
369
370
**Stream Administration Example:**
371
372
```java
373
import org.springframework.context.annotation.Bean;
374
import org.springframework.context.annotation.Configuration;
375
376
@Configuration
377
public class StreamAdminConfig {
378
379
@Bean
380
public StreamAdmin streamAdmin(Environment environment) {
381
return new StreamAdmin(environment);
382
}
383
384
@PostConstruct
385
public void setupStreams() {
386
// Create streams with retention policies
387
StreamCreationOptions options = new StreamCreationOptions()
388
.maxAge(Duration.ofDays(7)) // Keep messages for 7 days
389
.maxLength(DataSize.ofGigabytes(10)) // Max 10GB
390
.maxSegmentSize(DataSize.ofMegabytes(500)); // 500MB segments
391
392
if (!streamAdmin.streamExists("events.stream")) {
393
streamAdmin.createStream("events.stream", options);
394
}
395
396
if (!streamAdmin.streamExists("audit.stream")) {
397
// Audit stream with longer retention
398
StreamCreationOptions auditOptions = new StreamCreationOptions()
399
.maxAge(Duration.ofDays(365)) // Keep for 1 year
400
.maxLength(DataSize.ofGigabytes(100));
401
streamAdmin.createStream("audit.stream", auditOptions);
402
}
403
}
404
}
405
```
406
407
### Stream Performance Considerations
408
409
Configuration and best practices for optimal stream performance.
410
411
```java { .api }
412
/**
413
* Stream performance configuration
414
*/
415
@Configuration
416
public class StreamPerformanceConfig {
417
418
@Bean
419
@Primary
420
public RabbitStreamTemplate optimizedStreamTemplate(Environment environment) {
421
RabbitStreamTemplate template = new RabbitStreamTemplate(environment);
422
423
// Configure for high throughput
424
template.setProducerCustomizer(producer -> {
425
producer.batchSize(100); // Batch messages
426
producer.batchPublishingDelay(Duration.ofMillis(10)); // Small delay for batching
427
producer.compression(CompressionType.GZIP); // Compress messages
428
});
429
430
return template;
431
}
432
433
@Bean
434
public StreamRabbitListenerContainerFactory highThroughputContainerFactory(Environment environment) {
435
StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory();
436
factory.setEnvironment(environment);
437
438
// Configure for high throughput consumption
439
factory.setConsumerCustomizer(consumer -> {
440
consumer.offset(OffsetSpecification.last()); // Start from latest
441
consumer.manualTrackingStrategy(); // Manual offset tracking
442
});
443
444
return factory;
445
}
446
}
447
```
448
449
### Stream Environment Customization
450
451
Spring Boot and Spring AMQP provide callback interfaces for customizing the RabbitMQ Stream environment, producers, and consumers.
452
453
```java { .api }
454
/**
455
* Callback interface for customizing the Stream EnvironmentBuilder
456
*/
457
@FunctionalInterface
458
public interface EnvironmentBuilderCustomizer {
459
460
/** Customize the EnvironmentBuilder */
461
void customize(EnvironmentBuilder builder);
462
}
463
464
/**
465
* Callback interface for customizing Stream producers
466
*/
467
@FunctionalInterface
468
public interface ProducerCustomizer {
469
470
/** Customize a stream producer */
471
void customize(Producer producer);
472
}
473
474
/**
475
* Callback interface for customizing Stream consumers
476
*/
477
@FunctionalInterface
478
public interface ConsumerCustomizer {
479
480
/** Customize a stream consumer */
481
void customize(Consumer consumer);
482
}
483
```
484
485
**Environment Customization Example:**
486
487
```java
488
import org.springframework.boot.autoconfigure.amqp.EnvironmentBuilderCustomizer;
489
import org.springframework.rabbit.stream.producer.ProducerCustomizer;
490
491
@Configuration
492
public class StreamEnvironmentConfig {
493
494
@Bean
495
public EnvironmentBuilderCustomizer environmentBuilderCustomizer() {
496
return builder -> {
497
builder.lazyInitialization(true)
498
.addressResolver(address -> address) // Custom address resolution
499
.recoveryBackOffDelayPolicy(BackOffDelayPolicy.fixedWithInitialDelayDelayPolicy(
500
Duration.ofSeconds(1), Duration.ofSeconds(10)));
501
};
502
}
503
504
@Bean
505
public ProducerCustomizer producerCustomizer() {
506
return producer -> {
507
producer.batchSize(50)
508
.batchPublishingDelay(Duration.ofMillis(100))
509
.maxUnconfirmedMessages(1000);
510
};
511
}
512
}
513
```