or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

actor-references.mdcore-testing.mddeterministic-execution.mdevent-filtering.mdindex.mdjava-api.mdtest-utilities.mdutilities-config.md

deterministic-execution.mddocs/

0

# Deterministic Execution Control

1

2

Akka TestKit provides specialized dispatchers and schedulers that enable deterministic, predictable test execution. These components eliminate timing-related test flakiness by providing complete control over message processing and scheduling.

3

4

## CallingThreadDispatcher

5

6

### CallingThreadDispatcher Class { .api }

7

8

```scala

9

class CallingThreadDispatcher(_configurator: MessageDispatcherConfigurator) extends MessageDispatcher

10

```

11

12

A dispatcher that executes all tasks on the calling thread instead of using a thread pool. This provides deterministic, synchronous execution for testing.

13

14

### CallingThreadDispatcher Object { .api }

15

16

```scala

17

object CallingThreadDispatcher {

18

val Id: String = "akka.test.calling-thread-dispatcher"

19

}

20

```

21

22

### CallingThreadDispatcherConfigurator { .api }

23

24

```scala

25

class CallingThreadDispatcherConfigurator(

26

config: Config,

27

prerequisites: DispatcherPrerequisites

28

) extends MessageDispatcherConfigurator

29

```

30

31

### CallingThreadMailbox { .api }

32

33

```scala

34

class CallingThreadMailbox(

35

_receiver: akka.actor.Cell,

36

mailboxType: MailboxType

37

) extends Mailbox

38

```

39

40

### Configuration

41

42

To use CallingThreadDispatcher, configure it in your test configuration:

43

44

```hocon

45

akka.test.calling-thread-dispatcher {

46

type = akka.testkit.CallingThreadDispatcherConfigurator

47

}

48

```

49

50

### Usage Examples

51

52

#### Basic CallingThreadDispatcher Usage

53

54

```scala

55

import akka.testkit.CallingThreadDispatcher

56

57

class TestActor extends Actor {

58

def receive = {

59

case "work" =>

60

// This will execute on the calling thread

61

sender() ! "done"

62

}

63

}

64

65

// Create actor with CallingThreadDispatcher

66

val actor = system.actorOf(

67

Props[TestActor].withDispatcher(CallingThreadDispatcher.Id),

68

"test-actor"

69

)

70

71

// Message processing happens synchronously

72

actor ! "work"

73

expectMsg("done") // Response is immediate and deterministic

74

```

75

76

#### Testing with Multiple Actors

77

78

```scala

79

class CounterActor extends Actor {

80

private var count = 0

81

82

def receive = {

83

case "increment" =>

84

count += 1

85

sender() ! count

86

case "get" =>

87

sender() ! count

88

}

89

}

90

91

// Create multiple actors with CallingThreadDispatcher

92

val actors = (1 to 3).map { i =>

93

system.actorOf(

94

Props[CounterActor].withDispatcher(CallingThreadDispatcher.Id),

95

s"counter-$i"

96

)

97

}

98

99

// All processing is deterministic and ordered

100

actors(0) ! "increment"

101

expectMsg(1)

102

103

actors(1) ! "increment"

104

expectMsg(1)

105

106

actors(0) ! "increment"

107

expectMsg(2)

108

109

// Results are predictable because execution is synchronous

110

actors(0) ! "get"

111

expectMsg(2)

112

113

actors(1) ! "get"

114

expectMsg(1)

115

```

116

117

#### Testing Actor Interactions

118

119

```scala

120

class MasterActor extends Actor {

121

var workers = List.empty[ActorRef]

122

var responses = 0

123

124

def receive = {

125

case "add-worker" =>

126

workers = sender() :: workers

127

128

case "broadcast" =>

129

workers.foreach(_ ! "work")

130

131

case "work-done" =>

132

responses += 1

133

if (responses == workers.length) {

134

context.parent ! "all-done"

135

responses = 0

136

}

137

}

138

}

139

140

class WorkerActor extends Actor {

141

def receive = {

142

case "work" =>

143

sender() ! "work-done"

144

}

145

}

146

147

// Create actors with deterministic dispatcher

148

val master = system.actorOf(

149

Props[MasterActor].withDispatcher(CallingThreadDispatcher.Id),

150

"master"

151

)

152

153

val workers = (1 to 3).map { i =>

154

system.actorOf(

155

Props[WorkerActor].withDispatcher(CallingThreadDispatcher.Id),

156

s"worker-$i"

157

)

158

}

159

160

// Register workers

161

workers.foreach { worker =>

162

master.tell("add-worker", worker)

163

}

164

165

// Broadcast work - all processing is synchronous and predictable

166

master ! "broadcast"

167

expectMsg("all-done") // Deterministic completion

168

```

