or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

graph-stage-testing.mdindex.mdstream-lifecycle.mdtest-publishers.mdtest-sources-sinks.mdtest-subscribers.md

stream-lifecycle.mddocs/

0

# Stream Lifecycle Management

1

2

Utilities for asserting that stream processing stages are properly cleaned up after test execution. These tools help detect resource leaks and ensure that all stream components terminate correctly, which is essential for reliable testing and production deployments.

3

4

## Capabilities

5

6

### StreamTestKit (Scala DSL)

7

8

Provides utilities for asserting proper cleanup of stream stages in Scala applications.

9

10

```scala { .api }

11

object StreamTestKit {

12

/**

13

* Asserts that after the given code block is run, no stages are left over

14

* that were created by the given materializer.

15

*

16

* This assertion is useful to check that all of the stages have

17

* terminated successfully.

18

*

19

* @param block The code block to execute and monitor

20

* @param materializer The materializer to check for remaining stages

21

* @return The result of executing the block

22

*/

23

def assertAllStagesStopped[T](block: => T)(implicit materializer: Materializer): T

24

}

25

```

26

27

**Usage Examples:**

28

29

```scala

30

import akka.stream.scaladsl.{Source, Sink}

31

import akka.stream.testkit.scaladsl.StreamTestKit

32

33

implicit val materializer = ActorMaterializer()

34

35

// Test that stream completes and cleans up properly

36

StreamTestKit.assertAllStagesStopped {

37

val result = Source(1 to 100)

38

.map(_ * 2)

39

.filter(_ > 50)

40

.runWith(Sink.seq)

41

42

Await.result(result, 5.seconds)

43

}

44

45

// Test that failed streams also clean up properly

46

StreamTestKit.assertAllStagesStopped {

47

val result = Source(1 to 10)

48

.map(x => if (x == 5) throw new RuntimeException("test") else x)

49

.runWith(Sink.ignore)

50

51

intercept[RuntimeException] {

52

Await.result(result, 5.seconds)

53

}

54

}

55

56

// Test complex stream graphs clean up

57

StreamTestKit.assertAllStagesStopped {

58

val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>

59

import GraphDSL.Implicits._

60

61

val source = Source(1 to 10)

62

val broadcast = builder.add(Broadcast[Int](2))

63

val merge = builder.add(Merge[Int](2))

64

val sink = Sink.foreach(println)

65

66

source ~> broadcast ~> Flow[Int].map(_ * 2) ~> merge ~> sink

67

broadcast ~> Flow[Int].map(_ * 3) ~> merge

68

69

ClosedShape

70

})

71

72

graph.run()

73

}

74

```

75

76

### StreamTestKit (Java API)

77

78

Provides utilities for asserting proper cleanup of stream stages in Java applications.

79

80

```scala { .api }

81

object StreamTestKit {

82

/**

83

* Assert that there are no stages running under a given materializer.

84

* Usually this assertion is run after a test-case to check that all of the

85

* stages have terminated successfully.

86

*/

87

def assertAllStagesStopped(mat: Materializer): Unit

88

89

/**

90

* Assert that there are no stages running under a given system's materializer.

91

* Usually this assertion is run after a test-case to check that all of the

92

* stages have terminated successfully.

93

*/

94

def assertAllStagesStopped(system: ClassicActorSystemProvider): Unit

95

}

96

```

97

98

**Usage Examples (Java):**

99

100

```java

101

import akka.actor.ActorSystem;

102

import akka.stream.javadsl.Source;

103

import akka.stream.javadsl.Sink;

104

import akka.stream.testkit.javadsl.StreamTestKit;

105

106

ActorSystem system = ActorSystem.create("test");

107

108

// Test that stream completes and cleans up properly

109

CompletionStage<List<Integer>> result = Source.range(1, 100)

110

.map(x -> x * 2)

111

.filter(x -> x > 50)

112

.runWith(Sink.seq(), system);

113

114

// Wait for completion

115

List<Integer> values = result.toCompletableFuture().get(5, TimeUnit.SECONDS);

116

117

// Assert all stages stopped

118

StreamTestKit.assertAllStagesStopped(system);

119

120

// Alternative using explicit materializer

121

Materializer mat = SystemMaterializer.get(system).materializer();

122

StreamTestKit.assertAllStagesStopped(mat);

123

```

124

125

## Internal Implementation Details

126

127

The StreamTestKit internally works with the PhasedFusingActorMaterializer to track and verify stage cleanup.

128

129

```scala { .api }

130

// Internal API - exposed for understanding but not for direct use

131

object StreamTestKit {

132

/**

133

* INTERNAL API: Stop all children of the stream supervisor

134

*/

135

@InternalApi

136

private[testkit] def stopAllChildren(sys: ActorSystem, supervisor: ActorRef): Unit

137

138

/**

139

* INTERNAL API: Assert that no children remain under the stream supervisor

140

*/

141

@InternalApi

142

private[testkit] def assertNoChildren(sys: ActorSystem, supervisor: ActorRef): Unit

143

144

/**

145

* INTERNAL API: Print debug information about running streams

146

*/

147

@InternalApi

148

private[akka] def printDebugDump(streamSupervisor: ActorRef)(implicit ec: ExecutionContext): Unit

149

150

/**

151

* INTERNAL API: Convert stream snapshot to string representation

152

*/

153

@InternalApi

154

private[testkit] def snapshotString(snapshot: StreamSnapshotImpl): String

155

}

156

```

157

158

## Testing Patterns

