Library for composing asynchronous and event-based operations in JavaScript using Observable sequences and fluent query operators
—
Testing utilities for unit testing reactive code with virtual time and marble testing.
Virtual time scheduler for deterministic testing of time-based operations.
/**
* Creates a virtual time scheduler for testing
* @constructor
* @returns {TestScheduler} New test scheduler instance
*/
function TestScheduler();
/**
* Creates a hot observable for testing
* @param {...Object} records - Array of recorded events
* @returns {Observable} Hot observable with specified events
*/
testScheduler.createHotObservable = function(...records);
/**
* Creates a cold observable for testing
* @param {...Object} records - Array of recorded events
* @returns {Observable} Cold observable with specified events
*/
testScheduler.createColdObservable = function(...records);
/**
* Creates a test observer
* @returns {Observer} Test observer that records events
*/
testScheduler.createObserver = function();
/**
* Starts a test with specified timing
* @param {function} create - Function that creates the observable to test
* @param {number} [created] - Time when observable is created (default: 100)
* @param {number} [subscribed] - Time when subscription occurs (default: 200)
* @param {number} [disposed] - Time when subscription is disposed (default: 1000)
* @returns {Object} Test result with messages and subscriptions
*/
testScheduler.start = function(create, created, subscribed, disposed);Static utilities for creating test records and timing.
/**
* Default timing constants for tests
*/
Rx.ReactiveTest.created = 100;
Rx.ReactiveTest.subscribed = 200;
Rx.ReactiveTest.disposed = 1000;
/**
* Creates an OnNext test record
* @param {number} ticks - Time when value is emitted
* @param {*} value - Value to emit
* @returns {Object} OnNext test record
*/
Rx.ReactiveTest.onNext = function(ticks, value);
/**
* Creates an OnError test record
* @param {number} ticks - Time when error occurs
* @param {*} error - Error to emit
* @returns {Object} OnError test record
*/
Rx.ReactiveTest.onError = function(ticks, error);
/**
* Creates an OnCompleted test record
* @param {number} ticks - Time when completion occurs
* @returns {Object} OnCompleted test record
*/
Rx.ReactiveTest.onCompleted = function(ticks);
/**
* Creates a subscription record
* @param {number} subscribe - Time when subscription starts
* @param {number} [unsubscribe] - Time when subscription ends
* @returns {Object} Subscription test record
*/
Rx.ReactiveTest.subscribe = function(subscribe, unsubscribe);// Example test using TestScheduler
var scheduler = new Rx.TestScheduler();
// Create test data
var hotObservable = scheduler.createHotObservable(
Rx.ReactiveTest.onNext(150, 1), // Before subscription
Rx.ReactiveTest.onNext(210, 2), // After subscription
Rx.ReactiveTest.onNext(220, 3),
Rx.ReactiveTest.onCompleted(230)
);
var coldObservable = scheduler.createColdObservable(
Rx.ReactiveTest.onNext(10, 'a'),
Rx.ReactiveTest.onNext(20, 'b'),
Rx.ReactiveTest.onCompleted(30)
);
// Test the observable
var result = scheduler.start(function() {
return hotObservable.map(function(x) { return x * 2; });
});
// Assert results
assert.deepEqual(result.messages, [
Rx.ReactiveTest.onNext(210, 4), // 2 * 2
Rx.ReactiveTest.onNext(220, 6), // 3 * 2
Rx.ReactiveTest.onCompleted(230)
]);Testing time-based operations:
var scheduler = new Rx.TestScheduler();
var result = scheduler.start(function() {
return Rx.Observable.interval(100, scheduler)
.take(3)
.map(function(x) { return x + 1; });
});
assert.deepEqual(result.messages, [
Rx.ReactiveTest.onNext(300, 1), // 200 + 100 interval
Rx.ReactiveTest.onNext(400, 2), // 200 + 200 interval
Rx.ReactiveTest.onNext(500, 3), // 200 + 300 interval
Rx.ReactiveTest.onCompleted(500)
]);Observers that record all events for assertion:
/**
* Test observer that records all events
* @constructor
* @returns {TestObserver} Test observer instance
*/
function TestObserver();
// Properties available on test observer
testObserver.messages; // Array of recorded events
testObserver.subscriptions; // Array of subscription recordsUsage Example:
var scheduler = new Rx.TestScheduler();
var observer = scheduler.createObserver();
var source = scheduler.createHotObservable(
Rx.ReactiveTest.onNext(210, 1),
Rx.ReactiveTest.onNext(220, 2),
Rx.ReactiveTest.onCompleted(230)
);
source.subscribe(observer);
scheduler.start();
// Check recorded messages
console.log(observer.messages);
// Output: [onNext(210, 1), onNext(220, 2), onCompleted(230)]While RxJS 4 doesn't have built-in marble testing, the patterns can be simulated:
// Simulate marble testing with helper functions
function parseMarbles(marbles, values, scheduler) {
var result = [];
var time = 0;
for (var i = 0; i < marbles.length; i++) {
var char = marbles[i];
if (char === '-') {
time += 10; // Frame advance
} else if (char === '|') {
result.push(Rx.ReactiveTest.onCompleted(time));
} else if (char === 'x') {
result.push(Rx.ReactiveTest.onError(time, new Error('test error')));
} else if (values[char]) {
result.push(Rx.ReactiveTest.onNext(time, values[char]));
time += 10;
}
}
return scheduler.createColdObservable.apply(scheduler, result);
}
// Usage
var scheduler = new Rx.TestScheduler();
var source = parseMarbles('a-b-c|', {a: 1, b: 2, c: 3}, scheduler);Testing error scenarios:
var scheduler = new Rx.TestScheduler();
var result = scheduler.start(function() {
return Rx.Observable.throw(new Error('test error'), scheduler);
});
assert.deepEqual(result.messages, [
Rx.ReactiveTest.onError(200, Error('test error'))
]);Testing subscription timing:
var scheduler = new Rx.TestScheduler();
var source = scheduler.createHotObservable(/* events */);
var subscription = source.subscribe(observer);
scheduler.schedule(null, 250, function() {
subscription.dispose();
});
scheduler.start();
// Check subscription was disposed at correct time
assert.equal(observer.subscriptions[0].unsubscribe, 250);Data structures for test records and subscription timing.
/**
* Represents a recorded notification with timing
* @param {number} time - Time when notification occurred
* @param {Object} value - Notification object (OnNext, OnError, or OnCompleted)
* @returns {Recorded} Recorded notification
*/
function Recorded(time, value);
/**
* Represents a subscription lifetime
* @param {number} subscribe - Time when subscription started
* @param {number} [unsubscribe] - Time when subscription ended
* @returns {Subscription} Subscription record
*/
function Subscription(subscribe, unsubscribe);Additional methods for complex testing scenarios.
/**
* Schedules an action to run at absolute time
* @param {*} state - State to pass to action
* @param {number} dueTime - Absolute time to run action
* @param {function} action - Action to execute
* @returns {Disposable} Disposable for canceling the action
*/
testScheduler.scheduleAbsolute = function(state, dueTime, action);
/**
* Schedules an action to run after relative time
* @param {*} state - State to pass to action
* @param {number} dueTime - Relative time to run action
* @param {function} action - Action to execute
* @returns {Disposable} Disposable for canceling the action
*/
testScheduler.scheduleRelative = function(state, dueTime, action);
/**
* Schedules a recursive action
* @param {*} state - State to pass to action
* @param {number} dueTime - Time to run action
* @param {function} action - Recursive action to execute
* @returns {Disposable} Disposable for canceling the action
*/
testScheduler.scheduleRecursiveAbsolute = function(state, dueTime, action);
/**
* Advances the scheduler clock to specified time
* @param {number} time - Time to advance to
*/
testScheduler.advanceTo = function(time);
/**
* Advances the scheduler clock by specified amount
* @param {number} time - Amount of time to advance
*/
testScheduler.advanceBy = function(time);
/**
* Starts the scheduler and runs all scheduled actions
*/
testScheduler.start = function();
/**
* Stops the scheduler
*/
testScheduler.stop = function();
/**
* Gets the current virtual time
* @returns {number} Current virtual time
*/
testScheduler.clock = number;Observer implementation for testing that records all events.
/**
* Creates a mock observer that records events
* @param {TestScheduler} scheduler - Test scheduler to use for timing
* @returns {MockObserver} Mock observer instance
*/
function MockObserver(scheduler);
// Mock observer properties
mockObserver.messages; // Array of recorded Recorded objects
mockObserver.isDisposed; // Whether observer is disposed
// Observer interface methods
mockObserver.onNext = function(value);
mockObserver.onError = function(error);
mockObserver.onCompleted = function();Base class for test scheduler with virtual time support.
/**
* Base virtual time scheduler
* @param {number} [initialClock] - Initial virtual time
* @returns {VirtualTimeScheduler} Virtual time scheduler
*/
function VirtualTimeScheduler(initialClock);
// Virtual time scheduler methods
virtualScheduler.now = function(); // Current virtual time
virtualScheduler.scheduleAbsolute = function(state, dueTime, action);
virtualScheduler.scheduleRelative = function(state, dueTime, action);
virtualScheduler.start = function();
virtualScheduler.stop = function();
virtualScheduler.advanceTo = function(time);
virtualScheduler.advanceBy = function(time);Scheduler for testing with historical time data.
/**
* Creates a historical scheduler that compares against historical data
* @param {number} initialClock - Initial historical time
* @param {function} comparer - Function to compare historical data
* @returns {HistoricalScheduler} Historical scheduler
*/
function HistoricalScheduler(initialClock, comparer);Key patterns for effective testing:
// 1. Use consistent timing constants
var T = Rx.ReactiveTest;
var scheduler = new Rx.TestScheduler();
// 2. Test both hot and cold observables
var hot = scheduler.createHotObservable(
T.onNext(150, 1), // Before subscription
T.onNext(210, 2), // After subscription
T.onCompleted(250)
);
var cold = scheduler.createColdObservable(
T.onNext(10, 'a'), // Relative to subscription
T.onNext(20, 'b'),
T.onCompleted(30)
);
// 3. Test error scenarios
var errorSource = scheduler.createColdObservable(
T.onNext(10, 1),
T.onError(20, new Error('test'))
);
// 4. Test subscription lifecycle
var result = scheduler.start(
function() { return source.take(2); },
100, // Created
200, // Subscribed
300 // Disposed
);
// 5. Assert both values and timing
assert.deepEqual(result.messages, expectedMessages);
assert.deepEqual(source.subscriptions, expectedSubscriptions);