CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-io-reactivex-rxjava2--rxjava

RxJava: Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.

Pending
Overview
Eval results
Files

subjects.mddocs/

Subjects and Hot Observables

Hot observables that can multicast to multiple observers and emit items regardless of subscriptions. Subjects act as both Observable and Observer, making them perfect for bridging reactive and non-reactive code.

Capabilities

Subject Base Class

All subjects extend the base Subject class.

/**
 * Base class for all subjects that act as both Observable and Observer
 */
public abstract class Subject<T> extends Observable<T> implements Observer<T> {
    /**
     * Returns true if this subject has any observers
     */
    public abstract boolean hasObservers();
    
    /**
     * Returns true if this subject has terminated with an error
     */
    public abstract boolean hasThrowable();
    
    /**
     * Returns the terminal error if hasThrowable() returns true
     */
    public abstract Throwable getThrowable();
    
    /**
     * Returns true if this subject has completed successfully
     */
    public abstract boolean hasComplete();
    
    /**
     * Converts this subject to a serialized version (thread-safe)
     */
    public final Subject<T> toSerialized();
}

PublishSubject

Multicasts items to current observers only.

/**
 * Subject that multicasts items to currently subscribed observers
 * Does not replay any items to new subscribers
 */
public final class PublishSubject<T> extends Subject<T> {
    /**
     * Creates a new PublishSubject
     */
    public static <T> PublishSubject<T> create();
    
    /**
     * Returns the number of current observers
     */
    public int observerCount();
    
    // Inherits Observer methods
    public void onSubscribe(Disposable d);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}

BehaviorSubject

Replays the latest value to new subscribers.

/**
 * Subject that replays the latest emitted item to new subscribers
 * Always has a current value (either initial or most recent)
 */
public final class BehaviorSubject<T> extends Subject<T> {
    /**
     * Creates a BehaviorSubject with an initial value
     */
    public static <T> BehaviorSubject<T> createDefault(T defaultValue);
    
    /**
     * Creates a BehaviorSubject without an initial value
     */
    public static <T> BehaviorSubject<T> create();
    
    /**
     * Returns the current value if available
     */
    public T getValue();
    
    /**
     * Returns true if this subject has a current value
     */
    public boolean hasValue();
    
    /**
     * Returns the number of current observers
     */
    public int observerCount();
}

ReplaySubject

Replays all or a subset of emitted items to new subscribers.

/**
 * Subject that replays emitted items to new subscribers
 * Can buffer all items or limit by count/time
 */
public final class ReplaySubject<T> extends Subject<T> {
    /**
     * Creates a ReplaySubject that buffers all items
     */
    public static <T> ReplaySubject<T> create();
    
    /**
     * Creates a ReplaySubject with a maximum buffer size
     */
    public static <T> ReplaySubject<T> create(int bufferSize);
    
    /**
     * Creates a ReplaySubject that buffers items for a time window
     */
    public static <T> ReplaySubject<T> createWithTime(long maxAge, TimeUnit unit, Scheduler scheduler);
    
    /**
     * Creates a ReplaySubject with both size and time limits
     */
    public static <T> ReplaySubject<T> createWithTimeAndSize(long maxAge, TimeUnit unit, Scheduler scheduler, int maxSize);
    
    /**
     * Returns the current number of buffered items
     */
    public int size();
    
    /**
     * Returns the current buffered values as an array
     */
    public Object[] getValues();
    
    /**
     * Returns the current buffered values as a typed array
     */
    public T[] getValues(T[] array);
    
    /**
     * Returns the number of current observers
     */
    public int observerCount();
}

AsyncSubject

Replays only the final value to subscribers.

/**
 * Subject that only emits the last value when it completes
 * Emits nothing if it terminates with an error
 */
public final class AsyncSubject<T> extends Subject<T> {
    /**
     * Creates a new AsyncSubject
     */
    public static <T> AsyncSubject<T> create();
    
    /**
     * Returns the final value if the subject has completed successfully
     */
    public T getValue();
    
    /**
     * Returns true if this subject has a final value
     */
    public boolean hasValue();
    
    /**
     * Returns the number of current observers
     */
    public int observerCount();
}

UnicastSubject

Single-observer subject with optional buffering.

/**
 * Subject that allows only one observer and buffers items until subscription
 */
public final class UnicastSubject<T> extends Subject<T> {
    /**
     * Creates a UnicastSubject with unlimited buffering
     */
    public static <T> UnicastSubject<T> create();
    
