Akka Stream TestKit provides utilities for testing Akka Streams applications with controllable test sources, sinks, and assertion capabilities
—
Utilities for asserting that stream processing stages are properly cleaned up after test execution. These tools help detect resource leaks and ensure that all stream components terminate correctly, which is essential for reliable testing and production deployments.
Provides utilities for asserting proper cleanup of stream stages in Scala applications.
object StreamTestKit {
/**
* Asserts that after the given code block is run, no stages are left over
* that were created by the given materializer.
*
* This assertion is useful to check that all of the stages have
* terminated successfully.
*
* @param block The code block to execute and monitor
* @param materializer The materializer to check for remaining stages
* @return The result of executing the block
*/
def assertAllStagesStopped[T](block: => T)(implicit materializer: Materializer): T
}

Usage Examples:
import akka.actor.ActorSystem
import akka.stream.{ClosedShape, Materializer}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, RunnableGraph, Sink, Source}
import akka.stream.testkit.scaladsl.StreamTestKit
import scala.concurrent.Await
import scala.concurrent.duration._

// A materializer runs stream stages on an actor system, so one must be in scope first.
implicit val system: ActorSystem = ActorSystem("test")
// Materializer(system) replaces ActorMaterializer(), which is deprecated since Akka 2.6.
implicit val materializer: Materializer = Materializer(system)
// Test that stream completes and cleans up properly
StreamTestKit.assertAllStagesStopped {
val result = Source(1 to 100)
.map(_ * 2)
.filter(_ > 50)
.runWith(Sink.seq)
Await.result(result, 5.seconds)
}
// Test that failed streams also clean up properly
StreamTestKit.assertAllStagesStopped {
val result = Source(1 to 10)
.map(x => if (x == 5) throw new RuntimeException("test") else x)
.runWith(Sink.ignore)
intercept[RuntimeException] {
Await.result(result, 5.seconds)
}
}
// Test complex stream graphs clean up
StreamTestKit.assertAllStagesStopped {
val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val source = Source(1 to 10)
val broadcast = builder.add(Broadcast[Int](2))
val merge = builder.add(Merge[Int](2))
val sink = Sink.foreach(println)
source ~> broadcast ~> Flow[Int].map(_ * 2) ~> merge ~> sink
broadcast ~> Flow[Int].map(_ * 3) ~> merge
ClosedShape
})
graph.run()
}

Provides utilities for asserting proper cleanup of stream stages in Java applications.
object StreamTestKit {
/**
* Assert that there are no stages running under a given materializer.
* Usually this assertion is run after a test-case to check that all of the
* stages have terminated successfully.
*/
def assertAllStagesStopped(mat: Materializer): Unit
/**
* Assert that there are no stages running under a given system's materializer.
* Usually this assertion is run after a test-case to check that all of the
* stages have terminated successfully.
*/
def assertAllStagesStopped(system: ClassicActorSystemProvider): Unit
}

Usage Examples (Java):
import akka.actor.ActorSystem;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.Sink;
import akka.stream.testkit.javadsl.StreamTestKit;
ActorSystem system = ActorSystem.create("test");
// Test that stream completes and cleans up properly
CompletionStage<List<Integer>> result = Source.range(1, 100)
.map(x -> x * 2)
.filter(x -> x > 50)
.runWith(Sink.seq(), system);
// Wait for completion
List<Integer> values = result.toCompletableFuture().get(5, TimeUnit.SECONDS);
// Assert all stages stopped
StreamTestKit.assertAllStagesStopped(system);
// Alternative using explicit materializer
Materializer mat = SystemMaterializer.get(system).materializer();
StreamTestKit.assertAllStagesStopped(mat);

The StreamTestKit internally works with the PhasedFusingActorMaterializer to track and verify stage cleanup.
// Internal API - exposed for understanding but not for direct use
object StreamTestKit {
/**
* INTERNAL API: Stop all children of the stream supervisor
*/
@InternalApi
private[testkit] def stopAllChildren(sys: ActorSystem, supervisor: ActorRef): Unit
/**
* INTERNAL API: Assert that no children remain under the stream supervisor
*/
@InternalApi
private[testkit] def assertNoChildren(sys: ActorSystem, supervisor: ActorRef): Unit
/**
* INTERNAL API: Print debug information about running streams
*/
@InternalApi
private[akka] def printDebugDump(streamSupervisor: ActorRef)(implicit ec: ExecutionContext): Unit
/**
* INTERNAL API: Convert stream snapshot to string representation
*/
@InternalApi
private[testkit] def snapshotString(snapshot: StreamSnapshotImpl): String
}

import akka.stream.testkit.scaladsl.StreamTestKit
// ScalaFutures (org.scalatest.concurrent) is mixed in because the test below calls futureValue.
class StreamLifecycleSpec extends AnyFlatSpec with Matchers with ScalaFutures {
  // Explicit types: Scala 3 rejects class-level implicit vals without a declared type.
  implicit val system: ActorSystem = ActorSystem("test")
  // Materializer(system) replaces ActorMaterializer(), which is deprecated since Akka 2.6.
  implicit val materializer: Materializer = Materializer(system)

  "Simple stream" should "clean up properly" in {
    StreamTestKit.assertAllStagesStopped {
      Source(1 to 10)
        .map(_ * 2)
        .runWith(Sink.ignore)
        .futureValue
    }
  }
}

