0
# Observable API
1
2
Observable is RxJava's implementation of the non-backpressured reactive stream pattern, designed for handling sequences of 0 to N items. It provides comprehensive operator support for transformation, filtering, combination, and error handling without built-in flow control.
3
4
## Capabilities
5
6
### Observable Creation
7
8
Factory methods for creating Observable instances from various sources.
9
10
```java { .api }
11
// Required imports:
12
// import java.util.concurrent.Callable;
13
// import java.util.concurrent.Future;
14
// import java.util.concurrent.TimeUnit;
15
// import java.util.function.Consumer;
16
// import java.util.function.BiConsumer;
17
// import java.util.function.Supplier;
18
// import org.reactivestreams.Publisher;
19
/**
20
* Creates an Observable that emits a single item
21
* @param item the item to emit
22
* @return Observable that emits the single item
23
*/
24
public static <T> Observable<T> just(T item);
25
26
/**
27
* Creates an Observable that emits two items
28
* @param item1 first item to emit
29
* @param item2 second item to emit
30
* @return Observable that emits the two items
31
*/
32
public static <T> Observable<T> just(T item1, T item2);
33
34
/**
35
* Creates an Observable that emits three items
36
* @param item1 first item to emit
37
* @param item2 second item to emit
38
* @param item3 third item to emit
39
* @return Observable that emits the three items
40
*/
41
public static <T> Observable<T> just(T item1, T item2, T item3);
42
43
/**
44
* Creates an Observable that emits up to ten items
45
* @param items the items to emit (up to 10 items supported)
46
* @return Observable that emits all provided items
47
*/
48
public static <T> Observable<T> just(T item1, T item2, T item3, T item4, T item5, T item6, T item7, T item8, T item9, T item10);
49
50
/**
51
* Creates an Observable from an Iterable source
52
* @param source the Iterable to convert
53
* @return Observable that emits items from the Iterable
54
*/
55
public static <T> Observable<T> fromIterable(Iterable<? extends T> source);
56
57
/**
58
* Creates an Observable from an array
59
* @param array the array to convert
60
* @return Observable that emits items from the array
61
*/
62
public static <T> Observable<T> fromArray(T... array);
63
64
/**
65
* Creates an Observable using a custom emitter function
66
* @param source the ObservableOnSubscribe function
67
* @return Observable created from the custom emitter
68
*/
69
public static <T> Observable<T> create(ObservableOnSubscribe<T> source);
70
71
/**
72
* Creates an Observable that emits sequential integers
73
* @param start the starting value
74
* @param count the number of items to emit
75
* @return Observable emitting integers from start to start+count-1
76
*/
77
public static Observable<Integer> range(int start, int count);
78
79
/**
80
* Creates an Observable that emits at specified intervals
81
* @param period the emission interval
82
* @param unit the time unit
83
* @return Observable emitting sequential longs at intervals
84
*/
85
public static Observable<Long> interval(long period, TimeUnit unit);
86
87
/**
88
* Creates an Observable that emits after a delay
89
* @param delay the delay duration
90
* @param unit the time unit
91
* @return Observable that emits 0L after the delay
92
*/
93
public static Observable<Long> timer(long delay, TimeUnit unit);
94
95
/**
96
* Creates an empty Observable that only calls onComplete
97
* @return Observable that completes immediately
98
*/
99
public static <T> Observable<T> empty();
100
101
/**
102
* Creates an Observable that never emits anything
103
* @return Observable that never calls any observer methods
104
*/
105
public static <T> Observable<T> never();
106
107
/**
108
* Creates an Observable that only calls onError
109
* @param error the error to emit
110
* @return Observable that emits the error
111
*/
112
public static <T> Observable<T> error(Throwable error);
113
114
/**
115
* Creates an Observable from a Callable
116
* @param callable the Callable to invoke for each subscription
117
* @return Observable that emits the result of the Callable
118
*/
119
public static <T> Observable<T> fromCallable(Callable<? extends T> callable);
120
121
/**
122
* Creates an Observable from a CompletableSource
123
* @param completableSource the CompletableSource to convert
124
* @return Observable that completes when the CompletableSource completes
125
*/
126
public static <T> Observable<T> fromCompletable(CompletableSource completableSource);
127
128
/**
129
* Creates an Observable from a Future
130
* @param future the Future to convert
131
* @return Observable that emits the Future result
132
*/
133
public static <T> Observable<T> fromFuture(Future<? extends T> future);
134
135
/**
136
* Creates an Observable from a MaybeSource
137
* @param maybe the MaybeSource to convert
138
* @return Observable that emits the Maybe result or completes
139
*/
140
public static <T> Observable<T> fromMaybe(MaybeSource<T> maybe);
141
142
/**
143
* Creates an Observable from a Publisher (Reactive Streams)
144
* @param publisher the Publisher to convert
145
* @return Observable that emits items from the Publisher
146
*/
147
public static <T> Observable<T> fromPublisher(Publisher<? extends T> publisher);
148
149
/**
150
* Creates an Observable from a Runnable
151
* @param run the Runnable to execute
152
* @return Observable that completes after running the Runnable
153
*/
154
public static <T> Observable<T> fromRunnable(Runnable run);
155
156
/**
157
* Creates an Observable from a SingleSource
158
* @param source the SingleSource to convert
159
* @return Observable that emits the Single result
160
*/
161
public static <T> Observable<T> fromSingle(SingleSource<T> source);
162
163
/**
164
* Creates an Observable from a Supplier
165
* @param supplier the Supplier to invoke for each subscription
166
* @return Observable that emits the result of the Supplier
167
*/
168
public static <T> Observable<T> fromSupplier(Supplier<? extends T> supplier);
169
170
/**
171
* Creates an Observable that defers creation until subscription
172
* @param supplier function that returns an ObservableSource
173
* @return Observable that creates the actual source on subscription
174
*/
175
public static <T> Observable<T> defer(Supplier<? extends ObservableSource<? extends T>> supplier);
176
177
/**
178
* Creates an Observable that emits sequential long values in a range
179
* @param start the starting value
180
* @param count the number of items to emit
181
* @return Observable emitting longs from start to start+count-1
182
*/
183
public static Observable<Long> rangeLong(long start, long count);
184
185
/**
186
* Creates an Observable that emits at intervals starting after an initial delay
187
* @param initialDelay the initial delay before the first emission
188
* @param period the emission interval
189
* @param unit the time unit
190
* @return Observable emitting sequential longs at intervals
191
*/
192
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit);
193
194
/**
195
* Creates an Observable that emits sequential longs over time in a range
196
* @param start the starting value
197
* @param count the number of items to emit
198
* @param initialDelay the initial delay before the first emission
199
* @param period the emission interval
200
* @param unit the time unit
201
* @return Observable emitting sequential longs in range over time
202
*/
203
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit);
204
205
/**
206
* Mirrors the first Observable to emit or complete from multiple sources
207
* @param sources the Iterable of ObservableSource instances
208
* @return Observable that mirrors the first source to emit
209
*/
210
public static <T> Observable<T> amb(Iterable<? extends ObservableSource<? extends T>> sources);
211
212
/**
213
* Mirrors the first Observable to emit or complete from multiple sources
214
* @param sources the array of ObservableSource instances
215
* @return Observable that mirrors the first source to emit
216
*/
217
public static <T> Observable<T> ambArray(ObservableSource<? extends T>... sources);
218
219
/**
220
* Generates values on demand using a generator function
221
* @param generator the generator function that emits values
222
* @return Observable that generates values on demand
223
*/
224
public static <T> Observable<T> generate(Consumer<Emitter<T>> generator);
225
226
/**
227
* Generates values on demand with state
228
* @param initialState the initial state supplier
229
* @param generator the generator function with state
230
* @return Observable that generates values with state
231
*/
232
public static <T, S> Observable<T> generate(Supplier<S> initialState, BiConsumer<S, Emitter<T>> generator);
233
234
/**
235
* Compares two Observable sequences for equality
236
* @param source1 the first Observable sequence
237
* @param source2 the second Observable sequence
238
* @return Single that emits true if sequences are equal
239
*/
240
public static <T> Single<Boolean> sequenceEqual(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2);
241
242
/**
243
* Switches to new inner Observables as they arrive
244
* @param sources Observable of Observable sources
245
* @return Observable that switches to the latest inner Observable
246
*/
247
public static <T> Observable<T> switchOnNext(ObservableSource<? extends ObservableSource<? extends T>> sources);
248
249
/**
250
* Creates an Observable with resource management
251
* @param resourceSupplier supplier for the resource
252
* @param sourceSupplier function that creates the Observable from the resource
253
* @param resourceCleanup function to clean up the resource
254
* @return Observable with automatic resource management
255
*/
256
public static <T, D> Observable<T> using(
257
Supplier<? extends D> resourceSupplier,
258
Function<? super D, ? extends ObservableSource<? extends T>> sourceSupplier,
259
Consumer<? super D> resourceCleanup
260
);
261
262
/**
263
* Wraps an ObservableSource to make it an Observable
264
* @param source the ObservableSource to wrap
265
* @return Observable wrapping the source
266
*/
267
public static <T> Observable<T> wrap(ObservableSource<T> source);
268
```
269
270
### Java 8+ Integration
271
272
Factory methods for integrating with Java 8+ features.
273
274
```java { .api }
275
// Required imports:
276
// import java.util.Optional;
277
// import java.util.concurrent.CompletionStage;
278
// import java.util.stream.Stream;
279
/**
280
* Creates an Observable from an Optional (Java 8+)
281
* @param optional the Optional to convert
282
* @return Observable that emits the Optional value or completes if empty
283
*/
284
public static <T> Observable<T> fromOptional(Optional<T> optional);
285
286
/**
287
* Creates an Observable from a CompletionStage (Java 8+)
288
* @param stage the CompletionStage to convert
289
* @return Observable that emits the CompletionStage result
290
*/
291
public static <T> Observable<T> fromCompletionStage(CompletionStage<T> stage);
292
293
/**
294
* Creates an Observable from a Stream (Java 8+)
295
* @param stream the Stream to convert
296
* @return Observable that emits the Stream elements
297
*/
298
public static <T> Observable<T> fromStream(Stream<T> stream);
299
```
300
301
### Transformation Operators
302
303
Transform emitted items using various mapping functions.
304
305
```java { .api }
306
/**
307
* Transform items using a mapping function
308
* @param mapper function to transform each item
309
* @return Observable with transformed items
310
*/
311
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper);
312
313
/**
314
* Transform items to Observables and flatten the results
315
* @param mapper function returning Observable for each item
316
* @return Observable with flattened results
317
*/
318
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper);
319
320
/**
321
* Transform items to Observables and concatenate them in order
322
* @param mapper function returning Observable for each item
323
* @return Observable with concatenated results in order
324
*/
325
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper);
326
327
/**
328
* Transform items to Singles and merge the results
329
* @param mapper function returning Single for each item
330
* @return Observable with Single results
331
*/
332
public final <R> Observable<R> flatMapSingle(Function<? super T, ? extends SingleSource<? extends R>> mapper);
333
334
/**
335
* Emit only items that pass a predicate test
336
* @param predicate function to test each item
337
* @return Observable with filtered items
338
*/
339
public final Observable<T> filter(Predicate<? super T> predicate);
340
341
/**
342
* Emit only distinct items compared to previous emissions
343
* @return Observable with distinct consecutive items
344
*/
345
public final Observable<T> distinctUntilChanged();
346
347
/**
348
* Skip the first n items
349
* @param count number of items to skip
350
* @return Observable skipping the first count items
351
*/
352
public final Observable<T> skip(long count);
353
354
/**
355
* Take only the first n items
356
* @param count number of items to take
357
* @return Observable emitting only the first count items
358
*/
359
public final Observable<T> take(long count);
360
```
361
362
### Combination Operators
363
364
Combine multiple Observable sources.
365
366
```java { .api }
367
/**
368
* Merge multiple Observables into one
369
* @param sources array of Observable sources
370
* @return Observable merging all source emissions
371
*/
372
public static <T> Observable<T> merge(ObservableSource<? extends T>... sources);
373
374
/**
375
* Concatenate multiple Observables in sequence
376
* @param sources array of Observable sources
377
* @return Observable concatenating all sources in order
378
*/
379
public static <T> Observable<T> concat(ObservableSource<? extends T>... sources);
380
381
/**
382
* Combine the latest values from multiple sources
383
* @param source1 first Observable source
384
* @param source2 second Observable source
385
* @param combiner function to combine the latest values
386
* @return Observable emitting combined values
387
*/
388
public static <T1, T2, R> Observable<R> combineLatest(
389
ObservableSource<T1> source1,
390
ObservableSource<T2> source2,
391
BiFunction<? super T1, ? super T2, ? extends R> combiner
392
);
393
394
/**
395
* Zip multiple sources together
396
* @param source1 first Observable source
397
* @param source2 second Observable source
398
* @param zipper function to zip values together
399
* @return Observable emitting zipped values
400
*/
401
public static <T1, T2, R> Observable<R> zip(
402
ObservableSource<T1> source1,
403
ObservableSource<T2> source2,
404
BiFunction<? super T1, ? super T2, ? extends R> zipper
405
);
406
407
/**
408
* Start with additional items before the source emissions
409
* @param items items to emit first
410
* @return Observable starting with the specified items
411
*/
412
public final Observable<T> startWith(T... items);
413
414
/**
415
* Concatenate with another Observable
416
* @param other Observable to concatenate after this one
417
* @return Observable concatenating this and other
418
*/
419
public final Observable<T> concatWith(ObservableSource<? extends T> other);
420
```
421
422
### Subscription and Scheduling
423
424
Control when and how Observable emissions are observed.
425
426
```java { .api }
427
/**
428
* Subscribe with a simple onNext callback
429
* @param onNext function called for each emitted item
430
* @return Disposable for managing the subscription
431
*/
432
public final Disposable subscribe(Consumer<? super T> onNext);
433
434
/**
435
* Subscribe with onNext and onError callbacks
436
* @param onNext function called for each emitted item
437
* @param onError function called on error
438
* @return Disposable for managing the subscription
439
*/
440
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError);
441
442
/**
443
* Subscribe with full Observer interface
444
* @param observer the Observer to receive emissions
445
*/
446
public final void subscribe(Observer<? super T> observer);
447
448
/**
449
* Specify the Scheduler for subscription (source) operations
450
* @param scheduler the Scheduler to use for subscriptions
451
* @return Observable operating on the specified scheduler
452
*/
453
public final Observable<T> subscribeOn(Scheduler scheduler);
454
455
/**
456
* Specify the Scheduler for observation (downstream) operations
457
* @param scheduler the Scheduler to use for observations
458
* @return Observable observing on the specified scheduler
459
*/
460
public final Observable<T> observeOn(Scheduler scheduler);
461
```
462
463
### Error Handling
464
465
Manage errors in the Observable stream.
466
467
```java { .api }
468
/**
469
* Return a default item when an error occurs
470
* @param defaultItem the item to emit on error
471
* @return Observable that emits defaultItem on error
472
*/
473
public final Observable<T> onErrorReturn(T defaultItem);
474
475
/**
476
* Resume with another Observable when an error occurs
477
* @param resumeSource Observable to switch to on error
478
* @return Observable that switches to resumeSource on error
479
*/
480
public final Observable<T> onErrorResumeNext(ObservableSource<? extends T> resumeSource);
481
482
/**
483
* Retry the subscription when an error occurs
484
* @param times maximum number of retry attempts
485
* @return Observable that retries up to the specified times
486
*/
487
public final Observable<T> retry(long times);
488
489
/**
490
* Perform side-effect action when an error occurs
491
* @param onError action to perform on error
492
* @return Observable that performs the action on error
493
*/
494
public final Observable<T> doOnError(Consumer<? super Throwable> onError);
495
```
496
497
### Side Effects
498
499
Perform side-effect actions without modifying the stream.
500
501
```java { .api }
502
/**
503
* Perform an action for each emitted item
504
* @param onNext action to perform for each item
505
* @return Observable that performs the action for each item
506
*/
507
public final Observable<T> doOnNext(Consumer<? super T> onNext);
508
509
/**
510
* Perform an action when the Observable completes
511
* @param onComplete action to perform on completion
512
* @return Observable that performs the action on completion
513
*/
514
public final Observable<T> doOnComplete(Action onComplete);
515
516
/**
517
* Perform an action when subscription occurs
518
* @param onSubscribe action to perform on subscription
519
* @return Observable that performs the action on subscription
520
*/
521
public final Observable<T> doOnSubscribe(Consumer<? super Disposable> onSubscribe);
522
523
/**
524
* Perform an action when disposal occurs
525
* @param onDispose action to perform on disposal
526
* @return Observable that performs the action on disposal
527
*/
528
public final Observable<T> doOnDispose(Action onDispose);
529
```
530
531
## Types
532
533
```java { .api }
534
/**
535
* Interface for creating custom Observable sources
536
*/
537
public interface ObservableOnSubscribe<T> {
538
void subscribe(ObservableEmitter<T> emitter) throws Throwable;
539
}
540
541
/**
542
* Emitter interface for custom Observable creation
543
*/
544
public interface ObservableEmitter<T> extends Emitter<T> {
545
void onNext(T value);
546
void onError(Throwable error);
547
void onComplete();
548
boolean isDisposed();
549
}
550
551
/**
552
* Base interface for Observable sources
553
*/
554
public interface ObservableSource<T> {
555
void subscribe(Observer<? super T> observer);
556
}
557
```
558
559
**Usage Examples:**
560
561
```java
562
import io.reactivex.rxjava3.core.Observable;
563
import io.reactivex.rxjava3.schedulers.Schedulers;
564
import java.util.concurrent.TimeUnit;
565
566
// Basic Observable creation and subscription
567
Observable.just("Hello", "World")
568
.subscribe(System.out::println);
569
570
// Complex transformation chain
571
Observable.range(1, 10)
572
.filter(x -> x % 2 == 0)
573
.map(x -> x * x)
574
.take(3)
575
.subscribe(System.out::println);
576
577
// Async operations with scheduling
578
Observable.fromCallable(() -> {
579
Thread.sleep(1000);
580
return "Computed result";
581
})
582
.subscribeOn(Schedulers.io())
583
.observeOn(Schedulers.single())
584
.subscribe(
585
result -> System.out.println("Result: " + result),
586
error -> System.err.println("Error: " + error)
587
);
588
589
// Error handling
590
Observable.just(1, 2, 0, 4)
591
.map(x -> 10 / x)
592
.onErrorReturn(-1)
593
.subscribe(System.out::println);
594
```