159

160

### Basic Stream Lifecycle Testing

161

162

```scala

163

import akka.stream.testkit.scaladsl.StreamTestKit

164

165

class StreamLifecycleSpec extends AnyFlatSpec with Matchers {

166

implicit val system = ActorSystem("test")

167

implicit val materializer = ActorMaterializer()

168

169

"Simple stream" should "clean up properly" in {

170

StreamTestKit.assertAllStagesStopped {

171

Source(1 to 10)

172

.map(_ * 2)

173

.runWith(Sink.ignore)

174

.futureValue

175

}

176

}

177

}

178

```

179

180

### Testing Failed Streams

181

182

```scala

183

"Failed stream" should "still clean up properly" in {

184

StreamTestKit.assertAllStagesStopped {

185

val result = Source(1 to 10)

186

.map(x => if (x == 5) throw new RuntimeException("test") else x)

187

.runWith(Sink.seq)

188

189

// Exception should be thrown, but cleanup should still happen

190

intercept[RuntimeException] {

191

Await.result(result, 3.seconds)

192

}

193

}

194

}

195

```

196

197

### Testing Infinite Streams with Cancellation

198

199

```scala

200

"Cancelled infinite stream" should "clean up properly" in {

201

StreamTestKit.assertAllStagesStopped {

202

val cancellable = Source.repeat(1)

203

.throttle(1, 100.millis)

204

.to(Sink.ignore)

205

.run()

206

207

// Let it run briefly then cancel

208

Thread.sleep(500)

209

cancellable.cancel()

210

211

// Give time for cleanup

212

Thread.sleep(200)

213

}

214

}

215

```

216

217

### Testing Complex Graph Topologies

218

219

```scala

220

"Complex graph" should "clean up all stages" in {

221

StreamTestKit.assertAllStagesStopped {

222

val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>

223

import GraphDSL.Implicits._

224

225

val source = Source(1 to 100)

226

val bcast = builder.add(Broadcast[Int](3))

227

val merge = builder.add(Merge[String](3))

228

val sink = Sink.foreach[String](println)

229

230

source ~> bcast ~> Flow[Int].map(_.toString) ~> merge ~> sink

231

bcast ~> Flow[Int].map(x => s"doubled: ${x * 2}") ~> merge

232

bcast ~> Flow[Int].map(x => s"tripled: ${x * 3}") ~> merge

233

234

ClosedShape

235

})

236

237

graph.run().futureValue

238

}

239

}

240

```

241

242

### Testing With Custom Materializer

243

244

```scala

245

"Stream with custom materializer" should "clean up properly" in {

246

val customMaterializer = ActorMaterializer(

247

ActorMaterializerSettings(system)

248

.withInputBuffer(16, 32)

249

)

250

251

StreamTestKit.assertAllStagesStopped {

252

Source(1 to 1000)

253

.grouped(10)

254

.map(_.sum)

255

.runWith(Sink.seq)(customMaterializer)

256

.futureValue

257

}(customMaterializer)

258

}

259

```

260

261

### Combining with Other Test Utilities

262

263

```scala

264

"Stream with test probes" should "clean up properly" in {

265

StreamTestKit.assertAllStagesStopped {

266

val (sourceProbe, sinkProbe) = TestSource[String]()

267

.map(_.toUpperCase)

268

.toMat(TestSink[String]())(Keep.both)

269

.run()

270

271

sourceProbe.sendNext("hello")

272

sourceProbe.sendNext("world")

273

sourceProbe.sendComplete()

274

275

sinkProbe.request(2)

276

sinkProbe.expectNext("HELLO", "WORLD")

277

sinkProbe.expectComplete()

278

}

279

}

280

```

281

282

## Configuration

283

284

The StreamTestKit behavior can be configured through Akka configuration:

285

286

```hocon

287

akka.stream.testkit {

288

# Timeout for asserting all stages have stopped

289

all-stages-stopped-timeout = 5s

290

}

291

```

292

293

## Best Practices

294

295

### Always Use in Test Cleanup

296

297

```scala

298

class StreamSpec extends AnyFlatSpec with Matchers with BeforeAndAfterEach {

299

implicit val system = ActorSystem("test")

300

implicit val materializer = ActorMaterializer()

301

302

override def afterEach(): Unit = {

303

// Ensure cleanup after each test

304

StreamTestKit.assertAllStagesStopped {

305

// Any remaining cleanup code

306

}

307

}

308

}

309

```

310

311

### Combine with ScalaTest Integration

312

313

```scala

314

trait StreamTestKit extends TestSuite {

315

implicit def system: ActorSystem

316

implicit def materializer: Materializer

317

318

def withStreamCleanup[T](block: => T): T = {

319

akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped(block)

320

}

321

}

322

323

class MyStreamSpec extends AnyFlatSpec with StreamTestKit {

324

implicit val system = ActorSystem("test")

325

implicit val materializer = ActorMaterializer()

326

327

"My stream" should "work correctly" in withStreamCleanup {

328

// Test code here

329

}

330

}

331

```

332

333

### Handle Timeouts Gracefully

334

335

```scala

336

"Long running stream" should "eventually clean up" in {

337

// Increase timeout for complex streams

338

implicit val patience = PatienceConfig(timeout = 10.seconds)

339

340

StreamTestKit.assertAllStagesStopped {

341

Source(1 to 10000)

342

.throttle(100, 1.second)

343

.runWith(Sink.ignore)

344

.futureValue

345

}

346

}

347

```