169

170

#### Configuration-based Usage

171

172

```scala

173

// In application.conf for tests

174

akka {

175

actor {

176

default-dispatcher {

177

type = akka.testkit.CallingThreadDispatcherConfigurator

178

}

179

}

180

}

181

182

// All actors will use CallingThreadDispatcher by default

183

val actor = system.actorOf(Props[MyActor])

184

// Deterministic execution without explicit dispatcher configuration

185

```

186

187

## ExplicitlyTriggeredScheduler

188

189

### ExplicitlyTriggeredScheduler Class { .api }

190

191

```scala

192

class ExplicitlyTriggeredScheduler(

193

config: Config,

194

log: LoggingAdapter,

195

tf: ThreadFactory

196

) extends Scheduler {

197

198

def timePasses(amount: FiniteDuration): Unit

199

def schedule(

200

initialDelay: FiniteDuration,

201

interval: FiniteDuration,

202

runnable: Runnable

203

)(implicit executor: ExecutionContext): Cancellable

204

def scheduleOnce(

205

delay: FiniteDuration,

206

runnable: Runnable

207

)(implicit executor: ExecutionContext): Cancellable

208

def maxFrequency: Double

209

}

210

```

211

212

A scheduler that doesn't automatically progress time - time must be manually advanced using `timePasses()`. This allows complete control over timing in tests.

213

214

### Configuration

215

216

Configure ExplicitlyTriggeredScheduler in test configuration:

217

218

```hocon

219

akka {

220

scheduler {

221

implementation = akka.testkit.ExplicitlyTriggeredScheduler

222

}

223

}

224

```

225

226

### Usage Examples

227

228

#### Basic Scheduler Control

229

230

```scala

231

import akka.testkit.ExplicitlyTriggeredScheduler

232

import scala.concurrent.duration._

233

234

// Get scheduler from system (must be configured as ExplicitlyTriggeredScheduler)

235

val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]

236

237

class TimedActor extends Actor {

238

import context.dispatcher

239

240

override def preStart(): Unit = {

241

// Schedule a message to self in 5 seconds

242

context.system.scheduler.scheduleOnce(5.seconds, self, "timeout")

243

}

244

245

def receive = {

246

case "start" =>

247

sender() ! "started"

248

case "timeout" =>

249

sender() ! "timed-out"

250

}

251

}

252

253

val actor = system.actorOf(Props[TimedActor])

254

actor ! "start"

255

expectMsg("started")

256

257

// No timeout message yet - time hasn't passed

258

expectNoMessage(100.millis)

259

260

// Manually advance time by 5 seconds

261

scheduler.timePasses(5.seconds)

262

263

// Now timeout message is delivered

264

expectMsg("timed-out")

265

```

266

267

#### Testing Periodic Scheduling

268

269

```scala

270

class HeartbeatActor extends Actor {

271

import context.dispatcher

272

273

override def preStart(): Unit = {

274

// Send heartbeat every 2 seconds

275

context.system.scheduler.schedule(

276

2.seconds,

277

2.seconds,

278

context.parent,

279

"heartbeat"

280

)

281

}

282

283

def receive = Actor.emptyBehavior

284

}

285

286

val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]

287

val heartbeat = system.actorOf(Props[HeartbeatActor])

288

289

// No heartbeats yet

290

expectNoMessage(100.millis)

291

292

// Advance past first delay

293

scheduler.timePasses(2.seconds)

294

expectMsg("heartbeat")

295

296

// Advance to next interval

297

scheduler.timePasses(2.seconds)

298

expectMsg("heartbeat")

299

300

// Skip ahead multiple intervals

301

scheduler.timePasses(6.seconds) // 3 more intervals

302

expectMsg("heartbeat")

303

expectMsg("heartbeat")

304

expectMsg("heartbeat")

305

```

