Akka Stream TestKit provides utilities for testing Akka Streams applications with controllable test sources, sinks, and assertion capabilities
—
Publisher utilities for creating controllable upstream sources in tests. Test publishers implement the Reactive Streams Publisher interface while providing fine-grained control over element emission, demand tracking, and subscription lifecycle management.
Static factory methods for creating various types of test publishers with different behavior patterns.
/**
* Publisher that signals complete to subscribers immediately after handing a void subscription
*/
def empty[T](): Publisher[T]
/**
* Publisher that subscribes the subscriber and completes after the first request
*/
def lazyEmpty[T]: Publisher[T]
/**
* Publisher that signals error to subscribers immediately after handing out subscription
*/
def error[T](cause: Throwable): Publisher[T]
/**
* Publisher that subscribes the subscriber and signals error after the first request
*/
def lazyError[T](cause: Throwable): Publisher[T]
/**
* Creates a manual probe that implements Publisher interface
* @param autoOnSubscribe Whether to automatically call onSubscribe (default: true)
*/
def manualProbe[T](autoOnSubscribe: Boolean = true)(implicit system: ActorSystem): ManualProbe[T]
/**
* Creates a probe that implements Publisher interface and tracks demand
* @param initialPendingRequests Initial number of pending requests (default: 0)
*/
def probe[T](initialPendingRequests: Long = 0)(implicit system: ActorSystem): Probe[T]Usage Examples:
import akka.stream.testkit.TestPublisher
// Empty publishers for testing completion scenarios
val emptyPub = TestPublisher.empty[String]()
val lazyEmptyPub = TestPublisher.lazyEmpty[String]
// Error publishers for testing error scenarios
val errorPub = TestPublisher.error[String](new RuntimeException("test error"))
val lazyErrorPub = TestPublisher.lazyError[String](new RuntimeException("test error"))
// Manual control publishers
val manualPub = TestPublisher.manualProbe[String]()
val autoPub = TestPublisher.probe[String]()Manual test publisher that provides complete control over the publishing lifecycle and requires explicit management of subscription and demand.
class ManualProbe[I] extends Publisher[I] {
type Self <: ManualProbe[I]
/**
* Subscribes a given Subscriber to this probe publisher
*/
def subscribe(subscriber: Subscriber[_ >: I]): Unit
/**
* Execute code block after subscription is established
*/
def executeAfterSubscription[T](f: => T): T
/**
* Expect a subscription and return the subscription object
*/
def expectSubscription(): PublisherProbeSubscription[I]
/**
* Expect demand from a given subscription
*/
def expectRequest(subscription: Subscription, n: Int): Self
/**
* Expect no messages for the default period
*/
def expectNoMessage(): Self
/**
* Expect no messages for a given duration
*/
def expectNoMessage(max: FiniteDuration): Self
/**
* Receive messages for a given duration or until one does not match a given partial function
*/
def receiveWhile[T](
max: Duration = Duration.Undefined,
idle: Duration = Duration.Inf,
messages: Int = Int.MaxValue
)(f: PartialFunction[PublisherEvent, T]): immutable.Seq[T]
/**
* Expect an event matching the partial function
*/
def expectEventPF[T](f: PartialFunction[PublisherEvent, T]): T
/**
* Get the Publisher interface
*/
def getPublisher: Publisher[I]
/**
* Execute code block while bounding its execution time between min and max
*/
def within[T](min: FiniteDuration, max: FiniteDuration)(f: => T): T
/**
* Execute code block within the specified maximum time
*/
def within[T](max: FiniteDuration)(f: => T): T
}Usage Examples:
import akka.stream.testkit.TestPublisher
val probe = TestPublisher.manualProbe[String]()
val subscription = probe.expectSubscription()
// Expect demand and send elements
probe.expectRequest(subscription, 1)
subscription.sendNext("hello")
probe.expectRequest(subscription, 2)
subscription.sendNext("world")
subscription.sendComplete()Test publisher with automatic demand tracking that extends ManualProbe with simplified element sending and demand management.
class Probe[T] extends ManualProbe[T] {
type Self = Probe[T]
/**
* Asserts that a subscription has been received or will be received
*/
def ensureSubscription(): Unit
/**
* Current pending requests
*/
def pending: Long
/**
* Send next element, automatically checking and decrementing demand
*/
def sendNext(elem: T): Self
/**
* Send next element without checking demand (unsafe)
*/
def unsafeSendNext(elem: T): Self
/**
* Send completion signal
*/
def sendComplete(): Self
/**
* Send error signal
*/
def sendError(cause: Throwable): Self
/**
* Expect request and add to pending requests, returning the request amount
*/
def expectRequest(): Long
/**
* Expect cancellation
*/
def expectCancellation(): Self
/**
* Expect cancellation with specific cause
*/
def expectCancellationWithCause(expectedCause: Throwable): Self
/**
* Expect cancellation with typed cause, returning the cause
*/
def expectCancellationWithCause[E <: Throwable: ClassTag](): E
/**
* Java API: Expect cancellation with specific cause class
*/
def expectCancellationWithCause[E <: Throwable](causeClass: Class[E]): E
}Usage Examples:
import akka.stream.testkit.TestPublisher
val probe = TestPublisher.probe[Int]()
// Send elements with automatic demand management
probe.sendNext(1)
probe.sendNext(2)
probe.sendNext(3)
probe.sendComplete()
// Check pending demand
println(s"Pending requests: ${probe.pending}")
// Handle cancellation scenarios
probe.expectCancellation()
// Handle typed cancellation causes
val cause = probe.expectCancellationWithCause[IllegalArgumentException]()Internal subscription implementation that bridges the test probe with subscribers, providing methods for sending elements and expecting subscription events.
case class PublisherProbeSubscription[I](
subscriber: Subscriber[_ >: I],
publisherProbe: TestProbe
) extends Subscription with SubscriptionWithCancelException {
/**
* Request elements from the subscription
*/
def request(elements: Long): Unit
/**
* Cancel subscription with cause
*/
def cancel(cause: Throwable): Unit
/**
* Expect a specific request amount
*/
def expectRequest(n: Long): Unit
/**
* Expect any request and return the amount
*/
def expectRequest(): Long
/**
* Expect cancellation and return the cause
*/
def expectCancellation(): Throwable
/**
* Send next element to subscriber
*/
def sendNext(element: I): Unit
/**
* Send completion signal to subscriber
*/
def sendComplete(): Unit
/**
* Send error signal to subscriber
*/
def sendError(cause: Throwable): Unit
/**
* Send onSubscribe signal to subscriber
*/
def sendOnSubscribe(): Unit
}sealed trait PublisherEvent extends DeadLetterSuppression with NoSerializationVerificationNeeded
/**
* Emitted when a subscriber subscribes to the publisher
*/
final case class Subscribe(subscription: Subscription) extends PublisherEvent
/**
* Emitted when a subscription is cancelled
*/
final case class CancelSubscription(subscription: Subscription, cause: Throwable) extends PublisherEvent
/**
* Emitted when a subscriber requests more elements
*/
final case class RequestMore(subscription: Subscription, elements: Long) extends PublisherEventval probe = TestPublisher.probe[Int]()
// Send multiple elements
probe.sendNext(1)
probe.sendNext(2)
probe.sendNext(3)
// Verify demand management
probe.expectRequest() should be > 0Lval probe = TestPublisher.probe[String]()
val error = new RuntimeException("test failure")
probe.sendError(error)
// Verify error handling in downstreamval probe = TestPublisher.probe[Int]()
probe.sendNext(1)
probe.sendNext(2)
probe.sendComplete()
// Verify completion behaviorInstall with Tessl CLI
npx tessl i tessl/maven-com-typesafe-akka--akka-stream-testkit-3