CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-io-projectreactor--reactor-test

Testing support library providing utilities for verifying reactive stream behavior in Project Reactor applications

Pending
Overview
Eval results
Files

docs/test-subscriber.md

Manual Subscribers

TestSubscriber provides a CoreSubscriber implementation for runtime event assertion, offering an alternative to StepVerifier for complex testing scenarios requiring programmatic access to received signals and flexible assertion patterns.

Capabilities

Creating TestSubscribers

Factory methods and builder for creating TestSubscriber instances.

interface TestSubscriber<T> extends CoreSubscriber<T>, Scannable {
    /**
     * Create a simple TestSubscriber with unbounded demand.
     * Use {@link #builder()} instead when the subscriber must be customized.
     */
    static <T> TestSubscriber<T> create();
    
    /**
     * Create a TestSubscriberBuilder, the entry point for customizing a
     * subscriber (context, initial request, fusion requirements) before
     * building it.
     */
    static TestSubscriberBuilder builder();
}

/**
 * Fluent builder for TestSubscriber instances. Every configuration method
 * returns a TestSubscriberBuilder so calls can be chained; finish with
 * build() or buildConditional(...).
 */
class TestSubscriberBuilder {
    /** Add a single key/value entry to the subscriber's context */
    TestSubscriberBuilder contextPut(Object key, Object value);
    
    /** Add all entries of the given ContextView to the subscriber's context */
    TestSubscriberBuilder contextPutAll(ContextView toAdd);
    
    /**
     * Set the initial request amount. An amount of 0 starts the subscriber
     * with no demand, leaving all requests to TestSubscriber.request(long).
     */
    TestSubscriberBuilder initialRequest(long initialRequest);
    
    /** Set an unbounded initial request (the demand used by TestSubscriber.create()) */
    TestSubscriberBuilder initialRequestUnbounded();
    
    /** Require a specific fusion mode, given as a Fuseable constant such as Fuseable.SYNC */
    TestSubscriberBuilder requireFusion(int exactMode);
    
    /**
     * Require a fusion negotiation result: requestedMode is the mode asked for,
     * negotiatedMode is the mode the negotiation is expected to yield.
     */
    TestSubscriberBuilder requireFusion(int requestedMode, int negotiatedMode);
    
    /** Require a non-fuseable subscription */
    TestSubscriberBuilder requireNotFuseable();
    
    /** Build a standard TestSubscriber with the configured options */
    <T> TestSubscriber<T> build();
    
    /**
     * Build a ConditionalTestSubscriber; the given predicate is the
     * tryOnNext test applied to each offered value.
     */
    <T> ConditionalTestSubscriber<T> buildConditional(Predicate<? super T> tryOnNext);
}

Usage Examples:

import reactor.core.Fuseable;
import reactor.core.publisher.Flux;
import reactor.test.subscriber.TestSubscriber;

// Simple subscriber with unbounded demand
TestSubscriber<String> subscriber = TestSubscriber.create();
Flux.just("hello", "world").subscribe(subscriber);

// Custom subscriber with limited initial request:
// only the first 5 of the 100 elements are delivered until more are requested
TestSubscriber<Integer> limitedSubscriber = TestSubscriber.builder()
    .initialRequest(5)
    .build();
    
Flux.range(1, 100).subscribe(limitedSubscriber);

// Subscriber with context
TestSubscriber<String> contextSubscriber = TestSubscriber.builder()
    .contextPut("userId", "12345")
    .contextPut("requestId", "abc-def")
    .build();

// Subscriber with fusion requirements
TestSubscriber<String> fusionSubscriber = TestSubscriber.builder()
    .requireFusion(Fuseable.SYNC) // Require synchronous fusion
    .build();

Control Methods

Methods for controlling subscription behavior and backpressure.

interface TestSubscriber<T> {
    /**
     * Cancel the subscription. Also unblocks any pending block() calls,
     * since cancellation is one of the end states they wait for.
     */
    void cancel();
    
    /**
     * Request additional elements from the publisher, on top of whatever
     * was requested initially (see TestSubscriberBuilder.initialRequest).
     */
    void request(long additionalRequest);
}

Usage Examples:

// Build a subscriber that signals no demand when it subscribes
TestSubscriber<Integer> manualSubscriber = TestSubscriber.builder()
    .initialRequest(0)
    .build();

Flux.range(1, 10).subscribe(manualSubscriber);

// Drive the demand by hand
manualSubscriber.request(3); // ask for the first 3 elements
// ...inspect what has arrived so far...
manualSubscriber.request(5); // ask for 5 more
// ...inspect again...
manualSubscriber.cancel(); // give up on whatever is left

State Query Methods

Methods for checking subscriber state and termination status.

interface TestSubscriber<T> {
    /**
     * Check if the subscriber reached any end state, i.e. it was either
     * terminated or cancelled.
     */
    boolean isTerminatedOrCancelled();
    
    /** Check if the subscriber received a terminal signal (onComplete or onError) */
    boolean isTerminated();
    
    /** Check if the subscriber received onComplete */
    boolean isTerminatedComplete();
    
    /** Check if the subscriber received onError */
    boolean isTerminatedError();
    
    /** Check if the subscriber was cancelled */
    boolean isCancelled();
}

Usage Examples:

TestSubscriber<String> subscriber = TestSubscriber.create();

// Before subscription
assert !subscriber.isTerminated();
assert !subscriber.isCancelled();

// Subscribe to completing publisher
Flux.just("hello").subscribe(subscriber);

// After completion
assert subscriber.isTerminated();
assert subscriber.isTerminatedComplete();
assert !subscriber.isTerminatedError();
assert !subscriber.isCancelled();

// Test error case
TestSubscriber<String> errorSubscriber = TestSubscriber.create();
Flux.<String>error(new RuntimeException()).subscribe(errorSubscriber);

assert errorSubscriber.isTerminated();
assert !errorSubscriber.isTerminatedComplete();
assert errorSubscriber.isTerminatedError();

Data Access Methods

Methods for accessing received signals and protocol violations.

interface TestSubscriber<T> {
    /** Get terminal signal if available (null while the subscriber has not terminated) */
    @Nullable
    Signal<T> getTerminalSignal();
    
    /** Assert terminated and get terminal signal (throws if not terminated) */
    Signal<T> expectTerminalSignal();
    
    /** Assert error terminated and get error (throws if not error) */
    Throwable expectTerminalError();
    
    /** Get all received onNext values */
    List<T> getReceivedOnNext();
    
    /**
     * Get onNext values that still arrived after this subscriber was cancelled.
     * These are tracked separately from getReceivedOnNext(); they are distinct
     * from the protocol errors reported by getProtocolErrors().
     */
    List<T> getReceivedOnNextAfterCancellation();
    
    /**
     * Get detected protocol violations from publisher, as the offending signals
     * themselves (for example an onNext signal emitted after onComplete).
     */
    List<Signal<T>> getProtocolErrors();
    
    /** Get negotiated fusion mode (one of the Fuseable int constants) */
    int getFusionMode();
}

Usage Examples:

TestSubscriber<Integer> subscriber = TestSubscriber.create();
Flux.range(1, 5).subscribe(subscriber);

// Access received data
List<Integer> values = subscriber.getReceivedOnNext();
assert values.equals(Arrays.asList(1, 2, 3, 4, 5));

// Check terminal signal
Signal<Integer> terminal = subscriber.expectTerminalSignal();
assert terminal.isOnComplete();

// Test error case
TestSubscriber<String> errorSubscriber = TestSubscriber.create();
RuntimeException error = new RuntimeException("Test error");
Flux.<String>error(error).subscribe(errorSubscriber);

Throwable receivedError = errorSubscriber.expectTerminalError();
assert receivedError == error;

// Test protocol violations
TestSubscriber<String> violationSubscriber = TestSubscriber.create();
// ... subscribe violationSubscriber to a publisher that violates the protocol ...
// Each violation is reported as the offending Signal, not as a text message
List<Signal<String>> violations = violationSubscriber.getProtocolErrors();
assert !violations.isEmpty();

Blocking Methods

Methods for blocking until the subscriber reaches an end state (terminated or cancelled).

interface TestSubscriber<T> {
    /**
     * Block the calling thread until an end state is reached (terminated or
     * cancelled). A call to cancel() also unblocks pending block() calls.
     */
    void block();
    
    /**
     * Block until an end state is reached or the timeout elapses.
     * NOTE(review): check the reactor-test Javadoc for the exact failure type
     * raised on timeout before catching it in tests.
     */
    void block(Duration timeout);
}

Usage Examples:

TestSubscriber<String> subscriber = TestSubscriber.create();

// Subscribe to async publisher
Flux.just("hello")
    .delayElements(Duration.ofMillis(100))
    .subscribe(subscriber);

// Block until completion
subscriber.block(); // Waits for async completion
assert subscriber.isTerminatedComplete();

// Block with timeout
TestSubscriber<String> timeoutSubscriber = TestSubscriber.create();
// The explicit type witness is required: a bare Flux.never() is inferred as
// Flux<Object>, which does not accept a TestSubscriber<String>
Flux.<String>never().subscribe(timeoutSubscriber); // Publisher that never emits

// block(Duration) reports an elapsed timeout by throwing an AssertionError.
// A flag is used instead of "assert false" inside the try block, because that
// assertion failure would itself be an AssertionError and be caught below.
boolean timedOut = false;
try {
    timeoutSubscriber.block(Duration.ofMillis(50));
} catch (AssertionError e) {
    // Expected timeout
    timedOut = true;
}
assert timedOut;
assert !timeoutSubscriber.isTerminatedOrCancelled();

ConditionalTestSubscriber

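A TestSubscriber that also implements Fuseable.ConditionalSubscriber: values are offered through tryOnNext, and a predicate supplied at build time decides whether each one is accepted.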

/**
 * TestSubscriber variant that is also a Fuseable.ConditionalSubscriber.
 * Instances are obtained from TestSubscriberBuilder.buildConditional(Predicate).
 */
interface ConditionalTestSubscriber<T> extends TestSubscriber<T>, Fuseable.ConditionalSubscriber<T> {
    // Inherits all TestSubscriber methods plus:
    
    /**
     * Try to consume next value (from ConditionalSubscriber).
     * The boolean result tells the caller whether the value was accepted.
     */
    boolean tryOnNext(T value);
}

Usage Examples:

// Create conditional subscriber that only accepts even numbers
ConditionalTestSubscriber<Integer> conditionalSub = TestSubscriber.builder()
    .buildConditional(n -> n % 2 == 0);

// Subscribe to publisher
Flux.range(1, 10).subscribe(conditionalSub);

// Only even numbers are received
List<Integer> received = conditionalSub.getReceivedOnNext();
assert received.equals(Arrays.asList(2, 4, 6, 8, 10));

Types

// Fusion capability expectations for TestSubscriber
/**
 * The three fusion expectations a TestSubscriber can hold.
 * NOTE(review): presumably set through the builder's requireFusion(...) /
 * requireNotFuseable() methods — confirm against the library Javadoc.
 */
enum TestSubscriber.FusionRequirement {
    /** Expect publisher to be fuseable */
    FUSEABLE,
    
    /** Expect publisher to not be fuseable */
    NOT_FUSEABLE,
    
    /** No fusion requirements */
    NONE
}

// Fusion modes (from Reactor Fuseable interface)
// Int constants naming the fusion modes; Fuseable.SYNC is the kind of value
// passed to TestSubscriberBuilder.requireFusion(...).
interface Fuseable {
    int NONE = 0;      // No fusion support
    int SYNC = 1;      // Synchronous fusion
    int ASYNC = 2;     // Asynchronous fusion  
    int ANY = 3;       // Any fusion mode (numerically SYNC | ASYNC)
    int THREAD_BARRIER = 4; // Thread barrier (its own bit, distinct from SYNC and ASYNC)
}

Advanced Usage Patterns

Complex Assertion Scenarios

TestSubscriber is ideal for scenarios requiring complex assertions that don't fit StepVerifier's declarative model:

/**
 * Collects every event of a stream, then validates ordering and per-type
 * counts with plain Java code once the stream has completed.
 */
@Test
public void testComplexDataValidation() {
    TestSubscriber<DataEvent> subscriber = TestSubscriber.create();
    
    // Subscribe to complex data stream
    complexDataService.getEvents().subscribe(subscriber);
    
    // Block until an end state is reached (or fail after 5 seconds)
    subscriber.block(Duration.ofSeconds(5));
    
    // block() also returns on error or cancellation, so make sure the
    // stream really completed before validating its content
    assert subscriber.isTerminatedComplete();
    
    // Perform complex validations
    List<DataEvent> events = subscriber.getReceivedOnNext();
    
    // Validate event ordering
    for (int i = 1; i < events.size(); i++) {
        assert events.get(i).getTimestamp() >= events.get(i-1).getTimestamp();
    }
    
    // Validate event types
    Map<EventType, Long> typeCounts = events.stream()
        .collect(Collectors.groupingBy(DataEvent::getType, Collectors.counting()));
    
    // getOrDefault keeps a missing event type a failed assertion instead of a
    // NullPointerException from unboxing a null Long
    assert typeCounts.getOrDefault(EventType.START, 0L) == 1;
    assert typeCounts.getOrDefault(EventType.END, 0L) == 1;
    assert typeCounts.getOrDefault(EventType.DATA, 0L) >= 1;
}

Manual Backpressure Testing

Test backpressure behavior by controlling requests manually:

/**
 * Verifies that elements are delivered only in the amounts explicitly
 * requested, by starting with zero demand and requesting in small batches.
 */
@Test
public void testBackpressureBehavior() {
    TestSubscriber<Integer> subscriber = TestSubscriber.builder()
        .initialRequest(0) // No initial demand
        .build();
    
    // Subscribe to fast producer. Flux.range honors downstream demand by
    // itself. Do NOT put onBackpressureBuffer(10) in between: it requests an
    // unbounded amount upstream and, with no downstream demand, overflows
    // and signals onError immediately instead of delivering elements.
    Flux.range(1, 1000)
        .subscribe(subscriber);
    
    // Verify no data received initially
    assert subscriber.getReceivedOnNext().isEmpty();
    
    // Request small batch
    subscriber.request(5);
    // Verify exactly 5 received
    assert subscriber.getReceivedOnNext().size() == 5;
    
    // Request more
    subscriber.request(3);
    assert subscriber.getReceivedOnNext().size() == 8;
    
    // Cancel to test cleanup
    subscriber.cancel();
    assert subscriber.isCancelled();
}

Protocol Violation Detection

Detect and assert on reactive streams protocol violations:

/**
 * Subscribes to a hand-written publisher that emits onNext after onComplete
 * and checks that the TestSubscriber records the offending signal.
 */
@Test
public void testProtocolViolations() {
    TestSubscriber<String> subscriber = TestSubscriber.create();
    
    // Create misbehaving publisher
    Publisher<String> badPublisher = s -> {
        s.onSubscribe(new Subscription() {
            @Override
            public void request(long n) {}
            
            @Override
            public void cancel() {}
        });
        
        s.onNext("value");
        s.onComplete();
        s.onNext("after complete"); // Protocol violation!
    };
    
    badPublisher.subscribe(subscriber);
    
    // Check for protocol violations: each one is reported as the offending
    // Signal, here the onNext that was emitted after termination
    List<Signal<String>> violations = subscriber.getProtocolErrors();
    assert !violations.isEmpty();
    assert violations.stream()
        .anyMatch(v -> v.isOnNext() && "after complete".equals(v.get()));
    
    // The legitimate signals were recorded normally
    assert subscriber.getReceivedOnNext().equals(Arrays.asList("value"));
    assert subscriber.isTerminatedComplete();
    
    // The subscriber was never cancelled, so the late value is a protocol
    // error and not an "onNext after cancellation"
    assert subscriber.getReceivedOnNextAfterCancellation().isEmpty();
}

Install with Tessl CLI

npx tessl i tessl/maven-io-projectreactor--reactor-test

docs

index.md

publisher-probe.md

step-verifier.md

test-publisher.md

test-subscriber.md

testing-utilities.md

virtual-time.md

tile.json