Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
—
Subjects and Processors are hot observables that act as both observer and observable, enabling multicasting of events to multiple subscribers. They bridge the gap between imperative and reactive programming by allowing manual event emission.
Hot observables for multicasting without backpressure support.
/**
* Subject that emits to all current subscribers
*/
public final class PublishSubject<T> extends Subject<T> {
/**
* Create a new PublishSubject
* @return new PublishSubject instance
*/
public static <T> PublishSubject<T> create();
/**
* Emit an item to all subscribers
* @param value the item to emit
*/
public void onNext(T value);
/**
* Emit an error to all subscribers and terminate
* @param error the error to emit
*/
public void onError(Throwable error);
/**
* Complete all subscribers
*/
public void onComplete();
/**
* Check if this subject has subscribers
* @return true if has subscribers, false otherwise
*/
public boolean hasObservers();
/**
* Check if this subject has completed
* @return true if completed, false otherwise
*/
public boolean hasComplete();
/**
* Check if this subject has an error
* @return true if terminated with error, false otherwise
*/
public boolean hasThrowable();
/**
* Get the terminating error if any
* @return the error that terminated this subject, null if none
*/
public Throwable getThrowable();
}
/**
* Subject that replays items to new subscribers
*/
public final class ReplaySubject<T> extends Subject<T> {
/**
* Create an unbounded ReplaySubject
* @return new ReplaySubject that replays all items
*/
public static <T> ReplaySubject<T> create();
/**
* Create a size-bounded ReplaySubject
* @param maxSize maximum number of items to replay
* @return new ReplaySubject with size limit
*/
public static <T> ReplaySubject<T> createWithSize(int maxSize);
/**
* Create a time-bounded ReplaySubject
* @param maxAge maximum age of items to replay
* @param unit time unit for maxAge
* @return new ReplaySubject with time limit
*/
public static <T> ReplaySubject<T> createWithTime(long maxAge, TimeUnit unit);
/**
* Create a time and size bounded ReplaySubject
* @param maxSize maximum number of items to replay
* @param maxAge maximum age of items to replay
* @param unit time unit for maxAge
* @return new ReplaySubject with both limits
*/
public static <T> ReplaySubject<T> createWithTimeAndSize(int maxSize, long maxAge, TimeUnit unit);
/**
* Get all cached values
* @return array of all cached values
*/
public Object[] getValues();
/**
* Get all cached values as specific type
* @param array array to fill with values
* @return array filled with cached values
*/
public T[] getValues(T[] array);
/**
* Get count of cached values
* @return number of cached values
*/
public int size();
}
/**
* Subject that only emits the last item to new subscribers
*/
public final class BehaviorSubject<T> extends Subject<T> {
/**
* Create a BehaviorSubject without initial value
* @return new BehaviorSubject
*/
public static <T> BehaviorSubject<T> create();
/**
* Create a BehaviorSubject with initial value
* @param defaultValue the initial value
* @return new BehaviorSubject with initial value
*/
public static <T> BehaviorSubject<T> createDefault(T defaultValue);
/**
* Get the current value if any
* @return the current value or null if none
*/
public T getValue();
/**
* Check if this subject has a current value
* @return true if has current value, false otherwise
*/
public boolean hasValue();
}
/**
* Subject that only emits items after onComplete is called
*/
public final class AsyncSubject<T> extends Subject<T> {
/**
* Create a new AsyncSubject
* @return new AsyncSubject instance
*/
public static <T> AsyncSubject<T> create();
/**
* Get the final value if completed successfully
* @return the final value or null if none or not completed
*/
public T getValue();
/**
* Check if this subject has a final value
* @return true if has final value, false otherwise
*/
public boolean hasValue();
}Hot flowables with backpressure support for Reactive Streams.
/**
* Processor that emits to all current subscribers with backpressure
*/
public final class PublishProcessor<T> extends FlowableProcessor<T> {
/**
* Create a new PublishProcessor
* @return new PublishProcessor instance
*/
public static <T> PublishProcessor<T> create();
/**
* Emit an item to all subscribers
* @param value the item to emit
*/
public void onNext(T value);
/**
* Emit an error to all subscribers and terminate
* @param error the error to emit
*/
public void onError(Throwable error);
/**
* Complete all subscribers
*/
public void onComplete();
/**
* Offer an item with backpressure handling
* @param value the item to offer
* @return true if accepted, false if would violate backpressure
*/
public boolean offer(T value);
/**
* Check if this processor has subscribers
* @return true if has subscribers, false otherwise
*/
public boolean hasSubscribers();
}
/**
* Processor that replays items to new subscribers with backpressure
*/
public final class ReplayProcessor<T> extends FlowableProcessor<T> {
/**
* Create an unbounded ReplayProcessor
* @return new ReplayProcessor that replays all items
*/
public static <T> ReplayProcessor<T> create();
/**
* Create a size-bounded ReplayProcessor
* @param maxSize maximum number of items to replay
* @return new ReplayProcessor with size limit
*/
public static <T> ReplayProcessor<T> createWithSize(int maxSize);
/**
* Create a time-bounded ReplayProcessor
* @param maxAge maximum age of items to replay
* @param unit time unit for maxAge
* @return new ReplayProcessor with time limit
*/
public static <T> ReplayProcessor<T> createWithTime(long maxAge, TimeUnit unit);
/**
* Get all cached values
* @return array of all cached values
*/
public Object[] getValues();
/**
* Get count of cached values
* @return number of cached values
*/
public int size();
}
/**
* Processor that only emits the last item to new subscribers
*/
public final class BehaviorProcessor<T> extends FlowableProcessor<T> {
/**
* Create a BehaviorProcessor without initial value
* @return new BehaviorProcessor
*/
public static <T> BehaviorProcessor<T> create();
/**
* Create a BehaviorProcessor with initial value
* @param defaultValue the initial value
* @return new BehaviorProcessor with initial value
*/
public static <T> BehaviorProcessor<T> createDefault(T defaultValue);
/**
* Get the current value if any
* @return the current value or null if none
*/
public T getValue();
/**
* Check if this processor has a current value
* @return true if has current value, false otherwise
*/
public boolean hasValue();
}
/**
* Processor that only emits items after onComplete is called
*/
public final class AsyncProcessor<T> extends FlowableProcessor<T> {
/**
* Create a new AsyncProcessor
* @return new AsyncProcessor instance
*/
public static <T> AsyncProcessor<T> create();
/**
* Get the final value if completed successfully
* @return the final value or null if none or not completed
*/
public T getValue();
/**
* Check if this processor has a final value
* @return true if has final value, false otherwise
*/
public boolean hasValue();
}Subjects for single-value reactive types.
/**
* Subject for Single operations
*/
public final class SingleSubject<T> extends Single<T> implements SingleObserver<T> {
/**
* Create a new SingleSubject
* @return new SingleSubject instance
*/
public static <T> SingleSubject<T> create();
/**
* Emit a success value to all observers
* @param value the value to emit
*/
public void onSuccess(T value);
/**
* Emit an error to all observers
* @param error the error to emit
*/
public void onError(Throwable error);
/**
* Check if this subject has observers
* @return true if has observers, false otherwise
*/
public boolean hasObservers();
/**
* Get the success value if any
* @return the success value or null if none
*/
public T getValue();
/**
* Get the error if any
* @return the error or null if none
*/
public Throwable getThrowable();
}
/**
* Subject for Maybe operations
*/
public final class MaybeSubject<T> extends Maybe<T> implements MaybeObserver<T> {
/**
* Create a new MaybeSubject
* @return new MaybeSubject instance
*/
public static <T> MaybeSubject<T> create();
/**
* Emit a success value to all observers
* @param value the value to emit
*/
public void onSuccess(T value);
/**
* Emit an error to all observers
* @param error the error to emit
*/
public void onError(Throwable error);
/**
* Complete all observers without emitting a value
*/
public void onComplete();
/**
* Check if this subject has observers
* @return true if has observers, false otherwise
*/
public boolean hasObservers();
/**
* Get the success value if any
* @return the success value or null if none
*/
public T getValue();
}
/**
* Subject for Completable operations
*/
public final class CompletableSubject extends Completable implements CompletableObserver {
/**
* Create a new CompletableSubject
* @return new CompletableSubject instance
*/
public static CompletableSubject create();
/**
* Complete all observers
*/
public void onComplete();
/**
* Emit an error to all observers
* @param error the error to emit
*/
public void onError(Throwable error);
/**
* Check if this subject has observers
* @return true if has observers, false otherwise
*/
public boolean hasObservers();
/**
* Get the error if any
* @return the error or null if none
*/
public Throwable getThrowable();
}/**
* Base class for all Observable subjects
*/
public abstract class Subject<T> extends Observable<T> implements Observer<T> {
/**
* Convert this subject to a serialized version for thread safety
* @return serialized version of this subject
*/
public final Subject<T> toSerialized();
/**
* Check if this subject has observers
* @return true if has observers, false otherwise
*/
public abstract boolean hasObservers();
/**
* Check if this subject has completed normally
* @return true if completed, false otherwise
*/
public abstract boolean hasComplete();
/**
* Check if this subject terminated with an error
* @return true if terminated with error, false otherwise
*/
public abstract boolean hasThrowable();
/**
* Get the terminating error if any
* @return the error or null if none
*/
public abstract Throwable getThrowable();
}
/**
* Base class for all Flowable processors
*/
public abstract class FlowableProcessor<T> extends Flowable<T> implements FlowableSubscriber<T>, Processor<T, T> {
/**
* Convert this processor to a serialized version for thread safety
* @return serialized version of this processor
*/
public final FlowableProcessor<T> toSerialized();
/**
* Check if this processor has subscribers
* @return true if has subscribers, false otherwise
*/
public abstract boolean hasSubscribers();
/**
* Check if this processor has completed normally
* @return true if completed, false otherwise
*/
public abstract boolean hasComplete();
/**
* Check if this processor terminated with an error
* @return true if terminated with error, false otherwise
*/
public abstract boolean hasThrowable();
/**
* Get the terminating error if any
* @return the error or null if none
*/
public abstract Throwable getThrowable();
}Usage Examples:
import io.reactivex.rxjava3.subjects.*;
import io.reactivex.rxjava3.processors.*;
import io.reactivex.rxjava3.core.*;
import java.util.concurrent.TimeUnit;
// PublishSubject - hot multicast
PublishSubject<String> publishSubject = PublishSubject.create();
// Subscribe multiple observers
publishSubject.subscribe(value -> System.out.println("Observer 1: " + value));
publishSubject.subscribe(value -> System.out.println("Observer 2: " + value));
// Emit items
publishSubject.onNext("Hello");
publishSubject.onNext("World");
// Late subscriber won't receive previous items
publishSubject.subscribe(value -> System.out.println("Late Observer: " + value));
publishSubject.onNext("Late Item");
publishSubject.onComplete();
// BehaviorSubject - remembers last value
BehaviorSubject<String> behaviorSubject = BehaviorSubject.createDefault("Initial");
behaviorSubject.subscribe(value -> System.out.println("Behavior 1: " + value));
behaviorSubject.onNext("Update 1");
// Late subscriber gets the last value
behaviorSubject.subscribe(value -> System.out.println("Behavior 2: " + value));
behaviorSubject.onNext("Update 2");
// ReplaySubject - replays all previous items
ReplaySubject<String> replaySubject = ReplaySubject.create();
replaySubject.onNext("Item 1");
replaySubject.onNext("Item 2");
// Late subscriber gets all previous items
replaySubject.subscribe(value -> System.out.println("Replay: " + value));
replaySubject.onNext("Item 3");
// AsyncSubject - only emits the last item when completed
AsyncSubject<String> asyncSubject = AsyncSubject.create();
asyncSubject.subscribe(value -> System.out.println("Async: " + value));
asyncSubject.onNext("First");
asyncSubject.onNext("Second");
asyncSubject.onNext("Last");
asyncSubject.onComplete(); // Only "Last" is emitted
// SingleSubject for single-value operations
SingleSubject<String> singleSubject = SingleSubject.create();
singleSubject.subscribe(System.out::println);
singleSubject.onSuccess("Single Value");
// MaybeSubject for optional values
MaybeSubject<String> maybeSubject = MaybeSubject.create();
maybeSubject.subscribe(
value -> System.out.println("Maybe success: " + value),
error -> System.err.println("Maybe error: " + error),
() -> System.out.println("Maybe complete (empty)")
);
// Can emit value or complete empty
if (Math.random() > 0.5) {
maybeSubject.onSuccess("Maybe Value");
} else {
maybeSubject.onComplete();
}
// CompletableSubject for completion-only operations
CompletableSubject completableSubject = CompletableSubject.create();
completableSubject.subscribe(
() -> System.out.println("Completable completed"),
error -> System.err.println("Completable error: " + error)
);
completableSubject.onComplete();
// Processors for backpressured streams
PublishProcessor<Integer> publishProcessor = PublishProcessor.create();
publishProcessor.subscribe(value -> System.out.println("Processor: " + value));
// Emit with backpressure handling
for (int i = 0; i < 5; i++) {
if (publishProcessor.offer(i)) {
System.out.println("Offered: " + i);
} else {
System.out.println("Backpressure: could not offer " + i);
}
}
publishProcessor.onComplete();
// Thread-safe serialized subjects
Subject<String> serializedSubject = PublishSubject.<String>create().toSerialized();
// Safe to call from multiple threads
new Thread(() -> serializedSubject.onNext("Thread 1")).start();
new Thread(() -> serializedSubject.onNext("Thread 2")).start();
// Bridge between imperative and reactive code
class EventBus {
private final PublishSubject<String> eventSubject = PublishSubject.create();
public Observable<String> getEvents() {
return eventSubject;
}
public void publishEvent(String event) {
eventSubject.onNext(event);
}
}
EventBus eventBus = new EventBus();
eventBus.getEvents().subscribe(event -> System.out.println("Event: " + event));
eventBus.publishEvent("User logged in");
eventBus.publishEvent("Data updated");
// State management with BehaviorSubject
class StateManager<T> {
private final BehaviorSubject<T> stateSubject;
public StateManager(T initialState) {
this.stateSubject = BehaviorSubject.createDefault(initialState);
}
public Observable<T> getState() {
return stateSubject.distinctUntilChanged();
}
public T getCurrentState() {
return stateSubject.getValue();
}
public void setState(T newState) {
stateSubject.onNext(newState);
}
}
StateManager<String> stateManager = new StateManager<>("Initial State");
stateManager.getState().subscribe(state -> System.out.println("State: " + state));
stateManager.setState("Updated State");
stateManager.setState("Final State");PublishSubject: Event bus, real-time notifications, hot observablesBehaviorSubject: State management, current value access, configuration settingsReplaySubject: Caching, message replay, audit trailsAsyncSubject: Final result computation, completion notificationsSingleSubject: Single async operations, request/response patternsMaybeSubject: Optional async operations, cache lookupsCompletableSubject: Fire-and-forget operations, cleanup taskstoSerialized() when accessing subjects from multiple threadsInstall with Tessl CLI
npx tessl i tessl/maven-io-reactivex-rxjava3--rxjava