or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

core-testing.md, deterministic-execution.md, event-filtering.md, fsm-testing.md, index.md, synchronization.md, synchronous-testing.md, test-utilities.md
tile.json

deterministic-execution.mddocs/

Deterministic Execution Control

Akka TestKit provides specialized dispatchers and schedulers for predictable, deterministic test execution: CallingThreadDispatcher for synchronous message processing on the calling thread, and ExplicitlyTriggeredScheduler for manual control of scheduler time.

Capabilities

CallingThreadDispatcher

Dispatcher that executes all messages on the calling thread for deterministic, single-threaded execution in tests.

/**
 * Dispatcher that runs messages on the thread that sends them instead of
 * handing them to a thread pool.
 *
 * Because no additional threads are created, processing order is
 * deterministic, which is what makes this dispatcher useful in tests.
 *
 * This is an abridged sketch: the body is omitted.
 *
 * @param _configurator the configurator that created this dispatcher;
 *                      required by the MessageDispatcher base class
 */
class CallingThreadDispatcher(_configurator: MessageDispatcherConfigurator)
    extends MessageDispatcher(_configurator) {
  // Implementation executes messages immediately on current thread
}

/**
 * Dispatcher configurator for CallingThreadDispatcher.
 *
 * @param config        dispatcher configuration section
 * @param prerequisites services the dispatcher needs from the actor system
 */
class CallingThreadDispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
    extends MessageDispatcherConfigurator(config, prerequisites) {

  // Create the dispatcher once: every lookup through this configurator must
  // resolve to the same instance rather than a fresh dispatcher per call.
  private val instance = new CallingThreadDispatcher(this)

  /** Returns the single dispatcher instance owned by this configurator. */
  override def dispatcher(): MessageDispatcher = instance
}

/**
 * Mailbox used together with CallingThreadDispatcher.
 *
 * Keeps its message queues thread-local so that messages are processed
 * deterministically on the calling thread.
 *
 * NOTE(review): abridged sketch - constructor parameters and the queue
 * implementation are not shown here; confirm details against the Akka source.
 */
class CallingThreadMailbox extends Mailbox {
  // Thread-local queue implementation (omitted in this sketch)
}

CallingThreadDispatcher Configuration

Configuration constants and settings for CallingThreadDispatcher.

object CallingThreadDispatcher {

  /**
   * Dispatcher id under which CallingThreadDispatcher is looked up.
   *
   * Pass it to `Props.withDispatcher(...)` or reference it from a
   * configuration file to run an actor on this dispatcher.
   */
  val Id: String = "akka.test.calling-thread-dispatcher"
}

ExplicitlyTriggeredScheduler

Manual scheduler for controlled timing in tests, allowing precise control over when scheduled tasks execute.

/**
 * Scheduler whose clock is driven by the test instead of by wall-clock time.
 *
 * Scheduled tasks stay pending until the test calls [[timePasses]]; real time
 * elapsing has no effect on the scheduler's internal clock.
 *
 * This is an abridged sketch: method bodies are omitted.
 *
 * @param config Scheduler configuration
 * @param log Logging adapter
 * @param tf Thread factory (unused in test scheduler)
 */
