Reactive Extensions for modern JavaScript providing comprehensive reactive programming with observable sequences
—
Special observables that can act as both observer and observable, enabling multicasting patterns where multiple subscribers receive the same values.
Basic subject that multicasts values to multiple subscribers.
/**
 * Subject acts as both Observable and Observer, enabling multicasting:
 * every notification pushed into it is forwarded to all current subscribers.
 */
class Subject<T> extends Observable<T> implements Observer<T> {
  /**
   * Whether the subject has been closed.
   * NOTE(review): in RxJS this flag is set by `unsubscribe()`; confirm
   * whether completion alone is also expected to set it.
   */
  readonly closed: boolean;
  /**
   * Whether the subject has terminated with an error
   */
  readonly hasError: boolean;
  /**
   * Whether the subject has stopped and will no longer emit notifications
   */
  readonly isStopped: boolean;
  /**
   * Observers currently subscribed to the subject (the list itself, not a count)
   */
  readonly observers: Observer<T>[];
  /**
   * Emit a value to all subscribers
   * @param value - Value to emit
   */
  next(value: T): void;
  /**
   * Emit an error to all subscribers, terminating the subject
   * @param err - Error to emit
   */
  error(err: any): void;
  /**
   * Complete the subject, notifying all subscribers
   */
  complete(): void;
  /**
   * Unsubscribe all observers and clean up resources
   */
  unsubscribe(): void;
  /**
   * Create an observable that shares this subject's notifications,
   * typically used to expose the stream without the emitting methods
   * @returns Observable instance
   */
  asObservable(): Observable<T>;
}
Usage Examples:
import { Subject } from "rxjs";

const subject = new Subject<string>();

// Multiple subscribers
subject.subscribe(value => console.log('Observer A:', value));
subject.subscribe(value => console.log('Observer B:', value));

// Emit values - both observers receive them
subject.next('Hello');
subject.next('World');
subject.complete();

// A subscriber that arrives after completion receives no values
subject.subscribe(value => console.log('Observer C:', value)); // No output
Subject that stores the current value and emits it to new subscribers immediately.
/**
 * Subject that holds a current value and emits it immediately to new subscribers
 */
class BehaviorSubject<T> extends Subject<T> {
  /**
   * Create BehaviorSubject with initial value
   * @param initialValue - Value stored at construction and emitted to the first subscribers
   */
  constructor(initialValue: T);
  /**
   * Current value held by the subject (property form of `getValue()`)
   */
  readonly value: T;
  /**
   * Get current value (synchronous)
   * @returns Current stored value
   */
  getValue(): T;
}
Usage Examples:
import { BehaviorSubject } from "rxjs";

// Create with initial value
const behaviorSubject = new BehaviorSubject<number>(0);

// New subscriber immediately gets current value (0)
behaviorSubject.subscribe(val => console.log('Subscriber A:', val));

// Emit new values
behaviorSubject.next(1);
behaviorSubject.next(2);

// New subscriber gets current value (2) immediately
behaviorSubject.subscribe(val => console.log('Subscriber B:', val));

// Access current value synchronously
console.log('Current value:', behaviorSubject.value); // 2
console.log('Current value (method):', behaviorSubject.getValue()); // 2
Subject that stores recent values and replays them to new subscribers.
/**
* Subject that buffers emitted values and replays them to each new subscriber
*/
class ReplaySubject<T> extends Subject<T> {
/**
* Create ReplaySubject with buffer configuration
* @param bufferSize - Maximum number of values to buffer and replay (default: Infinity)
* @param windowTime - How long, in ms, a buffered value stays eligible for replay (default: Infinity)
* @param timestampProvider - Object whose `now()` supplies timestamps; presumably used to age buffered values against `windowTime` (confirm)
*/
constructor(
bufferSize?: number,
windowTime?: number,
timestampProvider?: TimestampProvider
);
}
/**
 * Source of timestamps accepted by the ReplaySubject constructor
 */
interface TimestampProvider {
  /**
   * @returns The current time as a number
   */
  now(): number;
}
Usage Examples:
import { ReplaySubject } from "rxjs";

// Replay last 3 values
const replaySubject = new ReplaySubject<number>(3);

