Reactive Extensions for modern JavaScript providing comprehensive reactive programming with observable sequences
npx @tessl/cli install tessl/npm-rxjs@7.8.00
# RxJS
1
2
RxJS is the most comprehensive reactive programming library for JavaScript and TypeScript, providing powerful tools for composing asynchronous and event-based programs using observable sequences. It offers an extensive collection of operators for transforming, filtering, combining, and managing streams of data, with features including backpressure handling, error propagation, resource management, and comprehensive testing utilities.
3
4
## Package Information
5
6
- **Package Name**: rxjs
7
- **Package Type**: npm
8
- **Language**: TypeScript
9
- **Installation**: `npm install rxjs`
10
11
## Core Imports
12
13
```typescript
14
import { Observable, Subject, BehaviorSubject, map, filter, mergeMap } from "rxjs";
15
```
16
17
For operators specifically:
18
19
```typescript
20
import { map, filter, mergeMap, catchError } from "rxjs/operators";
21
```
22
23
For specialized modules:
24
25
```typescript
26
import { ajax } from "rxjs/ajax";
27
import { webSocket } from "rxjs/webSocket";
28
import { TestScheduler } from "rxjs/testing";
29
import { fromFetch } from "rxjs/fetch";
30
```
31
32
CommonJS:
33
34
```javascript
35
const { Observable, Subject, map, filter } = require("rxjs");
36
const { ajax } = require("rxjs/ajax");
37
```
38
39
## Basic Usage
40
41
```typescript
42
import { Observable, Subject, map, filter, mergeMap, catchError } from "rxjs";
43
44
// Create observables from various sources
45
const numbers$ = new Observable(subscriber => {
46
subscriber.next(1);
47
subscriber.next(2);
48
subscriber.next(3);
49
subscriber.complete();
50
});
51
52
// Use operators to transform data
53
const processedNumbers$ = numbers$.pipe(
54
map(x => x * 2),
55
filter(x => x > 2),
56
mergeMap(x => [x, x + 1])
57
);
58
59
// Subscribe to receive values
60
processedNumbers$.subscribe({
61
next: value => console.log(value),
62
error: err => console.error(err),
63
complete: () => console.log('Complete!')
64
});
65
66
// Work with subjects for multicasting
67
const subject = new Subject<string>();
68
subject.subscribe(value => console.log('Observer 1:', value));
69
subject.subscribe(value => console.log('Observer 2:', value));
70
subject.next('Hello World');
71
```
72
73
## Architecture
74
75
RxJS is built around several foundational concepts:
76
77
- **Observables**: Core reactive streams that emit values over time
78
- **Operators**: Pure functions for transforming observables (map, filter, merge, etc.)
79
- **Subjects**: Special observables that can multicast to multiple observers
80
- **Schedulers**: Control timing and concurrency of observable execution
81
- **Subscriptions**: Represent execution of observables and provide cleanup mechanisms
82
83
The library uses a pull-based approach where operators create new observables rather than mutating existing ones, enabling powerful composition patterns and predictable data flow.
84
85
## Capabilities
86
87
### Core Observable Types
88
89
Foundation classes for reactive programming including Observable, ConnectableObservable, and GroupedObservable for different stream patterns.
90
91
```typescript { .api }
92
class Observable<T> {
93
constructor(subscribe?: (observer: Observer<T>) => TeardownLogic);
94
subscribe(observer?: Partial<Observer<T>>): Subscription;
95
pipe<A>(op1: OperatorFunction<T, A>): Observable<A>;
96
pipe<A, B>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>): Observable<B>;
97
}
98
99
class ConnectableObservable<T> extends Observable<T> {
100
connect(): Subscription;
101
refCount(): Observable<T>;
102
}
103
104
class GroupedObservable<K, T> extends Observable<T> {
105
readonly key: K;
106
}
107
```
108
109
[Core Types](./core-types.md)
110
111
### Subject Types
112
113
Special observables that can act as both observer and observable, enabling multicasting patterns.
114
115
```typescript { .api }
116
class Subject<T> extends Observable<T> {
117
next(value: T): void;
118
error(err: any): void;
119
complete(): void;
120
}
121
122
class BehaviorSubject<T> extends Subject<T> {
123
constructor(initialValue: T);
124
readonly value: T;
125
}
126
127
class ReplaySubject<T> extends Subject<T> {
128
constructor(bufferSize?: number, windowTime?: number, timestampProvider?: TimestampProvider);
129
}
130
131
class AsyncSubject<T> extends Subject<T> {}
132
```
133
134
[Subjects](./subjects.md)
135
136
### Observable Creation
137
138
Static functions for creating observables from various sources including events, promises, iterables, and custom logic.
139
140
```typescript { .api }
141
function from<T>(input: ObservableInput<T>): Observable<T>;
142
function of<T>(...args: T[]): Observable<T>;
143
function fromEvent<T>(target: any, eventName: string): Observable<T>;
144
function interval(period: number): Observable<number>;
145
function timer(dueTime: number | Date, period?: number): Observable<number>;
146
function combineLatest<T>(sources: ObservableInput<T>[]): Observable<T[]>;
147
function merge<T>(...sources: ObservableInput<T>[]): Observable<T>;
148
function forkJoin<T>(sources: ObservableInput<T>[]): Observable<T[]>;
149
```
150
151
[Observable Creation](./observable-creation.md)
152
153
### Filtering Operators
154
155
Operators for selecting specific values from observable streams based on various criteria.
156
157
```typescript { .api }
158
function filter<T>(predicate: (value: T, index: number) => boolean): OperatorFunction<T, T>;
159
function take<T>(count: number): OperatorFunction<T, T>;
160
function skip<T>(count: number): OperatorFunction<T, T>;
161
function first<T>(predicate?: (value: T, index: number) => boolean): OperatorFunction<T, T>;
162
function last<T>(predicate?: (value: T, index: number) => boolean): OperatorFunction<T, T>;
163
function distinct<T>(keySelector?: (value: T) => any): OperatorFunction<T, T>;
164
function debounceTime<T>(dueTime: number): OperatorFunction<T, T>;
165
function throttleTime<T>(duration: number): OperatorFunction<T, T>;
166
```
167
168
[Filtering Operators](./filtering-operators.md)
169
170
### Transformation Operators
171
172
Operators for transforming values emitted by observables into new forms and structures.
173
174
```typescript { .api }
175
function map<T, R>(project: (value: T, index: number) => R): OperatorFunction<T, R>;
176
function mergeMap<T, R>(project: (value: T, index: number) => ObservableInput<R>): OperatorFunction<T, R>;
177
function switchMap<T, R>(project: (value: T, index: number) => ObservableInput<R>): OperatorFunction<T, R>;
178
function concatMap<T, R>(project: (value: T, index: number) => ObservableInput<R>): OperatorFunction<T, R>;
179
function scan<T, R>(accumulator: (acc: R, value: T, index: number) => R, seed: R): OperatorFunction<T, R>;
180
function buffer<T>(closingNotifier: Observable<any>): OperatorFunction<T, T[]>;
181
function groupBy<T, K>(keySelector: (value: T) => K): OperatorFunction<T, GroupedObservable<K, T>>;
182
```
183
184
[Transformation Operators](./transformation-operators.md)
185
186
### Combination Operators
187
188
Operators for combining multiple observable streams in various ways.
189
190
```typescript { .api }
191
function combineLatestWith<T, A>(...sources: ObservableInput<A>[]): OperatorFunction<T, [T, ...A[]]>;
192
function mergeWith<T>(...sources: ObservableInput<T>[]): OperatorFunction<T, T>;
193
function concatWith<T>(...sources: ObservableInput<T>[]): OperatorFunction<T, T>;
194
function startWith<T>(...values: T[]): OperatorFunction<T, T>;
195
function withLatestFrom<T, R>(...sources: ObservableInput<R>[]): OperatorFunction<T, [T, ...R[]]>;
196
function zipWith<T, A>(...sources: ObservableInput<A>[]): OperatorFunction<T, [T, ...A[]]>;
197
```
198
199
[Combination Operators](./combination-operators.md)
200
201
### Error Handling
202
203
Operators and patterns for handling errors in reactive streams with recovery mechanisms.
204
205
```typescript { .api }
206
function catchError<T, O>(selector: (err: any, caught: Observable<T>) => O): OperatorFunction<T, T | ObservedValueOf<O>>;
207
function retry<T>(count?: number): OperatorFunction<T, T>;
208
function retryWhen<T>(notifier: (errors: Observable<any>) => Observable<any>): OperatorFunction<T, T>;
209
210
class TimeoutError extends Error {
211
readonly name: "TimeoutError";
212
}
213
214
class EmptyError extends Error {
215
readonly name: "EmptyError";
216
}
217
```
218
219
[Error Handling](./error-handling.md)
220
221
### Schedulers
222
223
Control timing and concurrency of observable execution with various scheduling strategies.
224
225
```typescript { .api }
226
interface SchedulerLike {
227
schedule<T>(work: (this: SchedulerAction<T>, state?: T) => void, delay?: number, state?: T): Subscription;
228
}
229
230
const asyncScheduler: SchedulerLike;
231
const asapScheduler: SchedulerLike;
232
const queueScheduler: SchedulerLike;
233
const animationFrameScheduler: SchedulerLike;
234
235
class VirtualTimeScheduler extends AsyncScheduler {
236
flush(): void;
237
}
238
```
239
240
[Schedulers](./schedulers.md)
241
242
### AJAX Operations
243
244
HTTP request capabilities with full observable integration and response streaming.
245
246
```typescript { .api }
247
function ajax(request: string | AjaxConfig): Observable<AjaxResponse<any>>;
248
249
interface AjaxConfig {
250
url?: string;
251
method?: string;
252
headers?: Record<string, any>;
253
body?: any;
254
timeout?: number;
255
responseType?: XMLHttpRequestResponseType;
256
}
257
258
class AjaxResponse<T> {
259
readonly response: T;
260
readonly status: number;
261
readonly responseText: string;
262
readonly request: AjaxConfig;
263
}
264
```
265
266
[AJAX Operations](./ajax-operations.md)
267
268
### WebSocket Operations
269
270
Real-time bidirectional communication with WebSocket integration for reactive streams.
271
272
```typescript { .api }
273
function webSocket<T>(urlConfigOrSource: string | WebSocketSubjectConfig<T>): WebSocketSubject<T>;
274
275
class WebSocketSubject<T> extends Subject<T> {
276
multiplex<R>(
277
subMsg: () => any,
278
unsubMsg: () => any,
279
messageFilter: (value: T) => boolean
280
): Observable<R>;
281
close(): void;
282
}
283
284
interface WebSocketSubjectConfig<T> {
285
url: string;
286
protocol?: string | string[];
287
serializer?: (value: T) => any;
288
deserializer?: (e: MessageEvent) => T;
289
}
290
```
291
292
[WebSocket Operations](./websocket-operations.md)
293
294
### Fetch Operations
295
296
Modern fetch-based HTTP requests with full Observable integration, streaming support, and comprehensive error handling.
297
298
```typescript { .api }
299
function fromFetch<T>(
300
input: string | Request,
301
initWithSelector?: RequestInit & {
302
selector?: (response: Response) => ObservableInput<T>;
303
}
304
): Observable<T extends never ? Response : T>;
305
```
306
307
[Fetch Operations](./fetch-operations.md)
308
309
### Testing Utilities
310
311
Comprehensive testing framework with marble testing and virtual time scheduling.
312
313
```typescript { .api }
314
class TestScheduler extends VirtualTimeScheduler {
315
run<T>(callback: (helpers: RunHelpers) => T): T;
316
createHotObservable<T>(marbles: string, values?: any, error?: any): HotObservable<T>;
317
createColdObservable<T>(marbles: string, values?: any, error?: any): ColdObservable<T>;
318
expectObservable<T>(observable: Observable<T>): Expectation<T>;
319
}
320
321
interface RunHelpers {
322
cold: typeof TestScheduler.prototype.createColdObservable;
323
hot: typeof TestScheduler.prototype.createHotObservable;
324
expectObservable: typeof TestScheduler.prototype.expectObservable;
325
flush: typeof TestScheduler.prototype.flush;
326
}
327
```
328
329
[Testing Utilities](./testing-utilities.md)
330
331
### Promise Conversion
332
333
Utilities for converting observables to promises for integration with async/await patterns.
334
335
```typescript { .api }
336
function firstValueFrom<T>(source: Observable<T>): Promise<T>;
337
function lastValueFrom<T>(source: Observable<T>): Promise<T>;
338
```
339
340
### Configuration System
341
342
Global configuration object for customizing RxJS behavior and error handling.
343
344
```typescript { .api }
345
const config: GlobalConfig;
346
347
interface GlobalConfig {
348
onUnhandledError: ((err: any) => void) | null;
349
onStoppedNotification: ((notification: Notification<any>, subscriber: Subscriber<any>) => void) | null;
350
Promise?: PromiseConstructorLike;
351
useDeprecatedSynchronousErrorHandling: boolean;
352
useDeprecatedNextContext: boolean;
353
}
354
```
355
356
## Types
357
358
```typescript { .api }
359
type ObservableInput<T> = Observable<T> | InteropObservable<T> | AsyncIterable<T> | PromiseLike<T> | ArrayLike<T> | Iterable<T>;
360
361
interface Observer<T> {
362
next: (value: T) => void;
363
error: (err: any) => void;
364
complete: () => void;
365
}
366
367
interface Subscription {
368
unsubscribe(): void;
369
readonly closed: boolean;
370
}
371
372
interface OperatorFunction<T, R> {
373
(source: Observable<T>): Observable<R>;
374
}
375
376
interface MonoTypeOperatorFunction<T> extends OperatorFunction<T, T> {}
377
378
type TeardownLogic = Subscription | Unsubscribable | (() => void) | void;
379
```