or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

actor-refs.md
configuration.md
core-testing.md
dispatchers.md
event-filtering.md
index.md
java-dsl.md
package-functions.md
synchronization.md
test-actors.md
utilities.md
tile.json

docs/synchronization.md

Synchronization Utilities

Thread-safe synchronization primitives for coordinating multi-threaded tests and ensuring deterministic test execution across concurrent operations.

Capabilities

TestBarrier Class

Cyclic barrier wrapper for multi-thread test synchronization.

/**
 * API summary of `TestBarrier`: a cyclic-barrier wrapper used to hold several
 * test threads at a common point until all of them have arrived.
 *
 * This is a signature listing only (the methods have no bodies); it is not
 * meant to be compiled as-is.
 *
 * @param count number of parties the barrier is created for
 */
class TestBarrier(count: Int) {
  // Waiting methods
  // Both overloads take an implicit ActorSystem, so one must be in scope at the call site.
  def await()(implicit system: ActorSystem): Unit
  // Overload that takes an explicit timeout for the wait.
  def await(timeout: FiniteDuration)(implicit system: ActorSystem): Unit
  
  // State management
  def reset(): Unit
  // NOTE(review): upstream akka-testkit's TestBarrier may expose only await/reset;
  // confirm that the three accessors below exist in the targeted version.
  def getNumberWaiting: Int
  def getParties: Int
  def isBroken: Boolean
}

/** Companion factory (signature listing): lets callers write `TestBarrier(3)` without `new`. */
object TestBarrier {
  // Builds a barrier for `count` parties.
  def apply(count: Int): TestBarrier
}

Usage Example:

import akka.testkit.TestBarrier
import scala.concurrent.duration._
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import akka.actor.ActorSystem
import scala.concurrent.{Await, blocking} // Await is needed for Await.result below

// barrier.await() takes an implicit ActorSystem, so one must be in scope
implicit val system: ActorSystem = ActorSystem("TestSystem")

// Create barrier for 3 threads
val barrier = TestBarrier(3)

// Start 3 concurrent operations
val futures = (1 to 3).map { i =>
  Future {
    // All three tasks must be parked at the barrier at the same time. Marking the
    // body as blocking lets the global pool add threads when it has fewer than
    // three workers (e.g. on a 1-2 core CI machine) instead of starving.
    blocking {
      println(s"Thread $i: Starting work")
      Thread.sleep(scala.util.Random.nextInt(1000)) // Simulate work

      println(s"Thread $i: Waiting at barrier")
      barrier.await() // All threads wait here

      println(s"Thread $i: Continuing after barrier")
      s"Thread $i completed"
    }
  }
}

// All threads will continue together after barrier
val results = Await.result(Future.sequence(futures), 5.seconds)

// Reset barrier for reuse
barrier.reset()

// Shut down the ActorSystem created for this example so its threads do not linger
system.terminate()

TestLatch Class

CountDownLatch wrapper for test synchronization.

/**
 * API summary of `TestLatch`: a CountDownLatch wrapper for test synchronization.
 *
 * This is a signature listing only (the methods have no bodies); it is not
 * meant to be compiled as-is.
 *
 * NOTE(review): upstream akka-testkit appears to declare `TestLatch` as an
 * `Awaitable[Unit]` whose `ready` takes an implicit `CanAwait` (normally awaited
 * via `Await.ready(latch, atMost)`), and may not provide `countDown(delta)`,
 * `getCount` or a no-argument `ready()` — confirm against the targeted version.
 *
 * @param count  number of count-downs needed before the latch opens (defaults to 1)
 * @param system implicit ActorSystem required when the latch is constructed
 */
class TestLatch(count: Int = 1)(implicit system: ActorSystem) {
  // Countdown methods
  def countDown(): Unit
  def countDown(delta: Int): Unit
  
  // State checking
  def isOpen: Boolean
  def getCount: Long
  
  // Waiting methods
  // Both overloads take an implicit ActorSystem at the call site.
  def ready(atMost: Duration)(implicit system: ActorSystem): Unit
  def ready()(implicit system: ActorSystem): Unit
  
  // Control methods
  def open(): Unit  // Opens latch completely
  def reset(): Unit // Resets to original count
}

/** Companion factory (signature listing): lets callers write `TestLatch(2)` without `new`. */
object TestLatch {
  // Same parameters as the class constructor: optional count plus an implicit ActorSystem.
  def apply(count: Int = 1)(implicit system: ActorSystem): TestLatch
}

Usage Example:

import akka.testkit.TestLatch
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import akka.actor.ActorSystem
import scala.concurrent.blocking
import scala.concurrent.duration._ // provides the `2.seconds` syntax used below

