Akka TestKit provides specialized dispatchers and schedulers that enable deterministic, predictable test execution. These components eliminate timing-related test flakiness by providing complete control over message processing and scheduling.
class CallingThreadDispatcher(_configurator: MessageDispatcherConfigurator) extends MessageDispatcher
A dispatcher that executes all tasks on the calling thread instead of using a thread pool. This provides deterministic, synchronous execution for testing.
object CallingThreadDispatcher {
val Id: String = "akka.test.calling-thread-dispatcher"
}
class CallingThreadDispatcherConfigurator(
config: Config,
prerequisites: DispatcherPrerequisites
) extends MessageDispatcherConfigurator
class CallingThreadMailbox(
_receiver: akka.actor.Cell,
mailboxType: MailboxType
) extends Mailbox
To use CallingThreadDispatcher, configure it in your test configuration:
akka.test.calling-thread-dispatcher {
type = akka.testkit.CallingThreadDispatcherConfigurator
}
import akka.testkit.CallingThreadDispatcher
/** Minimal example actor: answers every "work" message with "done". */
class TestActor extends Actor {
  override def receive: Receive = {
    // With CallingThreadDispatcher this handler executes on the calling thread
    case "work" => sender() ! "done"
  }
}
// Describe the actor first: same TestActor, but bound to the CallingThreadDispatcher
val props = Props[TestActor].withDispatcher(CallingThreadDispatcher.Id)
// Start it under the name "test-actor"
val actor = system.actorOf(props, "test-actor")
// Message processing happens synchronously
actor ! "work"
expectMsg("done") // Response is immediate and deterministic

/**
 * Counter whose current value lives in the active behavior instead of a mutable field.
 *
 * "increment" raises the count by one and replies with the new value;
 * "get" replies with the current value without changing it.
 */
class CounterActor extends Actor {
  def receive: Receive = counting(0)

  // Behavior for a given count; each increment swaps in the behavior for count + 1
  private def counting(count: Int): Receive = {
    case "increment" =>
      val next = count + 1
      sender() ! next
      context.become(counting(next))
    case "get" =>
      sender() ! count
  }
}
// Three independent counters, all bound to the CallingThreadDispatcher
val counterProps = Props[CounterActor].withDispatcher(CallingThreadDispatcher.Id)
val actors = (1 to 3).map(i => system.actorOf(counterProps, s"counter-$i"))
val first = actors(0)
val second = actors(1)
// Each counter keeps its own state, and replies arrive in send order
first ! "increment"
expectMsg(1)
second ! "increment"
expectMsg(1)
first ! "increment"
expectMsg(2)
// Reading back: the first counter was incremented twice, the second once
first ! "get"
expectMsg(2)
second ! "get"
expectMsg(1)

/**
 * Coordinator that fans "work" out to registered workers.
 *
 * - "add-worker": registers the sender as a worker.
 * - "broadcast": sends "work" to every registered worker.
 * - "work-done": counts a reply; once the count equals the number of workers,
 *   sends "all-done" to the parent and resets the count.
 */
class MasterActor extends Actor {
  var workers = List.empty[ActorRef]
  var responses = 0

  def receive: Receive = {
    case "add-worker" =>
      workers ::= sender()
    case "broadcast" =>
      workers.foreach(worker => worker ! "work")
    case "work-done" =>
      responses += 1
      val allReplied = responses == workers.length
      if (allReplied) {
        context.parent ! "all-done"
        responses = 0
      }
  }
}
/** Worker that acknowledges each "work" request with "work-done" to its sender. */
class WorkerActor extends Actor {
  override def receive: Receive = {
    case "work" => sender() ! "work-done"
  }
}
// Create actors with deterministic dispatcher.
// MasterActor reports completion to context.parent, so it is spawned with childActorOf:
// that makes the TestKit's testActor its parent. With system.actorOf the parent would be
// the user guardian and "all-done" would never reach the test.
val master = childActorOf(
  Props[MasterActor].withDispatcher(CallingThreadDispatcher.Id),
  "master"
)
val workers = (1 to 3).map { i =>
  system.actorOf(
    Props[WorkerActor].withDispatcher(CallingThreadDispatcher.Id),
    s"worker-$i"
  )
}
// Register workers: tell(...) with an explicit sender makes each worker the sender of
// "add-worker", which is how MasterActor learns its reference
workers.foreach { worker =>
  master.tell("add-worker", worker)
}
// Broadcast work - all processing is synchronous and predictable
master ! "broadcast"
expectMsg("all-done") // Deterministic completion
// In application.conf for tests
akka {
actor {
default-dispatcher {
type = akka.testkit.CallingThreadDispatcherConfigurator
}
}
}
// All actors will use CallingThreadDispatcher by default, so plain Props are enough
val myActorProps = Props[MyActor]
val actor = system.actorOf(myActorProps)
// Deterministic execution without explicit dispatcher configuration
class ExplicitlyTriggeredScheduler(
config: Config,
log: LoggingAdapter,
tf: ThreadFactory
) extends Scheduler {
def timePasses(amount: FiniteDuration): Unit
def schedule(
initialDelay: FiniteDuration,
interval: FiniteDuration,
runnable: Runnable
)(implicit executor: ExecutionContext): Cancellable
def scheduleOnce(
delay: FiniteDuration,
runnable: Runnable
)(implicit executor: ExecutionContext): Cancellable
def maxFrequency: Double
}
A scheduler that does not advance time on its own: time must be advanced manually by calling timePasses(). This allows complete control over timing in tests.
Configure ExplicitlyTriggeredScheduler in test configuration:
akka {
scheduler {
implementation = akka.testkit.ExplicitlyTriggeredScheduler
}
}
import akka.testkit.ExplicitlyTriggeredScheduler
import scala.concurrent.duration._
// Get scheduler from system; the cast only succeeds when the system was configured
// with ExplicitlyTriggeredScheduler as its scheduler implementation
val scheduler: ExplicitlyTriggeredScheduler =
  system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]
