A framework for building message-driven microservice applications on Spring Boot with Spring Integration.
npx @tessl/cli install tessl/maven-org-springframework-cloud--spring-cloud-stream@4.3.00
# Spring Cloud Stream

Spring Cloud Stream is a framework for building message-driven microservice applications on Spring Boot. It provides opinionated configuration for message brokers, introducing concepts like persistent publish-subscribe semantics, consumer groups, and partitions. The framework builds upon Spring Integration to provide connectivity to message brokers such as Apache Kafka and RabbitMQ.

## Package Information

- **Package Name**: spring-cloud-stream
- **Package Type**: maven
- **Group ID**: org.springframework.cloud
- **Artifact ID**: spring-cloud-stream
- **Language**: Java
- **Installation**: Add to Maven `pom.xml`:

```xml
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
    <version>4.3.0</version>
</dependency>
```

For Gradle:

```gradle
implementation 'org.springframework.cloud:spring-cloud-stream:4.3.0'
```

## Core Imports

```java
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.cloud.stream.annotation.StreamRetryTemplate;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binding.BindingService;
import org.springframework.cloud.stream.config.BindingServiceProperties;
```

## Basic Usage

### Functional Programming Approach (Recommended)

```java
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

@SpringBootApplication
public class StreamApplication {

    // StreamBridge for dynamic message sending
    private final StreamBridge streamBridge;

    public StreamApplication(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    // Consumer function - receives messages
    @Bean
    public Consumer<String> handleInput() {
        return message -> {
            System.out.println("Received: " + message);
            // Process message
        };
    }

    // Function - transforms messages
    @Bean
    public Function<String, String> processData() {
        return input -> input.toUpperCase();
    }

    // Supplier - produces messages
    @Bean
    public Supplier<String> sendMessage() {
        return () -> "Hello from Spring Cloud Stream";
    }

    // Dynamic message sending
    public void sendDynamicMessage(String destination, Object message) {
        streamBridge.send(destination, message);
    }

    public static void main(String[] args) {
        SpringApplication.run(StreamApplication.class, args);
    }
}
```

### Configuration Example

```yaml
spring:
  cloud:
    function:
      # Needed when several functional beans are present,
      # so the framework knows which ones to bind
      definition: handleInput;processData;sendMessage
    stream:
      bindings:
        handleInput-in-0:
          destination: input-topic
          group: my-group
        processData-in-0:
          destination: process-input
        processData-out-0:
          destination: process-output
        sendMessage-out-0:
          destination: output-topic
      binders:
        kafka:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9092
```

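The binding keys in the configuration above follow the functional naming convention `<functionName>-in-<index>` for inputs and `<functionName>-out-<index>` for outputs. The sketch below spells that convention out in plain Java. `BindingNames` is a hypothetical helper written for illustration and is not part of Spring Cloud Stream.

```java
// Illustrative helper (hypothetical, not a Spring Cloud Stream class):
// derives the binding names used as keys under spring.cloud.stream.bindings.
final class BindingNames {

    private BindingNames() {
    }

    // Input binding name: <functionName>-in-<index>
    static String input(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    // Output binding name: <functionName>-out-<index>
    static String output(String functionName, int index) {
        return functionName + "-out-" + index;
    }
}
```

For the beans in the example application, `BindingNames.input("processData", 0)` gives `processData-in-0` and `BindingNames.output("sendMessage", 0)` gives `sendMessage-out-0`, matching the keys used in the YAML.
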
## Architecture

Spring Cloud Stream is built around several key architectural components:

- **Binder Abstraction**: Pluggable middleware abstraction that connects applications to message brokers
- **Binding Framework**: Creates and manages connections between applications and message channels
- **Function-Based Programming**: Integration with Spring Cloud Function for reactive and imperative programming models
- **Message Conversion**: Automatic conversion between different message formats and content types
- **Configuration System**: Comprehensive property-based configuration for bindings and binders
- **Actuator Integration**: Health indicators and management endpoints for monitoring and control

## Capabilities

### Core Binder Framework

Foundational abstractions for connecting applications to message brokers, including binder interfaces, property configurations, and implementation support for different messaging middleware.

```java { .api }
public interface Binder<T, C extends ConsumerProperties, P extends ProducerProperties> {
    Binding<T> bindConsumer(String name, String group, T inboundBindTarget, C consumerProperties);
    Binding<T> bindProducer(String name, T outboundBindTarget, P producerProperties);
    default String getBinderIdentity() { return null; }
}

public interface Binding<T> extends Pausable {
    String getName();
    void unbind();
    State getState();
    String getBindingName();
    boolean isInput();
    String getBinderName();
}
```

[Core Binder Framework](./binder-framework.md)

### Binding Management

Centralized binding lifecycle management, proxy creation, and channel configuration for connecting application components to messaging infrastructure.

```java { .api }
public class BindingService {
    public Collection<Binding<Object>> bindConsumer(Object inputTarget, String inputName);
    public Binding<MessageChannel> bindProducer(Object outputTarget, String outputName);
    public void unbindConsumers(String inputName);
    public void unbindProducers(String outputName);
}

public interface Bindable {
    Set<String> getInputs();
    Set<String> getOutputs();
    void bindInputs(BindingService bindingService);
    void bindOutputs(BindingService bindingService);
}
```

[Binding Management](./binding-management.md)

### Function Programming Support

Modern functional programming model integration with Spring Cloud Function, providing StreamBridge for dynamic messaging and function-based message processing.

```java { .api }
public class StreamBridge implements StreamOperations {
    public boolean send(String bindingName, Object data);
    public boolean send(String bindingName, Object data, MimeType outputContentType);
    public boolean send(String bindingName, @Nullable Object data, @Nullable MimeType outputContentType, @Nullable PartitionSupport partitionSupport);
}

public interface StreamOperations {
    boolean send(String bindingName, Object data);
    boolean send(String bindingName, Object data, MimeType outputContentType);
}
```

[Function Programming Support](./function-support.md)

### Configuration System

Comprehensive configuration framework for binding properties, binder settings, and Spring Boot auto-configuration integration.

```java { .api }
@ConfigurationProperties("spring.cloud.stream")
public class BindingServiceProperties {
    private String defaultBinder;
    private Map<String, BinderProperties> binders = new HashMap<>();
    private Map<String, BindingProperties> bindings = new HashMap<>();
    private int instanceCount = 1;
    private int instanceIndex = 0;
    private boolean dynamicDestinations = true;
}

public class BindingProperties {
    private String destination;
    private String group;
    private String contentType;
    private String binder;
    private ConsumerProperties consumer = new ConsumerProperties();
    private ProducerProperties producer = new ProducerProperties();
}
```

[Configuration System](./configuration.md)

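Settings such as `instanceCount` and `instanceIndex` come into play mainly when a destination is partitioned (see `partitionCount` and `partitionKeyExpression` under Common Types). As a rough illustration of how a key can be mapped to a partition, the sketch below uses hash-modulo selection. `PartitionSketch` is a hypothetical helper, and hash-modulo is assumed here as a common strategy; the framework's own default selector may differ in detail.

```java
// Hypothetical helper illustrating hash-modulo partition selection.
// Not a Spring Cloud Stream class; the real selection logic may differ.
final class PartitionSketch {

    private PartitionSketch() {
    }

    // Maps a partition key to an index in [0, partitionCount).
    static int selectPartition(Object key, int partitionCount) {
        if (partitionCount < 1) {
            throw new IllegalArgumentException("partitionCount must be at least 1");
        }
        // floorMod keeps the result non-negative even for negative hash codes
        return Math.floorMod(key.hashCode(), partitionCount);
    }
}
```

The property worth noticing is that equal keys always land on the same partition, so all messages for one key are seen by the same consumer instance in order.
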
### Message Conversion

Message conversion framework for handling different content types, MIME types, and serialization formats across various messaging systems.

```java { .api }
public class CompositeMessageConverterFactory {
    public MessageConverter getMessageConverterForAllRegistered();
    public static MessageConverter getMessageConverterForType(MimeType mimeType);
}

public class ObjectStringMessageConverter extends AbstractMessageConverter {
    protected boolean supports(Class<?> clazz);
    protected Object convertFromInternal(Message<?> message, Class<?> targetClass, @Nullable Object conversionHint);
    protected Object convertToInternal(Object payload, @Nullable MessageHeaders headers, @Nullable Object conversionHint);
}
```

[Message Conversion](./message-conversion.md)

### Actuator Integration

Spring Boot Actuator integration providing health indicators, management endpoints, and monitoring capabilities for Spring Cloud Stream applications.

```java { .api }
@Endpoint(id = "bindings")
public class BindingsEndpoint {
    @WriteOperation
    public void changeState(@Selector String name, State state);

    @ReadOperation
    public Map<String, Object> queryStates();
}

@Endpoint(id = "channels")
public class ChannelsEndpoint implements ApplicationContextAware {
    @ReadOperation
    public Map<String, Object> channels();
}
```

[Actuator Integration](./actuator-integration.md)

## Common Types

```java { .api }
public class ConsumerProperties {
    private int maxAttempts = 3;
    private int backOffInitialInterval = 1000;
    private int backOffMaxInterval = 10000;
    private double backOffMultiplier = 2.0;
    private boolean defaultRetryable = true;
    private int concurrency = 1;
    private boolean partitioned = false;
    private HeaderMode headerMode = HeaderMode.embeddedHeaders;
    private boolean useNativeDecoding = false;
    private boolean multiplex = false;
}

public class ProducerProperties {
    private int partitionCount = 1;
    private String partitionKeyExpression;
    private String partitionKeyExtractorName;
    private String partitionSelectorName;
    private String partitionSelectorExpression;
    private boolean partitioned = false;
    private RequiredGroups requiredGroups = new RequiredGroups();
    private HeaderMode headerMode = HeaderMode.embeddedHeaders;
    private boolean useNativeEncoding = false;
    private boolean errorChannelEnabled = false;
    private boolean sync = false;
}

public enum HeaderMode {
    none, headers, embeddedHeaders
}

public enum State {
    STARTED, STOPPED, PAUSED, RESUMED
}
```

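The retry-related fields of `ConsumerProperties` (`maxAttempts`, `backOffInitialInterval`, `backOffMaxInterval`, `backOffMultiplier`) are easier to reason about with numbers. The sketch below computes the waits between attempts, assuming plain exponential backoff capped at the maximum interval and assuming `maxAttempts` includes the first delivery attempt. `BackOffSketch` is a hypothetical helper for illustration only; the actual retry behaviour is determined by the retry template the binder uses.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: computes the waits (in milliseconds) between
// consecutive attempts under capped exponential backoff.
final class BackOffSketch {

    private BackOffSketch() {
    }

    // Returns maxAttempts - 1 waits, since no wait precedes the first attempt.
    static List<Long> waits(int maxAttempts, long initialInterval, long maxInterval, double multiplier) {
        List<Long> result = new ArrayList<>();
        double next = initialInterval;
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            // Cap each wait at the configured maximum interval
            result.add((long) Math.min(next, maxInterval));
            next *= multiplier;
        }
        return result;
    }
}
```

Under these assumptions, the defaults listed above (`3`, `1000`, `10000`, `2.0`) give two waits of 1000 ms and 2000 ms. Raising `maxAttempts` to 6 yields 1000, 2000, 4000, 8000, and then 10000 ms once the cap applies.
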
## Exception Handling

```java { .api }
public class BinderException extends RuntimeException {
    public BinderException(String message);
    public BinderException(String message, Throwable cause);
}

public class ConversionException extends RuntimeException {
    public ConversionException(String message, Throwable cause);
}

public class ProvisioningException extends NestedRuntimeException {
    public ProvisioningException(String message);
    public ProvisioningException(String message, Throwable cause);
}

public class RequeueCurrentMessageException extends RuntimeException {
    public RequeueCurrentMessageException(String message);
    public RequeueCurrentMessageException(String message, Throwable cause);
}
```

## Annotations

```java { .api }
@Target({ElementType.FIELD, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Bean
@Qualifier
public @interface StreamRetryTemplate {
}
```