Spring Boot starter for AMQP messaging with RabbitMQ including auto-configuration, connection management, and message processing capabilities
—
Auto-configuration for RabbitMQ Streams, providing high-throughput persistent messaging with replay capabilities for building event-driven architectures and streaming data pipelines.
RabbitMQ Streams are a persistent, append-only log structure designed for high-throughput scenarios where message replay and persistence are important. Unlike traditional AMQP queues, where a message is removed once it has been consumed, streams keep messages in order after they are read, which allows consumers to read from any point in the stream.
/**
* Stream auto-configuration.
* <p>
* Declared with {@code proxyBeanMethods = false} and conditional on
* {@code RabbitStreamTemplate} being on the classpath. Only method signatures
* are listed here; the bodies are omitted.
*/
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RabbitStreamTemplate.class)
public class RabbitStreamConfiguration {
/**
* Configure stream template.
* Annotated {@code @ConditionalOnMissingBean}, so it backs off when a bean of
* this type is already defined.
*
* @param properties the {@code RabbitProperties} to configure from
* @return the configurer that {@code rabbitStreamTemplate} accepts as a parameter
*/
@Bean
@ConditionalOnMissingBean
public RabbitStreamTemplateConfigurer rabbitStreamTemplateConfigurer(RabbitProperties properties);
/**
* Create stream template.
* Annotated {@code @ConditionalOnMissingBean}, so it backs off when a bean of
* this type is already defined.
*
* @param rabbitStreamEnvironment the stream {@code Environment}
* @param configurer the configurer for the new template
* @return the {@code RabbitStreamTemplate} bean
*/
@Bean
@ConditionalOnMissingBean
public RabbitStreamTemplate rabbitStreamTemplate(Environment rabbitStreamEnvironment,
RabbitStreamTemplateConfigurer configurer);
}Primary interface for producing messages to RabbitMQ streams with support for message routing and producer confirmation.
/**
* Template for RabbitMQ Stream operations.
* <p>
* Every {@code send}/{@code convertAndSend} overload listed here returns a
* {@link MessageBuilder}; the builder's {@code send()} and {@code sendAsync()}
* are its terminal calls. Only signatures are listed; the bodies are omitted.
*/
public class RabbitStreamTemplate implements RabbitStreamOperations {
/**
* Send message to stream.
*
* @param message the message to send
* @return builder used to complete the send
*/
public MessageBuilder send(Message message);
/**
* Send message with routing key.
*
* @param message the message to send
* @param routingKey the routing key
* @return builder used to complete the send
*/
public MessageBuilder send(Message message, String routingKey);
/**
* Convert and send object.
*
* @param message the object to convert and send
* @return builder used to complete the send
*/
public MessageBuilder convertAndSend(Object message);
/**
* Convert and send with routing key.
*
* @param message the object to convert and send
* @param routingKey the routing key
* @return builder used to complete the send
*/
public MessageBuilder convertAndSend(Object message, String routingKey);
/**
* Convert and send with properties.
*
* @param message the object to convert and send
* @param properties the message properties to attach
* @return builder used to complete the send
*/
public MessageBuilder convertAndSend(Object message, MessageProperties properties);
/** Message builder for fluent API */
public interface MessageBuilder {
/**
* Set routing key.
*
* @param routingKey the routing key
* @return a builder for further chaining
*/
MessageBuilder to(String routingKey);
/**
* Set message properties.
*
* @param properties the message properties
* @return a builder for further chaining
*/
MessageBuilder withProperties(MessageProperties properties);
/** Send synchronously */
void send();
/**
* Send asynchronously.
*
* @return future for the asynchronous send; NOTE(review): what completion
* signals (e.g. broker confirmation) is not visible here - confirm
*/
CompletableFuture<Void> sendAsync();
}
}Stream Producer Example:
import org.springframework.amqp.rabbit.stream.producer.RabbitStreamTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
 * Example service that publishes application events to a RabbitMQ stream
 * through {@code RabbitStreamTemplate}.
 * <p>
 * Every template call here ends with a terminal {@code send()} or
 * {@code sendAsync()} on the returned {@code MessageBuilder}.
 */