/**
 * Schedules a "timeout" message to itself 5 seconds after starting.
 *
 * - "start": replies "started" and remembers the sender as the actor to notify.
 * - "timeout": sends "timed-out" to the remembered requester, if any.
 */
class TimedActor extends Actor {
  import context.dispatcher

  // The scheduled "timeout" is sent with this actor as its sender (the implicit `self`),
  // so sender() in the "timeout" case would point back at this actor. Keep the last
  // "start" sender here and notify that actor instead.
  private var requester: Option[ActorRef] = None

  override def preStart(): Unit = {
    // Schedule a message to self in 5 seconds
    context.system.scheduler.scheduleOnce(5.seconds, self, "timeout")
  }

  def receive: Receive = {
    case "start" =>
      requester = Some(sender())
      sender() ! "started"
    case "timeout" =>
      // Nobody to notify if "start" was never received
      requester.foreach(_ ! "timed-out")
  }
}
// Start the actor; its preStart schedules the "timeout" self-message 5 seconds out
val actor = system.actorOf(Props[TimedActor])
// Round-trip once so the test knows the actor is up before touching the clock
actor ! "start"
expectMsg("started")
// No timeout message yet - the scheduler's time has not been advanced
expectNoMessage(100.millis)
// Manually advance time by the full 5-second delay
scheduler.timePasses(5.seconds)
// The scheduled timeout is now due, so the notification can be expected next
expectMsg("timed-out")

/**
 * Sends "heartbeat" to its parent every 2 seconds (first one after 2 seconds)
 * for as long as the actor is running. Handles no incoming messages itself.
 */
class HeartbeatActor extends Actor {
  import context.dispatcher

  // Handle of the periodic task. It is kept so postStop can cancel it; otherwise the
  // scheduler would keep sending heartbeats after this actor has stopped, and a restart
  // would add a second periodic task on top of the first.
  private var heartbeatTask: Option[Cancellable] = None

  override def preStart(): Unit = {
    // Send heartbeat every 2 seconds
    heartbeatTask = Some(
      context.system.scheduler.schedule(
        2.seconds,
        2.seconds,
        context.parent,
        "heartbeat"
      )
    )
  }

  override def postStop(): Unit = {
    heartbeatTask.foreach(_.cancel())
    heartbeatTask = None
  }

  def receive: Receive = Actor.emptyBehavior
}
val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]
// HeartbeatActor sends its heartbeats to context.parent. Spawning it with childActorOf
// makes the TestKit's testActor that parent, so expectMsg can observe the heartbeats.
// With system.actorOf the parent would be the user guardian instead.
val heartbeat = childActorOf(Props[HeartbeatActor])
// No heartbeats yet
expectNoMessage(100.millis)
// Advance past first delay
scheduler.timePasses(2.seconds)
expectMsg("heartbeat")
// Advance to next interval
scheduler.timePasses(2.seconds)
expectMsg("heartbeat")
// Skip ahead multiple intervals
scheduler.timePasses(6.seconds) // 3 more intervals
expectMsg("heartbeat")
expectMsg("heartbeat")
expectMsg("heartbeat")

/**
 * Actor with a cancellable 3-second timer.
 *
 * - "start-timer": schedules a "timeout" self-message in 3 seconds, remembers the sender
 *   as the actor to notify, and replies "timer-started".
 * - "cancel-timer": cancels the pending timer (if any) and replies "timer-cancelled".
 * - "timeout": sends "timeout-occurred" to the actor that started the timer.
 */
