A framework for building message-driven microservice applications on Spring Boot with Spring Integration.
—
The binding management framework provides centralized lifecycle management for message bindings, proxy creation, and channel configuration. It handles the creation, configuration, and lifecycle of connections between application components and messaging infrastructure.
Central service for managing all message bindings in a Spring Cloud Stream application.
/**
* Central service for managing bindings between applications and message brokers.
*/
public class BindingService implements BeanFactoryAware, DisposableBean, ApplicationContextAware {
/**
* Bind a consumer to the specified input name.
* @param inputTarget the consumer binding target
* @param inputName the logical input name
* @return collection of created bindings
*/
public Collection<Binding<Object>> bindConsumer(Object inputTarget, String inputName);
/**
* Bind a producer to the specified output name.
* @param outputTarget the producer binding target
* @param outputName the logical output name
* @return the created binding
*/
public Binding<MessageChannel> bindProducer(Object outputTarget, String outputName);
/**
* Unbind all consumers for the specified input name.
* @param inputName the logical input name
*/
public void unbindConsumers(String inputName);
/**
* Unbind all producers for the specified output name.
* @param outputName the logical output name
*/
public void unbindProducers(String outputName);
/**
* Get all consumer bindings.
* @return map of consumer bindings keyed by input name
*/
public Map<String, List<Binding<Object>>> getConsumerBindings();
/**
* Get all producer bindings.
* @return map of producer bindings keyed by output name
*/
public Map<String, Binding<MessageChannel>> getProducerBindings();
/**
* Callback declared by {@code BeanFactoryAware}, which this class implements.
* @param beanFactory the bean factory handed to this service
*/
public void setBeanFactory(BeanFactory beanFactory);
/**
* Callback declared by {@code ApplicationContextAware}, which this class implements.
* @param applicationContext the application context handed to this service
*/
public void setApplicationContext(ApplicationContext applicationContext);
/**
* Callback declared by {@code DisposableBean}, which this class implements.
* What is released on destruction is not shown in this listing.
*/
public void destroy();
}

Interface for objects that can have their inputs and outputs bound to messaging infrastructure.
/**
* Marker interface for instances that can bind/unbind groups of inputs and outputs.
*/
public interface Bindable {
/**
* Get the names of all inputs that can be bound.
* @return set of input names
*/
Set<String> getInputs();
/**
* Get the names of all outputs that can be bound.
* @return set of output names
*/
Set<String> getOutputs();
/**
* Create and bind all inputs using the provided binding service.
* @param bindingService the service to use for binding
* @return collection of created bindings
*/
Collection<Binding<Object>> createAndBindInputs(BindingService bindingService);
/**
* Create and bind all outputs using the provided binding service.
* @param bindingService the service to use for binding
* @return collection of created bindings
*/
Collection<Binding<Object>> createAndBindOutputs(BindingService bindingService);
/**
* Unbind all inputs.
*/
void unbindInputs();
/**
* Unbind all outputs.
*/
void unbindOutputs();
/**
* Bind all inputs using the provided binding service.
* <p>
* The default implementation delegates to {@link #createAndBindInputs(BindingService)}
* and discards the collection of bindings it returns.
* @param bindingService the service to use for binding
*/
default void bindInputs(BindingService bindingService) {
createAndBindInputs(bindingService);
}
/**
* Bind all outputs using the provided binding service.
* <p>
* The default implementation delegates to {@link #createAndBindOutputs(BindingService)}
* and discards the collection of bindings it returns.
* @param bindingService the service to use for binding
*/
default void bindOutputs(BindingService bindingService) {
createAndBindOutputs(bindingService);
}
}

