tessl/maven-io-reactivex-rxjava2--rxjava

RxJava: Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.

Resource Management

The Disposable pattern manages subscriptions and the resources attached to them. Disposing a subscription once it is no longer needed stops the stream, prevents memory leaks, and allows a clean shutdown.

Capabilities

Core Disposable Interface

Base interface for all disposable resources.

/**
 * Represents a disposable resource that can be cancelled/disposed
 */
public interface Disposable {
    /**
     * Disposes the resource and cancels any ongoing work
     */
    void dispose();
    
    /**
     * Returns true if this resource has been disposed
     */
    boolean isDisposed();
}
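
The interface leaves it to each implementation to make `dispose()` safe to call more than once. A minimal sketch of one way to do that is shown below. It assumes a local stand-in `Disposable` interface (declared only so the snippet compiles without RxJava on the classpath) and a hypothetical `OnceDisposable` class; neither is library code.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Local stand-in mirroring the two methods documented above (assumption:
// declared here only so this sketch compiles without RxJava).
interface Disposable {
    void dispose();
    boolean isDisposed();
}

// Hypothetical helper: runs its cleanup at most once, even if dispose()
// is called repeatedly or from several threads.
final class OnceDisposable implements Disposable {
    private final AtomicBoolean disposed = new AtomicBoolean();
    private final Runnable cleanup;

    OnceDisposable(Runnable cleanup) {
        this.cleanup = cleanup;
    }

    @Override
    public void dispose() {
        // compareAndSet succeeds for exactly one caller, so cleanup runs once.
        if (disposed.compareAndSet(false, true)) {
            cleanup.run();
        }
    }

    @Override
    public boolean isDisposed() {
        return disposed.get();
    }
}

class OnceDisposableDemo {
    public static void main(String[] args) {
        AtomicInteger cleanups = new AtomicInteger();
        Disposable d = new OnceDisposable(cleanups::incrementAndGet);
        d.dispose();
        d.dispose(); // second call is a no-op
        System.out.println(d.isDisposed() + " " + cleanups.get()); // prints "true 1"
    }
}
```

With RxJava on the classpath, the same effect is usually obtained from the factory methods in the next section rather than from a hand-written class.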

Disposables Utility Class

Factory methods and utilities for working with disposables.

/**
 * Utility class for creating and managing disposables
 */
public final class Disposables {
    /**
     * Returns a disposed disposable instance
     */
    public static Disposable disposed();
    
    /**
     * Returns an empty disposable that does nothing when disposed
     */
    public static Disposable empty();
    
    /**
     * Creates a disposable from an Action
     */
    public static Disposable fromAction(Action action);
    
    /**
     * Creates a disposable from a Runnable
     */
    public static Disposable fromRunnable(Runnable runnable);
    
    /**
     * Creates a disposable from a Future
     */
    public static Disposable fromFuture(Future<?> future);
    
    /**
     * Creates a disposable from a Subscription (Reactive Streams)
     */
    public static Disposable fromSubscription(Subscription subscription);
    
    // Note: RxJava 2.x has no fromAutoCloseable factory (RxJava 3 adds
    // Disposable.fromAutoCloseable). In 2.x, wrap the resource with
    // fromAction(resource::close) instead.
}
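
Each factory method wraps a cleanup step so that it runs when `dispose()` is called. For an `AutoCloseable` resource, the same idea can be sketched with plain JDK types. `CloseActions` and `closeOnce` are hypothetical names, not library API. The returned `Runnable` is something that could be handed to `Disposables.fromRunnable` when RxJava is available.

```java
import java.util.concurrent.atomic.AtomicBoolean;

final class CloseActions {
    private CloseActions() { }

    // Hypothetical helper: adapts an AutoCloseable to a run-once Runnable.
    static Runnable closeOnce(AutoCloseable resource) {
        AtomicBoolean closed = new AtomicBoolean();
        return () -> {
            // Only the first invocation closes the resource.
            if (closed.compareAndSet(false, true)) {
                try {
                    resource.close();
                } catch (Exception e) {
                    // Runnable cannot throw checked exceptions, so wrap them.
                    throw new RuntimeException(e);
                }
            }
        };
    }

    public static void main(String[] args) {
        int[] closeCalls = {0};
        AutoCloseable resource = () -> closeCalls[0]++;
        Runnable cleanup = closeOnce(resource);
        cleanup.run();
        cleanup.run(); // ignored, the resource is already closed
        System.out.println(closeCalls[0]); // prints 1
    }
}
```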

CompositeDisposable

Container for multiple disposables that can be disposed together.

/**
 * Container that can hold multiple disposables and dispose them together
 */
public final class CompositeDisposable implements Disposable {
    /**
     * Creates an empty CompositeDisposable
     */
    public CompositeDisposable();
    
    /**
     * Creates a CompositeDisposable with initial disposables
     */
    public CompositeDisposable(Disposable... disposables);
    
    /**
     * Adds a disposable to this container
     * Returns true if added, false if this container is already disposed
     */
    public boolean add(Disposable disposable);
    
    /**
     * Adds multiple disposables to this container
     * Returns true if all were added successfully
     */
    public boolean addAll(Disposable... disposables);
    
    /**
     * Removes a disposable from this container and disposes it
     * Returns true if it was found and removed
     */
    public boolean remove(Disposable disposable);
    
    /**
     * Removes a disposable from this container without disposing it
     * Returns true if it was found and removed
     */
    public boolean delete(Disposable disposable);
    
    /**
     * Disposes all contained disposables and clears the container
     */
    public void clear();
    
    /**
     * Returns the number of currently held disposables
     */
    public int size();
    
    /**
     * Disposes all contained disposables
     * Future additions will be immediately disposed
     */
    public void dispose();
    
    /**
     * Returns true if this container has been disposed
     */
    public boolean isDisposed();
}
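
The difference between `clear()` and `dispose()` is easy to miss: both dispose the current contents, but only `dispose()` closes the container, after which new additions are disposed immediately. The sketch below is a simplified, single-threaded model of that behaviour, not RxJava's implementation. `Disposable` is a local stand-in, and `FlagDisposable` and `MiniComposite` are hypothetical names.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Local stand-in for the documented interface (assumption: declared here
// only so the sketch compiles without RxJava on the classpath).
interface Disposable {
    void dispose();
    boolean isDisposed();
}

// Hypothetical resource that only records whether it was disposed.
final class FlagDisposable implements Disposable {
    private boolean disposed;
    @Override public void dispose() { disposed = true; }
    @Override public boolean isDisposed() { return disposed; }
}

// Simplified model of the container semantics; unlike the real class,
// it is not thread-safe.
final class MiniComposite implements Disposable {
    private final Set<Disposable> resources = new LinkedHashSet<>();
    private boolean disposed;

    boolean add(Disposable d) {
        if (disposed) {
            d.dispose(); // late additions are disposed immediately
            return false;
        }
        resources.add(d);
        return true;
    }

    // Disposes the current contents but leaves the container reusable.
    void clear() {
        for (Disposable d : resources) {
            d.dispose();
        }
        resources.clear();
    }

    int size() { return resources.size(); }

    @Override
    public void dispose() {
        disposed = true;
        clear();
    }

    @Override
    public boolean isDisposed() { return disposed; }
}

class MiniCompositeDemo {
    public static void main(String[] args) {
        MiniComposite composite = new MiniComposite();
        FlagDisposable first = new FlagDisposable();
        composite.add(first);
        composite.clear(); // first is disposed, container stays open
        System.out.println(first.isDisposed() + " " + composite.isDisposed()); // true false

        FlagDisposable second = new FlagDisposable();
        composite.add(second);
        composite.dispose(); // second is disposed, container is closed

        FlagDisposable late = new FlagDisposable();
        boolean accepted = composite.add(late); // rejected and disposed on the spot
        System.out.println(accepted + " " + late.isDisposed()); // false true
    }
}
```

In practice this means `clear()` suits a container that will be reused, while `dispose()` suits final teardown.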

SerialDisposable

Holds a single disposable that can be swapped atomically.

/**
 * Container that holds a single disposable and allows atomic replacement
 */
public final class SerialDisposable implements Disposable {
    /**
     * Creates a new SerialDisposable
     */
    public SerialDisposable();
    
    /**
     * Creates a SerialDisposable with an initial disposable
     */
    public SerialDisposable(Disposable initialDisposable);
    
    /**
     * Atomically sets the disposable, disposing the previous one if present
     * Returns true if set successfully, false if this container is disposed
     */
    public boolean set(Disposable disposable);
    
    /**
     * Atomically replaces the disposable without disposing the previous one
     * Returns true if replaced successfully, false if this container is disposed
     */
    public boolean replace(Disposable disposable);
    
    /**
     * Returns the current disposable (may be null)
     */
    public Disposable get();
    
    /**
     * Disposes the current disposable
     */
    public void dispose();
    
    /**
     * Returns true if this container has been disposed
     */
    public boolean isDisposed();
}
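
The swap-and-dispose behaviour of `set` amounts to "install the new item, then cancel the one it displaced". A rough JDK-only analogue is sketched below, using `Future` in place of `Disposable`. `LatestTaskSlot` is a hypothetical class written for illustration and does not model the disposed state of a real `SerialDisposable`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical JDK-only analogue of set(): holds one task at a time
// and cancels the task it displaces.
final class LatestTaskSlot {
    private final AtomicReference<Future<?>> current = new AtomicReference<>();

    void set(Future<?> next) {
        // getAndSet swaps atomically, so no displaced task is missed.
        Future<?> previous = current.getAndSet(next);
        if (previous != null) {
            previous.cancel(true);
        }
    }

    Future<?> get() {
        return current.get();
    }
}

class LatestTaskSlotDemo {
    public static void main(String[] args) {
        LatestTaskSlot slot = new LatestTaskSlot();
        CompletableFuture<String> first = new CompletableFuture<>();
        CompletableFuture<String> second = new CompletableFuture<>();

        slot.set(first);
        slot.set(second); // displaces and cancels `first`

        System.out.println(first.isCancelled() + " " + second.isCancelled()); // true false
    }
}
```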

Resource Observers

Observer implementations with built-in resource management.

/**
 * Observer with built-in resource management for Observable
 */
public abstract class ResourceObserver<T> implements Observer<T>, Disposable {
    /**
     * Adds a resource to be disposed when this observer is disposed
     */
    public final void add(Disposable resource);
    
    /**
     * Disposes all managed resources
     */
    public final void dispose();
    
    /**
     * Returns true if disposed
     */
    public final boolean isDisposed();
    
    // Abstract methods to implement
    public abstract void onNext(T t);
    public abstract void onError(Throwable e);
    public abstract void onComplete();
}

/**
 * Subscriber with built-in resource management for Flowable
 */
public abstract class ResourceSubscriber<T> implements FlowableSubscriber<T>, Disposable {
    /**
     * Adds a resource to be disposed when this subscriber is disposed
     */
    public final void add(Disposable resource);
    
    /**
     * Requests the specified number of items from upstream
     */
    protected final void request(long n);
    
    /**
     * Disposes all managed resources and cancels upstream
     */
    public final void dispose();
    
    /**
     * Returns true if disposed
     */
    public final boolean isDisposed();
    
    // Abstract methods to implement
    public abstract void onNext(T t);
    public abstract void onError(Throwable e);
    public abstract void onComplete();
}

/**
 * Observer with built-in resource management for Single
 */
public abstract class ResourceSingleObserver<T> implements SingleObserver<T>, Disposable {
    public final void add(Disposable resource);
    public final void dispose();
    public final boolean isDisposed();
    
    public abstract void onSuccess(T t);
    public abstract void onError(Throwable e);
}

/**
 * Observer with built-in resource management for Maybe
 */
public abstract class ResourceMaybeObserver<T> implements MaybeObserver<T>, Disposable {
    public final void add(Disposable resource);
    public final void dispose();
    public final boolean isDisposed();
    
    public abstract void onSuccess(T t);
    public abstract void onError(Throwable e);
    public abstract void onComplete();
}

/**
 * Observer with built-in resource management for Completable
 */
public abstract class ResourceCompletableObserver implements CompletableObserver, Disposable {
    public final void add(Disposable resource);
    public final void dispose();
    public final boolean isDisposed();
    
    public abstract void onComplete();
    public abstract void onError(Throwable e);
}

Usage Examples

Basic Disposable Management:

import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;

Observable<Long> source = Observable.interval(1, TimeUnit.SECONDS)
    .subscribeOn(Schedulers.io());

// Subscribe and keep the disposable
Disposable disposable = source.subscribe(
    value -> System.out.println("Value: " + value),
    error -> error.printStackTrace()
);

// Later, dispose to stop the stream and free resources
Thread.sleep(5000);
disposable.dispose();

// Check if disposed
System.out.println("Is disposed: " + disposable.isDisposed());

CompositeDisposable for Multiple Subscriptions:

import io.reactivex.disposables.CompositeDisposable;

CompositeDisposable compositeDisposable = new CompositeDisposable();

// Add multiple subscriptions
Observable<Long> timer1 = Observable.interval(1, TimeUnit.SECONDS);
Observable<Long> timer2 = Observable.interval(2, TimeUnit.SECONDS);
Observable<String> http = Observable.fromCallable(() -> fetchFromNetwork())
    .subscribeOn(Schedulers.io());

compositeDisposable.add(timer1.subscribe(v -> System.out.println("Timer 1: " + v)));
compositeDisposable.add(timer2.subscribe(v -> System.out.println("Timer 2: " + v)));
compositeDisposable.add(http.subscribe(result -> System.out.println("HTTP: " + result)));

System.out.println("Active subscriptions: " + compositeDisposable.size());

// Dispose all at once
Thread.sleep(10000);
compositeDisposable.dispose();

// All subscriptions are now disposed
System.out.println("All disposed: " + compositeDisposable.isDisposed());

SerialDisposable for Sequential Operations:

import io.reactivex.disposables.SerialDisposable;

SerialDisposable serialDisposable = new SerialDisposable();

// Start first operation
Observable<String> operation1 = Observable.just("Operation 1")
    .delay(2, TimeUnit.SECONDS);
serialDisposable.set(operation1.subscribe(System.out::println));

// Replace with second operation (first one gets disposed)
Thread.sleep(1000);
Observable<String> operation2 = Observable.just("Operation 2")
    .delay(1, TimeUnit.SECONDS);
serialDisposable.set(operation2.subscribe(System.out::println));

// Only operation 2 will complete
Thread.sleep(3000);
serialDisposable.dispose();

ResourceObserver for Complex Resource Management:

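import io.reactivex.disposables.Disposables;
import io.reactivex.observers.ResourceObserver;
import java.io.FileWriter;
import java.io.IOException;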

class CustomResourceObserver extends ResourceObserver<String> {
    private FileWriter fileWriter;
    
    @Override
    protected void onStart() {
        try {
            fileWriter = new FileWriter("output.txt");
            // Add file writer as a resource to be closed on disposal
            // (RxJava 2.x has no fromAutoCloseable, so wrap close() in an Action)
            add(Disposables.fromAction(fileWriter::close));
        } catch (IOException e) {
            dispose(); // Dispose if setup fails
        }
    }
    
    @Override
    public void onNext(String value) {
        try {
            fileWriter.write(value + "\n");
            fileWriter.flush();
        } catch (IOException e) {
            dispose();
        }
    }
    
    @Override
    public void onError(Throwable e) {
        System.err.println("Error: " + e.getMessage());
        // ResourceObserver does not release resources on its own; dispose explicitly
        dispose();
    }
    
    @Override
    public void onComplete() {
        System.out.println("Writing completed");
        // Dispose explicitly so the file writer is closed
        dispose();
    }
}

// Usage
Observable<String> data = Observable.just("Line 1", "Line 2", "Line 3");
CustomResourceObserver observer = new CustomResourceObserver();
data.subscribe(observer);

// Can dispose manually if needed
// observer.dispose();

Custom Disposable Creation:

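import io.reactivex.disposables.Disposables;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Create disposable from Action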
Disposable actionDisposable = Disposables.fromAction(() -> {
    System.out.println("Cleaning up resources");
    // Cleanup code here
});

// Create disposable from Runnable
Disposable runnableDisposable = Disposables.fromRunnable(() -> {
    System.out.println("Shutdown procedure");
});

// Create disposable from Future
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<?> future = executor.submit(() -> {
    // Long running task
    Thread.sleep(10000);
    return "Done";
});
Disposable futureDisposable = Disposables.fromFuture(future);

// Dispose all
actionDisposable.dispose();
runnableDisposable.dispose();
futureDisposable.dispose(); // This will cancel the future
executor.shutdown();

Lifecycle Management in Android/UI Applications:

public class MainActivity {
    private final CompositeDisposable compositeDisposable = new CompositeDisposable();
    
    public void onCreate() {
        // Start various subscriptions
        compositeDisposable.add(
            userService.getCurrentUser()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(this::updateUI)
        );
        
        compositeDisposable.add(
            locationService.getLocationUpdates()
                .subscribe(this::updateLocation)
        );
        
        compositeDisposable.add(
            messageService.getMessages()
                .subscribe(this::showMessage)
        );
    }
    
    public void onDestroy() {
        // Clean up all subscriptions to prevent memory leaks
        compositeDisposable.dispose();
    }
    
    private void updateUI(User user) { /* Update UI */ }
    private void updateLocation(Location location) { /* Update location */ }
    private void showMessage(String message) { /* Show message */ }
}

Error Handling with Resource Management:

CompositeDisposable resources = new CompositeDisposable();

try {
    // Add resources that might fail
    resources.add(Observable.interval(1, TimeUnit.SECONDS)
        .subscribe(
            value -> {
                if (value > 5) {
                    throw new RuntimeException("Simulated error");
                }
                System.out.println("Value: " + value);
            },
            error -> {
                System.err.println("Stream error: " + error.getMessage());
                // Don't forget to clean up other resources on error
                resources.dispose();
            }
        ));
        
    resources.add(Observable.timer(10, TimeUnit.SECONDS)
        .subscribe(ignored -> System.out.println("Timer completed")));
        
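} catch (Exception e) {
    // Only exceptions thrown synchronously while subscribing reach this block;
    // errors inside the streams are delivered to the error callbacks above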
    resources.dispose();
    throw e;
}

Memory Leak Prevention:

public class DataProcessor {
    private final CompositeDisposable subscriptions = new CompositeDisposable();
    private final PublishSubject<String> subject = PublishSubject.create();
    
    public void startProcessing() {
        // Process data with proper cleanup
        subscriptions.add(
            subject
                .buffer(5, TimeUnit.SECONDS)
                .filter(buffer -> !buffer.isEmpty())
                .flatMap(this::processBuffer)
                .subscribe(
                    result -> System.out.println("Processed: " + result),
                    error -> System.err.println("Processing error: " + error)
                )
        );
    }
    
    public void addData(String data) {
        if (!subscriptions.isDisposed()) {
            subject.onNext(data);
        }
    }
    
    public void shutdown() {
        // Proper cleanup prevents memory leaks
        subscriptions.dispose();
        subject.onComplete();
    }
    
    private Observable<String> processBuffer(List<String> buffer) {
        return Observable.fromCallable(() -> {
            // Process buffer
            return "Processed " + buffer.size() + " items";
        }).subscribeOn(Schedulers.computation());
    }
}

Best Practices

Resource Management Guidelines:

  1. Always dispose: Keep references to disposables and dispose them when done
  2. Use CompositeDisposable: For managing multiple subscriptions together
  3. Lifecycle awareness: Dispose in the matching lifecycle method (onDestroy, onPause, etc.); call clear() rather than dispose() if the same CompositeDisposable will be reused afterwards
  4. Error handling: Ensure resources are disposed even when errors occur
  5. Resource observers: Use ResourceObserver/ResourceSubscriber for complex resource management, and call dispose() from onError() and onComplete() to release the added resources
  6. SerialDisposable: Use for sequential operations where you need to cancel previous work
  7. Memory leaks: Always dispose long-running or infinite streams
  8. Thread safety: The containers shipped with RxJava (CompositeDisposable, SerialDisposable) are thread-safe, and dispose() is expected to be idempotent, so it can be called from any thread

Common Patterns:

  • Activity/Fragment lifecycle management with CompositeDisposable
  • Network request cancellation with individual disposables
  • File/database resource management with ResourceObserver
  • Background task management with SerialDisposable
  • Event bus subscriptions with proper cleanup

Types

/**
 * Functional interface for cleanup actions
 */
public interface Action {
    void run() throws Exception;
}

/**
 * Interface for objects that can be cancelled
 */
public interface Cancellable {
    void cancel() throws Exception;
}

/**
 * Aggregates several exceptions into one, for example when more than one
 * contained disposable throws during CompositeDisposable.dispose()
 */
public class CompositeException extends RuntimeException {
    public List<Throwable> getExceptions();
}

Install with Tessl CLI

npx tessl i tessl/maven-io-reactivex-rxjava2--rxjava
