or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

actor-references.md core-testing.md deterministic-execution.md event-filtering.md index.md java-api.md test-utilities.md utilities-config.md
tile.json

deterministic-execution.mddocs/

Deterministic Execution Control

Akka TestKit provides specialized dispatchers and schedulers that enable deterministic, predictable test execution. These components eliminate timing-related test flakiness by providing complete control over message processing and scheduling.

CallingThreadDispatcher

CallingThreadDispatcher Class { .api }

class CallingThreadDispatcher(_configurator: MessageDispatcherConfigurator) extends MessageDispatcher

A dispatcher that executes all tasks on the calling thread instead of using a thread pool. This provides deterministic, synchronous execution for testing.

CallingThreadDispatcher Object { .api }

object CallingThreadDispatcher {
  val Id: String = "akka.test.calling-thread-dispatcher"
}

CallingThreadDispatcherConfigurator { .api }

class CallingThreadDispatcherConfigurator(
  config: Config, 
  prerequisites: DispatcherPrerequisites
) extends MessageDispatcherConfigurator

CallingThreadMailbox { .api }

class CallingThreadMailbox(
  _receiver: akka.actor.Cell, 
  mailboxType: MailboxType
) extends Mailbox

Configuration

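The akka-testkit module already registers CallingThreadDispatcher under the id akka.test.calling-thread-dispatcher in its bundled reference configuration, so CallingThreadDispatcher.Id can be used without extra setup. The equivalent explicit test configuration is: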

akka.test.calling-thread-dispatcher {
  type = akka.testkit.CallingThreadDispatcherConfigurator
}

Usage Examples

Basic CallingThreadDispatcher Usage

import akka.testkit.CallingThreadDispatcher

/** Minimal actor that answers every "work" message with "done". */
class TestActor extends Actor {
  override def receive: Receive = {
    // The reply is sent from whichever thread runs this actor; under
    // CallingThreadDispatcher that is the thread that sent "work".
    case "work" => sender() ! "done"
  }
}

// Create actor with CallingThreadDispatcher
val actor = system.actorOf(
  Props[TestActor].withDispatcher(CallingThreadDispatcher.Id),
  "test-actor"
)

// Message processing happens synchronously
actor ! "work"
expectMsg("done")  // Response is immediate and deterministic

Testing with Multiple Actors

/**
 * Actor holding a private running total.
 *
 * - "increment": adds one and replies with the new total.
 * - "get": replies with the current total without changing it.
 */
class CounterActor extends Actor {
  private var total = 0

  override def receive: Receive = {
    case "increment" =>
      total = total + 1
      sender() ! total

    case "get" =>
      sender() ! total
  }
}

// Create multiple actors with CallingThreadDispatcher
val actors = (1 to 3).map { i =>
  system.actorOf(
    Props[CounterActor].withDispatcher(CallingThreadDispatcher.Id),
    s"counter-$i"
  )
}

// All processing is deterministic and ordered
actors(0) ! "increment"
expectMsg(1)

actors(1) ! "increment" 
expectMsg(1)

actors(0) ! "increment"
expectMsg(2)

// Results are predictable because execution is synchronous
actors(0) ! "get"
expectMsg(2)

actors(1) ! "get"  
expectMsg(1)

Testing Actor Interactions

/**
 * Fans "work" out to registered workers and tells its parent when all of them
 * have answered.
 *
 * - "add-worker": registers the sender of the message as a worker.
 * - "broadcast": sends "work" to every registered worker.
 * - "work-done": counts one answer; once the count equals the number of
 *   workers, sends "all-done" to `context.parent` and resets the count.
 */
class MasterActor extends Actor {
  var workers = List.empty[ActorRef]
  var responses = 0

  def receive = {
    case "add-worker" =>
      // Newest worker goes to the front of the list.
      workers ::= sender()

    case "broadcast" =>
      for (worker <- workers) worker ! "work"

    case "work-done" =>
      responses += 1
      val everyoneAnswered = responses == workers.length
      if (everyoneAnswered) {
        context.parent ! "all-done"
        responses = 0
      }
  }
}

/** Worker that acknowledges each "work" request by replying "work-done" to its sender. */
class WorkerActor extends Actor {
  override def receive: Receive = {
    case "work" => sender() ! "work-done"
  }
}

// Create actors with deterministic dispatcher.
// The master is spawned as a child of the test actor: MasterActor reports
// "all-done" to context.parent, so the parent has to be testActor for the
// expectMsg below to observe it. (An actor created with system.actorOf has
// the user guardian as its parent.)
val master = childActorOf(
  Props[MasterActor].withDispatcher(CallingThreadDispatcher.Id),
  "master"
)

val workers = (1 to 3).map { i =>
  system.actorOf(
    Props[WorkerActor].withDispatcher(CallingThreadDispatcher.Id),
    s"worker-$i"
  )
}

// Register workers: each worker is passed as the explicit sender, which is
// what MasterActor records when it handles "add-worker".
workers.foreach { worker =>
  master.tell("add-worker", worker)
}

// Broadcast work - all processing is synchronous and predictable
master ! "broadcast"
expectMsg("all-done")  // Deterministic completion

Configuration-based Usage

// In application.conf for tests
akka {
  actor {
    default-dispatcher {
      type = akka.testkit.CallingThreadDispatcherConfigurator
    }
  }
}

// All actors will use CallingThreadDispatcher by default
val actor = system.actorOf(Props[MyActor])
// Deterministic execution without explicit dispatcher configuration

ExplicitlyTriggeredScheduler

ExplicitlyTriggeredScheduler Class { .api }

class ExplicitlyTriggeredScheduler(
  config: Config,
  log: LoggingAdapter, 
  tf: ThreadFactory
) extends Scheduler {
  
  def timePasses(amount: FiniteDuration): Unit
  def schedule(
    initialDelay: FiniteDuration,
    interval: FiniteDuration, 
    runnable: Runnable
  )(implicit executor: ExecutionContext): Cancellable
  def scheduleOnce(
    delay: FiniteDuration,
    runnable: Runnable
  )(implicit executor: ExecutionContext): Cancellable
  def maxFrequency: Double
}

A scheduler that doesn't automatically progress time - time must be manually advanced using timePasses(). This allows complete control over timing in tests.

Configuration

Configure ExplicitlyTriggeredScheduler in test configuration:

akka {
  scheduler {
    implementation = akka.testkit.ExplicitlyTriggeredScheduler
  }
}

Usage Examples

Basic Scheduler Control

import akka.testkit.ExplicitlyTriggeredScheduler
import scala.concurrent.duration._

// Get scheduler from system (must be configured as ExplicitlyTriggeredScheduler)
val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]

/**
 * Actor that arms a 5-second timer when it starts and reports the timeout to
 * whoever last sent it "start".
 *
 * - "start": remembers the sender and replies "started".
 * - "timeout": (scheduled to self) sends "timed-out" to the remembered sender,
 *   if there is one.
 */
class TimedActor extends Actor {
  import context.dispatcher

  // The scheduled "timeout" is not sent by the test, so sender() inside that
  // handler does not point at the test actor. Keep the requester explicitly.
  private var requester: Option[ActorRef] = None

  override def preStart(): Unit = {
    // Schedule a message to self in 5 seconds
    context.system.scheduler.scheduleOnce(5.seconds, self, "timeout")
  }

  def receive = {
    case "start" =>
      requester = Some(sender())
      sender() ! "started"

    case "timeout" =>
      // Nothing to notify if the timer fires before anyone sent "start".
      requester.foreach(_ ! "timed-out")
  }
}

val actor = system.actorOf(Props[TimedActor])
actor ! "start"
expectMsg("started")

// No timeout message yet - time hasn't passed
expectNoMessage(100.millis)

// Manually advance time by 5 seconds
scheduler.timePasses(5.seconds)

// Now timeout message is delivered
expectMsg("timed-out")

Testing Periodic Scheduling

/**
 * Sends "heartbeat" to its parent every 2 seconds, starting 2 seconds after
 * the actor starts. Handles no messages itself.
 */
class HeartbeatActor extends Actor {
  import context.dispatcher

  // Handle of the running periodic task so it can be stopped with the actor.
  private var heartbeatTask: Option[Cancellable] = None

  override def preStart(): Unit = {
    // Send heartbeat every 2 seconds
    heartbeatTask = Some(
      context.system.scheduler.schedule(
        2.seconds,
        2.seconds,
        context.parent,
        "heartbeat"
      )
    )
  }

  // The timer targets the parent, not this actor, so it would otherwise keep
  // firing after this actor stops. Cancelling here also avoids a duplicate
  // timer on restart, where the default hooks run postStop and then preStart.
  override def postStop(): Unit = {
    heartbeatTask.foreach(_.cancel())
    heartbeatTask = None
  }

  def receive = Actor.emptyBehavior
}

val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]

// HeartbeatActor sends its heartbeats to context.parent, so it is spawned as a
// child of the test actor. With system.actorOf the parent would be the user
// guardian and no heartbeat would ever reach the expectMsg calls below.
val heartbeat = childActorOf(Props[HeartbeatActor])

// No heartbeats yet
expectNoMessage(100.millis)

// Advance past first delay
scheduler.timePasses(2.seconds)
expectMsg("heartbeat")

// Advance to next interval
scheduler.timePasses(2.seconds)
expectMsg("heartbeat")

// Skip ahead multiple intervals
scheduler.timePasses(6.seconds)  // 3 more intervals
expectMsg("heartbeat")
expectMsg("heartbeat")
expectMsg("heartbeat")

Testing Timeout Behavior

/**
 * Actor with a cancellable 3-second timeout.
 *
 * - "start-timer": (re)arms the timer, remembers the sender, replies "timer-started".
 * - "cancel-timer": cancels any pending timer and replies "timer-cancelled".
 * - "timeout": (scheduled to self) sends "timeout-occurred" to the sender that
 *   armed the timer.
 */
class TimeoutActor extends Actor {
  import context.dispatcher

  private var timeoutHandle: Option[Cancellable] = None
  // The scheduled "timeout" does not come from the requester, so sender() in
  // that handler cannot be used to reach it; remember it when the timer is armed.
  private var requester: Option[ActorRef] = None

  def receive = {
    case "start-timer" =>
      // Cancel a timer that is still pending so that arming twice does not
      // leave two timeouts in flight.
      timeoutHandle.foreach(_.cancel())
      requester = Some(sender())
      timeoutHandle = Some(
        context.system.scheduler.scheduleOnce(3.seconds, self, "timeout")
      )
      sender() ! "timer-started"

    case "cancel-timer" =>
      timeoutHandle.foreach(_.cancel())
      timeoutHandle = None
      // Forget the requester so a "timeout" already queued is not reported.
      requester = None
      sender() ! "timer-cancelled"

    case "timeout" =>
      requester.foreach(_ ! "timeout-occurred")
      requester = None
      timeoutHandle = None
  }
}

val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]
val actor = system.actorOf(Props[TimeoutActor])

// Start timer
actor ! "start-timer"
expectMsg("timer-started")

// Advance time but not enough for timeout
scheduler.timePasses(2.seconds)
expectNoMessage(100.millis)

// Cancel timer before timeout
actor ! "cancel-timer"
expectMsg("timer-cancelled")

// Advance past original timeout - no message should come
scheduler.timePasses(2.seconds)
expectNoMessage(100.millis)

// Test timeout actually occurring
actor ! "start-timer"
expectMsg("timer-started")

scheduler.timePasses(3.seconds)
expectMsg("timeout-occurred")

Testing Complex Timing Scenarios

/**
 * Collects String items and sends them to the parent as a single
 * "batch: a,b,c" message once no new item has arrived for 1 second.
 *
 * - "flush": emits the current batch (if non-empty) and clears it.
 * - "force-flush": cancels the pending timer and flushes immediately.
 * - any other String: appended to the batch; restarts the 1-second timer.
 *
 * Because the control messages are plain Strings, the literal items "flush"
 * and "force-flush" cannot be batched.
 */
class BatchProcessor extends Actor {
  import context.dispatcher

  private var batch = List.empty[String]
  private var flushHandle: Option[Cancellable] = None

  def receive = {
    // Control messages must be matched before the generic String case:
    // they are Strings too and would otherwise be treated as items, so the
    // batch would never be flushed.
    case "flush" =>
      if (batch.nonEmpty) {
        // Items are prepended, so reverse to restore arrival order.
        context.parent ! s"batch: ${batch.reverse.mkString(",")}"
        batch = List.empty
      }
      flushHandle = None

    case "force-flush" =>
      flushHandle.foreach(_.cancel())
      self ! "flush"

    case item: String =>
      batch = item :: batch

      // Reset flush timer on each new item
      flushHandle.foreach(_.cancel())
      flushHandle = Some(
        context.system.scheduler.scheduleOnce(1.second, self, "flush")
      )
  }

  // Do not leave a pending flush timer behind when the actor stops.
  override def postStop(): Unit = flushHandle.foreach(_.cancel())
}

val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]

// BatchProcessor reports batches to context.parent, so it is spawned as a
// child of the test actor. CallingThreadDispatcher makes every `!` below run
// to completion before the next statement, so each timer is (re)armed before
// the test advances time; on an asynchronous dispatcher that would be a race.
val processor = childActorOf(
  Props[BatchProcessor].withDispatcher(CallingThreadDispatcher.Id)
)

// Add items to batch
processor ! "item1"
processor ! "item2"
processor ! "item3"

// No flush yet
expectNoMessage(100.millis)

// Advance time to trigger flush
scheduler.timePasses(1.second)
expectMsg("batch: item1,item2,item3")

// Test timer reset behavior
processor ! "item4"
scheduler.timePasses(500.millis)  // Half timeout
processor ! "item5"  // Resets timer

scheduler.timePasses(500.millis)  // Still within new timeout
expectNoMessage(100.millis)

scheduler.timePasses(500.millis)  // Now past timeout
expectMsg("batch: item4,item5")

Combining Deterministic Components

Complete Deterministic Test Environment

/**
 * TestKit base whose actor system is configured with CallingThreadDispatcher
 * as the default dispatcher and ExplicitlyTriggeredScheduler as the scheduler.
 */
class DeterministicTestSystem extends TestKit(ActorSystem("TestSystem", ConfigFactory.parseString(
  """
  akka {
    actor {
      default-dispatcher {
        type = akka.testkit.CallingThreadDispatcherConfigurator
      }
    }
    scheduler {
      implementation = akka.testkit.ExplicitlyTriggeredScheduler
    }
  }
  """
))) with ImplicitSender {

  /** The system scheduler, narrowed to the manually driven type configured above. */
  val scheduler: ExplicitlyTriggeredScheduler =
    system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]

  /** Advances the manual scheduler by `duration` (delegates to `timePasses`). */
  def advanceTime(duration: FiniteDuration): Unit =
    scheduler.timePasses(duration)
}

/**
 * Actor combining one-shot and periodic timers.
 *
 * - "start-work": switches to "working", schedules "work-complete" to self in
 *   2 seconds and replies "work-started" to the sender.
 * - "work-complete": increments the completed count, returns to "idle" and
 *   tells the parent "work-completed-N".
 * - "report-status": (scheduled every 5 seconds) tells the parent the current
 *   state and completed count.
 */
class ComplexActor extends Actor {
  import context.dispatcher

  private var state = "idle"
  private var workCount = 0
  // Handle of the periodic status timer so it can be stopped with the actor.
  private var statusReports: Option[Cancellable] = None

  override def preStart(): Unit = {
    // Schedule periodic status reports
    statusReports = Some(
      context.system.scheduler.schedule(5.seconds, 5.seconds, self, "report-status")
    )
  }

  // Without this a restart (default hooks: postStop, then preStart) would
  // leave the old periodic timer running next to the new one.
  override def postStop(): Unit = {
    statusReports.foreach(_.cancel())
    statusReports = None
  }

  def receive = {
    case "start-work" =>
      state = "working"
      // Simulate async work completion
      context.system.scheduler.scheduleOnce(2.seconds, self, "work-complete")
      sender() ! "work-started"

    case "work-complete" =>
      workCount += 1
      state = "idle"
      context.parent ! s"work-completed-$workCount"

    case "report-status" =>
      context.parent ! s"status: $state, completed: $workCount"
  }
}

// Usage in test
// NOTE(review): the `should` / `in` syntax presumably comes from a ScalaTest
// word-spec trait that has to be mixed in alongside DeterministicTestSystem —
// confirm against the project's ScalaTest version.
class ComplexActorSpec extends DeterministicTestSystem {
  "ComplexActor" should {
    "handle work and reporting deterministically" in {
      // ComplexActor reports completions and status to context.parent, so it
      // is spawned as a child of the test actor; under system.actorOf those
      // messages would go to the user guardian instead of this test.
      val actor = childActorOf(Props[ComplexActor])

      // Start work
      actor ! "start-work"
      expectMsg("work-started")

      // No completion yet
      expectNoMessage(100.millis)

      // Advance time to complete work
      advanceTime(2.seconds)
      expectMsg("work-completed-1")

      // Advance to first status report
      advanceTime(3.seconds)  // Total 5 seconds from start
      expectMsg("status: idle, completed: 1")

      // Start more work
      actor ! "start-work"
      expectMsg("work-started")

      // Advance time through work completion (t = 7s) and next status report (t = 10s)
      advanceTime(5.seconds)
      expectMsg("work-completed-2")
      expectMsg("status: idle, completed: 2")
    }
  }
}

Best Practices

CallingThreadDispatcher Best Practices

  1. Use for unit tests: CallingThreadDispatcher is ideal for testing individual actor logic
  2. Avoid for integration tests: May hide concurrency issues that occur in production
  3. Configure consistently: Use either globally or per-actor, not mixed
  4. Test both sync and async: Also test with real dispatchers to catch concurrency issues
// Good: Consistent usage for deterministic unit testing
val actor = system.actorOf(
  Props[MyActor].withDispatcher(CallingThreadDispatcher.Id)
)

// Also good: Global configuration for test suite
akka.actor.default-dispatcher.type = akka.testkit.CallingThreadDispatcherConfigurator

ExplicitlyTriggeredScheduler Best Practices

  1. Advance time methodically: Use small, controlled time advances
  2. Test edge cases: Test timer cancellation, overlapping timers
  3. Verify no unexpected scheduling: Check for unexpected scheduled tasks
  4. Document time flow: Comment time advances in complex tests
// Good: Clear time progression
scheduler.timePasses(1.second)   // Initial delay
expectMsg("first-event")

scheduler.timePasses(2.seconds)  // Regular interval  
expectMsg("second-event")

scheduler.timePasses(2.seconds)  // Another interval
expectMsg("third-event")

Combined Usage Best Practices

  1. Start simple: Begin with just one deterministic component
  2. Test incrementally: Add complexity gradually
  3. Verify assumptions: Ensure deterministic behavior is actually achieved
  4. Document setup: Clearly document deterministic test environment setup
// Good: Well-documented deterministic test setup
class MyDeterministicSpec extends TestKit(ActorSystem("test", 
  ConfigFactory.parseString("""
    # Deterministic execution for predictable testing
    akka.actor.default-dispatcher.type = akka.testkit.CallingThreadDispatcherConfigurator
    akka.scheduler.implementation = akka.testkit.ExplicitlyTriggeredScheduler
  """)
)) {
  
  val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]
  
  // Helper for controlled time advancement
  def tick(duration: FiniteDuration = 1.second): Unit = {
    scheduler.timePasses(duration)
  }
}