or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

ajax-operations.md combination-operators.md core-types.md error-handling.md fetch-operations.md filtering-operators.md index.md observable-creation.md schedulers.md subjects.md testing-utilities.md transformation-operators.md websocket-operations.md

docs/combination-operators.md

0

# Combination Operators

1

2

Operators for combining multiple observable streams in various ways, including merging, concatenating, and timing-based combinations.

3

4

## Capabilities

5

6

### combineLatestWith

7

8

Combine the latest value from the source with the latest values from other observables.

9

10

```typescript { .api }

11

/**

12

* Combine latest values from source with provided observables

13

* @param sources - Other observables to combine with

14

* @returns Operator function combining latest values as arrays

15

*/

16

function combineLatestWith<T, A>(...sources: ObservableInput<A>[]): OperatorFunction<T, [T, ...A[]]>;

17

```

18

19

**Usage Examples:**

20

21

```typescript

22

import { interval, combineLatestWith, map } from "rxjs";

23

24

// Combine timer with user interactions

25

const timer$ = interval(1000);

26

const clicks$ = fromEvent(document, 'click').pipe(map(() => 'click'));

27

28

timer$.pipe(

29

combineLatestWith(clicks$)

30

).subscribe(([time, lastClick]) => {

31

console.log(`Time: ${time}, Last action: ${lastClick}`);

32

});

33

```

34

35

### mergeWith

36

37

Merge source with other observables.

38

39

```typescript { .api }

40

/**

41

* Merge source observable with provided observables

42

* @param sources - Other observables to merge with

43

* @returns Operator function merging all sources

44

*/

45

function mergeWith<T>(...sources: ObservableInput<T>[]): OperatorFunction<T, T>;

46

```

47

48

**Usage Examples:**

49

50

```typescript

51

import { of, mergeWith, delay } from "rxjs";

52

53

// Merge multiple streams

54

const source1$ = of(1, 2, 3);

55

const source2$ = of(4, 5, 6).pipe(delay(1000));

56

const source3$ = of(7, 8, 9).pipe(delay(2000));

57

58

source1$.pipe(

59

mergeWith(source2$, source3$)

60

).subscribe(value => console.log('Merged value:', value));

61

// Output: 1, 2, 3 (immediately), then 4, 5, 6 (after 1s), then 7, 8, 9 (after 2s)

62

```

63

64

### concatWith

65

66

Concatenate source with other observables in sequence.

67

68

```typescript { .api }

69

/**

70

* Concatenate source observable with provided observables sequentially

71

* @param sources - Other observables to concatenate after source

72

* @returns Operator function concatenating in order

73

*/

74

function concatWith<T>(...sources: ObservableInput<T>[]): OperatorFunction<T, T>;

75

```

76

77

**Usage Examples:**

78

79

```typescript

80

import { of, concatWith, delay } from "rxjs";

81

82

// Sequential execution

83

const intro$ = of('Starting...');

84

const process$ = of('Processing...').pipe(delay(1000));

85

const complete$ = of('Complete!').pipe(delay(1000));

86

87

intro$.pipe(

88

concatWith(process$, complete$)

89

).subscribe(message => console.log(message));

90

// Output: Starting... (immediately), Processing... (after 1s), Complete! (after 2s total)

91

```

92

93

### startWith

94

95

Emit specified values before source values.

96

97

```typescript { .api }

98

/**

99

* Emit specified values before source observable values

100

* @param values - Values to emit first

101

* @returns Operator function prepending values

102

*/

103

function startWith<T>(...values: T[]): OperatorFunction<T, T>;

104

function startWith<T>(...values: (T | SchedulerLike)[]): OperatorFunction<T, T>;

105

```

106

107

**Usage Examples:**

108

109

```typescript

110

import { of, startWith } from "rxjs";

111

112

// Add initial values

113

of(4, 5, 6).pipe(

114

startWith(1, 2, 3)

115

).subscribe(x => console.log(x)); // 1, 2, 3, 4, 5, 6

116

117

// Start with loading state

118

const data$ = ajax.getJSON('/api/data');

119

data$.pipe(

120

startWith({ loading: true })

121

).subscribe(result => console.log(result));

122

// Immediately emits { loading: true }, then actual data

123

```

124

125

### endWith