// TestLatch construction and latch.ready both take an implicit ActorSystem
implicit val system: ActorSystem = ActorSystem("TestSystem")

// Create latch that waits for 2 operations
val latch = TestLatch(2)

// Start first operation
Future {
  blocking(Thread.sleep(500)) // tell the pool this task parks its thread
  println("Operation 1 completed")
  latch.countDown()
}

// Start second operation
Future {
  blocking(Thread.sleep(800))
  println("Operation 2 completed")
  latch.countDown()
}

// Wait for both operations to complete
latch.ready(2.seconds)
println("Both operations completed!")

// Check if latch is open
assert(latch.isOpen)
assert(latch.getCount == 0)

// Shut down the ActorSystem created for this example so its threads do not linger
system.terminate()

Advanced Synchronization Patterns

Producer-Consumer with TestLatch:

import akka.testkit.{TestKit, TestLatch}
import akka.actor.{Actor, Props}

/**
 * Actor that simulates producing one item for each "produce" message and then
 * counts the shared latch down to signal completion.
 */
class Producer(latch: TestLatch) extends Actor {
  override def receive: Receive = {
    case "produce" => produceItem()
  }

  // Simulated unit of work followed by the completion signal.
  private def produceItem(): Unit = {
    Thread.sleep(100)
    println("Item produced")
    latch.countDown()
  }
}

/**
 * Actor that, on "consume", blocks on the shared latch until the producer has
 * counted it down, then reports that the item was consumed.
 */
class Consumer(latch: TestLatch) extends Actor {
  // latch.ready() takes an implicit ActorSystem; an actor has none in implicit
  // scope by default, so expose the system this actor is running in.
  private implicit val system: akka.actor.ActorSystem = context.system

  def receive: Receive = {
    case "consume" =>
      latch.ready() // Wait for producer
      println("Item consumed")
  }
}

/**
 * Starts a consumer that waits on a latch and a producer that opens it, then
 * waits on the same latch so the test fails if the producer never signals.
 *
 * NOTE(review): the `should` / `in` syntax comes from a word-spec style test
 * trait that is not mixed in here — confirm the intended base traits.
 */
// ActorSystem is fully qualified because the surrounding imports only bring in Actor and Props.
class ProducerConsumerTest extends TestKit(akka.actor.ActorSystem("TestSystem")) {
  import scala.concurrent.duration._ // for `3.seconds`

  "Producer-Consumer pattern" should {
    "synchronize correctly" in {
      val latch = TestLatch(1)

      val producer = system.actorOf(Props(new Producer(latch)))
      val consumer = system.actorOf(Props(new Consumer(latch)))

      // Start consumer first (it will wait)
      consumer ! "consume"

      // Then producer (which signals completion)
      producer ! "produce"

      // Without this wait the test body would return immediately and pass even
      // if the producer never ran. A timeout here fails the test.
      latch.ready(3.seconds)
      assert(latch.isOpen)
    }
  }
}

Multi-Stage Pipeline with TestBarrier:

import akka.testkit.{TestKit, TestBarrier}
import akka.actor.{Actor, Props}

/**
 * Actor for one pipeline stage: on "process" it simulates work, waits at the
 * shared barrier until every stage has arrived, then replies to the sender.
 *
 * @param stageId identifier used in the log lines and in the reply message
 * @param barrier barrier shared by all stages
 */
class PipelineStage(stageId: Int, barrier: TestBarrier) extends Actor {
  // barrier.await() takes an implicit ActorSystem; an actor has none in implicit
  // scope by default, so expose the system this actor is running in.
  private implicit val system: akka.actor.ActorSystem = context.system

  def receive: Receive = {
    case "process" =>
      println(s"Stage $stageId: Processing...")
      Thread.sleep(scala.util.Random.nextInt(500))
      println(s"Stage $stageId: Done, waiting for others")

      barrier.await() // Wait for all stages

      println(s"Stage $stageId: All stages complete, continuing")
      sender() ! s"Stage $stageId completed"
  }
}

/**
 * Starts three pipeline stages that share one barrier and expects a completion
 * reply from each of them.
 *
 * NOTE(review): the `should` / `in` syntax comes from a word-spec style test
 * trait that is not mixed in here — confirm the intended base traits.
 */
// ActorSystem is fully qualified because the surrounding imports only bring in Actor and Props.
class PipelineTest extends TestKit(akka.actor.ActorSystem("TestSystem")) {
  import scala.concurrent.duration._ // for `3.seconds`