Factories for creating binding targets of different types.
/**
* Factory for creating binding targets.
* <p>
* Implementations listed in this document are {@code SubscribableChannelBindingTargetFactory},
* {@code MessageSourceBindingTargetFactory} and {@code FluxMessageChannelBindingTargetFactory}.
*/
public interface BindingTargetFactory {
/**
* Check if this factory can create the specified type of binding target.
* @param bindingTargetType the target type
* @return true if this factory can create the target type
*/
boolean canCreate(Class<?> bindingTargetType);
/**
* Create an input binding target of the specified type.
* @param name the binding target name
* @param bindingTargetType the target type
* @param properties additional properties for target creation
* @return the created input binding target
*/
Object createInput(String name, Class<?> bindingTargetType, Properties properties);
/**
* Create an output binding target of the specified type.
* @param name the binding target name
* @param bindingTargetType the target type
* @param properties additional properties for target creation
* @return the created output binding target
*/
Object createOutput(String name, Class<?> bindingTargetType, Properties properties);
}
/**
* Factory for subscribable channel binding targets.
* <p>
* Implements {@code BindingTargetFactory} and narrows the return type of both
* creation methods from {@code Object} to {@code MessageChannel}.
*/
public class SubscribableChannelBindingTargetFactory implements BindingTargetFactory, ApplicationContextAware {
/**
* Implements {@code BindingTargetFactory#canCreate}; the accepted types are not shown in this listing.
* @param bindingTargetType the target type
* @return true if this factory can create the target type
*/
public boolean canCreate(Class<?> bindingTargetType);
/**
* Implements {@code BindingTargetFactory#createInput} with a {@code MessageChannel} return type.
* @param name the binding target name
* @param bindingTargetType the target type
* @param properties additional properties for target creation
* @return the created input channel
*/
public MessageChannel createInput(String name, Class<?> bindingTargetType, Properties properties);
/**
* Implements {@code BindingTargetFactory#createOutput} with a {@code MessageChannel} return type.
* @param name the binding target name
* @param bindingTargetType the target type
* @param properties additional properties for target creation
* @return the created output channel
*/
public MessageChannel createOutput(String name, Class<?> bindingTargetType, Properties properties);
/**
* Callback declared by {@code ApplicationContextAware}, which this class implements.
* @param applicationContext the application context handed to this factory
*/
public void setApplicationContext(ApplicationContext applicationContext);
}
/**
* Factory for message source binding targets.
* <p>
* Implements {@code BindingTargetFactory} and narrows the return type of both
* creation methods from {@code Object} to {@code MessageSource<?>}.
*/
public class MessageSourceBindingTargetFactory implements BindingTargetFactory {
/**
* Implements {@code BindingTargetFactory#canCreate}; the accepted types are not shown in this listing.
* @param bindingTargetType the target type
* @return true if this factory can create the target type
*/
public boolean canCreate(Class<?> bindingTargetType);
/**
* Implements {@code BindingTargetFactory#createInput} with a {@code MessageSource<?>} return type.
* @param name the binding target name
* @param bindingTargetType the target type
* @param properties additional properties for target creation
* @return the created input message source
*/
public MessageSource<?> createInput(String name, Class<?> bindingTargetType, Properties properties);
/**
* Implements {@code BindingTargetFactory#createOutput} with a {@code MessageSource<?>} return type.
* NOTE(review): whether an output message source is actually supported is not visible here; confirm.
* @param name the binding target name
* @param bindingTargetType the target type
* @param properties additional properties for target creation
* @return the created output target
*/
public MessageSource<?> createOutput(String name, Class<?> bindingTargetType, Properties properties);
}
/**
* Factory for Flux message channel binding targets.
*/
public class FluxMessageChannelBindingTargetFactory implements BindingTargetFactory {
/**
* Implements {@code BindingTargetFactory#canCreate}; the accepted types are not shown in this listing.
* @param bindingTargetType the target type
* @return true if this factory can create the target type
*/
public boolean canCreate(Class<?> bindingTargetType);
/**
* Implements {@code BindingTargetFactory#createInput}; the return type stays {@code Object}.
* @param name the binding target name
* @param bindingTargetType the target type
* @param properties additional properties for target creation
* @return the created input binding target
*/
public Object createInput(String name, Class<?> bindingTargetType, Properties properties);
/**
* Implements {@code BindingTargetFactory#createOutput}; the return type stays {@code Object}.
* @param name the binding target name
* @param bindingTargetType the target type
* @param properties additional properties for target creation
* @return the created output binding target
*/
public Object createOutput(String name, Class<?> bindingTargetType, Properties properties);
}

