A framework for building message-driven microservice applications on Spring Boot with Spring Integration.
—
Spring Cloud Stream's actuator integration provides Spring Boot Actuator endpoints for monitoring and managing Stream applications. This includes health indicators for binder connectivity, management endpoints for controlling bindings, and monitoring capabilities for channels and message flow.
Actuator endpoint for managing binding lifecycle and querying binding states.
/**
* Actuator endpoint for managing message bindings.
* Provides operations to start, stop, pause, and resume bindings.
* <p>
* Registered under the actuator endpoint id {@code bindings}. This listing shows
* signatures only; method bodies are omitted.
* <p>
* NOTE(review): {@link #queryState(String)} and {@link #getBindingDetails(String)} are both
* read operations taking a single {@code String} selector — confirm the actuator can map
* them to distinct paths.
*/
@Endpoint(id = "bindings")
public class BindingsEndpoint implements ApplicationContextAware {
// Context handed in through setApplicationContext(ApplicationContext).
private ApplicationContext applicationContext;
// NOTE(review): presumably the delegate that performs the state changes; no body here shows its use.
private BindingsLifecycleController bindingsController;
/**
* Query the state of all bindings.
* @return map of binding states keyed by binding name
*/
@ReadOperation
public Map<String, List<BindingInformation>> queryStates();
/**
* Query the state of a specific binding.
* @param name the binding name, taken from the request path via {@code @Selector}
* @return list of binding information for the named binding
*/
@ReadOperation
public List<BindingInformation> queryState(@Selector String name);
/**
* Change the state of a specific binding.
* @param name the binding name, taken from the request path via {@code @Selector}
* @param state the desired state (STARTED, STOPPED, PAUSED, RESUMED)
*/
@WriteOperation
public void changeState(@Selector String name, State state);
/**
* Get detailed information about a binding.
* @param name the binding name, taken from the request path via {@code @Selector}
* @return detailed binding information
*/
@ReadOperation
public BindingDetails getBindingDetails(@Selector String name);
/**
* Callback required by {@code ApplicationContextAware}.
* @param applicationContext the application context this endpoint runs in
*/
public void setApplicationContext(ApplicationContext applicationContext);
/**
* Information about a binding's current state.
* All fields are final; the constructor takes one argument per field and each
* field has a matching getter.
*/
public static class BindingInformation {
private final String bindingName;
private final State state;
private final String group;
private final boolean pausable;
private final boolean input;
private final String binder;
/**
* Creates a binding state snapshot from one value per field.
*/
public BindingInformation(String bindingName, State state, String group, boolean pausable, boolean input, String binder);
public String getBindingName();
public State getState();
public String getGroup();
public boolean isPausable();
public boolean isInput();
public String getBinder();
}
/**
* Detailed information about a binding.
* All fields are final; the constructor takes one argument per field and each
* field has a matching getter.
*/
public static class BindingDetails {
private final String name;
private final String destination;
private final String group;
private final String binder;
private final String contentType;
// Free-form key/value pairs; the set of keys is not specified in this listing.
private final Map<String, Object> properties;
/**
* Creates a binding details object from one value per field.
*/
public BindingDetails(String name, String destination, String group, String binder, String contentType, Map<String, Object> properties);
public String getName();
public String getDestination();
public String getGroup();
public String getBinder();
public String getContentType();
public Map<String, Object> getProperties();
}
}
Actuator endpoint for inspecting message channels and their configuration.
/**
* Actuator endpoint for inspecting message channels.
* Provides information about channel configuration and statistics.
* <p>
* Registered under the actuator endpoint id {@code channels}. This listing shows
* signatures only; method bodies are omitted.
* <p>
* NOTE(review): two read operations take no selector ({@link #channels()} and
* {@link #getChannelStatistics()}) and two take a single {@code String} selector
* ({@link #getChannel(String)} and {@link #getChannelStatistics(String)}) — confirm the
* actuator can map them to distinct paths.
*/
@Endpoint(id = "channels")
public class ChannelsEndpoint implements ApplicationContextAware {
// Context handed in through setApplicationContext(ApplicationContext).
private ApplicationContext applicationContext;
/**
* Get information about all message channels.
* @return map of channel information keyed by channel name
*/
@ReadOperation
public Map<String, Object> channels();
/**
* Get information about a specific channel.
* @param name the channel name, taken from the request path via {@code @Selector}
* @return channel information
*/
@ReadOperation
public ChannelInformation getChannel(@Selector String name);
/**
* Get statistics for all channels.
* @return channel statistics in a map with {@code String} keys
* (presumably channel names — TODO confirm)
*/
@ReadOperation
public Map<String, ChannelStatistics> getChannelStatistics();
/**
* Get statistics for a specific channel.
* @param name the channel name, taken from the request path via {@code @Selector}
* @return channel statistics
*/
@ReadOperation
public ChannelStatistics getChannelStatistics(@Selector String name);
/**
* Callback required by {@code ApplicationContextAware}.
* @param applicationContext the application context this endpoint runs in
*/
public void setApplicationContext(ApplicationContext applicationContext);
/**
* Information about a message channel.
* All fields are final; the constructor takes one argument per field and each
* field has a matching getter.
*/
public static class ChannelInformation {
private final String name;
private final String type;
private final boolean subscribable;
private final boolean pollable;
private final int subscriberCount;
// Free-form key/value pairs; the set of keys is not specified in this listing.
private final Map<String, Object> properties;
/**
* Creates a channel description from one value per field.
*/
public ChannelInformation(String name, String type, boolean subscribable, boolean pollable, int subscriberCount, Map<String, Object> properties);
public String getName();
public String getType();
public boolean isSubscribable();
public boolean isPollable();
public int getSubscriberCount();
public Map<String, Object> getProperties();
}
/**
* Statistics for a message channel.
* All fields are final; the constructor takes one argument per field and each
* field has a matching getter.
*/
public static class ChannelStatistics {
private final String name;
private final long messagesSent;
private final long messagesReceived;
private final long sendFailures;
private final long receiveFailures;
// NOTE(review): time unit is not stated in this listing — TODO confirm.
private final double averageProcessingTime;
// NOTE(review): presumably a timestamp; epoch and unit are not stated — TODO confirm.
private final long lastMessageTime;
/**
* Creates a statistics snapshot from one value per field.
*/
public ChannelStatistics(String name, long messagesSent, long messagesReceived, long sendFailures, long receiveFailures, double averageProcessingTime, long lastMessageTime);
public String getName();
public long getMessagesSent();
public long getMessagesReceived();
public long getSendFailures();
public long getReceiveFailures();
public double getAverageProcessingTime();
public long getLastMessageTime();
}
}
Health indicator for monitoring binder connectivity and status.
/**
* Health indicator for Spring Cloud Stream binders.
* Monitors connectivity and status of all configured binders.
* <p>
* Implements both {@code HealthIndicator} and {@code ApplicationContextAware}.
* This listing shows signatures only; method bodies are omitted.
*/
public class BindersHealthIndicator implements HealthIndicator, ApplicationContextAware {
// Context handed in through setApplicationContext(ApplicationContext).
private ApplicationContext applicationContext;
// NOTE(review): presumably used to look up the binders to check; no body here shows its use.
private BinderFactory binderFactory;
/**
* Check the health of all binders.
* This is the {@code HealthIndicator} entry point.
* @return health information for all binders
*/
public Health health();
/**
* Check the health of a specific binder.
* @param binderName the binder name
* @return health information for the specified binder
*/
public Health getBinderHealth(String binderName);
/**
* Get detailed health information for all binders.
* @return detailed health information in a map with {@code String} keys
* (presumably binder names — TODO confirm)
*/
public Map<String, Health> getDetailedHealth();
/**
* Callback required by {@code ApplicationContextAware}.
* @param applicationContext the application context this indicator runs in
*/
public void setApplicationContext(ApplicationContext applicationContext);
/**
* Health details for a specific binder.
* All fields are final; the constructor takes one argument per field and each
* field has a matching getter.
*/
public static class BinderHealthDetails {
private final String binderName;
private final String binderType;
private final Status status;
private final String statusDescription;
// Free-form key/value pairs; the set of keys is not specified in this listing.
private final Map<String, Object> details;
/**
* Creates a binder health record from one value per field.
*/
public BinderHealthDetails(String binderName, String binderType, Status status, String statusDescription, Map<String, Object> details);
public String getBinderName();
public String getBinderType();
public Status getStatus();
public String getStatusDescription();
public Map<String, Object> getDetails();
}
}
Metrics collection and reporting for Stream applications.
/**
* Metrics collector for Spring Cloud Stream applications.
* Integrates with Micrometer to provide detailed metrics.
* <p>
* Declared as a {@code @Component} that is conditional on {@code MeterRegistry} being on
* the classpath. Implements {@code ApplicationContextAware} and {@code BeanPostProcessor}.
* This listing shows signatures only; method bodies are omitted.
*/
@Component
@ConditionalOnClass(MeterRegistry.class)
public class StreamMetrics implements ApplicationContextAware, BeanPostProcessor {
// Registry supplied through the constructor.
private final MeterRegistry meterRegistry;
// Context handed in through setApplicationContext(ApplicationContext).
private ApplicationContext applicationContext;
/**
* Creates the collector.
* @param meterRegistry the Micrometer registry to use
*/
public StreamMetrics(MeterRegistry meterRegistry);
/**
* Record message processing metrics.
* @param bindingName the binding name
* @param messageCount number of messages processed
* @param processingTime time taken to process messages
* @param success whether processing was successful
*/
public void recordMessageProcessing(String bindingName, long messageCount, Duration processingTime, boolean success);
/**
* Record binding state change.
* @param bindingName the binding name
* @param fromState the previous state
* @param toState the new state
*/
public void recordBindingStateChange(String bindingName, State fromState, State toState);
/**
* Record error metrics.
* @param bindingName the binding name
* @param errorType the type of error
* @param exception the exception that occurred
*/
public void recordError(String bindingName, String errorType, Throwable exception);
/**
* Get current metrics for a binding.
* @param bindingName the binding name
* @return binding metrics
*/
public BindingMetrics getBindingMetrics(String bindingName);
/**
* Get metrics for all bindings.
* @return map of binding metrics keyed by binding name
*/
public Map<String, BindingMetrics> getAllBindingMetrics();
/**
* Callback required by {@code ApplicationContextAware}.
* @param applicationContext the application context this collector runs in
*/
public void setApplicationContext(ApplicationContext applicationContext);
/**
* {@code BeanPostProcessor} callback invoked with each bean and its name.
* What the collector does with the bean is not shown in this listing.
* @param bean the bean instance
* @param beanName the name of the bean
* @return the object to use as the bean
*/
public Object postProcessAfterInitialization(Object bean, String beanName);
/**
* Metrics for a specific binding.
* All fields are final; the constructor takes one argument per field and each
* field has a matching getter.
*/
public static class BindingMetrics {
private final String bindingName;
private final long totalMessages;
private final long successfulMessages;
private final long failedMessages;
// NOTE(review): time unit is not stated in this listing — TODO confirm.
private final double averageProcessingTime;
// NOTE(review): presumably a timestamp; epoch and unit are not stated — TODO confirm.
private final long lastMessageTime;
private final State currentState;
/**
* Creates a metrics snapshot from one value per field.
*/
public BindingMetrics(String bindingName, long totalMessages, long successfulMessages, long failedMessages, double averageProcessingTime, long lastMessageTime, State currentState);
public String getBindingName();
public long getTotalMessages();
public long getSuccessfulMessages();
public long getFailedMessages();
public double getAverageProcessingTime();
public long getLastMessageTime();
public State getCurrentState();
// No backing field is declared for this value.
// NOTE(review): presumably derived from successfulMessages and totalMessages — confirm the
// formula, its scale (fraction vs. percent), and the result when totalMessages is 0.
public double getSuccessRate();
}
}
Info contributor for providing Stream application information in the actuator info endpoint.
/**
* Info contributor for Spring Cloud Stream applications.
* Provides information about bindings, binders, and configuration.
* <p>
* Declared as a {@code @Component} that is conditional on {@code InfoContributor} being on
* the classpath. This listing shows signatures only; method bodies are omitted.
*/
@Component
@ConditionalOnClass(InfoContributor.class)
public class StreamInfoContributor implements InfoContributor, ApplicationContextAware {
// Context handed in through setApplicationContext(ApplicationContext).
private ApplicationContext applicationContext;
// NOTE(review): presumably the source of the reported binding configuration; how it is
// populated is not shown in this listing.
private BindingServiceProperties bindingProperties;
/**
* Contribute Stream-specific information to actuator info endpoint.
* This is the {@code InfoContributor} entry point.
* @param builder the info builder
*/
public void contribute(Info.Builder builder);
/**
* Get information about configured bindings.
* @return binding information; the map keys are not specified in this listing
*/
public Map<String, Object> getBindingInfo();
/**
* Get information about configured binders.
* @return binder information; the map keys are not specified in this listing
*/
public Map<String, Object> getBinderInfo();
/**
* Get Stream application configuration details.
* @return configuration information; the map keys are not specified in this listing
*/
public Map<String, Object> getConfigurationInfo();
/**
* Callback required by {@code ApplicationContextAware}.
* @param applicationContext the application context this contributor runs in
*/
public void setApplicationContext(ApplicationContext applicationContext);
}
Programmatic management operations for Stream applications.
/**
* Management operations for Spring Cloud Stream applications.
* Provides programmatic access to binding and binder management.
* <p>
* Declared as a {@code @Component}. This listing shows signatures only; method bodies
* are omitted, so error behavior (for example for unknown binding names) is not specified.
*/
@Component
public class StreamManagementOperations implements ApplicationContextAware {
// Context handed in through setApplicationContext(ApplicationContext).
private ApplicationContext applicationContext;
// NOTE(review): presumably the delegates that perform the operations below; no body here
// shows how either is used.
private BindingsLifecycleController bindingsController;
private BindingService bindingService;
/**
* Start all bindings in the application.
*/
public void startAllBindings();
/**
* Stop all bindings in the application.
*/
public void stopAllBindings();
/**
* Restart all bindings in the application.
*/
public void restartAllBindings();
/**
* Start specific bindings.
* @param bindingNames the names of bindings to start
*/
public void startBindings(String... bindingNames);
/**
* Stop specific bindings.
* @param bindingNames the names of bindings to stop
*/
public void stopBindings(String... bindingNames);
/**
* Pause specific bindings.
* @param bindingNames the names of bindings to pause
*/
public void pauseBindings(String... bindingNames);
/**
* Resume specific bindings.
* @param bindingNames the names of bindings to resume
*/
public void resumeBindings(String... bindingNames);
/**
* Get the current status of all bindings.
* @return binding status information in a map with {@code String} keys
* (presumably binding names — TODO confirm)
*/
public Map<String, BindingStatus> getBindingStatuses();
/**
* Force refresh of binding configurations.
*/
public void refreshBindingConfigurations();
/**
* Validate binding configurations.
* @return validation results
*/
public ValidationResult validateConfigurations();
/**
* Callback required by {@code ApplicationContextAware}.
* @param applicationContext the application context these operations run in
*/
public void setApplicationContext(ApplicationContext applicationContext);
/**
* Status information for a binding.
* All fields are final; the constructor takes one argument per field and each
* field has a matching getter.
*/
public static class BindingStatus {
private final String name;
private final State state;
private final boolean healthy;
private final String statusMessage;
// NOTE(review): presumably a timestamp; epoch and unit are not stated — TODO confirm.
private final long lastStateChange;
/**
* Creates a status record from one value per field.
*/
public BindingStatus(String name, State state, boolean healthy, String statusMessage, long lastStateChange);
public String getName();
public State getState();
public boolean isHealthy();
public String getStatusMessage();
public long getLastStateChange();
}
/**
* Results from configuration validation.
* Carries an overall flag plus separate lists of errors and warnings.
*/
public static class ValidationResult {
// NOTE(review): the relation between this flag and the two lists (for example whether
// warnings alone leave it true) is not shown in this listing.
private final boolean valid;
private final List<ValidationError> errors;
private final List<ValidationWarning> warnings;
/**
* Creates a validation result from one value per field.
*/
public ValidationResult(boolean valid, List<ValidationError> errors, List<ValidationWarning> warnings);
public boolean isValid();
public List<ValidationError> getErrors();
public List<ValidationWarning> getWarnings();
}
/**
* Configuration validation error.
* Has the same three fields as {@link ValidationWarning}.
*/
public static class ValidationError {
private final String bindingName;
private final String property;
private final String message;
/**
* Creates an error entry from one value per field.
*/
public ValidationError(String bindingName, String property, String message);
public String getBindingName();
public String getProperty();
public String getMessage();
}
/**
* Configuration validation warning.
* Has the same three fields as {@link ValidationError}.
*/
public static class ValidationWarning {
private final String bindingName;
private final String property;
private final String message;
/**
* Creates a warning entry from one value per field.
*/
public ValidationWarning(String bindingName, String property, String message);
public String getBindingName();
public String getProperty();
public String getMessage();
}
}
Auto-configuration classes for actuator integration.
/**
* Auto-configuration for bindings actuator endpoint.
* <p>
* Conditional on {@code Endpoint} and {@code BindingsEndpoint} being on the classpath and
* on the application being a web application. Ordered after
* {@code BindingServiceConfiguration}.
*/
@Configuration
@ConditionalOnClass({Endpoint.class, BindingsEndpoint.class})
@ConditionalOnWebApplication
@AutoConfigureAfter(BindingServiceConfiguration.class)
public class BindingsEndpointAutoConfiguration {
/**
* Create bindings actuator endpoint.
* Guarded by {@code @ConditionalOnMissingBean}, so an application-defined bean takes precedence.
* @param bindingsController the bindings lifecycle controller
* @return configured BindingsEndpoint
*/
@Bean
@ConditionalOnMissingBean
public BindingsEndpoint bindingsEndpoint(BindingsLifecycleController bindingsController);
}
/**
* Auto-configuration for channels actuator endpoint.
* <p>
* Conditional on {@code Endpoint} and {@code ChannelsEndpoint} being on the classpath and
* on the application being a web application.
*/
@Configuration
@ConditionalOnClass({Endpoint.class, ChannelsEndpoint.class})
@ConditionalOnWebApplication
public class ChannelsEndpointAutoConfiguration {
/**
* Create channels actuator endpoint.
* Guarded by {@code @ConditionalOnMissingBean}, so an application-defined bean takes precedence.
* @return configured ChannelsEndpoint
*/
@Bean
@ConditionalOnMissingBean
public ChannelsEndpoint channelsEndpoint();
}
/**
* Auto-configuration for binders health indicator.
* <p>
* Conditional on {@code HealthIndicator} and {@code BindersHealthIndicator} being on the
* classpath and on the property {@code management.health.binders.enabled} being
* {@code true}; {@code matchIfMissing = true} means it also applies when the property is
* not set. Ordered after {@code BindingServiceConfiguration}.
*/
@Configuration
@ConditionalOnClass({HealthIndicator.class, BindersHealthIndicator.class})
@ConditionalOnProperty(name = "management.health.binders.enabled", havingValue = "true", matchIfMissing = true)
@AutoConfigureAfter(BindingServiceConfiguration.class)
public class BindersHealthIndicatorAutoConfiguration {
/**
* Create binders health indicator.
* The missing-bean guard is keyed on the bean name {@code bindersHealthIndicator}
* rather than on the bean type.
* @param binderFactory the binder factory
* @return configured BindersHealthIndicator
*/
@Bean
@ConditionalOnMissingBean(name = "bindersHealthIndicator")
public BindersHealthIndicator bindersHealthIndicator(BinderFactory binderFactory);
}
/**
* Auto-configuration for Stream metrics.
* <p>
* Conditional on {@code MeterRegistry} and {@code StreamMetrics} being on the classpath and
* on the property {@code management.metrics.enable.stream} being {@code true};
* {@code matchIfMissing = true} means it also applies when the property is not set.
*/
@Configuration
@ConditionalOnClass({MeterRegistry.class, StreamMetrics.class})
@ConditionalOnProperty(name = "management.metrics.enable.stream", havingValue = "true", matchIfMissing = true)
public class StreamMetricsAutoConfiguration {
/**
* Create Stream metrics collector.
* Guarded by {@code @ConditionalOnMissingBean}, so an application-defined bean takes precedence.
* @param meterRegistry the meter registry
* @return configured StreamMetrics
*/
@Bean
@ConditionalOnMissingBean
public StreamMetrics streamMetrics(MeterRegistry meterRegistry);
}
Usage Examples:
import org.springframework.cloud.stream.endpoint.*;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.Status;
// Using bindings endpoint programmatically
/**
 * REST facade under {@code /admin/stream} that forwards to the bindings endpoint,
 * the channels endpoint and the bulk management operations.
 * <p>
 * Every state-changing handler answers 200 with a confirmation text on success and
 * 500 with the exception message on failure.
 */
