or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

actuator-integration.md, binder-framework.md, binding-management.md, configuration.md, function-support.md, index.md, message-conversion.md

docs/configuration.md

0

# Configuration System

1

2

Spring Cloud Stream's configuration system provides comprehensive property-based configuration for bindings, binders, and Spring Boot auto-configuration integration. It allows fine-grained control over message processing behavior, middleware connections, and application lifecycle management.

3

4

## Capabilities

5

6

### Binding Service Properties

7

8

Main configuration properties class for the entire Spring Cloud Stream application.

9

10

```java { .api }

11

/**

12

* Main configuration properties for Spring Cloud Stream binding service.

13

* Contains binder configurations, binding properties, and global settings.

14

*/

15

@ConfigurationProperties("spring.cloud.stream")

16

public class BindingServiceProperties implements ApplicationContextAware {

17

18

/**

19

* Default binder to use when no specific binder is configured.

20

*/

21

private String defaultBinder;

22

23

/**

24

* Configuration for available binders keyed by binder name.

25

*/

26

private Map<String, BinderProperties> binders = new HashMap<>();

27

28

/**

29

* Configuration for individual bindings keyed by binding name.

30

*/

31

private Map<String, BindingProperties> bindings = new HashMap<>();

32

33

/**

34

* Number of deployed instances of the application.

35

*/

36

private int instanceCount = 1;

37

38

/**

39

* Instance index of this application (0-based).

40

*/

41

private int instanceIndex = 0;

42

43

/**

44

* Whether to allow dynamic destination creation.

45

*/

46

private boolean dynamicDestinations = true;

47

48

/**

49

* Retry interval (in seconds) used to schedule binding attempts.

50

*/

51

private int bindingRetryInterval = 30;

52

53

public String getDefaultBinder();

54

public void setDefaultBinder(String defaultBinder);

55

56

public Map<String, BinderProperties> getBinders();

57

public void setBinders(Map<String, BinderProperties> binders);

58

59

public Map<String, BindingProperties> getBindings();

60

public void setBindings(Map<String, BindingProperties> bindings);

61

62

public int getInstanceCount();

63

public void setInstanceCount(int instanceCount);

64

65

public int getInstanceIndex();

66

public void setInstanceIndex(int instanceIndex);

67

68

public boolean isDynamicDestinations();

69

public void setDynamicDestinations(boolean dynamicDestinations);

70

71

public int getBindingRetryInterval();

72

public void setBindingRetryInterval(int bindingRetryInterval);

73

74

/**

75

* Get binding properties for a specific binding name.

76

* @param bindingName the binding name

77

* @return binding properties or default if not found

78

*/

79

public BindingProperties getBindingProperties(String bindingName);

80

81

/**

82

* Get binder properties for a specific binder name.

83

* @param binderName the binder name

84

* @return binder properties or null if not found

85

*/

86

public BinderProperties getBinderProperties(String binderName);

87

88

public void setApplicationContext(ApplicationContext applicationContext);

89

}

90

```

91

92

### Binding Properties

93

94

Configuration properties for individual message bindings.

95

96

```java { .api }

97

/**

98

* Properties for individual message bindings.

99

* Controls destination, content type, and consumer/producer behavior.

100

*/

101

public class BindingProperties implements Cloneable {

102

103

/**

104

* The logical destination name (topic, queue, etc.).

105

*/

106

private String destination;

107

108

/**

109

* Consumer group for this binding.

110

*/

111

private String group;

112

113

/**

114

* Content type for message serialization/deserialization.

115

*/

116

private String contentType;

117

118

/**

119

* Specific binder to use for this binding.

120

*/

121

private String binder;

122

123

/**

124

* Consumer-specific properties.

125

*/

126

private ConsumerProperties consumer = new ConsumerProperties();

127

128

/**

129

* Producer-specific properties.

130

*/

131

private ProducerProperties producer = new ProducerProperties();

132

133

public String getDestination();

134

public void setDestination(String destination);

135

136

public String getGroup();

137

public void setGroup(String group);

138

139

public String getContentType();

140

public void setContentType(String contentType);

141

142

public String getBinder();

143

public void setBinder(String binder);

144

145

public ConsumerProperties getConsumer();

146

public void setConsumer(ConsumerProperties consumer);

147

148

public ProducerProperties getProducer();

149

public void setProducer(ProducerProperties producer);

150

151

public BindingProperties clone();

152

}

153

```

