or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

actor-refs.md configuration.md core-testing.md dispatchers.md event-filtering.md index.md java-dsl.md package-functions.md synchronization.md test-actors.md utilities.md
tile.json

docs/dispatchers.md

Custom Dispatchers and Scheduling

Specialized dispatchers and schedulers that provide deterministic execution environments for testing, including single-threaded execution and manual time advancement.

Capabilities

CallingThreadDispatcher

Dispatcher that processes messages on the thread that sends them, giving synchronous, deterministic execution in tests.

/**
 * Companion object of the dispatcher. The id lives here (not on the instance)
 * so that it can be referenced as `CallingThreadDispatcher.Id`, e.g. in
 * `Props(...).withDispatcher(CallingThreadDispatcher.Id)`.
 */
object CallingThreadDispatcher {
  // Dispatcher ID constant; also the config path of the dispatcher section
  val Id = "akka.test.calling-thread-dispatcher"
}

/**
 * Dispatcher that runs invocations on the calling thread.
 *
 * Instances are created by `CallingThreadDispatcherConfigurator`, which passes
 * itself in as the configurator; user code normally only refers to
 * `CallingThreadDispatcher.Id`.
 */
class CallingThreadDispatcher(_configurator: MessageDispatcherConfigurator)
  extends MessageDispatcher(_configurator) {
  
  // Core dispatcher methods (signatures only; these are internal to Akka and
  // are not meant to be called from test code)
  def dispatch(receiver: ActorCell, invocation: Envelope): Unit
  def systemDispatch(receiver: ActorCell, invocation: SystemMessage): Unit
  def executeTask(invocation: TaskInvocation): Unit
  def createMailbox(actor: akka.actor.Cell, mailboxType: MailboxType): Mailbox
  def shutdown(): Unit
}

/**
 * Configurator for the calling-thread dispatcher. It is the class named by the
 * `type` setting of the `akka.test.calling-thread-dispatcher` config section.
 *
 * @param config        the dispatcher's configuration section
 * @param prerequisites dispatcher prerequisites, passed through to the
 *                      `MessageDispatcherConfigurator` superclass
 */
class CallingThreadDispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
  extends MessageDispatcherConfigurator(config, prerequisites) {
  
  // Returns the dispatcher managed by this configurator.
  // NOTE(review): presumably always a CallingThreadDispatcher instance — confirm against the Akka sources.
  def dispatcher(): MessageDispatcher
}

Usage Examples:

import akka.actor.{Actor, ActorSystem, Props}
import akka.testkit.{CallingThreadDispatcher, ImplicitSender, TestActorRef, TestKit}
import org.scalatest.BeforeAndAfterAll
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

import scala.collection.mutable

/**
 * Demonstrates `CallingThreadDispatcher`: actors configured with it process
 * their messages synchronously on the thread that sends the message.
 *
 * `ImplicitSender` is required: every actor below replies via `sender()`, and
 * without it messages sent from the test have no sender, so the replies would
 * go to dead letters and `expectMsg` / `receiveN` would time out.
 */
class CallingThreadDispatcherTest
  extends TestKit(ActorSystem("TestSystem"))
  with ImplicitSender
  with AnyWordSpecLike
  with Matchers
  with BeforeAndAfterAll {

  // Stop the actor system once all tests have run so no threads are leaked.
  override def afterAll(): Unit = TestKit.shutdownActorSystem(system)

  "CallingThreadDispatcher" should {
    "execute actors on current thread" in {
      val mainThreadId = Thread.currentThread().getId
      // Written from inside the actor; volatile so the write is visible to the
      // test even if the actor were (incorrectly) run on another thread.
      @volatile var actorThreadId: Long = -1
      
      val actor = system.actorOf(Props(new Actor {
        def receive: Receive = {
          case "get-thread" =>
            actorThreadId = Thread.currentThread().getId
            sender() ! actorThreadId
        }
      }).withDispatcher(CallingThreadDispatcher.Id))
      
      actor ! "get-thread"
      expectMsg(mainThreadId)
      
      actorThreadId should equal(mainThreadId)
    }
    
    "provide deterministic execution order" in {
      // A mutable buffer is safe here: every actor runs on the test thread.
      val results = mutable.ListBuffer[Int]()
      
      val actors = (1 to 3).map { i =>
        system.actorOf(Props(new Actor {
          def receive: Receive = {
            case "execute" =>
              results += i
              sender() ! s"done-$i"
          }
        }).withDispatcher(CallingThreadDispatcher.Id))
      }
      
      // Messages processed in order sent (deterministic)
      actors.foreach(_ ! "execute")
      
      receiveN(3) should equal(Seq("done-1", "done-2", "done-3"))
      results.toList should equal(List(1, 2, 3))
    }
    
    "work with TestActorRef" in {
      val props = Props(new Actor {
        def receive: Receive = {
          case msg => sender() ! s"processed: $msg"
        }
      }).withDispatcher(CallingThreadDispatcher.Id)
      
      val actor = TestActorRef(props)
      
      // Direct message processing (synchronous)
      actor ! "test"
      expectMsg("processed: test")
    }
  }
}

