0
# Custom Dispatchers and Scheduling
1
2
Specialized dispatchers and schedulers that provide deterministic execution environments for testing, including single-threaded execution and manual time advancement.
3
4
## Capabilities
5
6
### CallingThreadDispatcher
7
8
Dispatcher that runs on current thread for deterministic testing.
9
10
```scala { .api }
11
class CallingThreadDispatcher(_config: Config, _prerequisites: DispatcherPrerequisites)
12
extends MessageDispatcher(_config, _prerequisites) {
13
14
// Dispatcher ID constant
15
val Id = "akka.test.calling-thread-dispatcher"
16
17
// Core dispatcher methods
18
def dispatch(receiver: ActorCell, invocation: Envelope): Unit
19
def systemDispatch(receiver: ActorCell, invocation: SystemMessage): Unit
20
def executeTask(invocation: TaskInvocation): Unit
21
def createMailbox(actor: akka.actor.Cell, mailboxType: MailboxType): Mailbox
22
def shutdown(): Unit
23
}
24
25
class CallingThreadDispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
26
extends MessageDispatcherConfigurator(config, prerequisites) {
27
28
def dispatcher(): MessageDispatcher
29
}
30
```
31
32
**Usage Examples:**
33
34
```scala
35
import akka.testkit.{TestKit, CallingThreadDispatcher}
36
37
class CallingThreadDispatcherTest extends TestKit(ActorSystem("TestSystem")) {
38
"CallingThreadDispatcher" should {
39
"execute actors on current thread" in {
40
val mainThreadId = Thread.currentThread().getId
41
@volatile var actorThreadId: Long = -1
42
43
val actor = system.actorOf(Props(new Actor {
44
def receive = {
45
case "get-thread" =>
46
actorThreadId = Thread.currentThread().getId
47
sender() ! actorThreadId
48
}
49
}).withDispatcher(CallingThreadDispatcher.Id))
50
51
actor ! "get-thread"
52
expectMsg(mainThreadId)
53
54
actorThreadId should equal(mainThreadId)
55
}
56
57
"provide deterministic execution order" in {
58
val results = mutable.ListBuffer[Int]()
59
60
val actors = (1 to 3).map { i =>
61
system.actorOf(Props(new Actor {
62
def receive = {
63
case "execute" =>
64
results += i
65
sender() ! s"done-$i"
66
}
67
}).withDispatcher(CallingThreadDispatcher.Id))
68
}
69
70
// Messages processed in order sent (deterministic)
71
actors.foreach(_ ! "execute")
72
73
receiveN(3) should equal(Seq("done-1", "done-2", "done-3"))
74
results.toList should equal(List(1, 2, 3))
75
}
76
77
"work with TestActorRef" in {
78
val props = Props(new Actor {
79
def receive = {
80
case msg => sender() ! s"processed: $msg"
81
}
82
}).withDispatcher(CallingThreadDispatcher.Id)
83
84
val actor = TestActorRef(props)
85
86
// Direct message processing (synchronous)
87
actor ! "test"
88
expectMsg("processed: test")
89
}
90
}
91
}
92
```
93
94
### ExplicitlyTriggeredScheduler
95
96
Scheduler that requires manual time advancement for testing.
97
98
```scala { .api }
99
class ExplicitlyTriggeredScheduler extends Scheduler {
100
// Time control methods
101
def timePasses(amount: FiniteDuration): Unit
102
def currentTimeMs: Long
103
104
// Scheduler interface methods
105
def schedule(
106
initialDelay: FiniteDuration,
107
interval: FiniteDuration,
108
runnable: Runnable
109
)(implicit executor: ExecutionContext): Cancellable
110
111
def scheduleOnce(
112
delay: FiniteDuration,
113
runnable: Runnable
114
)(implicit executor: ExecutionContext): Cancellable
115
116
def scheduleWithFixedDelay(
117
initialDelay: FiniteDuration,
118
delay: FiniteDuration,
119
runnable: Runnable
120
)(implicit executor: ExecutionContext): Cancellable
121
122
def scheduleAtFixedRate(
123
initialDelay: FiniteDuration,
124
interval: FiniteDuration,
125
runnable: Runnable
126
)(implicit executor: ExecutionContext): Cancellable
127
}
128
```
129
130
**Configuration and Usage:**
131
132
```scala
133
// Configuration for explicitly triggered scheduler
134
val config = ConfigFactory.parseString("""
135
akka.scheduler.implementation = "akka.testkit.ExplicitlyTriggeredScheduler"
136
""")
137
138
val system = ActorSystem("TestSystem", config)
139
val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]
140
141
class ExplicitSchedulerTest extends TestKit(system) {
142
"ExplicitlyTriggeredScheduler" should {
143
"allow manual time advancement" in {
144
@volatile var executed = false
145
@volatile var executionTime = 0L
146
147
// Schedule task for 5 seconds from now
148
scheduler.scheduleOnce(5.seconds, new Runnable {
149
def run(): Unit = {
150
executed = true
151
executionTime = scheduler.currentTimeMs
152
}
153
})
154
155
// Task not executed yet
156
executed should be(false)
157
158
// Advance time by 3 seconds - still not enough
159
scheduler.timePasses(3.seconds)
160
executed should be(false)
161
162
// Advance time by 2 more seconds - now it executes
163
scheduler.timePasses(2.seconds)
164
executed should be(true)
165
executionTime should equal(scheduler.currentTimeMs)
166
}
167
168
"handle repeated scheduling" in {
169
val executions = mutable.ListBuffer[Long]()
170
171
// Schedule repeated task every 2 seconds
172
scheduler.schedule(
173
initialDelay = 1.second,
174
interval = 2.seconds,
175
new Runnable {
176
def run(): Unit = executions += scheduler.currentTimeMs
177
}
178
)
179
180
executions should be(empty)
181
182
// First execution after 1 second
183
scheduler.timePasses(1.second)
184
executions should have size 1
185
186
// Second execution after 2 more seconds (3 total)
187
scheduler.timePasses(2.seconds)
188
executions should have size 2
189
190
// Third execution after 2 more seconds (5 total)
191
scheduler.timePasses(2.seconds)
192
executions should have size 3
193
194
// Verify execution times
195
executions.toList should equal(List(1000, 3000, 5000))
196
}
197
198
"work with actor timers" in {
199
class TimerActor extends Actor with Timers {
200
def receive = {
201
case "start-timer" =>
202
timers.startSingleTimer("test-timer", "tick", 3.seconds)
203
case "tick" =>
204
sender() ! "timer-fired"
205
}
206
}
207
208
val actor = system.actorOf(Props[TimerActor]())
209
actor ! "start-timer"
210
211
// Timer not fired yet
212
expectNoMessage(100.millis)
213
214
// Advance time to trigger timer
215
scheduler.timePasses(3.seconds)
216
expectMsg("timer-fired")
217
}
218
}
219
}
220
```
221
222
### Integration with TestKit
223
224
Both dispatchers work seamlessly with TestKit:
225
226
```scala
227
class DispatcherIntegrationTest extends TestKit(ActorSystem("TestSystem")) {
228
"Dispatcher integration" should {
229
"combine CallingThreadDispatcher with expectations" in {
230
val actor = system.actorOf(Props(new Actor {
231
def receive = {
232
case x: Int => sender() ! (x * 2)
233
}
234
}).withDispatcher(CallingThreadDispatcher.Id))
235
236
// Synchronous processing with deterministic order
237
(1 to 5).foreach(actor ! _)
238
receiveN(5) should equal(List(2, 4, 6, 8, 10))
239
}
240
241
"use ExplicitlyTriggeredScheduler with awaitCond" in {
242
val system = ActorSystem("ExplicitSchedulerSystem", ConfigFactory.parseString("""
243
akka.scheduler.implementation = "akka.testkit.ExplicitlyTriggeredScheduler"
244
"""))
245
246
val testKit = new TestKit(system)
247
import testKit._
248
249
val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]
250
@volatile var condition = false
251
252
scheduler.scheduleOnce(2.seconds, new Runnable {
253
def run(): Unit = condition = true
254
})
255
256
// Condition not met yet
257
intercept[AssertionError] {
258
awaitCond(condition, max = 100.millis)
259
}
260
261
// Advance time and condition becomes true
262
scheduler.timePasses(2.seconds)
263
awaitCond(condition, max = 100.millis) // Should succeed now
264
265
system.terminate()
266
}
267
}
268
}
269
```
270
271
### Testing Actor Scheduling Behavior
272
273
**Testing Periodic Tasks:**
274
275
```scala
276
class PeriodicTaskTest extends TestKit(ActorSystem("TestSystem", ConfigFactory.parseString("""
277
akka.scheduler.implementation = "akka.testkit.ExplicitlyTriggeredScheduler"
278
"""))) {
279
280
val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]
281
282
"Periodic task actor" should {
283
"send messages at regular intervals" in {
284
class PeriodicActor extends Actor {
285
import context.dispatcher
286
287
override def preStart(): Unit = {
288
context.system.scheduler.schedule(
289
initialDelay = 1.second,
290
interval = 2.seconds,
291
self,
292
"tick"
293
)
294
}
295
296
def receive = {
297
case "tick" => testActor ! "periodic-message"
298
}
299
}
300
301
system.actorOf(Props[PeriodicActor]())
302
303
// No messages initially
304
expectNoMessage(100.millis)
305
306
// First message after 1 second
307
scheduler.timePasses(1.second)
308
expectMsg("periodic-message")
309
310
// Second message after 2 more seconds
311
scheduler.timePasses(2.seconds)
312
expectMsg("periodic-message")
313
314
// Third message after 2 more seconds
315
scheduler.timePasses(2.seconds)
316
expectMsg("periodic-message")
317
}
318
}
319
}
320
```
321
322
**Testing Timeout Behavior:**
323
324
```scala
325
class TimeoutTest extends TestKit(ActorSystem("TestSystem", ConfigFactory.parseString("""
326
akka.scheduler.implementation = "akka.testkit.ExplicitlyTriggeredScheduler"
327
"""))) {
328
329
val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]
330
331
"Timeout actor" should {
332
"handle timeouts correctly" in {
333
class TimeoutActor extends Actor {
334
import context.dispatcher
335
336
def receive = {
337
case "start-with-timeout" =>
338
val originalSender = sender()
339
val cancellable = context.system.scheduler.scheduleOnce(5.seconds) {
340
originalSender ! "timeout"
341
}
342
343
context.become(waitingForResponse(cancellable, originalSender))
344
}
345
346
def waitingForResponse(cancellable: Cancellable, client: ActorRef): Receive = {
347
case "response" =>
348
cancellable.cancel()
349
client ! "success"
350
context.unbecome()
351
case "timeout" =>
352
client ! "timeout-occurred"
353
context.unbecome()
354
}
355
}
356
357
val actor = system.actorOf(Props[TimeoutActor]())
358
359
// Start operation with timeout
360
actor ! "start-with-timeout"
361
362
// No response yet
363
expectNoMessage(100.millis)
364
365
// Advance time but not enough for timeout
366
scheduler.timePasses(3.seconds)
367
expectNoMessage(100.millis)
368
369
// Advance time to trigger timeout
370
scheduler.timePasses(2.seconds)
371
expectMsg("timeout-occurred")
372
}
373
}
374
}
375
```
376
377
### Configuration
378
379
Configure dispatchers and schedulers in application.conf:
380
381
```hocon
382
akka {
383
actor {
384
default-dispatcher {
385
type = Dispatcher
386
executor = "thread-pool-executor"
387
}
388
}
389
390
test {
391
calling-thread-dispatcher {
392
type = akka.testkit.CallingThreadDispatcherConfigurator
393
}
394
}
395
396
# For explicit scheduler testing
397
scheduler {
398
implementation = "akka.testkit.ExplicitlyTriggeredScheduler"
399
}
400
}
401
```
402
403
### Best Practices
404
405
1. **Use CallingThreadDispatcher for Unit Tests**: Provides deterministic, synchronous execution
406
2. **Use ExplicitlyTriggeredScheduler for Time-Dependent Tests**: Full control over time advancement
407
3. **Combine with TestActorRef**: Perfect combination for synchronous testing
408
4. **Configure Appropriately**: Use test dispatchers only in test environments
409
5. **Clean Resource Usage**: Both are lightweight but still manage resources
410
411
```scala
412
// Good: Explicit dispatcher configuration for tests
413
val actor = system.actorOf(
414
Props[MyActor]().withDispatcher(CallingThreadDispatcher.Id),
415
"test-actor"
416
)
417
418
// Good: Manual time control for timing tests
419
scheduler.timePasses(expectedDelay)
420
expectMsg("scheduled-message")
421
422
// Good: Combine for comprehensive testing
423
val testActorRef = TestActorRef(
424
Props[TimedActor]().withDispatcher(CallingThreadDispatcher.Id)
425
)
426
scheduler.timePasses(triggerTime)
427
testActorRef.underlyingActor.timersFired should be(true)
428
```