CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/npm-rxjs

Reactive Extensions for modern JavaScript providing comprehensive reactive programming with observable sequences

Pending
Overview
Eval results
Files

docs/testing-utilities.md

Testing Utilities

Comprehensive testing framework with marble testing and virtual time scheduling for testing reactive streams and time-dependent operations.

Capabilities

TestScheduler

Virtual time scheduler for testing time-dependent operations with marble diagrams.

/**
 * Test scheduler for marble testing with virtual time control.
 *
 * Construct it with the test framework's deep-equality assertion, then drive
 * tests through `run()`, whose callback receives the marble helpers.
 *
 * NOTE(review): `RunHelpers.time` has no counterpart declared on this class
 * (upstream exposes it as `createTime`) — confirm against the installed typings.
 */
class TestScheduler extends VirtualTimeScheduler {
  /**
   * Create TestScheduler with assertion function
   * @param assertDeepEqual - Called with the recorded (`actual`) and the
   *   expected notifications; typically wraps the test framework's deep
   *   equality matcher, e.g. `expect(actual).toEqual(expected)`
   */
  constructor(assertDeepEqual: (actual: any, expected: any) => boolean | void);
  
  /**
   * Run test with marble testing helpers
   * @param callback - Test function receiving the `RunHelpers`
   *   (`cold`, `hot`, `expectObservable`, `expectSubscriptions`, `flush`, `time`)
   * @returns Whatever the callback returns
   */
  run<T>(callback: (helpers: RunHelpers) => T): T;
  
  /**
   * Create hot observable from marble diagram
   * @param marbles - Marble diagram string, e.g. `'--a--b--|'`
   * @param values - Object mapping marble characters to the values to emit
   * @param error - Value delivered as the error where the diagram contains `#`
   * @returns Hot observable for testing; its `subscriptions` log can be passed
   *   to `expectSubscriptions`
   */
  createHotObservable<T>(marbles: string, values?: any, error?: any): HotObservable<T>;
  
  /**
   * Create cold observable from marble diagram
   * @param marbles - Marble diagram string, e.g. `'--a--b--|'`
   * @param values - Object mapping marble characters to the values to emit
   * @param error - Value delivered as the error where the diagram contains `#`
   * @returns Cold observable for testing; its `subscriptions` log can be passed
   *   to `expectSubscriptions`
   */
  createColdObservable<T>(marbles: string, values?: any, error?: any): ColdObservable<T>;
  
  /**
   * Create expectation for observable
   * @param observable - Observable to test
   * @param subscriptionMarbles - Optional subscription timing written with
   *   `^` (subscribe) and `!` (unsubscribe), e.g. `'^--!'`
   * @returns Expectation object; call `toBe(marbles, values?, errorValue?)` on it
   */
  expectObservable<T>(observable: Observable<T>, subscriptionMarbles?: string): Expectation<T>;
  
  /**
   * Create expectation for subscriptions
   * @param subscriptions - Subscription logs to test, usually the
   *   `subscriptions` property of a hot or cold test observable
   * @returns Subscription expectation; call `toBe('^----!')` on it
   */
  expectSubscriptions(subscriptions: SubscriptionLog[]): SubscriptionExpectation;
  
  /**
   * Flush all scheduled work immediately
   */
  flush(): void;
  
  /**
   * Current virtual time frame
   */
  frame: number;
  
  /**
   * Maximum number of frames to process
   * NOTE(review): the default value is not stated here — confirm before relying on it.
   */
  maxFrames: number;
}

RunHelpers Interface

Helper functions provided by TestScheduler.run().

/**
 * Helper functions for marble testing, passed to the callback of
 * `TestScheduler.run()` and usually destructured there:
 * `testScheduler.run(({ cold, expectObservable }) => { ... })`.
 */
interface RunHelpers {
  /**
   * Create cold observable from marble diagram
   * @param marbles - Marble diagram string
   * @param values - Object mapping marble characters to emitted values;
   *   when omitted the characters themselves are the values (hence `T = string`)
   * @param error - Value delivered as the error where the diagram contains `#`
   */
  cold: <T = string>(marbles: string, values?: any, error?: any) => ColdObservable<T>;
  
  /**
   * Create hot observable from marble diagram
   * @param marbles - Marble diagram string
   * @param values - Object mapping marble characters to emitted values;
   *   when omitted the characters themselves are the values (hence `T = string`)
   * @param error - Value delivered as the error where the diagram contains `#`
   */
  hot: <T = string>(marbles: string, values?: any, error?: any) => HotObservable<T>;
  
