CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-io-reactivex-rxjava3--rxjava

Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.

Pending
Overview
Eval results
Files

disposables.mddocs/

Resource Management

RxJava's Disposable pattern provides resource management for reactive streams, enabling proper cleanup of subscriptions, threads, and other resources to prevent memory leaks and resource exhaustion.

Capabilities

Basic Disposable Operations

Core interface for managing reactive stream resources.

/**
 * Represents a disposable resource
 */
public interface Disposable {
    /**
     * Dispose of the resource and cancel any ongoing operations
     */
    void dispose();

    /**
     * Check if this resource has been disposed
     * @return true if disposed, false otherwise
     */
    boolean isDisposed();
}

/**
 * Empty disposable that does nothing when disposed
 * @return Disposable that performs no action
 */
public static Disposable empty();

/**
 * Create a disposable from a Runnable action
 * @param action the action to run when disposed
 * @return Disposable that runs the action when disposed
 */
public static Disposable fromRunnable(Runnable action);

/**
 * Create a disposable from an Action
 * @param action the action to run when disposed
 * @return Disposable that runs the action when disposed
 */
public static Disposable fromAction(Action action);

/**
 * Create a disposable from an AutoCloseable resource
 * @param autoCloseable the resource to close when disposed
 * @return Disposable that closes the resource when disposed
 */
public static Disposable fromAutoCloseable(AutoCloseable autoCloseable);

/**
 * Create a disposable from a Future
 * @param future the Future to cancel when disposed
 * @param mayInterruptIfRunning whether to interrupt if running
 * @return Disposable that cancels the Future when disposed
 */
public static Disposable fromFuture(Future<?> future, boolean mayInterruptIfRunning);

Composite Disposable

Manage multiple disposables as a single unit.

/**
 * Container for multiple disposables that can be disposed together
 */
public final class CompositeDisposable implements Disposable, DisposableContainer {
    
    /**
     * Create an empty CompositeDisposable
     */
    public CompositeDisposable();

    /**
     * Create a CompositeDisposable with initial disposables
     * @param disposables the initial disposables to add
     */
    public CompositeDisposable(Disposable... disposables);

    /**
     * Add a disposable to this composite
     * @param disposable the disposable to add
     * @return true if added successfully, false if already disposed
     */
    public boolean add(Disposable disposable);

    /**
     * Add multiple disposables to this composite
     * @param disposables the disposables to add
     * @return true if all were added successfully
     */
    public boolean addAll(Disposable... disposables);

    /**
     * Remove and dispose a specific disposable
     * @param disposable the disposable to remove
     * @return true if found and removed, false otherwise
     */
    public boolean remove(Disposable disposable);

    /**
     * Remove a specific disposable without disposing it
     * @param disposable the disposable to remove
     * @return true if found and removed, false otherwise
     */
    public boolean delete(Disposable disposable);

    /**
     * Clear all disposables and dispose them
     */
    public void clear();

    /**
     * Get the number of disposables in this composite
     * @return the count of disposables
     */
    public int size();

    /**
     * Dispose all contained disposables and prevent new additions
     */
    public void dispose();

    /**
     * Check if this composite has been disposed
     * @return true if disposed, false otherwise
     */
    public boolean isDisposed();
}

Serial Disposable

Replace disposables while maintaining single active subscription.

/**
 * Container that holds at most one disposable at a time
 */
public final class SerialDisposable implements Disposable {
    
    /**
     * Create an empty SerialDisposable
     */
    public SerialDisposable();

    /**
     * Create a SerialDisposable with an initial disposable
     * @param initialDisposable the initial disposable
     */
    public SerialDisposable(Disposable initialDisposable);

    /**
     * Replace the current disposable with a new one
     * @param next the new disposable (disposes the current one)
     * @return true if set successfully, false if already disposed
     */
    public boolean replace(Disposable next);

    /**
     * Update the current disposable
     * @param next the new disposable (does not dispose the current one)
     * @return true if set successfully, false if already disposed
     */
    public boolean set(Disposable next);

    /**
     * Get the current disposable
     * @return the current disposable or null if none set
     */
    public Disposable get();

    /**
     * Dispose the current disposable and prevent new ones
     */
    public void dispose();

    /**
     * Check if this serial disposable has been disposed
     * @return true if disposed, false otherwise
     */
    public boolean isDisposed();
}

Disposable Container

Interface for managing collections of disposables.

/**
 * Interface for containers that can hold disposables
 */
public interface DisposableContainer {
    /**
     * Add a disposable to this container
     * @param disposable the disposable to add
     * @return true if added successfully, false if container is disposed
     */
    boolean add(Disposable disposable);

    /**
     * Remove a disposable from this container
     * @param disposable the disposable to remove
     * @return true if found and removed, false otherwise
     */
    boolean remove(Disposable disposable);

    /**
     * Remove a disposable without disposing it
     * @param disposable the disposable to remove
     * @return true if found and removed, false otherwise
     */
    boolean delete(Disposable disposable);
}

Utility Disposables

Specialized disposable implementations for common scenarios.

/**
 * Disposable that wraps a Runnable action
 */
public final class RunnableDisposable extends ReferenceDisposable<Runnable> {
    /**
     * Create a RunnableDisposable
     * @param run the Runnable to execute on disposal
     */
    public RunnableDisposable(Runnable run);
}

/**
 * Disposable that wraps an Action
 */
public final class ActionDisposable extends ReferenceDisposable<Action> {
    /**
     * Create an ActionDisposable
     * @param action the Action to execute on disposal
     */
    public ActionDisposable(Action action);
}

/**
 * Disposable that cancels a Future
 */