class ExplicitlyTriggeredScheduler(config: Config, log: LoggingAdapter, tf: ThreadFactory) 
    extends Scheduler {
  
  /**
   * Schedule a recurring task with a fixed interval.
   * The task does not run until scheduler time is advanced past its due time.
   *
   * @param initialDelay Delay before the first execution
   * @param interval Interval between subsequent executions
   * @param runnable Task to execute
   * @param executor Execution context for the task
   * @return Cancellable handle for the scheduled task
   */
  def schedule(initialDelay: FiniteDuration, interval: FiniteDuration, runnable: Runnable)
              (implicit executor: ExecutionContext): Cancellable
  
  /**
   * Schedule a task to run exactly once.
   * The task does not run until scheduler time is advanced past its due time.
   *
   * @param delay Delay before execution
   * @param runnable Task to execute
   * @param executor Execution context for the task
   * @return Cancellable handle for the scheduled task
   */
  def scheduleOnce(delay: FiniteDuration, runnable: Runnable)
                  (implicit executor: ExecutionContext): Cancellable
  
  /**
   * Manually advance scheduler time by `amount`.
   * Triggers every task whose due time falls inside the advanced period.
   *
   * @param amount Duration to advance the scheduler clock by
   */
  def timePasses(amount: FiniteDuration): Unit
  
  /**
   * Current internal scheduler time.
   *
   * @return Scheduler time in milliseconds; it changes only through
   *         [[timePasses]], never because real time elapsed
   */
  def currentTimeMs: Long
  
  /**
   * Maximum frequency for recurring tasks.
   *
   * NOTE(review): the concrete value is not shown in this sketch - confirm
   * against the Akka source before relying on it.
   *
   * @return Maximum supported frequency
   */
  def maxFrequency: Double
}

Usage Examples:

import akka.actor.{Actor, ActorSystem, Props}
import akka.testkit.{TestKit, TestActorRef, CallingThreadDispatcher}
import scala.concurrent.duration._

/**
 * Example spec for CallingThreadDispatcher and ExplicitlyTriggeredScheduler.
 *
 * The ActorSystem is created with ExplicitlyTriggeredScheduler as its scheduler
 * so the scheduler tests can drive time by hand. ImplicitSender is mixed in so
 * that replies to `actor ! msg` are delivered to the TestKit's testActor and
 * can be checked with `expectMsg`.
 *
 * NOTE(review): the `should` / `in` syntax presumes a ScalaTest WordSpec-style
 * trait is mixed in wherever this example is compiled - confirm.
 */
class DeterministicExecutionExample
    extends TestKit(
      ActorSystem(
        "test",
        com.typesafe.config.ConfigFactory.parseString(
          "akka.scheduler.implementation = akka.testkit.ExplicitlyTriggeredScheduler")))
    with akka.testkit.ImplicitSender {

  import akka.testkit.ExplicitlyTriggeredScheduler

  /** Echoes messages back to the sender and can arm a one-second timer on request. */
  class TimingActor extends Actor {
    def receive = {
      case "schedule" =>
        // Capture the requester and pass it as the explicit sender of the timer
        // message. Without this, the implicit `self` becomes the sender and the
        // "timer-fired" reply below is delivered back to this actor instead of
        // to whoever asked for the timer.
        val replyTo = sender()
        context.system.scheduler.scheduleOnce(1.second, self, "scheduled")(context.dispatcher, replyTo)
      case "scheduled" =>
        sender() ! "timer-fired"
      case msg =>
        sender() ! s"received-$msg"
    }
  }

  /** The system scheduler as ExplicitlyTriggeredScheduler, failing clearly if misconfigured. */
  private def manualScheduler: ExplicitlyTriggeredScheduler =
    system.scheduler match {
      case s: ExplicitlyTriggeredScheduler => s
      case other =>
        throw new IllegalStateException(
          s"expected ExplicitlyTriggeredScheduler but the system uses ${other.getClass.getName}")
    }

  "CallingThreadDispatcher" should {
    "execute messages synchronously" in {
      // TimingActor is an inner class, so it must be created through a factory
      // (`Props(new ...)`); reflective `Props[TimingActor]` cannot supply the
      // enclosing instance.
      val props = Props(new TimingActor).withDispatcher(CallingThreadDispatcher.Id)
      val actor = system.actorOf(props)

      // Each message is processed on this thread before `!` returns
      actor ! "test1"
      expectMsg("received-test1")

      actor ! "test2"
      expectMsg("received-test2")
    }

    "work with TestActorRef for complete synchronous testing" in {
      // TestActorRef automatically uses CallingThreadDispatcher
      val actor = TestActorRef(new TimingActor)

      // Direct synchronous message injection; the reply is not observed here
      actor.receive("direct")

      // Normal sending also works and the reply reaches testActor
      actor ! "normal"
      expectMsg("received-normal")
    }
  }

  "ExplicitlyTriggeredScheduler" should {
    "provide manual time control" in {
      val scheduler = manualScheduler
      val actor = system.actorOf(Props(new TimingActor))

      // Arm the timer; scheduler time does not advance on its own
      actor ! "schedule"

      // No message yet because time hasn't advanced
      expectNoMessage(100.millis)

      // Manually advance time to trigger the scheduled task
      scheduler.timePasses(1.second)

      // Now the scheduled message should arrive
      expectMsg("timer-fired")
    }

    "control complex timing scenarios" in {
      val scheduler = manualScheduler
      import system.dispatcher

      // Newest entry first, because each task prepends its label
      var results = List.empty[String]

      // Schedule multiple tasks at different times
      scheduler.scheduleOnce(500.millis, () => results = "500ms" :: results)
      scheduler.scheduleOnce(1.second, () => results = "1s" :: results)
      scheduler.scheduleOnce(1.5.seconds, () => results = "1.5s" :: results)

      // Advance time incrementally
      scheduler.timePasses(600.millis)
      assert(results == List("500ms"))

      scheduler.timePasses(500.millis) // Total: 1.1 seconds
      assert(results == List("1s", "500ms"))

      scheduler.timePasses(500.millis) // Total: 1.6 seconds
      assert(results == List("1.5s", "1s", "500ms"))
    }
  }
}

