0
# Flowable Streams
1
2
`Flowable` is a reactive stream of 0..N items with backpressure support. It implements the Reactive Streams `Publisher` interface and is designed for scenarios where the producer may emit items faster than the consumer can process them.
3
4
## Capabilities
5
6
### Flowable Creation
7
8
Factory methods for creating Flowable instances with backpressure handling.
9
10
```java { .api }
11
/**
12
* Creates a Flowable that emits the provided items then completes
13
*/
14
public static <T> Flowable<T> just(T item);
15
public static <T> Flowable<T> just(T item1, T item2);
16
// ... up to 10 items
17
18
/**
19
* Creates a Flowable from an array
20
*/
21
public static <T> Flowable<T> fromArray(T... array);
22
23
/**
24
* Creates a Flowable from an Iterable
25
*/
26
public static <T> Flowable<T> fromIterable(Iterable<? extends T> source);
27
28
/**
29
* Creates a Flowable using the provided FlowableOnSubscribe with backpressure strategy
30
*/
31
public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode);
32
33
/**
34
* Creates a Flowable from a Publisher (Reactive Streams compatibility)
35
*/
36
public static <T> Flowable<T> fromPublisher(Publisher<? extends T> publisher);
37
38
/**
39
* Creates a Flowable that emits sequential numbers at intervals
40
*/
41
public static Flowable<Long> interval(long period, TimeUnit unit);
42
public static Flowable<Long> interval(long initialDelay, long period, TimeUnit unit);
43
44
/**
45
* Creates a Flowable that emits a range of integers
46
*/
47
public static Flowable<Integer> range(int start, int count);
48
49
/**
50
* Creates a Flowable that generates items using a generator function
51
*/
52
public static <T> Flowable<T> generate(Consumer<Emitter<T>> generator);
53
public static <T, S> Flowable<T> generate(Callable<S> initialState, BiConsumer<S, Emitter<T>> generator);
54
```
55
56
### Backpressure Handling
57
58
Operators specifically designed to handle backpressure scenarios.
59
60
```java { .api }
61
/**
62
* Buffers all items until the downstream is ready to receive them
63
*/
64
public final Flowable<T> onBackpressureBuffer();
65
public final Flowable<T> onBackpressureBuffer(int capacity);
66
public final Flowable<T> onBackpressureBuffer(int capacity, Action onOverflow);
67
68
/**
69
* Drops items when downstream can't keep up
70
*/
71
public final Flowable<T> onBackpressureDrop();
72
public final Flowable<T> onBackpressureDrop(Consumer<? super T> onDrop);
73
74
/**
75
* Keeps only the latest item when downstream can't keep up
76
*/
77
public final Flowable<T> onBackpressureLatest();
78
79
/**
80
* Reduces backpressure by sampling items at regular intervals
81
*/
82
public final Flowable<T> sample(long period, TimeUnit unit);
83
public final Flowable<T> sample(long period, TimeUnit unit, Scheduler scheduler);
84
85
/**
86
* Throttles items by only emitting the first item in each time window
87
*/
88
public final Flowable<T> throttleFirst(long windowDuration, TimeUnit unit);
89
90
/**
91
* Throttles items by only emitting the last item in each time window
92
*/
93
public final Flowable<T> throttleLast(long intervalDuration, TimeUnit unit);
94
95
/**
96
* Debounces items by only emitting when no new items arrive for a specified duration
97
*/
98
public final Flowable<T> debounce(long timeout, TimeUnit unit);
99
public final Flowable<T> debounce(long timeout, TimeUnit unit, Scheduler scheduler);
100
```
101
102
### Transformation Operators
103
104
Transform items with backpressure-aware operators.
105
106
```java { .api }
107
/**
108
* Transforms items using a function
109
*/
110
public final <R> Flowable<R> map(Function<? super T, ? extends R> mapper);
111
112
/**
113
* FlatMap variants with backpressure handling
114
*/
115
public final <R> Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper);
116
public final <R> Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper, int maxConcurrency);
117
public final <R> Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper, boolean delayErrors);
118
119
/**
120
* ConcatMap maintains order and handles backpressure
121
*/
122
public final <R> Flowable<R> concatMap(Function<? super T, ? extends Publisher<? extends R>> mapper);
123
public final <R> Flowable<R> concatMap(Function<? super T, ? extends Publisher<? extends R>> mapper, int prefetch);
124
125
/**
126
* SwitchMap for latest values only
127
*/
128
public final <R> Flowable<R> switchMap(Function<? super T, ? extends Publisher<? extends R>> mapper);
129
130
/**
131
* Accumulation with scan
132
*/
133
public final Flowable<T> scan(BiFunction<T, T, T> accumulator);
134
public final <R> Flowable<R> scan(R initialValue, BiFunction<R, ? super T, R> accumulator);
135
136
/**
137
* Grouping with backpressure handling
138
*/
139
public final <K> Flowable<GroupedFlowable<K, T>> groupBy(Function<? super T, ? extends K> keySelector);
140
```
141
142
### Subscription and Flow Control
143
144
Subscribe to Flowable with proper flow control.
145
146
```java { .api }
147
/**
148
* Subscribe with FlowableSubscriber for full Reactive Streams compliance
149
*/
150
public final void subscribe(FlowableSubscriber<? super T> subscriber);
151
152
/**
153
* Subscribe with simple callbacks (automatically requests unbounded)
154
*/
155
public final Disposable subscribe(Consumer<? super T> onNext);
156
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError);
157
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete);
158
159
/**
160
* Convert to Observable (loses backpressure handling)
161
*/
162
public final Observable<T> toObservable();
163
164
/**
165
* Blocking operations
166
*/
167
public final T blockingFirst();
168
public final T blockingLast();
169
public final T blockingSingle();
170
public final Iterable<T> blockingIterable();
171
```
172
173
### Parallel Processing
174
175
Process items in parallel with backpressure support.
176
177
```java { .api }
178
/**
179
* Creates a ParallelFlowable for parallel processing
180
*/
181
public final ParallelFlowable<T> parallel();
182
public final ParallelFlowable<T> parallel(int parallelism);
183
public final ParallelFlowable<T> parallel(int parallelism, int prefetch);
184
```
185
186
## Usage Examples
187
188
**Basic Flowable with Backpressure:**
189
190
```java
191
import io.reactivex.Flowable;
192
import io.reactivex.BackpressureStrategy;
193
import io.reactivex.FlowableSubscriber;
194
import org.reactivestreams.Subscription;
195
196
Flowable<Integer> source = Flowable.create(emitter -> {
197
for (int i = 1; i <= 1000; i++) {
198
if (emitter.isCancelled()) {
199
return;
200
}
201
emitter.onNext(i);
202
}
203
emitter.onComplete();
204
}, BackpressureStrategy.BUFFER);
205
206
source.subscribe(new FlowableSubscriber<Integer>() {
207
private Subscription subscription;
208
209
@Override
210
public void onSubscribe(Subscription s) {
211
subscription = s;
212
subscription.request(1); // Request first item
213
}
214
215
@Override
216
public void onNext(Integer integer) {
217
System.out.println("Received: " + integer);
218
subscription.request(1); // Request next item
219
}
220
221
@Override
222
public void onError(Throwable t) {
223
t.printStackTrace();
224
}
225
226
@Override
227
public void onComplete() {
228
System.out.println("Complete");
229
}
230
});
231
```
232
233
**Handling Fast Producer with Backpressure Strategies:**
234
235
```java
236
// Producer that emits items very quickly
237
Flowable<Integer> fastProducer = Flowable.create(emitter -> {
238
for (int i = 0; i < 1000000; i++) {
239
emitter.onNext(i);
240
}
241
emitter.onComplete();
242
}, BackpressureStrategy.MISSING);
243
244
// Strategy 1: Buffer all items
245
fastProducer
246
.onBackpressureBuffer()
247
.observeOn(Schedulers.io())
248
.subscribe(item -> {
249
Thread.sleep(1); // Slow consumer
250
System.out.println("Buffered: " + item);
251
});
252
253
// Strategy 2: Drop items when the downstream can't keep up
254
fastProducer
255
.onBackpressureDrop(dropped -> System.out.println("Dropped: " + dropped))
256
.subscribe(item -> System.out.println("Received: " + item));
257
258
// Strategy 3: Keep only latest
259
fastProducer
260
.onBackpressureLatest()
261
.subscribe(item -> System.out.println("Latest: " + item));
262
```
263
264
**Reactive Streams Interoperability:**
265
266
```java
267
import org.reactivestreams.Publisher;
268
import java.util.concurrent.Flow;
269
270
// Converting to/from Reactive Streams Publisher
271
Publisher<String> publisher = Flowable.just("Hello", "World");
272
273
Flowable<String> fromPublisher = Flowable.fromPublisher(publisher);
274
275
// Java 9+ Flow interoperability
276
Flow.Publisher<String> flowPublisher = org.reactivestreams.FlowAdapters.toFlowPublisher(fromPublisher);
277
```
278
279
**Parallel Processing with Backpressure:**
280
281
```java
282
Flowable.range(1, 1000)
283
.parallel(4) // Split into 4 parallel streams
284
.runOn(Schedulers.computation()) // Each stream runs on computation scheduler
285
.map(i -> i * i) // Square each number in parallel
286
.sequential() // Merge back into single stream
287
.subscribe(result -> System.out.println("Result: " + result));
288
```
289
290
**Combining Flowables:**
291
292
```java
293
Flowable<Integer> source1 = Flowable.range(1, 5);
294
Flowable<Integer> source2 = Flowable.range(6, 5);
295
296
// Merge with backpressure handling
297
Flowable.merge(source1, source2)
298
.subscribe(item -> System.out.println("Merged: " + item));
299
300
// Zip with proper flow control
301
Flowable.zip(source1, source2, (a, b) -> a + b)
302
.subscribe(sum -> System.out.println("Sum: " + sum));
303
```
304
305
## Types
306
307
```java { .api }
308
/**
309
* Functional interface for creating Flowable with backpressure handling
310
*/
311
public interface FlowableOnSubscribe<T> {
312
void subscribe(FlowableEmitter<T> emitter) throws Exception;
313
}
314
315
/**
316
* Emitter for FlowableOnSubscribe
317
*/
318
public interface FlowableEmitter<T> extends Emitter<T> {
319
void setDisposable(Disposable d);
320
void setCancellable(Cancellable c);
321
long requested();
322
boolean isCancelled();
323
}
324
325
/**
326
* Subscriber interface compatible with Reactive Streams
327
*/
328
public interface FlowableSubscriber<T> extends Subscriber<T> {
329
// Inherits from org.reactivestreams.Subscriber
330
}
331
332
/**
333
* Grouped Flowable for groupBy operations
334
*/
335
public abstract class GroupedFlowable<K, T> extends Flowable<T> {
336
public abstract K getKey();
337
}
338
```