Akka Stream TestKit provides utilities for testing Akka Streams applications with controllable test sources, sinks, and assertion capabilities
—
Subscriber utilities for creating controllable downstream sinks in tests. Test subscribers implement the Reactive Streams Subscriber interface while providing comprehensive assertion capabilities for verifying stream behavior, element expectations, and timing requirements.
Static factory methods for creating test subscribers with different control mechanisms.
/**
* Creates a manual probe that implements Subscriber interface
*/
def manualProbe[T]()(implicit system: ActorSystem): ManualProbe[T]
/**
* Creates a subscriber probe with automatic subscription management
*/
def probe[T]()(implicit system: ActorSystem): Probe[T]Usage Examples:
import akka.stream.testkit.TestSubscriber
// Manual control subscriber
val manualProbe = TestSubscriber.manualProbe[String]()
// Automatic subscription management
val autoProbe = TestSubscriber.probe[String]()Manual test subscriber that provides complete control over subscription management and element expectations with comprehensive assertion methods.
class ManualProbe[I] extends Subscriber[I] {
type Self <: ManualProbe[I]
/**
* Expect and return a Subscription
*/
def expectSubscription(): Subscription
/**
* Expect and return any SubscriberEvent (OnSubscribe, OnNext, OnError, or OnComplete)
*/
def expectEvent(): SubscriberEvent
def expectEvent(max: FiniteDuration): SubscriberEvent
def expectEvent(event: SubscriberEvent): Self
/**
* Expect and return a stream element
*/
def expectNext(): I
def expectNext(d: FiniteDuration): I
def expectNext(element: I): Self
def expectNext(d: FiniteDuration, element: I): Self
/**
* Expect multiple stream elements in order
*/
def expectNext(e1: I, e2: I, es: I*): Self
/**
* Expect multiple stream elements in arbitrary order
*/
def expectNextUnordered(e1: I, e2: I, es: I*): Self
/**
* Expect and return the next N stream elements
*/
def expectNextN(n: Long): immutable.Seq[I]
def expectNextN(all: immutable.Seq[I]): Self
/**
* Expect elements in any order
*/
def expectNextUnorderedN(all: immutable.Seq[I]): Self
/**
* Expect stream completion
*/
def expectComplete(): Self
/**
* Expect and return the signalled Throwable
*/
def expectError(): Throwable
def expectError(cause: Throwable): Self
/**
* Expect subscription followed by error (with optional demand signaling)
*/
def expectSubscriptionAndError(): Throwable
def expectSubscriptionAndError(signalDemand: Boolean): Throwable
def expectSubscriptionAndError(cause: Throwable): Self
def expectSubscriptionAndError(cause: Throwable, signalDemand: Boolean): Self
/**
* Expect subscription followed by completion (with optional demand signaling)
*/
def expectSubscriptionAndComplete(): Self
def expectSubscriptionAndComplete(signalDemand: Boolean): Self
/**
* Expect next element or error signal, returning whichever was signalled
*/
def expectNextOrError(): Either[Throwable, I]
def expectNextOrError(element: I, cause: Throwable): Either[Throwable, I]
/**
* Expect next element or stream completion
*/
def expectNextOrComplete(): Either[OnComplete.type, I]
def expectNextOrComplete(element: I): Self
/**
* Assert that no message is received for the specified time
*/
def expectNoMessage(): Self
def expectNoMessage(remaining: FiniteDuration): Self
def expectNoMessage(remaining: java.time.Duration): Self
/**
* Expect a stream element and test it with partial function
*/
def expectNextPF[T](f: PartialFunction[Any, T]): T
def expectNextWithTimeoutPF[T](max: Duration, f: PartialFunction[Any, T]): T
def expectNextChainingPF(f: PartialFunction[Any, Any]): Self
def expectNextChainingPF(max: Duration, f: PartialFunction[Any, Any]): Self
/**
* Expect event matching partial function
*/
def expectEventPF[T](f: PartialFunction[SubscriberEvent, T]): T
def expectEventWithTimeoutPF[T](max: Duration, f: PartialFunction[SubscriberEvent, T]): T
/**
* Receive messages for a given duration or until one does not match a given partial function
*/
def receiveWhile[T](
max: Duration = Duration.Undefined,
idle: Duration = Duration.Inf,
messages: Int = Int.MaxValue
)(f: PartialFunction[SubscriberEvent, T]): immutable.Seq[T]
/**
* Receive messages within time limit
*/
def receiveWithin(max: FiniteDuration, messages: Int = Int.MaxValue): immutable.Seq[I]
/**
* Attempt to drain the stream into a strict collection (requests Long.MaxValue elements)
* WARNING: Use with caution for infinite streams or large elements
*/
def toStrict(atMost: FiniteDuration): immutable.Seq[I]
/**
* Execute code block while bounding its execution time
*/
def within[T](min: FiniteDuration, max: FiniteDuration)(f: => T): T
def within[T](max: FiniteDuration)(f: => T): T
// Subscriber interface methods
def onSubscribe(subscription: Subscription): Unit
def onNext(element: I): Unit
def onComplete(): Unit
def onError(cause: Throwable): Unit
}Usage Examples:
import akka.stream.testkit.TestSubscriber
import scala.concurrent.duration._
val probe = TestSubscriber.manualProbe[String]()
// Subscribe to a source
source.subscribe(probe)
// Expect subscription and request elements
val subscription = probe.expectSubscription()
subscription.request(3)
// Expect specific elements
probe.expectNext("hello")
probe.expectNext("world")
probe.expectComplete()
// Expect elements with timeout
probe.expectNext(1.second, "delayed")
// Expect multiple elements
probe.expectNext("a", "b", "c")
// Expect elements in any order
probe.expectNextUnordered("x", "y", "z")
// Pattern matching on elements
probe.expectNextPF {
case s: String if s.startsWith("test") => s.length
}Test subscriber with automatic subscription management that extends ManualProbe with simplified request handling and additional convenience methods.
class Probe[T] extends ManualProbe[T] {
override type Self = Probe[T]
/**
* Asserts that a subscription has been received or will be received
*/
def ensureSubscription(): Self
/**
* Request N elements from the subscription
*/
def request(n: Long): Self
/**
* Request and expect a specific stream element
*/
def requestNext(element: T): Self
/**
* Request and expect the next stream element, returning it
*/
def requestNext(): T
def requestNext(d: FiniteDuration): T
/**
* Cancel the subscription
*/
def cancel(): Self
/**
* Cancel the subscription with a specific cause
*/
def cancel(cause: Throwable): Self
}Usage Examples:
import akka.stream.testkit.TestSubscriber
val probe = TestSubscriber.probe[Int]()
// Automatic subscription management
source.runWith(Sink.fromSubscriber(probe))
// Request and expect elements
probe.request(1)
val element = probe.expectNext()
// Request and expect specific element
probe.requestNext(42)
// Request and return element
val next = probe.requestNext()
// Cancel subscription
probe.cancel()
// Cancel with cause
probe.cancel(new RuntimeException("test cancellation"))sealed trait SubscriberEvent extends DeadLetterSuppression with NoSerializationVerificationNeeded
/**
* Received when the subscriber is subscribed to a publisher
*/
final case class OnSubscribe(subscription: Subscription) extends SubscriberEvent
/**
* Received when a stream element arrives
*/
final case class OnNext[I](element: I) extends SubscriberEvent
/**
* Received when the stream completes successfully
*/
case object OnComplete extends SubscriberEvent
/**
* Received when the stream terminates with an error
*/
final case class OnError(cause: Throwable) extends SubscriberEventval probe = TestSubscriber.probe[String]()
// Test immediate completion
probe.expectSubscriptionAndComplete()
// Test completion after elements
val subscription = probe.expectSubscription()
subscription.request(2)
probe.expectNext("a", "b")
probe.expectComplete()val probe = TestSubscriber.probe[String]()
// Test immediate error
val error = probe.expectSubscriptionAndError()
error shouldBe a[RuntimeException]
// Test error after elements
val subscription = probe.expectSubscription()
subscription.request(1)
probe.expectNext("element")
val thrownError = probe.expectError()val probe = TestSubscriber.probe[Int]()
// Don't request initially, verify no elements arrive
probe.expectSubscription()
probe.expectNoMessage(100.millis)
// Request and verify elements arrive
probe.request(2)
probe.expectNext(1, 2)val probe = TestSubscriber.probe[Int]()
// Request many elements
probe.request(1000)
// Verify specific elements
val elements = probe.expectNextN(1000)
elements should have size 1000
// Or drain to strict collection (use carefully)
val allElements = probe.toStrict(5.seconds)val probe = TestSubscriber.probe[String]()
// Test elements with partial function
probe.expectNextPF {
case s if s.length > 5 => s.toUpperCase
}
// Chain multiple expectations
probe
.expectNextChainingPF {
case s: String => s.trim
}
.expectNextChainingPF {
case s if s.nonEmpty => s
}val probe = TestSubscriber.probe[String]()
// Expect either next element or error
probe.expectNextOrError() match {
case Right(element) => println(s"Got element: $element")
case Left(error) => println(s"Got error: $error")
}
// Expect either next element or completion
probe.expectNextOrComplete() match {
case Right(element) => println(s"Got element: $element")
case Left(OnComplete) => println("Stream completed")
}Install with Tessl CLI
npx tessl i tessl/maven-com-typesafe-akka--akka-stream-testkit-3