or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

actuator-integration.md, binder-framework.md, binding-management.md, configuration.md, function-support.md, index.md, message-conversion.md

docs/actuator-integration.md

0

# Actuator Integration

1

2

Spring Cloud Stream's actuator integration provides Spring Boot Actuator endpoints for monitoring and managing Stream applications. This includes health indicators for binder connectivity, management endpoints for controlling bindings, and monitoring capabilities for channels and message flow.

3

4

## Capabilities

5

6

### Bindings Endpoint

7

8

Actuator endpoint for managing binding lifecycle and querying binding states.

9

10

```java { .api }

11

/**

12

* Actuator endpoint for managing message bindings.

13

* Provides operations to start, stop, pause, and resume bindings.

14

*/

15

@Endpoint(id = "bindings")

16

public class BindingsEndpoint implements ApplicationContextAware {

17

18

private ApplicationContext applicationContext;

19

private BindingsLifecycleController bindingsController;

20

21

/**

22

* Query the state of all bindings.

23

* @return map of binding states keyed by binding name

24

*/

25

@ReadOperation

26

public Map<String, List<BindingInformation>> queryStates();

27

28

/**

29

* Query the state of a specific binding.

30

* @param name the binding name

31

* @return list of binding information for the named binding

32

*/

33

@ReadOperation

34

public List<BindingInformation> queryState(@Selector String name);

35

36

/**

37

* Change the state of a specific binding.

38

* @param name the binding name

39

* @param state the desired state (STARTED, STOPPED, PAUSED, RESUMED)

40

*/

41

@WriteOperation

42

public void changeState(@Selector String name, State state);

43

44

/**

45

* Get detailed information about a binding.

46

* @param name the binding name

47

* @return detailed binding information

48

*/

49

@ReadOperation

50

public BindingDetails getBindingDetails(@Selector String name);

51

52

public void setApplicationContext(ApplicationContext applicationContext);

53

54

/**

55

* Information about a binding's current state.

56

*/

57

public static class BindingInformation {

58

private final String bindingName;

59

private final State state;

60

private final String group;

61

private final boolean pausable;

62

private final boolean input;

63

private final String binder;

64

65

public BindingInformation(String bindingName, State state, String group, boolean pausable, boolean input, String binder);

66

67

public String getBindingName();

68

public State getState();

69

public String getGroup();

70

public boolean isPausable();

71

public boolean isInput();

72

public String getBinder();

73

}

74

75

/**

76

* Detailed information about a binding.

77

*/

78

public static class BindingDetails {

79

private final String name;

80

private final String destination;

81

private final String group;

82

private final String binder;

83

private final String contentType;

84

private final Map<String, Object> properties;

85

86

public BindingDetails(String name, String destination, String group, String binder, String contentType, Map<String, Object> properties);

87

88

public String getName();

89

public String getDestination();

90

public String getGroup();

91

public String getBinder();

92

public String getContentType();

93

public Map<String, Object> getProperties();

94

}

95

}

96

```

97

98

### Channels Endpoint

99

100

Actuator endpoint for inspecting message channels and their configuration.

101

102

