or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

graph-stage-testing.md, index.md, stream-lifecycle.md, test-publishers.md, test-sources-sinks.md, test-subscribers.md
tile.json

tessl/maven-com-typesafe-akka--akka-stream-testkit-3

Akka Stream TestKit provides utilities for testing Akka Streams applications with controllable test sources, sinks, and assertion capabilities

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
maven: pkg:maven/com.typesafe.akka/akka-stream-testkit_3@2.8.x

To install, run

npx @tessl/cli install tessl/maven-com-typesafe-akka--akka-stream-testkit-3@2.8.0

index.mddocs/

Akka Stream TestKit

Akka Stream TestKit provides comprehensive testing utilities for Akka Streams applications. It offers controllable test sources and sinks, assertion capabilities for stream behavior verification, and utilities for testing custom graph stages and stream lifecycle management.

Package Information

  • Package Name: akka-stream-testkit_3
  • Package Type: maven (SBT)
  • Language: Scala (with Java API)
  • Group ID: com.typesafe.akka
  • Artifact ID: akka-stream-testkit_3
  • Version: 2.8.8
  • Installation: "com.typesafe.akka" %% "akka-stream-testkit" % "2.8.8" % Test

Core Imports

Scala DSL

import akka.stream.testkit.scaladsl.{TestSource, TestSink, StreamTestKit}
import akka.stream.testkit.{TestPublisher, TestSubscriber, TestSinkStage, TestSourceStage}
import akka.stream.testkit.GraphStageMessages

Java API

import akka.stream.testkit.javadsl.TestSource;
import akka.stream.testkit.javadsl.TestSink;
import akka.stream.testkit.javadsl.StreamTestKit;
import akka.stream.testkit.TestPublisher;
import akka.stream.testkit.TestSubscriber;
import akka.stream.testkit.TestSinkStage;
import akka.stream.testkit.TestSourceStage;
import akka.stream.testkit.GraphStageMessages;

Basic Usage

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Source, Sink, Keep}
import akka.stream.testkit.scaladsl.{TestSource, TestSink}

// The TestSource/TestSink factories need an implicit actor system in scope.
// The type is written out because Scala 3 requires an explicit type on
// non-local implicit definitions.
implicit val system: ActorSystem = ActorSystem()

// Create a test source and pre-materialize it: `pub` is the publisher probe
// that drives the stream, `source` is a plain Source to wire into the flow.
val (pub, source) = TestSource[Int]().preMaterialize()

// Create a test sink
val sink = TestSink[Int]()

// Test a simple flow. The pre-materialized source no longer carries the
// publisher probe as its materialized value, so only the sink's probe is kept.
val sinkProbe = source
  .map(_ * 2)
  .toMat(sink)(Keep.right)
  .run()

// Signal demand first so the publisher probe has downstream demand to satisfy,
// then send elements and verify
sinkProbe.request(2)

pub.sendNext(1)
pub.sendNext(2)
pub.sendComplete()

sinkProbe.expectNext(2, 4)
sinkProbe.expectComplete()

Architecture

Akka Stream TestKit is built around several key components:

  • TestPublisher/TestSubscriber: Core testing utilities implementing Reactive Streams Publisher/Subscriber interfaces
  • Test Sources/Sinks: Factory methods for creating testable stream endpoints
  • Probe System: Manual and automatic probes for controlling and observing stream behavior
  • Assertion Framework: Rich set of expectation methods for verifying stream behavior
  • Graph Stage Testing: Utilities for testing custom stream processing stages
  • Lifecycle Management: Tools for asserting proper cleanup of stream resources

Capabilities

Test Publishers

Publisher utilities for creating controllable upstream sources in tests, with demand tracking and element injection capabilities.

/**
 * Signature summary of the publisher-side test utilities: controllable
 * upstream sources with demand tracking and element injection.
 * Only signatures are listed here; no method bodies are shown.
 */
