CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-springframework-cloud--spring-cloud-stream

A framework for building message-driven microservice applications on Spring Boot with Spring Integration.

Pending
Overview
Eval results
Files

configuration.mddocs/

Configuration System

Spring Cloud Stream's configuration system provides comprehensive property-based configuration for bindings, binders, and Spring Boot auto-configuration integration. It allows fine-grained control over message processing behavior, middleware connections, and application lifecycle management.

Capabilities

Binding Service Properties

Main configuration properties class for the entire Spring Cloud Stream application.

/**
 * Main configuration properties for Spring Cloud Stream binding service.
 * Contains binder configurations, binding properties, and global settings.
 */
@ConfigurationProperties("spring.cloud.stream")
public class BindingServiceProperties implements ApplicationContextAware {
    
    /**
     * Default binder to use when no specific binder is configured.
     */
    private String defaultBinder;
    
    /**
     * Configuration for available binders keyed by binder name.
     */
    private Map<String, BinderProperties> binders = new HashMap<>();
    
    /**
     * Configuration for individual bindings keyed by binding name.
     */
    private Map<String, BindingProperties> bindings = new HashMap<>();
    
    /**
     * Number of deployed instances of the application.
     */
    private int instanceCount = 1;
    
    /**
     * Instance index of this application (0-based).
     */
    private int instanceIndex = 0;
    
    /**
     * Whether to allow dynamic destination creation.
     */
    private boolean dynamicDestinations = true;
    
    /**
     * Cache size for destination information.
     */
    private int bindingRetryInterval = 30;
    
    public String getDefaultBinder();
    public void setDefaultBinder(String defaultBinder);
    
    public Map<String, BinderProperties> getBinders();
    public void setBinders(Map<String, BinderProperties> binders);
    
    public Map<String, BindingProperties> getBindings();
    public void setBindings(Map<String, BindingProperties> bindings);
    
    public int getInstanceCount();
    public void setInstanceCount(int instanceCount);
    
    public int getInstanceIndex();
    public void setInstanceIndex(int instanceIndex);
    
    public boolean isDynamicDestinations();
    public void setDynamicDestinations(boolean dynamicDestinations);
    
    public int getBindingRetryInterval();
    public void setBindingRetryInterval(int bindingRetryInterval);
    
    /**
     * Get binding properties for a specific binding name.
     * @param bindingName the binding name
     * @return binding properties or default if not found
     */
    public BindingProperties getBindingProperties(String bindingName);
    
    /**
     * Get binder properties for a specific binder name.
     * @param binderName the binder name
     * @return binder properties or null if not found
     */
    public BinderProperties getBinderProperties(String binderName);
    
    public void setApplicationContext(ApplicationContext applicationContext);
}

Binding Properties

Configuration properties for individual message bindings.

/**
 * Properties for individual message bindings.
 * Controls destination, content type, and consumer/producer behavior.
 */
public class BindingProperties implements Cloneable {
    
    /**
     * The logical destination name (topic, queue, etc.).
     */
    private String destination;
    
    /**
     * Consumer group for this binding.
     */
    private String group;
    
    /**
     * Content type for message serialization/deserialization.
     */
    private String contentType;
    
    /**
     * Specific binder to use for this binding.
     */
    private String binder;
    
    /**
     * Consumer-specific properties.
     */
    private ConsumerProperties consumer = new ConsumerProperties();
    
    /**
     * Producer-specific properties.
     */
    private ProducerProperties producer = new ProducerProperties();
    
    public String getDestination();
    public void setDestination(String destination);
    
    public String getGroup();
    public void setGroup(String group);
    
    public String getContentType();
    public void setContentType(String contentType);
    
    public String getBinder();
    public void setBinder(String binder);
    
    public ConsumerProperties getConsumer();
    public void setConsumer(ConsumerProperties consumer);
    
    public ProducerProperties getProducer();
    public void setProducer(ProducerProperties producer);
    
    public BindingProperties clone();
}

Binder Properties

Configuration properties for message binders (middleware connections).

/**
 * Configuration properties for individual binders.
 * Defines how to connect to specific middleware systems.
 */
public class BinderProperties {
    
    /**
     * The binder type (e.g., "kafka", "rabbit").
     */
    private String type;
    
    /**
     * Environment properties specific to this binder.
     */
    private Map<String, Object> environment = new HashMap<>();
    
    /**
     * Whether this binder should inherit the parent environment.
     */
    private boolean inheritEnvironment = true;
    
