Configuration DSL for simple messaging protocols such as STOMP, covering message broker registration, STOMP endpoints, destination prefixes, broker relay setup, and channel configuration for WebSocket-based messaging applications.
Central configuration point for message broker setup.
/**
 * Registry for configuring message broker options.
 *
 * <p>The setters return this registry so calls can be chained; the
 * {@code enable*} and {@code configure*} methods return a dedicated
 * registration object for further configuration.
 */
public class MessageBrokerRegistry {

    /**
     * Enable a simple in-memory message broker for pub-sub messaging.
     * @param destinationPrefixes destination prefixes supported by the broker
     * @return SimpleBrokerRegistration for further configuration
     */
    public SimpleBrokerRegistration enableSimpleBroker(String... destinationPrefixes);

    /**
     * Enable a STOMP broker relay that forwards messages to a full-featured
     * message broker like RabbitMQ, ActiveMQ, etc.
     * @param destinationPrefixes destination prefixes to delegate to the broker
     * @return StompBrokerRelayRegistration for further configuration
     */
    public StompBrokerRelayRegistration enableStompBrokerRelay(String... destinationPrefixes);

    /**
     * Configure one or more prefixes to filter destinations targeting
     * application annotated methods.
     * @param prefixes destination prefixes (e.g., "/app")
     * @return this registry
     */
    public MessageBrokerRegistry setApplicationDestinationPrefixes(String... prefixes);

    /**
     * Configure the prefix to use for user destinations.
     * Default is "/user/".
     * @param prefix the prefix
     * @return this registry
     */
    public MessageBrokerRegistry setUserDestinationPrefix(String prefix);

    /**
     * Customize the channel used to send messages from the application to the
     * message broker. This is the broker channel; the channel carrying incoming
     * messages from clients is configured separately through
     * {@code configureClientInboundChannel}.
     * @return ChannelRegistration for configuration
     */
    public ChannelRegistration configureBrokerChannel();

    /**
     * Whether to preserve the publish order of messages sent by the same client.
     * Default is false.
     * @param preservePublishOrder {@code true} to preserve the publish order
     * @return this registry
     */
    public MessageBrokerRegistry setPreservePublishOrder(boolean preservePublishOrder);

    /**
     * Configure a PathMatcher for matching destinations.
     * @param pathMatcher the path matcher to use
     * @return this registry
     */
    public MessageBrokerRegistry setPathMatcher(PathMatcher pathMatcher);

    /**
     * Configure the cache limit for destination matching.
     * @param cacheLimit the cache limit (default is 1024)
     * @return this registry
     */
    public MessageBrokerRegistry setCacheLimit(int cacheLimit);
}
Usage Examples:
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.web.socket.config.annotation.*;
/**
 * Minimal broker setup: a simple in-memory broker with heartbeats, the
 * application and user destination prefixes, ordered publishing, a larger
 * destination cache, and a single SockJS-enabled STOMP endpoint.
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // Enable simple in-memory broker. The simple broker only supports
        // heartbeats when it is given a TaskScheduler; configuring heartbeat
        // values without one fails at startup.
        registry.enableSimpleBroker("/topic", "/queue")
                .setHeartbeatValue(new long[]{10000, 10000})
                .setTaskScheduler(heartbeatScheduler());
        // Set application destination prefix
        registry.setApplicationDestinationPrefixes("/app");
        // Set user destination prefix
        registry.setUserDestinationPrefix("/user");
        // Preserve message order
        registry.setPreservePublishOrder(true);
        // Configure cache limit
        registry.setCacheLimit(2048);
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // Use origin patterns rather than setAllowedOrigins("*"): a literal "*"
        // origin is rejected when credentials are allowed, which SockJS enables.
        registry.addEndpoint("/ws")
                .setAllowedOriginPatterns("*")
                .withSockJS();
    }

    /**
     * Single-threaded scheduler that drives the simple broker's heartbeats.
     * @return the initialized scheduler
     */
    @Bean
    public ThreadPoolTaskScheduler heartbeatScheduler() {
        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
        scheduler.setPoolSize(1);
        scheduler.setThreadNamePrefix("ws-heartbeat-");
        scheduler.initialize();
        return scheduler;
    }
}
Configuration for the simple in-memory message broker.
/**
 * Registration class for configuring a simple message broker.
 *
 * <p>Every setter returns this registration so calls can be chained.
 */
