or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

completable.md disposables.md error-handling.md flowable.md index.md maybe.md observable.md schedulers.md single.md subjects.md

flowable.mddocs/

0

# Flowable Streams

1

2

Flowable is a reactive stream of 0 to N items with backpressure support. It implements the Reactive Streams specification and is designed for scenarios where the producer may emit items faster than the consumer can process them.

3

4

## Capabilities

5

6

### Flowable Creation

7

8

Factory methods for creating Flowable instances with backpressure handling.

9

10

```java { .api }

11

/**

12

* Creates a Flowable that emits the provided items then completes

13

*/

14

public static <T> Flowable<T> just(T item);

15

public static <T> Flowable<T> just(T item1, T item2);

16

// ... up to 10 items

17

18

/**

19

* Creates a Flowable from an array

20

*/

21

public static <T> Flowable<T> fromArray(T... array);

22

23

/**

24

* Creates a Flowable from an Iterable

25

*/

26

public static <T> Flowable<T> fromIterable(Iterable<? extends T> source);

27

28

/**

29

* Creates a Flowable using the provided FlowableOnSubscribe with backpressure strategy

30

*/

31

public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode);

32

33

/**

34

* Creates a Flowable from a Publisher (Reactive Streams compatibility)

35

*/

36

public static <T> Flowable<T> fromPublisher(Publisher<? extends T> publisher);

37

38

/**

39

* Creates a Flowable that emits sequential numbers at intervals

40

*/

41

public static Flowable<Long> interval(long period, TimeUnit unit);

42

public static Flowable<Long> interval(long initialDelay, long period, TimeUnit unit);

43

44

/**

45

* Creates a Flowable that emits a range of integers

46

*/

47

public static Flowable<Integer> range(int start, int count);

48

49

/**

50

* Creates a Flowable that generates items using a generator function

51

*/

52

public static <T> Flowable<T> generate(Consumer<Emitter<T>> generator);

53

public static <S, T> Flowable<T> generate(Callable<S> initialState, BiConsumer<S, Emitter<T>> generator);

54

```

55

56

### Backpressure Handling

57

58

Operators specifically designed to handle backpressure scenarios.

59

60

```java { .api }

61

/**

62

* Buffers all items until the downstream is ready to receive them

63

*/

64

public final Flowable<T> onBackpressureBuffer();

65

public final Flowable<T> onBackpressureBuffer(int capacity);

66

public final Flowable<T> onBackpressureBuffer(int capacity, Action onOverflow);

67

68

/**

69

* Drops items when downstream can't keep up

70

*/

71

public final Flowable<T> onBackpressureDrop();

72

public final Flowable<T> onBackpressureDrop(Consumer<? super T> onDrop);

73

74

/**

75

* Keeps only the latest item when downstream can't keep up

76

*/

77

public final Flowable<T> onBackpressureLatest();

78

79

/**

80

* Reduces backpressure by sampling items at regular intervals

81

*/

82

public final Flowable<T> sample(long period, TimeUnit unit);

83

public final Flowable<T> sample(long period, TimeUnit unit, Scheduler scheduler);

84

85

/**

86

* Throttles items by only emitting the first item in each time window

87

*/

88

public final Flowable<T> throttleFirst(long windowDuration, TimeUnit unit);

89

90

/**

91

* Throttles items by only emitting the last item in each time window

92

*/

93

public final Flowable<T> throttleLast(long intervalDuration, TimeUnit unit);

94

95

/**

96

* Debounces items by only emitting when no new items arrive for a specified duration

97

*/

98

public final Flowable<T> debounce(long timeout, TimeUnit unit);

99

public final Flowable<T> debounce(long timeout, TimeUnit unit, Scheduler scheduler);

100

```

101

102

### Transformation Operators

103

104

Transform items with backpressure-aware operators.

105

106

```java { .api }

107

/**

108

* Transforms items using a function

109

*/

110

public final <R> Flowable<R> map(Function<? super T, ? extends R> mapper);

111

112

/**

113

* FlatMap variants with backpressure handling

114

*/

115

public final <R> Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper);

116

public final <R> Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper, int maxConcurrency);

117

public final <R> Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper, boolean delayErrors);

118

119

/**

120

* ConcatMap maintains order and handles backpressure

121

*/

122

public final <R> Flowable<R> concatMap(Function<? super T, ? extends Publisher<? extends R>> mapper);

123

public final <R> Flowable<R> concatMap(Function<? super T, ? extends Publisher<? extends R>> mapper, int prefetch);

124

125

/**

126

* SwitchMap subscribes only to the Publisher mapped from the latest item, cancelling the previous inner Publisher

127

*/

128

public final <R> Flowable<R> switchMap(Function<? super T, ? extends Publisher<? extends R>> mapper);

129

130

/**

131

* Accumulation with scan

132

*/

133

public final Flowable<T> scan(BiFunction<T, T, T> accumulator);

134

public final <R> Flowable<R> scan(R initialValue, BiFunction<R, ? super T, R> accumulator);

135

136

/**

137

* Grouping with backpressure handling

138

*/

139

public final <K> Flowable<GroupedFlowable<K, T>> groupBy(Function<? super T, ? extends K> keySelector);

140

```

141

142

### Subscription and Flow Control

143

144

Subscribe to Flowable with proper flow control.

145

146

