Comprehensive testing toolkit for Akka actor-based systems with synchronous testing utilities, event filtering, and deterministic execution control
Akka TestKit provides various utilities for test coordination, synchronization, and common testing patterns. These utilities help manage complex test scenarios involving multiple actors and timing constraints.
/** Factory for the `Props` of the ready-made test actors described below. */
object TestActors {
  /** Props for an actor that sends every received message back to its sender. */
  val echoActorProps: Props
  /** Props for an actor that ignores every message it receives. */
  val blackholeProps: Props
  /** Props for an actor that forwards every message to `ref`. */
  def forwardActorProps(ref: ActorRef): Props
}

Collection of common test actor patterns that can be used in various testing scenarios.
/** Replies to the sender with exactly the message it received. */
class EchoActor extends Actor {
  def receive: Receive = {
    case msg => sender() ! msg
  }
}

An actor that echoes back any message it receives to the sender.
/** Accepts every message and does nothing with it; never replies. */
class BlackholeActor extends Actor {
  def receive: Receive = {
    case _ => () // ignore all messages
  }
}

An actor that ignores all messages it receives, useful for testing scenarios where you need to send messages but don't care about responses.
/**
 * Passes every message on to `target` using `forward`, so the original sender is kept.
 *
 * @param target actor that receives all forwarded messages
 */
class ForwardActor(target: ActorRef) extends Actor {
  def receive: Receive = {
    case msg => target.forward(msg)
  }
}

An actor that forwards all messages to a specified target actor.
import akka.testkit.TestActors
// Create echo actor
val echo = system.actorOf(TestActors.echoActorProps, "echo")
// Test echoing behavior: every message sent from the test comes straight back
echo ! "hello"
expectMsg("hello")
echo ! 42
expectMsg(42)
echo ! SomeMessage("data")
expectMsg(SomeMessage("data"))
// Use in testing actor interactions
val myActor = system.actorOf(Props[MyActor])
myActor ! RegisterListener(echo)
myActor ! TriggerNotification("test")
// NOTE(review): the echo actor replies to the sender of what it receives, so this
// presumably relies on MyActor passing the notification on with the test actor as
// the sender (e.g. via forward) — confirm against MyActor.
expectMsg("test") // Echo sends the notification back to its sender
// Create blackhole actor
val blackhole = system.actorOf(TestActors.blackholeProps, "blackhole")
// Test that messages are ignored
blackhole ! "message1"
blackhole ! "message2"
blackhole ! "message3"
expectNoMessage(1.second) // No messages should come back
// Use for testing fire-and-forget scenarios
val producer = system.actorOf(Props[EventProducer])
// The blackhole is the destination, so nothing the producer emits is answered
producer ! SetDestination(blackhole)
producer ! GenerateEvents(100)
// Events are sent but not processed, testing producer behavior only
// Create probe to receive forwarded messages
// The probe is the forwarding target, so it observes everything the forwarder relays
val probe = TestProbe()
val forwarder = system.actorOf(TestActors.forwardActorProps(probe.ref), "forwarder")
// Test forwarding behavior
forwarder ! "test-message"
probe.expectMsg("test-message")
// Test that sender is preserved in forwarding
val sender = TestProbe()
// tell(msg, sender) sends the message with the second probe as its explicit sender
forwarder.tell("forwarded", sender.ref)
probe.expectMsg("forwarded")
assert(probe.lastSender == sender.ref) // Sender is preserved
// Use for testing message routing
val router = system.actorOf(Props[MessageRouter])
router ! AddRoute("test-channel", forwarder)
router ! RouteMessage("test-channel", "routed-message")
probe.expectMsg("routed-message")

/**
 * Barrier at which `count` parties meet before any of them continues.
 *
 * @param count number of parties that must call `await` before the barrier releases
 */
class TestBarrier(count: Int) {
  /** Waits at the barrier (presumably for `TestBarrier.DefaultTimeout` — confirm). */
  def await()(implicit system: ActorSystem): Unit
  /** Waits at the barrier for at most `timeout`. */
  def await(timeout: FiniteDuration)(implicit system: ActorSystem): Unit
  /** Resets the barrier. */
  def reset(): Unit
}

A testing utility that implements a cyclic barrier pattern, allowing multiple threads or test execution paths to synchronize at a common point.
/** Companion holding the default timeout and the factory for [[TestBarrier]]. */
object TestBarrier {
  val DefaultTimeout: Duration = 5.seconds
  /** Creates a barrier for `count` parties. */
  def apply(count: Int): TestBarrier
}

/** Signals that a wait on a [[TestBarrier]] ran past its timeout. */
class TestBarrierTimeoutException(message: String) extends RuntimeException(message)

