or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

actuator-integration.md binder-framework.md binding-management.md configuration.md function-support.md index.md message-conversion.md

docs/function-support.md

0

# Functional Programming Support

1

2

Spring Cloud Stream's functional programming support integrates with Spring Cloud Function to provide a modern, function-based programming model. It covers StreamBridge for dynamic messaging, function-based message processing, and interoperability between the imperative and reactive programming models.

3

4

## Capabilities

5

6

### StreamBridge

7

8

Central component for sending messages to output bindings from external sources, enabling dynamic destination routing and type conversion.

9

10

```java { .api }

11

/**

12

* Bridge for sending data to output bindings from external sources.

13

* Supports dynamic destinations, type conversion, and partitioning.

14

*/

15

public class StreamBridge implements StreamOperations, ApplicationContextAware, BeanNameAware {

16

17

/**

18

* Send data to a binding.

19

* @param bindingName the name of the binding

20

* @param data the data to send

21

* @return true if the message was sent successfully

22

*/

23

public boolean send(String bindingName, Object data);

24

25

/**

26

* Send data to a binding with specified content type.

27

* @param bindingName the name of the binding

28

* @param data the data to send

29

* @param outputContentType the content type for the message

30

* @return true if the message was sent successfully

31

*/

32

public boolean send(String bindingName, Object data, MimeType outputContentType);

33

34

/**

35

* Send data to a binding with full control over content type and binder selection.

36

* @param bindingName the name of the binding

37

* @param binderName the specific binder to use (can be null for default)

38

* @param data the data to send

39

* @param outputContentType the content type for the message

40

* @return true if the message was sent successfully

41

*/

42

public boolean send(String bindingName, @Nullable String binderName, Object data, MimeType outputContentType);

43

44

public void setApplicationContext(ApplicationContext applicationContext);

45

public void setBeanName(String name);

46

}

47

48

/**

49

* Basic contract for StreamBridge operations.

50

*/

51

public interface StreamOperations {

52

53

/**

54

* Send data to a binding.

55

* @param bindingName the name of the binding

56

* @param data the data to send

57

* @return true if the message was sent successfully

58

*/

59

boolean send(String bindingName, Object data);

60

61

/**

62

* Send data to a binding with specified content type.

63

* @param bindingName the name of the binding

64

* @param data the data to send

65

* @param outputContentType the content type for the message

66

* @return true if the message was sent successfully

67

*/

68

boolean send(String bindingName, Object data, MimeType outputContentType);

69

}

70

```

71

72

### Function Configuration

73

74

Main configuration class for function-based message processing.

75

76

```java { .api }

77

/**

78

* Main configuration for function-based message processing.

79

* Integrates with Spring Cloud Function catalog.

80

*/

81

@Configuration

82

@EnableConfigurationProperties({StreamFunctionProperties.class})

83

public class FunctionConfiguration implements ApplicationContextAware, EnvironmentAware {

84

85

/**

86

* Creates the primary StreamBridge bean.

87

* @return configured StreamBridge instance

88

*/

89

@Bean

90

public StreamBridge streamBridge();

91

92

/**

93

* Creates function catalog for discovering and managing functions.

94

* @return function catalog instance

95

*/

96

@Bean

97

public FunctionCatalog functionCatalog();

98

99

/**

100

* Creates function inspector for analyzing function signatures.

101

* @return function inspector instance

102

*/

103

@Bean

104

public FunctionInspector functionInspector();

105

106

public void setApplicationContext(ApplicationContext applicationContext);

107

public void setEnvironment(Environment environment);

108

}

109

```

110

111

### Function Properties

112

113

Configuration properties for function-based bindings.

114

115

