0
# Test Subscribers
1
2
Subscriber utilities for creating controllable downstream sinks in tests. Test subscribers implement the Reactive Streams Subscriber interface while providing comprehensive assertion capabilities for verifying stream behavior, element expectations, and timing requirements.
3
4
## Capabilities
5
6
### Factory Methods
7
8
Static factory methods for creating test subscribers with different control mechanisms.
9
10
```scala { .api }
11
/**
12
* Creates a manual probe that implements Subscriber interface
13
*/
14
def manualProbe[T]()(implicit system: ActorSystem): ManualProbe[T]
15
16
/**
17
* Creates a subscriber probe with automatic subscription management
18
*/
19
def probe[T]()(implicit system: ActorSystem): Probe[T]
20
```
21
22
**Usage Examples:**
23
24
```scala
25
import akka.stream.testkit.TestSubscriber
26
27
// Manual control subscriber
28
val manualProbe = TestSubscriber.manualProbe[String]()
29
30
// Automatic subscription management
31
val autoProbe = TestSubscriber.probe[String]()
32
```
33
34
### ManualProbe Class
35
36
Manual test subscriber that provides complete control over subscription management and element expectations with comprehensive assertion methods.
37
38
```scala { .api }
39
class ManualProbe[I] extends Subscriber[I] {
40
type Self <: ManualProbe[I]
41
42
/**
43
* Expect and return a Subscription
44
*/
45
def expectSubscription(): Subscription
46
47
/**
48
* Expect and return any SubscriberEvent (OnSubscribe, OnNext, OnError, or OnComplete)
49
*/
50
def expectEvent(): SubscriberEvent
51
def expectEvent(max: FiniteDuration): SubscriberEvent
52
def expectEvent(event: SubscriberEvent): Self
53
54
/**
55
* Expect and return a stream element
56
*/
57
def expectNext(): I
58
def expectNext(d: FiniteDuration): I
59
def expectNext(element: I): Self
60
def expectNext(d: FiniteDuration, element: I): Self
61
62
/**
63
* Expect multiple stream elements in order
64
*/
65
def expectNext(e1: I, e2: I, es: I*): Self
66
67
/**
68
* Expect multiple stream elements in arbitrary order
69
*/
70
def expectNextUnordered(e1: I, e2: I, es: I*): Self
71
72
/**
73
* Expect and return the next N stream elements
74
*/
75
def expectNextN(n: Long): immutable.Seq[I]
76
def expectNextN(all: immutable.Seq[I]): Self
77
78
/**
79
* Expect elements in any order
80
*/
81
def expectNextUnorderedN(all: immutable.Seq[I]): Self
82
83
/**
84
* Expect stream completion
85
*/
86
def expectComplete(): Self
87
88
/**
89
* Expect and return the signalled Throwable
90
*/
91
def expectError(): Throwable
92
def expectError(cause: Throwable): Self
93
94
/**
95
* Expect subscription followed by error (with optional demand signaling)
96
*/
97
def expectSubscriptionAndError(): Throwable
98
def expectSubscriptionAndError(signalDemand: Boolean): Throwable
99
def expectSubscriptionAndError(cause: Throwable): Self
100
def expectSubscriptionAndError(cause: Throwable, signalDemand: Boolean): Self
101
102
/**
103
* Expect subscription followed by completion (with optional demand signaling)
104
*/
105
def expectSubscriptionAndComplete(): Self
106
def expectSubscriptionAndComplete(signalDemand: Boolean): Self
107
108
/**
109
* Expect next element or error signal, returning whichever was signalled
110
*/
111
def expectNextOrError(): Either[Throwable, I]
112
def expectNextOrError(element: I, cause: Throwable): Either[Throwable, I]
113
114
/**
115
* Expect next element or stream completion
116
*/
117
def expectNextOrComplete(): Either[OnComplete.type, I]
118
def expectNextOrComplete(element: I): Self
119
120
/**
121
* Assert that no message is received for the specified time
122
*/
123
def expectNoMessage(): Self
124
def expectNoMessage(remaining: FiniteDuration): Self
125
def expectNoMessage(remaining: java.time.Duration): Self
126
127
/**
128
* Expect a stream element and test it with partial function
129
*/
130
def expectNextPF[T](f: PartialFunction[Any, T]): T
131
def expectNextWithTimeoutPF[T](max: Duration, f: PartialFunction[Any, T]): T
132
def expectNextChainingPF(f: PartialFunction[Any, Any]): Self
133
def expectNextChainingPF(max: Duration, f: PartialFunction[Any, Any]): Self
134
135
/**
136
* Expect event matching partial function
137
*/
138
def expectEventPF[T](f: PartialFunction[SubscriberEvent, T]): T
139
def expectEventWithTimeoutPF[T](max: Duration, f: PartialFunction[SubscriberEvent, T]): T
140
141
/**
142
* Receive messages for a given duration or until one does not match a given partial function
143
*/
144
def receiveWhile[T](
145
max: Duration = Duration.Undefined,
146
idle: Duration = Duration.Inf,
147
messages: Int = Int.MaxValue
148
)(f: PartialFunction[SubscriberEvent, T]): immutable.Seq[T]
149
150
/**
151
* Receive messages within time limit
152
*/
153
def receiveWithin(max: FiniteDuration, messages: Int = Int.MaxValue): immutable.Seq[I]
154
155
/**
156
* Attempt to drain the stream into a strict collection (requests Long.MaxValue elements)
157
* WARNING: Use with caution for infinite streams or large elements
158
*/
159
def toStrict(atMost: FiniteDuration): immutable.Seq[I]
160
161
/**
162
* Execute code block while bounding its execution time
163
*/
164
def within[T](min: FiniteDuration, max: FiniteDuration)(f: => T): T
165
def within[T](max: FiniteDuration)(f: => T): T
166
167
// Subscriber interface methods
168
def onSubscribe(subscription: Subscription): Unit
169
def onNext(element: I): Unit
170
def onComplete(): Unit
171
def onError(cause: Throwable): Unit
172
}
173
```
174
175
**Usage Examples:**
176
177
```scala
178
import akka.stream.testkit.TestSubscriber
179
import scala.concurrent.duration._
180
181
val probe = TestSubscriber.manualProbe[String]()
182
183
// Subscribe to a source
184
source.subscribe(probe)
185
186
// Expect subscription and request elements
187
val subscription = probe.expectSubscription()
188
subscription.request(3)
189
190
// Expect specific elements
191
probe.expectNext("hello")
192
probe.expectNext("world")
193
probe.expectComplete()
194
195
// Expect elements with timeout
196
probe.expectNext(1.second, "delayed")
197
198
// Expect multiple elements
199
probe.expectNext("a", "b", "c")
200
201
// Expect elements in any order
202
probe.expectNextUnordered("x", "y", "z")
203
204
// Pattern matching on elements
205
probe.expectNextPF {
206
case s: String if s.startsWith("test") => s.length
207
}
208
```
209
210
### Probe Class
211
212
Test subscriber with automatic subscription management that extends ManualProbe with simplified request handling and additional convenience methods.
213
214
```scala { .api }
215
class Probe[T] extends ManualProbe[T] {
216
override type Self = Probe[T]
217
218
/**
219
* Asserts that a subscription has been received or will be received
220
*/
221
def ensureSubscription(): Self
222
223
/**
224
* Request N elements from the subscription
225
*/
226
def request(n: Long): Self
227
228
/**
229
* Request and expect a specific stream element
230
*/
231
def requestNext(element: T): Self
232
233
/**
234
* Request and expect the next stream element, returning it
235
*/
236
def requestNext(): T
237
def requestNext(d: FiniteDuration): T
238
239
/**
240
* Cancel the subscription
241
*/
242
def cancel(): Self
243
244
/**
245
* Cancel the subscription with a specific cause
246
*/
247
def cancel(cause: Throwable): Self
248
}
249
```
250
251
**Usage Examples:**
252
253
```scala
254
import akka.stream.testkit.TestSubscriber
255
256
val probe = TestSubscriber.probe[Int]()
257
258
// Automatic subscription management
259
source.runWith(Sink.fromSubscriber(probe))
260
261
// Request and expect elements
262
probe.request(1)
263
val element = probe.expectNext()
264
265
// Request and expect specific element
266
probe.requestNext(42)
267
268
// Request and return element
269
val next = probe.requestNext()
270
271
// Cancel subscription
272
probe.cancel()
273
274
// Cancel with cause
275
probe.cancel(new RuntimeException("test cancellation"))
276
```
277
278
## Subscriber Events
279
280
```scala { .api }
281
sealed trait SubscriberEvent extends DeadLetterSuppression with NoSerializationVerificationNeeded
282
283
/**
284
* Received when the subscriber is subscribed to a publisher
285
*/
286
final case class OnSubscribe(subscription: Subscription) extends SubscriberEvent
287
288
/**
289
* Received when a stream element arrives
290
*/
291
final case class OnNext[I](element: I) extends SubscriberEvent
292
293
/**
294
* Received when the stream completes successfully
295
*/
296
case object OnComplete extends SubscriberEvent
297
298
/**
299
* Received when the stream terminates with an error
300
*/
301
final case class OnError(cause: Throwable) extends SubscriberEvent
302
```
303
304
## Advanced Usage Patterns
305
306
### Testing Stream Completion
307
308
```scala
309
val probe = TestSubscriber.probe[String]()
310
311
// Test immediate completion
312
probe.expectSubscriptionAndComplete()
313
314
// Test completion after elements
315
val subscription = probe.expectSubscription()
316
subscription.request(2)
317
probe.expectNext("a", "b")
318
probe.expectComplete()
319
```
320
321
### Testing Error Scenarios
322
323
```scala
324
val probe = TestSubscriber.probe[String]()
325
326
// Test immediate error
327
val error = probe.expectSubscriptionAndError()
328
error shouldBe a[RuntimeException]
329
330
// Test error after elements
331
val subscription = probe.expectSubscription()
332
subscription.request(1)
333
probe.expectNext("element")
334
val thrownError = probe.expectError()
335
```
336
337
### Testing Backpressure
338
339
```scala
340
val probe = TestSubscriber.probe[Int]()
341
342
// Don't request initially, verify no elements arrive
343
probe.expectSubscription()
344
probe.expectNoMessage(100.millis)
345
346
// Request and verify elements arrive
347
probe.request(2)
348
probe.expectNext(1, 2)
349
```
350
351
### Testing Large Streams
352
353
```scala
354
val probe = TestSubscriber.probe[Int]()
355
356
// Request many elements
357
probe.request(1000)
358
359
// Verify specific elements
360
val elements = probe.expectNextN(1000)
361
elements should have size 1000
362
363
// Or drain to strict collection (use carefully)
364
val allElements = probe.toStrict(5.seconds)
365
```
366
367
### Conditional Element Testing
368
369
```scala
370
val probe = TestSubscriber.probe[String]()
371
372
// Test elements with partial function
373
probe.expectNextPF {
374
case s if s.length > 5 => s.toUpperCase
375
}
376
377
// Chain multiple expectations
378
probe
379
.expectNextChainingPF {
380
case s: String => s.trim
381
}
382
.expectNextChainingPF {
383
case s if s.nonEmpty => s
384
}
385
```
386
387
### Testing Alternative Outcomes
388
389
```scala
390
val probe = TestSubscriber.probe[String]()
391
392
// Expect either next element or error
393
probe.expectNextOrError() match {
394
case Right(element) => println(s"Got element: $element")
395
case Left(error) => println(s"Got error: $error")
396
}
397
398
// Expect either next element or completion
399
probe.expectNextOrComplete() match {
400
case Right(element) => println(s"Got element: $element")
401
case Left(OnComplete) => println("Stream completed")
402
}
403
```