@Service
public class StreamProducerService {

    @Autowired
    private RabbitStreamTemplate streamTemplate;

    /**
     * Publishes an event synchronously, using the stream name as routing key.
     *
     * @param streamName the routing key / target stream
     * @param event the payload handed to the template for conversion
     */
    public void publishEvent(String streamName, Object event) {
        // convertAndSend(...) only hands back a MessageBuilder; the terminal
        // send() is required to actually dispatch, as in the methods below.
        streamTemplate.convertAndSend(event, streamName)
                .send();
    }

    /**
     * Publishes an event asynchronously.
     *
     * @param streamName the routing key / target stream
     * @param event the payload handed to the template for conversion
     * @return the future returned by {@code MessageBuilder.sendAsync()}
     */
    public CompletableFuture<Void> publishEventAsync(String streamName, Object event) {
        // Asynchronous send
        return streamTemplate.convertAndSend(event)
                .to(streamName)
                .sendAsync();
    }

    /**
     * Publishes an event synchronously with a correlation id and the current
     * time as message timestamp.
     *
     * @param streamName the routing key / target stream
     * @param event the payload handed to the template for conversion
     * @param correlationId the value stored in the message's correlation-id property
     */
    public void publishWithCustomProperties(String streamName, Object event, String correlationId) {
        MessageProperties properties = new MessageProperties();
        properties.setCorrelationId(correlationId);
        properties.setTimestamp(new Date());
        streamTemplate.convertAndSend(event, properties)
                .to(streamName)
                .send();
    }
}
Consumer configuration for reading messages from RabbitMQ streams with offset management and replay capabilities.
/**
* Stream listener annotation.
* <p>
* Applicable to methods only and retained at runtime. Every attribute is
* optional: {@code queues} defaults to an empty array and all others to the
* empty string.
*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface RabbitStreamListener {
/** Stream name(s) to consume from, e.g. {@code "events.stream"} */
String[] queues() default {};
/** Consumer group for offset management, e.g. {@code "analytics-service"} */
String group() default "";
/**
* Offset specification (first, last, timestamp, offset), given as a string
* such as {@code "first"} or {@code "timestamp:2023-01-01T00:00:00Z"}
*/
String offset() default "";
/** Container factory; NOTE(review): presumably a bean name - confirm */
String containerFactory() default "";
/** Auto startup, expressed as a string */
String autoStartup() default "";
/** Concurrency, expressed as a string */
String concurrency() default "";
}
/**
* Stream message context for manual offset management.
* <p>
* Only signatures are listed; the bodies are omitted.
*/
public class StreamMessageContext {
/**
* Get message offset.
*
* @return the offset of the message
*/
public long getOffset();
/**
* Get stream name.
*
* @return the name of the stream
*/
public String getStream();
/**
* Get timestamp.
*
* @return the timestamp as a {@code long}; NOTE(review): unit and epoch are
* not visible here - confirm
*/
public long getTimestamp();
/** Manual acknowledgment */
public void ack();
}Stream Consumer Examples:
import org.springframework.amqp.rabbit.annotation.RabbitStreamListener;
import org.springframework.stereotype.Component;
/**
 * Example stream consumers built on {@code @RabbitStreamListener}.
 * <p>
 * The collaborators used below ({@code analyticsService}, {@code auditProcessor},
 * {@code transactionProcessor}, {@code log}) and {@code processCriticalMessage}
 * are not declared in this snippet.
 */
@Component
public class StreamConsumers {

    /**
     * Basic stream consumer.
     *
     * @param message the message payload
     */
    @RabbitStreamListener(queues = "events.stream")
    public void handleStreamEvent(String message) {
        // Same logger as the error path in handleCriticalEvents, not stdout.
        log.info("Received stream message: {}", message);
    }

    /**
     * Consumer with group (for offset management).
     *
     * @param event the user event to process
     */
    @RabbitStreamListener(queues = "user.events", group = "analytics-service")
    public void handleUserEvents(UserEvent event) {
        // Process user event
        analyticsService.processUserEvent(event);
    }