```java { .api }

116

/**

117

* Properties for stream function configuration.

118

*/

119

@ConfigurationProperties(prefix = "spring.cloud.stream.function")

120

public class StreamFunctionProperties {

121

122

/**

123

* Definition of functions to bind.

124

*/

125

private String definition;

126

127

/**

128

* Whether to use the functional model.

129

*/

130

private boolean autoStartup = true;

131

132

/**

133

* Routing expression for dynamic function routing.

134

*/

135

private String routingExpression;

136

137

public String getDefinition();

138

public void setDefinition(String definition);

139

140

public boolean isAutoStartup();

141

public void setAutoStartup(boolean autoStartup);

142

143

public String getRoutingExpression();

144

public void setRoutingExpression(String routingExpression);

145

}

146

147

/**

148

* Configuration properties for function binding details.

149

*/

150

public class StreamFunctionConfigurationProperties {

151

152

private final Map<String, String> bindings = new HashMap<>();

153

private final Map<String, String> definition = new HashMap<>();

154

155

/**

156

* Get function to binding name mappings.

157

* @return map of function names to binding names

158

*/

159

public Map<String, String> getBindings();

160

161

/**

162

* Get function definitions.

163

* @return map of function definitions

164

*/

165

public Map<String, String> getDefinition();

166

}

167

```

168

169

### Function Proxy Factory

170

171

Factory for creating function-aware bindable proxies.

172

173

```java { .api }

174

/**

175

* Factory for creating bindable function proxies.

176

* Extends BindableProxyFactory with function-specific capabilities.

177

*/

178

public class BindableFunctionProxyFactory extends BindableProxyFactory implements BeanFactoryAware, InitializingBean {

179

180

private final FunctionCatalog functionCatalog;

181

private final StreamFunctionProperties functionProperties;

182

183

public BindableFunctionProxyFactory(Class<?> type, FunctionCatalog functionCatalog, StreamFunctionProperties functionProperties);

184

185

protected Object createBindableProxy();

186

187

/**

188

* Resolve function binding names from function definitions.

189

* @return map of resolved binding names

190

*/

191

protected Map<String, String> resolveFunctionBindings();

192

193

public void setBeanFactory(BeanFactory beanFactory);

194

public void afterPropertiesSet();

195

}

196

```

197

198

### Function Wrappers and Support

199

200

Support classes for function processing and partitioning.

201

202

```java { .api }

203

/**

204

* Wrapper that adds partition awareness to functions.

205

*/

206

public class PartitionAwareFunctionWrapper implements Function<Object, Object>, ApplicationContextAware {

207

208

private final Function<Object, Object> function;

209

private final ProducerProperties producerProperties;

210

private ApplicationContext applicationContext;

211

212

public PartitionAwareFunctionWrapper(Function<Object, Object> function, ProducerProperties producerProperties);

213

214

/**

215

* Apply the function with partition awareness.

216

* @param input the input object

217

* @return the function result with partition information

218

*/

219

public Object apply(Object input);

220

221

public void setApplicationContext(ApplicationContext applicationContext);

222

}

223

224

/**

225

* Initializer for pollable sources in function context.

226

*/

227

public class PollableSourceInitializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {

228

229

/**

230

* Initialize pollable sources for function-based applications.

231

* @param applicationContext the application context to initialize

232

*/

233

public void initialize(ConfigurableApplicationContext applicationContext);

234

}

235

236

/**

237

* Environment post processor for routing function configuration.

238

*/

239

public class RoutingFunctionEnvironmentPostProcessor implements EnvironmentPostProcessor {

240

241

/**

242

* Post process the environment to add routing function configuration.

243

* @param environment the environment to post process

244

* @param application the Spring application

245

*/

246

public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application);

247

}

248

```

249

250

### Batch Processing Utilities

251

252

Utilities for handling batch processing in stream functions.

253

254

```java { .api }

255

/**

256

* Utilities for batch processing in stream functions.

257

*/

258

public class StandardBatchUtils {

259

260

/**

261

* Check if batch processing is enabled for the current context.

262

* @return true if batch processing is enabled

263

*/

264

public static boolean isBatchEnabled();

265

266

/**

267

* Extract individual items from a batch message.

268

* @param batchMessage the batch message

269

* @return list of individual items

270

*/

271

public static List<Object> extractBatchItems(Object batchMessage);

272

273

/**

274

* Create a batch message from individual items.

275

* @param items the individual items

276

* @return the batch message

277

*/

278

public static Object createBatchMessage(List<Object> items);

279

280

/**

281

* Check if an object represents a batch.

282

* @param object the object to check

283

* @return true if the object is a batch

284

*/

285

public static boolean isBatch(Object object);

286

}

287

```

