Testing support library providing utilities for verifying reactive stream behavior in Project Reactor applications
—
StepVerifier provides a declarative API for creating verifiable scripts that express expectations about reactive Publisher sequences. It's the primary testing utility in reactor-test for building step-by-step test scenarios.
Factory methods for creating StepVerifier instances with different configurations.
interface StepVerifier {
/** Default verification timeout */
Duration DEFAULT_VERIFY_TIMEOUT = Duration.ZERO;
/** Create standard verifier for the given publisher */
static <T> FirstStep<T> create(Publisher<? extends T> publisher);
/** Create verifier with specific initial request amount */
static <T> FirstStep<T> create(Publisher<? extends T> publisher, long initialRequest);
/** Create verifier with custom options */
static <T> FirstStep<T> create(Publisher<? extends T> publisher, StepVerifierOptions options);
/** Create verifier with virtual time scheduler for time-based testing */
static <T> FirstStep<T> withVirtualTime(Supplier<? extends Publisher<? extends T>> scenarioSupplier);
/** Create virtual time verifier with initial request */
static <T> FirstStep<T> withVirtualTime(Supplier<? extends Publisher<? extends T>> scenarioSupplier, long initialRequest);
/** Create virtual time verifier with custom options */
static <T> FirstStep<T> withVirtualTime(Supplier<? extends Publisher<? extends T> scenarioSupplier, StepVerifierOptions options);
}Usage Examples:
import reactor.test.StepVerifier;
// Basic verification: both values are expected in order, followed by completion
StepVerifier.create(Flux.just("hello", "world"))
.expectNext("hello")
.expectNext("world")
.expectComplete()
.verify();
// With initial request limit: start with an initial request of 5, request 10 more,
// then cancel rather than consuming the rest of the 100-element range
StepVerifier.create(Flux.range(1, 100), 5)
.expectNextCount(5)
.thenRequest(10)
.expectNextCount(10)
.thenCancel()
.verify();
// Virtual time for delayed operations: the publisher is built inside the supplier, and the
// one-hour element delay is matched by a one-hour expectNoEvent window before the value
StepVerifier.withVirtualTime(() ->
Flux.just("delayed").delayElements(Duration.ofHours(1)))
.expectSubscription()
.expectNoEvent(Duration.ofHours(1))
.expectNext("delayed")
.expectComplete()
.verify();First step expectations for subscription-related events.
/**
 * First step of a verification script: expectations for subscription-related events.
 * Every method returns a {@code Step<T>} so that value expectations can be chained next.
 */
interface FirstStep<T> {
/** Expect subscription signal */
Step<T> expectSubscription();
/** Expect subscription matching predicate (the predicate is tested against the Subscription) */
Step<T> expectSubscriptionMatches(Predicate<? super Subscription> predicate);
/** Expect Reactor Fusion optimization */
Step<T> expectFusion();
/** Expect specific fusion mode ({@code requested} is passed as an int mode value) */
Step<T> expectFusion(int requested);
/** Expect fusion negotiation with specific result ({@code requested} mode in, {@code expected} mode out) */
Step<T> expectFusion(int requested, int expected);
/** Expect no fusion capability */
Step<T> expectNoFusionSupport();
/**
 * Enable conditional subscriber mode.
 * NOTE(review): presumably the predicate decides whether a value is accepted by tryOnNext - confirm.
 */
Step<T> enableConditionalSupport(Predicate<? super T> tryOnNextPredicate);
}Core expectations for onNext events and data validation.
/**
 * Core expectations for onNext events and data validation.
 * Extends {@code LastStep}, so the terminal expectations declared there are also
 * reachable from any {@code Step<T>}.
 */
