CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-io-projectreactor--reactor-test

Testing support library providing utilities for verifying reactive stream behavior in Project Reactor applications

Pending
Overview
Eval results
Files

step-verifier.mddocs/

Step-by-Step Verification

StepVerifier provides a declarative API for creating verifiable scripts that express expectations about reactive Publisher sequences. It's the primary testing utility in reactor-test for building step-by-step test scenarios.

Capabilities

Creating StepVerifiers

Factory methods for creating StepVerifier instances with different configurations.

interface StepVerifier {
    /** Default verification timeout */
    Duration DEFAULT_VERIFY_TIMEOUT = Duration.ZERO;
    
    /** Create standard verifier for the given publisher */
    static <T> FirstStep<T> create(Publisher<? extends T> publisher);
    
    /** Create verifier with specific initial request amount */
    static <T> FirstStep<T> create(Publisher<? extends T> publisher, long initialRequest);
    
    /** Create verifier with custom options */
    static <T> FirstStep<T> create(Publisher<? extends T> publisher, StepVerifierOptions options);
    
    /** Create verifier with virtual time scheduler for time-based testing */
    static <T> FirstStep<T> withVirtualTime(Supplier<? extends Publisher<? extends T>> scenarioSupplier);
    
    /** Create virtual time verifier with initial request */
    static <T> FirstStep<T> withVirtualTime(Supplier<? extends Publisher<? extends T>> scenarioSupplier, long initialRequest);
    
    /** Create virtual time verifier with custom options */
    static <T> FirstStep<T> withVirtualTime(Supplier<? extends Publisher<? extends T> scenarioSupplier, StepVerifierOptions options);
}

Usage Examples:

import reactor.test.StepVerifier;

// Basic verification
StepVerifier.create(Flux.just("hello", "world"))
    .expectNext("hello")
    .expectNext("world")
    .expectComplete()
    .verify();

// With initial request limit
StepVerifier.create(Flux.range(1, 100), 5)
    .expectNextCount(5)
    .thenRequest(10)
    .expectNextCount(10)
    .thenCancel()
    .verify();

// Virtual time for delayed operations
StepVerifier.withVirtualTime(() -> 
        Flux.just("delayed").delayElements(Duration.ofHours(1)))
    .expectSubscription()
    .expectNoEvent(Duration.ofHours(1))
    .expectNext("delayed")
    .expectComplete()
    .verify();

Subscription Expectations

First step expectations for subscription-related events.

interface FirstStep<T> {
    /** Expect subscription signal */
    Step<T> expectSubscription();
    
    /** Expect subscription matching predicate */
    Step<T> expectSubscriptionMatches(Predicate<? super Subscription> predicate);
    
    /** Expect Reactor Fusion optimization */
    Step<T> expectFusion();
    
    /** Expect specific fusion mode */
    Step<T> expectFusion(int requested);
    
    /** Expect fusion negotiation with specific result */
    Step<T> expectFusion(int requested, int expected);
    
    /** Expect no fusion capability */
    Step<T> expectNoFusionSupport();
    
    /** Enable conditional subscriber mode */
    Step<T> enableConditionalSupport(Predicate<? super T> tryOnNextPredicate);
}

Data Expectations

Core expectations for onNext events and data validation.

interface Step<T> extends LastStep {
    /** Set description for previous step */
    Step<T> as(String description);
    
    /** Expect specific next value */
    Step<T> expectNext(T value);
    
    /** Expect multiple specific next values */
    Step<T> expectNext(T first, T second);
    Step<T> expectNext(T first, T second, T third);
    Step<T> expectNext(T first, T second, T third, T fourth);
    Step<T> expectNext(T first, T second, T third, T fourth, T fifth);
    Step<T> expectNext(T first, T second, T third, T fourth, T fifth, T sixth);
    Step<T> expectNext(T... values);
    
    /** Expect specific number of next values */
    Step<T> expectNextCount(long count);
    
    /** Expect values matching the given sequence */
    Step<T> expectNextSequence(Iterable<? extends T> sequence);
    
    /** Expect next value matching predicate */
    Step<T> expectNextMatches(Predicate<? super T> predicate);
    
    /** Consume and assert next value with consumer */
    Step<T> consumeNextWith(Consumer<? super T> consumer);
    
    /** Alias for consumeNextWith */
    Step<T> assertNext(Consumer<? super T> assertionConsumer);
    
    /** Consume subscription with custom consumer */
    Step<T> consumeSubscriptionWith(Consumer<? super Subscription> consumer);
}

Usage Examples:

// Exact value expectations
StepVerifier.create(Flux.just(1, 2, 3))
    .expectNext(1)
    .expectNext(2, 3)
    .expectComplete()
    .verify();

// Count-based expectations
StepVerifier.create(Flux.range(1, 100))
    .expectNextCount(100)
    .expectComplete()
    .verify();

