
tessl/maven-com-typesafe-akka--akka-stream-testkit-3

Akka Stream TestKit provides utilities for testing Akka Streams applications with controllable test sources, sinks, and assertion capabilities


Stream Lifecycle Management

Utilities for asserting that stream processing stages are properly cleaned up after test execution. These tools help detect resource leaks and ensure that all stream components terminate correctly, which is essential for reliable testing and production deployments.

Capabilities

StreamTestKit (Scala DSL)

Provides utilities for asserting proper cleanup of stream stages in Scala applications.

object StreamTestKit {
  /**
   * Asserts that after the given code block is run, no stages are left over
   * that were created by the given materializer.
   * 
   * This assertion is useful to check that all of the stages have
   * terminated successfully.
   * 
   * @param block The code block to execute and monitor
   * @param materializer The materializer to check for remaining stages
   * @return The result of executing the block
   */
  def assertAllStagesStopped[T](block: => T)(implicit materializer: Materializer): T
}

Usage Examples:

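import akka.actor.ActorSystem
import akka.stream.{ClosedShape, Materializer, SystemMaterializer}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, RunnableGraph, Sink, Source}
import akka.stream.testkit.scaladsl.StreamTestKit
import scala.concurrent.Await
import scala.concurrent.duration._

// ActorMaterializer() is deprecated since Akka 2.6 and needs an implicit ActorSystem;
// the system materializer is used here instead
implicit val system: ActorSystem = ActorSystem("test")
implicit val materializer: Materializer = SystemMaterializer(system).materializer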

// Test that stream completes and cleans up properly
StreamTestKit.assertAllStagesStopped {
  val result = Source(1 to 100)
    .map(_ * 2)
    .filter(_ > 50)
    .runWith(Sink.seq)
    
  Await.result(result, 5.seconds)
}

// Test that failed streams also clean up properly
// (intercept comes from ScalaTest's Assertions, so this part runs inside a test class)
StreamTestKit.assertAllStagesStopped {
  val result = Source(1 to 10)
    .map(x => if (x == 5) throw new RuntimeException("test") else x)
    .runWith(Sink.ignore)
    
  intercept[RuntimeException] {
    Await.result(result, 5.seconds)
  }
}

// Test complex stream graphs clean up
StreamTestKit.assertAllStagesStopped {
  val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._
    
    val source = Source(1 to 10)
    val broadcast = builder.add(Broadcast[Int](2))
    val merge = builder.add(Merge[Int](2))
    val sink = Sink.foreach(println)
    
    source ~> broadcast ~> Flow[Int].map(_ * 2) ~> merge ~> sink
              broadcast ~> Flow[Int].map(_ * 3) ~> merge
    
    ClosedShape
  })
  
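  // run() returns NotUsed here, so the block ends immediately; the assertion
  // then waits, up to the configured timeout, for the stages to stop
  graph.run()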
}
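
To make the leak-detection claim concrete, here is a minimal sketch of a stream that is still running when the block returns, so the assertion is expected to fail. The object name `LeakDemo` and the helper `leakDetected` are hypothetical, and the one-second timeout is chosen only to keep the demo short.

```scala
import akka.actor.ActorSystem
import akka.stream.{Materializer, SystemMaterializer}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.testkit.scaladsl.StreamTestKit
import com.typesafe.config.ConfigFactory

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

object LeakDemo {
  // Hypothetical helper: returns true when the assertion reports leftover stages.
  def leakDetected(): Boolean = {
    // Shorten the wait so the assertion gives up after about one second.
    val config = ConfigFactory
      .parseString("akka.stream.testkit.all-stages-stopped-timeout = 1s")
      .withFallback(ConfigFactory.load())
    implicit val system: ActorSystem = ActorSystem("leak-demo", config)
    implicit val materializer: Materializer = SystemMaterializer(system).materializer
    try {
      val outcome = Try {
        StreamTestKit.assertAllStagesStopped {
          // Source.maybe neither emits nor completes until its promise is completed,
          // so this stream is still running when the block returns.
          Source.maybe[Int].runWith(Sink.ignore)
        }
      }
      outcome.isFailure
    } finally {
      // Terminating the system also stops the leaked stream.
      Await.result(system.terminate(), 10.seconds)
    }
  }

  def main(args: Array[String]): Unit =
    println(s"leak detected: ${leakDetected()}")
}
```

In a real test the failure would simply propagate and fail the test case; wrapping it in `Try` here is only for demonstration.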

StreamTestKit (Java API)

Provides utilities for asserting proper cleanup of stream stages in Java applications.

object StreamTestKit {
  /**
   * Assert that there are no stages running under a given materializer.
   * Usually this assertion is run after a test-case to check that all of the
   * stages have terminated successfully.
   */
  def assertAllStagesStopped(mat: Materializer): Unit
  
  /**
   * Assert that there are no stages running under a given system's materializer.
   * Usually this assertion is run after a test-case to check that all of the
   * stages have terminated successfully.
   */
  def assertAllStagesStopped(system: ClassicActorSystemProvider): Unit
}

Usage Examples (Java):

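import akka.actor.ActorSystem;
import akka.stream.Materializer;
import akka.stream.SystemMaterializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.testkit.javadsl.StreamTestKit;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;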

ActorSystem system = ActorSystem.create("test");

// Test that stream completes and cleans up properly
CompletionStage<List<Integer>> result = Source.range(1, 100)
    .map(x -> x * 2)
    .filter(x -> x > 50)
    .runWith(Sink.seq(), system);

// Wait for completion
List<Integer> values = result.toCompletableFuture().get(5, TimeUnit.SECONDS);

// Assert all stages stopped
StreamTestKit.assertAllStagesStopped(system);

// Alternative using explicit materializer
Materializer mat = SystemMaterializer.get(system).materializer();
StreamTestKit.assertAllStagesStopped(mat);

Internal Implementation Details

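Internally, StreamTestKit relies on the PhasedFusingActorMaterializer: it queries the materializer's stream supervisor actor to verify that no stage actors remain. The members below are internal API, listed only to explain how the assertion works; do not call them from test code.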

// Internal API - exposed for understanding but not for direct use
object StreamTestKit {
  /**
   * INTERNAL API: Stop all children of the stream supervisor
   */
  @InternalApi 
  private[testkit] def stopAllChildren(sys: ActorSystem, supervisor: ActorRef): Unit
  
  /**
   * INTERNAL API: Assert that no children remain under the stream supervisor
   */
  @InternalApi 
  private[testkit] def assertNoChildren(sys: ActorSystem, supervisor: ActorRef): Unit
  
  /**
   * INTERNAL API: Print debug information about running streams
   */
  @InternalApi 
  private[akka] def printDebugDump(streamSupervisor: ActorRef)(implicit ec: ExecutionContext): Unit
  
  /**
   * INTERNAL API: Convert stream snapshot to string representation
   */
  @InternalApi 
  private[testkit] def snapshotString(snapshot: StreamSnapshotImpl): String
}
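
Because the members above are internal, a test that wants to inspect running streams itself needs a public entry point. The sketch below assumes `akka.stream.snapshot.MaterializerState` (marked as API that may change) is available in your Akka version; the object name `SnapshotDemo` and the helper `runningStageLabels` are hypothetical.

```scala
import akka.actor.ActorSystem
import akka.stream.{Materializer, SystemMaterializer}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.snapshot.MaterializerState

import scala.concurrent.Await
import scala.concurrent.duration._

object SnapshotDemo {
  // Hypothetical helper: lists the labels of stage logics that are currently running.
  def runningStageLabels(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("snapshot-demo")
    val mat: Materializer = SystemMaterializer(system).materializer
    try {
      // Keep one stream alive so there is something to inspect.
      Source.maybe[Int].map(_ + 1).runWith(Sink.ignore)

      // Ask the materializer for a snapshot of its streams.
      val snapshots = Await.result(MaterializerState.streamSnapshots(mat), 3.seconds)

      // The result may be empty if the stream has not finished starting yet.
      for {
        snapshot    <- snapshots
        interpreter <- snapshot.activeInterpreters
        logic       <- interpreter.logics
      } yield logic.label
    } finally {
      Await.result(system.terminate(), 10.seconds)
    }
  }

  def main(args: Array[String]): Unit =
    runningStageLabels().foreach(println)
}
```

This is a debugging aid rather than a replacement for `assertAllStagesStopped`, which already prints a debug dump when the assertion fails.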

Testing Patterns

Basic Stream Lifecycle Testing

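import akka.actor.ActorSystem
import akka.stream.{Materializer, SystemMaterializer}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.testkit.scaladsl.StreamTestKit
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

// ScalaFutures provides .futureValue
class StreamLifecycleSpec extends AnyFlatSpec with Matchers with ScalaFutures {
  implicit val system: ActorSystem = ActorSystem("test")
  implicit val materializer: Materializer = SystemMaterializer(system).materializer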
  
  "Simple stream" should "clean up properly" in {
    StreamTestKit.assertAllStagesStopped {
      Source(1 to 10)
        .map(_ * 2)
        .runWith(Sink.ignore)
        .futureValue
    }
  }
}

Testing Failed Streams

"Failed stream" should "still clean up properly" in {
  StreamTestKit.assertAllStagesStopped {
    val result = Source(1 to 10)
      .map(x => if (x == 5) throw new RuntimeException("test") else x)
      .runWith(Sink.seq)
    
    // Exception should be thrown, but cleanup should still happen
    intercept[RuntimeException] {
      Await.result(result, 3.seconds) 
    }
  }
}

Testing Infinite Streams with Cancellation

"Cancelled infinite stream" should "clean up properly" in {
  StreamTestKit.assertAllStagesStopped {
    // Source.tick materializes a Cancellable (Source.repeat only materializes NotUsed)
    val cancellable = Source.tick(0.millis, 100.millis, 1)
      .to(Sink.ignore)
      .run()
    
    // Let it run briefly then cancel
    Thread.sleep(500)
    cancellable.cancel()
    
    // Give time for cleanup
    Thread.sleep(200)
  }
}
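
A sketch of the same idea driven by a `KillSwitch`, which stops the stream from the outside and lets the test wait on the sink's completion future instead of sleeping. The object name `KillSwitchDemo` and the helper `stopInfiniteStream` are hypothetical.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{KillSwitches, Materializer, SystemMaterializer}
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.testkit.scaladsl.StreamTestKit

import scala.concurrent.Await
import scala.concurrent.duration._

object KillSwitchDemo {
  // Hypothetical helper: runs an infinite stream, shuts it down, and checks cleanup.
  def stopInfiniteStream(): Done = {
    implicit val system: ActorSystem = ActorSystem("kill-switch-demo")
    implicit val materializer: Materializer = SystemMaterializer(system).materializer
    try {
      StreamTestKit.assertAllStagesStopped {
        val (killSwitch, done) = Source.repeat(1)
          .throttle(1, 100.millis)
          .viaMat(KillSwitches.single)(Keep.right) // keep the UniqueKillSwitch
          .toMat(Sink.ignore)(Keep.both)           // and the sink's Future[Done]
          .run()

        // shutdown() completes downstream and cancels upstream.
        killSwitch.shutdown()

        // Wait for the sink to observe completion instead of sleeping.
        Await.result(done, 3.seconds)
      }
    } finally {
      Await.result(system.terminate(), 10.seconds)
    }
  }

  def main(args: Array[String]): Unit =
    println(stopInfiniteStream())
}
```

Waiting on the completion future keeps the test deterministic; fixed sleeps either waste time or turn flaky on slow machines.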

Testing Complex Graph Topologies

"Complex graph" should "clean up all stages" in {
  StreamTestKit.assertAllStagesStopped {
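    // Passing the sink to createGraph exposes its Future[Done] as the materialized value,
    // so there is a future to wait on (GraphDSL.create() without arguments materializes NotUsed).
    // Older Akka versions spell this GraphDSL.create(sink) { ... }.
    val graph = RunnableGraph.fromGraph(GraphDSL.createGraph(Sink.foreach[String](println)) { implicit builder => sink =>
      import GraphDSL.Implicits._
      
      val source = Source(1 to 100)
      val bcast = builder.add(Broadcast[Int](3))
      val merge = builder.add(Merge[String](3))
      
      source ~> bcast ~> Flow[Int].map(_.toString) ~> merge ~> sink
                bcast ~> Flow[Int].map(x => s"doubled: ${x * 2}") ~> merge
                bcast ~> Flow[Int].map(x => s"tripled: ${x * 3}") ~> merge
      
      ClosedShape
    })
    
    graph.run().futureValue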
  }
}

Testing With Custom Materializer

"Stream with custom materializer" should "clean up properly" in {
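  // ActorMaterializer and ActorMaterializerSettings are deprecated since Akka 2.6;
  // this form is for code bases that still construct materializers this way
  val customMaterializer = ActorMaterializer(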
    ActorMaterializerSettings(system)
      .withInputBuffer(16, 32)
  )
  
  StreamTestKit.assertAllStagesStopped {
    Source(1 to 1000)
      .grouped(10)
      .map(_.sum)
      .runWith(Sink.seq)(customMaterializer)
      .futureValue
  }(customMaterializer)
}
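
On Akka 2.6 or later, the same buffer sizes can be set per stream with attributes rather than a custom materializer. The following is a sketch under that assumption; the object name `InputBufferDemo` and the helper `sums` are hypothetical.

```scala
import akka.actor.ActorSystem
import akka.stream.{Attributes, Materializer, SystemMaterializer}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.testkit.scaladsl.StreamTestKit

import scala.concurrent.Await
import scala.concurrent.duration._

object InputBufferDemo {
  // Hypothetical helper: sums groups of ten with a per-stream input buffer setting.
  def sums(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("input-buffer-demo")
    implicit val materializer: Materializer = SystemMaterializer(system).materializer
    try {
      StreamTestKit.assertAllStagesStopped {
        val result = Source(1 to 1000)
          .grouped(10)
          .map(_.sum)
          // Applies to the stages built so far in this stream only.
          .addAttributes(Attributes.inputBuffer(initial = 16, max = 32))
          .runWith(Sink.seq)

        Await.result(result, 5.seconds)
      }
    } finally {
      Await.result(system.terminate(), 10.seconds)
    }
  }

  def main(args: Array[String]): Unit =
    println(sums().take(3))
}
```

Since the stream runs on the system materializer, the assertion needs no explicit materializer argument.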

Combining with Other Test Utilities

"Stream with test probes" should "clean up properly" in {
  StreamTestKit.assertAllStagesStopped {
    val (sourceProbe, sinkProbe) = TestSource[String]()
      .map(_.toUpperCase)
      .toMat(TestSink[String]())(Keep.both)
      .run()
    
    sourceProbe.sendNext("hello")
    sourceProbe.sendNext("world")
    sourceProbe.sendComplete()
    
    sinkProbe.request(2)
    sinkProbe.expectNext("HELLO", "WORLD")
    sinkProbe.expectComplete()
  }
}

Configuration

The StreamTestKit behavior can be configured through Akka configuration:

akka.stream.testkit {
  # Timeout for asserting all stages have stopped
  all-stages-stopped-timeout = 5s
}
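
One way to apply this setting in a test is to layer an override on top of the default configuration when creating the ActorSystem. This is a minimal sketch; the object name `TimeoutConfigDemo` and the 10-second value are illustrative assumptions.

```scala
import akka.actor.ActorSystem
import com.typesafe.config.{Config, ConfigFactory}

import scala.concurrent.Await
import scala.concurrent.duration._

object TimeoutConfigDemo {
  // Override only the testkit timeout; everything else falls back to the defaults.
  val testConfig: Config = ConfigFactory
    .parseString("akka.stream.testkit.all-stages-stopped-timeout = 10s")
    .withFallback(ConfigFactory.load())

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("timeout-demo", testConfig)
    try {
      // The value the assertion will read from this system's settings.
      val timeout = system.settings.config
        .getDuration("akka.stream.testkit.all-stages-stopped-timeout")
      println(s"all-stages-stopped-timeout = $timeout")
    } finally {
      Await.result(system.terminate(), 10.seconds)
    }
  }
}
```

The same override can instead live in the test resources' `application.conf` if every test should share it.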

Best Practices

Always Use in Test Cleanup

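class StreamSpec extends AnyFlatSpec with Matchers with BeforeAndAfterEach {
  implicit val system: ActorSystem = ActorSystem("test")
  implicit val materializer: Materializer = SystemMaterializer(system).materializer
  
  override def afterEach(): Unit = {
    // The test body has already run, so there is no block left to wrap here.
    // The block-less javadsl variant (callable from Scala) checks the materializer as it is now.
    akka.stream.testkit.javadsl.StreamTestKit.assertAllStagesStopped(materializer)
    super.afterEach()
  }
}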

Combine with ScalaTest Integration

trait StreamTestKit extends TestSuite {
  implicit def system: ActorSystem
  implicit def materializer: Materializer
  
  def withStreamCleanup[T](block: => T): T = {
    akka.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped(block)
  }
}

class MyStreamSpec extends AnyFlatSpec with StreamTestKit {
  implicit val system: ActorSystem = ActorSystem("test")
  implicit val materializer: Materializer = SystemMaterializer(system).materializer
  
  "My stream" should "work correctly" in withStreamCleanup {
    // Test code here
  }
}

Handle Timeouts Gracefully

"Long running stream" should "eventually clean up" in {
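  // Raise the ScalaFutures patience so futureValue waits long enough
  implicit val patience: PatienceConfig = PatienceConfig(timeout = 10.seconds)
  
  StreamTestKit.assertAllStagesStopped {
    // 500 elements at 100 per second take about 5 seconds, within the 10-second patience
    // (the original 10000 elements would need about 100 seconds and time out)
    Source(1 to 500)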
      .throttle(100, 1.second)
      .runWith(Sink.ignore)
      .futureValue
  }
}