126

127

Emit specified values after source completes.

128

129

```typescript { .api }

130

/**

131

* Emit specified values after source observable completes

132

* @param values - Values to emit after completion

133

* @returns Operator function appending values

134

*/

135

function endWith<T>(...values: T[]): OperatorFunction<T, T>;

136

function endWith<T>(...values: (T | SchedulerLike)[]): OperatorFunction<T, T>;

137

```

138

139

**Usage Examples:**

140

141

```typescript

142

import { of, endWith } from "rxjs";

143

144

// Add final values

145

of(1, 2, 3).pipe(

146

endWith('complete', 'done')

147

).subscribe(x => console.log(x)); // 1, 2, 3, 'complete', 'done'

148

```

149

150

### withLatestFrom

151

152

Combine source with latest values from other observables when source emits.

153

154

```typescript { .api }

155

/**

156

* Combine each source emission with latest values from other observables

157

* @param sources - Other observables to get latest values from

158

* @returns Operator function combining with latest values

159

*/

160

function withLatestFrom<T, A>(...sources: ObservableInput<A>[]): OperatorFunction<T, [T, ...A[]]>;

161

function withLatestFrom<T, A, R>(

162

...sourcesAndProject: [...ObservableInput<A>[], (...values: [T, ...A[]]) => R]

163

): OperatorFunction<T, R>;

164

```

165

166

**Usage Examples:**

167

168

```typescript

169

import { fromEvent, interval, withLatestFrom, map } from "rxjs";

170

171

// Get latest timer value on button click

172

const clicks$ = fromEvent(document.getElementById('button'), 'click');

173

const timer$ = interval(1000);

174

175

clicks$.pipe(

176

withLatestFrom(timer$),

177

map(([click, time]) => `Clicked at timer value: ${time}`)

178

).subscribe(message => console.log(message));

179

```

180

181

### zipWith

182

183

Zip source with other observables.

184

185

```typescript { .api }

186

/**

187

* Zip source observable with provided observables

188

* @param sources - Other observables to zip with

189

* @returns Operator function zipping corresponding values

190

*/

191

function zipWith<T, A>(...sources: ObservableInput<A>[]): OperatorFunction<T, [T, ...A[]]>;

192

```

193

194

**Usage Examples:**

195

196

```typescript

197

import { of, zipWith } from "rxjs";

198

199

// Zip corresponding values

200

const letters$ = of('a', 'b', 'c');

201

const numbers$ = of(1, 2, 3);

202

const symbols$ = of('!', '@', '#');

203

204

letters$.pipe(

205

zipWith(numbers$, symbols$)

206

).subscribe(([letter, number, symbol]) => {

207

console.log(`${letter}${number}${symbol}`); // a1!, b2@, c3#

208

});

209

```

210

211

### raceWith

212

213

Race the source against other observables and mirror whichever one emits first.

214

215

```typescript { .api }

216

/**

217

* Race source with other observables, emit values from the first to emit

218

* @param sources - Other observables to race with

219

* @returns Operator function racing with other sources

220

*/

221

function raceWith<T>(...sources: ObservableInput<T>[]): OperatorFunction<T, T>;

222

```

223

224

**Usage Examples:**

225

226

```typescript

227

import { timer, raceWith, map } from "rxjs";

228

229

// Race different timers

230

const fast$ = timer(1000).pipe(map(() => 'fast'));

231

const slow$ = timer(3000).pipe(map(() => 'slow'));

232

233

fast$.pipe(

234

raceWith(slow$)

235

).subscribe(winner => console.log('Winner:', winner)); // 'fast' (after 1s)

236

```

237

238

### Flattening Combination Operators

239

240

