0
# Configuration System
1
2
Spring Cloud Stream's configuration system provides comprehensive property-based configuration for bindings, binders, and Spring Boot auto-configuration integration. It allows fine-grained control over message processing behavior, middleware connections, and application lifecycle management.
3
4
## Capabilities
5
6
### Binding Service Properties
7
8
Main configuration properties class for the entire Spring Cloud Stream application.
9
10
```java { .api }
11
/**
12
* Main configuration properties for Spring Cloud Stream binding service.
13
* Contains binder configurations, binding properties, and global settings.
14
*/
15
@ConfigurationProperties("spring.cloud.stream")
16
public class BindingServiceProperties implements ApplicationContextAware {
17
18
/**
19
* Default binder to use when no specific binder is configured.
20
*/
21
private String defaultBinder;
22
23
/**
24
* Configuration for available binders keyed by binder name.
25
*/
26
private Map<String, BinderProperties> binders = new HashMap<>();
27
28
/**
29
* Configuration for individual bindings keyed by binding name.
30
*/
31
private Map<String, BindingProperties> bindings = new HashMap<>();
32
33
/**
34
* Number of deployed instances of the application.
35
*/
36
private int instanceCount = 1;
37
38
/**
39
* Instance index of this application (0-based).
40
*/
41
private int instanceIndex = 0;
42
43
/**
44
* Whether to allow dynamic destination creation.
45
*/
46
private boolean dynamicDestinations = true;
47
48
/**
49
* Retry interval (in seconds) used to schedule binding attempts.
50
*/
51
private int bindingRetryInterval = 30;
52
53
public String getDefaultBinder();
54
public void setDefaultBinder(String defaultBinder);
55
56
public Map<String, BinderProperties> getBinders();
57
public void setBinders(Map<String, BinderProperties> binders);
58
59
public Map<String, BindingProperties> getBindings();
60
public void setBindings(Map<String, BindingProperties> bindings);
61
62
public int getInstanceCount();
63
public void setInstanceCount(int instanceCount);
64
65
public int getInstanceIndex();
66
public void setInstanceIndex(int instanceIndex);
67
68
public boolean isDynamicDestinations();
69
public void setDynamicDestinations(boolean dynamicDestinations);
70
71
public int getBindingRetryInterval();
72
public void setBindingRetryInterval(int bindingRetryInterval);
73
74
/**
75
* Get binding properties for a specific binding name.
76
* @param bindingName the binding name
77
* @return the binding properties for that name, or a default instance if none is configured
78
*/
79
public BindingProperties getBindingProperties(String bindingName);
80
81
/**
82
* Get binder properties for a specific binder name.
83
* @param binderName the binder name
84
* @return binder properties or null if not found
85
*/
86
public BinderProperties getBinderProperties(String binderName);
87
88
public void setApplicationContext(ApplicationContext applicationContext);
89
}
90
```
91
92
### Binding Properties
93
94
Configuration properties for individual message bindings.
95
96
```java { .api }
97
/**
98
* Properties for individual message bindings.
99
* Controls destination, content type, and consumer/producer behavior.
100
*/
101
public class BindingProperties implements Cloneable {
102
103
/**
104
* The logical destination name (topic, queue, etc.).
105
*/
106
private String destination;
107
108
/**
109
* Consumer group for this binding.
110
*/
111
private String group;
112
113
/**
114
* Content type for message serialization/deserialization.
115
*/
116
private String contentType;
117
118
/**
119
* Specific binder to use for this binding.
120
*/
121
private String binder;
122
123
/**
124
* Consumer-specific properties.
125
*/
126
private ConsumerProperties consumer = new ConsumerProperties();
127
128
/**
129
* Producer-specific properties.
130
*/
131
private ProducerProperties producer = new ProducerProperties();
132
133
public String getDestination();
134
public void setDestination(String destination);
135
136
public String getGroup();
137
public void setGroup(String group);
138
139
public String getContentType();
140
public void setContentType(String contentType);
141
142
public String getBinder();
143
public void setBinder(String binder);
144
145
public ConsumerProperties getConsumer();
146
public void setConsumer(ConsumerProperties consumer);
147
148
public ProducerProperties getProducer();
149
public void setProducer(ProducerProperties producer);
150
151
public BindingProperties clone();
152
}
153
```
154
155
### Binder Properties
156
157
Configuration properties for message binders (middleware connections).
158
159
```java { .api }
160
/**
161
* Configuration properties for individual binders.
162
* Defines how to connect to specific middleware systems.
163
*/
164
public class BinderProperties {
165
166
/**
167
* The binder type (e.g., "kafka", "rabbit").
168
*/
169
private String type;
170
171
/**
172
* Environment properties specific to this binder.
173
*/
174
private Map<String, Object> environment = new HashMap<>();
175
176
/**
177
* Whether this binder should inherit the parent environment.
178
*/
179
private boolean inheritEnvironment = true;
180
181
/**
182
* Whether this binder is a default candidate for auto-selection.
183
*/
184
private boolean defaultCandidate = true;
185
186
public String getType();
187
public void setType(String type);
188
189
public Map<String, Object> getEnvironment();
190
public void setEnvironment(Map<String, Object> environment);
191
192
public boolean isInheritEnvironment();
193
public void setInheritEnvironment(boolean inheritEnvironment);
194
195
public boolean isDefaultCandidate();
196
public void setDefaultCandidate(boolean defaultCandidate);
197
}
198
```
199
200
### Consumer Properties
201
202
Configuration properties for message consumers.
203
204
```java { .api }
205
/**
206
* Configuration properties for message consumers.
207
* Controls retry behavior, concurrency, and partitioning.
208
*/
209
public class ConsumerProperties implements Cloneable {
210
211
/**
212
* Maximum number of attempts to process a message, including the first (set to 1 to disable retry).
213
*/
214
private int maxAttempts = 3;
215
216
/**
217
* Initial backoff interval for retries (milliseconds).
218
*/
219
private int backOffInitialInterval = 1000;
220
221
/**
222
* Maximum backoff interval for retries (milliseconds).
223
*/
224
private int backOffMaxInterval = 10000;
225
226
/**
227
* Backoff multiplier for exponential backoff.
228
*/
229
private double backOffMultiplier = 2.0;
230
231
/**
232
* Whether failed messages should be retried by default.
233
*/
234
private boolean defaultRetryable = true;
235
236
/**
237
* Number of concurrent consumer threads.
238
*/
239
private int concurrency = 1;
240
241
/**
242
* Whether this consumer supports partitioned data.
243
*/
244
private boolean partitioned = false;
245
246
/**
247
* List of instance indexes (each from 0 to instanceCount - 1) that this consumer serves; used for partitioning.
248
*/
249
private Integer[] instanceIndexList;
250
251
/**
252
* How message headers should be handled.
253
*/
254
private HeaderMode headerMode = HeaderMode.embeddedHeaders;
255
256
/**
257
* Whether to use native decoding instead of message converters.
258
*/
259
private boolean useNativeDecoding = false;
260
261
/**
262
* Whether the binder should natively multiplex multiple destinations on this single input binding.
263
*/
264
private boolean multiplex = false;
265
266
public int getMaxAttempts();
267
public void setMaxAttempts(int maxAttempts);
268
269
public int getBackOffInitialInterval();
270
public void setBackOffInitialInterval(int backOffInitialInterval);
271
272
public int getBackOffMaxInterval();
273
public void setBackOffMaxInterval(int backOffMaxInterval);
274
275
public double getBackOffMultiplier();
276
public void setBackOffMultiplier(double backOffMultiplier);
277
278
public boolean isDefaultRetryable();
279
public void setDefaultRetryable(boolean defaultRetryable);
280
281
public int getConcurrency();
282
public void setConcurrency(int concurrency);
283
284
public boolean isPartitioned();
285
public void setPartitioned(boolean partitioned);
286
287
public Integer[] getInstanceIndexList();
288
public void setInstanceIndexList(Integer[] instanceIndexList);
289
290
public HeaderMode getHeaderMode();
291
public void setHeaderMode(HeaderMode headerMode);
292
293
public boolean isUseNativeDecoding();
294
public void setUseNativeDecoding(boolean useNativeDecoding);
295
296
public boolean isMultiplex();
297
public void setMultiplex(boolean multiplex);
298
299
public ConsumerProperties clone();
300
}
301
```
302
303
### Producer Properties
304
305
Configuration properties for message producers.
306
307
```java { .api }
308
/**
309
* Configuration properties for message producers.
310
* Controls partitioning, error handling, and synchronization behavior.
311
*/
312
public class ProducerProperties implements Cloneable {
313
314
/**
315
* Number of partitions for the target destination.
316
*/
317
private int partitionCount = 1;
318
319
/**
320
* SpEL expression for extracting partition key from messages.
321
*/
322
private String partitionKeyExpression;
323
324
/**
325
* Bean name of partition key extractor strategy.
326
*/
327
private String partitionKeyExtractorName;
328
329
/**
330
* Bean name of partition selector strategy.
331
*/
332
private String partitionSelectorName;
333
334
/**
335
* SpEL expression for selecting partition.
336
*/
337
private String partitionSelectorExpression;
338
339
/**
340
* Whether this producer should partition data.
341
*/
342
private boolean partitioned = false;
343
344
/**
345
* Consumer groups that must exist before messages are sent.
346
*/
347
private RequiredGroups requiredGroups = new RequiredGroups();
348
349
/**
350
* How message headers should be handled.
351
*/
352
private HeaderMode headerMode = HeaderMode.embeddedHeaders;
353
354
/**
355
* Whether to use native encoding instead of message converters.
356
*/
357
private boolean useNativeEncoding = false;
358
359
/**
360
* Whether to create an error channel for send failures.
361
*/
362
private boolean errorChannelEnabled = false;
363
364
/**
365
* Whether message sending should be synchronous.
366
*/
367
private boolean sync = false;
368
369
public int getPartitionCount();
370
public void setPartitionCount(int partitionCount);
371
372
public String getPartitionKeyExpression();
373
public void setPartitionKeyExpression(String partitionKeyExpression);
374
375
public String getPartitionKeyExtractorName();
376
public void setPartitionKeyExtractorName(String partitionKeyExtractorName);
377
378
public String getPartitionSelectorName();
379
public void setPartitionSelectorName(String partitionSelectorName);
380
381
public String getPartitionSelectorExpression();
382
public void setPartitionSelectorExpression(String partitionSelectorExpression);
383
384
public boolean isPartitioned();
385
public void setPartitioned(boolean partitioned);
386
387
public RequiredGroups getRequiredGroups();
388
public void setRequiredGroups(RequiredGroups requiredGroups);
389
390
public HeaderMode getHeaderMode();
391
public void setHeaderMode(HeaderMode headerMode);
392
393
public boolean isUseNativeEncoding();
394
public void setUseNativeEncoding(boolean useNativeEncoding);
395
396
public boolean isErrorChannelEnabled();
397
public void setErrorChannelEnabled(boolean errorChannelEnabled);
398
399
public boolean isSync();
400
public void setSync(boolean sync);
401
402
public ProducerProperties clone();
403
}
404
```
405
406
### Spring Integration Properties
407
408
Configuration properties specific to Spring Integration components.
409
410
```java { .api }
411
/**
412
* Configuration properties for Spring Integration components.
413
*/
414
@ConfigurationProperties("spring.cloud.stream.integration")
415
public class SpringIntegrationProperties {
416
417
/**
418
* Properties for message handler configuration.
419
*/
420
private MessageHandlerProperties messageHandlerNotPropagatedHeaders = new MessageHandlerProperties();
421
422
/**
423
* Default poller configuration.
424
*/
425
private PollerProperties poller = new PollerProperties();
426
427
public MessageHandlerProperties getMessageHandlerNotPropagatedHeaders();
428
public void setMessageHandlerNotPropagatedHeaders(MessageHandlerProperties messageHandlerNotPropagatedHeaders);
429
430
public PollerProperties getPoller();
431
public void setPoller(PollerProperties poller);
432
433
/**
434
* Properties for message handler configuration.
435
*/
436
public static class MessageHandlerProperties {
437
438
private String[] notPropagatedHeaders = new String[0];
439
440
public String[] getNotPropagatedHeaders();
441
public void setNotPropagatedHeaders(String[] notPropagatedHeaders);
442
}
443
444
/**
445
* Properties for poller configuration.
446
*/
447
public static class PollerProperties {
448
449
private long fixedDelay = 1000;
450
private long maxMessagesPerPoll = 1;
451
private String cron;
452
private String initialDelay;
453
private TimeUnit timeUnit = TimeUnit.MILLISECONDS;
454
455
public long getFixedDelay();
456
public void setFixedDelay(long fixedDelay);
457
458
public long getMaxMessagesPerPoll();
459
public void setMaxMessagesPerPoll(long maxMessagesPerPoll);
460
461
public String getCron();
462
public void setCron(String cron);
463
464
public String getInitialDelay();
465
public void setInitialDelay(String initialDelay);
466
467
public TimeUnit getTimeUnit();
468
public void setTimeUnit(TimeUnit timeUnit);
469
}
470
}
471
```
472
473
### Auto-Configuration Classes
474
475
Spring Boot auto-configuration classes for automatic setup.
476
477
```java { .api }
478
/**
479
* Auto-configuration for binder factory and related beans.
480
*/
481
@Configuration
482
@EnableConfigurationProperties({BindingServiceProperties.class, SpringIntegrationProperties.class})
483
@Import({ContentTypeConfiguration.class, SpelExpressionConverterConfiguration.class})
484
public class BinderFactoryAutoConfiguration {
485
486
/**
487
* Creates the default binder factory.
488
* @return configured BinderFactory
489
*/
490
@Bean
491
@ConditionalOnMissingBean(BinderFactory.class)
492
public BinderFactory binderFactory();
493
494
/**
495
* Creates the default binder type registry.
496
* @return configured BinderTypeRegistry
497
*/
498
@Bean
499
@ConditionalOnMissingBean(BinderTypeRegistry.class)
500
public BinderTypeRegistry binderTypeRegistry();
501
502
/**
503
* Creates stream function properties.
504
* @return configured StreamFunctionProperties
505
*/
506
@Bean
507
@ConditionalOnMissingBean
508
public StreamFunctionProperties streamFunctionProperties();
509
510
/**
511
* Creates message handler method factory.
512
* @return configured MessageHandlerMethodFactory
513
*/
514
@Bean
515
@ConditionalOnMissingBean(MessageHandlerMethodFactory.class)
516
public MessageHandlerMethodFactory messageHandlerMethodFactory();
517
}
518
519
/**
520
* Auto-configuration for binding service and related components.
521
*/
522
@Configuration
523
@EnableConfigurationProperties(BindingServiceProperties.class)
524
@Import(BinderFactoryAutoConfiguration.class)
525
public class BindingServiceConfiguration {
526
527
/**
528
* Creates the central binding service.
529
* @return configured BindingService
530
*/
531
@Bean
532
@ConditionalOnMissingBean
533
public BindingService bindingService();
534
535
/**
536
* Creates binding lifecycle controller.
537
* @return configured BindingsLifecycleController
538
*/
539
@Bean
540
@ConditionalOnMissingBean
541
public BindingsLifecycleController bindingsLifecycleController();
542
543
/**
544
* Creates composite message channel configurer.
545
* @return configured CompositeMessageChannelConfigurer
546
*/
547
@Bean
548
@ConditionalOnMissingBean(MessageChannelConfigurer.class)
549
public CompositeMessageChannelConfigurer compositeMessageChannelConfigurer();
550
}
551
552
/**
553
* Auto-configuration for binder health indicators.
554
*/
555
@Configuration
556
@ConditionalOnClass(HealthIndicator.class)
557
@AutoConfigureAfter(BindingServiceConfiguration.class)
558
public class BindersHealthIndicatorAutoConfiguration {
559
560
/**
561
* Creates health indicator for binders.
562
* @return configured BindersHealthIndicator
563
*/
564
@Bean
565
@ConditionalOnMissingBean(name = "bindersHealthIndicator")
566
public BindersHealthIndicator bindersHealthIndicator();
567
}
568
569
/**
570
* Auto-configuration for bindings actuator endpoint.
571
*/
572
@Configuration
573
@ConditionalOnClass(Endpoint.class)
574
@AutoConfigureAfter(BindingServiceConfiguration.class)
575
public class BindingsEndpointAutoConfiguration {
576
577
/**
578
* Creates bindings actuator endpoint.
579
* @return configured BindingsEndpoint
580
*/
581
@Bean
582
@ConditionalOnMissingBean
583
public BindingsEndpoint bindingsEndpoint();
584
}
585
586
/**
587
* Auto-configuration for channels actuator endpoint.
588
*/
589
@Configuration
590
@ConditionalOnClass(Endpoint.class)
591
public class ChannelsEndpointAutoConfiguration {
592
593
/**
594
* Creates channels actuator endpoint.
595
* @return configured ChannelsEndpoint
596
*/
597
@Bean
598
@ConditionalOnMissingBean
599
public ChannelsEndpoint channelsEndpoint();
600
}
601
```
602
603
### Content Type Configuration
604
605
Configuration for content type handling and message conversion.
606
607
```java { .api }
608
/**
609
* Configuration for content type handling.
610
*/
611
@Configuration
612
public class ContentTypeConfiguration {
613
614
/**
615
* Creates composite message converter factory.
616
* @return configured CompositeMessageConverterFactory
617
*/
618
@Bean
619
@ConditionalOnMissingBean
620
public CompositeMessageConverterFactory compositeMessageConverterFactory();
621
622
/**
623
* Creates message converter utils.
624
* @return configured MessageConverterUtils
625
*/
626
@Bean
627
@ConditionalOnMissingBean
628
public MessageConverterUtils messageConverterUtils();
629
}
630
631
/**
632
* Configuration for SpEL expression converters.
633
*/
634
@Configuration
635
public class SpelExpressionConverterConfiguration {
636
637
/**
638
* Creates SpEL expression converter.
639
* @return configured Converter
640
*/
641
@Bean
642
@ConditionalOnMissingBean
643
public Converter<String, Expression> spelExpressionConverter();
644
}
645
```
646
647
### Customizer Interfaces
648
649
Interfaces for customizing various components during configuration.
650
651
```java { .api }
652
/**
653
* Customizer for message sources.
654
* @param <T> the message source type
655
*/
656
public interface MessageSourceCustomizer<T> {
657
658
/**
659
* Customize a message source.
660
* @param source the message source to customize
661
* @param destinationName the destination name
662
* @param group the consumer group
663
*/
664
void customize(T source, String destinationName, String group);
665
}
666
667
/**
668
* Customizer for producer message handlers.
669
* @param <H> the message handler type
670
*/
671
public interface ProducerMessageHandlerCustomizer<H extends MessageHandler> {
672
673
/**
674
* Customize a producer message handler.
675
* @param handler the message handler to customize
676
* @param destinationName the destination name
677
*/
678
void customize(H handler, String destinationName);
679
}
680
681
/**
682
* Customizer for consumer endpoints.
683
* @param <E> the endpoint type
684
*/
685
public interface ConsumerEndpointCustomizer<E extends MessageProducer> {
686
687
/**
688
* Customize a consumer endpoint.
689
* @param endpoint the endpoint to customize
690
* @param destinationName the destination name
691
* @param group the consumer group
692
*/
693
void customize(E endpoint, String destinationName, String group);
694
}
695
696
/**
697
* Customizer for listener containers.
698
* @param <T> the container type
699
*/
700
public interface ListenerContainerCustomizer<T> {
701
702
/**
703
* Customize a listener container.
704
* @param container the container to customize
705
* @param destinationName the destination name
706
* @param group the consumer group
707
*/
708
void customize(T container, String destinationName, String group);
709
}
710
```
711
712
### Environment Post Processors
713
714
Post processors for modifying the Spring environment during application startup.
715
716
```java { .api }
717
/**
718
* Environment post processor for poller configuration.
719
*/
720
public class PollerConfigEnvironmentPostProcessor implements EnvironmentPostProcessor {
721
722
/**
723
* Post process the environment to add poller configuration.
724
* @param environment the environment to modify
725
* @param application the Spring application
726
*/
727
public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application);
728
}
729
```
730
731
### Utility Classes
732
733
Utility classes for configuration handling.
734
735
```java { .api }
736
/**
737
* Represents required consumer groups for a producer.
738
*/
739
public class RequiredGroups {
740
741
private String[] groups = new String[0];
742
743
public String[] getGroups();
744
public void setGroups(String[] groups);
745
}
746
747
/**
748
* Advice for binding handlers.
749
*/
750
public class BindingHandlerAdvise {
751
752
/**
753
* Apply advice to binding handler methods.
754
* @param method the method being advised
755
* @param args the method arguments
756
* @return the advised result
757
*/
758
public Object invoke(Method method, Object[] args);
759
}
760
```
761
762
**Usage Examples:**
763
764
```yaml
# Configuration using properties
# application.yml
spring:
  cloud:
    stream:
      default-binder: kafka
      instance-count: 3
      instance-index: 0
      dynamic-destinations: true

      binders:
        kafka:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9092,localhost:9093
                      auto-create-topics: true
        rabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672

      bindings:
        input:
          destination: my-input-topic
          group: my-group
          binder: kafka
          content-type: application/json
          consumer:
            max-attempts: 5
            back-off-initial-interval: 2000
            back-off-multiplier: 2.0
            concurrency: 2
            partitioned: true
            instance-index-list: [0, 1]

        output:
          destination: my-output-topic
          binder: rabbit
          content-type: application/json
          producer:
            partition-count: 3
            partition-key-expression: payload.userId
            partitioned: true
            required-groups: [audit-group, analytics-group]
            sync: true
```