  "Pipeline stages" should {
    "synchronize at barriers" in {
      val barrier = TestBarrier(3)

      val stages = (1 to 3).map { i =>
        system.actorOf(Props(new PipelineStage(i, barrier)))
      }

      // Start all stages. testActor is passed as the explicit sender: with a
      // plain `!` and no implicit sender in scope, the stages' replies would go
      // to dead letters and receiveN below would time out.
      stages.foreach(_.tell("process", testActor))

      // All should complete around the same time
      receiveN(3, 3.seconds)
    }
  }
}

Integration with TestKit

Synchronization utilities work seamlessly with TestKit:

import akka.actor.{Actor, ActorSystem, Props}
import akka.testkit.{TestBarrier, TestKit, TestLatch, TestProbe}
import scala.concurrent.Future
import scala.concurrent.duration._

/**
 * Shows TestLatch and TestBarrier used together with TestKit: a latch that
 * waits for two worker actors, and a barrier that lines up two probes.
 *
 * NOTE(review): the `should` / `in` syntax comes from a word-spec style test
 * trait that is not mixed in here — confirm the intended base traits.
 */
class SynchronizationIntegrationTest extends TestKit(ActorSystem("TestSystem")) {

  // Future { ... } needs an implicit ExecutionContext; use the system's dispatcher.
  import system.dispatcher

  "TestLatch with TestKit" should {
    "coordinate actor testing" in {
      val completionLatch = TestLatch(2)

      class WorkerActor extends Actor {
        def receive: Receive = {
          case "work" =>
            // Simulate work
            Future {
              Thread.sleep(200)
              completionLatch.countDown()
            }
        }
      }

      // WorkerActor is a method-local class that closes over completionLatch, so
      // it has no no-arg constructor for the reflective Props[WorkerActor]() to
      // call. Build it through a creator expression instead.
      val worker1 = system.actorOf(Props(new WorkerActor))
      val worker2 = system.actorOf(Props(new WorkerActor))

      worker1 ! "work"
      worker2 ! "work"

      // Wait for both workers to complete
      completionLatch.ready(1.second)

      // Now continue with test assertions
      assert(completionLatch.isOpen)
    }
  }

  "TestBarrier with TestKit" should {
    "synchronize multiple test probes" in {
      val barrier = TestBarrier(2)
      val probe1 = TestProbe()
      val probe2 = TestProbe()

      // Simulate concurrent operations
      Future {
        probe1.send(testActor, "probe1 ready")
        barrier.await()
        probe1.send(testActor, "probe1 done")
      }

      Future {
        probe2.send(testActor, "probe2 ready")
        barrier.await()
        probe2.send(testActor, "probe2 done")
      }

      // Both should send ready first
      expectMsgAllOf("probe1 ready", "probe2 ready")

      // Then both should send done (after barrier)
      expectMsgAllOf("probe1 done", "probe2 done")
    }
  }
}

Thread Safety and Best Practices

Thread Safety:

  • Both TestBarrier and TestLatch are thread-safe
  • Can be safely shared between actors and test code
  • Keep an implicit ActorSystem in scope: TestLatch construction and the waiting methods (await, ready) take one as an implicit parameter

Best Practices:

// Good: Specify reasonable timeouts
latch.ready(5.seconds)
barrier.await(3.seconds)

// Good: Check state before waiting
if (!latch.isOpen) {
  latch.ready(1.second)
}

// Good: Reset for reuse in multiple tests
override def beforeEach(): Unit = {
  // Call the parent hook first so other stacked before/after traits still run.
  super.beforeEach()
  barrier.reset()
  latch.reset()
}

// Good: Use in try-finally for cleanup
try {
  latch.ready(1.second)
  // test code
} finally {
  // Runs even when the wait or the test code throws, so the next test starts clean.
  latch.reset()
}

Common Patterns:

  1. Wait for Multiple Operations: Use TestLatch with count > 1
  2. Synchronize Phases: Use TestBarrier for multi-step coordination
  3. Signal Completion: Use TestLatch(1) as a simple completion signal
  4. Batch Processing: Use TestBarrier to synchronize batch boundaries
  5. Resource Coordination: Use latches to ensure resources are ready

Error Handling:

// Handle timeout in latch waiting
try {
  latch.ready(1.second)
} catch {
  case e: java.util.concurrent.TimeoutException =>
    // Pass the caught exception along as the cause so the test report keeps
    // the original stack trace.
    fail("Operations did not complete in time", e)
}

// Check barrier state for debugging
if (barrier.isBroken) {
  fail("Barrier was broken by an exception")
}

// Verify expected state
assert(barrier.getNumberWaiting == 0, "Some threads still waiting at barrier")
assert(latch.getCount == 0, s"Latch still has ${latch.getCount} remaining")