CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-springframework-cloud--spring-cloud-stream

A framework for building message-driven microservice applications on Spring Boot with Spring Integration.

Pending
Overview
Eval results
Files

function-support.mddocs/

Functional Programming Support

Spring Cloud Stream's functional programming support builds on Spring Cloud Function, letting you express message handlers as plain `Supplier`, `Function`, and `Consumer` beans. It covers StreamBridge for dynamic messaging, function-based message processing, and interoperability between the imperative and reactive programming models.

Capabilities

StreamBridge

Central component for sending messages to output bindings from external sources, enabling dynamic destination routing and type conversion.

/**
 * Bridge for sending data to output bindings from external sources.
 * Supports dynamic destinations, type conversion, and partitioning.
 *
 * <p>The two- and three-argument {@code send} overloads come from
 * {@link StreamOperations}; the four-argument overload (explicit binder name)
 * is declared only on this class.
 *
 * <p>NOTE(review): none of the overloads listed here accepts a partition
 * argument, so partitioning is presumably driven by the binding's producer
 * configuration rather than by a method parameter - confirm against the
 * Spring Cloud Stream reference.
 */
public class StreamBridge implements StreamOperations, ApplicationContextAware, BeanNameAware {
    
    /**
     * Send data to a binding.
     * @param bindingName the name of the binding
     * @param data the data to send
     * @return true if the message was sent successfully
     */
    public boolean send(String bindingName, Object data);
    
    /**
     * Send data to a binding with specified content type.
     * @param bindingName the name of the binding
     * @param data the data to send
     * @param outputContentType the content type for the message
     * @return true if the message was sent successfully
     */
    public boolean send(String bindingName, Object data, MimeType outputContentType);
    
    /**
     * Send data to a binding with full control over content type and binder selection.
     * Note the argument order: the binder name comes before the data.
     * @param bindingName the name of the binding
     * @param binderName the specific binder to use (can be null for default)
     * @param data the data to send
     * @param outputContentType the content type for the message
     * @return true if the message was sent successfully
     */
    public boolean send(String bindingName, @Nullable String binderName, Object data, MimeType outputContentType);
    
    /**
     * Callback declared by {@code ApplicationContextAware}.
     * @param applicationContext the application context handed to this bridge
     */
    public void setApplicationContext(ApplicationContext applicationContext);

    /**
     * Callback declared by {@code BeanNameAware}.
     * @param name the bean name handed to this bridge
     */
    public void setBeanName(String name);
}

/**
 * Basic contract for StreamBridge operations.
 */
public interface StreamOperations {
    
    /**
     * Send data to a binding.
     * @param bindingName the name of the binding
     * @param data the data to send
     * @return true if the message was sent successfully
     */
    boolean send(String bindingName, Object data);
    
    /**
     * Send data to a binding with specified content type.
     * @param bindingName the name of the binding
     * @param data the data to send
     * @param outputContentType the content type for the message
     * @return true if the message was sent successfully
     */
    boolean send(String bindingName, Object data, MimeType outputContentType);
}

Function Configuration

Main configuration class for function-based message processing.

/**
 * Main configuration for function-based message processing.
 * Integrates with Spring Cloud Function catalog.
 */
@Configuration
@EnableConfigurationProperties({StreamFunctionProperties.class})
public class FunctionConfiguration implements ApplicationContextAware, EnvironmentAware {
    
    /**
     * Creates the primary StreamBridge bean.
     * @return configured StreamBridge instance
     */
    @Bean
    public StreamBridge streamBridge();
    
    /**
     * Creates function catalog for discovering and managing functions.
     * @return function catalog instance
     */
    @Bean
    public FunctionCatalog functionCatalog();
    
    /**
     * Creates function inspector for analyzing function signatures.
     * @return function inspector instance
     */
    @Bean
    public FunctionInspector functionInspector();
    
    public void setApplicationContext(ApplicationContext applicationContext);
    public void setEnvironment(Environment environment);
}

Function Properties

Configuration properties for function-based bindings.