306

307

#### Testing Timeout Behavior

308

309

```scala

310

class TimeoutActor extends Actor {

311

import context.dispatcher

312

313

private var timeoutHandle: Option[Cancellable] = None

314

315

def receive = {

316

case "start-timer" =>

317

timeoutHandle = Some(

318

context.system.scheduler.scheduleOnce(3.seconds, self, "timeout")

319

)

320

sender() ! "timer-started"

321

322

case "cancel-timer" =>

323

timeoutHandle.foreach(_.cancel())

324

timeoutHandle = None

325

sender() ! "timer-cancelled"

326

327

case "timeout" =>

328

sender() ! "timeout-occurred"

329

timeoutHandle = None

330

}

331

}

332

333

val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]

334

val actor = system.actorOf(Props[TimeoutActor])

335

336

// Start timer

337

actor ! "start-timer"

338

expectMsg("timer-started")

339

340

// Advance time but not enough for timeout

341

scheduler.timePasses(2.seconds)

342

expectNoMessage(100.millis)

343

344

// Cancel timer before timeout

345

actor ! "cancel-timer"

346

expectMsg("timer-cancelled")

347

348

// Advance past original timeout - no message should come

349

scheduler.timePasses(2.seconds)

350

expectNoMessage(100.millis)

351

352

// Test timeout actually occurring

353

actor ! "start-timer"

354

expectMsg("timer-started")

355

356

scheduler.timePasses(3.seconds)

357

expectMsg("timeout-occurred")

358

```

359

360

#### Testing Complex Timing Scenarios

361

362

```scala

363

class BatchProcessor extends Actor {

364

import context.dispatcher

365

366

private var batch = List.empty[String]

367

private var flushHandle: Option[Cancellable] = None

368

369

def receive = {

370

case item: String =>

371

batch = item :: batch

372

373

// Reset flush timer on each new item

374

flushHandle.foreach(_.cancel())

375

flushHandle = Some(

376

context.system.scheduler.scheduleOnce(1.second, self, "flush")

377

)

378

379

case "flush" =>

380

if (batch.nonEmpty) {

381

context.parent ! s"batch: ${batch.reverse.mkString(",")}"

382

batch = List.empty

383

}

384

flushHandle = None

385

386

case "force-flush" =>

387

flushHandle.foreach(_.cancel())

388

self ! "flush"

389

}

390

}

391

392

val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]

393

val processor = system.actorOf(Props[BatchProcessor])

394

395

// Add items to batch

396

processor ! "item1"

397

processor ! "item2"

398

processor ! "item3"

399

400

// No flush yet

401

expectNoMessage(100.millis)

402

403

// Advance time to trigger flush

404

scheduler.timePasses(1.second)

405

expectMsg("batch: item1,item2,item3")

406

407

// Test timer reset behavior

408

processor ! "item4"

409

scheduler.timePasses(500.millis) // Half timeout

410

processor ! "item5" // Resets timer

411

412

scheduler.timePasses(500.millis) // Still within new timeout

413

expectNoMessage(100.millis)

414

415

scheduler.timePasses(500.millis) // Now past timeout

416

expectMsg("batch: item4,item5")

417

```

418

419

## Combining Deterministic Components

420

421

### Complete Deterministic Test Environment

422

423

