CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-springframework-boot--spring-boot-starter-amqp

Spring Boot starter for AMQP messaging with RabbitMQ including auto-configuration, connection management, and message processing capabilities

Pending
Overview
Eval results
Files

streams.mddocs/

Stream Support

Auto-configuration for RabbitMQ Streams, providing high-throughput persistent messaging with replay capabilities for building event-driven architectures and streaming data pipelines.

Capabilities

RabbitMQ Streams Overview

A RabbitMQ stream is a persistent, append-only log designed for high-throughput scenarios where message replay and durability are important. Unlike traditional AMQP queues, reading from a stream does not remove messages: message order is preserved and each consumer can start reading from any point in the stream.

/**
 * Stream auto-configuration.
 *
 * <p>API summary only: method bodies are omitted. The configuration is guarded by
 * {@code @ConditionalOnClass(RabbitStreamTemplate.class)}, and every bean below is
 * declared with {@code @ConditionalOnMissingBean}, so an application-defined bean of
 * the same type takes precedence over the auto-configured one.
 */
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RabbitStreamTemplate.class)
public class RabbitStreamConfiguration {
    
    /**
     * Configure stream template.
     *
     * @param properties RabbitMQ properties (presumably the bound
     *                   {@code spring.rabbitmq.*} values; confirm against the implementation)
     * @return the configurer consumed by {@code rabbitStreamTemplate(...)}
     */
    @Bean
    @ConditionalOnMissingBean
    public RabbitStreamTemplateConfigurer rabbitStreamTemplateConfigurer(RabbitProperties properties);
    
    /**
     * Create stream template.
     *
     * @param rabbitStreamEnvironment the stream environment the template is built on
     * @param configurer              the configurer produced by
     *                                {@code rabbitStreamTemplateConfigurer(...)}
     * @return the stream template bean
     */
    @Bean  
    @ConditionalOnMissingBean
    public RabbitStreamTemplate rabbitStreamTemplate(Environment rabbitStreamEnvironment,
        RabbitStreamTemplateConfigurer configurer);
}

Stream Template

Primary template class for producing messages to RabbitMQ streams, with support for message routing and producer confirmation.

/**
 * Template for RabbitMQ Stream operations.
 *
 * <p>API summary only: method bodies are omitted. Every {@code send} /
 * {@code convertAndSend} overload returns a {@link MessageBuilder}; the builder
 * exposes {@link MessageBuilder#send()} and {@link MessageBuilder#sendAsync()}
 * to dispatch the message.
 */
public class RabbitStreamTemplate implements RabbitStreamOperations {
    
    /**
     * Send message to stream.
     *
     * @param message the message to publish
     * @return a builder for completing the send
     */
    public MessageBuilder send(Message message);
    
    /**
     * Send message with routing key.
     *
     * @param message    the message to publish
     * @param routingKey the routing key to publish with
     * @return a builder for completing the send
     */
    public MessageBuilder send(Message message, String routingKey);
    
    /**
     * Convert and send object.
     *
     * @param message the payload object to convert and publish
     * @return a builder for completing the send
     */
    public MessageBuilder convertAndSend(Object message);
    
    /**
     * Convert and send with routing key.
     *
     * @param message    the payload object to convert and publish
     * @param routingKey the routing key to publish with
     * @return a builder for completing the send
     */
    public MessageBuilder convertAndSend(Object message, String routingKey);
    
    /**
     * Convert and send with properties.
     *
     * @param message    the payload object to convert and publish
     * @param properties the message properties to attach
     * @return a builder for completing the send
     */
    public MessageBuilder convertAndSend(Object message, MessageProperties properties);
    
    /** Message builder for fluent API; setters return a builder so calls can be chained. */
    public interface MessageBuilder {
        /** Set routing key */
        MessageBuilder to(String routingKey);
        
        /** Set message properties */
        MessageBuilder withProperties(MessageProperties properties);
        
        /** Send synchronously */
        void send();
        
        /**
         * Send asynchronously.
         *
         * @return a future for the outcome of the send
         */
        CompletableFuture<Void> sendAsync();
    }
}

Stream Producer Example:

