CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-com-typesafe-akka--akka-stream-testkit-3

Akka Stream TestKit provides utilities for testing Akka Streams applications with controllable test sources, sinks, and assertion capabilities

Pending
Overview
Eval results
Files

docs/test-subscribers.md

Test Subscribers

Subscriber utilities for creating controllable downstream sinks in tests. Test subscribers implement the Reactive Streams Subscriber interface while providing comprehensive assertion capabilities for verifying stream behavior, element expectations, and timing requirements.

Capabilities

Factory Methods

Factory methods on the TestSubscriber object for creating test subscribers with either manual or automatic subscription management.

/**
 * Creates a manual probe that implements the Subscriber interface.
 * The test manages the subscription itself: it obtains it with expectSubscription()
 * and signals demand on the returned Subscription.
 *
 * @tparam T type of the stream elements the probe subscribes to
 * @param system implicit ActorSystem that must be in scope at the call site
 * @return a new ManualProbe
 */
def manualProbe[T]()(implicit system: ActorSystem): ManualProbe[T]

/**
 * Creates a subscriber probe with automatic subscription management:
 * demand and cancellation are signalled through the probe's own request/cancel methods.
 *
 * @tparam T type of the stream elements the probe subscribes to
 * @param system implicit ActorSystem that must be in scope at the call site
 * @return a new Probe
 */
def probe[T]()(implicit system: ActorSystem): Probe[T]

Usage Examples:

import akka.actor.ActorSystem
import akka.stream.testkit.TestSubscriber

// Both factory methods take an implicit ActorSystem, so one must be in scope
implicit val system: ActorSystem = ActorSystem("test-subscribers")

// Manual control subscriber: the test obtains and drives the Subscription itself
val manualProbe = TestSubscriber.manualProbe[String]()

// Automatic subscription management: demand is signalled through the probe
val autoProbe = TestSubscriber.probe[String]()

ManualProbe Class

Manual test subscriber that provides complete control over subscription management and element expectations with comprehensive assertion methods.

/**
 * Manual test subscriber: implements the Reactive Streams Subscriber interface and
 * exposes assertion methods for the signals it receives.
 *
 * Expectations that return `Self` can be chained; the others return what was received
 * (an element, a Subscription, a Throwable, ...).
 *
 * @tparam I type of the stream elements this probe subscribes to
 */
class ManualProbe[I] extends Subscriber[I] {
  // Concrete probe type returned by the chainable expectations (fixed by subclasses)
  type Self <: ManualProbe[I]
  
  /**
   * Expect and return a Subscription.
   * The returned Subscription is what the test uses to signal demand (`request`).
   */
  def expectSubscription(): Subscription
  
  /**
   * Expect and return any SubscriberEvent (OnSubscribe, OnNext, OnError, or OnComplete).
   * The overload taking `max` bounds the wait by that duration; the overload taking
   * `event` expects that specific event and returns the probe for chaining.
   */
  def expectEvent(): SubscriberEvent
  def expectEvent(max: FiniteDuration): SubscriberEvent
  def expectEvent(event: SubscriberEvent): Self
  
  /**
   * Expect a stream element.
   * Without an `element` argument the received element is returned; with one, the
   * received element must be the given element and the probe is returned for chaining.
   * `d` is the timeout for the expectation.
   */
  def expectNext(): I
  def expectNext(d: FiniteDuration): I
  def expectNext(element: I): Self
  def expectNext(d: FiniteDuration, element: I): Self
  
  /**
   * Expect multiple stream elements in order (at least two: e1, e2, then any further `es`)
   */
  def expectNext(e1: I, e2: I, es: I*): Self
  
  /**
   * Expect multiple stream elements in arbitrary order
   */
  def expectNextUnordered(e1: I, e2: I, es: I*): Self
  
  /**
   * Expect the next N stream elements.
   * `expectNextN(n)` returns the n received elements; `expectNextN(all)` expects the
   * given sequence of elements and returns the probe for chaining.
   */
  def expectNextN(n: Long): immutable.Seq[I]
  def expectNextN(all: immutable.Seq[I]): Self
  