public final class FutureDisposable extends AtomicReference<Future<?>> implements Disposable {
    /**
     * Create a FutureDisposable
     * @param future the Future to cancel on disposal
     * @param allowInterrupt whether to allow interruption
     */
    public FutureDisposable(Future<?> future, boolean allowInterrupt);
}

/**
 * Disposable that closes an AutoCloseable resource
 */
public final class AutoCloseableDisposable extends ReferenceDisposable<AutoCloseable> {
    /**
     * Create an AutoCloseableDisposable
     * @param autoCloseable the resource to close on disposal
     */
    public AutoCloseableDisposable(AutoCloseable autoCloseable);
}

Types

/**
 * Base class for reference-based disposables
 */
public abstract class ReferenceDisposable<T> extends AtomicReference<T> implements Disposable {
    /**
     * Create a ReferenceDisposable with a value
     * @param value the value to hold
     */
    protected ReferenceDisposable(T value);

    /**
     * Called when the disposable is disposed
     * @param value the value being disposed
     */
    protected abstract void onDisposed(T value);

    /**
     * Dispose of the held reference
     */
    public final void dispose();

    /**
     * Check if disposed
     * @return true if disposed, false otherwise
     */
    public final boolean isDisposed();
}

Usage Examples:

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.*;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;

// Basic subscription disposal
Disposable subscription = Observable.interval(1, TimeUnit.SECONDS)
    .subscribe(System.out::println);

// Dispose after 5 seconds
Thread.sleep(5000);
subscription.dispose();

// CompositeDisposable for multiple subscriptions
CompositeDisposable compositeDisposable = new CompositeDisposable();

Disposable sub1 = Observable.interval(1, TimeUnit.SECONDS)
    .subscribe(tick -> System.out.println("Sub1: " + tick));

Disposable sub2 = Observable.interval(2, TimeUnit.SECONDS)
    .subscribe(tick -> System.out.println("Sub2: " + tick));

// Add both subscriptions to composite
compositeDisposable.addAll(sub1, sub2);

// Dispose all at once
Thread.sleep(10000);
compositeDisposable.dispose();

// SerialDisposable for replacing subscriptions
SerialDisposable serialDisposable = new SerialDisposable();

// Start with one subscription
serialDisposable.set(Observable.interval(1, TimeUnit.SECONDS)
    .subscribe(tick -> System.out.println("First: " + tick)));

// Replace with different subscription after 5 seconds
Thread.sleep(5000);
serialDisposable.replace(Observable.interval(500, TimeUnit.MILLISECONDS)
    .subscribe(tick -> System.out.println("Second: " + tick)));

// Clean up
Thread.sleep(5000);
serialDisposable.dispose();

// Custom disposable from action
Disposable customDisposable = Disposable.fromAction(() -> {
    System.out.println("Custom cleanup executed");
});

customDisposable.dispose();

// Resource management pattern
CompositeDisposable resources = new CompositeDisposable();

try {
    // Add various resources
    resources.add(Observable.interval(1, TimeUnit.SECONDS)
        .subscribe(System.out::println));
    
    resources.add(Single.timer(5, TimeUnit.SECONDS)
        .subscribe(tick -> System.out.println("Timer completed")));
    
    resources.add(Disposable.fromRunnable(() -> 
        System.out.println("Cleanup task executed")));
    
    // Simulate work
    Thread.sleep(3000);
    
} finally {
    // Ensure all resources are cleaned up
    resources.dispose();
}

// Conditional disposal
CompositeDisposable conditionalResources = new CompositeDisposable();

Observable<Long> source = Observable.interval(1, TimeUnit.SECONDS);

Disposable subscription1 = source.subscribe(tick -> {
    System.out.println("Tick: " + tick);
    if (tick >= 5) {
        conditionalResources.dispose(); // Stop all when condition met
    }
});

conditionalResources.add(subscription1);

// Preventing memory leaks in Android-style lifecycle
class ReactiveComponent {
    private final CompositeDisposable disposables = new CompositeDisposable();
    
    public void onCreate() {
        // Add subscriptions
        disposables.add(
            Observable.interval(1, TimeUnit.SECONDS)
                .observeOn(Schedulers.single())
                .subscribe(this::updateUI)
        );
    }
    
    public void onDestroy() {
        // Clean up all subscriptions
        disposables.clear();
    }
    
    private void updateUI(Long tick) {
        System.out.println("UI update: " + tick);
    }
}

// Using DisposableObserver for manual resource management
DisposableObserver<Long> observer = new DisposableObserver<Long>() {
    @Override
    public void onNext(Long value) {
        System.out.println("Received: " + value);
        if (value >= 3) {
            dispose(); // Self-dispose when condition met
        }
    }
    
    @Override
    public void onError(Throwable e) {
        System.err.println("Error: " + e);
    }
    
    @Override
    public void onComplete() {
        System.out.println("Completed");
    }
};

Observable.range(1, 10)
    .subscribe(observer);

Best Practices

Resource Management Guidelines

  1. Always dispose of subscriptions to prevent memory leaks
  2. Use CompositeDisposable for managing multiple subscriptions
  3. Clear CompositeDisposable in cleanup methods (onDestroy, etc.)
  4. Use SerialDisposable when you need to replace subscriptions
  5. Dispose in finally blocks to ensure cleanup even on exceptions
  6. Check isDisposed() before performing operations that might fail if disposed
  7. Use DisposableObserver/DisposableSubscriber for fine-grained control
  8. Create custom disposables for domain-specific resource cleanup

Install with Tessl CLI

npx tessl i tessl/maven-io-reactivex-rxjava3--rxjava

docs

completable.md

disposables.md

flowable.md

index.md

maybe.md

observable.md

schedulers.md

single.md

subjects.md

tile.json