```java { .api }

103

/**

104

* Actuator endpoint for inspecting message channels.

105

* Provides information about channel configuration and statistics.

106

*/

107

@Endpoint(id = "channels")

108

public class ChannelsEndpoint implements ApplicationContextAware {

109

110

private ApplicationContext applicationContext;

111

112

/**

113

* Get information about all message channels.

114

* @return map of channel information keyed by channel name

115

*/

116

@ReadOperation

117

public Map<String, Object> channels();

118

119

/**

120

* Get information about a specific channel.

121

* @param name the channel name

122

* @return channel information

123

*/

124

@ReadOperation

125

public ChannelInformation getChannel(@Selector String name);

126

127

/**

128

* Get statistics for all channels.

129

* @return channel statistics

130

*/

131

@ReadOperation

132

public Map<String, ChannelStatistics> getChannelStatistics();

133

134

/**

135

* Get statistics for a specific channel.

136

* @param name the channel name

137

* @return channel statistics

138

*/

139

@ReadOperation

140

public ChannelStatistics getChannelStatistics(@Selector String name);

141

142

public void setApplicationContext(ApplicationContext applicationContext);

143

144

/**

145

* Information about a message channel.

146

*/

147

public static class ChannelInformation {

148

private final String name;

149

private final String type;

150

private final boolean subscribable;

151

private final boolean pollable;

152

private final int subscriberCount;

153

private final Map<String, Object> properties;

154

155

public ChannelInformation(String name, String type, boolean subscribable, boolean pollable, int subscriberCount, Map<String, Object> properties);

156

157

public String getName();

158

public String getType();

159

public boolean isSubscribable();

160

public boolean isPollable();

161

public int getSubscriberCount();

162

public Map<String, Object> getProperties();

163

}

164

165

/**

166

* Statistics for a message channel.

167

*/

168

public static class ChannelStatistics {

169

private final String name;

170

private final long messagesSent;

171

private final long messagesReceived;

172

private final long sendFailures;

173

private final long receiveFailures;

174

private final double averageProcessingTime;

175

private final long lastMessageTime;

176

177

public ChannelStatistics(String name, long messagesSent, long messagesReceived, long sendFailures, long receiveFailures, double averageProcessingTime, long lastMessageTime);

178

179

public String getName();

180

public long getMessagesSent();

181

public long getMessagesReceived();

182

public long getSendFailures();

183

public long getReceiveFailures();

184

public double getAverageProcessingTime();

185

public long getLastMessageTime();

186

}

187

}

188

```

189

190

### Binders Health Indicator

191

192

Health indicator for monitoring binder connectivity and status.

193

194

```java { .api }

195

/**

196

* Health indicator for Spring Cloud Stream binders.

197

* Monitors connectivity and status of all configured binders.

198

*/

199

public class BindersHealthIndicator implements HealthIndicator, ApplicationContextAware {

200

201

private ApplicationContext applicationContext;

202

private BinderFactory binderFactory;

203

204

/**

205

* Check the health of all binders.

206

* @return health information for all binders

207

*/

208

public Health health();

209

210

/**

211

* Check the health of a specific binder.

212

* @param binderName the binder name

213

* @return health information for the specified binder

214

*/

215

public Health getBinderHealth(String binderName);

216

217

/**

218

* Get detailed health information for all binders.

219

* @return detailed health information

220

*/

221

public Map<String, Health> getDetailedHealth();

222

223

public void setApplicationContext(ApplicationContext applicationContext);

224

225

/**

226

* Health details for a specific binder.

227

*/

228

public static class BinderHealthDetails {

229

private final String binderName;

230

private final String binderType;

231

private final Status status;

232

private final String statusDescription;

233

private final Map<String, Object> details;

234

235

public BinderHealthDetails(String binderName, String binderType, Status status, String statusDescription, Map<String, Object> details);

236

237

public String getBinderName();

238

public String getBinderType();

239

public Status getStatus();

240

public String getStatusDescription();

241

public Map<String, Object> getDetails();

242

}

243

}

244

```

245

246

### Stream Metrics

247

248

Metrics collection and reporting for Stream applications.

249

250