interface Step<T> extends LastStep {
/** Set description for previous step */
Step<T> as(String description);
/** Expect specific next value */
Step<T> expectNext(T value);
/** Expect multiple specific next values (fixed-arity overloads for two to six values, then a varargs form) */
Step<T> expectNext(T first, T second);
Step<T> expectNext(T first, T second, T third);
Step<T> expectNext(T first, T second, T third, T fourth);
Step<T> expectNext(T first, T second, T third, T fourth, T fifth);
Step<T> expectNext(T first, T second, T third, T fourth, T fifth, T sixth);
Step<T> expectNext(T... values);
/** Expect specific number of next values */
Step<T> expectNextCount(long count);
/** Expect values matching the given sequence */
Step<T> expectNextSequence(Iterable<? extends T> sequence);
/** Expect next value matching predicate */
Step<T> expectNextMatches(Predicate<? super T> predicate);
/** Consume and assert next value with consumer */
Step<T> consumeNextWith(Consumer<? super T> consumer);
/** Alias for consumeNextWith */
Step<T> assertNext(Consumer<? super T> assertionConsumer);
/** Consume subscription with custom consumer */
Step<T> consumeSubscriptionWith(Consumer<? super Subscription> consumer);
}Usage Examples:
// Exact value expectations: expectNext takes a single value or several values at once
StepVerifier.create(Flux.just(1, 2, 3))
.expectNext(1)
.expectNext(2, 3)
.expectComplete()
.verify();
// Count-based expectations: only the number of emitted values is checked
StepVerifier.create(Flux.range(1, 100))
.expectNextCount(100)
.expectComplete()
.verify();
// Predicate-based expectations: one expectNextMatches call per expected value
StepVerifier.create(Flux.just("hello", "world"))
.expectNextMatches(s -> s.startsWith("h"))
.expectNextMatches(s -> s.length() == 5)
.expectComplete()
.verify();
// Custom assertions: the consumer receives the next value and runs assertions on it
StepVerifier.create(Flux.just(new Person("Alice", 30)))
.consumeNextWith(person -> {
assertThat(person.getName()).isEqualTo("Alice");
assertThat(person.getAge()).isEqualTo(30);
})
.expectComplete()
.verify();Support for testing Reactor Context propagation.
/** Step expectations for testing Reactor Context propagation. */
interface Step<T> {
/** Expect Context propagation; returns a ContextExpectations on which context assertions are chained */
ContextExpectations<T> expectAccessibleContext();
/** Expect no Context propagation */
Step<T> expectNoAccessibleContext();
}
/**
 * Assertions on a propagated Context. Each assertion returns the same
 * {@code ContextExpectations<T>} type so assertions can be chained; {@code then()}
 * switches back to a {@code Step<T>}.
 */
interface ContextExpectations<T> {
/** Assert context contains key */
ContextExpectations<T> hasKey(Object key);
/** Assert context size */
ContextExpectations<T> hasSize(int size);
/** Assert context contains key-value pair */
ContextExpectations<T> contains(Object key, Object value);
/** Assert context contains all of other context (overloaded for a Context or a Map) */
ContextExpectations<T> containsAllOf(Context other);
ContextExpectations<T> containsAllOf(Map<?, ?> other);
/** Assert context equals other context (overloaded for a Context or a Map) */
ContextExpectations<T> containsOnly(Context other);
ContextExpectations<T> containsOnly(Map<?, ?> other);
/** Custom context assertions */
ContextExpectations<T> assertThat(Consumer<Context> assertingConsumer);
/** Assert context matches predicate (the second overload also takes a description string) */
ContextExpectations<T> matches(Predicate<Context> predicate);
ContextExpectations<T> matches(Predicate<Context> predicate, String description);
/** Return to Step building */
Step<T> then();
}Advanced expectations for timing, recording, and flow control.
interface Step<T> {
/** Expect no events for the given duration */
Step<T> expectNoEvent(Duration duration);
/** Start recording onNext values */
Step<T> recordWith(Supplier<? extends Collection<T>> collectionSupplier);
/** Verify recorded values match predicate */
Step<T> expectRecordedMatches(Predicate<? super Collection<T>> predicate);
/** Consume recorded values with assertions */
Step<T> consumeRecordedWith(Consumer<? super Collection<T>> consumer);
/** Consume subscription signal */
Step<T> consumeSubscriptionWith(Consumer<? super Subscription> consumer);
/** Execute arbitrary task */
Step<T> then(Runnable task);
/** Pause expectation evaluation (virtual time aware) */
Step<T> thenAwait();
/** Pause for specific duration */
Step<T> thenAwait(Duration timeshift);
/** Consume values while predicate matches */
Step<T> thenConsumeWhile(Predicate<T> predicate);
/** Consume values with assertions while predicate matches */
Step<T> thenConsumeWhile(Predicate<T> predicate, Consumer<T> consumer);
/** Request additional elements */
Step<T> thenRequest(long additionalRequest);
/** Cancel subscription */
LastStep thenCancel();
}Expectations for completion, error, and timeout scenarios.
/**
 * Expectations for completion, error, and timeout scenarios.
 * The expect*, consumeErrorWith and thenCancel methods return a {@code StepVerifier};
 * the verify* methods return a {@code Duration}.
 */
