CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-io-projectreactor--reactor-test

Testing support library providing utilities for verifying reactive stream behavior in Project Reactor applications

Pending
Overview
Eval results
Files

virtual-time.mddocs/

Virtual Time Control

VirtualTimeScheduler enables time manipulation in reactive tests by replacing real time with a controllable virtual clock. This allows testing of time-based operations like delays, timeouts, and intervals without actual waiting.

Capabilities

Creating VirtualTimeSchedulers

Factory methods for creating VirtualTimeScheduler instances.

class VirtualTimeScheduler implements Scheduler {
    /** Create new VirtualTimeScheduler (not enabled globally) */
    static VirtualTimeScheduler create();
    
    /** Create with optional deferred time operations */
    static VirtualTimeScheduler create(boolean defer);
}

Usage Examples:

import reactor.test.scheduler.VirtualTimeScheduler;

// Create virtual time scheduler
VirtualTimeScheduler vts = VirtualTimeScheduler.create();

// Use with specific operations
Flux<String> delayed = Flux.just("hello")
    .delayElements(Duration.ofHours(1), vts);

// Time doesn't advance automatically - you control it
vts.advanceTimeBy(Duration.ofHours(1));

Global Scheduler Control

Static methods for managing the global default scheduler replacement.

class VirtualTimeScheduler {
    /** Get existing or create and set as default scheduler */
    static VirtualTimeScheduler getOrSet();
    
    /** Get or create with deferred option */
    static VirtualTimeScheduler getOrSet(boolean defer);
    
    /** Set specific scheduler if none exists */
    static VirtualTimeScheduler getOrSet(VirtualTimeScheduler scheduler);
    
    /** Force set scheduler as default */
    static VirtualTimeScheduler set(VirtualTimeScheduler scheduler);
    
    /** Get current VirtualTimeScheduler (throws if not set) */
    static VirtualTimeScheduler get() throws IllegalStateException;
    
    /** Check if VTS is currently enabled in Schedulers factory */
    static boolean isFactoryEnabled();
    
    /** Reset to original schedulers */
    static void reset();
}

Usage Examples:

// Enable virtual time globally
VirtualTimeScheduler vts = VirtualTimeScheduler.getOrSet();

// Now all time-based operations use virtual time
Flux<String> delayed = Flux.just("hello")
    .delayElements(Duration.ofMinutes(30)); // Uses virtual time

// Control time advancement
vts.advanceTimeBy(Duration.ofMinutes(30));

// Reset when done
VirtualTimeScheduler.reset();

// Check if virtual time is enabled
if (VirtualTimeScheduler.isFactoryEnabled()) {
    // Virtual time operations
} else {
    // Real time operations
}

Time Control Methods

Methods for manipulating virtual time progression.

class VirtualTimeScheduler {
    /** Trigger all pending tasks at current virtual time */
    void advanceTime();
    
    /** Advance virtual clock by the specified duration */
    void advanceTimeBy(Duration delayTime);
    
    /** Advance virtual clock to specific instant */
    void advanceTimeTo(Instant targetTime);
}

Usage Examples:

VirtualTimeScheduler vts = VirtualTimeScheduler.create();

// Schedule tasks at different times
vts.schedule(() -> System.out.println("Task 1"), 1, TimeUnit.HOURS);
vts.schedule(() -> System.out.println("Task 2"), 2, TimeUnit.HOURS);
vts.schedule(() -> System.out.println("Task 3"), 3, TimeUnit.HOURS);

// Initially no tasks execute
assert vts.getScheduledTaskCount() == 3;

// Advance by 1 hour - executes Task 1
vts.advanceTimeBy(Duration.ofHours(1));
assert vts.getScheduledTaskCount() == 2;

// Advance by another hour - executes Task 2  
vts.advanceTimeBy(Duration.ofHours(1));
assert vts.getScheduledTaskCount() == 1;

// Advance to specific time - executes Task 3
vts.advanceTimeTo(Instant.now().plus(Duration.ofHours(3)));
assert vts.getScheduledTaskCount() == 0;

// Trigger any remaining tasks at current time
vts.advanceTime();

Query Methods

Methods for inspecting scheduler state.

class VirtualTimeScheduler {
    /** Get number of currently scheduled tasks */
    long getScheduledTaskCount();
}

Scheduler Implementation

Standard Scheduler interface implementation for task scheduling.

class VirtualTimeScheduler implements Scheduler {
    /** Create new worker for this scheduler */
    Worker createWorker();
    
    /** Schedule immediate task */
    Disposable schedule(Runnable task);
    
    /** Schedule delayed task */
    Disposable schedule(Runnable task, long delay, TimeUnit unit);
    
    /** Schedule periodic task */
    Disposable schedulePeriodically(
        Runnable task,
        long initialDelay, 
        long period, 
        TimeUnit unit
    );
    
    /** Get current virtual time */
    long now(TimeUnit unit);
    
    /** Check if scheduler is disposed */
    boolean isDisposed();
    
    /** Dispose scheduler and cancel all tasks */
    void dispose();
}

Integration with StepVerifier

VirtualTimeScheduler integrates seamlessly with StepVerifier for time-based testing:

@Test
public void testDelayedSequence() {
    StepVerifier.withVirtualTime(() ->
        Flux.just("a", "b", "c")
            .delayElements(Duration.ofMinutes(1))
    )
    .expectSubscription()
    .expectNoEvent(Duration.ofMinutes(1))  // No events for 1 minute
    .expectNext("a")
    .expectNoEvent(Duration.ofMinutes(1))  // Wait another minute
    .expectNext("b") 
    .expectNoEvent(Duration.ofMinutes(1))  // Wait another minute
    .expectNext("c")
    .expectComplete()
    .verify();
}