import org.springframework.amqp.rabbit.stream.producer.RabbitStreamTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class StreamProducerService {
    
    @Autowired
    private RabbitStreamTemplate streamTemplate;
    
    public void publishEvent(String streamName, Object event) {
        // Send to stream with routing key
        streamTemplate.convertAndSend(event, streamName);
    }
    
    public CompletableFuture<Void> publishEventAsync(String streamName, Object event) {
        // Asynchronous send
        return streamTemplate.convertAndSend(event)
            .to(streamName)
            .sendAsync();
    }
    
    public void publishWithCustomProperties(String streamName, Object event, String correlationId) {
        MessageProperties properties = new MessageProperties();
        properties.setCorrelationId(correlationId);
        properties.setTimestamp(new Date());
        
        streamTemplate.convertAndSend(event, properties)
            .to(streamName)
            .send();
    }
}

Stream Consumers

Consumer configuration for reading messages from RabbitMQ streams with offset management and replay capabilities.

/**
 * Stream listener annotation.
 *
 * <p>Applicable to methods only and retained at runtime. Every attribute is
 * optional and defaults to an empty value.
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface RabbitStreamListener {
    
    /** Names of the streams to consume from */
    String[] queues() default {};
    
    /** Consumer group for offset management */
    String group() default "";
    
    /**
     * Offset specification (first, last, timestamp, offset).
     * The examples in this document use {@code "first"} and
     * {@code "timestamp:2023-01-01T00:00:00Z"}.
     */
    String offset() default "";
    
    /** Container factory (presumably a bean name; confirm against the implementation) */
    String containerFactory() default "";
    
    /** Auto startup; declared as a String rather than a boolean */
    String autoStartup() default "";
    
    /** Concurrency; declared as a String rather than a number */
    String concurrency() default "";
}

/**
 * Stream message context for manual offset management.
 *
 * <p>API summary only: method bodies are omitted. The consumer example in this
 * document receives an instance as an extra listener-method parameter.
 */
public class StreamMessageContext {
    
    /** Get message offset */
    public long getOffset();
    
    /** Get stream name */
    public String getStream();
    
    /** Get timestamp (unit not stated here; presumably epoch milliseconds, TODO confirm) */
    public long getTimestamp();
    
    /** Manual acknowledgment */
    public void ack();
}

Stream Consumer Examples:

import org.springframework.amqp.rabbit.annotation.RabbitStreamListener;
import org.springframework.stereotype.Component;

/**
 * Example stream listeners.
 *
 * <p>NOTE(review): the snippet uses collaborators that are not declared here
 * ({@code analyticsService}, {@code auditProcessor}, {@code transactionProcessor},
 * {@code log}, {@code processCriticalMessage}); they must be supplied by the
 * surrounding application for this class to compile.
 */
@Component
public class StreamConsumers {
    
    // Basic stream consumer
    @RabbitStreamListener(queues = "events.stream")
    public void handleStreamEvent(String message) {
        System.out.println("Received stream message: " + message);
    }
    
    // Consumer with group (for offset management)
    @RabbitStreamListener(queues = "user.events", group = "analytics-service")
    public void handleUserEvents(UserEvent event) {
        // Process user event
        analyticsService.processUserEvent(event);
    }
    
    // Consumer starting from beginning
    @RabbitStreamListener(queues = "audit.stream", offset = "first")
    public void replayAuditEvents(AuditEvent event) {
        // Replay all audit events from beginning
        auditProcessor.reprocess(event);
    }
    
    // Consumer starting from specific timestamp
    @RabbitStreamListener(queues = "transactions.stream", 
                         offset = "timestamp:2023-01-01T00:00:00Z")
    public void processTransactionsFrom(TransactionEvent transaction) {
        // Process transactions from specific date
        transactionProcessor.process(transaction);
    }
    
    // Manual offset management
    @RabbitStreamListener(queues = "critical.stream")
    public void handleCriticalEvents(String message, StreamMessageContext context) {
        try {
            processCriticalMessage(message);
            // Manually acknowledge after successful processing
            context.ack();
        } catch (Exception e) {
            // Don't ack on error, so this offset is not recorded as processed.
            // NOTE(review): a stream keeps its messages and is read by offset, so the
            // message is not redelivered to other consumers the way a queue would do;
            // presumably it is read again only when consumption restarts from an
            // earlier offset - confirm against the offset-tracking behaviour.
            log.error("Failed to process message at offset {}", context.getOffset(), e);
        }
    }
}