ExplicitlyTriggeredScheduler

Scheduler that requires manual time advancement for testing.

/**
 * Scheduler whose clock only moves when `timePasses` is called.
 *
 * The three constructor parameters are required: Akka instantiates the class
 * named by `akka.scheduler.implementation` reflectively with exactly these
 * arguments, so the scheduler is never constructed directly in tests; obtain
 * it with `system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]`.
 */
class ExplicitlyTriggeredScheduler(config: Config, log: LoggingAdapter, tf: ThreadFactory)
  extends Scheduler {
  // Time control methods

  // Advances the scheduler's clock by `amount`, running every task that
  // becomes due in that period before returning.
  def timePasses(amount: FiniteDuration): Unit

  // NOTE(review): could not be confirmed as part of the public Akka API —
  // verify before relying on it in tests.
  def currentTimeMs: Long
  
  // Scheduler interface methods

  // Deprecated in the Scheduler trait since Akka 2.6; prefer
  // scheduleWithFixedDelay or scheduleAtFixedRate.
  def schedule(
    initialDelay: FiniteDuration,
    interval: FiniteDuration,
    runnable: Runnable
  )(implicit executor: ExecutionContext): Cancellable
  
  def scheduleOnce(
    delay: FiniteDuration,
    runnable: Runnable
  )(implicit executor: ExecutionContext): Cancellable
  
  // The Runnable variants take the runnable in a second parameter list.
  def scheduleWithFixedDelay(
    initialDelay: FiniteDuration,
    delay: FiniteDuration
  )(runnable: Runnable)(implicit executor: ExecutionContext): Cancellable
  
  def scheduleAtFixedRate(
    initialDelay: FiniteDuration,
    interval: FiniteDuration
  )(runnable: Runnable)(implicit executor: ExecutionContext): Cancellable

  // Required member of the Scheduler trait.
  def maxFrequency: Double
}

Configuration and Usage:

import akka.actor.{Actor, ActorRef, ActorSystem, Props, Timers}
import akka.testkit.{ExplicitlyTriggeredScheduler, ImplicitSender, TestKit}
import com.typesafe.config.{Config, ConfigFactory}
import org.scalatest.BeforeAndAfterAll
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import scala.concurrent.duration._

object ExplicitSchedulerTest {
  // Configuration for explicitly triggered scheduler
  val config: Config = ConfigFactory.parseString("""
  akka.scheduler.implementation = "akka.testkit.ExplicitlyTriggeredScheduler"
""")

  /**
   * Starts a 3 second single-shot timer on "start-timer" and reports
   * "timer-fired" to whoever asked once the timer message arrives.
   *
   * The requester is captured when the timer is started: the timer message
   * itself is not sent by the requester, so `sender()` must not be used while
   * handling "tick". Defined at object level (not inside a test method) so
   * that it can be instantiated without a reference to the enclosing test.
   */
  class TimerActor extends Actor with Timers {
    def receive: Receive = {
      case "start-timer" =>
        timers.startSingleTimer("test-timer", "tick", 3.seconds)
        context.become(awaitingTick(sender()))
    }

