Testing support library providing utilities for verifying reactive stream behavior in Project Reactor applications
—
Additional testing utilities for advanced scenarios including race condition testing, log output verification, and custom formatting support.
RaceTestUtils provides utilities for testing race conditions by synchronizing concurrent operations.
class RaceTestUtils {
/** Generic race condition testing with state management */
static <T> T race(
T initial,
Function<? super T, ? extends T> race,
Predicate<? super T> stopRace,
BiPredicate<? super T, ? super T> terminate
);
/** Synchronize execution of multiple Runnables */
static void race(Runnable... runnables);
/** Synchronize two Runnables (binary compatibility) */
static void race(Runnable r1, Runnable r2);
/** Race on specific scheduler */
static void race(Scheduler scheduler, Runnable... runnables);
/** Race with configurable timeout */
static void race(int timeoutSeconds, Scheduler scheduler, Runnable... runnables);
}Usage Examples:
import reactor.test.util.RaceTestUtils;
@Test
public void testConcurrentModification() {
AtomicInteger counter = new AtomicInteger(0);
List<Integer> results = new CopyOnWriteArrayList<>();
// Race multiple operations
RaceTestUtils.race(
() -> results.add(counter.incrementAndGet()),
() -> results.add(counter.incrementAndGet()),
() -> results.add(counter.incrementAndGet())
);
// Verify all operations completed
assert results.size() == 3;
assert counter.get() == 3;
// Order may vary due to race conditions
}
@Test
public void testPublisherConcurrency() {
TestPublisher<String> publisher = TestPublisher.create();
AtomicReference<String> result1 = new AtomicReference<>();
AtomicReference<String> result2 = new AtomicReference<>();
// Setup subscribers
publisher.flux().subscribe(result1::set);
publisher.flux().subscribe(result2::set);
// Race signal emission and subscription
RaceTestUtils.race(
() -> publisher.next("value1"),
() -> publisher.next("value2")
);
// Both subscribers should receive a value
assert result1.get() != null;
assert result2.get() != null;
}
@Test
public void testSchedulerRacing() {
Scheduler testScheduler = Schedulers.parallel();
AtomicInteger executionCount = new AtomicInteger(0);
// Race on specific scheduler
RaceTestUtils.race(testScheduler,
() -> executionCount.incrementAndGet(),
() -> executionCount.incrementAndGet(),
() -> executionCount.incrementAndGet()
);
assert executionCount.get() == 3;
}Advanced race testing with state management:
@Test
public void testStatefulRaceConditions() {
// Test concurrent map updates
Map<String, Integer> map = new ConcurrentHashMap<>();
String result = RaceTestUtils.race(
"initial", // Initial state
state -> { // Race function
map.put("key", map.getOrDefault("key", 0) + 1);
return state + "_updated";
},
state -> state.length() > 50, // Stop condition
(prev, curr) -> !prev.equals(curr) // Termination condition
);
// Verify final state
assert map.containsKey("key");
assert result.contains("updated");
}TestLogger and LoggerUtils provide log capture and verification capabilities.
class TestLogger implements Logger {
/** Default constructor with thread name logging enabled */
TestLogger();
/** Constructor with thread name option */
TestLogger(boolean logCurrentThreadName);
/** Get error stream content as string */
String getErrContent();
/** Get output stream content as string */
String getOutContent();
/** Clear both output buffers */
void reset();
/** Check thread name logging setting */
boolean isLogCurrentThreadName();
// Standard Logger methods
void trace(String msg);
void debug(String msg);
void info(String msg);
void warn(String msg);
void error(String msg);
// Formatted logging
void trace(String format, Object... arguments);
void debug(String format, Object... arguments);
void info(String format, Object... arguments);
void warn(String format, Object... arguments);
void error(String format, Object... arguments);
// With throwable
void trace(String msg, Throwable t);
void debug(String msg, Throwable t);
void info(String msg, Throwable t);
void warn(String msg, Throwable t);
void error(String msg, Throwable t);
// Level checks
boolean isTraceEnabled();
boolean isDebugEnabled();
boolean isInfoEnabled();
boolean isWarnEnabled();
boolean isErrorEnabled();
String getName();
}Usage Examples:
import reactor.test.util.TestLogger;
@Test
public void testLoggingOutput() {
TestLogger logger = new TestLogger();
// Simulate logging
logger.info("Processing started");
logger.warn("Low memory warning");
logger.error("Processing failed", new RuntimeException("Test error"));
// Verify log output
String output = logger.getOutContent();
assert output.contains("Processing started");
assert output.contains("Low memory warning");
String errorOutput = logger.getErrContent();
assert errorOutput.contains("Processing failed");
assert errorOutput.contains("RuntimeException");
assert errorOutput.contains("Test error");
// Reset for next test
logger.reset();
assert logger.getOutContent().isEmpty();
assert logger.getErrContent().isEmpty();
}
@Test
public void testThreadNameLogging() {
TestLogger withThreadName = new TestLogger(true);
TestLogger withoutThreadName = new TestLogger(false);
withThreadName.info("Test message");
withoutThreadName.info("Test message");
String withThread = withThreadName.getOutContent();
String withoutThread = withoutThreadName.getOutContent();
// Thread name logger includes thread info
assert withThread.contains(Thread.currentThread().getName());
assert !withoutThread.contains(Thread.currentThread().getName());
}LoggerUtils provides global log capture for Reactor's internal logging.
class LoggerUtils {
/** Install capturing logger factory */
static void useCurrentLoggersWithCapture();
/** Enable log capture to specific logger */
static void enableCaptureWith(Logger testLogger);
/** Enable capture with optional redirect to original */
static void enableCaptureWith(Logger testLogger, boolean redirectToOriginal);
/** Disable log capture and restore original factory */
static void disableCapture();
}Usage Examples:
import reactor.test.util.LoggerUtils;
@Test
public void testReactorInternalLogging() {
TestLogger testLogger = new TestLogger();
try {
// Capture Reactor's internal logs
LoggerUtils.enableCaptureWith(testLogger);
// Perform operations that generate internal logs
Flux.range(1, 10)
.log() // This will generate internal log messages
.subscribe();
// Verify internal logs were captured
String logOutput = testLogger.getOutContent();
assert logOutput.contains("onSubscribe");
assert logOutput.contains("request");
assert logOutput.contains("onNext");
assert logOutput.contains("onComplete");
} finally {
// Always restore original logging
LoggerUtils.disableCapture();
}
}
@Test
public void testLogCaptureWithRedirect() {
TestLogger testLogger = new TestLogger();
try {
// Capture and also redirect to original loggers
LoggerUtils.enableCaptureWith(testLogger, true);
// Operations will log to both test logger and console
Flux.just("test")
.doOnNext(v -> System.out.println("Processing: " + v))
.log()
.subscribe();
// Verify capture
assert !testLogger.getOutContent().isEmpty();
} finally {
LoggerUtils.disableCapture();
}
}ValueFormatters provides utilities for custom value display in test output.
class ValueFormatters {
/** Create class-specific formatter */
static <T> ToStringConverter forClass(Class<T> tClass, Function<T, String> tToString);
/** Create filtered class formatter */
static <T> ToStringConverter forClassMatching(
Class<T> tClass,
Predicate<T> tPredicate,
Function<T, String> tToString
);
/** Create predicate-based formatter */
static ToStringConverter filtering(
Predicate<Object> predicate,
Function<Object, String> anyToString
);
/** Get default Signal extractor */
static Extractor<Signal<?>> signalExtractor();
/** Get default Iterable extractor */
static Extractor<Iterable<?>> iterableExtractor();
/** Get array extractor for specific array type */
static <T> Extractor<T[]> arrayExtractor(Class<T[]> arrayClass);
/** Default Duration formatter */
ToStringConverter DURATION_CONVERTER;
}
@FunctionalInterface
interface ToStringConverter extends Function<Object, String> {}
@FunctionalInterface
interface Extractor<CONTAINER> extends Function<CONTAINER, Stream<?>> {}Usage Examples:
import reactor.test.ValueFormatters;
@Test
public void testCustomValueFormatting() {
// Custom formatter for Person objects
ToStringConverter personFormatter = ValueFormatters.forClass(
Person.class,
person -> String.format("Person{name='%s', age=%d}",
person.getName(), person.getAge())
);
// Custom formatter with filtering
ToStringConverter evenNumberFormatter = ValueFormatters.forClassMatching(
Integer.class,
n -> n % 2 == 0,
n -> "EVEN(" + n + ")"
);
// Use in StepVerifier options
StepVerifierOptions options = StepVerifierOptions.create()
.valueFormatter(personFormatter)
.valueFormatter(evenNumberFormatter);
StepVerifier.create(
Flux.just(new Person("Alice", 30), 42, new Person("Bob", 25), 17),
options
)
.expectNext(new Person("Alice", 30)) // Displayed as "Person{name='Alice', age=30}"
.expectNext(42) // Displayed as "EVEN(42)"
.expectNext(new Person("Bob", 25)) // Displayed as "Person{name='Bob', age=25}"
.expectNext(17) // Displayed as "17" (no special formatting)
.expectComplete()
.verify();
}
@Test
public void testExtractors() {
// Custom extractor for complex objects
Extractor<List<String>> listExtractor = list -> list.stream()
.map(s -> "Item: " + s);
StepVerifierOptions options = StepVerifierOptions.create()
.extractor(listExtractor);
// Use with StepVerifier for better error messages
List<String> testList = Arrays.asList("a", "b", "c");
StepVerifier.create(Flux.just(testList), options)
.expectNext(testList)
.expectComplete()
.verify();
}Combining multiple testing utilities for complex scenarios:
@Test
public void testComplexReactiveWorkflow() {
TestLogger logger = new TestLogger();
VirtualTimeScheduler vts = VirtualTimeScheduler.create();
try {
// Enable log capture
LoggerUtils.enableCaptureWith(logger);
// Create complex workflow with race conditions
AtomicInteger processedCount = new AtomicInteger(0);
PublisherProbe<String> fallbackProbe = PublisherProbe.empty();
Flux<String> workflow = Flux.interval(Duration.ofSeconds(1), vts)
.take(5)
.map(i -> "item-" + i)
.doOnNext(item -> {
// Simulate race condition
RaceTestUtils.race(
() -> processedCount.incrementAndGet(),
() -> logger.info("Processing: " + item)
);
})
.switchIfEmpty(fallbackProbe.flux())
.log();
TestSubscriber<String> subscriber = TestSubscriber.create();
workflow.subscribe(subscriber);
// Advance time to complete workflow
vts.advanceTimeBy(Duration.ofSeconds(6));
// Verify results
assert subscriber.isTerminatedComplete();
assert subscriber.getReceivedOnNext().size() == 5;
assert processedCount.get() == 5;
// Verify fallback was not used
fallbackProbe.assertWasNotSubscribed();
// Verify logging
String logOutput = logger.getOutContent();
assert logOutput.contains("Processing: item-0");
assert logOutput.contains("onComplete");
} finally {
LoggerUtils.disableCapture();
VirtualTimeScheduler.reset();
}
}Install with Tessl CLI
npx tessl i tessl/maven-io-projectreactor--reactor-test