@RestController
@RequestMapping("/admin/stream")
public class StreamAdminController {

    private final BindingsEndpoint bindingsEndpoint;
    private final ChannelsEndpoint channelsEndpoint;
    private final StreamManagementOperations managementOps;

    public StreamAdminController(BindingsEndpoint bindingsEndpoint,
            ChannelsEndpoint channelsEndpoint,
            StreamManagementOperations managementOps) {
        this.bindingsEndpoint = bindingsEndpoint;
        this.channelsEndpoint = channelsEndpoint;
        this.managementOps = managementOps;
    }

    /** Lists the state of every binding. */
    @GetMapping("/bindings")
    public Map<String, List<BindingsEndpoint.BindingInformation>> getAllBindings() {
        return bindingsEndpoint.queryStates();
    }

    /** Requests the STARTED state for one binding. */
    @PostMapping("/bindings/{name}/start")
    public ResponseEntity<String> startBinding(@PathVariable String name) {
        return respond(
                () -> bindingsEndpoint.changeState(name, State.STARTED),
                "Binding " + name + " started",
                "Failed to start binding: ");
    }

    /** Requests the STOPPED state for one binding. */
    @PostMapping("/bindings/{name}/stop")
    public ResponseEntity<String> stopBinding(@PathVariable String name) {
        return respond(
                () -> bindingsEndpoint.changeState(name, State.STOPPED),
                "Binding " + name + " stopped",
                "Failed to stop binding: ");
    }