Factories for creating bindable proxy objects.
/**
* Base factory for creating bindable proxies.
* <p>
* Implements {@code Bindable} plus the {@code BeanFactoryAware} and
* {@code ApplicationContextAware} callbacks. Subclasses provide the proxy through
* {@link #createBindableProxy()}.
* NOTE(review): {@code createAndBindInputs} and {@code createAndBindOutputs} from
* {@code Bindable} are not listed here; confirm whether this class implements them.
*/
public abstract class AbstractBindableProxyFactory implements Bindable, BeanFactoryAware, ApplicationContextAware {
/** The type passed to the constructor (parameter of the same name); presumably the proxied interface, confirm. */
protected final Class<?> type;
/** Bean factory; presumably assigned by {@link #setBeanFactory}, confirm. */
protected BeanFactory beanFactory;
/** Application context; presumably assigned by {@link #setApplicationContext}, confirm. */
protected ApplicationContext applicationContext;
/**
* Constructor for subclasses.
* @param type the type this factory works with
*/
protected AbstractBindableProxyFactory(Class<?> type);
/** Implements {@code Bindable#getInputs}. */
public Set<String> getInputs();
/** Implements {@code Bindable#getOutputs}. */
public Set<String> getOutputs();
/**
* Overrides the default {@code Bindable#bindInputs}.
* @param bindingService the service to use for binding
*/
public void bindInputs(BindingService bindingService);
/**
* Overrides the default {@code Bindable#bindOutputs}.
* @param bindingService the service to use for binding
*/
public void bindOutputs(BindingService bindingService);
/** Implements {@code Bindable#unbindInputs}. */
public void unbindInputs();
/** Implements {@code Bindable#unbindOutputs}. */
public void unbindOutputs();
/**
* Hook that concrete factories implement to build the proxy object.
* @return the bindable proxy
*/
protected abstract Object createBindableProxy();
/**
* Callback declared by {@code BeanFactoryAware}.
* @param beanFactory the bean factory handed to this factory
*/
public void setBeanFactory(BeanFactory beanFactory);
/**
* Callback declared by {@code ApplicationContextAware}.
* @param applicationContext the application context handed to this factory
*/
public void setApplicationContext(ApplicationContext applicationContext);
}
/**
* Factory for creating standard bindable proxies.
*/
public class BindableProxyFactory extends AbstractBindableProxyFactory {
/**
* Creates a factory for the given type.
* @param type the type this factory works with; presumably forwarded to the superclass constructor, confirm
*/
public BindableProxyFactory(Class<?> type);
/**
* Concrete implementation of the abstract hook declared by {@code AbstractBindableProxyFactory}.
* @return the bindable proxy
*/
protected Object createBindableProxy();
/**
* Handles a method call made on the proxy.
* NOTE(review): the signature has the shape of {@code java.lang.reflect.InvocationHandler#invoke},
* but the class declaration in this listing names no such interface; confirm against the library source.
* @param proxy the proxy instance the method was called on
* @param method the method that was called
* @param args the call arguments
* @return the result of the call
*/
public Object invoke(Object proxy, Method method, Object[] args);
}

Interfaces and implementations for configuring message channels.
/**
* Interface for configuring message channels.
* <p>
* Extended in this document by {@code MessageChannelAndSourceConfigurer} and implemented by
* {@code MessageConverterConfigurer} and {@code CompositeMessageChannelConfigurer}.
*/
public interface MessageChannelConfigurer {
/**
* Configure the input channel.
* @param channel the input channel to configure
* @param channelName the name of the input channel
*/
void configureInputChannel(MessageChannel channel, String channelName);
/**
* Configure the output channel.
* @param channel the output channel to configure
* @param channelName the name of the output channel
*/
void configureOutputChannel(MessageChannel channel, String channelName);
}
/**
* Extended configurer for channels and sources.
* <p>
* Adds pollable-source configuration on top of the two channel methods inherited
* from {@code MessageChannelConfigurer}.
*/
public interface MessageChannelAndSourceConfigurer extends MessageChannelConfigurer {
/**
* Configure a pollable message source.
* @param binding the binding target, declared as {@code Object}
* @param name the source name
* @param group the consumer group
*/
void configurePollableSource(Object binding, String name, String group);
}
/**
* Configurer for message converters.
* <p>
* Implements {@code MessageChannelConfigurer} and the {@code BeanFactoryAware} callback.
* What is applied to each channel is not shown in this listing.
*/
public class MessageConverterConfigurer implements MessageChannelConfigurer, BeanFactoryAware {
/**
* Implements {@code MessageChannelConfigurer#configureInputChannel}.
* @param channel the input channel to configure
* @param channelName the name of the input channel
*/
public void configureInputChannel(MessageChannel channel, String channelName);
/**
* Implements {@code MessageChannelConfigurer#configureOutputChannel}.
* @param channel the output channel to configure
* @param channelName the name of the output channel
*/
public void configureOutputChannel(MessageChannel channel, String channelName);
/**
* Callback declared by {@code BeanFactoryAware}.
* @param beanFactory the bean factory handed to this configurer
*/
public void setBeanFactory(BeanFactory beanFactory);
}
/**
* Composite configurer that delegates to multiple configurers.
*/
public class CompositeMessageChannelConfigurer implements MessageChannelConfigurer {
/** The delegate configurers; presumably the list passed to the constructor, confirm. */
private final List<MessageChannelConfigurer> configurers;
/**
* Creates a composite over the given configurers.
* @param configurers the configurers to delegate to
*/
public CompositeMessageChannelConfigurer(List<MessageChannelConfigurer> configurers);
/**
* Implements {@code MessageChannelConfigurer#configureInputChannel}.
* @param channel the input channel to configure
* @param channelName the name of the input channel
*/
public void configureInputChannel(MessageChannel channel, String channelName);
/**
* Implements {@code MessageChannelConfigurer#configureOutputChannel}.
* @param channel the output channel to configure
* @param channelName the name of the output channel
*/
public void configureOutputChannel(MessageChannel channel, String channelName);
}