    /**
     * Whether this binder is a default candidate for auto-selection.
     */
    private boolean defaultCandidate = true;
    
    public String getType();
    public void setType(String type);
    
    public Map<String, Object> getEnvironment();
    public void setEnvironment(Map<String, Object> environment);
    
    public boolean isInheritEnvironment();
    public void setInheritEnvironment(boolean inheritEnvironment);
    
    public boolean isDefaultCandidate();
    public void setDefaultCandidate(boolean defaultCandidate);
}

Consumer Properties

Configuration properties for message consumers.

/**
 * Configuration properties for message consumers.
 * Controls retry behavior, concurrency, and partitioning.
 */
public class ConsumerProperties implements Cloneable {
    
    /**
     * Maximum number of retry attempts for failed messages.
     */
    private int maxAttempts = 3;
    
    /**
     * Initial backoff interval for retries (milliseconds).
     */
    private int backOffInitialInterval = 1000;
    
    /**
     * Maximum backoff interval for retries (milliseconds).
     */
    private int backOffMaxInterval = 10000;
    
    /**
     * Backoff multiplier for exponential backoff.
     */
    private double backOffMultiplier = 2.0;
    
    /**
     * Whether failed messages should be retried by default.
     */
    private boolean defaultRetryable = true;
    
    /**
     * Number of concurrent consumer threads.
     */
    private int concurrency = 1;
    
    /**
     * Whether this consumer supports partitioned data.
     */
    private boolean partitioned = false;
    
    /**
     * List of partition indexes this instance should consume from.
     */
    private Integer[] instanceIndexList;
    
    /**
     * How message headers should be handled.
     */
    private HeaderMode headerMode = HeaderMode.embeddedHeaders;
    
    /**
     * Whether to use native decoding instead of message converters.
     */
    private boolean useNativeDecoding = false;
    
    /**
     * Whether to multiplex multiple consumers on a single connection.
     */
    private boolean multiplex = false;
    
    public int getMaxAttempts();
    public void setMaxAttempts(int maxAttempts);
    
    public int getBackOffInitialInterval();
    public void setBackOffInitialInterval(int backOffInitialInterval);
    
    public int getBackOffMaxInterval();
    public void setBackOffMaxInterval(int backOffMaxInterval);
    
    public double getBackOffMultiplier();
    public void setBackOffMultiplier(double backOffMultiplier);
    
    public boolean isDefaultRetryable();
    public void setDefaultRetryable(boolean defaultRetryable);
    
    public int getConcurrency();
    public void setConcurrency(int concurrency);
    
    public boolean isPartitioned();
    public void setPartitioned(boolean partitioned);
    
    public Integer[] getInstanceIndexList();
    public void setInstanceIndexList(Integer[] instanceIndexList);
    
    public HeaderMode getHeaderMode();
    public void setHeaderMode(HeaderMode headerMode);
    
    public boolean isUseNativeDecoding();
    public void setUseNativeDecoding(boolean useNativeDecoding);
    
    public boolean isMultiplex();
    public void setMultiplex(boolean multiplex);
    
    public ConsumerProperties clone();
}

Producer Properties

Configuration properties for message producers.

/**
 * Configuration properties for message producers.
 * Controls partitioning, error handling, and synchronization behavior.
 */
public class ProducerProperties implements Cloneable {
    
    /**
     * Number of partitions for the target destination.
     */
    private int partitionCount = 1;
    
    /**
     * SpEL expression for extracting partition key from messages.
     */
    private String partitionKeyExpression;
    
    /**
     * Bean name of partition key extractor strategy.
     */
    private String partitionKeyExtractorName;
    
    /**
     * Bean name of partition selector strategy.
     */
    private String partitionSelectorName;
    
    /**
     * SpEL expression for selecting partition.
     */
    private String partitionSelectorExpression;
    
    /**
     * Whether this producer should partition data.
     */
    private boolean partitioned = false;
    
    /**
     * Consumer groups that must exist before messages are sent.
     */
    private RequiredGroups requiredGroups = new RequiredGroups();
    
    /**
     * How message headers should be handled.
     */
    private HeaderMode headerMode = HeaderMode.embeddedHeaders;
    
    /**
     * Whether to use native encoding instead of message converters.
     */
    private boolean useNativeEncoding = false;
    
    /**
     * Whether to create an error channel for send failures.
     */
    private boolean errorChannelEnabled = false;
    
    /**
     * Whether message sending should be synchronous.
     */
    private boolean sync = false;
    
    public int getPartitionCount();
    public void setPartitionCount(int partitionCount);
    
