0
# Combination Operators
1
2
Operators for combining multiple observable streams in various ways, including merging, concatenating, and timing-based combinations.
3
4
## Capabilities
5
6
### combineLatestWith
7
8
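Emit an array of the latest values from the source and each provided observable whenever any of them emits, once every one has emitted at least once.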
9
10
```typescript { .api }
11
/**
12
* Combine latest values from source with provided observables
13
* @param sources - Other observables to combine with
14
* @returns Operator function combining latest values as arrays
15
*/
16
function combineLatestWith<T, A>(...sources: ObservableInput<A>[]): OperatorFunction<T, [T, ...A[]]>;
17
```
18
19
**Usage Examples:**
20
21
```typescript
22
import { fromEvent, interval, combineLatestWith, map } from "rxjs";
23
24
// Combine timer with user interactions
25
const timer$ = interval(1000);
26
const clicks$ = fromEvent(document, 'click').pipe(map(() => 'click'));
27
28
timer$.pipe(
29
combineLatestWith(clicks$)
30
).subscribe(([time, lastClick]) => {
31
console.log(`Time: ${time}, Last action: ${lastClick}`);
32
});
33
```
34
35
### mergeWith
36
37
Merge source with other observables.
38
39
```typescript { .api }
40
/**
41
* Merge source observable with provided observables
42
* @param sources - Other observables to merge with
43
* @returns Operator function merging all sources
44
*/
45
function mergeWith<T>(...sources: ObservableInput<T>[]): OperatorFunction<T, T>;
46
```
47
48
**Usage Examples:**
49
50
```typescript
51
import { of, mergeWith, delay } from "rxjs";
52
53
// Merge multiple streams
54
const source1$ = of(1, 2, 3);
55
const source2$ = of(4, 5, 6).pipe(delay(1000));
56
const source3$ = of(7, 8, 9).pipe(delay(2000));
57
58
source1$.pipe(
59
mergeWith(source2$, source3$)
60
).subscribe(value => console.log('Merged value:', value));
61
// Output: 1, 2, 3 (immediately), then 4, 5, 6 (after 1s), then 7, 8, 9 (after 2s)
62
```
63
64
### concatWith
65
66
Concatenate source with other observables in sequence.
67
68
```typescript { .api }
69
/**
70
* Concatenate source observable with provided observables sequentially
71
* @param sources - Other observables to concatenate after source
72
* @returns Operator function concatenating in order
73
*/
74
function concatWith<T>(...sources: ObservableInput<T>[]): OperatorFunction<T, T>;
75
```
76
77
**Usage Examples:**
78
79
```typescript
80
import { of, concatWith, delay } from "rxjs";
81
82
// Sequential execution
83
const intro$ = of('Starting...');
84
const process$ = of('Processing...').pipe(delay(1000));
85
const complete$ = of('Complete!').pipe(delay(1000));
86
87
intro$.pipe(
88
concatWith(process$, complete$)
89
).subscribe(message => console.log(message));
90
// Output: Starting... (immediately), Processing... (after 1s), Complete! (after 2s total)
91
```
92
93
### startWith
94
95
Emit specified values before source values.
96
97
```typescript { .api }
98
/**
99
* Emit specified values before source observable values
100
* @param values - Values to emit first
101
* @returns Operator function prepending values
102
*/
103
function startWith<T>(...values: T[]): OperatorFunction<T, T>;
104
function startWith<T>(...values: (T | SchedulerLike)[]): OperatorFunction<T, T>;
105
```
106
107
**Usage Examples:**
108
109
```typescript
110
import { of, startWith } from "rxjs";
import { ajax } from "rxjs/ajax";
111
112
// Add initial values
113
of(4, 5, 6).pipe(
114
startWith(1, 2, 3)
115
).subscribe(x => console.log(x)); // 1, 2, 3, 4, 5, 6
116
117
// Start with loading state
118
const data$ = ajax.getJSON('/api/data');
119
data$.pipe(
120
startWith({ loading: true })
121
).subscribe(result => console.log(result));
122
// Immediately emits { loading: true }, then actual data
123
```
124
125
### endWith
126
127
Emit specified values after source completes.
128
129
```typescript { .api }
130
/**
131
* Emit specified values after source observable completes
132
* @param values - Values to emit after completion
133
* @returns Operator function appending values
134
*/
135
function endWith<T>(...values: T[]): OperatorFunction<T, T>;
136
function endWith<T>(...values: (T | SchedulerLike)[]): OperatorFunction<T, T>;
137
```
138
139
**Usage Examples:**
140
141
```typescript
142
import { of, endWith } from "rxjs";
143
144
// Add final values
145
of(1, 2, 3).pipe(
146
endWith('complete', 'done')
147
).subscribe(x => console.log(x)); // 1, 2, 3, 'complete', 'done'
148
```
149
150
### withLatestFrom
151
152
Combine source with latest values from other observables when source emits.
153
154
```typescript { .api }
155
/**
156
* Combine each source emission with latest values from other observables
157
* @param sources - Other observables to get latest values from
158
* @returns Operator function combining with latest values
159
*/
160
function withLatestFrom<T, A>(...sources: ObservableInput<A>[]): OperatorFunction<T, [T, ...A[]]>;
161
function withLatestFrom<T, A, R>(
162
...sourcesAndProject: [...ObservableInput<A>[], (...values: [T, ...A[]]) => R]
163
): OperatorFunction<T, R>;
164
```
165
166
**Usage Examples:**
167
168
```typescript
169
import { fromEvent, interval, withLatestFrom, map } from "rxjs";
170
171
// Get latest timer value on button click
172
const clicks$ = fromEvent(document.getElementById('button'), 'click');
173
const timer$ = interval(1000);
174
175
clicks$.pipe(
176
withLatestFrom(timer$),
177
map(([click, time]) => `Clicked at timer value: ${time}`)
178
).subscribe(message => console.log(message));
179
```
180
181
### zipWith
182
183
Pair each source value with the values at the same index from the other observables, emitting them together as an array.
184
185
```typescript { .api }
186
/**
187
* Zip source observable with provided observables
188
* @param sources - Other observables to zip with
189
* @returns Operator function zipping corresponding values
190
*/
191
function zipWith<T, A>(...sources: ObservableInput<A>[]): OperatorFunction<T, [T, ...A[]]>;
192
```
193
194
**Usage Examples:**
195
196
```typescript
197
import { of, zipWith } from "rxjs";
198
199
// Zip corresponding values
200
const letters$ = of('a', 'b', 'c');
201
const numbers$ = of(1, 2, 3);
202
const symbols$ = of('!', '@', '#');
203
204
letters$.pipe(
205
zipWith(numbers$, symbols$)
206
).subscribe(([letter, number, symbol]) => {
207
console.log(`${letter}${number}${symbol}`); // a1!, b2@, c3#
208
});
209
```
210
211
### raceWith
212
213
Race the source against other observables and mirror only the one that emits first.
214
215
```typescript { .api }
216
/**
217
* Race source with other observables, emit values from the first to emit
218
* @param sources - Other observables to race with
219
* @returns Operator function racing with other sources
220
*/
221
function raceWith<T>(...sources: ObservableInput<T>[]): OperatorFunction<T, T>;
222
```
223
224
**Usage Examples:**
225
226
```typescript
227
import { timer, raceWith, map } from "rxjs";
228
229
// Race different timers
230
const fast$ = timer(1000).pipe(map(() => 'fast'));
231
const slow$ = timer(3000).pipe(map(() => 'slow'));
232
233
fast$.pipe(
234
raceWith(slow$)
235
).subscribe(winner => console.log('Winner:', winner)); // 'fast' (after 1s)
236
```
237
238
### Flattening Combination Operators
239
240
```typescript { .api }
241
/**
242
* Flatten higher-order observable by merging all inner observables
243
* @param concurrent - Maximum concurrent inner subscriptions
244
* @returns Operator function merging all inner observables
245
*/
246
function mergeAll<T>(concurrent?: number): OperatorFunction<ObservableInput<T>, T>;
247
248
/**
249
* Flatten higher-order observable by concatenating inner observables
250
* @returns Operator function concatenating all inner observables
251
*/
252
function concatAll<T>(): OperatorFunction<ObservableInput<T>, T>;
253
254
/**
255
* Flatten higher-order observable by switching to latest inner observable
256
* @returns Operator function switching to latest inner observable
257
*/
258
function switchAll<T>(): OperatorFunction<ObservableInput<T>, T>;
259
260
/**
261
* Flatten higher-order observable by ignoring new inner observables while the current one is still active
262
* @returns Operator function exhausting inner observables
263
*/
264
function exhaustAll<T>(): OperatorFunction<ObservableInput<T>, T>;
265
266
/**
267
* Flatten higher-order observable by combining latest from all inner observables
268
* @returns Operator function combining latest from inner observables
269
*/
270
function combineLatestAll<T>(): OperatorFunction<ObservableInput<T>, T[]>;
271
function combineLatestAll<T, R>(project: (...values: T[]) => R): OperatorFunction<ObservableInput<T>, R>;
272
273
/**
274
* Flatten higher-order observable by zipping inner observables
275
* @returns Operator function zipping inner observables
276
*/
277
function zipAll<T>(): OperatorFunction<ObservableInput<T>, T[]>;
278
function zipAll<T, R>(project: (...values: T[]) => R): OperatorFunction<ObservableInput<T>, R>;
279
```
280
281
**Usage Examples:**
282
283
```typescript
284
import { of, map, mergeAll, concatAll, switchAll } from "rxjs";
285
import { delay } from "rxjs/operators";
286
287
// Create higher-order observable
288
const higherOrder$ = of(1, 2, 3).pipe(
289
map(n => of(`inner-${n}`).pipe(delay(n * 1000)))
290
);
291
292
// Different flattening strategies:
293
294
// mergeAll - all inner observables run concurrently
295
higherOrder$.pipe(mergeAll()).subscribe(x => console.log('Merge:', x));
296
// Output: inner-1 (1s), inner-2 (2s), inner-3 (3s)
297
298
// concatAll - inner observables run sequentially
299
higherOrder$.pipe(concatAll()).subscribe(x => console.log('Concat:', x));
300
// Output: inner-1 (1s), inner-2 (3s), inner-3 (6s)
301
302
// switchAll - switch to latest inner observable
303
higherOrder$.pipe(switchAll()).subscribe(x => console.log('Switch:', x));
304
// Output: inner-3 (3s) only
305
```
306
307
## Advanced Combination Patterns
308
309
### Conditional Combination
310
311
```typescript
312
import { BehaviorSubject, of, combineLatest, startWith, switchMap } from "rxjs";
import { ajax } from "rxjs/ajax";
313
314
// Conditional data loading
315
const userId$ = new BehaviorSubject<string | null>(null);
316
const userPermissions$ = new BehaviorSubject<string[]>([]);
317
318
const userData$ = combineLatest([userId$, userPermissions$]).pipe(
319
switchMap(([userId, permissions]) => {
320
if (userId && permissions.includes('read')) {
321
return ajax.getJSON(`/api/users/${userId}`);
322
}
323
return of(null);
324
}),
325
startWith({ loading: true })
326
);
327
```
328
329
### Multi-source State Management
330
331
```typescript
332
import { fromEvent, map, merge, scan, startWith } from "rxjs";
333
334
// Combine multiple action streams
335
const userActions$ = fromEvent(userButton, 'click').pipe(map(() => ({ type: 'USER_ACTION' })));
336
const systemEvents$ = fromEvent(window, 'beforeunload').pipe(map(() => ({ type: 'SYSTEM_EVENT' })));
337
const apiEvents$ = apiErrorStream$.pipe(map(error => ({ type: 'API_ERROR', error })));
338
339
const allEvents$ = merge(userActions$, systemEvents$, apiEvents$).pipe(
340
scan((state, event) => {
341
switch (event.type) {
342
case 'USER_ACTION': return { ...state, lastUserAction: Date.now() };
343
case 'SYSTEM_EVENT': return { ...state, systemState: 'closing' };
344
case 'API_ERROR': return { ...state, errors: [...state.errors, event.error] };
345
default: return state;
346
}
347
}, { lastUserAction: null, systemState: 'active', errors: [] }),
348
startWith({ lastUserAction: null, systemState: 'active', errors: [] })
349
);
350
```
351
352
## Types
353
354
```typescript { .api }
355
type ObservableInput<T> = Observable<T> | Promise<T> | Iterable<T>;
356
type OperatorFunction<T, R> = (source: Observable<T>) => Observable<R>;
357
```