Stream Configuration Properties

Configuration properties specific to RabbitMQ Streams under the spring.rabbitmq.stream prefix.

/**
 * Stream-specific configuration properties.
 *
 * <p>Nested properties holder; the surrounding text places it under the
 * {@code spring.rabbitmq.stream} prefix. Accessors are omitted in this summary.
 */
public static class Stream {
    
    /**
     * Stream host (defaults to main RabbitMQ host).
     * NOTE(review): no default is assigned in this declaration; confirm where the
     * fallback to the main host is applied.
     */
    private String host;
    
    /** Stream port (default: 5552) */
    private int port = 5552;
    
    /** Stream username */
    private String username;
    
    /** Stream password */
    private String password;
    
    /** Stream name pattern */
    private String name;
    
    /** Stream environment configuration; always present, never null */
    private final Environment environment = new Environment();
    
    /**
     * Stream environment configuration.
     * All settings are object-typed and have no initial value here.
     */
    public static class Environment {
        
        /** Maximum frame size */
        private DataSize maxFrameSize;
        
        /** Heartbeat interval */
        private Duration heartbeat;
        
        /** Connection timeout */
        private Duration connectionTimeout;
        
        /** Recovery back-off delay */
        private Duration recoveryBackOffDelay;
        
        /** Topology recovery enabled (boxed, so it can be left unset) */
        private Boolean topologyRecovery;
    }
}

Stream Configuration Example:

spring:
  rabbitmq:
    # Main RabbitMQ connection
    host: rabbitmq.example.com
    username: myapp
    # Resolved from the RABBITMQ_PASSWORD placeholder, keeping the secret out of this file
    password: ${RABBITMQ_PASSWORD}
    
    # Stream-specific configuration
    stream:
      host: ${spring.rabbitmq.host}  # Use same host
      # Matches the documented default stream port
      port: 5552
      # Reuse the credentials of the main connection
      username: ${spring.rabbitmq.username}
      password: ${spring.rabbitmq.password}
      # Settings for the stream environment (size and duration values)
      environment:
        max-frame-size: 1MB
        heartbeat: 60s
        connection-timeout: 30s
        recovery-back-off-delay: 5s
        topology-recovery: true

Stream Container Factory

Factory configuration for stream listener containers with performance tuning options.

/**
 * Stream container factory configuration.
 *
 * <p>Nested properties holder; accessors are omitted in this summary.
 */
public static class StreamContainer {
    
    /** Auto startup (enabled by default) */
    private boolean autoStartup = true;
    
    /** Native listener (off unless set; a primitive boolean defaults to false) */
    private boolean nativeListener;
    
    /** Retry configuration; always present, never null */
    private final ListenerRetry retry = new ListenerRetry();
}

/**
 * Registers the listener container factory used for streams, unless the
 * application already defines a bean of this type.
 *
 * @param rabbitStreamEnvironment the stream environment handed to the configurer
 * @param configurer              applies settings to the new factory
 * @return the configured container factory
 */
@Bean
@ConditionalOnMissingBean
public StreamRabbitListenerContainerFactory streamRabbitListenerContainerFactory(
    Environment rabbitStreamEnvironment,
    StreamRabbitListenerContainerFactoryConfigurer configurer) {
    
    final StreamRabbitListenerContainerFactory containerFactory =
        new StreamRabbitListenerContainerFactory();
    // Let the configurer apply its settings before the factory is exposed as a bean.
    configurer.configure(containerFactory, rabbitStreamEnvironment);
    return containerFactory;
}

Stream Administrative Operations

Administrative operations for managing streams, including creation, deletion, and metadata queries.

/**
 * Stream administrative operations.
 *
 * <p>Covers creation, deletion, existence checks and metadata/statistics lookups
 * for a stream identified by name.
 */
public interface StreamAdmin {
    