```java { .api }

251

/**

252

* Metrics collector for Spring Cloud Stream applications.

253

* Integrates with Micrometer to provide detailed metrics.

254

*/

255

@Component

256

@ConditionalOnClass(MeterRegistry.class)

257

public class StreamMetrics implements ApplicationContextAware, BeanPostProcessor {

258

259

private final MeterRegistry meterRegistry;

260

private ApplicationContext applicationContext;

261

262

public StreamMetrics(MeterRegistry meterRegistry);

263

264

/**

265

* Record message processing metrics.

266

* @param bindingName the binding name

267

* @param messageCount number of messages processed

268

* @param processingTime time taken to process messages

269

* @param success whether processing was successful

270

*/

271

public void recordMessageProcessing(String bindingName, long messageCount, Duration processingTime, boolean success);

272

273

/**

274

* Record binding state change.

275

* @param bindingName the binding name

276

* @param fromState the previous state

277

* @param toState the new state

278

*/

279

public void recordBindingStateChange(String bindingName, State fromState, State toState);

280

281

/**

282

* Record error metrics.

283

* @param bindingName the binding name

284

* @param errorType the type of error

285

* @param exception the exception that occurred

286

*/

287

public void recordError(String bindingName, String errorType, Throwable exception);

288

289

/**

290

* Get current metrics for a binding.

291

* @param bindingName the binding name

292

* @return binding metrics

293

*/

294

public BindingMetrics getBindingMetrics(String bindingName);

295

296

/**

297

* Get metrics for all bindings.

298

* @return map of binding metrics keyed by binding name

299

*/

300

public Map<String, BindingMetrics> getAllBindingMetrics();

301

302

public void setApplicationContext(ApplicationContext applicationContext);

303

public Object postProcessAfterInitialization(Object bean, String beanName);

304

305

/**

306

* Metrics for a specific binding.

307

*/

308

public static class BindingMetrics {

309

private final String bindingName;

310

private final long totalMessages;

311

private final long successfulMessages;

312

private final long failedMessages;

313

private final double averageProcessingTime;

314

private final long lastMessageTime;

315

private final State currentState;

316

317

public BindingMetrics(String bindingName, long totalMessages, long successfulMessages, long failedMessages, double averageProcessingTime, long lastMessageTime, State currentState);

318

319

public String getBindingName();

320

public long getTotalMessages();

321

public long getSuccessfulMessages();

322

public long getFailedMessages();

323

public double getAverageProcessingTime();

324

public long getLastMessageTime();

325

public State getCurrentState();

326

public double getSuccessRate();

327

}

328

}

329

```

330

331

### Stream Info Contributor

332

333

Info contributor that adds Stream application information to the actuator info endpoint.

334

335

```java { .api }

336

/**

337

* Info contributor for Spring Cloud Stream applications.

338

* Provides information about bindings, binders, and configuration.

339

*/

340

@Component

341

@ConditionalOnClass(InfoContributor.class)

342

public class StreamInfoContributor implements InfoContributor, ApplicationContextAware {

343

344

private ApplicationContext applicationContext;

345

private BindingServiceProperties bindingProperties;

346

347

/**

348

* Contribute Stream-specific information to actuator info endpoint.

349

* @param builder the info builder

350

*/

351

public void contribute(Info.Builder builder);

352

353

/**

354

* Get information about configured bindings.

355

* @return binding information

356

*/

357

public Map<String, Object> getBindingInfo();

358

359

/**

360

* Get information about configured binders.

361

* @return binder information

362

*/

363

public Map<String, Object> getBinderInfo();

364

365

/**

366

* Get Stream application configuration details.

367

* @return configuration information

368

*/

369

public Map<String, Object> getConfigurationInfo();

370

371

public void setApplicationContext(ApplicationContext applicationContext);

372

}

373

```

374

375

### Management Operations

376

377

Programmatic management operations for Stream applications.

378

379