  /**
   * Create expectation for observable behavior
   * @param observable - Observable to test
   * @param subscriptionMarbles - Optional subscription timing using `^` and `!`
   */
  expectObservable: <T = any>(observable: Observable<T>, subscriptionMarbles?: string) => Expectation<T>;
  
  /**
   * Create expectation for subscription behavior
   * @param subscriptions - Subscription logs, e.g. `source$.subscriptions`
   */
  expectSubscriptions: (subscriptions: SubscriptionLog[]) => SubscriptionExpectation;
  
  /**
   * Flush scheduled work
   */
  flush: () => void;
  
  /**
   * Convert a marble diagram into a duration in frames: the position of the
   * completion marker is the result, e.g. `time('---|')` is 3.
   */
  time: (marbles: string) => number;
}

Marble Testing Syntax

Marble Diagram Characters

// Time progression and values
'-'     // Time passing (1 frame; inside TestScheduler.run() one frame is 1ms of virtual time)
'a'     // Emitted value (can be any character)
'|'     // Completion
'#'     // Error
'^'     // Subscription point  
'!'     // Unsubscription point
'('     // Start group (values emitted synchronously)
')'     // End group

// Examples:
'--a--b--c--|'     // Values a, b, c emitted over time, then completion
'--a--b--c--#'     // Values a, b, c emitted, then error
'--a--(bc)--d--|'  // Value a, then b and c synchronously, then d, then completion
'^--!--'           // Subscribe immediately, unsubscribe after 3 frames

Basic Testing Examples

Simple Value Testing

import { TestScheduler } from "rxjs/testing";
import { map, delay } from "rxjs/operators";

// The scheduler delegates comparison of recorded vs. expected notifications
// to the test framework's deep-equality matcher.
const testScheduler = new TestScheduler((actual, expected) => {
  expect(actual).toEqual(expected);
});

testScheduler.run(helpers => {
  const { cold, expectObservable } = helpers;

  // map() changes values only, so the output diagram has the same timing as
  // the input diagram and just uses different value keys.
  const inputValues = { a: 1, b: 2, c: 3 };
  const values = { x: 2, y: 4, z: 6 };

  const source$ = cold('--a--b--c--|', inputValues);
  const expected = '    --x--y--z--|';

  const doubled$ = source$.pipe(map(value => value * 2));

  expectObservable(doubled$).toBe(expected, values);
});

Time-based Operator Testing

import { TestScheduler } from "rxjs/testing";
import { delay, debounceTime, throttleTime } from "rxjs/operators";

const testScheduler = new TestScheduler((actual, expected) => {
  expect(actual).toEqual(expected);
});

// Inside run() every frame (one '-' or one marble character) is exactly 1ms of
// virtual time, so operator durations are written directly as frame counts.
testScheduler.run(({ cold, expectObservable }) => {
  // Test delay operator: every value is shifted right by 2 frames
  // (a: 2 -> 4, b: 5 -> 7, c: 8 -> 10).
  const source$ = cold('--a--b--c--|');
  // RxJS 7 does not delay the completion, so it stays at frame 11.
  // RxJS 6 delayed completion as well; there the diagram is '----a--b--c--|'.
  const expected = '    ----a--b--c|';

  const result$ = source$.pipe(delay(2)); // 2 frames delay

  expectObservable(result$).toBe(expected);

  // Test debounceTime: a (frame 2) and b (frame 4) are each superseded within
  // 3 frames; c (frame 6) survives and is emitted at frame 9. d (frame 11)
  // would be emitted at frame 14, which is also where the source completes,
  // so d and the completion share one frame: '(d|)'.
  const rapidSource$ = cold('--a-b-c----d--|');
  const debouncedExpected = '---------c----(d|)';

  const debounced$ = rapidSource$.pipe(debounceTime(3)); // 3 frames

  expectObservable(debounced$).toBe(debouncedExpected);
});

Error Testing

import { TestScheduler } from "rxjs/testing";
import { map, catchError } from "rxjs/operators";
import { of } from "rxjs";

// Recorded and expected notifications are compared with the test framework's
// deep-equality matcher.
const testScheduler = new TestScheduler((actual, expected) => {
  expect(actual).toEqual(expected);
});