class TimeoutActor extends Actor {
  import context.dispatcher

  private var timeoutHandle: Option[Cancellable] = None
  // The scheduled "timeout" is sent with this actor as its sender (the implicit `self`),
  // so sender() in the "timeout" case would point back at this actor. Remember who
  // started the timer and notify that actor instead.
  private var requester: Option[ActorRef] = None

  def receive: Receive = {
    case "start-timer" =>
      requester = Some(sender())
      timeoutHandle = Some(
        context.system.scheduler.scheduleOnce(3.seconds, self, "timeout")
      )
      sender() ! "timer-started"
    case "cancel-timer" =>
      timeoutHandle.foreach(_.cancel())
      timeoutHandle = None
      sender() ! "timer-cancelled"
    case "timeout" =>
      requester.foreach(_ ! "timeout-occurred")
      timeoutHandle = None
  }
}
// Manually driven scheduler; the cast requires the system to be configured with it
val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]
val actor = system.actorOf(Props[TimeoutActor])
// Start timer (TimeoutActor schedules its "timeout" 3 seconds out)
actor ! "start-timer"
expectMsg("timer-started")
// Advance time by 2 seconds - one second short of the 3-second timeout
scheduler.timePasses(2.seconds)
expectNoMessage(100.millis)
// Cancel timer before timeout
actor ! "cancel-timer"
expectMsg("timer-cancelled")
// Advance past original timeout (4 seconds in total) - no message should come
scheduler.timePasses(2.seconds)
expectNoMessage(100.millis)
// Test timeout actually occurring: start a fresh timer and let the full 3 seconds pass
actor ! "start-timer"
expectMsg("timer-started")
scheduler.timePasses(3.seconds)
expectMsg("timeout-occurred")

/**
 * Collects String items and flushes them to the parent as one "batch: a,b,c" message.
 *
 * - any other String: appended to the batch; (re)starts a 1-second flush timer.
 * - "flush": sends the batch (in arrival order) to the parent if it is non-empty.
 * - "force-flush": cancels the pending timer and flushes immediately.
 */
class BatchProcessor extends Actor {
  import context.dispatcher

  private var batch = List.empty[String]
  private var flushHandle: Option[Cancellable] = None

  def receive: Receive = {
    // The control messages are Strings too, so they must be matched BEFORE the generic
    // `item: String` case; otherwise they are swallowed as batch items and never run.
    case "flush" =>
      if (batch.nonEmpty) {
        // Items are prepended on arrival, so reverse to restore arrival order
        context.parent ! s"batch: ${batch.reverse.mkString(",")}"
        batch = List.empty
      }
      flushHandle = None
    case "force-flush" =>
      flushHandle.foreach(_.cancel())
      self ! "flush"
    case item: String =>
      batch = item :: batch
      // Reset flush timer on each new item
      flushHandle.foreach(_.cancel())
      flushHandle = Some(
        context.system.scheduler.scheduleOnce(1.second, self, "flush")
      )
  }
}
val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]
// Two adjustments make this scenario observable and deterministic:
//  - childActorOf makes the TestKit's testActor the parent, which is where BatchProcessor
//    sends its batches (with system.actorOf the parent would be the user guardian);
//  - CallingThreadDispatcher processes each item as part of the send, so an item can never
//    be handled after the following timePasses call.
val processor = childActorOf(
  Props[BatchProcessor].withDispatcher(CallingThreadDispatcher.Id)
)
// Add items to batch
processor ! "item1"
processor ! "item2"
processor ! "item3"
// No flush yet
expectNoMessage(100.millis)
// Advance time to trigger flush
scheduler.timePasses(1.second)
expectMsg("batch: item1,item2,item3")
// Test timer reset behavior
processor ! "item4"
scheduler.timePasses(500.millis) // Half timeout
processor ! "item5" // Resets timer
scheduler.timePasses(500.millis) // Still within new timeout
expectNoMessage(100.millis)
scheduler.timePasses(500.millis) // Now past timeout
expectMsg("batch: item4,item5")

/**
 * TestKit base whose actor system uses CallingThreadDispatcher as the default dispatcher
 * and ExplicitlyTriggeredScheduler as the scheduler, as set in the inline configuration.
 */