    public String getPartitionKeyExpression();
    public void setPartitionKeyExpression(String partitionKeyExpression);
    
    public String getPartitionKeyExtractorName();
    public void setPartitionKeyExtractorName(String partitionKeyExtractorName);
    
    public String getPartitionSelectorName();
    public void setPartitionSelectorName(String partitionSelectorName);
    
    public String getPartitionSelectorExpression();
    public void setPartitionSelectorExpression(String partitionSelectorExpression);
    
    public boolean isPartitioned();
    public void setPartitioned(boolean partitioned);
    
    public RequiredGroups getRequiredGroups();
    public void setRequiredGroups(RequiredGroups requiredGroups);
    
    public HeaderMode getHeaderMode();
    public void setHeaderMode(HeaderMode headerMode);
    
    public boolean isUseNativeEncoding();
    public void setUseNativeEncoding(boolean useNativeEncoding);
    
    public boolean isErrorChannelEnabled();
    public void setErrorChannelEnabled(boolean errorChannelEnabled);
    
    public boolean isSync();
    public void setSync(boolean sync);
    
    public ProducerProperties clone();
}

Spring Integration Properties

Configuration properties specific to Spring Integration components.

/**
 * Configuration properties for Spring Integration components.
 */
@ConfigurationProperties("spring.cloud.stream.integration")
public class SpringIntegrationProperties {
    
    /**
     * Properties for message handler configuration.
     */
    private MessageHandlerProperties messageHandlerNotPropagatedHeaders = new MessageHandlerProperties();
    
    /**
     * Default poller configuration.
     */
    private PollerProperties poller = new PollerProperties();
    
    public MessageHandlerProperties getMessageHandlerNotPropagatedHeaders();
    public void setMessageHandlerNotPropagatedHeaders(MessageHandlerProperties messageHandlerNotPropagatedHeaders);
    
    public PollerProperties getPoller();
    public void setPoller(PollerProperties poller);
    
    /**
     * Properties for message handler configuration.
     */
    public static class MessageHandlerProperties {
        
        private String[] notPropagatedHeaders = new String[0];
        
        public String[] getNotPropagatedHeaders();
        public void setNotPropagatedHeaders(String[] notPropagatedHeaders);
    }
    
    /**
     * Properties for poller configuration.
     */
    public static class PollerProperties {
        
        private long fixedDelay = 1000;
        private long maxMessagesPerPoll = 1;
        private String cron;
        private String initialDelay;
        private TimeUnit timeUnit = TimeUnit.MILLISECONDS;
        
        public long getFixedDelay();
        public void setFixedDelay(long fixedDelay);
        
        public long getMaxMessagesPerPoll();
        public void setMaxMessagesPerPoll(long maxMessagesPerPoll);
        
        public String getCron();
        public void setCron(String cron);
        
        public String getInitialDelay();
        public void setInitialDelay(String initialDelay);
        
        public TimeUnit getTimeUnit();
        public void setTimeUnit(TimeUnit timeUnit);
    }
}

Auto-Configuration Classes

Spring Boot auto-configuration classes for automatic setup.

/**
 * Auto-configuration for binder factory and related beans.
 */
@Configuration
@EnableConfigurationProperties({BindingServiceProperties.class, SpringIntegrationProperties.class})
@Import({ContentTypeConfiguration.class, SpelExpressionConverterConfiguration.class})
public class BinderFactoryAutoConfiguration {
    
    /**
     * Creates the default binder factory.
     * @return configured BinderFactory
     */
    @Bean
    @ConditionalOnMissingBean(BinderFactory.class)
    public BinderFactory binderFactory();
    
    /**
     * Creates the default binder type registry.
     * @return configured BinderTypeRegistry
     */
    @Bean
    @ConditionalOnMissingBean(BinderTypeRegistry.class)
    public BinderTypeRegistry binderTypeRegistry();
    
    /**
     * Creates stream function properties.
     * @return configured StreamFunctionProperties
     */
    @Bean
    @ConditionalOnMissingBean
    public StreamFunctionProperties streamFunctionProperties();
    
    /**
     * Creates message handler method factory.
     * @return configured MessageHandlerMethodFactory
     */
    @Bean
    @ConditionalOnMissingBean(MessageHandlerMethodFactory.class)
    public MessageHandlerMethodFactory messageHandlerMethodFactory();
}

/**
 * Auto-configuration for binding service and related components.
 */