  /**
   * Expect the given sequence of elements in any order
   */
  def expectNextUnorderedN(all: immutable.Seq[I]): Self
  
  /**
   * Expect stream completion
   */
  def expectComplete(): Self
  
  /**
   * Expect an error signal.
   * `expectError()` returns the signalled Throwable; `expectError(cause)` expects the
   * given cause and returns the probe for chaining.
   */
  def expectError(): Throwable
  def expectError(cause: Throwable): Self
  
  /**
   * Expect subscription followed by error (with optional demand signaling).
   * `signalDemand` selects whether demand is signalled after the subscription.
   * The overloads without `cause` return the signalled Throwable.
   */
  def expectSubscriptionAndError(): Throwable
  def expectSubscriptionAndError(signalDemand: Boolean): Throwable
  def expectSubscriptionAndError(cause: Throwable): Self
  def expectSubscriptionAndError(cause: Throwable, signalDemand: Boolean): Self
  
  /**
   * Expect subscription followed by completion (with optional demand signaling)
   */
  def expectSubscriptionAndComplete(): Self
  def expectSubscriptionAndComplete(signalDemand: Boolean): Self
  
  /**
   * Expect next element or error signal, returning whichever was signalled:
   * Right(element) for an element, Left(throwable) for an error.
   * The two-argument overload expects either the given element or the given cause.
   */
  def expectNextOrError(): Either[Throwable, I]
  def expectNextOrError(element: I, cause: Throwable): Either[Throwable, I]
  
  /**
   * Expect next element or stream completion.
   * The no-argument overload returns Right(element) or Left(OnComplete);
   * the overload taking `element` expects either that element or completion.
   */
  def expectNextOrComplete(): Either[OnComplete.type, I]
  def expectNextOrComplete(element: I): Self
  
  /**
   * Assert that no message is received for the specified time.
   * NOTE(review): the no-argument overload presumably falls back to the testkit's
   * configured default duration; confirm against the Akka version in use.
   */
  def expectNoMessage(): Self
  def expectNoMessage(remaining: FiniteDuration): Self
  def expectNoMessage(remaining: java.time.Duration): Self
  
  /**
   * Expect a stream element and test it with partial function.
   * The partial function is typed on Any, so patterns must match the element by type
   * (e.g. `case s: String if ...`) before using members of the element type.
   * The PF variants return the function's result; the ChainingPF variants return the probe.
   */
  def expectNextPF[T](f: PartialFunction[Any, T]): T
  def expectNextWithTimeoutPF[T](max: Duration, f: PartialFunction[Any, T]): T
  def expectNextChainingPF(f: PartialFunction[Any, Any]): Self
  def expectNextChainingPF(max: Duration, f: PartialFunction[Any, Any]): Self
  
  /**
   * Expect event matching partial function and return the function's result.
   * Unlike expectNextPF, the function receives the SubscriberEvent itself (e.g. OnNext(x)).
   */
  def expectEventPF[T](f: PartialFunction[SubscriberEvent, T]): T
  def expectEventWithTimeoutPF[T](max: Duration, f: PartialFunction[SubscriberEvent, T]): T
  
  /**
   * Receive messages for a given duration or until one does not match a given partial function.
   * Returns the results of applying `f` to the matched events.
   * NOTE(review): `max`, `idle` and `messages` presumably mean overall time limit, maximum
   * gap between messages and maximum message count, as in akka.testkit's receiveWhile; confirm.
   */
  def receiveWhile[T](
    max: Duration = Duration.Undefined,
    idle: Duration = Duration.Inf,
    messages: Int = Int.MaxValue
  )(f: PartialFunction[SubscriberEvent, T]): immutable.Seq[T]
  
  /**
   * Receive messages within time limit, returning the received elements
   * (at most `messages` of them).
   */
  def receiveWithin(max: FiniteDuration, messages: Int = Int.MaxValue): immutable.Seq[I]
  