```scala

424

class DeterministicTestSystem extends TestKit(ActorSystem("TestSystem", ConfigFactory.parseString(

425

"""

426

akka {

427

actor {

428

default-dispatcher {

429

type = akka.testkit.CallingThreadDispatcherConfigurator

430

}

431

}

432

scheduler {

433

implementation = akka.testkit.ExplicitlyTriggeredScheduler

434

}

435

}

436

"""

437

))) with ImplicitSender {

438

439

val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]

440

441

def advanceTime(duration: FiniteDuration): Unit = {

442

scheduler.timePasses(duration)

443

}

444

}

445

446

class ComplexActor extends Actor {

447

import context.dispatcher

448

449

private var state = "idle"

450

private var workCount = 0

451

452

override def preStart(): Unit = {

453

// Schedule periodic status reports

454

context.system.scheduler.schedule(5.seconds, 5.seconds, self, "report-status")

455

}

456

457

def receive = {

458

case "start-work" =>

459

state = "working"

460

// Simulate async work completion

461

context.system.scheduler.scheduleOnce(2.seconds, self, "work-complete")

462

sender() ! "work-started"

463

464

case "work-complete" =>

465

workCount += 1

466

state = "idle"

467

context.parent ! s"work-completed-$workCount"

468

469

case "report-status" =>

470

context.parent ! s"status: $state, completed: $workCount"

471

}

472

}

473

474

// Usage in test

475

class ComplexActorSpec extends DeterministicTestSystem {

476

"ComplexActor" should {

477

"handle work and reporting deterministically" in {

478

val actor = system.actorOf(Props[ComplexActor])

479

480

// Start work

481

actor ! "start-work"

482

expectMsg("work-started")

483

484

// No completion yet

485

expectNoMessage(100.millis)

486

487

// Advance time to complete work

488

advanceTime(2.seconds)

489

expectMsg("work-completed-1")

490

491

// Advance to first status report

492

advanceTime(3.seconds) // Total 5 seconds from start

493

expectMsg("status: idle, completed: 1")

494

495

// Start more work

496

actor ! "start-work"

497

expectMsg("work-started")

498

499

// Advance time through work completion and next status report

500

advanceTime(5.seconds)

501

expectMsg("work-completed-2")

502

expectMsg("status: idle, completed: 2")

503

}

504

}

505

}

506

```

507

508

## Best Practices

509

510

### CallingThreadDispatcher Best Practices

511

512

1. **Use for unit tests**: CallingThreadDispatcher is ideal for testing individual actor logic

513

2. **Avoid for integration tests**: May hide concurrency issues that occur in production

514

3. **Configure consistently**: Use either globally or per-actor, not mixed

515

4. **Test both sync and async**: Also test with real dispatchers to catch concurrency issues

516

517

```scala

518

// Good: Consistent usage for deterministic unit testing

519

val actor = system.actorOf(

520

Props[MyActor].withDispatcher(CallingThreadDispatcher.Id)

521

)

522

523

// Also good: Global configuration for test suite

524

akka.actor.default-dispatcher.type = akka.testkit.CallingThreadDispatcherConfigurator

525

```

526

527

### ExplicitlyTriggeredScheduler Best Practices

528

529

1. **Advance time methodically**: Use small, controlled time advances

530

2. **Test edge cases**: Test timer cancellation, overlapping timers

531

3. **Verify no unexpected scheduling**: Check for unexpected scheduled tasks

532

4. **Document time flow**: Comment time advances in complex tests

533

534

```scala

535

// Good: Clear time progression

536

scheduler.timePasses(1.second) // Initial delay

537

expectMsg("first-event")

538

539

scheduler.timePasses(2.seconds) // Regular interval

540

expectMsg("second-event")

541

542

scheduler.timePasses(2.seconds) // Another interval

543

expectMsg("third-event")

544

```

545

546

### Combined Usage Best Practices

547

548

1. **Start simple**: Begin with just one deterministic component

549

2. **Test incrementally**: Add complexity gradually

550

3. **Verify assumptions**: Ensure deterministic behavior is actually achieved

551

4. **Document setup**: Clearly document deterministic test environment setup

552

553

```scala

554

// Good: Well-documented deterministic test setup

555

class MyDeterministicSpec extends TestKit(ActorSystem("test",

556

ConfigFactory.parseString("""

557

# Deterministic execution for predictable testing

558

akka.actor.default-dispatcher.type = akka.testkit.CallingThreadDispatcherConfigurator

559

akka.scheduler.implementation = akka.testkit.ExplicitlyTriggeredScheduler

560

""")

561

)) {

562

563

val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]

564

565

// Helper for controlled time advancement

566

def tick(duration: FiniteDuration = 1.second): Unit = {

567

scheduler.timePasses(duration)

568

}

569

}

570

```