0
# Observable Creation
1
2
Static functions for creating observables from various sources including events, promises, iterables, and custom logic.
3
4
## Capabilities
5
6
### from
7
8
Convert various input types to observables.
9
10
```typescript { .api }
11
/**
12
* Convert various input types to observables
13
* @param input - Array, promise, iterable, or observable-like object
14
* @returns Observable emitting values from the input
15
*/
16
function from<T>(input: ObservableInput<T>): Observable<T>;
17
function from<T>(input: ObservableInput<T>, scheduler: SchedulerLike): Observable<T>;
18
```
19
20
**Usage Examples:**
21
22
```typescript
23
import { from } from "rxjs";
24
25
// From array
26
from([1, 2, 3, 4]).subscribe(x => console.log(x));
27
28
// From promise
29
from(fetch('/api/data')).subscribe(response => console.log(response));
30
31
// From async iterable
32
async function* asyncGenerator() {
33
yield 1;
34
yield 2;
35
yield 3;
36
}
37
from(asyncGenerator()).subscribe(x => console.log(x));
38
39
// From string (iterable)
40
from('hello').subscribe(char => console.log(char)); // h, e, l, l, o
41
```
42
43
### of
44
45
Create observable that emits provided values in sequence.
46
47
```typescript { .api }
48
/**
49
* Create observable that emits provided arguments in sequence
50
* @param args - Values to emit
51
* @returns Observable emitting the provided values
52
*/
53
function of<T>(): Observable<never>;
54
function of<T>(value: T): Observable<T>;
55
function of<T>(...args: T[]): Observable<T>;
56
function of<T>(...args: (T | SchedulerLike)[]): Observable<T>;
57
```
58
59
**Usage Examples:**
60
61
```typescript
62
import { of } from "rxjs";
63
64
// Emit multiple values
65
of(1, 2, 3, 4).subscribe(x => console.log(x));
66
67
// Emit single value
68
of('hello').subscribe(x => console.log(x));
69
70
// Emit complex objects
71
of(
72
{ id: 1, name: 'Alice' },
73
{ id: 2, name: 'Bob' }
74
).subscribe(user => console.log(user));
75
```
76
77
### fromEvent
78
79
Create observable from DOM events or Node.js EventEmitter.
80
81
```typescript { .api }
82
/**
83
* Create observable from DOM events or EventEmitter
84
* @param target - Event target (DOM element, EventEmitter, etc.)
85
* @param eventName - Name of the event to listen for
86
* @param options - Event listener options
87
* @returns Observable emitting event objects
88
*/
89
function fromEvent<T>(
90
target: any,
91
eventName: string,
92
options?: EventListenerOptions | ((...args: any[]) => T)
93
): Observable<T>;
94
```
95
96
**Usage Examples:**
97
98
```typescript
99
import { fromEvent } from "rxjs";
100
import { map, throttleTime } from "rxjs/operators";
101
102
// DOM events
103
const button = document.getElementById('myButton');
104
const clicks$ = fromEvent(button, 'click');
105
clicks$.subscribe(event => console.log('Button clicked!', event));
106
107
// Window resize events with throttling
108
const resize$ = fromEvent(window, 'resize').pipe(
109
throttleTime(200),
110
map(() => ({ width: window.innerWidth, height: window.innerHeight }))
111
);
112
resize$.subscribe(size => console.log('Window resized:', size));
113
114
// Node.js EventEmitter
115
const EventEmitter = require('events');
116
const emitter = new EventEmitter();
117
const myEvents$ = fromEvent(emitter, 'data');
118
myEvents$.subscribe(data => console.log('Received:', data));
119
```
120
121
### interval
122
123
Create observable that emits sequential numbers at specified intervals.
124
125
```typescript { .api }
126
/**
127
* Create observable that emits sequential numbers at regular intervals
128
* @param period - Interval between emissions in milliseconds
129
* @param scheduler - Scheduler to control timing
130
* @returns Observable emitting incremental numbers
131
*/
132
function interval(period: number, scheduler?: SchedulerLike): Observable<number>;
133
```
134
135
**Usage Examples:**
136
137
```typescript
138
import { interval } from "rxjs";
139
import { take, map } from "rxjs/operators";
140
141
// Emit every second
142
const timer$ = interval(1000);
143
timer$.pipe(take(5)).subscribe(x => console.log('Timer:', x)); // 0, 1, 2, 3, 4
144
145
// Create clock
146
interval(1000).pipe(
147
map(() => new Date().toLocaleTimeString())
148
).subscribe(time => console.log('Current time:', time));
149
```
150
151
### timer
152
153
Create observable that emits after a delay, optionally repeating at intervals.
154
155
```typescript { .api }
156
/**
157
* Create observable that emits after initial delay, optionally repeating
158
* @param dueTime - Initial delay in milliseconds or specific Date
159
* @param period - Repeat interval in milliseconds (optional)
160
* @param scheduler - Scheduler to control timing
161
* @returns Observable emitting numbers starting from 0
162
*/
163
function timer(dueTime: number | Date, period?: number, scheduler?: SchedulerLike): Observable<number>;
164
function timer(dueTime: number | Date, scheduler?: SchedulerLike): Observable<0>;
165
```
166
167
**Usage Examples:**
168
169
```typescript
170
import { timer } from "rxjs";
171
172
// Single emission after 3 seconds
173
timer(3000).subscribe(() => console.log('3 seconds have passed'));
174
175
// Start after 2 seconds, then emit every 1 second
176
timer(2000, 1000).pipe(take(5)).subscribe(x => console.log('Timer value:', x));
177
178
// Timer at specific time
179
const tomorrow = new Date();
180
tomorrow.setDate(tomorrow.getDate() + 1);
181
timer(tomorrow).subscribe(() => console.log('Tomorrow has arrived!'));
182
```
183
184
### combineLatest
185
186
Combine latest values from multiple observables into arrays.
187
188
```typescript { .api }
189
/**
190
* Combine latest values from multiple observables
191
* @param sources - Array of observables or individual observables
192
* @returns Observable emitting arrays of latest values
193
*/
194
function combineLatest<T>(sources: ObservableInput<T>[]): Observable<T[]>;
195
function combineLatest<T, R>(sources: ObservableInput<T>[], project: (...values: T[]) => R): Observable<R>;
196
function combineLatest<T1, T2>(v1: ObservableInput<T1>, v2: ObservableInput<T2>): Observable<[T1, T2]>;
197
function combineLatest<T1, T2, T3>(v1: ObservableInput<T1>, v2: ObservableInput<T2>, v3: ObservableInput<T3>): Observable<[T1, T2, T3]>;
198
function combineLatest<T1, T2, T3, T4>(v1: ObservableInput<T1>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>): Observable<[T1, T2, T3, T4]>;
199
function combineLatest<T1, T2, T3, T4, T5>(v1: ObservableInput<T1>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>): Observable<[T1, T2, T3, T4, T5]>;
200
function combineLatest<T1, T2, T3, T4, T5, T6>(v1: ObservableInput<T1>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>): Observable<[T1, T2, T3, T4, T5, T6]>;
201
```
202
203
**Usage Examples:**
204
205
```typescript
206
import { combineLatest, timer, of } from "rxjs";
207
import { map } from "rxjs/operators";
208
209
// Combine user input and server data
210
const username$ = of('Alice');
211
const permissions$ = of(['read', 'write']);
212
const isOnline$ = timer(0, 5000).pipe(map(x => x % 2 === 0));
213
214
combineLatest([username$, permissions$, isOnline$]).subscribe(
215
([username, permissions, isOnline]) => {
216
console.log(`User: ${username}, Permissions: ${permissions}, Online: ${isOnline}`);
217
}
218
);
219
```
220
221
### merge
222
223
Merge multiple observables into a single stream.
224
225
```typescript { .api }
226
/**
227
* Merge multiple observables into one
228
* @param sources - Observables to merge
229
* @param concurrent - Maximum concurrent subscriptions
230
* @param scheduler - Scheduler for managing subscriptions
231
* @returns Observable emitting values from all sources
232
*/
233
function merge<T>(...sources: ObservableInput<T>[]): Observable<T>;
234
function merge<T>(...sources: (ObservableInput<T> | SchedulerLike | number)[]): Observable<T>;
235
function merge<T>(sources: ObservableInput<T>[], concurrent?: number, scheduler?: SchedulerLike): Observable<T>;
236
```
237
238
**Usage Examples:**
239
240
```typescript
241
import { merge, interval, fromEvent } from "rxjs";
242
import { map } from "rxjs/operators";
243
244
// Merge timer and user clicks
245
const timer$ = interval(1000).pipe(map(() => 'timer'));
246
const clicks$ = fromEvent(document, 'click').pipe(map(() => 'click'));
247
248
merge(timer$, clicks$).subscribe(source => {
249
console.log('Event from:', source);
250
});
251
252
// Merge with concurrency limit
253
const urls = ['url1', 'url2', 'url3'];
254
const requests$ = urls.map(url => from(fetch(url)));
255
merge(...requests$, 2).subscribe(response => {
256
console.log('Response received:', response);
257
});
258
```
259
260
### forkJoin
261
262
Wait for all observables to complete, then emit final values as array.
263
264
```typescript { .api }
265
/**
266
* Wait for all observables to complete and emit their final values as array
267
* @param sources - Array of observables or object with observable properties
268
* @returns Observable emitting array or object of final values
269
*/
270
function forkJoin<T>(sources: ObservableInput<T>[]): Observable<T[]>;
271
function forkJoin<T extends Record<string, ObservableInput<any>>>(sources: T): Observable<{ [K in keyof T]: ObservedValueOf<T[K]> }>;
272
function forkJoin<T1, T2>(v1: ObservableInput<T1>, v2: ObservableInput<T2>): Observable<[T1, T2]>;
273
function forkJoin<T1, T2, T3>(v1: ObservableInput<T1>, v2: ObservableInput<T2>, v3: ObservableInput<T3>): Observable<[T1, T2, T3]>;
274
function forkJoin<T1, T2, T3, T4>(v1: ObservableInput<T1>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>): Observable<[T1, T2, T3, T4]>;
275
```
276
277
**Usage Examples:**
278
279
```typescript
280
import { forkJoin, timer, of } from "rxjs";
281
import { delay } from "rxjs/operators";
282
283
// Wait for multiple async operations
284
const user$ = of({ id: 1, name: 'Alice' }).pipe(delay(1000));
285
const posts$ = of([{ id: 1, title: 'Post 1' }]).pipe(delay(2000));
286
const comments$ = of([{ id: 1, text: 'Comment 1' }]).pipe(delay(1500));
287
288
forkJoin([user$, posts$, comments$]).subscribe(
289
([user, posts, comments]) => {
290
console.log('All data loaded:', { user, posts, comments });
291
}
292
);
293
294
// Object syntax
295
forkJoin({
296
user: user$,
297
posts: posts$,
298
comments: comments$
299
}).subscribe(data => {
300
console.log('User:', data.user);
301
console.log('Posts:', data.posts);
302
console.log('Comments:', data.comments);
303
});
304
```
305
306
### Other Creation Functions
307
308
```typescript { .api }
309
/**
310
* Create observable that emits no values and immediately completes
311
*/
312
function empty(scheduler?: SchedulerLike): Observable<never>;
313
314
/**
315
* Create observable that never emits any values
316
*/
317
function never(): Observable<never>;
318
319
/**
320
* Create observable that immediately emits an error
321
*/
322
function throwError(errorOrErrorFactory: any | (() => any), scheduler?: SchedulerLike): Observable<never>;
323
324
/**
325
* Create observable from event pattern (addHandler/removeHandler)
326
*/
327
function fromEventPattern<T>(
328
addHandler: (handler: NodeEventHandler) => any,
329
removeHandler?: (handler: NodeEventHandler, signal?: any) => void,
330
resultSelector?: (...args: any[]) => T
331
): Observable<T>;
332
333
/**
334
* Defer observable creation until subscription
335
*/
336
function defer<R extends ObservableInput<any>>(observableFactory: () => R): Observable<ObservedValueOf<R>>;
337
338
/**
339
* Generate sequence of values based on state and condition
340
*/
341
function generate<T, S>(
342
initialState: S,
343
condition: (state: S) => boolean,
344
iterate: (state: S) => S,
345
resultSelector?: (state: S) => T,
346
scheduler?: SchedulerLike
347
): Observable<T>;
348
349
/**
350
* Create range of sequential numbers
351
*/
352
function range(start: number, count?: number, scheduler?: SchedulerLike): Observable<number>;
353
354
/**
355
* Choose between observables based on condition
356
*/
357
function iif<T, F>(
358
condition: () => boolean,
359
trueResult?: ObservableInput<T>,
360
falseResult?: ObservableInput<F>
361
): Observable<T | F>;
362
363
/**
364
* Convert callback-based function to observable-returning function
365
*/
366
function bindCallback<T>(
367
callbackFunc: (...args: any[]) => void,
368
resultSelector?: (...args: any[]) => T,
369
scheduler?: SchedulerLike
370
): (...args: any[]) => Observable<T>;
371
372
/**
373
* Convert Node.js callback-based function to observable-returning function
374
*/
375
function bindNodeCallback<T>(
376
callbackFunc: (...args: any[]) => void,
377
resultSelector?: (...args: any[]) => T,
378
scheduler?: SchedulerLike
379
): (...args: any[]) => Observable<T>;
380
381
/**
382
* Create connectable observable that can be shared among subscribers
383
*/
384
function connectable<T>(source: ObservableInput<T>, config?: ConnectableConfig<T>): ConnectableObservable<T>;
385
386
/**
387
* Continue with next observable on error (concatenation with error recovery)
388
*/
389
function onErrorResumeNext<T, R>(...sources: ObservableInput<any>[]): Observable<T | R>;
390
391
/**
392
* Create key-value pair emissions from object properties
393
*/
394
function pairs<T>(obj: Record<string, T>, scheduler?: SchedulerLike): Observable<[string, T]>;
395
396
/**
397
* Split source observable into two based on predicate
398
*/
399
function partition<T>(
400
source: ObservableInput<T>,
401
predicate: (value: T, index: number) => boolean,
402
thisArg?: any
403
): [Observable<T>, Observable<T>];
404
405
/**
406
* Race multiple observables - emit from first to emit
407
*/
408
function race<T>(...sources: ObservableInput<T>[]): Observable<T>;
409
410
/**
411
* Create scheduled observable with custom scheduler
412
*/
413
function scheduled<T>(input: ObservableInput<T>, scheduler: SchedulerLike): Observable<T>;
414
415
/**
416
* Manage resource lifecycle with automatic cleanup
417
*/
418
function using<T, R>(
419
resourceFactory: () => R,
420
observableFactory: (resource: R) => ObservableInput<T>
421
): Observable<T>;
422
423
/**
424
* Combine corresponding values from multiple observables into tuples
425
*/
426
function zip<T, R>(...sources: ObservableInput<any>[]): Observable<R[]>;
427
428
/**
429
* Concatenate observables in sequence
430
*/
431
function concat<T>(...sources: ObservableInput<T>[]): Observable<T>;
432
433
/**
434
* Create observable from animation frames
435
*/
436
function animationFrames(timestampProvider?: TimestampProvider): Observable<{ timestamp: number; elapsed: number }>;
437
```
438
439
## Constants
440
441
```typescript { .api }
442
/**
443
* Empty observable constant
444
*/
445
const EMPTY: Observable<never>;
446
447
/**
448
* Never-emitting observable constant
449
*/
450
const NEVER: Observable<never>;
451
```
452
453
## Types
454
455
```typescript { .api }
456
type ObservableInput<T> = Observable<T> | InteropObservable<T> | AsyncIterable<T> | PromiseLike<T> | ArrayLike<T> | Iterable<T>;
457
458
type ObservedValueOf<O> = O extends ObservableInput<infer T> ? T : never;
459
460
interface EventListenerOptions {
461
capture?: boolean;
462
once?: boolean;
463
passive?: boolean;
464
}
465
466
type NodeEventHandler = (...args: any[]) => void;
467
```