Akka TestKit - A comprehensive testing toolkit for Actor-based systems built with the Akka framework
Akka TestKit provides synchronization utilities including TestLatch for countdown coordination and TestBarrier for cyclic synchronization, enabling precise coordination of concurrent test scenarios and actor interactions.
`TestLatch` is a count-down latch for test synchronization with timeout support, useful for coordinating multiple threads or waiting for specific conditions.
/**
 * Count-down latch for test synchronization with timeouts.
 *
 * The latch is an `Awaitable[Unit]`: block on it with `scala.concurrent.Await`
 * (for example `Await.ready(latch, 1.second)`), which supplies the implicit
 * `CanAwait` permit that `ready` and `result` require. Test code cannot call
 * `ready` directly because it has no `CanAwait` instance of its own.
 *
 * @param count Initial count value (defaults to 1)
 * @param system Implicit ActorSystem for timeout configuration
 */
class TestLatch(count: Int = 1)(implicit system: ActorSystem) extends scala.concurrent.Awaitable[Unit] {
  /**
   * Decrement the latch counter.
   * When the counter reaches zero, the latch opens and waiting threads are released.
   */
  def countDown(): Unit

  /**
   * Check if the latch is open (counter has reached zero).
   * @return True if the latch is open, false otherwise
   */
  def isOpen: Boolean

  /**
   * Force the latch open regardless of the current counter value.
   * Releases all waiting threads immediately.
   */
  def open(): Unit

  /**
   * Reset the latch to its original count value.
   * Allows reuse of the same latch instance.
   */
  def reset(): Unit

  /**
   * Wait for the latch to open with a timeout.
   * Normally invoked through `Await.ready(latch, atMost)` rather than called directly.
   * @param atMost Maximum time to wait for the latch to open
   * @param permit Implicit CanAwait permission for the blocking operation (provided by `Await`)
   * @return This TestLatch instance for method chaining
   * @throws TimeoutException if the latch doesn't open within the timeout
   */
  def ready(atMost: Duration)(implicit permit: CanAwait): this.type

  /**
   * Wait for the latch to open with a timeout, returning no value.
   * Normally invoked through `Await.result(latch, atMost)`.
   * @param atMost Maximum time to wait for the latch to open
   * @param permit Implicit CanAwait permission for the blocking operation (provided by `Await`)
   * @throws TimeoutException if the latch doesn't open within the timeout
   */
  def result(atMost: Duration)(implicit permit: CanAwait): Unit
}
Factory methods and constants for creating TestLatch instances.
/**
* Companion of the TestLatch class: a factory method plus the default timeout constant.
*/
object TestLatch {
/**
* Create a TestLatch with the specified count.
* @param count Initial count value (defaults to 1)
* @param system Implicit ActorSystem handed to the new latch
* @return New TestLatch instance
*/
def apply(count: Int = 1)(implicit system: ActorSystem): TestLatch
/**
* Default timeout for TestLatch operations (5 seconds).
* NOTE(review): none of the signatures listed here use this value as a parameter default;
* confirm where it is applied before relying on it.
*/
val DefaultTimeout: Duration = 5.seconds
}
Cyclic barrier for coordinating multiple test threads, where all threads must reach the barrier before any can proceed.
/**
* Cyclic barrier for coordinating multiple test threads.
* The specified number of threads must all reach the barrier before any of them can proceed.
* @param count Number of threads that must reach the barrier
*/
class TestBarrier(count: Int) {
/**
* Wait at the barrier using the default timeout.
* Blocks until all threads reach the barrier.
* NOTE(review): the value of the default timeout is not shown in this listing; confirm it
* against the library before depending on it.
* @param system Implicit ActorSystem for timeout configuration
* @throws TestBarrierTimeoutException if a timeout occurs
*/
def await()(implicit system: ActorSystem): Unit
/**
* Wait at the barrier with the specified timeout.
* Blocks until all threads reach the barrier or the timeout occurs.
* @param timeout Maximum time to wait at the barrier
* @param system Implicit ActorSystem
* @throws TestBarrierTimeoutException if a timeout occurs
*/
def await(timeout: FiniteDuration)(implicit system: ActorSystem): Unit
/**
* Reset the barrier for reuse.
* Allows the same barrier to coordinate multiple rounds of synchronization.
*/
def reset(): Unit
}
Exception thrown when barrier operations timeout.
/**
* Exception thrown when a TestBarrier operation times out; both TestBarrier.await
* overloads declare it in their @throws documentation.
* @param message Descriptive error message, also passed to RuntimeException as its message
*/
case class TestBarrierTimeoutException(message: String) extends RuntimeException(message)
Usage Examples:
import akka.actor.{Actor, ActorSystem, Props}
import akka.testkit.{TestKit, TestLatch, TestBarrier}
import scala.concurrent.{Await, ExecutionContext, Future, TimeoutException}
import scala.concurrent.duration._
/**
 * Usage examples for TestLatch and TestBarrier.
 *
 * Latches and futures are awaited through `scala.concurrent.Await`: `ready` requires an
 * implicit `CanAwait` permit that only `Await` can supply, so calling it directly does
 * not compile.
 *
 * NOTE(review): the `should` / `in` / `intercept` syntax presumably comes from a ScalaTest
 * word-spec style trait that this snippet neither imports nor mixes in; add the trait that
 * matches the project's ScalaTest version. The ActorSystem created here is also never
 * terminated; real suites usually shut it down in an after-all hook.
 */
