or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-testing.mddeterministic-execution.mdevent-filtering.mdfsm-testing.mdindex.mdsynchronization.mdsynchronous-testing.mdtest-utilities.md

deterministic-execution.mddocs/

0

# Deterministic Execution Control

1

2

Akka TestKit provides specialized dispatchers and schedulers for predictable, deterministic test execution including CallingThreadDispatcher for synchronous message processing and ExplicitlyTriggeredScheduler for manual time control.

3

4

## Capabilities

5

6

### CallingThreadDispatcher

7

8

Dispatcher that executes all messages on the calling thread for deterministic, single-threaded execution in tests.

9

10

```scala { .api }

11

/**

12

* Dispatcher that executes messages on the calling thread

13

* Provides deterministic execution order for testing

14

* No new threads are created - all execution happens synchronously

15

*/

16

class CallingThreadDispatcher extends MessageDispatcher {

17

// Implementation executes messages immediately on current thread

18

}

19

20

/**

21

* Dispatcher configurator for CallingThreadDispatcher

22

*/

23

class CallingThreadDispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites)

24

extends MessageDispatcherConfigurator {

25

26

def dispatcher(): MessageDispatcher = new CallingThreadDispatcher()

27

}

28

29

/**

30

* Specialized mailbox for CallingThreadDispatcher

31

* Maintains thread-local message queues for deterministic processing

32

*/

33

class CallingThreadMailbox extends Mailbox {

34

// Thread-local queue implementation

35

}

36

```

37

38

### CallingThreadDispatcher Configuration

39

40

Configuration constants and settings for CallingThreadDispatcher.

41

42

```scala { .api }

43

object CallingThreadDispatcher {

44

/**

45

* Configuration ID for CallingThreadDispatcher

46

* Use this ID in actor Props or configuration files

47

*/

48

val Id: String = "akka.test.calling-thread-dispatcher"

49

}

50

```

51

52

### ExplicitlyTriggeredScheduler

53

54

Manual scheduler for controlled timing in tests, allowing precise control over when scheduled tasks execute.

55

56

```scala { .api }

57

/**

58

* Manual scheduler for controlled timing in tests

59

* Time only advances when explicitly triggered

60

* @param config Scheduler configuration

61

* @param log Logging adapter

62

* @param tf Thread factory (unused in test scheduler)

63

*/

64

class ExplicitlyTriggeredScheduler(config: Config, log: LoggingAdapter, tf: ThreadFactory)

65

extends Scheduler {

66

67

/**

68

* Schedule recurring task with fixed interval

69

* Task will not execute until time is advanced

70

* @param initialDelay Delay before first execution

71

* @param interval Interval between executions

72

* @param runnable Task to execute

73

* @param executor Execution context for task

74

* @return Cancellable for the scheduled task

75

*/

76

def schedule(initialDelay: FiniteDuration, interval: FiniteDuration, runnable: Runnable)

77

(implicit executor: ExecutionContext): Cancellable

78

79

/**

80

* Schedule single task execution

81

* Task will not execute until time is advanced

82

* @param delay Delay before execution

83

* @param runnable Task to execute

84

* @param executor Execution context for task

85

* @return Cancellable for the scheduled task

86

*/

87

def scheduleOnce(delay: FiniteDuration, runnable: Runnable)

88

(implicit executor: ExecutionContext): Cancellable

89

90

/**

91

* Manually advance scheduler time

92

* Triggers execution of all tasks scheduled for the advanced time period

93

* @param amount Duration to advance time

94

*/

95

def timePasses(amount: FiniteDuration): Unit

96

97

/**

98

* Get current internal scheduler time

99

* @return Current time in milliseconds since scheduler creation

100

*/

101

def currentTimeMs: Long

102

103

/**

104

* Get maximum frequency for recurring tasks

105

* @return Maximum frequency (typically very high for test scheduler)

106

*/

107

def maxFrequency: Double

108

}

109

```

110

111

**Usage Examples:**

112

113

