# Error Handling

Error handling and recovery mechanisms in RxJava. An error is a terminal event in a reactive stream, so robust applications need operators that replace, retry, or reroute a failed stream instead of letting the failure go unhandled.

## Capabilities

### Basic Error Handling Operators

Operators for handling errors, available in similar form on most reactive types.

```java { .api }
// Schematic signatures: the return type T stands for the reactive type itself
// (for example Observable<R>), and R for its item type.

/**
 * Returns a reactive stream that emits a fallback value when the source emits an error
 */
public final T onErrorReturn(Function<? super Throwable, ? extends R> valueSupplier);
public final T onErrorReturnItem(R item);

/**
 * Returns a reactive stream that switches to another stream when the source emits an error
 */
public final T onErrorResumeNext(Function<? super Throwable, ? extends T> resumeFunction);
public final T onErrorResumeNext(T resumeStream);

/**
 * Re-subscribes to the source stream when an error occurs:
 * indefinitely, at most the given number of times, or while the predicate returns true
 */
public final T retry();
public final T retry(long times);
public final T retry(BiPredicate<? super Integer, ? super Throwable> predicate);

/**
 * Re-subscribes when the stream returned by the handler emits an item;
 * the handler receives the source's errors as a stream
 * (Flowable variant shown; Observable's handler receives an Observable<Throwable>)
 */
public final T retryWhen(Function<? super Flowable<Throwable>, ? extends Publisher<?>> handler);

/**
 * Performs an action when an error occurs; the error still propagates downstream
 */
public final T doOnError(Consumer<? super Throwable> onError);

/**
 * Wraps every signal (onNext, onError, onComplete) in a Notification emitted through onNext
 */
public final Observable<Notification<R>> materialize();
```

### Completable-Specific Error Handling

Additional error handling for Completable operations.

```java { .api }
/**
 * Converts errors to successful completion
 */
public final Completable onErrorComplete();
public final Completable onErrorComplete(Predicate<? super Throwable> predicate);
```

### Maybe-Specific Error Handling

Additional error handling for Maybe operations.

```java { .api }
/**
 * Converts errors to empty completion
 */
public final Maybe<T> onErrorComplete();
public final Maybe<T> onErrorComplete(Predicate<? super Throwable> predicate);
```

### Global Error Handling

Global hooks for undeliverable errors and general error handling.

```java { .api }
/**
 * Global plugin system for error handling
 */
public final class RxJavaPlugins {
    /**
     * Sets a global error handler for undeliverable exceptions
     */
    public static void setErrorHandler(Consumer<? super Throwable> handler);

    /**
     * Gets the current global error handler
     */
    public static Consumer<? super Throwable> getErrorHandler();

    /**
     * Called when an error cannot be delivered to observers
     */
    public static void onError(Throwable error);
}
```

### Exception Types

Common exception types used in RxJava.

```java { .api }
/**
 * Thrown when multiple exceptions occur; aggregates them into one exception
 */
public final class CompositeException extends RuntimeException {
    /**
     * Returns the list of aggregated exceptions
     */
    public List<Throwable> getExceptions();

    /**
     * Returns the number of aggregated exceptions
     */
    public int size();
}

/**
 * Signals that an error reached a subscriber that supplied no onError handler
 */
public final class OnErrorNotImplementedException extends RuntimeException {
    // Standard exception wrapper
}

/**
 * Signals that a source produced more items than the downstream could accept
 */
public final class MissingBackpressureException extends RuntimeException {
    // Backpressure-related exception
}

/**
 * Thrown when the Reactive Streams protocol is violated
 */
public final class ProtocolViolationException extends IllegalStateException {
    // Protocol violation exception
}

/**
 * Wrapper for exceptions that couldn't be delivered to downstream
 */
public final class UndeliverableException extends IllegalStateException {
    // Undeliverable exception wrapper
}
```

## Usage Examples

**Basic Error Recovery with onErrorReturn:**

```java
import io.reactivex.Observable;

Observable<String> riskyOperation = Observable.fromCallable(() -> {
    if (Math.random() > 0.5) {
        throw new RuntimeException("Random failure");
    }
    return "Success";
});

// Provide fallback value on error
riskyOperation
    .onErrorReturn(throwable -> {
        System.out.println("Error occurred: " + throwable.getMessage());
        return "Fallback Value";
    })
    .subscribe(result -> System.out.println("Result: " + result));

// Simple fallback value (the constant-value variant is named onErrorReturnItem)
riskyOperation
    .onErrorReturnItem("Default Value")
    .subscribe(result -> System.out.println("Result: " + result));
```

**Error Recovery with Alternative Stream:**

```java
Observable<String> primarySource = Observable.fromCallable(() -> {
    throw new RuntimeException("Primary source failed");
});

Observable<String> fallbackSource = Observable.just("Fallback", "Data");

// Switch to fallback stream on error
primarySource
    .onErrorResumeNext(throwable -> {
        System.out.println("Primary failed, using fallback: " + throwable.getMessage());
        return fallbackSource;
    })
    .subscribe(
        item -> System.out.println("Item: " + item),
        error -> System.err.println("Final error: " + error)
    );
```

**Retry Strategies:**

```java
Observable<String> unreliableService = Observable.fromCallable(() -> {
    if (Math.random() > 0.7) {
        return "Success";
    }
    throw new RuntimeException("Service temporarily unavailable");
});

// Simple retry (infinite, with no delay between attempts)
unreliableService
    .retry()
    .take(1) // Take first success
    .subscribe(
        result -> System.out.println("Got result: " + result),
        error -> System.err.println("Never succeeded: " + error)
    );

// Retry a limited number of times: up to 3 retries after the first attempt
unreliableService
    .retry(3)
    .subscribe(
        result -> System.out.println("Success after retries: " + result),
        error -> System.err.println("Failed after 3 retries: " + error)
    );

// Conditional retry: retry only while the predicate returns true
unreliableService
    .retry((retryCount, throwable) -> {
        System.out.println("Retry attempt " + retryCount + " for: " + throwable.getMessage());
        return retryCount < 5 && throwable instanceof RuntimeException;
    })
    .subscribe(
        result -> System.out.println("Conditional retry success: " + result),
        error -> System.err.println("Conditional retry failed: " + error)
    );
```

**Advanced Retry with Exponential Backoff:**

```java
import io.reactivex.Observable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

Observable<String> apiCall = Observable.fromCallable(() -> {
    // Simulate API that fails 80% of the time
    if (Math.random() > 0.2) {
        throw new RuntimeException("API Error");
    }
    return "API Response";
});

final int maxRetries = 4;

// Retry with exponential backoff. Once maxRetries is exhausted, the last
// error is passed downstream so the onError consumer below is invoked.
apiCall
    .retryWhen(errors -> {
        AtomicInteger attempts = new AtomicInteger();
        return errors.flatMap(throwable -> {
            int attempt = attempts.incrementAndGet();
            System.out.println("Attempt " + attempt + " failed: " + throwable.getMessage());
            if (attempt > maxRetries) {
                return Observable.<Long>error(throwable);
            }
            long delay = 1L << attempt; // Exponential backoff: 2, 4, 8, 16 seconds
            System.out.println("Retrying in " + delay + " seconds...");
            return Observable.timer(delay, TimeUnit.SECONDS);
        });
    })
    .subscribe(
        result -> System.out.println("API Success: " + result),
        error -> System.err.println("API Failed after all retries: " + error)
    );
```
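
The delay in the backoff example above doubles with every attempt and has no upper bound. A minimal, library-independent sketch of a capped schedule follows; the class name `BackoffSchedule`, the method `delaySeconds`, and the cap parameter are hypothetical names chosen for illustration, not part of RxJava.

```java
// Hypothetical helper (not an RxJava type): capped exponential backoff delays.
final class BackoffSchedule {
    private BackoffSchedule() {}

    /**
     * Returns 2^attempt seconds, limited to maxSeconds.
     * Attempts are counted from 1.
     */
    static long delaySeconds(int attempt, long maxSeconds) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        if (attempt > 62) {
            return maxSeconds; // 1L << attempt would overflow a long
        }
        return Math.min(1L << attempt, maxSeconds);
    }

    public static void main(String[] args) {
        for (int attempt = 1; attempt <= 8; attempt++) {
            System.out.println("Attempt " + attempt + " -> wait "
                + delaySeconds(attempt, 60) + " s");
        }
    }
}
```

Inside a `retryWhen` handler, the value returned by such a helper could be passed to `Observable.timer` in place of the inline computation.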

**Error Handling in Chains:**

```java
Observable.fromCallable(() -> "input")
    .map(input -> {
        if (input.equals("input")) {
            throw new IllegalArgumentException("Invalid input");
        }
        return input.toUpperCase();
    })
    .flatMap(processed -> Observable.fromCallable(() -> {
        if (processed.equals("ERROR")) {
            throw new RuntimeException("Processing failed");
        }
        return "Processed: " + processed;
    }))
    .onErrorResumeNext(throwable -> {
        if (throwable instanceof IllegalArgumentException) {
            return Observable.just("Input validation failed");
        } else if (throwable instanceof RuntimeException) {
            return Observable.just("Processing failed, using default");
        }
        return Observable.error(throwable); // Re-throw unknown errors
    })
    .subscribe(
        result -> System.out.println("Final result: " + result),
        error -> System.err.println("Unhandled error: " + error)
    );
```

**Side-Effect Error Logging:**

```java
Observable<Integer> source = Observable.range(1, 10)
    .map(i -> {
        if (i == 5) {
            throw new RuntimeException("Error at item " + i);
        }
        return i * i;
    });

source
    .doOnError(throwable -> {
        // Log error without affecting stream
        System.err.println("Logging error: " + throwable.getMessage());
        // Could also log to file, send to monitoring system, etc.
    })
    .onErrorReturnItem(-1) // Recover after logging
    .subscribe(
        value -> System.out.println("Value: " + value),
        error -> System.err.println("Final error: " + error) // Won't be called due to onErrorReturnItem
    );
```

**Error Materialization:**

```java
Observable<String> source = Observable.just("A", "B")
    .concatWith(Observable.error(new RuntimeException("Error")))
    .concatWith(Observable.just("C")); // This won't be reached

// Materialize every signal (values, error, completion) as a regular notification
source
    .materialize()
    .subscribe(notification -> {
        if (notification.isOnNext()) {
            System.out.println("Value: " + notification.getValue());
        } else if (notification.isOnError()) {
            System.out.println("Error: " + notification.getError().getMessage());
        } else if (notification.isOnComplete()) {
            System.out.println("Completed");
        }
    });

// Dematerialize back to regular stream (optional)
// Note: the selector overload of dematerialize exists only in later 2.2.x
// releases and in 3.x; older 2.x versions offer only the no-argument form.
source
    .materialize()
    .filter(notification -> !notification.isOnError()) // Skip errors
    .dematerialize(notification -> notification)
    .subscribe(
        value -> System.out.println("Filtered value: " + value),
        error -> System.err.println("This won't be called"),
        () -> System.out.println("Completed without errors")
    );
```

**Completable Error Handling:**

```java
import io.reactivex.Completable;

Completable riskyOperation = Completable.fromAction(() -> {
    if (Math.random() > 0.5) {
        throw new RuntimeException("Operation failed");
    }
    System.out.println("Operation succeeded");
});

// Convert error to completion
riskyOperation
    .onErrorComplete()
    .subscribe(
        () -> System.out.println("Completed (success or error converted)"),
        error -> System.err.println("This won't be called")
    );

// Conditional error to completion
riskyOperation
    .onErrorComplete(throwable -> throwable instanceof RuntimeException)
    .subscribe(
        () -> System.out.println("Completed (RuntimeException converted)"),
        error -> System.err.println("Non-RuntimeException: " + error)
    );

// Resume with another Completable
Completable fallbackOperation = Completable.fromAction(() ->
    System.out.println("Fallback operation executed"));

riskyOperation
    .onErrorResumeNext(throwable -> {
        System.out.println("Primary failed, running fallback: " + throwable.getMessage());
        return fallbackOperation;
    })
    .subscribe(
        () -> System.out.println("Some operation completed"),
        error -> System.err.println("Both operations failed: " + error)
    );
```

**Global Error Handling:**

```java
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.subjects.PublishSubject;

// Set global error handler for undeliverable exceptions
RxJavaPlugins.setErrorHandler(throwable -> {
    // The handler typically receives an UndeliverableException
    // whose cause is the original error
    System.err.println("Undeliverable exception: " + throwable.getMessage());
    throwable.printStackTrace();

    // Could also:
    // - Log to crash reporting service
    // - Send to monitoring system
    // - Write to log file
    // - Trigger app restart in severe cases
});

// Example of undeliverable error
PublishSubject<String> subject = PublishSubject.create();
subject.subscribe(
    value -> System.out.println("Value: " + value),
    error -> System.err.println("Error: " + error)
);

// Complete the subject
subject.onComplete();

// This error cannot be delivered (subject already terminated)
// It will be caught by the global error handler
subject.onError(new RuntimeException("Undeliverable error"));
```
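
RxJava 2 typically wraps an undeliverable error in an `UndeliverableException` before handing it to the global handler, so `getMessage()` on the handler's argument tends to describe the wrapper rather than the original failure. A minimal sketch of unwrapping with plain `Throwable.getCause()` follows; the class `Causes` and the method `rootCause` are hypothetical names used only for this illustration.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

// Hypothetical helper (not an RxJava type): finds the innermost cause of an error.
final class Causes {
    private Causes() {}

    static Throwable rootCause(Throwable error) {
        // Identity-based set guards against cyclic cause chains
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        Throwable current = error;
        while (current.getCause() != null && seen.add(current)) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        Throwable original = new RuntimeException("Undeliverable error");
        // Stand-in for the wrapper that a global handler might receive
        Throwable wrapper = new IllegalStateException("wrapper", original);
        System.out.println("Root cause: " + rootCause(wrapper).getMessage());
    }
}
```

A handler registered with `RxJavaPlugins.setErrorHandler` could call such a helper before logging, so the log shows the failure that actually occurred.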

**Complex Error Handling Scenario:**

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Multi-step process with different error handling strategies
Observable<String> complexProcess = Observable.fromCallable(() -> "input")
    // Step 1: Validation with retry, then a default input
    .flatMap(input -> Observable.fromCallable(() -> validateInput(input))
        .retry(2)
        .onErrorReturnItem("default-input"))

    // Step 2: Processing with timeout and fallback
    .flatMap(validInput -> processData(validInput)
        .timeout(5, TimeUnit.SECONDS)
        .onErrorReturn(throwable -> {
            if (throwable instanceof TimeoutException) {
                return "timeout-result";
            }
            return "error-result";
        }))

    // Step 3: Final transformation with error logging
    .map(result -> result.toUpperCase())
    .doOnError(throwable -> logError("Final step failed", throwable))
    .onErrorReturnItem("FINAL-FALLBACK");

complexProcess.subscribe(
    result -> System.out.println("Final result: " + result),
    error -> System.err.println("Unexpected error: " + error) // Should never be called
);

private static String validateInput(String input) {
    if (input == null || input.isEmpty()) {
        throw new IllegalArgumentException("Invalid input");
    }
    return input;
}

private static Observable<String> processData(String input) {
    return Observable.fromCallable(() -> {
        // Simulate slow processing that might fail; raise the sleep
        // above 5 seconds to exercise the timeout branch
        Thread.sleep(3000);
        if (Math.random() > 0.7) {
            throw new RuntimeException("Processing failed");
        }
        return "processed-" + input;
    }).subscribeOn(Schedulers.io());
}

private static void logError(String context, Throwable throwable) {
    System.err.println(context + ": " + throwable.getMessage());
}
```

## Error Handling Best Practices

**Guidelines:**

1. **Always handle errors**: Supply an `onError` consumer when subscribing; an error that reaches a subscriber without one is reported as `OnErrorNotImplementedException`
2. **Use appropriate operators**: Use `onErrorReturn` for a fallback value, `onErrorResumeNext` for a fallback stream, and `retry`/`retryWhen` for transient failures
3. **Fail fast vs. resilience**: Recover only from errors you can handle meaningfully, and let the rest propagate
4. **Log errors**: Log errors (for example with `doOnError`) before recovering from them, so failures stay visible for debugging and monitoring
5. **Global handler**: Set up a global error handler with `RxJavaPlugins.setErrorHandler` for undeliverable exceptions
6. **Test error scenarios**: Write tests for error conditions
7. **Resource cleanup**: Ensure resources are cleaned up even when errors occur
8. **User experience**: Provide meaningful error messages to users
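
As one possible reading of the last guideline, exception types can be translated into user-facing text in a single place. The sketch below is library-independent; the class `UserMessages`, the method `describe`, and the message wording are all assumptions made for illustration.

```java
import java.io.IOException;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: maps technical errors to messages suitable for end users.
final class UserMessages {
    private UserMessages() {}

    static String describe(Throwable error) {
        if (error instanceof TimeoutException) {
            return "The request took too long. Please try again.";
        }
        if (error instanceof IOException) {
            return "A network problem occurred. Check your connection.";
        }
        if (error instanceof IllegalArgumentException) {
            return "The input is invalid: " + error.getMessage();
        }
        // Unknown errors get a generic message; details belong in the log
        return "Something went wrong. Please try again later.";
    }

    public static void main(String[] args) {
        System.out.println(describe(new TimeoutException()));
        System.out.println(describe(new IllegalArgumentException("age must be positive")));
    }
}
```

Such a method could be called from the `onError` consumer passed to `subscribe`, keeping the wording out of the stream definitions.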

**Common Patterns:**

- **Retry with backoff**: For transient network errors
- **Fallback values**: For non-critical operations
- **Alternative streams**: For redundant data sources
- **Error conversion**: Convert errors to empty/default for optional operations
- **Circuit breaker**: Stop trying after consecutive failures
- **Timeout handling**: Set reasonable timeouts for operations
500
## Types
501
502
```java { .api }
503
/**
504
* Predicate for conditional operations
505
*/
506
public interface Predicate<T> {
507
boolean test(T t) throws Exception;
508
}
509
510
/**
511
* BiPredicate for retry conditions
512
*/
513
public interface BiPredicate<T1, T2> {
514
boolean test(T1 t1, T2 t2) throws Exception;
515
}
516
517
/**
518
* Function for error mapping
519
*/
520
public interface Function<T, R> {
521
R apply(T t) throws Exception;
522
}
523
524
/**
525
* Consumer for side effects
526
*/
527
public interface Consumer<T> {
528
void accept(T t) throws Exception;
529
}
530
```
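
One practical consequence of the `throws Exception` clause on these interfaces is that a lambda or method reference may throw checked exceptions without wrapping them, which `java.util.function.Function` does not allow. The sketch below reproduces the idea with a locally defined interface; `ThrowingFunction`, `applyOrFallback`, and `parsePort` are hypothetical names for illustration and are not RxJava types.

```java
import java.io.IOException;

final class ThrowingFunctionDemo {
    private ThrowingFunctionDemo() {}

    // Local stand-in that mirrors the shape of the Function interface above
    interface ThrowingFunction<T, R> {
        R apply(T t) throws Exception;
    }

    // Applies fn and substitutes the fallback if fn throws any Exception
    static <T, R> R applyOrFallback(ThrowingFunction<? super T, ? extends R> fn,
                                    T input, R fallback) {
        try {
            return fn.apply(input);
        } catch (Exception e) {
            return fallback;
        }
    }

    // Declares a checked exception, yet is usable as a ThrowingFunction
    static int parsePort(String text) throws IOException {
        try {
            return Integer.parseInt(text);
        } catch (NumberFormatException e) {
            throw new IOException("Bad port: " + text, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(applyOrFallback(ThrowingFunctionDemo::parsePort, "8080", -1));
        System.out.println(applyOrFallback(ThrowingFunctionDemo::parsePort, "not-a-port", -1));
    }
}
```

The same mechanism is what lets the callables and mapping lambdas in the usage examples throw directly, with the operator converting the exception into an `onError` signal.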