# Publisher Instrumentation

PublisherProbe wraps a Publisher and records subscription, request, and cancellation signals, so tests can verify control flow without altering the data that flows through it. It is well suited to testing conditional logic and confirming that a particular code path was actually subscribed to.

## Capabilities

### Creating PublisherProbes

Factory methods for creating PublisherProbe instances.

```java { .api }
interface PublisherProbe<T> {
    /** Create probe wrapping existing publisher */
    static <T> PublisherProbe<T> of(Publisher<? extends T> source);

    /** Create probe that simply completes (empty source) */
    static <T> PublisherProbe<T> empty();
}
```

**Usage Examples:**

```java
import reactor.core.publisher.Flux;
import reactor.test.publisher.PublisherProbe;

// Probe an existing publisher
Flux<String> source = Flux.just("hello", "world");
PublisherProbe<String> probe = PublisherProbe.of(source);

// Use probe in place of original publisher
probe.flux().subscribe(System.out::println);

// Verify subscription occurred
assert probe.wasSubscribed();

// Create empty probe for conditional paths
PublisherProbe<String> emptyProbe = PublisherProbe.empty();
// Use in conditional logic where you want to verify execution path
```

### Conversion Methods

Convert PublisherProbe to standard Reactor types.

```java { .api }
interface PublisherProbe<T> {
    /** Get Mono version of probe */
    Mono<T> mono();

    /** Get Flux version of probe */
    Flux<T> flux();
}
```

### State Query Methods

Methods for querying subscription and interaction state.

```java { .api }
interface PublisherProbe<T> {
    /** Check if probe was subscribed to */
    boolean wasSubscribed();

    /** Get total number of subscriptions */
    long subscribeCount();

    /** Check if probe was cancelled */
    boolean wasCancelled();

    /** Check if probe received any requests */
    boolean wasRequested();
}
```

**Usage Examples:**

```java
// Flux.never() keeps the subscription open; with a source that completes
// immediately, dispose() would be a no-op and no cancellation would be recorded
PublisherProbe<String> probe = PublisherProbe.of(Flux.never());

// Before subscription
assert !probe.wasSubscribed();
assert probe.subscribeCount() == 0;
assert !probe.wasCancelled();
assert !probe.wasRequested();

// Subscribe and check state
Disposable subscription = probe.flux().subscribe();

assert probe.wasSubscribed();
assert probe.subscribeCount() == 1;
assert !probe.wasCancelled();
assert probe.wasRequested();

// Cancel and check state
subscription.dispose();
assert probe.wasCancelled();

// Multiple subscriptions
probe.flux().subscribe();
assert probe.subscribeCount() == 2; // Now has 2 total subscriptions
```

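To make concrete what the state queries report, here is a minimal sketch of a recording probe written against the JDK's `java.util.concurrent.Flow` interfaces rather than Reactor. It is not how reactor-test implements PublisherProbe: the class `RecordingProbe` and its fields are hypothetical and exist only for illustration. The sketch never emits or completes, which keeps the subscription open so that cancellation can be observed.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical illustration only: a publisher that records the signals it receives. */
final class RecordingProbe<T> implements Flow.Publisher<T> {
    private final AtomicLong subscribed = new AtomicLong();
    private final AtomicLong requested = new AtomicLong();
    private final AtomicLong cancelled = new AtomicLong();

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        subscribed.incrementAndGet(); // one count per subscriber
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) {
                // Only records that demand was signalled; a complete publisher
                // would also validate n and emit items here.
                requested.incrementAndGet();
            }

            @Override
            public void cancel() {
                cancelled.incrementAndGet();
            }
        });
    }

    boolean wasSubscribed() { return subscribed.get() > 0; }
    long subscribeCount() { return subscribed.get(); }
    boolean wasRequested() { return requested.get() > 0; }
    boolean wasCancelled() { return cancelled.get() > 0; }
}
```

In this sketch every call to `subscribe` increments the same counter, which mirrors why `subscribeCount()` grows with each new subscriber while `wasSubscribed()` only reports whether that count is non-zero.
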
### Assertion Methods

Assertion counterparts of the state queries. Each one throws an AssertionError on failure, so it can be called directly from a test.

```java { .api }
interface PublisherProbe<T> {
    /** Assert probe was subscribed (throws if not) */
    void assertWasSubscribed();

    /** Assert probe was not subscribed (throws if was) */
    void assertWasNotSubscribed();

    /** Assert probe was cancelled (throws if not) */
    void assertWasCancelled();

    /** Assert probe was not cancelled (throws if was) */
    void assertWasNotCancelled();

    /** Assert probe received requests (throws if not) */
    void assertWasRequested();

    /** Assert probe received no requests (throws if did) */
    void assertWasNotRequested();
}
```

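One way to picture the relationship between the state queries and the assertions is as thin wrappers that throw when the corresponding query returns the wrong answer. The sketch below assumes that design. `ProbeAssertions` is a hypothetical interface, not part of reactor-test, and the error messages are invented for illustration.

```java
/** Hypothetical illustration: assertions expressed as wrappers over a state query. */
interface ProbeAssertions {
    boolean wasSubscribed();

    default void assertWasSubscribed() {
        if (!wasSubscribed()) {
            throw new AssertionError("expected a subscription, but there was none");
        }
    }

    default void assertWasNotSubscribed() {
        if (wasSubscribed()) {
            throw new AssertionError("expected no subscription, but there was one");
        }
    }
}
```

Because `AssertionError` is what JUnit and similar frameworks treat as a test failure, a failing assertion of this kind fails the test without any extra glue code.
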
**Usage Examples:**

```java
PublisherProbe<String> probe = PublisherProbe.empty();
boolean someCondition = true; // Stand-in for the condition under test

// Test conditional execution paths
if (someCondition) {
    probe.flux().subscribe(); // Conditional subscription
}

// Assert the condition was met
probe.assertWasSubscribed(); // Throws AssertionError if condition was false

// Test error handling paths: errors arrive through the error consumer,
// so a try/catch around subscribe() is not the place to handle them
PublisherProbe<String> failingProbe = PublisherProbe.of(Flux.just("value"));
failingProbe.flux()
    .map(s -> { throw new RuntimeException("boom"); })
    .subscribe(
        value -> { },
        error -> { /* Handle error */ });

failingProbe.assertWasSubscribed(); // Verify subscription occurred despite error
```

## Common Use Cases

### Testing Conditional Logic

PublisherProbe makes it straightforward to verify which conditional branch was executed:

```java
@Test
public void testConditionalExecution() {
    PublisherProbe<String> thenProbe = PublisherProbe.empty();
    PublisherProbe<String> elseProbe = PublisherProbe.empty();

    boolean condition = true;

    Flux<String> result = condition
        ? thenProbe.flux()
        : elseProbe.flux();

    StepVerifier.create(result)
        .expectComplete()
        .verify();

    // Verify correct branch was taken
    thenProbe.assertWasSubscribed();
    elseProbe.assertWasNotSubscribed();
}
```

### Testing switchIfEmpty Behavior

Verify that the fallback publisher is subscribed to only when the main publisher is empty:

```java
@Test
public void testSwitchIfEmpty() {
    Flux<String> empty = Flux.empty();
    PublisherProbe<String> fallback = PublisherProbe.of(Flux.just("fallback"));

    StepVerifier.create(
            empty.switchIfEmpty(fallback.flux())
        )
        .expectNext("fallback")
        .expectComplete()
        .verify();

    fallback.assertWasSubscribed(); // Verify fallback was used
}

@Test
public void testSwitchIfEmptyNotUsed() {
    Flux<String> nonEmpty = Flux.just("primary");
    PublisherProbe<String> fallback = PublisherProbe.of(Flux.just("fallback"));

    StepVerifier.create(
            nonEmpty.switchIfEmpty(fallback.flux())
        )
        .expectNext("primary")
        .expectComplete()
        .verify();

    fallback.assertWasNotSubscribed(); // Verify fallback was not used
}
```

### Testing Error Recovery

Verify that the recovery publisher is subscribed to when the source fails:

```java
@Test
public void testErrorRecovery() {
    Flux<String> failing = Flux.error(new RuntimeException("Error"));
    PublisherProbe<String> recovery = PublisherProbe.of(Flux.just("recovered"));

    StepVerifier.create(
            failing.onErrorResume(ex -> recovery.flux())
        )
        .expectNext("recovered")
        .expectComplete()
        .verify();

    recovery.assertWasSubscribed(); // Verify recovery was triggered
}
```

### Testing Complex Control Flow

Verify which of several conditional paths was taken in a more complex reactive chain:

```java
@Test
public void testComplexControlFlow() {
    PublisherProbe<String> cacheHit = PublisherProbe.of(Flux.just("cached"));
    PublisherProbe<String> networkCall = PublisherProbe.of(Flux.just("network"));
    PublisherProbe<String> fallback = PublisherProbe.of(Flux.just("fallback"));

    boolean cacheAvailable = false;
    boolean networkAvailable = true;

    // The explicit <String> type witness keeps both branches typed as Flux<String>
    Flux<String> result = (cacheAvailable ? cacheHit.flux() : Flux.<String>empty())
        .switchIfEmpty(networkAvailable ? networkCall.flux() : fallback.flux());

    StepVerifier.create(result)
        .expectNext("network")
        .expectComplete()
        .verify();

    // Verify execution path
    cacheHit.assertWasNotSubscribed(); // Cache was not available
    networkCall.assertWasSubscribed(); // Network was used
    fallback.assertWasNotSubscribed(); // Fallback was not needed
}
```

### Integration with Other Testing Utilities

PublisherProbe combines with other reactor-test utilities such as StepVerifier and TestPublisher:

```java
@Test
public void testWithStepVerifier() {
    PublisherProbe<String> probe = PublisherProbe.of(
        Flux.just("a", "b", "c").delayElements(Duration.ofMillis(100))
    );

    StepVerifier.create(probe.flux())
        .expectNext("a", "b", "c")
        .expectComplete()
        .verify();

    // Verify subscription behavior
    probe.assertWasSubscribed();
    probe.assertWasRequested();
    assert probe.subscribeCount() == 1;
}

@Test
public void testWithTestPublisher() {
    TestPublisher<String> testPub = TestPublisher.create();
    PublisherProbe<String> probe = PublisherProbe.of(testPub.flux());

    StepVerifier.create(probe.flux())
        .then(() -> {
            // Check the subscriber count while the subscriber is still attached
            testPub.assertSubscribers(1);
            testPub.next("test").complete();
        })
        .expectNext("test")
        .expectComplete()
        .verify();

    probe.assertWasSubscribed();
}
```