object TestPublisher {
  // Plain Publisher factories; none of these take a probe or an actor system.
  def empty[T](): Publisher[T]
  def lazyEmpty[T]: Publisher[T]  
  // Publisher factories that take the Throwable to use as the error cause.
  def error[T](cause: Throwable): Publisher[T]
  def lazyError[T](cause: Throwable): Publisher[T]
  // Probe factories; both require an implicit ActorSystem and have defaulted first arguments.
  def manualProbe[T](autoOnSubscribe: Boolean = true)(implicit system: ActorSystem): ManualProbe[T]
  def probe[T](initialPendingRequests: Long = 0)(implicit system: ActorSystem): Probe[T]
  
  // Factory methods with ClassicActorSystemProvider
  object ManualProbe {
    // Same parameters as `manualProbe`, but the implicit is a ClassicActorSystemProvider.
    def apply[T](autoOnSubscribe: Boolean = true)(implicit system: ClassicActorSystemProvider): ManualProbe[T]
  }
}

Test Publishers

Test Subscribers

Subscriber utilities for creating controllable downstream sinks in tests, with element expectation and timing assertion capabilities.

/**
 * Signature summary of the subscriber-side test utilities: controllable
 * downstream sinks with element expectation and timing assertions.
 * Only signatures are listed here; no method bodies are shown.
 */
object TestSubscriber {
  // Probe factories; both require an implicit ActorSystem.
  def manualProbe[T]()(implicit system: ActorSystem): ManualProbe[T]
  def probe[T]()(implicit system: ActorSystem): Probe[T]
  
  // Factory methods with ClassicActorSystemProvider
  object ManualProbe {
    // Same shape as `manualProbe`, but the implicit is a ClassicActorSystemProvider.
    def apply[T]()(implicit system: ClassicActorSystemProvider): ManualProbe[T]
  }
}

Test Subscribers

Test Sources and Sinks

Factory methods for creating Source and Sink instances that materialize to test probes, providing the primary interface for stream testing.

// Scala DSL
/**
 * Scala DSL factory (imported from `akka.stream.testkit.scaladsl` in this document)
 * for a Source whose materialized value is a `TestPublisher.Probe[T]`.
 */
object TestSource {
  // Preferred factory: takes an implicit ClassicActorSystemProvider.
  def apply[T]()(implicit system: ClassicActorSystemProvider): Source[T, TestPublisher.Probe[T]]
  // Older ActorSystem-based factory, marked deprecated since 2.7.0 in favour of `apply`.
  @deprecated("Use `TestSource()` with implicit ClassicActorSystemProvider instead.", "2.7.0")
  def probe[T](implicit system: ActorSystem): Source[T, TestPublisher.Probe[T]]
}

/**
 * Scala DSL factory (imported from `akka.stream.testkit.scaladsl` in this document)
 * for a Sink whose materialized value is a `TestSubscriber.Probe[T]`.
 */
object TestSink {
  // Preferred factory: takes an implicit ClassicActorSystemProvider.
  def apply[T]()(implicit system: ClassicActorSystemProvider): Sink[T, TestSubscriber.Probe[T]]
  // Older ActorSystem-based factory, marked deprecated since 2.7.0 in favour of `apply`.
  @deprecated("Use `TestSink()` with implicit ClassicActorSystemProvider instead.", "2.7.0")
  def probe[T](implicit system: ActorSystem): Sink[T, TestSubscriber.Probe[T]]
}

// Java API
/**
 * Java API counterpart (imported from `akka.stream.testkit.javadsl` in this document):
 * the actor system is passed as an explicit argument instead of an implicit one.
 */
object TestSource {
  // Preferred factory: takes a ClassicActorSystemProvider.
  def create[T](system: ClassicActorSystemProvider): Source[T, TestPublisher.Probe[T]]
  // Older ActorSystem-based factory, marked deprecated since 2.7.0 in favour of `create`.
  @deprecated("Use `TestSource.create` with ClassicActorSystemProvider instead.", "2.7.0")
  def probe[T](system: ActorSystem): Source[T, TestPublisher.Probe[T]]
}

