0
# Test Sources and Sinks
1
2
Factory methods for creating Source and Sink instances that materialize to test probes. These provide the primary interface for stream testing by creating streams that can be controlled and observed through probe interfaces.
3
4
## Capabilities
5
6
### Test Source (Scala DSL)
7
8
Creates Source instances that materialize to TestPublisher.Probe for controllable upstream testing.
9
10
```scala { .api }
11
object TestSource {
12
/**
13
* Creates a Source that materializes to a TestPublisher.Probe
14
* @param system The actor system provider
15
* @return Source that emits elements of type T and materializes to TestPublisher.Probe[T]
16
*/
17
def apply[T]()(implicit system: ClassicActorSystemProvider): Source[T, TestPublisher.Probe[T]]
18
19
/**
20
* Deprecated method - use apply() instead
21
*/
22
@deprecated("Use `TestSource()` with implicit ClassicActorSystemProvider instead.", "2.7.0")
23
def probe[T](implicit system: ActorSystem): Source[T, TestPublisher.Probe[T]]
24
}
25
```
26
27
**Usage Examples:**
28
29
```scala
30
import akka.actor.ActorSystem
31
import akka.stream.scaladsl.{Keep, Sink}
32
import akka.stream.testkit.scaladsl.TestSource
33
34
implicit val system = ActorSystem("test")
35
36
// Create test source and materialize
37
val (probe, future) = TestSource[String]()
38
.toMat(Sink.seq)(Keep.both)
39
.run()
40
41
// Control the source
42
probe.sendNext("hello")
43
probe.sendNext("world")
44
probe.sendComplete()
45
46
// Verify results
47
val result = Await.result(future, 3.seconds)
48
result should contain inOrderOnly ("hello", "world")
49
50
// Pre-materialize for multiple uses
51
val (pub, source) = TestSource[Int]().preMaterialize()
52
pub.sendNext(42)
53
54
val sink1 = source.runWith(Sink.head)
55
val sink2 = source.runWith(Sink.seq)
56
```
57
58
### Test Sink (Scala DSL)
59
60
Creates Sink instances that materialize to TestSubscriber.Probe for controllable downstream testing.
61
62
```scala { .api }
63
object TestSink {
64
/**
65
* Creates a Sink that materializes to a TestSubscriber.Probe
66
* @param system The actor system provider
67
* @return Sink that consumes elements of type T and materializes to TestSubscriber.Probe[T]
68
*/
69
def apply[T]()(implicit system: ClassicActorSystemProvider): Sink[T, TestSubscriber.Probe[T]]
70
71
/**
72
* Deprecated method - use apply() instead
73
*/
74
@deprecated("Use `TestSink()` with implicit ClassicActorSystemProvider instead.", "2.7.0")
75
def probe[T](implicit system: ActorSystem): Sink[T, TestSubscriber.Probe[T]]
76
}
77
```
78
79
**Usage Examples:**
80
81
```scala
82
import akka.stream.scaladsl.Source
83
import akka.stream.testkit.scaladsl.TestSink
84
85
// Create test sink and materialize
86
val probe = Source(1 to 5)
87
.map(_ * 2)
88
.runWith(TestSink[Int]())
89
90
// Verify elements
91
probe.request(5)
92
probe.expectNext(2, 4, 6, 8, 10)
93
probe.expectComplete()
94
95
// Use with Keep.both for bidirectional testing
96
val (sourceProbe, sinkProbe) = TestSource[String]()
97
.map(_.toUpperCase)
98
.toMat(TestSink[String]())(Keep.both)
99
.run()
100
101
sourceProbe.sendNext("hello")
102
sourceProbe.sendComplete()
103
104
sinkProbe.request(1)
105
sinkProbe.expectNext("HELLO")
106
sinkProbe.expectComplete()
107
```
108
109
### Test Source (Java API)
110
111
Java API for creating test sources.
112
113
```scala { .api }
114
object TestSource {
115
/**
116
* Creates a Source that materializes to a TestPublisher.Probe (Java API)
117
*/
118
def create[T](system: ClassicActorSystemProvider): akka.stream.javadsl.Source[T, TestPublisher.Probe[T]]
119
120
/**
121
* Deprecated method - use create() instead
122
*/
123
@deprecated("Use `TestSource.create` with ClassicActorSystemProvider instead.", "2.7.0")
124
def probe[T](system: ActorSystem): akka.stream.javadsl.Source[T, TestPublisher.Probe[T]]
125
}
126
```
127
128
**Usage Examples (Java):**
129
130
```java
131
import akka.actor.ActorSystem;
132
import akka.stream.javadsl.Sink;
133
import akka.stream.testkit.javadsl.TestSource;
134
import akka.japi.Pair;
135
136
ActorSystem system = ActorSystem.create("test");
137
138
// Create test source
139
Pair<TestPublisher.Probe<String>, CompletionStage<List<String>>> pair =
140
TestSource.<String>create(system)
141
.toMat(Sink.seq(), Keep.both())
142
.run(system);
143
144
TestPublisher.Probe<String> probe = pair.first();
145
CompletionStage<List<String>> future = pair.second();
146
147
// Control the source
148
probe.sendNext("hello");
149
probe.sendNext("world");
150
probe.sendComplete();
151
152
// Verify results
153
List<String> result = future.toCompletableFuture().get(3, TimeUnit.SECONDS);
154
assertEquals(Arrays.asList("hello", "world"), result);
155
```
156
157
### Test Sink (Java API)
158
159
Java API for creating test sinks.
160
161
```scala { .api }
162
object TestSink {
163
/**
164
* Creates a Sink that materializes to a TestSubscriber.Probe (Java API)
165
*/
166
def create[T](system: ClassicActorSystemProvider): akka.stream.javadsl.Sink[T, TestSubscriber.Probe[T]]
167
168
/**
169
* Deprecated method - use create() instead
170
*/
171
@deprecated("Use `TestSink.create` with ClassicActorSystemProvider instead.", "2.7.0")
172
def probe[T](system: ActorSystem): akka.stream.javadsl.Sink[T, TestSubscriber.Probe[T]]
173
}
174
```
175
176
**Usage Examples (Java):**
177
178
```java
179
import akka.stream.javadsl.Source;
180
import akka.stream.testkit.javadsl.TestSink;
181
182
// Create test sink
183
TestSubscriber.Probe<Integer> probe =
184
Source.range(1, 5)
185
.map(x -> x * 2)
186
.runWith(TestSink.create(system), system);
187
188
// Verify elements
189
probe.request(5);
190
probe.expectNext(2, 4, 6, 8, 10);
191
probe.expectComplete();
192
```
193
194
## Common Testing Patterns
195
196
### End-to-End Stream Testing
197
198
```scala
199
import akka.stream.scaladsl.Flow
200
201
val (sourceProbe, sinkProbe) = TestSource[String]()
202
.via(Flow[String].map(_.toUpperCase).filter(_.length > 3))
203
.toMat(TestSink[String]())(Keep.both)
204
.run()
205
206
// Test the flow
207
sourceProbe.sendNext("hi") // Should be filtered out
208
sourceProbe.sendNext("hello") // Should pass through
209
sourceProbe.sendNext("world") // Should pass through
210
sourceProbe.sendComplete()
211
212
sinkProbe.request(10)
213
sinkProbe.expectNext("HELLO", "WORLD")
214
sinkProbe.expectComplete()
215
```
216
217
### Testing Backpressure Propagation
218
219
```scala
220
val (sourceProbe, sinkProbe) = TestSource[Int]()
221
.toMat(TestSink[Int]())(Keep.both)
222
.run()
223
224
// Don't request from sink - should not be able to send from source
225
sourceProbe.sendNext(1)
226
sourceProbe.expectRequest() // Will block until downstream requests
227
228
// Now request and verify element flows
229
sinkProbe.request(1)
230
// sourceProbe.expectRequest() returns now
231
sinkProbe.expectNext(1)
232
```
233
234
### Testing Error Propagation
235
236
```scala
237
val (sourceProbe, sinkProbe) = TestSource[String]()
238
.map(s => if (s == "error") throw new RuntimeException("test") else s)
239
.toMat(TestSink[String]())(Keep.both)
240
.run()
241
242
sourceProbe.sendNext("ok")
243
sourceProbe.sendNext("error")
244
245
sinkProbe.request(2)
246
sinkProbe.expectNext("ok")
247
val error = sinkProbe.expectError()
248
error.getMessage should be("test")
249
```
250
251
### Testing Stream Completion
252
253
```scala
254
val (sourceProbe, sinkProbe) = TestSource[String]()
255
.toMat(TestSink[String]())(Keep.both)
256
.run()
257
258
sourceProbe.sendNext("last")
259
sourceProbe.sendComplete()
260
261
sinkProbe.request(1)
262
sinkProbe.expectNext("last")
263
sinkProbe.expectComplete()
264
```
265
266
### Testing Materialize Once, Use Multiple Times
267
268
```scala
269
// Pre-materialize for reuse
270
val (sourceProbe, source) = TestSource[Int]().preMaterialize()
271
272
// Use with multiple sinks
273
val sink1 = source.runWith(Sink.head)
274
val sink2 = source.runWith(Sink.seq)
275
val sink3 = source.runWith(TestSink[Int]())
276
277
// Control single source, affects all sinks
278
sourceProbe.sendNext(42)
279
sourceProbe.sendComplete()
280
281
// Verify all sinks received the element
282
Await.result(sink1, 1.second) should be(42)
283
sink3.request(1)
284
sink3.expectNext(42)
285
sink3.expectComplete()
286
```
287
288
### Testing Complex Flows with Multiple Stages
289
290
```scala
291
val complexFlow = Flow[String]
292
.map(_.toLowerCase)
293
.filter(_.nonEmpty)
294
.groupedWithin(3, 1.second)
295
.map(_.mkString(","))
296
297
val (sourceProbe, sinkProbe) = TestSource[String]()
298
.via(complexFlow)
299
.toMat(TestSink[String]())(Keep.both)
300
.run()
301
302
// Send test data
303
sourceProbe.sendNext("HELLO")
304
sourceProbe.sendNext("") // Will be filtered out
305
sourceProbe.sendNext("WORLD")
306
sourceProbe.sendNext("TEST")
307
sourceProbe.sendComplete()
308
309
// Verify grouped output
310
sinkProbe.request(2)
311
sinkProbe.expectNext("hello,world,test") // Grouped and joined
312
sinkProbe.expectComplete()
313
```