class SynchronizationExample extends TestKit(ActorSystem("test")) {
  // Futures in these examples run on the actor system's default dispatcher.
  implicit val ec: ExecutionContext = system.dispatcher

  "TestLatch" should {
    "coordinate single countdown" in {
      val latch = TestLatch()
      // Start background task
      Future {
        Thread.sleep(100)
        latch.countDown()
      }
      // Wait for background task to complete
      Await.ready(latch, 1.second)
      assert(latch.isOpen)
    }

    "coordinate multiple countdowns" in {
      val latch = TestLatch(3)
      // Start multiple background tasks
      (1 to 3).foreach { i =>
        Future {
          Thread.sleep(i * 50)
          latch.countDown()
        }
      }
      // Wait for all tasks to complete
      Await.ready(latch, 1.second)
      assert(latch.isOpen)
    }

    "support reset and reuse" in {
      val latch = TestLatch(2)
      // First use
      Future { latch.countDown() }
      Future { latch.countDown() }
      Await.ready(latch, 1.second)
      assert(latch.isOpen)
      // Reset and reuse
      latch.reset()
      assert(!latch.isOpen)
      Future { latch.countDown() }
      Future { latch.countDown() }
      Await.ready(latch, 1.second)
      assert(latch.isOpen)
    }

    "support manual opening" in {
      val latch = TestLatch(10)
      // Force open without waiting for all countdowns
      latch.open()
      assert(latch.isOpen)
      // Awaiting an already-open latch returns immediately
      Await.ready(latch, 1.second)
    }
  }

  "TestBarrier" should {
    "coordinate multiple threads" in {
      val barrier = TestBarrier(3)
      // Each future returns its id instead of appending to a shared mutable buffer,
      // which would be an unsynchronized write from several threads.
      val futures = (1 to 3).map { i =>
        Future {
          // Do some work
          Thread.sleep(i * 30)
          // Wait at barrier
          barrier.await(1.second)
          // Continue after all threads reach barrier
          i
        }
      }
      // Wait for all threads to complete; Await.result also surfaces any failure
      val results = Await.result(Future.sequence(futures), 2.seconds)
      // All threads should have contributed their result
      assert(results.size == 3)
      assert(results.contains(1))
      assert(results.contains(2))
      assert(results.contains(3))
    }

    "support barrier reset and reuse" in {
      val barrier = TestBarrier(2)
      // First round of synchronization
      val round1 = (1 to 2).map { i =>
        Future {
          barrier.await(1.second)
          i
        }
      }
      Await.result(Future.sequence(round1), 2.seconds)
      // Reset barrier for second round
      barrier.reset()
      // Second round of synchronization
      val round2 = (1 to 2).map { i =>
        Future {
          barrier.await(1.second)
          i + 10
        }
      }
      Await.result(Future.sequence(round2), 2.seconds)
    }

    "timeout when not all threads arrive" in {
      val barrier = TestBarrier(3)
      // Only 2 of the 3 required parties arrive: one background future and the test thread.
      // The future waits longer than the test thread, so the test thread's wait is the one
      // that times out; any failure inside the future is deliberately ignored.
      Future { barrier.await(2.seconds) }
      // The timeout must be raised on the thread running intercept, so the test thread
      // itself waits at the barrier.
      intercept[TestBarrierTimeoutException] {
        barrier.await(500.millis)
      }
    }
  }
}
Using synchronization utilities with actors for coordinated testing scenarios.
/**
 * Examples of coordinating actors with TestLatch and TestBarrier.
 *
 * NOTE(review): as with the other examples, the `should` / `in` syntax presumably comes
 * from a ScalaTest word-spec style trait that is not mixed in here; add the trait that
 * matches the project's ScalaTest version.
 */
class ActorSynchronizationExample extends TestKit(ActorSystem("test")) {