Exception thrown when barrier operations timeout.
import akka.testkit.TestBarrier
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
// Create barrier for 3 parties
val barrier = TestBarrier(3)
// Simulate concurrent operations
val future1 = Future {
  // Do some work
  Thread.sleep(100)
  barrier.await() // Wait for others
  "task1 complete"
}
val future2 = Future {
  // Do different work
  Thread.sleep(200)
  barrier.await() // Wait for others
  "task2 complete"
}
val future3 = Future {
  // Do more work
  Thread.sleep(150)
  barrier.await() // Wait for others
  "task3 complete"
}
// All futures will complete after all reach the barrier
val results = Await.result(Future.sequence(List(future1, future2, future3)), 5.seconds)
println(results) // All tasks completed together

// Barrier for 2 parties. This declaration was previously swallowed by the comment
// on the line above, which left it commented out.
val barrier = TestBarrier(2)
// Only one of the two parties ever waits at the barrier
val future1 = Future {
  barrier.await(3.seconds)
  "completed"
}
// Second party never arrives - will timeout
// The exception thrown inside the future is rethrown by Await.result
intercept[TestBarrierTimeoutException] {
  Await.result(future1, 5.seconds)
}

val barrier = TestBarrier(2)
/**
 * Runs three rounds for worker `id`; after each round the worker waits at the shared
 * `barrier` until the other worker has finished the same round.
 *
 * @param id number used to label this worker's results
 * @return a future holding one label per round, in round order
 */
def worker(id: Int): Future[List[String]] = Future {
  (1 to 3).map { round =>
    // Do work for this round
    val label = s"worker-$id-round-$round"
    // Synchronize with other worker. The barrier is cyclic, so it is ready for the
    // next round as soon as both workers have passed it. No reset() is issued:
    // resetting from both workers could break the barrier for a worker that is
    // already waiting in the next round.
    barrier.await()
    label
  }.toList
}
val worker1 = worker(1)
val worker2 = worker(2)
val results = Await.result(Future.sequence(List(worker1, worker2)), 10.seconds)
// Both workers complete all rounds together

/**
 * Reports each phase to the sender and then waits at the shared barrier until the
 * other participant has reached the same point.
 *
 * @param barrier barrier shared by all coordinated actors
 */
class CoordinatedActor(barrier: TestBarrier) extends Actor {
  def receive: Receive = {
    case "phase1" =>
      // Do phase 1 work
      sender() ! "phase1-done"
      // await needs an ActorSystem; an actor has none in implicit scope, so pass it
      barrier.await()(context.system)
    case "phase2" =>
      // Do phase 2 work
      sender() ! "phase2-done"
      barrier.await()(context.system)
  }
}
val barrier = TestBarrier(2)
val actor1 = system.actorOf(Props(new CoordinatedActor(barrier)))
val actor2 = system.actorOf(Props(new CoordinatedActor(barrier)))
// Both actors will synchronize at barrier
actor1 ! "phase1"
actor2 ! "phase1"
expectMsg("phase1-done")
expectMsg("phase1-done")
// Now phase 2. The replies above are sent before the actors wait, so an actor may
// still be blocked at the barrier here; resetting it now could break that wait.
// The cyclic barrier is reusable once both actors have passed, so no reset is needed.
actor1 ! "phase2"
actor2 ! "phase2"
expectMsg("phase2-done")
expectMsg("phase2-done")

/**
 * Latch that tests can await until it has been counted down `count` times.
 *
 * @param count number of `countDown()` calls needed to open the latch
 */
class TestLatch(count: Int)(implicit system: ActorSystem) extends Awaitable[Unit] {
  /** Records one event. */
  def countDown(): Unit
  /** True once the latch has been opened. */
  def isOpen: Boolean
  /** Opens the latch without waiting for the remaining count-downs. */
  def open(): Unit
  /** Closes the latch again so it can be reused. */
  def reset(): Unit
  /** Awaitable hook used by `Await.ready`. */
  def ready(atMost: Duration)(implicit permit: CanAwait): TestLatch
  /** Awaitable hook used by `Await.result`. */
  def result(atMost: Duration)(implicit permit: CanAwait): Unit
}

A testing utility that implements a countdown latch pattern, allowing tests to wait for a specific number of events to occur.
/** Companion holding the default timeout and the factory for [[TestLatch]]. */
object TestLatch {
  val DefaultTimeout: Duration = 5.seconds
  /** Creates a latch that opens after `count` count-downs. */
  def apply(count: Int)(implicit system: ActorSystem): TestLatch
}

import akka.testkit.TestLatch
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
// Create latch that waits for 3 events
val latch = TestLatch(3)
// Simulate events happening
// Each future counts the latch down once, after a different delay
Future {
Thread.sleep(100)
latch.countDown() // Event 1
}
Future {
Thread.sleep(200)
latch.countDown() // Event 2
}
Future {
Thread.sleep(300)
latch.countDown() // Event 3
}
// Wait for all events to complete
// Blocks the test thread for up to 5 seconds
Await.ready(latch, 5.seconds)
assert(latch.isOpen)

/**
 * Worker that answers "work" with "done" and then counts the shared latch down.
 *
 * @param latch latch counted down once per completed piece of work
 */