interface LastStep {
/** Expect onComplete signal */
StepVerifier expectComplete();
/** Expect any error signal */
StepVerifier expectError();
/** Expect specific error type */
StepVerifier expectError(Class<? extends Throwable> errorType);
/** Expect error with specific message */
StepVerifier expectErrorMessage(String errorMessage);
/** Expect error matching predicate */
StepVerifier expectErrorMatches(Predicate<Throwable> predicate);
/** Assert error with consumer */
StepVerifier expectErrorSatisfies(Consumer<Throwable> assertionConsumer);
/** Consume error signal */
StepVerifier consumeErrorWith(Consumer<Throwable> consumer);
/** Expect publisher to timeout */
StepVerifier expectTimeout(Duration duration);
/** Cancel subscription */
StepVerifier thenCancel();
// Terminal verification convenience methods
// NOTE(review): each verifyX presumably combines the matching expectX with verify(),
// and the returned Duration is presumably how long verification took - confirm.
Duration verifyComplete();
Duration verifyError();
Duration verifyError(Class<? extends Throwable> errorType);
Duration verifyErrorMessage(String errorMessage);
Duration verifyErrorMatches(Predicate<Throwable> predicate);
Duration verifyErrorSatisfies(Consumer<Throwable> assertionConsumer);
Duration verifyTimeout(Duration duration);
}Methods for executing the verification script.
/** Methods for executing the verification script. */
interface StepVerifier {
/** Enable debug logging of verification steps */
StepVerifier log();
/** Trigger subscription without blocking (for manual verification) */
StepVerifier verifyLater();
/**
 * Execute verification script (blocking until completion).
 * NOTE(review): the returned Duration is presumably the time verification took - confirm.
 */
Duration verify();
/** Execute verification script with timeout */
Duration verify(Duration timeout);
/** Execute verification and expose post-verification assertions */
Assertions verifyThenAssertThat();
/** Execute verification with timeout and expose assertions */
Assertions verifyThenAssertThat(Duration timeout);
}Assertions available after verification completion for inspecting dropped elements, errors, and timing.
/**
 * Assertions available after verification completion, for inspecting dropped elements,
 * discarded elements, dropped errors, operator errors and timing.
 * Every method returns {@code Assertions}, so assertions can be chained.
 */
interface Assertions {
// Element dropping assertions
Assertions hasDroppedElements();
Assertions hasNotDroppedElements();
Assertions hasDropped(Object... values);
Assertions hasDroppedExactly(Object... values);
// Element discarding assertions
Assertions hasDiscardedElements();
Assertions hasNotDiscardedElements();
Assertions hasDiscarded(Object... values);
Assertions hasDiscardedExactly(Object... values);
Assertions hasDiscardedElementsMatching(Predicate<Collection<Object>> matcher);
Assertions hasDiscardedElementsSatisfying(Consumer<Collection<Object>> consumer);
// Error dropping assertions
Assertions hasDroppedErrors();
Assertions hasNotDroppedErrors();
Assertions hasDroppedErrors(int count);
Assertions hasDroppedErrorOfType(Class<? extends Throwable> errorType);
Assertions hasDroppedErrorMatching(Predicate<Throwable> matcher);
Assertions hasDroppedErrorWithMessage(String message);
Assertions hasDroppedErrorWithMessageContaining(String messagePart);
Assertions hasDroppedErrorsSatisfying(Consumer<Collection<Throwable>> errorsConsumer);
// The parameter below is a Predicate even though it is named errorsConsumer
Assertions hasDroppedErrorsMatching(Predicate<Collection<Throwable>> errorsConsumer);
// Operator error assertions
Assertions hasOperatorErrors();
Assertions hasOperatorErrors(int count);
Assertions hasOperatorErrorOfType(Class<? extends Throwable> errorType);
Assertions hasOperatorErrorMatching(Predicate<Throwable> matcher);
Assertions hasOperatorErrorWithMessage(String message);
Assertions hasOperatorErrorWithMessageContaining(String messagePart);
// Timing assertions
Assertions tookLessThan(Duration duration);
Assertions tookMoreThan(Duration duration);
}Static methods for configuring default verification behavior.
/** Static methods for configuring default verification behavior. */
interface StepVerifier {
/** Set global default verification timeout */
static void setDefaultTimeout(Duration timeout);
/** Reset timeout to unlimited default */
static void resetDefaultTimeout();
/** Default timeout value (Duration.ZERO = unlimited) */
Duration DEFAULT_VERIFY_TIMEOUT = Duration.ZERO;
}
// Configuration options for StepVerifier
/**
 * Configuration options for StepVerifier.
 * The configuration methods return {@code StepVerifierOptions}, so calls can be chained;
 * the accessors expose the configured values.
 */
