Reactive Extensions for modern JavaScript providing comprehensive reactive programming with observable sequences
—
Comprehensive testing framework with marble testing and virtual time scheduling for testing reactive streams and time-dependent operations.
Virtual time scheduler for testing time-dependent operations with marble diagrams.
/**
* Test scheduler for marble testing with virtual time control
*/
class TestScheduler extends VirtualTimeScheduler {
/**
* Create TestScheduler with assertion function
* @param assertDeepEqual - Function to compare actual vs expected results
*/
constructor(assertDeepEqual: (actual: any, expected: any) => boolean | void);
/**
* Run test with marble testing helpers
* @param callback - Test function receiving run helpers
* @returns Result from callback
*/
run<T>(callback: (helpers: RunHelpers) => T): T;
/**
* Create hot observable from marble diagram
* @param marbles - Marble diagram string
* @param values - Values object mapping marble characters to values
* @param error - Error value for error emissions
* @returns Hot observable for testing
*/
createHotObservable<T>(marbles: string, values?: any, error?: any): HotObservable<T>;
/**
* Create cold observable from marble diagram
* @param marbles - Marble diagram string
* @param values - Values object mapping marble characters to values
* @param error - Error value for error emissions
* @returns Cold observable for testing
*/
createColdObservable<T>(marbles: string, values?: any, error?: any): ColdObservable<T>;
/**
* Create expectation for observable
* @param observable - Observable to test
* @param subscriptionMarbles - Optional subscription timing
* @returns Expectation object for assertions
*/
expectObservable<T>(observable: Observable<T>, subscriptionMarbles?: string): Expectation<T>;
/**
* Create expectation for subscriptions
* @param subscriptions - Subscription logs to test
* @returns Subscription expectation
*/
expectSubscriptions(subscriptions: SubscriptionLog[]): SubscriptionExpectation;
/**
* Flush all scheduled work immediately
*/
flush(): void;
/**
* Get current virtual time frame
*/
frame: number;
/**
* Maximum number of frames to process
*/
maxFrames: number;
}Helper functions provided by TestScheduler.run().
/**
* Helper functions for marble testing
*/
interface RunHelpers {
/**
* Create cold observable from marble diagram
* @param marbles - Marble diagram string
* @param values - Values mapping
* @param error - Error value
*/
cold: <T = string>(marbles: string, values?: any, error?: any) => ColdObservable<T>;
/**
* Create hot observable from marble diagram
* @param marbles - Marble diagram string
* @param values - Values mapping
* @param error - Error value
*/
hot: <T = string>(marbles: string, values?: any, error?: any) => HotObservable<T>;
/**
* Create expectation for observable behavior
* @param observable - Observable to test
* @param subscriptionMarbles - Subscription timing
*/
expectObservable: <T = any>(observable: Observable<T>, subscriptionMarbles?: string) => Expectation<T>;
/**
* Create expectation for subscription behavior
* @param subscriptions - Subscription logs
*/
expectSubscriptions: (subscriptions: SubscriptionLog[]) => SubscriptionExpectation;
/**
* Flush scheduled work
*/
flush: () => void;
/**
* Get virtual time in frames
*/
time: (marbles: string) => number;
}// Time progression and values
'-' // Time passing (1 frame)
'a' // Emitted value (can be any character)
'|' // Completion
'#' // Error
'^' // Subscription point
'!' // Unsubscription point
'(' // Start group (values emitted synchronously)
')' // End group
// Examples:
'--a--b--c--|' // Values a, b, c emitted over time, then completion
'--a--b--c--#' // Values a, b, c emitted, then error
'--a--(bc)--d--|' // Value a, then b and c synchronously, then d, then completion
'^--!--' // Subscribe immediately, unsubscribe after 3 framesimport { TestScheduler } from "rxjs/testing";
import { map, delay } from "rxjs/operators";
const testScheduler = new TestScheduler((actual, expected) => {
expect(actual).toEqual(expected);
});
testScheduler.run(({ cold, hot, expectObservable }) => {
// Test map operator
const source$ = cold('--a--b--c--|', { a: 1, b: 2, c: 3 });
const expected = ' --x--y--z--|';
const values = { x: 2, y: 4, z: 6 };
const result$ = source$.pipe(map(x => x * 2));
expectObservable(result$).toBe(expected, values);
});import { TestScheduler } from "rxjs/testing";
import { delay, debounceTime, throttleTime } from "rxjs/operators";
const testScheduler = new TestScheduler((actual, expected) => {
expect(actual).toEqual(expected);
});
testScheduler.run(({ cold, expectObservable }) => {
// Test delay operator
const source$ = cold('--a--b--c--|');
const expected = ' ----a--b--c--|';
const result$ = source$.pipe(delay(20)); // 2 frames delay
expectObservable(result$).toBe(expected);
// Test debounceTime
const rapidSource$ = cold('--a-b-c----d--|');
const debouncedExpected = '----------c----d--|';
const debounced$ = rapidSource$.pipe(debounceTime(30)); // 3 frames
expectObservable(debounced$).toBe(debouncedExpected);
});import { TestScheduler } from "rxjs/testing";
import { map, catchError } from "rxjs/operators";
import { of } from "rxjs";
const testScheduler = new TestScheduler((actual, expected) => {
expect(actual).toEqual(expected);
});
testScheduler.run(({ cold, expectObservable }) => {
// Test error handling
const source$ = cold('--a--b--#', { a: 1, b: 2 }, new Error('test error'));
const expected = ' --x--y--(z|)';
const values = { x: 2, y: 4, z: 'error handled' };
const result$ = source$.pipe(
map(x => x * 2),
catchError(err => of('error handled'))
);
expectObservable(result$).toBe(expected, values);
});import { TestScheduler } from "rxjs/testing";
import { take } from "rxjs/operators";
const testScheduler = new TestScheduler((actual, expected) => {
expect(actual).toEqual(expected);
});
testScheduler.run(({ hot, expectObservable, expectSubscriptions }) => {
const source$ = hot('--a--b--c--d--e--|');
const sourceSubs = '^----! '; // Subscribe at frame 0, unsubscribe at frame 5
const expected = ' --a--b ';
const result$ = source$.pipe(take(2));
expectObservable(result$).toBe(expected);
expectSubscriptions(source$.subscriptions).toBe(sourceSubs);
});import { TestScheduler } from "rxjs/testing";
import { OperatorFunction, Observable } from "rxjs";
// Custom operator to test
function skipEveryOther<T>(): OperatorFunction<T, T> {
return (source: Observable<T>) => new Observable(subscriber => {
let index = 0;
return source.subscribe({
next(value) {
if (index % 2 === 0) {
subscriber.next(value);
}
index++;
},
error(err) { subscriber.error(err); },
complete() { subscriber.complete(); }
});
});
}
const testScheduler = new TestScheduler((actual, expected) => {
expect(actual).toEqual(expected);
});
testScheduler.run(({ cold, expectObservable }) => {
const source$ = cold('--a--b--c--d--e--|');
const expected = ' --a-----c-----e--|';
const result$ = source$.pipe(skipEveryOther());
expectObservable(result$).toBe(expected);
});import { TestScheduler } from "rxjs/testing";
import { switchMap, catchError } from "rxjs/operators";
import { of, throwError } from "rxjs";
// Mock HTTP service
const mockHttpService = {
get: (url: string) => {
if (url.includes('error')) {
return throwError('HTTP Error');
}
return of({ data: 'success' });
}
};
const testScheduler = new TestScheduler((actual, expected) => {
expect(actual).toEqual(expected);
});
testScheduler.run(({ cold, expectObservable }) => {
// Test successful request
const trigger$ = cold('--a--|', { a: '/api/data' });
const expected = ' --x--|';
const values = { x: { data: 'success' } };
const result$ = trigger$.pipe(
switchMap(url => mockHttpService.get(url))
);
expectObservable(result$).toBe(expected, values);
// Test error handling
const errorTrigger$ = cold('--a--|', { a: '/api/error' });
const errorExpected = ' --x--|';
const errorValues = { x: 'Request failed' };
const errorResult$ = errorTrigger$.pipe(
switchMap(url => mockHttpService.get(url)),
catchError(err => of('Request failed'))
);
expectObservable(errorResult$).toBe(errorExpected, errorValues);
});import { TestScheduler } from "rxjs/testing";
import { scan, startWith } from "rxjs/operators";
import { Subject } from "rxjs";
interface AppState {
count: number;
items: string[];
}
interface AppAction {
type: 'increment' | 'add_item';
payload?: any;
}
const initialState: AppState = { count: 0, items: [] };
function reducer(state: AppState, action: AppAction): AppState {
switch (action.type) {
case 'increment':
return { ...state, count: state.count + 1 };
case 'add_item':
return { ...state, items: [...state.items, action.payload] };
default:
return state;
}
}
const testScheduler = new TestScheduler((actual, expected) => {
expect(actual).toEqual(expected);
});
testScheduler.run(({ cold, expectObservable }) => {
const actions$ = cold('--a--b--c--|', {
a: { type: 'increment' },
b: { type: 'add_item', payload: 'item1' },
c: { type: 'increment' }
});
const state$ = actions$.pipe(
scan(reducer, initialState),
startWith(initialState)
);
const expected = 'i--x--y--z--|';
const values = {
i: { count: 0, items: [] },
x: { count: 1, items: [] },
y: { count: 1, items: ['item1'] },
z: { count: 2, items: ['item1'] }
};
expectObservable(state$).toBe(expected, values);
});import { TestScheduler } from "rxjs/testing";
import { mergeMap, switchMap, concatMap } from "rxjs/operators";
const testScheduler = new TestScheduler((actual, expected) => {
expect(actual).toEqual(expected);
});
testScheduler.run(({ cold, hot, expectObservable }) => {
// Test mergeMap flattening strategy
const source$ = cold('--a----b----c---|');
const inner$ = cold( ' x-x-x| ');
const expected = ' --x-x-x-x-x-x---|';
const result$ = source$.pipe(
mergeMap(() => inner$)
);
expectObservable(result$).toBe(expected);
// Test switchMap cancellation
const switchSource$ = cold('--a--b--c---|');
const switchInner$ = cold( ' x-x-x| ');
const switchExpected = ' --x--x--x-x-x|';
const switchResult$ = switchSource$.pipe(
switchMap(() => switchInner$)
);
expectObservable(switchResult$).toBe(switchExpected);
});import { TestScheduler } from "rxjs/testing";
import { Observable } from "rxjs";
// Utility for creating reusable test scheduler
function createTestScheduler() {
return new TestScheduler((actual, expected) => {
expect(actual).toEqual(expected);
});
}
// Utility for testing observable emissions
export function testObservable<T>(
source$: Observable<T>,
expectedMarbles: string,
expectedValues?: any,
expectedError?: any
) {
const testScheduler = createTestScheduler();
testScheduler.run(({ expectObservable }) => {
expectObservable(source$).toBe(expectedMarbles, expectedValues, expectedError);
});
}
// Usage
const numbers$ = of(1, 2, 3);
testObservable(numbers$, '(abc|)', { a: 1, b: 2, c: 3 });import { TestScheduler } from "rxjs/testing";
import { combineLatest, merge } from "rxjs";
import { startWith, switchMap } from "rxjs/operators";
const testScheduler = new TestScheduler((actual, expected) => {
expect(actual).toEqual(expected);
});
testScheduler.run(({ cold, hot, expectObservable, time }) => {
// Test complex combination of operators
const user$ = hot(' --u----U----u--|', { u: 'user1', U: 'user2' });
const permissions$ = cold('p--P--| ', { p: ['read'], P: ['read', 'write'] });
const userWithPermissions$ = combineLatest([user$, permissions$]).pipe(
map(([user, perms]) => ({ user, permissions: perms }))
);
const expected = ' --x----y----z--|';
const values = {
x: { user: 'user1', permissions: ['read'] },
y: { user: 'user2', permissions: ['read', 'write'] },
z: { user: 'user1', permissions: ['read', 'write'] }
};
expectObservable(userWithPermissions$).toBe(expected, values);
// Test timing-specific behavior
const delayTime = time('---|'); // 3 frames
const delayedSource$ = cold('a|').pipe(delay(delayTime));
const delayedExpected = ' ---a|';
expectObservable(delayedSource$).toBe(delayedExpected);
});interface HotObservable<T> extends Observable<T> {
subscriptions: SubscriptionLog[];
}
interface ColdObservable<T> extends Observable<T> {
subscriptions: SubscriptionLog[];
}
interface SubscriptionLog {
subscribedFrame: number;
unsubscribedFrame: number;
}
interface Expectation<T> {
toBe(marbles: string, values?: any, errorValue?: any): void;
toEqual(other: Observable<any>): void;
}
interface SubscriptionExpectation {
toBe(marbles: string | string[]): void;
}