0
# Subjects and Processors
1
2
Subjects and Processors are hot observables that act as both observer and observable, enabling multicasting of events to multiple subscribers. They bridge the gap between imperative and reactive programming by allowing manual event emission.
3
4
## Capabilities
5
6
### Observable Subjects
7
8
Hot observables for multicasting without backpressure support.
9
10
```java { .api }
11
/**
12
* Subject that emits to all current subscribers
13
*/
14
public final class PublishSubject<T> extends Subject<T> {
15
/**
16
* Create a new PublishSubject
17
* @return new PublishSubject instance
18
*/
19
public static <T> PublishSubject<T> create();
20
21
/**
22
* Emit an item to all subscribers
23
* @param value the item to emit
24
*/
25
public void onNext(T value);
26
27
/**
28
* Emit an error to all subscribers and terminate
29
* @param error the error to emit
30
*/
31
public void onError(Throwable error);
32
33
/**
34
* Complete all subscribers
35
*/
36
public void onComplete();
37
38
/**
39
* Check if this subject has subscribers
40
* @return true if has subscribers, false otherwise
41
*/
42
public boolean hasObservers();
43
44
/**
45
* Check if this subject has completed
46
* @return true if completed, false otherwise
47
*/
48
public boolean hasComplete();
49
50
/**
51
* Check if this subject has an error
52
* @return true if terminated with error, false otherwise
53
*/
54
public boolean hasThrowable();
55
56
/**
57
* Get the terminating error if any
58
* @return the error that terminated this subject, null if none
59
*/
60
public Throwable getThrowable();
61
}
62
63
/**
64
* Subject that replays items to new subscribers
65
*/
66
public final class ReplaySubject<T> extends Subject<T> {
67
/**
68
* Create an unbounded ReplaySubject
69
* @return new ReplaySubject that replays all items
70
*/
71
public static <T> ReplaySubject<T> create();
72
73
/**
74
* Create a size-bounded ReplaySubject
75
* @param maxSize maximum number of items to replay
76
* @return new ReplaySubject with size limit
77
*/
78
public static <T> ReplaySubject<T> createWithSize(int maxSize);
79
80
/**
81
* Create a time-bounded ReplaySubject
82
* @param maxAge maximum age of items to replay
83
* @param unit time unit for maxAge
84
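* @param scheduler the Scheduler that supplies the current time used for the age check
* @return new ReplaySubject with time limit
85
*/
86
public static <T> ReplaySubject<T> createWithTime(long maxAge, TimeUnit unit, Scheduler scheduler);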
87
88
/**
89
* Create a time and size bounded ReplaySubject
90
* @param maxAge maximum age of items to replay
91
* @param unit time unit for maxAge
92
* @param scheduler the Scheduler that supplies the current time used for the age check
93
* @param maxSize maximum number of items to replay
* @return new ReplaySubject with both limits
94
*/
95
public static <T> ReplaySubject<T> createWithTimeAndSize(long maxAge, TimeUnit unit, Scheduler scheduler, int maxSize);
96
97
/**
98
* Get all cached values
99
* @return array of all cached values
100
*/
101
public Object[] getValues();
102
103
/**
104
* Get all cached values as specific type
105
* @param array array to fill with values
106
* @return array filled with cached values
107
*/
108
public T[] getValues(T[] array);
109
110
/**
111
* Get count of cached values
112
* @return number of cached values
113
*/
114
public int size();
115
}
116
117
/**
118
* Subject that emits the most recent item (or the default value, if any) to each new subscriber, followed by all subsequent items
119
*/
120
public final class BehaviorSubject<T> extends Subject<T> {
121
/**
122
* Create a BehaviorSubject without initial value
123
* @return new BehaviorSubject
124
*/
125
public static <T> BehaviorSubject<T> create();
126
127
/**
128
* Create a BehaviorSubject with initial value
129
* @param defaultValue the initial value
130
* @return new BehaviorSubject with initial value
131
*/
132
public static <T> BehaviorSubject<T> createDefault(T defaultValue);
133
134
/**
135
* Get the current value if any
136
* @return the current value or null if none
137
*/
138
public T getValue();
139
140
/**
141
* Check if this subject has a current value
142
* @return true if has current value, false otherwise
143
*/
144
public boolean hasValue();
145
}
146
147
/**
148
* Subject that emits only the last item, and only after onComplete is called
149
*/
150
public final class AsyncSubject<T> extends Subject<T> {
151
/**
152
* Create a new AsyncSubject
153
* @return new AsyncSubject instance
154
*/
155
public static <T> AsyncSubject<T> create();
156
157
/**
158
* Get the final value if completed successfully
159
* @return the final value or null if none or not completed
160
*/
161
public T getValue();
162
163
/**
164
* Check if this subject has a final value
165
* @return true if has final value, false otherwise
166
*/
167
public boolean hasValue();
168
}
169
```
170
171
### Flowable Processors
172
173
Hot flowables with backpressure support for Reactive Streams.
174
175
```java { .api }
176
/**
177
* Processor that emits to all current subscribers with backpressure
178
*/
179
public final class PublishProcessor<T> extends FlowableProcessor<T> {
180
/**
181
* Create a new PublishProcessor
182
* @return new PublishProcessor instance
183
*/
184
public static <T> PublishProcessor<T> create();
185
186
/**
187
* Emit an item to all subscribers
188
* @param value the item to emit
189
*/
190
public void onNext(T value);
191
192
/**
193
* Emit an error to all subscribers and terminate
194
* @param error the error to emit
195
*/
196
public void onError(Throwable error);
197
198
/**
199
* Complete all subscribers
200
*/
201
public void onComplete();
202
203
/**
204
* Offer an item with backpressure handling
205
* @param value the item to offer
206
* @return true if the item was emitted to all current subscribers, false if it was not emitted because at least one subscriber had no outstanding request
207
*/
208
public boolean offer(T value);
209
210
/**
211
* Check if this processor has subscribers
212
* @return true if has subscribers, false otherwise
213
*/
214
public boolean hasSubscribers();
215
}
216
217
/**
218
* Processor that replays items to new subscribers with backpressure
219
*/
220
public final class ReplayProcessor<T> extends FlowableProcessor<T> {
221
/**
222
* Create an unbounded ReplayProcessor
223
* @return new ReplayProcessor that replays all items
224
*/
225
public static <T> ReplayProcessor<T> create();
226
227
/**
228
* Create a size-bounded ReplayProcessor
229
* @param maxSize maximum number of items to replay
230
* @return new ReplayProcessor with size limit
231
*/
232
public static <T> ReplayProcessor<T> createWithSize(int maxSize);
233
234
/**
235
* Create a time-bounded ReplayProcessor
236
* @param maxAge maximum age of items to replay
237
* @param unit time unit for maxAge
238
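* @param scheduler the Scheduler that supplies the current time used for the age check
* @return new ReplayProcessor with time limit
239
*/
240
public static <T> ReplayProcessor<T> createWithTime(long maxAge, TimeUnit unit, Scheduler scheduler);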
241
242
/**
243
* Get all cached values
244
* @return array of all cached values
245
*/
246
public Object[] getValues();
247
248
/**
249
* Get count of cached values
250
* @return number of cached values
251
*/
252
public int size();
253
}
254
255
/**
256
* Processor that emits the most recent item (or the default value, if any) to each new subscriber, followed by all subsequent items
257
*/
258
public final class BehaviorProcessor<T> extends FlowableProcessor<T> {
259
/**
260
* Create a BehaviorProcessor without initial value
261
* @return new BehaviorProcessor
262
*/
263
public static <T> BehaviorProcessor<T> create();
264
265
/**
266
* Create a BehaviorProcessor with initial value
267
* @param defaultValue the initial value
268
* @return new BehaviorProcessor with initial value
269
*/
270
public static <T> BehaviorProcessor<T> createDefault(T defaultValue);
271
272
/**
273
* Get the current value if any
274
* @return the current value or null if none
275
*/
276
public T getValue();
277
278
/**
279
* Check if this processor has a current value
280
* @return true if has current value, false otherwise
281
*/
282
public boolean hasValue();
283
}
284
285
/**
286
* Processor that emits only the last item, and only after onComplete is called
287
*/
288
public final class AsyncProcessor<T> extends FlowableProcessor<T> {
289
/**
290
* Create a new AsyncProcessor
291
* @return new AsyncProcessor instance
292
*/
293
public static <T> AsyncProcessor<T> create();
294
295
/**
296
* Get the final value if completed successfully
297
* @return the final value or null if none or not completed
298
*/
299
public T getValue();
300
301
/**
302
* Check if this processor has a final value
303
* @return true if has final value, false otherwise
304
*/
305
public boolean hasValue();
306
}
307
```
308
309
### Single and Maybe Subjects
310
311
Subjects for the single-value and completion-only reactive types (`Single`, `Maybe`, and `Completable`).
312
313
```java { .api }
314
/**
315
* Subject for Single operations
316
*/
317
public final class SingleSubject<T> extends Single<T> implements SingleObserver<T> {
318
/**
319
* Create a new SingleSubject
320
* @return new SingleSubject instance
321
*/
322
public static <T> SingleSubject<T> create();
323
324
/**
325
* Emit a success value to all observers
326
* @param value the value to emit
327
*/
328
public void onSuccess(T value);
329
330
/**
331
* Emit an error to all observers
332
* @param error the error to emit
333
*/
334
public void onError(Throwable error);
335
336
/**
337
* Check if this subject has observers
338
* @return true if has observers, false otherwise
339
*/
340
public boolean hasObservers();
341
342
/**
343
* Get the success value if any
344
* @return the success value or null if none
345
*/
346
public T getValue();
347
348
/**
349
* Get the error if any
350
* @return the error or null if none
351
*/
352
public Throwable getThrowable();
353
}
354
355
/**
356
* Subject for Maybe operations
357
*/
358
public final class MaybeSubject<T> extends Maybe<T> implements MaybeObserver<T> {
359
/**
360
* Create a new MaybeSubject
361
* @return new MaybeSubject instance
362
*/
363
public static <T> MaybeSubject<T> create();
364
365
/**
366
* Emit a success value to all observers
367
* @param value the value to emit
368
*/
369
public void onSuccess(T value);
370
371
/**
372
* Emit an error to all observers
373
* @param error the error to emit
374
*/
375
public void onError(Throwable error);
376
377
/**
378
* Complete all observers without emitting a value
379
*/
380
public void onComplete();
381
382
/**
383
* Check if this subject has observers
384
* @return true if has observers, false otherwise
385
*/
386
public boolean hasObservers();
387
388
/**
389
* Get the success value if any
390
* @return the success value or null if none
391
*/
392
public T getValue();
393
}
394
395
/**
396
* Subject for Completable operations
397
*/
398
public final class CompletableSubject extends Completable implements CompletableObserver {
399
/**
400
* Create a new CompletableSubject
401
* @return new CompletableSubject instance
402
*/
403
public static CompletableSubject create();
404
405
/**
406
* Complete all observers
407
*/
408
public void onComplete();
409
410
/**
411
* Emit an error to all observers
412
* @param error the error to emit
413
*/
414
public void onError(Throwable error);
415
416
/**
417
* Check if this subject has observers
418
* @return true if has observers, false otherwise
419
*/
420
public boolean hasObservers();
421
422
/**
423
* Get the error if any
424
* @return the error or null if none
425
*/
426
public Throwable getThrowable();
427
}
428
```
429
430
## Types
431
432
```java { .api }
433
/**
434
* Base class for all Observable subjects
435
*/
436
public abstract class Subject<T> extends Observable<T> implements Observer<T> {
437
/**
438
* Convert this subject to a serialized version for thread safety
439
* @return serialized version of this subject
440
*/
441
public final Subject<T> toSerialized();
442
443
/**
444
* Check if this subject has observers
445
* @return true if has observers, false otherwise
446
*/
447
public abstract boolean hasObservers();
448
449
/**
450
* Check if this subject has completed normally
451
* @return true if completed, false otherwise
452
*/
453
public abstract boolean hasComplete();
454
455
/**
456
* Check if this subject terminated with an error
457
* @return true if terminated with error, false otherwise
458
*/
459
public abstract boolean hasThrowable();
460
461
/**
462
* Get the terminating error if any
463
* @return the error or null if none
464
*/
465
public abstract Throwable getThrowable();
466
}
467
468
/**
469
* Base class for all Flowable processors
470
*/
471
public abstract class FlowableProcessor<T> extends Flowable<T> implements FlowableSubscriber<T>, Processor<T, T> {
472
/**
473
* Convert this processor to a serialized version for thread safety
474
* @return serialized version of this processor
475
*/
476
public final FlowableProcessor<T> toSerialized();
477
478
/**
479
* Check if this processor has subscribers
480
* @return true if has subscribers, false otherwise
481
*/
482
public abstract boolean hasSubscribers();
483
484
/**
485
* Check if this processor has completed normally
486
* @return true if completed, false otherwise
487
*/
488
public abstract boolean hasComplete();
489
490
/**
491
* Check if this processor terminated with an error
492
* @return true if terminated with error, false otherwise
493
*/
494
public abstract boolean hasThrowable();
495
496
/**
497
* Get the terminating error if any
498
* @return the error or null if none
499
*/
500
public abstract Throwable getThrowable();
501
}
502
```
503
504
**Usage Examples:**
505
506
```java
507
import io.reactivex.rxjava3.subjects.*;
508
import io.reactivex.rxjava3.processors.*;
509
import io.reactivex.rxjava3.core.*;
510
import java.util.concurrent.TimeUnit;
511
512
// PublishSubject - hot multicast
513
PublishSubject<String> publishSubject = PublishSubject.create();
514
515
// Subscribe multiple observers
516
publishSubject.subscribe(value -> System.out.println("Observer 1: " + value));
517
publishSubject.subscribe(value -> System.out.println("Observer 2: " + value));
518
519
// Emit items
520
publishSubject.onNext("Hello");
521
publishSubject.onNext("World");
522
523
// Late subscriber won't receive previous items
524
publishSubject.subscribe(value -> System.out.println("Late Observer: " + value));
525
publishSubject.onNext("Late Item");
526
527
publishSubject.onComplete();
528
529
// BehaviorSubject - remembers last value
530
BehaviorSubject<String> behaviorSubject = BehaviorSubject.createDefault("Initial");
531
532
behaviorSubject.subscribe(value -> System.out.println("Behavior 1: " + value));
533
behaviorSubject.onNext("Update 1");
534
535
// Late subscriber gets the last value
536
behaviorSubject.subscribe(value -> System.out.println("Behavior 2: " + value));
537
behaviorSubject.onNext("Update 2");
538
539
// ReplaySubject - replays all previous items
540
ReplaySubject<String> replaySubject = ReplaySubject.create();
541
542
replaySubject.onNext("Item 1");
543
replaySubject.onNext("Item 2");
544
545
// Late subscriber gets all previous items
546
replaySubject.subscribe(value -> System.out.println("Replay: " + value));
547
replaySubject.onNext("Item 3");
548
549
// AsyncSubject - only emits the last item when completed
550
AsyncSubject<String> asyncSubject = AsyncSubject.create();
551
552
asyncSubject.subscribe(value -> System.out.println("Async: " + value));
553
asyncSubject.onNext("First");
554
asyncSubject.onNext("Second");
555
asyncSubject.onNext("Last");
556
asyncSubject.onComplete(); // Only "Last" is emitted
557
558
// SingleSubject for single-value operations
559
SingleSubject<String> singleSubject = SingleSubject.create();
560
561
singleSubject.subscribe(System.out::println);
562
singleSubject.onSuccess("Single Value");
563
564
// MaybeSubject for optional values
565
MaybeSubject<String> maybeSubject = MaybeSubject.create();
566
567
maybeSubject.subscribe(
568
value -> System.out.println("Maybe success: " + value),
569
error -> System.err.println("Maybe error: " + error),
570
() -> System.out.println("Maybe complete (empty)")
571
);
572
573
// Can emit value or complete empty
574
if (Math.random() > 0.5) {
575
maybeSubject.onSuccess("Maybe Value");
576
} else {
577
maybeSubject.onComplete();
578
}
579
580
// CompletableSubject for completion-only operations
581
CompletableSubject completableSubject = CompletableSubject.create();
582
583
completableSubject.subscribe(
584
() -> System.out.println("Completable completed"),
585
error -> System.err.println("Completable error: " + error)
586
);
587
588
completableSubject.onComplete();
589
590
// Processors for backpressured streams
591
PublishProcessor<Integer> publishProcessor = PublishProcessor.create();
592
593
publishProcessor.subscribe(value -> System.out.println("Processor: " + value));
594
595
// Emit with backpressure handling
596
for (int i = 0; i < 5; i++) {
597
if (publishProcessor.offer(i)) {
598
System.out.println("Offered: " + i);
599
} else {
600
System.out.println("Backpressure: could not offer " + i);
601
}
602
}
603
604
publishProcessor.onComplete();
605
606
// Thread-safe serialized subjects
607
Subject<String> serializedSubject = PublishSubject.<String>create().toSerialized();
608
609
// Safe to call from multiple threads
610
new Thread(() -> serializedSubject.onNext("Thread 1")).start();
611
new Thread(() -> serializedSubject.onNext("Thread 2")).start();
612
613
// Bridge between imperative and reactive code
614
class EventBus {
615
private final PublishSubject<String> eventSubject = PublishSubject.create();
616
617
public Observable<String> getEvents() {
618
return eventSubject;
619
}
620
621
public void publishEvent(String event) {
622
eventSubject.onNext(event);
623
}
624
}
625
626
EventBus eventBus = new EventBus();
627
eventBus.getEvents().subscribe(event -> System.out.println("Event: " + event));
628
eventBus.publishEvent("User logged in");
629
eventBus.publishEvent("Data updated");
630
631
// State management with BehaviorSubject
632
class StateManager<T> {
633
private final BehaviorSubject<T> stateSubject;
634
635
public StateManager(T initialState) {
636
this.stateSubject = BehaviorSubject.createDefault(initialState);
637
}
638
639
public Observable<T> getState() {
640
return stateSubject.distinctUntilChanged();
641
}
642
643
public T getCurrentState() {
644
return stateSubject.getValue();
645
}
646
647
public void setState(T newState) {
648
stateSubject.onNext(newState);
649
}
650
}
651
652
StateManager<String> stateManager = new StateManager<>("Initial State");
653
stateManager.getState().subscribe(state -> System.out.println("State: " + state));
654
stateManager.setState("Updated State");
655
stateManager.setState("Final State");
656
```
657
658
## Subject Selection Guide
659
660
### When to Use Each Subject Type
661
662
- **`PublishSubject`**: Event bus, real-time notifications, hot observables
663
- **`BehaviorSubject`**: State management, current value access, configuration settings
664
- **`ReplaySubject`**: Caching, message replay, audit trails
665
- **`AsyncSubject`**: Final result computation, completion notifications
666
- **`SingleSubject`**: Single async operations, request/response patterns
667
- **`MaybeSubject`**: Optional async operations, cache lookups
668
- **`CompletableSubject`**: Fire-and-forget operations, cleanup tasks
669
670
### Best Practices
671
672
1. **Use `toSerialized()`** when `onNext`, `onError`, or `onComplete` may be called on a subject from multiple threads concurrently
673
2. **Complete subjects** to free resources and notify subscribers
674
3. **Handle errors appropriately** as they terminate the subject
675
4. **Prefer BehaviorSubject** for state that needs current value access
676
5. **Use ReplaySubject with bounds** (size and/or time) to prevent unbounded memory growth from the replay buffer
677
6. **Consider processors** for backpressured scenarios
678
7. **Dispose of subscriptions** to prevent memory leaks in long-lived subjects