testScheduler.run(helpers => {
  const { cold, expectObservable } = helpers;

  // The third argument of cold() is what '#' delivers as the error.
  const failure = new Error('test error');
  const source$ = cold('--a--b--#', { a: 1, b: 2 }, failure);

  // The fallback value and the completion arrive in the same frame as the
  // error they replace, hence the '(z|)' group.
  const expected = '    --x--y--(z|)';
  const values = { x: 2, y: 4, z: 'error handled' };

  const recovered$ = source$.pipe(
    map(value => value * 2),
    catchError(() => of('error handled'))
  );

  expectObservable(recovered$).toBe(expected, values);
});

Subscription Testing

import { TestScheduler } from "rxjs/testing";
import { take } from "rxjs/operators";

const testScheduler = new TestScheduler((actual, expected) => {
  expect(actual).toEqual(expected);
});

testScheduler.run(({ hot, expectObservable, expectSubscriptions }) => {
  const source$ = hot('--a--b--c--d--e--|');
  const sourceSubs = ' ^----!           '; // Subscribe at frame 0, unsubscribe at frame 5
  // take(2) completes in the same frame as the second value, so the completion
  // must be part of the expectation: b and '|' are grouped at frame 5.
  const expected = '   --a--(b|)        ';

  const result$ = source$.pipe(take(2));

  expectObservable(result$).toBe(expected);
  // The hot observable records every subscription made to it; take(2)
  // unsubscribes as soon as it has completed.
  expectSubscriptions(source$.subscriptions).toBe(sourceSubs);
});

Advanced Testing Patterns

Testing Custom Operators

import { TestScheduler } from "rxjs/testing";
import { OperatorFunction, Observable } from "rxjs";

/**
 * Custom operator to test: forwards the 1st, 3rd, 5th, ... value of the source
 * and drops the values in between. Errors and completion pass through as-is.
 *
 * @returns An operator function to use inside `pipe()`
 */
function skipEveryOther<T>(): OperatorFunction<T, T> {
  return (source: Observable<T>) =>
    new Observable<T>(subscriber => {
      // Flips after every source value; starts open so the first value passes.
      let forwardNext = true;

      // Returning the inner subscription ties its teardown to the outer one.
      return source.subscribe({
        next: value => {
          if (forwardNext) {
            subscriber.next(value);
          }
          forwardNext = !forwardNext;
        },
        error: err => {
          subscriber.error(err);
        },
        complete: () => {
          subscriber.complete();
        }
      });
    });
}

// Recorded and expected notifications are compared with the test framework's
// deep-equality matcher.
const testScheduler = new TestScheduler((actual, expected) => {
  expect(actual).toEqual(expected);
});

testScheduler.run(helpers => {
  const { cold, expectObservable } = helpers;

  // b and d (2nd and 4th value) are dropped; the timing of a, c, e and of the
  // completion is unchanged.
  const source$ = cold('--a--b--c--d--e--|');
  const expected = '    --a-----c-----e--|';

  expectObservable(source$.pipe(skipEveryOther())).toBe(expected);
});

Testing Async Operations

import { TestScheduler } from "rxjs/testing";
import { switchMap, catchError } from "rxjs/operators";
import { of, throwError } from "rxjs";

// Mock HTTP service: URLs containing "error" fail, every other URL succeeds
// with a fixed payload.
const mockHttpService = {
  get: (url: string) =>
    url.includes('error')
      ? throwError('HTTP Error')
      : of({ data: 'success' })
};

const testScheduler = new TestScheduler((actual, expected) => {
  expect(actual).toEqual(expected);
});

testScheduler.run(({ cold, expectObservable }) => {
  // Test successful request: the mock responds synchronously, so the response
  // appears in the trigger's frame and completion follows the trigger's.
  const trigger$ = cold('--a--|', { a: '/api/data' });
  const expected = '     --x--|';
  const values = { x: { data: 'success' } };

  const result$ = trigger$.pipe(
    switchMap(url => mockHttpService.get(url))
  );

  expectObservable(result$).toBe(expected, values);

  // Test error handling: the error from the inner request terminates the
  // switchMap chain at frame 2. catchError swaps in the fallback observable,
  // which emits and completes immediately, so the result ends at frame 2 and
  // never sees the trigger's own completion at frame 5.
  const errorTrigger$ = cold('--a--|', { a: '/api/error' });
  const errorExpected = '    --(x|)';
  const errorValues = { x: 'Request failed' };

  const errorResult$ = errorTrigger$.pipe(
    switchMap(url => mockHttpService.get(url)),
    catchError(err => of('Request failed'))
  );

  expectObservable(errorResult$).toBe(errorExpected, errorValues);
});

Testing State Management

import { TestScheduler } from "rxjs/testing";
import { scan, startWith } from "rxjs/operators";
import { Subject } from "rxjs";