288

289

### Function Constants

290

291

Constants used in function processing.

292

293

```java { .api }

294

/**

295

* Constants for function processing.

296

*/

297

public class FunctionConstants {

298

299

/**

300

* Delimiter used in function composition.

301

*/

302

public static final String DELIMITER = "|";

303

304

/**

305

* Default suffix for output bindings.

306

*/

307

public static final String DEFAULT_OUTPUT_SUFFIX = "-out-";

308

309

/**

310

* Default suffix for input bindings.

311

*/

312

public static final String DEFAULT_INPUT_SUFFIX = "-in-";

313

314

/**

315

* Header name for function name.

316

*/

317

public static final String FUNCTION_NAME_HEADER = "spring.cloud.function.definition";

318

319

/**

320

* Default function name for routing.

321

*/

322

public static final String DEFAULT_FUNCTION_NAME = "functionRouter";

323

}

324

```

325

326

### Partition Support

327

328

Support classes for partitioning in function contexts.

329

330

```java { .api }

331

/**

332

* Support for partitioning in function contexts.

333

*/

334

public class PartitionSupport {

335

336

private final String partitionKeyExpression;

337

private final String partitionSelectorExpression;

338

private final int partitionCount;

339

340

/**

341

* Create partition support with key expression.

342

* @param partitionKeyExpression SpEL expression for extracting partition key

343

* @param partitionCount number of partitions

344

*/

345

public PartitionSupport(String partitionKeyExpression, int partitionCount);

346

347

/**

348

* Create partition support with selector expression.

349

* @param partitionKeyExpression SpEL expression for extracting partition key

350

* @param partitionSelectorExpression SpEL expression for selecting partition

351

* @param partitionCount number of partitions

352

*/

353

public PartitionSupport(String partitionKeyExpression, String partitionSelectorExpression, int partitionCount);

354

355

public String getPartitionKeyExpression();

356

public String getPartitionSelectorExpression();

357

public int getPartitionCount();

358

}

359

```

360

361

**Usage Examples:**

362

363

