CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-io-projectreactor--reactor-test

Testing support library providing utilities for verifying reactive stream behavior in Project Reactor applications

Pending
Overview
Eval results
Files

test-publisher.mddocs/

Controlled Publishers

TestPublisher provides a manipulable Publisher implementation for controlled signal emission in tests. It allows complete control over when and what signals are emitted, making it ideal for testing subscriber behavior and backpressure scenarios.

Capabilities

Creating TestPublishers

Factory methods for creating different types of TestPublisher instances.

/**
 * Factory methods of TestPublisher, a Publisher whose signals are triggered manually by test code.
 * API summary only: method bodies are omitted.
 */
abstract class TestPublisher<T> implements Publisher<T>, PublisherProbe<T> {
    /** Create standard hot TestPublisher (signals reach only the subscribers present at emission time) */
    static <T> TestPublisher<T> create();
    
    /** Create spec-violating TestPublisher; at least one violation is required, further ones are optional */
    static <T> TestPublisher<T> createNoncompliant(Violation first, Violation... rest);
    
    /** Create cold TestPublisher that buffers emitted signals and replays them to late subscribers */
    static <T> TestPublisher<T> createCold();
    
    /** Create cold TestPublisher that errors on overflow instead of buffering */
    static <T> TestPublisher<T> createColdNonBuffering();
    
    /**
     * Create cold TestPublisher that violates the given parts of the specification.
     *
     * @param errorOnOverflow true to signal an error on overflow (as createColdNonBuffering does),
     *                        false to buffer (as createCold does) - confirm against the reactor-test Javadoc
     * @param firstViolation  mandatory first violation to enable
     * @param otherViolations optional additional violations
     */
    static <T> TestPublisher<T> createColdNonCompliant(
        boolean errorOnOverflow, 
        Violation firstViolation, 
        Violation... otherViolations
    );
}

Usage Examples:

import reactor.test.publisher.TestPublisher;

// Hot publisher - signals are sent to the subscribers present at emission time only
TestPublisher<String> hotPublisher = TestPublisher.create();
hotPublisher.next("early"); // Nobody has subscribed yet, so this value is lost

Flux<String> flux = hotPublisher.flux();
flux.subscribe(System.out::println);
hotPublisher.next("hello");    // Delivered to the subscriber registered above
hotPublisher.complete();

// Cold publisher - buffers signals so that late subscribers still receive them
TestPublisher<Integer> coldPublisher = TestPublisher.createCold();
coldPublisher.next(1, 2, 3);  // Buffered: there is no subscriber yet
coldPublisher.complete();

// Late subscriber receives all buffered signals
coldPublisher.flux().subscribe(System.out::println); // Prints 1, 2 and 3, one per line

// Non-compliant publisher for testing how consumers cope with spec violations
TestPublisher<String> badPublisher = TestPublisher.createNoncompliant(
    TestPublisher.Violation.ALLOW_NULL
);
badPublisher.next(null); // Normally illegal, but allowed because ALLOW_NULL is enabled

Conversion Methods

Convert TestPublisher to standard Reactor types.

/** Conversion methods exposing a TestPublisher through the standard Reactor types (API summary, bodies omitted). */
abstract class TestPublisher<T> {
    /** Wrap as Flux */
    Flux<T> flux();
    
    /**
     * Wrap as Mono.
     * NOTE(review): the previous wording said this errors on more than one element. Mono.from
     * semantics are to emit at most the first element and cancel the source, not to error -
     * confirm against the reactor-test Javadoc for mono().
     */
    Mono<T> mono();
}

Signal Emission

Methods for manually emitting reactive signals.

/**
 * Manual signal emission (API summary, bodies omitted).
 * Every method returns the TestPublisher itself so calls can be chained.
 */
abstract class TestPublisher<T> {
    /** Emit single onNext signal (the value is declared @Nullable; see Violation.ALLOW_NULL) */
    TestPublisher<T> next(@Nullable T value);
    
    /** Emit multiple onNext signals, first followed by each of rest in order */
    TestPublisher<T> next(T first, T... rest);
    
    /** Emit values and complete immediately (shortcut for next calls followed by complete) */
    TestPublisher<T> emit(T... values);
    
    /** Trigger error signal (onError) with the given Throwable */
    TestPublisher<T> error(Throwable t);
    
    /** Trigger completion signal (onComplete) */
    TestPublisher<T> complete();
}

Usage Examples:

// NOTE(review): the calls below read as independent alternatives rather than one runnable
// sequence - the publisher is completed by the first chain and nothing subscribes to it.
TestPublisher<String> publisher = TestPublisher.create();

// Emit signals one by one (each call returns the publisher, so calls chain)
publisher.next("first")
         .next("second") 
         .complete();

// Emit multiple values at once
publisher.next("hello", "world", "!");

// Emit values and complete in one call
TestPublisher.create()
    .emit("alpha", "beta", "gamma"); // Automatically completes

// Error scenarios: terminate with onError instead of onComplete
publisher.error(new RuntimeException("Something went wrong"));

Subscription Assertions

Methods for asserting subscriber behavior and request patterns.

/**
 * Assertions on subscriber state and demand (API summary, bodies omitted).
 * Every method returns the TestPublisher itself so assertions can be chained with emissions.
 */
abstract class TestPublisher<T> {
    /** Assert that subscribers have requested at least n elements */
    TestPublisher<T> assertMinRequested(long n);
    
    /** Assert that subscribers have requested at most n elements */
    TestPublisher<T> assertMaxRequested(long n);
    
    /** Assert has at least one subscriber */
    TestPublisher<T> assertSubscribers();
    
    /** Assert specific number of subscribers */
    TestPublisher<T> assertSubscribers(int n);
    
