A framework for building message-driven microservice applications on Spring Boot with Spring Integration.
—
The core binder framework provides foundational abstractions for connecting applications to message brokers. It defines the pluggable middleware abstraction that allows Spring Cloud Stream to work with different messaging systems like Apache Kafka, RabbitMQ, and others.
The primary abstraction for connecting applications to messaging middleware.
/**
* Strategy interface for binding app interfaces to logical names.
* <p>Declares one operation for consumer bindings, one for producer bindings, and an
* identity accessor with a default implementation.
* @param <T> the binding target type
* @param <C> the consumer properties type
* @param <P> the producer properties type
*/
public interface Binder<T, C extends ConsumerProperties, P extends ProducerProperties> {
/**
* Bind a consumer to the given name and group.
* @param name the logical name of the target
* @param group the consumer group
* @param inboundBindTarget the consumer binding target
* @param consumerProperties consumer configuration properties
* @return the binding handle
*/
Binding<T> bindConsumer(String name, String group, T inboundBindTarget, C consumerProperties);
/**
* Bind a producer to the given name.
* @param name the logical name of the target
* @param outboundBindTarget the producer binding target
* @param producerProperties producer configuration properties
* @return the binding handle
*/
Binding<T> bindProducer(String name, T outboundBindTarget, P producerProperties);
/**
* Get the binder identity for management purposes.
* <p>The default implementation returns {@code null}.
* @return the binder identity, or {@code null} if not supported
*/
default String getBinderIdentity() {
return null;
}
}
Represents the connection between application components and messaging infrastructure.
/**
* Represents the binding between an input/output and an adapter endpoint.
* <p>Extends {@code Pausable} and redeclares its pause/resume/isPaused operations.
* @param <T> the binding target type
*/
public interface Binding<T> extends Pausable {
/**
* Get the logical name of this binding.
* <p>NOTE(review): how this differs from {@link #getBindingName()} is not specified in this listing; confirm.
* @return the binding name
*/
String getName();
/**
* Unbind this binding, cleaning up resources.
*/
void unbind();
/**
* Get the current state of this binding.
* <p>NOTE(review): the {@code State} type is not defined in this listing; confirm where it is declared.
* @return the binding state
*/
State getState();
/**
* Get the binding name for management purposes.
* @return the binding name
*/
String getBindingName();
/**
* Check if this is an input binding.
* @return true if input binding, false if output
*/
boolean isInput();
/**
* Get the name of the binder that created this binding.
* @return the binder name
*/
String getBinderName();
// Lifecycle methods from Pausable, redeclared here for reference
void pause();
void resume();
boolean isPaused();
}
Factory for creating and managing binder instances.
/**
* Factory for creating binder instances.
* <p>Declares a single lookup operation keyed by binder name and binding target type.
*/
public interface BinderFactory {
/**
* Get a binder instance for the specified name and type.
* @param <T> the binding target type
* @param name the binder name, or {@code null} for default
* @param bindingTargetType the binding target type
* @return the binder instance; its consumer and producer property types are unbounded wildcards
*/
<T> Binder<T, ?, ?> getBinder(String name, Class<T> bindingTargetType);
}
/**
* Default implementation of BinderFactory.
* <p>Signature-only listing: method bodies are omitted. Besides {@code BinderFactory}, the class
* is declared to implement {@code DisposableBean} and {@code ApplicationContextAware}.
*/
public class DefaultBinderFactory implements BinderFactory, DisposableBean, ApplicationContextAware {
/** Binder lookup; same signature as {@code BinderFactory.getBinder}. */
public <T> Binder<T, ?, ?> getBinder(String name, Class<T> bindingTargetType);
/** Presumably the {@code DisposableBean} callback; what it releases is not shown here. TODO confirm. */
public void destroy();
/** Presumably the {@code ApplicationContextAware} callback; how the context is used is not shown here. TODO confirm. */
public void setApplicationContext(ApplicationContext applicationContext);
}
Registry for managing available binder types.
/**
* Registry of available binder types.
* <p>Supports lookup of a single {@code BinderType} by name and retrieval of all registered types.
*/
public interface BinderTypeRegistry {
/**
* Get a binder type by name.
* @param name the binder type name
* @return the binder type, or {@code null} if not found
*/
BinderType get(String name);
/**
* Get all registered binder types.
* @return map of binder types keyed by name
*/
Map<String, BinderType> getAll();
}
/**
* Default implementation of BinderTypeRegistry.
* <p>Signature-only listing: method bodies are omitted.
*/
public class DefaultBinderTypeRegistry implements BinderTypeRegistry {
/** Look up a binder type by name; same signature as {@code BinderTypeRegistry.get}. */
public BinderType get(String name);
/** Return the registered binder types; same signature as {@code BinderTypeRegistry.getAll}. */
public Map<String, BinderType> getAll();
}
Extended interfaces for binders with additional capabilities.
/**
* Binder with extended properties support.
* <p>Adds one accessor on top of the operations inherited from {@code Binder}.
* @param <T> the binding target type
* @param <C> the consumer properties type
* @param <P> the producer properties type
*/
public interface ExtendedPropertiesBinder<T, C extends ConsumerProperties, P extends ProducerProperties> extends Binder<T, C, P> {
/**
* Get extended binding properties.
* @param channelName the channel name
* @return the extended binding properties, or {@code null} if not supported
*/
ExtendedBindingProperties<C, P> getExtendedPropertiesEntryIfAny(String channelName);
}
/**
* Binder for pollable consumers.
* <p>Fixes the binding target type to {@code PollableSource<H>} and the producer properties
* type to {@code ProducerProperties}; it declares no methods of its own.
* @param <H> the handler type
* @param <C> the consumer properties type
*/
public interface PollableConsumerBinder<H, C extends ConsumerProperties> extends Binder<PollableSource<H>, C, ProducerProperties> {
// Inherits binding methods from Binder interface
}
Interface for pollable message sources.
/**
* Interface for pollable sources.
* <p>Declares a single poll operation that hands a message to a caller-supplied handler.
* @param <H> the handler type
*/
public interface PollableSource<H> {
/**
* Poll for a message.
* @param handler the message handler
* @return true if a message was received and handled
*/
boolean poll(H handler);
}
/**
* Message-specific pollable source.
* <p>Specializes {@code PollableSource} for {@code MessageHandler} handlers and adds a
* timeout-accepting overload with a default implementation.
*/
public interface PollableMessageSource extends PollableSource<MessageHandler> {
/**
* Poll for a message with timeout.
* <p>This default implementation ignores {@code timeout} and simply delegates to the
* single-argument {@code poll(handler)}; implementations must override it for the timeout to
* have any effect.
* @param handler the message handler
* @param timeout the timeout duration (unused by the default implementation)
* @return the result of {@code poll(handler)}
*/
default boolean poll(MessageHandler handler, Duration timeout) {
return poll(handler);
}
}
/**
* Default implementation of PollableMessageSource.
* <p>Signature-only listing: method bodies are omitted.
*/
public class DefaultPollableMessageSource implements PollableMessageSource, BeanFactoryAware {
/** Poll for a message; same signature as {@code PollableSource.poll} with a {@code MessageHandler}. */
public boolean poll(MessageHandler handler);
/**
* Poll overload taking a {@code ParameterizedTypeReference}.
* NOTE(review): this overload is not declared on the PollableMessageSource listing, and the role
* of {@code type} is not described here; confirm against the library Javadoc.
*/
public boolean poll(MessageHandler handler, ParameterizedTypeReference<?> type);
/** Presumably the {@code BeanFactoryAware} callback; how the factory is used is not shown here. TODO confirm. */
public void setBeanFactory(BeanFactory beanFactory);
}
Base classes for implementing custom binders.
/**
* Base class for binder implementations.
* <p>Signature-only listing: method bodies are omitted. The public bind methods are
* {@code final}; subclasses supply the abstract {@code doBindConsumer}/{@code doBindProducer}
* hooks instead.
* @param <T> the binding target type
* @param <C> the consumer properties type
* @param <P> the producer properties type
*/
public abstract class AbstractBinder<T, C extends ConsumerProperties, P extends ProducerProperties> implements Binder<T, C, P>, BeanFactoryAware, InitializingBean {
// Subclass hook for consumer binding; parameters mirror bindConsumer.
protected abstract Binding<T> doBindConsumer(String name, String group, T inputTarget, C properties);
// Subclass hook for producer binding; parameters mirror bindProducer.
protected abstract Binding<T> doBindProducer(String name, T outputTarget, P properties);
// Final entry points from Binder. Presumably they delegate to the doBind* hooks; bodies are not shown. TODO confirm.
public final Binding<T> bindConsumer(String name, String group, T inputTarget, C properties);
public final Binding<T> bindProducer(String name, T outputTarget, P properties);
// Presumably the BeanFactoryAware and InitializingBean callbacks. TODO confirm.
public void setBeanFactory(BeanFactory beanFactory);
public void afterPropertiesSet();
}
/**
* Base class for message channel binders.
* <p>Signature-only listing: method bodies are omitted. Fixes the binding target type of
* {@code AbstractBinder} to {@code MessageChannel} and additionally implements
* {@code ExtendedPropertiesBinder}.
* @param <C> the consumer properties type
* @param <P> the producer properties type
* @param <PP> the provisioning provider type
*/
public abstract class AbstractMessageChannelBinder<C extends ConsumerProperties, P extends ProducerProperties, PP extends ProvisioningProvider<C, P>> extends AbstractBinder<MessageChannel, C, P> implements ExtendedPropertiesBinder<MessageChannel, C, P> {
// Subclass hooks, declared abstract in this listing, specialized to MessageChannel targets.
protected abstract Binding<MessageChannel> doBindConsumer(String name, String group, MessageChannel inputChannel, C properties);
protected abstract Binding<MessageChannel> doBindProducer(String name, MessageChannel outputChannel, P properties);
// Accessor required by ExtendedPropertiesBinder.
public ExtendedBindingProperties<C, P> getExtendedPropertiesEntryIfAny(String channelName);
}
Configuration classes and data types for binding management.
/**
* Configuration holder for a binder.
* <p>Signature-only listing: method bodies are omitted.
* <p>NOTE(review): the constructor accepts a {@code type} argument, but this listing shows no
* field or getter for it, and the value returned by {@code getConfigurationName()} is not
* described; confirm against the library Javadoc.
*/
public class BinderConfiguration {
private final Map<String, Object> properties;
private final boolean inheritEnvironment;
private final boolean defaultCandidate;
public BinderConfiguration(String type, Map<String, Object> properties, boolean inheritEnvironment, boolean defaultCandidate);
public String getConfigurationName();
// Accessors named after the three fields declared above.
public Map<String, Object> getProperties();
public boolean isInheritEnvironment();
public boolean isDefaultCandidate();
}
/**
* Represents a binder type with configuration classes.
* <p>Signature-only listing: method bodies are omitted. Holds a type name and an array of
* configuration classes, both declared {@code final}.
*/
public class BinderType {
private final String typeName;
private final Class<?>[] configurationClasses;
// Constructor parameters match the two fields above.
public BinderType(String typeName, Class<?>[] configurationClasses);
// Accessors named after the two fields above.
public String getTypeName();
public Class<?>[] getConfigurationClasses();
}
/**
* Default binding implementation.
* <p>Signature-only listing: method bodies are omitted. Every public method below has the same
* signature as a method declared on {@code Binding}.
* @param <T> the binding target type
*/
public class DefaultBinding<T> implements Binding<T> {
// Takes a name, a group, the binding target and a Lifecycle; how each is used is not shown in this listing.
public DefaultBinding(String name, String group, T target, Lifecycle lifecycle);
public String getName();
public void unbind();
public State getState();
public String getBindingName();
public boolean isInput();
public String getBinderName();
public void pause();
public void resume();
public boolean isPaused();
}
Support for message partitioning across multiple consumers.
/**
* Handles partitioning logic for messages.
* <p>Signature-only listing: method bodies are omitted.
*/
public class PartitionHandler {
/**
* Create a partition handler with the given selector strategy.
* @param partitionSelectorStrategy the partition selector
* @param partitionCount the number of partitions
*/
public PartitionHandler(PartitionSelectorStrategy partitionSelectorStrategy, int partitionCount);
/**
* Determine the partition for a message.
* @param message the message to partition
* @return the partition number
*/
public int determinePartition(Message<?> message);
}
/**
* Strategy for extracting partition keys from messages.
* <p>Single-method interface; the key is returned as a plain {@code Object}.
*/
public interface PartitionKeyExtractorStrategy {
/**
* Extract the partition key from a message.
* @param message the message
* @return the partition key
*/
Object extractKey(Message<?> message);
}
/**
* Strategy for selecting partitions based on keys.
* <p>Single-method interface; the key is accepted as a plain {@code Object}.
*/
public interface PartitionSelectorStrategy {
/**
* Select a partition for the given key.
* @param key the partition key
* @param partitionCount the total number of partitions
* @return the selected partition number
*/
int selectPartition(Object key, int partitionCount);
}
Utility classes for message handling and MIME type processing.
/**
* Container for message values and headers.
* <p>Signature-only listing: method bodies are omitted and only some of the {@code Map}
* methods are shown. The class is declared as a {@code Map<String, Object>} and as
* {@code Serializable}.
*/
public class MessageValues implements Map<String, Object>, Serializable {
// Two construction paths: from an existing message, or from a payload plus a header map.
public MessageValues(Message<?> original);
public MessageValues(Object payload, Map<String, Object> headers);
// Payload and header accessors.
public Object getPayload();
public void setPayload(Object payload);
public Map<String, Object> getHeaders();
// Conversion to a Message with an Object payload.
public Message<Object> toMessage();
// Map interface methods
public Object get(Object key);
public Object put(String key, Object value);
public Set<String> keySet();
public Collection<Object> values();
public Set<Entry<String, Object>> entrySet();
// ... other Map methods
}
/**
* Utilities for Java class MIME type handling.
* <p>Signature-only listing: method bodies are omitted. Exposes two MIME type string constants
* and two static conversion helpers.
*/
public class JavaClassMimeTypeUtils {
// MIME type string constants for Java objects and Java-serialized objects.
public static final String JAVA_OBJECT_TYPE = "application/x-java-object";
public static final String JAVA_SERIALIZED_OBJECT_TYPE = "application/x-java-serialized-object";
/**
* Convert class name to MIME type.
* @param className the Java class name
* @return the corresponding MIME type
*/
public static MimeType classNameToMimeType(String className);
/**
* Convert MIME type to class name.
* @param mimeType the MIME type
* @return the corresponding Java class name, or {@code null} if not applicable
*/
public static String mimeTypeToClassName(MimeType mimeType);
}
/**
* Utilities for embedded header handling.
* <p>Signature-only listing: method bodies are omitted. Both helpers are static and produce a
* {@code Message<byte[]>}.
*/
public class EmbeddedHeaderUtils {
/**
* Extract headers embedded in message payload.
* @param message the message with embedded headers
* @param headerNames the names of headers to extract
* @return the message with headers extracted to message headers
*/
public static Message<byte[]> extractHeaders(Message<byte[]> message, String... headerNames);
/**
* Embed headers into message payload.
* @param message the message, supplied as {@code MessageValues}
* @param headerNames the names of headers to embed
* @return the message with headers embedded in payload
*/
public static Message<byte[]> embedHeaders(MessageValues message, String... headerNames);
}
Constants and enumerations used throughout the binder framework.
/**
* Header constants for binder operations.
* <p>Every constant is a {@code String} header name; several carry the {@code scst_} prefix.
* NOTE(review): the meaning of each header is not described in this listing; confirm against
* the library Javadoc before relying on one.
*/
public class BinderHeaders {
public static final String STANDARD_HEADERS = "standardHeaders";
public static final String TARGET_DESTINATION = "scst_targetDestination";
public static final String PARTITION_HEADER = "scst_partition";
public static final String PARTITION_OVERRIDE = "scst_partitionOverride";
public static final String NATIVE_HEADERS_PRESENT = "nativeHeadersPresent";
public static final String SCST_VERSION = "scst_version";
public static final String NESTED_EXCEPTIONS_HEADER = "scst_nestedExceptions";
}
/**
* Header modes for message processing.
* <p>The constants use lower camel case names rather than upper snake case.
*/
public enum HeaderMode {
/** No headers are processed */
none,
/** Headers are processed normally */
headers,
/** Headers are embedded in message payload */
embeddedHeaders
}
Application events related to binding operations.
/**
* Event fired when a binding is created.
* <p>Signature-only listing: method bodies are omitted. Extends {@code ApplicationEvent}.
*/
public class BindingCreatedEvent extends ApplicationEvent {
/**
* Create a new binding created event.
* @param binding the binding that was created
*/
public BindingCreatedEvent(Binding<?> binding);
/**
* Get the binding that was created.
* @return the binding
*/
public Binding<?> getBinding();
}
Exception classes specific to binder operations.
/**
* General binder-related exception.
* <p>Signature-only listing: constructor bodies are omitted. Unchecked, since it extends
* {@code RuntimeException}.
*/
public class BinderException extends RuntimeException {
// Message-only constructor.
public BinderException(String message);
// Message plus underlying cause.
public BinderException(String message, Throwable cause);
}
/**
* Exception to signal message requeuing.
* <p>Signature-only listing: constructor bodies are omitted. Unchecked, since it extends
* {@code RuntimeException}.
*/
public class RequeueCurrentMessageException extends RuntimeException {
// Message-only constructor.
public RequeueCurrentMessageException(String message);
// Message plus underlying cause.
public RequeueCurrentMessageException(String message, Throwable cause);
}
Usage Examples:
import org.springframework.cloud.stream.binder.*;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.messaging.MessageChannel;
// Custom binder implementation
/**
* Example binder that extends AbstractMessageChannelBinder using the base ConsumerProperties
* and ProducerProperties types.
* <p>NOTE(review): {@code MyProvisioningProvider} and the {@code lifecycle} variable used below
* are not declared anywhere in this snippet; both are placeholders the reader must supply.
*/
public class MyCustomBinder extends AbstractMessageChannelBinder<ConsumerProperties, ProducerProperties, MyProvisioningProvider> {
@Override
protected Binding<MessageChannel> doBindConsumer(String name, String group, MessageChannel inputChannel, ConsumerProperties properties) {
// Implementation for binding consumer
// Connect inputChannel to external message source
// `lifecycle` is a placeholder for the Lifecycle argument that DefaultBinding's constructor takes.
return new DefaultBinding<>(name, group, inputChannel, lifecycle);
}
@Override
protected Binding<MessageChannel> doBindProducer(String name, MessageChannel outputChannel, ProducerProperties properties) {
// Implementation for binding producer
// Connect outputChannel to external message destination
// The producer binding passes null as the group; `lifecycle` is a placeholder as above.
return new DefaultBinding<>(name, null, outputChannel, lifecycle);
}
}
// Using BinderFactory
/**
 * Example service that obtains a binder from a BinderFactory and creates a producer binding
 * on demand.
 */