```java

364

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.http.ResponseEntity;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.MimeType;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

import java.time.Duration;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

375

376

@SpringBootApplication

377

public class FunctionStreamApplication {

378

379

private final StreamBridge streamBridge;

380

381

public FunctionStreamApplication(StreamBridge streamBridge) {

382

this.streamBridge = streamBridge;

383

}

384

385

// Simple consumer function

386

@Bean

387

public Consumer<String> handleMessages() {

388

return message -> {

389

System.out.println("Processing: " + message);

390

// Business logic here

391

};

392

}

393

394

// Function that transforms messages

395

@Bean

396

public Function<String, String> processData() {

397

return input -> {

398

// Transform the input

399

return input.toUpperCase();

400

};

401

}

402

403

// Supplier that produces messages periodically

404

@Bean

405

public Supplier<String> generateMessages() {

406

return () -> {

407

return "Generated message at " + System.currentTimeMillis();

408

};

409

}

410

411

// Consumer that processes Message objects with headers

412

@Bean

413

public Consumer<Message<String>> handleMessageWithHeaders() {

414

return message -> {

415

String payload = message.getPayload();

416

String correlationId = (String) message.getHeaders().get("correlationId");

417

System.out.println("Processing: " + payload + " with ID: " + correlationId);

418

};

419

}

420

421

// Function that processes reactive streams

422

@Bean

423

public Function<Flux<String>, Flux<String>> processStream() {

424

return flux -> flux

425

.map(String::toUpperCase)

426

.filter(s -> s.length() > 5);

427

}

428

429

// REST endpoint that uses StreamBridge for dynamic messaging

430

@GetMapping("/send/{destination}")

431

public ResponseEntity<String> sendMessage(@PathVariable String destination, @RequestBody String message) {

432

boolean sent = streamBridge.send(destination, message);

433

return sent ? ResponseEntity.ok("Message sent") : ResponseEntity.status(500).body("Failed to send");

434

}

435

436

// Send message with custom content type

437

@GetMapping("/send-json/{destination}")

438

public ResponseEntity<String> sendJsonMessage(@PathVariable String destination, @RequestBody Object data) {

439

boolean sent = streamBridge.send(destination, data, MimeType.valueOf("application/json"));

440

return sent ? ResponseEntity.ok("JSON message sent") : ResponseEntity.status(500).body("Failed to send");

441

}

442

443

// Send message with partitioning

444

@GetMapping("/send-partitioned/{destination}")

445

public ResponseEntity<String> sendPartitionedMessage(@PathVariable String destination, @RequestBody String message, @RequestParam String partitionKey) {

446

PartitionSupport partitionSupport = new PartitionSupport("payload.length()", 3);

447

boolean sent = streamBridge.send(destination, message, null, partitionSupport);

448

return sent ? ResponseEntity.ok("Partitioned message sent") : ResponseEntity.status(500).body("Failed to send");

449

}

450

451

public static void main(String[] args) {

452

SpringApplication.run(FunctionStreamApplication.class, args);

453

}

454

}

455

456

// Advanced function with routing

457

@Component

458

public class RoutingFunctionService {

459

460

@Bean

461

public Function<Message<String>, String> routingFunction() {

462

return message -> {

463

String functionName = (String) message.getHeaders().get("spring.cloud.function.definition");

464

String payload = message.getPayload();

465

466

switch (functionName) {

467

case "uppercase":

468

return payload.toUpperCase();

469

case "lowercase":

470

return payload.toLowerCase();

471

case "reverse":

472

return new StringBuilder(payload).reverse().toString();

473

default:

474

return payload;

475

}

476

};

477

}

478

}

479

480

// Batch processing example

481

@Component

482

public class BatchProcessingService {

483

484

@Bean

485

public Function<List<String>, List<String>> processBatch() {

486

return batch -> {

487

return batch.stream()

488

.map(String::toUpperCase)

489

.filter(s -> !s.isEmpty())

490

.collect(Collectors.toList());

491

};

492

}

493

494

@Bean

495

public Consumer<List<OrderEvent>> processOrderBatch() {

496

return orderBatch -> {

497

for (OrderEvent order : orderBatch) {

498

// Process each order in the batch

499

processOrder(order);

500

}

501

};

502

}

503

504

private void processOrder(OrderEvent order) {

505

// Business logic for processing individual orders

506

System.out.println("Processing order: " + order.getOrderId());

507

}

508

}

509

510

// Reactive function processing

511

@Component

512

public class ReactiveProcessingService {

513

514

@Bean

515

public Function<Flux<SensorData>, Flux<AlertEvent>> processSensorData() {

516

return sensorDataFlux -> sensorDataFlux

517

.window(Duration.ofSeconds(10)) // Window data every 10 seconds

518

.flatMap(window -> window

519

.filter(data -> data.getValue() > 100) // Filter high values

520

.map(data -> new AlertEvent(data.getSensorId(), data.getValue()))

521

);

522

}

523

524

@Bean

525

public Consumer<Flux<String>> logMessages() {

526

return messageFlux -> messageFlux

527

.doOnNext(message -> System.out.println("Logging: " + message))

528

.subscribe();

529

}

530

}

531

532

```

Configuration example (`application.yml`):

```yaml
spring:
  cloud:
    stream:
      function:
        definition: handleMessages;processData;generateMessages
        bindings:
          handleMessages-in-0: input-topic
          processData-in-0: process-input
          processData-out-0: process-output
          generateMessages-out-0: generated-messages
      bindings:
        input-topic:
          destination: my-input-topic
          group: my-group
        process-input:
          destination: data-to-process
        process-output:
          destination: processed-data
        generated-messages:
          destination: generated-topic
```