  /**
   * Attempt to drain the stream into a strict collection (requests Long.MaxValue elements)
   * WARNING: Use with caution for infinite streams or large elements
   */
  def toStrict(atMost: FiniteDuration): immutable.Seq[I]
  
  /**
   * Execute code block while bounding its execution time.
   * With two durations the block's execution time is bounded between `min` and `max`;
   * with one, only the upper bound `max` applies. Returns the block's result.
   */
  def within[T](min: FiniteDuration, max: FiniteDuration)(f: => T): T
  def within[T](max: FiniteDuration)(f: => T): T
  
  // Subscriber interface methods
  // (one per SubscriberEvent kind: OnSubscribe, OnNext, OnComplete, OnError)
  def onSubscribe(subscription: Subscription): Unit
  def onNext(element: I): Unit
  def onComplete(): Unit
  def onError(cause: Throwable): Unit
}

Usage Examples:

import akka.stream.testkit.TestSubscriber
import scala.concurrent.duration._

val probe = TestSubscriber.manualProbe[String]()

// Subscribe to a source
// NOTE(review): `source` is not defined in this snippet; because `subscribe` is given a
// Subscriber it is presumably a Reactive Streams Publisher[String] -- confirm.
source.subscribe(probe)

// Expect subscription and request elements
// A manual probe signals no demand on its own: the test requests through the Subscription.
val subscription = probe.expectSubscription()
subscription.request(3)

// Expect specific elements
probe.expectNext("hello")
probe.expectNext("world")
probe.expectComplete()

// NOTE(review): the groups below illustrate individual assertions independently. Run as
// one script they would come after expectComplete() and exceed the 3 requested elements.

// Expect elements with timeout
probe.expectNext(1.second, "delayed")

// Expect multiple elements
probe.expectNext("a", "b", "c")

// Expect elements in any order
probe.expectNextUnordered("x", "y", "z")

// Pattern matching on elements
// The partial function is typed on Any, hence the `s: String` type pattern.
probe.expectNextPF {
  case s: String if s.startsWith("test") => s.length
}

Probe Class

Test subscriber with automatic subscription management that extends ManualProbe with simplified request handling and additional convenience methods.

/**
 * Test subscriber with automatic subscription management: extends ManualProbe so that
 * demand and cancellation can be signalled on the probe itself instead of on a
 * Subscription obtained through expectSubscription().
 *
 * @tparam T type of the stream elements this probe subscribes to
 */
class Probe[T] extends ManualProbe[T] {
  // Fixes Self so that the inherited chainable expectations return Probe[T]
  override type Self = Probe[T]
  
  /**
   * Asserts that a subscription has been received or will be received
   */
  def ensureSubscription(): Self
  
  /**
   * Request N elements from the subscription; returns the probe for chaining
   */
  def request(n: Long): Self
  
  /**
   * Request and expect a specific stream element; returns the probe for chaining
   */
  def requestNext(element: T): Self
  
  /**
   * Request and expect the next stream element, returning it.
   * `d` is the timeout for the expectation.
   */
  def requestNext(): T
  def requestNext(d: FiniteDuration): T
  
  /**
   * Cancel the subscription
   */
  def cancel(): Self
  
  /**
   * Cancel the subscription with a specific cause
   */
  def cancel(cause: Throwable): Self
}

Usage Examples:

import akka.stream.testkit.TestSubscriber

val probe = TestSubscriber.probe[Int]()

// Automatic subscription management
// NOTE(review): `source` and `Sink` are not defined or imported in this snippet; `Sink` is
// presumably akka.stream.scaladsl.Sink and `source` a Source of Int -- confirm.
source.runWith(Sink.fromSubscriber(probe))

// Request and expect elements
// No expectSubscription() is needed: demand is signalled on the probe itself.
probe.request(1)
val element = probe.expectNext()

// Request and expect specific element
probe.requestNext(42)

// Request and return element
val next = probe.requestNext()

// Cancel subscription
probe.cancel()