public class SimpleBrokerRegistration {

    /**
     * Configure the heartbeat settings.
     * The first number represents how often the server will send heartbeats.
     * The second number represents how often the client should send heartbeats.
     * A value of 0 means no heartbeats.
     *
     * <p>Heartbeats require a TaskScheduler: non-zero values must be paired
     * with {@link #setTaskScheduler(TaskScheduler)}, otherwise the broker
     * refuses to start.
     * @param heartbeat the heartbeat values in milliseconds [send, receive]
     * @return this registration
     */
    public SimpleBrokerRegistration setHeartbeatValue(long[] heartbeat);

    /**
     * Configure the TaskScheduler to use for sending heartbeat messages.
     * Without a scheduler the simple broker cannot send heartbeats.
     * @param taskScheduler the task scheduler
     * @return this registration
     */
    public SimpleBrokerRegistration setTaskScheduler(TaskScheduler taskScheduler);

    /**
     * Configure the name of a header on a message that may contain a selector
     * to filter messages based on their headers.
     * @param selectorHeaderName the header name for selectors
     * @return this registration
     */
    public SimpleBrokerRegistration setSelectorHeaderName(String selectorHeaderName);
}
Usage Examples:
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
/**
 * Enables the simple broker for "/topic" and "/queue" with 20-second
 * heartbeats, the scheduler bean below, and a "selector" header name.
 */
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
    // [server send, client send] heartbeat intervals: 20 seconds each
    long[] heartbeatMillis = {20000, 20000};

    registry.enableSimpleBroker("/topic", "/queue")
            .setHeartbeatValue(heartbeatMillis)
            .setTaskScheduler(taskScheduler())
            .setSelectorHeaderName("selector");
}
/**
 * Scheduler handed to the simple broker for its heartbeats: one thread,
 * named with the "wss-heartbeat-" prefix, initialized before it is returned.
 */
@Bean
public TaskScheduler taskScheduler() {
    ThreadPoolTaskScheduler heartbeatScheduler = new ThreadPoolTaskScheduler();
    heartbeatScheduler.setThreadNamePrefix("wss-heartbeat-");
    heartbeatScheduler.setPoolSize(1);
    heartbeatScheduler.initialize();
    return heartbeatScheduler;
}
Configuration for relaying messages to an external STOMP broker.
/**
 * Registration class for configuring a STOMP broker relay.
 *
 * <p>Every setter returns this registration so calls can be chained.
 */
public class StompBrokerRelayRegistration {

    /**
     * Set the STOMP broker host.
     * Default is "127.0.0.1".
     * @param relayHost the host
     * @return this registration
     */
    public StompBrokerRelayRegistration setRelayHost(String relayHost);

    /**
     * Set the STOMP broker port.
     * Default is 61613.
     * @param relayPort the port
     * @return this registration
     */
    public StompBrokerRelayRegistration setRelayPort(int relayPort);

    /**
     * Set the login for client connections to the broker.
     * Default is "guest".
     * @param login the login
     * @return this registration
     */
    public StompBrokerRelayRegistration setClientLogin(String login);

    /**
     * Set the passcode for client connections to the broker.
     * Default is "guest".
     * @param passcode the passcode
     * @return this registration
     */
    public StompBrokerRelayRegistration setClientPasscode(String passcode);

    /**
     * Set the login for the shared "system" connection to the broker.
     * Default is "guest".
     * @param login the system login
     * @return this registration
     */
    public StompBrokerRelayRegistration setSystemLogin(String login);

    /**
     * Set the passcode for the shared "system" connection.
     * Default is "guest".
     * @param passcode the system passcode
     * @return this registration
     */
    public StompBrokerRelayRegistration setSystemPasscode(String passcode);

    /**
     * Set the interval for sending heartbeats from the system connection.
     * Default is 10000 (10 seconds).
     * @param interval the interval in milliseconds
     * @return this registration
     */
    public StompBrokerRelayRegistration setSystemHeartbeatSendInterval(long interval);

    /**
     * Set the interval for receiving heartbeats on the system connection.
     * Default is 10000 (10 seconds).
     * @param interval the interval in milliseconds
     * @return this registration
     */
    public StompBrokerRelayRegistration setSystemHeartbeatReceiveInterval(long interval);

    /**
     * Set the virtual host to use when connecting to the broker.
     * @param virtualHost the virtual host
     * @return this registration
     */
    public StompBrokerRelayRegistration setVirtualHost(String virtualHost);

