0
# Core Binder Framework
1
2
The core binder framework provides the foundational abstractions for connecting applications to message brokers. It defines the pluggable middleware abstraction that lets Spring Cloud Stream work with different messaging systems, such as Apache Kafka and RabbitMQ.
3
4
## Capabilities
5
6
### Binder Interface
7
8
The primary abstraction for connecting applications to messaging middleware.
9
10
```java { .api }
11
/**
12
* Strategy interface for binding app interfaces to logical names.
13
* @param <T> the binding target type
14
* @param <C> the consumer properties type
15
* @param <P> the producer properties type
16
*/
17
public interface Binder<T, C extends ConsumerProperties, P extends ProducerProperties> {
18
/**
19
* Bind a consumer to the given name and group.
20
* @param name the logical name of the target
21
* @param group the consumer group
22
* @param inboundBindTarget the consumer binding target
23
* @param consumerProperties consumer configuration properties
24
* @return the binding handle
25
*/
26
Binding<T> bindConsumer(String name, String group, T inboundBindTarget, C consumerProperties);
27
28
/**
29
* Bind a producer to the given name.
30
* @param name the logical name of the target
31
* @param outboundBindTarget the producer binding target
32
* @param producerProperties producer configuration properties
33
* @return the binding handle
34
*/
35
Binding<T> bindProducer(String name, T outboundBindTarget, P producerProperties);
36
37
/**
38
* Get the binder identity for management purposes.
39
* @return the binder identity, or null if not supported
40
*/
41
default String getBinderIdentity() {
42
return null;
43
}
44
}
45
```
46
47
### Binding Interface
48
49
Represents the connection between application components and messaging infrastructure.
50
51
```java { .api }
52
/**
53
* Represents the binding between an input/output and an adapter endpoint.
54
* @param <T> the binding target type
55
*/
56
public interface Binding<T> extends Pausable {
57
/**
58
* Get the logical name of this binding.
59
* @return the binding name
60
*/
61
String getName();
62
63
/**
64
* Unbind this binding, cleaning up resources.
65
*/
66
void unbind();
67
68
/**
69
* Get the current state of this binding.
70
* @return the binding state
71
*/
72
State getState();
73
74
/**
75
* Get the binding name for management purposes.
76
* @return the binding name
77
*/
78
String getBindingName();
79
80
/**
81
* Check if this is an input binding.
82
* @return true if input binding, false if output
83
*/
84
boolean isInput();
85
86
/**
87
* Get the name of the binder that created this binding.
88
* @return the binder name
89
*/
90
String getBinderName();
91
92
// Lifecycle methods from Pausable
93
void pause();
94
void resume();
95
boolean isPaused();
96
}
97
```
98
99
### Binder Factory
100
101
Factory for creating and managing binder instances.
102
103
```java { .api }
104
/**
105
* Factory for creating binder instances.
106
*/
107
public interface BinderFactory {
108
/**
109
* Get a binder instance for the specified name and type.
110
* @param name the binder name, or null for default
111
* @param bindingTargetType the binding target type
112
* @return the binder instance
113
*/
114
<T> Binder<T, ?, ?> getBinder(String name, Class<T> bindingTargetType);
115
}
116
117
/**
118
* Default implementation of BinderFactory.
119
*/
120
public class DefaultBinderFactory implements BinderFactory, DisposableBean, ApplicationContextAware {
121
public <T> Binder<T, ?, ?> getBinder(String name, Class<T> bindingTargetType);
122
public void destroy();
123
public void setApplicationContext(ApplicationContext applicationContext);
124
}
125
```
126
127
### Binder Type Registry
128
129
Registry for managing available binder types.
130
131
```java { .api }
132
/**
133
* Registry of available binder types.
134
*/
135
public interface BinderTypeRegistry {
136
/**
137
* Get a binder type by name.
138
* @param name the binder type name
139
* @return the binder type, or null if not found
140
*/
141
BinderType get(String name);
142
143
/**
144
* Get all registered binder types.
145
* @return map of binder types keyed by name
146
*/
147
Map<String, BinderType> getAll();
148
}
149
150
/**
151
* Default implementation of BinderTypeRegistry.
152
*/
153
public class DefaultBinderTypeRegistry implements BinderTypeRegistry {
154
public BinderType get(String name);
155
public Map<String, BinderType> getAll();
156
}
157
```
158
159
### Extended Binder Interfaces
160
161
Extended interfaces for binders with additional capabilities.
162
163
```java { .api }
164
/**
165
* Binder with extended properties support.
166
* @param <T> the binding target type
167
* @param <C> the consumer properties type
168
* @param <P> the producer properties type
169
*/
170
public interface ExtendedPropertiesBinder<T, C extends ConsumerProperties, P extends ProducerProperties> extends Binder<T, C, P> {
171
/**
172
* Get extended binding properties.
173
* @param channelName the channel name
174
* @return the extended binding properties, or null if not supported
175
*/
176
ExtendedBindingProperties<C, P> getExtendedPropertiesEntryIfAny(String channelName);
177
}
178
179
/**
180
* Binder for pollable consumers.
181
* @param <H> the handler type
182
* @param <C> the consumer properties type
183
*/
184
public interface PollableConsumerBinder<H, C extends ConsumerProperties> extends Binder<PollableSource<H>, C, ProducerProperties> {
185
// Inherits binding methods from Binder interface
186
}
187
```
188
189
### Pollable Source
190
191
Interface for pollable message sources.
192
193
```java { .api }
194
/**
195
* Interface for pollable sources.
196
* @param <H> the handler type
197
*/
198
public interface PollableSource<H> {
199
/**
200
* Poll for a message.
201
* @param handler the message handler
202
* @return true if a message was received and handled
203
*/
204
boolean poll(H handler);
205
}
206
207
/**
208
* Message-specific pollable source.
209
*/
210
public interface PollableMessageSource extends PollableSource<MessageHandler> {
211
/**
212
* Poll for a message with timeout.
213
* @param handler the message handler
214
* @param timeout the timeout duration
215
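* @return true if a message was received and handled; note that this default implementation ignores {@code timeout} and simply delegates to {@code poll(handler)}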
216
*/
217
default boolean poll(MessageHandler handler, Duration timeout) {
218
return poll(handler);
219
}
220
}
221
222
/**
223
* Default implementation of PollableMessageSource.
224
*/
225
public class DefaultPollableMessageSource implements PollableMessageSource, BeanFactoryAware {
226
public boolean poll(MessageHandler handler);
227
public boolean poll(MessageHandler handler, ParameterizedTypeReference<?> type);
228
public void setBeanFactory(BeanFactory beanFactory);
229
}
230
```
231
232
### Abstract Binder Implementations
233
234
Base classes for implementing custom binders.
235
236
```java { .api }
237
/**
238
* Base class for binder implementations.
239
* @param <T> the binding target type
240
* @param <C> the consumer properties type
241
* @param <P> the producer properties type
242
*/
243
public abstract class AbstractBinder<T, C extends ConsumerProperties, P extends ProducerProperties> implements Binder<T, C, P>, BeanFactoryAware, InitializingBean {
244
245
protected abstract Binding<T> doBindConsumer(String name, String group, T inputTarget, C properties);
246
protected abstract Binding<T> doBindProducer(String name, T outputTarget, P properties);
247
248
public final Binding<T> bindConsumer(String name, String group, T inputTarget, C properties);
249
public final Binding<T> bindProducer(String name, T outputTarget, P properties);
250
251
public void setBeanFactory(BeanFactory beanFactory);
252
public void afterPropertiesSet();
253
}
254
255
/**
256
* Base class for message channel binders.
257
* @param <C> the consumer properties type
258
* @param <P> the producer properties type
259
* @param <PP> the provisioning provider type
260
*/
261
public abstract class AbstractMessageChannelBinder<C extends ConsumerProperties, P extends ProducerProperties, PP extends ProvisioningProvider<C, P>> extends AbstractBinder<MessageChannel, C, P> implements ExtendedPropertiesBinder<MessageChannel, C, P> {
262
263
protected abstract Binding<MessageChannel> doBindConsumer(String name, String group, MessageChannel inputChannel, C properties);
264
protected abstract Binding<MessageChannel> doBindProducer(String name, MessageChannel outputChannel, P properties);
265
266
public ExtendedBindingProperties<C, P> getExtendedPropertiesEntryIfAny(String channelName);
267
}
268
```
269
270
### Binding Configuration and Types
271
272
Configuration classes and data types for binding management.
273
274
```java { .api }
275
/**
276
* Configuration holder for a binder.
277
*/
278
public class BinderConfiguration {
279
private final Map<String, Object> properties;
280
private final boolean inheritEnvironment;
281
private final boolean defaultCandidate;
282
283
public BinderConfiguration(String type, Map<String, Object> properties, boolean inheritEnvironment, boolean defaultCandidate);
284
285
public String getConfigurationName();
286
public Map<String, Object> getProperties();
287
public boolean isInheritEnvironment();
288
public boolean isDefaultCandidate();
289
}
290
291
/**
292
* Represents a binder type with configuration classes.
293
*/
294
public class BinderType {
295
private final String typeName;
296
private final Class<?>[] configurationClasses;
297
298
public BinderType(String typeName, Class<?>[] configurationClasses);
299
300
public String getTypeName();
301
public Class<?>[] getConfigurationClasses();
302
}
303
304
/**
305
* Default binding implementation.
306
* @param <T> the binding target type
307
*/
308
public class DefaultBinding<T> implements Binding<T> {
309
public DefaultBinding(String name, String group, T target, Lifecycle lifecycle);
310
311
public String getName();
312
public void unbind();
313
public State getState();
314
public String getBindingName();
315
public boolean isInput();
316
public String getBinderName();
317
public void pause();
318
public void resume();
319
public boolean isPaused();
320
}
321
```
322
323
### Partition Handling
324
325
Support for message partitioning across multiple consumers.
326
327
```java { .api }
328
/**
329
* Handles partitioning logic for messages.
330
*/
331
public class PartitionHandler {
332
/**
333
* Create a partition handler with the given selector strategy.
334
* @param partitionSelectorStrategy the partition selector
335
* @param partitionCount the number of partitions
336
*/
337
public PartitionHandler(PartitionSelectorStrategy partitionSelectorStrategy, int partitionCount);
338
339
/**
340
* Determine the partition for a message.
341
* @param message the message to partition
342
* @return the partition number
343
*/
344
public int determinePartition(Message<?> message);
345
}
346
347
/**
348
* Strategy for extracting partition keys from messages.
349
*/
350
public interface PartitionKeyExtractorStrategy {
351
/**
352
* Extract the partition key from a message.
353
* @param message the message
354
* @return the partition key
355
*/
356
Object extractKey(Message<?> message);
357
}
358
359
/**
360
* Strategy for selecting partitions based on keys.
361
*/
362
public interface PartitionSelectorStrategy {
363
/**
364
* Select a partition for the given key.
365
* @param key the partition key
366
* @param partitionCount the total number of partitions
367
* @return the selected partition number
368
*/
369
int selectPartition(Object key, int partitionCount);
370
}
371
```
372
373
### Utility Classes
374
375
Utility classes for message handling, MIME type processing, and embedded header handling.
376
377
```java { .api }
378
/**
379
* Container for message values and headers.
380
*/
381
public class MessageValues implements Map<String, Object>, Serializable {
382
public MessageValues(Message<?> original);
383
public MessageValues(Object payload, Map<String, Object> headers);
384
385
public Object getPayload();
386
public void setPayload(Object payload);
387
public Map<String, Object> getHeaders();
388
public Message<Object> toMessage();
389
390
// Map interface methods
391
public Object get(Object key);
392
public Object put(String key, Object value);
393
public Set<String> keySet();
394
public Collection<Object> values();
395
public Set<Entry<String, Object>> entrySet();
396
// ... other Map methods
397
}
398
399
/**
400
* Utilities for Java class MIME type handling.
401
*/
402
public class JavaClassMimeTypeUtils {
403
public static final String JAVA_OBJECT_TYPE = "application/x-java-object";
404
public static final String JAVA_SERIALIZED_OBJECT_TYPE = "application/x-java-serialized-object";
405
406
/**
407
* Convert class name to MIME type.
408
* @param className the Java class name
409
* @return the corresponding MIME type
410
*/
411
public static MimeType classNameToMimeType(String className);
412
413
/**
414
* Convert MIME type to class name.
415
* @param mimeType the MIME type
416
* @return the corresponding Java class name, or null if not applicable
417
*/
418
public static String mimeTypeToClassName(MimeType mimeType);
419
}
420
421
/**
422
* Utilities for embedded header handling.
423
*/
424
public class EmbeddedHeaderUtils {
425
/**
426
* Extract headers embedded in message payload.
427
* @param message the message with embedded headers
428
* @param headerNames the names of headers to extract
429
* @return the message with headers extracted to message headers
430
*/
431
public static Message<byte[]> extractHeaders(Message<byte[]> message, String... headerNames);
432
433
/**
434
* Embed headers into message payload.
435
* @param message the message
436
* @param headerNames the names of headers to embed
437
* @return the message with headers embedded in payload
438
*/
439
public static Message<byte[]> embedHeaders(MessageValues message, String... headerNames);
440
}
441
```
442
443
### Constants and Enums
444
445
Constants and enumerations used throughout the binder framework.
446
447
```java { .api }
448
/**
449
* Header constants for binder operations.
450
*/
451
public class BinderHeaders {
452
public static final String STANDARD_HEADERS = "standardHeaders";
453
public static final String TARGET_DESTINATION = "scst_targetDestination";
454
public static final String PARTITION_HEADER = "scst_partition";
455
public static final String PARTITION_OVERRIDE = "scst_partitionOverride";
456
public static final String NATIVE_HEADERS_PRESENT = "nativeHeadersPresent";
457
public static final String SCST_VERSION = "scst_version";
458
public static final String NESTED_EXCEPTIONS_HEADER = "scst_nestedExceptions";
459
}
460
461
/**
462
* Header modes for message processing.
463
*/
464
public enum HeaderMode {
465
/** No headers are processed */
466
none,
467
/** Headers are processed normally */
468
headers,
469
/** Headers are embedded in message payload */
470
embeddedHeaders
471
}
472
```
473
474
### Events
475
476
Application events related to binding operations.
477
478
```java { .api }
479
/**
480
* Event fired when a binding is created.
481
*/
482
public class BindingCreatedEvent extends ApplicationEvent {
483
/**
484
* Create a new binding created event.
485
* @param binding the binding that was created
486
*/
487
public BindingCreatedEvent(Binding<?> binding);
488
489
/**
490
* Get the binding that was created.
491
* @return the binding
492
*/
493
public Binding<?> getBinding();
494
}
495
```
496
497
### Exception Classes
498
499
Exception classes specific to binder operations.
500
501
```java { .api }
502
/**
503
* General binder-related exception.
504
*/
505
public class BinderException extends RuntimeException {
506
public BinderException(String message);
507
public BinderException(String message, Throwable cause);
508
}
509
510
/**
511
* Exception to signal message requeuing.
512
*/
513
public class RequeueCurrentMessageException extends RuntimeException {
514
public RequeueCurrentMessageException(String message);
515
public RequeueCurrentMessageException(String message, Throwable cause);
516
}
517
```
518
519
**Usage Examples:**
520
521
```java
522
import org.springframework.cloud.stream.binder.*;
523
import org.springframework.integration.channel.DirectChannel;
524
import org.springframework.messaging.MessageChannel;
525
526
// Custom binder implementation
527
/**
 * Skeleton of a custom message-channel binder: each bind method wraps the supplied
 * channel in a DefaultBinding and returns it.
 * NOTE(review): MyProvisioningProvider and the `lifecycle` variable used below are not
 * declared anywhere in this snippet; a real implementation must supply them.
 */
public class MyCustomBinder extends AbstractMessageChannelBinder<ConsumerProperties, ProducerProperties, MyProvisioningProvider> {
528
529
@Override
530
protected Binding<MessageChannel> doBindConsumer(String name, String group, MessageChannel inputChannel, ConsumerProperties properties) {
531
// Implementation for binding consumer
532
// Connect inputChannel to external message source
533
// NOTE(review): `lifecycle` is undeclared here; presumably the Lifecycle of the consumer endpoint (TODO confirm)
return new DefaultBinding<>(name, group, inputChannel, lifecycle);
534
}
535
536
@Override
537
protected Binding<MessageChannel> doBindProducer(String name, MessageChannel outputChannel, ProducerProperties properties) {
538
// Implementation for binding producer
539
// Connect outputChannel to external message destination
540
// null fills the group position: this method has no group parameter to pass on.
// NOTE(review): `lifecycle` is undeclared here as well; presumably the producer endpoint's Lifecycle (TODO confirm)
return new DefaultBinding<>(name, null, outputChannel, lifecycle);
541
}
542
}
543
544
// Using BinderFactory
545
@Component
546
public class MessageService {
547
548
private final BinderFactory binderFactory;
549
550
public MessageService(BinderFactory binderFactory) {
551
this.binderFactory = binderFactory;
552
}
553
554
public void createDynamicBinding() {
555
Binder<MessageChannel, ?, ?> binder = binderFactory.getBinder("kafka", MessageChannel.class);
556
MessageChannel channel = new DirectChannel();
557
Binding<MessageChannel> binding = binder.bindProducer("my-topic", channel, new ProducerProperties());
558
559
// Use the binding...
560
561
// Clean up
562
binding.unbind();
563
}
564
}
565
```