RxJava: Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
—
Disposable pattern for managing subscriptions and preventing memory leaks. Proper resource management is crucial in reactive programming to avoid memory leaks and ensure clean shutdown.
Base interface for all disposable resources.
/**
* Represents a disposable resource that can be cancelled/disposed
*/
public interface Disposable {
/**
* Disposes the resource and cancels any ongoing work
*/
void dispose();
/**
* Returns true if this resource has been disposed
*/
boolean isDisposed();
}Factory methods and utilities for working with disposables.
/**
* Utility class for creating and managing disposables
*/
public final class Disposables {
/**
* Returns a disposed disposable instance
*/
public static Disposable disposed();
/**
* Returns an empty disposable that does nothing when disposed
*/
public static Disposable empty();
/**
* Creates a disposable from an Action
*/
public static Disposable fromAction(Action action);
/**
* Creates a disposable from a Runnable
*/
public static Disposable fromRunnable(Runnable runnable);
/**
* Creates a disposable from a Future
*/
public static Disposable fromFuture(Future<?> future);
/**
* Creates a disposable from a Subscription (Reactive Streams)
*/
public static Disposable fromSubscription(Subscription subscription);
/**
* Creates a disposable from an AutoCloseable resource
*/
public static Disposable fromAutoCloseable(AutoCloseable autoCloseable);
}Container for multiple disposables that can be disposed together.
/**
* Container that can hold multiple disposables and dispose them together
*/
public final class CompositeDisposable implements Disposable {
/**
* Creates an empty CompositeDisposable
*/
public CompositeDisposable();
/**
* Creates a CompositeDisposable with initial disposables
*/
public CompositeDisposable(Disposable... disposables);
/**
* Adds a disposable to this container
* Returns true if added, false if this container is already disposed
*/
public boolean add(Disposable disposable);
/**
* Adds multiple disposables to this container
* Returns true if all were added successfully
*/
public boolean addAll(Disposable... disposables);
/**
* Removes a disposable from this container
* Returns true if removed, false if not found
*/
public boolean remove(Disposable disposable);
/**
* Removes and disposes a disposable from this container
* Returns true if found and disposed
*/
public boolean delete(Disposable disposable);
/**
* Disposes all contained disposables and clears the container
*/
public void clear();
/**
* Returns the number of currently held disposables
*/
public int size();
/**
* Disposes all contained disposables
* Future additions will be immediately disposed
*/
public void dispose();
/**
* Returns true if this container has been disposed
*/
public boolean isDisposed();
}Holds a single disposable that can be swapped atomically.
/**
* Container that holds a single disposable and allows atomic replacement
*/
public final class SerialDisposable implements Disposable {
/**
* Creates a new SerialDisposable
*/
public SerialDisposable();
/**
* Creates a SerialDisposable with an initial disposable
*/
public SerialDisposable(Disposable initialDisposable);
/**
* Atomically sets the disposable, disposing the previous one if present
* Returns true if set successfully, false if this container is disposed
*/
public boolean set(Disposable disposable);
/**
* Atomically replaces the disposable without disposing the previous one
* Returns the previous disposable
*/
public Disposable replace(Disposable disposable);
/**
* Returns the current disposable (may be null)
*/
public Disposable get();
/**
* Disposes the current disposable
*/
public void dispose();
/**
* Returns true if this container has been disposed
*/
public boolean isDisposed();
}Observer implementations with built-in resource management.
/**
* Observer with built-in resource management for Observable
*/
public abstract class ResourceObserver<T> implements Observer<T>, Disposable {
/**
* Adds a resource to be disposed when this observer is disposed
*/
public final void add(Disposable resource);
/**
* Disposes all managed resources
*/
public final void dispose();
/**
* Returns true if disposed
*/
public final boolean isDisposed();
// Abstract methods to implement
public abstract void onNext(T t);
public abstract void onError(Throwable e);
public abstract void onComplete();
}
/**
* Subscriber with built-in resource management for Flowable
*/
public abstract class ResourceSubscriber<T> implements FlowableSubscriber<T>, Disposable {
/**
* Adds a resource to be disposed when this subscriber is disposed
*/
public final void add(Disposable resource);
/**
* Requests the specified number of items from upstream
*/
protected final void request(long n);
/**
* Disposes all managed resources and cancels upstream
*/
public final void dispose();
/**
* Returns true if disposed
*/
public final boolean isDisposed();
// Abstract methods to implement
public abstract void onNext(T t);
public abstract void onError(Throwable e);
public abstract void onComplete();
}
/**
* Observer with built-in resource management for Single
*/
public abstract class ResourceSingleObserver<T> implements SingleObserver<T>, Disposable {
public final void add(Disposable resource);
public final void dispose();
public final boolean isDisposed();
public abstract void onSuccess(T t);
public abstract void onError(Throwable e);
}
/**
* Observer with built-in resource management for Maybe
*/
public abstract class ResourceMaybeObserver<T> implements MaybeObserver<T>, Disposable {
public final void add(Disposable resource);
public final void dispose();
public final boolean isDisposed();
public abstract void onSuccess(T t);
public abstract void onError(Throwable e);
public abstract void onComplete();
}
/**
* Observer with built-in resource management for Completable
*/
public abstract class ResourceCompletableObserver implements CompletableObserver, Disposable {
public final void add(Disposable resource);
public final void dispose();
public final boolean isDisposed();
public abstract void onComplete();
public abstract void onError(Throwable e);
}Basic Disposable Management:
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
Observable<Long> source = Observable.interval(1, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io());
// Subscribe and keep the disposable
Disposable disposable = source.subscribe(
value -> System.out.println("Value: " + value),
error -> error.printStackTrace()
);
// Later, dispose to stop the stream and free resources
Thread.sleep(5000);
disposable.dispose();
// Check if disposed
System.out.println("Is disposed: " + disposable.isDisposed());CompositeDisposable for Multiple Subscriptions:
import io.reactivex.disposables.CompositeDisposable;
CompositeDisposable compositeDisposable = new CompositeDisposable();
// Add multiple subscriptions
Observable<Long> timer1 = Observable.interval(1, TimeUnit.SECONDS);
Observable<Long> timer2 = Observable.interval(2, TimeUnit.SECONDS);
Observable<String> http = Observable.fromCallable(() -> fetchFromNetwork())
.subscribeOn(Schedulers.io());
compositeDisposable.add(timer1.subscribe(v -> System.out.println("Timer 1: " + v)));
compositeDisposable.add(timer2.subscribe(v -> System.out.println("Timer 2: " + v)));
compositeDisposable.add(http.subscribe(result -> System.out.println("HTTP: " + result)));
System.out.println("Active subscriptions: " + compositeDisposable.size());
// Dispose all at once
Thread.sleep(10000);
compositeDisposable.dispose();
// All subscriptions are now disposed
System.out.println("All disposed: " + compositeDisposable.isDisposed());SerialDisposable for Sequential Operations:
import io.reactivex.disposables.SerialDisposable;
SerialDisposable serialDisposable = new SerialDisposable();
// Start first operation
Observable<String> operation1 = Observable.just("Operation 1")
.delay(2, TimeUnit.SECONDS);
serialDisposable.set(operation1.subscribe(System.out::println));
// Replace with second operation (first one gets disposed)
Thread.sleep(1000);
Observable<String> operation2 = Observable.just("Operation 2")
.delay(1, TimeUnit.SECONDS);
serialDisposable.set(operation2.subscribe(System.out::println));
// Only operation 2 will complete
Thread.sleep(3000);
serialDisposable.dispose();ResourceObserver for Complex Resource Management:
import io.reactivex.observers.ResourceObserver;
class CustomResourceObserver extends ResourceObserver<String> {
private FileWriter fileWriter;
@Override
protected void onStart() {
try {
fileWriter = new FileWriter("output.txt");
// Add file writer as a resource to be closed on disposal
add(Disposables.fromAutoCloseable(fileWriter));
} catch (IOException e) {
dispose(); // Dispose if setup fails
}
}
@Override
public void onNext(String value) {
try {
fileWriter.write(value + "\n");
fileWriter.flush();
} catch (IOException e) {
dispose();
}
}
@Override
public void onError(Throwable e) {
System.err.println("Error: " + e.getMessage());
// Resources will be automatically disposed
}
@Override
public void onComplete() {
System.out.println("Writing completed");
// Resources will be automatically disposed
}
}
// Usage
Observable<String> data = Observable.just("Line 1", "Line 2", "Line 3");
CustomResourceObserver observer = new CustomResourceObserver();
data.subscribe(observer);
// Can dispose manually if needed
// observer.dispose();Custom Disposable Creation:
// Create disposable from Action
Disposable actionDisposable = Disposables.fromAction(() -> {
System.out.println("Cleaning up resources");
// Cleanup code here
});
// Create disposable from Runnable
Disposable runnableDisposable = Disposables.fromRunnable(() -> {
System.out.println("Shutdown procedure");
});
// Create disposable from Future
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<?> future = executor.submit(() -> {
// Long running task
Thread.sleep(10000);
return "Done";
});
Disposable futureDisposable = Disposables.fromFuture(future);
// Dispose all
actionDisposable.dispose();
runnableDisposable.dispose();
futureDisposable.dispose(); // This will cancel the future
executor.shutdown();Lifecycle Management in Android/UI Applications:
public class MainActivity {
private final CompositeDisposable compositeDisposable = new CompositeDisposable();
public void onCreate() {
// Start various subscriptions
compositeDisposable.add(
userService.getCurrentUser()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(this::updateUI)
);
compositeDisposable.add(
locationService.getLocationUpdates()
.subscribe(this::updateLocation)
);
compositeDisposable.add(
messageService.getMessages()
.subscribe(this::showMessage)
);
}
public void onDestroy() {
// Clean up all subscriptions to prevent memory leaks
compositeDisposable.dispose();
}
private void updateUI(User user) { /* Update UI */ }
private void updateLocation(Location location) { /* Update location */ }
private void showMessage(String message) { /* Show message */ }
}Error Handling with Resource Management:
CompositeDisposable resources = new CompositeDisposable();
try {
// Add resources that might fail
resources.add(Observable.interval(1, TimeUnit.SECONDS)
.subscribe(
value -> {
if (value > 5) {
throw new RuntimeException("Simulated error");
}
System.out.println("Value: " + value);
},
error -> {
System.err.println("Stream error: " + error.getMessage());
// Don't forget to clean up other resources on error
resources.dispose();
}
));
resources.add(Observable.timer(10, TimeUnit.SECONDS)
.subscribe(ignored -> System.out.println("Timer completed")));
} catch (Exception e) {
// Ensure cleanup on any exception
resources.dispose();
throw e;
}Memory Leak Prevention:
public class DataProcessor {
private final CompositeDisposable subscriptions = new CompositeDisposable();
private final PublishSubject<String> subject = PublishSubject.create();
public void startProcessing() {
// Process data with proper cleanup
subscriptions.add(
subject
.buffer(5, TimeUnit.SECONDS)
.filter(buffer -> !buffer.isEmpty())
.flatMap(this::processBuffer)
.subscribe(
result -> System.out.println("Processed: " + result),
error -> System.err.println("Processing error: " + error)
)
);
}
public void addData(String data) {
if (!subscriptions.isDisposed()) {
subject.onNext(data);
}
}
public void shutdown() {
// Proper cleanup prevents memory leaks
subscriptions.dispose();
subject.onComplete();
}
private Observable<String> processBuffer(List<String> buffer) {
return Observable.fromCallable(() -> {
// Process buffer
return "Processed " + buffer.size() + " items";
}).subscribeOn(Schedulers.computation());
}
}Resource Management Guidelines:
Common Patterns:
/**
* Functional interface for cleanup actions
*/
public interface Action {
void run() throws Exception;
}
/**
* Interface for objects that can be cancelled
*/
public interface Cancellable {
void cancel() throws Exception;
}
/**
* Exception thrown by dispose operations
*/
public class CompositeException extends RuntimeException {
public List<Throwable> getExceptions();
}Install with Tessl CLI
npx tessl i tessl/maven-io-reactivex-rxjava2--rxjava