Configuration for Deterministic Testing

How to configure ActorSystem for deterministic testing with custom dispatchers and schedulers.

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

/**
 * Shows how to build an ActorSystem whose dispatcher and scheduler are the
 * deterministic TestKit implementations.
 */
class ConfigurationExample {

  /** Configuration for deterministic testing. */
  val testConfig = ConfigFactory.parseString("""
    akka {
      # Use CallingThreadDispatcher as default for deterministic execution.
      # `default-dispatcher` is a configuration section, so switch its `type`
      # rather than replacing the whole section with a dispatcher id string.
      actor.default-dispatcher.type = "akka.testkit.CallingThreadDispatcherConfigurator"
      
      # Use ExplicitlyTriggeredScheduler for manual time control
      scheduler.implementation = "akka.testkit.ExplicitlyTriggeredScheduler"
      
      # No mailbox override is needed: CallingThreadDispatcher creates its own
      # mailbox, and CallingThreadMailbox is not a valid `mailbox-type` value.
      
      test {
        # Test-specific timing settings
        timefactor = 1.0
        filter-leeway = 3s
        single-expect-default = 3s
        default-timeout = 5s
      }
    }
  """)

  /**
   * Create a test ActorSystem with the deterministic configuration.
   *
   * @param name name of the actor system
   * @return a new ActorSystem built from `testConfig`
   */
  def createTestSystem(name: String = "test"): ActorSystem =
    ActorSystem(name, testConfig)
}

Advanced Deterministic Patterns

Complex scenarios combining multiple deterministic execution techniques.

/**
 * Example spec that drives a chain of nested timers step by step.
 *
 * The ActorSystem uses ExplicitlyTriggeredScheduler so the test controls time,
 * and ImplicitSender is mixed in so the actor's replies reach testActor.
 *
 * NOTE(review): the `should` / `in` syntax presumes a ScalaTest WordSpec-style
 * trait is mixed in wherever this example is compiled - confirm.
 */
