RxJava: Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
—
Subjects are hot observables that can multicast to multiple observers and emit items regardless of subscriptions. They act as both Observable and Observer, which makes them well suited to bridging reactive and non-reactive code.
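All Observable-based subjects extend the base Subject class; SingleSubject, MaybeSubject and CompletableSubject extend Single, Maybe and Completable instead.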
/**
* Base class for all subjects that act as both Observable and Observer
*/
public abstract class Subject<T> extends Observable<T> implements Observer<T> {
/**
* Returns true if this subject has any observers
*/
public abstract boolean hasObservers();
/**
* Returns true if this subject has terminated with an error
*/
public abstract boolean hasThrowable();
/**
* Returns the terminal error if hasThrowable() returns true
*/
public abstract Throwable getThrowable();
/**
* Returns true if this subject has completed successfully
*/
public abstract boolean hasComplete();
/**
* Converts this subject to a serialized version (thread-safe)
*/
public final Subject<T> toSerialized();
}
PublishSubject: multicasts items to current observers only.
/**
* Subject that multicasts items to currently subscribed observers
* Does not replay any items to new subscribers
*/
public final class PublishSubject<T> extends Subject<T> {
/**
* Creates a new PublishSubject
*/
public static <T> PublishSubject<T> create();
/**
* Returns the number of current observers
*/
public int observerCount();
// Inherits Observer methods
public void onSubscribe(Disposable d);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
BehaviorSubject: replays the latest value to new subscribers.
/**
* Subject that replays the latest emitted item to new subscribers
* Has a current value once a default is supplied or an item has been emitted
*/
public final class BehaviorSubject<T> extends Subject<T> {
/**
* Creates a BehaviorSubject with an initial value
*/
public static <T> BehaviorSubject<T> createDefault(T defaultValue);
/**
* Creates a BehaviorSubject without an initial value
*/
public static <T> BehaviorSubject<T> create();
/**
* Returns the current value if available
*/
public T getValue();
/**
* Returns true if this subject has a current value
*/
public boolean hasValue();
/**
* Returns the number of current observers
*/
public int observerCount();
}
ReplaySubject: replays all or a subset of emitted items to new subscribers.
/**
* Subject that replays emitted items to new subscribers
* Can buffer all items or limit by count/time
*/
public final class ReplaySubject<T> extends Subject<T> {
/**
* Creates a ReplaySubject that buffers all items
*/
public static <T> ReplaySubject<T> create();
/**
* Creates a ReplaySubject with a maximum buffer size
*/
public static <T> ReplaySubject<T> create(int bufferSize);
/**
* Creates a ReplaySubject that buffers items for a time window
*/
public static <T> ReplaySubject<T> createWithTime(long maxAge, TimeUnit unit, Scheduler scheduler);
/**
* Creates a ReplaySubject with both size and time limits
*/
public static <T> ReplaySubject<T> createWithTimeAndSize(long maxAge, TimeUnit unit, Scheduler scheduler, int maxSize);
/**
* Returns the current number of buffered items
*/
public int size();
/**
* Returns the current buffered values as an array
*/
public Object[] getValues();
/**
* Returns the current buffered values as a typed array
*/
public T[] getValues(T[] array);
/**
* Returns the number of current observers
*/
public int observerCount();
}
AsyncSubject: replays only the final value to subscribers, once the subject completes.
/**
* Subject that only emits the last value when it completes
* Emits no value if it terminates with an error; only the error is relayed to observers
*/
public final class AsyncSubject<T> extends Subject<T> {
/**
* Creates a new AsyncSubject
*/
public static <T> AsyncSubject<T> create();
/**
* Returns the final value if the subject has completed successfully
*/
public T getValue();
/**
* Returns true if this subject has a final value
*/
public boolean hasValue();
/**
* Returns the number of current observers
*/
public int observerCount();
}
UnicastSubject: single-observer subject with optional buffering.
/**
* Subject that allows only one observer and buffers items until subscription
*/
public final class UnicastSubject<T> extends Subject<T> {
/**
* Creates a UnicastSubject with unlimited buffering
*/
public static <T> UnicastSubject<T> create();
/**
* Creates a UnicastSubject with a capacity hint and cleanup callback
*/
public static <T> UnicastSubject<T> create(int capacityHint, Runnable onTerminate);
/**
* Creates a UnicastSubject with a capacity hint, cleanup callback and delay error flag
*/
public static <T> UnicastSubject<T> create(int capacityHint, Runnable onTerminate, boolean delayError);
}
Subjects for other reactive types.
/**
* Subject for Single reactive type
*/
public final class SingleSubject<T> extends Single<T> implements SingleObserver<T> {
public static <T> SingleSubject<T> create();
public boolean hasObservers();
public boolean hasValue();
public boolean hasThrowable();
public T getValue();
public Throwable getThrowable();
public void onSubscribe(Disposable d);
public void onSuccess(T value);
public void onError(Throwable e);
}
/**
* Subject for Maybe reactive type
*/
public final class MaybeSubject<T> extends Maybe<T> implements MaybeObserver<T> {
public static <T> MaybeSubject<T> create();
public boolean hasObservers();
public boolean hasValue();
public boolean hasComplete();
public boolean hasThrowable();
public T getValue();
public Throwable getThrowable();
public void onSubscribe(Disposable d);
public void onSuccess(T value);
public void onError(Throwable e);
public void onComplete();
}
/**
* Subject for Completable reactive type
*/
public final class CompletableSubject extends Completable implements CompletableObserver {
public static CompletableSubject create();
public boolean hasObservers();
public boolean hasComplete();
public boolean hasThrowable();
public Throwable getThrowable();
public void onSubscribe(Disposable d);
public void onComplete();
public void onError(Throwable e);
}
PublishSubject - Live Event Broadcasting:
import io.reactivex.subjects.PublishSubject;
PublishSubject<String> eventBus = PublishSubject.create();
// First subscriber
eventBus.subscribe(event -> System.out.println("Subscriber 1: " + event));
// Emit events
eventBus.onNext("Event 1");
eventBus.onNext("Event 2");
// Second subscriber (won't receive previous events)
eventBus.subscribe(event -> System.out.println("Subscriber 2: " + event));
eventBus.onNext("Event 3"); // Both subscribers receive this
// Clean termination
eventBus.onComplete();
BehaviorSubject - Current State Management:
import io.reactivex.subjects.BehaviorSubject;
// User state management
BehaviorSubject<String> userState = BehaviorSubject.createDefault("logged_out");
// UI component subscribes
userState.subscribe(state -> System.out.println("UI: User is " + state));
// State changes
userState.onNext("logging_in");
userState.onNext("logged_in");
// New component subscribes and immediately gets current state
userState.subscribe(state -> System.out.println("New Component: User is " + state));
// Check current state
if (userState.hasValue()) {
System.out.println("Current state: " + userState.getValue());
}
ReplaySubject - Event History:
import io.reactivex.subjects.ReplaySubject;
// Replay last 3 events
ReplaySubject<String> history = ReplaySubject.create(3);
// Emit some events
history.onNext("Event 1");
history.onNext("Event 2");
history.onNext("Event 3");
history.onNext("Event 4");
history.onNext("Event 5");
// New subscriber gets last 3 events
history.subscribe(event -> System.out.println("Late subscriber: " + event));
// Output: Event 3, Event 4, Event 5
// Time-based replay
ReplaySubject<String> timeHistory = ReplaySubject.createWithTime(
2, TimeUnit.SECONDS, Schedulers.computation());
timeHistory.onNext("Old event");
Thread.sleep(3000);
timeHistory.onNext("Recent event");
// New subscriber only gets recent event
timeHistory.subscribe(event -> System.out.println("Time subscriber: " + event));
AsyncSubject - Final Result:
import io.reactivex.subjects.AsyncSubject;
AsyncSubject<String> calculation = AsyncSubject.create();
// Subscribers only get the final result
calculation.subscribe(result -> System.out.println("Result 1: " + result));
// Emit intermediate values (not received by observers)
calculation.onNext("Step 1");
calculation.onNext("Step 2");
calculation.onNext("Final Result");
// Must complete for observers to receive the final value
calculation.onComplete();
// Late subscriber still gets the final result
calculation.subscribe(result -> System.out.println("Result 2: " + result));
UnicastSubject - Single Observer with Buffering:
import io.reactivex.subjects.UnicastSubject;
UnicastSubject<Integer> unicast = UnicastSubject.create();
// Emit items before subscription (they get buffered)
unicast.onNext(1);
unicast.onNext(2);
unicast.onNext(3);
// First subscriber gets all buffered items
unicast.subscribe(value -> System.out.println("Unicast: " + value));
// Subsequent items delivered immediately
unicast.onNext(4);
unicast.onNext(5);
// Only one observer allowed - second subscription will error
// unicast.subscribe(value -> System.out.println("Second: " + value)); // IllegalStateException
SingleSubject - Async Result:
import io.reactivex.subjects.SingleSubject;
SingleSubject<String> asyncResult = SingleSubject.create();
// Multiple subscribers can wait for the same result
asyncResult.subscribe(result -> System.out.println("Observer 1: " + result));
asyncResult.subscribe(result -> System.out.println("Observer 2: " + result));
// Simulate async operation
new Thread(() -> {
try {
Thread.sleep(2000);
asyncResult.onSuccess("Async operation completed");
} catch (Exception e) {
asyncResult.onError(e);
}
}).start();
Thread Safety with Serialized Subjects:
PublishSubject<String> unsafeSubject = PublishSubject.create();
Subject<String> safeSubject = unsafeSubject.toSerialized();
// Subscribe before starting the producers: a PublishSubject does not replay,
// so anything emitted before this subscription would never be delivered
safeSubject.subscribe(item -> System.out.println("Received: " + item));
// Multiple threads can safely emit to serialized subject
for (int i = 0; i < 10; i++) {
    // Lambdas may only capture effectively final locals, so copy the loop index
    final int threadId = i;
    new Thread(() -> {
        for (int j = 0; j < 100; j++) {
            safeSubject.onNext("Thread " + threadId + ", Item " + j);
        }
    }).start();
}
Error Handling with Subjects:
PublishSubject<String> subject = PublishSubject.create();
subject.subscribe(
item -> System.out.println("Item: " + item),
error -> System.err.println("Error: " + error.getMessage()),
() -> System.out.println("Completed")
);
subject.onNext("Item 1");
subject.onNext("Item 2");
// Error terminates the subject
subject.onError(new RuntimeException("Something went wrong"));
// No more items can be emitted after error
// subject.onNext("Item 3"); // This would be ignored
Bridging Callback APIs with Subjects:
// Bridge traditional callback API to reactive streams
/**
 * Adapts a callback-style weather API to an Observable stream.
 * Callbacks are pushed into a private PublishSubject; consumers only ever
 * see a read-only Observable view of it.
 */
public class WeatherService {
    private final PublishSubject<Weather> weatherUpdates = PublishSubject.create();

    /**
     * Returns the stream of weather updates.
     *
     * @return an Observable view of the internal subject; hide() is the
     *         RxJava 2 replacement for the 1.x asObservable() and prevents
     *         callers from casting back to the subject and emitting into it
     */
    public Observable<Weather> getWeatherUpdates() {
        return weatherUpdates.hide();
    }

    /**
     * Called by the external weather API when a new reading is available.
     *
     * @param weather the reading to forward to current subscribers
     */
    public void onWeatherUpdate(Weather weather) {
        weatherUpdates.onNext(weather);
    }

    /**
     * Called by the external weather API when it fails.
     *
     * @param error the failure to forward as the stream's error signal
     */
    public void onWeatherError(Exception error) {
        weatherUpdates.onError(error);
    }
}
// Usage
WeatherService service = new WeatherService();
service.getWeatherUpdates()
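.subscribe(weather -> System.out.println("Weather: " + weather));
When to use each subject:
- PublishSubject: multicast live items to the observers subscribed at that moment, with no replay
- BehaviorSubject: new subscribers must immediately receive the latest value
- ReplaySubject: new subscribers must receive all, or a size/time-bounded subset, of past items
- AsyncSubject: only the final value matters, delivered once the subject completes
- UnicastSubject: a single observer, with items buffered until it subscribes
- SingleSubject, MaybeSubject, CompletableSubject: the same bridging role for Single, Maybe and Completable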
Best Practices:
- Use toSerialized() when multiple threads emit to a subject
- Use hide() (the RxJava 2 replacement for asObservable()) to hide the subject from consumers
/**
* The standard java.lang.IllegalStateException, signalled through onError to any additional observer that subscribes to a UnicastSubject
*/
public final class IllegalStateException extends RuntimeException {
// Standard exception
}
/**
* Base interfaces for all subjects
*/
public interface ObservableSource<T> {
void subscribe(Observer<? super T> observer);
}
public interface Observer<T> {
void onSubscribe(Disposable d);
void onNext(T t);
void onError(Throwable e);
void onComplete();
}
Install with Tessl CLI
npx tessl i tessl/maven-io-reactivex-rxjava2--rxjava