Thread-safe synchronization primitives for coordinating multi-threaded tests and ensuring deterministic test execution across concurrent operations.
Cyclic barrier wrapper for multi-thread test synchronization.
/**
 * Cyclic barrier wrapper for multi-thread test synchronization.
 *
 * @param count number of parties that must reach the barrier before any of them
 *              continues (the usage example builds `TestBarrier(3)` for three threads)
 */
class TestBarrier(count: Int) {
// Waiting methods
// Both overloads take an implicit ActorSystem, so one has to be in scope at the call site.
// Waits at the barrier without an explicit timeout.
// NOTE(review): presumably a library default timeout applies here — confirm.
def await()(implicit system: ActorSystem): Unit
// Waits at the barrier for at most `timeout`.
def await(timeout: FiniteDuration)(implicit system: ActorSystem): Unit
// State management
// Resets the barrier so it can be reused (see "Reset barrier for reuse" in the usage example).
def reset(): Unit
// NOTE(review): getNumberWaiting, getParties and isBroken look like the accessors of
// java.util.concurrent.CyclicBarrier; akka.testkit.TestBarrier may not expose them —
// confirm against the Akka version this document targets.
def getNumberWaiting: Int
def getParties: Int
def isBroken: Boolean
}
object TestBarrier {
def apply(count: Int): TestBarrier
}

Usage Example:
import akka.actor.ActorSystem
import akka.testkit.TestBarrier
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// TestBarrier.await() takes an implicit ActorSystem, so one must be in scope
implicit val system: ActorSystem = ActorSystem("BarrierExample")

// Create barrier for 3 threads
val barrier = TestBarrier(3)

// Start 3 concurrent operations
val futures = (1 to 3).map { i =>
  Future {
    println(s"Thread $i: Starting work")
    // blocking { ... } tells the global pool it may add threads, so all 3 parties
    // can reach the barrier even on a machine with fewer than 3 cores
    blocking {
      Thread.sleep(scala.util.Random.nextInt(1000)) // Simulate work
      println(s"Thread $i: Waiting at barrier")
      barrier.await() // All threads wait here
    }
    println(s"Thread $i: Continuing after barrier")
    s"Thread $i completed"
  }
}

// All threads will continue together after barrier
val results = Await.result(Future.sequence(futures), 5.seconds)

// Reset barrier for reuse
barrier.reset()

// Shut down the ActorSystem created for this example
system.terminate()

CountDownLatch wrapper for test synchronization.
/**
 * CountDownLatch wrapper for test synchronization.
 *
 * @param count  number of count-downs needed before the latch opens; defaults to 1
 *               (the usage example builds `TestLatch(2)` to wait for two operations)
 * @param system implicit ActorSystem that must be in scope when the latch is created
 */
class TestLatch(count: Int = 1)(implicit system: ActorSystem) {
// Countdown methods
def countDown(): Unit
// NOTE(review): a `countDown(delta)` overload may not exist on akka.testkit.TestLatch — confirm.
def countDown(delta: Int): Unit
// State checking
// The usage example asserts `isOpen` after every expected count-down has happened.
def isOpen: Boolean
// NOTE(review): `getCount` looks like java.util.concurrent.CountDownLatch#getCount —
// confirm that TestLatch actually exposes it.
def getCount: Long
// Waiting methods
// NOTE(review): akka.testkit.TestLatch is believed to extend Awaitable[Unit], with
// `ready(atMost)(implicit permit: CanAwait)` intended to be invoked as
// `Await.ready(latch, atMost)`. Verify these two signatures against the Akka API docs.
def ready(atMost: Duration)(implicit system: ActorSystem): Unit
def ready()(implicit system: ActorSystem): Unit
// Control methods
def open(): Unit // Opens latch completely
def reset(): Unit // Resets to original count
}
object TestLatch {
def apply(count: Int = 1)(implicit system: ActorSystem): TestLatch
}

Usage Example:
import akka.actor.ActorSystem
import akka.testkit.TestLatch
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// TestLatch(count) takes an implicit ActorSystem, so one must be in scope
implicit val system: ActorSystem = ActorSystem("LatchExample")

// Create latch that waits for 2 operations
val latch = TestLatch(2)

