0
# RxJava
1
2
RxJava 2.x is a Java VM implementation of Reactive Extensions that provides a comprehensive library for composing asynchronous and event-based programs using observable sequences. It extends the observer pattern to support data/event sequences with operators for declarative composition while abstracting low-level threading, synchronization, and concurrent data structures.
3
4
## Package Information
5
6
- **Package Name**: rxjava
7
- **Package Type**: maven
8
- **Language**: Java
9
- **Installation**:
10
```xml
11
<dependency>
12
<groupId>io.reactivex.rxjava2</groupId>
13
<artifactId>rxjava</artifactId>
14
<version>2.2.21</version>
15
</dependency>
16
```
17
- **Gradle**: `implementation 'io.reactivex.rxjava2:rxjava:2.2.21'`
18
19
## Core Imports
20
21
```java
22
import io.reactivex.Observable;
23
import io.reactivex.Flowable;
24
import io.reactivex.Single;
25
import io.reactivex.Maybe;
26
import io.reactivex.Completable;
27
import io.reactivex.Observer;
28
import io.reactivex.disposables.Disposable;
29
import io.reactivex.schedulers.Schedulers;
30
```
31
32
## Basic Usage
33
34
```java
35
import io.reactivex.Observable;
36
import io.reactivex.Observer;
37
import io.reactivex.disposables.Disposable;
38
import io.reactivex.schedulers.Schedulers;
39
40
// Create and subscribe to an Observable
41
Observable<String> observable = Observable.just("Hello", "World")
42
.map(s -> s.toUpperCase())
43
.subscribeOn(Schedulers.io())
44
.observeOn(Schedulers.computation());
45
46
observable.subscribe(new Observer<String>() {
47
@Override
48
public void onSubscribe(Disposable d) {
49
// Handle subscription
50
}
51
52
@Override
53
public void onNext(String value) {
54
System.out.println(value);
55
}
56
57
@Override
58
public void onError(Throwable e) {
59
e.printStackTrace();
60
}
61
62
@Override
63
public void onComplete() {
64
System.out.println("Completed");
65
}
66
});
67
68
// Lambda-style subscription
69
Observable.fromArray(1, 2, 3, 4, 5)
70
.filter(x -> x % 2 == 0)
71
.map(x -> x * x)
72
.subscribe(
73
value -> System.out.println("Value: " + value),
74
error -> error.printStackTrace(),
75
() -> System.out.println("Complete")
76
);
77
```
78
79
## Architecture
80
81
RxJava is built around several key components:
82
83
- **Reactive Types**: Five core types (Observable, Flowable, Single, Maybe, Completable) for different use cases
84
- **Observer Pattern**: Push-based notifications with Observer interfaces for each reactive type
85
- **Operators**: Rich set of operators for transformation, filtering, combining, and error handling
86
- **Schedulers**: Abstraction for controlling threading and execution context
87
- **Backpressure**: Flow control mechanism in Flowable to handle fast producers
88
- **Disposables**: Resource management for subscription cleanup
89
- **Subjects/Processors**: Hot observables that can multicast to multiple observers
90
91
## Capabilities
92
93
### Observable Streams
94
95
Cold observable sequences for 0-N items without backpressure support. Ideal for UI events, HTTP requests, and general reactive programming patterns.
96
97
```java { .api }
98
public abstract class Observable<T> implements ObservableSource<T> {
99
// Factory methods
100
public static <T> Observable<T> just(T... items);
101
public static <T> Observable<T> fromArray(T... array);
102
public static <T> Observable<T> fromIterable(Iterable<? extends T> source);
103
public static Observable<Long> interval(long period, TimeUnit unit);
104
public static <T> Observable<T> create(ObservableOnSubscribe<T> source);
105
106
// Transformation operators
107
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper);
108
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper);
109
public final Observable<T> filter(Predicate<? super T> predicate);
110
111
// Threading
112
public final Observable<T> subscribeOn(Scheduler scheduler);
113
public final Observable<T> observeOn(Scheduler scheduler);
114
115
// Subscription
116
public final Disposable subscribe(Consumer<? super T> onNext);
117
public final void subscribe(Observer<? super T> observer);
118
}
119
```
120
121
[Observable Streams](./observable.md)
122
123
### Flowable Streams
124
125
Reactive streams with backpressure support for 0-N items. Compatible with Reactive Streams specification for handling fast data producers.
126
127
```java { .api }
128
public abstract class Flowable<T> implements Publisher<T> {
129
// Factory methods with backpressure handling
130
public static <T> Flowable<T> just(T... items);
131
public static <T> Flowable<T> fromArray(T... array);
132
public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode);
133
134
// Backpressure operators
135
public final Flowable<T> onBackpressureBuffer();
136
public final Flowable<T> onBackpressureDrop();
137
public final Flowable<T> onBackpressureLatest();
138
139
// Standard operators
140
public final <R> Flowable<R> map(Function<? super T, ? extends R> mapper);
141
public final Flowable<T> filter(Predicate<? super T> predicate);
142
143
// Subscription
144
public final void subscribe(FlowableSubscriber<? super T> subscriber);
145
}
146
```
147
148
[Flowable Streams](./flowable.md)
149
150
### Single Values
151
152
Reactive type that emits exactly one value or an error. Perfect for async operations that return a single result like HTTP requests.
153
154
```java { .api }
155
public abstract class Single<T> implements SingleSource<T> {
156
// Factory methods
157
public static <T> Single<T> just(T item);
158
public static <T> Single<T> fromCallable(Callable<? extends T> callable);
159
public static <T> Single<T> create(SingleOnSubscribe<T> source);
160
161
// Transformation
162
public final <R> Single<R> map(Function<? super T, ? extends R> mapper);
163
public final <R> Single<R> flatMap(Function<? super T, ? extends SingleSource<? extends R>> mapper);
164
165
// Error handling
166
public final Single<T> onErrorReturn(Function<? super Throwable, ? extends T> valueSupplier);
167
168
// Subscription
169
public final Disposable subscribe(Consumer<? super T> onSuccess);
170
public final void subscribe(SingleObserver<? super T> observer);
171
}
172
```
173
174
[Single Values](./single.md)
175
176
### Maybe Values
177
178
Reactive type that emits 0 or 1 item or an error. Useful for operations that may or may not return a value.
179
180
```java { .api }
181
public abstract class Maybe<T> implements MaybeSource<T> {
182
// Factory methods
183
public static <T> Maybe<T> just(T item);
184
public static <T> Maybe<T> empty();
185
public static <T> Maybe<T> fromCallable(Callable<? extends T> callable);
186
187
// Transformation
188
public final <R> Maybe<R> map(Function<? super T, ? extends R> mapper);
189
public final Maybe<T> filter(Predicate<? super T> predicate);
190
public final Maybe<T> defaultIfEmpty(T defaultItem);
191
192
// Subscription
193
public final Disposable subscribe(Consumer<? super T> onSuccess);
194
public final void subscribe(MaybeObserver<? super T> observer);
195
}
196
```
197
198
[Maybe Values](./maybe.md)
199
200
### Completable Operations
201
202
Reactive type that only signals completion or error without emitting items. Ideal for fire-and-forget operations.
203
204
```java { .api }
205
public abstract class Completable implements CompletableSource {
206
// Factory methods
207
public static Completable complete();
208
public static Completable fromAction(Action action);
209
public static Completable fromRunnable(Runnable runnable);
210
211
// Chaining
212
public final <T> Observable<T> andThen(ObservableSource<T> next);
213
public final Completable andThen(CompletableSource next);
214
215
// Subscription
216
public final Disposable subscribe(Action onComplete);
217
public final void subscribe(CompletableObserver observer);
218
}
219
```
220
221
[Completable Operations](./completable.md)
222
223
### Schedulers and Threading
224
225
Control execution context and threading for reactive streams.
226
227
```java { .api }
228
public abstract class Scheduler {
229
public abstract Worker createWorker();
230
public Disposable scheduleDirect(Runnable run);
231
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit);
232
}
233
234
public final class Schedulers {
235
public static Scheduler io();
236
public static Scheduler computation();
237
public static Scheduler newThread();
238
public static Scheduler single();
239
public static Scheduler trampoline();
240
public static Scheduler from(Executor executor);
241
}
242
```
243
244
[Schedulers and Threading](./schedulers.md)
245
246
### Subjects and Hot Observables
247
248
Hot observables that can multicast to multiple observers and emit items regardless of subscriptions.
249
250
```java { .api }
251
public abstract class Subject<T> extends Observable<T> implements Observer<T> {
252
public abstract boolean hasObservers();
253
public abstract boolean hasThrowable();
254
public abstract boolean hasComplete();
255
}
256
257
// Main subject types
258
public final class PublishSubject<T> extends Subject<T>;
259
public final class BehaviorSubject<T> extends Subject<T>;
260
public final class ReplaySubject<T> extends Subject<T>;
261
public final class AsyncSubject<T> extends Subject<T>;
262
```
263
264
[Subjects and Hot Observables](./subjects.md)
265
266
### Resource Management
267
268
Disposable pattern for managing subscriptions and preventing memory leaks.
269
270
```java { .api }
271
public interface Disposable {
272
void dispose();
273
boolean isDisposed();
274
}
275
276
public final class CompositeDisposable implements Disposable {
277
public boolean add(Disposable disposable);
278
public boolean remove(Disposable disposable);
279
public void clear();
280
}
281
```
282
283
[Resource Management](./disposables.md)
284
285
### Error Handling
286
287
Comprehensive error handling and recovery mechanisms.
288
289
```java { .api }
290
// Error handling operators available on all reactive types
291
public final Observable<T> onErrorReturn(Function<? super Throwable, ? extends T> valueSupplier);
292
public final Observable<T> onErrorResumeNext(Function<? super Throwable, ? extends ObservableSource<? extends T>> resumeFunction);
293
public final Observable<T> retry();
294
public final Observable<T> retry(long times);
295
public final Observable<T> retryWhen(Function<? super Observable<Throwable>, ? extends ObservableSource<?>> handler);
296
```
297
298
[Error Handling](./error-handling.md)
299
300
## Core Types
301
302
```java { .api }
303
// Observer interfaces
304
public interface Observer<T> {
305
void onSubscribe(Disposable d);
306
void onNext(T t);
307
void onError(Throwable e);
308
void onComplete();
309
}
310
311
public interface SingleObserver<T> {
312
void onSubscribe(Disposable d);
313
void onSuccess(T t);
314
void onError(Throwable e);
315
}
316
317
public interface MaybeObserver<T> {
318
void onSubscribe(Disposable d);
319
void onSuccess(T t);
320
void onError(Throwable e);
321
void onComplete();
322
}
323
324
public interface CompletableObserver {
325
void onSubscribe(Disposable d);
326
void onComplete();
327
void onError(Throwable e);
328
}
329
330
// Functional interfaces
331
public interface Consumer<T> {
332
void accept(T t) throws Exception;
333
}
334
335
public interface Function<T, R> {
336
R apply(T t) throws Exception;
337
}
338
339
public interface Predicate<T> {
340
boolean test(T t) throws Exception;
341
}
342
343
public interface Action {
344
void run() throws Exception;
345
}
346
347
// Backpressure strategies
348
public enum BackpressureStrategy {
349
MISSING, ERROR, BUFFER, DROP, LATEST
350
}
351
```