# Subjects and Hot Observables

Subjects are hot observables: they multicast to multiple observers and emit items whether or not anyone is subscribed. A subject acts as both an Observable and an Observer, which makes it well suited to bridging reactive and non-reactive code.

## Capabilities

### Subject Base Class

All Observable-based subjects extend the abstract `Subject` class.

```java { .api }
/**
 * Base class for all subjects that act as both Observable and Observer
 */
public abstract class Subject<T> extends Observable<T> implements Observer<T> {
    /**
     * Returns true if this subject has any observers
     */
    public abstract boolean hasObservers();

    /**
     * Returns true if this subject has terminated with an error
     */
    public abstract boolean hasThrowable();

    /**
     * Returns the terminal error if hasThrowable() returns true
     */
    public abstract Throwable getThrowable();

    /**
     * Returns true if this subject has completed successfully
     */
    public abstract boolean hasComplete();

    /**
     * Converts this subject to a serialized version (thread-safe)
     */
    public final Subject<T> toSerialized();
}
```
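
The state-inspection methods above can be exercised as in the following minimal sketch. It assumes RxJava 2.x (`io.reactivex`) on the classpath; the class name `SubjectStateDemo` and the `inspect()` helper are hypothetical names used only for illustration.

```java
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class (not part of RxJava)
public class SubjectStateDemo {

    /** Records the state flags of a subject before and after it fails. */
    static List<String> inspect() {
        List<String> log = new ArrayList<>();
        Subject<String> subject = PublishSubject.create();

        // Nobody has subscribed yet
        log.add("hasObservers=" + subject.hasObservers());

        // Supply an error handler so the terminal error is consumed
        subject.subscribe(item -> { }, error -> { });
        log.add("hasObservers=" + subject.hasObservers());

        // Terminate the subject with an error and inspect the flags
        subject.onError(new IllegalArgumentException("boom"));
        log.add("hasThrowable=" + subject.hasThrowable());
        log.add("hasComplete=" + subject.hasComplete());
        log.add("message=" + subject.getThrowable().getMessage());
        return log;
    }

    public static void main(String[] args) {
        inspect().forEach(System.out::println);
    }
}
```

A subject that fails reports `hasThrowable()` as true and `hasComplete()` as false; the two terminal states are mutually exclusive.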

### PublishSubject

Multicasts items to current observers only.

```java { .api }
/**
 * Subject that multicasts items to currently subscribed observers
 * Does not replay any items to new subscribers
 */
public final class PublishSubject<T> extends Subject<T> {
    /**
     * Creates a new PublishSubject
     */
    public static <T> PublishSubject<T> create();

    /**
     * Returns the number of current observers
     */
    public int observerCount();

    // Inherits Observer methods
    public void onSubscribe(Disposable d);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}
```

### BehaviorSubject

Replays the latest value to new subscribers.

```java { .api }
/**
 * Subject that replays the latest emitted item to new subscribers
 * Holds a current value once a default is supplied or an item has been emitted
 */
public final class BehaviorSubject<T> extends Subject<T> {
    /**
     * Creates a BehaviorSubject with an initial value
     */
    public static <T> BehaviorSubject<T> createDefault(T defaultValue);

    /**
     * Creates a BehaviorSubject without an initial value
     */
    public static <T> BehaviorSubject<T> create();

    /**
     * Returns the current value, or null if there is none
     */
    public T getValue();

    /**
     * Returns true if this subject has a current value
     */
    public boolean hasValue();

    /**
     * Returns the number of current observers
     */
    public int observerCount();
}
```

### ReplaySubject

Replays all or a subset of emitted items to new subscribers.

```java { .api }
/**
 * Subject that replays emitted items to new subscribers
 * Can buffer all items or limit by count/time
 */
public final class ReplaySubject<T> extends Subject<T> {
    /**
     * Creates a ReplaySubject that buffers all items
     */
    public static <T> ReplaySubject<T> create();

    /**
     * Creates an unbounded ReplaySubject with an initial capacity hint
     * (the hint does not limit how many items are replayed)
     */
    public static <T> ReplaySubject<T> create(int capacityHint);

    /**
     * Creates a ReplaySubject that retains at most maxSize items
     */
    public static <T> ReplaySubject<T> createWithSize(int maxSize);

    /**
     * Creates a ReplaySubject that buffers items for a time window
     */
    public static <T> ReplaySubject<T> createWithTime(long maxAge, TimeUnit unit, Scheduler scheduler);

    /**
     * Creates a ReplaySubject with both size and time limits
     */
    public static <T> ReplaySubject<T> createWithTimeAndSize(long maxAge, TimeUnit unit, Scheduler scheduler, int maxSize);

    /**
     * Returns the current number of buffered items
     */
    public int size();

    /**
     * Returns the current buffered values as an array
     */
    public Object[] getValues();

    /**
     * Returns the current buffered values as a typed array
     */
    public T[] getValues(T[] array);

    /**
     * Returns the number of current observers
     */
    public int observerCount();
}
```

### AsyncSubject

Replays only the final value to subscribers.

```java { .api }
/**
 * Subject that emits only the last value, and only when it completes
 * If it terminates with an error, observers receive the error and no value
 */
public final class AsyncSubject<T> extends Subject<T> {
    /**
     * Creates a new AsyncSubject
     */
    public static <T> AsyncSubject<T> create();

    /**
     * Returns the final value if the subject has completed successfully
     */
    public T getValue();

    /**
     * Returns true if this subject has a final value
     */
    public boolean hasValue();

    /**
     * Returns the number of current observers
     */
    public int observerCount();
}
```

### UnicastSubject

Single-observer subject that buffers items until its observer subscribes.

```java { .api }
/**
 * Subject that allows only one observer and buffers items until subscription
 */
public final class UnicastSubject<T> extends Subject<T> {
    /**
     * Creates a UnicastSubject with unlimited buffering
     */
    public static <T> UnicastSubject<T> create();

    /**
     * Creates a UnicastSubject with a capacity hint and cleanup callback
     */
    public static <T> UnicastSubject<T> create(int capacityHint, Runnable onTerminate);

    /**
     * Creates a UnicastSubject with a capacity hint, cleanup callback, and delay-error flag
     */
    public static <T> UnicastSubject<T> create(int capacityHint, Runnable onTerminate, boolean delayError);
}
```
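
The `onTerminate` callback in the second factory can be illustrated with the following small sketch. It assumes RxJava 2.x; the class name `UnicastTerminateDemo` and the `run()` helper are hypothetical. The callback here only appends to a log, whereas real code would typically release a resource.

```java
import io.reactivex.subjects.UnicastSubject;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class (not part of RxJava)
public class UnicastTerminateDemo {

    /** Buffers two items, subscribes once, then completes the subject. */
    static List<String> run() {
        List<String> log = new ArrayList<>();

        // 16 is only an initial capacity hint for the internal buffer
        UnicastSubject<Integer> subject =
                UnicastSubject.create(16, () -> log.add("onTerminate"));

        // Items emitted before subscription are buffered
        subject.onNext(1);
        subject.onNext(2);

        // The single observer receives the buffered items on subscription
        subject.subscribe(value -> log.add("value " + value));

        // Completing the subject runs the cleanup callback
        subject.onComplete();
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```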

### Single, Maybe, and Completable Subjects

Subjects for other reactive types.

```java { .api }
/**
 * Subject for Single reactive type
 */
public final class SingleSubject<T> extends Single<T> implements SingleObserver<T> {
    public static <T> SingleSubject<T> create();
    public boolean hasObservers();
    public boolean hasValue();
    public boolean hasThrowable();
    public T getValue();
    public Throwable getThrowable();
    public void onSubscribe(Disposable d);
    public void onSuccess(T value);
    public void onError(Throwable e);
}

/**
 * Subject for Maybe reactive type
 */
public final class MaybeSubject<T> extends Maybe<T> implements MaybeObserver<T> {
    public static <T> MaybeSubject<T> create();
    public boolean hasObservers();
    public boolean hasValue();
    public boolean hasComplete();
    public boolean hasThrowable();
    public T getValue();
    public Throwable getThrowable();
    public void onSubscribe(Disposable d);
    public void onSuccess(T value);
    public void onError(Throwable e);
    public void onComplete();
}

/**
 * Subject for Completable reactive type
 */
public final class CompletableSubject extends Completable implements CompletableObserver {
    public static CompletableSubject create();
    public boolean hasObservers();
    public boolean hasComplete();
    public boolean hasThrowable();
    public Throwable getThrowable();
    public void onSubscribe(Disposable d);
    public void onComplete();
    public void onError(Throwable e);
}
```
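
As an illustration of the Completable and Maybe variants, the sketch below completes a `CompletableSubject` and an empty `MaybeSubject`. It assumes RxJava 2.x; the class name `OtherSubjectsDemo`, the `run()` helper, and the log messages are hypothetical.

```java
import io.reactivex.subjects.CompletableSubject;
import io.reactivex.subjects.MaybeSubject;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class (not part of RxJava)
public class OtherSubjectsDemo {

    static List<String> run() {
        List<String> log = new ArrayList<>();

        // CompletableSubject: signals completion only, no value
        CompletableSubject saved = CompletableSubject.create();
        saved.subscribe(() -> log.add("saved"));
        saved.onComplete();
        // A late subscriber is still notified of the terminal event
        saved.subscribe(() -> log.add("saved (late)"));

        // MaybeSubject: terminates with a value, with an error, or empty
        MaybeSubject<String> lookup = MaybeSubject.create();
        lookup.subscribe(
                value -> log.add("found " + value),
                error -> log.add("error"),
                () -> log.add("empty"));
        lookup.onComplete(); // completes without a value
        log.add("hasValue=" + lookup.hasValue());
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```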

## Usage Examples

**PublishSubject - Live Event Broadcasting:**

```java
import io.reactivex.subjects.PublishSubject;

PublishSubject<String> eventBus = PublishSubject.create();

// First subscriber
eventBus.subscribe(event -> System.out.println("Subscriber 1: " + event));

// Emit events
eventBus.onNext("Event 1");
eventBus.onNext("Event 2");

// Second subscriber (won't receive previous events)
eventBus.subscribe(event -> System.out.println("Subscriber 2: " + event));

eventBus.onNext("Event 3"); // Both subscribers receive this

// Clean termination
eventBus.onComplete();
```

**BehaviorSubject - Current State Management:**

```java
import io.reactivex.subjects.BehaviorSubject;

// User state management
BehaviorSubject<String> userState = BehaviorSubject.createDefault("logged_out");

// UI component subscribes
userState.subscribe(state -> System.out.println("UI: User is " + state));

// State changes
userState.onNext("logging_in");
userState.onNext("logged_in");

// New component subscribes and immediately gets current state
userState.subscribe(state -> System.out.println("New Component: User is " + state));

// Check current state
if (userState.hasValue()) {
    System.out.println("Current state: " + userState.getValue());
}
```

**ReplaySubject - Event History:**

```java
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subjects.ReplaySubject;
import java.util.concurrent.TimeUnit;

// Replay last 3 events (createWithSize bounds the buffer; create(int) is only a capacity hint)
ReplaySubject<String> history = ReplaySubject.createWithSize(3);

// Emit some events
history.onNext("Event 1");
history.onNext("Event 2");
history.onNext("Event 3");
history.onNext("Event 4");
history.onNext("Event 5");

// New subscriber gets last 3 events
history.subscribe(event -> System.out.println("Late subscriber: " + event));
// Output: Event 3, Event 4, Event 5

// Time-based replay: items older than 2 seconds are dropped
ReplaySubject<String> timeHistory = ReplaySubject.createWithTime(
    2, TimeUnit.SECONDS, Schedulers.computation());

timeHistory.onNext("Old event");
Thread.sleep(3000); // throws InterruptedException; declare or handle it in real code
timeHistory.onNext("Recent event");

// New subscriber only gets recent event
timeHistory.subscribe(event -> System.out.println("Time subscriber: " + event));
```

**AsyncSubject - Final Result:**

```java
import io.reactivex.subjects.AsyncSubject;

AsyncSubject<String> calculation = AsyncSubject.create();

// Subscribers only get the final result
calculation.subscribe(result -> System.out.println("Result 1: " + result));

// Emit intermediate values (not received by observers)
calculation.onNext("Step 1");
calculation.onNext("Step 2");
calculation.onNext("Final Result");

// Must complete for observers to receive the final value
calculation.onComplete();

// Late subscriber still gets the final result
calculation.subscribe(result -> System.out.println("Result 2: " + result));
```

**UnicastSubject - Single Observer with Buffering:**

```java
import io.reactivex.subjects.UnicastSubject;

UnicastSubject<Integer> unicast = UnicastSubject.create();

// Emit items before subscription (they get buffered)
unicast.onNext(1);
unicast.onNext(2);
unicast.onNext(3);

// First subscriber gets all buffered items
unicast.subscribe(value -> System.out.println("Unicast: " + value));

// Subsequent items delivered immediately
unicast.onNext(4);
unicast.onNext(5);

// Only one observer allowed - a second subscriber receives an IllegalStateException through onError
// unicast.subscribe(value -> System.out.println("Second: " + value)); // IllegalStateException
```

**SingleSubject - Async Result:**

```java
import io.reactivex.subjects.SingleSubject;

SingleSubject<String> asyncResult = SingleSubject.create();

// Multiple subscribers can wait for the same result
asyncResult.subscribe(result -> System.out.println("Observer 1: " + result));
asyncResult.subscribe(result -> System.out.println("Observer 2: " + result));

// Simulate async operation
new Thread(() -> {
    try {
        Thread.sleep(2000);
        asyncResult.onSuccess("Async operation completed");
    } catch (Exception e) {
        asyncResult.onError(e);
    }
}).start();
```

**Thread Safety with Serialized Subjects:**

```java
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;

PublishSubject<String> unsafeSubject = PublishSubject.create();
Subject<String> safeSubject = unsafeSubject.toSerialized();

// Subscribe first: a PublishSubject does not replay items emitted before subscription
safeSubject.subscribe(item -> System.out.println("Received: " + item));

// Multiple threads can safely emit to serialized subject
for (int i = 0; i < 10; i++) {
    final int threadId = i;
    new Thread(() -> {
        for (int j = 0; j < 100; j++) {
            safeSubject.onNext("Thread " + threadId + ", Item " + j);
        }
    }).start();
}
```

**Error Handling with Subjects:**

```java
PublishSubject<String> subject = PublishSubject.create();

subject.subscribe(
    item -> System.out.println("Item: " + item),
    error -> System.err.println("Error: " + error.getMessage()),
    () -> System.out.println("Completed")
);

subject.onNext("Item 1");
subject.onNext("Item 2");

// Error terminates the subject
subject.onError(new RuntimeException("Something went wrong"));

// No more items can be emitted after error
// subject.onNext("Item 3"); // This would be ignored
```

**Bridging Callback APIs with Subjects:**

```java
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;

// Bridge traditional callback API to reactive streams
// (Weather is an application-defined type, not shown here)
public class WeatherService {
    private final PublishSubject<Weather> weatherUpdates = PublishSubject.create();

    public Observable<Weather> getWeatherUpdates() {
        return weatherUpdates.hide(); // Hide subject implementation (asObservable() in RxJava 1)
    }

    // Called by external weather API
    public void onWeatherUpdate(Weather weather) {
        weatherUpdates.onNext(weather);
    }

    public void onWeatherError(Exception error) {
        weatherUpdates.onError(error);
    }
}

// Usage
WeatherService service = new WeatherService();
service.getWeatherUpdates()
    .subscribe(weather -> System.out.println("Weather: " + weather));
```

## Subject Guidelines

**When to use each subject:**

- **PublishSubject**: Event buses, live data streams, notifications
- **BehaviorSubject**: State management, current values, configuration
- **ReplaySubject**: Event history, audit logs, caching recent data
- **AsyncSubject**: Final results, completion notifications
- **UnicastSubject**: Single-consumer scenarios, buffering items until the consumer subscribes
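
The differences in the list above show up most clearly in what a late subscriber receives. The following sketch, assuming RxJava 2.x, emits 1, 2, 3 before subscribing and 4 afterwards; the class name `LateSubscriberDemo` and the `lateSubscriber` helper are hypothetical.

```java
import io.reactivex.subjects.AsyncSubject;
import io.reactivex.subjects.BehaviorSubject;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.ReplaySubject;
import io.reactivex.subjects.Subject;
import io.reactivex.subjects.UnicastSubject;

import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class (not part of RxJava)
public class LateSubscriberDemo {

    /** Emits 1..3, subscribes, emits 4, completes; returns what the subscriber saw. */
    static List<String> lateSubscriber(Subject<Integer> subject) {
        subject.onNext(1);
        subject.onNext(2);
        subject.onNext(3);

        List<String> seen = new ArrayList<>();
        subject.subscribe(
                value -> seen.add("next " + value),
                error -> seen.add("error"),
                () -> seen.add("complete"));

        subject.onNext(4);
        subject.onComplete();
        return seen;
    }

    public static void main(String[] args) {
        // [next 4, complete]
        System.out.println(lateSubscriber(PublishSubject.<Integer>create()));
        // [next 3, next 4, complete]
        System.out.println(lateSubscriber(BehaviorSubject.<Integer>create()));
        // [next 2, next 3, next 4, complete]
        System.out.println(lateSubscriber(ReplaySubject.<Integer>createWithSize(2)));
        // [next 4, complete]
        System.out.println(lateSubscriber(AsyncSubject.<Integer>create()));
        // [next 1, next 2, next 3, next 4, complete]
        System.out.println(lateSubscriber(UnicastSubject.<Integer>create()));
    }
}
```

PublishSubject and AsyncSubject produce the same output here for different reasons: the former never saw a subscriber for the first three items, while the latter holds back everything except the last item until completion.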

**Best Practices:**

1. Use `toSerialized()` whenever multiple threads emit to a subject
2. Prefer `hide()` to expose a subject to consumers as a plain Observable (`asObservable()` is the RxJava 1 name)
3. Terminate subjects explicitly (onComplete/onError) so observers are released
4. Bound ReplaySubject buffers by size or time to avoid unbounded memory growth
5. Consider Processors (for example `PublishProcessor`) when you need backpressure support
6. Don't emit to subjects after they've terminated
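
The practice of hiding the subject can be checked with a short sketch. It assumes RxJava 2.x with no assembly hooks installed; the class name `HideDemo` and the `exposesSubject` helper are hypothetical.

```java
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;

// Hypothetical demo class (not part of RxJava)
public class HideDemo {

    /** Returns true if a consumer could cast the stream back to a Subject and emit into it. */
    static boolean exposesSubject(Observable<String> stream) {
        return stream instanceof Subject;
    }

    public static void main(String[] args) {
        PublishSubject<String> subject = PublishSubject.create();

        // Returning the subject directly leaks onNext/onError/onComplete to callers
        System.out.println(exposesSubject(subject));        // true

        // hide() wraps the subject in a plain Observable
        System.out.println(exposesSubject(subject.hide())); // false
    }
}
```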

## Types

```java { .api }
/**
 * Standard java.lang exception that UnicastSubject signals through onError
 * when a second observer subscribes
 */
public class IllegalStateException extends RuntimeException {
    // Standard exception
}

/**
 * Base interfaces for all subjects
 */
public interface ObservableSource<T> {
    void subscribe(Observer<? super T> observer);
}

public interface Observer<T> {
    void onSubscribe(Disposable d);
    void onNext(T t);
    void onError(Throwable e);
    void onComplete();
}
```