0
# Function Programming Support
1
2
Spring Cloud Stream's functional programming support integrates with Spring Cloud Function to provide a function-based programming model for messaging applications. It covers `StreamBridge` for sending messages to dynamically chosen destinations, message processing with `Supplier`, `Function`, and `Consumer` beans, and function signatures in both imperative and reactive (`Flux`) styles.
3
4
## Capabilities
5
6
### StreamBridge
7
8
Central component for sending messages to output bindings from external sources, enabling dynamic destination routing and type conversion.
9
10
```java { .api }
11
/**
12
* Bridge for sending data to output bindings from external sources.
13
* Supports dynamic destinations, type conversion, and partitioning.
14
*/
15
public class StreamBridge implements StreamOperations, ApplicationContextAware, BeanNameAware {
16
17
/**
18
* Send data to a binding.
19
* @param bindingName the name of the binding
20
* @param data the data to send
21
* @return true if the message was sent successfully
22
*/
23
public boolean send(String bindingName, Object data);
24
25
/**
26
* Send data to a binding with specified content type.
27
* @param bindingName the name of the binding
28
* @param data the data to send
29
* @param outputContentType the content type for the message
30
* @return true if the message was sent successfully
31
*/
32
public boolean send(String bindingName, Object data, MimeType outputContentType);
33
34
/**
35
* Send data to a binding with full control over content type and binder selection.
36
* @param bindingName the name of the binding
37
* @param binderName the specific binder to use (can be null for default)
38
* @param data the data to send
39
* @param outputContentType the content type for the message
40
* @return true if the message was sent successfully
41
*/
42
public boolean send(String bindingName, @Nullable String binderName, Object data, MimeType outputContentType);
43
44
public void setApplicationContext(ApplicationContext applicationContext);
45
public void setBeanName(String name);
46
}
47
48
/**
49
* Basic contract for StreamBridge operations.
50
*/
51
public interface StreamOperations {
52
53
/**
54
* Send data to a binding.
55
* @param bindingName the name of the binding
56
* @param data the data to send
57
* @return true if the message was sent successfully
58
*/
59
boolean send(String bindingName, Object data);
60
61
/**
62
* Send data to a binding with specified content type.
63
* @param bindingName the name of the binding
64
* @param data the data to send
65
* @param outputContentType the content type for the message
66
* @return true if the message was sent successfully
67
*/
68
boolean send(String bindingName, Object data, MimeType outputContentType);
69
}
70
```
71
72
### Function Configuration
73
74
Main configuration class for function-based message processing.
75
76
```java { .api }
77
/**
78
* Main configuration for function-based message processing.
79
* Integrates with Spring Cloud Function catalog.
80
*/
81
@Configuration
82
@EnableConfigurationProperties({StreamFunctionProperties.class})
83
public class FunctionConfiguration implements ApplicationContextAware, EnvironmentAware {
84
85
/**
86
* Creates the primary StreamBridge bean.
87
* @return configured StreamBridge instance
88
*/
89
@Bean
90
public StreamBridge streamBridge();
91
92
/**
93
* Creates function catalog for discovering and managing functions.
94
* @return function catalog instance
95
*/
96
@Bean
97
public FunctionCatalog functionCatalog();
98
99
/**
100
* Creates function inspector for analyzing function signatures.
101
* @return function inspector instance
102
*/
103
@Bean
104
public FunctionInspector functionInspector();
105
106
public void setApplicationContext(ApplicationContext applicationContext);
107
public void setEnvironment(Environment environment);
108
}
109
```
110
111
### Function Properties
112
113
Configuration properties for function-based bindings.
114
115
```java { .api }
116
/**
117
* Properties for stream function configuration.
118
*/
119
@ConfigurationProperties(prefix = "spring.cloud.stream.function")
120
public class StreamFunctionProperties {
121
122
/**
123
* Definition of functions to bind.
124
*/
125
private String definition;
126
127
/**
128
* Whether to use the functional model.
129
*/
130
private boolean autoStartup = true;
131
132
/**
133
* Routing expression for dynamic function routing.
134
*/
135
private String routingExpression;
136
137
public String getDefinition();
138
public void setDefinition(String definition);
139
140
public boolean isAutoStartup();
141
public void setAutoStartup(boolean autoStartup);
142
143
public String getRoutingExpression();
144
public void setRoutingExpression(String routingExpression);
145
}
146
147
/**
148
* Configuration properties for function binding details.
149
*/
150
public class StreamFunctionConfigurationProperties {
151
152
private final Map<String, String> bindings = new HashMap<>();
153
private final Map<String, String> definition = new HashMap<>();
154
155
/**
156
* Get function to binding name mappings.
157
* @return map of function names to binding names
158
*/
159
public Map<String, String> getBindings();
160
161
/**
162
* Get function definitions.
163
* @return map of function definitions
164
*/
165
public Map<String, String> getDefinition();
166
}
167
```
168
169
### Function Proxy Factory
170
171
Factory for creating function-aware bindable proxies.
172
173
```java { .api }
174
/**
175
* Factory for creating bindable function proxies.
176
* Extends BindableProxyFactory with function-specific capabilities.
177
*/
178
public class BindableFunctionProxyFactory extends BindableProxyFactory implements BeanFactoryAware, InitializingBean {
179
180
private final FunctionCatalog functionCatalog;
181
private final StreamFunctionProperties functionProperties;
182
183
public BindableFunctionProxyFactory(Class<?> type, FunctionCatalog functionCatalog, StreamFunctionProperties functionProperties);
184
185
protected Object createBindableProxy();
186
187
/**
188
* Resolve function binding names from function definitions.
189
* @return map of resolved binding names
190
*/
191
protected Map<String, String> resolveFunctionBindings();
192
193
public void setBeanFactory(BeanFactory beanFactory);
194
public void afterPropertiesSet();
195
}
196
```
197
198
### Function Wrappers and Support
199
200
Support classes for function processing and partitioning.
201
202
```java { .api }
203
/**
204
* Wrapper that adds partition awareness to functions.
205
*/
206
public class PartitionAwareFunctionWrapper implements Function<Object, Object>, ApplicationContextAware {
207
208
private final Function<Object, Object> function;
209
private final ProducerProperties producerProperties;
210
private ApplicationContext applicationContext;
211
212
public PartitionAwareFunctionWrapper(Function<Object, Object> function, ProducerProperties producerProperties);
213
214
/**
215
* Apply the function with partition awareness.
216
* @param input the input object
217
* @return the function result with partition information
218
*/
219
public Object apply(Object input);
220
221
public void setApplicationContext(ApplicationContext applicationContext);
222
}
223
224
/**
225
* Initializer for pollable sources in function context.
226
*/
227
public class PollableSourceInitializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
228
229
/**
230
* Initialize pollable sources for function-based applications.
231
* @param applicationContext the application context to initialize
232
*/
233
public void initialize(ConfigurableApplicationContext applicationContext);
234
}
235
236
/**
237
* Environment post processor for routing function configuration.
238
*/
239
public class RoutingFunctionEnvironmentPostProcessor implements EnvironmentPostProcessor {
240
241
/**
242
* Post process the environment to add routing function configuration.
243
* @param environment the environment to post process
244
* @param application the Spring application
245
*/
246
public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application);
247
}
248
```
249
250
### Batch Processing Utilities
251
252
Utilities for handling batch processing in stream functions.
253
254
```java { .api }
255
/**
256
* Utilities for batch processing in stream functions.
257
*/
258
public class StandardBatchUtils {
259
260
/**
261
* Check if batch processing is enabled for the current context.
262
* @return true if batch processing is enabled
263
*/
264
public static boolean isBatchEnabled();
265
266
/**
267
* Extract individual items from a batch message.
268
* @param batchMessage the batch message
269
* @return list of individual items
270
*/
271
public static List<Object> extractBatchItems(Object batchMessage);
272
273
/**
274
* Create a batch message from individual items.
275
* @param items the individual items
276
* @return the batch message
277
*/
278
public static Object createBatchMessage(List<Object> items);
279
280
/**
281
* Check if an object represents a batch.
282
* @param object the object to check
283
* @return true if the object is a batch
284
*/
285
public static boolean isBatch(Object object);
286
}
287
```
288
289
### Function Constants
290
291
Constants used in function processing.
292
293
```java { .api }
294
/**
295
* Constants for function processing.
296
*/
297
public class FunctionConstants {
298
299
/**
300
* Delimiter used in function composition.
301
*/
302
public static final String DELIMITER = "|";
303
304
/**
305
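* Suffix appended to the function name, followed by the output index, to form an output binding name (for example, {@code processData-out-0}).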
306
*/
307
public static final String DEFAULT_OUTPUT_SUFFIX = "-out-";
308
309
/**
310
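* Suffix appended to the function name, followed by the input index, to form an input binding name (for example, {@code processData-in-0}).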
311
*/
312
public static final String DEFAULT_INPUT_SUFFIX = "-in-";
313
314
/**
315
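* Name of the message header that carries the function definition, i.e. the name of the function a routed message should be handled by.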
316
*/
317
public static final String FUNCTION_NAME_HEADER = "spring.cloud.function.definition";
318
319
/**
320
* Default function name for routing.
321
*/
322
public static final String DEFAULT_FUNCTION_NAME = "functionRouter";
323
}
324
```
325
326
### Partition Support
327
328
Support classes for partitioning in function contexts.
329
330
```java { .api }
331
/**
332
* Support for partitioning in function contexts.
333
*/
334
public class PartitionSupport {
335
336
private final String partitionKeyExpression;
337
private final String partitionSelectorExpression;
338
private final int partitionCount;
339
340
/**
341
* Create partition support with key expression.
342
* @param partitionKeyExpression SpEL expression for extracting partition key
343
* @param partitionCount number of partitions
344
*/
345
public PartitionSupport(String partitionKeyExpression, int partitionCount);
346
347
/**
348
* Create partition support with both a key expression and a selector expression.
349
* @param partitionKeyExpression SpEL expression for extracting partition key
350
* @param partitionSelectorExpression SpEL expression for selecting partition
351
* @param partitionCount number of partitions
352
*/
353
public PartitionSupport(String partitionKeyExpression, String partitionSelectorExpression, int partitionCount);
354
355
public String getPartitionKeyExpression();
356
public String getPartitionSelectorExpression();
357
public int getPartitionCount();
358
}
359
```
360
361
**Usage Examples:**
362
363
```java
364
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.http.ResponseEntity;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.MimeType;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

import java.time.Duration;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
375
376
@SpringBootApplication
377
public class FunctionStreamApplication {
378
379
private final StreamBridge streamBridge;
380
381
public FunctionStreamApplication(StreamBridge streamBridge) {
382
this.streamBridge = streamBridge;
383
}
384
385
// Simple consumer function
386
@Bean
387
public Consumer<String> handleMessages() {
388
return message -> {
389
System.out.println("Processing: " + message);
390
// Business logic here
391
};
392
}
393
394
// Function that transforms messages
395
@Bean
396
public Function<String, String> processData() {
397
return input -> {
398
// Transform the input
399
return input.toUpperCase();
400
};
401
}
402
403
// Supplier that produces messages periodically
404
@Bean
405
public Supplier<String> generateMessages() {
406
return () -> {
407
return "Generated message at " + System.currentTimeMillis();
408
};
409
}
410
411
// Consumer that processes Message objects with headers
412
@Bean
413
public Consumer<Message<String>> handleMessageWithHeaders() {
414
return message -> {
415
String payload = message.getPayload();
416
String correlationId = (String) message.getHeaders().get("correlationId");
417
System.out.println("Processing: " + payload + " with ID: " + correlationId);
418
};
419
}
420
421
// Function that processes reactive streams
422
@Bean
423
public Function<Flux<String>, Flux<String>> processStream() {
424
return flux -> flux
425
.map(String::toUpperCase)
426
.filter(s -> s.length() > 5);
427
}
428
429
// REST endpoint that uses StreamBridge for dynamic messaging
430
@GetMapping("/send/{destination}")
431
public ResponseEntity<String> sendMessage(@PathVariable String destination, @RequestBody String message) {
432
boolean sent = streamBridge.send(destination, message);
433
return sent ? ResponseEntity.ok("Message sent") : ResponseEntity.status(500).body("Failed to send");
434
}
435
436
// Send message with custom content type
437
@GetMapping("/send-json/{destination}")
438
public ResponseEntity<String> sendJsonMessage(@PathVariable String destination, @RequestBody Object data) {
439
boolean sent = streamBridge.send(destination, data, MimeType.valueOf("application/json"));
440
return sent ? ResponseEntity.ok("JSON message sent") : ResponseEntity.status(500).body("Failed to send");
441
}
442
443
// Send message with partitioning
444
@GetMapping("/send-partitioned/{destination}")
445
public ResponseEntity<String> sendPartitionedMessage(@PathVariable String destination, @RequestBody String message, @RequestParam String partitionKey) {
446
PartitionSupport partitionSupport = new PartitionSupport("payload.length()", 3);
447
boolean sent = streamBridge.send(destination, message, null, partitionSupport);
448
return sent ? ResponseEntity.ok("Partitioned message sent") : ResponseEntity.status(500).body("Failed to send");
449
}
450
451
public static void main(String[] args) {
452
SpringApplication.run(FunctionStreamApplication.class, args);
453
}
454
}
455
456
// Advanced function with routing
457
@Component
458
public class RoutingFunctionService {
459
460
@Bean
461
public Function<Message<String>, String> routingFunction() {
462
return message -> {
463
String functionName = (String) message.getHeaders().get("spring.cloud.function.definition");
464
String payload = message.getPayload();
465
466
switch (functionName) {
467
case "uppercase":
468
return payload.toUpperCase();
469
case "lowercase":
470
return payload.toLowerCase();
471
case "reverse":
472
return new StringBuilder(payload).reverse().toString();
473
default:
474
return payload;
475
}
476
};
477
}
478
}
479
480
// Batch processing example
481
@Component
482
public class BatchProcessingService {
483
484
@Bean
485
public Function<List<String>, List<String>> processBatch() {
486
return batch -> {
487
return batch.stream()
488
.map(String::toUpperCase)
489
.filter(s -> !s.isEmpty())
490
.collect(Collectors.toList());
491
};
492
}
493
494
@Bean
495
public Consumer<List<OrderEvent>> processOrderBatch() {
496
return orderBatch -> {
497
for (OrderEvent order : orderBatch) {
498
// Process each order in the batch
499
processOrder(order);
500
}
501
};
502
}
503
504
private void processOrder(OrderEvent order) {
505
// Business logic for processing individual orders
506
System.out.println("Processing order: " + order.getOrderId());
507
}
508
}
509
510
// Reactive function processing
511
@Component
512
public class ReactiveProcessingService {
513
514
@Bean
515
public Function<Flux<SensorData>, Flux<AlertEvent>> processSensorData() {
516
return sensorDataFlux -> sensorDataFlux
517
.window(Duration.ofSeconds(10)) // Window data every 10 seconds
518
.flatMap(window -> window
519
.filter(data -> data.getValue() > 100) // Filter high values
520
.map(data -> new AlertEvent(data.getSensorId(), data.getValue()))
521
);
522
}
523
524
@Bean
525
public Consumer<Flux<String>> logMessages() {
526
return messageFlux -> messageFlux
527
.doOnNext(message -> System.out.println("Logging: " + message))
528
.subscribe();
529
}
530
}
531
532
```

**Configuration example** (`application.yml`):

```yaml
spring:
  cloud:
    stream:
      function:
        definition: handleMessages;processData;generateMessages
        bindings:
          handleMessages-in-0: input-topic
          processData-in-0: process-input
          processData-out-0: process-output
          generateMessages-out-0: generated-messages
      bindings:
        input-topic:
          destination: my-input-topic
          group: my-group
        process-input:
          destination: data-to-process
        process-output:
          destination: processed-data
        generated-messages:
          destination: generated-topic
```