
tessl/maven-com-typesafe-akka--akka-stream-testkit-3

Akka Stream TestKit provides utilities for testing Akka Streams applications with controllable test sources, sinks, and assertion capabilities



Test Sources and Sinks

Factory methods for creating Source and Sink instances that materialize to test probes. A test source hands the test a TestPublisher.Probe for pushing elements, completion, and errors into a stream; a test sink hands it a TestSubscriber.Probe for requesting elements and asserting on what arrives.

Capabilities

Test Source (Scala DSL)

Creates Source instances that materialize to TestPublisher.Probe for controllable upstream testing.

object TestSource {
  /**
   * Creates a Source that materializes to a TestPublisher.Probe
   * @param system The actor system provider
   * @return Source that emits elements of type T and materializes to TestPublisher.Probe[T]
   */
  def apply[T]()(implicit system: ClassicActorSystemProvider): Source[T, TestPublisher.Probe[T]]
  
  /**
   * Deprecated method - use apply() instead
   */
  @deprecated("Use `TestSource()` with implicit ClassicActorSystemProvider instead.", "2.7.0")
  def probe[T](implicit system: ActorSystem): Source[T, TestPublisher.Probe[T]]
}

Usage Examples:

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink}
import akka.stream.testkit.scaladsl.TestSource
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("test")

// Create test source and materialize
val (probe, future) = TestSource[String]()
  .toMat(Sink.seq)(Keep.both)
  .run()

// Control the source
probe.sendNext("hello")
probe.sendNext("world")
probe.sendComplete()

// Verify results (ScalaTest matchers assumed to be in scope)
val result = Await.result(future, 3.seconds)
result should contain inOrderOnly ("hello", "world")

// Pre-materialize for multiple uses
val (pub, source) = TestSource[Int]().preMaterialize()
pub.sendNext(42)

val sink1 = source.runWith(Sink.head)
val sink2 = source.runWith(Sink.seq)
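
Besides pushing elements, the publisher probe can observe downstream cancellation. The following is a minimal sketch rather than an excerpt from the library documentation: the `take(2)` operator is only an assumed stand-in for any stage that cancels its upstream, and the actor system name is arbitrary.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink}
import akka.stream.testkit.scaladsl.TestSource
import scala.concurrent.Await
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("test-source-cancel")

// take(2) completes after two elements and cancels its upstream
val (probe, result) = TestSource[Int]()
  .take(2)
  .toMat(Sink.seq)(Keep.both)
  .run()

probe.sendNext(1)
probe.sendNext(2)

// Blocks until the cancellation signal reaches the probe
probe.expectCancellation()

// The sink still sees the two elements that were sent before the cancel
val collected = Await.result(result, 3.seconds)

system.terminate()
```

Checking `expectCancellation()` is useful when the stage under test is supposed to stop pulling from upstream early.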

Test Sink (Scala DSL)

Creates Sink instances that materialize to TestSubscriber.Probe for controllable downstream testing.

object TestSink {
  /**
   * Creates a Sink that materializes to a TestSubscriber.Probe
   * @param system The actor system provider  
   * @return Sink that consumes elements of type T and materializes to TestSubscriber.Probe[T]
   */
  def apply[T]()(implicit system: ClassicActorSystemProvider): Sink[T, TestSubscriber.Probe[T]]
  
  /**
   * Deprecated method - use apply() instead
   */
  @deprecated("Use `TestSink()` with implicit ClassicActorSystemProvider instead.", "2.7.0") 
  def probe[T](implicit system: ActorSystem): Sink[T, TestSubscriber.Probe[T]]
}

Usage Examples:

import akka.stream.scaladsl.{Keep, Source}
import akka.stream.testkit.scaladsl.{TestSink, TestSource}

// An implicit ActorSystem must be in scope, as in the TestSource example

// Create test sink and materialize
val probe = Source(1 to 5)
  .map(_ * 2)
  .runWith(TestSink[Int]())

// Verify elements
probe.request(5)
probe.expectNext(2, 4, 6, 8, 10)
probe.expectComplete()

// Use with Keep.both to control both ends of the stream
val (sourceProbe, sinkProbe) = TestSource[String]()
  .map(_.toUpperCase)
  .toMat(TestSink[String]())(Keep.both)
  .run()

sourceProbe.sendNext("hello")
sourceProbe.sendComplete()

sinkProbe.request(1)
sinkProbe.expectNext("HELLO")
sinkProbe.expectComplete()
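
The subscriber probe also offers shorthand methods that combine requesting and asserting. The sketch below illustrates a few of them on an assumed four-element source; the element values and the 100 ms quiet period are arbitrary choices for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import akka.stream.testkit.scaladsl.TestSink
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("test-sink-demand")

val probe = Source(List("a", "b", "c", "d")).runWith(TestSink[String]())

// requestNext(elem) is request(1) followed by expectNext(elem)
probe.requestNext("a")

// Request two elements and assert on them as a sequence
probe.request(2)
probe.expectNextN(List("b", "c"))

// All demand has been used up, so the remaining element is held back
probe.expectNoMessage(100.millis)

// requestNext() without arguments returns the next element
val last = probe.requestNext()
probe.expectComplete()

system.terminate()
```

Requesting one element at a time like this makes it explicit which demand each assertion depends on.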

Test Source (Java API)

Java API for creating test sources.

object TestSource {
  /**
   * Creates a Source that materializes to a TestPublisher.Probe (Java API)
   */
  def create[T](system: ClassicActorSystemProvider): akka.stream.javadsl.Source[T, TestPublisher.Probe[T]]
  
  /**
   * Deprecated method - use create() instead
   */
  @deprecated("Use `TestSource.create` with ClassicActorSystemProvider instead.", "2.7.0")
  def probe[T](system: ActorSystem): akka.stream.javadsl.Source[T, TestPublisher.Probe[T]]
}

Usage Examples (Java):

import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.testkit.TestPublisher;
import akka.stream.testkit.javadsl.TestSource;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

ActorSystem system = ActorSystem.create("test");

// Create test source
Pair<TestPublisher.Probe<String>, CompletionStage<List<String>>> pair =
  TestSource.<String>create(system)
    .toMat(Sink.seq(), Keep.both())
    .run(system);

TestPublisher.Probe<String> probe = pair.first();
CompletionStage<List<String>> future = pair.second();

// Control the source
probe.sendNext("hello");
probe.sendNext("world");
probe.sendComplete();

// Verify results (assertEquals is assumed to be a JUnit static import)
List<String> result = future.toCompletableFuture().get(3, TimeUnit.SECONDS);
assertEquals(Arrays.asList("hello", "world"), result);

Test Sink (Java API)

Java API for creating test sinks.

object TestSink {
  /**
   * Creates a Sink that materializes to a TestSubscriber.Probe (Java API)
   */
  def create[T](system: ClassicActorSystemProvider): akka.stream.javadsl.Sink[T, TestSubscriber.Probe[T]]
  
  /**
   * Deprecated method - use create() instead  
   */
  @deprecated("Use `TestSink.create` with ClassicActorSystemProvider instead.", "2.7.0")
  def probe[T](system: ActorSystem): akka.stream.javadsl.Sink[T, TestSubscriber.Probe[T]]
}

Usage Examples (Java):

import akka.stream.javadsl.Source;
import akka.stream.testkit.TestSubscriber;
import akka.stream.testkit.javadsl.TestSink;

// Create test sink (system is the ActorSystem created in the previous example)
TestSubscriber.Probe<Integer> probe =
  Source.range(1, 5)
    .map(x -> x * 2)
    .runWith(TestSink.create(system), system);

// Verify elements
probe.request(5);
probe.expectNext(2, 4, 6, 8, 10);
probe.expectComplete();

Common Testing Patterns

End-to-End Stream Testing

import akka.stream.scaladsl.Flow

val (sourceProbe, sinkProbe) = TestSource[String]()
  .via(Flow[String].map(_.toUpperCase).filter(_.length > 3))
  .toMat(TestSink[String]())(Keep.both)
  .run()

// Test the flow
sourceProbe.sendNext("hi")      // Should be filtered out
sourceProbe.sendNext("hello")   // Should pass through
sourceProbe.sendNext("world")   // Should pass through  
sourceProbe.sendComplete()

sinkProbe.request(10)
sinkProbe.expectNext("HELLO", "WORLD")
sinkProbe.expectComplete()

Testing Backpressure Propagation

val (sourceProbe, sinkProbe) = TestSource[Int]()
  .toMat(TestSink[Int]())(Keep.both)
  .run()

// sendNext waits for downstream demand, so calling it before the sink
// requests anything would block and then fail with a timeout

// Signal demand from the sink and confirm that it reaches the source
sinkProbe.request(1)
sourceProbe.expectRequest()

// Now the element can flow
sourceProbe.sendNext(1)
sinkProbe.expectNext(1)
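
A complementary check is that the sink receives nothing until it signals demand, even when the source has already sent an element. The sketch below assumes default materializer settings, under which an intermediate operator (here a `map(identity)` added purely for illustration) signals initial demand upstream through its input buffer; the 200 ms quiet period is an arbitrary choice.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.Keep
import akka.stream.testkit.scaladsl.{TestSink, TestSource}
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("backpressure-sketch")

val (src, snk) = TestSource[Int]()
  .map(identity)
  .toMat(TestSink[Int]())(Keep.both)
  .run()

// Accepted because the operator's input buffer has signalled demand upstream
src.sendNext(1)

// Consume the subscription event, then check that nothing is delivered
// while the sink has not requested anything
snk.ensureSubscription()
snk.expectNoMessage(200.millis)

// Once demand is signalled, the buffered element arrives
snk.request(1)
val received = snk.expectNext()

system.terminate()
```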

Testing Error Propagation

val (sourceProbe, sinkProbe) = TestSource[String]()
  .map(s => if (s == "error") throw new RuntimeException("test") else s)
  .toMat(TestSink[String]())(Keep.both)
  .run() 

sourceProbe.sendNext("ok")
sourceProbe.sendNext("error")

sinkProbe.request(2)
sinkProbe.expectNext("ok")
val error = sinkProbe.expectError()
error.getMessage should be("test")
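
Failures can also be injected from the source side with `sendError` instead of being thrown inside an operator. The following sketch shows that variant; the `map(_ + 1)` stage and the exception message are assumptions made for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.Keep
import akka.stream.testkit.scaladsl.{TestSink, TestSource}

implicit val system: ActorSystem = ActorSystem("upstream-failure-sketch")

val (src, snk) = TestSource[Int]()
  .map(_ + 1)
  .toMat(TestSink[Int]())(Keep.both)
  .run()

snk.request(1)
src.sendNext(1)
val first = snk.expectNext()

// Fail the stream from upstream; errors are delivered without further demand
val cause = new RuntimeException("upstream failed")
src.sendError(cause)

// Asserts that the stream failed with this exact exception
snk.expectError(cause)

system.terminate()
```

Passing the exception to `expectError` avoids a separate assertion on the message.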

Testing Stream Completion

val (sourceProbe, sinkProbe) = TestSource[String]()
  .toMat(TestSink[String]())(Keep.both)
  .run()

// Request first: sendNext waits for downstream demand
sinkProbe.request(1)

sourceProbe.sendNext("last")
sourceProbe.sendComplete()

sinkProbe.expectNext("last")
sinkProbe.expectComplete()

Materializing Once and Reusing the Source

// Pre-materialize for reuse
val (sourceProbe, source) = TestSource[Int]().preMaterialize()

// Use with multiple sinks
val sink1 = source.runWith(Sink.head)
val sink2 = source.runWith(Sink.seq)
val sink3 = source.runWith(TestSink[Int]())

// Control single source, affects all sinks
sourceProbe.sendNext(42)
sourceProbe.sendComplete()

// Verify that sink1 and sink3 received the element (sink2 is not checked here)
Await.result(sink1, 1.second) should be(42)
sink3.request(1)
sink3.expectNext(42)
sink3.expectComplete()

Testing Complex Flows with Multiple Stages

val complexFlow = Flow[String]
  .map(_.toLowerCase)
  .filter(_.nonEmpty)
  .groupedWithin(3, 1.second)
  .map(_.mkString(","))

val (sourceProbe, sinkProbe) = TestSource[String]()
  .via(complexFlow)
  .toMat(TestSink[String]())(Keep.both)
  .run()

// Send test data
sourceProbe.sendNext("HELLO")
sourceProbe.sendNext("")       // Will be filtered out
sourceProbe.sendNext("WORLD")
sourceProbe.sendNext("TEST")
sourceProbe.sendComplete()

// Verify grouped output
sinkProbe.request(2)
sinkProbe.expectNext("hello,world,test") // Grouped and joined
sinkProbe.expectComplete()
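
A related case worth covering is a group that never fills up. The sketch below assumes a simplified two-stage flow and relies on `groupedWithin` emitting its pending partial group when upstream completes; the element values are arbitrary, and the two sends are assumed to land within the same one-second window.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.Keep
import akka.stream.testkit.scaladsl.{TestSink, TestSource}
import scala.concurrent.duration._

implicit val system: ActorSystem = ActorSystem("partial-group-sketch")

val (src, snk) = TestSource[String]()
  .groupedWithin(3, 1.second)
  .map(_.mkString(","))
  .toMat(TestSink[String]())(Keep.both)
  .run()

// Only two elements are sent, so the group of three is never filled
src.sendNext("a")
src.sendNext("b")
src.sendComplete()

// The partial group is emitted once there is demand, followed by completion
val group = snk.requestNext()
snk.expectComplete()

system.terminate()
```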
