0
# Transformation Operators
1
2
Operators for transforming values emitted by observables into new forms and structures, including mapping, flattening, scanning, and buffering operations.
3
4
## Capabilities
5
6
### map
7
8
Transform each value using a projection function.
9
10
```typescript { .api }
11
/**
12
* Transform each emitted value using a projection function
13
* @param project - Function to transform each value
14
* @returns Operator function applying transformation to each value
15
*/
16
function map<T, R>(project: (value: T, index: number) => R): OperatorFunction<T, R>;
17
```
18
19
**Usage Examples:**
20
21
```typescript
22
import { of, map } from "rxjs";
23
24
// Double each number
25
of(1, 2, 3, 4).pipe(
26
map(x => x * 2)
27
).subscribe(x => console.log(x)); // 2, 4, 6, 8
28
29
// Transform objects
30
of(
31
{ firstName: 'John', lastName: 'Doe' },
32
{ firstName: 'Jane', lastName: 'Smith' }
33
).pipe(
34
map(person => `${person.firstName} ${person.lastName}`)
35
).subscribe(name => console.log(name)); // John Doe, Jane Smith
36
37
// With index
38
of('a', 'b', 'c').pipe(
39
map((letter, index) => `${index}: ${letter}`)
40
).subscribe(result => console.log(result)); // 0: a, 1: b, 2: c
41
```
42
43
### mergeMap (flatMap)
44
45
Map each value to an observable and merge all inner observables.
46
47
```typescript { .api }
48
/**
49
* Map each value to an observable and merge all inner observables
50
* @param project - Function that returns an observable for each value
51
* @param concurrent - Maximum number of concurrent inner subscriptions
52
* @returns Operator function flattening mapped observables
53
*/
54
function mergeMap<T, R>(
55
project: (value: T, index: number) => ObservableInput<R>,
56
concurrent?: number
57
): OperatorFunction<T, R>;
58
function mergeMap<T, R, O>(
59
project: (value: T, index: number) => ObservableInput<R>,
60
resultSelector: (outerValue: T, innerValue: R, outerIndex: number, innerIndex: number) => O,
61
concurrent?: number
62
): OperatorFunction<T, O>;
63
64
/**
65
* Deprecated alias for mergeMap; prefer mergeMap in new code
66
*/
67
function flatMap<T, R>(
68
project: (value: T, index: number) => ObservableInput<R>,
69
concurrent?: number
70
): OperatorFunction<T, R>;
71
```
72
73
**Usage Examples:**
74
75
```typescript
76
import { of, mergeMap, delay } from "rxjs";
77
import { ajax } from "rxjs/ajax";
78
79
// Map to HTTP requests
80
of('user1', 'user2', 'user3').pipe(
81
mergeMap(userId => ajax.getJSON(`/api/users/${userId}`))
82
).subscribe(user => console.log('Loaded user:', user));
83
84
// Control concurrency
85
of(1, 2, 3, 4, 5).pipe(
86
mergeMap(n => of(n).pipe(delay(1000)), 2) // Max 2 concurrent
87
).subscribe(x => console.log('Value:', x));
88
89
// Map to arrays (flattened)
90
of([1, 2], [3, 4], [5, 6]).pipe(
91
mergeMap(arr => arr)
92
).subscribe(x => console.log(x)); // 1, 2, 3, 4, 5, 6
93
```
94
95
### switchMap
96
97
Map each value to an observable, canceling previous inner observables.
98
99
```typescript { .api }
100
/**
101
* Map each value to an observable, canceling previous inner observables
102
* @param project - Function that returns an observable for each value
103
* @returns Operator function switching to new mapped observables
104
*/
105
function switchMap<T, R>(
106
project: (value: T, index: number) => ObservableInput<R>
107
): OperatorFunction<T, R>;
108
function switchMap<T, R, O>(
109
project: (value: T, index: number) => ObservableInput<R>,
110
resultSelector: (outerValue: T, innerValue: R, outerIndex: number, innerIndex: number) => O
111
): OperatorFunction<T, O>;
112
```
113
114
**Usage Examples:**
115
116
```typescript
117
import { fromEvent, of, interval, switchMap, debounceTime, map, take } from "rxjs";
118
import { ajax } from "rxjs/ajax";
119
120
// Search with automatic cancellation
121
const searchInput = document.getElementById('search');
122
fromEvent(searchInput, 'input').pipe(
123
map(event => event.target.value),
124
debounceTime(300),
125
switchMap(term => ajax.getJSON(`/api/search?q=${term}`))
126
).subscribe(results => {
127
console.log('Search results:', results);
128
// Previous search requests are cancelled automatically
129
});
130
131
// Latest value wins
132
of(1, 2, 3).pipe(
133
switchMap(n => interval(1000).pipe(
134
map(i => `${n}-${i}`),
135
take(3)
136
))
137
).subscribe(x => console.log(x)); // Only outputs from last (3): 3-0, 3-1, 3-2
138
```
139
140
### concatMap
141
142
Map each value to an observable and concatenate in order.
143
144
```typescript { .api }
145
/**
146
* Map each value to an observable and concatenate in sequential order
147
* @param project - Function that returns an observable for each value
148
* @returns Operator function concatenating mapped observables in order
149
*/
150
function concatMap<T, R>(
151
project: (value: T, index: number) => ObservableInput<R>
152
): OperatorFunction<T, R>;
153
function concatMap<T, R, O>(
154
project: (value: T, index: number) => ObservableInput<R>,
155
resultSelector: (outerValue: T, innerValue: R, outerIndex: number, innerIndex: number) => O
156
): OperatorFunction<T, O>;
157
```
158
159
**Usage Examples:**
160
161
```typescript
162
import { of, concatMap, delay } from "rxjs";
import { ajax } from "rxjs/ajax";
163
164
// Sequential processing (waits for each to complete)
165
of(1, 2, 3).pipe(
166
concatMap(n => of(`Processing ${n}`).pipe(delay(1000)))
167
).subscribe(x => console.log(x));
168
// Outputs: Processing 1 (after 1s), Processing 2 (after 2s), Processing 3 (after 3s)
169
170
// Preserve order
171
of('file1.txt', 'file2.txt', 'file3.txt').pipe(
172
concatMap(filename => ajax.getJSON(`/api/files/${filename}`))
173
).subscribe(fileData => {
174
console.log('File loaded in order:', fileData);
175
});
176
```
177
178
### exhaustMap
179
180
Map to an observable, ignoring new values while inner observable is active.
181
182
```typescript { .api }
183
/**
184
* Map each value to an observable, ignoring new values while inner observable is active
185
* @param project - Function that returns an observable for each value
186
* @returns Operator function exhausting mapped observables
187
*/
188
function exhaustMap<T, R>(
189
project: (value: T, index: number) => ObservableInput<R>
190
): OperatorFunction<T, R>;
191
function exhaustMap<T, R, O>(
192
project: (value: T, index: number) => ObservableInput<R>,
193
resultSelector: (outerValue: T, innerValue: R, outerIndex: number, innerIndex: number) => O
194
): OperatorFunction<T, O>;
195
```
196
197
**Usage Examples:**
198
199
```typescript
200
import { fromEvent, exhaustMap } from "rxjs";
201
import { ajax } from "rxjs/ajax";
202
203
// Prevent multiple simultaneous requests
204
const saveButton = document.getElementById('save');
205
fromEvent(saveButton, 'click').pipe(
206
exhaustMap(() => ajax.post('/api/save', { data: 'example' }))
207
).subscribe(
208
response => console.log('Saved:', response),
209
err => console.error('Save failed:', err)
210
);
211
// Clicks during ongoing save are ignored
212
```
213
214
### scan
215
216
Apply accumulator function over time, emitting intermediate results.
217
218
```typescript { .api }
219
/**
220
* Apply accumulator function over time, emitting intermediate results
221
* @param accumulator - Function to compute accumulated value
222
* @param seed - Initial accumulated value
223
* @returns Operator function scanning with accumulator
224
*/
225
function scan<T, R>(
226
accumulator: (acc: R, value: T, index: number) => R,
227
seed: R
228
): OperatorFunction<T, R>;
229
function scan<T>(
230
accumulator: (acc: T, value: T, index: number) => T
231
): OperatorFunction<T, T>;
232
```
233
234
**Usage Examples:**
235
236
```typescript
237
import { of, scan } from "rxjs";
238
239
// Running sum
240
of(1, 2, 3, 4).pipe(
241
scan((acc, value) => acc + value, 0)
242
).subscribe(sum => console.log('Running sum:', sum));
243
// Output: 1, 3, 6, 10
244
245
// Build object over time
246
of(
247
{ type: 'SET_NAME', payload: 'Alice' },
248
{ type: 'SET_AGE', payload: 25 },
249
{ type: 'SET_EMAIL', payload: 'alice@example.com' }
250
).pipe(
251
scan((state, action) => {
252
switch (action.type) {
253
case 'SET_NAME': return { ...state, name: action.payload };
254
case 'SET_AGE': return { ...state, age: action.payload };
255
case 'SET_EMAIL': return { ...state, email: action.payload };
256
default: return state;
257
}
258
}, {})
259
).subscribe(state => console.log('State:', state));
260
```
261
262
### buffer
263
264
Buffer values until boundary observable emits.
265
266
```typescript { .api }
267
/**
268
* Buffer values until boundary observable emits
269
* @param closingNotifier - Observable that triggers buffer emission
270
* @returns Operator function buffering values until boundary
271
*/
272
function buffer<T>(closingNotifier: ObservableInput<any>): OperatorFunction<T, T[]>;
273
```
274
275
**Usage Examples:**
276
277
```typescript
278
import { interval, buffer, fromEvent } from "rxjs";
279
280
// Buffer interval values until button click
281
const source$ = interval(1000);
282
const clicks$ = fromEvent(document.getElementById('flush'), 'click');
283
284
source$.pipe(
285
buffer(clicks$)
286
).subscribe(buffered => console.log('Buffered values:', buffered));
287
// Click after 3 seconds: [0, 1, 2]
288
```
289
290
### bufferCount
291
292
Buffer values until specific count is reached.
293
294
```typescript { .api }
295
/**
296
* Buffer values until buffer reaches specified size
297
* @param bufferSize - Size of buffer
298
* @param startBufferEvery - Interval to start new buffer
299
* @returns Operator function buffering by count
300
*/
301
function bufferCount<T>(bufferSize: number, startBufferEvery?: number): OperatorFunction<T, T[]>;
302
```
303
304
**Usage Examples:**
305
306
```typescript
307
import { of, bufferCount } from "rxjs";
308
309
// Buffer every 3 values
310
of(1, 2, 3, 4, 5, 6, 7, 8).pipe(
311
bufferCount(3)
312
).subscribe(buffer => console.log('Buffer:', buffer));
313
// Output: [1, 2, 3], [4, 5, 6], [7, 8]
314
315
// Overlapping buffers
316
of(1, 2, 3, 4, 5, 6).pipe(
317
bufferCount(3, 1) // New buffer every 1, size 3
318
).subscribe(buffer => console.log('Buffer:', buffer));
319
// Output: [1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6], [5, 6], [6] (partial buffers are flushed on completion)
320
```
321
322
### bufferTime
323
324
Buffer values for specific time periods.
325
326
```typescript { .api }
327
/**
328
* Buffer values for specified time periods
329
* @param bufferTimeSpan - Time span of buffer in milliseconds
330
* @param bufferCreationInterval - Interval to start new buffer
331
* @param maxBufferSize - Maximum buffer size
332
* @param scheduler - Optional scheduler
333
* @returns Operator function buffering by time
334
*/
335
function bufferTime<T>(
336
bufferTimeSpan: number,
337
bufferCreationInterval?: number,
338
maxBufferSize?: number,
339
scheduler?: SchedulerLike
340
): OperatorFunction<T, T[]>;
341
```
342
343
**Usage Examples:**
344
345
```typescript
346
import { interval, bufferTime } from "rxjs";
347
348
// Buffer values for 2 seconds
349
interval(500).pipe(
350
bufferTime(2000)
351
).subscribe(buffer => console.log('2-second buffer:', buffer));
352
// Every 2 seconds: [0, 1, 2], [3, 4, 5, 6], etc.
353
```
354
355
### bufferToggle and bufferWhen
356
357
```typescript { .api }
358
/**
359
* Buffer values from opening of one observable until closing of another
360
* @param openings - Observable that opens the buffer
361
* @param closingSelector - Function that returns observable to close buffer
362
* @returns Operator function buffering between opening and closing
363
*/
364
function bufferToggle<T, O>(
365
openings: ObservableInput<O>,
366
closingSelector: (value: O) => ObservableInput<any>
367
): OperatorFunction<T, T[]>;
368
369
/**
370
* Buffer values, calling a factory function to create the closing observable for each new buffer
371
* @param closingSelector - Function returning boundary observable
372
* @returns Operator function buffering until boundary
373
*/
374
function bufferWhen<T>(closingSelector: () => ObservableInput<any>): OperatorFunction<T, T[]>;
375
```
376
377
**Usage Examples:**
378
379
```typescript
380
import { interval, bufferToggle, bufferWhen, fromEvent, timer } from "rxjs";
381
382
// Buffer between mouse down and mouse up
383
const mouseDown$ = fromEvent(document, 'mousedown');
384
const mouseUp$ = fromEvent(document, 'mouseup');
385
386
interval(100).pipe(
387
bufferToggle(mouseDown$, () => mouseUp$)
388
).subscribe(buffered => {
389
console.log('Values during mouse press:', buffered);
390
});
391
392
// Buffer until random interval
393
interval(100).pipe(
394
bufferWhen(() => timer(Math.random() * 2000))
395
).subscribe(buffered => {
396
console.log('Random buffer:', buffered);
397
});
398
```
399
400
### groupBy
401
402
Group values by key into separate observables.
403
404
```typescript { .api }
405
/**
406
* Group values by key into separate GroupedObservable instances
407
* @param keySelector - Function to select grouping key
408
* @param elementSelector - Function to select element for group
409
* @param durationSelector - Function returning observable that determines group lifetime
410
* @returns Operator function grouping by key
411
*/
412
function groupBy<T, K>(
413
keySelector: (value: T) => K
414
): OperatorFunction<T, GroupedObservable<K, T>>;
415
function groupBy<T, K, R>(
416
keySelector: (value: T) => K,
417
elementSelector: (value: T) => R
418
): OperatorFunction<T, GroupedObservable<K, R>>;
419
function groupBy<T, K, R>(
420
keySelector: (value: T) => K,
421
elementSelector?: (value: T) => R,
422
durationSelector?: (grouped: GroupedObservable<K, R>) => ObservableInput<any>
423
): OperatorFunction<T, GroupedObservable<K, R>>;
424
```
425
426
**Usage Examples:**
427
428
```typescript
429
import { of, groupBy, mergeMap, toArray, map } from "rxjs";
430
431
// Group by category
432
of(
433
{ category: 'fruit', name: 'apple' },
434
{ category: 'vegetable', name: 'carrot' },
435
{ category: 'fruit', name: 'banana' },
436
{ category: 'vegetable', name: 'lettuce' }
437
).pipe(
438
groupBy(item => item.category),
439
mergeMap(group =>
440
group.pipe(
441
toArray(),
442
map(items => ({ category: group.key, items }))
443
)
444
)
445
).subscribe(result => console.log(result));
446
// Output: { category: 'fruit', items: [apple, banana] }
447
// { category: 'vegetable', items: [carrot, lettuce] }
448
```
449
450
### Materialization Operators
451
452
```typescript { .api }
453
/**
454
* Convert all emissions and notifications to Notification objects
455
* @returns Operator function materializing notifications
456
*/
457
function materialize<T>(): OperatorFunction<T, Notification<T>>;
458
459
/**
460
* Convert Notification objects back to emissions
461
* @returns Operator function dematerializing notifications
462
*/
463
function dematerialize<T>(): OperatorFunction<Notification<T>, T>;
464
```
465
466
**Usage Examples:**
467
468
```typescript
469
import { of, materialize, dematerialize, map } from "rxjs";
470
471
// Materialize all notifications
472
of(1, 2, 3).pipe(
473
materialize()
474
).subscribe(notification => {
475
console.log('Kind:', notification.kind);
476
if (notification.kind === 'N') {
477
console.log('Value:', notification.value);
478
}
479
});
480
481
// Convert back from notifications
482
const notifications$ = of(
483
{ kind: 'N', value: 1 },
484
{ kind: 'N', value: 2 },
485
{ kind: 'C' }
486
);
487
488
notifications$.pipe(
489
dematerialize()
490
).subscribe(
491
value => console.log('Value:', value),
492
err => console.error('Error:', err),
493
() => console.log('Complete!')
494
);
495
```
496
497
### Advanced Transformation Operators
498
499
```typescript { .api }
500
/**
501
* Map each value to constant value
502
* @param value - Constant value to map to
503
* @returns Operator function mapping to constant
504
*/
505
function mapTo<R>(value: R): OperatorFunction<any, R>;
506
507
/**
508
* Recursively projects each source value to an observable which is merged in the output observable
509
* @param project - Function returning observable for recursion
510
* @param concurrent - Maximum concurrent recursions
* @param scheduler - Optional scheduler used to subscribe to each projected inner observable
511
* @returns Operator function expanding recursively
512
*/
513
function expand<T, R>(
514
project: (value: T, index: number) => ObservableInput<R>,
515
concurrent?: number,
516
scheduler?: SchedulerLike
517
): OperatorFunction<T, R>;
518
519
/**
520
* Emit previous and current value as pair
521
* @returns Operator function emitting pairs of consecutive values
522
*/
523
function pairwise<T>(): OperatorFunction<T, [T, T]>;
524
525
/**
526
* Select properties from source objects
527
* @param properties - Property keys to pluck
528
* @returns Operator function plucking properties
529
*/
530
function pluck<T, K1 extends keyof T>(k1: K1): OperatorFunction<T, T[K1]>;
531
function pluck<T, K1 extends keyof T, K2 extends keyof T[K1]>(k1: K1, k2: K2): OperatorFunction<T, T[K1][K2]>;
532
function pluck<T, K1 extends keyof T, K2 extends keyof T[K1], K3 extends keyof T[K1][K2]>(k1: K1, k2: K2, k3: K3): OperatorFunction<T, T[K1][K2][K3]>;
533
534
/**
535
* Apply an accumulator that returns an observable, merging each inner observable's values into the output
536
* @param accumulator - Accumulator function returning observable
537
* @param seed - Initial seed value
538
* @param concurrent - Maximum concurrent inner subscriptions
539
* @returns Operator function applying mergeScan pattern
540
*/
541
function mergeScan<T, R>(
542
accumulator: (acc: R, value: T, index: number) => ObservableInput<R>,
543
seed: R,
544
concurrent?: number
545
): OperatorFunction<T, R>;
546
547
/**
548
* Apply an accumulator that returns an observable, switching to the newest inner observable as switchMap does
549
* @param accumulator - Accumulator function returning observable
550
* @param seed - Initial seed value
551
* @returns Operator function applying switchScan pattern
552
*/
553
function switchScan<T, R>(
554
accumulator: (acc: R, value: T, index: number) => ObservableInput<R>,
555
seed: R
556
): OperatorFunction<T, R>;
557
558
/**
559
* Count emissions that pass optional predicate
560
* @param predicate - Optional predicate to filter counted emissions
561
* @returns Operator function emitting count of emissions
562
*/
563
function count<T>(predicate?: (value: T, index: number) => boolean): OperatorFunction<T, number>;
564
565
/**
566
* Test if all emissions satisfy predicate
567
* @param predicate - Predicate function to test emissions
568
* @returns Operator function emitting boolean result
569
*/
570
function every<T>(predicate: (value: T, index: number) => boolean): OperatorFunction<T, boolean>;
571
572
/**
573
* Find maximum value using optional comparer
574
* @param comparer - Optional comparison function
575
* @returns Operator function emitting maximum value
576
*/
577
function max<T>(comparer?: (x: T, y: T) => number): OperatorFunction<T, T>;
578
579
/**
580
* Find minimum value using optional comparer
581
* @param comparer - Optional comparison function
582
* @returns Operator function emitting minimum value
583
*/
584
function min<T>(comparer?: (x: T, y: T) => number): OperatorFunction<T, T>;
585
586
/**
587
* Reduce emissions to single accumulated value
588
* @param accumulator - Accumulator function
589
* @param seed - Initial seed value
590
* @returns Operator function emitting final accumulated value
591
*/
592
function reduce<T, R>(
593
accumulator: (acc: R, value: T, index: number) => R,
594
seed: R
595
): OperatorFunction<T, R>;
596
function reduce<T>(accumulator: (acc: T, value: T, index: number) => T): OperatorFunction<T, T>;
597
598
/**
599
* Collect all emissions into array
600
* @returns Operator function emitting array of all values
601
*/
602
function toArray<T>(): OperatorFunction<T, T[]>;
603
604
/**
605
* Materialize notifications as emission objects
606
* @returns Operator function emitting notification objects
607
*/
608
function materialize<T>(): OperatorFunction<T, Notification<T>>;
609
610
/**
611
* Dematerialize notification objects back to emissions
612
* @returns Operator function converting notifications to emissions
613
*/
614
function dematerialize<T>(): OperatorFunction<Notification<T>, T>;
615
616
/**
617
* Add timestamp to each emission
618
* @param timestampProvider - Provider for timestamp values
619
* @returns Operator function adding timestamps
620
*/
621
function timestamp<T>(timestampProvider?: TimestampProvider): OperatorFunction<T, Timestamp<T>>;
622
623
/**
624
* Emit each value together with the time elapsed since the previous emission
625
* @param scheduler - Optional scheduler used to read the current time
626
* @returns Operator function adding time intervals
627
*/
628
function timeInterval<T>(scheduler?: SchedulerLike): OperatorFunction<T, TimeInterval<T>>;
629
```
630
631
## Types
632
633
```typescript { .api }
634
interface GroupedObservable<K, T> extends Observable<T> {
635
readonly key: K;
636
}
637
638
type OperatorFunction<T, R> = (source: Observable<T>) => Observable<R>;
639
type ObservableInput<T> = Observable<T> | Promise<T> | Iterable<T>;
640
```