// Start first operation
Future {
  Thread.sleep(500)
  println("Operation 1 completed")
  latch.countDown()
}

// Start second operation
Future {
  Thread.sleep(800)
  println("Operation 2 completed")
  latch.countDown()
}

// Wait for both operations to complete.
// TestLatch is an Awaitable: wait through Await.ready instead of calling ready directly.
// A TimeoutException is thrown if the latch is still closed after 2 seconds.
Await.ready(latch, 2.seconds)
println("Both operations completed!")

// Check if latch is open (i.e. every expected countDown() has happened)
assert(latch.isOpen)

// Shut down the ActorSystem created for this example
system.terminate()

Producer-Consumer with TestLatch:
import akka.actor.{Actor, ActorSystem, Props}
import akka.testkit.{ImplicitSender, TestKit, TestLatch}
import org.scalatest.BeforeAndAfterAll
import org.scalatest.wordspec.AnyWordSpecLike
import scala.concurrent.Await
import scala.concurrent.duration._

/** Counts the latch down once its (simulated) work is done. */
class Producer(latch: TestLatch) extends Actor {
  def receive: Receive = {
    case "produce" =>
      // Do some work
      Thread.sleep(100)
      println("Item produced")
      latch.countDown()
  }
}

/** Waits for the producer's signal, then confirms consumption to the sender. */
class Consumer(latch: TestLatch) extends Actor {
  def receive: Receive = {
    case "consume" =>
      // Wait for producer. Blocking inside an actor is tolerable in test code only;
      // the bounded timeout guarantees the actor cannot hang forever.
      Await.ready(latch, 3.seconds)
      println("Item consumed")
      // Reply so the test can observe that the consumer got past the latch
      sender() ! "consumed"
  }
}

// NOTE(review): AnyWordSpecLike is the ScalaTest 3.1+ name; use WordSpecLike on older versions.
class ProducerConsumerTest
    extends TestKit(ActorSystem("TestSystem"))
    with ImplicitSender
    with AnyWordSpecLike
    with BeforeAndAfterAll {

  // Stop the ActorSystem so the test JVM can exit
  override def afterAll(): Unit = TestKit.shutdownActorSystem(system)

  "Producer-Consumer pattern" should {
    "synchronize correctly" in {
      val latch = TestLatch(1)
      val producer = system.actorOf(Props(new Producer(latch)))
      val consumer = system.actorOf(Props(new Consumer(latch)))

      // Start consumer first (it will wait)
      consumer ! "consume"

      // Then producer (which signals completion)
      producer ! "produce"

      // Fails with a timeout if the consumer never gets past the latch
      expectMsg(5.seconds, "consumed")
    }
  }
}

Multi-Stage Pipeline with TestBarrier:
import akka.actor.{Actor, ActorSystem, Props}
import akka.testkit.{ImplicitSender, TestBarrier, TestKit}
import org.scalatest.BeforeAndAfterAll
import org.scalatest.wordspec.AnyWordSpecLike
import scala.concurrent.duration._

/** One pipeline stage: does its work, waits for all other stages, then replies. */
class PipelineStage(stageId: Int, barrier: TestBarrier) extends Actor {
  def receive: Receive = {
    case "process" =>
      println(s"Stage $stageId: Processing...")
      Thread.sleep(scala.util.Random.nextInt(500))
      println(s"Stage $stageId: Done, waiting for others")
      // An actor has no implicit ActorSystem in scope, so pass context.system explicitly
      barrier.await()(context.system) // Wait for all stages
      println(s"Stage $stageId: All stages complete, continuing")
      sender() ! s"Stage $stageId completed"
  }
}

// ImplicitSender makes testActor the sender of `!`, so the stages' replies reach
// receiveN instead of going to dead letters.
// NOTE(review): AnyWordSpecLike is the ScalaTest 3.1+ name; use WordSpecLike on older versions.
class PipelineTest
    extends TestKit(ActorSystem("TestSystem"))
    with ImplicitSender
    with AnyWordSpecLike
    with BeforeAndAfterAll {

  // Stop the ActorSystem so the test JVM can exit
  override def afterAll(): Unit = TestKit.shutdownActorSystem(system)

  "Pipeline stages" should {
    "synchronize at barriers" in {
      val barrier = TestBarrier(3)
      val stages = (1 to 3).map { i =>
        system.actorOf(Props(new PipelineStage(i, barrier)))
      }

      // Start all stages
      stages.foreach(_ ! "process")

      // All should complete around the same time
      val replies = receiveN(3, 3.seconds)
      assert(replies.toSet == (1 to 3).map(i => s"Stage $i completed").toSet)
    }
  }
}