    /** Requests the PAUSED state for one binding. */
    @PostMapping("/bindings/{name}/pause")
    public ResponseEntity<String> pauseBinding(@PathVariable String name) {
        return respond(
                () -> bindingsEndpoint.changeState(name, State.PAUSED),
                "Binding " + name + " paused",
                "Failed to pause binding: ");
    }

    /** Requests the RESUMED state for one binding. */
    @PostMapping("/bindings/{name}/resume")
    public ResponseEntity<String> resumeBinding(@PathVariable String name) {
        return respond(
                () -> bindingsEndpoint.changeState(name, State.RESUMED),
                "Binding " + name + " resumed",
                "Failed to resume binding: ");
    }

    /** Lists all channels as reported by the channels endpoint. */
    @GetMapping("/channels")
    public Map<String, Object> getAllChannels() {
        return channelsEndpoint.channels();
    }

    /** Looks up a single channel by name. */
    @GetMapping("/channels/{name}")
    public ChannelsEndpoint.ChannelInformation getChannel(@PathVariable String name) {
        return channelsEndpoint.getChannel(name);
    }

    /** Bulk operation: starts every binding. */
    @PostMapping("/bindings/start-all")
    public ResponseEntity<String> startAllBindings() {
        return respond(
                managementOps::startAllBindings,
                "All bindings started",
                "Failed to start all bindings: ");
    }