/** State held by the example store. */
interface AppState {
  /** Incremented by one for every 'increment' action. */
  count: number;
  /** Grows by one entry for every 'add_item' action. */
  items: string[];
}

/** Actions understood by the example reducer. */
interface AppAction {
  type: 'increment' | 'add_item';
  /** For 'add_item' this is the item appended to `items`; unused by 'increment'. */
  payload?: any;
}

// Seed for scan() and the first value emitted through startWith().
const initialState: AppState = { count: 0, items: [] };

/**
 * Pure state transition for the example store.
 *
 * @param state - Current state; never mutated
 * @param action - Action to apply
 * @returns A new state object for known actions, or the same `state`
 *   reference when the action type is not recognised
 */
function reducer(state: AppState, action: AppAction): AppState {
  if (action.type === 'increment') {
    return { ...state, count: state.count + 1 };
  }

  if (action.type === 'add_item') {
    return { ...state, items: [...state.items, action.payload] };
  }

  // Unknown action types leave the state untouched.
  return state;
}

const testScheduler = new TestScheduler((actual, expected) => {
  expect(actual).toEqual(expected);
});

testScheduler.run(({ cold, expectObservable }) => {
  // The explicit type argument keeps `type` as the literal union declared by
  // AppAction; inferred from the values object it would widen to `string` and
  // no longer match the reducer's parameter type.
  const actions$ = cold<AppAction>('--a--b--c--|', {
    a: { type: 'increment' },
    b: { type: 'add_item', payload: 'item1' },
    c: { type: 'increment' }
  });

  const state$ = actions$.pipe(
    scan(reducer, initialState),
    startWith(initialState)
  );

  // startWith emits the initial state synchronously at frame 0 without
  // shifting the source: the actions still arrive at frames 2, 5 and 8 and the
  // completion at frame 11.
  const expected = 'i-x--y--z--|';
  const values = {
    i: { count: 0, items: [] },
    x: { count: 1, items: [] },
    y: { count: 1, items: ['item1'] },
    z: { count: 2, items: ['item1'] }
  };

  expectObservable(state$).toBe(expected, values);
});

Testing Higher-Order Observables

import { TestScheduler } from "rxjs/testing";
import { mergeMap, switchMap, concatMap } from "rxjs/operators";

const testScheduler = new TestScheduler((actual, expected) => {
  expect(actual).toEqual(expected);
});

testScheduler.run(({ cold, expectObservable }) => {
  // Test mergeMap flattening strategy.
  // Each source value subscribes to a fresh run of the inner observable, which
  // emits at +0, +2 and +4 frames and completes at +5:
  //   a (frame 2)  -> x at 2, 4, 6
  //   b (frame 7)  -> x at 7, 9, 11
  //   c (frame 12) -> x at 12, 14, 16, inner completes at 17
  // The source completes at frame 16, but mergeMap waits for the last inner
  // observable, so the result completes at frame 17.
  const source$ = cold('--a----b----c---|');
  const inner$ = cold('x-x-x|');
  const expected = '    --x-x-xx-x-xx-x-x|';

  const result$ = source$.pipe(
    mergeMap(() => inner$)
  );

  expectObservable(result$).toBe(expected);

  // Test switchMap cancellation.
  // A new source value unsubscribes from the running inner observable:
  //   a (frame 2) -> x at 2, 4      (the x due at 6 is cancelled by b at 5)
  //   b (frame 5) -> x at 5, 7      (the x due at 9 is cancelled by c at 8)
  //   c (frame 8) -> x at 8, 10, 12, inner completes at 13
  // The source completes at frame 12; the result completes with the last
  // inner observable at frame 13.
  const switchSource$ = cold('--a--b--c---|');
  const switchInner$ = cold('x-x-x|');
  const switchExpected = '    --x-xx-xx-x-x|';

  const switchResult$ = switchSource$.pipe(
    switchMap(() => switchInner$)
  );

  expectObservable(switchResult$).toBe(switchExpected);
});

Test Utilities

Helper Functions

import { TestScheduler } from "rxjs/testing";
import { Observable, of } from "rxjs";

/**
 * Utility for creating a reusable test scheduler.
 * Every call returns a fresh TestScheduler whose assertion delegates to the
 * test framework's deep-equality matcher.
 */
function createTestScheduler() {
  const assertDeepEqual = (actual: unknown, expected: unknown): void => {
    expect(actual).toEqual(expected);
  };

  return new TestScheduler(assertDeepEqual);
}