    /**
     * Consumer starting from beginning.
     *
     * @param event the audit event to reprocess
     */
    @RabbitStreamListener(queues = "audit.stream", offset = "first")
    public void replayAuditEvents(AuditEvent event) {
        // Replay all audit events from beginning
        auditProcessor.reprocess(event);
    }

    /**
     * Consumer starting from specific timestamp.
     *
     * @param transaction the transaction event to process
     */
    @RabbitStreamListener(queues = "transactions.stream",
            offset = "timestamp:2023-01-01T00:00:00Z")
    public void processTransactionsFrom(TransactionEvent transaction) {
        // Process transactions from specific date
        transactionProcessor.process(transaction);
    }

    /**
     * Manual offset management: acknowledges only after successful processing.
     *
     * @param message the message payload
     * @param context the context used to acknowledge and to report the offset
     */
    @RabbitStreamListener(queues = "critical.stream")
    public void handleCriticalEvents(String message, StreamMessageContext context) {
        try {
            processCriticalMessage(message);
            // Manually acknowledge after successful processing
            context.ack();
        } catch (Exception e) {
            // Don't ack on error, so this offset is not marked as processed.
            // NOTE(review): streams are an append-only log with offset tracking
            // rather than per-message redelivery - confirm how consumption
            // resumes after a skipped ack.
            log.error("Failed to process message at offset {}", context.getOffset(), e);
        }
    }
}
Configuration properties specific to RabbitMQ Streams under the spring.rabbitmq.stream prefix.
/**
* Stream-specific configuration properties.
* <p>
* Holds connection settings plus a nested {@link Environment} group, which is
* created eagerly and never reassigned (the field is {@code final}).
*/
public static class Stream {
/** Stream host (defaults to main RabbitMQ host) */
private String host;
/** Stream port (default: 5552) */
private int port = 5552;
/** Stream username */
private String username;
/** Stream password */
private String password;
/** Stream name pattern */
private String name;
/** Stream environment configuration */
private final Environment environment = new Environment();
/**
* Stream environment configuration.
* <p>
* None of the fields has an initializer, so each is {@code null} until set.
*/
public static class Environment {
/** Maximum frame size */
private DataSize maxFrameSize;
/** Heartbeat interval */
private Duration heartbeat;
/** Connection timeout */
private Duration connectionTimeout;
/** Recovery back-off delay */
private Duration recoveryBackOffDelay;
/** Topology recovery enabled; a boxed {@code Boolean}, so "unset" is {@code null} */
private Boolean topologyRecovery;
}
}Stream Configuration Example:
spring:
  rabbitmq:
    # Main RabbitMQ connection
    host: rabbitmq.example.com
    username: myapp
    password: ${RABBITMQ_PASSWORD}
    # Stream-specific configuration (spring.rabbitmq.stream.*)
    stream:
      host: ${spring.rabbitmq.host} # Use same host
      port: 5552
      username: ${spring.rabbitmq.username}
      password: ${spring.rabbitmq.password}
      environment:
        max-frame-size: 1MB
        heartbeat: 60s
        connection-timeout: 30s
        recovery-back-off-delay: 5s
        topology-recovery: true
Factory configuration for stream listener containers with performance tuning options.
/**
* Stream container factory configuration.
*/
public static class StreamContainer {
/** Auto startup; initialised to {@code true} */
private boolean autoStartup = true;
/** Native listener; no initializer, so {@code false} unless set */
private boolean nativeListener;
/** Retry configuration; created eagerly and never reassigned */
private final ListenerRetry retry = new ListenerRetry();
}
/**
 * Stream listener container factory.
 * <p>
 * Creates an empty factory and lets the supplied configurer apply all settings
 * to it. Backs off when a bean of this type is already defined.
 *
 * @param rabbitStreamEnvironment the stream environment handed to the configurer
 * @param configurer the configurer that populates the new factory
 * @return the configured factory
 */
