or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

actuator-integration.md, binder-framework.md, binding-management.md, configuration.md, function-support.md, index.md, message-conversion.md
tile.json

tessl/maven-org-springframework-cloud--spring-cloud-stream

A framework for building message-driven microservice applications on Spring Boot with Spring Integration.

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
maven — pkg:maven/org.springframework.cloud/spring-cloud-stream@4.3.x

To install, run

npx @tessl/cli install tessl/maven-org-springframework-cloud--spring-cloud-stream@4.3.0

docs/index.md

Spring Cloud Stream

Spring Cloud Stream is a framework for building message-driven microservice applications on Spring Boot. It provides opinionated configuration for message brokers, introducing concepts like persistent publish-subscribe semantics, consumer groups, and partitions. The framework builds upon Spring Integration to provide connectivity to message brokers such as Apache Kafka and RabbitMQ.

Package Information

  • Package Name: spring-cloud-stream
  • Package Type: maven
  • Group ID: org.springframework.cloud
  • Artifact ID: spring-cloud-stream
  • Language: Java
  • Installation: Add to Maven pom.xml:
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
    <version>4.3.0</version>
</dependency>

For Gradle:

implementation 'org.springframework.cloud:spring-cloud-stream:4.3.0'

Core Imports

import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.cloud.stream.annotation.StreamRetryTemplate;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binding.BindingService;
import org.springframework.cloud.stream.config.BindingServiceProperties;

Basic Usage

Functional Programming Approach (Recommended)

import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

/**
 * Sample Spring Boot entry point for the functional programming model.
 *
 * <p>Registers one {@link Consumer}, one {@link Function} and one {@link Supplier}
 * as beans, and keeps a {@link StreamBridge} for sending to a destination chosen
 * at call time. The bean method names are the prefixes of the binding names used
 * in the configuration example ({@code handleInput-in-0}, {@code processData-in-0},
 * {@code processData-out-0}, {@code sendMessage-out-0}), so they must not be renamed
 * independently of that configuration.
 */
@SpringBootApplication
public class StreamApplication {

    /** Bridge used by {@link #sendDynamicMessage(String, Object)}. */
    private final StreamBridge streamBridge;

    /**
     * @param streamBridge bridge instance stored for later on-demand sends
     */
    public StreamApplication(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    /** Boots the application context. */
    public static void main(String[] args) {
        SpringApplication.run(StreamApplication.class, args);
    }

    /**
     * Inbound handler.
     *
     * @return a consumer that prints each received payload prefixed with {@code "Received: "}
     */
    @Bean
    public Consumer<String> handleInput() {
        // Placeholder for real processing: the payload is only echoed to stdout.
        return payload -> System.out.println("Received: " + payload);
    }

    /**
     * Transformer.
     *
     * @return a function that upper-cases its input
     */
    @Bean
    public Function<String, String> processData() {
        return String::toUpperCase;
    }

    /**
     * Source.
     *
     * @return a supplier that yields the same greeting on every call
     */
    @Bean
    public Supplier<String> sendMessage() {
        final String greeting = "Hello from Spring Cloud Stream";
        return () -> greeting;
    }

    /**
     * Sends {@code message} to {@code destination} through the bridge.
     * The boolean returned by {@code StreamBridge.send} is not inspected here.
     *
     * @param destination name handed to the bridge as the send target
     * @param message     payload to send
     */
    public void sendDynamicMessage(String destination, Object message) {
        streamBridge.send(destination, message);
    }
}

Configuration Example

spring:
  cloud:
    function:
      # The sample application declares three functional beans. With more than one
      # Function/Consumer/Supplier bean present, the beans to bind must be listed
      # explicitly (';'-separated); otherwise the bindings below are not created.
      definition: handleInput;processData;sendMessage
    stream:
      # Binding names follow <beanName>-in-<index> / <beanName>-out-<index>.
      bindings:
        handleInput-in-0:
          destination: input-topic
          group: my-group
        processData-in-0:
          destination: process-input
        processData-out-0:
          destination: process-output
        sendMessage-out-0:
          destination: output-topic
      binders:
        kafka:
          type: kafka
          # Binder-scoped environment: properties here apply to this binder only.
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9092

Architecture

Spring Cloud Stream is built around several key architectural components:

  • Binder Abstraction: Pluggable middleware abstraction that connects applications to message brokers
  • Binding Framework: Creates and manages connections between applications and message channels
  • Function-Based Programming: Integration with Spring Cloud Function for reactive and imperative programming models
  • Message Conversion: Automatic conversion between different message formats and content types
  • Configuration System: Comprehensive property-based configuration for bindings and binders
  • Actuator Integration: Health indicators and management endpoints for monitoring and control

Capabilities

Core Binder Framework

Foundational abstractions for connecting applications to message brokers, including binder interfaces, property configurations, and implementation support for different messaging middleware.

/**
 * Binder contract from the core binder framework (signature listing).
 *
 * @param <T> type of the bind target handed to the binder
 * @param <C> consumer-side properties type, bounded by {@code ConsumerProperties}
 * @param <P> producer-side properties type, bounded by {@code ProducerProperties}
 */
public interface Binder<T, C extends ConsumerProperties, P extends ProducerProperties> {
    /** Binds {@code inboundBindTarget} as a consumer under {@code name} and {@code group}, using {@code consumerProperties}; returns the resulting binding. */
    Binding<T> bindConsumer(String name, String group, T inboundBindTarget, C consumerProperties);
    /** Binds {@code outboundBindTarget} as a producer under {@code name}, using {@code producerProperties}; returns the resulting binding. */
    Binding<T> bindProducer(String name, T outboundBindTarget, P producerProperties);
    /** Default implementation returns {@code null}; implementations may override to supply an identity. */
    default String getBinderIdentity() { return null; }
}

/**
 * Handle returned by {@code Binder.bindConsumer} / {@code Binder.bindProducer}
 * (signature listing). Extends {@code Pausable}.
 *
 * @param <T> type of the bound target
 */
public interface Binding<T> extends Pausable {
    /** @return a name for this binding (how it differs from {@link #getBindingName()} is not shown here — confirm upstream) */
    String getName();
    /** Releases this binding; exact teardown semantics are not shown here — confirm against the binder implementation. */
    void unbind();
    /** @return the current state; presumably the {@code State} enum listed under Common Types — confirm */
    State getState();
    /** @return the binding name */
    String getBindingName();
    /** @return {@code true} when this is an input binding */
    boolean isInput();
    /** @return the name of the binder that owns this binding */
    String getBinderName();
}

Full reference: Core Binder Framework (docs/binder-framework.md)

Binding Management

Centralized binding lifecycle management, proxy creation, and channel configuration for connecting application components to messaging infrastructure.

/**
 * Binding service API sketch: signatures only, method bodies are omitted in this listing.
 */
public class BindingService {
    /** Binds {@code inputTarget} as a consumer under {@code inputName}; returns the bindings created (a collection, so possibly more than one). */
    public Collection<Binding<Object>> bindConsumer(Object inputTarget, String inputName);
    /** Binds {@code outputTarget} as a producer under {@code outputName}; returns the single resulting binding. */
    public Binding<MessageChannel> bindProducer(Object outputTarget, String outputName);
    /** Unbinds the consumer binding(s) registered under {@code inputName}. */
    public void unbindConsumers(String inputName);
    /** Unbinds the producer binding(s) registered under {@code outputName}. */
    public void unbindProducers(String outputName);
}

/**
 * Component that exposes named inputs and outputs and can bind them through a
 * {@code BindingService} (signature listing).
 * NOTE(review): method names should be confirmed against the 4.3.0 sources.
 */
public interface Bindable {
    /** @return the names of this component's inputs */
    Set<String> getInputs();
    /** @return the names of this component's outputs */
    Set<String> getOutputs();
    /** Binds the inputs using {@code bindingService}. */
    void bindInputs(BindingService bindingService);
    /** Binds the outputs using {@code bindingService}. */
    void bindOutputs(BindingService bindingService);
}

Full reference: Binding Management (docs/binding-management.md)

Function Programming Support

Modern functional programming model integration with Spring Cloud Function, providing StreamBridge for dynamic messaging and function-based message processing.

/**
 * Entry point for sending data to a binding chosen at call time (signature listing;
 * bodies omitted). Implements {@code StreamOperations}.
 */
public class StreamBridge implements StreamOperations {
    /** Sends {@code data} to {@code bindingName}. The boolean presumably reports whether the send succeeded — confirm upstream. */
    public boolean send(String bindingName, Object data);
    /** Same as the two-argument form, with an explicit output content type. */
    public boolean send(String bindingName, Object data, MimeType outputContentType);
    /** Most general form: {@code data}, {@code outputContentType} and {@code partitionSupport} are all declared nullable. */
    public boolean send(String bindingName, @Nullable Object data, @Nullable MimeType outputContentType, @Nullable PartitionSupport partitionSupport);
}

/**
 * Send operations implemented by {@code StreamBridge} (signature listing).
 */
public interface StreamOperations {
    /** Sends {@code data} to {@code bindingName}. */
    boolean send(String bindingName, Object data);
    /** Sends {@code data} to {@code bindingName} with an explicit output content type. */
    boolean send(String bindingName, Object data, MimeType outputContentType);
}

Full reference: Function Programming Support (docs/function-support.md)

Configuration System

Comprehensive configuration framework for binding properties, binder settings, and Spring Boot auto-configuration integration.

/**
 * Properties bound from the {@code spring.cloud.stream} prefix.
 * The listing shows fields and initial values only; accessors are omitted.
 * NOTE(review): field types and defaults should be confirmed against the 4.3.0 sources.
 */
@ConfigurationProperties("spring.cloud.stream")
public class BindingServiceProperties {
    // Binder name; presumably the one used when a binding does not name its own — confirm.
    private String defaultBinder;
    // Binder configurations keyed by binder name (the "binders" section of the YAML example).
    private Map<String, BinderProperties> binders = new HashMap<>();
    // Per-binding configuration keyed by binding name (the "bindings" section of the YAML example).
    private Map<String, BindingProperties> bindings = new HashMap<>();
    // Starts at 1.
    private int instanceCount = 1;
    // Starts at 0.
    private int instanceIndex = 0;
    // Starts as true.
    private boolean dynamicDestinations = true;
}

/**
 * Configuration of a single binding (an entry under {@code spring.cloud.stream.bindings}).
 * The listing shows fields only; accessors are omitted.
 */
public class BindingProperties {
    // Target destination, e.g. "input-topic" in the configuration example.
    private String destination;
    // Consumer group, e.g. "my-group" in the configuration example.
    private String group;
    // Content type for this binding.
    private String contentType;
    // Name of the binder to use for this binding.
    private String binder;
    // Consumer-side settings; initialised to a fresh ConsumerProperties.
    private ConsumerProperties consumer = new ConsumerProperties();
    // Producer-side settings; initialised to a fresh ProducerProperties.
    private ProducerProperties producer = new ProducerProperties();
}

Full reference: Configuration System (docs/configuration.md)

Message Conversion

Message conversion framework for handling different content types, MIME types, and serialization formats across various messaging systems.

/**
 * Factory for {@code MessageConverter} instances (signature listing; bodies omitted).
 * NOTE(review): confirm these signatures against the 4.3.0 sources.
 */
public class CompositeMessageConverterFactory {
    /** @return a converter covering all registered converters */
    public MessageConverter getMessageConverterForAllRegistered();
    /** @return a converter for the given {@code mimeType} */
    public static MessageConverter getMessageConverterForType(MimeType mimeType);
}

/**
 * Message converter built on {@code AbstractMessageConverter} (signature listing;
 * bodies omitted). Only the three protected hooks are shown.
 */
public class ObjectStringMessageConverter extends AbstractMessageConverter {
    /** @return whether this converter handles payloads of type {@code clazz} */
    protected boolean supports(Class<?> clazz);
    /** Inbound direction: converts {@code message} towards {@code targetClass}; {@code conversionHint} may be null. */
    protected Object convertFromInternal(Message<?> message, Class<?> targetClass, @Nullable Object conversionHint);
    /** Outbound direction: converts {@code payload}; {@code headers} and {@code conversionHint} may be null. */
    protected Object convertToInternal(Object payload, @Nullable MessageHeaders headers, @Nullable Object conversionHint);
}

Full reference: Message Conversion (docs/message-conversion.md)

Actuator Integration

Spring Boot Actuator integration providing health indicators, management endpoints, and monitoring capabilities for Spring Cloud Stream applications.

/**
 * Actuator endpoint registered under the id {@code bindings} (signature listing;
 * bodies omitted).
 */
@Endpoint(id = "bindings")
public class BindingsEndpoint {
    /** Write operation: changes the state of the binding selected by {@code name} to {@code state}. */
    @WriteOperation
    public void changeState(@Selector String name, State state);
    