    private def awaitingTick(replyTo: ActorRef): Receive = {
      case "tick" => replyTo ! "timer-fired"
    }
  }
}

/**
 * Demonstrates manual time control with `ExplicitlyTriggeredScheduler`:
 * scheduled work only runs when the test calls `scheduler.timePasses`.
 */
class ExplicitSchedulerTest
  extends TestKit(ActorSystem("TestSystem", ExplicitSchedulerTest.config))
  with ImplicitSender
  with AnyWordSpecLike
  with Matchers
  with BeforeAndAfterAll {

  import ExplicitSchedulerTest.TimerActor
  // The scheduler methods need an implicit ExecutionContext.
  import system.dispatcher

  // Safe only because the config above selects this scheduler implementation.
  private val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]

  override def afterAll(): Unit = TestKit.shutdownActorSystem(system)

  "ExplicitlyTriggeredScheduler" should {
    "allow manual time advancement" in {
      val executed = new AtomicBoolean(false)
      
      // Schedule task for 5 seconds from now
      scheduler.scheduleOnce(5.seconds, new Runnable {
        def run(): Unit = executed.set(true)
      })
      
      // Task not executed yet
      executed.get should be(false)
      
      // Advance time by 3 seconds - still not enough
      scheduler.timePasses(3.seconds)
      executed.get should be(false)
      
      // Advance time by 2 more seconds - now it executes
      scheduler.timePasses(2.seconds)
      executed.get should be(true)
    }
    
    "handle repeated scheduling" in {
      val executions = new AtomicInteger(0)
      
      // Schedule repeated task: first run after 1 second, then every 2 seconds
      scheduler.scheduleWithFixedDelay(1.second, 2.seconds)(new Runnable {
        def run(): Unit = executions.incrementAndGet()
      })
      
      executions.get should be(0)
      
      // First execution after 1 second
      scheduler.timePasses(1.second)
      executions.get should be(1)
      
      // Second execution after 2 more seconds (3 total)
      scheduler.timePasses(2.seconds)
      executions.get should be(2)
      
      // Third execution after 2 more seconds (5 total)
      scheduler.timePasses(2.seconds)
      executions.get should be(3)
    }
    
    "work with actor timers" in {
      val actor = system.actorOf(Props(new TimerActor))
      actor ! "start-timer"
      
      // Timer not fired yet
      expectNoMessage(100.millis)
      
      // Advance time to trigger timer
      scheduler.timePasses(3.seconds)
      expectMsg("timer-fired")
    }
  }
}

Integration with TestKit

Both the CallingThreadDispatcher and the ExplicitlyTriggeredScheduler work seamlessly with TestKit:

import akka.actor.{Actor, ActorSystem, Props}
import akka.testkit.{CallingThreadDispatcher, ExplicitlyTriggeredScheduler, ImplicitSender, TestKit}
import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfterAll
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

/**
 * Shows the calling-thread dispatcher and the explicitly triggered scheduler
 * used together with ordinary TestKit expectations.
 *
 * `ImplicitSender` makes the test actor the sender of messages sent from the
 * test, so replies sent with `sender() ! ...` can be received by `receiveN`.
 */
