Testing support library providing utilities for verifying reactive stream behavior in Project Reactor applications
—
VirtualTimeScheduler enables time manipulation in reactive tests by replacing real time with a controllable virtual clock. This allows testing of time-based operations like delays, timeouts, and intervals without actual waiting.
Factory methods for creating VirtualTimeScheduler instances.
class VirtualTimeScheduler implements Scheduler {
/** Create new VirtualTimeScheduler (not enabled globally) */
static VirtualTimeScheduler create();
/** Create with optional deferred time operations */
static VirtualTimeScheduler create(boolean defer);
}Usage Examples:
import reactor.test.scheduler.VirtualTimeScheduler;
// Create virtual time scheduler
VirtualTimeScheduler vts = VirtualTimeScheduler.create();
// Use with specific operations
Flux<String> delayed = Flux.just("hello")
.delayElements(Duration.ofHours(1), vts);
// Time doesn't advance automatically - you control it
vts.advanceTimeBy(Duration.ofHours(1));Static methods for managing the global default scheduler replacement.
class VirtualTimeScheduler {
/** Get existing or create and set as default scheduler */
static VirtualTimeScheduler getOrSet();
/** Get or create with deferred option */
static VirtualTimeScheduler getOrSet(boolean defer);
/** Set specific scheduler if none exists */
static VirtualTimeScheduler getOrSet(VirtualTimeScheduler scheduler);
/** Force set scheduler as default */
static VirtualTimeScheduler set(VirtualTimeScheduler scheduler);
/** Get current VirtualTimeScheduler (throws if not set) */
static VirtualTimeScheduler get() throws IllegalStateException;
/** Check if VTS is currently enabled in Schedulers factory */
static boolean isFactoryEnabled();
/** Reset to original schedulers */
static void reset();
}Usage Examples:
// Enable virtual time globally
VirtualTimeScheduler vts = VirtualTimeScheduler.getOrSet();
// Now all time-based operations use virtual time
Flux<String> delayed = Flux.just("hello")
.delayElements(Duration.ofMinutes(30)); // Uses virtual time
// Control time advancement
vts.advanceTimeBy(Duration.ofMinutes(30));
// Reset when done
VirtualTimeScheduler.reset();
// Check if virtual time is enabled
if (VirtualTimeScheduler.isFactoryEnabled()) {
// Virtual time operations
} else {
// Real time operations
}Methods for manipulating virtual time progression.
class VirtualTimeScheduler {
/** Trigger all pending tasks at current virtual time */
void advanceTime();
/** Advance virtual clock by the specified duration */
void advanceTimeBy(Duration delayTime);
/** Advance virtual clock to specific instant */
void advanceTimeTo(Instant targetTime);
}Usage Examples:
VirtualTimeScheduler vts = VirtualTimeScheduler.create();
// Schedule tasks at different times
vts.schedule(() -> System.out.println("Task 1"), 1, TimeUnit.HOURS);
vts.schedule(() -> System.out.println("Task 2"), 2, TimeUnit.HOURS);
vts.schedule(() -> System.out.println("Task 3"), 3, TimeUnit.HOURS);
// Initially no tasks execute
assert vts.getScheduledTaskCount() == 3;
// Advance by 1 hour - executes Task 1
vts.advanceTimeBy(Duration.ofHours(1));
assert vts.getScheduledTaskCount() == 2;
// Advance by another hour - executes Task 2
vts.advanceTimeBy(Duration.ofHours(1));
assert vts.getScheduledTaskCount() == 1;
// Advance to specific time - executes Task 3
vts.advanceTimeTo(Instant.now().plus(Duration.ofHours(3)));
assert vts.getScheduledTaskCount() == 0;
// Trigger any remaining tasks at current time
vts.advanceTime();Methods for inspecting scheduler state.
class VirtualTimeScheduler {
/** Get number of currently scheduled tasks */
long getScheduledTaskCount();
}Standard Scheduler interface implementation for task scheduling.
class VirtualTimeScheduler implements Scheduler {
/** Create new worker for this scheduler */
Worker createWorker();
/** Schedule immediate task */
Disposable schedule(Runnable task);
/** Schedule delayed task */
Disposable schedule(Runnable task, long delay, TimeUnit unit);
/** Schedule periodic task */
Disposable schedulePeriodically(
Runnable task,
long initialDelay,
long period,
TimeUnit unit
);
/** Get current virtual time */
long now(TimeUnit unit);
/** Check if scheduler is disposed */
boolean isDisposed();
/** Dispose scheduler and cancel all tasks */
void dispose();
}VirtualTimeScheduler integrates seamlessly with StepVerifier for time-based testing:
@Test
public void testDelayedSequence() {
StepVerifier.withVirtualTime(() ->
Flux.just("a", "b", "c")
.delayElements(Duration.ofMinutes(1))
)
.expectSubscription()
.expectNoEvent(Duration.ofMinutes(1)) // No events for 1 minute
.expectNext("a")
.expectNoEvent(Duration.ofMinutes(1)) // Wait another minute
.expectNext("b")
.expectNoEvent(Duration.ofMinutes(1)) // Wait another minute
.expectNext("c")
.expectComplete()
.verify();
}
@Test
public void testTimeout() {
StepVerifier.withVirtualTime(() ->
Flux.never().timeout(Duration.ofSeconds(5))
)
.expectSubscription()
.expectNoEvent(Duration.ofSeconds(5)) // Wait for timeout
.expectError(TimeoutException.class)
.verify();
}
@Test
public void testInterval() {
StepVerifier.withVirtualTime(() ->
Flux.interval(Duration.ofHours(1)).take(3)
)
.expectSubscription()
.expectNoEvent(Duration.ofHours(1))
.expectNext(0L)
.expectNoEvent(Duration.ofHours(1))
.expectNext(1L)
.expectNoEvent(Duration.ofHours(1))
.expectNext(2L)
.expectComplete()
.verify();
}For complex time-based testing scenarios:
@Test
public void testComplexTimeBasedBehavior() {
VirtualTimeScheduler vts = VirtualTimeScheduler.create();
// Create time-sensitive publisher
Flux<String> timedFlux = Flux.interval(Duration.ofMinutes(10), vts)
.map(i -> "Event " + i)
.take(5);
TestSubscriber<String> subscriber = TestSubscriber.create();
timedFlux.subscribe(subscriber);
// No events initially
assert subscriber.getReceivedOnNext().isEmpty();
// Advance time and check events
vts.advanceTimeBy(Duration.ofMinutes(10));
assert subscriber.getReceivedOnNext().size() == 1;
assert subscriber.getReceivedOnNext().get(0).equals("Event 0");
vts.advanceTimeBy(Duration.ofMinutes(30)); // Advance 3 more intervals
assert subscriber.getReceivedOnNext().size() == 4;
vts.advanceTimeBy(Duration.ofMinutes(10)); // Final interval
assert subscriber.getReceivedOnNext().size() == 5;
assert subscriber.isTerminatedComplete();
}Combine with other testing utilities for race condition testing:
@Test
public void testTimeBasedRaceCondition() {
VirtualTimeScheduler vts = VirtualTimeScheduler.getOrSet();
try {
AtomicInteger counter = new AtomicInteger(0);
// Schedule competing tasks
Flux.interval(Duration.ofMillis(100), vts)
.take(10)
.subscribe(i -> counter.incrementAndGet());
Flux.interval(Duration.ofMillis(150), vts)
.take(7)
.subscribe(i -> counter.addAndGet(2));
// Advance time to let all tasks complete
vts.advanceTimeBy(Duration.ofSeconds(2));
// Verify expected interactions
assert counter.get() == 10 + (7 * 2); // 10 from first, 14 from second
} finally {
VirtualTimeScheduler.reset();
}
}Test backpressure behavior in time-based scenarios:
@Test
public void testBackpressureWithTime() {
VirtualTimeScheduler vts = VirtualTimeScheduler.create();
// Fast producer with backpressure
Flux<Long> fastProducer = Flux.interval(Duration.ofMillis(10), vts)
.onBackpressureBuffer(5); // Small buffer
TestSubscriber<Long> slowSubscriber = TestSubscriber.builder()
.initialRequest(1) // Slow consumer
.build();
fastProducer.subscribe(slowSubscriber);
// Advance time quickly - should cause backpressure
vts.advanceTimeBy(Duration.ofMillis(100)); // Would generate 10 items
// But subscriber only requested 1
assert slowSubscriber.getReceivedOnNext().size() == 1;
// Request more
slowSubscriber.request(3);
assert slowSubscriber.getReceivedOnNext().size() == 4;
// Continue advancing time
vts.advanceTimeBy(Duration.ofMillis(50));
slowSubscriber.request(10);
// Verify buffer overflow handling
List<String> protocolErrors = slowSubscriber.getProtocolErrors();
// May contain backpressure-related errors if buffer overflowed
}Proper cleanup when using VirtualTimeScheduler:
@Test
public void testWithProperCleanup() {
VirtualTimeScheduler vts = null;
try {
vts = VirtualTimeScheduler.getOrSet();
// Test time-based operations
Flux<String> delayed = Flux.just("test")
.delayElements(Duration.ofSeconds(1));
StepVerifier.create(delayed)
.then(() -> vts.advanceTimeBy(Duration.ofSeconds(1)))
.expectNext("test")
.expectComplete()
.verify();
} finally {
// Always reset to avoid affecting other tests
VirtualTimeScheduler.reset();
// Dispose if created manually
if (vts != null && !vts.isDisposed()) {
vts.dispose();
}
}
}Install with Tessl CLI
npx tessl i tessl/maven-io-projectreactor--reactor-test