0
# Subjects
1
2
Special observables that can act as both observer and observable, enabling multicasting patterns where multiple subscribers receive the same values.
3
4
## Capabilities
5
6
### Subject
7
8
Basic subject that multicasts values to multiple subscribers.
9
10
```typescript { .api }
11
/**
12
* Subject acts as both Observable and Observer, enabling multicasting
13
*/
14
class Subject<T> extends Observable<T> implements Observer<T> {
15
/**
16
* Whether the subject has been closed/completed
17
*/
18
readonly closed: boolean;
19
20
/**
21
* Whether the subject has observers
22
*/
23
readonly hasError: boolean;
24
25
/**
26
* Whether the subject is currently being observed
27
*/
28
readonly isStopped: boolean;
29
30
/**
31
* Current observers count
32
*/
33
readonly observers: Observer<T>[];
34
35
/**
36
* Emit a value to all subscribers
37
* @param value - Value to emit
38
*/
39
next(value: T): void;
40
41
/**
42
* Emit an error to all subscribers and complete the subject
43
* @param err - Error to emit
44
*/
45
error(err: any): void;
46
47
/**
48
* Complete the subject, notifying all subscribers
49
*/
50
complete(): void;
51
52
/**
53
* Unsubscribe all observers and clean up resources
54
*/
55
unsubscribe(): void;
56
57
/**
58
* Create observable that shares this subject's notifications
59
* @returns Observable instance
60
*/
61
asObservable(): Observable<T>;
62
}
63
```
64
65
**Usage Examples:**
66
67
```typescript
68
import { Subject } from "rxjs";
69
70
const subject = new Subject<string>();
71
72
// Multiple subscribers
73
subject.subscribe(value => console.log('Observer A:', value));
74
subject.subscribe(value => console.log('Observer B:', value));
75
76
// Emit values - both observers receive them
77
subject.next('Hello');
78
subject.next('World');
79
subject.complete();
80
81
// New subscriber after completion receives nothing
82
subject.subscribe(value => console.log('Observer C:', value)); // No output
83
```
84
85
### BehaviorSubject
86
87
Subject that stores the current value and emits it to new subscribers immediately.
88
89
```typescript { .api }
90
/**
91
* Subject that holds a current value and emits it immediately to new subscribers
92
*/
93
class BehaviorSubject<T> extends Subject<T> {
94
/**
95
* Create BehaviorSubject with initial value
96
* @param initialValue - Initial value to store and emit
97
*/
98
constructor(initialValue: T);
99
100
/**
101
* Current value held by the subject
102
*/
103
readonly value: T;
104
105
/**
106
* Get current value (synchronous)
107
* @returns Current stored value
108
*/
109
getValue(): T;
110
}
111
```
112
113
**Usage Examples:**
114
115
```typescript
116
import { BehaviorSubject } from "rxjs";
117
118
// Create with initial value
119
const behaviorSubject = new BehaviorSubject<number>(0);
120
121
// New subscriber immediately gets current value (0)
122
behaviorSubject.subscribe(val => console.log('Subscriber A:', val));
123
124
// Emit new values
125
behaviorSubject.next(1);
126
behaviorSubject.next(2);
127
128
// New subscriber gets current value (2) immediately
129
behaviorSubject.subscribe(val => console.log('Subscriber B:', val));
130
131
// Access current value synchronously
132
console.log('Current value:', behaviorSubject.value); // 2
133
console.log('Current value (method):', behaviorSubject.getValue()); // 2
134
```
135
136
### ReplaySubject
137
138
Subject that stores recent values and replays them to new subscribers.
139
140
```typescript { .api }
141
/**
142
* Subject that replays recent values to new subscribers
143
*/
144
class ReplaySubject<T> extends Subject<T> {
145
/**
146
* Create ReplaySubject with buffer configuration
147
* @param bufferSize - Number of values to buffer (default: Infinity)
148
* @param windowTime - Time in ms to keep values (default: Infinity)
149
* @param timestampProvider - Custom timestamp provider
150
*/
151
constructor(
152
bufferSize?: number,
153
windowTime?: number,
154
timestampProvider?: TimestampProvider
155
);
156
}
157
158
interface TimestampProvider {
159
now(): number;
160
}
161
```
162
163
**Usage Examples:**
164
165
```typescript
166
import { ReplaySubject } from "rxjs";
167
168
// Replay last 3 values
169
const replaySubject = new ReplaySubject<number>(3);
170
171
// Emit some values
172
replaySubject.next(1);
173
replaySubject.next(2);
174
replaySubject.next(3);
175
replaySubject.next(4);
176
177
// New subscriber gets last 3 values (2, 3, 4)
178
replaySubject.subscribe(val => console.log('Subscriber A:', val));
179
180
// Time-based replay (last 1 second)
181
const timeReplay = new ReplaySubject<string>(Infinity, 1000);
182
timeReplay.next('old');
183
setTimeout(() => {
184
timeReplay.next('recent');
185
// New subscriber only gets 'recent' if subscribed after 1 second
186
timeReplay.subscribe(val => console.log('Time subscriber:', val));
187
}, 1100);
188
```
189
190
### AsyncSubject
191
192
Subject that only emits the last value when completed.
193
194
```typescript { .api }
195
/**
196
* Subject that only emits the last value when the sequence completes
197
*/
198
class AsyncSubject<T> extends Subject<T> {
199
/**
200
* Create AsyncSubject
201
*/
202
constructor();
203
}
204
```
205
206
**Usage Examples:**
207
208
```typescript
209
import { AsyncSubject } from "rxjs";
210
211
const asyncSubject = new AsyncSubject<number>();
212
213
// Subscribers don't receive values until completion
214
asyncSubject.subscribe(val => console.log('Subscriber A:', val));
215
216
asyncSubject.next(1);
217
asyncSubject.next(2);
218
asyncSubject.next(3); // Only this value will be emitted
219
220
asyncSubject.subscribe(val => console.log('Subscriber B:', val));
221
222
// Complete to emit the last value (3) to all subscribers
223
asyncSubject.complete();
224
225
// New subscriber after completion gets the last value
226
asyncSubject.subscribe(val => console.log('Subscriber C:', val)); // 3
227
```
228
229
## Advanced Patterns
230
231
### Subject as Event Bus
232
233
```typescript
234
import { Subject, filter } from "rxjs";
235
236
interface AppEvent {
237
type: string;
238
payload: any;
239
}
240
241
const eventBus = new Subject<AppEvent>();
242
243
// Subscribe to specific event types
244
eventBus.pipe(
245
filter(event => event.type === 'user-login')
246
).subscribe(event => {
247
console.log('User logged in:', event.payload);
248
});
249
250
eventBus.pipe(
251
filter(event => event.type === 'user-logout')
252
).subscribe(event => {
253
console.log('User logged out');
254
});
255
256
// Emit events
257
eventBus.next({ type: 'user-login', payload: { userId: 123 } });
258
eventBus.next({ type: 'user-logout', payload: null });
259
```
260
261
### State Management with BehaviorSubject
262
263
```typescript
264
import { BehaviorSubject, map } from "rxjs";
265
266
interface AppState {
267
user: { id: number; name: string } | null;
268
loading: boolean;
269
}
270
271
class StateService {
272
private state$ = new BehaviorSubject<AppState>({
273
user: null,
274
loading: false
275
});
276
277
// Expose read-only state
278
readonly state = this.state$.asObservable();
279
280
// Specific selectors
281
readonly user$ = this.state$.pipe(map(state => state.user));
282
readonly loading$ = this.state$.pipe(map(state => state.loading));
283
284
setUser(user: { id: number; name: string }) {
285
this.state$.next({
286
...this.state$.value,
287
user
288
});
289
}
290
291
setLoading(loading: boolean) {
292
this.state$.next({
293
...this.state$.value,
294
loading
295
});
296
}
297
}
298
```
299
300
## Types
301
302
```typescript { .api }
303
interface Observer<T> {
304
next: (value: T) => void;
305
error: (err: any) => void;
306
complete: () => void;
307
}
308
309
interface SubjectLike<T> extends Observer<T> {
310
asObservable(): Observable<T>;
311
}
312
313
interface TimestampProvider {
314
now(): number;
315
}
316
```