    /**
     * Create a stream.
     *
     * @param name    name of the stream to create
     * @param options creation options for the new stream
     */
    void createStream(String name, StreamCreationOptions options);
    
    /**
     * Delete a stream.
     *
     * @param name name of the stream to delete
     */
    void deleteStream(String name);
    
    /**
     * Check if stream exists.
     *
     * @param name name of the stream to look up
     * @return {@code true} if the stream exists
     */
    boolean streamExists(String name);
    
    /**
     * Get stream metadata.
     *
     * @param name name of the stream to look up
     * @return metadata for the stream
     */
    StreamMetadata getStreamMetadata(String name);
    
    /**
     * Get stream statistics.
     *
     * @param name name of the stream to look up
     * @return statistics for the stream
     */
    StreamStatistics getStreamStatistics(String name);
}

/**
 * Stream creation options.
 *
 * <p>Each option has a fluent setter that returns this instance, so options can be
 * chained as the administration example in this document does:
 * {@code new StreamCreationOptions().maxAge(...).maxLength(...)}.
 */
public class StreamCreationOptions {
    
    /** Maximum age of messages */
    private Duration maxAge;
    
    /** Maximum size of stream */
    private DataSize maxLength;
    
    /** Maximum segment size */
    private DataSize maxSegmentSize;
    
    /** Leader locator strategy */
    private String leaderLocator;
    
    /** Initial cluster size */
    private int initialClusterSize;
    
    /**
     * Set the maximum age of messages.
     *
     * @param maxAge the maximum message age
     * @return this options instance, for chaining
     */
    public StreamCreationOptions maxAge(Duration maxAge) {
        this.maxAge = maxAge;
        return this;
    }
    
    /**
     * Set the maximum size of the stream.
     *
     * @param maxLength the maximum stream size
     * @return this options instance, for chaining
     */
    public StreamCreationOptions maxLength(DataSize maxLength) {
        this.maxLength = maxLength;
        return this;
    }
    
    /**
     * Set the maximum segment size.
     *
     * @param maxSegmentSize the maximum segment size
     * @return this options instance, for chaining
     */
    public StreamCreationOptions maxSegmentSize(DataSize maxSegmentSize) {
        this.maxSegmentSize = maxSegmentSize;
        return this;
    }
    
    /**
     * Set the leader locator strategy.
     *
     * @param leaderLocator the leader locator strategy
     * @return this options instance, for chaining
     */
    public StreamCreationOptions leaderLocator(String leaderLocator) {
        this.leaderLocator = leaderLocator;
        return this;
    }
    
    /**
     * Set the initial cluster size.
     *
     * @param initialClusterSize the initial cluster size
     * @return this options instance, for chaining
     */
    public StreamCreationOptions initialClusterSize(int initialClusterSize) {
        this.initialClusterSize = initialClusterSize;
        return this;
    }
}

Stream Administration Example:

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class StreamAdminConfig {
    
    @Bean
    public StreamAdmin streamAdmin(Environment environment) {
        return new StreamAdmin(environment);
    }
    
    @PostConstruct
    public void setupStreams() {
        // Create streams with retention policies
        StreamCreationOptions options = new StreamCreationOptions()
            .maxAge(Duration.ofDays(7))           // Keep messages for 7 days
            .maxLength(DataSize.ofGigabytes(10))  // Max 10GB
            .maxSegmentSize(DataSize.ofMegabytes(500)); // 500MB segments
            
        if (!streamAdmin.streamExists("events.stream")) {
            streamAdmin.createStream("events.stream", options);
        }
        
        if (!streamAdmin.streamExists("audit.stream")) {
            // Audit stream with longer retention
            StreamCreationOptions auditOptions = new StreamCreationOptions()
                .maxAge(Duration.ofDays(365))  // Keep for 1 year
                .maxLength(DataSize.ofGigabytes(100));
            streamAdmin.createStream("audit.stream", auditOptions);
        }
    }
}

Stream Performance Considerations

Configuration and best practices for optimal stream performance.

/**
 * Stream performance configuration.
 *
 * <p>Declares a primary stream template tuned for batched, compressed publishing
 * and a listener container factory whose consumers start at the end of the stream
 * with manual offset tracking.
 */
