Akka TestKit provides synchronization utilities including TestLatch for countdown coordination and TestBarrier for cyclic synchronization, enabling precise coordination of concurrent test scenarios and actor interactions.
Count-down latch for test synchronization with timeout support, useful for coordinating multiple threads or waiting for specific conditions.
/**
* Count-down latch for test synchronization with timeouts
* @param count Initial count value (defaults to 1)
* @param system Implicit ActorSystem for timeout configuration
*/
class TestLatch(count: Int = 1)(implicit system: ActorSystem) {
/**
* Decrement the latch counter
* When counter reaches zero, latch opens and waiting threads are released
*/
def countDown(): Unit
/**
* Check if latch is open (counter has reached zero)
* @return True if latch is open, false otherwise
*/
def isOpen: Boolean
/**
* Force latch open regardless of current counter value
* Releases all waiting threads immediately
*/
def open(): Unit
/**
* Reset latch to original count value
* Allows reuse of the same latch instance
*/
def reset(): Unit
/**
* Wait for latch to open with timeout
* @param atMost Maximum time to wait for latch to open
* @param permit Implicit CanAwait permission for blocking operation
* @return This TestLatch instance for method chaining
* @throws TimeoutException if latch doesn't open within timeout
*/
def ready(atMost: Duration)(implicit permit: CanAwait): TestLatch
}Factory methods and constants for creating TestLatch instances.
object TestLatch {
/**
* Create TestLatch with specified count
* @param count Initial count value (defaults to 1)
* @param system Implicit ActorSystem
* @return New TestLatch instance
*/
def apply(count: Int = 1)(implicit system: ActorSystem): TestLatch
/**
* Default timeout for TestLatch operations
*/
val DefaultTimeout: Duration = 5.seconds
}Cyclic barrier for coordinating multiple test threads, where all threads must reach the barrier before any can proceed.
/**
* Cyclic barrier for coordinating multiple test threads
* All specified number of threads must reach barrier before any can proceed
* @param count Number of threads that must reach barrier
*/
class TestBarrier(count: Int) {
/**
* Wait at barrier using default timeout
* Blocks until all threads reach the barrier
* @param system Implicit ActorSystem for timeout configuration
* @throws TestBarrierTimeoutException if timeout occurs
*/
def await()(implicit system: ActorSystem): Unit
/**
* Wait at barrier with specified timeout
* Blocks until all threads reach the barrier or timeout occurs
* @param timeout Maximum time to wait at barrier
* @param system Implicit ActorSystem
* @throws TestBarrierTimeoutException if timeout occurs
*/
def await(timeout: FiniteDuration)(implicit system: ActorSystem): Unit
/**
* Reset barrier for reuse
* Allows the same barrier to coordinate multiple rounds of synchronization
*/
def reset(): Unit
}Exception thrown when barrier operations timeout.
/**
* Exception thrown when TestBarrier operations timeout
* @param message Descriptive error message
*/
case class TestBarrierTimeoutException(message: String) extends RuntimeException(message)Usage Examples:
import akka.actor.{Actor, ActorSystem, Props}
import akka.testkit.{TestKit, TestLatch, TestBarrier}
import scala.concurrent.{Future, ExecutionContext}
import scala.concurrent.duration._
class SynchronizationExample extends TestKit(ActorSystem("test")) {
implicit val ec: ExecutionContext = system.dispatcher
"TestLatch" should {
"coordinate single countdown" in {
val latch = TestLatch()
// Start background task
Future {
Thread.sleep(100)
latch.countDown()
}
// Wait for background task to complete
latch.ready(1.second)
assert(latch.isOpen)
}
"coordinate multiple countdowns" in {
val latch = TestLatch(3)
// Start multiple background tasks
(1 to 3).foreach { i =>
Future {
Thread.sleep(i * 50)
latch.countDown()
}
}
// Wait for all tasks to complete
latch.ready(1.second)
assert(latch.isOpen)
}
"support reset and reuse" in {
val latch = TestLatch(2)
// First use
Future { latch.countDown() }
Future { latch.countDown() }
latch.ready(1.second)
assert(latch.isOpen)
// Reset and reuse
latch.reset()
assert(!latch.isOpen)
Future { latch.countDown() }
Future { latch.countDown() }
latch.ready(1.second)
assert(latch.isOpen)
}
"support manual opening" in {
val latch = TestLatch(10)
// Force open without waiting for all countdowns
latch.open()
assert(latch.isOpen)
// ready() returns immediately
latch.ready(1.second)
}
}
"TestBarrier" should {
"coordinate multiple threads" in {
val barrier = TestBarrier(3)
val results = collection.mutable.ListBuffer[Int]()
// Start multiple threads that must synchronize
val futures = (1 to 3).map { i =>
Future {
// Do some work
Thread.sleep(i * 30)
// Wait at barrier
barrier.await(1.second)
// Continue after all threads reach barrier
results += i
}
}
// Wait for all threads to complete
Future.sequence(futures).ready(2.seconds)
// All threads should have added their results
assert(results.size == 3)
assert(results.contains(1))
assert(results.contains(2))
assert(results.contains(3))
}
"support barrier reset and reuse" in {
val barrier = TestBarrier(2)
// First round of synchronization
val round1 = (1 to 2).map { i =>
Future {
barrier.await(1.second)
i
}
}
Future.sequence(round1).ready(2.seconds)
// Reset barrier for second round
barrier.reset()
// Second round of synchronization
val round2 = (1 to 2).map { i =>
Future {
barrier.await(1.second)
i + 10
}
}
Future.sequence(round2).ready(2.seconds)
}
"timeout when not all threads arrive" in {
val barrier = TestBarrier(3)
// Only start 2 of 3 required threads
Future { barrier.await(500.millis) }
Future { barrier.await(500.millis) }
// Third thread never arrives, should timeout
intercept[TestBarrierTimeoutException] {
// This will timeout since only 2/3 threads arrived
Thread.sleep(1000)
}
}
}
}Using synchronization utilities with actors for coordinated testing scenarios.
class ActorSynchronizationExample extends TestKit(ActorSystem("test")) {
class WorkerActor(latch: TestLatch) extends Actor {
def receive = {
case "work" =>
// Simulate work
Thread.sleep(100)
sender() ! "done"
latch.countDown()
}
}
"Actor coordination with TestLatch" should {
"wait for multiple actors to complete work" in {
val latch = TestLatch(3)
val workers = (1 to 3).map { i =>
system.actorOf(Props(new WorkerActor(latch)), s"worker-$i")
}
// Send work to all workers
workers.foreach(_ ! "work")
// Wait for all workers to complete
latch.ready(2.seconds)
// All workers have completed their work
assert(latch.isOpen)
}
}
class BarrierActor(barrier: TestBarrier, id: Int) extends Actor {
def receive = {
case "sync" =>
try {
barrier.await(1.second)
sender() ! s"synchronized-$id"
} catch {
case _: TestBarrierTimeoutException =>
sender() ! s"timeout-$id"
}
}
}
"Actor coordination with TestBarrier" should {
"synchronize multiple actors" in {
val barrier = TestBarrier(2)
val actor1 = system.actorOf(Props(new BarrierActor(barrier, 1)))
val actor2 = system.actorOf(Props(new BarrierActor(barrier, 2)))
// Send sync message to both actors
actor1 ! "sync"
actor2 ! "sync"
// Both should synchronize and respond
expectMsgAnyOf(2.seconds, "synchronized-1", "synchronized-2")
expectMsgAnyOf(2.seconds, "synchronized-1", "synchronized-2")
}
}
}Combining synchronization utilities with TestKit timing controls for precise test coordination.
class TimingSynchronizationExample extends TestKit(ActorSystem("test")) {
"Combined timing and synchronization" should {
"coordinate complex scenarios" in {
val latch = TestLatch(2)
val barrier = TestBarrier(2)
within(3.seconds) {
// Start coordinated operations
Future {
barrier.await(1.second) // Synchronize start
Thread.sleep(500) // Simulate work
latch.countDown() // Signal completion
}
Future {
barrier.await(1.second) // Synchronize start
Thread.sleep(500) // Simulate work
latch.countDown() // Signal completion
}
// Wait for both operations to complete
latch.ready(2.seconds)
}
assert(latch.isOpen)
}
}
}Guidelines for safe usage of synchronization utilities in concurrent test scenarios.
// SAFE: Proper timeout handling
val latch = TestLatch(3)
try {
latch.ready(5.seconds)
// Continue with test
} catch {
case _: TimeoutException =>
// Handle timeout appropriately
fail("Latch did not open within timeout")
}
// SAFE: Proper barrier usage
val barrier = TestBarrier(2)
try {
barrier.await(2.seconds)
// All threads synchronized
} catch {
case _: TestBarrierTimeoutException =>
// Handle barrier timeout
fail("Not all threads reached barrier")
}
// BEST PRACTICE: Reset for reuse
val reusableLatch = TestLatch(2)
// Use latch
reusableLatch.ready(1.second)
// Reset for next test
reusableLatch.reset()
// BEST PRACTICE: Appropriate timeouts
// Use timeouts longer than expected operation time
val conservativeLatch = TestLatch()
conservativeLatch.ready(expectedTime * 2)