// Emit some values
replaySubject.next(1);
replaySubject.next(2);
replaySubject.next(3);
replaySubject.next(4);

// New subscriber gets last 3 values (2, 3, 4)
replaySubject.subscribe(val => console.log('Subscriber A:', val));

// Time-based replay (last 1 second)
const timeReplay = new ReplaySubject<string>(Infinity, 1000);
timeReplay.next('old');

setTimeout(() => {
  timeReplay.next('recent');
  // 'old' was emitted more than 1000 ms ago, so it has left the replay window:
  // a subscriber arriving now only receives 'recent'
  timeReplay.subscribe(val => console.log('Time subscriber:', val));
}, 1100);
Subject that only emits the last value when completed.
/**
 * Subject that only emits the last value when the sequence completes
 */
class AsyncSubject<T> extends Subject<T> {
  /**
   * Create AsyncSubject (takes no configuration)
   */
  constructor();
}
Usage Examples:
import { AsyncSubject } from "rxjs";

const asyncSubject = new AsyncSubject<number>();

// Subscribers don't receive values until completion
asyncSubject.subscribe(val => console.log('Subscriber A:', val));

asyncSubject.next(1);
asyncSubject.next(2);
asyncSubject.next(3); // Only this value will be emitted

asyncSubject.subscribe(val => console.log('Subscriber B:', val));

// Complete to emit the last value (3) to all subscribers
asyncSubject.complete();

// New subscriber after completion gets the last value
asyncSubject.subscribe(val => console.log('Subscriber C:', val)); // 3
import { Subject, filter } from "rxjs";
/**
 * Event carried on the bus: a `type` discriminator plus an arbitrary payload
 */
interface AppEvent {
  type: string;
  // `unknown` rather than `any`: consumers must narrow the payload before using it
  payload: unknown;
}

const eventBus = new Subject<AppEvent>();

// Subscribe to specific event types
eventBus.pipe(
  filter(event => event.type === 'user-login')
).subscribe(event => {
  console.log('User logged in:', event.payload);
});

eventBus.pipe(
  filter(event => event.type === 'user-logout')
).subscribe(() => {
  console.log('User logged out');
});

// Emit events
eventBus.next({ type: 'user-login', payload: { userId: 123 } });
eventBus.next({ type: 'user-logout', payload: null });
import { BehaviorSubject, map } from "rxjs";
/**
* Shape of the state object held by the StateService example
*/
interface AppState {
// Current user, or null when no user is set
user: { id: number; name: string } | null;
// Loading flag
loading: boolean;
}
/**
 * Example state container: keeps the latest AppState in a BehaviorSubject
 * and exposes it, plus derived slices, as observables.
 */
class StateService {
  private state$ = new BehaviorSubject<AppState>({
    user: null,
    loading: false
  });

  // Expose read-only state
  readonly state = this.state$.asObservable();

  // Specific selectors
  readonly user$ = this.state$.pipe(map(state => state.user));
  readonly loading$ = this.state$.pipe(map(state => state.loading));

  /**
   * Replace the user slice, keeping the rest of the state.
   * @param user - New user, or null to clear it (the state type allows null)
   */
  setUser(user: AppState['user']) {
    this.state$.next({
      ...this.state$.value,
      user
    });
  }

  /**
   * Replace the loading flag, keeping the rest of the state.
   * @param loading - New loading flag
   */
  setLoading(loading: boolean) {
    this.state$.next({
      ...this.state$.value,
      loading
    });
  }
}

/**
 * Consumer of notifications: receives values, an error, or completion
 */
interface Observer<T> {
  next: (value: T) => void;
  error: (err: any) => void;
  complete: () => void;
}
/**
* Minimal subject contract: an Observer that can also be exposed as an Observable.
* NOTE(review): the RxJS typings may declare SubjectLike differently
* (extending Observer<T> and Subscribable<T>); confirm against the installed version.
*/
interface SubjectLike<T> extends Observer<T> {
// Returns an Observable view of this subject
asObservable(): Observable<T>;
}
/**
* Source of timestamps, as accepted by the ReplaySubject constructor
*/
interface TimestampProvider {
// Returns the current time as a number (presumably milliseconds, matching windowTime; confirm)
now(): number;
}