// Cancel with cause
// NOTE(review): shown as an alternative to the plain cancel() above, not as a follow-up call.
probe.cancel(new RuntimeException("test cancellation"))

Subscriber Events

/**
 * Events a test subscriber can receive. These are the values returned by expectEvent()
 * and matched by expectEventPF / receiveWhile. The hierarchy is sealed, so the four
 * cases below are the only possible events.
 */
sealed trait SubscriberEvent extends DeadLetterSuppression with NoSerializationVerificationNeeded

/**
 * Received when the subscriber is subscribed to a publisher
 *
 * @param subscription the Subscription handed to the subscriber
 */
final case class OnSubscribe(subscription: Subscription) extends SubscriberEvent

/**
 * Received when a stream element arrives
 *
 * @param element the element that was received
 */
final case class OnNext[I](element: I) extends SubscriberEvent

/**
 * Received when the stream completes successfully
 */
case object OnComplete extends SubscriberEvent

/**
 * Received when the stream terminates with an error
 *
 * @param cause the Throwable the stream failed with
 */
final case class OnError(cause: Throwable) extends SubscriberEvent

Advanced Usage Patterns

Testing Stream Completion

// Test immediate completion
// Each scenario uses its own probe: a probe is subscribed only once, so a second
// expectSubscription() on a probe that already consumed its subscription cannot succeed.
val emptyProbe = TestSubscriber.probe[String]()
emptyProbe.expectSubscriptionAndComplete()

// Test completion after elements
val probe = TestSubscriber.probe[String]()
val subscription = probe.expectSubscription()
subscription.request(2)
probe.expectNext("a", "b")
probe.expectComplete()

Testing Error Scenarios

// Test immediate error
// Each scenario uses its own probe: a probe is subscribed only once, so a second
// expectSubscription() on a probe that already consumed its subscription cannot succeed.
val failingProbe = TestSubscriber.probe[String]()
val error = failingProbe.expectSubscriptionAndError()
error shouldBe a[RuntimeException]

// Test error after elements
val probe = TestSubscriber.probe[String]()
val subscription = probe.expectSubscription()
subscription.request(1)
probe.expectNext("element")
val thrownError = probe.expectError()

Testing Backpressure

val probe = TestSubscriber.probe[Int]()

// The probe gets subscribed, but no demand has been signalled yet
probe.expectSubscription()

// Nothing may arrive without demand; once two elements are requested they must arrive in order
probe
  .expectNoMessage(100.millis)
  .request(2)
  .expectNext(1, 2)

Testing Large Streams

val probe = TestSubscriber.probe[Int]()

// Signal demand for 1000 elements and collect them in one chained call
val elements = probe.request(1000).expectNextN(1000)
elements should have size 1000

// Or drain the stream into a strict collection (use carefully)
val allElements = probe.toStrict(5.seconds)

Conditional Element Testing

val probe = TestSubscriber.probe[String]()

// Test elements with partial function
// The partial function is typed PartialFunction[Any, T], so the element must be matched
// by type before String members such as `length` can be used.
probe.expectNextPF {
  case s: String if s.length > 5 => s.toUpperCase
}

// Chain multiple expectations
// expectNextChainingPF returns the probe, so further expectations can follow directly.
probe
  .expectNextChainingPF {
    case s: String => s.trim
  }
  .expectNextChainingPF {
    case s: String if s.nonEmpty => s
  }

Testing Alternative Outcomes

val probe = TestSubscriber.probe[String]()

// Left carries the error that was signalled, Right the next element
probe.expectNextOrError().fold(
  error => println(s"Got error: $error"),
  element => println(s"Got element: $element")
)

// Left carries the completion signal (OnComplete), Right the next element
probe.expectNextOrComplete().fold(
  _ => println("Stream completed"),
  element => println(s"Got element: $element")
)

Install with Tessl CLI

npx tessl i tessl/maven-com-typesafe-akka--akka-stream-testkit-3

docs

graph-stage-testing.md

index.md

stream-lifecycle.md

test-publishers.md

test-sources-sinks.md

test-subscribers.md

tile.json