Specialized dispatchers and schedulers that provide deterministic execution environments for testing, including single-threaded execution and manual time advancement.
Dispatcher that runs on current thread for deterministic testing.
class CallingThreadDispatcher(_config: Config, _prerequisites: DispatcherPrerequisites)
extends MessageDispatcher(_config, _prerequisites) {
// Dispatcher ID constant
val Id = "akka.test.calling-thread-dispatcher"
// Core dispatcher methods
def dispatch(receiver: ActorCell, invocation: Envelope): Unit
def systemDispatch(receiver: ActorCell, invocation: SystemMessage): Unit
def executeTask(invocation: TaskInvocation): Unit
def createMailbox(actor: akka.actor.Cell, mailboxType: MailboxType): Mailbox
def shutdown(): Unit
}
class CallingThreadDispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
extends MessageDispatcherConfigurator(config, prerequisites) {
def dispatcher(): MessageDispatcher
}Usage Examples:
import akka.testkit.{TestKit, CallingThreadDispatcher}
class CallingThreadDispatcherTest extends TestKit(ActorSystem("TestSystem")) {
"CallingThreadDispatcher" should {
"execute actors on current thread" in {
val mainThreadId = Thread.currentThread().getId
@volatile var actorThreadId: Long = -1
val actor = system.actorOf(Props(new Actor {
def receive = {
case "get-thread" =>
actorThreadId = Thread.currentThread().getId
sender() ! actorThreadId
}
}).withDispatcher(CallingThreadDispatcher.Id))
actor ! "get-thread"
expectMsg(mainThreadId)
actorThreadId should equal(mainThreadId)
}
"provide deterministic execution order" in {
val results = mutable.ListBuffer[Int]()
val actors = (1 to 3).map { i =>
system.actorOf(Props(new Actor {
def receive = {
case "execute" =>
results += i
sender() ! s"done-$i"
}
}).withDispatcher(CallingThreadDispatcher.Id))
}
// Messages processed in order sent (deterministic)
actors.foreach(_ ! "execute")
receiveN(3) should equal(Seq("done-1", "done-2", "done-3"))
results.toList should equal(List(1, 2, 3))
}
"work with TestActorRef" in {
val props = Props(new Actor {
def receive = {
case msg => sender() ! s"processed: $msg"
}
}).withDispatcher(CallingThreadDispatcher.Id)
val actor = TestActorRef(props)
// Direct message processing (synchronous)
actor ! "test"
expectMsg("processed: test")
}
}
}Scheduler that requires manual time advancement for testing.
class ExplicitlyTriggeredScheduler extends Scheduler {
// Time control methods
def timePasses(amount: FiniteDuration): Unit
def currentTimeMs: Long
// Scheduler interface methods
def schedule(
initialDelay: FiniteDuration,
interval: FiniteDuration,
runnable: Runnable
)(implicit executor: ExecutionContext): Cancellable
def scheduleOnce(
delay: FiniteDuration,
runnable: Runnable
)(implicit executor: ExecutionContext): Cancellable
def scheduleWithFixedDelay(
initialDelay: FiniteDuration,
delay: FiniteDuration,
runnable: Runnable
)(implicit executor: ExecutionContext): Cancellable
def scheduleAtFixedRate(
initialDelay: FiniteDuration,
interval: FiniteDuration,
runnable: Runnable
)(implicit executor: ExecutionContext): Cancellable
}Configuration and Usage:
// Configuration for explicitly triggered scheduler
val config = ConfigFactory.parseString("""
akka.scheduler.implementation = "akka.testkit.ExplicitlyTriggeredScheduler"
""")
val system = ActorSystem("TestSystem", config)
val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]
class ExplicitSchedulerTest extends TestKit(system) {
"ExplicitlyTriggeredScheduler" should {
"allow manual time advancement" in {
@volatile var executed = false
@volatile var executionTime = 0L
// Schedule task for 5 seconds from now
scheduler.scheduleOnce(5.seconds, new Runnable {
def run(): Unit = {
executed = true
executionTime = scheduler.currentTimeMs
}
})
// Task not executed yet
executed should be(false)
// Advance time by 3 seconds - still not enough
scheduler.timePasses(3.seconds)
executed should be(false)
// Advance time by 2 more seconds - now it executes
scheduler.timePasses(2.seconds)
executed should be(true)
executionTime should equal(scheduler.currentTimeMs)
}
"handle repeated scheduling" in {
val executions = mutable.ListBuffer[Long]()
// Schedule repeated task every 2 seconds
scheduler.schedule(
initialDelay = 1.second,
interval = 2.seconds,
new Runnable {
def run(): Unit = executions += scheduler.currentTimeMs
}
)
executions should be(empty)
// First execution after 1 second
scheduler.timePasses(1.second)
executions should have size 1
// Second execution after 2 more seconds (3 total)
scheduler.timePasses(2.seconds)
executions should have size 2
// Third execution after 2 more seconds (5 total)
scheduler.timePasses(2.seconds)
executions should have size 3
// Verify execution times
executions.toList should equal(List(1000, 3000, 5000))
}
"work with actor timers" in {
class TimerActor extends Actor with Timers {
def receive = {
case "start-timer" =>
timers.startSingleTimer("test-timer", "tick", 3.seconds)
case "tick" =>
sender() ! "timer-fired"
}
}
val actor = system.actorOf(Props[TimerActor]())
actor ! "start-timer"
// Timer not fired yet
expectNoMessage(100.millis)
// Advance time to trigger timer
scheduler.timePasses(3.seconds)
expectMsg("timer-fired")
}
}
}Both dispatchers work seamlessly with TestKit:
class DispatcherIntegrationTest extends TestKit(ActorSystem("TestSystem")) {
"Dispatcher integration" should {
"combine CallingThreadDispatcher with expectations" in {
val actor = system.actorOf(Props(new Actor {
def receive = {
case x: Int => sender() ! (x * 2)
}
}).withDispatcher(CallingThreadDispatcher.Id))
// Synchronous processing with deterministic order
(1 to 5).foreach(actor ! _)
receiveN(5) should equal(List(2, 4, 6, 8, 10))
}
"use ExplicitlyTriggeredScheduler with awaitCond" in {
val system = ActorSystem("ExplicitSchedulerSystem", ConfigFactory.parseString("""
akka.scheduler.implementation = "akka.testkit.ExplicitlyTriggeredScheduler"
"""))
val testKit = new TestKit(system)
import testKit._
val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]
@volatile var condition = false
scheduler.scheduleOnce(2.seconds, new Runnable {
def run(): Unit = condition = true
})
// Condition not met yet
intercept[AssertionError] {
awaitCond(condition, max = 100.millis)
}
// Advance time and condition becomes true
scheduler.timePasses(2.seconds)
awaitCond(condition, max = 100.millis) // Should succeed now
system.terminate()
}
}
}Testing Periodic Tasks:
class PeriodicTaskTest extends TestKit(ActorSystem("TestSystem", ConfigFactory.parseString("""
akka.scheduler.implementation = "akka.testkit.ExplicitlyTriggeredScheduler"
"""))) {
val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]
"Periodic task actor" should {
"send messages at regular intervals" in {
class PeriodicActor extends Actor {
import context.dispatcher
override def preStart(): Unit = {
context.system.scheduler.schedule(
initialDelay = 1.second,
interval = 2.seconds,
self,
"tick"
)
}
def receive = {
case "tick" => testActor ! "periodic-message"
}
}
system.actorOf(Props[PeriodicActor]())
// No messages initially
expectNoMessage(100.millis)
// First message after 1 second
scheduler.timePasses(1.second)
expectMsg("periodic-message")
// Second message after 2 more seconds
scheduler.timePasses(2.seconds)
expectMsg("periodic-message")
// Third message after 2 more seconds
scheduler.timePasses(2.seconds)
expectMsg("periodic-message")
}
}
}Testing Timeout Behavior:
class TimeoutTest extends TestKit(ActorSystem("TestSystem", ConfigFactory.parseString("""
akka.scheduler.implementation = "akka.testkit.ExplicitlyTriggeredScheduler"
"""))) {
val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]
"Timeout actor" should {
"handle timeouts correctly" in {
class TimeoutActor extends Actor {
import context.dispatcher
def receive = {
case "start-with-timeout" =>
val originalSender = sender()
val cancellable = context.system.scheduler.scheduleOnce(5.seconds) {
originalSender ! "timeout"
}
context.become(waitingForResponse(cancellable, originalSender))
}
def waitingForResponse(cancellable: Cancellable, client: ActorRef): Receive = {
case "response" =>
cancellable.cancel()
client ! "success"
context.unbecome()
case "timeout" =>
client ! "timeout-occurred"
context.unbecome()
}
}
val actor = system.actorOf(Props[TimeoutActor]())
// Start operation with timeout
actor ! "start-with-timeout"
// No response yet
expectNoMessage(100.millis)
// Advance time but not enough for timeout
scheduler.timePasses(3.seconds)
expectNoMessage(100.millis)
// Advance time to trigger timeout
scheduler.timePasses(2.seconds)
expectMsg("timeout-occurred")
}
}
}Configure dispatchers and schedulers in application.conf:
akka {
actor {
default-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
}
}
test {
calling-thread-dispatcher {
type = akka.testkit.CallingThreadDispatcherConfigurator
}
}
# For explicit scheduler testing
scheduler {
implementation = "akka.testkit.ExplicitlyTriggeredScheduler"
}
}// Good: Explicit dispatcher configuration for tests
val actor = system.actorOf(
Props[MyActor]().withDispatcher(CallingThreadDispatcher.Id),
"test-actor"
)
// Good: Manual time control for timing tests
scheduler.timePasses(expectedDelay)
expectMsg("scheduled-message")
// Good: Combine for comprehensive testing
val testActorRef = TestActorRef(
Props[TimedActor]().withDispatcher(CallingThreadDispatcher.Id)
)
scheduler.timePasses(triggerTime)
testActorRef.underlyingActor.timersFired should be(true)