Akka Stream TestKit provides utilities for testing Akka Streams applications with controllable test sources, sinks, and assertion capabilities
npx @tessl/cli install tessl/maven-com-typesafe-akka--akka-stream-testkit-3@2.8.0Akka Stream TestKit provides comprehensive testing utilities for Akka Streams applications. It offers controllable test sources and sinks, assertion capabilities for stream behavior verification, and utilities for testing custom graph stages and stream lifecycle management.
"com.typesafe.akka" %% "akka-stream-testkit" % "2.8.8" % Testimport akka.stream.testkit.scaladsl.{TestSource, TestSink, StreamTestKit}
import akka.stream.testkit.{TestPublisher, TestSubscriber, TestSinkStage, TestSourceStage}
import akka.stream.testkit.GraphStageMessagesimport akka.stream.testkit.javadsl.TestSource;
import akka.stream.testkit.javadsl.TestSink;
import akka.stream.testkit.javadsl.StreamTestKit;
import akka.stream.testkit.TestPublisher;
import akka.stream.testkit.TestSubscriber;
import akka.stream.testkit.TestSinkStage;
import akka.stream.testkit.TestSourceStage;
import akka.stream.testkit.GraphStageMessages;import akka.actor.ActorSystem
import akka.stream.scaladsl.{Source, Sink, Keep}
import akka.stream.testkit.scaladsl.{TestSource, TestSink}
implicit val system = ActorSystem()
// Create a test source
val (pub, source) = TestSource[Int]().preMaterialize()
// Create a test sink
val sink = TestSink[Int]()
// Test a simple flow
val (probe, sinkProbe) = source
.map(_ * 2)
.toMat(sink)(Keep.both)
.run()
// Send elements and verify
pub.sendNext(1)
pub.sendNext(2)
pub.sendComplete()
sinkProbe.request(2)
sinkProbe.expectNext(2, 4)
sinkProbe.expectComplete()Akka Stream TestKit is built around several key components:
Publisher utilities for creating controllable upstream sources in tests, with demand tracking and element injection capabilities.
object TestPublisher {
def empty[T](): Publisher[T]
def lazyEmpty[T]: Publisher[T]
def error[T](cause: Throwable): Publisher[T]
def lazyError[T](cause: Throwable): Publisher[T]
def manualProbe[T](autoOnSubscribe: Boolean = true)(implicit system: ActorSystem): ManualProbe[T]
def probe[T](initialPendingRequests: Long = 0)(implicit system: ActorSystem): Probe[T]
// Factory methods with ClassicActorSystemProvider
object ManualProbe {
def apply[T](autoOnSubscribe: Boolean = true)(implicit system: ClassicActorSystemProvider): ManualProbe[T]
}
}Subscriber utilities for creating controllable downstream sinks in tests, with element expectation and timing assertion capabilities.
object TestSubscriber {
def manualProbe[T]()(implicit system: ActorSystem): ManualProbe[T]
def probe[T]()(implicit system: ActorSystem): Probe[T]
// Factory methods with ClassicActorSystemProvider
object ManualProbe {
def apply[T]()(implicit system: ClassicActorSystemProvider): ManualProbe[T]
}
}Factory methods for creating Source and Sink instances that materialize to test probes, providing the primary interface for stream testing.
// Scala DSL
object TestSource {
def apply[T]()(implicit system: ClassicActorSystemProvider): Source[T, TestPublisher.Probe[T]]
@deprecated("Use `TestSource()` with implicit ClassicActorSystemProvider instead.", "2.7.0")
def probe[T](implicit system: ActorSystem): Source[T, TestPublisher.Probe[T]]
}
object TestSink {
def apply[T]()(implicit system: ClassicActorSystemProvider): Sink[T, TestSubscriber.Probe[T]]
@deprecated("Use `TestSink()` with implicit ClassicActorSystemProvider instead.", "2.7.0")
def probe[T](implicit system: ActorSystem): Sink[T, TestSubscriber.Probe[T]]
}
// Java API
object TestSource {
def create[T](system: ClassicActorSystemProvider): Source[T, TestPublisher.Probe[T]]
@deprecated("Use `TestSource.create` with ClassicActorSystemProvider instead.", "2.7.0")
def probe[T](system: ActorSystem): Source[T, TestPublisher.Probe[T]]
}
object TestSink {
def create[T](system: ClassicActorSystemProvider): Sink[T, TestSubscriber.Probe[T]]
@deprecated("Use `TestSink.create` with ClassicActorSystemProvider instead.", "2.7.0")
def probe[T](system: ActorSystem): Sink[T, TestSubscriber.Probe[T]]
}Utilities for testing custom GraphStage implementations by wrapping them with monitoring capabilities that emit events to test probes.
object TestSinkStage {
def apply[T, M](stageUnderTest: GraphStageWithMaterializedValue[SinkShape[T], M], probe: TestProbe): Sink[T, M]
}
object TestSourceStage {
def apply[T, M](stageUnderTest: GraphStageWithMaterializedValue[SourceShape[T], M], probe: TestProbe): Source[T, M]
}
// Graph Stage Messages
object GraphStageMessages {
sealed trait StageMessage
case object Push extends StageMessage
case object Pull extends StageMessage
case object UpstreamFinish extends StageMessage
case object DownstreamFinish extends StageMessage
case class Failure(ex: Throwable) extends StageMessage
case class StageFailure(operation: StageMessage, exception: Throwable)
}Utilities for asserting that stream processing stages are properly cleaned up after test execution, helping detect resource leaks.
// Scala DSL
object StreamTestKit {
def assertAllStagesStopped[T](block: => T)(implicit materializer: Materializer): T
}
// Java API
object StreamTestKit {
def assertAllStagesStopped(mat: Materializer): Unit
def assertAllStagesStopped(system: ClassicActorSystemProvider): Unit
}// Publisher Events
sealed trait PublisherEvent
case class Subscribe(subscription: Subscription) extends PublisherEvent
case class CancelSubscription(subscription: Subscription, cause: Throwable) extends PublisherEvent
case class RequestMore(subscription: Subscription, elements: Long) extends PublisherEvent
// Subscriber Events
sealed trait SubscriberEvent
case class OnSubscribe(subscription: Subscription) extends SubscriberEvent
case class OnNext[I](element: I) extends SubscriberEvent
case object OnComplete extends SubscriberEvent
case class OnError(cause: Throwable) extends SubscriberEvent
// Graph Stage Messages
sealed trait StageMessage
case object Push extends StageMessage
case object Pull extends StageMessage
case object UpstreamFinish extends StageMessage
case object DownstreamFinish extends StageMessage
case class Failure(ex: Throwable) extends StageMessage
case class StageFailure(operation: StageMessage, exception: Throwable)
// Probe Classes
abstract class TestPublisher {
abstract class ManualProbe[I] extends Publisher[I] {
def expectSubscription(): Subscription
def expectRequest(): Long
def expectRequest(n: Long): Unit
def expectCancellation(): Unit
def expectCancellationWithCause(): Throwable
def expectNoMessage(): Unit
def expectNoMessage(remaining: FiniteDuration): Unit
def sendNext(element: I): Unit
def sendComplete(): Unit
def sendError(cause: Throwable): Unit
def getSubscriber: Subscriber[_ >: I]
}
abstract class Probe[I] extends ManualProbe[I] {
def sendNext(element: I): Unit
def sendComplete(): Unit
def sendError(cause: Throwable): Unit
def expectRequest(): Long
def expectRequest(n: Long): Unit
def pending: Long
}
}
abstract class TestSubscriber {
abstract class ManualProbe[I] extends Subscriber[I] {
def expectSubscription(): Subscription
def expectNext(): I
def expectNext(element: I): Unit
def expectNext(d: FiniteDuration, element: I): Unit
def expectNext(e1: I, e2: I, es: I*): Unit
def expectNextN(n: Long): immutable.Seq[I]
def expectNextN(all: immutable.Seq[I]): Unit
def expectNextUnordered(e1: I, e2: I, es: I*): Unit
def expectNextUnorderedN(all: immutable.Seq[I]): Unit
def expectNextOrError(): Either[Throwable, I]
def expectNextOrComplete(): Either[OnComplete.type, I]
def expectComplete(): Unit
def expectError(): Throwable
def expectError(cause: Throwable): Unit
def expectNoMessage(): Unit
def expectNoMessage(remaining: FiniteDuration): Unit
def request(n: Long): Unit
def cancel(): Unit
}
abstract class Probe[I] extends ManualProbe[I] {
def request(n: Long): Unit
def requestNext(): I
def requestNext(element: I): Unit
}
}