"Failed stream" should "still clean up properly" in {
StreamTestKit.assertAllStagesStopped {
val result = Source(1 to 10)
.map(x => if (x == 5) throw new RuntimeException("test") else x)
.runWith(Sink.seq)
// Exception should be thrown, but cleanup should still happen
intercept[RuntimeException] {
Await.result(result, 3.seconds)
}
}
}

"Cancelled infinite stream" should "clean up properly" in {
StreamTestKit.assertAllStagesStopped {
  // Source.tick materializes a Cancellable. Source.repeat materializes NotUsed,
  // which offers no cancel() to stop the stream from the outside.
  val cancellable = Source
    .tick(Duration.Zero, 100.millis, 1)
    .to(Sink.ignore)
    .run()
  // Let it run briefly then cancel
  Thread.sleep(500)
  cancellable.cancel()
  // No trailing sleep: the assertion waits up to akka.stream.testkit.all-stages-stopped-timeout
  // for the stages to stop.
}
}

"Complex graph" should "clean up all stages" in {
StreamTestKit.assertAllStagesStopped {
  val sink = Sink.foreach[String](println)
  // Passing the sink to createGraph makes its Future[Done] the materialized value of the graph.
  // A graph built with GraphDSL.create() materializes NotUsed, so there would be no future
  // to wait on with futureValue.
  val graph = RunnableGraph.fromGraph(GraphDSL.createGraph(sink) { implicit builder => out =>
    import GraphDSL.Implicits._
    val source = Source(1 to 100)
    val bcast = builder.add(Broadcast[Int](3))
    val merge = builder.add(Merge[String](3))
    source ~> bcast ~> Flow[Int].map(_.toString) ~> merge ~> out
    bcast ~> Flow[Int].map(x => s"doubled: ${x * 2}") ~> merge
    bcast ~> Flow[Int].map(x => s"tripled: ${x * 3}") ~> merge
    ClosedShape
  })
  // Wait for the sink to finish before the cleanup assertion runs.
  graph.run().futureValue
}
}

"Stream with custom materializer" should "clean up properly" in {
val customMaterializer = ActorMaterializer(
ActorMaterializerSettings(system)
.withInputBuffer(16, 32)
)
StreamTestKit.assertAllStagesStopped {
Source(1 to 1000)
.grouped(10)
.map(_.sum)
.runWith(Sink.seq)(customMaterializer)
.futureValue
}(customMaterializer)
}

"Stream with test probes" should "clean up properly" in {
StreamTestKit.assertAllStagesStopped {
val (sourceProbe, sinkProbe) = TestSource[String]()
.map(_.toUpperCase)
.toMat(TestSink[String]())(Keep.both)
.run()
sourceProbe.sendNext("hello")
sourceProbe.sendNext("world")
sourceProbe.sendComplete()
sinkProbe.request(2)
sinkProbe.expectNext("HELLO", "WORLD")
sinkProbe.expectComplete()
}
}

The StreamTestKit behavior can be configured through Akka configuration:
akka.stream.testkit {
# Timeout for asserting all stages have stopped
all-stages-stopped-timeout = 5s
}

class StreamSpec extends AnyFlatSpec with Matchers with BeforeAndAfterEach {
// Explicit types: Scala 3 rejects class-level implicit vals without a declared type.
implicit val system: ActorSystem = ActorSystem("test")
// Materializer(system) replaces ActorMaterializer(), which is deprecated since Akka 2.6.
implicit val materializer: Materializer = Materializer(system)

/** Checks after every test that no stream stages are still running under the materializer. */
override def afterEach(): Unit = {
  // Use the materializer-based assertion, which is the variant intended to run after a test
  // case, instead of wrapping an empty block that has nothing to monitor.
  try akka.stream.testkit.javadsl.StreamTestKit.assertAllStagesStopped(materializer)
  // Always delegate so stacked BeforeAndAfterEach traits still run their cleanup.
  finally super.afterEach()
}
}

trait StreamTestKit extends TestSuite {
implicit def system: ActorSystem
implicit def materializer: Materializer
def withStreamCleanup[T](block: => T): T = {
akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped(block)
}
}
class MyStreamSpec extends AnyFlatSpec with StreamTestKit {
  // Explicit types match the members declared by the StreamTestKit trait; Scala 3 also
  // rejects class-level implicit vals without a declared type.
  implicit val system: ActorSystem = ActorSystem("test")
  // Materializer(system) replaces ActorMaterializer(), which is deprecated since Akka 2.6.
  implicit val materializer: Materializer = Materializer(system)

  // withStreamCleanup runs the test body and then asserts that all stages have stopped.
  "My stream" should "work correctly" in withStreamCleanup {
    // Test code here
  }
}

"Long running stream" should "eventually clean up" in {
// Increase the futureValue timeout for slow streams.
// It is named patienceConfig so that it takes the place of the default patience inherited
// from ScalaFutures rather than competing with it during implicit resolution.
implicit val patienceConfig: PatienceConfig = PatienceConfig(timeout = 10.seconds)
StreamTestKit.assertAllStagesStopped {
  // Keep the runtime inside the patience window: 500 elements at 100 per second take about
  // 5 seconds, whereas 10000 elements would need roughly 100 seconds and time out.
  Source(1 to 500)
    .throttle(100, 1.second)
    .runWith(Sink.ignore)
    .futureValue
}
}

Install with Tessl CLI
npx tessl i tessl/maven-com-typesafe-akka--akka-stream-testkit-3