    /** Bulk operation: stops every binding. */
    @PostMapping("/bindings/stop-all")
    public ResponseEntity<String> stopAllBindings() {
        return respond(
                managementOps::stopAllBindings,
                "All bindings stopped",
                "Failed to stop all bindings: ");
    }

    /** Returns the status of every binding. */
    @GetMapping("/status")
    public Map<String, StreamManagementOperations.BindingStatus> getBindingStatuses() {
        return managementOps.getBindingStatuses();
    }

    /** Runs configuration validation and returns its result. */
    @GetMapping("/validate")
    public StreamManagementOperations.ValidationResult validateConfiguration() {
        return managementOps.validateConfigurations();
    }

    /**
     * Runs {@code action} and maps the outcome to an HTTP response: 200 with
     * {@code successBody} when it completes, 500 with {@code failurePrefix} followed by
     * the exception message when it throws.
     */
    private static ResponseEntity<String> respond(Runnable action, String successBody, String failurePrefix) {
        try {
            action.run();
            return ResponseEntity.ok(successBody);
        } catch (Exception e) {
            return ResponseEntity.status(500).body(failurePrefix + e.getMessage());
        }
    }
}
// Custom health indicator
/**
 * Example health indicator that combines a binder check and a binding check into a
 * single {@code Health} result.
 * <p>
 * The three {@code check...}/{@code is...} helpers are placeholders that always report
 * healthy; replace them with real probes.
 */