154

155

### Binder Properties

156

157

Configuration properties for message binders (middleware connections).

158

159

```java { .api }

160

/**

161

* Configuration properties for individual binders.

162

* Defines how to connect to specific middleware systems.

163

*/

164

public class BinderProperties {

165

166

/**

167

* The binder type (e.g., "kafka", "rabbit").

168

*/

169

private String type;

170

171

/**

172

* Environment properties specific to this binder.

173

*/

174

private Map<String, Object> environment = new HashMap<>();

175

176

/**

177

* Whether this binder should inherit the parent environment.

178

*/

179

private boolean inheritEnvironment = true;

180

181

/**

182

* Whether this binder is a default candidate for auto-selection.

183

*/

184

private boolean defaultCandidate = true;

185

186

public String getType();

187

public void setType(String type);

188

189

public Map<String, Object> getEnvironment();

190

public void setEnvironment(Map<String, Object> environment);

191

192

public boolean isInheritEnvironment();

193

public void setInheritEnvironment(boolean inheritEnvironment);

194

195

public boolean isDefaultCandidate();

196

public void setDefaultCandidate(boolean defaultCandidate);

197

}

198

```

199

200

### Consumer Properties

201

202

Configuration properties for message consumers.

203

204

```java { .api }

205

/**

206

* Configuration properties for message consumers.

207

* Controls retry behavior, concurrency, and partitioning.

208

*/

209

public class ConsumerProperties implements Cloneable {

210

211

/**

212

* Maximum number of retry attempts for failed messages.

213

*/

214

private int maxAttempts = 3;

215

216

/**

217

* Initial backoff interval for retries (milliseconds).

218

*/

219

private int backOffInitialInterval = 1000;

220

221

/**

222

* Maximum backoff interval for retries (milliseconds).

223

*/

224

private int backOffMaxInterval = 10000;

225

226

/**

227

* Backoff multiplier for exponential backoff.

228

*/

229

private double backOffMultiplier = 2.0;

230

231

/**

232

* Whether exceptions thrown by the listener are retried when they are not explicitly classified as retryable or non-retryable.

233

*/

234

private boolean defaultRetryable = true;

235

236

/**

237

* Number of concurrent consumer threads.

238

*/

239

private int concurrency = 1;

240

241

/**

242

* Whether this consumer supports partitioned data.

243

*/

244

private boolean partitioned = false;

245

246

/**

247

* List of partition indexes this instance should consume from.

248

*/

249

private Integer[] instanceIndexList;

250

251

/**

252

* How message headers should be handled.

253

*/

254

private HeaderMode headerMode = HeaderMode.embeddedHeaders;

255

256

/**

257

* Whether to use native decoding instead of message converters.

258

*/

259

private boolean useNativeDecoding = false;

260

261

/**

262

* Whether to multiplex multiple consumers on a single connection.

263

*/

264

private boolean multiplex = false;

265

266

public int getMaxAttempts();

267

public void setMaxAttempts(int maxAttempts);

268

269

public int getBackOffInitialInterval();

270

public void setBackOffInitialInterval(int backOffInitialInterval);

271

272

public int getBackOffMaxInterval();

273

public void setBackOffMaxInterval(int backOffMaxInterval);

274

275

public double getBackOffMultiplier();

276

public void setBackOffMultiplier(double backOffMultiplier);

277

278

public boolean isDefaultRetryable();

279

public void setDefaultRetryable(boolean defaultRetryable);

280

281

public int getConcurrency();

282

public void setConcurrency(int concurrency);

283

284

public boolean isPartitioned();

285

public void setPartitioned(boolean partitioned);

286

287

public Integer[] getInstanceIndexList();

288

public void setInstanceIndexList(Integer[] instanceIndexList);

289

290

public HeaderMode getHeaderMode();

291

public void setHeaderMode(HeaderMode headerMode);

292

293

public boolean isUseNativeDecoding();

294

public void setUseNativeDecoding(boolean useNativeDecoding);

295

296

public boolean isMultiplex();

297

public void setMultiplex(boolean multiplex);

298

299

public ConsumerProperties clone();

300

}

301

```