/**
 * Properties for stream function configuration, bound from the
 * {@code spring.cloud.stream.function} prefix.
 */
@ConfigurationProperties(prefix = "spring.cloud.stream.function")
public class StreamFunctionProperties {
    
    /**
     * Definition of functions to bind, for example
     * {@code handleMessages;processData;generateMessages}.
     */
    private String definition;
    
    /**
     * Auto-startup flag; defaults to {@code true}.
     * NOTE(review): presumably controls whether the function bindings are
     * started automatically - confirm the exact semantics before relying on it.
     */
    private boolean autoStartup = true;
    
    /**
     * Routing expression for dynamic function routing.
     */
    private String routingExpression;
    
    /** @return the configured function definition, or null if none was set */
    public String getDefinition();

    /** @param definition the function definition to bind */
    public void setDefinition(String definition);
    
    /** @return the auto-startup flag */
    public boolean isAutoStartup();

    /** @param autoStartup the new auto-startup flag */
    public void setAutoStartup(boolean autoStartup);
    
    /** @return the routing expression, or null if none was set */
    public String getRoutingExpression();

    /** @param routingExpression the routing expression to use */
    public void setRoutingExpression(String routingExpression);
}

/**
 * Configuration properties for function binding details.
 */
public class StreamFunctionConfigurationProperties {
    
    private final Map<String, String> bindings = new HashMap<>();
    private final Map<String, String> definition = new HashMap<>();
    
    /**
     * Get function to binding name mappings.
     * @return map of function names to binding names
     */
    public Map<String, String> getBindings();
    
    /**
     * Get function definitions.
     * @return map of function definitions
     */
    public Map<String, String> getDefinition();
}

Function Proxy Factory

Factory for creating function-aware bindable proxies.

/**
 * Factory for creating bindable function proxies.
 * Extends BindableProxyFactory with function-specific capabilities.
 */
public class BindableFunctionProxyFactory extends BindableProxyFactory implements BeanFactoryAware, InitializingBean {
    
    private final FunctionCatalog functionCatalog;
    private final StreamFunctionProperties functionProperties;
    
    public BindableFunctionProxyFactory(Class<?> type, FunctionCatalog functionCatalog, StreamFunctionProperties functionProperties);
    
    protected Object createBindableProxy();
    
    /**
     * Resolve function binding names from function definitions.
     * @return map of resolved binding names
     */
    protected Map<String, String> resolveFunctionBindings();
    
    public void setBeanFactory(BeanFactory beanFactory);
    public void afterPropertiesSet();
}

Function Wrappers and Support

Support classes for function processing and partitioning.

/**
 * Wrapper that adds partition awareness to functions.
 */
public class PartitionAwareFunctionWrapper implements Function<Object, Object>, ApplicationContextAware {
    
    private final Function<Object, Object> function;
    private final ProducerProperties producerProperties;
    private ApplicationContext applicationContext;
    
    public PartitionAwareFunctionWrapper(Function<Object, Object> function, ProducerProperties producerProperties);
    
    /**
     * Apply the function with partition awareness.
     * @param input the input object
     * @return the function result with partition information
     */
    public Object apply(Object input);
    
    public void setApplicationContext(ApplicationContext applicationContext);
}

/**
 * Initializer for pollable sources in function context.
 */
public class PollableSourceInitializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
    
    /**
     * Initialize pollable sources for function-based applications.
     * @param applicationContext the application context to initialize
     */
    public void initialize(ConfigurableApplicationContext applicationContext);
}

/**
 * Environment post processor for routing function configuration.
 */
public class RoutingFunctionEnvironmentPostProcessor implements EnvironmentPostProcessor {
    
    /**
     * Post process the environment to add routing function configuration.
     * @param environment the environment to post process
     * @param application the Spring application
     */
    public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application);
}

Batch Processing Utilities

Utilities for handling batch processing in stream functions.

/**
 * Utilities for batch processing in stream functions.
 */
public class StandardBatchUtils {
    