    /**
     * Configure a TCP client for managing TCP connections to the broker.
     * @param tcpClient the TCP client
     * @return this registration
     */
    public StompBrokerRelayRegistration setTcpClient(TcpOperations<byte[]> tcpClient);

    /**
     * Whether the relay should auto-start when the application context is refreshed.
     * Default is true.
     * @param autoStartup whether to auto-start
     * @return this registration
     */
    public StompBrokerRelayRegistration setAutoStartup(boolean autoStartup);

    /**
     * Set the destination to which messages for user destinations that remain
     * unresolved are broadcast, for example because the user is connected to a
     * different application server. Other servers listening on the destination
     * then get a chance to resolve and deliver the message.
     * @param destination the broadcast destination for unresolved user messages
     * @return this registration
     */
    public StompBrokerRelayRegistration setUserDestinationBroadcast(String destination);

    /**
     * Set the destination for broadcasting user registry updates.
     * @param destination the broadcast destination
     * @return this registration
     */
    public StompBrokerRelayRegistration setUserRegistryBroadcast(String destination);
}
Usage Examples:
/**
 * Relays "/topic" and "/queue" to an external STOMP broker (e.g., RabbitMQ)
 * and routes "/app" destinations to application handler methods.
 */
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
    final String brokerHost = "rabbitmq.example.com";
    final int stompPort = 61613;
    // Same interval for sending and receiving on the system connection
    final long systemHeartbeatMillis = 20000;

    registry.enableStompBrokerRelay("/topic", "/queue")
            .setRelayHost(brokerHost)
            .setRelayPort(stompPort)
            .setClientLogin("client-user")
            .setClientPasscode("client-pass")
            .setSystemLogin("system-user")
            .setSystemPasscode("system-pass")
            .setVirtualHost("/")
            .setSystemHeartbeatSendInterval(systemHeartbeatMillis)
            .setSystemHeartbeatReceiveInterval(systemHeartbeatMillis)
            .setUserDestinationBroadcast("/topic/unresolved-user-destination")
            .setUserRegistryBroadcast("/topic/simp-user-registry");

    registry.setApplicationDestinationPrefixes("/app");
}
Configuration for message channels (input/output/broker).
/**
 * Registration class for customizing message channel configuration.
 */
public class ChannelRegistration {

    /**
     * Configure interceptors for the message channel.
     * NOTE(review): recent Spring releases name this method
     * {@code interceptors(...)}; confirm against the version in use.
     * @param interceptors the interceptors to add
     * @return this registration
     */
    public ChannelRegistration setInterceptors(ChannelInterceptor... interceptors);

    /**
     * Configure the thread pool backing the channel. This is the form the
     * usage examples rely on: the returned registration exposes
     * {@code corePoolSize}, {@code maxPoolSize}, {@code keepAliveSeconds}
     * and {@code queueCapacity}.
     * @return TaskExecutorRegistration for further configuration
     */
    public TaskExecutorRegistration taskExecutor();

    /**
     * Configure the task executor for the channel.
     * NOTE(review): confirm this overload against the Spring version in use.
     * @param executor the executor
     * @return this registration
     */
    public ChannelRegistration taskExecutor(Executor executor);