// Predicate-based expectations
StepVerifier.create(Flux.just("hello", "world"))
    .expectNextMatches(s -> s.startsWith("h"))
    .expectNextMatches(s -> s.length() == 5)
    .expectComplete()
    .verify();

// Custom assertions
StepVerifier.create(Flux.just(new Person("Alice", 30)))
    .consumeNextWith(person -> {
        assertThat(person.getName()).isEqualTo("Alice");
        assertThat(person.getAge()).isEqualTo(30);
    })
    .expectComplete()
    .verify();

Context Expectations

Support for testing Reactor Context propagation.

interface Step<T> {
    /** Expect Context propagation */
    ContextExpectations<T> expectAccessibleContext();
    
    /** Expect no Context propagation */
    Step<T> expectNoAccessibleContext();
}

interface ContextExpectations<T> {
    /** Assert context contains key */
    ContextExpectations<T> hasKey(Object key);
    
    /** Assert context size */
    ContextExpectations<T> hasSize(int size);
    
    /** Assert context contains key-value pair */
    ContextExpectations<T> contains(Object key, Object value);
    
    /** Assert context contains all of other context */
    ContextExpectations<T> containsAllOf(Context other);
    ContextExpectations<T> containsAllOf(Map<?, ?> other);
    
    /** Assert context equals other context */
    ContextExpectations<T> containsOnly(Context other);
    ContextExpectations<T> containsOnly(Map<?, ?> other);
    
    /** Custom context assertions */
    ContextExpectations<T> assertThat(Consumer<Context> assertingConsumer);
    
    /** Assert context matches predicate */
    ContextExpectations<T> matches(Predicate<Context> predicate);
    ContextExpectations<T> matches(Predicate<Context> predicate, String description);
    
    /** Return to Step building */
    Step<T> then();
}

Timing and Control Expectations

Advanced expectations for timing, recording, and flow control.

interface Step<T> {
    /** Expect no events for the given duration */
    Step<T> expectNoEvent(Duration duration);
    
    /** Start recording onNext values */
    Step<T> recordWith(Supplier<? extends Collection<T>> collectionSupplier);
    
    /** Verify recorded values match predicate */
    Step<T> expectRecordedMatches(Predicate<? super Collection<T>> predicate);
    
    /** Consume recorded values with assertions */
    Step<T> consumeRecordedWith(Consumer<? super Collection<T>> consumer);
    
    /** Consume subscription signal */
    Step<T> consumeSubscriptionWith(Consumer<? super Subscription> consumer);
    
    /** Execute arbitrary task */
    Step<T> then(Runnable task);
    
    /** Pause expectation evaluation (virtual time aware) */
    Step<T> thenAwait();
    
    /** Pause for specific duration */
    Step<T> thenAwait(Duration timeshift);
    
    /** Consume values while predicate matches */
    Step<T> thenConsumeWhile(Predicate<T> predicate);
    
    /** Consume values with assertions while predicate matches */
    Step<T> thenConsumeWhile(Predicate<T> predicate, Consumer<T> consumer);
    
    /** Request additional elements */
    Step<T> thenRequest(long additionalRequest);
    
    /** Cancel subscription */
    LastStep thenCancel();
}

Terminal Expectations

Expectations for completion, error, and timeout scenarios.

interface LastStep {
    /** Expect onComplete signal */
    StepVerifier expectComplete();
    
    /** Expect any error signal */
    StepVerifier expectError();
    
    /** Expect specific error type */
    StepVerifier expectError(Class<? extends Throwable> errorType);
    
    /** Expect error with specific message */
    StepVerifier expectErrorMessage(String errorMessage);
    
    /** Expect error matching predicate */
    StepVerifier expectErrorMatches(Predicate<Throwable> predicate);
    
    /** Assert error with consumer */
    StepVerifier expectErrorSatisfies(Consumer<Throwable> assertionConsumer);
    
    /** Consume error signal */
    StepVerifier consumeErrorWith(Consumer<Throwable> consumer);
    
    /** Expect publisher to timeout */
    StepVerifier expectTimeout(Duration duration);
    
    /** Cancel subscription */
    StepVerifier thenCancel();
    
    // Terminal verification convenience methods
    Duration verifyComplete();
    Duration verifyError();
    Duration verifyError(Class<? extends Throwable> errorType);
    Duration verifyErrorMessage(String errorMessage);
    Duration verifyErrorMatches(Predicate<Throwable> predicate);
    Duration verifyErrorSatisfies(Consumer<Throwable> assertionConsumer);
    Duration verifyTimeout(Duration duration);
}

Verification Execution

Methods for executing the verification script.

interface StepVerifier {
    /** Enable debug logging of verification steps */
    StepVerifier log();
    
    /** Trigger subscription without blocking (for manual verification) */
    StepVerifier verifyLater();
    
    /** Execute verification script (blocking until completion) */
    Duration verify();
    
