RxJava: Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
—
Comprehensive error handling and recovery mechanisms in RxJava. Error handling is crucial for building robust reactive applications that can gracefully handle failures and recover from error conditions.
Operators available on all reactive types for handling errors.
/**
* Returns a reactive stream that emits a fallback value when the source emits an error
*/
public final T onErrorReturn(Function<? super Throwable, ? extends R> valueSupplier);
public final T onErrorReturn(R value);
/**
* Returns a reactive stream that switches to another stream when the source emits an error
*/
public final T onErrorResumeNext(Function<? super Throwable, ? extends T> resumeFunction);
public final T onErrorResumeNext(T resumeStream);
/**
* Re-subscribes to the source stream when an error occurs
*/
public final T retry();
public final T retry(long times);
public final T retry(BiPredicate<? super Integer, ? super Throwable> predicate);
/**
* Re-subscribes based on a function that receives error notifications
*/
public final T retryWhen(Function<? super Flowable<Throwable>, ? extends Publisher<?>> handler);
/**
* Performs an action when an error occurs without affecting the stream
*/
public final T doOnError(Consumer<? super Throwable> onError);
/**
* Materializes error notifications as regular onNext emissions
*/
public final Observable<Notification<T>> materialize();Additional error handling for Completable operations.
/**
* Converts errors to successful completion
*/
public final Completable onErrorComplete();
public final Completable onErrorComplete(Predicate<? super Throwable> predicate);Additional error handling for Maybe operations.
/**
* Converts errors to empty completion
*/
public final Maybe<T> onErrorComplete();
public final Maybe<T> onErrorComplete(Predicate<? super Throwable> predicate);Global hooks for undeliverable errors and general error handling.
/**
* Global plugin system for error handling
*/
public final class RxJavaPlugins {
/**
* Sets a global error handler for undeliverable exceptions
*/
public static void setErrorHandler(Consumer<? super Throwable> handler);
/**
* Gets the current global error handler
*/
public static Consumer<? super Throwable> getErrorHandler();
/**
* Called when an error cannot be delivered to observers
*/
public static void onError(Throwable error);
}Common exception types used in RxJava.
/**
* Thrown when multiple exceptions occur
*/
public final class CompositeException extends RuntimeException {
/**
* Returns the list of suppressed exceptions
*/
public List<Throwable> getExceptions();
/**
* Returns the number of suppressed exceptions
*/
public int size();
}
/**
* Thrown when onError is called but no error handler is provided
*/
public final class OnErrorNotImplementedException extends RuntimeException {
// Standard exception wrapper
}
/**
* Thrown when backpressure buffer overflows
*/
public final class MissingBackpressureException extends RuntimeException {
// Backpressure-related exception
}
/**
* Thrown when the Reactive Streams protocol is violated
*/
public final class ProtocolViolationException extends IllegalStateException {
// Protocol violation exception
}
/**
* Wrapper for exceptions that couldn't be delivered to downstream
*/
public final class UndeliverableException extends RuntimeException {
// Undeliverable exception wrapper
}Basic Error Recovery with onErrorReturn:
import io.reactivex.Observable;
Observable<String> riskyOperation = Observable.fromCallable(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("Random failure");
}
return "Success";
});
// Provide fallback value on error
riskyOperation
.onErrorReturn(throwable -> {
System.out.println("Error occurred: " + throwable.getMessage());
return "Fallback Value";
})
.subscribe(result -> System.out.println("Result: " + result));
// Simple fallback value
riskyOperation
.onErrorReturn("Default Value")
.subscribe(result -> System.out.println("Result: " + result));Error Recovery with Alternative Stream:
Observable<String> primarySource = Observable.fromCallable(() -> {
throw new RuntimeException("Primary source failed");
});
Observable<String> fallbackSource = Observable.just("Fallback", "Data");
// Switch to fallback stream on error
primarySource
.onErrorResumeNext(throwable -> {
System.out.println("Primary failed, using fallback: " + throwable.getMessage());
return fallbackSource;
})
.subscribe(
item -> System.out.println("Item: " + item),
error -> System.err.println("Final error: " + error)
);Retry Strategies:
Observable<String> unreliableService = Observable.fromCallable(() -> {
if (Math.random() > 0.7) {
return "Success";
}
throw new RuntimeException("Service temporarily unavailable");
});
// Simple retry (infinite)
unreliableService
.retry()
.take(1) // Take first success
.subscribe(
result -> System.out.println("Got result: " + result),
error -> System.err.println("Never succeeded: " + error)
);
// Retry limited times
unreliableService
.retry(3)
.subscribe(
result -> System.out.println("Success after retries: " + result),
error -> System.err.println("Failed after 3 retries: " + error)
);
// Conditional retry
unreliableService
.retry((retryCount, throwable) -> {
System.out.println("Retry attempt " + retryCount + " for: " + throwable.getMessage());
return retryCount < 5 && throwable instanceof RuntimeException;
})
.subscribe(
result -> System.out.println("Conditional retry success: " + result),
error -> System.err.println("Conditional retry failed: " + error)
);Advanced Retry with Exponential Backoff:
import java.util.concurrent.TimeUnit;
Observable<String> apiCall = Observable.fromCallable(() -> {
// Simulate API that fails 80% of the time
if (Math.random() > 0.2) {
throw new RuntimeException("API Error");
}
return "API Response";
});
// Retry with exponential backoff
apiCall
.retryWhen(errors ->
errors
.zipWith(Observable.range(1, 4), (throwable, attempt) -> {
System.out.println("Attempt " + attempt + " failed: " + throwable.getMessage());
return attempt;
})
.flatMap(attempt -> {
long delay = (long) Math.pow(2, attempt); // Exponential backoff
System.out.println("Retrying in " + delay + " seconds...");
return Observable.timer(delay, TimeUnit.SECONDS);
})
)
.subscribe(
result -> System.out.println("API Success: " + result),
error -> System.err.println("API Failed after all retries: " + error)
);Error Handling in Chains:
Observable.fromCallable(() -> "input")
.map(input -> {
if (input.equals("input")) {
throw new IllegalArgumentException("Invalid input");
}
return input.toUpperCase();
})
.flatMap(processed -> Observable.fromCallable(() -> {
if (processed.equals("ERROR")) {
throw new RuntimeException("Processing failed");
}
return "Processed: " + processed;
}))
.onErrorResumeNext(throwable -> {
if (throwable instanceof IllegalArgumentException) {
return Observable.just("Input validation failed");
} else if (throwable instanceof RuntimeException) {
return Observable.just("Processing failed, using default");
}
return Observable.error(throwable); // Re-throw unknown errors
})
.subscribe(
result -> System.out.println("Final result: " + result),
error -> System.err.println("Unhandled error: " + error)
);Side-Effect Error Logging:
Observable<Integer> source = Observable.range(1, 10)
.map(i -> {
if (i == 5) {
throw new RuntimeException("Error at item " + i);
}
return i * i;
});
source
.doOnError(throwable -> {
// Log error without affecting stream
System.err.println("Logging error: " + throwable.getMessage());
// Could also log to file, send to monitoring system, etc.
})
.onErrorReturn(-1) // Recover after logging
.subscribe(
value -> System.out.println("Value: " + value),
error -> System.err.println("Final error: " + error) // Won't be called due to onErrorReturn
);Error Materialization:
Observable<String> source = Observable.just("A", "B")
.concatWith(Observable.error(new RuntimeException("Error")))
.concatWith(Observable.just("C")); // This won't be reached
// Materialize errors as regular notifications
source
.materialize()
.subscribe(notification -> {
if (notification.isOnNext()) {
System.out.println("Value: " + notification.getValue());
} else if (notification.isOnError()) {
System.out.println("Error: " + notification.getError().getMessage());
} else if (notification.isOnComplete()) {
System.out.println("Completed");
}
});
// Dematerialize back to regular stream (optional)
source
.materialize()
.filter(notification -> !notification.isOnError()) // Skip errors
.dematerialize(notification -> notification)
.subscribe(
value -> System.out.println("Filtered value: " + value),
error -> System.err.println("This won't be called"),
() -> System.out.println("Completed without errors")
);Completable Error Handling:
Completable riskyOperation = Completable.fromAction(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("Operation failed");
}
System.out.println("Operation succeeded");
});
// Convert error to completion
riskyOperation
.onErrorComplete()
.subscribe(
() -> System.out.println("Completed (success or error converted)"),
error -> System.err.println("This won't be called")
);
// Conditional error to completion
riskyOperation
.onErrorComplete(throwable -> throwable instanceof RuntimeException)
.subscribe(
() -> System.out.println("Completed (RuntimeException converted)"),
error -> System.err.println("Non-RuntimeException: " + error)
);
// Resume with another Completable
Completable fallbackOperation = Completable.fromAction(() ->
System.out.println("Fallback operation executed"));
riskyOperation
.onErrorResumeNext(throwable -> {
System.out.println("Primary failed, running fallback: " + throwable.getMessage());
return fallbackOperation;
})
.subscribe(
() -> System.out.println("Some operation completed"),
error -> System.err.println("Both operations failed: " + error)
);Global Error Handling:
import io.reactivex.plugins.RxJavaPlugins;
// Set global error handler for undeliverable exceptions
RxJavaPlugins.setErrorHandler(throwable -> {
System.err.println("Undeliverable exception: " + throwable.getMessage());
throwable.printStackTrace();
// Could also:
// - Log to crash reporting service
// - Send to monitoring system
// - Write to log file
// - Trigger app restart in severe cases
});
// Example of undeliverable error
PublishSubject<String> subject = PublishSubject.create();
subject.subscribe(
value -> System.out.println("Value: " + value),
error -> System.err.println("Error: " + error)
);
// Complete the subject
subject.onComplete();
// This error cannot be delivered (subject already terminated)
// It will be caught by the global error handler
subject.onError(new RuntimeException("Undeliverable error"));Complex Error Handling Scenario:
// Multi-step process with different error handling strategies
Observable<String> complexProcess = Observable.fromCallable(() -> "input")
// Step 1: Validation with retry
.flatMap(input -> Observable.fromCallable(() -> validateInput(input))
.retry(2)
.onErrorResumeNext(throwable -> Observable.just("default-input")))
// Step 2: Processing with timeout and fallback
.flatMap(validInput -> processData(validInput)
.timeout(5, TimeUnit.SECONDS)
.onErrorReturn(throwable -> {
if (throwable instanceof TimeoutException) {
return "timeout-result";
}
return "error-result";
}))
// Step 3: Final transformation with error logging
.map(result -> result.toUpperCase())
.doOnError(throwable -> logError("Final step failed", throwable))
.onErrorReturn("FINAL-FALLBACK");
complexProcess.subscribe(
result -> System.out.println("Final result: " + result),
error -> System.err.println("Unexpected error: " + error) // Should never be called
);
private static String validateInput(String input) {
if (input == null || input.isEmpty()) {
throw new IllegalArgumentException("Invalid input");
}
return input;
}
private static Observable<String> processData(String input) {
return Observable.fromCallable(() -> {
// Simulate processing that might fail or timeout
Thread.sleep(3000);
if (Math.random() > 0.7) {
throw new RuntimeException("Processing failed");
}
return "processed-" + input;
}).subscribeOn(Schedulers.io());
}
private static void logError(String context, Throwable throwable) {
System.err.println(context + ": " + throwable.getMessage());
}Guidelines:
Common Patterns:
/**
* Predicate for conditional operations
*/
public interface Predicate<T> {
boolean test(T t) throws Exception;
}
/**
* BiPredicate for retry conditions
*/
public interface BiPredicate<T1, T2> {
boolean test(T1 t1, T2 t2) throws Exception;
}
/**
* Function for error mapping
*/
public interface Function<T, R> {
R apply(T t) throws Exception;
}
/**
* Consumer for side effects
*/
public interface Consumer<T> {
void accept(T t) throws Exception;
}Install with Tessl CLI
npx tessl i tessl/maven-io-reactivex-rxjava2--rxjava