or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

core-testing.md deterministic-execution.md event-filtering.md fsm-testing.md index.md synchronization.md synchronous-testing.md test-utilities.md
tile.json

docs/synchronization.md

Synchronization Utilities

Akka TestKit provides synchronization utilities including TestLatch for countdown coordination and TestBarrier for cyclic synchronization, enabling precise coordination of concurrent test scenarios and actor interactions.

Capabilities

TestLatch

Count-down latch for test synchronization with timeout support, useful for coordinating multiple threads or waiting for specific conditions.

/**
 * Count-down latch for test synchronization with timeouts.
 *
 * The latch is an `Awaitable`: its blocking methods (`ready`, `result`) take an implicit
 * `CanAwait`, which only `scala.concurrent.Await` can supply. Do not call them directly;
 * wait for the latch with `Await.ready(latch, atMost)`.
 *
 * @param count Initial count value (defaults to 1)
 * @param system Implicit ActorSystem for timeout configuration
 */
class TestLatch(count: Int = 1)(implicit system: ActorSystem)
    extends scala.concurrent.Awaitable[Unit] {

  /**
   * Decrement the latch counter.
   * When the counter reaches zero, the latch opens and waiting threads are released.
   */
  def countDown(): Unit

  /**
   * Check if the latch is open (counter has reached zero).
   * @return True if the latch is open, false otherwise
   */
  def isOpen: Boolean

  /**
   * Force the latch open regardless of the current counter value.
   * Releases all waiting threads immediately.
   */
  def open(): Unit

  /**
   * Reset the latch to its original count value.
   * Allows reuse of the same latch instance.
   */
  def reset(): Unit

  /**
   * Wait for the latch to open with a timeout.
   * Invoke through `Await.ready(latch, atMost)`, which provides the `CanAwait` permit.
   * @param atMost Maximum time to wait for the latch to open
   * @param permit Implicit CanAwait permission for the blocking operation
   * @return This TestLatch instance for method chaining
   * @throws TimeoutException if the latch doesn't open within the timeout
   */
  def ready(atMost: Duration)(implicit permit: CanAwait): this.type

  /**
   * Wait for the latch to open with a timeout, discarding the latch reference.
   * Invoke through `Await.result(latch, atMost)`.
   * @param atMost Maximum time to wait for the latch to open
   * @param permit Implicit CanAwait permission for the blocking operation
   * @throws TimeoutException if the latch doesn't open within the timeout
   */
  def result(atMost: Duration)(implicit permit: CanAwait): Unit
}

TestLatch Factory

Factory methods and constants for creating TestLatch instances.

// Companion object: factory method and shared constant for TestLatch.
object TestLatch {
  /**
   * Create a TestLatch with the specified count.
   * Lets callers write `TestLatch(3)` or `TestLatch()` instead of `new TestLatch(...)`.
   * @param count Initial count value (defaults to 1)
   * @param system Implicit ActorSystem
   * @return New TestLatch instance
   */
  def apply(count: Int = 1)(implicit system: ActorSystem): TestLatch
  
  /**
   * Default timeout for TestLatch operations (5 seconds).
   * NOTE(review): the waiting method shown for TestLatch takes an explicit duration,
   * so callers presumably pass this constant themselves when they want it — confirm.
   */
  val DefaultTimeout: Duration = 5.seconds
}

TestBarrier

Cyclic barrier for coordinating multiple test threads, where all threads must reach the barrier before any can proceed.

/**
 * Cyclic barrier for coordinating multiple test threads.
 * All `count` threads must reach the barrier before any of them can proceed.
 * NOTE(review): if this is backed by java.util.concurrent.CyclicBarrier, a timeout in one
 * waiting thread presumably breaks the barrier for the other waiters as well — confirm.
 * @param count Number of threads that must reach the barrier
 */
class TestBarrier(count: Int) {
  
  /**
   * Wait at the barrier using the default timeout.
   * Blocks until all threads reach the barrier.
   * @param system Implicit ActorSystem for timeout configuration
   * @throws TestBarrierTimeoutException if the wait times out
   */
  def await()(implicit system: ActorSystem): Unit
  
  /**
   * Wait at the barrier with the specified timeout.
   * Blocks until all threads reach the barrier or the timeout elapses.
   * @param timeout Maximum time to wait at the barrier
   * @param system Implicit ActorSystem
   * @throws TestBarrierTimeoutException if the wait times out
   */
  def await(timeout: FiniteDuration)(implicit system: ActorSystem): Unit
  
  /**
   * Reset the barrier for reuse.
   * Allows the same barrier to coordinate multiple rounds of synchronization.
   */
  def reset(): Unit
}

TestBarrier Exception

Exception thrown when a barrier operation times out.

