CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-com-typesafe-akka--akka-stream-testkit-3

Akka Stream TestKit provides utilities for testing Akka Streams applications with controllable test sources, sinks, and assertion capabilities

Pending
Overview
Eval results
Files

test-publishers.mddocs/

Test Publishers

Publisher utilities for creating controllable upstream sources in tests. Test publishers implement the Reactive Streams Publisher interface while providing fine-grained control over element emission, demand tracking, and subscription lifecycle management.

Capabilities

Factory Methods

Static factory methods for creating various types of test publishers with different behavior patterns.

/**
 * Publisher that signals complete to subscribers immediately after handing a void subscription
 */
def empty[T](): Publisher[T]

/**
 * Publisher that subscribes the subscriber and completes after the first request
 */
def lazyEmpty[T]: Publisher[T]

/**
 * Publisher that signals error to subscribers immediately after handing out subscription
 */
def error[T](cause: Throwable): Publisher[T]

/**
 * Publisher that subscribes the subscriber and signals error after the first request
 */
def lazyError[T](cause: Throwable): Publisher[T]

/**
 * Creates a manual probe that implements Publisher interface
 * @param autoOnSubscribe Whether to automatically call onSubscribe (default: true)
 */
def manualProbe[T](autoOnSubscribe: Boolean = true)(implicit system: ActorSystem): ManualProbe[T]

/**
 * Creates a probe that implements Publisher interface and tracks demand
 * @param initialPendingRequests Initial number of pending requests (default: 0)
 */
def probe[T](initialPendingRequests: Long = 0)(implicit system: ActorSystem): Probe[T]

Usage Examples:

import akka.stream.testkit.TestPublisher

// Empty publishers for testing completion scenarios
val emptyPub = TestPublisher.empty[String]()
val lazyEmptyPub = TestPublisher.lazyEmpty[String]

// Error publishers for testing error scenarios  
val errorPub = TestPublisher.error[String](new RuntimeException("test error"))
val lazyErrorPub = TestPublisher.lazyError[String](new RuntimeException("test error"))

// Manual control publishers
val manualPub = TestPublisher.manualProbe[String]()
val autoPub = TestPublisher.probe[String]()

ManualProbe Class

Manual test publisher that provides complete control over the publishing lifecycle and requires explicit management of subscription and demand.

class ManualProbe[I] extends Publisher[I] {
  type Self <: ManualProbe[I]
  
  /**
   * Subscribes a given Subscriber to this probe publisher
   */
  def subscribe(subscriber: Subscriber[_ >: I]): Unit
  
  /**
   * Execute code block after subscription is established
   */
  def executeAfterSubscription[T](f: => T): T
  
  /**
   * Expect a subscription and return the subscription object
   */
  def expectSubscription(): PublisherProbeSubscription[I]
  
  /**
   * Expect demand from a given subscription
   */
  def expectRequest(subscription: Subscription, n: Int): Self
  
  /**
   * Expect no messages for the default period
   */
  def expectNoMessage(): Self
  
  /**
   * Expect no messages for a given duration
   */
  def expectNoMessage(max: FiniteDuration): Self
  
  /**
   * Receive messages for a given duration or until one does not match a given partial function
   */
  def receiveWhile[T](
    max: Duration = Duration.Undefined,
    idle: Duration = Duration.Inf,
    messages: Int = Int.MaxValue
  )(f: PartialFunction[PublisherEvent, T]): immutable.Seq[T]
  
  /**
   * Expect an event matching the partial function
   */
  def expectEventPF[T](f: PartialFunction[PublisherEvent, T]): T
  
  /**
   * Get the Publisher interface
   */
  def getPublisher: Publisher[I]
  
  /**
   * Execute code block while bounding its execution time between min and max
   */
  def within[T](min: FiniteDuration, max: FiniteDuration)(f: => T): T
  
  /**
   * Execute code block within the specified maximum time
   */
  def within[T](max: FiniteDuration)(f: => T): T
}

Usage Examples:

import akka.stream.testkit.TestPublisher

val probe = TestPublisher.manualProbe[String]()
val subscription = probe.expectSubscription()

// Expect demand and send elements
probe.expectRequest(subscription, 1)
subscription.sendNext("hello")

probe.expectRequest(subscription, 2)  
subscription.sendNext("world")
subscription.sendComplete()

Probe Class

Test publisher with automatic demand tracking that extends ManualProbe with simplified element sending and demand management.