@Configuration
@EnableConfigurationProperties(BindingServiceProperties.class)
@Import(BinderFactoryAutoConfiguration.class)
public class BindingServiceConfiguration {
    
    /**
     * Creates the central binding service.
     * @return configured BindingService
     */
    @Bean
    @ConditionalOnMissingBean
    public BindingService bindingService();
    
    /**
     * Creates binding lifecycle controller.
     * @return configured BindingsLifecycleController
     */
    @Bean
    @ConditionalOnMissingBean
    public BindingsLifecycleController bindingsLifecycleController();
    
    /**
     * Creates composite message channel configurer.
     * @return configured CompositeMessageChannelConfigurer
     */
    @Bean
    @ConditionalOnMissingBean(MessageChannelConfigurer.class)
    public CompositeMessageChannelConfigurer compositeMessageChannelConfigurer();
}

/**
 * Auto-configuration for binder health indicators.
 */
@Configuration
@ConditionalOnClass(HealthIndicator.class)
@AutoConfigureAfter(BindingServiceConfiguration.class)
public class BindersHealthIndicatorAutoConfiguration {
    
    /**
     * Creates health indicator for binders.
     * @return configured BindersHealthIndicator
     */
    @Bean
    @ConditionalOnMissingBean(name = "bindersHealthIndicator")
    public BindersHealthIndicator bindersHealthIndicator();
}

/**
 * Auto-configuration for bindings actuator endpoint.
 */
@Configuration
@ConditionalOnClass(Endpoint.class)
@AutoConfigureAfter(BindingServiceConfiguration.class)
public class BindingsEndpointAutoConfiguration {
    
    /**
     * Creates bindings actuator endpoint.
     * @return configured BindingsEndpoint
     */
    @Bean
    @ConditionalOnMissingBean
    public BindingsEndpoint bindingsEndpoint();
}

/**
 * Auto-configuration for channels actuator endpoint.
 */
@Configuration
@ConditionalOnClass(Endpoint.class)
public class ChannelsEndpointAutoConfiguration {
    
    /**
     * Creates channels actuator endpoint.
     * @return configured ChannelsEndpoint
     */
    @Bean
    @ConditionalOnMissingBean
    public ChannelsEndpoint channelsEndpoint();
}

Content Type Configuration

Configuration for content type handling and message conversion.

/**
 * Configuration for content type handling.
 */
@Configuration
public class ContentTypeConfiguration {
    
    /**
     * Creates composite message converter factory.
     * @return configured CompositeMessageConverterFactory
     */
    @Bean
    @ConditionalOnMissingBean
    public CompositeMessageConverterFactory compositeMessageConverterFactory();
    
    /**
     * Creates message converter utils.
     * @return configured MessageConverterUtils
     */
    @Bean
    @ConditionalOnMissingBean
    public MessageConverterUtils messageConverterUtils();
}

/**
 * Configuration for SpEL expression converters.
 */
@Configuration
public class SpelExpressionConverterConfiguration {
    
    /**
     * Creates SpEL expression converter.
     * @return configured Converter
     */
    @Bean
    @ConditionalOnMissingBean
    public Converter<String, Expression> spelExpressionConverter();
}

Customizer Interfaces

Interfaces for customizing various components during configuration.

/**
 * Customizer for message sources.
 * @param <T> the message source type
 */
public interface MessageSourceCustomizer<T> {
    
    /**
     * Customize a message source.
     * @param source the message source to customize
     * @param destinationName the destination name
     * @param group the consumer group
     */
    void customize(T source, String destinationName, String group);
}

/**
 * Customizer for producer message handlers.
 * @param <H> the message handler type
 */
public interface ProducerMessageHandlerCustomizer<H extends MessageHandler> {
    
    /**
     * Customize a producer message handler.
     * @param handler the message handler to customize
     * @param destinationName the destination name
     */
    void customize(H handler, String destinationName);
}

/**
 * Customizer for consumer endpoints.
 * @param <E> the endpoint type
 */
public interface ConsumerEndpointCustomizer<E extends MessageProducer> {
    
    /**
     * Customize a consumer endpoint.
     * @param endpoint the endpoint to customize
     * @param destinationName the destination name
     * @param group the consumer group
     */
    void customize(E endpoint, String destinationName, String group);
}

/**
 * Customizer for listener containers.
 * @param <T> the container type
 */
public interface ListenerContainerCustomizer<T> {
    
    /**
     * Customize a listener container.
     * @param container the container to customize
     * @param destinationName the destination name
     * @param group the consumer group
     */
    void customize(T container, String destinationName, String group);
}