@Component
public class MessageService {
    private final BinderFactory binderFactory;

    /**
     * Create the service.
     * @param binderFactory the factory used to look up binders by name
     */
    public MessageService(BinderFactory binderFactory) {
        this.binderFactory = binderFactory;
    }

    /**
     * Bind a new channel as a producer for "my-topic" through the "kafka" binder, then unbind it.
     */
    public void createDynamicBinding() {
        // getBinder returns Binder<MessageChannel, ?, ?>. An argument cannot be passed where the
        // parameter type is a wildcard capture, so bindProducer(..., new ProducerProperties())
        // does not compile on that type. Narrow to the base property types first.
        // NOTE(review): a binder that requires a more specific properties type may reject plain
        // ProducerProperties at runtime; confirm for the binder in use.
        @SuppressWarnings("unchecked")
        Binder<MessageChannel, ConsumerProperties, ProducerProperties> binder =
                (Binder<MessageChannel, ConsumerProperties, ProducerProperties>) binderFactory.getBinder("kafka", MessageChannel.class);
        MessageChannel channel = new DirectChannel();
        Binding<MessageChannel> binding = binder.bindProducer("my-topic", channel, new ProducerProperties());
        try {
            // Use the binding...
        }
        finally {
            // Clean up, even if using the binding throws
            binding.unbind();
        }
    }
}
Install with Tessl CLI
npx tessl i tessl/maven-org-springframework-cloud--spring-cloud-stream