Akka TestKit provides specialized dispatchers and schedulers for predictable, deterministic test execution including CallingThreadDispatcher for synchronous message processing and ExplicitlyTriggeredScheduler for manual time control.
Dispatcher that executes all messages on the calling thread for deterministic, single-threaded execution in tests.
/**
* Dispatcher that executes messages on the calling thread
* Provides deterministic execution order for testing
* No new threads are created - all execution happens synchronously
*/
class CallingThreadDispatcher extends MessageDispatcher {
// Implementation executes messages immediately on current thread
}
/**
* Dispatcher configurator for CallingThreadDispatcher
*/
class CallingThreadDispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
extends MessageDispatcherConfigurator {
def dispatcher(): MessageDispatcher = new CallingThreadDispatcher()
}
/**
* Specialized mailbox for CallingThreadDispatcher
* Maintains thread-local message queues for deterministic processing
*/
class CallingThreadMailbox extends Mailbox {
// Thread-local queue implementation
}Configuration constants and settings for CallingThreadDispatcher.
object CallingThreadDispatcher {
/**
* Configuration ID for CallingThreadDispatcher
* Use this ID in actor Props or configuration files
*/
val Id: String = "akka.test.calling-thread-dispatcher"
}Manual scheduler for controlled timing in tests, allowing precise control over when scheduled tasks execute.
/**
* Manual scheduler for controlled timing in tests
* Time only advances when explicitly triggered
* @param config Scheduler configuration
* @param log Logging adapter
* @param tf Thread factory (unused in test scheduler)
*/
class ExplicitlyTriggeredScheduler(config: Config, log: LoggingAdapter, tf: ThreadFactory)
extends Scheduler {
/**
* Schedule recurring task with fixed interval
* Task will not execute until time is advanced
* @param initialDelay Delay before first execution
* @param interval Interval between executions
* @param runnable Task to execute
* @param executor Execution context for task
* @return Cancellable for the scheduled task
*/
def schedule(initialDelay: FiniteDuration, interval: FiniteDuration, runnable: Runnable)
(implicit executor: ExecutionContext): Cancellable
/**
* Schedule single task execution
* Task will not execute until time is advanced
* @param delay Delay before execution
* @param runnable Task to execute
* @param executor Execution context for task
* @return Cancellable for the scheduled task
*/
def scheduleOnce(delay: FiniteDuration, runnable: Runnable)
(implicit executor: ExecutionContext): Cancellable
/**
* Manually advance scheduler time
* Triggers execution of all tasks scheduled for the advanced time period
* @param amount Duration to advance time
*/
def timePasses(amount: FiniteDuration): Unit
/**
* Get current internal scheduler time
* @return Current time in milliseconds since scheduler creation
*/
def currentTimeMs: Long
/**
* Get maximum frequency for recurring tasks
* @return Maximum frequency (typically very high for test scheduler)
*/
def maxFrequency: Double
}Usage Examples:
import akka.actor.{Actor, ActorSystem, Props}
import akka.testkit.{TestKit, TestActorRef, CallingThreadDispatcher}
import scala.concurrent.duration._
class DeterministicExecutionExample extends TestKit(ActorSystem("test")) {
class TimingActor extends Actor {
def receive = {
case "schedule" =>
import context.dispatcher
context.system.scheduler.scheduleOnce(1.second, self, "scheduled")
case "scheduled" =>
sender() ! "timer-fired"
case msg =>
sender() ! s"received-$msg"
}
}
"CallingThreadDispatcher" should {
"execute messages synchronously" in {
// Create actor with CallingThreadDispatcher
val props = Props[TimingActor].withDispatcher(CallingThreadDispatcher.Id)
val actor = system.actorOf(props)
// Messages are processed immediately and deterministically
actor ! "test1"
expectMsg("received-test1")
actor ! "test2"
expectMsg("received-test2")
// Order is completely predictable
}
"work with TestActorRef for complete synchronous testing" in {
// TestActorRef automatically uses CallingThreadDispatcher
val actor = TestActorRef[TimingActor]
// Direct synchronous message injection
actor.receive("direct")
// Can also use normal ! sending
actor ! "normal"
expectMsg("received-normal")
}
}
"ExplicitlyTriggeredScheduler" should {
"provide manual time control" in {
// This example assumes system is configured with ExplicitlyTriggeredScheduler
// Configuration: akka.scheduler.implementation = "akka.testkit.ExplicitlyTriggeredScheduler"
val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]
val actor = system.actorOf(Props[TimingActor])
// Schedule something but time won't advance automatically
actor ! "schedule"
// No message yet because time hasn't advanced
expectNoMessage(100.millis)
// Manually advance time to trigger scheduled task
scheduler.timePasses(1.second)
// Now the scheduled message should arrive
expectMsg("timer-fired")
}
"control complex timing scenarios" in {
val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]
import system.dispatcher
var results = List.empty[String]
// Schedule multiple tasks at different times
scheduler.scheduleOnce(500.millis, () => results = "500ms" :: results)
scheduler.scheduleOnce(1.second, () => results = "1s" :: results)
scheduler.scheduleOnce(1.5.seconds, () => results = "1.5s" :: results)
// Advance time incrementally
scheduler.timePasses(600.millis)
assert(results == List("500ms"))
scheduler.timePasses(500.millis) // Total: 1.1 seconds
assert(results == List("1s", "500ms"))
scheduler.timePasses(500.millis) // Total: 1.6 seconds
assert(results == List("1.5s", "1s", "500ms"))
}
}
}How to configure ActorSystem for deterministic testing with custom dispatchers and schedulers.
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory
class ConfigurationExample {
// Configuration for deterministic testing
val testConfig = ConfigFactory.parseString("""
akka {
# Use CallingThreadDispatcher as default for deterministic execution
actor.default-dispatcher = "akka.test.calling-thread-dispatcher"
# Use ExplicitlyTriggeredScheduler for manual time control
scheduler.implementation = "akka.testkit.ExplicitlyTriggeredScheduler"
# Disable all built-in actors that might introduce non-determinism
actor.default-mailbox.mailbox-type = "akka.testkit.CallingThreadMailbox"
test {
# Test-specific timing settings
timefactor = 1.0
filter-leeway = 3s
single-expect-default = 3s
default-timeout = 5s
}
}
""")
// Create test ActorSystem with deterministic configuration
def createTestSystem(name: String = "test"): ActorSystem = {
ActorSystem(name, testConfig)
}
}Complex scenarios combining multiple deterministic execution techniques.
class AdvancedDeterministicExample extends TestKit(ActorSystem("test")) {
class ComplexTimingActor extends Actor {
import context.dispatcher
def receive = {
case "start" =>
// Schedule multiple operations with different timing
context.system.scheduler.scheduleOnce(100.millis, self, "step1")
context.system.scheduler.scheduleOnce(500.millis, self, "step2")
context.system.scheduler.scheduleOnce(1.second, self, "step3")
case "step1" =>
sender() ! "completed-step1"
context.system.scheduler.scheduleOnce(200.millis, self, "substep1")
case "substep1" =>
sender() ! "completed-substep1"
case "step2" =>
sender() ! "completed-step2"
case "step3" =>
sender() ! "completed-step3"
}
}
"Complex deterministic scenario" should {
"handle nested timing with complete control" in {
// Assume ExplicitlyTriggeredScheduler is configured
val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]
val actor = TestActorRef[ComplexTimingActor]
// Start the complex timing scenario
actor ! "start"
// Manually control each timing step
scheduler.timePasses(100.millis)
expectMsg("completed-step1")
scheduler.timePasses(200.millis) // substep1 fires
expectMsg("completed-substep1")
scheduler.timePasses(200.millis) // step2 fires (total 500ms)
expectMsg("completed-step2")
scheduler.timePasses(500.millis) // step3 fires (total 1s)
expectMsg("completed-step3")
}
}
}Understanding the thread safety and determinism guarantees provided by these utilities.
class DeterminismGuaranteesExample extends TestKit(ActorSystem("test")) {
"CallingThreadDispatcher guarantees" should {
"provide single-threaded execution" in {
val actor = TestActorRef[TimingActor]
var executionOrder = List.empty[Int]
// All messages processed on same thread in order
actor.receive("msg1")
executionOrder = 1 :: executionOrder
actor.receive("msg2")
executionOrder = 2 :: executionOrder
actor.receive("msg3")
executionOrder = 3 :: executionOrder
// Guaranteed order: [3, 2, 1] (reverse due to :: prepending)
assert(executionOrder == List(3, 2, 1))
}
"eliminate race conditions" in {
val actor = TestActorRef[TimingActor]
// No race conditions possible - everything is synchronous
(1 to 100).foreach { i =>
actor.receive(s"message-$i")
// Each message is completely processed before next one starts
}
}
}
"ExplicitlyTriggeredScheduler guarantees" should {
"provide predictable timing" in {
val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]
var executionTimes = List.empty[Long]
scheduler.scheduleOnce(1.second, () =>
executionTimes = scheduler.currentTimeMs :: executionTimes)
scheduler.scheduleOnce(2.seconds, () =>
executionTimes = scheduler.currentTimeMs :: executionTimes)
// Time doesn't advance until explicitly triggered
val startTime = scheduler.currentTimeMs
Thread.sleep(100) // Real time passes but scheduler time doesn't
assert(scheduler.currentTimeMs == startTime)
// Manual time advancement triggers tasks at exact expected times
scheduler.timePasses(1.second)
scheduler.timePasses(1.second)
// Tasks executed at precisely controlled times
assert(executionTimes.reverse == List(1000, 2000))
}
}
}How deterministic execution integrates with other TestKit features for comprehensive testing.
class DeterministicIntegrationExample extends TestKit(ActorSystem("test")) {
"Deterministic execution with TestKit features" should {
"work with message expectations" in {
val actor = TestActorRef[TimingActor]
// Synchronous execution makes message expectations predictable
actor ! "test"
expectMsg("received-test") // Message already processed synchronously
}
"work with timing assertions" in {
val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]
val actor = system.actorOf(Props[TimingActor])
within(0.seconds) { // Can use zero timeout with manual time control
actor ! "schedule"
scheduler.timePasses(1.second)
expectMsg("timer-fired")
}
}
"work with TestProbe" in {
val probe = TestProbe()
val actor = TestActorRef[TimingActor]
// Deterministic interaction between actors
probe.send(actor, "test")
probe.expectMsg("received-test")
}
}
}