    /** Execute verification script with timeout */
    Duration verify(Duration timeout);
    
    /** Execute verification and expose post-verification assertions */
    Assertions verifyThenAssertThat();
    
    /** Execute verification with timeout and expose assertions */
    Assertions verifyThenAssertThat(Duration timeout);
}

Post-Verification Assertions

Assertions available after verification completion for inspecting dropped elements, errors, and timing.

interface Assertions {
    // Element dropping assertions
    Assertions hasDroppedElements();
    Assertions hasNotDroppedElements();
    Assertions hasDropped(Object... values);
    Assertions hasDroppedExactly(Object... values);
    
    // Element discarding assertions
    Assertions hasDiscardedElements();
    Assertions hasNotDiscardedElements();
    Assertions hasDiscarded(Object... values);
    Assertions hasDiscardedExactly(Object... values);
    Assertions hasDiscardedElementsMatching(Predicate<Collection<Object>> matcher);
    Assertions hasDiscardedElementsSatisfying(Consumer<Collection<Object>> consumer);
    
    // Error dropping assertions
    Assertions hasDroppedErrors();
    Assertions hasNotDroppedErrors();
    Assertions hasDroppedErrors(int count);
    Assertions hasDroppedErrorOfType(Class<? extends Throwable> errorType);
    Assertions hasDroppedErrorMatching(Predicate<Throwable> matcher);
    Assertions hasDroppedErrorWithMessage(String message);
    Assertions hasDroppedErrorWithMessageContaining(String messagePart);
    Assertions hasDroppedErrorsSatisfying(Consumer<Collection<Throwable>> errorsConsumer);
    Assertions hasDroppedErrorsMatching(Predicate<Collection<Throwable>> errorsConsumer);
    
    // Operator error assertions
    Assertions hasOperatorErrors();
    Assertions hasOperatorErrors(int count);
    Assertions hasOperatorErrorOfType(Class<? extends Throwable> errorType);
    Assertions hasOperatorErrorMatching(Predicate<Throwable> matcher);
    Assertions hasOperatorErrorWithMessage(String message);
    Assertions hasOperatorErrorWithMessageContaining(String messagePart);
    
    // Timing assertions
    Assertions tookLessThan(Duration duration);
    Assertions tookMoreThan(Duration duration);
}

Global Configuration

Static methods for configuring default verification behavior.

interface StepVerifier {
    /** Set global default verification timeout */
    static void setDefaultTimeout(Duration timeout);
    
    /** Reset timeout to unlimited default */
    static void resetDefaultTimeout();
    
    /** Default timeout value (Duration.ZERO = unlimited) */
    Duration DEFAULT_VERIFY_TIMEOUT = Duration.ZERO;
}

Types

// Configuration options for StepVerifier
class StepVerifierOptions {
    static StepVerifierOptions create();
    
    StepVerifierOptions copy();
    StepVerifierOptions checkUnderRequesting(boolean enabled);
    StepVerifierOptions initialRequest(long initialRequest);
    StepVerifierOptions valueFormatter(ToStringConverter valueFormatter);
    StepVerifierOptions extractor(Extractor<T> extractor);
    StepVerifierOptions virtualTimeSchedulerSupplier(Supplier<? extends VirtualTimeScheduler> vtsLookup);
    StepVerifierOptions withInitialContext(Context context);
    StepVerifierOptions scenarioName(String scenarioName);
    
    // Accessors
    boolean isCheckUnderRequesting();
    long getInitialRequest();
    ToStringConverter getValueFormatter();
    Map<Class<?>, Extractor<?>> getExtractors();
    Supplier<? extends VirtualTimeScheduler> getVirtualTimeSchedulerSupplier();
    Context getInitialContext();
    String getScenarioName();
}

// Value formatting utilities
class ValueFormatters {
    static <T> ToStringConverter forClass(Class<T> tClass, Function<T, String> tToString);
    static <T> ToStringConverter forClassMatching(Class<T> tClass, Predicate<T> tPredicate, Function<T, String> tToString);
    static ToStringConverter filtering(Predicate<Object> predicate, Function<Object, String> anyToString);
    static Extractor<Signal<?>> signalExtractor();
    static Extractor<Iterable<?>> iterableExtractor();
    static <T> Extractor<T[]> arrayExtractor(Class<T[]> arrayClass);
    
    ToStringConverter DURATION_CONVERTER;
}

// Functional interfaces for custom formatting
@FunctionalInterface
interface ToStringConverter extends Function<Object, String> {}

@FunctionalInterface  
interface Extractor<CONTAINER> extends Function<CONTAINER, Stream<?>> {}

Install with Tessl CLI

npx tessl i tessl/maven-io-projectreactor--reactor-test

docs

index.md

publisher-probe.md

step-verifier.md

test-publisher.md

test-subscriber.md

testing-utilities.md

virtual-time.md

tile.json