0
# Actuator Integration
1
2
Spring Cloud Stream's actuator integration provides Spring Boot Actuator endpoints for monitoring and managing Stream applications. This includes health indicators for binder connectivity, management endpoints for controlling bindings, and monitoring capabilities for channels and message flow.
3
4
## Capabilities
5
6
### Bindings Endpoint
7
8
Actuator endpoint for managing binding lifecycle and querying binding states.
9
10
```java { .api }
11
/**
12
* Actuator endpoint for managing message bindings.
13
* Provides operations to start, stop, pause, and resume bindings.
14
*/
15
@Endpoint(id = "bindings")
16
public class BindingsEndpoint implements ApplicationContextAware {
17
18
private ApplicationContext applicationContext;
19
private BindingsLifecycleController bindingsController;
20
21
/**
22
* Query the state of all bindings.
23
* @return map of binding states keyed by binding name
24
*/
25
@ReadOperation
26
public Map<String, List<BindingInformation>> queryStates();
27
28
/**
29
* Query the state of a specific binding.
30
* @param name the binding name
31
* @return list of binding information for the named binding
32
*/
33
@ReadOperation
34
public List<BindingInformation> queryState(@Selector String name);
35
36
/**
37
* Change the state of a specific binding.
38
* @param name the binding name
39
* @param state the desired state (STARTED, STOPPED, PAUSED, RESUMED)
40
*/
41
@WriteOperation
42
public void changeState(@Selector String name, State state);
43
44
/**
45
* Get detailed information about a binding.
46
* @param name the binding name
47
* @return detailed binding information
48
*/
49
@ReadOperation
50
public BindingDetails getBindingDetails(@Selector String name);
51
52
public void setApplicationContext(ApplicationContext applicationContext);
53
54
/**
55
* Information about a binding's current state.
56
*/
57
public static class BindingInformation {
58
private final String bindingName;
59
private final State state;
60
private final String group;
61
private final boolean pausable;
62
private final boolean input;
63
private final String binder;
64
65
public BindingInformation(String bindingName, State state, String group, boolean pausable, boolean input, String binder);
66
67
public String getBindingName();
68
public State getState();
69
public String getGroup();
70
public boolean isPausable();
71
public boolean isInput();
72
public String getBinder();
73
}
74
75
/**
76
* Detailed information about a binding.
77
*/
78
public static class BindingDetails {
79
private final String name;
80
private final String destination;
81
private final String group;
82
private final String binder;
83
private final String contentType;
84
private final Map<String, Object> properties;
85
86
public BindingDetails(String name, String destination, String group, String binder, String contentType, Map<String, Object> properties);
87
88
public String getName();
89
public String getDestination();
90
public String getGroup();
91
public String getBinder();
92
public String getContentType();
93
public Map<String, Object> getProperties();
94
}
95
}
96
```
97
98
### Channels Endpoint
99
100
Actuator endpoint for inspecting message channels and their configuration.
101
102
```java { .api }
103
/**
104
* Actuator endpoint for inspecting message channels.
105
* Provides information about channel configuration and statistics.
106
*/
107
@Endpoint(id = "channels")
108
public class ChannelsEndpoint implements ApplicationContextAware {
109
110
private ApplicationContext applicationContext;
111
112
/**
113
* Get information about all message channels.
114
* @return map of channel information keyed by channel name
115
*/
116
@ReadOperation
117
public Map<String, Object> channels();
118
119
/**
120
* Get information about a specific channel.
121
* @param name the channel name
122
* @return channel information
123
*/
124
@ReadOperation
125
public ChannelInformation getChannel(@Selector String name);
126
127
/**
128
* Get statistics for all channels.
129
* @return channel statistics
130
*/
131
@ReadOperation
132
public Map<String, ChannelStatistics> getChannelStatistics();
133
134
/**
135
* Get statistics for a specific channel.
136
* @param name the channel name
137
* @return channel statistics
138
*/
139
@ReadOperation
140
public ChannelStatistics getChannelStatistics(@Selector String name);
141
142
public void setApplicationContext(ApplicationContext applicationContext);
143
144
/**
145
* Information about a message channel.
146
*/
147
public static class ChannelInformation {
148
private final String name;
149
private final String type;
150
private final boolean subscribable;
151
private final boolean pollable;
152
private final int subscriberCount;
153
private final Map<String, Object> properties;
154
155
public ChannelInformation(String name, String type, boolean subscribable, boolean pollable, int subscriberCount, Map<String, Object> properties);
156
157
public String getName();
158
public String getType();
159
public boolean isSubscribable();
160
public boolean isPollable();
161
public int getSubscriberCount();
162
public Map<String, Object> getProperties();
163
}
164
165
/**
166
* Statistics for a message channel.
167
*/
168
public static class ChannelStatistics {
169
private final String name;
170
private final long messagesSent;
171
private final long messagesReceived;
172
private final long sendFailures;
173
private final long receiveFailures;
174
private final double averageProcessingTime;
175
private final long lastMessageTime;
176
177
public ChannelStatistics(String name, long messagesSent, long messagesReceived, long sendFailures, long receiveFailures, double averageProcessingTime, long lastMessageTime);
178
179
public String getName();
180
public long getMessagesSent();
181
public long getMessagesReceived();
182
public long getSendFailures();
183
public long getReceiveFailures();
184
public double getAverageProcessingTime();
185
public long getLastMessageTime();
186
}
187
}
188
```
189
190
### Binders Health Indicator
191
192
Health indicator for monitoring binder connectivity and status.
193
194
```java { .api }
195
/**
196
* Health indicator for Spring Cloud Stream binders.
197
* Monitors connectivity and status of all configured binders.
198
*/
199
public class BindersHealthIndicator implements HealthIndicator, ApplicationContextAware {
200
201
private ApplicationContext applicationContext;
202
private BinderFactory binderFactory;
203
204
/**
205
* Check the health of all binders.
206
* @return health information for all binders
207
*/
208
public Health health();
209
210
/**
211
* Check the health of a specific binder.
212
* @param binderName the binder name
213
* @return health information for the specified binder
214
*/
215
public Health getBinderHealth(String binderName);
216
217
/**
218
* Get detailed health information for all binders.
219
* @return detailed health information
220
*/
221
public Map<String, Health> getDetailedHealth();
222
223
public void setApplicationContext(ApplicationContext applicationContext);
224
225
/**
226
* Health details for a specific binder.
227
*/
228
public static class BinderHealthDetails {
229
private final String binderName;
230
private final String binderType;
231
private final Status status;
232
private final String statusDescription;
233
private final Map<String, Object> details;
234
235
public BinderHealthDetails(String binderName, String binderType, Status status, String statusDescription, Map<String, Object> details);
236
237
public String getBinderName();
238
public String getBinderType();
239
public Status getStatus();
240
public String getStatusDescription();
241
public Map<String, Object> getDetails();
242
}
243
}
244
```
245
246
### Stream Metrics
247
248
Metrics collection and reporting for Stream applications.
249
250
```java { .api }
251
/**
252
* Metrics collector for Spring Cloud Stream applications.
253
* Integrates with Micrometer to provide detailed metrics.
254
*/
255
@Component
256
@ConditionalOnClass(MeterRegistry.class)
257
public class StreamMetrics implements ApplicationContextAware, BeanPostProcessor {
258
259
private final MeterRegistry meterRegistry;
260
private ApplicationContext applicationContext;
261
262
public StreamMetrics(MeterRegistry meterRegistry);
263
264
/**
265
* Record message processing metrics.
266
* @param bindingName the binding name
267
* @param messageCount number of messages processed
268
* @param processingTime time taken to process messages
269
* @param success whether processing was successful
270
*/
271
public void recordMessageProcessing(String bindingName, long messageCount, Duration processingTime, boolean success);
272
273
/**
274
* Record binding state change.
275
* @param bindingName the binding name
276
* @param fromState the previous state
277
* @param toState the new state
278
*/
279
public void recordBindingStateChange(String bindingName, State fromState, State toState);
280
281
/**
282
* Record error metrics.
283
* @param bindingName the binding name
284
* @param errorType the type of error
285
* @param exception the exception that occurred
286
*/
287
public void recordError(String bindingName, String errorType, Throwable exception);
288
289
/**
290
* Get current metrics for a binding.
291
* @param bindingName the binding name
292
* @return binding metrics
293
*/
294
public BindingMetrics getBindingMetrics(String bindingName);
295
296
/**
297
* Get metrics for all bindings.
298
* @return map of binding metrics keyed by binding name
299
*/
300
public Map<String, BindingMetrics> getAllBindingMetrics();
301
302
public void setApplicationContext(ApplicationContext applicationContext);
303
public Object postProcessAfterInitialization(Object bean, String beanName);
304
305
/**
306
* Metrics for a specific binding.
307
*/
308
public static class BindingMetrics {
309
private final String bindingName;
310
private final long totalMessages;
311
private final long successfulMessages;
312
private final long failedMessages;
313
private final double averageProcessingTime;
314
private final long lastMessageTime;
315
private final State currentState;
316
317
public BindingMetrics(String bindingName, long totalMessages, long successfulMessages, long failedMessages, double averageProcessingTime, long lastMessageTime, State currentState);
318
319
public String getBindingName();
320
public long getTotalMessages();
321
public long getSuccessfulMessages();
322
public long getFailedMessages();
323
public double getAverageProcessingTime();
324
public long getLastMessageTime();
325
public State getCurrentState();
326
public double getSuccessRate();
327
}
328
}
329
```
330
331
### Stream Info Contributor
332
333
Info contributor for providing Stream application information in actuator info endpoint.
334
335
```java { .api }
336
/**
337
* Info contributor for Spring Cloud Stream applications.
338
* Provides information about bindings, binders, and configuration.
339
*/
340
@Component
341
@ConditionalOnClass(InfoContributor.class)
342
public class StreamInfoContributor implements InfoContributor, ApplicationContextAware {
343
344
private ApplicationContext applicationContext;
345
private BindingServiceProperties bindingProperties;
346
347
/**
348
* Contribute Stream-specific information to actuator info endpoint.
349
* @param builder the info builder
350
*/
351
public void contribute(Info.Builder builder);
352
353
/**
354
* Get information about configured bindings.
355
* @return binding information
356
*/
357
public Map<String, Object> getBindingInfo();
358
359
/**
360
* Get information about configured binders.
361
* @return binder information
362
*/
363
public Map<String, Object> getBinderInfo();
364
365
/**
366
* Get Stream application configuration details.
367
* @return configuration information
368
*/
369
public Map<String, Object> getConfigurationInfo();
370
371
public void setApplicationContext(ApplicationContext applicationContext);
372
}
373
```
374
375
### Management Operations
376
377
Programmatic management operations for Stream applications.
378
379
```java { .api }
380
/**
381
* Management operations for Spring Cloud Stream applications.
382
* Provides programmatic access to binding and binder management.
383
*/
384
@Component
385
public class StreamManagementOperations implements ApplicationContextAware {
386
387
private ApplicationContext applicationContext;
388
private BindingsLifecycleController bindingsController;
389
private BindingService bindingService;
390
391
/**
392
* Start all bindings in the application.
393
*/
394
public void startAllBindings();
395
396
/**
397
* Stop all bindings in the application.
398
*/
399
public void stopAllBindings();
400
401
/**
402
* Restart all bindings in the application.
403
*/
404
public void restartAllBindings();
405
406
/**
407
* Start specific bindings.
408
* @param bindingNames the names of bindings to start
409
*/
410
public void startBindings(String... bindingNames);
411
412
/**
413
* Stop specific bindings.
414
* @param bindingNames the names of bindings to stop
415
*/
416
public void stopBindings(String... bindingNames);
417
418
/**
419
* Pause specific bindings.
420
* @param bindingNames the names of bindings to pause
421
*/
422
public void pauseBindings(String... bindingNames);
423
424
/**
425
* Resume specific bindings.
426
* @param bindingNames the names of bindings to resume
427
*/
428
public void resumeBindings(String... bindingNames);
429
430
/**
431
* Get the current status of all bindings.
432
* @return binding status information
433
*/
434
public Map<String, BindingStatus> getBindingStatuses();
435
436
/**
437
* Force refresh of binding configurations.
438
*/
439
public void refreshBindingConfigurations();
440
441
/**
442
* Validate binding configurations.
443
* @return validation results
444
*/
445
public ValidationResult validateConfigurations();
446
447
public void setApplicationContext(ApplicationContext applicationContext);
448
449
/**
450
* Status information for a binding.
451
*/
452
public static class BindingStatus {
453
private final String name;
454
private final State state;
455
private final boolean healthy;
456
private final String statusMessage;
457
private final long lastStateChange;
458
459
public BindingStatus(String name, State state, boolean healthy, String statusMessage, long lastStateChange);
460
461
public String getName();
462
public State getState();
463
public boolean isHealthy();
464
public String getStatusMessage();
465
public long getLastStateChange();
466
}
467
468
/**
469
* Results from configuration validation.
470
*/
471
public static class ValidationResult {
472
private final boolean valid;
473
private final List<ValidationError> errors;
474
private final List<ValidationWarning> warnings;
475
476
public ValidationResult(boolean valid, List<ValidationError> errors, List<ValidationWarning> warnings);
477
478
public boolean isValid();
479
public List<ValidationError> getErrors();
480
public List<ValidationWarning> getWarnings();
481
}
482
483
/**
484
* Configuration validation error.
485
*/
486
public static class ValidationError {
487
private final String bindingName;
488
private final String property;
489
private final String message;
490
491
public ValidationError(String bindingName, String property, String message);
492
493
public String getBindingName();
494
public String getProperty();
495
public String getMessage();
496
}
497
498
/**
499
* Configuration validation warning.
500
*/
501
public static class ValidationWarning {
502
private final String bindingName;
503
private final String property;
504
private final String message;
505
506
public ValidationWarning(String bindingName, String property, String message);
507
508
public String getBindingName();
509
public String getProperty();
510
public String getMessage();
511
}
512
}
513
```
514
515
### Auto-Configuration
516
517
Auto-configuration classes for actuator integration.
518
519
```java { .api }
520
/**
521
* Auto-configuration for bindings actuator endpoint.
522
*/
523
@Configuration
524
@ConditionalOnClass({Endpoint.class, BindingsEndpoint.class})
525
@ConditionalOnWebApplication
526
@AutoConfigureAfter(BindingServiceConfiguration.class)
527
public class BindingsEndpointAutoConfiguration {
528
529
/**
530
* Create bindings actuator endpoint.
531
* @param bindingsController the bindings lifecycle controller
532
* @return configured BindingsEndpoint
533
*/
534
@Bean
535
@ConditionalOnMissingBean
536
public BindingsEndpoint bindingsEndpoint(BindingsLifecycleController bindingsController);
537
}
538
539
/**
540
* Auto-configuration for channels actuator endpoint.
541
*/
542
@Configuration
543
@ConditionalOnClass({Endpoint.class, ChannelsEndpoint.class})
544
@ConditionalOnWebApplication
545
public class ChannelsEndpointAutoConfiguration {
546
547
/**
548
* Create channels actuator endpoint.
549
* @return configured ChannelsEndpoint
550
*/
551
@Bean
552
@ConditionalOnMissingBean
553
public ChannelsEndpoint channelsEndpoint();
554
}
555
556
/**
557
* Auto-configuration for binders health indicator.
558
*/
559
@Configuration
560
@ConditionalOnClass({HealthIndicator.class, BindersHealthIndicator.class})
561
@ConditionalOnProperty(name = "management.health.binders.enabled", havingValue = "true", matchIfMissing = true)
562
@AutoConfigureAfter(BindingServiceConfiguration.class)
563
public class BindersHealthIndicatorAutoConfiguration {
564
565
/**
566
* Create binders health indicator.
567
* @param binderFactory the binder factory
568
* @return configured BindersHealthIndicator
569
*/
570
@Bean
571
@ConditionalOnMissingBean(name = "bindersHealthIndicator")
572
public BindersHealthIndicator bindersHealthIndicator(BinderFactory binderFactory);
573
}
574
575
/**
576
* Auto-configuration for Stream metrics.
577
*/
578
@Configuration
579
@ConditionalOnClass({MeterRegistry.class, StreamMetrics.class})
580
@ConditionalOnProperty(name = "management.metrics.enable.stream", havingValue = "true", matchIfMissing = true)
581
public class StreamMetricsAutoConfiguration {
582
583
/**
584
* Create Stream metrics collector.
585
* @param meterRegistry the meter registry
586
* @return configured StreamMetrics
587
*/
588
@Bean
589
@ConditionalOnMissingBean
590
public StreamMetrics streamMetrics(MeterRegistry meterRegistry);
591
}
592
```
593
594
**Usage Examples:**
595
596
```java
597
import org.springframework.cloud.stream.endpoint.*;
598
import org.springframework.boot.actuate.health.Health;
599
import org.springframework.boot.actuate.health.Status;
600
601
// Using bindings endpoint programmatically
602
@RestController
603
@RequestMapping("/admin/stream")
604
public class StreamAdminController {
605
606
private final BindingsEndpoint bindingsEndpoint;
607
private final ChannelsEndpoint channelsEndpoint;
608
private final StreamManagementOperations managementOps;
609
610
public StreamAdminController(BindingsEndpoint bindingsEndpoint,
611
ChannelsEndpoint channelsEndpoint,
612
StreamManagementOperations managementOps) {
613
this.bindingsEndpoint = bindingsEndpoint;
614
this.channelsEndpoint = channelsEndpoint;
615
this.managementOps = managementOps;
616
}
617
618
// Get all binding states
619
@GetMapping("/bindings")
620
public Map<String, List<BindingsEndpoint.BindingInformation>> getAllBindings() {
621
return bindingsEndpoint.queryStates();
622
}
623
624
// Control specific binding
625
@PostMapping("/bindings/{name}/start")
626
public ResponseEntity<String> startBinding(@PathVariable String name) {
627
try {
628
bindingsEndpoint.changeState(name, State.STARTED);
629
return ResponseEntity.ok("Binding " + name + " started");
630
} catch (Exception e) {
631
return ResponseEntity.status(500).body("Failed to start binding: " + e.getMessage());
632
}
633
}
634
635
@PostMapping("/bindings/{name}/stop")
636
public ResponseEntity<String> stopBinding(@PathVariable String name) {
637
try {
638
bindingsEndpoint.changeState(name, State.STOPPED);
639
return ResponseEntity.ok("Binding " + name + " stopped");
640
} catch (Exception e) {
641
return ResponseEntity.status(500).body("Failed to stop binding: " + e.getMessage());
642
}
643
}
644
645
@PostMapping("/bindings/{name}/pause")
646
public ResponseEntity<String> pauseBinding(@PathVariable String name) {
647
try {
648
bindingsEndpoint.changeState(name, State.PAUSED);
649
return ResponseEntity.ok("Binding " + name + " paused");
650
} catch (Exception e) {
651
return ResponseEntity.status(500).body("Failed to pause binding: " + e.getMessage());
652
}
653
}
654
655
@PostMapping("/bindings/{name}/resume")
656
public ResponseEntity<String> resumeBinding(@PathVariable String name) {
657
try {
658
bindingsEndpoint.changeState(name, State.RESUMED);
659
return ResponseEntity.ok("Binding " + name + " resumed");
660
} catch (Exception e) {
661
return ResponseEntity.status(500).body("Failed to resume binding: " + e.getMessage());
662
}
663
}
664
665
// Get channel information
666
@GetMapping("/channels")
667
public Map<String, Object> getAllChannels() {
668
return channelsEndpoint.channels();
669
}
670
671
@GetMapping("/channels/{name}")
672
public ChannelsEndpoint.ChannelInformation getChannel(@PathVariable String name) {
673
return channelsEndpoint.getChannel(name);
674
}
675
676
// Bulk operations
677
@PostMapping("/bindings/start-all")
678
public ResponseEntity<String> startAllBindings() {
679
try {
680
managementOps.startAllBindings();
681
return ResponseEntity.ok("All bindings started");
682
} catch (Exception e) {
683
return ResponseEntity.status(500).body("Failed to start all bindings: " + e.getMessage());
684
}
685
}
686
687
@PostMapping("/bindings/stop-all")
688
public ResponseEntity<String> stopAllBindings() {
689
try {
690
managementOps.stopAllBindings();
691
return ResponseEntity.ok("All bindings stopped");
692
} catch (Exception e) {
693
return ResponseEntity.status(500).body("Failed to stop all bindings: " + e.getMessage());
694
}
695
}
696
697
// Get detailed status
698
@GetMapping("/status")
699
public Map<String, StreamManagementOperations.BindingStatus> getBindingStatuses() {
700
return managementOps.getBindingStatuses();
701
}
702
703
// Validate configuration
704
@GetMapping("/validate")
705
public StreamManagementOperations.ValidationResult validateConfiguration() {
706
return managementOps.validateConfigurations();
707
}
708
}
709
710
// Custom health indicator
711
@Component
712
public class CustomStreamHealthIndicator implements HealthIndicator {
713
714
private final BinderFactory binderFactory;
715
private final BindingService bindingService;
716
717
public CustomStreamHealthIndicator(BinderFactory binderFactory, BindingService bindingService) {
718
this.binderFactory = binderFactory;
719
this.bindingService = bindingService;
720
}
721
722
@Override
723
public Health health() {
724
Health.Builder builder = new Health.Builder();
725
726
try {
727
// Check binder health
728
boolean allBindersHealthy = checkBinderHealth();
729
730
// Check binding health
731
Map<String, Object> bindingHealth = checkBindingHealth();
732
733
if (allBindersHealthy && isAllBindingsHealthy(bindingHealth)) {
734
builder.status(Status.UP);
735
} else {
736
builder.status(Status.DOWN);
737
}
738
739
builder.withDetail("binders", allBindersHealthy ? "UP" : "DOWN");
740
builder.withDetail("bindings", bindingHealth);
741
742
} catch (Exception e) {
743
builder.status(Status.DOWN)
744
.withDetail("error", e.getMessage());
745
}
746
747
return builder.build();
748
}
749
750
private boolean checkBinderHealth() {
751
// Implementation to check binder connectivity
752
return true;
753
}
754
755
private Map<String, Object> checkBindingHealth() {
756
// Implementation to check binding health
757
return new HashMap<>();
758
}
759
760
private boolean isAllBindingsHealthy(Map<String, Object> bindingHealth) {
761
// Implementation to evaluate binding health
762
return true;
763
}
764
}
765
766
// Metrics collection example
767
@Component
768
public class StreamMetricsCollector {
769
770
private final StreamMetrics streamMetrics;
771
private final MeterRegistry meterRegistry;
772
773
public StreamMetricsCollector(StreamMetrics streamMetrics, MeterRegistry meterRegistry) {
774
this.streamMetrics = streamMetrics;
775
this.meterRegistry = meterRegistry;
776
}
777
778
@EventListener
779
public void handleBindingStateChange(BindingCreatedEvent event) {
780
// Record binding creation
781
Gauge.builder("stream.bindings.active")
782
.description("Number of active bindings")
783
.register(meterRegistry, this, obj -> getActiveBindingCount());
784
}
785
786
public void recordMessageProcessing(String bindingName, long count, Duration time, boolean success) {
787
streamMetrics.recordMessageProcessing(bindingName, count, time, success);
788
789
// Additional custom metrics
790
Counter.builder("stream.messages.processed")
791
.tag("binding", bindingName)
792
.tag("success", String.valueOf(success))
793
.register(meterRegistry)
794
.increment(count);
795
796
Timer.builder("stream.message.processing.time")
797
.tag("binding", bindingName)
798
.register(meterRegistry)
799
.record(time);
800
}
801
802
private double getActiveBindingCount() {
803
return streamMetrics.getAllBindingMetrics().size();
804
}
805
}
806
807
// Configuration for actuator endpoints
808
# application.yml
809
management:
810
endpoints:
811
web:
812
exposure:
813
include: health,info,bindings,channels,metrics
814
endpoint:
815
bindings:
816
enabled: true
817
channels:
818
enabled: true
819
health:
820
show-details: always
821
health:
822
binders:
823
enabled: true
824
metrics:
825
enable:
826
stream: true
827
export:
828
prometheus:
829
enabled: true
830
831
# Enable specific actuator features
832
spring:
833
cloud:
834
stream:
835
actuator:
836
bindings-endpoint:
837
enabled: true
838
channels-endpoint:
839
enabled: true
840
health-indicator:
841
enabled: true
842
info-contributor:
843
enabled: true
844
metrics:
845
enabled: true
846
```