Classes for managing binding lifecycle events and states.
/**
* Base class for binding lifecycle management.
* <p>
* Implements {@code SmartLifecycle} and {@code ApplicationContextAware}. Subclasses implement
* the two abstract hooks, {@link #doStartWithBindingService} and {@link #doStopWithBindingService}.
* How {@code start()} and {@code stop()} invoke those hooks is not shown in this listing.
*/
public abstract class AbstractBindingLifecycle implements SmartLifecycle, ApplicationContextAware {
/** Application context; presumably assigned by {@link #setApplicationContext}, confirm. */
protected ApplicationContext applicationContext;
/** Running flag, declared {@code volatile}; presumably what {@link #isRunning()} reports, confirm. */
protected volatile boolean running;
/** Lifecycle start callback from {@code SmartLifecycle}. */
public void start();
/** Lifecycle stop callback from {@code SmartLifecycle}. */
public void stop();
/**
* Lifecycle status query from {@code SmartLifecycle}.
* @return whether this component is running
*/
public boolean isRunning();
/**
* Phase query from {@code SmartLifecycle}; the value returned is not shown in this listing.
* @return the lifecycle phase
*/
public int getPhase();
/**
* Auto-startup query from {@code SmartLifecycle}; the value returned is not shown in this listing.
* @return whether this component starts automatically
*/
public boolean isAutoStartup();
/**
* Stop variant from {@code SmartLifecycle} that takes a completion callback.
* @param callback the callback supplied by the caller
*/
public void stop(Runnable callback);
/**
* Callback declared by {@code ApplicationContextAware}.
* @param applicationContext the application context handed to this lifecycle
*/
public void setApplicationContext(ApplicationContext applicationContext);
/**
* Hook implemented by subclasses for the start side.
* @param bindingService the binding service to work with
*/
protected abstract void doStartWithBindingService(BindingService bindingService);
/**
* Hook implemented by subclasses for the stop side.
* @param bindingService the binding service to work with
*/
protected abstract void doStopWithBindingService(BindingService bindingService);
}
/**
* Lifecycle management for input bindings.
* <p>
* Concrete subclass of {@code AbstractBindingLifecycle} that implements both abstract hooks.
*/
public class InputBindingLifecycle extends AbstractBindingLifecycle {
/** The bindable this lifecycle works on; presumably the constructor argument, confirm. */
private final Bindable bindable;
/** Names of the inputs handled by this lifecycle; presumably the constructor argument, confirm. */
private final Collection<String> inputNames;
/**
* Creates a lifecycle for the given bindable and input names.
* @param bindable the bindable whose inputs are managed
* @param inputNames the input names
*/
public InputBindingLifecycle(Bindable bindable, Collection<String> inputNames);
/**
* Implements the start hook declared by {@code AbstractBindingLifecycle}.
* @param bindingService the binding service to work with
*/
protected void doStartWithBindingService(BindingService bindingService);
/**
* Implements the stop hook declared by {@code AbstractBindingLifecycle}.
* @param bindingService the binding service to work with
*/
protected void doStopWithBindingService(BindingService bindingService);
}
/**
* Lifecycle management for output bindings.
* <p>
* Concrete subclass of {@code AbstractBindingLifecycle} that implements both abstract hooks.
*/
public class OutputBindingLifecycle extends AbstractBindingLifecycle {
/** The bindable this lifecycle works on; presumably the constructor argument, confirm. */
private final Bindable bindable;
/** Names of the outputs handled by this lifecycle; presumably the constructor argument, confirm. */
private final Collection<String> outputNames;
/**
* Creates a lifecycle for the given bindable and output names.
* @param bindable the bindable whose outputs are managed
* @param outputNames the output names
*/
public OutputBindingLifecycle(Bindable bindable, Collection<String> outputNames);
/**
* Implements the start hook declared by {@code AbstractBindingLifecycle}.
* @param bindingService the binding service to work with
*/
protected void doStartWithBindingService(BindingService bindingService);
/**
* Implements the stop hook declared by {@code AbstractBindingLifecycle}.
* @param bindingService the binding service to work with
*/
protected void doStopWithBindingService(BindingService bindingService);
}
/**
* Controller for managing bindings lifecycle.
*/
public class BindingsLifecycleController implements ApplicationContextAware {
private ApplicationContext applicationContext;
/**
* Change the state of a binding.
* @param name the binding name
* @param state the desired state
*/
public void changeState(String name, State state);
/**
* Query the states of all bindings.
* @return map of binding states keyed by name
*/
public Map<String, List<BindingInformation>> queryStates();
/**
* Query the state of a specific binding.
* @param name the binding name
* @return list of binding information for the named binding
*/
public List<BindingInformation> queryState(String name);
/**
* Callback declared by {@code ApplicationContextAware}, which this class implements.
* @param applicationContext the application context handed to this controller
*/
public void setApplicationContext(ApplicationContext applicationContext);
/**
* Information about a binding's current state.
* <p>
* All four fields are {@code final}; the constructor takes one argument per field and
* a getter is declared for each.
*/
public static class BindingInformation {
/** Name of the binding. */
private final String bindingName;
/** State recorded for the binding. */
private final State state;
/** Group associated with the binding. */
private final String group;
/** Whether the binding is pausable. */
private final boolean pausable;
/**
* Creates a binding information record.
* @param bindingName the binding name
* @param state the binding state
* @param group the group
* @param pausable whether the binding is pausable
*/
public BindingInformation(String bindingName, State state, String group, boolean pausable);
/** @return the binding name */
public String getBindingName();
/** @return the binding state */
public State getState();
/** @return the group */
public String getGroup();
/** @return whether the binding is pausable */
public boolean isPausable();
}
}

