0
# Observable Streams
1
2
Cold observable sequences for 0-N items without backpressure support. Observable is the most commonly used reactive type in RxJava, ideal for UI events, HTTP requests, and general reactive programming patterns.
3
4
## Capabilities
5
6
### Observable Creation
7
8
Factory methods for creating Observable instances from various sources.
9
10
```java { .api }
11
/**
12
* Creates an Observable that emits the provided items then completes
13
*/
14
public static <T> Observable<T> just(T item);
15
public static <T> Observable<T> just(T item1, T item2);
16
public static <T> Observable<T> just(T item1, T item2, T item3);
17
// ... up to 10 items
18
19
/**
20
* Creates an Observable that emits all items from an array
21
*/
22
public static <T> Observable<T> fromArray(T... array);
23
24
/**
25
* Creates an Observable that emits all items from an Iterable
26
*/
27
public static <T> Observable<T> fromIterable(Iterable<? extends T> source);
28
29
/**
30
* Creates an Observable from a Callable that will be called for each observer
31
*/
32
public static <T> Observable<T> fromCallable(Callable<? extends T> callable);
33
34
/**
35
* Creates an Observable from a Future
36
*/
37
public static <T> Observable<T> fromFuture(Future<? extends T> future);
38
39
/**
40
* Creates an Observable using the provided ObservableOnSubscribe function
41
*/
42
public static <T> Observable<T> create(ObservableOnSubscribe<T> source);
43
44
/**
45
* Creates an Observable that emits sequential numbers every specified interval
46
*/
47
public static Observable<Long> interval(long period, TimeUnit unit);
48
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit);
49
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler);
50
51
/**
52
* Creates an Observable that emits a range of sequential integers
53
*/
54
public static Observable<Integer> range(int start, int count);
55
56
/**
57
* Creates an Observable that emits a single 0L after a delay
58
*/
59
public static Observable<Long> timer(long delay, TimeUnit unit);
60
public static Observable<Long> timer(long delay, TimeUnit unit, Scheduler scheduler);
61
62
/**
63
* Creates an empty Observable that only calls onComplete
64
*/
65
public static <T> Observable<T> empty();
66
67
/**
68
* Creates an Observable that never emits any items and never terminates
69
*/
70
public static <T> Observable<T> never();
71
72
/**
73
* Creates an Observable that only calls onError
74
*/
75
public static <T> Observable<T> error(Throwable exception);
76
public static <T> Observable<T> error(Callable<? extends Throwable> errorSupplier);
77
78
/**
79
* Defers the creation of the Observable until subscription
80
*/
81
public static <T> Observable<T> defer(Callable<? extends ObservableSource<? extends T>> observableSupplier);
82
```
83
84
### Transformation Operators
85
86
Transform items emitted by an Observable.
87
88
```java { .api }
89
/**
90
* Transforms items by applying a function to each item
91
*/
92
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper);
93
94
/**
95
* Transforms items into Observables and flattens them into a single Observable
96
*/
97
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper);
98
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, int maxConcurrency);
99
100
/**
101
* Similar to flatMap but maintains the order of the original items
102
*/
103
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper);
104
105
/**
106
* Similar to flatMap but only subscribes to the most recent inner Observable
107
*/
108
public final <R> Observable<R> switchMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper);
109
110
/**
111
* Casts each item to the specified type
112
*/
113
public final <U> Observable<U> cast(Class<U> clazz);
114
115
/**
116
* Applies a function to each item and emits the result
117
*/
118
public final <R> Observable<R> scan(BiFunction<R, ? super T, R> accumulator);
119
public final <R> Observable<R> scan(R initialValue, BiFunction<R, ? super T, R> accumulator);
120
121
/**
122
* Groups items by a key selector function
123
*/
124
public final <K> Observable<GroupedObservable<K, T>> groupBy(Function<? super T, ? extends K> keySelector);
125
126
/**
127
* Collects items into buffers
128
*/
129
public final Observable<List<T>> buffer(int count);
130
public final Observable<List<T>> buffer(long timespan, TimeUnit unit);
131
public final Observable<List<T>> buffer(long timespan, TimeUnit unit, Scheduler scheduler);
132
133
/**
134
* Creates non-overlapping windows of items
135
*/
136
public final Observable<Observable<T>> window(int count);
137
public final Observable<Observable<T>> window(long timespan, TimeUnit unit);
138
```
139
140
### Filtering Operators
141
142
Filter items emitted by an Observable.
143
144
```java { .api }
145
/**
146
* Filters items based on a predicate
147
*/
148
public final Observable<T> filter(Predicate<? super T> predicate);
149
150
/**
151
* Emits only the first n items
152
*/
153
public final Observable<T> take(long count);
154
155
/**
156
* Emits items for a specified duration
157
*/
158
public final Observable<T> take(long time, TimeUnit unit);
159
160
/**
161
* Skips the first n items
162
*/
163
public final Observable<T> skip(long count);
164
165
/**
166
* Skips items for a specified duration
167
*/
168
public final Observable<T> skip(long time, TimeUnit unit);
169
170
/**
171
* Emits only distinct items
172
*/
173
public final Observable<T> distinct();
174
public final <K> Observable<T> distinct(Function<? super T, K> keySelector);
175
176
/**
177
* Emits only items that are different from the previous item
178
*/
179
public final Observable<T> distinctUntilChanged();
180
public final <K> Observable<T> distinctUntilChanged(Function<? super T, K> keySelector);
181
182
/**
183
* Emits only the first item that matches a predicate
184
*/
185
public final Observable<T> takeWhile(Predicate<? super T> predicate);
186
187
/**
188
* Skips items while a predicate is true
189
*/
190
public final Observable<T> skipWhile(Predicate<? super T> predicate);
191
192
/**
193
* Emits only the first item, or throws NoSuchElementException if empty
194
*/
195
public final Observable<T> first(T defaultItem);
196
197
/**
198
* Emits only the last item
199
*/
200
public final Observable<T> last(T defaultItem);
201
202
/**
203
* Emits only the single item, or throws exception if more than one
204
*/
205
public final Observable<T> single(T defaultItem);
206
207
/**
208
* Ignores all items and only emits completion
209
*/
210
public final Completable ignoreElements();
211
```
212
213
### Combining Operators
214
215
Combine multiple Observables.
216
217
```java { .api }
218
/**
219
* Combines two Observables by emitting an item when either emits
220
*/
221
public static <T> Observable<T> merge(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2);
222
public static <T> Observable<T> merge(ObservableSource<? extends ObservableSource<? extends T>> sources);
223
224
/**
225
* Concatenates Observables sequentially
226
*/
227
public static <T> Observable<T> concat(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2);
228
public static <T> Observable<T> concat(ObservableSource<? extends ObservableSource<? extends T>> sources);
229
230
/**
231
* Combines latest values from multiple Observables
232
*/
233
public static <T1, T2, R> Observable<R> combineLatest(
234
ObservableSource<T1> source1,
235
ObservableSource<T2> source2,
236
BiFunction<? super T1, ? super T2, ? extends R> combiner
237
);
238
239
/**
240
* Zips items from multiple Observables together
241
*/
242
public static <T1, T2, R> Observable<R> zip(
243
ObservableSource<T1> source1,
244
ObservableSource<T2> source2,
245
BiFunction<? super T1, ? super T2, ? extends R> zipper
246
);
247
248
/**
249
* Returns the first Observable to emit or terminate
250
*/
251
public static <T> Observable<T> amb(ObservableSource<? extends T>... sources);
252
253
/**
254
* Prepends items to the beginning of an Observable
255
*/
256
public final Observable<T> startWith(T item);
257
public final Observable<T> startWith(T... items);
258
public final Observable<T> startWithArray(T... items);
259
260
/**
261
* Appends items to the end of an Observable
262
*/
263
public final Observable<T> concatWith(ObservableSource<? extends T> other);
264
```
265
266
### Threading Operators
267
268
Control threading and execution context.
269
270
```java { .api }
271
/**
272
* Specifies the Scheduler on which the source Observable will operate
273
*/
274
public final Observable<T> subscribeOn(Scheduler scheduler);
275
276
/**
277
* Specifies the Scheduler on which observers will be notified
278
*/
279
public final Observable<T> observeOn(Scheduler scheduler);
280
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError);
281
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize);
282
```
283
284
### Side-Effect Operators
285
286
Perform side effects without modifying the stream.
287
288
```java { .api }
289
/**
290
* Invokes an action for each item emitted
291
*/
292
public final Observable<T> doOnNext(Consumer<? super T> onNext);
293
294
/**
295
* Invokes an action when an error is emitted
296
*/
297
public final Observable<T> doOnError(Consumer<? super Throwable> onError);
298
299
/**
300
* Invokes an action when the Observable completes normally
301
*/
302
public final Observable<T> doOnComplete(Action onComplete);
303
304
/**
305
* Invokes an action when a subscription occurs
306
*/
307
public final Observable<T> doOnSubscribe(Consumer<? super Disposable> onSubscribe);
308
309
/**
310
* Invokes an action when the Observable terminates (completes or errors)
311
*/
312
public final Observable<T> doOnTerminate(Action onTerminate);
313
314
/**
315
* Invokes an action after the Observable terminates
316
*/
317
public final Observable<T> doAfterTerminate(Action onFinally);
318
319
/**
320
* Invokes an action when the subscription is disposed
321
*/
322
public final Observable<T> doOnDispose(Action onDispose);
323
```
324
325
### Temporal Operators
326
327
Control timing and temporal behavior of streams.
328
329
```java { .api }
330
/**
331
* Only emit items if no other item was emitted within a time window
332
*/
333
public final Observable<T> debounce(long timeout, TimeUnit unit);
334
public final Observable<T> debounce(long timeout, TimeUnit unit, Scheduler scheduler);
335
336
/**
337
* Emit only the first item in each time window
338
*/
339
public final Observable<T> throttleFirst(long windowDuration, TimeUnit unit);
340
public final Observable<T> throttleFirst(long windowDuration, TimeUnit unit, Scheduler scheduler);
341
342
/**
343
* Emit only the last item in each time window
344
*/
345
public final Observable<T> throttleLast(long intervalDuration, TimeUnit unit);
346
public final Observable<T> throttleLast(long intervalDuration, TimeUnit unit, Scheduler scheduler);
347
348
/**
349
* Shift emissions forward in time by a specified delay
350
*/
351
public final Observable<T> delay(long delay, TimeUnit unit);
352
public final Observable<T> delay(long delay, TimeUnit unit, Scheduler scheduler);
353
354
/**
355
* Sample items periodically
356
*/
357
public final Observable<T> sample(long period, TimeUnit unit);
358
public final Observable<T> sample(long period, TimeUnit unit, Scheduler scheduler);
359
```
360
361
### Utility Operators
362
363
Utility operations for various purposes.
364
365
```java { .api }
366
/**
367
* Mirror source Observable, but terminate with TimeoutException if no item is emitted within timeout
368
*/
369
public final Observable<T> timeout(long timeout, TimeUnit timeUnit);
370
public final Observable<T> timeout(long timeout, TimeUnit timeUnit, Scheduler scheduler);
371
public final Observable<T> timeout(long timeout, TimeUnit timeUnit, ObservableSource<? extends T> fallback);
372
373
/**
374
* Cache emissions for replay to all future subscribers
375
*/
376
public final Observable<T> cache();
377
public final Observable<T> cache(int capacityHint);
378
379
/**
380
* Convert notifications into Notification objects
381
*/
382
public final Observable<Notification<T>> materialize();
383
384
/**
385
* Emit the specified value if source is empty
386
*/
387
public final Observable<T> defaultIfEmpty(T defaultItem);
388
389
/**
390
* Count the number of items emitted
391
*/
392
public final Single<Long> count();
393
394
/**
395
* Emit source items and a notification when source completes
396
*/
397
public final Observable<T> doFinally(Action onFinally);
398
```
399
400
### Subscription and Consumption
401
402
Subscribe to an Observable and consume emitted items.
403
404
```java { .api }
405
/**
406
* Subscribes with separate callbacks for each event type
407
*/
408
public final Disposable subscribe();
409
public final Disposable subscribe(Consumer<? super T> onNext);
410
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError);
411
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete);
412
413
/**
414
* Subscribes with an Observer
415
*/
416
public final void subscribe(Observer<? super T> observer);
417
418
/**
419
* Blocking operations - use with caution
420
*/
421
public final T blockingFirst();
422
public final T blockingFirst(T defaultItem);
423
public final T blockingLast();
424
public final T blockingLast(T defaultItem);
425
public final T blockingSingle();
426
public final T blockingSingle(T defaultItem);
427
public final Iterable<T> blockingIterable();
428
public final void blockingSubscribe(Consumer<? super T> onNext);
429
```
430
431
## Usage Examples
432
433
**Basic Observable Creation and Subscription:**
434
435
```java
436
import io.reactivex.Observable;
437
import io.reactivex.disposables.Disposable;
438
439
// Simple Observable
440
Observable<String> source = Observable.just("Hello", "World");
441
442
Disposable disposable = source.subscribe(
443
item -> System.out.println("Item: " + item),
444
error -> System.err.println("Error: " + error),
445
() -> System.out.println("Complete")
446
);
447
448
// Remember to dispose when done
449
disposable.dispose();
450
```
451
452
**Transformation Chain:**
453
454
```java
455
Observable.fromArray(1, 2, 3, 4, 5)
456
.filter(x -> x % 2 == 0) // Keep even numbers
457
.map(x -> x * x) // Square them
458
.subscribe(result -> System.out.println("Result: " + result));
459
```
460
461
**Async Operations with Threading:**
462
463
```java
464
import io.reactivex.schedulers.Schedulers;
465
466
Observable<String> asyncSource = Observable.fromCallable(() -> {
467
// Simulate expensive operation
468
Thread.sleep(1000);
469
return "Async Result";
470
}).subscribeOn(Schedulers.io()) // Execute on IO thread
471
.observeOn(Schedulers.computation()); // Observe on computation thread
472
473
asyncSource.subscribe(
474
result -> System.out.println("Got: " + result),
475
error -> error.printStackTrace()
476
);
477
```
478
479
**Creating Custom Observable:**
480
481
```java
482
Observable<Integer> customObservable = Observable.create(emitter -> {
483
try {
484
for (int i = 1; i <= 5; i++) {
485
if (emitter.isDisposed()) {
486
return;
487
}
488
emitter.onNext(i);
489
}
490
emitter.onComplete();
491
} catch (Exception e) {
492
emitter.onError(e);
493
}
494
});
495
```
496
497
**Combining Multiple Observables:**
498
499
```java
500
Observable<String> obs1 = Observable.just("A", "B");
501
Observable<String> obs2 = Observable.just("1", "2");
502
503
Observable.zip(obs1, obs2, (s1, s2) -> s1 + s2)
504
.subscribe(combined -> System.out.println(combined)); // A1, B2
505
```
506
507
**Side Effects and Debugging:**
508
509
```java
510
Observable.fromArray(1, 2, 3, 4, 5)
511
.doOnNext(item -> System.out.println("Processing: " + item))
512
.filter(x -> x % 2 == 0)
513
.doOnNext(item -> System.out.println("After filter: " + item))
514
.map(x -> x * x)
515
.doOnComplete(() -> System.out.println("Stream completed"))
516
.subscribe(result -> System.out.println("Final result: " + result));
517
```
518
519
**Temporal Operations:**
520
521
```java
522
import java.util.concurrent.TimeUnit;
523
524
// Debounce - only emit if 300ms pass without another emission
525
Observable.interval(100, TimeUnit.MILLISECONDS)
526
.take(10)
527
.debounce(300, TimeUnit.MILLISECONDS)
528
.subscribe(item -> System.out.println("Debounced: " + item));
529
530
// Delay emissions by 1 second
531
Observable.just("Delayed", "Message")
532
.delay(1, TimeUnit.SECONDS)
533
.subscribe(item -> System.out.println("Got: " + item));
534
```
535
536
**Utility Operations:**
537
538
```java
539
// Timeout and fallback
540
Observable.timer(2, TimeUnit.SECONDS)
541
.timeout(1, TimeUnit.SECONDS, Observable.just("Fallback"))
542
.subscribe(
543
result -> System.out.println("Result: " + result),
544
error -> System.out.println("Timeout occurred")
545
);
546
547
// Cache for replay
548
Observable<String> cached = Observable.fromCallable(() -> {
549
System.out.println("Expensive operation executed");
550
return "Expensive Result";
551
}).cache();
552
553
// Multiple subscriptions will reuse cached result
554
cached.subscribe(result -> System.out.println("First: " + result));
555
cached.subscribe(result -> System.out.println("Second: " + result));
556
```