Synchronization utilities work seamlessly with TestKit:
import akka.actor.{Actor, ActorSystem, Props}
import akka.testkit.{TestBarrier, TestKit, TestLatch, TestProbe}
import org.scalatest.BeforeAndAfterAll
import org.scalatest.wordspec.AnyWordSpecLike
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// NOTE(review): AnyWordSpecLike is the ScalaTest 3.1+ name; use WordSpecLike on older versions.
class SynchronizationIntegrationTest
    extends TestKit(ActorSystem("TestSystem"))
    with AnyWordSpecLike
    with BeforeAndAfterAll {

  // ExecutionContext for the Futures below
  import system.dispatcher

  // Stop the ActorSystem so the test JVM can exit
  override def afterAll(): Unit = TestKit.shutdownActorSystem(system)

  "TestLatch with TestKit" should {
    "coordinate actor testing" in {
      val completionLatch = TestLatch(2)

      class WorkerActor extends Actor {
        def receive: Receive = {
          case "work" =>
            // Simulate work
            Future {
              Thread.sleep(200)
              completionLatch.countDown()
            }
        }
      }

      // WorkerActor is local to this test body, so it cannot be created reflectively
      // via Props[WorkerActor](); use the by-name creator form instead.
      val worker1 = system.actorOf(Props(new WorkerActor))
      val worker2 = system.actorOf(Props(new WorkerActor))

      worker1 ! "work"
      worker2 ! "work"

      // Wait for both workers to complete (TestLatch is awaited through Await.ready)
      Await.ready(completionLatch, 1.second)

      // Now continue with test assertions
      assert(completionLatch.isOpen)
    }
  }

  "TestBarrier with TestKit" should {
    "synchronize multiple test probes" in {
      val barrier = TestBarrier(2)
      val probe1 = TestProbe()
      val probe2 = TestProbe()

      // Simulate concurrent operations
      Future {
        probe1.send(testActor, "probe1 ready")
        barrier.await()
        probe1.send(testActor, "probe1 done")
      }

      Future {
        probe2.send(testActor, "probe2 ready")
        barrier.await()
        probe2.send(testActor, "probe2 done")
      }

      // Both should send ready first
      expectMsgAllOf("probe1 ready", "probe2 ready")

      // Then both should send done (after barrier)
      expectMsgAllOf("probe1 done", "probe2 done")
    }
  }
}

Thread Safety:
Best Practices:
// Good: Specify reasonable timeouts
// (a TestLatch is awaited through scala.concurrent.Await, not by calling ready directly)
Await.ready(latch, 5.seconds)
barrier.await(3.seconds)

// Good: Check state before waiting
if (!latch.isOpen) {
  Await.ready(latch, 1.second)
}

// Good: Reset for reuse in multiple tests
override def beforeEach(): Unit = {
  super.beforeEach() // keep other stacked BeforeAndAfterEach traits working
  barrier.reset()
  latch.reset()
}

// Good: Use in try-finally for cleanup
try {
  Await.ready(latch, 1.second)
  // test code
} finally {
  latch.reset()
}

Common Patterns:
Error Handling:
// Handle timeout in latch waiting
// (a TestLatch is awaited through scala.concurrent.Await, not by calling ready directly)
try {
  Await.ready(latch, 1.second)
} catch {
  case _: java.util.concurrent.TimeoutException =>
    fail("Operations did not complete in time")
}

// Handle timeout in barrier waiting
// TestBarrier reports a missed rendezvous by throwing TestBarrierTimeoutException;
// it has no isBroken / getNumberWaiting accessors to poll.
try {
  barrier.await(1.second)
} catch {
  case e: akka.testkit.TestBarrierTimeoutException =>
    fail(s"Not all parties reached the barrier in time: ${e.getMessage}")
}

// Verify expected state
assert(latch.isOpen, "Latch has not been fully counted down")