Callbacks for handling new destination binding events.
/**
* Callback interface for new destination binding configuration.
* @param <T> the binding target type
*/
public interface NewDestinationBindingCallback<T> {
/**
* Configure a new destination binding.
* @param channelName the channel name
* @param channel the channel being bound
* @param producerProperties producer properties for the binding
* @param extendedProducerProperties extended producer properties
*/
void configure(String channelName, T channel, ProducerProperties producerProperties, Object extendedProducerProperties);
}

Channel interceptor for handling message partitioning.
/**
* Channel interceptor for partitioning messages.
*/
public class DefaultPartitioningInterceptor implements ChannelInterceptor, BeanFactoryAware {
private BeanFactory beanFactory;
/**
* Intercept message sending to add partition information.
* @param message the message being sent
* @param channel the channel
* @return the message with partition information added
*/
public Message<?> preSend(Message<?> message, MessageChannel channel);
/**
* Callback declared by {@code BeanFactoryAware}, which this class implements.
* @param beanFactory the bean factory handed to this interceptor
*/
public void setBeanFactory(BeanFactory beanFactory);
}

Utility classes for binding management operations.
/**
* Defines supported bindable features.
* <p>
* Holds three boolean flags, each declared with an explicit default of {@code false} and
* each with a getter/setter pair. Method bodies are not part of this listing.
*/
public class SupportedBindableFeatures {
/** Synchronous-consumer flag; defaults to {@code false}. */
private boolean synchronousConsumer = false;
/** Pollable-consumer flag; defaults to {@code false}. */
private boolean pollableConsumer = false;
/** Route-to-DLQ flag; defaults to {@code false}. */
private boolean routeToDlq = false;
/** @return the synchronous-consumer flag */
public boolean isSynchronousConsumer();
/** @param synchronousConsumer the new synchronous-consumer flag */
public void setSynchronousConsumer(boolean synchronousConsumer);
/** @return the pollable-consumer flag */
public boolean isPollableConsumer();
/** @param pollableConsumer the new pollable-consumer flag */
public void setPollableConsumer(boolean pollableConsumer);
/** @return the route-to-DLQ flag */
public boolean isRouteToDlq();
/** @param routeToDlq the new route-to-DLQ flag */
public void setRouteToDlq(boolean routeToDlq);
}
/**
* Router that is aware of binder configuration.
* <p>
* Extends {@code AbstractMappingMessageRouter} and implements the
* {@code ApplicationContextAware} callback. The routing logic is not shown in this listing.
*/
public class BinderAwareRouter extends AbstractMappingMessageRouter implements ApplicationContextAware {
/** Application context; presumably assigned by {@link #setApplicationContext}, confirm. */
private ApplicationContext applicationContext;
/**
* Produces the channel key for a message.
* NOTE(review): how the key is derived from the message is not visible here; confirm against the library source.
* @param message the message being routed
* @return the channel key
*/
protected String getChannelKey(Message<?> message);
/**
* Callback declared by {@code ApplicationContextAware}.
* @param applicationContext the application context handed to this router
*/
public void setApplicationContext(ApplicationContext applicationContext);
}
/**
* Listener for context start after refresh events.
*/
public class ContextStartAfterRefreshListener implements ApplicationListener<ContextRefreshedEvent> {
/**
* Callback declared by {@code ApplicationListener} for {@code ContextRefreshedEvent}.
* What is done in response to the event is not shown in this listing.
* @param event the context-refreshed event
*/
public void onApplicationEvent(ContextRefreshedEvent event);
}

