tessl/maven-com-typesafe-akka--akka-testkit-2-11

Comprehensive testing toolkit for Akka actor-based systems with synchronous testing utilities, event filtering, and deterministic execution control

Test Utilities and Coordination

Akka TestKit provides various utilities for test coordination, synchronization, and common testing patterns. These utilities help manage complex test scenarios involving multiple actors and timing constraints.

TestActors

TestActors Object { .api }

object TestActors {
  val echoActorProps: Props
  val blackholeProps: Props
  def forwardActorProps(ref: ActorRef): Props
}

Collection of common test actor patterns that can be used in various testing scenarios.

Built-in Test Actors

EchoActor { .api }

class EchoActor extends Actor {
  def receive = {
    case msg => sender() ! msg
  }
}

An actor that echoes back any message it receives to the sender.

BlackholeActor { .api }

class BlackholeActor extends Actor {
  def receive = {
    case _ => // ignore all messages
  }
}

An actor that ignores all messages it receives, useful for testing scenarios where you need to send messages but don't care about responses.

ForwardActor { .api }

class ForwardActor(target: ActorRef) extends Actor {
  def receive = {
    case msg => target.forward(msg)
  }
}

An actor that forwards all messages to a specified target actor.

Usage Examples
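
The snippets in this section use system, expectMsg, and an implicit sender without showing where they come from. The following is a minimal sketch of one possible harness, assuming plain TestKit with ImplicitSender and no particular test framework. The names UtilitiesHarness, echoRoundTrip, and shutdown are hypothetical and not part of Akka.

```scala
import akka.actor.ActorSystem
import akka.testkit.{ImplicitSender, TestActors, TestKit}

// Hypothetical harness: TestKit supplies `system`, `testActor` and the expect* methods.
// ImplicitSender makes `testActor` the sender of messages sent with `!`.
class UtilitiesHarness extends TestKit(ActorSystem("test-utilities")) with ImplicitSender {

  // Sends one message to an echo actor and returns what came back to testActor.
  def echoRoundTrip(): String = {
    val echo = system.actorOf(TestActors.echoActorProps)
    echo ! "ping"
    expectMsg("ping") // expectMsg returns the received message
  }

  // Terminates the actor system created for this harness.
  def shutdown(): Unit = TestKit.shutdownActorSystem(system)
}

object UtilitiesHarnessDemo extends App {
  val harness = new UtilitiesHarness
  try println(harness.echoRoundTrip())
  finally harness.shutdown()
}
```

In a real suite the same class would typically also mix in a test framework trait and call shutdown() from its after-all hook.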

EchoActor Usage

import akka.testkit.TestActors

// Create echo actor
val echo = system.actorOf(TestActors.echoActorProps, "echo")

// Test echoing behavior
echo ! "hello"
expectMsg("hello")

echo ! 42
expectMsg(42)

echo ! SomeMessage("data")
expectMsg(SomeMessage("data"))

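// Use in testing actor interactions (MyActor and its messages are hypothetical).
// The echo replies to the sender of each message it receives, so "test" reaches
// testActor only if MyActor forwards the notification with testActor as sender
// or relays the echoed reply itself.
val myActor = system.actorOf(Props[MyActor])
myActor ! RegisterListener(echo)

myActor ! TriggerNotification("test")
expectMsg("test")  // Holds only under the forwarding assumption above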

BlackholeActor Usage

import scala.concurrent.duration._  // for 1.second below

// Create blackhole actor
val blackhole = system.actorOf(TestActors.blackholeProps, "blackhole")

// Test that messages are ignored
blackhole ! "message1"
blackhole ! "message2"
blackhole ! "message3"

expectNoMessage(1.second)  // No messages should come back

// Use for testing fire-and-forget scenarios
val producer = system.actorOf(Props[EventProducer])
producer ! SetDestination(blackhole)
producer ! GenerateEvents(100)

// Events are sent but not processed, testing producer behavior only

ForwardActor Usage

// Create probe to receive forwarded messages
val probe = TestProbe()
val forwarder = system.actorOf(TestActors.forwardActorProps(probe.ref), "forwarder")

// Test forwarding behavior
forwarder ! "test-message"
probe.expectMsg("test-message")

// Test that sender is preserved in forwarding
val sender = TestProbe()
forwarder.tell("forwarded", sender.ref)
probe.expectMsg("forwarded")
assert(probe.lastSender == sender.ref)  // Sender is preserved

// Use for testing message routing
val router = system.actorOf(Props[MessageRouter])
router ! AddRoute("test-channel", forwarder)
router ! RouteMessage("test-channel", "routed-message")
probe.expectMsg("routed-message")

TestBarrier

TestBarrier Class { .api }

class TestBarrier(count: Int) {
  def await()(implicit system: ActorSystem): Unit
  def await(timeout: FiniteDuration)(implicit system: ActorSystem): Unit
  def reset(): Unit
}

A testing utility that implements a cyclic barrier pattern, allowing multiple threads or test execution paths to synchronize at a common point.
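
To illustrate what cyclic means here, the sketch below uses java.util.concurrent.CyclicBarrier directly rather than TestBarrier, on the assumption that TestBarrier behaves the same way (it is described above as a cyclic barrier). Two parties meet for several rounds without any reset call, because the barrier re-arms once all parties have arrived. The names CyclicBarrierSketch and runRounds are illustrative only.

```scala
import java.util.concurrent.{CyclicBarrier, Executors, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object CyclicBarrierSketch {
  // Runs two parties through `rounds` barrier rounds and returns how many
  // rounds each party completed.
  def runRounds(rounds: Int): List[Int] = {
    // A dedicated two-thread pool guarantees both parties can block at once.
    val pool = Executors.newFixedThreadPool(2)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    val barrier = new CyclicBarrier(2)

    def party(): Future[Int] = Future {
      var completed = 0
      for (_ <- 1 to rounds) {
        // Blocks until both parties arrive; the barrier then re-arms by itself.
        barrier.await(5, TimeUnit.SECONDS)
        completed += 1
      }
      completed
    }

    try Await.result(Future.sequence(List(party(), party())), 10.seconds)
    finally pool.shutdown()
  }
}
```

Calling CyclicBarrierSketch.runRounds(3) lets each party pass the same barrier three times. The fixed pool is a deliberate choice: on a machine with a single core, the global execution context may run only one blocking task at a time, which would leave the first party waiting forever.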

TestBarrier Object { .api }

object TestBarrier {
  val DefaultTimeout: Duration = 5.seconds
  def apply(count: Int): TestBarrier
}

TestBarrierTimeoutException { .api }

class TestBarrierTimeoutException(message: String) extends RuntimeException(message)

Exception thrown when a barrier operation times out.

Usage Examples

Basic Barrier Usage

import akka.testkit.TestBarrier
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Create barrier for 3 parties
val barrier = TestBarrier(3)

// Simulate concurrent operations
val future1 = Future {
  // Do some work
  Thread.sleep(100)
  barrier.await()  // Wait for others
  "task1 complete"
}

val future2 = Future {
  // Do different work  
  Thread.sleep(200)
  barrier.await()  // Wait for others
  "task2 complete"
}

val future3 = Future {
  // Do more work
  Thread.sleep(150)  
  barrier.await()  // Wait for others
  "task3 complete"
}

// All futures will complete after all reach the barrier
val results = Await.result(Future.sequence(List(future1, future2, future3)), 5.seconds)
println(results)  // All tasks completed together

Barrier with Timeout

val barrier = TestBarrier(2)

val future1 = Future {
  barrier.await(3.seconds)
  "completed"
}

// Second party never arrives - will timeout
intercept[TestBarrierTimeoutException] {
  Await.result(future1, 5.seconds)
}

Multiple Barrier Rounds

import scala.collection.mutable

val barrier = TestBarrier(2)

def worker(id: Int): Future[List[String]] = Future {
  val results = mutable.ListBuffer[String]()

  for (round <- 1 to 3) {
    // Do work for this round
    results += s"worker-$id-round-$round"

    // Synchronize with the other worker. The barrier is cyclic: once both
    // parties arrive it re-arms itself for the next round, so reset() is not
    // needed here. Calling reset() while a party is waiting breaks the barrier.
    barrier.await()
  }

  results.toList
}

val worker1 = worker(1)
val worker2 = worker(2)

val results = Await.result(Future.sequence(List(worker1, worker2)), 10.seconds)
// Both workers complete all rounds together

Testing Actor Coordination

class CoordinatedActor(barrier: TestBarrier) extends Actor {
  def receive = {
    case "phase1" =>
      // Do phase 1 work
      sender() ! "phase1-done"
      // await needs an ActorSystem; pass the actor's own system explicitly
      barrier.await()(context.system)

    case "phase2" =>
      // Do phase 2 work
      sender() ! "phase2-done"
      barrier.await()(context.system)
  }
}

val barrier = TestBarrier(2)
val actor1 = system.actorOf(Props(new CoordinatedActor(barrier)))
val actor2 = system.actorOf(Props(new CoordinatedActor(barrier)))

// Both actors will synchronize at barrier
actor1 ! "phase1"
actor2 ! "phase1"

expectMsg("phase1-done")
expectMsg("phase1-done")

// Now phase 2. No reset() is needed: the cyclic barrier re-arms itself once
// both actors have passed it, and resetting while one of them is still
// waiting would break the barrier.
actor1 ! "phase2"
actor2 ! "phase2"

expectMsg("phase2-done")
expectMsg("phase2-done")

TestLatch

TestLatch Class { .api }

class TestLatch(count: Int)(implicit system: ActorSystem) extends Awaitable[Unit] {
  def countDown(): Unit
  def isOpen: Boolean
  def open(): Unit
  def reset(): Unit
  def ready(atMost: Duration)(implicit permit: CanAwait): TestLatch
  def result(atMost: Duration)(implicit permit: CanAwait): Unit
}

A testing utility that implements a countdown latch pattern, allowing tests to wait for a specific number of events to occur.

TestLatch Object { .api }

object TestLatch {
  val DefaultTimeout: Duration = 5.seconds
  def apply(count: Int)(implicit system: ActorSystem): TestLatch
}

Usage Examples

Basic Latch Usage

import akka.testkit.TestLatch
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Create latch that waits for 3 events
val latch = TestLatch(3)

// Simulate events happening
Future {
  Thread.sleep(100)
  latch.countDown()  // Event 1
}

Future {
  Thread.sleep(200)  
  latch.countDown()  // Event 2
}

Future {
  Thread.sleep(300)
  latch.countDown()  // Event 3
}

// Wait for all events to complete
Await.ready(latch, 5.seconds)
assert(latch.isOpen)

Testing Actor Completion

class WorkerActor(latch: TestLatch) extends Actor {
  def receive = {
    case "work" =>
      // Simulate work
      Thread.sleep(100)
      sender() ! "done"
      latch.countDown()  // Signal completion
      
    case "fail" =>
      // The latch is not counted down when the work fails
      throw new RuntimeException("Work failed")
  }
}

val latch = TestLatch(3)
val workers = (1 to 3).map { i =>
  system.actorOf(Props(new WorkerActor(latch)), s"worker-$i")
}

// Send work to all workers
workers.foreach(_ ! "work")

// Wait for all to complete
Await.ready(latch, 5.seconds)
assert(latch.isOpen)

// Verify all completed
expectMsg("done")
expectMsg("done") 
expectMsg("done")

Latch with Manual Open

val latch = TestLatch(5)

// Count down a few times
latch.countDown()  // 4 remaining
latch.countDown()  // 3 remaining
latch.countDown()  // 2 remaining

assert(!latch.isOpen)

// Manually open without waiting for remaining countdowns
latch.open()
assert(latch.isOpen)

// Can wait on already open latch
Await.ready(latch, 1.second)  // Returns immediately

Resetting Latch

val latch = TestLatch(2)

// Use latch first time
latch.countDown()
latch.countDown()
assert(latch.isOpen)

// Reset for reuse
latch.reset()
assert(!latch.isOpen)

// Use again
Future { latch.countDown() }
Future { latch.countDown() }

Await.ready(latch, 5.seconds)
assert(latch.isOpen)

Testing System Initialization

class InitializableService(latch: TestLatch) extends Actor {
  override def preStart(): Unit = {
    // Simulate asynchronous initialization on the actor's own dispatcher
    Future {
      Thread.sleep(500)
      self ! "initialized"
    }(context.dispatcher)
  }
  
  def receive = {
    case "initialized" =>
      latch.countDown()  // Signal service is ready
      context.become(ready)
  }
  
  def ready: Receive = {
    case "request" => sender() ! "response"
  }
}

// Test system startup
val latch = TestLatch(3)  // 3 services
val services = (1 to 3).map { i =>
  system.actorOf(Props(new InitializableService(latch)), s"service-$i")
}

// Wait for all services to initialize
Await.ready(latch, 10.seconds)

// Now services are ready for requests
services.head ! "request"
expectMsg("response")

Combining Barriers and Latches

Complex Coordination Scenarios

import scala.util.Random

// Test scenario: Initialize services, run coordinated work, cleanup
class CoordinatedService(
  initLatch: TestLatch,
  workBarrier: TestBarrier,
  cleanupLatch: TestLatch
) extends Actor {

  override def preStart(): Unit = {
    // Initialize asynchronously on the actor's own dispatcher
    Future {
      Thread.sleep(Random.nextInt(200))
      self ! "ready"
    }(context.dispatcher)
  }

  def receive = {
    case "ready" =>
      initLatch.countDown()
      context.become(ready)
  }

  def ready: Receive = {
    case "do-work" =>
      // Wait for all services to start work together.
      // await needs an ActorSystem; pass the actor's own system explicitly.
      workBarrier.await(3.seconds)(context.system)

      // Do coordinated work
      Thread.sleep(100)
      sender() ! "work-done"

      cleanupLatch.countDown()
  }
}

// Test setup
val initLatch = TestLatch(3)
val workBarrier = TestBarrier(3) 
val cleanupLatch = TestLatch(3)

val services = (1 to 3).map { i =>
  system.actorOf(Props(new CoordinatedService(initLatch, workBarrier, cleanupLatch)))
}

// Wait for initialization
Await.ready(initLatch, 5.seconds)

// Trigger coordinated work
services.foreach(_ ! "do-work")

// Wait for cleanup
Await.ready(cleanupLatch, 5.seconds)

// Verify all completed
expectMsg("work-done")
expectMsg("work-done")
expectMsg("work-done")

Best Practices

TestActors Best Practices

  1. Use appropriate test actor: Choose EchoActor, BlackholeActor, or ForwardActor based on testing needs
  2. Name test actors: Use meaningful names for easier debugging
  3. Clean up resources: Stop test actors when no longer needed

// Good: Clear purpose and naming
val responseCapture = system.actorOf(TestActors.echoActorProps, "response-capture")
val eventSink = system.actorOf(TestActors.blackholeProps, "event-sink")
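
The third point, stopping test actors, can be sketched as follows. This is one possible approach under stated assumptions: a TestProbe watches the actor, the actor is stopped through the system, and the probe confirms that termination was observed. The names StopTestActorSketch and stopAndConfirm are hypothetical.

```scala
import akka.actor.ActorSystem
import akka.testkit.{TestActors, TestKit, TestProbe}

object StopTestActorSketch {
  // Stops a blackhole actor and returns true once its termination was observed.
  def stopAndConfirm(): Boolean = {
    implicit val system: ActorSystem = ActorSystem("cleanup-sketch")
    try {
      val probe = TestProbe()
      val sink = system.actorOf(TestActors.blackholeProps, "event-sink")

      probe.watch(sink)   // register for the Terminated notification
      system.stop(sink)   // asynchronous stop request

      // expectTerminated fails the test if no Terminated message arrives in time
      probe.expectTerminated(sink).actor == sink
    } finally TestKit.shutdownActorSystem(system)
  }
}
```

Waiting for the Terminated message matters when the actor name is reused later in the same test, since the name only becomes free again after the actor has fully stopped.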

TestBarrier Best Practices

  1. Match party count: Ensure barrier count matches number of participants
  2. Handle timeouts: Always specify appropriate timeouts for barrier operations
  3. Reset only when needed: The barrier is cyclic and re-arms itself once all parties have passed, so call reset() only when no party is waiting at it
  4. Avoid deadlocks: Ensure all parties will eventually reach the barrier

// Good: Proper timeout and error handling
try {
  barrier.await(10.seconds)
} catch {
  case _: TestBarrierTimeoutException =>
    fail("Not all parties reached barrier in time")
}

TestLatch Best Practices

  1. Count accurately: Set latch count to match expected number of events
  2. Use timeouts: Always specify timeouts when waiting on latches
  3. Handle failures: Consider what happens if some events fail to occur
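  4. Reset when reusing: Reset latches between test phases

// Good: Comprehensive latch usage
// (TimeoutException here is java.util.concurrent.TimeoutException)
val latch = TestLatch(expectedEvents)

try {
  Await.ready(latch, reasonableTimeout)
} catch {
  case _: TimeoutException =>
    // The class signature above exposes no remaining count, so report the expectation only
    fail(s"Not all $expectedEvents expected events occurred within $reasonableTimeout")
}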
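
The failure path from points 2 and 3 can be exercised without any actors. The sketch below is a minimal illustration, assuming that waiting on a latch that never opens surfaces as a java.util.concurrent.TimeoutException from Await.ready. The names LatchTimeoutSketch and opensWithin are hypothetical.

```scala
import java.util.concurrent.TimeoutException
import akka.actor.ActorSystem
import akka.testkit.{TestKit, TestLatch}
import scala.concurrent.Await
import scala.concurrent.duration._

object LatchTimeoutSketch {
  // Counts a latch down `countDowns` times and reports whether a latch created
  // for `expected` events opened within `timeout`.
  def opensWithin(countDowns: Int, expected: Int, timeout: FiniteDuration): Boolean = {
    implicit val system: ActorSystem = ActorSystem("latch-sketch")
    try {
      val latch = TestLatch(expected)
      (1 to countDowns).foreach(_ => latch.countDown())
      try {
        Await.ready(latch, timeout)
        true
      } catch {
        case _: TimeoutException => false // some expected events never happened
      }
    } finally TestKit.shutdownActorSystem(system)
  }
}
```

A test can use the same shape to turn a missing event into a clear failure message instead of an unexplained timeout. Note that TestKit scales such timeouts by the configured test time factor, so the effective wait may be longer than the value passed in.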
