0
# Manual Subscribers
1
2
TestSubscriber is a CoreSubscriber implementation that records the signals it receives so that tests can inspect and assert on them programmatically. It is an alternative to StepVerifier for complex scenarios that need direct access to received signals or assertion logic that does not fit a declarative script.
3
4
## Capabilities
5
6
### Creating TestSubscribers
7
8
Factory methods and builder for creating TestSubscriber instances.
9
10
```java { .api }
11
interface TestSubscriber<T> extends CoreSubscriber<T>, Scannable {
12
/** Create simple TestSubscriber with unbounded demand */
13
static <T> TestSubscriber<T> create();
14
15
/** Create TestSubscriberBuilder for customization */
16
static TestSubscriberBuilder builder();
17
}
18
19
class TestSubscriberBuilder {
20
/** Add context entry */
21
TestSubscriberBuilder contextPut(Object key, Object value);
22
23
/** Add multiple context entries */
24
TestSubscriberBuilder contextPutAll(ContextView toAdd);
25
26
/** Set initial request amount */
27
TestSubscriberBuilder initialRequest(long initialRequest);
28
29
/** Set unbounded initial request */
30
TestSubscriberBuilder initialRequestUnbounded();
31
32
/** Require specific fusion mode */
33
TestSubscriberBuilder requireFusion(int exactMode);
34
35
/** Require fusion negotiation result */
36
TestSubscriberBuilder requireFusion(int requestedMode, int negotiatedMode);
37
38
/** Require non-fuseable subscription */
39
TestSubscriberBuilder requireNotFuseable();
40
41
/** Build standard TestSubscriber */
42
<T> TestSubscriber<T> build();
43
44
/** Build ConditionalTestSubscriber */
45
<T> ConditionalTestSubscriber<T> buildConditional(Predicate<? super T> tryOnNext);
46
}
47
```
48
49
**Usage Examples:**
50
51
```java
52
import reactor.test.subscriber.TestSubscriber;
53
54
// Simple subscriber with unbounded demand
55
TestSubscriber<String> subscriber = TestSubscriber.create();
56
Flux.just("hello", "world").subscribe(subscriber);
57
58
// Custom subscriber with limited initial request
59
TestSubscriber<Integer> limitedSubscriber = TestSubscriber.builder()
60
.initialRequest(5)
61
.build();
62
63
Flux.range(1, 100).subscribe(limitedSubscriber);
64
65
// Subscriber with context
66
TestSubscriber<String> contextSubscriber = TestSubscriber.builder()
67
.contextPut("userId", "12345")
68
.contextPut("requestId", "abc-def")
69
.build();
70
71
// Subscriber with fusion requirements
72
TestSubscriber<String> fusionSubscriber = TestSubscriber.builder()
73
.requireFusion(Fuseable.SYNC) // Require synchronous fusion
74
.build();
75
```
76
77
### Control Methods
78
79
Methods for controlling subscription behavior and backpressure.
80
81
```java { .api }
82
interface TestSubscriber<T> {
83
/** Cancel subscription and unblock pending block() calls */
84
void cancel();
85
86
/** Request additional elements from publisher */
87
void request(long additionalRequest);
88
}
89
```
90
91
**Usage Examples:**
92
93
```java
94
TestSubscriber<Integer> subscriber = TestSubscriber.builder()
95
.initialRequest(0) // Start with no demand
96
.build();
97
98
Flux.range(1, 10).subscribe(subscriber);
99
100
// Control backpressure manually
101
subscriber.request(3); // Request first 3 elements
102
// Process received elements...
103
subscriber.request(5); // Request 5 more
104
// Process more elements...
105
subscriber.cancel(); // Cancel remaining
106
```
107
108
### State Query Methods
109
110
Methods for checking subscriber state and termination status.
111
112
```java { .api }
113
interface TestSubscriber<T> {
114
/** Check if subscriber reached any end state (terminated or cancelled) */
115
boolean isTerminatedOrCancelled();
116
117
/** Check if subscriber received terminal signal (onComplete or onError) */
118
boolean isTerminated();
119
120
/** Check if subscriber received onComplete */
121
boolean isTerminatedComplete();
122
123
/** Check if subscriber received onError */
124
boolean isTerminatedError();
125
126
/** Check if subscriber was cancelled */
127
boolean isCancelled();
128
}
129
```
130
131
**Usage Examples:**
132
133
```java
134
TestSubscriber<String> subscriber = TestSubscriber.create();
135
136
// Before subscription
137
assert !subscriber.isTerminated();
138
assert !subscriber.isCancelled();
139
140
// Subscribe to completing publisher
141
Flux.just("hello").subscribe(subscriber);
142
143
// After completion
144
assert subscriber.isTerminated();
145
assert subscriber.isTerminatedComplete();
146
assert !subscriber.isTerminatedError();
147
assert !subscriber.isCancelled();
148
149
// Test error case
150
TestSubscriber<String> errorSubscriber = TestSubscriber.create();
151
Flux.<String>error(new RuntimeException()).subscribe(errorSubscriber);
152
153
assert errorSubscriber.isTerminated();
154
assert !errorSubscriber.isTerminatedComplete();
155
assert errorSubscriber.isTerminatedError();
156
```
157
158
### Data Access Methods
159
160
Methods for accessing received signals and protocol violations.
161
162
```java { .api }
163
interface TestSubscriber<T> {
164
/** Get terminal signal if available (nullable) */
165
@Nullable
166
Signal<T> getTerminalSignal();
167
168
/** Assert terminated and get terminal signal (throws if not terminated) */
169
Signal<T> expectTerminalSignal();
170
171
/** Assert error terminated and get error (throws if not error) */
172
Throwable expectTerminalError();
173
174
/** Get all received onNext values */
175
List<T> getReceivedOnNext();
176
177
/** Get onNext values received after cancel() was called (tolerated by Reactive Streams, so not recorded as protocol errors) */
178
List<T> getReceivedOnNextAfterCancellation();
179
180
/** Get detected protocol violations from publisher */
181
List<Signal<T>> getProtocolErrors();
182
183
/** Get negotiated fusion mode */
184
int getFusionMode();
185
}
186
```
187
188
**Usage Examples:**
189
190
```java
191
TestSubscriber<Integer> subscriber = TestSubscriber.create();
192
Flux.range(1, 5).subscribe(subscriber);
193
194
// Access received data
195
List<Integer> values = subscriber.getReceivedOnNext();
196
assert values.equals(Arrays.asList(1, 2, 3, 4, 5));
197
198
// Check terminal signal
199
Signal<Integer> terminal = subscriber.expectTerminalSignal();
200
assert terminal.isOnComplete();
201
202
// Test error case
203
TestSubscriber<String> errorSubscriber = TestSubscriber.create();
204
RuntimeException error = new RuntimeException("Test error");
205
Flux.<String>error(error).subscribe(errorSubscriber);
206
207
Throwable receivedError = errorSubscriber.expectTerminalError();
208
assert receivedError == error;
209
210
// Test protocol violations
211
TestSubscriber<String> violationSubscriber = TestSubscriber.create();
212
// ... publisher that violates protocol ...
213
List<Signal<String>> violations = violationSubscriber.getProtocolErrors();
214
assert !violations.isEmpty();
215
```
216
217
### Blocking Methods
218
219
Methods for blocking until subscriber reaches terminal state.
220
221
```java { .api }
222
interface TestSubscriber<T> {
223
/** Block until end state reached (terminated or cancelled) */
224
void block();
225
226
/** Block until end state or timeout */
227
void block(Duration timeout);
228
}
229
```
230
231
**Usage Examples:**
232
233
```java
234
TestSubscriber<String> subscriber = TestSubscriber.create();
235
236
// Subscribe to async publisher
237
Flux.just("hello")
238
.delayElements(Duration.ofMillis(100))
239
.subscribe(subscriber);
240
241
// Block until completion
242
subscriber.block(); // Waits for async completion
243
assert subscriber.isTerminatedComplete();
244
245
// Block with timeout
246
TestSubscriber<String> timeoutSubscriber = TestSubscriber.create();
247
Flux.<String>never().subscribe(timeoutSubscriber); // Publisher that never emits
248
249
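try {
250
timeoutSubscriber.block(Duration.ofMillis(50));
251
throw new IllegalStateException("block() should have timed out"); // Should not reach here
252
} catch (AssertionError e) {
253
// Expected: block(Duration) reports an elapsed timeout by throwing AssertionError
254
assert e.getMessage().contains("timed out");
255
}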
256
```
257
258
### ConditionalTestSubscriber
259
260
Specialized subscriber for conditional subscription scenarios.
261
262
```java { .api }
263
interface ConditionalTestSubscriber<T> extends TestSubscriber<T>, Fuseable.ConditionalSubscriber<T> {
264
// Inherits all TestSubscriber methods plus:
265
266
/** Try to consume next value (from ConditionalSubscriber) */
267
boolean tryOnNext(T value);
268
}
269
```
270
271
**Usage Examples:**
272
273
```java
274
// Create conditional subscriber that only accepts even numbers
275
ConditionalTestSubscriber<Integer> conditionalSub = TestSubscriber.builder()
276
.buildConditional(n -> n % 2 == 0);
277
278
// Subscribe to publisher
279
Flux.range(1, 10).subscribe(conditionalSub);
280
281
// Only even numbers are received
282
List<Integer> received = conditionalSub.getReceivedOnNext();
283
assert received.equals(Arrays.asList(2, 4, 6, 8, 10));
284
```
285
286
## Types
287
288
```java { .api }
289
// Fusion capability expectations for TestSubscriber
290
enum TestSubscriber.FusionRequirement {
291
/** Expect publisher to be fuseable */
292
FUSEABLE,
293
294
/** Expect publisher to not be fuseable */
295
NOT_FUSEABLE,
296
297
/** No fusion requirements */
298
NONE
299
}
300
301
// Fusion modes (from Reactor Fuseable interface)
302
interface Fuseable {
303
int NONE = 0; // No fusion support
304
int SYNC = 1; // Synchronous fusion
305
int ASYNC = 2; // Asynchronous fusion
306
int ANY = 3; // Any fusion mode
307
int THREAD_BARRIER = 4; // Thread barrier
308
}
309
```
310
311
## Advanced Usage Patterns
312
313
### Complex Assertion Scenarios
314
315
TestSubscriber is ideal for scenarios requiring complex assertions that don't fit StepVerifier's declarative model:
316
317
```java
318
@Test
319
public void testComplexDataValidation() {
320
TestSubscriber<DataEvent> subscriber = TestSubscriber.create();
321
322
// Subscribe to complex data stream
323
complexDataService.getEvents().subscribe(subscriber);
324
325
// Block until completion
326
subscriber.block(Duration.ofSeconds(5));
327
328
// Perform complex validations
329
List<DataEvent> events = subscriber.getReceivedOnNext();
330
331
// Validate event ordering
332
for (int i = 1; i < events.size(); i++) {
333
assert events.get(i).getTimestamp() >= events.get(i-1).getTimestamp();
334
}
335
336
// Validate event types
337
Map<EventType, Long> typeCounts = events.stream()
338
.collect(Collectors.groupingBy(DataEvent::getType, Collectors.counting()));
339
340
assert typeCounts.get(EventType.START) == 1;
341
assert typeCounts.get(EventType.END) == 1;
342
assert typeCounts.get(EventType.DATA) >= 1;
343
}
344
```
345
346
### Manual Backpressure Testing
347
348
Test backpressure behavior by controlling requests manually:
349
350
```java
351
@Test
352
public void testBackpressureBehavior() {
353
TestSubscriber<Integer> subscriber = TestSubscriber.builder()
354
.initialRequest(0) // No initial demand
355
.build();
356
357
// Subscribe to fast producer
358
Flux.range(1, 1000)
359
.onBackpressureBuffer() // Unbounded buffer: a bounded one would overflow and error before any demand is signalled
360
.subscribe(subscriber);
361
362
// Verify no data received initially
363
assert subscriber.getReceivedOnNext().isEmpty();
364
365
// Request small batch
366
subscriber.request(5);
367
// Verify exactly 5 received
368
assert subscriber.getReceivedOnNext().size() == 5;
369
370
// Request more
371
subscriber.request(3);
372
assert subscriber.getReceivedOnNext().size() == 8;
373
374
// Cancel to test cleanup
375
subscriber.cancel();
376
assert subscriber.isCancelled();
377
}
378
```
379
380
### Protocol Violation Detection
381
382
Detect and assert on reactive streams protocol violations:
383
384
```java
385
@Test
386
public void testProtocolViolations() {
387
TestSubscriber<String> subscriber = TestSubscriber.create();
388
389
// Create misbehaving publisher
390
Publisher<String> badPublisher = s -> {
391
s.onSubscribe(new Subscription() {
392
@Override
393
public void request(long n) {}
394
395
@Override
396
public void cancel() {}
397
});
398
399
s.onNext("value");
400
s.onComplete();
401
s.onNext("after complete"); // Protocol violation!
402
};
403
404
badPublisher.subscribe(subscriber);
405
406
// Check for protocol violations
407
List<Signal<String>> violations = subscriber.getProtocolErrors();
407
assert !violations.isEmpty();
408
assert violations.stream()
409
.anyMatch(s -> s.isOnNext() && "after complete".equals(s.get()));
411
412
// No cancel() was issued, so nothing is recorded as received after cancellation
413
List<String> afterCancel = subscriber.getReceivedOnNextAfterCancellation();
414
assert afterCancel.isEmpty();
415
}
416
```