```java { .api }

380

/**

381

* Management operations for Spring Cloud Stream applications.

382

* Provides programmatic access to binding and binder management.

383

*/

384

@Component

385

public class StreamManagementOperations implements ApplicationContextAware {

386

387

private ApplicationContext applicationContext;

388

private BindingsLifecycleController bindingsController;

389

private BindingService bindingService;

390

391

/**

392

* Start all bindings in the application.

393

*/

394

public void startAllBindings();

395

396

/**

397

* Stop all bindings in the application.

398

*/

399

public void stopAllBindings();

400

401

/**

402

* Restart all bindings in the application.

403

*/

404

public void restartAllBindings();

405

406

/**

407

* Start specific bindings.

408

* @param bindingNames the names of bindings to start

409

*/

410

public void startBindings(String... bindingNames);

411

412

/**

413

* Stop specific bindings.

414

* @param bindingNames the names of bindings to stop

415

*/

416

public void stopBindings(String... bindingNames);

417

418

/**

419

* Pause specific bindings.

420

* @param bindingNames the names of bindings to pause

421

*/

422

public void pauseBindings(String... bindingNames);

423

424

/**

425

* Resume specific bindings.

426

* @param bindingNames the names of bindings to resume

427

*/

428

public void resumeBindings(String... bindingNames);

429

430

/**

431

* Get the current status of all bindings.

432

* @return binding status information

433

*/

434

public Map<String, BindingStatus> getBindingStatuses();

435

436

/**

437

* Force refresh of binding configurations.

438

*/

439

public void refreshBindingConfigurations();

440

441

/**

442

* Validate binding configurations.

443

* @return validation results

444

*/

445

public ValidationResult validateConfigurations();

446

447

public void setApplicationContext(ApplicationContext applicationContext);

448

449

/**

450

* Status information for a binding.

451

*/

452

public static class BindingStatus {

453

private final String name;

454

private final State state;

455

private final boolean healthy;

456

private final String statusMessage;

457

private final long lastStateChange;

458

459

public BindingStatus(String name, State state, boolean healthy, String statusMessage, long lastStateChange);

460

461

public String getName();

462

public State getState();

463

public boolean isHealthy();

464

public String getStatusMessage();

465

public long getLastStateChange();

466

}

467

468

/**

469

* Results from configuration validation.

470

*/

471

public static class ValidationResult {

472

private final boolean valid;

473

private final List<ValidationError> errors;

474

private final List<ValidationWarning> warnings;

475

476

public ValidationResult(boolean valid, List<ValidationError> errors, List<ValidationWarning> warnings);

477

478

public boolean isValid();

479

public List<ValidationError> getErrors();

480

public List<ValidationWarning> getWarnings();

481

}

482

483

/**

484

* Configuration validation error.

485

*/

486

public static class ValidationError {

487

private final String bindingName;

488

private final String property;

489

private final String message;

490

491

public ValidationError(String bindingName, String property, String message);

492

493

public String getBindingName();

494

public String getProperty();

495

public String getMessage();

496

}

497

498

/**

499

* Configuration validation warning.

500

*/

501

public static class ValidationWarning {

502

private final String bindingName;

503

private final String property;

504

private final String message;

505

506

public ValidationWarning(String bindingName, String property, String message);

507

508

public String getBindingName();

509

public String getProperty();

510

public String getMessage();

511

}

512

}

513

```

514

515

### Auto-Configuration

516

517

Auto-configuration classes for actuator integration.

518

519

```java { .api }

520

/**

521

* Auto-configuration for bindings actuator endpoint.

522

*/

523

@Configuration

524

@ConditionalOnClass({Endpoint.class, BindingsEndpoint.class})

525

@ConditionalOnWebApplication

526

@AutoConfigureAfter(BindingServiceConfiguration.class)

527

public class BindingsEndpointAutoConfiguration {

528

529

/**

530

* Create bindings actuator endpoint.

531

* @param bindingsController the bindings lifecycle controller

532

* @return configured BindingsEndpoint

533

*/

534

@Bean

535

@ConditionalOnMissingBean

536

public BindingsEndpoint bindingsEndpoint(BindingsLifecycleController bindingsController);

537

}

538

539

/**

540

* Auto-configuration for channels actuator endpoint.

541

*/

542

@Configuration

543

@ConditionalOnClass({Endpoint.class, ChannelsEndpoint.class})

544

@ConditionalOnWebApplication

545

public class ChannelsEndpointAutoConfiguration {

546

547

/**

548

* Create channels actuator endpoint.

549

* @return configured ChannelsEndpoint

550

*/

551

@Bean

552

@ConditionalOnMissingBean

553

public ChannelsEndpoint channelsEndpoint();

554

}

555

556

/**

557

* Auto-configuration for binders health indicator.

558

*/

559

@Configuration

560

@ConditionalOnClass({HealthIndicator.class, BindersHealthIndicator.class})

561

@ConditionalOnProperty(name = "management.health.binders.enabled", havingValue = "true", matchIfMissing = true)

562

@AutoConfigureAfter(BindingServiceConfiguration.class)

563

public class BindersHealthIndicatorAutoConfiguration {

564

565

/**

566

* Create binders health indicator.

567

* @param binderFactory the binder factory

568

* @return configured BindersHealthIndicator

569

*/

570

@Bean

571

@ConditionalOnMissingBean(name = "bindersHealthIndicator")

572

public BindersHealthIndicator bindersHealthIndicator(BinderFactory binderFactory);

573

}

574

575

/**

576

* Auto-configuration for Stream metrics.

577

*/

578

@Configuration

579

@ConditionalOnClass({MeterRegistry.class, StreamMetrics.class})

580

@ConditionalOnProperty(name = "management.metrics.enable.stream", havingValue = "true", matchIfMissing = true)

581

public class StreamMetricsAutoConfiguration {

582

583

/**

584

* Create Stream metrics collector.

585

* @param meterRegistry the meter registry

586

* @return configured StreamMetrics

587

*/

588

@Bean

589

@ConditionalOnMissingBean

590

public StreamMetrics streamMetrics(MeterRegistry meterRegistry);

591

}

592

```