/**
 * Utility for testing observable emissions: checks `source$` against a marble
 * diagram inside a freshly created test scheduler.
 *
 * @param source$ - Observable under test
 * @param expectedMarbles - Marble diagram the observable must match
 * @param expectedValues - Optional mapping from marble characters to values
 * @param expectedError - Optional error value expected where the diagram has `#`
 */
export function testObservable<T>(
  source$: Observable<T>,
  expectedMarbles: string,
  expectedValues?: any,
  expectedError?: any
) {
  createTestScheduler().run(helpers => {
    helpers
      .expectObservable(source$)
      .toBe(expectedMarbles, expectedValues, expectedError);
  });
}

// Usage
// All three values and the completion are expected in the same frame, which
// the marble diagram expresses as a single group: '(abc|)'.
const numbers$ = of(1, 2, 3);
testObservable(numbers$, '(abc|)', { a: 1, b: 2, c: 3 });

Testing Complex Scenarios

import { TestScheduler } from "rxjs/testing";
import { combineLatest, merge } from "rxjs";
import { delay, map, startWith, switchMap } from "rxjs/operators";

const testScheduler = new TestScheduler((actual, expected) => {
  expect(actual).toEqual(expected);
});

testScheduler.run(({ cold, hot, expectObservable, time }) => {
  // Test complex combination of operators.
  // combineLatest emits whenever EITHER input emits (once both have a value):
  //   frame 2:  user1 arrives, permissions are ['read']            -> x
  //   frame 3:  permissions change to ['read', 'write']            -> w
  //   frame 7:  user2 arrives                                      -> y
  //   frame 12: user1 arrives again                                -> z
  //   frame 15: the last input (user$) completes                   -> |
  const user$ = hot('        --u----U----u--|', { u: 'user1', U: 'user2' });
  const permissions$ = cold('p--P--|', { p: ['read'], P: ['read', 'write'] });

  const userWithPermissions$ = combineLatest([user$, permissions$]).pipe(
    map(([user, perms]) => ({ user, permissions: perms }))
  );

  const expected = '         --xw---y----z--|';
  const values = {
    x: { user: 'user1', permissions: ['read'] },
    w: { user: 'user1', permissions: ['read', 'write'] },
    y: { user: 'user2', permissions: ['read', 'write'] },
    z: { user: 'user1', permissions: ['read', 'write'] }
  };

  expectObservable(userWithPermissions$).toBe(expected, values);

  // Test timing-specific behavior
  const delayTime = time('---|'); // 3 frames
  const delayedSource$ = cold('a|').pipe(delay(delayTime));
  // RxJS 7: the source has already completed when the delayed value is
  // released, so value and completion share frame 3.
  // RxJS 6 delayed the completion as well; there the diagram is '---a|'.
  const delayedExpected = '    ---(a|)';

  expectObservable(delayedSource$).toBe(delayedExpected);
});

Types

/** Hot test observable created from a marble diagram. */
interface HotObservable<T> extends Observable<T> {
  /** Log of subscriptions made to this observable; pass it to `expectSubscriptions`. */
  subscriptions: SubscriptionLog[];
}

/** Cold test observable created from a marble diagram. */
interface ColdObservable<T> extends Observable<T> {
  /** Log of subscriptions made to this observable; pass it to `expectSubscriptions`. */
  subscriptions: SubscriptionLog[];
}

/** One recorded subscription, expressed in virtual-time frames. */
interface SubscriptionLog {
  // presumably the frame of the '^' marker in a subscription diagram — confirm
  subscribedFrame: number;
  // presumably the frame of the '!' marker in a subscription diagram — confirm
  unsubscribedFrame: number;
}

/** Returned by `expectObservable`; registers what the observable must produce. */
interface Expectation<T> {
  /** Expect the notifications described by a marble diagram, e.g. `toBe('--x--|', { x: 2 })`. */
  toBe(marbles: string, values?: any, errorValue?: any): void;
  // NOTE(review): presumably expects the same notifications as `other` — confirm
  toEqual(other: Observable<any>): void;
}

/** Returned by `expectSubscriptions`. */
interface SubscriptionExpectation {
  /** Expect the subscription timing(s) described by marble diagrams such as `'^----!'`. */
  toBe(marbles: string | string[]): void;
}

Install with Tessl CLI

npx tessl i tessl/npm-rxjs

docs

ajax-operations.md

combination-operators.md

core-types.md

error-handling.md

fetch-operations.md

filtering-operators.md

index.md

observable-creation.md

schedulers.md

subjects.md

testing-utilities.md

transformation-operators.md

websocket-operations.md

tile.json