    /**
     * Check if batch processing is enabled for the current context.
     * @return true if batch processing is enabled
     */
    public static boolean isBatchEnabled();
    
    /**
     * Extract individual items from a batch message.
     * @param batchMessage the batch message
     * @return list of individual items
     */
    public static List<Object> extractBatchItems(Object batchMessage);
    
    /**
     * Create a batch message from individual items.
     * @param items the individual items
     * @return the batch message
     */
    public static Object createBatchMessage(List<Object> items);
    
    /**
     * Check if an object represents a batch.
     * @param object the object to check
     * @return true if the object is a batch
     */
    public static boolean isBatch(Object object);
}

Function Constants

Constants used in function processing.

/**
 * Constants for function processing.
 */
public class FunctionConstants {
    
    /**
     * Delimiter used in function composition.
     * This is not the separator of the function definition list, which uses
     * {@code ;} (as in {@code handleMessages;processData;generateMessages}).
     */
    public static final String DELIMITER = "|";
    
    /**
     * Separator placed between a function name and the output index in a
     * binding name, as in {@code processData-out-0}. Despite the constant's
     * name it is not the final suffix: the index follows it.
     */
    public static final String DEFAULT_OUTPUT_SUFFIX = "-out-";
    
    /**
     * Separator placed between a function name and the input index in a
     * binding name, as in {@code processData-in-0}. Despite the constant's
     * name it is not the final suffix: the index follows it.
     */
    public static final String DEFAULT_INPUT_SUFFIX = "-in-";
    
    /**
     * Header name for function name.
     */
    public static final String FUNCTION_NAME_HEADER = "spring.cloud.function.definition";
    
    /**
     * Default function name for routing.
     */
    public static final String DEFAULT_FUNCTION_NAME = "functionRouter";
}

Partition Support

Support classes for partitioning in function contexts.

/**
 * Support for partitioning in function contexts.
 * All three fields are final, so an instance cannot be changed after construction.
 */
public class PartitionSupport {
    
    /** Expression used to extract the partition key. */
    private final String partitionKeyExpression;
    /** Expression used to select the partition. */
    private final String partitionSelectorExpression;
    /** Number of partitions. */
    private final int partitionCount;
    
    /**
     * Create partition support with key expression.
     * NOTE(review): this listing does not show what value the selector
     * expression takes when this constructor is used - presumably null; confirm.
     * @param partitionKeyExpression SpEL expression for extracting partition key
     * @param partitionCount number of partitions
     */
    public PartitionSupport(String partitionKeyExpression, int partitionCount);
    
    /**
     * Create partition support with selector expression.
     * @param partitionKeyExpression SpEL expression for extracting partition key
     * @param partitionSelectorExpression SpEL expression for selecting partition
     * @param partitionCount number of partitions
     */
    public PartitionSupport(String partitionKeyExpression, String partitionSelectorExpression, int partitionCount);
    
    /** @return the partition key expression */
    public String getPartitionKeyExpression();

    /** @return the partition selector expression */
    public String getPartitionSelectorExpression();

    /** @return the number of partitions */
    public int getPartitionCount();
}

Usage Examples:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.http.ResponseEntity;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.MimeType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import reactor.core.publisher.Flux;

import java.time.Duration;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
 * Sample application that registers functional message handlers and exposes
 * REST endpoints which publish through {@link StreamBridge}.
 *
 * <p>The class is also a {@code @RestController}; without that stereotype the
 * request mappings below are not registered and their return values are not
 * written as response bodies.
 */
@SpringBootApplication
@RestController
public class FunctionStreamApplication {
    
    /** Header that carries the caller-supplied partition key. */
    private static final String PARTITION_KEY_HEADER = "partitionKey";
    
    private final StreamBridge streamBridge;
    
    /**
     * @param streamBridge bridge used by the REST endpoints to publish messages
     */
    public FunctionStreamApplication(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }
    
    /**
     * Simple consumer function.
     * @return a consumer that prints every received payload
     */
    @Bean
    public Consumer<String> handleMessages() {
        return message -> {
            System.out.println("Processing: " + message);
            // Business logic here
        };
    }
    
