tessl/maven-org-springframework-cloud--spring-cloud-stream

A framework for building message-driven microservice applications on Spring Boot with Spring Integration.

- **Workspace**: tessl
- **Visibility**: Public
- **Describes**: mavenpkg:maven/org.springframework.cloud/spring-cloud-stream@4.3.x

To install, run

npx @tessl/cli install tessl/maven-org-springframework-cloud--spring-cloud-stream@4.3.0

# Spring Cloud Stream

Spring Cloud Stream is a framework for building message-driven microservice applications on Spring Boot. It provides opinionated configuration for message brokers, introducing concepts like persistent publish-subscribe semantics, consumer groups, and partitions. The framework builds upon Spring Integration to provide connectivity to message brokers such as Apache Kafka and RabbitMQ.

## Package Information

- **Package Name**: spring-cloud-stream
- **Package Type**: maven
- **Group ID**: org.springframework.cloud
- **Artifact ID**: spring-cloud-stream
- **Language**: Java
- **Installation**: Add to Maven `pom.xml`:

```xml
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
    <version>4.3.0</version>
</dependency>
```

For Gradle:

```gradle
implementation 'org.springframework.cloud:spring-cloud-stream:4.3.0'
```

## Core Imports

```java
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.cloud.stream.annotation.StreamRetryTemplate;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binding.BindingService;
import org.springframework.cloud.stream.config.BindingServiceProperties;
```

## Basic Usage

### Functional Programming Approach (Recommended)

```java
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

@SpringBootApplication
public class StreamApplication {

    // StreamBridge for dynamic message sending
    private final StreamBridge streamBridge;

    public StreamApplication(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    // Consumer function - receives messages
    @Bean
    public Consumer<String> handleInput() {
        return message -> {
            System.out.println("Received: " + message);
            // Process message
        };
    }

    // Function - transforms messages
    @Bean
    public Function<String, String> processData() {
        return input -> input.toUpperCase();
    }

    // Supplier - produces messages
    @Bean
    public Supplier<String> sendMessage() {
        return () -> "Hello from Spring Cloud Stream";
    }

    // Dynamic message sending
    public void sendDynamicMessage(String destination, Object message) {
        streamBridge.send(destination, message);
    }

    public static void main(String[] args) {
        SpringApplication.run(StreamApplication.class, args);
    }
}
```
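To see what the three function beans do independently of any broker, the sketch below wires a `Supplier`, a `Function`, and a `Consumer` together directly using plain `java.util.function` types. The class name `FunctionPipelineSketch` and the `runOnce` helper are hypothetical and not part of Spring Cloud Stream. In a real application the binder moves messages between these functions through the destinations configured for each binding, so this only illustrates the data flow.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical in-process stand-in for the broker: passes one message from a
// Supplier through a Function into a Consumer.
final class FunctionPipelineSketch {

    private FunctionPipelineSketch() {
    }

    // Produce one message, transform it, and hand the result to the consumer.
    static <I, O> void runOnce(Supplier<I> source, Function<I, O> processor, Consumer<O> sink) {
        sink.accept(processor.apply(source.get()));
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();

        // Same lambdas as the sendMessage and processData beans; the consumer
        // records the message instead of printing it so the result can be inspected.
        Supplier<String> sendMessage = () -> "Hello from Spring Cloud Stream";
        Function<String, String> processData = input -> input.toUpperCase();
        Consumer<String> handleInput = received::add;

        runOnce(sendMessage, processData, handleInput);
        System.out.println(received);
    }
}
```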

### Configuration Example

```yaml
spring:
  cloud:
    function:
      # More than one function bean is declared, so the functions to bind are listed explicitly
      definition: handleInput;processData;sendMessage
    stream:
      bindings:
        handleInput-in-0:
          destination: input-topic
          group: my-group
        processData-in-0:
          destination: process-input
        processData-out-0:
          destination: process-output
        sendMessage-out-0:
          destination: output-topic
      binders:
        kafka:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9092
```
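The binding names in this configuration follow the functional binding naming convention: `<functionName>-in-<index>` for inputs and `<functionName>-out-<index>` for outputs. As a small illustration, the hypothetical `BindingNames` helper below (not a framework class) derives those names from a function bean name.

```java
// Hypothetical helper that spells out the functional binding naming convention:
// <functionName>-in-<index> for inputs and <functionName>-out-<index> for outputs.
final class BindingNames {

    private BindingNames() {
    }

    // Name of the input binding for a Consumer or Function bean.
    static String input(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    // Name of the output binding for a Supplier or Function bean.
    static String output(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    public static void main(String[] args) {
        System.out.println(input("handleInput", 0));   // handleInput-in-0
        System.out.println(input("processData", 0));   // processData-in-0
        System.out.println(output("processData", 0));  // processData-out-0
        System.out.println(output("sendMessage", 0));  // sendMessage-out-0
    }
}
```

A `Function` bean gets both an input and an output binding, which is why `processData` appears twice in the configuration.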

## Architecture

Spring Cloud Stream is built around several key architectural components:

- **Binder Abstraction**: Pluggable middleware abstraction that connects applications to message brokers
- **Binding Framework**: Creates and manages connections between applications and message channels
- **Function-Based Programming**: Integration with Spring Cloud Function for reactive and imperative programming models
- **Message Conversion**: Automatic conversion between different message formats and content types
- **Configuration System**: Comprehensive property-based configuration for bindings and binders
- **Actuator Integration**: Health indicators and management endpoints for monitoring and control

## Capabilities

### Core Binder Framework

Foundational abstractions for connecting applications to message brokers, including binder interfaces, property configurations, and implementation support for different messaging middleware.

```java { .api }
public interface Binder<T, C extends ConsumerProperties, P extends ProducerProperties> {
    Binding<T> bindConsumer(String name, String group, T inboundBindTarget, C consumerProperties);
    Binding<T> bindProducer(String name, T outboundBindTarget, P producerProperties);
    default String getBinderIdentity() { return null; }
}

public interface Binding<T> extends Pausable {
    String getName();
    void unbind();
    State getState();
    String getBindingName();
    boolean isInput();
    String getBinderName();
}
```

[Core Binder Framework](./binder-framework.md)

### Binding Management

Centralized binding lifecycle management, proxy creation, and channel configuration for connecting application components to messaging infrastructure.

```java { .api }
public class BindingService {
    public Collection<Binding<Object>> bindConsumer(Object inputTarget, String inputName);
    public Binding<MessageChannel> bindProducer(Object outputTarget, String outputName);
    public void unbindConsumers(String inputName);
    public void unbindProducers(String outputName);
}

public interface Bindable {
    Set<String> getInputs();
    Set<String> getOutputs();
    void bindInputs(BindingService bindingService);
    void bindOutputs(BindingService bindingService);
}
```

[Binding Management](./binding-management.md)

### Function Programming Support

Modern functional programming model integration with Spring Cloud Function, providing StreamBridge for dynamic messaging and function-based message processing.

```java { .api }
public class StreamBridge implements StreamOperations {
    public boolean send(String bindingName, Object data);
    public boolean send(String bindingName, Object data, MimeType outputContentType);
    public boolean send(String bindingName, @Nullable Object data, @Nullable MimeType outputContentType, @Nullable PartitionSupport partitionSupport);
}

public interface StreamOperations {
    boolean send(String bindingName, Object data);
    boolean send(String bindingName, Object data, MimeType outputContentType);
}
```

[Function Programming Support](./function-support.md)

### Configuration System

Comprehensive configuration framework for binding properties, binder settings, and Spring Boot auto-configuration integration.

```java { .api }
@ConfigurationProperties("spring.cloud.stream")
public class BindingServiceProperties {
    private String defaultBinder;
    private Map<String, BinderProperties> binders = new HashMap<>();
    private Map<String, BindingProperties> bindings = new HashMap<>();
    private int instanceCount = 1;
    private int instanceIndex = 0;
    private boolean dynamicDestinations = true;
}

public class BindingProperties {
    private String destination;
    private String group;
    private String contentType;
    private String binder;
    private ConsumerProperties consumer = new ConsumerProperties();
    private ProducerProperties producer = new ProducerProperties();
}
```

[Configuration System](./configuration.md)

### Message Conversion

Message conversion framework for handling different content types, MIME types, and serialization formats across various messaging systems.

```java { .api }
public class CompositeMessageConverterFactory {
    public MessageConverter getMessageConverterForAllRegistered();
    public static MessageConverter getMessageConverterForType(MimeType mimeType);
}

public class ObjectStringMessageConverter extends AbstractMessageConverter {
    protected boolean supports(Class<?> clazz);
    protected Object convertFromInternal(Message<?> message, Class<?> targetClass, @Nullable Object conversionHint);
    protected Object convertToInternal(Object payload, @Nullable MessageHeaders headers, @Nullable Object conversionHint);
}
```

[Message Conversion](./message-conversion.md)

### Actuator Integration

Spring Boot Actuator integration providing health indicators, management endpoints, and monitoring capabilities for Spring Cloud Stream applications.

```java { .api }
@Endpoint(id = "bindings")
public class BindingsEndpoint {
    @WriteOperation
    public void changeState(@Selector String name, State state);

    @ReadOperation
    public Map<String, Object> queryStates();
}

@Endpoint(id = "channels")
public class ChannelsEndpoint implements ApplicationContextAware {
    @ReadOperation
    public Map<String, Object> channels();
}
```

[Actuator Integration](./actuator-integration.md)
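As a rough sketch of how the `bindings` endpoint might be called over HTTP, the example below builds (but does not send) requests with the JDK's `java.net.http` API. It assumes the default Actuator base path `/actuator` and that the endpoint has been exposed over the web (for example via `management.endpoints.web.exposure.include`). The base URL `http://localhost:8080`, the class name `BindingsEndpointRequests`, and its method names are placeholders chosen for illustration.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Sketch: builds HTTP requests against the "bindings" Actuator endpoint.
// Assumes the default base path /actuator; the base URL is a placeholder.
final class BindingsEndpointRequests {

    private BindingsEndpointRequests() {
    }

    // GET <baseUrl>/actuator/bindings, corresponding to the @ReadOperation.
    static HttpRequest queryStates(String baseUrl) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/actuator/bindings"))
                .GET()
                .build();
    }

    // POST <baseUrl>/actuator/bindings/<name> with a JSON body such as
    // {"state":"STOPPED"}, corresponding to the @WriteOperation and its @Selector.
    static HttpRequest changeState(String baseUrl, String bindingName, String state) {
        String body = "{\"state\":\"" + state + "\"}";
        return HttpRequest.newBuilder(URI.create(baseUrl + "/actuator/bindings/" + bindingName))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest stop = changeState("http://localhost:8080", "handleInput-in-0", "STOPPED");
        System.out.println(stop.method() + " " + stop.uri());
    }
}
```

The requests can be sent with `java.net.http.HttpClient` once the application is running; that step is left out here so the sketch does not depend on a live server.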

## Common Types

```java { .api }
public class ConsumerProperties {
    private int maxAttempts = 3;
    private int backOffInitialInterval = 1000;
    private int backOffMaxInterval = 10000;
    private double backOffMultiplier = 2.0;
    private boolean defaultRetryable = true;
    private int concurrency = 1;
    private boolean partitioned = false;
    private HeaderMode headerMode = HeaderMode.embeddedHeaders;
    private boolean useNativeDecoding = false;
    private boolean multiplex = false;
}

public class ProducerProperties {
    private int partitionCount = 1;
    private String partitionKeyExpression;
    private String partitionKeyExtractorName;
    private String partitionSelectorName;
    private String partitionSelectorExpression;
    private boolean partitioned = false;
    private RequiredGroups requiredGroups = new RequiredGroups();
    private HeaderMode headerMode = HeaderMode.embeddedHeaders;
    private boolean useNativeEncoding = false;
    private boolean errorChannelEnabled = false;
    private boolean sync = false;
}

public enum HeaderMode {
    none, headers, embeddedHeaders
}

public enum State {
    STARTED, STOPPED, PAUSED, RESUMED
}
```
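To make the retry and partitioning fields more concrete, the sketch below works through the arithmetic they imply. It assumes that `maxAttempts` counts the first delivery and that waits grow exponentially up to `backOffMaxInterval`. It also assumes partitions are chosen from the key's hash code modulo `partitionCount`, using `Math.floorMod` here to keep the index non-negative. The class `DefaultsSketch` and its methods are hypothetical and do not reproduce the framework's own retry or partition-selection code.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative arithmetic only; the framework delegates retry and partition
// selection to its own components, which are not reproduced here.
final class DefaultsSketch {

    private DefaultsSketch() {
    }

    // Waits (in ms) between attempts under exponential back-off. maxAttempts is
    // assumed to include the first delivery, so there are maxAttempts - 1 waits.
    static List<Long> backOffSchedule(int maxAttempts, long initialInterval,
                                      long maxInterval, double multiplier) {
        List<Long> waits = new ArrayList<>();
        double next = initialInterval;
        for (int retry = 1; retry < maxAttempts; retry++) {
            waits.add((long) Math.min(next, maxInterval)); // cap at the maximum interval
            next *= multiplier;
        }
        return waits;
    }

    // Partition index derived from the key's hash code, kept in [0, partitionCount).
    static int selectPartition(Object key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        // Defaults listed in ConsumerProperties: 3 attempts, 1000 ms, 10000 ms, x2.0
        System.out.println(backOffSchedule(3, 1000, 10000, 2.0)); // [1000, 2000]
        // More attempts show the cap taking effect
        System.out.println(backOffSchedule(6, 1000, 10000, 2.0)); // [1000, 2000, 4000, 8000, 10000]
        System.out.println(selectPartition("order-42", 4));
    }
}
```

Under these assumptions the same key always maps to the same partition, which is what lets a partitioned consumer instance see all messages for a given key.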

## Exception Handling

```java { .api }
public class BinderException extends RuntimeException {
    public BinderException(String message);
    public BinderException(String message, Throwable cause);
}

public class ConversionException extends RuntimeException {
    public ConversionException(String message, Throwable cause);
}

public class ProvisioningException extends NestedRuntimeException {
    public ProvisioningException(String message);
    public ProvisioningException(String message, Throwable cause);
}

public class RequeueCurrentMessageException extends RuntimeException {
    public RequeueCurrentMessageException(String message);
    public RequeueCurrentMessageException(String message, Throwable cause);
}
```

## Annotations

```java { .api }
@Target({ElementType.FIELD, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Bean
@Qualifier
public @interface StreamRetryTemplate {
}
```