302

303

### Producer Properties

304

305

Configuration properties for message producers.

306

307

```java { .api }

308

/**

309

* Configuration properties for message producers.

310

* Controls partitioning, error handling, and synchronization behavior.

311

*/

312

public class ProducerProperties implements Cloneable {

313

314

/**

315

* Number of partitions for the target destination.

316

*/

317

private int partitionCount = 1;

318

319

/**

320

* SpEL expression for extracting partition key from messages.

321

*/

322

private String partitionKeyExpression;

323

324

/**

325

* Bean name of partition key extractor strategy.

326

*/

327

private String partitionKeyExtractorName;

328

329

/**

330

* Bean name of partition selector strategy.

331

*/

332

private String partitionSelectorName;

333

334

/**

335

* SpEL expression for selecting partition.

336

*/

337

private String partitionSelectorExpression;

338

339

/**

340

* Whether this producer should partition data.

341

*/

342

private boolean partitioned = false;

343

344

/**

345

* Consumer groups to which message delivery must be ensured, even if they start after the producer has been created.

346

*/

347

private RequiredGroups requiredGroups = new RequiredGroups();

348

349

/**

350

* How message headers should be handled.

351

*/

352

private HeaderMode headerMode = HeaderMode.embeddedHeaders;

353

354

/**

355

* Whether to use native encoding instead of message converters.

356

*/

357

private boolean useNativeEncoding = false;

358

359

/**

360

* Whether to create an error channel for send failures.

361

*/

362

private boolean errorChannelEnabled = false;

363

364

/**

365

* Whether message sending should be synchronous.

366

*/

367

private boolean sync = false;

368

369

public int getPartitionCount();

370

public void setPartitionCount(int partitionCount);

371

372

public String getPartitionKeyExpression();

373

public void setPartitionKeyExpression(String partitionKeyExpression);

374

375

public String getPartitionKeyExtractorName();

376

public void setPartitionKeyExtractorName(String partitionKeyExtractorName);

377

378

public String getPartitionSelectorName();

379

public void setPartitionSelectorName(String partitionSelectorName);

380

381

public String getPartitionSelectorExpression();

382

public void setPartitionSelectorExpression(String partitionSelectorExpression);

383

384

public boolean isPartitioned();

385

public void setPartitioned(boolean partitioned);

386

387

public RequiredGroups getRequiredGroups();

388

public void setRequiredGroups(RequiredGroups requiredGroups);

389

390

public HeaderMode getHeaderMode();

391

public void setHeaderMode(HeaderMode headerMode);

392

393

public boolean isUseNativeEncoding();

394

public void setUseNativeEncoding(boolean useNativeEncoding);

395

396

public boolean isErrorChannelEnabled();

397

public void setErrorChannelEnabled(boolean errorChannelEnabled);

398

399

public boolean isSync();

400

public void setSync(boolean sync);

401

402

public ProducerProperties clone();

403

}

404

```

405

406

### Spring Integration Properties

407

408

Configuration properties specific to Spring Integration components.

409

410

```java { .api }

411

/**

412

* Configuration properties for Spring Integration components.

413

*/

414

@ConfigurationProperties("spring.cloud.stream.integration")

415

public class SpringIntegrationProperties {

416

417

/**

418

* Properties for message handler configuration.

419

*/

420

private MessageHandlerProperties messageHandlerNotPropagatedHeaders = new MessageHandlerProperties();

421

422

/**

423

* Default poller configuration.

424

*/

425

private PollerProperties poller = new PollerProperties();

426

427

public MessageHandlerProperties getMessageHandlerNotPropagatedHeaders();

428

public void setMessageHandlerNotPropagatedHeaders(MessageHandlerProperties messageHandlerNotPropagatedHeaders);

429

430

public PollerProperties getPoller();

431

public void setPoller(PollerProperties poller);

432

433

/**

434

* Properties for message handler configuration.

435

*/

436

public static class MessageHandlerProperties {

437

438

private String[] notPropagatedHeaders = new String[0];

439

440

public String[] getNotPropagatedHeaders();

441

public void setNotPropagatedHeaders(String[] notPropagatedHeaders);

442

}

443

444

/**

445

* Properties for poller configuration.

446

*/

447

public static class PollerProperties {

448

449

private long fixedDelay = 1000;

450

private long maxMessagesPerPoll = 1;

451

private String cron;

452

private String initialDelay;

453

private TimeUnit timeUnit = TimeUnit.MILLISECONDS;

454

455

public long getFixedDelay();

456

public void setFixedDelay(long fixedDelay);

457

458

public long getMaxMessagesPerPoll();

459

public void setMaxMessagesPerPoll(long maxMessagesPerPoll);

460

461

public String getCron();

462

public void setCron(String cron);

463

464

public String getInitialDelay();

465

public void setInitialDelay(String initialDelay);

466

467

public TimeUnit getTimeUnit();

468

public void setTimeUnit(TimeUnit timeUnit);

469

}

470

}

471

```