```scala

114

import akka.actor.{Actor, ActorSystem, Props}

115

import akka.testkit.{TestKit, TestActorRef, CallingThreadDispatcher}

116

import scala.concurrent.duration._

117

118

class DeterministicExecutionExample extends TestKit(ActorSystem("test")) {

119

120

class TimingActor extends Actor {

121

def receive = {

122

case "schedule" =>

123

import context.dispatcher

124

context.system.scheduler.scheduleOnce(1.second, self, "scheduled")

125

case "scheduled" =>

126

sender() ! "timer-fired"

127

case msg =>

128

sender() ! s"received-$msg"

129

}

130

}

131

132

"CallingThreadDispatcher" should {

133

"execute messages synchronously" in {

134

// Create actor with CallingThreadDispatcher

135

val props = Props[TimingActor].withDispatcher(CallingThreadDispatcher.Id)

136

val actor = system.actorOf(props)

137

138

// Messages are processed immediately and deterministically

139

actor ! "test1"

140

expectMsg("received-test1")

141

142

actor ! "test2"

143

expectMsg("received-test2")

144

145

// Order is completely predictable

146

}

147

148

"work with TestActorRef for complete synchronous testing" in {

149

// TestActorRef automatically uses CallingThreadDispatcher

150

val actor = TestActorRef[TimingActor]

151

152

// Direct synchronous message injection

153

actor.receive("direct")

154

155

// Can also use normal ! sending

156

actor ! "normal"

157

expectMsg("received-normal")

158

}

159

}

160

161

"ExplicitlyTriggeredScheduler" should {

162

"provide manual time control" in {

163

// This example assumes system is configured with ExplicitlyTriggeredScheduler

164

// Configuration: akka.scheduler.implementation = "akka.testkit.ExplicitlyTriggeredScheduler"

165

166

val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]

167

val actor = system.actorOf(Props[TimingActor])

168

169

// Schedule something but time won't advance automatically

170

actor ! "schedule"

171

172

// No message yet because time hasn't advanced

173

expectNoMessage(100.millis)

174

175

// Manually advance time to trigger scheduled task

176

scheduler.timePasses(1.second)

177

178

// Now the scheduled message should arrive

179

expectMsg("timer-fired")

180

}

181

182

"control complex timing scenarios" in {

183

val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]

184

import system.dispatcher

185

186

var results = List.empty[String]

187

188

// Schedule multiple tasks at different times

189

scheduler.scheduleOnce(500.millis, () => results = "500ms" :: results)

190

scheduler.scheduleOnce(1.second, () => results = "1s" :: results)

191

scheduler.scheduleOnce(1.5.seconds, () => results = "1.5s" :: results)

192

193

// Advance time incrementally

194

scheduler.timePasses(600.millis)

195

assert(results == List("500ms"))

196

197

scheduler.timePasses(500.millis) // Total: 1.1 seconds

198

assert(results == List("1s", "500ms"))

199

200

scheduler.timePasses(500.millis) // Total: 1.6 seconds

201

assert(results == List("1.5s", "1s", "500ms"))

202

}

203

}

204

}

205

```

206

207

### Configuration for Deterministic Testing

208

209

How to configure ActorSystem for deterministic testing with custom dispatchers and schedulers.

210

211

```scala

212

import akka.actor.ActorSystem

213

import com.typesafe.config.ConfigFactory

214

215

class ConfigurationExample {

216

217

// Configuration for deterministic testing

218

val testConfig = ConfigFactory.parseString("""

219

akka {

220

# Use CallingThreadDispatcher as default for deterministic execution

221

actor.default-dispatcher = "akka.test.calling-thread-dispatcher"

222

223

# Use ExplicitlyTriggeredScheduler for manual time control

224

scheduler.implementation = "akka.testkit.ExplicitlyTriggeredScheduler"

225

226

# Disable all built-in actors that might introduce non-determinism

227

actor.default-mailbox.mailbox-type = "akka.testkit.CallingThreadMailbox"

228

229

test {

230

# Test-specific timing settings

231

timefactor = 1.0

232

filter-leeway = 3s

233

single-expect-default = 3s

234

default-timeout = 5s

235

}

236

}

237

""")

238

239

// Create test ActorSystem with deterministic configuration

240

def createTestSystem(name: String = "test"): ActorSystem = {

241

ActorSystem(name, testConfig)

242

}

243

}

244

```

245

246

### Advanced Deterministic Patterns

247

248

Complex scenarios combining multiple deterministic execution techniques.

249

250

