Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
—
RxJava's Disposable pattern provides resource management for reactive streams, enabling proper cleanup of subscriptions, threads, and other resources to prevent memory leaks and resource exhaustion.
Core interface for managing reactive stream resources.
/**
* Represents a disposable resource
*/
public interface Disposable {
/**
* Dispose of the resource and cancel any ongoing operations
*/
void dispose();
/**
* Check if this resource has been disposed
* @return true if disposed, false otherwise
*/
boolean isDisposed();
}
/**
* Empty disposable that does nothing when disposed
* @return Disposable that performs no action
*/
public static Disposable empty();
/**
* Create a disposable from a Runnable action
* @param action the action to run when disposed
* @return Disposable that runs the action when disposed
*/
public static Disposable fromRunnable(Runnable action);
/**
* Create a disposable from an Action
* @param action the action to run when disposed
* @return Disposable that runs the action when disposed
*/
public static Disposable fromAction(Action action);
/**
* Create a disposable from an AutoCloseable resource
* @param autoCloseable the resource to close when disposed
* @return Disposable that closes the resource when disposed
*/
public static Disposable fromAutoCloseable(AutoCloseable autoCloseable);
/**
* Create a disposable from a Future
* @param future the Future to cancel when disposed
* @param mayInterruptIfRunning whether to interrupt if running
* @return Disposable that cancels the Future when disposed
*/
public static Disposable fromFuture(Future<?> future, boolean mayInterruptIfRunning);Manage multiple disposables as a single unit.
/**
* Container for multiple disposables that can be disposed together
*/
public final class CompositeDisposable implements Disposable, DisposableContainer {
/**
* Create an empty CompositeDisposable
*/
public CompositeDisposable();
/**
* Create a CompositeDisposable with initial disposables
* @param disposables the initial disposables to add
*/
public CompositeDisposable(Disposable... disposables);
/**
* Add a disposable to this composite
* @param disposable the disposable to add
* @return true if added successfully, false if already disposed
*/
public boolean add(Disposable disposable);
/**
* Add multiple disposables to this composite
* @param disposables the disposables to add
* @return true if all were added successfully
*/
public boolean addAll(Disposable... disposables);
/**
* Remove and dispose a specific disposable
* @param disposable the disposable to remove
* @return true if found and removed, false otherwise
*/
public boolean remove(Disposable disposable);
/**
* Remove a specific disposable without disposing it
* @param disposable the disposable to remove
* @return true if found and removed, false otherwise
*/
public boolean delete(Disposable disposable);
/**
* Clear all disposables and dispose them
*/
public void clear();
/**
* Get the number of disposables in this composite
* @return the count of disposables
*/
public int size();
/**
* Dispose all contained disposables and prevent new additions
*/
public void dispose();
/**
* Check if this composite has been disposed
* @return true if disposed, false otherwise
*/
public boolean isDisposed();
}Replace disposables while maintaining single active subscription.
/**
* Container that holds at most one disposable at a time
*/
public final class SerialDisposable implements Disposable {
/**
* Create an empty SerialDisposable
*/
public SerialDisposable();
/**
* Create a SerialDisposable with an initial disposable
* @param initialDisposable the initial disposable
*/
public SerialDisposable(Disposable initialDisposable);
/**
* Replace the current disposable with a new one
* @param next the new disposable (disposes the current one)
* @return true if set successfully, false if already disposed
*/
public boolean replace(Disposable next);
/**
* Update the current disposable
* @param next the new disposable (does not dispose the current one)
* @return true if set successfully, false if already disposed
*/
public boolean set(Disposable next);
/**
* Get the current disposable
* @return the current disposable or null if none set
*/
public Disposable get();
/**
* Dispose the current disposable and prevent new ones
*/
public void dispose();
/**
* Check if this serial disposable has been disposed
* @return true if disposed, false otherwise
*/
public boolean isDisposed();
}Interface for managing collections of disposables.
/**
* Interface for containers that can hold disposables
*/
public interface DisposableContainer {
/**
* Add a disposable to this container
* @param disposable the disposable to add
* @return true if added successfully, false if container is disposed
*/
boolean add(Disposable disposable);
/**
* Remove a disposable from this container
* @param disposable the disposable to remove
* @return true if found and removed, false otherwise
*/
boolean remove(Disposable disposable);
/**
* Remove a disposable without disposing it
* @param disposable the disposable to remove
* @return true if found and removed, false otherwise
*/
boolean delete(Disposable disposable);
}Specialized disposable implementations for common scenarios.
/**
* Disposable that wraps a Runnable action
*/
public final class RunnableDisposable extends ReferenceDisposable<Runnable> {
/**
* Create a RunnableDisposable
* @param run the Runnable to execute on disposal
*/
public RunnableDisposable(Runnable run);
}
/**
* Disposable that wraps an Action
*/
public final class ActionDisposable extends ReferenceDisposable<Action> {
/**
* Create an ActionDisposable
* @param action the Action to execute on disposal
*/
public ActionDisposable(Action action);
}
/**
* Disposable that cancels a Future
*/
public final class FutureDisposable extends AtomicReference<Future<?>> implements Disposable {
/**
* Create a FutureDisposable
* @param future the Future to cancel on disposal
* @param allowInterrupt whether to allow interruption
*/
public FutureDisposable(Future<?> future, boolean allowInterrupt);
}
/**
* Disposable that closes an AutoCloseable resource
*/
public final class AutoCloseableDisposable extends ReferenceDisposable<AutoCloseable> {
/**
* Create an AutoCloseableDisposable
* @param autoCloseable the resource to close on disposal
*/
public AutoCloseableDisposable(AutoCloseable autoCloseable);
}/**
* Base class for reference-based disposables
*/
public abstract class ReferenceDisposable<T> extends AtomicReference<T> implements Disposable {
/**
* Create a ReferenceDisposable with a value
* @param value the value to hold
*/
protected ReferenceDisposable(T value);
/**
* Called when the disposable is disposed
* @param value the value being disposed
*/
protected abstract void onDisposed(T value);
/**
* Dispose of the held reference
*/
public final void dispose();
/**
* Check if disposed
* @return true if disposed, false otherwise
*/
public final boolean isDisposed();
}Usage Examples:
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.*;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;
// Basic subscription disposal
Disposable subscription = Observable.interval(1, TimeUnit.SECONDS)
.subscribe(System.out::println);
// Dispose after 5 seconds
Thread.sleep(5000);
subscription.dispose();
// CompositeDisposable for multiple subscriptions
CompositeDisposable compositeDisposable = new CompositeDisposable();
Disposable sub1 = Observable.interval(1, TimeUnit.SECONDS)
.subscribe(tick -> System.out.println("Sub1: " + tick));
Disposable sub2 = Observable.interval(2, TimeUnit.SECONDS)
.subscribe(tick -> System.out.println("Sub2: " + tick));
// Add both subscriptions to composite
compositeDisposable.addAll(sub1, sub2);
// Dispose all at once
Thread.sleep(10000);
compositeDisposable.dispose();
// SerialDisposable for replacing subscriptions
SerialDisposable serialDisposable = new SerialDisposable();
// Start with one subscription
serialDisposable.set(Observable.interval(1, TimeUnit.SECONDS)
.subscribe(tick -> System.out.println("First: " + tick)));
// Replace with different subscription after 5 seconds
Thread.sleep(5000);
serialDisposable.replace(Observable.interval(500, TimeUnit.MILLISECONDS)
.subscribe(tick -> System.out.println("Second: " + tick)));
// Clean up
Thread.sleep(5000);
serialDisposable.dispose();
// Custom disposable from action
Disposable customDisposable = Disposable.fromAction(() -> {
System.out.println("Custom cleanup executed");
});
customDisposable.dispose();
// Resource management pattern
CompositeDisposable resources = new CompositeDisposable();
try {
// Add various resources
resources.add(Observable.interval(1, TimeUnit.SECONDS)
.subscribe(System.out::println));
resources.add(Single.timer(5, TimeUnit.SECONDS)
.subscribe(tick -> System.out.println("Timer completed")));
resources.add(Disposable.fromRunnable(() ->
System.out.println("Cleanup task executed")));
// Simulate work
Thread.sleep(3000);
} finally {
// Ensure all resources are cleaned up
resources.dispose();
}
// Conditional disposal
CompositeDisposable conditionalResources = new CompositeDisposable();
Observable<Long> source = Observable.interval(1, TimeUnit.SECONDS);
Disposable subscription1 = source.subscribe(tick -> {
System.out.println("Tick: " + tick);
if (tick >= 5) {
conditionalResources.dispose(); // Stop all when condition met
}
});
conditionalResources.add(subscription1);
// Preventing memory leaks in Android-style lifecycle
class ReactiveComponent {
private final CompositeDisposable disposables = new CompositeDisposable();
public void onCreate() {
// Add subscriptions
disposables.add(
Observable.interval(1, TimeUnit.SECONDS)
.observeOn(Schedulers.single())
.subscribe(this::updateUI)
);
}
public void onDestroy() {
// Clean up all subscriptions
disposables.clear();
}
private void updateUI(Long tick) {
System.out.println("UI update: " + tick);
}
}
// Using DisposableObserver for manual resource management
DisposableObserver<Long> observer = new DisposableObserver<Long>() {
@Override
public void onNext(Long value) {
System.out.println("Received: " + value);
if (value >= 3) {
dispose(); // Self-dispose when condition met
}
}
@Override
public void onError(Throwable e) {
System.err.println("Error: " + e);
}
@Override
public void onComplete() {
System.out.println("Completed");
}
};
Observable.range(1, 10)
.subscribe(observer);Install with Tessl CLI
npx tessl i tessl/maven-io-reactivex-rxjava3--rxjava