/**
 * Java API counterpart (imported from `akka.stream.testkit.javadsl` in this document):
 * the actor system is passed as an explicit argument instead of an implicit one.
 */
object TestSink {
  // Preferred factory: takes a ClassicActorSystemProvider.
  def create[T](system: ClassicActorSystemProvider): Sink[T, TestSubscriber.Probe[T]]
  // Older ActorSystem-based factory, marked deprecated since 2.7.0 in favour of `create`.
  @deprecated("Use `TestSink.create` with ClassicActorSystemProvider instead.", "2.7.0")
  def probe[T](system: ActorSystem): Sink[T, TestSubscriber.Probe[T]]
}

Test Sources and Sinks

Graph Stage Testing

Utilities for testing custom GraphStage implementations by wrapping them with monitoring capabilities that emit events to test probes.

/**
 * Wraps a sink-shaped graph stage under test together with a TestProbe and
 * returns a Sink with the same element type `T` and materialized value type `M`.
 * Per the surrounding description, the wrapper emits stage events to the probe.
 */
object TestSinkStage {
  def apply[T, M](stageUnderTest: GraphStageWithMaterializedValue[SinkShape[T], M], probe: TestProbe): Sink[T, M]
}

/**
 * Wraps a source-shaped graph stage under test together with a TestProbe and
 * returns a Source with the same element type `T` and materialized value type `M`.
 * Per the surrounding description, the wrapper emits stage events to the probe.
 */
object TestSourceStage {
  def apply[T, M](stageUnderTest: GraphStageWithMaterializedValue[SourceShape[T], M], probe: TestProbe): Source[T, M]
}

// Graph Stage Messages
/** Message types used when testing graph stages with the wrappers listed above. */
object GraphStageMessages {
  // Closed family of stage events.
  sealed trait StageMessage
  case object Push extends StageMessage
  case object Pull extends StageMessage
  case object UpstreamFinish extends StageMessage
  case object DownstreamFinish extends StageMessage
  // Carries the Throwable associated with a failure event.
  case class Failure(ex: Throwable) extends StageMessage
  // Pairs a StageMessage with an exception; as listed, it is not itself a StageMessage.
  case class StageFailure(operation: StageMessage, exception: Throwable)
}

Graph Stage Testing

Stream Lifecycle Management

Utilities for asserting that stream processing stages are properly cleaned up after test execution, helping detect resource leaks.

// Scala DSL
/**
 * Scala DSL lifecycle assertion: takes a by-name block plus an implicit
 * Materializer and returns the block's result type `T`. Per the surrounding
 * description, it is used to assert that stream stages are cleaned up.
 */
object StreamTestKit {
  def assertAllStagesStopped[T](block: => T)(implicit materializer: Materializer): T
}

// Java API  
/**
 * Java API lifecycle assertion: two overloads, one taking a Materializer and
 * one taking a ClassicActorSystemProvider. Neither takes a block; both return Unit.
 */
object StreamTestKit {
  def assertAllStagesStopped(mat: Materializer): Unit
  def assertAllStagesStopped(system: ClassicActorSystemProvider): Unit
}

Stream Lifecycle Management

Types

// Publisher Events
// Closed family of publisher-side events; every case carries the Subscription involved.
sealed trait PublisherEvent
case class Subscribe(subscription: Subscription) extends PublisherEvent
// Also carries the Throwable given as the cause of the cancellation.
case class CancelSubscription(subscription: Subscription, cause: Throwable) extends PublisherEvent  
// `elements` is the requested count as a Long.
case class RequestMore(subscription: Subscription, elements: Long) extends PublisherEvent

// Subscriber Events
// Closed family of subscriber-side events; the case names match the Reactive Streams Subscriber callbacks.
sealed trait SubscriberEvent
case class OnSubscribe(subscription: Subscription) extends SubscriberEvent
// Carries the received element of type `I`.
case class OnNext[I](element: I) extends SubscriberEvent
case object OnComplete extends SubscriberEvent
// Carries the Throwable given as the error cause.
case class OnError(cause: Throwable) extends SubscriberEvent

