
tessl/maven-io-reactivex-rxjava2--rxjava

RxJava: Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.



Error Handling

Error handling and recovery mechanisms in RxJava: operators for fallback values and fallback streams, retries, side-effect logging, and global hooks for errors that cannot be delivered to an observer.

Capabilities

Basic Error Handling Operators

Operators for handling errors, shown here with their Observable<T> signatures. Flowable, Single, Maybe, and Completable provide analogous operators where they apply, with signatures adapted to each type.

/**
 * Returns a reactive stream that emits a fallback value when the source emits an error
 */
public final Observable<T> onErrorReturn(Function<? super Throwable, ? extends T> valueSupplier);
public final Observable<T> onErrorReturnItem(T item);

/**
 * Returns a reactive stream that switches to another stream when the source emits an error
 */
public final Observable<T> onErrorResumeNext(Function<? super Throwable, ? extends ObservableSource<? extends T>> resumeFunction);
public final Observable<T> onErrorResumeNext(ObservableSource<? extends T> next);

/**
 * Re-subscribes to the source stream when an error occurs
 */
public final Observable<T> retry();
public final Observable<T> retry(long times);
public final Observable<T> retry(BiPredicate<? super Integer, ? super Throwable> predicate);

/**
 * Re-subscribes based on a function that receives error notifications
 */
public final Observable<T> retryWhen(Function<? super Observable<Throwable>, ? extends ObservableSource<?>> handler);

/**
 * Performs an action when an error occurs without affecting the stream
 */
public final Observable<T> doOnError(Consumer<? super Throwable> onError);

/**
 * Wraps every signal (onNext, onError, onComplete) in a Notification object
 * that is delivered as a regular onNext emission
 */
public final Observable<Notification<T>> materialize();

Completable-Specific Error Handling

Additional error handling for Completable operations.

/**
 * Converts errors to successful completion
 */
public final Completable onErrorComplete();
public final Completable onErrorComplete(Predicate<? super Throwable> predicate);

Maybe-Specific Error Handling

Additional error handling for Maybe operations.

/**
 * Converts errors to empty completion
 */
public final Maybe<T> onErrorComplete();
public final Maybe<T> onErrorComplete(Predicate<? super Throwable> predicate);

Global Error Handling

Global hooks for undeliverable errors and general error handling.

/**
 * Global plugin system for error handling
 */
public final class RxJavaPlugins {
    /**
     * Sets a global error handler for undeliverable exceptions
     */
    public static void setErrorHandler(Consumer<? super Throwable> handler);
    
    /**
     * Gets the current global error handler
     */
    public static Consumer<? super Throwable> getErrorHandler();
    
    /**
     * Called when an error cannot be delivered to observers
     */
    public static void onError(Throwable error);
}

Exception Types

Common exception types used in RxJava.

/**
 * Thrown when multiple exceptions occur
 */
public final class CompositeException extends RuntimeException {
    /**
     * Returns the list of exceptions aggregated by this CompositeException
     */
    public List<Throwable> getExceptions();
    
    /**
     * Returns the number of aggregated exceptions
     */
    public int size();
}

/**
 * Thrown when onError is called but no error handler is provided
 */
public final class OnErrorNotImplementedException extends RuntimeException {
    // Standard exception wrapper
}

/**
 * Indicates that an operator tried to emit a value but the downstream was not ready for it (no outstanding request)
 */
public final class MissingBackpressureException extends RuntimeException {
    // Backpressure-related exception
}

/**
 * Thrown when the Reactive Streams protocol is violated
 */
public final class ProtocolViolationException extends IllegalStateException {
    // Protocol violation exception
}

/**
 * Wrapper for exceptions that couldn't be delivered to downstream
 */
public final class UndeliverableException extends IllegalStateException {
    // Undeliverable exception wrapper
}

Usage Examples

Basic Error Recovery with onErrorReturn:

import io.reactivex.Observable;

Observable<String> riskyOperation = Observable.fromCallable(() -> {
    if (Math.random() > 0.5) {
        throw new RuntimeException("Random failure");
    }
    return "Success";
});

// Provide fallback value on error
riskyOperation
    .onErrorReturn(throwable -> {
        System.out.println("Error occurred: " + throwable.getMessage());
        return "Fallback Value";
    })
    .subscribe(result -> System.out.println("Result: " + result));

// Simple fallback value (the constant-value variant is onErrorReturnItem)
riskyOperation
    .onErrorReturnItem("Default Value")
    .subscribe(result -> System.out.println("Result: " + result));

Error Recovery with Alternative Stream:

Observable<String> primarySource = Observable.fromCallable(() -> {
    throw new RuntimeException("Primary source failed");
});

Observable<String> fallbackSource = Observable.just("Fallback", "Data");

// Switch to fallback stream on error
primarySource
    .onErrorResumeNext(throwable -> {
        System.out.println("Primary failed, using fallback: " + throwable.getMessage());
        return fallbackSource;
    })
    .subscribe(
        item -> System.out.println("Item: " + item),
        error -> System.err.println("Final error: " + error)
    );

Retry Strategies:

Observable<String> unreliableService = Observable.fromCallable(() -> {
    if (Math.random() > 0.7) {
        return "Success";
    }
    throw new RuntimeException("Service temporarily unavailable");
});

// Simple retry (infinite)
unreliableService
    .retry()
    .take(1) // Take first success
    .subscribe(
        result -> System.out.println("Got result: " + result),
        error -> System.err.println("Never succeeded: " + error)
    );

// Retry limited times
unreliableService
    .retry(3)
    .subscribe(
        result -> System.out.println("Success after retries: " + result),
        error -> System.err.println("Failed after 3 retries: " + error)
    );

// Conditional retry
unreliableService
    .retry((retryCount, throwable) -> {
        System.out.println("Retry attempt " + retryCount + " for: " + throwable.getMessage());
        return retryCount < 5 && throwable instanceof RuntimeException;
    })
    .subscribe(
        result -> System.out.println("Conditional retry success: " + result),
        error -> System.err.println("Conditional retry failed: " + error)
    );
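
The conditional retry above retries on any RuntimeException. In practice the predicate often separates transient failures from permanent ones. The following is a minimal sketch in plain Java; RetryPolicy, shouldRetry, and the choice of which exception types count as transient are illustrative assumptions, not part of RxJava.

```java
import java.io.IOException;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper: decides whether a failed attempt is worth retrying. */
final class RetryPolicy {
    private final int maxRetries;

    RetryPolicy(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    /**
     * @param retryCount 1-based number of the retry being considered
     * @param error      the failure that triggered it
     * @return true if the source should be re-subscribed
     */
    boolean shouldRetry(int retryCount, Throwable error) {
        if (retryCount > maxRetries) {
            return false; // retry budget exhausted
        }
        // Assumption: I/O problems and timeouts are transient; everything else is permanent.
        return error instanceof IOException || error instanceof TimeoutException;
    }
}
```

Because the method has the same shape as BiPredicate<Integer, Throwable>, it could be passed as unreliableService.retry(new RetryPolicy(5)::shouldRetry).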

Advanced Retry with Exponential Backoff:

import java.util.concurrent.TimeUnit;

Observable<String> apiCall = Observable.fromCallable(() -> {
    // Simulate API that fails 80% of the time
    if (Math.random() > 0.2) {
        throw new RuntimeException("API Error");
    }
    return "API Response";
});

// Retry with exponential backoff
apiCall
    .retryWhen(errors -> 
        errors
            .zipWith(Observable.range(1, 4), (throwable, attempt) -> {
                System.out.println("Attempt " + attempt + " failed: " + throwable.getMessage());
                return attempt;
            })
            .flatMap(attempt -> {
                long delay = (long) Math.pow(2, attempt); // Exponential backoff
                System.out.println("Retrying in " + delay + " seconds...");
                return Observable.timer(delay, TimeUnit.SECONDS);
            })
    )
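    .subscribe(
        result -> System.out.println("API Success: " + result),
        // Note: once range(1, 4) is exhausted the handler completes, so the stream
        // typically ends with onComplete rather than onError.
        error -> System.err.println("API Failed: " + error),
        () -> System.out.println("Finished (succeeded or retries exhausted)")
    );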
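
The delay in the example above grows as 2^attempt seconds with no upper bound. A common refinement is to cap it. The sketch below covers only the arithmetic, in plain Java; Backoff, delaySeconds, and the cap parameter are assumptions for illustration.

```java
/** Hypothetical helper computing a capped exponential backoff delay. */
final class Backoff {
    private Backoff() {}

    /**
     * Returns min(2^attempt, maxSeconds) for a 1-based attempt number.
     */
    static long delaySeconds(int attempt, long maxSeconds) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        // A shift avoids floating-point Math.pow; clamp it so the result cannot overflow.
        long uncapped = 1L << Math.min(attempt, 62);
        return Math.min(uncapped, maxSeconds);
    }
}
```

With a 30-second cap, attempts 1 through 5 yield 2, 4, 8, 16, and 30 seconds. Inside the flatMap above, the result would take the place of the Math.pow expression.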

Error Handling in Chains:

Observable.fromCallable(() -> "input")
    .map(input -> {
        if (input.equals("input")) {
            throw new IllegalArgumentException("Invalid input");
        }
        return input.toUpperCase();
    })
    .flatMap(processed -> Observable.fromCallable(() -> {
        if (processed.equals("ERROR")) {
            throw new RuntimeException("Processing failed");
        }
        return "Processed: " + processed;
    }))
    .onErrorResumeNext(throwable -> {
        if (throwable instanceof IllegalArgumentException) {
            return Observable.just("Input validation failed");
        } else if (throwable instanceof RuntimeException) {
            return Observable.just("Processing failed, using default");
        }
        return Observable.error(throwable); // Re-throw unknown errors
    })
    .subscribe(
        result -> System.out.println("Final result: " + result),
        error -> System.err.println("Unhandled error: " + error)
    );

Side-Effect Error Logging:

Observable<Integer> source = Observable.range(1, 10)
    .map(i -> {
        if (i == 5) {
            throw new RuntimeException("Error at item " + i);
        }
        return i * i;
    });

source
    .doOnError(throwable -> {
        // Log error without affecting stream
        System.err.println("Logging error: " + throwable.getMessage());
        // Could also log to file, send to monitoring system, etc.
    })
    .onErrorReturnItem(-1) // Recover after logging
    .subscribe(
        value -> System.out.println("Value: " + value),
        error -> System.err.println("Final error: " + error) // Won't be called due to onErrorReturnItem
    );

Error Materialization:

Observable<String> source = Observable.just("A", "B")
    .concatWith(Observable.error(new RuntimeException("Error")))
    .concatWith(Observable.just("C")); // This won't be reached

// Materialize errors as regular notifications
source
    .materialize()
    .subscribe(notification -> {
        if (notification.isOnNext()) {
            System.out.println("Value: " + notification.getValue());
        } else if (notification.isOnError()) {
            System.out.println("Error: " + notification.getError().getMessage());
        } else if (notification.isOnComplete()) {
            System.out.println("Completed");
        }
    });

// Dematerialize back to regular stream (optional)
source
    .materialize()
    .filter(notification -> !notification.isOnError()) // Skip errors
    .dematerialize(notification -> notification)
    .subscribe(
        value -> System.out.println("Filtered value: " + value),
        error -> System.err.println("This won't be called"),
        () -> System.out.println("Completed without errors")
    );

Completable Error Handling:

Completable riskyOperation = Completable.fromAction(() -> {
    if (Math.random() > 0.5) {
        throw new RuntimeException("Operation failed");
    }
    System.out.println("Operation succeeded");
});

// Convert error to completion
riskyOperation
    .onErrorComplete()
    .subscribe(
        () -> System.out.println("Completed (success or error converted)"),
        error -> System.err.println("This won't be called")
    );

// Conditional error to completion
riskyOperation
    .onErrorComplete(throwable -> throwable instanceof RuntimeException)
    .subscribe(
        () -> System.out.println("Completed (RuntimeException converted)"),
        error -> System.err.println("Non-RuntimeException: " + error)
    );

// Resume with another Completable
Completable fallbackOperation = Completable.fromAction(() -> 
    System.out.println("Fallback operation executed"));

riskyOperation
    .onErrorResumeNext(throwable -> {
        System.out.println("Primary failed, running fallback: " + throwable.getMessage());
        return fallbackOperation;
    })
    .subscribe(
        () -> System.out.println("Some operation completed"),
        error -> System.err.println("Both operations failed: " + error)
    );

Global Error Handling:

import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.subjects.PublishSubject;

// Set global error handler for undeliverable exceptions
RxJavaPlugins.setErrorHandler(throwable -> {
    System.err.println("Undeliverable exception: " + throwable.getMessage());
    throwable.printStackTrace();
    
    // Could also:
    // - Log to crash reporting service
    // - Send to monitoring system
    // - Write to log file
    // - Trigger app restart in severe cases
});

// Example of undeliverable error
PublishSubject<String> subject = PublishSubject.create();
subject.subscribe(
    value -> System.out.println("Value: " + value),
    error -> System.err.println("Error: " + error)
);

// Complete the subject
subject.onComplete();

// This error cannot be delivered (subject already terminated)
// It will be caught by the global error handler
subject.onError(new RuntimeException("Undeliverable error"));
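
Errors that reach the global handler are typically wrapped in UndeliverableException, with the original failure as the cause, so a handler usually wants to look at that cause. The sketch below shows the unwrapping and one possible classification in plain Java. UndeliverableTriage, unwrap, isIgnorable, and the policy of ignoring I/O and interruption errors are assumptions for illustration; the wrapper type is passed as a parameter so the snippet has no RxJava dependency.

```java
import java.io.IOException;

/** Hypothetical helper for inspecting errors passed to a global error handler. */
final class UndeliverableTriage {
    private UndeliverableTriage() {}

    /** Returns the cause if error is an instance of wrapperType and has one, else error itself. */
    static Throwable unwrap(Throwable error, Class<? extends Throwable> wrapperType) {
        if (wrapperType.isInstance(error) && error.getCause() != null) {
            return error.getCause();
        }
        return error;
    }

    /**
     * Assumed policy: network failures and interruptions after disposal are
     * expected and can be logged quietly; anything else deserves attention.
     */
    static boolean isIgnorable(Throwable error) {
        return error instanceof IOException || error instanceof InterruptedException;
    }
}
```

In the handler above, unwrap(throwable, UndeliverableException.class) would give the original exception, which could then be checked with isIgnorable before deciding how loudly to report it.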

Complex Error Handling Scenario:

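import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Multi-step process with different error handling strategies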
Observable<String> complexProcess = Observable.fromCallable(() -> "input")
    // Step 1: Validation with retry
    .flatMap(input -> Observable.fromCallable(() -> validateInput(input))
        .retry(2)
        .onErrorReturnItem("default-input")) // Fall back to a default input
    
    // Step 2: Processing with timeout and fallback
    .flatMap(validInput -> processData(validInput)
        .timeout(5, TimeUnit.SECONDS)
        .onErrorReturn(throwable -> {
            if (throwable instanceof TimeoutException) {
                return "timeout-result";
            }
            return "error-result";
        }))
    
    // Step 3: Final transformation with error logging
    .map(result -> result.toUpperCase())
    .doOnError(throwable -> logError("Final step failed", throwable))
    .onErrorReturnItem("FINAL-FALLBACK");

complexProcess.subscribe(
    result -> System.out.println("Final result: " + result),
    error -> System.err.println("Unexpected error: " + error) // Should never be called
);

private static String validateInput(String input) {
    if (input == null || input.isEmpty()) {
        throw new IllegalArgumentException("Invalid input");
    }
    return input;
}

private static Observable<String> processData(String input) {
    return Observable.fromCallable(() -> {
        // Simulate processing that might fail or timeout
        Thread.sleep(3000);
        if (Math.random() > 0.7) {
            throw new RuntimeException("Processing failed");
        }
        return "processed-" + input;
    }).subscribeOn(Schedulers.io());
}

private static void logError(String context, Throwable throwable) {
    System.err.println(context + ": " + throwable.getMessage());
}

Error Handling Best Practices

Guidelines:

  1. Always handle errors: Never ignore errors in reactive streams
  2. Use appropriate operators: onErrorReturn for a fallback value, onErrorResumeNext for a fallback stream, retry or retryWhen for transient failures
  3. Fail fast vs. resilience: Balance between failing fast and being resilient
  4. Log errors: Always log errors for debugging and monitoring
  5. Global handler: Set up a global error handler for undeliverable exceptions
  6. Test error scenarios: Write tests for error conditions
  7. Resource cleanup: Ensure resources are cleaned up even when errors occur
  8. User experience: Provide meaningful error messages to users

Common Patterns:

  • Retry with backoff: For transient network errors
  • Fallback values: For non-critical operations
  • Alternative streams: For redundant data sources
  • Error conversion: Convert errors to empty/default for optional operations
  • Circuit breaker: Stop trying after consecutive failures
  • Timeout handling: Set reasonable timeouts for operations
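
Of the patterns above, the circuit breaker has no dedicated RxJava operator, so it is usually hand-written or taken from a separate library. Here is a minimal sketch in plain Java; CircuitBreaker, its thresholds, and the caller-supplied timestamps are all assumptions for illustration.

```java
/** Hypothetical consecutive-failure circuit breaker; the caller supplies the current time. */
final class CircuitBreaker {
    private final int failureThreshold;
    private final long openMillis;
    private int consecutiveFailures;
    private long openedAt = -1; // -1 means the circuit is closed

    CircuitBreaker(int failureThreshold, long openMillis) {
        this.failureThreshold = failureThreshold;
        this.openMillis = openMillis;
    }

    /** Returns true if a call may be attempted at time nowMillis. */
    synchronized boolean allowRequest(long nowMillis) {
        if (openedAt < 0) {
            return true; // closed: calls flow normally
        }
        // Open: reject until the cool-down has elapsed, then allow a trial call.
        return nowMillis - openedAt >= openMillis;
    }

    /** A success closes the circuit and resets the failure count. */
    synchronized void recordSuccess() {
        consecutiveFailures = 0;
        openedAt = -1;
    }

    /** A failure opens (or re-opens) the circuit once the threshold is reached. */
    synchronized void recordFailure(long nowMillis) {
        consecutiveFailures++;
        if (consecutiveFailures >= failureThreshold) {
            openedAt = nowMillis;
        }
    }
}
```

One way to wire this into a stream would be to check allowRequest inside Observable.defer and to call recordFailure from doOnError, though the details depend on the application.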

Types

/**
 * Predicate for conditional operations
 */
public interface Predicate<T> {
    boolean test(T t) throws Exception;
}

/**
 * BiPredicate for retry conditions
 */
public interface BiPredicate<T1, T2> {
    boolean test(T1 t1, T2 t2) throws Exception;
}

/**
 * Function for error mapping
 */
public interface Function<T, R> {
    R apply(T t) throws Exception;
}

/**
 * Consumer for side effects
 */
public interface Consumer<T> {
    void accept(T t) throws Exception;
}
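
Each of these interfaces declares throws Exception, which lets lambdas and method references passed to operators throw checked exceptions without wrapping them. The plain-Java illustration below uses ThrowingFunction, ThrowingDemo, applyOrDefault, and read as hypothetical stand-ins; ThrowingFunction only mirrors the shape of Function above and is not an RxJava type.

```java
import java.io.IOException;

/** Stand-in with the same shape as the Function interface listed above. */
interface ThrowingFunction<T, R> {
    R apply(T t) throws Exception;
}

final class ThrowingDemo {
    private ThrowingDemo() {}

    /** Applies fn and substitutes fallback if it throws, loosely like a fallback-value operator. */
    static <T, R> R applyOrDefault(ThrowingFunction<T, R> fn, T input, R fallback) {
        try {
            return fn.apply(input);
        } catch (Exception e) {
            return fallback;
        }
    }

    /** A method that throws a checked exception; usable as a ThrowingFunction without wrapping. */
    static String read(String path) throws IOException {
        if (path.isEmpty()) {
            throw new IOException("empty path");
        }
        return "contents of " + path;
    }
}
```

The method reference ThrowingDemo::read would not compile against java.util.function.Function, whose apply method declares no checked exceptions.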

Install with Tessl CLI

npx tessl i tessl/maven-io-reactivex-rxjava2--rxjava
