CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-io-projectreactor--reactor-test

Testing support library providing utilities for verifying reactive stream behavior in Project Reactor applications

Pending
Overview
Eval results
Files

testing-utilities.mddocs/

Testing Utilities

Additional testing utilities for advanced scenarios including race condition testing, log output verification, and custom formatting support.

Capabilities

Race Condition Testing

RaceTestUtils provides utilities for testing race conditions by synchronizing concurrent operations.

class RaceTestUtils {
    /** Generic race condition testing with state management */
    static <T> T race(
        T initial,
        Function<? super T, ? extends T> race,
        Predicate<? super T> stopRace,
        BiPredicate<? super T, ? super T> terminate
    );
    
    /** Synchronize execution of multiple Runnables */
    static void race(Runnable... runnables);
    
    /** Synchronize two Runnables (binary compatibility) */
    static void race(Runnable r1, Runnable r2);
    
    /** Race on specific scheduler */
    static void race(Scheduler scheduler, Runnable... runnables);
    
    /** Race with configurable timeout */
    static void race(int timeoutSeconds, Scheduler scheduler, Runnable... runnables);
}

Usage Examples:

import reactor.test.util.RaceTestUtils;

@Test
public void testConcurrentModification() {
    AtomicInteger counter = new AtomicInteger(0);
    List<Integer> results = new CopyOnWriteArrayList<>();
    
    // Race multiple operations
    RaceTestUtils.race(
        () -> results.add(counter.incrementAndGet()),
        () -> results.add(counter.incrementAndGet()),
        () -> results.add(counter.incrementAndGet())
    );
    
    // Verify all operations completed
    assert results.size() == 3;
    assert counter.get() == 3;
    // Order may vary due to race conditions
}

@Test
public void testPublisherConcurrency() {
    TestPublisher<String> publisher = TestPublisher.create();
    AtomicReference<String> result1 = new AtomicReference<>();
    AtomicReference<String> result2 = new AtomicReference<>();
    
    // Setup subscribers
    publisher.flux().subscribe(result1::set);
    publisher.flux().subscribe(result2::set);
    
    // Race signal emission and subscription
    RaceTestUtils.race(
        () -> publisher.next("value1"),
        () -> publisher.next("value2")
    );
    
    // Both subscribers should receive a value
    assert result1.get() != null;
    assert result2.get() != null;
}

@Test
public void testSchedulerRacing() {
    Scheduler testScheduler = Schedulers.parallel();
    AtomicInteger executionCount = new AtomicInteger(0);
    
    // Race on specific scheduler
    RaceTestUtils.race(testScheduler,
        () -> executionCount.incrementAndGet(),
        () -> executionCount.incrementAndGet(),
        () -> executionCount.incrementAndGet()
    );
    
    assert executionCount.get() == 3;
}

State-Based Race Testing

Advanced race testing with state management:

@Test
public void testStatefulRaceConditions() {
    // Test concurrent map updates
    Map<String, Integer> map = new ConcurrentHashMap<>();
    
    String result = RaceTestUtils.race(
        "initial",  // Initial state
        state -> {  // Race function
            map.put("key", map.getOrDefault("key", 0) + 1);
            return state + "_updated";
        },
        state -> state.length() > 50, // Stop condition
        (prev, curr) -> !prev.equals(curr) // Termination condition
    );
    
    // Verify final state
    assert map.containsKey("key");
    assert result.contains("updated");
}

Log Output Verification

TestLogger and LoggerUtils provide log capture and verification capabilities.

class TestLogger implements Logger {
    /** Default constructor with thread name logging enabled */
    TestLogger();
    
    /** Constructor with thread name option */
    TestLogger(boolean logCurrentThreadName);
    
    /** Get error stream content as string */
    String getErrContent();
    
    /** Get output stream content as string */
    String getOutContent();
    
    /** Clear both output buffers */
    void reset();
    
    /** Check thread name logging setting */
    boolean isLogCurrentThreadName();
    
    // Standard Logger methods
    void trace(String msg);
    void debug(String msg);
    void info(String msg); 
    void warn(String msg);
    void error(String msg);
    
    // Formatted logging
    void trace(String format, Object... arguments);
    void debug(String format, Object... arguments);
    void info(String format, Object... arguments);
    void warn(String format, Object... arguments);
    void error(String format, Object... arguments);
    
