0
# Controlled Publishers
1
2
TestPublisher is a Publisher implementation that tests drive manually. It gives the test complete control over which signals are emitted and when, making it ideal for testing subscriber behavior and backpressure scenarios.
3
4
## Capabilities
5
6
### Creating TestPublishers
7
8
Factory methods for creating different types of TestPublisher instances.
9
10
```java { .api }
11
abstract class TestPublisher<T> implements Publisher<T>, PublisherProbe<T> {
12
/** Create standard hot TestPublisher */
13
static <T> TestPublisher<T> create();
14
15
/** Create spec-violating TestPublisher with specified violations */
16
static <T> TestPublisher<T> createNoncompliant(Violation first, Violation... rest);
17
18
/** Create cold TestPublisher with buffering support for late subscribers */
19
static <T> TestPublisher<T> createCold();
20
21
/** Create cold TestPublisher that errors on overflow instead of buffering */
22
static <T> TestPublisher<T> createColdNonBuffering();
23
24
/** Create cold non-compliant TestPublisher with custom behavior */
25
static <T> TestPublisher<T> createColdNonCompliant(
26
boolean errorOnOverflow,
27
Violation firstViolation,
28
Violation... otherViolations
29
);
30
}
31
```
32
33
**Usage Examples:**
34
35
```java
36
import reactor.test.publisher.TestPublisher;
37
38
// Hot publisher - signals sent to current subscribers only
39
TestPublisher<String> hotPublisher = TestPublisher.create();
40
hotPublisher.next("early"); // Lost if no subscribers yet
41
42
Flux<String> flux = hotPublisher.flux();
43
flux.subscribe(System.out::println);
44
hotPublisher.next("hello"); // Delivered to subscriber
45
hotPublisher.complete();
46
47
// Cold publisher - buffers signals for late subscribers
48
TestPublisher<Integer> coldPublisher = TestPublisher.createCold();
49
coldPublisher.next(1, 2, 3); // Buffered
50
coldPublisher.complete();
51
52
// Late subscriber receives all buffered signals
53
coldPublisher.flux().subscribe(System.out::println); // Prints: 1, 2, 3
54
55
// Non-compliant publisher for testing error handling
56
TestPublisher<String> badPublisher = TestPublisher.createNoncompliant(
57
TestPublisher.Violation.ALLOW_NULL
58
);
59
badPublisher.next(null); // Normally illegal, but allowed
60
```
61
62
### Conversion Methods
63
64
Convert TestPublisher to standard Reactor types.
65
66
```java { .api }
67
abstract class TestPublisher<T> {
68
/** Wrap as Flux */
69
Flux<T> flux();
70
71
/** Wrap as Mono (a compliant publisher relays at most the first element, after which the source is cancelled) */
72
Mono<T> mono();
73
}
74
```
75
76
### Signal Emission
77
78
Methods for manually emitting reactive signals.
79
80
```java { .api }
81
abstract class TestPublisher<T> {
82
/** Emit single onNext signal */
83
TestPublisher<T> next(@Nullable T value);
84
85
/** Emit multiple onNext signals */
86
TestPublisher<T> next(T first, T... rest);
87
88
/** Emit values and complete immediately */
89
TestPublisher<T> emit(T... values);
90
91
/** Trigger error signal */
92
TestPublisher<T> error(Throwable t);
93
94
/** Trigger completion signal */
95
TestPublisher<T> complete();
96
}
97
```
98
99
**Usage Examples:**
100
101
```java
102
TestPublisher<String> publisher = TestPublisher.create();
103
104
// Emit signals one by one
105
publisher.next("first")
106
.next("second")
107
.complete();
108
109
// Emit multiple values at once
110
publisher.next("hello", "world", "!");
111
112
// Emit values and complete in one call
113
TestPublisher.create()
114
.emit("alpha", "beta", "gamma"); // Automatically completes
115
116
// Error scenarios
117
publisher.error(new RuntimeException("Something went wrong"));
118
```
119
120
### Subscription Assertions
121
122
Methods for asserting subscriber behavior and request patterns.
123
124
```java { .api }
125
abstract class TestPublisher<T> {
126
/** Assert that the current minimum outstanding request across all subscribers is at least n */
127
TestPublisher<T> assertMinRequested(long n);
128
129
/** Assert that the current maximum outstanding request across all subscribers is at most n */
130
TestPublisher<T> assertMaxRequested(long n);
131
132
/** Assert has at least one subscriber */
133
TestPublisher<T> assertSubscribers();
134
135
/** Assert specific number of subscribers */
136
TestPublisher<T> assertSubscribers(int n);
137
138
/** Assert no subscribers */
139
TestPublisher<T> assertNoSubscribers();
140
141
/** Assert has cancelled subscribers */
142
TestPublisher<T> assertCancelled();
143
144
/** Assert specific number of cancelled subscribers */
145
TestPublisher<T> assertCancelled(int n);
146
147
/** Assert no cancelled subscribers */
148
TestPublisher<T> assertNotCancelled();
149
150
/** Assert request overflow occurred (a subscriber received onNext despite having no outstanding request) */
151
TestPublisher<T> assertRequestOverflow();
152
153
/** Assert no request overflow */
154
TestPublisher<T> assertNoRequestOverflow();
155
}
156
```
157
158
**Usage Examples:**
159
160
```java
161
TestPublisher<Integer> publisher = TestPublisher.create();
162
163
// Subscribe and make requests
164
Disposable subscription = publisher.flux()
165
.subscribe(System.out::println);
166
167
// Test subscription behavior
168
publisher.assertSubscribers(1) // Has 1 subscriber
169
.assertMinRequested(1) // Requested at least 1 element
170
.next(42) // Send value
171
.assertNotCancelled() // Still subscribed
172
.complete();
173
174
// Test cancellation
175
subscription.dispose();
176
publisher.assertCancelled(1); // 1 cancelled subscriber
177
178
// Test cancellation triggered by a downstream operator
179
publisher.flux()
180
.take(5) // Subscriber will cancel after 5
181
.subscribe();
182
183
publisher.next(1, 2, 3, 4, 5, 6) // Send 6 values
184
.assertCancelled(); // Subscriber cancelled after 5
185
```
186
187
### Publisher Probe Integration
188
189
TestPublisher implements PublisherProbe for subscription event tracking.
190
191
```java { .api }
192
// TestPublisher implements the PublisherProbe interface
193
interface PublisherProbe<T> {
194
boolean wasSubscribed();
195
long subscribeCount();
196
boolean wasCancelled();
197
boolean wasRequested();
198
199
void assertWasSubscribed();
200
void assertWasNotSubscribed();
201
void assertWasCancelled();
202
void assertWasNotCancelled();
203
void assertWasRequested();
204
void assertWasNotRequested();
205
}
206
```
207
208
## Types
209
210
```java { .api }
211
// Reactive Streams specification violations for testing error handling
212
enum TestPublisher.Violation {
213
/** Allow next calls despite insufficient request (violates backpressure) */
214
REQUEST_OVERFLOW,
215
216
/** Allow null values in next calls (violates non-null requirement) */
217
ALLOW_NULL,
218
219
/** Allow multiple termination signals (violates single terminal) */
220
CLEANUP_ON_TERMINATE,
221
222
/** Ignore cancellation signals (violates cancellation semantics) */
223
DEFER_CANCELLATION
224
}
225
```
226
227
**Violation Usage Examples:**
228
229
```java
230
// Test handling of backpressure violations
231
TestPublisher<String> overflowPublisher = TestPublisher.createNoncompliant(
232
TestPublisher.Violation.REQUEST_OVERFLOW
233
);
234
235
StepVerifier.create(overflowPublisher.flux(), 0) // Request 0 initially
236
.then(() -> overflowPublisher.next("overflow")) // Send despite no request
237
.thenRequest(1)
238
.expectNext("overflow")
239
.expectComplete()
240
.verify();
241
242
// Test null value handling
243
TestPublisher<String> nullPublisher = TestPublisher.createNoncompliant(
244
TestPublisher.Violation.ALLOW_NULL
245
);
246
247
StepVerifier.create(nullPublisher.flux())
248
.then(() -> nullPublisher.next(null).complete())
249
.expectNext((String) null)
250
.expectComplete()
251
.verify();
252
253
// Test multiple termination signals
254
TestPublisher<String> multiTermPublisher = TestPublisher.createNoncompliant(
255
TestPublisher.Violation.CLEANUP_ON_TERMINATE
256
);
257
258
StepVerifier.create(multiTermPublisher.flux())
259
.then(() -> {
260
multiTermPublisher.complete(); // First termination
261
multiTermPublisher.error(new RuntimeException()); // Second termination
262
})
263
.expectComplete() // Only first termination takes effect
264
.verify();
265
266
// Test cancellation deferral
267
TestPublisher<Integer> deferCancelPublisher = TestPublisher.createNoncompliant(
268
TestPublisher.Violation.DEFER_CANCELLATION
269
);
270
271
// Publisher ignores cancellation and continues emitting
272
Disposable subscription = deferCancelPublisher.flux()
273
.subscribe(System.out::println);
274
275
subscription.dispose(); // Cancel subscription
276
deferCancelPublisher.next(1, 2, 3); // Still emits despite cancellation
277
```