class AdvancedDeterministicExample
    extends TestKit(
      ActorSystem(
        "test",
        com.typesafe.config.ConfigFactory.parseString(
          "akka.scheduler.implementation = akka.testkit.ExplicitlyTriggeredScheduler")))
    with akka.testkit.ImplicitSender {

  import akka.testkit.ExplicitlyTriggeredScheduler

  /** Runs a small timer-driven workflow and reports each completed step to the requester. */
  class ComplexTimingActor extends Actor {

    /**
     * Send `step` to this actor after `delay`, keeping `replyTo` as the sender.
     *
     * The sender is passed explicitly: with the default implicit sender the
     * timer message would appear to come from `self`, and the "completed-..."
     * replies would never reach the party that started the workflow.
     */
    private def scheduleStep(
        delay: scala.concurrent.duration.FiniteDuration,
        step: String,
        replyTo: akka.actor.ActorRef): Unit =
      context.system.scheduler.scheduleOnce(delay, self, step)(context.dispatcher, replyTo)

    def receive = {
      case "start" =>
        // Schedule multiple operations with different timing
        val replyTo = sender()
        scheduleStep(100.millis, "step1", replyTo)
        scheduleStep(500.millis, "step2", replyTo)
        scheduleStep(1.second, "step3", replyTo)

      case "step1" =>
        sender() ! "completed-step1"
        // sender() is the original requester here, so keep forwarding it
        scheduleStep(200.millis, "substep1", sender())

      case "substep1" =>
        sender() ! "completed-substep1"

      case "step2" =>
        sender() ! "completed-step2"

      case "step3" =>
        sender() ! "completed-step3"
    }
  }

  "Complex deterministic scenario" should {
    "handle nested timing with complete control" in {
      val scheduler = system.scheduler match {
        case s: ExplicitlyTriggeredScheduler => s
        case other =>
          throw new IllegalStateException(
            s"expected ExplicitlyTriggeredScheduler but the system uses ${other.getClass.getName}")
      }
      // Inner-class actors must be created through a factory, not reflectively
      val actor = TestActorRef(new ComplexTimingActor)

      // Start the complex timing scenario
      actor ! "start"

      // Manually control each timing step
      scheduler.timePasses(100.millis) // step1 fires (total 100ms)
      expectMsg("completed-step1")

      scheduler.timePasses(200.millis) // substep1 fires (total 300ms)
      expectMsg("completed-substep1")

      scheduler.timePasses(200.millis) // step2 fires (total 500ms)
      expectMsg("completed-step2")

      scheduler.timePasses(500.millis) // step3 fires (total 1s)
      expectMsg("completed-step3")
    }
  }
}

Thread Safety and Determinism Guarantees

Understanding the thread safety and determinism guarantees provided by these utilities.

/**
 * Example spec demonstrating the ordering and timing guarantees of
 * CallingThreadDispatcher (via TestActorRef) and ExplicitlyTriggeredScheduler.
 *
 * The ActorSystem is configured with ExplicitlyTriggeredScheduler so the
 * scheduler tests can drive time by hand.
 *
 * NOTE(review): the `should` / `in` syntax presumes a ScalaTest WordSpec-style
 * trait is mixed in wherever this example is compiled - confirm.
 */
class DeterminismGuaranteesExample
    extends TestKit(
      ActorSystem(
        "test",
        com.typesafe.config.ConfigFactory.parseString(
          "akka.scheduler.implementation = akka.testkit.ExplicitlyTriggeredScheduler"))) {

  import akka.testkit.ExplicitlyTriggeredScheduler

  /** Remembers every message it has processed, oldest first. */
  class RecordingActor extends Actor {
    var processed = Vector.empty[Any]

    def receive = {
      case msg => processed :+= msg
    }
  }

  "CallingThreadDispatcher guarantees" should {
    "provide single-threaded execution" in {
      val actor = TestActorRef(new RecordingActor)

      // Each receive call returns only after the actor has handled the message,
      // so the actor's own record can be checked immediately after every send.
      actor.receive("msg1")
      assert(actor.underlyingActor.processed == Vector("msg1"))

      actor.receive("msg2")
      assert(actor.underlyingActor.processed == Vector("msg1", "msg2"))

      actor.receive("msg3")
      assert(actor.underlyingActor.processed == Vector("msg1", "msg2", "msg3"))
    }

    "eliminate race conditions" in {
      val actor = TestActorRef(new RecordingActor)
      val messages = (1 to 100).map(i => s"message-$i")

      // Each message is completely processed before the next one starts
      messages.foreach(m => actor.receive(m))

      assert(actor.underlyingActor.processed == messages.toVector)
    }
  }

  "ExplicitlyTriggeredScheduler guarantees" should {
    "provide predictable timing" in {
      val scheduler = system.scheduler match {
        case s: ExplicitlyTriggeredScheduler => s
        case other =>
          throw new IllegalStateException(
            s"expected ExplicitlyTriggeredScheduler but the system uses ${other.getClass.getName}")
      }
      // scheduleOnce needs an implicit ExecutionContext
      import system.dispatcher

      // Newest entry first, because each task prepends its timestamp
      var executionTimes = List.empty[Long]

      scheduler.scheduleOnce(1.second, () =>
        executionTimes = scheduler.currentTimeMs :: executionTimes)
      scheduler.scheduleOnce(2.seconds, () =>
        executionTimes = scheduler.currentTimeMs :: executionTimes)

      // Time doesn't advance until explicitly triggered
      val startTime = scheduler.currentTimeMs
      Thread.sleep(100) // Real time passes but scheduler time doesn't
      assert(scheduler.currentTimeMs == startTime)

      // Manual time advancement triggers tasks at exact expected times
      scheduler.timePasses(1.second)
      scheduler.timePasses(1.second)

      // Tasks executed at precisely controlled times
      assert(executionTimes.reverse == List(1000L, 2000L))
    }
  }
}