Usage Examples:
import org.springframework.cloud.stream.binding.*;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
// Creating a bindable service
/**
 * Example {@code Bindable} that keeps its channels in two name-to-channel maps and binds
 * every registered channel through a {@code BindingService}.
 * <p>
 * NOTE(review): the {@code Bindable} listing in this document also declares
 * {@code createAndBindInputs} and {@code createAndBindOutputs} without default bodies, and
 * this example does not implement them; confirm against the library version in use.
 */
@Component
public class MyMessageService implements Bindable {

    /** Input channels keyed by input name, filled through {@link #addInput}. */
    private final Map<String, MessageChannel> inputs = new HashMap<>();

    /** Output channels keyed by output name, filled through {@link #addOutput}. */
    private final Map<String, MessageChannel> outputs = new HashMap<>();

    /**
     * @return the key set of the input map (a live view, not a copy)
     */
    @Override
    public Set<String> getInputs() {
        return inputs.keySet();
    }

    /**
     * @return the key set of the output map (a live view, not a copy)
     */
    @Override
    public Set<String> getOutputs() {
        return outputs.keySet();
    }

    /**
     * Binds each registered input channel as a consumer under its registered name.
     * @param bindingService the service to use for binding
     */
    @Override
    public void bindInputs(BindingService bindingService) {
        inputs.forEach((name, channel) -> bindingService.bindConsumer(channel, name));
    }

    /**
     * Binds each registered output channel as a producer under its registered name.
     * @param bindingService the service to use for binding
     */
    @Override
    public void bindOutputs(BindingService bindingService) {
        outputs.forEach((name, channel) -> bindingService.bindProducer(channel, name));
    }

    /** Placeholder: unbinding of inputs is left to the reader. */
    @Override
    public void unbindInputs() {
        // Implementation for unbinding inputs
    }

    /** Placeholder: unbinding of outputs is left to the reader. */
    @Override
    public void unbindOutputs() {
        // Implementation for unbinding outputs
    }

    /**
     * Registers an input channel, replacing any channel already stored under the name.
     * @param name the input name
     * @param channel the channel to register
     */
    public void addInput(String name, MessageChannel channel) {
        inputs.put(name, channel);
    }

    /**
     * Registers an output channel, replacing any channel already stored under the name.
     * @param name the output name
     * @param channel the channel to register
     */
    public void addOutput(String name, MessageChannel channel) {
        outputs.put(name, channel);
    }
}
// Using BindingService directly
/**
 * Example service that creates channels at runtime and binds them through an injected
 * {@code BindingService}.
 */
@Service
public class DynamicBindingService {

    /** Binding service used for every dynamic bind. */
    private final BindingService bindingService;

    /**
     * @param bindingService the binding service to delegate to
     */
    public DynamicBindingService(BindingService bindingService) {
        this.bindingService = bindingService;
    }