@Component
public class CustomStreamHealthIndicator implements HealthIndicator {

    // Held for the real probes; the placeholder helpers below do not use them yet.
    private final BinderFactory binderFactory;
    private final BindingService bindingService;

    public CustomStreamHealthIndicator(BinderFactory binderFactory, BindingService bindingService) {
        this.binderFactory = binderFactory;
        this.bindingService = bindingService;
    }

    /**
     * Reports UP only when both the binder check and the binding check pass, and attaches
     * the individual results as the {@code binders} and {@code bindings} details.
     * Any exception raised by a check is reported as DOWN with an {@code error} detail.
     *
     * @return the aggregated health
     */
    @Override
    public Health health() {
        Health.Builder builder = new Health.Builder();
        try {
            // Check binder health
            boolean allBindersHealthy = checkBinderHealth();
            // Check binding health
            Map<String, Object> bindingHealth = checkBindingHealth();
            if (allBindersHealthy && isAllBindingsHealthy(bindingHealth)) {
                builder.status(Status.UP);
            } else {
                builder.status(Status.DOWN);
            }
            builder.withDetail("binders", allBindersHealthy ? "UP" : "DOWN");
            builder.withDetail("bindings", bindingHealth);
        } catch (Exception e) {
            // withDetail rejects null values and getMessage() may be null, so fall back to
            // the exception class name; otherwise health() itself would throw from here.
            String reason = e.getMessage() != null ? e.getMessage() : e.getClass().getName();
            builder.status(Status.DOWN)
                    .withDetail("error", reason);
        }
        return builder.build();
    }