593

594

**Usage Examples:**

595

596

```java

597

import org.springframework.cloud.stream.endpoint.*;

598

import org.springframework.boot.actuate.health.Health;

599

import org.springframework.boot.actuate.health.Status;

600

601

// Using bindings endpoint programmatically

602

@RestController

603

@RequestMapping("/admin/stream")

604

public class StreamAdminController {

605

606

private final BindingsEndpoint bindingsEndpoint;

607

private final ChannelsEndpoint channelsEndpoint;

608

private final StreamManagementOperations managementOps;

609

610

public StreamAdminController(BindingsEndpoint bindingsEndpoint,

611

ChannelsEndpoint channelsEndpoint,

612

StreamManagementOperations managementOps) {

613

this.bindingsEndpoint = bindingsEndpoint;

614

this.channelsEndpoint = channelsEndpoint;

615

this.managementOps = managementOps;

616

}

617

618

// Get all binding states

619

@GetMapping("/bindings")

620

public Map<String, List<BindingsEndpoint.BindingInformation>> getAllBindings() {

621

return bindingsEndpoint.queryStates();

622

}

623

624

// Control specific binding

625

@PostMapping("/bindings/{name}/start")

626

public ResponseEntity<String> startBinding(@PathVariable String name) {

627

try {

628

bindingsEndpoint.changeState(name, State.STARTED);

629

return ResponseEntity.ok("Binding " + name + " started");

630

} catch (Exception e) {

631

return ResponseEntity.status(500).body("Failed to start binding: " + e.getMessage());

632

}

633

}

634

635

@PostMapping("/bindings/{name}/stop")

636

public ResponseEntity<String> stopBinding(@PathVariable String name) {

637

try {

638

bindingsEndpoint.changeState(name, State.STOPPED);

639

return ResponseEntity.ok("Binding " + name + " stopped");

640

} catch (Exception e) {

641

return ResponseEntity.status(500).body("Failed to stop binding: " + e.getMessage());

642

}

643

}

644

645

@PostMapping("/bindings/{name}/pause")

646

public ResponseEntity<String> pauseBinding(@PathVariable String name) {

647

try {

648

bindingsEndpoint.changeState(name, State.PAUSED);

649

return ResponseEntity.ok("Binding " + name + " paused");

650

} catch (Exception e) {

651

return ResponseEntity.status(500).body("Failed to pause binding: " + e.getMessage());

652

}

653

}

654

655

@PostMapping("/bindings/{name}/resume")

656

public ResponseEntity<String> resumeBinding(@PathVariable String name) {

657

try {

658

bindingsEndpoint.changeState(name, State.RESUMED);

659

return ResponseEntity.ok("Binding " + name + " resumed");

660

} catch (Exception e) {

661

return ResponseEntity.status(500).body("Failed to resume binding: " + e.getMessage());

662

}

663

}

664

665

// Get channel information

666

@GetMapping("/channels")

667

public Map<String, Object> getAllChannels() {

668

return channelsEndpoint.channels();

669

}

670

671

@GetMapping("/channels/{name}")

672

public ChannelsEndpoint.ChannelInformation getChannel(@PathVariable String name) {

673

return channelsEndpoint.getChannel(name);

674

}

675

676

// Bulk operations

677

@PostMapping("/bindings/start-all")

678

public ResponseEntity<String> startAllBindings() {

679

try {

680

managementOps.startAllBindings();

681

return ResponseEntity.ok("All bindings started");

682

} catch (Exception e) {

683

return ResponseEntity.status(500).body("Failed to start all bindings: " + e.getMessage());

684

}

685

}

686

687

@PostMapping("/bindings/stop-all")

688

public ResponseEntity<String> stopAllBindings() {

689

try {

690

managementOps.stopAllBindings();

691

return ResponseEntity.ok("All bindings stopped");

692

} catch (Exception e) {

693

return ResponseEntity.status(500).body("Failed to stop all bindings: " + e.getMessage());

694

}

695

}

696

697

// Get detailed status

698

@GetMapping("/status")

699

public Map<String, StreamManagementOperations.BindingStatus> getBindingStatuses() {

700

return managementOps.getBindingStatuses();

701

}

702

703

// Validate configuration

704

@GetMapping("/validate")

705

public StreamManagementOperations.ValidationResult validateConfiguration() {

706

return managementOps.validateConfigurations();

707

}

708

}

709

710

// Custom health indicator

711

@Component

712

public class CustomStreamHealthIndicator implements HealthIndicator {

713

714

private final BinderFactory binderFactory;

715

private final BindingService bindingService;

716

717

public CustomStreamHealthIndicator(BinderFactory binderFactory, BindingService bindingService) {

718

this.binderFactory = binderFactory;

719

this.bindingService = bindingService;

720

}

721

722

@Override

723

public Health health() {

724

Health.Builder builder = new Health.Builder();

725

726

try {

727

// Check binder health

728

boolean allBindersHealthy = checkBinderHealth();

729

730

// Check binding health

731

Map<String, Object> bindingHealth = checkBindingHealth();

732

733

if (allBindersHealthy && isAllBindingsHealthy(bindingHealth)) {

734

builder.status(Status.UP);

735

} else {

736

builder.status(Status.DOWN);

737

}

738

739

builder.withDetail("binders", allBindersHealthy ? "UP" : "DOWN");

740

builder.withDetail("bindings", bindingHealth);

741

742

} catch (Exception e) {

743

builder.status(Status.DOWN)

744

.withDetail("error", e.getMessage());

745

}

746

747

return builder.build();

748

}

749

750

private boolean checkBinderHealth() {

751

// Implementation to check binder connectivity

752

return true;

753

}

754

755

private Map<String, Object> checkBindingHealth() {

756

// Implementation to check binding health

757

return new HashMap<>();

758

}

759

760

private boolean isAllBindingsHealthy(Map<String, Object> bindingHealth) {

761

// Implementation to evaluate binding health

762

return true;

763

}

764

}

765

766

// Metrics collection example

767

@Component

768

public class StreamMetricsCollector {

769

770

private final StreamMetrics streamMetrics;

771

private final MeterRegistry meterRegistry;

772

773

public StreamMetricsCollector(StreamMetrics streamMetrics, MeterRegistry meterRegistry) {

774

this.streamMetrics = streamMetrics;

775

this.meterRegistry = meterRegistry;

776

}

777

778

@EventListener

779

public void handleBindingStateChange(BindingCreatedEvent event) {

780

// Record binding creation

781

Gauge.builder("stream.bindings.active")

782

.description("Number of active bindings")

783

.register(meterRegistry, this, obj -> getActiveBindingCount());

784

}

785

786

public void recordMessageProcessing(String bindingName, long count, Duration time, boolean success) {

787

streamMetrics.recordMessageProcessing(bindingName, count, time, success);

788

789

// Additional custom metrics

790

Counter.builder("stream.messages.processed")

791

.tag("binding", bindingName)

792

.tag("success", String.valueOf(success))

793

.register(meterRegistry)

794

.increment(count);

795

796

Timer.builder("stream.message.processing.time")

797

.tag("binding", bindingName)

798

.register(meterRegistry)

799

.record(time);

800

}

801

802

private double getActiveBindingCount() {

803

return streamMetrics.getAllBindingMetrics().size();

804

}

805

}

806

807

```

Configuration for actuator endpoints (`application.yml`):

```yaml
# application.yml
management:
  endpoints:
    web:
      exposure:
        include: health,info,bindings,channels,metrics
  endpoint:
    bindings:
      enabled: true
    channels:
      enabled: true
    health:
      show-details: always
  health:
    binders:
      enabled: true
  metrics:
    enable:
      stream: true
    export:
      prometheus:
        enabled: true

# Enable specific actuator features
spring:
  cloud:
    stream:
      actuator:
        bindings-endpoint:
          enabled: true
        channels-endpoint:
          enabled: true
        health-indicator:
          enabled: true
        info-contributor:
          enabled: true
        metrics:
          enabled: true
```