  /** Handles "work": simulates work, replies "done" to the sender, then counts the latch down. */
  class WorkerActor(latch: TestLatch) extends Actor {
    def receive: Receive = {
      case "work" =>
        // Simulate work
        Thread.sleep(100)
        sender() ! "done"
        latch.countDown()
    }
  }

  "Actor coordination with TestLatch" should {
    "wait for multiple actors to complete work" in {
      val latch = TestLatch(3)
      val workers = (1 to 3).map { i =>
        system.actorOf(Props(new WorkerActor(latch)), s"worker-$i")
      }
      // Send work to all workers; only the latch is observed here, not the "done" replies
      workers.foreach(_ ! "work")
      // Wait for all workers to complete. Await supplies the CanAwait permit that
      // TestLatch.ready requires, so ready is not called directly.
      Await.ready(latch, 2.seconds)
      // All workers have completed their work
      assert(latch.isOpen)
    }
  }

  /** Handles "sync": waits at the barrier and replies "synchronized-id", or "timeout-id" if the wait times out. */
  class BarrierActor(barrier: TestBarrier, id: Int) extends Actor {
    def receive: Receive = {
      case "sync" =>
        try {
          barrier.await(1.second)
          sender() ! s"synchronized-$id"
        } catch {
          case _: TestBarrierTimeoutException =>
            sender() ! s"timeout-$id"
        }
    }
  }

  "Actor coordination with TestBarrier" should {
    "synchronize multiple actors" in {
      val barrier = TestBarrier(2)
      val actor1 = system.actorOf(Props(new BarrierActor(barrier, 1)))
      val actor2 = system.actorOf(Props(new BarrierActor(barrier, 2)))
      // Send with testActor as the explicit sender: this class has no implicit sender in
      // scope, so a plain `!` would route the replies to dead letters instead of the test.
      actor1.tell("sync", testActor)
      actor2.tell("sync", testActor)
      // Both actors must reply, in either order; expectMsgAllOf rejects a duplicated reply
      expectMsgAllOf(2.seconds, "synchronized-1", "synchronized-2")
    }
  }
}
Combining synchronization utilities with TestKit timing controls for precise test coordination.
/**
 * Example combining TestKit's `within` timing block with TestBarrier and TestLatch.
 *
 * NOTE(review): the `should` / `in` syntax presumably comes from a ScalaTest word-spec
 * style trait that is not mixed in here; add the trait that matches the project's
 * ScalaTest version.
 */
class TimingSynchronizationExample extends TestKit(ActorSystem("test")) {
  // Future { ... } needs an implicit ExecutionContext; use the actor system's dispatcher.
  implicit val ec: ExecutionContext = system.dispatcher

  "Combined timing and synchronization" should {
    "coordinate complex scenarios" in {
      val latch = TestLatch(2)
      val barrier = TestBarrier(2)
      within(3.seconds) {
        // Start two identical coordinated operations
        (1 to 2).foreach { _ =>
          Future {
            barrier.await(1.second) // Synchronize start
            Thread.sleep(500)       // Simulate work
            latch.countDown()       // Signal completion
          }
        }
        // Wait for both operations to complete. Await supplies the CanAwait permit that
        // TestLatch.ready requires, so ready is not called directly.
        Await.ready(latch, 2.seconds)
      }
      assert(latch.isOpen)
    }
  }
}
Guidelines for safe usage of synchronization utilities in concurrent test scenarios.
// These snippets assume an implicit ActorSystem is in scope.
// NOTE(review): `fail` is presumably ScalaTest's assertion helper — confirm it is
// available where the snippets are used.

// SAFE: Proper timeout handling
// The latch is awaited through Await, which supplies the CanAwait permit that
// TestLatch.ready requires.
val latch = TestLatch(3)
try {
  Await.ready(latch, 5.seconds)
  // Continue with test
} catch {
  case _: TimeoutException =>
    // Handle timeout appropriately
    fail("Latch did not open within timeout")
}

// SAFE: Proper barrier usage
val barrier = TestBarrier(2)
try {
  barrier.await(2.seconds)
  // All threads synchronized
} catch {
  case _: TestBarrierTimeoutException =>
    // Handle barrier timeout
    fail("Not all threads reached barrier")
}

// BEST PRACTICE: Reset for reuse
val reusableLatch = TestLatch(2)
// Use latch
Await.ready(reusableLatch, 1.second)
// Reset for next test
reusableLatch.reset()

// BEST PRACTICE: Appropriate timeouts
// Use timeouts longer than expected operation time
val expectedTime = 500.millis // illustrative estimate; replace with the operation's real expected duration
val conservativeLatch = TestLatch()
Await.ready(conservativeLatch, expectedTime * 2)
Install with Tessl CLI
npx tessl i tessl/maven-com-typesafe-akka--akka-testkit-2-12