0
# Stream Lifecycle Management
1
2
Utilities for asserting that stream processing stages are properly cleaned up after test execution. These tools help detect resource leaks and ensure that all stream components terminate correctly, which is essential for reliable testing and production deployments.
3
4
## Capabilities
5
6
### StreamTestKit (Scala DSL)
7
8
Provides utilities for asserting proper cleanup of stream stages in Scala applications.
9
10
```scala { .api }
11
object StreamTestKit {
12
/**
13
* Asserts that after the given code block is run, no stages are left over
14
* that were created by the given materializer.
15
*
16
* This assertion is useful to check that all of the stages have
17
* terminated successfully.
18
*
19
* @param block The code block to execute and monitor
20
* @param materializer The materializer to check for remaining stages
21
* @return The result of executing the block
22
*/
23
def assertAllStagesStopped[T](block: => T)(implicit materializer: Materializer): T
24
}
25
```
26
27
**Usage Examples:**
28
29
```scala
30
import akka.stream.scaladsl.{Source, Sink}
31
import akka.stream.testkit.scaladsl.StreamTestKit
32
33
implicit val materializer = ActorMaterializer()
34
35
// Test that stream completes and cleans up properly
36
StreamTestKit.assertAllStagesStopped {
37
val result = Source(1 to 100)
38
.map(_ * 2)
39
.filter(_ > 50)
40
.runWith(Sink.seq)
41
42
Await.result(result, 5.seconds)
43
}
44
45
// Test that failed streams also clean up properly
46
StreamTestKit.assertAllStagesStopped {
47
val result = Source(1 to 10)
48
.map(x => if (x == 5) throw new RuntimeException("test") else x)
49
.runWith(Sink.ignore)
50
51
intercept[RuntimeException] {
52
Await.result(result, 5.seconds)
53
}
54
}
55
56
// Test complex stream graphs clean up
57
StreamTestKit.assertAllStagesStopped {
58
val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
59
import GraphDSL.Implicits._
60
61
val source = Source(1 to 10)
62
val broadcast = builder.add(Broadcast[Int](2))
63
val merge = builder.add(Merge[Int](2))
64
val sink = Sink.foreach(println)
65
66
source ~> broadcast ~> Flow[Int].map(_ * 2) ~> merge ~> sink
67
broadcast ~> Flow[Int].map(_ * 3) ~> merge
68
69
ClosedShape
70
})
71
72
graph.run()
73
}
74
```
75
76
### StreamTestKit (Java API)
77
78
Provides utilities for asserting proper cleanup of stream stages in Java applications.
79
80
```scala { .api }
81
object StreamTestKit {
82
/**
83
* Assert that there are no stages running under a given materializer.
84
* Usually this assertion is run after a test-case to check that all of the
85
* stages have terminated successfully.
86
*/
87
def assertAllStagesStopped(mat: Materializer): Unit
88
89
/**
90
* Assert that there are no stages running under a given system's materializer.
91
* Usually this assertion is run after a test-case to check that all of the
92
* stages have terminated successfully.
93
*/
94
def assertAllStagesStopped(system: ClassicActorSystemProvider): Unit
95
}
96
```
97
98
**Usage Examples (Java):**
99
100
```java
101
import akka.actor.ActorSystem;
102
import akka.stream.javadsl.Source;
103
import akka.stream.javadsl.Sink;
104
import akka.stream.testkit.javadsl.StreamTestKit;
105
106
ActorSystem system = ActorSystem.create("test");
107
108
// Test that stream completes and cleans up properly
109
CompletionStage<List<Integer>> result = Source.range(1, 100)
110
.map(x -> x * 2)
111
.filter(x -> x > 50)
112
.runWith(Sink.seq(), system);
113
114
// Wait for completion
115
List<Integer> values = result.toCompletableFuture().get(5, TimeUnit.SECONDS);
116
117
// Assert all stages stopped
118
StreamTestKit.assertAllStagesStopped(system);
119
120
// Alternative using explicit materializer
121
Materializer mat = SystemMaterializer.get(system).materializer();
122
StreamTestKit.assertAllStagesStopped(mat);
123
```
124
125
## Internal Implementation Details
126
127
The StreamTestKit internally works with the PhasedFusingActorMaterializer to track and verify stage cleanup.
128
129
```scala { .api }
130
// Internal API - exposed for understanding but not for direct use
131
object StreamTestKit {
132
/**
133
* INTERNAL API: Stop all children of the stream supervisor
134
*/
135
@InternalApi
136
private[testkit] def stopAllChildren(sys: ActorSystem, supervisor: ActorRef): Unit
137
138
/**
139
* INTERNAL API: Assert that no children remain under the stream supervisor
140
*/
141
@InternalApi
142
private[testkit] def assertNoChildren(sys: ActorSystem, supervisor: ActorRef): Unit
143
144
/**
145
* INTERNAL API: Print debug information about running streams
146
*/
147
@InternalApi
148
private[akka] def printDebugDump(streamSupervisor: ActorRef)(implicit ec: ExecutionContext): Unit
149
150
/**
151
* INTERNAL API: Convert stream snapshot to string representation
152
*/
153
@InternalApi
154
private[testkit] def snapshotString(snapshot: StreamSnapshotImpl): String
155
}
156
```
157
158
## Testing Patterns
159
160
### Basic Stream Lifecycle Testing
161
162
```scala
163
import akka.stream.testkit.scaladsl.StreamTestKit
164
165
class StreamLifecycleSpec extends AnyFlatSpec with Matchers {
166
implicit val system = ActorSystem("test")
167
implicit val materializer = ActorMaterializer()
168
169
"Simple stream" should "clean up properly" in {
170
StreamTestKit.assertAllStagesStopped {
171
Source(1 to 10)
172
.map(_ * 2)
173
.runWith(Sink.ignore)
174
.futureValue
175
}
176
}
177
}
178
```
179
180
### Testing Failed Streams
181
182
```scala
183
"Failed stream" should "still clean up properly" in {
184
StreamTestKit.assertAllStagesStopped {
185
val result = Source(1 to 10)
186
.map(x => if (x == 5) throw new RuntimeException("test") else x)
187
.runWith(Sink.seq)
188
189
// Exception should be thrown, but cleanup should still happen
190
intercept[RuntimeException] {
191
Await.result(result, 3.seconds)
192
}
193
}
194
}
195
```
196
197
### Testing Infinite Streams with Cancellation
198
199
```scala
200
"Cancelled infinite stream" should "clean up properly" in {
201
StreamTestKit.assertAllStagesStopped {
202
val cancellable = Source.repeat(1)
203
.throttle(1, 100.millis)
204
.to(Sink.ignore)
205
.run()
206
207
// Let it run briefly then cancel
208
Thread.sleep(500)
209
cancellable.cancel()
210
211
// Give time for cleanup
212
Thread.sleep(200)
213
}
214
}
215
```
216
217
### Testing Complex Graph Topologies
218
219
```scala
220
"Complex graph" should "clean up all stages" in {
221
StreamTestKit.assertAllStagesStopped {
222
val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
223
import GraphDSL.Implicits._
224
225
val source = Source(1 to 100)
226
val bcast = builder.add(Broadcast[Int](3))
227
val merge = builder.add(Merge[String](3))
228
val sink = Sink.foreach[String](println)
229
230
source ~> bcast ~> Flow[Int].map(_.toString) ~> merge ~> sink
231
bcast ~> Flow[Int].map(x => s"doubled: ${x * 2}") ~> merge
232
bcast ~> Flow[Int].map(x => s"tripled: ${x * 3}") ~> merge
233
234
ClosedShape
235
})
236
237
graph.run().futureValue
238
}
239
}
240
```
241
242
### Testing With Custom Materializer
243
244
```scala
245
"Stream with custom materializer" should "clean up properly" in {
246
val customMaterializer = ActorMaterializer(
247
ActorMaterializerSettings(system)
248
.withInputBuffer(16, 32)
249
)
250
251
StreamTestKit.assertAllStagesStopped {
252
Source(1 to 1000)
253
.grouped(10)
254
.map(_.sum)
255
.runWith(Sink.seq)(customMaterializer)
256
.futureValue
257
}(customMaterializer)
258
}
259
```
260
261
### Combining with Other Test Utilities
262
263
```scala
264
"Stream with test probes" should "clean up properly" in {
265
StreamTestKit.assertAllStagesStopped {
266
val (sourceProbe, sinkProbe) = TestSource[String]()
267
.map(_.toUpperCase)
268
.toMat(TestSink[String]())(Keep.both)
269
.run()
270
271
sourceProbe.sendNext("hello")
272
sourceProbe.sendNext("world")
273
sourceProbe.sendComplete()
274
275
sinkProbe.request(2)
276
sinkProbe.expectNext("HELLO", "WORLD")
277
sinkProbe.expectComplete()
278
}
279
}
280
```
281
282
## Configuration
283
284
The StreamTestKit behavior can be configured through Akka configuration:
285
286
```hocon
287
akka.stream.testkit {
288
# Timeout for asserting all stages have stopped
289
all-stages-stopped-timeout = 5s
290
}
291
```
292
293
## Best Practices
294
295
### Always Use in Test Cleanup
296
297
```scala
298
class StreamSpec extends AnyFlatSpec with Matchers with BeforeAndAfterEach {
299
implicit val system = ActorSystem("test")
300
implicit val materializer = ActorMaterializer()
301
302
override def afterEach(): Unit = {
303
// Ensure cleanup after each test
304
StreamTestKit.assertAllStagesStopped {
305
// Any remaining cleanup code
306
}
307
}
308
}
309
```
310
311
### Combine with ScalaTest Integration
312
313
```scala
314
trait StreamTestKit extends TestSuite {
315
implicit def system: ActorSystem
316
implicit def materializer: Materializer
317
318
def withStreamCleanup[T](block: => T): T = {
319
akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped(block)
320
}
321
}
322
323
class MyStreamSpec extends AnyFlatSpec with StreamTestKit {
324
implicit val system = ActorSystem("test")
325
implicit val materializer = ActorMaterializer()
326
327
"My stream" should "work correctly" in withStreamCleanup {
328
// Test code here
329
}
330
}
331
```
332
333
### Handle Timeouts Gracefully
334
335
```scala
336
"Long running stream" should "eventually clean up" in {
337
// Increase timeout for complex streams
338
implicit val patience = PatienceConfig(timeout = 10.seconds)
339
340
StreamTestKit.assertAllStagesStopped {
341
Source(1 to 10000)
342
.throttle(100, 1.second)
343
.runWith(Sink.ignore)
344
.futureValue
345
}
346
}
347
```