```scala

251

class AdvancedDeterministicExample extends TestKit(ActorSystem("test")) {

252

253

class ComplexTimingActor extends Actor {

254

import context.dispatcher

255

256

def receive = {

257

case "start" =>

258

// Schedule multiple operations with different timing

259

context.system.scheduler.scheduleOnce(100.millis, self, "step1")

260

context.system.scheduler.scheduleOnce(500.millis, self, "step2")

261

context.system.scheduler.scheduleOnce(1.second, self, "step3")

262

263

case "step1" =>

264

sender() ! "completed-step1"

265

context.system.scheduler.scheduleOnce(200.millis, self, "substep1")

266

267

case "substep1" =>

268

sender() ! "completed-substep1"

269

270

case "step2" =>

271

sender() ! "completed-step2"

272

273

case "step3" =>

274

sender() ! "completed-step3"

275

}

276

}

277

278

"Complex deterministic scenario" should {

279

"handle nested timing with complete control" in {

280

// Assume ExplicitlyTriggeredScheduler is configured

281

val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]

282

val actor = TestActorRef[ComplexTimingActor]

283

284

// Start the complex timing scenario

285

actor ! "start"

286

287

// Manually control each timing step

288

scheduler.timePasses(100.millis)

289

expectMsg("completed-step1")

290

291

scheduler.timePasses(200.millis) // substep1 fires

292

expectMsg("completed-substep1")

293

294

scheduler.timePasses(200.millis) // step2 fires (total 500ms)

295

expectMsg("completed-step2")

296

297

scheduler.timePasses(500.millis) // step3 fires (total 1s)

298

expectMsg("completed-step3")

299

}

300

}

301

}

302

```

303

304

### Thread Safety and Determinism Guarantees

305

306

Understanding the thread safety and determinism guarantees provided by these utilities.

307

308

```scala

309

class DeterminismGuaranteesExample extends TestKit(ActorSystem("test")) {

310

311

"CallingThreadDispatcher guarantees" should {

312

"provide single-threaded execution" in {

313

val actor = TestActorRef[TimingActor]

314

var executionOrder = List.empty[Int]

315

316

// All messages processed on same thread in order

317

actor.receive("msg1")

318

executionOrder = 1 :: executionOrder

319

320

actor.receive("msg2")

321

executionOrder = 2 :: executionOrder

322

323

actor.receive("msg3")

324

executionOrder = 3 :: executionOrder

325

326

// Guaranteed order: [3, 2, 1] (reverse due to :: prepending)

327

assert(executionOrder == List(3, 2, 1))

328

}

329

330

"eliminate race conditions" in {

331

val actor = TestActorRef[TimingActor]

332

333

// No race conditions possible - everything is synchronous

334

(1 to 100).foreach { i =>

335

actor.receive(s"message-$i")

336

// Each message is completely processed before next one starts

337

}

338

}

339

}

340

341

"ExplicitlyTriggeredScheduler guarantees" should {

342

"provide predictable timing" in {

343

val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]

344

345

var executionTimes = List.empty[Long]

346

347

scheduler.scheduleOnce(1.second, () =>

348

executionTimes = scheduler.currentTimeMs :: executionTimes)

349

scheduler.scheduleOnce(2.seconds, () =>

350

executionTimes = scheduler.currentTimeMs :: executionTimes)

351

352

// Time doesn't advance until explicitly triggered

353

val startTime = scheduler.currentTimeMs

354

Thread.sleep(100) // Real time passes but scheduler time doesn't

355

assert(scheduler.currentTimeMs == startTime)

356

357

// Manual time advancement triggers tasks at exact expected times

358

scheduler.timePasses(1.second)

359

scheduler.timePasses(1.second)

360

361

// Tasks executed at precisely controlled times

362

assert(executionTimes.reverse == List(1000, 2000))

363

}

364

}

365

}

366

```

367

368

### Integration with TestKit Features

369

370

How deterministic execution integrates with other TestKit features for comprehensive testing.

371

372

```scala

373

class DeterministicIntegrationExample extends TestKit(ActorSystem("test")) {

374

375

"Deterministic execution with TestKit features" should {

376

"work with message expectations" in {

377

val actor = TestActorRef[TimingActor]

378

379

// Synchronous execution makes message expectations predictable

380

actor ! "test"

381

expectMsg("received-test") // Message already processed synchronously

382

}

383

384

"work with timing assertions" in {

385

val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]

386

val actor = system.actorOf(Props[TimingActor])

387

388

within(0.seconds) { // Can use zero timeout with manual time control

389

actor ! "schedule"

390

scheduler.timePasses(1.second)

391

expectMsg("timer-fired")

392

}

393

}

394

395

"work with TestProbe" in {

396

val probe = TestProbe()

397

val actor = TestActorRef[TimingActor]

398

399

// Deterministic interaction between actors

400

probe.send(actor, "test")

401

probe.expectMsg("received-test")

402

}

403

}

404

}

405

```