A framework for building message-driven microservice applications on Spring Boot with Spring Integration.
npx @tessl/cli install tessl/maven-org-springframework-cloud--spring-cloud-stream@4.3.0

Spring Cloud Stream is a framework for building message-driven microservice applications on Spring Boot. It provides opinionated configuration for message brokers, introducing concepts like persistent publish-subscribe semantics, consumer groups, and partitions. The framework builds upon Spring Integration to provide connectivity to message brokers such as Apache Kafka and RabbitMQ.
pom.xml:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
<version>4.3.0</version>
</dependency>

For Gradle:
implementation 'org.springframework.cloud:spring-cloud-stream:4.3.0'

import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.cloud.stream.annotation.StreamRetryTemplate;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binding.BindingService;
import org.springframework.cloud.stream.config.BindingServiceProperties;

import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import java.util.Locale;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
/**
 * Example Spring Boot application showing the functional programming model:
 * one {@link Consumer}, one {@link Function} and one {@link Supplier} bean,
 * plus {@code StreamBridge} for sending to a destination chosen at runtime.
 *
 * <p>The bean method names are what the accompanying configuration refers to
 * in its binding names (for example {@code handleInput-in-0},
 * {@code processData-out-0}), so renaming a bean method requires renaming the
 * matching binding keys.
 */
@SpringBootApplication
public class StreamApplication {

    // StreamBridge for dynamic message sending
    private final StreamBridge streamBridge;