    /**
     * Function that transforms messages.
     * @return a function that upper-cases its input
     */
    @Bean
    public Function<String, String> processData() {
        return input -> input.toUpperCase();
    }
    
    /**
     * Supplier that produces messages.
     * @return a supplier producing a timestamped text on every invocation
     */
    @Bean
    public Supplier<String> generateMessages() {
        return () -> "Generated message at " + System.currentTimeMillis();
    }
    
    /**
     * Consumer that processes Message objects with headers.
     * @return a consumer that prints the payload together with its {@code correlationId} header
     */
    @Bean
    public Consumer<Message<String>> handleMessageWithHeaders() {
        return message -> {
            String payload = message.getPayload();
            String correlationId = (String) message.getHeaders().get("correlationId");
            System.out.println("Processing: " + payload + " with ID: " + correlationId);
        };
    }
    
    /**
     * Function that processes reactive streams.
     * @return a function that upper-cases each element and keeps those longer than five characters
     */
    @Bean
    public Function<Flux<String>, Flux<String>> processStream() {
        return flux -> flux
            .map(String::toUpperCase)
            .filter(s -> s.length() > 5);
    }
    
    /**
     * REST endpoint that uses StreamBridge for dynamic messaging.
     * Mapped as POST because the payload travels in the request body.
     * @param destination binding name to send to
     * @param message payload to send
     * @return 200 when the send succeeded, 500 otherwise
     */
    @PostMapping("/send/{destination}")
    public ResponseEntity<String> sendMessage(@PathVariable String destination, @RequestBody String message) {
        boolean sent = streamBridge.send(destination, message);
        return sent ? ResponseEntity.ok("Message sent") : ResponseEntity.status(500).body("Failed to send");
    }
    
    /**
     * Send message with custom content type.
     * @param destination binding name to send to
     * @param data payload to send as JSON
     * @return 200 when the send succeeded, 500 otherwise
     */
    @PostMapping("/send-json/{destination}")
    public ResponseEntity<String> sendJsonMessage(@PathVariable String destination, @RequestBody Object data) {
        boolean sent = streamBridge.send(destination, data, MimeType.valueOf("application/json"));
        return sent ? ResponseEntity.ok("JSON message sent") : ResponseEntity.status(500).body("Failed to send");
    }
    
    /**
     * Send message with partitioning.
     *
     * <p>StreamBridge has no overload that accepts partition settings, so the
     * key is attached as a message header. The output binding's producer must
     * be configured to read it, e.g. {@code partition-key-expression:
     * headers['partitionKey']} together with a {@code partition-count}.
     *
     * @param destination binding name to send to
     * @param message payload to send
     * @param partitionKey value the producer uses to pick the partition
     * @return 200 when the send succeeded, 500 otherwise
     */
    @PostMapping("/send-partitioned/{destination}")
    public ResponseEntity<String> sendPartitionedMessage(@PathVariable String destination, @RequestBody String message, @RequestParam String partitionKey) {
        Message<String> keyedMessage = MessageBuilder.withPayload(message)
            .setHeader(PARTITION_KEY_HEADER, partitionKey)
            .build();
        boolean sent = streamBridge.send(destination, keyedMessage);
        return sent ? ResponseEntity.ok("Partitioned message sent") : ResponseEntity.status(500).body("Failed to send");
    }
    
    public static void main(String[] args) {
        SpringApplication.run(FunctionStreamApplication.class, args);
    }
}

// Advanced function with routing
/**
 * Routes each message to a text transformation chosen by the
 * {@code spring.cloud.function.definition} header.
 */
@Component
public class RoutingFunctionService {
    