```java
// Programmatic configuration
820
@Configuration
821
public class StreamConfiguration {
822
823
@Bean
824
@ConfigurationProperties("custom.stream")
825
public BindingServiceProperties customBindingProperties() {
826
return new BindingServiceProperties();
827
}
828
829
@Bean
830
public MessageSourceCustomizer<KafkaMessageSource> kafkaMessageSourceCustomizer() {
831
return (source, destination, group) -> {
832
// Custom configuration for Kafka message sources
833
source.setGroupId(group + "-custom");
834
};
835
}
836
837
@Bean
838
public ProducerMessageHandlerCustomizer<MessageHandler> producerCustomizer() {
839
return (handler, destination) -> {
840
// Custom configuration for producers
841
if (handler instanceof KafkaProducerMessageHandler) {
842
KafkaProducerMessageHandler kafkaHandler = (KafkaProducerMessageHandler) handler;
843
kafkaHandler.setSync(true);
844
}
845
};
846
}
847
848
@Bean
849
public ConsumerEndpointCustomizer<MessageProducer> consumerCustomizer() {
850
return (endpoint, destination, group) -> {
851
// Custom configuration for consumers
852
if (endpoint instanceof KafkaMessageDrivenChannelAdapter) {
853
KafkaMessageDrivenChannelAdapter adapter = (KafkaMessageDrivenChannelAdapter) endpoint;
854
adapter.setRecoveryInterval(10000);
855
}
856
};
857
}
858
}
859
860
// Dynamic configuration
861
@Component
862
public class DynamicConfigurationService {
863
864
private final BindingServiceProperties bindingProperties;
865
866
public DynamicConfigurationService(BindingServiceProperties bindingProperties) {
867
this.bindingProperties = bindingProperties;
868
}
869
870
public void addDynamicBinding(String bindingName, String destination, String group) {
871
BindingProperties binding = new BindingProperties();
872
binding.setDestination(destination);
873
binding.setGroup(group);
874
binding.setContentType("application/json");
875
876
ConsumerProperties consumer = new ConsumerProperties();
877
consumer.setMaxAttempts(3);
878
consumer.setConcurrency(2);
879
binding.setConsumer(consumer);
880
881
bindingProperties.getBindings().put(bindingName, binding);
882
}
883
884
public void configureBinder(String binderName, String type, Map<String, Object> environment) {
885
BinderProperties binder = new BinderProperties();
886
binder.setType(type);
887
binder.setEnvironment(environment);
888
binder.setInheritEnvironment(true);
889
binder.setDefaultCandidate(true);
890
891
bindingProperties.getBinders().put(binderName, binder);
892
}
893
894
public BindingProperties getBindingConfig(String bindingName) {
895
return bindingProperties.getBindingProperties(bindingName);
896
}
897
}
898
```