```java { .api }

147

/**

148

* Subscribe with FlowableSubscriber for full Reactive Streams compliance

149

*/

150

public final void subscribe(FlowableSubscriber<? super T> subscriber);

151

152

/**

153

* Subscribe with simple callbacks (automatically requests an unbounded amount, i.e. Long.MAX_VALUE)

154

*/

155

public final Disposable subscribe(Consumer<? super T> onNext);

156

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError);

157

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete);

158

159

/**

160

* Convert to Observable (loses backpressure handling)

161

*/

162

public final Observable<T> toObservable();

163

164

/**

165

* Blocking operations

166

*/

167

public final T blockingFirst();

168

public final T blockingLast();

169

public final T blockingSingle();

170

public final Iterable<T> blockingIterable();

171

```

172

173

### Parallel Processing

174

175

Process items in parallel with backpressure support.

176

177

```java { .api }

178

/**

179

* Creates a ParallelFlowable for parallel processing

180

*/

181

public final ParallelFlowable<T> parallel();

182

public final ParallelFlowable<T> parallel(int parallelism);

183

public final ParallelFlowable<T> parallel(int parallelism, int prefetch);

184

```

185

186

## Usage Examples

187

188

**Basic Flowable with Backpressure:**

189

190

```java

191

import io.reactivex.Flowable;

192

import io.reactivex.BackpressureStrategy;

193

import io.reactivex.FlowableSubscriber;

194

import org.reactivestreams.Subscription;

195

196

Flowable<Integer> source = Flowable.create(emitter -> {

197

for (int i = 1; i <= 1000; i++) {

198

if (emitter.isCancelled()) {

199

return;

200

}

201

emitter.onNext(i);

202

}

203

emitter.onComplete();

204

}, BackpressureStrategy.BUFFER);

205

206

source.subscribe(new FlowableSubscriber<Integer>() {

207

private Subscription subscription;

208

209

@Override

210

public void onSubscribe(Subscription s) {

211

subscription = s;

212

subscription.request(1); // Request first item

213

}

214

215

@Override

216

public void onNext(Integer integer) {

217

System.out.println("Received: " + integer);

218

subscription.request(1); // Request next item

219

}

220

221

@Override

222

public void onError(Throwable t) {

223

t.printStackTrace();

224

}

225

226

@Override

227

public void onComplete() {

228

System.out.println("Complete");

229

}

230

});

231

```

232

233

**Handling Fast Producer with Backpressure Strategies:**

234

235

```java

236

// Producer that emits items very quickly

237

Flowable<Integer> fastProducer = Flowable.create(emitter -> {

238

for (int i = 0; i < 1000000; i++) {

239

emitter.onNext(i);

240

}

241

emitter.onComplete();

242

}, BackpressureStrategy.MISSING);

243

244

// Strategy 1: Buffer all items

245

fastProducer

246

.onBackpressureBuffer()

247

.observeOn(Schedulers.io())

248

.subscribe(item -> {

249

Thread.sleep(1); // Slow consumer

250

System.out.println("Buffered: " + item);

251

});

252

253

// Strategy 2: Drop items when the downstream can't keep up

254

fastProducer

255

.onBackpressureDrop(dropped -> System.out.println("Dropped: " + dropped))

256

.subscribe(item -> System.out.println("Received: " + item));

257

258

// Strategy 3: Keep only latest

259

fastProducer

260

.onBackpressureLatest()

261

.subscribe(item -> System.out.println("Latest: " + item));

262

```

263

264

**Reactive Streams Interoperability:**

265

266

```java

267

import org.reactivestreams.Publisher;

268

import java.util.concurrent.Flow;

269

270

// Converting to/from Reactive Streams Publisher

271

Publisher<String> publisher = Flowable.just("Hello", "World");

272

273

Flowable<String> fromPublisher = Flowable.fromPublisher(publisher);

274

275

// Java 9+ Flow interoperability

276

Flow.Publisher<String> flowPublisher = org.reactivestreams.FlowAdapters.toFlowPublisher(fromPublisher);

277

```

278

279

**Parallel Processing with Backpressure:**

280

281

```java

282

Flowable.range(1, 1000)

283

.parallel(4) // Split into 4 parallel streams

284

.runOn(Schedulers.computation()) // Each stream runs on computation scheduler

285

.map(i -> i * i) // Square each number in parallel

286

.sequential() // Merge back into single stream

287

.subscribe(result -> System.out.println("Result: " + result));

288

```

289

290

**Combining Flowables:**

291

292

```java

293

Flowable<Integer> source1 = Flowable.range(1, 5);

294

Flowable<Integer> source2 = Flowable.range(6, 5);

295

296

// Merge with backpressure handling

297

Flowable.merge(source1, source2)

298

.subscribe(item -> System.out.println("Merged: " + item));

299

300

// Zip with proper flow control

301

Flowable.zip(source1, source2, (a, b) -> a + b)

302

.subscribe(sum -> System.out.println("Sum: " + sum));

303

```

304

305

## Types

306

307

```java { .api }

308

/**

309

* Functional interface for creating Flowable with backpressure handling

310

*/

311

public interface FlowableOnSubscribe<T> {

312

void subscribe(FlowableEmitter<T> emitter) throws Exception;

313

}

314

315

/**

316

* Emitter for FlowableOnSubscribe

317

*/

318

public interface FlowableEmitter<T> extends Emitter<T> {

319

void setDisposable(Disposable d);

320

void setCancellable(Cancellable c);

321

long requested();

322

boolean isCancelled();

323

}

324

325

/**

326

* Subscriber interface compatible with Reactive Streams

327

*/

328

public interface FlowableSubscriber<T> extends Subscriber<T> {

329

// Inherits from org.reactivestreams.Subscriber

330

}

331

332

/**

333

* Grouped Flowable for groupBy operations

334

*/

335

public abstract class GroupedFlowable<K, T> extends Flowable<T> {

336

public abstract K getKey();

337

}

338

```