A framework for building message-driven microservice applications on Spring Boot with Spring Integration.
—
Spring Cloud Stream's configuration system provides comprehensive property-based configuration for bindings, binders, and Spring Boot auto-configuration integration. It allows fine-grained control over message processing behavior, middleware connections, and application lifecycle management.
Main configuration properties class for the entire Spring Cloud Stream application.
/**
* Main configuration properties for Spring Cloud Stream binding service.
* Contains binder configurations, binding properties, and global settings.
*/
@ConfigurationProperties("spring.cloud.stream")
public class BindingServiceProperties implements ApplicationContextAware {
/**
* Default binder to use when no specific binder is configured.
*/
private String defaultBinder;
/**
* Configuration for available binders keyed by binder name.
*/
private Map<String, BinderProperties> binders = new HashMap<>();
/**
* Configuration for individual bindings keyed by binding name.
*/
private Map<String, BindingProperties> bindings = new HashMap<>();
/**
* Number of deployed instances of the application.
*/
private int instanceCount = 1;
/**
* Instance index of this application (0-based).
*/
private int instanceIndex = 0;
/**
* Whether to allow dynamic destination creation.
*/
private boolean dynamicDestinations = true;
/**
* Cache size for destination information.
*/
private int bindingRetryInterval = 30;
public String getDefaultBinder();
public void setDefaultBinder(String defaultBinder);
public Map<String, BinderProperties> getBinders();
public void setBinders(Map<String, BinderProperties> binders);
public Map<String, BindingProperties> getBindings();
public void setBindings(Map<String, BindingProperties> bindings);
public int getInstanceCount();
public void setInstanceCount(int instanceCount);
public int getInstanceIndex();
public void setInstanceIndex(int instanceIndex);
public boolean isDynamicDestinations();
public void setDynamicDestinations(boolean dynamicDestinations);
public int getBindingRetryInterval();
public void setBindingRetryInterval(int bindingRetryInterval);
/**
* Get binding properties for a specific binding name.
* @param bindingName the binding name
* @return binding properties or default if not found
*/
public BindingProperties getBindingProperties(String bindingName);
/**
* Get binder properties for a specific binder name.
* @param binderName the binder name
* @return binder properties or null if not found
*/
public BinderProperties getBinderProperties(String binderName);
public void setApplicationContext(ApplicationContext applicationContext);
}Configuration properties for individual message bindings.
/**
* Properties for individual message bindings.
* Controls destination, content type, and consumer/producer behavior.
*/
public class BindingProperties implements Cloneable {
/**
* The logical destination name (topic, queue, etc.).
*/
private String destination;
/**
* Consumer group for this binding.
*/
private String group;
/**
* Content type for message serialization/deserialization.
*/
private String contentType;
/**
* Specific binder to use for this binding.
*/
private String binder;
/**
* Consumer-specific properties.
*/
private ConsumerProperties consumer = new ConsumerProperties();
/**
* Producer-specific properties.
*/
private ProducerProperties producer = new ProducerProperties();
public String getDestination();
public void setDestination(String destination);
public String getGroup();
public void setGroup(String group);
public String getContentType();
public void setContentType(String contentType);
public String getBinder();
public void setBinder(String binder);
public ConsumerProperties getConsumer();
public void setConsumer(ConsumerProperties consumer);
public ProducerProperties getProducer();
public void setProducer(ProducerProperties producer);
public BindingProperties clone();
}Configuration properties for message binders (middleware connections).
/**
* Configuration properties for individual binders.
* Defines how to connect to specific middleware systems.
*/
public class BinderProperties {
/**
* The binder type (e.g., "kafka", "rabbit").
*/
private String type;
/**
* Environment properties specific to this binder.
*/
private Map<String, Object> environment = new HashMap<>();
/**
* Whether this binder should inherit the parent environment.
*/
private boolean inheritEnvironment = true;
/**
* Whether this binder is a default candidate for auto-selection.
*/
private boolean defaultCandidate = true;
public String getType();
public void setType(String type);
public Map<String, Object> getEnvironment();
public void setEnvironment(Map<String, Object> environment);
public boolean isInheritEnvironment();
public void setInheritEnvironment(boolean inheritEnvironment);
public boolean isDefaultCandidate();
public void setDefaultCandidate(boolean defaultCandidate);
}Configuration properties for message consumers.
/**
* Configuration properties for message consumers.
* Controls retry behavior, concurrency, and partitioning.
*/
public class ConsumerProperties implements Cloneable {
/**
* Maximum number of retry attempts for failed messages.
*/
private int maxAttempts = 3;
/**
* Initial backoff interval for retries (milliseconds).
*/
private int backOffInitialInterval = 1000;
/**
* Maximum backoff interval for retries (milliseconds).
*/
private int backOffMaxInterval = 10000;
/**
* Backoff multiplier for exponential backoff.
*/
private double backOffMultiplier = 2.0;
/**
* Whether failed messages should be retried by default.
*/
private boolean defaultRetryable = true;
/**
* Number of concurrent consumer threads.
*/
private int concurrency = 1;
/**
* Whether this consumer supports partitioned data.
*/
private boolean partitioned = false;
/**
* List of partition indexes this instance should consume from.
*/
private Integer[] instanceIndexList;
/**
* How message headers should be handled.
*/
private HeaderMode headerMode = HeaderMode.embeddedHeaders;
/**
* Whether to use native decoding instead of message converters.
*/
private boolean useNativeDecoding = false;
/**
* Whether to multiplex multiple consumers on a single connection.
*/
private boolean multiplex = false;
public int getMaxAttempts();
public void setMaxAttempts(int maxAttempts);
public int getBackOffInitialInterval();
public void setBackOffInitialInterval(int backOffInitialInterval);
public int getBackOffMaxInterval();
public void setBackOffMaxInterval(int backOffMaxInterval);
public double getBackOffMultiplier();
public void setBackOffMultiplier(double backOffMultiplier);
public boolean isDefaultRetryable();
public void setDefaultRetryable(boolean defaultRetryable);
public int getConcurrency();
public void setConcurrency(int concurrency);
public boolean isPartitioned();
public void setPartitioned(boolean partitioned);
public Integer[] getInstanceIndexList();
public void setInstanceIndexList(Integer[] instanceIndexList);
public HeaderMode getHeaderMode();
public void setHeaderMode(HeaderMode headerMode);
public boolean isUseNativeDecoding();
public void setUseNativeDecoding(boolean useNativeDecoding);
public boolean isMultiplex();
public void setMultiplex(boolean multiplex);
public ConsumerProperties clone();
}Configuration properties for message producers.
/**
* Configuration properties for message producers.
* Controls partitioning, error handling, and synchronization behavior.
*/
public class ProducerProperties implements Cloneable {
/**
* Number of partitions for the target destination.
*/
private int partitionCount = 1;
/**
* SpEL expression for extracting partition key from messages.
*/
private String partitionKeyExpression;
/**
* Bean name of partition key extractor strategy.
*/
private String partitionKeyExtractorName;
/**
* Bean name of partition selector strategy.
*/
private String partitionSelectorName;
/**
* SpEL expression for selecting partition.
*/
private String partitionSelectorExpression;
/**
* Whether this producer should partition data.
*/
private boolean partitioned = false;
/**
* Consumer groups that must exist before messages are sent.
*/
private RequiredGroups requiredGroups = new RequiredGroups();
/**
* How message headers should be handled.
*/
private HeaderMode headerMode = HeaderMode.embeddedHeaders;
/**
* Whether to use native encoding instead of message converters.
*/
private boolean useNativeEncoding = false;
/**
* Whether to create an error channel for send failures.
*/
private boolean errorChannelEnabled = false;
/**
* Whether message sending should be synchronous.
*/
private boolean sync = false;
public int getPartitionCount();
public void setPartitionCount(int partitionCount);
public String getPartitionKeyExpression();
public void setPartitionKeyExpression(String partitionKeyExpression);
public String getPartitionKeyExtractorName();
public void setPartitionKeyExtractorName(String partitionKeyExtractorName);
public String getPartitionSelectorName();
public void setPartitionSelectorName(String partitionSelectorName);
public String getPartitionSelectorExpression();
public void setPartitionSelectorExpression(String partitionSelectorExpression);
public boolean isPartitioned();
public void setPartitioned(boolean partitioned);
public RequiredGroups getRequiredGroups();
public void setRequiredGroups(RequiredGroups requiredGroups);
public HeaderMode getHeaderMode();
public void setHeaderMode(HeaderMode headerMode);
public boolean isUseNativeEncoding();
public void setUseNativeEncoding(boolean useNativeEncoding);
public boolean isErrorChannelEnabled();
public void setErrorChannelEnabled(boolean errorChannelEnabled);
public boolean isSync();
public void setSync(boolean sync);
public ProducerProperties clone();
}Configuration properties specific to Spring Integration components.
/**
* Configuration properties for Spring Integration components.
*/
@ConfigurationProperties("spring.cloud.stream.integration")
public class SpringIntegrationProperties {
/**
* Properties for message handler configuration.
*/
private MessageHandlerProperties messageHandlerNotPropagatedHeaders = new MessageHandlerProperties();
/**
* Default poller configuration.
*/
private PollerProperties poller = new PollerProperties();
public MessageHandlerProperties getMessageHandlerNotPropagatedHeaders();
public void setMessageHandlerNotPropagatedHeaders(MessageHandlerProperties messageHandlerNotPropagatedHeaders);
public PollerProperties getPoller();
public void setPoller(PollerProperties poller);
/**
* Properties for message handler configuration.
*/
public static class MessageHandlerProperties {
private String[] notPropagatedHeaders = new String[0];
public String[] getNotPropagatedHeaders();
public void setNotPropagatedHeaders(String[] notPropagatedHeaders);
}
/**
* Properties for poller configuration.
*/
public static class PollerProperties {
private long fixedDelay = 1000;
private long maxMessagesPerPoll = 1;
private String cron;
private String initialDelay;
private TimeUnit timeUnit = TimeUnit.MILLISECONDS;
public long getFixedDelay();
public void setFixedDelay(long fixedDelay);
public long getMaxMessagesPerPoll();
public void setMaxMessagesPerPoll(long maxMessagesPerPoll);
public String getCron();
public void setCron(String cron);
public String getInitialDelay();
public void setInitialDelay(String initialDelay);
public TimeUnit getTimeUnit();
public void setTimeUnit(TimeUnit timeUnit);
}
}Spring Boot auto-configuration classes for automatic setup.
/**
* Auto-configuration for binder factory and related beans.
*/
@Configuration
@EnableConfigurationProperties({BindingServiceProperties.class, SpringIntegrationProperties.class})
@Import({ContentTypeConfiguration.class, SpelExpressionConverterConfiguration.class})
public class BinderFactoryAutoConfiguration {
/**
* Creates the default binder factory.
* @return configured BinderFactory
*/
@Bean
@ConditionalOnMissingBean(BinderFactory.class)
public BinderFactory binderFactory();
/**
* Creates the default binder type registry.
* @return configured BinderTypeRegistry
*/
@Bean
@ConditionalOnMissingBean(BinderTypeRegistry.class)
public BinderTypeRegistry binderTypeRegistry();
/**
* Creates stream function properties.
* @return configured StreamFunctionProperties
*/
@Bean
@ConditionalOnMissingBean
public StreamFunctionProperties streamFunctionProperties();
/**
* Creates message handler method factory.
* @return configured MessageHandlerMethodFactory
*/
@Bean
@ConditionalOnMissingBean(MessageHandlerMethodFactory.class)
public MessageHandlerMethodFactory messageHandlerMethodFactory();
}
/**
* Auto-configuration for binding service and related components.
*/
@Configuration
@EnableConfigurationProperties(BindingServiceProperties.class)
@Import(BinderFactoryAutoConfiguration.class)
public class BindingServiceConfiguration {
/**
* Creates the central binding service.
* @return configured BindingService
*/
@Bean
@ConditionalOnMissingBean
public BindingService bindingService();
/**
* Creates binding lifecycle controller.
* @return configured BindingsLifecycleController
*/
@Bean
@ConditionalOnMissingBean
public BindingsLifecycleController bindingsLifecycleController();
/**
* Creates composite message channel configurer.
* @return configured CompositeMessageChannelConfigurer
*/
@Bean
@ConditionalOnMissingBean(MessageChannelConfigurer.class)
public CompositeMessageChannelConfigurer compositeMessageChannelConfigurer();
}
/**
* Auto-configuration for binder health indicators.
*/
@Configuration
@ConditionalOnClass(HealthIndicator.class)
@AutoConfigureAfter(BindingServiceConfiguration.class)
public class BindersHealthIndicatorAutoConfiguration {
/**
* Creates health indicator for binders.
* @return configured BindersHealthIndicator
*/
@Bean
@ConditionalOnMissingBean(name = "bindersHealthIndicator")
public BindersHealthIndicator bindersHealthIndicator();
}
/**
* Auto-configuration for bindings actuator endpoint.
*/
@Configuration
@ConditionalOnClass(Endpoint.class)
@AutoConfigureAfter(BindingServiceConfiguration.class)
public class BindingsEndpointAutoConfiguration {
/**
* Creates bindings actuator endpoint.
* @return configured BindingsEndpoint
*/
@Bean
@ConditionalOnMissingBean
public BindingsEndpoint bindingsEndpoint();
}
/**
* Auto-configuration for channels actuator endpoint.
*/
@Configuration
@ConditionalOnClass(Endpoint.class)
public class ChannelsEndpointAutoConfiguration {
/**
* Creates channels actuator endpoint.
* @return configured ChannelsEndpoint
*/
@Bean
@ConditionalOnMissingBean
public ChannelsEndpoint channelsEndpoint();
}Configuration for content type handling and message conversion.
/**
* Configuration for content type handling.
*/
@Configuration
public class ContentTypeConfiguration {
/**
* Creates composite message converter factory.
* @return configured CompositeMessageConverterFactory
*/
@Bean
@ConditionalOnMissingBean
public CompositeMessageConverterFactory compositeMessageConverterFactory();
/**
* Creates message converter utils.
* @return configured MessageConverterUtils
*/
@Bean
@ConditionalOnMissingBean
public MessageConverterUtils messageConverterUtils();
}
/**
* Configuration for SpEL expression converters.
*/
@Configuration
public class SpelExpressionConverterConfiguration {
/**
* Creates SpEL expression converter.
* @return configured Converter
*/
@Bean
@ConditionalOnMissingBean
public Converter<String, Expression> spelExpressionConverter();
}Interfaces for customizing various components during configuration.
/**
* Customizer for message sources.
* @param <T> the message source type
*/
public interface MessageSourceCustomizer<T> {
/**
* Customize a message source.
* @param source the message source to customize
* @param destinationName the destination name
* @param group the consumer group
*/
void customize(T source, String destinationName, String group);
}
/**
* Customizer for producer message handlers.
* @param <H> the message handler type
*/
public interface ProducerMessageHandlerCustomizer<H extends MessageHandler> {
/**
* Customize a producer message handler.
* @param handler the message handler to customize
* @param destinationName the destination name
*/
void customize(H handler, String destinationName);
}
/**
* Customizer for consumer endpoints.
* @param <E> the endpoint type
*/
public interface ConsumerEndpointCustomizer<E extends MessageProducer> {
/**
* Customize a consumer endpoint.
* @param endpoint the endpoint to customize
* @param destinationName the destination name
* @param group the consumer group
*/
void customize(E endpoint, String destinationName, String group);
}
/**
* Customizer for listener containers.
* @param <T> the container type
*/
public interface ListenerContainerCustomizer<T> {
/**
* Customize a listener container.
* @param container the container to customize
* @param destinationName the destination name
* @param group the consumer group
*/
void customize(T container, String destinationName, String group);
}Post processors for modifying the Spring environment during application startup.
/**
* Environment post processor for poller configuration.
*/
public class PollerConfigEnvironmentPostProcessor implements EnvironmentPostProcessor {
/**
* Post process the environment to add poller configuration.
* @param environment the environment to modify
* @param application the Spring application
*/
public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application);
}Utility classes for configuration handling.
/**
* Represents required consumer groups for a producer.
*/
public class RequiredGroups {
private String[] groups = new String[0];
public String[] getGroups();
public void setGroups(String[] groups);
}
/**
* Advice for binding handlers.
*/
public class BindingHandlerAdvise {
/**
* Apply advice to binding handler methods.
* @param method the method being advised
* @param args the method arguments
* @return the advised result
*/
public Object invoke(Method method, Object[] args);
}Usage Examples:
// Configuration using properties
# application.yml
spring:
cloud:
stream:
default-binder: kafka
instance-count: 3
instance-index: 0
dynamic-destinations: true
binders:
kafka:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9092,localhost:9093
auto-create-topics: true
rabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
port: 5672
bindings:
input:
destination: my-input-topic
group: my-group
binder: kafka
content-type: application/json
consumer:
max-attempts: 5
back-off-initial-interval: 2000
back-off-multiplier: 2.0
concurrency: 2
partitioned: true
instance-index-list: [0, 1]
output:
destination: my-output-topic
binder: rabbit
content-type: application/json
producer:
partition-count: 3
partition-key-expression: payload.userId
partitioned: true
required-groups: [audit-group, analytics-group]
sync: true
// Programmatic configuration
@Configuration
public class StreamConfiguration {
@Bean
@ConfigurationProperties("custom.stream")
public BindingServiceProperties customBindingProperties() {
return new BindingServiceProperties();
}
@Bean
public MessageSourceCustomizer<KafkaMessageSource> kafkaMessageSourceCustomizer() {
return (source, destination, group) -> {
// Custom configuration for Kafka message sources
source.setGroupId(group + "-custom");
};
}
@Bean
public ProducerMessageHandlerCustomizer<MessageHandler> producerCustomizer() {
return (handler, destination) -> {
// Custom configuration for producers
if (handler instanceof KafkaProducerMessageHandler) {
KafkaProducerMessageHandler kafkaHandler = (KafkaProducerMessageHandler) handler;
kafkaHandler.setSync(true);
}
};
}
@Bean
public ConsumerEndpointCustomizer<MessageProducer> consumerCustomizer() {
return (endpoint, destination, group) -> {
// Custom configuration for consumers
if (endpoint instanceof KafkaMessageDrivenChannelAdapter) {
KafkaMessageDrivenChannelAdapter adapter = (KafkaMessageDrivenChannelAdapter) endpoint;
adapter.setRecoveryInterval(10000);
}
};
}
}
// Dynamic configuration
@Component
public class DynamicConfigurationService {
private final BindingServiceProperties bindingProperties;
public DynamicConfigurationService(BindingServiceProperties bindingProperties) {
this.bindingProperties = bindingProperties;
}
public void addDynamicBinding(String bindingName, String destination, String group) {
BindingProperties binding = new BindingProperties();
binding.setDestination(destination);
binding.setGroup(group);
binding.setContentType("application/json");
ConsumerProperties consumer = new ConsumerProperties();
consumer.setMaxAttempts(3);
consumer.setConcurrency(2);
binding.setConsumer(consumer);
bindingProperties.getBindings().put(bindingName, binding);
}
public void configureBinder(String binderName, String type, Map<String, Object> environment) {
BinderProperties binder = new BinderProperties();
binder.setType(type);
binder.setEnvironment(environment);
binder.setInheritEnvironment(true);
binder.setDefaultCandidate(true);
bindingProperties.getBinders().put(binderName, binder);
}
public BindingProperties getBindingConfig(String bindingName) {
return bindingProperties.getBindingProperties(bindingName);
}
}Install with Tessl CLI
npx tessl i tessl/maven-org-springframework-cloud--spring-cloud-stream