# Deterministic Execution Control

Akka TestKit provides a specialized dispatcher and scheduler that make test execution deterministic. They remove timing-related flakiness by giving the test direct control over when messages are processed and when scheduled tasks fire.

## CallingThreadDispatcher

### CallingThreadDispatcher Class { .api }

```scala
class CallingThreadDispatcher(_configurator: MessageDispatcherConfigurator) extends MessageDispatcher
```

A dispatcher that runs each invocation on the thread that sends the message instead of handing it to a thread pool. A `tell` from the test thread is therefore processed before it returns, which gives deterministic, synchronous execution for testing. If the receiving actor is already processing a message on that thread, the new message is queued and handled after the current one finishes.

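To make the calling-thread idea concrete, here is a toy, single-threaded model in plain Scala. It is a sketch and not Akka's implementation: `CallingThreadInbox` and `CallingThreadDemo` are hypothetical names, and the real dispatcher also has to deal with multiple threads, system messages, and suspension. The sketch shows two properties: a send is processed before `tell` returns, and a send made from inside a handler is queued until the current message finishes.

```scala
import scala.collection.mutable

// Toy model of calling-thread delivery. NOT Akka's implementation;
// all names here are hypothetical and the model is single-threaded only.
final class CallingThreadInbox[A](handler: (A, CallingThreadInbox[A]) => Unit) {
  private val queue = mutable.Queue.empty[A]
  private var running = false

  /** Processes `msg` before returning, unless a handler is already running. */
  def tell(msg: A): Unit = {
    queue.enqueue(msg)
    if (!running) {
      running = true
      try {
        // Drain the queue, including messages enqueued by the handler itself
        while (queue.nonEmpty) handler(queue.dequeue(), this)
      } finally {
        running = false
      }
    }
  }
}

object CallingThreadDemo {
  def run(): Vector[String] = {
    var log = Vector.empty[String]
    val inbox = new CallingThreadInbox[String]((msg, self) =>
      msg match {
        case "first" =>
          log :+= "first:start"
          self.tell("second") // re-entrant send: queued, not run inline
          log :+= "first:end"
        case other =>
          log :+= other
      }
    )
    inbox.tell("first")
    log :+= "after-tell" // appended only after both messages were handled
    log
  }
}
```

`CallingThreadDemo.run()` returns `first:start`, `first:end`, `second`, `after-tell`, in that order: nothing runs in the background, so the order is the same on every run.
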
### CallingThreadDispatcher Object { .api }

```scala
object CallingThreadDispatcher {
  val Id: String = "akka.test.calling-thread-dispatcher"
}
```

### CallingThreadDispatcherConfigurator { .api }

```scala
class CallingThreadDispatcherConfigurator(
  config: Config,
  prerequisites: DispatcherPrerequisites
) extends MessageDispatcherConfigurator
```

### CallingThreadMailbox { .api }

```scala
class CallingThreadMailbox(
  _receiver: akka.actor.Cell,
  mailboxType: MailboxType
) extends Mailbox
```

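### Configuration

The akka-testkit `reference.conf` already defines this dispatcher under the id `akka.test.calling-thread-dispatcher` (the value of `CallingThreadDispatcher.Id`), so no extra configuration is normally needed. The definition is equivalent to:

```hocon
akka.test.calling-thread-dispatcher {
  type = akka.testkit.CallingThreadDispatcherConfigurator
}
```
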
### Usage Examples

#### Basic CallingThreadDispatcher Usage

```scala
import akka.actor.{Actor, Props}
import akka.testkit.CallingThreadDispatcher

class TestActor extends Actor {
  def receive = {
    case "work" =>
      // This will execute on the calling thread
      sender() ! "done"
  }
}

// The lines below run inside a TestKit spec that mixes in ImplicitSender,
// so replies to sender() reach testActor.

// Create actor with CallingThreadDispatcher
val actor = system.actorOf(
  Props[TestActor].withDispatcher(CallingThreadDispatcher.Id),
  "test-actor"
)

// Message processing happens synchronously
actor ! "work"
expectMsg("done") // Response is immediate and deterministic
```

#### Testing with Multiple Actors

```scala
class CounterActor extends Actor {
  private var count = 0

  def receive = {
    case "increment" =>
      count += 1
      sender() ! count
    case "get" =>
      sender() ! count
  }
}

// Create multiple actors with CallingThreadDispatcher
val actors = (1 to 3).map { i =>
  system.actorOf(
    Props[CounterActor].withDispatcher(CallingThreadDispatcher.Id),
    s"counter-$i"
  )
}

// All processing is deterministic and ordered
actors(0) ! "increment"
expectMsg(1)

actors(1) ! "increment"
expectMsg(1)

actors(0) ! "increment"
expectMsg(2)

// Results are predictable because execution is synchronous
actors(0) ! "get"
expectMsg(2)

actors(1) ! "get"
expectMsg(1)
```

#### Testing Actor Interactions

```scala
import akka.actor.{Actor, ActorRef, Props}

class MasterActor extends Actor {
  var workers = List.empty[ActorRef]
  var responses = 0
  // Whoever requested the broadcast receives the completion notice.
  // (context.parent would be the user guardian here, not the test.)
  var requester: Option[ActorRef] = None

  def receive = {
    case "add-worker" =>
      workers = sender() :: workers

    case "broadcast" =>
      requester = Some(sender())
      workers.foreach(_ ! "work")

    case "work-done" =>
      responses += 1
      if (responses == workers.length) {
        requester.foreach(_ ! "all-done")
        responses = 0
      }
  }
}

class WorkerActor extends Actor {
  def receive = {
    case "work" =>
      sender() ! "work-done"
  }
}

// Create actors with deterministic dispatcher
val master = system.actorOf(
  Props[MasterActor].withDispatcher(CallingThreadDispatcher.Id),
  "master"
)

val workers = (1 to 3).map { i =>
  system.actorOf(
    Props[WorkerActor].withDispatcher(CallingThreadDispatcher.Id),
    s"worker-$i"
  )
}

// Register workers (each worker is passed as the sender of "add-worker")
workers.foreach { worker =>
  master.tell("add-worker", worker)
}

// Broadcast work - all processing is synchronous and predictable
master ! "broadcast" // requires ImplicitSender so that sender() is testActor
expectMsg("all-done") // Deterministic completion
```

#### Configuration-based Usage

```scala
// Test configuration (for example in the application.conf used by tests):
//
//   akka {
//     actor {
//       default-dispatcher {
//         type = akka.testkit.CallingThreadDispatcherConfigurator
//       }
//     }
//   }

// All actors will use CallingThreadDispatcher by default
val actor = system.actorOf(Props[MyActor])
// Deterministic execution without explicit dispatcher configuration
```

## ExplicitlyTriggeredScheduler

### ExplicitlyTriggeredScheduler Class { .api }

```scala
class ExplicitlyTriggeredScheduler(
  config: Config,
  log: LoggingAdapter,
  tf: ThreadFactory
) extends Scheduler {

  def timePasses(amount: FiniteDuration): Unit
  def schedule(
    initialDelay: FiniteDuration,
    interval: FiniteDuration,
    runnable: Runnable
  )(implicit executor: ExecutionContext): Cancellable
  def scheduleOnce(
    delay: FiniteDuration,
    runnable: Runnable
  )(implicit executor: ExecutionContext): Cancellable
  def maxFrequency: Double
}
```

A scheduler that does not advance time on its own: scheduled tasks run only when the test moves the clock forward with `timePasses()`. This gives the test complete control over timing.

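The manual-clock idea can be modeled in a few lines of plain Scala. This is a simplified sketch and not Akka's implementation: `VirtualTimeScheduler` and `VirtualTimeDemo` are hypothetical names, cancellation is left out, and repeating tasks are assumed to have a positive interval. It shows that nothing runs at registration time and that advancing the clock runs every due task in due-time order, including several firings of a repeating task.

```scala
import scala.collection.mutable
import scala.concurrent.duration._

// Simplified model of a manually driven scheduler. NOT Akka's implementation;
// all names here are hypothetical.
final class VirtualTimeScheduler {
  private case class Task(id: Long, dueMillis: Long, interval: Option[FiniteDuration], body: () => Unit)

  private var nowMillis = 0L
  private var nextId = 0L
  private val pending = mutable.ListBuffer.empty[Task]

  /** Registers a one-shot task; it does not run until time is advanced. */
  def scheduleOnce(delay: FiniteDuration)(body: => Unit): Unit =
    add(delay, None, () => body)

  /** Registers a repeating task (assumes a positive interval). */
  def schedule(initialDelay: FiniteDuration, interval: FiniteDuration)(body: => Unit): Unit =
    add(initialDelay, Some(interval), () => body)

  private def add(delay: FiniteDuration, interval: Option[FiniteDuration], body: () => Unit): Unit = {
    nextId += 1
    pending += Task(nextId, nowMillis + delay.toMillis, interval, body)
  }

  /** Advances the virtual clock and runs every task that becomes due, earliest first. */
  def timePasses(amount: FiniteDuration): Unit = {
    val runTo = nowMillis + amount.toMillis
    var next = earliestDue(runTo)
    while (next.isDefined) {
      val task = next.get
      pending -= task
      nowMillis = task.dueMillis
      task.body()
      // Repeating tasks are re-registered one interval after their due time
      task.interval.foreach(i => pending += task.copy(dueMillis = task.dueMillis + i.toMillis))
      next = earliestDue(runTo)
    }
    nowMillis = runTo
  }

  private def earliestDue(runTo: Long): Option[Task] = {
    val due = pending.filter(_.dueMillis <= runTo)
    if (due.isEmpty) None else Some(due.minBy(t => (t.dueMillis, t.id)))
  }
}

object VirtualTimeDemo {
  def run(): Vector[String] = {
    val scheduler = new VirtualTimeScheduler
    var events = Vector.empty[String]
    scheduler.scheduleOnce(5.seconds) { events :+= "timeout" }
    scheduler.schedule(2.seconds, 2.seconds) { events :+= "heartbeat" }
    scheduler.timePasses(1.second)  // clock at 1s: nothing is due yet
    scheduler.timePasses(5.seconds) // clock at 6s: heartbeats at 2s, 4s, 6s and the timeout at 5s
    events
  }
}
```

`VirtualTimeDemo.run()` returns `heartbeat`, `heartbeat`, `timeout`, `heartbeat`: the order depends only on the due times, never on wall-clock time.
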
### Configuration

Configure ExplicitlyTriggeredScheduler in test configuration:

```hocon
akka {
  scheduler {
    implementation = akka.testkit.ExplicitlyTriggeredScheduler
  }
}
```

### Usage Examples

#### Basic Scheduler Control

```scala
import akka.actor.{Actor, ActorRef, Props}
import akka.testkit.ExplicitlyTriggeredScheduler
import scala.concurrent.duration._

// Get scheduler from system (must be configured as ExplicitlyTriggeredScheduler)
val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]

class TimedActor extends Actor {
  import context.dispatcher

  // The scheduled message is not sent by the test, so sender() is not
  // testActor when it arrives. Remember who to notify instead.
  private var replyTo: Option[ActorRef] = None

  override def preStart(): Unit = {
    // Schedule a message to self in 5 seconds
    context.system.scheduler.scheduleOnce(5.seconds, self, "timeout")
  }

  def receive = {
    case "start" =>
      replyTo = Some(sender())
      sender() ! "started"
    case "timeout" =>
      replyTo.foreach(_ ! "timed-out")
  }
}

val actor = system.actorOf(Props[TimedActor])
actor ! "start"
expectMsg("started")

// No timeout message yet - time hasn't passed
expectNoMessage(100.millis)

// Manually advance time by 5 seconds
scheduler.timePasses(5.seconds)

// Now timeout message is delivered
expectMsg("timed-out")
```

#### Testing Periodic Scheduling

```scala
class HeartbeatActor extends Actor {
  import context.dispatcher

  override def preStart(): Unit = {
    // Send heartbeat every 2 seconds
    // (`schedule` is deprecated since Akka 2.6 in favor of
    // scheduleWithFixedDelay / scheduleAtFixedRate)
    context.system.scheduler.schedule(
      2.seconds,
      2.seconds,
      context.parent,
      "heartbeat"
    )
  }

  def receive = Actor.emptyBehavior
}

val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]
// Created as a child of testActor so that context.parent is the test itself;
// with system.actorOf the parent would be the user guardian.
val heartbeat = childActorOf(Props[HeartbeatActor])

// No heartbeats yet
expectNoMessage(100.millis)

// Advance past first delay
scheduler.timePasses(2.seconds)
expectMsg("heartbeat")

// Advance to next interval
scheduler.timePasses(2.seconds)
expectMsg("heartbeat")

// Skip ahead multiple intervals
scheduler.timePasses(6.seconds) // 3 more intervals
expectMsg("heartbeat")
expectMsg("heartbeat")
expectMsg("heartbeat")
```

#### Testing Timeout Behavior

```scala
class TimeoutActor extends Actor {
  import context.dispatcher

  private var timeoutHandle: Option[Cancellable] = None
  // The "timeout" message is not sent by the test, so keep the requester
  private var replyTo: Option[ActorRef] = None

  def receive = {
    case "start-timer" =>
      replyTo = Some(sender())
      timeoutHandle = Some(
        context.system.scheduler.scheduleOnce(3.seconds, self, "timeout")
      )
      sender() ! "timer-started"

    case "cancel-timer" =>
      timeoutHandle.foreach(_.cancel())
      timeoutHandle = None
      sender() ! "timer-cancelled"

    case "timeout" =>
      replyTo.foreach(_ ! "timeout-occurred")
      timeoutHandle = None
  }
}

val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]
val actor = system.actorOf(Props[TimeoutActor])

// Start timer
actor ! "start-timer"
expectMsg("timer-started")

// Advance time but not enough for timeout
scheduler.timePasses(2.seconds)
expectNoMessage(100.millis)

// Cancel timer before timeout
actor ! "cancel-timer"
expectMsg("timer-cancelled")

// Advance past original timeout - no message should come
scheduler.timePasses(2.seconds)
expectNoMessage(100.millis)

// Test timeout actually occurring
actor ! "start-timer"
expectMsg("timer-started")

scheduler.timePasses(3.seconds)
expectMsg("timeout-occurred")
```

#### Testing Complex Timing Scenarios

```scala
class BatchProcessor extends Actor {
  import context.dispatcher

  private var batch = List.empty[String]
  private var flushHandle: Option[Cancellable] = None

  def receive = {
    // Control messages are matched first; otherwise `case item: String`
    // would swallow "flush" and "force-flush" as batch items.
    case "flush" =>
      if (batch.nonEmpty) {
        context.parent ! s"batch: ${batch.reverse.mkString(",")}"
        batch = List.empty
      }
      flushHandle = None

    case "force-flush" =>
      flushHandle.foreach(_.cancel())
      self ! "flush"

    case item: String =>
      batch = item :: batch

      // Reset flush timer on each new item
      flushHandle.foreach(_.cancel())
      flushHandle = Some(
        context.system.scheduler.scheduleOnce(1.second, self, "flush")
      )
  }
}

val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]
// Child of testActor so that context.parent is the test. CallingThreadDispatcher
// ensures each `!` below is processed before time is advanced.
val processor = childActorOf(
  Props[BatchProcessor].withDispatcher(CallingThreadDispatcher.Id)
)

// Add items to batch
processor ! "item1"
processor ! "item2"
processor ! "item3"

// No flush yet
expectNoMessage(100.millis)

// Advance time to trigger flush
scheduler.timePasses(1.second)
expectMsg("batch: item1,item2,item3")

// Test timer reset behavior
processor ! "item4"
scheduler.timePasses(500.millis) // Half timeout
processor ! "item5" // Resets timer

scheduler.timePasses(500.millis) // Still within new timeout
expectNoMessage(100.millis)

scheduler.timePasses(500.millis) // Now past timeout
expectMsg("batch: item4,item5")
```

## Combining Deterministic Components

### Complete Deterministic Test Environment

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.testkit.{ExplicitlyTriggeredScheduler, ImplicitSender, TestKit}
import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike // assumes ScalaTest 3.1+
import scala.concurrent.duration._

// Abstract base so the test framework only runs the concrete specs
abstract class DeterministicTestSystem extends TestKit(ActorSystem("TestSystem", ConfigFactory.parseString(
  """
  akka {
    actor {
      default-dispatcher {
        type = akka.testkit.CallingThreadDispatcherConfigurator
      }
    }
    scheduler {
      implementation = akka.testkit.ExplicitlyTriggeredScheduler
    }
  }
  """
))) with ImplicitSender with AnyWordSpecLike {

  val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]

  def advanceTime(duration: FiniteDuration): Unit = {
    scheduler.timePasses(duration)
  }
}

class ComplexActor extends Actor {
  import context.dispatcher

  private var state = "idle"
  private var workCount = 0

  override def preStart(): Unit = {
    // Schedule periodic status reports
    context.system.scheduler.schedule(5.seconds, 5.seconds, self, "report-status")
  }

  def receive = {
    case "start-work" =>
      state = "working"
      // Simulate async work completion
      context.system.scheduler.scheduleOnce(2.seconds, self, "work-complete")
      sender() ! "work-started"

    case "work-complete" =>
      workCount += 1
      state = "idle"
      context.parent ! s"work-completed-$workCount"

    case "report-status" =>
      context.parent ! s"status: $state, completed: $workCount"
  }
}

// Usage in test
class ComplexActorSpec extends DeterministicTestSystem {
  "ComplexActor" should {
    "handle work and reporting deterministically" in {
      // Child of testActor so that context.parent is the test
      val actor = childActorOf(Props[ComplexActor])

      // Start work
      actor ! "start-work"
      expectMsg("work-started")

      // No completion yet
      expectNoMessage(100.millis)

      // Advance time to complete work
      advanceTime(2.seconds)
      expectMsg("work-completed-1")

      // Advance to first status report
      advanceTime(3.seconds) // Total 5 seconds from start
      expectMsg("status: idle, completed: 1")

      // Start more work
      actor ! "start-work"
      expectMsg("work-started")

      // Advance time through work completion (7s) and next status report (10s)
      advanceTime(5.seconds)
      expectMsg("work-completed-2")
      expectMsg("status: idle, completed: 2")
    }
  }
}
```

## Best Practices

### CallingThreadDispatcher Best Practices

1. **Use for unit tests**: CallingThreadDispatcher is ideal for testing the logic of an individual actor
2. **Avoid for integration tests**: Synchronous execution may hide concurrency issues that occur in production
3. **Configure consistently**: Within a suite, use it either globally or per actor, not a mix of both
4. **Test both sync and async**: Also run tests with real dispatchers to catch concurrency issues

```scala
// Good: Consistent usage for deterministic unit testing
val actor = system.actorOf(
  Props[MyActor].withDispatcher(CallingThreadDispatcher.Id)
)

// Also good: Global configuration for test suite (HOCON, in the test config):
//   akka.actor.default-dispatcher.type = akka.testkit.CallingThreadDispatcherConfigurator
```

### ExplicitlyTriggeredScheduler Best Practices

1. **Advance time methodically**: Use small, controlled time advances
2. **Test edge cases**: Test timer cancellation and overlapping timers
3. **Verify no unexpected scheduling**: Advance time past the expected window and assert that no further messages arrive
4. **Document time flow**: Comment each time advance in complex tests

```scala
// Good: Clear time progression
scheduler.timePasses(1.second) // Initial delay
expectMsg("first-event")

scheduler.timePasses(2.seconds) // Regular interval
expectMsg("second-event")

scheduler.timePasses(2.seconds) // Another interval
expectMsg("third-event")
```

### Combined Usage Best Practices

1. **Start simple**: Begin with just one deterministic component
2. **Test incrementally**: Add complexity gradually
3. **Verify assumptions**: Ensure deterministic behavior is actually achieved
4. **Document setup**: Clearly document deterministic test environment setup

```scala
// Good: Well-documented deterministic test setup
class MyDeterministicSpec extends TestKit(ActorSystem("test",
  ConfigFactory.parseString("""
    # Deterministic execution for predictable testing
    akka.actor.default-dispatcher.type = akka.testkit.CallingThreadDispatcherConfigurator
    akka.scheduler.implementation = akka.testkit.ExplicitlyTriggeredScheduler
  """)
)) {

  val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]

  // Helper for controlled time advancement
  def tick(duration: FiniteDuration = 1.second): Unit = {
    scheduler.timePasses(duration)
  }
}
```