0
# Deterministic Execution Control
1
2
Akka TestKit provides specialized dispatchers and schedulers for predictable, deterministic test execution including CallingThreadDispatcher for synchronous message processing and ExplicitlyTriggeredScheduler for manual time control.
3
4
## Capabilities
5
6
### CallingThreadDispatcher
7
8
Dispatcher that executes all messages on the calling thread for deterministic, single-threaded execution in tests.
9
10
```scala { .api }
11
/**
12
* Dispatcher that executes messages on the calling thread
13
* Provides deterministic execution order for testing
14
* No new threads are created - all execution happens synchronously
15
*/
16
class CallingThreadDispatcher extends MessageDispatcher {
17
// Implementation executes messages immediately on current thread
18
}
19
20
/**
21
* Dispatcher configurator for CallingThreadDispatcher
22
*/
23
class CallingThreadDispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
24
extends MessageDispatcherConfigurator {
25
26
def dispatcher(): MessageDispatcher = new CallingThreadDispatcher()
27
}
28
29
/**
30
* Specialized mailbox for CallingThreadDispatcher
31
* Maintains thread-local message queues for deterministic processing
32
*/
33
class CallingThreadMailbox extends Mailbox {
34
// Thread-local queue implementation
35
}
36
```
37
38
### CallingThreadDispatcher Configuration
39
40
Configuration constants and settings for CallingThreadDispatcher.
41
42
```scala { .api }
43
object CallingThreadDispatcher {
44
/**
45
* Configuration ID for CallingThreadDispatcher
46
* Use this ID in actor Props or configuration files
47
*/
48
val Id: String = "akka.test.calling-thread-dispatcher"
49
}
50
```
51
52
### ExplicitlyTriggeredScheduler
53
54
Manual scheduler for controlled timing in tests, allowing precise control over when scheduled tasks execute.
55
56
```scala { .api }
57
/**
58
* Manual scheduler for controlled timing in tests
59
* Time only advances when explicitly triggered
60
* @param config Scheduler configuration
61
* @param log Logging adapter
62
* @param tf Thread factory (unused in test scheduler)
63
*/
64
class ExplicitlyTriggeredScheduler(config: Config, log: LoggingAdapter, tf: ThreadFactory)
65
extends Scheduler {
66
67
/**
68
* Schedule recurring task with fixed interval
69
* Task will not execute until time is advanced
70
* @param initialDelay Delay before first execution
71
* @param interval Interval between executions
72
* @param runnable Task to execute
73
* @param executor Execution context for task
74
* @return Cancellable for the scheduled task
75
*/
76
def schedule(initialDelay: FiniteDuration, interval: FiniteDuration, runnable: Runnable)
77
(implicit executor: ExecutionContext): Cancellable
78
79
/**
80
* Schedule single task execution
81
* Task will not execute until time is advanced
82
* @param delay Delay before execution
83
* @param runnable Task to execute
84
* @param executor Execution context for task
85
* @return Cancellable for the scheduled task
86
*/
87
def scheduleOnce(delay: FiniteDuration, runnable: Runnable)
88
(implicit executor: ExecutionContext): Cancellable
89
90
/**
91
* Manually advance scheduler time
92
* Triggers execution of all tasks scheduled for the advanced time period
93
* @param amount Duration to advance time
94
*/
95
def timePasses(amount: FiniteDuration): Unit
96
97
/**
98
* Get current internal scheduler time
99
* @return Current time in milliseconds since scheduler creation
100
*/
101
def currentTimeMs: Long
102
103
/**
104
* Get maximum frequency for recurring tasks
105
* @return Maximum frequency (typically very high for test scheduler)
106
*/
107
def maxFrequency: Double
108
}
109
```
110
111
**Usage Examples:**
112
113
```scala
114
import akka.actor.{Actor, ActorSystem, Props}
115
import akka.testkit.{TestKit, TestActorRef, CallingThreadDispatcher}
116
import scala.concurrent.duration._
117
118
class DeterministicExecutionExample extends TestKit(ActorSystem("test")) {
119
120
class TimingActor extends Actor {
121
def receive = {
122
case "schedule" =>
123
import context.dispatcher
124
context.system.scheduler.scheduleOnce(1.second, self, "scheduled")
125
case "scheduled" =>
126
sender() ! "timer-fired"
127
case msg =>
128
sender() ! s"received-$msg"
129
}
130
}
131
132
"CallingThreadDispatcher" should {
133
"execute messages synchronously" in {
134
// Create actor with CallingThreadDispatcher
135
val props = Props[TimingActor].withDispatcher(CallingThreadDispatcher.Id)
136
val actor = system.actorOf(props)
137
138
// Messages are processed immediately and deterministically
139
actor ! "test1"
140
expectMsg("received-test1")
141
142
actor ! "test2"
143
expectMsg("received-test2")
144
145
// Order is completely predictable
146
}
147
148
"work with TestActorRef for complete synchronous testing" in {
149
// TestActorRef automatically uses CallingThreadDispatcher
150
val actor = TestActorRef[TimingActor]
151
152
// Direct synchronous message injection
153
actor.receive("direct")
154
155
// Can also use normal ! sending
156
actor ! "normal"
157
expectMsg("received-normal")
158
}
159
}
160
161
"ExplicitlyTriggeredScheduler" should {
162
"provide manual time control" in {
163
// This example assumes system is configured with ExplicitlyTriggeredScheduler
164
// Configuration: akka.scheduler.implementation = "akka.testkit.ExplicitlyTriggeredScheduler"
165
166
val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]
167
val actor = system.actorOf(Props[TimingActor])
168
169
// Schedule something but time won't advance automatically
170
actor ! "schedule"
171
172
// No message yet because time hasn't advanced
173
expectNoMessage(100.millis)
174
175
// Manually advance time to trigger scheduled task
176
scheduler.timePasses(1.second)
177
178
// Now the scheduled message should arrive
179
expectMsg("timer-fired")
180
}
181
182
"control complex timing scenarios" in {
183
val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]
184
import system.dispatcher
185
186
var results = List.empty[String]
187
188
// Schedule multiple tasks at different times
189
scheduler.scheduleOnce(500.millis, () => results = "500ms" :: results)
190
scheduler.scheduleOnce(1.second, () => results = "1s" :: results)
191
scheduler.scheduleOnce(1.5.seconds, () => results = "1.5s" :: results)
192
193
// Advance time incrementally
194
scheduler.timePasses(600.millis)
195
assert(results == List("500ms"))
196
197
scheduler.timePasses(500.millis) // Total: 1.1 seconds
198
assert(results == List("1s", "500ms"))
199
200
scheduler.timePasses(500.millis) // Total: 1.6 seconds
201
assert(results == List("1.5s", "1s", "500ms"))
202
}
203
}
204
}
205
```
206
207
### Configuration for Deterministic Testing
208
209
How to configure ActorSystem for deterministic testing with custom dispatchers and schedulers.
210
211
```scala
212
import akka.actor.ActorSystem
213
import com.typesafe.config.ConfigFactory
214
215
class ConfigurationExample {
216
217
// Configuration for deterministic testing
218
val testConfig = ConfigFactory.parseString("""
219
akka {
220
# Use CallingThreadDispatcher as default for deterministic execution
221
actor.default-dispatcher = "akka.test.calling-thread-dispatcher"
222
223
# Use ExplicitlyTriggeredScheduler for manual time control
224
scheduler.implementation = "akka.testkit.ExplicitlyTriggeredScheduler"
225
226
# Disable all built-in actors that might introduce non-determinism
227
actor.default-mailbox.mailbox-type = "akka.testkit.CallingThreadMailbox"
228
229
test {
230
# Test-specific timing settings
231
timefactor = 1.0
232
filter-leeway = 3s
233
single-expect-default = 3s
234
default-timeout = 5s
235
}
236
}
237
""")
238
239
// Create test ActorSystem with deterministic configuration
240
def createTestSystem(name: String = "test"): ActorSystem = {
241
ActorSystem(name, testConfig)
242
}
243
}
244
```
245
246
### Advanced Deterministic Patterns
247
248
Complex scenarios combining multiple deterministic execution techniques.
249
250
```scala
251
class AdvancedDeterministicExample extends TestKit(ActorSystem("test")) {
252
253
class ComplexTimingActor extends Actor {
254
import context.dispatcher
255
256
def receive = {
257
case "start" =>
258
// Schedule multiple operations with different timing
259
context.system.scheduler.scheduleOnce(100.millis, self, "step1")
260
context.system.scheduler.scheduleOnce(500.millis, self, "step2")
261
context.system.scheduler.scheduleOnce(1.second, self, "step3")
262
263
case "step1" =>
264
sender() ! "completed-step1"
265
context.system.scheduler.scheduleOnce(200.millis, self, "substep1")
266
267
case "substep1" =>
268
sender() ! "completed-substep1"
269
270
case "step2" =>
271
sender() ! "completed-step2"
272
273
case "step3" =>
274
sender() ! "completed-step3"
275
}
276
}
277
278
"Complex deterministic scenario" should {
279
"handle nested timing with complete control" in {
280
// Assume ExplicitlyTriggeredScheduler is configured
281
val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]
282
val actor = TestActorRef[ComplexTimingActor]
283
284
// Start the complex timing scenario
285
actor ! "start"
286
287
// Manually control each timing step
288
scheduler.timePasses(100.millis)
289
expectMsg("completed-step1")
290
291
scheduler.timePasses(200.millis) // substep1 fires
292
expectMsg("completed-substep1")
293
294
scheduler.timePasses(200.millis) // step2 fires (total 500ms)
295
expectMsg("completed-step2")
296
297
scheduler.timePasses(500.millis) // step3 fires (total 1s)
298
expectMsg("completed-step3")
299
}
300
}
301
}
302
```
303
304
### Thread Safety and Determinism Guarantees
305
306
Understanding the thread safety and determinism guarantees provided by these utilities.
307
308
```scala
309
class DeterminismGuaranteesExample extends TestKit(ActorSystem("test")) {
310
311
"CallingThreadDispatcher guarantees" should {
312
"provide single-threaded execution" in {
313
val actor = TestActorRef[TimingActor]
314
var executionOrder = List.empty[Int]
315
316
// All messages processed on same thread in order
317
actor.receive("msg1")
318
executionOrder = 1 :: executionOrder
319
320
actor.receive("msg2")
321
executionOrder = 2 :: executionOrder
322
323
actor.receive("msg3")
324
executionOrder = 3 :: executionOrder
325
326
// Guaranteed order: [3, 2, 1] (reverse due to :: prepending)
327
assert(executionOrder == List(3, 2, 1))
328
}
329
330
"eliminate race conditions" in {
331
val actor = TestActorRef[TimingActor]
332
333
// No race conditions possible - everything is synchronous
334
(1 to 100).foreach { i =>
335
actor.receive(s"message-$i")
336
// Each message is completely processed before next one starts
337
}
338
}
339
}
340
341
"ExplicitlyTriggeredScheduler guarantees" should {
342
"provide predictable timing" in {
343
val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]
344
345
var executionTimes = List.empty[Long]
346
347
scheduler.scheduleOnce(1.second, () =>
348
executionTimes = scheduler.currentTimeMs :: executionTimes)
349
scheduler.scheduleOnce(2.seconds, () =>
350
executionTimes = scheduler.currentTimeMs :: executionTimes)
351
352
// Time doesn't advance until explicitly triggered
353
val startTime = scheduler.currentTimeMs
354
Thread.sleep(100) // Real time passes but scheduler time doesn't
355
assert(scheduler.currentTimeMs == startTime)
356
357
// Manual time advancement triggers tasks at exact expected times
358
scheduler.timePasses(1.second)
359
scheduler.timePasses(1.second)
360
361
// Tasks executed at precisely controlled times
362
assert(executionTimes.reverse == List(1000, 2000))
363
}
364
}
365
}
366
```
367
368
### Integration with TestKit Features
369
370
How deterministic execution integrates with other TestKit features for comprehensive testing.
371
372
```scala
373
class DeterministicIntegrationExample extends TestKit(ActorSystem("test")) {
374
375
"Deterministic execution with TestKit features" should {
376
"work with message expectations" in {
377
val actor = TestActorRef[TimingActor]
378
379
// Synchronous execution makes message expectations predictable
380
actor ! "test"
381
expectMsg("received-test") // Message already processed synchronously
382
}
383
384
"work with timing assertions" in {
385
val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]
386
val actor = system.actorOf(Props[TimingActor])
387
388
within(0.seconds) { // Can use zero timeout with manual time control
389
actor ! "schedule"
390
scheduler.timePasses(1.second)
391
expectMsg("timer-fired")
392
}
393
}
394
395
"work with TestProbe" in {
396
val probe = TestProbe()
397
val actor = TestActorRef[TimingActor]
398
399
// Deterministic interaction between actors
400
probe.send(actor, "test")
401
probe.expectMsg("received-test")
402
}
403
}
404
}
405
```