    // With throwable
    void trace(String msg, Throwable t);
    void debug(String msg, Throwable t);
    void info(String msg, Throwable t);
    void warn(String msg, Throwable t);
    void error(String msg, Throwable t);
    
    // Level checks
    boolean isTraceEnabled();
    boolean isDebugEnabled();
    boolean isInfoEnabled();
    boolean isWarnEnabled();
    boolean isErrorEnabled();
    
    String getName();
}

Usage Examples:

import reactor.test.util.TestLogger;

@Test
public void testLoggingOutput() {
    TestLogger logger = new TestLogger();
    
    // Simulate logging
    logger.info("Processing started");
    logger.warn("Low memory warning");
    logger.error("Processing failed", new RuntimeException("Test error"));
    
    // Verify log output
    String output = logger.getOutContent();
    assert output.contains("Processing started");
    assert output.contains("Low memory warning");
    
    String errorOutput = logger.getErrContent();
    assert errorOutput.contains("Processing failed");
    assert errorOutput.contains("RuntimeException");
    assert errorOutput.contains("Test error");
    
    // Reset for next test
    logger.reset();
    assert logger.getOutContent().isEmpty();
    assert logger.getErrContent().isEmpty();
}

@Test
public void testThreadNameLogging() {
    TestLogger withThreadName = new TestLogger(true);
    TestLogger withoutThreadName = new TestLogger(false);
    
    withThreadName.info("Test message");
    withoutThreadName.info("Test message");
    
    String withThread = withThreadName.getOutContent();
    String withoutThread = withoutThreadName.getOutContent();
    
    // Thread name logger includes thread info
    assert withThread.contains(Thread.currentThread().getName());
    assert !withoutThread.contains(Thread.currentThread().getName());
}

Logger Utilities

LoggerUtils provides global log capture for Reactor's internal logging.

class LoggerUtils {
    /** Install capturing logger factory */
    static void useCurrentLoggersWithCapture();
    
    /** Enable log capture to specific logger */
    static void enableCaptureWith(Logger testLogger);
    
    /** Enable capture with optional redirect to original */
    static void enableCaptureWith(Logger testLogger, boolean redirectToOriginal);
    
    /** Disable log capture and restore original factory */
    static void disableCapture();
}

Usage Examples:

import reactor.test.util.LoggerUtils;

@Test
public void testReactorInternalLogging() {
    TestLogger testLogger = new TestLogger();
    
    try {
        // Capture Reactor's internal logs
        LoggerUtils.enableCaptureWith(testLogger);
        
        // Perform operations that generate internal logs
        Flux.range(1, 10)
            .log() // This will generate internal log messages
            .subscribe();
        
        // Verify internal logs were captured
        String logOutput = testLogger.getOutContent();
        assert logOutput.contains("onSubscribe");
        assert logOutput.contains("request");
        assert logOutput.contains("onNext");
        assert logOutput.contains("onComplete");
        
    } finally {
        // Always restore original logging
        LoggerUtils.disableCapture();
    }
}

@Test
public void testLogCaptureWithRedirect() {
    TestLogger testLogger = new TestLogger();
    
    try {
        // Capture and also redirect to original loggers
        LoggerUtils.enableCaptureWith(testLogger, true);
        
        // Operations will log to both test logger and console
        Flux.just("test")
            .doOnNext(v -> System.out.println("Processing: " + v))
            .log()
            .subscribe();
        
        // Verify capture
        assert !testLogger.getOutContent().isEmpty();
        
    } finally {
        LoggerUtils.disableCapture();
    }
}

Value Formatting

ValueFormatters provides utilities for custom value display in test output.

class ValueFormatters {
    /** Create class-specific formatter */
    static <T> ToStringConverter forClass(Class<T> tClass, Function<T, String> tToString);
    
    /** Create filtered class formatter */
    static <T> ToStringConverter forClassMatching(
        Class<T> tClass, 
        Predicate<T> tPredicate, 
        Function<T, String> tToString
    );
    
    /** Create predicate-based formatter */
    static ToStringConverter filtering(
        Predicate<Object> predicate, 
        Function<Object, String> anyToString
    );
    
    /** Get default Signal extractor */
    static Extractor<Signal<?>> signalExtractor();
    