class Probe[T] extends ManualProbe[T] {
  type Self = Probe[T]
  
  /**
   * Asserts that a subscription has been received or will be received
   */
  def ensureSubscription(): Unit
  
  /**
   * Current pending requests
   */
  def pending: Long
  
  /**
   * Send next element, automatically checking and decrementing demand
   */
  def sendNext(elem: T): Self
  
  /**
   * Send next element without checking demand (unsafe)
   */
  def unsafeSendNext(elem: T): Self
  
  /**
   * Send completion signal
   */
  def sendComplete(): Self
  
  /**
   * Send error signal
   */
  def sendError(cause: Throwable): Self
  
  /**
   * Expect request and add to pending requests, returning the request amount
   */
  def expectRequest(): Long
  
  /**
   * Expect cancellation
   */
  def expectCancellation(): Self
  
  /**
   * Expect cancellation with specific cause
   */
  def expectCancellationWithCause(expectedCause: Throwable): Self
  
  /**
   * Expect cancellation with typed cause, returning the cause
   */
  def expectCancellationWithCause[E <: Throwable: ClassTag](): E
  
  /**
   * Java API: Expect cancellation with specific cause class
   */
  def expectCancellationWithCause[E <: Throwable](causeClass: Class[E]): E
}

Usage Examples:

import akka.stream.testkit.TestPublisher

val probe = TestPublisher.probe[Int]()

// Send elements with automatic demand management
probe.sendNext(1)
probe.sendNext(2)
probe.sendNext(3)
probe.sendComplete()

// Check pending demand
println(s"Pending requests: ${probe.pending}")

// Handle cancellation scenarios
probe.expectCancellation()

// Handle typed cancellation causes
val cause = probe.expectCancellationWithCause[IllegalArgumentException]()

PublisherProbeSubscription

Internal subscription implementation that bridges the test probe with subscribers, providing methods for sending elements and expecting subscription events.

case class PublisherProbeSubscription[I](
  subscriber: Subscriber[_ >: I], 
  publisherProbe: TestProbe
) extends Subscription with SubscriptionWithCancelException {
  
  /**
   * Request elements from the subscription
   */
  def request(elements: Long): Unit
  
  /**
   * Cancel subscription with cause
   */
  def cancel(cause: Throwable): Unit
  
  /**
   * Expect a specific request amount
   */
  def expectRequest(n: Long): Unit
  
  /**
   * Expect any request and return the amount
   */
  def expectRequest(): Long
  
  /**
   * Expect cancellation and return the cause
   */
  def expectCancellation(): Throwable
  
  /**
   * Send next element to subscriber
   */
  def sendNext(element: I): Unit
  
  /**
   * Send completion signal to subscriber  
   */
  def sendComplete(): Unit
  
  /**
   * Send error signal to subscriber
   */
  def sendError(cause: Throwable): Unit
  
  /**
   * Send onSubscribe signal to subscriber
   */
  def sendOnSubscribe(): Unit
}

Publisher Events

sealed trait PublisherEvent extends DeadLetterSuppression with NoSerializationVerificationNeeded

/**
 * Emitted when a subscriber subscribes to the publisher
 */
final case class Subscribe(subscription: Subscription) extends PublisherEvent

/**
 * Emitted when a subscription is cancelled
 */
final case class CancelSubscription(subscription: Subscription, cause: Throwable) extends PublisherEvent

/**
 * Emitted when a subscriber requests more elements
 */
final case class RequestMore(subscription: Subscription, elements: Long) extends PublisherEvent

Common Patterns

Testing Backpressure

val probe = TestPublisher.probe[Int]()

// Send multiple elements
probe.sendNext(1)
probe.sendNext(2)
probe.sendNext(3)

// Verify demand management
probe.expectRequest() should be > 0L

Testing Error Scenarios

val probe = TestPublisher.probe[String]()
val error = new RuntimeException("test failure")

probe.sendError(error)
// Verify error handling in downstream

Testing Stream Completion

val probe = TestPublisher.probe[Int]()

probe.sendNext(1)
probe.sendNext(2)
probe.sendComplete()

// Verify completion behavior

Install with Tessl CLI

npx tessl i tessl/maven-com-typesafe-akka--akka-stream-testkit-3

docs

graph-stage-testing.md

index.md

stream-lifecycle.md

test-publishers.md

test-sources-sinks.md

test-subscribers.md

tile.json