    /**
     * Creates a UnicastSubject with a capacity hint and cleanup callback
     */
    public static <T> UnicastSubject<T> create(int capacityHint, Runnable onTerminate);
    
    /**
     * Creates a UnicastSubject with cleanup callback and delay error flag
     */
    public static <T> UnicastSubject<T> create(int capacityHint, Runnable onTerminate, boolean delayError);
}

Single, Maybe, and Completable Subjects

Subjects for other reactive types.

/**
 * Subject for Single reactive type
 */
public final class SingleSubject<T> extends Single<T> implements SingleObserver<T> {
    public static <T> SingleSubject<T> create();
    public boolean hasObservers();
    public boolean hasValue();
    public boolean hasThrowable();
    public T getValue();
    public Throwable getThrowable();
    public void onSubscribe(Disposable d);
    public void onSuccess(T value);
    public void onError(Throwable e);
}

/**
 * Subject for Maybe reactive type
 */
public final class MaybeSubject<T> extends Maybe<T> implements MaybeObserver<T> {
    public static <T> MaybeSubject<T> create();
    public boolean hasObservers();
    public boolean hasValue();
    public boolean hasComplete();
    public boolean hasThrowable();
    public T getValue();
    public Throwable getThrowable();
    public void onSubscribe(Disposable d);
    public void onSuccess(T value);
    public void onError(Throwable e);
    public void onComplete();
}

/**
 * Subject for Completable reactive type
 */
public final class CompletableSubject extends Completable implements CompletableObserver {
    public static CompletableSubject create();
    public boolean hasObservers();
    public boolean hasComplete();
    public boolean hasThrowable();
    public Throwable getThrowable();
    public void onSubscribe(Disposable d);
    public void onComplete();
    public void onError(Throwable e);
}

Usage Examples

PublishSubject - Live Event Broadcasting:

import io.reactivex.subjects.PublishSubject;

PublishSubject<String> eventBus = PublishSubject.create();

// First subscriber
eventBus.subscribe(event -> System.out.println("Subscriber 1: " + event));

// Emit events
eventBus.onNext("Event 1");
eventBus.onNext("Event 2");

// Second subscriber (won't receive previous events)
eventBus.subscribe(event -> System.out.println("Subscriber 2: " + event));

eventBus.onNext("Event 3"); // Both subscribers receive this

// Clean termination
eventBus.onComplete();

BehaviorSubject - Current State Management:

import io.reactivex.subjects.BehaviorSubject;

// User state management
BehaviorSubject<String> userState = BehaviorSubject.createDefault("logged_out");

// UI component subscribes
userState.subscribe(state -> System.out.println("UI: User is " + state));

// State changes
userState.onNext("logging_in");
userState.onNext("logged_in");

// New component subscribes and immediately gets current state
userState.subscribe(state -> System.out.println("New Component: User is " + state));

// Check current state
if (userState.hasValue()) {
    System.out.println("Current state: " + userState.getValue());
}

ReplaySubject - Event History:

import io.reactivex.subjects.ReplaySubject;

// Replay last 3 events
ReplaySubject<String> history = ReplaySubject.create(3);

// Emit some events
history.onNext("Event 1");
history.onNext("Event 2");
history.onNext("Event 3");
history.onNext("Event 4");
history.onNext("Event 5");

// New subscriber gets last 3 events
history.subscribe(event -> System.out.println("Late subscriber: " + event));
// Output: Event 3, Event 4, Event 5

// Time-based replay
ReplaySubject<String> timeHistory = ReplaySubject.createWithTime(
    2, TimeUnit.SECONDS, Schedulers.computation());

timeHistory.onNext("Old event");
Thread.sleep(3000);
timeHistory.onNext("Recent event");

// New subscriber only gets recent event
timeHistory.subscribe(event -> System.out.println("Time subscriber: " + event));

AsyncSubject - Final Result:

import io.reactivex.subjects.AsyncSubject;

AsyncSubject<String> calculation = AsyncSubject.create();

// Subscribers only get the final result
calculation.subscribe(result -> System.out.println("Result 1: " + result));

// Emit intermediate values (not received by observers)
calculation.onNext("Step 1");
calculation.onNext("Step 2");
calculation.onNext("Final Result");

// Must complete for observers to receive the final value
calculation.onComplete();