    /** Get default Iterable extractor */
    static Extractor<Iterable<?>> iterableExtractor();
    
    /** Get array extractor for specific array type */
    static <T> Extractor<T[]> arrayExtractor(Class<T[]> arrayClass);
    
    /** Default Duration formatter */
    ToStringConverter DURATION_CONVERTER;
}

@FunctionalInterface
interface ToStringConverter extends Function<Object, String> {}

@FunctionalInterface
interface Extractor<CONTAINER> extends Function<CONTAINER, Stream<?>> {}

Usage Examples:

import reactor.test.ValueFormatters;

@Test
public void testCustomValueFormatting() {
    // Custom formatter for Person objects
    ToStringConverter personFormatter = ValueFormatters.forClass(
        Person.class,
        person -> String.format("Person{name='%s', age=%d}", 
                               person.getName(), person.getAge())
    );
    
    // Custom formatter with filtering
    ToStringConverter evenNumberFormatter = ValueFormatters.forClassMatching(
        Integer.class,
        n -> n % 2 == 0,
        n -> "EVEN(" + n + ")"
    );
    
    // Use in StepVerifier options
    StepVerifierOptions options = StepVerifierOptions.create()
        .valueFormatter(personFormatter)
        .valueFormatter(evenNumberFormatter);
    
    StepVerifier.create(
        Flux.just(new Person("Alice", 30), 42, new Person("Bob", 25), 17),
        options
    )
    .expectNext(new Person("Alice", 30))  // Displayed as "Person{name='Alice', age=30}"
    .expectNext(42)                       // Displayed as "EVEN(42)"
    .expectNext(new Person("Bob", 25))    // Displayed as "Person{name='Bob', age=25}"
    .expectNext(17)                       // Displayed as "17" (no special formatting)
    .expectComplete()
    .verify();
}

@Test
public void testExtractors() {
    // Custom extractor for complex objects
    Extractor<List<String>> listExtractor = list -> list.stream()
        .map(s -> "Item: " + s);
    
    StepVerifierOptions options = StepVerifierOptions.create()
        .extractor(listExtractor);
    
    // Use with StepVerifier for better error messages
    List<String> testList = Arrays.asList("a", "b", "c");
    
    StepVerifier.create(Flux.just(testList), options)
        .expectNext(testList)
        .expectComplete()
        .verify();
}

Integration Examples

Comprehensive Testing Scenario

Combining multiple testing utilities for complex scenarios:

@Test
public void testComplexReactiveWorkflow() {
    TestLogger logger = new TestLogger();
    VirtualTimeScheduler vts = VirtualTimeScheduler.create();
    
    try {
        // Enable log capture
        LoggerUtils.enableCaptureWith(logger);
        
        // Create complex workflow with race conditions
        AtomicInteger processedCount = new AtomicInteger(0);
        PublisherProbe<String> fallbackProbe = PublisherProbe.empty();
        
        Flux<String> workflow = Flux.interval(Duration.ofSeconds(1), vts)
            .take(5)
            .map(i -> "item-" + i)
            .doOnNext(item -> {
                // Simulate race condition
                RaceTestUtils.race(
                    () -> processedCount.incrementAndGet(),
                    () -> logger.info("Processing: " + item)
                );
            })
            .switchIfEmpty(fallbackProbe.flux())
            .log();
        
        TestSubscriber<String> subscriber = TestSubscriber.create();
        workflow.subscribe(subscriber);
        
        // Advance time to complete workflow
        vts.advanceTimeBy(Duration.ofSeconds(6));
        
        // Verify results
        assert subscriber.isTerminatedComplete();
        assert subscriber.getReceivedOnNext().size() == 5;
        assert processedCount.get() == 5;
        
        // Verify fallback was not used
        fallbackProbe.assertWasNotSubscribed();
        
        // Verify logging
        String logOutput = logger.getOutContent();
        assert logOutput.contains("Processing: item-0");
        assert logOutput.contains("onComplete");
        
    } finally {
        LoggerUtils.disableCapture();
        VirtualTimeScheduler.reset();
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-io-projectreactor--reactor-test

docs

index.md

publisher-probe.md

step-verifier.md

test-publisher.md

test-subscriber.md

testing-utilities.md

virtual-time.md

tile.json