@Bean
@ConditionalOnMissingBean
public StreamRabbitListenerContainerFactory streamRabbitListenerContainerFactory(
        Environment rabbitStreamEnvironment,
        StreamRabbitListenerContainerFactoryConfigurer configurer) {
    final StreamRabbitListenerContainerFactory containerFactory =
            new StreamRabbitListenerContainerFactory();
    // The configurer is the only place where settings are applied.
    configurer.configure(containerFactory, rabbitStreamEnvironment);
    return containerFactory;
}
Administrative operations for managing streams, including creation, deletion, and metadata queries.
/**
* Stream administrative operations: create, delete, existence check, metadata
* and statistics lookup, all keyed by stream name.
*/
public interface StreamAdmin {
/**
* Create a stream.
*
* @param name the stream name
* @param options the creation options for the new stream
*/
void createStream(String name, StreamCreationOptions options);
/**
* Delete a stream.
*
* @param name the stream name
*/
void deleteStream(String name);
/**
* Check if stream exists.
*
* @param name the stream name
* @return {@code true} if the stream exists
*/
boolean streamExists(String name);
/**
* Get stream metadata.
*
* @param name the stream name
* @return the metadata of the stream
*/
StreamMetadata getStreamMetadata(String name);
/**
* Get stream statistics.
*
* @param name the stream name
* @return the statistics of the stream
*/
StreamStatistics getStreamStatistics(String name);
}
/**
* Stream creation options.
* <p>
* Only the fields are listed. NOTE(review): the administration example calls
* fluent {@code maxAge(..)}, {@code maxLength(..)} and {@code maxSegmentSize(..)}
* methods that are not shown in this listing - confirm they exist.
*/
public class StreamCreationOptions {
/** Maximum age of messages */
private Duration maxAge;
/** Maximum size of stream */
private DataSize maxLength;
/** Maximum segment size */
private DataSize maxSegmentSize;
/** Leader locator strategy, held as a string */
private String leaderLocator;
/** Initial cluster size; no initializer, so {@code 0} unless set */
private int initialClusterSize;
}Stream Administration Example:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Example configuration that exposes a {@code StreamAdmin} bean and creates
 * the application's streams once this configuration has been initialised.
 */
@Configuration
public class StreamAdminConfig {

    /** Stream environment, kept so that {@link #setupStreams()} can reach the admin bean. */
    private final Environment environment;

    /**
     * @param environment the stream environment used to build the {@code StreamAdmin}
     */
    public StreamAdminConfig(Environment environment) {
        this.environment = environment;
    }

    /**
     * Exposes the stream admin as a bean.
     *
     * @param environment the stream environment
     * @return the stream admin
     */
    @Bean
    public StreamAdmin streamAdmin(Environment environment) {
        // NOTE(review): StreamAdmin is listed as an interface in this document;
        // substitute the concrete implementation class here.
        return new StreamAdmin(environment);
    }

    /**
     * Creates {@code events.stream} and {@code audit.stream} with their
     * retention policies unless they already exist.
     */
    @PostConstruct
    public void setupStreams() {
        // This class is a plain @Configuration (bean-method proxying enabled),
        // so calling the @Bean method resolves the shared StreamAdmin bean
        // instead of building a second instance.
        StreamAdmin streamAdmin = streamAdmin(this.environment);

        // Create streams with retention policies
        StreamCreationOptions options = new StreamCreationOptions()
                .maxAge(Duration.ofDays(7)) // Keep messages for 7 days
                .maxLength(DataSize.ofGigabytes(10)) // Max 10GB
                .maxSegmentSize(DataSize.ofMegabytes(500)); // 500MB segments
        if (!streamAdmin.streamExists("events.stream")) {
            streamAdmin.createStream("events.stream", options);
        }
        if (!streamAdmin.streamExists("audit.stream")) {
            // Audit stream with longer retention
            StreamCreationOptions auditOptions = new StreamCreationOptions()
                    .maxAge(Duration.ofDays(365)) // Keep for 1 year
                    .maxLength(DataSize.ofGigabytes(100));
            streamAdmin.createStream("audit.stream", auditOptions);
        }
    }
}
Configuration and best practices for optimal stream performance.
/**
 * Stream performance configuration: a template whose producers batch and
 * compress messages, and a listener container factory whose consumers start
 * from the latest offset with manual tracking.
 */