    /**
     * Builds the routing function.
     *
     * <p>Recognised header values are {@code uppercase}, {@code lowercase} and
     * {@code reverse}. Any other value - including a missing or non-String
     * header - returns the payload unchanged instead of failing.
     *
     * @return a function mapping a message to its transformed payload
     */
    @Bean
    public Function<Message<String>, String> routingFunction() {
        return message -> {
            String payload = message.getPayload();
            Object header = message.getHeaders().get("spring.cloud.function.definition");
            
            // switch on a null String throws NullPointerException, so treat an
            // absent or non-String header like an unknown function name.
            if (!(header instanceof String)) {
                return payload;
            }
            
            switch ((String) header) {
                case "uppercase":
                    return payload.toUpperCase();
                case "lowercase":
                    return payload.toLowerCase();
                case "reverse":
                    return new StringBuilder(payload).reverse().toString();
                default:
                    return payload;
            }
        };
    }
}

// Batch processing example
/**
 * Example handlers that receive a whole batch of payloads per invocation.
 */
@Component
public class BatchProcessingService {
    
    /**
     * @return a function that upper-cases every entry of the batch and drops empty ones
     */
    @Bean
    public Function<List<String>, List<String>> processBatch() {
        return batch -> upperCaseNonEmpty(batch);
    }
    
    /**
     * @return a consumer that handles each order of the batch in list order
     */
    @Bean
    public Consumer<List<OrderEvent>> processOrderBatch() {
        // Process each order in the batch
        return orderBatch -> orderBatch.forEach(this::processOrder);
    }
    
    /** Upper-cases the entries, then keeps only those with at least one character. */
    private List<String> upperCaseNonEmpty(List<String> batch) {
        return batch.stream()
            .map(entry -> entry.toUpperCase())
            .filter(upper -> upper.length() > 0)
            .collect(Collectors.toList());
    }
    
    /** Business logic for processing individual orders. */
    private void processOrder(OrderEvent order) {
        String orderLine = "Processing order: " + order.getOrderId();
        System.out.println(orderLine);
    }
}

// Reactive function processing
/**
 * Example handlers written against reactive {@code Flux} streams.
 */
@Component
public class ReactiveProcessingService {
    
    /**
     * @return a function that splits the readings into 10-second windows and
     *         emits an alert for every reading whose value exceeds 100
     */
    @Bean
    public Function<Flux<SensorData>, Flux<AlertEvent>> processSensorData() {
        return readings -> {
            // Window data every 10 seconds
            Flux<Flux<SensorData>> windows = readings.window(Duration.ofSeconds(10));
            return windows.flatMap(this::alertsFor);
        };
    }
    
    /**
     * @return a consumer that prints every element; it subscribes to the stream itself
     */
    @Bean
    public Consumer<Flux<String>> logMessages() {
        return incoming -> {
            Flux<String> logged = incoming.doOnNext(line -> System.out.println("Logging: " + line));
            logged.subscribe();
        };
    }
    
    /** Keeps the high readings of one window and converts each into an alert. */
    private Flux<AlertEvent> alertsFor(Flux<SensorData> window) {
        return window
            .filter(reading -> reading.getValue() > 100)
            .map(reading -> new AlertEvent(reading.getSensorId(), reading.getValue()));
    }
}

// Configuration example
# application.yml
spring:
  cloud:
    stream:
      function:
        # Semicolon-separated names of the function beans to bind.
        definition: handleMessages;processData;generateMessages
        # Keys are <functionName>-in-<index> / <functionName>-out-<index>;
        # each value is the binding name configured under stream.bindings below.
        bindings:
          handleMessages-in-0: input-topic
          processData-in-0: process-input
          processData-out-0: process-output  
          generateMessages-out-0: generated-messages
      # One entry per binding name used above.
      # NOTE(review): destination presumably names the broker topic/queue and
      # group the consumer group - confirm against the binder in use.
      bindings:
        input-topic:
          destination: my-input-topic
          group: my-group
        process-input:
          destination: data-to-process
        process-output:
          destination: processed-data
        generated-messages:
          destination: generated-topic

Install with Tessl CLI

npx tessl i tessl/maven-org-springframework-cloud--spring-cloud-stream

docs

actuator-integration.md

binder-framework.md

binding-management.md

configuration.md

function-support.md

index.md

message-conversion.md

tile.json