    /**
     * Configure the task executor using TaskExecutorRegistration.
     * NOTE(review): confirm this overload against the Spring version in use.
     * @param registration the executor registration
     * @return this registration
     */
    public ChannelRegistration taskExecutor(TaskExecutorRegistration registration);
}
Usage Examples:
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.Message;
/**
 * Channel customization example: logs every inbound message and sizes the
 * thread pools of the client inbound and outbound channels.
 */
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        // Interceptor that prints each inbound message and passes it on unchanged
        ChannelInterceptor inboundLogger = new ChannelInterceptor() {
            @Override
            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                System.out.println("Inbound: " + message);
                return message;
            }
        };
        registration.setInterceptors(inboundLogger);

        // Inbound pool: 4 core threads, up to 8, idle threads kept for 60 seconds
        registration.taskExecutor().corePoolSize(4).maxPoolSize(8).keepAliveSeconds(60);
    }

    @Override
    public void configureClientOutboundChannel(ChannelRegistration registration) {
        // Outbound pool: 4 core threads, up to 8
        registration.taskExecutor().corePoolSize(4).maxPoolSize(8);
    }
}
Configures thread pool properties for channel task executors.
/**
* Registration class for configuring a task executor.
* Each method sets one ThreadPoolExecutor property and returns this
* registration, so the calls can be chained.
*/
public class TaskExecutorRegistration {
/**
* Set the core pool size of the ThreadPoolExecutor.
* Default is twice the number of available processors.
* @param corePoolSize the core pool size
* @return this registration
*/
public TaskExecutorRegistration corePoolSize(int corePoolSize);
/**
* Set the max pool size of the ThreadPoolExecutor.
* Default is Integer.MAX_VALUE.
* NOTE(review): a ThreadPoolExecutor only grows beyond its core size once
* its queue is full, and the default queue capacity documented below is
* Integer.MAX_VALUE, so this setting presumably has no effect unless
* queueCapacity is lowered as well; confirm.
* @param maxPoolSize the max pool size
* @return this registration
*/
public TaskExecutorRegistration maxPoolSize(int maxPoolSize);
/**
* Set the keep-alive seconds for the ThreadPoolExecutor.
* Default is 60 seconds.
* @param keepAliveSeconds the keep-alive time in seconds
* @return this registration
*/
public TaskExecutorRegistration keepAliveSeconds(int keepAliveSeconds);
/**
* Set the queue capacity for the ThreadPoolExecutor.
* Default is Integer.MAX_VALUE.
* @param queueCapacity the queue capacity
* @return this registration
*/
public TaskExecutorRegistration queueCapacity(int queueCapacity);
}Base configuration class providing beans for message broker infrastructure.
/**
 * Abstract base configuration class for WebSocket message broker support.
 *
 * <p>Declares the infrastructure beans and three empty {@code configure*}
 * hooks that subclasses override to customize the broker and the client
 * channels.
 */
public abstract class AbstractMessageBrokerConfiguration implements ApplicationContextAware {

    /**
     * The default client inbound channel bean.
     */
    @Bean
    public AbstractSubscribableChannel clientInboundChannel();

    /**
     * The default client outbound channel bean.
     */
    @Bean
    public AbstractSubscribableChannel clientOutboundChannel();

    /**
     * The default broker channel bean.
     */
    @Bean
    public AbstractSubscribableChannel brokerChannel();

    /**
     * The handler for @MessageMapping methods.
     */
    @Bean
    public SimpAnnotationMethodMessageHandler simpAnnotationMethodMessageHandler();

    /**
     * The handler for the simple in-memory message broker.
     * The STOMP broker relay is not provided by this method; Spring exposes it
     * through a separate bean method (stompBrokerRelayMessageHandler) that
     * this summary does not list.
     */
    @Bean
    public AbstractBrokerMessageHandler simpleBrokerMessageHandler();

    /**
     * The SimpMessagingTemplate for sending messages.
     */
    @Bean
    public SimpMessagingTemplate brokerMessagingTemplate();

    /**
     * Resolves user destinations.
     */
    @Bean
    public UserDestinationResolver userDestinationResolver();

    /**
     * Registry of active user sessions.
     */
    @Bean
    public SimpUserRegistry userRegistry();

    /**
     * Override to configure message broker options. The default does nothing.
     * @param registry the registry to configure
     */
    protected void configureMessageBroker(MessageBrokerRegistry registry) {
    }

    /**
     * Override to configure client inbound channel. The default does nothing.
     * @param registration the channel registration to configure
     */
    protected void configureClientInboundChannel(ChannelRegistration registration) {
    }

    /**
     * Override to configure client outbound channel. The default does nothing.
     * @param registration the channel registration to configure
     */
    protected void configureClientOutboundChannel(ChannelRegistration registration) {
    }
}
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.web.socket.config.annotation.*;
/**
 * End-to-end example: two STOMP endpoints, a simple broker with heartbeats,
 * destination prefixes, ordered publishing, a CONNECT-logging interceptor and
 * explicit thread pools for the client channels.
 */