    /**
     * Creates a channel with a printing subscriber and binds it as a consumer.
     * @param inputName the logical input name to bind under
     */
    public void createDynamicConsumer(String inputName) {
        // Typed as DirectChannel because subscribe(...) is not part of the
        // MessageChannel interface; it comes from the subscribable channel type.
        DirectChannel channel = new DirectChannel();
        channel.subscribe(message -> {
            System.out.println("Received: " + message.getPayload());
        });
        Collection<Binding<Object>> bindings = bindingService.bindConsumer(channel, inputName);
        // Store bindings for later cleanup if needed
    }

    /**
     * Creates a channel, binds it as a producer and sends one sample message through it.
     * @param outputName the logical output name to bind under
     */
    public void createDynamicProducer(String outputName) {
        MessageChannel channel = new DirectChannel();
        Binding<MessageChannel> binding = bindingService.bindProducer(channel, outputName);
        // Now you can send messages through this channel
        channel.send(MessageBuilder.withPayload("Hello World").build());
    }
}
// Custom channel configurer
/**
 * Example {@code MessageChannelConfigurer} that adds an interceptor to channels which are
 * {@code AbstractMessageChannel} instances and leaves all other channels untouched.
 */
@Component
public class CustomChannelConfigurer implements MessageChannelConfigurer {

    /**
     * Adds a {@code MyCustomInterceptor} to the input channel when it is an
     * {@code AbstractMessageChannel}.
     * @param channel the channel to configure
     * @param channelName the channel name (not used by this example)
     */
    @Override
    public void configureInputChannel(MessageChannel channel, String channelName) {
        if (!(channel instanceof AbstractMessageChannel)) {
            return;
        }
        // Add custom interceptors
        ((AbstractMessageChannel) channel).addInterceptor(new MyCustomInterceptor());
    }

    /**
     * Adds a {@code MyOutputInterceptor} to the output channel when it is an
     * {@code AbstractMessageChannel}.
     * @param channel the channel to configure
     * @param channelName the channel name (not used by this example)
     */
    @Override
    public void configureOutputChannel(MessageChannel channel, String channelName) {
        if (!(channel instanceof AbstractMessageChannel)) {
            return;
        }
        // Add custom interceptors for output
        ((AbstractMessageChannel) channel).addInterceptor(new MyOutputInterceptor());
    }
}
// Managing binding lifecycle
@Component
public class BindingController {
/** Lifecycle controller that every method of this example delegates to. */
private final BindingsLifecycleController lifecycleController;
/**
* Stores the injected lifecycle controller.
* @param lifecycleController the controller to delegate to
*/
public BindingController(BindingsLifecycleController lifecycleController) {
this.lifecycleController = lifecycleController;
}
/**
 * Requests the {@code PAUSED} state for the named binding.
 * <p>
 * {@code State} is qualified with {@code BindingsLifecycleController} because a package
 * wildcard import does not bring nested types into scope.
 * @param bindingName the binding name passed to {@code changeState}
 */
public void pauseBinding(String bindingName) {
    lifecycleController.changeState(bindingName, BindingsLifecycleController.State.PAUSED);
}
/**
 * Requests the {@code RESUMED} state for the named binding.
 * <p>
 * {@code State} is qualified with {@code BindingsLifecycleController} because a package
 * wildcard import does not bring nested types into scope.
 * @param bindingName the binding name passed to {@code changeState}
 */
public void resumeBinding(String bindingName) {
    lifecycleController.changeState(bindingName, BindingsLifecycleController.State.RESUMED);
}
/**
 * Requests the {@code STOPPED} state for the named binding.
 * <p>
 * {@code State} is qualified with {@code BindingsLifecycleController} because a package
 * wildcard import does not bring nested types into scope.
 * @param bindingName the binding name passed to {@code changeState}
 */
public void stopBinding(String bindingName) {
    lifecycleController.changeState(bindingName, BindingsLifecycleController.State.STOPPED);
}
/**
* Returns whatever {@code queryStates()} reports on the lifecycle controller.
* @return map of binding information lists keyed by name
*/
public Map<String, List<BindingsLifecycleController.BindingInformation>> getAllBindingStates() {
return lifecycleController.queryStates();
}
}

Install with Tessl CLI
npx tessl i tessl/maven-org-springframework-cloud--spring-cloud-stream