472

473

### Auto-Configuration Classes

474

475

Spring Boot auto-configuration classes for automatic setup.

476

477

```java { .api }

478

/**

479

* Auto-configuration for binder factory and related beans.

480

*/

481

@Configuration

482

@EnableConfigurationProperties({BindingServiceProperties.class, SpringIntegrationProperties.class})

483

@Import({ContentTypeConfiguration.class, SpelExpressionConverterConfiguration.class})

484

public class BinderFactoryAutoConfiguration {

485

486

/**

487

* Creates the default binder factory.

488

* @return configured BinderFactory

489

*/

490

@Bean

491

@ConditionalOnMissingBean(BinderFactory.class)

492

public BinderFactory binderFactory();

493

494

/**

495

* Creates the default binder type registry.

496

* @return configured BinderTypeRegistry

497

*/

498

@Bean

499

@ConditionalOnMissingBean(BinderTypeRegistry.class)

500

public BinderTypeRegistry binderTypeRegistry();

501

502

/**

503

* Creates stream function properties.

504

* @return configured StreamFunctionProperties

505

*/

506

@Bean

507

@ConditionalOnMissingBean

508

public StreamFunctionProperties streamFunctionProperties();

509

510

/**

511

* Creates message handler method factory.

512

* @return configured MessageHandlerMethodFactory

513

*/

514

@Bean

515

@ConditionalOnMissingBean(MessageHandlerMethodFactory.class)

516

public MessageHandlerMethodFactory messageHandlerMethodFactory();

517

}

518

519

/**

520

* Auto-configuration for binding service and related components.

521

*/

522

@Configuration

523

@EnableConfigurationProperties(BindingServiceProperties.class)

524

@Import(BinderFactoryAutoConfiguration.class)

525

public class BindingServiceConfiguration {

526

527

/**

528

* Creates the central binding service.

529

* @return configured BindingService

530

*/

531

@Bean

532

@ConditionalOnMissingBean

533

public BindingService bindingService();

534

535

/**

536

* Creates binding lifecycle controller.

537

* @return configured BindingsLifecycleController

538

*/

539

@Bean

540

@ConditionalOnMissingBean

541

public BindingsLifecycleController bindingsLifecycleController();

542

543

/**

544

* Creates composite message channel configurer.

545

* @return configured CompositeMessageChannelConfigurer

546

*/

547

@Bean

548

@ConditionalOnMissingBean(MessageChannelConfigurer.class)

549

public CompositeMessageChannelConfigurer compositeMessageChannelConfigurer();

550

}

551

552

/**

553

* Auto-configuration for binder health indicators.

554

*/

555

@Configuration

556

@ConditionalOnClass(HealthIndicator.class)

557

@AutoConfigureAfter(BindingServiceConfiguration.class)

558

public class BindersHealthIndicatorAutoConfiguration {

559

560

/**

561

* Creates health indicator for binders.

562

* @return configured BindersHealthIndicator

563

*/

564

@Bean

565

@ConditionalOnMissingBean(name = "bindersHealthIndicator")

566

public BindersHealthIndicator bindersHealthIndicator();

567

}

568

569

/**

570

* Auto-configuration for bindings actuator endpoint.

571

*/

572

@Configuration

573

@ConditionalOnClass(Endpoint.class)

574

@AutoConfigureAfter(BindingServiceConfiguration.class)

575

public class BindingsEndpointAutoConfiguration {

576

577

/**

578

* Creates bindings actuator endpoint.

579

* @return configured BindingsEndpoint

580

*/

581

@Bean

582

@ConditionalOnMissingBean

583

public BindingsEndpoint bindingsEndpoint();

584

}

585

586

/**

587

* Auto-configuration for channels actuator endpoint.

588

*/

589

@Configuration

590

@ConditionalOnClass(Endpoint.class)

591

public class ChannelsEndpointAutoConfiguration {

592

593

/**

594

* Creates channels actuator endpoint.

595

* @return configured ChannelsEndpoint

596

*/

597

@Bean

598

@ConditionalOnMissingBean

599

public ChannelsEndpoint channelsEndpoint();

600

}

601

```

