Testing support library providing utilities for verifying reactive stream behavior in Project Reactor applications
—
PublisherProbe provides instrumentation for Publishers to capture subscription events and verify control flow without affecting the actual data flow. It's ideal for testing conditional logic and verifying that certain code paths are executed.
Factory methods for creating PublisherProbe instances.
interface PublisherProbe<T> {
/** Create probe wrapping existing publisher */
static <T> PublisherProbe<T> of(Publisher<? extends T> source);
/** Create probe that simply completes (empty source) */
static <T> PublisherProbe<T> empty();
}Usage Examples:
import reactor.test.publisher.PublisherProbe;
// Probe an existing publisher
Flux<String> source = Flux.just("hello", "world");
PublisherProbe<String> probe = PublisherProbe.of(source);
// Use probe in place of original publisher
probe.flux().subscribe(System.out::println);
// Verify subscription occurred
assert probe.wasSubscribed();
// Create empty probe for conditional paths
PublisherProbe<String> emptyProbe = PublisherProbe.empty();
// Use in conditional logic where you want to verify execution pathConvert PublisherProbe to standard Reactor types.
interface PublisherProbe<T> {
/** Get Mono version of probe */
Mono<T> mono();
/** Get Flux version of probe */
Flux<T> flux();
}Methods for querying subscription and interaction state.
interface PublisherProbe<T> {
/** Check if probe was subscribed to */
boolean wasSubscribed();
/** Get total number of subscriptions */
long subscribeCount();
/** Check if probe was cancelled */
boolean wasCancelled();
/** Check if probe received any requests */
boolean wasRequested();
}Usage Examples:
PublisherProbe<String> probe = PublisherProbe.of(Flux.just("test"));
// Before subscription
assert !probe.wasSubscribed();
assert probe.subscribeCount() == 0;
assert !probe.wasCancelled();
assert !probe.wasRequested();
// Subscribe and check state
Disposable subscription = probe.flux().subscribe();
assert probe.wasSubscribed();
assert probe.subscribeCount() == 1;
assert !probe.wasCancelled();
assert probe.wasRequested();
// Cancel and check state
subscription.dispose();
assert probe.wasCancelled();
// Multiple subscriptions
probe.flux().subscribe();
assert probe.subscribeCount() == 2; // Now has 2 total subscriptionsAssertion methods that throw exceptions on failure for test integration.
interface PublisherProbe<T> {
/** Assert probe was subscribed (throws if not) */
void assertWasSubscribed();
/** Assert probe was not subscribed (throws if was) */
void assertWasNotSubscribed();
/** Assert probe was cancelled (throws if not) */
void assertWasCancelled();
/** Assert probe was not cancelled (throws if was) */
void assertWasNotCancelled();
/** Assert probe received requests (throws if not) */
void assertWasRequested();
/** Assert probe received no requests (throws if did) */
void assertWasNotRequested();
}Usage Examples:
PublisherProbe<String> probe = PublisherProbe.empty();
// Test conditional execution paths
if (someCondition) {
probe.flux().subscribe(); // Conditional subscription
}
// Assert the condition was met
probe.assertWasSubscribed(); // Throws AssertionError if condition was false
// Test error handling paths
try {
probe.flux()
.map(s -> { throw new RuntimeException(); })
.subscribe();
} catch (Exception e) {
// Handle error
}
probe.assertWasSubscribed(); // Verify subscription occurred despite errorPublisherProbe is excellent for verifying that conditional code paths are executed:
@Test
public void testConditionalExecution() {
PublisherProbe<String> thenProbe = PublisherProbe.empty();
PublisherProbe<String> elseProbe = PublisherProbe.empty();
boolean condition = true;
Flux<String> result = condition
? thenProbe.flux()
: elseProbe.flux();
StepVerifier.create(result)
.expectComplete()
.verify();
// Verify correct branch was taken
thenProbe.assertWasSubscribed();
elseProbe.assertWasNotSubscribed();
}Verify that fallback publishers are used when main publishers are empty:
@Test
public void testSwitchIfEmpty() {
Flux<String> empty = Flux.empty();
PublisherProbe<String> fallback = PublisherProbe.of(Flux.just("fallback"));
StepVerifier.create(
empty.switchIfEmpty(fallback.flux())
)
.expectNext("fallback")
.expectComplete()
.verify();
fallback.assertWasSubscribed(); // Verify fallback was used
}
@Test
public void testSwitchIfEmptyNotUsed() {
Flux<String> nonEmpty = Flux.just("primary");
PublisherProbe<String> fallback = PublisherProbe.of(Flux.just("fallback"));
StepVerifier.create(
nonEmpty.switchIfEmpty(fallback.flux())
)
.expectNext("primary")
.expectComplete()
.verify();
fallback.assertWasNotSubscribed(); // Verify fallback was not used
}Verify that error recovery publishers are triggered:
@Test
public void testErrorRecovery() {
Flux<String> failing = Flux.error(new RuntimeException("Error"));
PublisherProbe<String> recovery = PublisherProbe.of(Flux.just("recovered"));
StepVerifier.create(
failing.onErrorResume(ex -> recovery.flux())
)
.expectNext("recovered")
.expectComplete()
.verify();
recovery.assertWasSubscribed(); // Verify recovery was triggered
}Verify multiple conditional paths in complex reactive chains:
@Test
public void testComplexControlFlow() {
PublisherProbe<String> cacheHit = PublisherProbe.of(Flux.just("cached"));
PublisherProbe<String> networkCall = PublisherProbe.of(Flux.just("network"));
PublisherProbe<String> fallback = PublisherProbe.of(Flux.just("fallback"));
boolean cacheAvailable = false;
boolean networkAvailable = true;
Flux<String> result = (cacheAvailable ? cacheHit.flux() : Flux.empty())
.switchIfEmpty(networkAvailable ? networkCall.flux() : fallback.flux());
StepVerifier.create(result)
.expectNext("network")
.expectComplete()
.verify();
// Verify execution path
cacheHit.assertWasNotSubscribed(); // Cache was not available
networkCall.assertWasSubscribed(); // Network was used
fallback.assertWasNotSubscribed(); // Fallback was not needed
}PublisherProbe works well with other reactor-test utilities:
@Test
public void testWithStepVerifier() {
PublisherProbe<String> probe = PublisherProbe.of(
Flux.just("a", "b", "c").delayElements(Duration.ofMillis(100))
);
StepVerifier.create(probe.flux())
.expectNext("a", "b", "c")
.expectComplete()
.verify();
// Verify subscription behavior
probe.assertWasSubscribed();
probe.assertWasRequested();
assert probe.subscribeCount() == 1;
}
@Test
public void testWithTestPublisher() {
TestPublisher<String> testPub = TestPublisher.create();
PublisherProbe<String> probe = PublisherProbe.of(testPub.flux());
StepVerifier.create(probe.flux())
.then(() -> testPub.next("test").complete())
.expectNext("test")
.expectComplete()
.verify();
probe.assertWasSubscribed();
testPub.assertSubscribers(1);
}Install with Tessl CLI
npx tessl i tessl/maven-io-projectreactor--reactor-test