```typescript { .api }

241

/**

242

* Flatten higher-order observable by merging all inner observables

243

* @param concurrent - Maximum concurrent inner subscriptions

244

* @returns Operator function merging all inner observables

245

*/

246

function mergeAll<T>(concurrent?: number): OperatorFunction<ObservableInput<T>, T>;

247

248

/**

249

* Flatten higher-order observable by concatenating inner observables

250

* @returns Operator function concatenating all inner observables

251

*/

252

function concatAll<T>(): OperatorFunction<ObservableInput<T>, T>;

253

254

/**

255

* Flatten higher-order observable by switching to latest inner observable

256

* @returns Operator function switching to latest inner observable

257

*/

258

function switchAll<T>(): OperatorFunction<ObservableInput<T>, T>;

259

260

/**

261

* Flatten higher-order observable by exhausting (ignoring while active)

262

* @returns Operator function exhausting inner observables

263

*/

264

function exhaustAll<T>(): OperatorFunction<ObservableInput<T>, T>;

265

266

/**

267

* Flatten higher-order observable by combining latest from all inner observables

268

* @returns Operator function combining latest from inner observables

269

*/

270

function combineLatestAll<T>(): OperatorFunction<ObservableInput<T>, T[]>;

271

function combineLatestAll<T, R>(project: (...values: T[]) => R): OperatorFunction<ObservableInput<T>, R>;

272

273

/**

274

* Flatten higher-order observable by zipping inner observables

275

* @returns Operator function zipping inner observables

276

*/

277

function zipAll<T>(): OperatorFunction<ObservableInput<T>, T[]>;

278

function zipAll<T, R>(project: (...values: T[]) => R): OperatorFunction<ObservableInput<T>, R>;

279

```

280

281

**Usage Examples:**

282

283

```typescript

284

import { of, map, mergeAll, concatAll, switchAll } from "rxjs";

285

import { delay } from "rxjs/operators";

286

287

// Create higher-order observable

288

const higherOrder$ = of(1, 2, 3).pipe(

289

map(n => of(`inner-${n}`).pipe(delay(n * 1000)))

290

);

291

292

// Different flattening strategies:

293

294

// mergeAll - all inner observables run concurrently

295

higherOrder$.pipe(mergeAll()).subscribe(x => console.log('Merge:', x));

296

// Output: inner-1 (1s), inner-2 (2s), inner-3 (3s)

297

298

// concatAll - inner observables run sequentially

299

higherOrder$.pipe(concatAll()).subscribe(x => console.log('Concat:', x));

300

// Output: inner-1 (1s), inner-2 (3s), inner-3 (6s)

301

302

// switchAll - switch to latest inner observable

303

higherOrder$.pipe(switchAll()).subscribe(x => console.log('Switch:', x));

304

// Output: inner-3 (3s) only

305

```

306

307

## Advanced Combination Patterns

308

309

### Conditional Combination

310

311

```typescript

312

import { of, combineLatest, startWith, switchMap } from "rxjs";

313

314

// Conditional data loading

315

const userId$ = new BehaviorSubject(null);

316

const userPermissions$ = new BehaviorSubject([]);

317

318

const userData$ = combineLatest([userId$, userPermissions$]).pipe(

319

switchMap(([userId, permissions]) => {

320

if (userId && permissions.includes('read')) {

321

return ajax.getJSON(`/api/users/${userId}`);

322

}

323

return of(null);

324

}),

325

startWith({ loading: true })

326

);

327

```

328

329

### Multi-source State Management

330

331

```typescript

332

import { merge, scan, startWith } from "rxjs";

333

334

// Combine multiple action streams

335

const userActions$ = fromEvent(userButton, 'click').pipe(map(() => ({ type: 'USER_ACTION' })));

336

const systemEvents$ = fromEvent(window, 'beforeunload').pipe(map(() => ({ type: 'SYSTEM_EVENT' })));

337

const apiEvents$ = apiErrorStream$.pipe(map(error => ({ type: 'API_ERROR', error })));

338

339

const allEvents$ = merge(userActions$, systemEvents$, apiEvents$).pipe(

340

scan((state, event) => {

341

switch (event.type) {

342

case 'USER_ACTION': return { ...state, lastUserAction: Date.now() };

343

case 'SYSTEM_EVENT': return { ...state, systemState: 'closing' };

344

case 'API_ERROR': return { ...state, errors: [...state.errors, event.error] };

345

default: return state;

346

}

347

}, { lastUserAction: null, systemState: 'active', errors: [] }),

348

startWith({ lastUserAction: null, systemState: 'active', errors: [] })

349

);

350

```

351

352

## Types

353

354

```typescript { .api }

355

type ObservableInput<T> = Observable<T> | Promise<T> | Iterable<T>;

356

type OperatorFunction<T, R> = (source: Observable<T>) => Observable<R>;

357

```