class DispatcherIntegrationTest
  extends TestKit(ActorSystem("TestSystem"))
  with ImplicitSender
  with AnyWordSpecLike
  with Matchers
  with BeforeAndAfterAll {

  override def afterAll(): Unit = TestKit.shutdownActorSystem(system)

  "Dispatcher integration" should {
    "combine CallingThreadDispatcher with expectations" in {
      val actor = system.actorOf(Props(new Actor {
        def receive: Receive = {
          case x: Int => sender() ! (x * 2)
        }
      }).withDispatcher(CallingThreadDispatcher.Id))
      
      // Synchronous processing with deterministic order
      (1 to 5).foreach(actor ! _)
      receiveN(5) should equal(List(2, 4, 6, 8, 10))
    }
    
    "use ExplicitlyTriggeredScheduler with awaitCond" in {
      // A dedicated system is needed because the scheduler implementation is
      // fixed when the ActorSystem is created.
      val schedulerSystem = ActorSystem("ExplicitSchedulerSystem", ConfigFactory.parseString("""
        akka.scheduler.implementation = "akka.testkit.ExplicitlyTriggeredScheduler"
      """))
      
      try {
        // Called through the instance rather than `import schedulerKit._`:
        // this class already inherits the same member names from TestKit.
        val schedulerKit = new TestKit(schedulerSystem)
        implicit val ec: ExecutionContext = schedulerSystem.dispatcher
        
        val scheduler = schedulerSystem.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]
        val condition = new AtomicBoolean(false)
        
        scheduler.scheduleOnce(2.seconds, new Runnable {
          def run(): Unit = condition.set(true)
        })
        
        // Condition not met yet
        intercept[AssertionError] {
          schedulerKit.awaitCond(condition.get, max = 100.millis)
        }
        
        // Advance time and condition becomes true
        scheduler.timePasses(2.seconds)
        schedulerKit.awaitCond(condition.get, max = 100.millis) // Should succeed now
      } finally {
        // Always stop the extra system, even when an assertion above fails.
        TestKit.shutdownActorSystem(schedulerSystem)
      }
    }
  }
}

Testing Actor Scheduling Behavior

Testing Periodic Tasks:

import akka.actor.{Actor, ActorRef, ActorSystem, Cancellable, Props}
import akka.testkit.{ExplicitlyTriggeredScheduler, TestKit}
import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfterAll
import org.scalatest.wordspec.AnyWordSpecLike

import scala.concurrent.duration._

object PeriodicTaskTest {
  /**
   * Sends "periodic-message" to `target` on every "tick"; ticks start after
   * 1 second and repeat every 2 seconds.
   *
   * Defined at object level and given its target explicitly so that it can be
   * created with `Props(new PeriodicActor(...))` without depending on the
   * enclosing test instance.
   */
  class PeriodicActor(target: ActorRef) extends Actor {
    import context.dispatcher

    // Handle of the periodic task so it can be cancelled when the actor stops.
    private var tickTask: Option[Cancellable] = None

    override def preStart(): Unit = {
      tickTask = Some(
        context.system.scheduler.scheduleWithFixedDelay(1.second, 2.seconds, self, "tick")
      )
    }

    // Without this the scheduler would keep sending ticks to a stopped actor.
    override def postStop(): Unit = tickTask.foreach(_.cancel())

    def receive: Receive = {
      case "tick" => target ! "periodic-message"
    }
  }
}

/** Drives a periodically ticking actor by advancing the scheduler's clock manually. */
class PeriodicTaskTest extends TestKit(ActorSystem("TestSystem", ConfigFactory.parseString("""
  akka.scheduler.implementation = "akka.testkit.ExplicitlyTriggeredScheduler"
"""))) with AnyWordSpecLike with BeforeAndAfterAll {

  import PeriodicTaskTest.PeriodicActor
  
  val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]

  override def afterAll(): Unit = TestKit.shutdownActorSystem(system)
  
  "Periodic task actor" should {
    "send messages at regular intervals" in {
      system.actorOf(Props(new PeriodicActor(testActor)))
      
      // No messages initially
      expectNoMessage(100.millis)
      
      // First message after 1 second
      scheduler.timePasses(1.second)
      expectMsg("periodic-message")
      
      // Second message after 2 more seconds
      scheduler.timePasses(2.seconds)
      expectMsg("periodic-message")
      
      // Third message after 2 more seconds
      scheduler.timePasses(2.seconds)
      expectMsg("periodic-message")
    }
  }
}

Testing Timeout Behavior:

import akka.actor.{Actor, ActorRef, ActorSystem, Cancellable, Props}
import akka.testkit.{ExplicitlyTriggeredScheduler, ImplicitSender, TestKit}
import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfterAll
import org.scalatest.wordspec.AnyWordSpecLike

import scala.concurrent.duration._

object TimeoutTest {
  /**
   * On "start-with-timeout" the actor remembers the requester and waits for
   * either "response" (replies "success") or its own 5 second timeout
   * (replies "timeout-occurred").
   */
  class TimeoutActor extends Actor {
    import context.dispatcher