Environment Post Processors

Post processors for modifying the Spring environment during application startup.

/**
 * Environment post processor for poller configuration.
 */
public class PollerConfigEnvironmentPostProcessor implements EnvironmentPostProcessor {
    
    /**
     * Post process the environment to add poller configuration.
     * @param environment the environment to modify
     * @param application the Spring application
     */
    public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application);
}

Utility Classes

Utility classes for configuration handling.

/**
 * Represents required consumer groups for a producer.
 */
public class RequiredGroups {
    
    private String[] groups = new String[0];
    
    public String[] getGroups();
    public void setGroups(String[] groups);
}

/**
 * Advice for binding handlers.
 */
public class BindingHandlerAdvise {
    
    /**
     * Apply advice to binding handler methods.
     * @param method the method being advised
     * @param args the method arguments
     * @return the advised result
     */
    public Object invoke(Method method, Object[] args);
}

Usage Examples:

// Configuration using properties
# application.yml
spring:
  cloud:
    stream:
      default-binder: kafka
      instance-count: 3
      instance-index: 0
      dynamic-destinations: true
      
      binders:
        kafka:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9092,localhost:9093
                      auto-create-topics: true
        rabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                
      bindings:
        input:
          destination: my-input-topic
          group: my-group
          binder: kafka
          content-type: application/json
          consumer:
            max-attempts: 5
            back-off-initial-interval: 2000
            back-off-multiplier: 2.0
            concurrency: 2
            partitioned: true
            instance-index-list: [0, 1]
            
        output:
          destination: my-output-topic
          binder: rabbit
          content-type: application/json
          producer:
            partition-count: 3
            partition-key-expression: payload.userId
            partitioned: true
            required-groups: [audit-group, analytics-group]
            sync: true

// Programmatic configuration
@Configuration
public class StreamConfiguration {
    
    @Bean
    @ConfigurationProperties("custom.stream")
    public BindingServiceProperties customBindingProperties() {
        return new BindingServiceProperties();
    }
    
    @Bean
    public MessageSourceCustomizer<KafkaMessageSource> kafkaMessageSourceCustomizer() {
        return (source, destination, group) -> {
            // Custom configuration for Kafka message sources
            source.setGroupId(group + "-custom");
        };
    }
    
    @Bean
    public ProducerMessageHandlerCustomizer<MessageHandler> producerCustomizer() {
        return (handler, destination) -> {
            // Custom configuration for producers
            if (handler instanceof KafkaProducerMessageHandler) {
                KafkaProducerMessageHandler kafkaHandler = (KafkaProducerMessageHandler) handler;
                kafkaHandler.setSync(true);
            }
        };
    }
    
    @Bean
    public ConsumerEndpointCustomizer<MessageProducer> consumerCustomizer() {
        return (endpoint, destination, group) -> {
            // Custom configuration for consumers
            if (endpoint instanceof KafkaMessageDrivenChannelAdapter) {
                KafkaMessageDrivenChannelAdapter adapter = (KafkaMessageDrivenChannelAdapter) endpoint;
                adapter.setRecoveryInterval(10000);
            }
        };
    }
}

// Dynamic configuration
@Component
public class DynamicConfigurationService {
    
    private final BindingServiceProperties bindingProperties;
    
    public DynamicConfigurationService(BindingServiceProperties bindingProperties) {
        this.bindingProperties = bindingProperties;
    }
    
    public void addDynamicBinding(String bindingName, String destination, String group) {
        BindingProperties binding = new BindingProperties();
        binding.setDestination(destination);
        binding.setGroup(group);
        binding.setContentType("application/json");
        
        ConsumerProperties consumer = new ConsumerProperties();
        consumer.setMaxAttempts(3);
        consumer.setConcurrency(2);
        binding.setConsumer(consumer);
        
        bindingProperties.getBindings().put(bindingName, binding);
    }
    
    public void configureBinder(String binderName, String type, Map<String, Object> environment) {
        BinderProperties binder = new BinderProperties();
        binder.setType(type);
        binder.setEnvironment(environment);
        binder.setInheritEnvironment(true);
        binder.setDefaultCandidate(true);
        
        bindingProperties.getBinders().put(binderName, binder);
    }
    
    public BindingProperties getBindingConfig(String bindingName) {
        return bindingProperties.getBindingProperties(bindingName);
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-springframework-cloud--spring-cloud-stream

docs

actuator-integration.md

binder-framework.md

binding-management.md

configuration.md

function-support.md

index.md

message-conversion.md

tile.json