// Graph Stage Messages
// Same declarations as listed inside `object GraphStageMessages` earlier in this document,
// repeated here without the enclosing object.
sealed trait StageMessage
case object Push extends StageMessage
case object Pull extends StageMessage
case object UpstreamFinish extends StageMessage
case object DownstreamFinish extends StageMessage
// Carries the Throwable associated with a failure event.
case class Failure(ex: Throwable) extends StageMessage
// Pairs a StageMessage with an exception; as listed, it is not itself a StageMessage.
case class StageFailure(operation: StageMessage, exception: Throwable)

// Probe Classes  
// Signature summary of the publisher probe classes.
// NOTE(review): this listing nests the probes in an `abstract class TestPublisher`, while the
// factory summary earlier in this document declares `object TestPublisher` — confirm the
// actual enclosing definition and member placement against the library source.
abstract class TestPublisher {
  // Manually driven probe; it is itself a Reactive Streams Publisher[I].
  abstract class ManualProbe[I] extends Publisher[I] {
    // Expectations on signals arriving from downstream.
    def expectSubscription(): Subscription
    def expectRequest(): Long
    def expectRequest(n: Long): Unit
    def expectCancellation(): Unit
    def expectCancellationWithCause(): Throwable
    def expectNoMessage(): Unit
    def expectNoMessage(remaining: FiniteDuration): Unit
    // Signals pushed downstream by the test.
    def sendNext(element: I): Unit
    def sendComplete(): Unit
    def sendError(cause: Throwable): Unit
    def getSubscriber: Subscriber[_ >: I]
  }
  
  // Extends ManualProbe; this listing repeats the send/expectRequest signatures and adds `pending`.
  abstract class Probe[I] extends ManualProbe[I] {
    def sendNext(element: I): Unit
    def sendComplete(): Unit
    def sendError(cause: Throwable): Unit
    def expectRequest(): Long
    def expectRequest(n: Long): Unit
    // presumably the outstanding demand not yet fulfilled — TODO confirm
    def pending: Long
  }
}

// Signature summary of the subscriber probe classes.
// NOTE(review): this listing nests the probes in an `abstract class TestSubscriber`, while the
// factory summary earlier in this document declares `object TestSubscriber` — confirm the
// actual enclosing definition and member placement against the library source.
abstract class TestSubscriber {
  // Manually driven probe; it is itself a Reactive Streams Subscriber[I].
  abstract class ManualProbe[I] extends Subscriber[I] {
    def expectSubscription(): Subscription
    // Element expectations: single element, with a duration, several in order, or unordered.
    def expectNext(): I
    def expectNext(element: I): Unit
    def expectNext(d: FiniteDuration, element: I): Unit
    def expectNext(e1: I, e2: I, es: I*): Unit
    def expectNextN(n: Long): immutable.Seq[I]
    def expectNextN(all: immutable.Seq[I]): Unit
    def expectNextUnordered(e1: I, e2: I, es: I*): Unit
    def expectNextUnorderedN(all: immutable.Seq[I]): Unit
    // Either-returning variants: Left is the error or completion marker, Right is the element.
    def expectNextOrError(): Either[Throwable, I]
    def expectNextOrComplete(): Either[OnComplete.type, I]
    // Terminal-signal and silence expectations.
    def expectComplete(): Unit
    def expectError(): Throwable
    def expectError(cause: Throwable): Unit
    def expectNoMessage(): Unit
    def expectNoMessage(remaining: FiniteDuration): Unit
    // Signals sent upstream by the test.
    def request(n: Long): Unit
    def cancel(): Unit
  }
  
  // Extends ManualProbe; this listing repeats `request` and adds the `requestNext` overloads.
  abstract class Probe[I] extends ManualProbe[I] {
    def request(n: Long): Unit
    def requestNext(): I
    def requestNext(element: I): Unit
  }
}