0
# Step-by-Step Verification
1
2
StepVerifier provides a declarative API for creating verifiable scripts that express expectations about reactive Publisher sequences. It's the primary testing utility in reactor-test for building step-by-step test scenarios.
3
4
## Capabilities
5
6
### Creating StepVerifiers
7
8
Factory methods for creating StepVerifier instances with different configurations.
9
10
```java { .api }
11
interface StepVerifier {
12
/** Default verification timeout */
13
Duration DEFAULT_VERIFY_TIMEOUT = Duration.ZERO;
14
15
/** Create standard verifier for the given publisher */
16
static <T> FirstStep<T> create(Publisher<? extends T> publisher);
17
18
/** Create verifier with specific initial request amount */
19
static <T> FirstStep<T> create(Publisher<? extends T> publisher, long initialRequest);
20
21
/** Create verifier with custom options */
22
static <T> FirstStep<T> create(Publisher<? extends T> publisher, StepVerifierOptions options);
23
24
/** Create verifier with virtual time scheduler for time-based testing */
25
static <T> FirstStep<T> withVirtualTime(Supplier<? extends Publisher<? extends T>> scenarioSupplier);
26
27
/** Create virtual time verifier with initial request */
28
static <T> FirstStep<T> withVirtualTime(Supplier<? extends Publisher<? extends T>> scenarioSupplier, long initialRequest);
29
30
/** Create virtual time verifier with custom options */
31
static <T> FirstStep<T> withVirtualTime(Supplier<? extends Publisher<? extends T> scenarioSupplier, StepVerifierOptions options);
32
}
33
```
34
35
**Usage Examples:**
36
37
```java
38
import reactor.test.StepVerifier;
39
40
// Basic verification
41
StepVerifier.create(Flux.just("hello", "world"))
42
.expectNext("hello")
43
.expectNext("world")
44
.expectComplete()
45
.verify();
46
47
// With initial request limit
48
StepVerifier.create(Flux.range(1, 100), 5)
49
.expectNextCount(5)
50
.thenRequest(10)
51
.expectNextCount(10)
52
.thenCancel()
53
.verify();
54
55
// Virtual time for delayed operations
56
StepVerifier.withVirtualTime(() ->
57
Flux.just("delayed").delayElements(Duration.ofHours(1)))
58
.expectSubscription()
59
.expectNoEvent(Duration.ofHours(1))
60
.expectNext("delayed")
61
.expectComplete()
62
.verify();
63
```
64
65
### Subscription Expectations
66
67
First step expectations for subscription-related events.
68
69
```java { .api }
70
interface FirstStep<T> {
71
/** Expect subscription signal */
72
Step<T> expectSubscription();
73
74
/** Expect subscription matching predicate */
75
Step<T> expectSubscriptionMatches(Predicate<? super Subscription> predicate);
76
77
/** Expect Reactor Fusion optimization */
78
Step<T> expectFusion();
79
80
/** Expect specific fusion mode */
81
Step<T> expectFusion(int requested);
82
83
/** Expect fusion negotiation with specific result */
84
Step<T> expectFusion(int requested, int expected);
85
86
/** Expect no fusion capability */
87
Step<T> expectNoFusionSupport();
88
89
/** Enable conditional subscriber mode */
90
Step<T> enableConditionalSupport(Predicate<? super T> tryOnNextPredicate);
91
}
92
```
93
94
### Data Expectations
95
96
Core expectations for onNext events and data validation.
97
98
```java { .api }
99
interface Step<T> extends LastStep {
100
/** Set description for previous step */
101
Step<T> as(String description);
102
103
/** Expect specific next value */
104
Step<T> expectNext(T value);
105
106
/** Expect multiple specific next values */
107
Step<T> expectNext(T first, T second);
108
Step<T> expectNext(T first, T second, T third);
109
Step<T> expectNext(T first, T second, T third, T fourth);
110
Step<T> expectNext(T first, T second, T third, T fourth, T fifth);
111
Step<T> expectNext(T first, T second, T third, T fourth, T fifth, T sixth);
112
Step<T> expectNext(T... values);
113
114
/** Expect specific number of next values */
115
Step<T> expectNextCount(long count);
116
117
/** Expect values matching the given sequence */
118
Step<T> expectNextSequence(Iterable<? extends T> sequence);
119
120
/** Expect next value matching predicate */
121
Step<T> expectNextMatches(Predicate<? super T> predicate);
122
123
/** Consume and assert next value with consumer */
124
Step<T> consumeNextWith(Consumer<? super T> consumer);
125
126
/** Alias for consumeNextWith */
127
Step<T> assertNext(Consumer<? super T> assertionConsumer);
128
129
/** Consume subscription with custom consumer */
130
Step<T> consumeSubscriptionWith(Consumer<? super Subscription> consumer);
131
}
132
```
133
134
**Usage Examples:**
135
136
```java
137
// Exact value expectations
138
StepVerifier.create(Flux.just(1, 2, 3))
139
.expectNext(1)
140
.expectNext(2, 3)
141
.expectComplete()
142
.verify();
143
144
// Count-based expectations
145
StepVerifier.create(Flux.range(1, 100))
146
.expectNextCount(100)
147
.expectComplete()
148
.verify();
149
150
// Predicate-based expectations
151
StepVerifier.create(Flux.just("hello", "world"))
152
.expectNextMatches(s -> s.startsWith("h"))
153
.expectNextMatches(s -> s.length() == 5)
154
.expectComplete()
155
.verify();
156
157
// Custom assertions
158
StepVerifier.create(Flux.just(new Person("Alice", 30)))
159
.consumeNextWith(person -> {
160
assertThat(person.getName()).isEqualTo("Alice");
161
assertThat(person.getAge()).isEqualTo(30);
162
})
163
.expectComplete()
164
.verify();
165
```
166
167
### Context Expectations
168
169
Support for testing Reactor Context propagation.
170
171
```java { .api }
172
interface Step<T> {
173
/** Expect Context propagation */
174
ContextExpectations<T> expectAccessibleContext();
175
176
/** Expect no Context propagation */
177
Step<T> expectNoAccessibleContext();
178
}
179
180
interface ContextExpectations<T> {
181
/** Assert context contains key */
182
ContextExpectations<T> hasKey(Object key);
183
184
/** Assert context size */
185
ContextExpectations<T> hasSize(int size);
186
187
/** Assert context contains key-value pair */
188
ContextExpectations<T> contains(Object key, Object value);
189
190
/** Assert context contains all of other context */
191
ContextExpectations<T> containsAllOf(Context other);
192
ContextExpectations<T> containsAllOf(Map<?, ?> other);
193
194
/** Assert context equals other context */
195
ContextExpectations<T> containsOnly(Context other);
196
ContextExpectations<T> containsOnly(Map<?, ?> other);
197
198
/** Custom context assertions */
199
ContextExpectations<T> assertThat(Consumer<Context> assertingConsumer);
200
201
/** Assert context matches predicate */
202
ContextExpectations<T> matches(Predicate<Context> predicate);
203
ContextExpectations<T> matches(Predicate<Context> predicate, String description);
204
205
/** Return to Step building */
206
Step<T> then();
207
}
208
```
209
210
### Timing and Control Expectations
211
212
Advanced expectations for timing, recording, and flow control.
213
214
```java { .api }
215
interface Step<T> {
216
/** Expect no events for the given duration */
217
Step<T> expectNoEvent(Duration duration);
218
219
/** Start recording onNext values */
220
Step<T> recordWith(Supplier<? extends Collection<T>> collectionSupplier);
221
222
/** Verify recorded values match predicate */
223
Step<T> expectRecordedMatches(Predicate<? super Collection<T>> predicate);
224
225
/** Consume recorded values with assertions */
226
Step<T> consumeRecordedWith(Consumer<? super Collection<T>> consumer);
227
228
/** Consume subscription signal */
229
Step<T> consumeSubscriptionWith(Consumer<? super Subscription> consumer);
230
231
/** Execute arbitrary task */
232
Step<T> then(Runnable task);
233
234
/** Pause expectation evaluation (virtual time aware) */
235
Step<T> thenAwait();
236
237
/** Pause for specific duration */
238
Step<T> thenAwait(Duration timeshift);
239
240
/** Consume values while predicate matches */
241
Step<T> thenConsumeWhile(Predicate<T> predicate);
242
243
/** Consume values with assertions while predicate matches */
244
Step<T> thenConsumeWhile(Predicate<T> predicate, Consumer<T> consumer);
245
246
/** Request additional elements */
247
Step<T> thenRequest(long additionalRequest);
248
249
/** Cancel subscription */
250
LastStep thenCancel();
251
}
252
```
253
254
### Terminal Expectations
255
256
Expectations for completion, error, and timeout scenarios.
257
258
```java { .api }
259
interface LastStep {
260
/** Expect onComplete signal */
261
StepVerifier expectComplete();
262
263
/** Expect any error signal */
264
StepVerifier expectError();
265
266
/** Expect specific error type */
267
StepVerifier expectError(Class<? extends Throwable> errorType);
268
269
/** Expect error with specific message */
270
StepVerifier expectErrorMessage(String errorMessage);
271
272
/** Expect error matching predicate */
273
StepVerifier expectErrorMatches(Predicate<Throwable> predicate);
274
275
/** Assert error with consumer */
276
StepVerifier expectErrorSatisfies(Consumer<Throwable> assertionConsumer);
277
278
/** Consume error signal */
279
StepVerifier consumeErrorWith(Consumer<Throwable> consumer);
280
281
/** Expect publisher to timeout */
282
StepVerifier expectTimeout(Duration duration);
283
284
/** Cancel subscription */
285
StepVerifier thenCancel();
286
287
// Terminal verification convenience methods
288
Duration verifyComplete();
289
Duration verifyError();
290
Duration verifyError(Class<? extends Throwable> errorType);
291
Duration verifyErrorMessage(String errorMessage);
292
Duration verifyErrorMatches(Predicate<Throwable> predicate);
293
Duration verifyErrorSatisfies(Consumer<Throwable> assertionConsumer);
294
Duration verifyTimeout(Duration duration);
295
}
296
```
297
298
### Verification Execution
299
300
Methods for executing the verification script.
301
302
```java { .api }
303
interface StepVerifier {
304
/** Enable debug logging of verification steps */
305
StepVerifier log();
306
307
/** Trigger subscription without blocking (for manual verification) */
308
StepVerifier verifyLater();
309
310
/** Execute verification script (blocking until completion) */
311
Duration verify();
312
313
/** Execute verification script with timeout */
314
Duration verify(Duration timeout);
315
316
/** Execute verification and expose post-verification assertions */
317
Assertions verifyThenAssertThat();
318
319
/** Execute verification with timeout and expose assertions */
320
Assertions verifyThenAssertThat(Duration timeout);
321
}
322
```
323
324
### Post-Verification Assertions
325
326
Assertions available after verification completion for inspecting dropped elements, errors, and timing.
327
328
```java { .api }
329
interface Assertions {
330
// Element dropping assertions
331
Assertions hasDroppedElements();
332
Assertions hasNotDroppedElements();
333
Assertions hasDropped(Object... values);
334
Assertions hasDroppedExactly(Object... values);
335
336
// Element discarding assertions
337
Assertions hasDiscardedElements();
338
Assertions hasNotDiscardedElements();
339
Assertions hasDiscarded(Object... values);
340
Assertions hasDiscardedExactly(Object... values);
341
Assertions hasDiscardedElementsMatching(Predicate<Collection<Object>> matcher);
342
Assertions hasDiscardedElementsSatisfying(Consumer<Collection<Object>> consumer);
343
344
// Error dropping assertions
345
Assertions hasDroppedErrors();
346
Assertions hasNotDroppedErrors();
347
Assertions hasDroppedErrors(int count);
348
Assertions hasDroppedErrorOfType(Class<? extends Throwable> errorType);
349
Assertions hasDroppedErrorMatching(Predicate<Throwable> matcher);
350
Assertions hasDroppedErrorWithMessage(String message);
351
Assertions hasDroppedErrorWithMessageContaining(String messagePart);
352
Assertions hasDroppedErrorsSatisfying(Consumer<Collection<Throwable>> errorsConsumer);
353
Assertions hasDroppedErrorsMatching(Predicate<Collection<Throwable>> errorsConsumer);
354
355
// Operator error assertions
356
Assertions hasOperatorErrors();
357
Assertions hasOperatorErrors(int count);
358
Assertions hasOperatorErrorOfType(Class<? extends Throwable> errorType);
359
Assertions hasOperatorErrorMatching(Predicate<Throwable> matcher);
360
Assertions hasOperatorErrorWithMessage(String message);
361
Assertions hasOperatorErrorWithMessageContaining(String messagePart);
362
363
// Timing assertions
364
Assertions tookLessThan(Duration duration);
365
Assertions tookMoreThan(Duration duration);
366
}
367
```
368
369
### Global Configuration
370
371
Static methods for configuring default verification behavior.
372
373
```java { .api }
374
interface StepVerifier {
375
/** Set global default verification timeout */
376
static void setDefaultTimeout(Duration timeout);
377
378
/** Reset timeout to unlimited default */
379
static void resetDefaultTimeout();
380
381
/** Default timeout value (Duration.ZERO = unlimited) */
382
Duration DEFAULT_VERIFY_TIMEOUT = Duration.ZERO;
383
}
384
```
385
386
## Types
387
388
```java { .api }
389
// Configuration options for StepVerifier
390
class StepVerifierOptions {
391
static StepVerifierOptions create();
392
393
StepVerifierOptions copy();
394
StepVerifierOptions checkUnderRequesting(boolean enabled);
395
StepVerifierOptions initialRequest(long initialRequest);
396
StepVerifierOptions valueFormatter(ToStringConverter valueFormatter);
397
StepVerifierOptions extractor(Extractor<T> extractor);
398
StepVerifierOptions virtualTimeSchedulerSupplier(Supplier<? extends VirtualTimeScheduler> vtsLookup);
399
StepVerifierOptions withInitialContext(Context context);
400
StepVerifierOptions scenarioName(String scenarioName);
401
402
// Accessors
403
boolean isCheckUnderRequesting();
404
long getInitialRequest();
405
ToStringConverter getValueFormatter();
406
Map<Class<?>, Extractor<?>> getExtractors();
407
Supplier<? extends VirtualTimeScheduler> getVirtualTimeSchedulerSupplier();
408
Context getInitialContext();
409
String getScenarioName();
410
}
411
412
// Value formatting utilities
413
class ValueFormatters {
414
static <T> ToStringConverter forClass(Class<T> tClass, Function<T, String> tToString);
415
static <T> ToStringConverter forClassMatching(Class<T> tClass, Predicate<T> tPredicate, Function<T, String> tToString);
416
static ToStringConverter filtering(Predicate<Object> predicate, Function<Object, String> anyToString);
417
static Extractor<Signal<?>> signalExtractor();
418
static Extractor<Iterable<?>> iterableExtractor();
419
static <T> Extractor<T[]> arrayExtractor(Class<T[]> arrayClass);
420
421
ToStringConverter DURATION_CONVERTER;
422
}
423
424
// Functional interfaces for custom formatting
425
@FunctionalInterface
426
interface ToStringConverter extends Function<Object, String> {}
427
428
@FunctionalInterface
429
interface Extractor<CONTAINER> extends Function<CONTAINER, Stream<?>> {}
430
```