602

603

### Content Type Configuration

604

605

Configuration for content type handling and message conversion.

606

607

```java { .api }

608

/**

609

* Configuration for content type handling.

610

*/

611

@Configuration

612

public class ContentTypeConfiguration {

613

614

/**

615

* Creates composite message converter factory.

616

* @return configured CompositeMessageConverterFactory

617

*/

618

@Bean

619

@ConditionalOnMissingBean

620

public CompositeMessageConverterFactory compositeMessageConverterFactory();

621

622

/**

623

* Creates message converter utils.

624

* @return configured MessageConverterUtils

625

*/

626

@Bean

627

@ConditionalOnMissingBean

628

public MessageConverterUtils messageConverterUtils();

629

}

630

631

/**

632

* Configuration for SpEL expression converters.

633

*/

634

@Configuration

635

public class SpelExpressionConverterConfiguration {

636

637

/**

638

* Creates SpEL expression converter.

639

* @return configured Converter

640

*/

641

@Bean

642

@ConditionalOnMissingBean

643

public Converter<String, Expression> spelExpressionConverter();

644

}

645

```

646

647

### Customizer Interfaces

648

649

Interfaces for customizing various components during configuration.

650

651

```java { .api }

652

/**

653

* Customizer for message sources.

654

* @param <T> the message source type

655

*/

656

public interface MessageSourceCustomizer<T> {

657

658

/**

659

* Customize a message source.

660

* @param source the message source to customize

661

* @param destinationName the destination name

662

* @param group the consumer group

663

*/

664

void customize(T source, String destinationName, String group);

665

}

666

667

/**

668

* Customizer for producer message handlers.

669

* @param <H> the message handler type

670

*/

671

public interface ProducerMessageHandlerCustomizer<H extends MessageHandler> {

672

673

/**

674

* Customize a producer message handler.

675

* @param handler the message handler to customize

676

* @param destinationName the destination name

677

*/

678

void customize(H handler, String destinationName);

679

}

680

681

/**

682

* Customizer for consumer endpoints.

683

* @param <E> the endpoint type

684

*/

685

public interface ConsumerEndpointCustomizer<E extends MessageProducer> {

686

687

/**

688

* Customize a consumer endpoint.

689

* @param endpoint the endpoint to customize

690

* @param destinationName the destination name

691

* @param group the consumer group

692

*/

693

void customize(E endpoint, String destinationName, String group);

694

}

695

696

/**

697

* Customizer for listener containers.

698

* @param <T> the container type

699

*/

700

public interface ListenerContainerCustomizer<T> {

701

702

/**

703

* Customize a listener container.

704

* @param container the container to customize

705

* @param destinationName the destination name

706

* @param group the consumer group

707

*/

708

void customize(T container, String destinationName, String group);

709

}

710

```

711

712

### Environment Post Processors

713

714

Post processors for modifying the Spring environment during application startup.

715

716

```java { .api }

717

/**

718

* Environment post processor for poller configuration.

719

*/

720

public class PollerConfigEnvironmentPostProcessor implements EnvironmentPostProcessor {

721

722

/**

723

* Post process the environment to add poller configuration.

724

* @param environment the environment to modify

725

* @param application the Spring application

726

*/

727

public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application);

728

}

729

```

730

731

### Utility Classes

732

733

Utility classes for configuration handling.

734

735

```java { .api }

736

/**

737

* Represents required consumer groups for a producer.

738

*/

739

public class RequiredGroups {

740

741

private String[] groups = new String[0];

742

743

public String[] getGroups();

744

public void setGroups(String[] groups);

745

}

746

747

/**

748

* Advice for binding handlers.

749

*/

750

public class BindingHandlerAdvise {

751

752

/**

753

* Apply advice to binding handler methods.

754

* @param method the method being advised

755

* @param args the method arguments

756

* @return the advised result

757

*/

758

public Object invoke(Method method, Object[] args);

759

}

760

```