Integration with TestKit Features

How deterministic execution integrates with other TestKit features for comprehensive testing.

/**
 * Example spec combining deterministic execution with other TestKit features
 * (message expectations, `within` blocks, TestProbe).
 *
 * The ActorSystem uses ExplicitlyTriggeredScheduler so the test controls time,
 * and ImplicitSender is mixed in so replies to `actor ! msg` reach testActor.
 *
 * NOTE(review): the `should` / `in` syntax presumes a ScalaTest WordSpec-style
 * trait is mixed in wherever this example is compiled - confirm.
 */
class DeterministicIntegrationExample
    extends TestKit(
      ActorSystem(
        "test",
        com.typesafe.config.ConfigFactory.parseString(
          "akka.scheduler.implementation = akka.testkit.ExplicitlyTriggeredScheduler")))
    with akka.testkit.ImplicitSender {

  import akka.testkit.{ExplicitlyTriggeredScheduler, TestProbe}

  /** Echoes messages back to the sender and can arm a one-second timer on request. */
  class TimingActor extends Actor {
    def receive = {
      case "schedule" =>
        // Pass the requester as the explicit sender of the timer message so the
        // "timer-fired" reply goes to the requester rather than back to `self`.
        val replyTo = sender()
        context.system.scheduler.scheduleOnce(1.second, self, "scheduled")(context.dispatcher, replyTo)
      case "scheduled" =>
        sender() ! "timer-fired"
      case msg =>
        sender() ! s"received-$msg"
    }
  }

  "Deterministic execution with TestKit features" should {
    "work with message expectations" in {
      val actor = TestActorRef(new TimingActor)

      // Synchronous execution makes message expectations predictable
      actor ! "test"
      expectMsg("received-test") // Message already processed synchronously
    }

    "work with timing assertions" in {
      val scheduler = system.scheduler match {
        case s: ExplicitlyTriggeredScheduler => s
        case other =>
          throw new IllegalStateException(
            s"expected ExplicitlyTriggeredScheduler but the system uses ${other.getClass.getName}")
      }
      // TestActorRef processes "schedule" before `!` returns, so the timer is
      // guaranteed to be armed by the time the clock is advanced.
      val actor = TestActorRef(new TimingActor)

      // `within` measures wall-clock time, not scheduler time, so the bound
      // must be a real (non-zero) duration even though no real waiting for the
      // one-second timer takes place.
      within(3.seconds) {
        actor ! "schedule"
        scheduler.timePasses(1.second)
        expectMsg("timer-fired")
      }
    }

    "work with TestProbe" in {
      val probe = TestProbe()
      val actor = TestActorRef(new TimingActor)

      // Deterministic interaction between actors
      probe.send(actor, "test")
      probe.expectMsg("received-test")
    }
  }
}