    def receive: Receive = {
      case "start-with-timeout" =>
        val originalSender = sender()
        // The timeout is delivered to this actor itself, so that the "timeout"
        // case below runs and translates it into "timeout-occurred".
        val cancellable = context.system.scheduler.scheduleOnce(5.seconds, self, "timeout")

        context.become(waitingForResponse(cancellable, originalSender))
    }

    def waitingForResponse(cancellable: Cancellable, client: ActorRef): Receive = {
      case "response" =>
        // The answer arrived in time: the pending timeout is no longer needed.
        cancellable.cancel()
        client ! "success"
        context.unbecome()
      case "timeout" =>
        client ! "timeout-occurred"
        context.unbecome()
    }
  }
}

/**
 * Verifies the timeout path of `TimeoutActor` by advancing time manually.
 * `ImplicitSender` makes the test actor the `client` the actor replies to.
 */
class TimeoutTest extends TestKit(ActorSystem("TestSystem", ConfigFactory.parseString("""
  akka.scheduler.implementation = "akka.testkit.ExplicitlyTriggeredScheduler"
"""))) with ImplicitSender with AnyWordSpecLike with BeforeAndAfterAll {

  import TimeoutTest.TimeoutActor
  
  val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]

  override def afterAll(): Unit = TestKit.shutdownActorSystem(system)
  
  "Timeout actor" should {
    "handle timeouts correctly" in {
      val actor = system.actorOf(Props(new TimeoutActor))
      
      // Start operation with timeout
      actor ! "start-with-timeout"
      
      // No response yet
      expectNoMessage(100.millis)
      
      // Advance time but not enough for timeout
      scheduler.timePasses(3.seconds)
      expectNoMessage(100.millis)
      
      // Advance time to trigger timeout
      scheduler.timePasses(2.seconds)
      expectMsg("timeout-occurred")
    }
  }
}

Configuration

Configure dispatchers and schedulers in application.conf:

# Example application.conf covering the default dispatcher, the
# calling-thread test dispatcher and the explicitly triggered scheduler.
akka {
  actor {
    # Dispatcher used by actors that do not select one explicitly:
    # a regular Dispatcher running on a thread-pool executor.
    default-dispatcher {
      type = Dispatcher
      executor = "thread-pool-executor"
    }
  }
  
  test {
    # Path akka.test.calling-thread-dispatcher is the id actors select with
    # Props.withDispatcher(CallingThreadDispatcher.Id).
    calling-thread-dispatcher {
      type = akka.testkit.CallingThreadDispatcherConfigurator
    }
  }
  
  # For explicit scheduler testing
  # NOTE(review): this replaces the scheduler for every ActorSystem that loads
  # this file; presumably it belongs in a test-only configuration — confirm.
  scheduler {
    implementation = "akka.testkit.ExplicitlyTriggeredScheduler"
  }
}

Best Practices

  1. Use CallingThreadDispatcher for Unit Tests: Provides deterministic, synchronous execution
  2. Use ExplicitlyTriggeredScheduler for Time-Dependent Tests: Full control over time advancement
  3. Combine with TestActorRef: Perfect combination for synchronous testing
  4. Configure Appropriately: Use test dispatchers only in test environments
  5. Clean Up Resources: Both are lightweight, but the ActorSystem that hosts them must still be shut down after the tests (for example with TestKit.shutdownActorSystem)
// Good: select the calling-thread dispatcher explicitly when creating test actors
val callingThreadProps = Props[MyActor]().withDispatcher(CallingThreadDispatcher.Id)
val actor = system.actorOf(callingThreadProps, "test-actor")

// Good: advance the clock by hand, then assert on the scheduled message
scheduler.timePasses(expectedDelay)
expectMsg("scheduled-message")

// Good: use both together for fully synchronous, time-controlled tests
val timedProps = Props[TimedActor]().withDispatcher(CallingThreadDispatcher.Id)
val testActorRef = TestActorRef(timedProps)
scheduler.timePasses(triggerTime)
testActorRef.underlyingActor.timersFired should be(true)