/**
 * Exception thrown when a TestBarrier operation times out.
 *
 * This is a plain class, not a case class: construct it with `new` and match on it
 * by type (`case _: TestBarrierTimeoutException`), not with an extractor pattern.
 *
 * @param message Descriptive error message
 */
class TestBarrierTimeoutException(message: String) extends RuntimeException(message)

Usage Examples:

import akka.actor.{Actor, ActorSystem, Props}
import akka.testkit.{TestBarrier, TestBarrierTimeoutException, TestKit, TestLatch}
import scala.concurrent.{Await, ExecutionContext, Future, TimeoutException}
import scala.concurrent.duration._

/**
 * Example spec exercising TestLatch and TestBarrier from plain futures.
 *
 * NOTE: the `should` / `in` / `intercept` DSL used below comes from a ScalaTest
 * word-spec style trait, which this snippet does not show being mixed in.
 */
class SynchronizationExample extends TestKit(ActorSystem("test")) {
  // Futures in the examples run on the actor system's dispatcher
  implicit val ec: ExecutionContext = system.dispatcher

  "TestLatch" should {
    "coordinate single countdown" in {
      val latch = TestLatch()

      // Start background task
      Future {
        Thread.sleep(100)
        latch.countDown()
      }

      // The latch's ready method needs a CanAwait permit, which only Await can supply
      Await.ready(latch, 1.second)
      assert(latch.isOpen)
    }

    "coordinate multiple countdowns" in {
      val latch = TestLatch(3)

      // Start multiple background tasks
      (1 to 3).foreach { i =>
        Future {
          Thread.sleep(i * 50)
          latch.countDown()
        }
      }

      // Wait for all tasks to complete
      Await.ready(latch, 1.second)
      assert(latch.isOpen)
    }

    "support reset and reuse" in {
      val latch = TestLatch(2)

      // First use
      Future { latch.countDown() }
      Future { latch.countDown() }
      Await.ready(latch, 1.second)
      assert(latch.isOpen)

      // Reset and reuse
      latch.reset()
      assert(!latch.isOpen)

      Future { latch.countDown() }
      Future { latch.countDown() }
      Await.ready(latch, 1.second)
      assert(latch.isOpen)
    }

    "support manual opening" in {
      val latch = TestLatch(10)

      // Force open without waiting for all countdowns
      latch.open()
      assert(latch.isOpen)

      // Returns immediately because the latch is already open
      Await.ready(latch, 1.second)
    }
  }

  "TestBarrier" should {
    "coordinate multiple threads" in {
      val barrier = TestBarrier(3)

      // Each task returns its id after passing the barrier; results are gathered from the
      // futures themselves rather than written into a shared mutable buffer, which would
      // not be safe to update from several threads.
      val futures = (1 to 3).map { i =>
        Future {
          // Do some work
          Thread.sleep(i * 30)

          // Wait at barrier
          barrier.await(1.second)

          // Continue after all threads reach barrier
          i
        }
      }

      // Await.result (not Await.ready) so that a barrier timeout in any task fails the test
      val results = Await.result(Future.sequence(futures), 2.seconds)

      // All threads should have produced their results
      assert(results.size == 3)
      assert(results.contains(1))
      assert(results.contains(2))
      assert(results.contains(3))
    }

    "support barrier reset and reuse" in {
      val barrier = TestBarrier(2)

      // First round of synchronization
      val round1 = (1 to 2).map { i =>
        Future {
          barrier.await(1.second)
          i
        }
      }

      // Await.result surfaces a failed round instead of silently ignoring it
      assert(Await.result(Future.sequence(round1), 2.seconds) == Seq(1, 2))

      // Reset barrier for second round
      barrier.reset()

      // Second round of synchronization
      val round2 = (1 to 2).map { i =>
        Future {
          barrier.await(1.second)
          i + 10
        }
      }

      assert(Await.result(Future.sequence(round2), 2.seconds) == Seq(11, 12))
    }

    "timeout when not all threads arrive" in {
      val barrier = TestBarrier(3)

      // A second party arrives in the background. Its timeout is much longer than the one
      // used below, so the intercepted wait is always the first to time out. Whatever
      // happens to this helper afterwards is deliberately ignored.
      Future { barrier.await(5.seconds) }

      // Only 2 of the 3 required parties ever arrive, so this wait must time out.
      // The exception has to be thrown inside the intercept block to be observed.
      intercept[TestBarrierTimeoutException] {
        barrier.await(500.millis)
      }
    }
  }
}

Actor Synchronization Patterns

Using synchronization utilities with actors for coordinated testing scenarios.

/**
 * Example spec coordinating actors with TestLatch and TestBarrier.
 *
 * NOTE: the `should` / `in` DSL used below comes from a ScalaTest word-spec style trait,
 * which this snippet does not show being mixed in.
 */