761

762

**Usage Examples:**

763

764

```java

765

# Configuration using properties (YAML)

766

# application.yml

767

spring:

768

cloud:

769

stream:

770

default-binder: kafka

771

instance-count: 3

772

instance-index: 0

773

dynamic-destinations: true

774

775

binders:

776

kafka:

777

type: kafka

778

environment:

779

spring:

780

cloud:

781

stream:

782

kafka:

783

binder:

784

brokers: localhost:9092,localhost:9093

785

auto-create-topics: true

786

rabbit:

787

type: rabbit

788

environment:

789

spring:

790

rabbitmq:

791

host: localhost

792

port: 5672

793

794

bindings:

795

input:

796

destination: my-input-topic

797

group: my-group

798

binder: kafka

799

content-type: application/json

800

consumer:

801

max-attempts: 5

802

back-off-initial-interval: 2000

803

back-off-multiplier: 2.0

804

concurrency: 2

805

partitioned: true

806

instance-index-list: [0, 1]

807

808

output:

809

destination: my-output-topic

810

binder: rabbit

811

content-type: application/json

812

producer:

813

partition-count: 3

814

partition-key-expression: payload.userId

815

partitioned: true

816

required-groups: [audit-group, analytics-group]

817

sync: true

818

819

// Programmatic configuration

820

@Configuration

821

public class StreamConfiguration {

822

823

@Bean

824

@ConfigurationProperties("custom.stream")

825

public BindingServiceProperties customBindingProperties() {

826

return new BindingServiceProperties();

827

}

828

829

@Bean

830

public MessageSourceCustomizer<KafkaMessageSource> kafkaMessageSourceCustomizer() {

831

return (source, destination, group) -> {

832

// Custom configuration for Kafka message sources

833

source.setGroupId(group + "-custom");

834

};

835

}

836

837

@Bean

838

public ProducerMessageHandlerCustomizer<MessageHandler> producerCustomizer() {

839

return (handler, destination) -> {

840

// Custom configuration for producers

841

if (handler instanceof KafkaProducerMessageHandler) {

842

KafkaProducerMessageHandler kafkaHandler = (KafkaProducerMessageHandler) handler;

843

kafkaHandler.setSync(true);

844

}

845

};

846

}

847

848

@Bean

849

public ConsumerEndpointCustomizer<MessageProducer> consumerCustomizer() {

850

return (endpoint, destination, group) -> {

851

// Custom configuration for consumers

852

if (endpoint instanceof KafkaMessageDrivenChannelAdapter) {

853

KafkaMessageDrivenChannelAdapter adapter = (KafkaMessageDrivenChannelAdapter) endpoint;

854

adapter.setRecoveryInterval(10000);

855

}

856

};

857

}

858

}

859

860

// Dynamic configuration

861

@Component

862

public class DynamicConfigurationService {

863

864

private final BindingServiceProperties bindingProperties;

865

866

public DynamicConfigurationService(BindingServiceProperties bindingProperties) {

867

this.bindingProperties = bindingProperties;

868

}

869

870

public void addDynamicBinding(String bindingName, String destination, String group) {

871

BindingProperties binding = new BindingProperties();

872

binding.setDestination(destination);

873

binding.setGroup(group);

874

binding.setContentType("application/json");

875

876

ConsumerProperties consumer = new ConsumerProperties();

877

consumer.setMaxAttempts(3);

878

consumer.setConcurrency(2);

879

binding.setConsumer(consumer);

880

881

bindingProperties.getBindings().put(bindingName, binding);

882

}

883

884

public void configureBinder(String binderName, String type, Map<String, Object> environment) {

885

BinderProperties binder = new BinderProperties();

886

binder.setType(type);

887

binder.setEnvironment(environment);

888

binder.setInheritEnvironment(true);

889

binder.setDefaultCandidate(true);

890

891

bindingProperties.getBinders().put(binderName, binder);

892

}

893

894

public BindingProperties getBindingConfig(String bindingName) {

895

return bindingProperties.getBindingProperties(bindingName);

896

}

897

}

898

```