CtrlK
Blog Docs Log in Get started
Tessl Logo

tessl/npm-rxjs

Reactive Extensions for modern JavaScript providing comprehensive reactive programming with observable sequences

Pending
Overview
Eval results
Files

docs/observable-creation.md

Observable Creation

Static functions for creating observables from various sources including events, promises, iterables, and custom logic.

Capabilities

from

Convert various input types to observables.

/**
 * Convert various input types to observables
 * @param input - Array, promise, iterable, or observable-like object
 * @returns Observable emitting values from the input
 */
function from<T>(input: ObservableInput<T>): Observable<T>;
function from<T>(input: ObservableInput<T>, scheduler: SchedulerLike): Observable<T>;

Usage Examples:

import { from } from "rxjs";

// From array
from([1, 2, 3, 4]).subscribe(x => console.log(x));

// From promise
from(fetch('/api/data')).subscribe(response => console.log(response));

// From async iterable
async function* asyncGenerator() {
  yield 1;
  yield 2;
  yield 3;
}
from(asyncGenerator()).subscribe(x => console.log(x));

// From string (iterable)
from('hello').subscribe(char => console.log(char)); // h, e, l, l, o

of

Create observable that emits provided values in sequence.

/**
 * Create observable that emits provided arguments in sequence
 * @param args - Values to emit
 * @returns Observable emitting the provided values
 */
function of<T>(): Observable<never>;
function of<T>(value: T): Observable<T>;
function of<T>(...args: T[]): Observable<T>;
function of<T>(...args: (T | SchedulerLike)[]): Observable<T>;

Usage Examples:

import { of } from "rxjs";

// Emit multiple values
of(1, 2, 3, 4).subscribe(x => console.log(x));

// Emit single value
of('hello').subscribe(x => console.log(x));

// Emit complex objects
of(
  { id: 1, name: 'Alice' },
  { id: 2, name: 'Bob' }
).subscribe(user => console.log(user));

fromEvent

Create observable from DOM events or Node.js EventEmitter.

/**
 * Create observable from DOM events or EventEmitter
 * @param target - Event target (DOM element, EventEmitter, etc.)
 * @param eventName - Name of the event to listen for
 * @param options - Event listener options, or a result selector function that maps the event arguments to the emitted value
 * @returns Observable emitting event objects
 */
function fromEvent<T>(
  target: any, 
  eventName: string, 
  options?: EventListenerOptions | ((...args: any[]) => T)
): Observable<T>;

Usage Examples:

import { fromEvent } from "rxjs";
import { map, throttleTime } from "rxjs/operators";

// DOM events
const button = document.getElementById('myButton');
const clicks$ = fromEvent(button, 'click');
clicks$.subscribe(event => console.log('Button clicked!', event));

// Window resize events with throttling
const resize$ = fromEvent(window, 'resize').pipe(
  throttleTime(200),
  map(() => ({ width: window.innerWidth, height: window.innerHeight }))
);
resize$.subscribe(size => console.log('Window resized:', size));

// Node.js EventEmitter
const EventEmitter = require('events');
const emitter = new EventEmitter();
const myEvents$ = fromEvent(emitter, 'data');
myEvents$.subscribe(data => console.log('Received:', data));

interval

Create observable that emits sequential numbers at specified intervals.

/**
 * Create observable that emits sequential numbers at regular intervals
 * @param period - Interval between emissions in milliseconds
 * @param scheduler - Scheduler to control timing
 * @returns Observable emitting incremental numbers
 */
function interval(period: number, scheduler?: SchedulerLike): Observable<number>;

Usage Examples:

import { interval } from "rxjs";
import { take, map } from "rxjs/operators";

// Emit every second
const timer$ = interval(1000);
timer$.pipe(take(5)).subscribe(x => console.log('Timer:', x)); // 0, 1, 2, 3, 4

// Create clock
interval(1000).pipe(
  map(() => new Date().toLocaleTimeString())
).subscribe(time => console.log('Current time:', time));

timer

Create observable that emits after a delay, optionally repeating at intervals.

/**
 * Create observable that emits after initial delay, optionally repeating
 * @param dueTime - Initial delay in milliseconds or specific Date
 * @param period - Repeat interval in milliseconds (optional)
 * @param scheduler - Scheduler to control timing
 * @returns Observable emitting numbers starting from 0
 */
function timer(dueTime: number | Date, period?: number, scheduler?: SchedulerLike): Observable<number>;
function timer(dueTime: number | Date, scheduler?: SchedulerLike): Observable<0>;

Usage Examples:

import { timer } from "rxjs";
import { take } from "rxjs/operators";

// Single emission after 3 seconds
timer(3000).subscribe(() => console.log('3 seconds have passed'));

// Start after 2 seconds, then emit every 1 second
timer(2000, 1000).pipe(take(5)).subscribe(x => console.log('Timer value:', x));

// Timer at specific time
const tomorrow = new Date();
tomorrow.setDate(tomorrow.getDate() + 1);
timer(tomorrow).subscribe(() => console.log('Tomorrow has arrived!'));

combineLatest

Combine latest values from multiple observables into arrays.

/**
 * Combine latest values from multiple observables
 * @param sources - Array of observables or individual observables
 * @returns Observable emitting arrays of latest values
 */
function combineLatest<T>(sources: ObservableInput<T>[]): Observable<T[]>;
function combineLatest<T, R>(sources: ObservableInput<T>[], project: (...values: T[]) => R): Observable<R>;
function combineLatest<T1, T2>(v1: ObservableInput<T1>, v2: ObservableInput<T2>): Observable<[T1, T2]>;
function combineLatest<T1, T2, T3>(v1: ObservableInput<T1>, v2: ObservableInput<T2>, v3: ObservableInput<T3>): Observable<[T1, T2, T3]>;
function combineLatest<T1, T2, T3, T4>(v1: ObservableInput<T1>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>): Observable<[T1, T2, T3, T4]>;
function combineLatest<T1, T2, T3, T4, T5>(v1: ObservableInput<T1>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>): Observable<[T1, T2, T3, T4, T5]>;
function combineLatest<T1, T2, T3, T4, T5, T6>(v1: ObservableInput<T1>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>, v5: ObservableInput<T5>, v6: ObservableInput<T6>): Observable<[T1, T2, T3, T4, T5, T6]>;

Usage Examples:

import { combineLatest, timer, of } from "rxjs";
import { map } from "rxjs/operators";

// Combine user input and server data
const username$ = of('Alice');
const permissions$ = of(['read', 'write']);
const isOnline$ = timer(0, 5000).pipe(map(x => x % 2 === 0));

combineLatest([username$, permissions$, isOnline$]).subscribe(
  ([username, permissions, isOnline]) => {
    console.log(`User: ${username}, Permissions: ${permissions}, Online: ${isOnline}`);
  }
);

merge

Merge multiple observables into a single stream.

/**
 * Merge multiple observables into one
 * @param sources - Observables to merge
 * @param concurrent - Maximum concurrent subscriptions
 * @param scheduler - Scheduler for managing subscriptions
 * @returns Observable emitting values from all sources
 */
function merge<T>(...sources: ObservableInput<T>[]): Observable<T>;
function merge<T>(...sources: (ObservableInput<T> | SchedulerLike | number)[]): Observable<T>;
function merge<T>(sources: ObservableInput<T>[], concurrent?: number, scheduler?: SchedulerLike): Observable<T>;

Usage Examples:

import { defer, merge, interval, fromEvent } from "rxjs";
import { map } from "rxjs/operators";

// Merge timer and user clicks
const timer$ = interval(1000).pipe(map(() => 'timer'));
const clicks$ = fromEvent(document, 'click').pipe(map(() => 'click'));

merge(timer$, clicks$).subscribe(source => {
  console.log('Event from:', source);
});

// Merge with concurrency limit (at most 2 requests in flight at a time).
// defer() postpones each fetch until merge subscribes to it; wrapping an
// already-started promise with from() would fire every request immediately,
// so the limit would have no effect on the network calls.
const urls = ['url1', 'url2', 'url3'];
const requests$ = urls.map(url => defer(() => fetch(url)));
merge(...requests$, 2).subscribe(response => {
  console.log('Response received:', response);
});

forkJoin

Wait for all observables to complete, then emit their final values as an array (or as an object when given an object of sources).

/**
 * Wait for all observables to complete and emit their final values as array
 * @param sources - Array of observables or object with observable properties
 * @returns Observable emitting array or object of final values
 */
function forkJoin<T>(sources: ObservableInput<T>[]): Observable<T[]>;
function forkJoin<T extends Record<string, ObservableInput<any>>>(sources: T): Observable<{ [K in keyof T]: ObservedValueOf<T[K]> }>;
function forkJoin<T1, T2>(v1: ObservableInput<T1>, v2: ObservableInput<T2>): Observable<[T1, T2]>;
function forkJoin<T1, T2, T3>(v1: ObservableInput<T1>, v2: ObservableInput<T2>, v3: ObservableInput<T3>): Observable<[T1, T2, T3]>;
function forkJoin<T1, T2, T3, T4>(v1: ObservableInput<T1>, v2: ObservableInput<T2>, v3: ObservableInput<T3>, v4: ObservableInput<T4>): Observable<[T1, T2, T3, T4]>;

Usage Examples:

import { forkJoin, of } from "rxjs";
import { delay } from "rxjs/operators";

// Wait for multiple async operations
const user$ = of({ id: 1, name: 'Alice' }).pipe(delay(1000));
const posts$ = of([{ id: 1, title: 'Post 1' }]).pipe(delay(2000));
const comments$ = of([{ id: 1, text: 'Comment 1' }]).pipe(delay(1500));

forkJoin([user$, posts$, comments$]).subscribe(
  ([user, posts, comments]) => {
    console.log('All data loaded:', { user, posts, comments });
  }
);

// Object syntax
forkJoin({
  user: user$,
  posts: posts$,
  comments: comments$
}).subscribe(data => {
  console.log('User:', data.user);
  console.log('Posts:', data.posts);
  console.log('Comments:', data.comments);
});

Other Creation Functions

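/**
 * Create observable that emits no values and immediately completes
 * @deprecated Use the EMPTY constant instead (or scheduled([], scheduler) when a scheduler is needed)
 */
function empty(scheduler?: SchedulerLike): Observable<never>;

/**
 * Create observable that never emits any values
 * @deprecated Use the NEVER constant instead
 */
function never(): Observable<never>;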

/**
 * Create observable that immediately emits an error
 */
function throwError(errorOrErrorFactory: any | (() => any), scheduler?: SchedulerLike): Observable<never>;

/**
 * Create observable from event pattern (addHandler/removeHandler)
 */
function fromEventPattern<T>(
  addHandler: (handler: NodeEventHandler) => any,
  removeHandler?: (handler: NodeEventHandler, signal?: any) => void,
  resultSelector?: (...args: any[]) => T
): Observable<T>;

/**
 * Defer observable creation until subscription
 */
function defer<R extends ObservableInput<any>>(observableFactory: () => R): Observable<ObservedValueOf<R>>;

/**
 * Generate sequence of values based on state and condition
 */
function generate<T, S>(
  initialState: S,
  condition: (state: S) => boolean,
  iterate: (state: S) => S,
  resultSelector?: (state: S) => T,
  scheduler?: SchedulerLike
): Observable<T>;

/**
 * Create range of sequential numbers
 */
function range(start: number, count?: number, scheduler?: SchedulerLike): Observable<number>;

/**
 * Choose between observables based on condition
 */
function iif<T, F>(
  condition: () => boolean,
  trueResult?: ObservableInput<T>,
  falseResult?: ObservableInput<F>
): Observable<T | F>;

/**
 * Convert callback-based function to observable-returning function
 */
function bindCallback<T>(
  callbackFunc: (...args: any[]) => void,
  resultSelector?: (...args: any[]) => T,
  scheduler?: SchedulerLike
): (...args: any[]) => Observable<T>;

/**
 * Convert Node.js callback-based function to observable-returning function  
 */
function bindNodeCallback<T>(
  callbackFunc: (...args: any[]) => void,
  resultSelector?: (...args: any[]) => T,
  scheduler?: SchedulerLike
): (...args: any[]) => Observable<T>;

/**
 * Create connectable observable that can be shared among subscribers
 */
function connectable<T>(source: ObservableInput<T>, config?: ConnectableConfig<T>): ConnectableObservable<T>;

/**
 * Continue with next observable on error (concatenation with error recovery)
 */
function onErrorResumeNext<T, R>(...sources: ObservableInput<any>[]): Observable<T | R>;

/**
 * Create key-value pair emissions from object properties
 */
function pairs<T>(obj: Record<string, T>, scheduler?: SchedulerLike): Observable<[string, T]>;

/**
 * Split source observable into two based on predicate
 */
function partition<T>(
  source: ObservableInput<T>,
  predicate: (value: T, index: number) => boolean,
  thisArg?: any
): [Observable<T>, Observable<T>];

/**
 * Race multiple observables - emit from first to emit
 */
function race<T>(...sources: ObservableInput<T>[]): Observable<T>;

/**
 * Create scheduled observable with custom scheduler
 */
function scheduled<T>(input: ObservableInput<T>, scheduler: SchedulerLike): Observable<T>;

/**
 * Manage resource lifecycle with automatic cleanup
 */
function using<T, R>(
  resourceFactory: () => R,
  observableFactory: (resource: R) => ObservableInput<T>
): Observable<T>;

/**
 * Combine corresponding values from multiple observables into tuples
 */
function zip<T, R>(...sources: ObservableInput<any>[]): Observable<R[]>;

/**
 * Concatenate observables in sequence
 */
function concat<T>(...sources: ObservableInput<T>[]): Observable<T>;

/**
 * Create observable from animation frames
 */
function animationFrames(timestampProvider?: TimestampProvider): Observable<{ timestamp: number; elapsed: number }>;

Constants

/**
 * Empty observable constant
 */
const EMPTY: Observable<never>;

/**
 * Never-emitting observable constant  
 */
const NEVER: Observable<never>;

Types

type ObservableInput<T> = Observable<T> | InteropObservable<T> | AsyncIterable<T> | PromiseLike<T> | ArrayLike<T> | Iterable<T>;

type ObservedValueOf<O> = O extends ObservableInput<infer T> ? T : never;

interface EventListenerOptions {
  capture?: boolean;
  once?: boolean;
  passive?: boolean;
}

type NodeEventHandler = (...args: any[]) => void;

Install with Tessl CLI

npx tessl i tessl/npm-rxjs

docs

ajax-operations.md

combination-operators.md

core-types.md

error-handling.md

fetch-operations.md

filtering-operators.md

index.md

observable-creation.md

schedulers.md

subjects.md

testing-utilities.md

transformation-operators.md

websocket-operations.md

tile.json