    /** Placeholder binder probe; always returns {@code true}. */
    private boolean checkBinderHealth() {
        // Implementation to check binder connectivity
        return true;
    }

    /** Placeholder binding probe; always returns an empty map. */
    private Map<String, Object> checkBindingHealth() {
        // Implementation to check binding health
        return new HashMap<>();
    }

    /** Placeholder evaluation of the binding probe result; always returns {@code true}. */
    private boolean isAllBindingsHealthy(Map<String, Object> bindingHealth) {
        // Implementation to evaluate binding health
        return true;
    }
}
// Metrics collection example
/**
 * Example collector that forwards processing metrics to {@code StreamMetrics} and adds
 * custom Micrometer meters on top.
 */
@Component
public class StreamMetricsCollector {

    private final StreamMetrics streamMetrics;
    private final MeterRegistry meterRegistry;

    public StreamMetricsCollector(StreamMetrics streamMetrics, MeterRegistry meterRegistry) {
        this.streamMetrics = streamMetrics;
        this.meterRegistry = meterRegistry;
    }

    /**
     * Registers the {@code stream.bindings.active} gauge when a binding is created.
     * <p>
     * The gauge is built with the object and value function passed to
     * {@code Gauge.builder}; {@code register} takes only the registry. Registering again
     * on later events is harmless because the registry returns the existing gauge for
     * the same name and tags.
     *
     * @param event the binding-created event that triggers registration
     */
    @EventListener
    public void handleBindingStateChange(BindingCreatedEvent event) {
        // Record binding creation
        Gauge.builder("stream.bindings.active", this, StreamMetricsCollector::getActiveBindingCount)
                .description("Number of active bindings")
                .register(meterRegistry);
    }