class WorkerActor(latch: TestLatch) extends Actor {
  def receive: Receive = {
    case "work" =>
      // Simulate work
      Thread.sleep(100)
      sender() ! "done"
      latch.countDown() // Signal completion
    case "fail" =>
      // Latch won't be decremented on failure
      throw new RuntimeException("Work failed")
  }
}
val latch = TestLatch(3)
val workers = (1 to 3).map { i =>
  system.actorOf(Props(new WorkerActor(latch)), s"worker-$i")
}
// Send work to all workers
workers.foreach(_ ! "work")
// Wait for all to complete
Await.ready(latch, 5.seconds)
assert(latch.isOpen)
// Verify all completed
expectMsg("done")
expectMsg("done")
expectMsg("done")

val latch = TestLatch(5)
// Count down a few times
latch.countDown() // 4 remaining
latch.countDown() // 3 remaining
latch.countDown() // 2 remaining
assert(!latch.isOpen)
// Manually open without waiting for remaining countdowns
latch.open()
assert(latch.isOpen)
// Can wait on already open latch
Await.ready(latch, 1.second) // Returns immediately

// Latch needing 2 count-downs. This declaration was previously swallowed by the
// comment on the line above, which left it commented out.
val latch = TestLatch(2)
// Use latch first time
latch.countDown()
latch.countDown()
assert(latch.isOpen)
// Reset for reuse
latch.reset()
assert(!latch.isOpen)
// Use again
// This time the count-downs come from two futures instead of the test thread
Future { latch.countDown() }
Future { latch.countDown() }
Await.ready(latch, 5.seconds)
assert(latch.isOpen)

/**
 * Service that initializes asynchronously and counts the latch down once it is ready.
 * It only answers "request" after it has switched to the `ready` behavior.
 *
 * @param latch latch counted down once when initialization finishes
 */
class InitializableService(latch: TestLatch) extends Actor {
  override def preStart(): Unit = {
    // Simulate initialization work
    Future {
      Thread.sleep(500) // Simulate async initialization
      // Completion is reported as a message so the state change happens in receive
      self ! "initialized"
    }
  }
  def receive: Receive = {
    case "initialized" =>
      latch.countDown() // Signal service is ready
      context.become(ready)
  }
  def ready: Receive = {
    case "request" => sender() ! "response"
  }
}
// Test system startup
val latch = TestLatch(3) // 3 services
val services = (1 to 3).map { i =>
  system.actorOf(Props(new InitializableService(latch)), s"service-$i")
}
// Wait for all services to initialize
Await.ready(latch, 10.seconds)
// Now services are ready for requests
services.head ! "request"
expectMsg("response")
// Test scenario: Initialize services, run coordinated work, cleanup
// Service that combines all three utilities: it counts initLatch down once started,
// waits at workBarrier before working, and counts cleanupLatch down when finished.
class CoordinatedService(
initLatch: TestLatch,
workBarrier: TestBarrier,
cleanupLatch: TestLatch
) extends Actor {
override def preStart(): Unit = {
// Initialize
Future {
// Random delay so the services become ready at different times
Thread.sleep(Random.nextInt(200))
self ! "ready"
}
}
def receive = {
case "ready" =>
initLatch.countDown()
context.become(ready)
}
def ready: Receive = {
case "do-work" =>
// Wait for all services to start work together
workBarrier.await(3.seconds)
// Do coordinated work
Thread.sleep(100)
sender() ! "work-done"
cleanupLatch.countDown()
}
}
// Test setup
// The latch counts and the barrier size all match the number of services (3)
val initLatch = TestLatch(3)
val workBarrier = TestBarrier(3)
val cleanupLatch = TestLatch(3)
val services = (1 to 3).map { i =>
system.actorOf(Props(new CoordinatedService(initLatch, workBarrier, cleanupLatch)))
}
// Wait for initialization
Await.ready(initLatch, 5.seconds)
// Trigger coordinated work
services.foreach(_ ! "do-work")
// Wait for cleanup
Await.ready(cleanupLatch, 5.seconds)
// Verify all completed
expectMsg("work-done")
expectMsg("work-done")
expectMsg("work-done")
// Good: Clear purpose and naming
// Both the value names and the actor names state the role each stock actor plays
val responseCapture = system.actorOf(TestActors.echoActorProps, "response-capture")
val eventSink = system.actorOf(TestActors.blackholeProps, "event-sink")
// Good: Proper timeout and error handling
// Turn a barrier timeout into an explicit test failure with a readable message
try {
barrier.await(10.seconds)
} catch {
case _: TestBarrierTimeoutException =>
fail("Not all parties reached barrier in time")
}
// Good: Comprehensive latch usage
val latch = TestLatch(expectedEvents)
try {
  Await.ready(latch, reasonableTimeout)
} catch {
  case _: TimeoutException =>
    // TestLatch has no accessor for the remaining count (`count` is a plain
    // constructor parameter), so report the expectation and the timeout instead.
    fail(s"Not all $expectedEvents expected events completed within $reasonableTimeout")
}

Install with Tessl CLI
npx tessl i tessl/maven-com-typesafe-akka--akka-testkit-2-11