    /**
     * @param streamBridge bridge used by {@link #sendDynamicMessage(String, Object)}
     */
    public StreamApplication(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    /**
     * Consumer function - receives messages.
     *
     * @return a consumer that prints every received payload to standard output
     */
    @Bean
    public Consumer<String> handleInput() {
        return message -> {
            System.out.println("Received: " + message);
            // Process message
        };
    }

    /**
     * Function - transforms messages.
     *
     * @return a function that upper-cases its input
     */
    @Bean
    public Function<String, String> processData() {
        // Locale.ROOT keeps the result independent of the JVM default locale
        // (e.g. under a Turkish locale "i" would otherwise become "İ").
        return input -> input.toUpperCase(Locale.ROOT);
    }

    /**
     * Supplier - produces messages.
     *
     * @return a supplier that always yields the same greeting
     */
    @Bean
    public Supplier<String> sendMessage() {
        return () -> "Hello from Spring Cloud Stream";
    }

    /**
     * Dynamic message sending: forwards {@code message} to {@code destination}
     * through the injected {@code StreamBridge}.
     *
     * <p>The boolean returned by {@code StreamBridge.send} is intentionally not
     * propagated, matching the original fire-and-forget signature.
     *
     * @param destination name passed to {@code StreamBridge.send} as its first argument
     * @param message     payload to send
     */
    public void sendDynamicMessage(String destination, Object message) {
        streamBridge.send(destination, message);
    }

    public static void main(String[] args) {
        SpringApplication.run(StreamApplication.class, args);
    }
}
spring:
  cloud:
    stream:
      bindings:
        handleInput-in-0:
          destination: input-topic
          group: my-group
        processData-in-0:
          destination: process-input
        processData-out-0:
          destination: process-output
        sendMessage-out-0:
          destination: output-topic
      binders:
        kafka:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9092

Spring Cloud Stream is built around several key architectural components:
Foundational abstractions for connecting applications to message brokers, including binder interfaces, property configurations, and implementation support for different messaging middleware.
/**
 * Binder contract, one of the foundational abstractions for connecting an
 * application to a message broker.
 *
 * @param <T> type of the bind target handed to both bind methods
 * @param <C> consumer properties type (upper bound: ConsumerProperties)
 * @param <P> producer properties type (upper bound: ProducerProperties)
 */
public interface Binder<T, C extends ConsumerProperties, P extends ProducerProperties> {
// Consumer side: takes a name, a group, the inbound target and consumer properties; yields a Binding<T>.
Binding<T> bindConsumer(String name, String group, T inboundBindTarget, C consumerProperties);
// Producer side: same shape as bindConsumer minus the group parameter.
Binding<T> bindProducer(String name, T outboundBindTarget, P producerProperties);
// The default implementation returns null, so implementers are not forced to supply an identity.
default String getBinderIdentity() { return null; }
}
/**
 * Represents a single binding; extends Pausable.
 * NOTE(review): the member notes below are inferred from names only — confirm against the library Javadoc.
 *
 * @param <T> type parameter matching the bind target type used by Binder
 */
public interface Binding<T> extends Pausable {
String getName();
// Presumably releases this binding — TODO confirm.
void unbind();
State getState();
String getBindingName();
// Presumably true for input (consumer-side) bindings — TODO confirm.
boolean isInput();
String getBinderName();
}

Centralized binding lifecycle management, proxy creation, and channel configuration for connecting application components to messaging infrastructure.
/**
 * Service offering bind and unbind operations for named inputs and outputs.
 * Only the public signatures are listed; method bodies are not shown.
 */
public class BindingService {
// Returns a collection of bindings, so a single input name may map to more than one binding.
public Collection<Binding<Object>> bindConsumer(Object inputTarget, String inputName);
// Returns one binding, typed to MessageChannel.
public Binding<MessageChannel> bindProducer(Object outputTarget, String outputName);
// Unbind counterparts, addressed by the same input/output names used when binding.
public void unbindConsumers(String inputName);
public void unbindProducers(String outputName);
}
/**
 * Component that exposes named inputs and outputs and can bind them through
 * a BindingService.
 */
public interface Bindable {
// Names of the inputs / outputs this component exposes.
Set<String> getInputs();
Set<String> getOutputs();
// Bind this component's inputs / outputs using the supplied service.
void bindInputs(BindingService bindingService);
void bindOutputs(BindingService bindingService);
}

Modern functional programming model integration with Spring Cloud Function, providing StreamBridge for dynamic messaging and function-based message processing.
/**
 * StreamOperations implementation used for dynamic messaging.
 * All overloads return a boolean.
 * NOTE(review): presumably true means the send was accepted — confirm against the library Javadoc.
 */
public class StreamBridge implements StreamOperations {
// Sends data to the binding identified by bindingName.
public boolean send(String bindingName, Object data);
// As above, with an explicit output content type.
public boolean send(String bindingName, Object data, MimeType outputContentType);
// Widest overload: data, content type and partition support are all declared @Nullable.
public boolean send(String bindingName, @Nullable Object data, @Nullable MimeType outputContentType, @Nullable PartitionSupport partitionSupport);
}
/**
 * Minimal sending contract implemented by StreamBridge: the two- and
 * three-argument send overloads.
 */
public interface StreamOperations {
// Sends data to the binding identified by bindingName.
boolean send(String bindingName, Object data);
// As above, with an explicit output content type.
boolean send(String bindingName, Object data, MimeType outputContentType);
}

Comprehensive configuration framework for binding properties, binder settings, and Spring Boot auto-configuration integration.
/**
 * Configuration properties bound under the "spring.cloud.stream" prefix.
 * Field initialisers shown here are the defaults used when nothing is configured.
 */
@ConfigurationProperties("spring.cloud.stream")
public class BindingServiceProperties {
// No initialiser, so this is null unless configured.
private String defaultBinder;
// Per-name binder and binding settings; both maps start empty.
private Map<String, BinderProperties> binders = new HashMap<>();
private Map<String, BindingProperties> bindings = new HashMap<>();
// Defaults: instance count 1, instance index 0.
private int instanceCount = 1;
private int instanceIndex = 0;
// NOTE(review): listed here as a boolean flag; verify the type against the library source,
// as upstream may model this as a list of destination names.
private boolean dynamicDestinations = true;
}
/**
 * Settings for a single binding: destination, group, content type, binder
 * name, and nested consumer and producer properties.
 */
public class BindingProperties {
// These four have no initialiser, so they are null unless configured.
private String destination;
private String group;
private String contentType;
private String binder;
// Nested settings are always present, created with their own defaults.
private ConsumerProperties consumer = new ConsumerProperties();
private ProducerProperties producer = new ProducerProperties();
}

Message conversion framework for handling different content types, MIME types, and serialization formats across various messaging systems.
/**
 * Factory for MessageConverter instances.
 */
public class CompositeMessageConverterFactory {
// Instance method: one converter covering all registered converters (as the name states).
public MessageConverter getMessageConverterForAllRegistered();
// Static method: converter selected by MIME type.
public static MessageConverter getMessageConverterForType(MimeType mimeType);
}
/**
 * AbstractMessageConverter subclass.
 * NOTE(review): by its name this converts between objects and strings — confirm against the library Javadoc.
 * Only the three protected hooks are listed; their bodies are not shown.
 */
public class ObjectStringMessageConverter extends AbstractMessageConverter {
protected boolean supports(Class<?> clazz);
// Inbound direction: message to target class; conversionHint may be null.
protected Object convertFromInternal(Message<?> message, Class<?> targetClass, @Nullable Object conversionHint);
// Outbound direction: payload to message content; headers and conversionHint may be null.
protected Object convertToInternal(Object payload, @Nullable MessageHeaders headers, @Nullable Object conversionHint);
}

Spring Boot Actuator integration providing health indicators, management endpoints, and monitoring capabilities for Spring Cloud Stream applications.
/**
 * Actuator endpoint registered under the id "bindings".
 */
@Endpoint(id = "bindings")
public class BindingsEndpoint {
// Write operation: takes a binding name (a path selector) and the requested State.
@WriteOperation
public void changeState(@Selector String name, State state);
// Read operation: returns a map keyed by String.
@ReadOperation
public Map<String, Object> queryStates();
}
/**
 * Actuator endpoint registered under the id "channels"; implements
 * ApplicationContextAware.
 */
@Endpoint(id = "channels")
public class ChannelsEndpoint implements ApplicationContextAware {
// Read operation: returns a map keyed by String.
@ReadOperation
public Map<String, Object> channels();
}

/**
 * Consumer-side binding settings. Field initialisers are the defaults.
 */
public class ConsumerProperties {
// Retry settings: attempt limit plus back-off initial interval, cap and multiplier.
// NOTE(review): intervals are presumably milliseconds — confirm.
private int maxAttempts = 3;
private int backOffInitialInterval = 1000;
private int backOffMaxInterval = 10000;
private double backOffMultiplier = 2.0;
private boolean defaultRetryable = true;
private int concurrency = 1;
private boolean partitioned = false;
// Same default header mode as ProducerProperties.
private HeaderMode headerMode = HeaderMode.embeddedHeaders;
private boolean useNativeDecoding = false;
private boolean multiplex = false;
}
/**
 * Producer-side binding settings. Field initialisers are the defaults.
 */
public class ProducerProperties {
// Partitioning settings: count defaults to 1; the key/selector fields have no initialiser (null unless configured).
private int partitionCount = 1;
private String partitionKeyExpression;
private String partitionKeyExtractorName;
private String partitionSelectorName;
private String partitionSelectorExpression;
private boolean partitioned = false;
private RequiredGroups requiredGroups = new RequiredGroups();
// Same default header mode as ConsumerProperties.
private HeaderMode headerMode = HeaderMode.embeddedHeaders;
// The three flags below all default to off.
private boolean useNativeEncoding = false;
private boolean errorChannelEnabled = false;
private boolean sync = false;
}
/**
 * Header handling modes. ConsumerProperties and ProducerProperties both
 * default to embeddedHeaders.
 */
public enum HeaderMode {
// Constants are lower camel case; keep the spelling exact when referencing them.
none, headers, embeddedHeaders
}
/**
 * Binding lifecycle states, used by Binding.getState() and
 * BindingsEndpoint.changeState(...).
 */
public enum State {
STARTED, STOPPED, PAUSED, RESUMED
}

/**
 * Unchecked exception (extends RuntimeException) with message-only and
 * message-plus-cause constructors.
 * NOTE(review): by its name, raised for binder failures — confirm.
 */
public class BinderException extends RuntimeException {
public BinderException(String message);
public BinderException(String message, Throwable cause);
}
/**
 * Unchecked exception (extends RuntimeException). Only the message-plus-cause
 * constructor is listed, so a cause is always supplied.
 * NOTE(review): by its name, raised for message conversion failures — confirm.
 */
public class ConversionException extends RuntimeException {
public ConversionException(String message, Throwable cause);
}
/**
 * Exception extending NestedRuntimeException, unlike the other exceptions
 * listed here, which extend RuntimeException directly.
 * NOTE(review): by its name, raised for provisioning failures — confirm.
 */
public class ProvisioningException extends NestedRuntimeException {
public ProvisioningException(String message);
public ProvisioningException(String message, Throwable cause);
}
/**
 * Unchecked exception (extends RuntimeException) with message-only and
 * message-plus-cause constructors.
 * NOTE(review): by its name, signals that the current message should be requeued — confirm.
 */
public class RequeueCurrentMessageException extends RuntimeException {
public RequeueCurrentMessageException(String message);
public RequeueCurrentMessageException(String message, Throwable cause);
}

/**
 * Marker annotation with no members. It may be placed on fields and methods,
 * is retained at runtime, and is itself annotated with @Bean and @Qualifier.
 */
@Target({ElementType.FIELD, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Bean
@Qualifier
public @interface StreamRetryTemplate {
}