    /** Read operation: returns the queried states as a map. */
    @ReadOperation
    public Map<String, Object> queryStates();
}

/**
 * Actuator endpoint registered under the id {@code channels} (signature listing;
 * bodies omitted). Implements {@code ApplicationContextAware}.
 */
@Endpoint(id = "channels")
public class ChannelsEndpoint implements ApplicationContextAware {
    /** Read operation: returns channel information as a map. */
    @ReadOperation
    public Map<String, Object> channels();
}

Full reference: Actuator Integration (docs/actuator-integration.md)

Common Types

/**
 * Consumer-side binding settings. The listing shows fields and initial values only;
 * accessors are omitted.
 * NOTE(review): defaults (in particular headerMode) should be confirmed against the 4.3.0 sources.
 */
public class ConsumerProperties {
    // Retry-related settings: attempt limit plus back-off interval bounds and multiplier
    // (interval units are not stated here — presumably milliseconds; confirm).
    private int maxAttempts = 3;
    private int backOffInitialInterval = 1000;
    private int backOffMaxInterval = 10000;
    private double backOffMultiplier = 2.0;
    private boolean defaultRetryable = true;
    // Starts at 1.
    private int concurrency = 1;
    // Partitioning is off unless enabled.
    private boolean partitioned = false;
    // One of the HeaderMode constants (none, headers, embeddedHeaders).
    private HeaderMode headerMode = HeaderMode.embeddedHeaders;
    // Native decoding is off unless enabled.
    private boolean useNativeDecoding = false;
    // Multiplexing is off unless enabled.
    private boolean multiplex = false;
}

/**
 * Producer-side binding settings. The listing shows fields and initial values only;
 * accessors are omitted.
 * NOTE(review): field types (e.g. partitionKeyExpression, requiredGroups) and defaults
 * should be confirmed against the 4.3.0 sources.
 */
public class ProducerProperties {
    // Partitioning settings: partition count, key expression/extractor and selector name/expression.
    private int partitionCount = 1;
    private String partitionKeyExpression;
    private String partitionKeyExtractorName;
    private String partitionSelectorName;
    private String partitionSelectorExpression;
    private boolean partitioned = false;
    // Initialised to an empty RequiredGroups instance.
    private RequiredGroups requiredGroups = new RequiredGroups();
    // One of the HeaderMode constants (none, headers, embeddedHeaders).
    private HeaderMode headerMode = HeaderMode.embeddedHeaders;
    // Native encoding is off unless enabled.
    private boolean useNativeEncoding = false;
    // Error channel is off unless enabled.
    private boolean errorChannelEnabled = false;
    // Synchronous mode is off unless enabled.
    private boolean sync = false;
}

/**
 * Header handling modes referenced by the {@code headerMode} field of
 * {@code ConsumerProperties} and {@code ProducerProperties}.
 * Constant names are lower camel case as listed.
 */
public enum HeaderMode {
    none, headers, embeddedHeaders
}

/**
 * Binding states; the type taken by {@code BindingsEndpoint.changeState}.
 */
public enum State {
    STARTED, STOPPED, PAUSED, RESUMED
}

Exception Handling

/**
 * Unchecked exception (extends {@code RuntimeException}); constructor listing only.
 */
public class BinderException extends RuntimeException {
    /** @param message detail message */
    public BinderException(String message);
    /** @param message detail message
     *  @param cause   underlying cause */
    public BinderException(String message, Throwable cause);
}

/**
 * Unchecked exception (extends {@code RuntimeException}); constructor listing only.
 * Only the message-plus-cause constructor is shown.
 */
public class ConversionException extends RuntimeException {
    /** @param message detail message
     *  @param cause   underlying cause */
    public ConversionException(String message, Throwable cause);
}

/**
 * Unchecked exception extending {@code NestedRuntimeException}; constructor listing only.
 */
public class ProvisioningException extends NestedRuntimeException {
    /** @param message detail message */
    public ProvisioningException(String message);
    /** @param message detail message
     *  @param cause   underlying cause */
    public ProvisioningException(String message, Throwable cause);
}

/**
 * Unchecked exception (extends {@code RuntimeException}); constructor listing only.
 * Presumably thrown by a handler to ask for the current message to be requeued — confirm upstream.
 */
public class RequeueCurrentMessageException extends RuntimeException {
    /** @param message detail message */
    public RequeueCurrentMessageException(String message);
    /** @param message detail message
     *  @param cause   underlying cause */
    public RequeueCurrentMessageException(String message, Throwable cause);
}

Annotations

/**
 * Marker annotation with no members. As listed, it can be placed on fields and
 * methods, is retained at runtime, and is itself meta-annotated with {@code @Bean}
 * and {@code @Qualifier}.
 * NOTE(review): the full set of targets should be confirmed against the 4.3.0 sources.
 */
@Target({ElementType.FIELD, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Bean
@Qualifier
public @interface StreamRetryTemplate {
}