0
# Test Publishers
1
2
Publisher utilities for creating controllable upstream sources in tests. Test publishers implement the Reactive Streams Publisher interface while providing fine-grained control over element emission, demand tracking, and subscription lifecycle management.
3
4
## Capabilities
5
6
### Factory Methods
7
8
Static factory methods for creating various types of test publishers with different behavior patterns.
9
10
```scala { .api }
11
/**
12
* Publisher that signals complete to subscribers immediately after handing a void subscription
13
*/
14
def empty[T](): Publisher[T]
15
16
/**
17
* Publisher that subscribes the subscriber and completes after the first request
18
*/
19
def lazyEmpty[T]: Publisher[T]
20
21
/**
22
* Publisher that signals error to subscribers immediately after handing out subscription
23
*/
24
def error[T](cause: Throwable): Publisher[T]
25
26
/**
27
* Publisher that subscribes the subscriber and signals error after the first request
28
*/
29
def lazyError[T](cause: Throwable): Publisher[T]
30
31
/**
32
* Creates a manual probe that implements Publisher interface
33
* @param autoOnSubscribe Whether to automatically call onSubscribe (default: true)
34
*/
35
def manualProbe[T](autoOnSubscribe: Boolean = true)(implicit system: ActorSystem): ManualProbe[T]
36
37
/**
38
* Creates a probe that implements Publisher interface and tracks demand
39
* @param initialPendingRequests Initial number of pending requests (default: 0)
40
*/
41
def probe[T](initialPendingRequests: Long = 0)(implicit system: ActorSystem): Probe[T]
42
```
43
44
**Usage Examples:**
45
46
```scala
47
import akka.stream.testkit.TestPublisher
48
49
// Empty publishers for testing completion scenarios
50
val emptyPub = TestPublisher.empty[String]()
51
val lazyEmptyPub = TestPublisher.lazyEmpty[String]
52
53
// Error publishers for testing error scenarios
54
val errorPub = TestPublisher.error[String](new RuntimeException("test error"))
55
val lazyErrorPub = TestPublisher.lazyError[String](new RuntimeException("test error"))
56
57
// Manual control publishers
58
val manualPub = TestPublisher.manualProbe[String]()
59
val autoPub = TestPublisher.probe[String]()
60
```
61
62
### ManualProbe Class
63
64
Manual test publisher that provides complete control over the publishing lifecycle and requires explicit management of subscription and demand.
65
66
```scala { .api }
67
class ManualProbe[I] extends Publisher[I] {
68
type Self <: ManualProbe[I]
69
70
/**
71
* Subscribes a given Subscriber to this probe publisher
72
*/
73
def subscribe(subscriber: Subscriber[_ >: I]): Unit
74
75
/**
76
* Execute code block after subscription is established
77
*/
78
def executeAfterSubscription[T](f: => T): T
79
80
/**
81
* Expect a subscription and return the subscription object
82
*/
83
def expectSubscription(): PublisherProbeSubscription[I]
84
85
/**
86
* Expect demand from a given subscription
87
*/
88
def expectRequest(subscription: Subscription, n: Int): Self
89
90
/**
91
* Expect no messages for the default period
92
*/
93
def expectNoMessage(): Self
94
95
/**
96
* Expect no messages for a given duration
97
*/
98
def expectNoMessage(max: FiniteDuration): Self
99
100
/**
101
* Receive messages for a given duration or until one does not match a given partial function
102
*/
103
def receiveWhile[T](
104
max: Duration = Duration.Undefined,
105
idle: Duration = Duration.Inf,
106
messages: Int = Int.MaxValue
107
)(f: PartialFunction[PublisherEvent, T]): immutable.Seq[T]
108
109
/**
110
* Expect an event matching the partial function
111
*/
112
def expectEventPF[T](f: PartialFunction[PublisherEvent, T]): T
113
114
/**
115
* Get the Publisher interface
116
*/
117
def getPublisher: Publisher[I]
118
119
/**
120
* Execute code block while bounding its execution time between min and max
121
*/
122
def within[T](min: FiniteDuration, max: FiniteDuration)(f: => T): T
123
124
/**
125
* Execute code block within the specified maximum time
126
*/
127
def within[T](max: FiniteDuration)(f: => T): T
128
}
129
```
130
131
**Usage Examples:**
132
133
```scala
134
import akka.stream.testkit.TestPublisher
135
136
val probe = TestPublisher.manualProbe[String]()
137
val subscription = probe.expectSubscription()
138
139
// Expect demand and send elements
140
probe.expectRequest(subscription, 1)
141
subscription.sendNext("hello")
142
143
probe.expectRequest(subscription, 2)
144
subscription.sendNext("world")
145
subscription.sendComplete()
146
```
147
148
### Probe Class
149
150
Test publisher with automatic demand tracking that extends ManualProbe with simplified element sending and demand management.
151
152
```scala { .api }
153
class Probe[T] extends ManualProbe[T] {
154
type Self = Probe[T]
155
156
/**
157
* Asserts that a subscription has been received or will be received
158
*/
159
def ensureSubscription(): Unit
160
161
/**
162
* Current pending requests
163
*/
164
def pending: Long
165
166
/**
167
* Send next element, automatically checking and decrementing demand
168
*/
169
def sendNext(elem: T): Self
170
171
/**
172
* Send next element without checking demand (unsafe)
173
*/
174
def unsafeSendNext(elem: T): Self
175
176
/**
177
* Send completion signal
178
*/
179
def sendComplete(): Self
180
181
/**
182
* Send error signal
183
*/
184
def sendError(cause: Throwable): Self
185
186
/**
187
* Expect request and add to pending requests, returning the request amount
188
*/
189
def expectRequest(): Long
190
191
/**
192
* Expect cancellation
193
*/
194
def expectCancellation(): Self
195
196
/**
197
* Expect cancellation with specific cause
198
*/
199
def expectCancellationWithCause(expectedCause: Throwable): Self
200
201
/**
202
* Expect cancellation with typed cause, returning the cause
203
*/
204
def expectCancellationWithCause[E <: Throwable: ClassTag](): E
205
206
/**
207
* Java API: Expect cancellation with specific cause class
208
*/
209
def expectCancellationWithCause[E <: Throwable](causeClass: Class[E]): E
210
}
211
```
212
213
**Usage Examples:**
214
215
```scala
216
import akka.stream.testkit.TestPublisher
217
218
val probe = TestPublisher.probe[Int]()
219
220
// Send elements with automatic demand management
221
probe.sendNext(1)
222
probe.sendNext(2)
223
probe.sendNext(3)
224
probe.sendComplete()
225
226
// Check pending demand
227
println(s"Pending requests: ${probe.pending}")
228
229
// Handle cancellation scenarios
230
probe.expectCancellation()
231
232
// Handle typed cancellation causes
233
val cause = probe.expectCancellationWithCause[IllegalArgumentException]()
234
```
235
236
### PublisherProbeSubscription
237
238
Internal subscription implementation that bridges the test probe with subscribers, providing methods for sending elements and expecting subscription events.
239
240
```scala { .api }
241
case class PublisherProbeSubscription[I](
242
subscriber: Subscriber[_ >: I],
243
publisherProbe: TestProbe
244
) extends Subscription with SubscriptionWithCancelException {
245
246
/**
247
* Request elements from the subscription
248
*/
249
def request(elements: Long): Unit
250
251
/**
252
* Cancel subscription with cause
253
*/
254
def cancel(cause: Throwable): Unit
255
256
/**
257
* Expect a specific request amount
258
*/
259
def expectRequest(n: Long): Unit
260
261
/**
262
* Expect any request and return the amount
263
*/
264
def expectRequest(): Long
265
266
/**
267
* Expect cancellation and return the cause
268
*/
269
def expectCancellation(): Throwable
270
271
/**
272
* Send next element to subscriber
273
*/
274
def sendNext(element: I): Unit
275
276
/**
277
* Send completion signal to subscriber
278
*/
279
def sendComplete(): Unit
280
281
/**
282
* Send error signal to subscriber
283
*/
284
def sendError(cause: Throwable): Unit
285
286
/**
287
* Send onSubscribe signal to subscriber
288
*/
289
def sendOnSubscribe(): Unit
290
}
291
```
292
293
## Publisher Events
294
295
```scala { .api }
296
sealed trait PublisherEvent extends DeadLetterSuppression with NoSerializationVerificationNeeded
297
298
/**
299
* Emitted when a subscriber subscribes to the publisher
300
*/
301
final case class Subscribe(subscription: Subscription) extends PublisherEvent
302
303
/**
304
* Emitted when a subscription is cancelled
305
*/
306
final case class CancelSubscription(subscription: Subscription, cause: Throwable) extends PublisherEvent
307
308
/**
309
* Emitted when a subscriber requests more elements
310
*/
311
final case class RequestMore(subscription: Subscription, elements: Long) extends PublisherEvent
312
```
313
314
## Common Patterns
315
316
### Testing Backpressure
317
318
```scala
319
val probe = TestPublisher.probe[Int]()
320
321
// Send multiple elements
322
probe.sendNext(1)
323
probe.sendNext(2)
324
probe.sendNext(3)
325
326
// Verify demand management
327
probe.expectRequest() should be > 0L
328
```
329
330
### Testing Error Scenarios
331
332
```scala
333
val probe = TestPublisher.probe[String]()
334
val error = new RuntimeException("test failure")
335
336
probe.sendError(error)
337
// Verify error handling in downstream
338
```
339
340
### Testing Stream Completion
341
342
```scala
343
val probe = TestPublisher.probe[Int]()
344
345
probe.sendNext(1)
346
probe.sendNext(2)
347
probe.sendComplete()
348
349
// Verify completion behavior
350
```