# Helidon Common Reactive

Helidon Common Reactive is a reactive programming library that provides two stream types, `Multi` (0-N items) and `Single` (0-1 item), built on the Java Flow API (`java.util.concurrent.Flow`). It offers operators for transforming, filtering, combining, and error handling, so that applications can be written in a non-blocking, backpressure-aware style.

## Package Information

- **Package Name**: io.helidon.common:helidon-common-reactive
- **Package Type**: maven
- **Language**: Java
- **Installation**: Add to your Maven `pom.xml`:

```xml
<dependency>
    <groupId>io.helidon.common</groupId>
    <artifactId>helidon-common-reactive</artifactId>
    <version>4.2.2</version>
</dependency>
```

For Gradle:

```gradle
implementation 'io.helidon.common:helidon-common-reactive:4.2.2'
```

## Core Imports

```java
import io.helidon.common.reactive.Multi;
import io.helidon.common.reactive.Single;
import io.helidon.common.reactive.IoMulti;
```

Import supporting types as needed:

```java
import io.helidon.common.reactive.Subscribable;
import io.helidon.common.reactive.Awaitable;
import io.helidon.common.reactive.CompletionAwaitable;
import io.helidon.common.reactive.RetrySchema;
```

## Basic Usage

```java
import io.helidon.common.reactive.Multi;
import io.helidon.common.reactive.Single;
import java.util.concurrent.CompletableFuture;
import java.util.List;

// Create and transform a Multi
Multi<String> names = Multi.just("Alice", "Bob", "Charlie")
    .filter(name -> name.length() > 3)
    .map(String::toUpperCase);

// Collect to List
List<String> result = names.collectList().await();
// Result: ["ALICE", "CHARLIE"]

// Create a Single from CompletionStage
CompletableFuture<String> future = CompletableFuture.completedFuture("Hello");
Single<String> greeting = Single.create(future)
    .map(s -> s + " World!");

String message = greeting.await();
// Result: "Hello World!"

// Chain operations
Multi<Integer> numbers = Multi.range(1, 10)
    .filter(n -> n % 2 == 0)
    .map(n -> n * n)
    .limit(3);

numbers.forEach(System.out::println); // Prints: 4, 16, 36
```

## Architecture

Helidon Common Reactive is built around several key components:

- **`Multi<T>`**: A reactive stream of 0-N items with full backpressure support
- **`Single<T>`**: A reactive stream of 0-1 item that is also compatible with `CompletionStage`
- **Flow API**: Based on Java's standard `java.util.concurrent.Flow` interfaces
- **Lazy Evaluation**: Streams are cold; an operator chain is executed per subscriber
- **Type Safety**: Full generic type support with type-safe transformations
- **I/O Integration**: Specialized classes for reactive I/O with `ByteChannel`s, `InputStream`s, and `OutputStream`s
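
The backpressure contract behind the Flow API bullet can be illustrated with the JDK alone. The following is a minimal sketch, not Helidon code: it uses the JDK's `SubmissionPublisher` as a stand-in publisher, and the class names `BackpressureSketch` and `OneAtATime` are hypothetical. Because `Multi` and `Single` are `Flow.Publisher` implementations, a subscriber of this shape should be usable with them as well.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class BackpressureSketch {

    /** Hypothetical subscriber that pulls one item at a time. */
    static final class OneAtATime<T> implements Flow.Subscriber<T> {
        final List<T> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1); // initial demand: exactly one item
        }

        @Override
        public void onNext(T item) {
            received.add(item);
            subscription.request(1); // ask for the next item only after handling this one
        }

        @Override
        public void onError(Throwable t) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes 1..5 through the JDK publisher and returns what the subscriber saw. */
    static List<Integer> run() {
        OneAtATime<Integer> subscriber = new OneAtATime<>();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= 5; i++) {
                publisher.submit(i); // blocks if the subscriber's buffer is full
            }
        } // close() signals onComplete once buffered items are delivered
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [1, 2, 3, 4, 5]
    }
}
```

The subscriber, not the publisher, decides the pace: no item is delivered until it has been requested.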

## Capabilities

### Multi Reactive Streams

`Multi` represents a reactive stream that emits zero or more items. It provides operators for transformation, filtering, error handling, and collection.

```java { .api }
public interface Multi<T> extends Subscribable<T> {
    // Factory methods
    static <T> Multi<T> empty();
    static <T> Multi<T> just(T... items);
    static <T> Multi<T> create(Flow.Publisher<T> publisher);
    static <T> Multi<T> range(int start, int count);

    // Transformation operators
    <U> Multi<U> map(Function<? super T, ? extends U> mapper);
    <U> Multi<U> flatMap(Function<? super T, ? extends Flow.Publisher<? extends U>> mapper);
    Multi<T> filter(Predicate<? super T> predicate);
    Multi<T> limit(long maxSize);

    // Terminal operations
    Single<List<T>> collectList();
    void forEach(Consumer<? super T> consumer);
}
```

[Multi Reactive Streams](./multi.md)
114
115
### Single Reactive Values
116
117
Single represents reactive streams that emit at most one item. Implements CompletionStage and provides blocking await operations for integration with existing code.
118
119
```java { .api }
120
public interface Single<T> extends Subscribable<T>, CompletionStage<T>, Awaitable<T> {
121
// Factory methods
122
static <T> Single<T> empty();
123
static <T> Single<T> just(T item);
124
static <T> Single<T> create(CompletionStage<T> completionStage);
125
static <T> Single<T> error(Throwable error);
126
127
// Transformation operators
128
<U> Single<U> map(Function<? super T, ? extends U> mapper);
129
<U> Multi<U> flatMap(Function<? super T, ? extends Flow.Publisher<? extends U>> mapper);
130
131
// Blocking operations
132
T await();
133
T await(Duration timeout);
134
}
135
```
136
137
[Single Reactive Values](./single.md)

### I/O Integration

Reactive I/O utilities for integrating with Java I/O streams and byte channels. They allow I/O data to be processed reactively with backpressure.

```java { .api }
public final class IoMulti {
    // InputStream integration
    static Multi<ByteBuffer> multiFromStream(InputStream inputStream);
    static MultiFromInputStreamBuilder multiFromStreamBuilder(InputStream inputStream);

    // OutputStream integration
    static OutputStreamMulti outputStreamMulti();
    static OutputStreamMultiBuilder outputStreamMultiBuilder();

    // ByteChannel integration
    static Multi<ByteBuffer> multiFromByteChannel(ReadableByteChannel channel);
    static Function<Multi<ByteBuffer>, CompletionStage<Void>> multiToByteChannel(WritableByteChannel channel);
}
```

[I/O Integration](./io-integration.md)

### Supporting Types

Core interfaces and utility classes that support reactive operations: subscribing with callbacks, awaiting results, and computing retry delays.

```java { .api }
public interface Subscribable<T> extends Flow.Publisher<T> {
    void subscribe(Consumer<? super T> onNext);
    void subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError);
    void subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Runnable onComplete);
}

public interface Awaitable<T> {
    CompletableFuture<T> toCompletableFuture();
    T await();
    T await(Duration timeout);
}

public interface RetrySchema {
    long nextDelay(int retryCount, long lastDelay);
    static RetrySchema constant(long delay);
    static RetrySchema linear(long firstDelay, long increment, long maxDelay);
    static RetrySchema geometric(long firstDelay, double ratio, long maxDelay);
}
```

[Supporting Types](./supporting-types.md)
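
To make the shape of `RetrySchema.nextDelay(int retryCount, long lastDelay)` concrete, here is a minimal JDK-only sketch of how constant, linear, and geometric delay schedules could be computed. It does not use or reproduce Helidon's implementation: the `DelaySchema` interface, the `schedule` helper, and the exact formulas (including how `maxDelay` caps the result) are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class RetryDelaySketch {

    /** Hypothetical stand-in with the same method shape as RetrySchema.nextDelay. */
    @FunctionalInterface
    interface DelaySchema {
        long nextDelay(int retryCount, long lastDelay);
    }

    // The formulas below are assumed semantics, not taken from the Helidon sources.

    static DelaySchema constant(long delay) {
        return (retryCount, lastDelay) -> delay;
    }

    static DelaySchema linear(long firstDelay, long increment, long maxDelay) {
        return (retryCount, lastDelay) ->
                Math.min(maxDelay, firstDelay + (long) retryCount * increment);
    }

    static DelaySchema geometric(long firstDelay, double ratio, long maxDelay) {
        return (retryCount, lastDelay) ->
                retryCount == 0
                        ? Math.min(maxDelay, firstDelay)
                        : Math.min(maxDelay, Math.round(lastDelay * ratio));
    }

    /** Feeds each computed delay back in as lastDelay, as a retry loop would. */
    static List<Long> schedule(DelaySchema schema, int retries) {
        List<Long> delays = new ArrayList<>();
        long last = 0;
        for (int i = 0; i < retries; i++) {
            last = schema.nextDelay(i, last);
            delays.add(last);
        }
        return delays;
    }

    public static void main(String[] args) {
        System.out.println(schedule(constant(100), 3));            // [100, 100, 100]
        System.out.println(schedule(linear(100, 50, 220), 4));     // [100, 150, 200, 220]
        System.out.println(schedule(geometric(100, 2.0, 500), 4)); // [100, 200, 400, 500]
    }
}
```

Passing both the retry count and the previous delay lets a schedule be defined either from the attempt number (linear here) or from the last value (geometric here).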

## Error Handling Patterns

The library supports error handling through several patterns:

1. **Resume Operations**: `onErrorResume()` and `onErrorResumeWith()` recover from an error by continuing with a fallback
2. **Retry Logic**: `retry()` methods with count, predicate, or publisher-based control
3. **Timeout Handling**: `timeout()` methods with optional fallback publishers
4. **Completion Handling**: `onCompleteResume()` methods append to the stream after it completes

## Integration Patterns

### Java Standard Library Integration

- **CompletionStage**: `Single` implements `CompletionStage` for seamless async integration
- **Stream**: `Multi.create(Stream<T>)` converts Java Streams
- **Iterable**: `Multi.create(Iterable<T>)` and `flatMapIterable()` methods
- **Optional**: `flatMapOptional()` and `toOptionalSingle()` methods
- **Collectors**: `collectStream(java.util.stream.Collector)` uses Stream collectors
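
The practical effect of the `CompletionStage` bullet is that code written against the standard interface does not need to know about `Single`. The sketch below is JDK-only and uses a `CompletableFuture` as a stand-in for the source; the class `CompletionStageSketch` and the methods `greet` and `demo` are hypothetical names. A `Single<String>` could presumably be passed to `greet` in the same way, since the text above states that `Single` implements `CompletionStage`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

class CompletionStageSketch {

    /** Accepts any CompletionStage; the caller's concrete type does not matter. */
    static CompletionStage<String> greet(CompletionStage<String> name) {
        return name
                .thenApply(n -> "Hello, " + n + "!")
                .exceptionally(t -> "Hello, stranger!"); // fallback if the source fails
    }

    /** Completes the source normally or exceptionally and returns the final greeting. */
    static String demo(boolean fail) {
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletionStage<String> result = greet(source);
        if (fail) {
            source.completeExceptionally(new IllegalStateException("lookup failed"));
        } else {
            source.complete("Alice");
        }
        return result.toCompletableFuture().join();
    }

    public static void main(String[] args) {
        System.out.println(demo(false)); // Hello, Alice!
        System.out.println(demo(true));  // Hello, stranger!
    }
}
```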

### Reactive Streams Integration

- **Flow.Publisher**: Both `Multi` and `Single` are `Flow.Publisher` implementations
- **Backpressure**: Full reactive streams backpressure support
- **Subscription**: Proper subscription lifecycle management

## Threading and Execution Model

- **Cold Streams**: Operator chains are lazy and are executed per subscriber
- **Thread Safety**: Publishers are designed so that subscribing is thread-safe
- **Execution Control**: `observeOn()` methods allow switching the execution context
- **Async Operations**: `CompletionStage` integration allows composition with other asynchronous code