@Test
public void testTimeout() {
    StepVerifier.withVirtualTime(() ->
        Flux.never().timeout(Duration.ofSeconds(5))
    )
    .expectSubscription()
    .expectNoEvent(Duration.ofSeconds(5))  // Wait for timeout
    .expectError(TimeoutException.class)
    .verify();
}

@Test
public void testInterval() {
    StepVerifier.withVirtualTime(() ->
        Flux.interval(Duration.ofHours(1)).take(3)
    )
    .expectSubscription()
    .expectNoEvent(Duration.ofHours(1))
    .expectNext(0L)
    .expectNoEvent(Duration.ofHours(1)) 
    .expectNext(1L)
    .expectNoEvent(Duration.ofHours(1))
    .expectNext(2L)
    .expectComplete()
    .verify();
}

Advanced Usage Patterns

Manual Time Control

For complex time-based testing scenarios:

@Test
public void testComplexTimeBasedBehavior() {
    VirtualTimeScheduler vts = VirtualTimeScheduler.create();
    
    // Create time-sensitive publisher
    Flux<String> timedFlux = Flux.interval(Duration.ofMinutes(10), vts)
        .map(i -> "Event " + i)
        .take(5);
    
    TestSubscriber<String> subscriber = TestSubscriber.create();
    timedFlux.subscribe(subscriber);
    
    // No events initially
    assert subscriber.getReceivedOnNext().isEmpty();
    
    // Advance time and check events
    vts.advanceTimeBy(Duration.ofMinutes(10));
    assert subscriber.getReceivedOnNext().size() == 1;
    assert subscriber.getReceivedOnNext().get(0).equals("Event 0");
    
    vts.advanceTimeBy(Duration.ofMinutes(30)); // Advance 3 more intervals
    assert subscriber.getReceivedOnNext().size() == 4;
    
    vts.advanceTimeBy(Duration.ofMinutes(10)); // Final interval
    assert subscriber.getReceivedOnNext().size() == 5;
    assert subscriber.isTerminatedComplete();
}

Testing Race Conditions with Time

Combine with other testing utilities for race condition testing:

@Test
public void testTimeBasedRaceCondition() {
    VirtualTimeScheduler vts = VirtualTimeScheduler.getOrSet();
    
    try {
        AtomicInteger counter = new AtomicInteger(0);
        
        // Schedule competing tasks
        Flux.interval(Duration.ofMillis(100), vts)
            .take(10)
            .subscribe(i -> counter.incrementAndGet());
            
        Flux.interval(Duration.ofMillis(150), vts)
            .take(7)
            .subscribe(i -> counter.addAndGet(2));
        
        // Advance time to let all tasks complete
        vts.advanceTimeBy(Duration.ofSeconds(2));
        
        // Verify expected interactions
        assert counter.get() == 10 + (7 * 2); // 10 from first, 14 from second
        
    } finally {
        VirtualTimeScheduler.reset();
    }
}

Testing Backpressure with Time

Test backpressure behavior in time-based scenarios:

@Test  
public void testBackpressureWithTime() {
    VirtualTimeScheduler vts = VirtualTimeScheduler.create();
    
    // Fast producer with backpressure
    Flux<Long> fastProducer = Flux.interval(Duration.ofMillis(10), vts)
        .onBackpressureBuffer(5); // Small buffer
    
    TestSubscriber<Long> slowSubscriber = TestSubscriber.builder()
        .initialRequest(1) // Slow consumer
        .build();
    
    fastProducer.subscribe(slowSubscriber);
    
    // Advance time quickly - should cause backpressure
    vts.advanceTimeBy(Duration.ofMillis(100)); // Would generate 10 items
    
    // But subscriber only requested 1
    assert slowSubscriber.getReceivedOnNext().size() == 1;
    
    // Request more
    slowSubscriber.request(3);
    assert slowSubscriber.getReceivedOnNext().size() == 4;
    
    // Continue advancing time
    vts.advanceTimeBy(Duration.ofMillis(50));
    slowSubscriber.request(10);
    
    // Verify buffer overflow handling
    List<String> protocolErrors = slowSubscriber.getProtocolErrors();
    // May contain backpressure-related errors if buffer overflowed
}

Cleanup and Resource Management

Proper cleanup when using VirtualTimeScheduler:

@Test
public void testWithProperCleanup() {
    VirtualTimeScheduler vts = null;
    
    try {
        vts = VirtualTimeScheduler.getOrSet();
        
        // Test time-based operations
        Flux<String> delayed = Flux.just("test")
            .delayElements(Duration.ofSeconds(1));
            
        StepVerifier.create(delayed)
            .then(() -> vts.advanceTimeBy(Duration.ofSeconds(1)))
            .expectNext("test")
            .expectComplete()
            .verify();
            
    } finally {
        // Always reset to avoid affecting other tests
        VirtualTimeScheduler.reset();
        
        // Dispose if created manually
        if (vts != null && !vts.isDisposed()) {
            vts.dispose();
        }
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-io-projectreactor--reactor-test

docs

index.md

publisher-probe.md

step-verifier.md

test-publisher.md

test-subscriber.md

testing-utilities.md

virtual-time.md

tile.json