class StepVerifierOptions {
    /** Static factory returning a {@code StepVerifierOptions}. */
    static StepVerifierOptions create();

    /** Returns a {@code StepVerifierOptions}; NOTE(review): presumably an independent copy of this instance - confirm. */
    StepVerifierOptions copy();

    /** Configure the under-requesting check; readable via {@link #isCheckUnderRequesting()}. */
    StepVerifierOptions checkUnderRequesting(boolean enabled);

    /** Configure the initial request amount; readable via {@link #getInitialRequest()}. */
    StepVerifierOptions initialRequest(long initialRequest);

    /** Configure the value formatter; readable via {@link #getValueFormatter()}. */
    StepVerifierOptions valueFormatter(ToStringConverter valueFormatter);

    /**
     * Register an extractor; the registered extractors are readable via {@link #getExtractors()}.
     *
     * @param extractor the extractor to register
     * @param <T> the container type handled by the extractor; declared on the method
     *            because the class itself has no type parameter
     */
    <T> StepVerifierOptions extractor(Extractor<T> extractor);

    /** Configure the supplier of the VirtualTimeScheduler; readable via {@link #getVirtualTimeSchedulerSupplier()}. */
    StepVerifierOptions virtualTimeSchedulerSupplier(Supplier<? extends VirtualTimeScheduler> vtsLookup);

    /** Configure the initial Context; readable via {@link #getInitialContext()}. */
    StepVerifierOptions withInitialContext(Context context);

    /** Configure the scenario name; readable via {@link #getScenarioName()}. */
    StepVerifierOptions scenarioName(String scenarioName);

    // Accessors
    boolean isCheckUnderRequesting();
    long getInitialRequest();
    ToStringConverter getValueFormatter();
    Map<Class<?>, Extractor<?>> getExtractors();
    Supplier<? extends VirtualTimeScheduler> getVirtualTimeSchedulerSupplier();
    Context getInitialContext();
    String getScenarioName();
}
// Value formatting utilities
/**
 * Static factories producing {@code ToStringConverter} and {@code Extractor} instances.
 */
class ValueFormatters {
// ToStringConverter factories: by class, by class plus predicate, or by an Object predicate
static <T> ToStringConverter forClass(Class<T> tClass, Function<T, String> tToString);
static <T> ToStringConverter forClassMatching(Class<T> tClass, Predicate<T> tPredicate, Function<T, String> tToString);
static ToStringConverter filtering(Predicate<Object> predicate, Function<Object, String> anyToString);
// Extractor factories for Signal, Iterable and array containers
static Extractor<Signal<?>> signalExtractor();
static Extractor<Iterable<?>> iterableExtractor();
static <T> Extractor<T[]> arrayExtractor(Class<T[]> arrayClass);
// NOTE(review): named like a constant but declared without static/final here;
// presumably a static final converter for Duration values - confirm.
ToStringConverter DURATION_CONVERTER;
}
// Functional interfaces for custom formatting
/** Converter from an arbitrary Object to its String form; adds no members beyond {@code Function<Object, String>}. */
@FunctionalInterface
interface ToStringConverter extends Function<Object, String> {}
/**
 * As declared here: a function from a CONTAINER value to a Stream of elements.
 * NOTE(review): verify against the reactor-test Javadoc - the shipped interface may
 * declare more members than this summary shows.
 */
@FunctionalInterface
interface Extractor<CONTAINER> extends Function<CONTAINER, Stream<?>> {}Install with Tessl CLI
npx tessl i tessl/maven-io-projectreactor--reactor-test