class DeterministicTestSystem extends TestKit(ActorSystem("TestSystem", ConfigFactory.parseString(
"""
akka {
actor {
default-dispatcher {
type = akka.testkit.CallingThreadDispatcherConfigurator
}
}
scheduler {
implementation = akka.testkit.ExplicitlyTriggeredScheduler
}
}
"""
))) with ImplicitSender {

  // Manually driven scheduler; the cast relies on the scheduler setting in the config above
  val scheduler: ExplicitlyTriggeredScheduler =
    system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]

  /** Moves the scheduler's time forward by `duration`. */
  def advanceTime(duration: FiniteDuration): Unit = scheduler.timePasses(duration)
}
/**
 * Actor combining one-shot and periodic timers.
 *
 * - Every 5 seconds (first after 5 seconds) it sends itself "report-status" and reports
 *   "status: <state>, completed: <n>" to its parent.
 * - "start-work": switches to "working", schedules "work-complete" in 2 seconds and
 *   replies "work-started".
 * - "work-complete": increments the completed count, returns to "idle" and reports
 *   "work-completed-<n>" to its parent.
 */
class ComplexActor extends Actor {
  import context.dispatcher

  private var state = "idle"
  private var workCount = 0
  // Handle of the periodic status report. It is kept so postStop can cancel it; otherwise
  // the scheduler keeps firing "report-status" at a stopped actor, and a restart would
  // add a second periodic task on top of the first.
  private var statusReports: Option[Cancellable] = None

  override def preStart(): Unit = {
    // Schedule periodic status reports
    statusReports = Some(
      context.system.scheduler.schedule(5.seconds, 5.seconds, self, "report-status")
    )
  }

  override def postStop(): Unit = {
    statusReports.foreach(_.cancel())
    statusReports = None
  }

  def receive: Receive = {
    case "start-work" =>
      state = "working"
      // Simulate async work completion
      context.system.scheduler.scheduleOnce(2.seconds, self, "work-complete")
      sender() ! "work-started"
    case "work-complete" =>
      workCount += 1
      state = "idle"
      context.parent ! s"work-completed-$workCount"
    case "report-status" =>
      context.parent ! s"status: $state, completed: $workCount"
  }
}
// Usage in test
// NOTE(review): the "should" / "in" syntax below needs a ScalaTest WordSpec-style trait
// mixed into the spec or its base class; none is shown in this snippet - confirm.
class ComplexActorSpec extends DeterministicTestSystem {
  "ComplexActor" should {
    "handle work and reporting deterministically" in {
      // ComplexActor reports completions and status to context.parent. childActorOf makes
      // the TestKit's testActor that parent; with system.actorOf it would be the user
      // guardian and none of the expectations on parent-bound messages could succeed.
      val actor = childActorOf(Props[ComplexActor])
      // Start work
      actor ! "start-work"
      expectMsg("work-started")
      // No completion yet
      expectNoMessage(100.millis)
      // Advance time to complete work
      advanceTime(2.seconds)
      expectMsg("work-completed-1")
      // Advance to first status report
      advanceTime(3.seconds) // Total 5 seconds from start
      expectMsg("status: idle, completed: 1")
      // Start more work
      actor ! "start-work"
      expectMsg("work-started")
      // Advance time through work completion (due 2s in) and next status report (due 5s in)
      advanceTime(5.seconds)
      expectMsg("work-completed-2")
      expectMsg("status: idle, completed: 2")
    }
  }
}
// Good: Consistent usage for deterministic unit testing
// Bind the actor to the CallingThreadDispatcher explicitly, then start it
val deterministicProps = Props[MyActor].withDispatcher(CallingThreadDispatcher.Id)
val actor = system.actorOf(deterministicProps)
// Also good: Global configuration for test suite
akka.actor.default-dispatcher.type = akka.testkit.CallingThreadDispatcherConfigurator
// Good: Clear time progression
// Advance time in small, labelled steps and assert after each one, so every expected
// event is tied to the time step that triggers it
scheduler.timePasses(1.second) // Initial delay
expectMsg("first-event")
scheduler.timePasses(2.seconds) // Regular interval
expectMsg("second-event")
scheduler.timePasses(2.seconds) // Another interval
expectMsg("third-event")
// Good: Well-documented deterministic test setup
/**
 * TestKit base for deterministic specs: the inline configuration selects
 * CallingThreadDispatcher as the default dispatcher and ExplicitlyTriggeredScheduler
 * as the scheduler implementation.
 */
class MyDeterministicSpec extends TestKit(ActorSystem("test",
ConfigFactory.parseString("""
# Deterministic execution for predictable testing
akka.actor.default-dispatcher.type = akka.testkit.CallingThreadDispatcherConfigurator
akka.scheduler.implementation = akka.testkit.ExplicitlyTriggeredScheduler
""")
)) {

  // Manually driven scheduler; the cast relies on the scheduler setting in the config above
  val scheduler: ExplicitlyTriggeredScheduler =
    system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]

  /** Helper for controlled time advancement; advances by one second when no duration is given. */
  def tick(duration: FiniteDuration = 1.second): Unit = scheduler.timePasses(duration)
}