@Configuration
public class StreamPerformanceConfig {

    /**
     * Primary stream template tuned for throughput.
     *
     * @param environment the stream environment backing the template
     * @return the customised template
     */
    @Bean
    @Primary
    public RabbitStreamTemplate optimizedStreamTemplate(Environment environment) {
        RabbitStreamTemplate tunedTemplate = new RabbitStreamTemplate(environment);
        // Configure for high throughput
        tunedTemplate.setProducerCustomizer(builder -> builder
                .batchSize(100) // Batch messages
                .batchPublishingDelay(Duration.ofMillis(10)) // Small delay for batching
                .compression(CompressionType.GZIP)); // Compress messages
        return tunedTemplate;
    }

    /**
     * Listener container factory tuned for throughput.
     *
     * @param environment the stream environment set on the factory
     * @return the customised factory
     */
    @Bean
    public StreamRabbitListenerContainerFactory highThroughputContainerFactory(Environment environment) {
        StreamRabbitListenerContainerFactory listenerFactory = new StreamRabbitListenerContainerFactory();
        listenerFactory.setEnvironment(environment);
        // Configure for high throughput consumption
        listenerFactory.setConsumerCustomizer(builder -> {
            builder.offset(OffsetSpecification.last()); // Start from latest
            builder.manualTrackingStrategy(); // Manual offset tracking
        });
        return listenerFactory;
    }
}
Spring Boot provides interfaces for customizing the RabbitMQ Stream environment.
/**
* Callback interface for customizing the Stream EnvironmentBuilder.
* <p>
* A functional interface, so it can be supplied as a lambda.
*/
@FunctionalInterface
public interface EnvironmentBuilderCustomizer {
/**
* Customize the EnvironmentBuilder.
*
* @param builder the builder to customize
*/
void customize(EnvironmentBuilder builder);
}
/**
* Callback interface for customizing Stream producers.
* <p>
* A functional interface, so it can be supplied as a lambda.
*/
@FunctionalInterface
public interface ProducerCustomizer {
/**
* Customize a stream producer.
*
* @param producer the producer to customize
*/
void customize(Producer producer);
}
/**
* Callback interface for customizing Stream consumers.
* <p>
* A functional interface, so it can be supplied as a lambda.
*/
@FunctionalInterface
public interface ConsumerCustomizer {
/**
* Customize a stream consumer.
*
* @param consumer the consumer to customize
*/
void customize(Consumer consumer);
}Environment Customization Example:
import org.springframework.boot.autoconfigure.amqp.EnvironmentBuilderCustomizer;
import org.springframework.rabbit.stream.producer.ProducerCustomizer;
/**
 * Example configuration contributing customizers for the stream environment
 * builder and for stream producers.
 */
@Configuration
public class StreamEnvironmentConfig {

    /**
     * Customizes the environment builder: lazy initialization, an address
     * resolver, and a fixed recovery back-off with an initial delay.
     *
     * @return the environment builder customizer
     */
    @Bean
    public EnvironmentBuilderCustomizer environmentBuilderCustomizer() {
        return builder -> {
            builder.lazyInitialization(true)
                    // Identity resolver: placeholder for custom address resolution
                    .addressResolver(address -> address)
                    // fixedWithInitialDelay(initialDelay, delay): 1s before the
                    // first attempt, then 10s between attempts
                    .recoveryBackOffDelayPolicy(BackOffDelayPolicy.fixedWithInitialDelay(
                            Duration.ofSeconds(1), Duration.ofSeconds(10)));
        };
    }

    /**
     * Customizes producers: batch size 50, 100 ms batch publishing delay and
     * at most 1000 unconfirmed messages.
     *
     * @return the producer customizer
     */
    @Bean
    public ProducerCustomizer producerCustomizer() {
        return producer -> {
            producer.batchSize(50)
                    .batchPublishingDelay(Duration.ofMillis(100))
                    .maxUnconfirmedMessages(1000);
        };
    }
}
Install with Tessl CLI
npx tessl i tessl/maven-org-springframework-boot--spring-boot-starter-amqp