class ActorSynchronizationExample extends TestKit(ActorSystem("test")) {

  /** Replies "done" to the sender of "work" and then counts the shared latch down. */
  class WorkerActor(latch: TestLatch) extends Actor {
    def receive = {
      case "work" =>
        // Simulate work
        Thread.sleep(100)
        sender() ! "done"
        latch.countDown()
    }
  }

  "Actor coordination with TestLatch" should {
    "wait for multiple actors to complete work" in {
      val latch = TestLatch(3)
      val workers = (1 to 3).map { i =>
        system.actorOf(Props(new WorkerActor(latch)), s"worker-$i")
      }

      // Send work to all workers; completion is observed through the latch, not the replies
      workers.foreach(_ ! "work")

      // The latch's ready method needs a CanAwait permit, which only Await can supply
      Await.ready(latch, 2.seconds)

      // All workers have completed their work
      assert(latch.isOpen)
    }
  }

  /**
   * Waits at the shared barrier on "sync" and tells the sender whether the wait
   * succeeded ("synchronized-id") or timed out ("timeout-id").
   */
  class BarrierActor(barrier: TestBarrier, id: Int) extends Actor {
    def receive = {
      case "sync" =>
        try {
          barrier.await(1.second)
          sender() ! s"synchronized-$id"
        } catch {
          case _: TestBarrierTimeoutException =>
            sender() ! s"timeout-$id"
        }
    }
  }

  "Actor coordination with TestBarrier" should {
    "synchronize multiple actors" in {
      val barrier = TestBarrier(2)
      val actor1 = system.actorOf(Props(new BarrierActor(barrier, 1)))
      val actor2 = system.actorOf(Props(new BarrierActor(barrier, 2)))

      // Name testActor as the sender explicitly: this class has no implicit sender in
      // scope, so a plain `!` would route the actors' replies to dead letters.
      actor1.tell("sync", testActor)
      actor2.tell("sync", testActor)

      // Both distinct replies must arrive, in any order
      expectMsgAllOf(2.seconds, "synchronized-1", "synchronized-2")
    }
  }
}

Integration with TestKit Timing

Combining synchronization utilities with TestKit timing controls for precise test coordination.

/**
 * Example spec combining TestKit's `within` timing block with TestBarrier and TestLatch.
 *
 * NOTE: the `should` / `in` DSL used below comes from a ScalaTest word-spec style trait,
 * which this snippet does not show being mixed in.
 */
class TimingSynchronizationExample extends TestKit(ActorSystem("test")) {
  // Future { ... } needs an execution context; use the actor system's dispatcher
  implicit val ec: ExecutionContext = system.dispatcher

  "Combined timing and synchronization" should {
    "coordinate complex scenarios" in {
      val latch = TestLatch(2)
      val barrier = TestBarrier(2)

      within(3.seconds) {
        // Start coordinated operations
        Future {
          barrier.await(1.second)  // Synchronize start
          Thread.sleep(500)        // Simulate work
          latch.countDown()        // Signal completion
        }

        Future {
          barrier.await(1.second)  // Synchronize start
          Thread.sleep(500)        // Simulate work
          latch.countDown()        // Signal completion
        }

        // Wait for both operations to complete; the latch's ready method needs a
        // CanAwait permit, which only Await can supply
        Await.ready(latch, 2.seconds)
      }

      assert(latch.isOpen)
    }
  }
}

Thread Safety and Best Practices

Guidelines for safe usage of synchronization utilities in concurrent test scenarios.

// These are illustrative fragments, not a runnable test: each one assumes that other
// threads count the latch down or arrive at the barrier.

// SAFE: Proper timeout handling
val latch = TestLatch(3)
try {
  // The latch's ready method needs a CanAwait permit, which only Await can supply
  Await.ready(latch, 5.seconds)
  // Continue with test
} catch {
  case _: TimeoutException =>
    // Handle timeout appropriately
    fail("Latch did not open within timeout")
}

// SAFE: Proper barrier usage
val barrier = TestBarrier(2)
try {
  barrier.await(2.seconds)
  // All threads synchronized
} catch {
  case _: TestBarrierTimeoutException =>
    // Handle barrier timeout
    fail("Not all threads reached barrier")
}

// BEST PRACTICE: Reset for reuse
val reusableLatch = TestLatch(2)
// Use latch
Await.ready(reusableLatch, 1.second)
// Reset for next test
reusableLatch.reset()

// BEST PRACTICE: Appropriate timeouts
// Use timeouts longer than expected operation time.
// `expectedTime` stands for a duration estimate supplied by the test author
// (for example 500.millis); it is not defined in this fragment.
val conservativeLatch = TestLatch()
Await.ready(conservativeLatch, expectedTime * 2)