    /** Assert no subscribers */
    TestPublisher<T> assertNoSubscribers();
    
    /** Assert has cancelled subscribers */
    TestPublisher<T> assertCancelled();
    
    /** Assert specific number of cancelled subscribers */
    TestPublisher<T> assertCancelled(int n);
    
    /** Assert no cancelled subscribers */
    TestPublisher<T> assertNotCancelled();
    
    /** Assert request overflow occurred (an onNext was sent despite insufficient request) */
    TestPublisher<T> assertRequestOverflow();
    
    /** Assert no request overflow */
    TestPublisher<T> assertNoRequestOverflow();
}

Usage Examples:

TestPublisher<Integer> publisher = TestPublisher.create();

// Subscribe and make requests (a lambda subscriber requests an unbounded amount)
Disposable subscription = publisher.flux()
    .subscribe(System.out::println);

// Test subscription behavior.
// The publisher is deliberately NOT completed here: once complete() has been signalled the
// subscriber is terminated, so a later dispose() would no longer count as a cancellation.
publisher.assertSubscribers(1)           // Has 1 subscriber
         .assertMinRequested(1)          // Requested at least 1 element
         .next(42)                       // Send value
         .assertNotCancelled();          // Still subscribed

// Test cancellation while the subscription is still active
subscription.dispose();
publisher.assertCancelled(1);            // 1 cancelled subscriber

// Test cancellation triggered by an operator.
// A fresh publisher is used so the outcome does not depend on the state left by the steps above.
TestPublisher<Integer> takePublisher = TestPublisher.create();
takePublisher.flux()
    .take(5)                             // Subscriber will cancel after 5
    .subscribe();
    
takePublisher.next(1, 2, 3, 4, 5, 6)     // Send 6 values
             .assertCancelled();         // Subscriber cancelled after 5

Publisher Probe Integration

TestPublisher implements PublisherProbe for subscription event tracking.

// TestPublisher implements PublisherProbe, so the members below are available on every TestPublisher
interface PublisherProbe<T> {
    // State queries. Judging by the names, they report whether (and for subscribeCount, how often)
    // the probe has been subscribed to, cancelled or requested from - confirm against the Javadoc.
    boolean wasSubscribed();
    long subscribeCount();
    boolean wasCancelled();
    boolean wasRequested();
    
    // Assertion counterparts of the queries above, one positive and one negative form per state
    void assertWasSubscribed();
    void assertWasNotSubscribed();
    void assertWasCancelled();
    void assertWasNotCancelled();
    void assertWasRequested();
    void assertWasNotRequested();
}

Types

// Reactive Streams specification violations for testing error handling.
// Written as "TestPublisher.Violation" to show that the enum is nested inside TestPublisher;
// pass its constants to createNoncompliant(...) or createColdNonCompliant(...).
enum TestPublisher.Violation {
    /** Allow next calls despite insufficient request (violates backpressure) */
    REQUEST_OVERFLOW,
    
    /** Allow null values in next calls (violates non-null requirement) */
    ALLOW_NULL,
    
    /** Allow multiple termination signals, e.g. complete() followed by error() (violates single terminal) */
    CLEANUP_ON_TERMINATE,
    
    /** Ignore cancellation signals and keep emitting (violates cancellation semantics) */
    DEFER_CANCELLATION
}

Violation Usage Examples:

// Test handling of backpressure violations
TestPublisher<String> overflowPublisher = TestPublisher.createNoncompliant(
    TestPublisher.Violation.REQUEST_OVERFLOW
);

// NOTE(review): StepVerifier tracks demand itself and may report the un-requested onNext as a
// request overflow - run this snippet against the reactor-test version in use to confirm.
StepVerifier.create(overflowPublisher.flux(), 0) // Request 0 initially
    .then(() -> overflowPublisher.next("overflow")) // Send despite no request
    .thenRequest(1)
    .expectNext("overflow")
    .then(overflowPublisher::complete) // Terminate the source; without this expectComplete() can never be met
    .expectComplete()
    .verify();

// Test null value handling: ALLOW_NULL lets next(null) through instead of rejecting it
TestPublisher<String> nullPublisher = TestPublisher.createNoncompliant(
    TestPublisher.Violation.ALLOW_NULL
);

StepVerifier.create(nullPublisher.flux())
    .then(() -> nullPublisher.next(null).complete())
    .expectNext((String) null) // Cast needed so that the null literal is typed as a String value
    .expectComplete()
    .verify();

// Test multiple termination signals: CLEANUP_ON_TERMINATE lets the publisher terminate twice
TestPublisher<String> multiTermPublisher = TestPublisher.createNoncompliant(
    TestPublisher.Violation.CLEANUP_ON_TERMINATE
);

StepVerifier.create(multiTermPublisher.flux())
    .then(() -> {
        multiTermPublisher.complete(); // First termination
        multiTermPublisher.error(new RuntimeException()); // Second termination
    })
    .expectComplete() // Only first termination takes effect
    .verify();

// Test cancellation deferral: DEFER_CANCELLATION makes the publisher disregard cancel signals
TestPublisher<Integer> deferCancelPublisher = TestPublisher.createNoncompliant(
    TestPublisher.Violation.DEFER_CANCELLATION
);

// Publisher ignores cancellation and continues emitting
Disposable subscription = deferCancelPublisher.flux()
    .subscribe(System.out::println);
    
subscription.dispose(); // Cancel subscription
deferCancelPublisher.next(1, 2, 3); // Still emits despite cancellation

Install with Tessl CLI

npx tessl i tessl/maven-io-projectreactor--reactor-test

docs

index.md

publisher-probe.md

step-verifier.md

test-publisher.md

test-subscriber.md

testing-utilities.md

virtual-time.md

tile.json