@Configuration
@EnableWebSocketMessageBroker
public class ComprehensiveWebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // Endpoint with SockJS fallback
        registry.addEndpoint("/ws").setAllowedOriginPatterns("*").withSockJS();
        // Plain WebSocket endpoint
        registry.addEndpoint("/ws-native").setAllowedOriginPatterns("*");
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // Option 1: Simple broker (development/small apps)
        long[] heartbeatMillis = {10000, 10000};
        registry.enableSimpleBroker("/topic", "/queue")
                .setHeartbeatValue(heartbeatMillis)
                .setTaskScheduler(heartbeatScheduler());

        // Option 2: External broker relay (production)
        // registry.enableStompBrokerRelay("/topic", "/queue")
        //         .setRelayHost("rabbitmq.example.com")
        //         .setRelayPort(61613)
        //         .setClientLogin("client")
        //         .setClientPasscode("client-pass")
        //         .setSystemLogin("system")
        //         .setSystemPasscode("system-pass");

        // Application prefix, user prefix and publish ordering in one chain;
        // each setter returns the registry.
        registry.setApplicationDestinationPrefixes("/app")
                .setUserDestinationPrefix("/user")
                .setPreservePublishOrder(true);
    }

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        // Security/logging hook for inbound frames
        registration.setInterceptors(connectLoggingInterceptor());

        // Inbound thread pool
        registration.taskExecutor()
                .corePoolSize(4)
                .maxPoolSize(8)
                .keepAliveSeconds(60)
                .queueCapacity(1000);
    }

    @Override
    public void configureClientOutboundChannel(ChannelRegistration registration) {
        registration.taskExecutor().corePoolSize(4).maxPoolSize(8);
    }

    /**
     * Single-threaded scheduler used by the simple broker for heartbeats;
     * its threads are named with the "ws-heartbeat-" prefix.
     */
    @Bean
    public ThreadPoolTaskScheduler heartbeatScheduler() {
        ThreadPoolTaskScheduler heartbeatPool = new ThreadPoolTaskScheduler();
        heartbeatPool.setThreadNamePrefix("ws-heartbeat-");
        heartbeatPool.setPoolSize(1);
        heartbeatPool.initialize();
        return heartbeatPool;
    }

    /**
     * Builds the inbound interceptor: prints the session id of every CONNECT
     * frame and passes all messages through unchanged.
     */
    private ChannelInterceptor connectLoggingInterceptor() {
        return new ChannelInterceptor() {
            @Override
            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                StompHeaderAccessor headers = StompHeaderAccessor.wrap(message);
                if (!StompCommand.CONNECT.equals(headers.getCommand())) {
                    return message;
                }
                // Authentication/authorization logic
                System.out.println("CONNECT from session: " + headers.getSessionId());
                return message;
            }
        };
    }
}
// Message controller
/**
 * Message-handling controller: echoes chat messages to a topic, answers
 * private messages on the sender's user queue, and publishes a periodic
 * status message.
 */
@Controller
public class WebSocketMessageController {

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    /**
     * Handles messages mapped to "/chat" and returns the same message for
     * delivery to "/topic/messages".
     */
    @MessageMapping("/chat")
    @SendTo("/topic/messages")
    public ChatMessage sendMessage(ChatMessage message) {
        return message;
    }

    /**
     * Handles messages mapped to "/private" and replies to the sending user
     * on "/queue/reply".
     */
    @MessageMapping("/private")
    @SendToUser("/queue/reply")
    public String sendPrivate(String message, Principal principal) {
        String sender = principal.getName();
        return "Reply to " + sender + ": " + message;
    }

    /**
     * Sends a "Heartbeat" status with the current time to "/topic/periodic"
     * at a fixed rate of 5000 ms.
     * NOTE(review): @Scheduled methods only run when scheduling is enabled
     * (e.g. @EnableScheduling); no such annotation is visible in this
     * example, so confirm it is declared elsewhere.
     */
    @Scheduled(fixedRate = 5000)
    public void sendPeriodicMessages() {
        long now = System.currentTimeMillis();
        StatusMessage heartbeat = new StatusMessage("Heartbeat", now);
        messagingTemplate.convertAndSend("/topic/periodic", heartbeat);
    }
}
/**
 * Immutable status payload carrying a status label and a timestamp.
 * Both fields are assigned once in the constructor and exposed through
 * getters only.
 */
class StatusMessage {

    private final String status;
    private final long timestamp;

    /**
     * @param status the status label
     * @param timestamp the timestamp to report; the periodic sender passes
     *        {@code System.currentTimeMillis()}
     */
    public StatusMessage(String status, long timestamp) {
        this.status = status;
        this.timestamp = timestamp;
    }

    /** @return the status label passed to the constructor */
    public String getStatus() {
        return status;
    }

    /** @return the timestamp passed to the constructor */
    public long getTimestamp() {
        return timestamp;
    }
}