@Configuration
public class StreamPerformanceConfig {
    
    /**
     * Builds the primary template with a throughput-oriented producer customizer.
     *
     * @param environment the stream environment backing the template
     * @return the customized template
     */
    @Bean
    @Primary
    public RabbitStreamTemplate optimizedStreamTemplate(Environment environment) {
        final RabbitStreamTemplate streamTemplate = new RabbitStreamTemplate(environment);
        
        // High-throughput producer settings
        streamTemplate.setProducerCustomizer(builder -> {
            // Batch messages
            builder.batchSize(100);
            // Small delay for batching
            builder.batchPublishingDelay(Duration.ofMillis(10));
            // Compress messages
            builder.compression(CompressionType.GZIP);
        });
        
        return streamTemplate;
    }
    
    /**
     * Builds a container factory with a throughput-oriented consumer customizer.
     *
     * @param environment the stream environment backing the factory
     * @return the customized container factory
     */
    @Bean
    public StreamRabbitListenerContainerFactory highThroughputContainerFactory(Environment environment) {
        final StreamRabbitListenerContainerFactory containerFactory =
            new StreamRabbitListenerContainerFactory();
        containerFactory.setEnvironment(environment);
        
        // High-throughput consumer settings
        containerFactory.setConsumerCustomizer(builder -> {
            // Start from latest
            builder.offset(OffsetSpecification.last());
            // Manual offset tracking
            builder.manualTrackingStrategy();
        });
        
        return containerFactory;
    }
}

Stream Environment Customization

Spring Boot and Spring AMQP provide callback interfaces for customizing the RabbitMQ Stream environment, producers, and consumers.

/**
 * Callback interface for customizing the Stream EnvironmentBuilder.
 *
 * <p>Functional interface, so it can be written as a lambda; the example in this
 * document registers one as a bean.
 */
@FunctionalInterface
public interface EnvironmentBuilderCustomizer {
    
    /**
     * Customize the EnvironmentBuilder.
     *
     * @param builder the builder to customize
     */
    void customize(EnvironmentBuilder builder);
}

/**
 * Callback interface for customizing Stream producers.
 *
 * <p>NOTE(review): the examples in this document call builder-style methods
 * ({@code batchSize}, {@code batchPublishingDelay}, {@code maxUnconfirmedMessages})
 * on the callback argument; confirm the exact parameter type and method shape
 * against the library.
 */
@FunctionalInterface  
public interface ProducerCustomizer {
    
    /**
     * Customize a stream producer.
     *
     * @param producer the producer to customize
     */
    void customize(Producer producer);
}

/**
 * Callback interface for customizing Stream consumers.
 *
 * <p>NOTE(review): the example in this document calls builder-style methods
 * ({@code offset}, {@code manualTrackingStrategy}) on the callback argument;
 * confirm the exact parameter type and method shape against the library.
 */
@FunctionalInterface
public interface ConsumerCustomizer {
    
    /**
     * Customize a stream consumer.
     *
     * @param consumer the consumer to customize
     */
    void customize(Consumer consumer);
}

Environment Customization Example:

import org.springframework.boot.autoconfigure.amqp.EnvironmentBuilderCustomizer;
import org.springframework.rabbit.stream.producer.ProducerCustomizer;

@Configuration
public class StreamEnvironmentConfig {
    
    @Bean
    public EnvironmentBuilderCustomizer environmentBuilderCustomizer() {
        return builder -> {
            builder.lazyInitialization(true)
                .addressResolver(address -> address) // Custom address resolution
                .recoveryBackOffDelayPolicy(BackOffDelayPolicy.fixedWithInitialDelayDelayPolicy(
                    Duration.ofSeconds(1), Duration.ofSeconds(10)));
        };
    }
    
    @Bean
    public ProducerCustomizer producerCustomizer() {
        return producer -> {
            producer.batchSize(50)
                .batchPublishingDelay(Duration.ofMillis(100))
                .maxUnconfirmedMessages(1000);
        };
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-springframework-boot--spring-boot-starter-amqp

docs

actuator.md

configuration.md

core-messaging.md

index.md

listeners.md

streams.md

tile.json