A framework for building message-driven microservice applications on Spring Boot with Spring Integration.
—
Spring Cloud Stream's function programming support provides modern functional programming model integration with Spring Cloud Function. This includes StreamBridge for dynamic messaging, function-based message processing, and seamless integration between imperative and reactive programming models.
Central component for sending messages to output bindings from external sources, enabling dynamic destination routing and type conversion.
/**
* Bridge for sending data to output bindings from external sources.
* Supports dynamic destinations, type conversion, and partitioning.
*/
public class StreamBridge implements StreamOperations, ApplicationContextAware, BeanNameAware {
/**
* Send data to a binding.
* @param bindingName the name of the binding
* @param data the data to send
* @return true if the message was sent successfully
*/
public boolean send(String bindingName, Object data);
/**
* Send data to a binding with specified content type.
* @param bindingName the name of the binding
* @param data the data to send
* @param outputContentType the content type for the message
* @return true if the message was sent successfully
*/
public boolean send(String bindingName, Object data, MimeType outputContentType);
/**
* Send data to a binding with full control over content type and binder selection.
* @param bindingName the name of the binding
* @param binderName the specific binder to use (can be null for default)
* @param data the data to send
* @param outputContentType the content type for the message
* @return true if the message was sent successfully
*/
public boolean send(String bindingName, @Nullable String binderName, Object data, MimeType outputContentType);
public void setApplicationContext(ApplicationContext applicationContext);
public void setBeanName(String name);
}
/**
* Basic contract for StreamBridge operations.
*/
public interface StreamOperations {
/**
* Send data to a binding.
* @param bindingName the name of the binding
* @param data the data to send
* @return true if the message was sent successfully
*/
boolean send(String bindingName, Object data);
/**
* Send data to a binding with specified content type.
* @param bindingName the name of the binding
* @param data the data to send
* @param outputContentType the content type for the message
* @return true if the message was sent successfully
*/
boolean send(String bindingName, Object data, MimeType outputContentType);
}Main configuration class for function-based message processing.
/**
* Main configuration for function-based message processing.
* Integrates with Spring Cloud Function catalog.
*/
@Configuration
@EnableConfigurationProperties({StreamFunctionProperties.class})
public class FunctionConfiguration implements ApplicationContextAware, EnvironmentAware {
/**
* Creates the primary StreamBridge bean.
* @return configured StreamBridge instance
*/
@Bean
public StreamBridge streamBridge();
/**
* Creates function catalog for discovering and managing functions.
* @return function catalog instance
*/
@Bean
public FunctionCatalog functionCatalog();
/**
* Creates function inspector for analyzing function signatures.
* @return function inspector instance
*/
@Bean
public FunctionInspector functionInspector();
public void setApplicationContext(ApplicationContext applicationContext);
public void setEnvironment(Environment environment);
}Configuration properties for function-based bindings.
/**
* Properties for stream function configuration.
*/
@ConfigurationProperties(prefix = "spring.cloud.stream.function")
public class StreamFunctionProperties {
/**
* Definition of functions to bind.
*/
private String definition;
/**
* Whether to use the functional model.
*/
private boolean autoStartup = true;
/**
* Routing expression for dynamic function routing.
*/
private String routingExpression;
public String getDefinition();
public void setDefinition(String definition);
public boolean isAutoStartup();
public void setAutoStartup(boolean autoStartup);
public String getRoutingExpression();
public void setRoutingExpression(String routingExpression);
}
/**
* Configuration properties for function binding details.
*/
public class StreamFunctionConfigurationProperties {
private final Map<String, String> bindings = new HashMap<>();
private final Map<String, String> definition = new HashMap<>();
/**
* Get function to binding name mappings.
* @return map of function names to binding names
*/
public Map<String, String> getBindings();
/**
* Get function definitions.
* @return map of function definitions
*/
public Map<String, String> getDefinition();
}Factory for creating function-aware bindable proxies.
/**
* Factory for creating bindable function proxies.
* Extends BindableProxyFactory with function-specific capabilities.
*/
public class BindableFunctionProxyFactory extends BindableProxyFactory implements BeanFactoryAware, InitializingBean {
private final FunctionCatalog functionCatalog;
private final StreamFunctionProperties functionProperties;
public BindableFunctionProxyFactory(Class<?> type, FunctionCatalog functionCatalog, StreamFunctionProperties functionProperties);
protected Object createBindableProxy();
/**
* Resolve function binding names from function definitions.
* @return map of resolved binding names
*/
protected Map<String, String> resolveFunctionBindings();
public void setBeanFactory(BeanFactory beanFactory);
public void afterPropertiesSet();
}Support classes for function processing and partitioning.
/**
* Wrapper that adds partition awareness to functions.
*/
public class PartitionAwareFunctionWrapper implements Function<Object, Object>, ApplicationContextAware {
private final Function<Object, Object> function;
private final ProducerProperties producerProperties;
private ApplicationContext applicationContext;
public PartitionAwareFunctionWrapper(Function<Object, Object> function, ProducerProperties producerProperties);
/**
* Apply the function with partition awareness.
* @param input the input object
* @return the function result with partition information
*/
public Object apply(Object input);
public void setApplicationContext(ApplicationContext applicationContext);
}
/**
* Initializer for pollable sources in function context.
*/
public class PollableSourceInitializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
/**
* Initialize pollable sources for function-based applications.
* @param applicationContext the application context to initialize
*/
public void initialize(ConfigurableApplicationContext applicationContext);
}
/**
* Environment post processor for routing function configuration.
*/
public class RoutingFunctionEnvironmentPostProcessor implements EnvironmentPostProcessor {
/**
* Post process the environment to add routing function configuration.
* @param environment the environment to post process
* @param application the Spring application
*/
public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application);
}Utilities for handling batch processing in stream functions.
/**
* Utilities for batch processing in stream functions.
*/
public class StandardBatchUtils {
/**
* Check if batch processing is enabled for the current context.
* @return true if batch processing is enabled
*/
public static boolean isBatchEnabled();
/**
* Extract individual items from a batch message.
* @param batchMessage the batch message
* @return list of individual items
*/
public static List<Object> extractBatchItems(Object batchMessage);
/**
* Create a batch message from individual items.
* @param items the individual items
* @return the batch message
*/
public static Object createBatchMessage(List<Object> items);
/**
* Check if an object represents a batch.
* @param object the object to check
* @return true if the object is a batch
*/
public static boolean isBatch(Object object);
}Constants used in function processing.
/**
* Constants for function processing.
*/
public class FunctionConstants {
/**
* Delimiter used in function composition.
*/
public static final String DELIMITER = "|";
/**
* Default suffix for output bindings.
*/
public static final String DEFAULT_OUTPUT_SUFFIX = "-out-";
/**
* Default suffix for input bindings.
*/
public static final String DEFAULT_INPUT_SUFFIX = "-in-";
/**
* Header name for function name.
*/
public static final String FUNCTION_NAME_HEADER = "spring.cloud.function.definition";
/**
* Default function name for routing.
*/
public static final String DEFAULT_FUNCTION_NAME = "functionRouter";
}Support classes for partitioning in function contexts.
/**
* Support for partitioning in function contexts.
*/
public class PartitionSupport {
private final String partitionKeyExpression;
private final String partitionSelectorExpression;
private final int partitionCount;
/**
* Create partition support with key expression.
* @param partitionKeyExpression SpEL expression for extracting partition key
* @param partitionCount number of partitions
*/
public PartitionSupport(String partitionKeyExpression, int partitionCount);
/**
* Create partition support with selector expression.
* @param partitionKeyExpression SpEL expression for extracting partition key
* @param partitionSelectorExpression SpEL expression for selecting partition
* @param partitionCount number of partitions
*/
public PartitionSupport(String partitionKeyExpression, String partitionSelectorExpression, int partitionCount);
public String getPartitionKeyExpression();
public String getPartitionSelectorExpression();
public int getPartitionCount();
}Usage Examples:
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.MimeType;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
@SpringBootApplication
public class FunctionStreamApplication {
private final StreamBridge streamBridge;
public FunctionStreamApplication(StreamBridge streamBridge) {
this.streamBridge = streamBridge;
}
// Simple consumer function
@Bean
public Consumer<String> handleMessages() {
return message -> {
System.out.println("Processing: " + message);
// Business logic here
};
}
// Function that transforms messages
@Bean
public Function<String, String> processData() {
return input -> {
// Transform the input
return input.toUpperCase();
};
}
// Supplier that produces messages periodically
@Bean
public Supplier<String> generateMessages() {
return () -> {
return "Generated message at " + System.currentTimeMillis();
};
}
// Consumer that processes Message objects with headers
@Bean
public Consumer<Message<String>> handleMessageWithHeaders() {
return message -> {
String payload = message.getPayload();
String correlationId = (String) message.getHeaders().get("correlationId");
System.out.println("Processing: " + payload + " with ID: " + correlationId);
};
}
// Function that processes reactive streams
@Bean
public Function<Flux<String>, Flux<String>> processStream() {
return flux -> flux
.map(String::toUpperCase)
.filter(s -> s.length() > 5);
}
// REST endpoint that uses StreamBridge for dynamic messaging
@GetMapping("/send/{destination}")
public ResponseEntity<String> sendMessage(@PathVariable String destination, @RequestBody String message) {
boolean sent = streamBridge.send(destination, message);
return sent ? ResponseEntity.ok("Message sent") : ResponseEntity.status(500).body("Failed to send");
}
// Send message with custom content type
@GetMapping("/send-json/{destination}")
public ResponseEntity<String> sendJsonMessage(@PathVariable String destination, @RequestBody Object data) {
boolean sent = streamBridge.send(destination, data, MimeType.valueOf("application/json"));
return sent ? ResponseEntity.ok("JSON message sent") : ResponseEntity.status(500).body("Failed to send");
}
// Send message with partitioning
@GetMapping("/send-partitioned/{destination}")
public ResponseEntity<String> sendPartitionedMessage(@PathVariable String destination, @RequestBody String message, @RequestParam String partitionKey) {
PartitionSupport partitionSupport = new PartitionSupport("payload.length()", 3);
boolean sent = streamBridge.send(destination, message, null, partitionSupport);
return sent ? ResponseEntity.ok("Partitioned message sent") : ResponseEntity.status(500).body("Failed to send");
}
public static void main(String[] args) {
SpringApplication.run(FunctionStreamApplication.class, args);
}
}
// Advanced function with routing
@Component
public class RoutingFunctionService {
@Bean
public Function<Message<String>, String> routingFunction() {
return message -> {
String functionName = (String) message.getHeaders().get("spring.cloud.function.definition");
String payload = message.getPayload();
switch (functionName) {
case "uppercase":
return payload.toUpperCase();
case "lowercase":
return payload.toLowerCase();
case "reverse":
return new StringBuilder(payload).reverse().toString();
default:
return payload;
}
};
}
}
// Batch processing example
@Component
public class BatchProcessingService {
@Bean
public Function<List<String>, List<String>> processBatch() {
return batch -> {
return batch.stream()
.map(String::toUpperCase)
.filter(s -> !s.isEmpty())
.collect(Collectors.toList());
};
}
@Bean
public Consumer<List<OrderEvent>> processOrderBatch() {
return orderBatch -> {
for (OrderEvent order : orderBatch) {
// Process each order in the batch
processOrder(order);
}
};
}
private void processOrder(OrderEvent order) {
// Business logic for processing individual orders
System.out.println("Processing order: " + order.getOrderId());
}
}
// Reactive function processing
@Component
public class ReactiveProcessingService {
@Bean
public Function<Flux<SensorData>, Flux<AlertEvent>> processSensorData() {
return sensorDataFlux -> sensorDataFlux
.window(Duration.ofSeconds(10)) // Window data every 10 seconds
.flatMap(window -> window
.filter(data -> data.getValue() > 100) // Filter high values
.map(data -> new AlertEvent(data.getSensorId(), data.getValue()))
);
}
@Bean
public Consumer<Flux<String>> logMessages() {
return messageFlux -> messageFlux
.doOnNext(message -> System.out.println("Logging: " + message))
.subscribe();
}
}
// Configuration example
# application.yml
spring:
cloud:
stream:
function:
definition: handleMessages;processData;generateMessages
bindings:
handleMessages-in-0: input-topic
processData-in-0: process-input
processData-out-0: process-output
generateMessages-out-0: generated-messages
bindings:
input-topic:
destination: my-input-topic
group: my-group
process-input:
destination: data-to-process
process-output:
destination: processed-data
generated-messages:
destination: generated-topicInstall with Tessl CLI
npx tessl i tessl/maven-org-springframework-cloud--spring-cloud-stream