    /**
     * Records one batch of processed messages in {@code StreamMetrics} and in two custom
     * meters: a counter tagged with binding and outcome, and a timer tagged with binding.
     *
     * @param bindingName the binding the messages belong to
     * @param count number of messages in the batch
     * @param time time taken to process the batch
     * @param success whether processing succeeded
     */
    public void recordMessageProcessing(String bindingName, long count, Duration time, boolean success) {
        streamMetrics.recordMessageProcessing(bindingName, count, time, success);
        // Additional custom metrics
        Counter.builder("stream.messages.processed")
                .tag("binding", bindingName)
                .tag("success", String.valueOf(success))
                .register(meterRegistry)
                .increment(count);
        Timer.builder("stream.message.processing.time")
                .tag("binding", bindingName)
                .register(meterRegistry)
                .record(time);
    }

    /**
     * Gauge value: the number of entries returned by {@code getAllBindingMetrics()}.
     * NOTE(review): this counts every binding that has metrics, not only active ones —
     * confirm that matches the gauge name.
     */
    private double getActiveBindingCount() {
        return streamMetrics.getAllBindingMetrics().size();
    }
}
// Configuration for actuator endpoints
# application.yml
management:
  endpoints:
    web:
      exposure:
        include: health,info,bindings,channels,metrics
  endpoint:
    bindings:
      enabled: true
    channels:
      enabled: true
    health:
      show-details: always
  health:
    binders:
      enabled: true
  metrics:
    enable:
      stream: true
    export:
      prometheus:
        enabled: true
# Enable specific actuator features
spring:
  cloud:
    stream:
      actuator:
        bindings-endpoint:
          enabled: true
        channels-endpoint:
          enabled: true
        health-indicator:
          enabled: true
        info-contributor:
          enabled: true
        metrics:
          enabled: true
Install with Tessl CLI
npx tessl i tessl/maven-org-springframework-cloud--spring-cloud-stream