// Late subscriber still gets the final result
calculation.subscribe(result -> System.out.println("Result 2: " + result));

UnicastSubject - Single Observer with Buffering:

import io.reactivex.subjects.UnicastSubject;

UnicastSubject<Integer> unicast = UnicastSubject.create();

// Emit items before subscription (they get buffered)
unicast.onNext(1);
unicast.onNext(2);
unicast.onNext(3);

// First subscriber gets all buffered items
unicast.subscribe(value -> System.out.println("Unicast: " + value));

// Subsequent items delivered immediately
unicast.onNext(4);
unicast.onNext(5);

// Only one observer allowed - second subscription will error
// unicast.subscribe(value -> System.out.println("Second: " + value)); // IllegalStateException

SingleSubject - Async Result:

import io.reactivex.subjects.SingleSubject;

SingleSubject<String> asyncResult = SingleSubject.create();

// Multiple subscribers can wait for the same result
asyncResult.subscribe(result -> System.out.println("Observer 1: " + result));
asyncResult.subscribe(result -> System.out.println("Observer 2: " + result));

// Simulate async operation
new Thread(() -> {
    try {
        Thread.sleep(2000);
        asyncResult.onSuccess("Async operation completed");
    } catch (Exception e) {
        asyncResult.onError(e);
    }
}).start();

Thread Safety with Serialized Subjects:

PublishSubject<String> unsafeSubject = PublishSubject.create();
Subject<String> safeSubject = unsafeSubject.toSerialized();

// Multiple threads can safely emit to serialized subject
for (int i = 0; i < 10; i++) {
    final int threadId = i;
    new Thread(() -> {
        for (int j = 0; j < 100; j++) {
            safeSubject.onNext("Thread " + threadId + ", Item " + j);
        }
    }).start();
}

safeSubject.subscribe(item -> System.out.println("Received: " + item));

Error Handling with Subjects:

PublishSubject<String> subject = PublishSubject.create();

subject.subscribe(
    item -> System.out.println("Item: " + item),
    error -> System.err.println("Error: " + error.getMessage()),
    () -> System.out.println("Completed")
);

subject.onNext("Item 1");
subject.onNext("Item 2");

// Error terminates the subject
subject.onError(new RuntimeException("Something went wrong"));

// No more items can be emitted after error
// subject.onNext("Item 3"); // This would be ignored

Bridging Callback APIs with Subjects:

// Bridge traditional callback API to reactive streams
public class WeatherService {
    private final PublishSubject<Weather> weatherUpdates = PublishSubject.create();
    
    public Observable<Weather> getWeatherUpdates() {
        return weatherUpdates.asObservable(); // Hide subject implementation
    }
    
    // Called by external weather API
    public void onWeatherUpdate(Weather weather) {
        weatherUpdates.onNext(weather);
    }
    
    public void onWeatherError(Exception error) {
        weatherUpdates.onError(error);
    }
}

// Usage
WeatherService service = new WeatherService();
service.getWeatherUpdates()
    .subscribe(weather -> System.out.println("Weather: " + weather));

Subject Guidelines

When to use each subject:

  • PublishSubject: Event buses, live data streams, notifications
  • BehaviorSubject: State management, current values, configuration
  • ReplaySubject: Event history, audit logs, caching recent data
  • AsyncSubject: Final results, completion notifications
  • UnicastSubject: Single consumer scenarios, back-pressure handling

Best Practices:

  1. Always use toSerialized() when multiple threads emit to a subject
  2. Prefer asObservable() to hide the subject from consumers
  3. Handle termination properly (onComplete/onError)
  4. Be careful with memory leaks in ReplaySubject
  5. Consider using Processors for backpressure-aware subjects
  6. Don't emit to subjects after they've terminated

Types

/**
 * Exception thrown when trying to subscribe multiple observers to UnicastSubject
 */
public final class IllegalStateException extends RuntimeException {
    // Standard exception
}

/**
 * Base interfaces for all subjects
 */
public interface ObservableSource<T> {
    void subscribe(Observer<? super T> observer);
}

public interface Observer<T> {
    void onSubscribe(Disposable d);
    void onNext(T t);
    void onError(Throwable e);
    void onComplete();
}

Install with Tessl CLI

npx tessl i tessl/maven-io-reactivex-rxjava2--rxjava

docs

completable.md

disposables.md

error-handling.md

flowable.md

index.md

maybe.md

observable.md

schedulers.md

single.md

subjects.md

tile.json