or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

actor-refs.md · configuration.md · core-testing.md · dispatchers.md · event-filtering.md · index.md · java-dsl.md · package-functions.md · synchronization.md · test-actors.md · utilities.md

docs/dispatchers.md

0

# Custom Dispatchers and Scheduling

1

2

Specialized dispatchers and schedulers that provide deterministic execution environments for testing, including single-threaded execution and manual time advancement.

3

4

## Capabilities

5

6

### CallingThreadDispatcher

7

8

Dispatcher that runs on current thread for deterministic testing.

9

10

```scala { .api }

11

/** Companion object; holds the ID used to select this dispatcher from code. */
object CallingThreadDispatcher {
  // Dispatcher ID constant, e.g. Props(...).withDispatcher(CallingThreadDispatcher.Id)
  val Id = "akka.test.calling-thread-dispatcher"
}

/**
 * Dispatcher that runs on the current thread for deterministic testing.
 *
 * It is constructed from its configurator (see CallingThreadDispatcherConfigurator
 * below), not directly from a Config.
 */
class CallingThreadDispatcher(_configurator: MessageDispatcherConfigurator)
    extends MessageDispatcher(_configurator) {

  // Core dispatcher methods
  def dispatch(receiver: ActorCell, invocation: Envelope): Unit
  def systemDispatch(receiver: ActorCell, invocation: SystemMessage): Unit
  def executeTask(invocation: TaskInvocation): Unit
  def createMailbox(actor: akka.actor.Cell, mailboxType: MailboxType): Mailbox
  def shutdown(): Unit
}

/**
 * Configurator referenced by the `type` setting of the
 * `akka.test.calling-thread-dispatcher` configuration section.
 */
class CallingThreadDispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
    extends MessageDispatcherConfigurator(config, prerequisites) {

  // Returns the dispatcher to use for actors configured with this section.
  def dispatcher(): MessageDispatcher
}

30

```

31

32

**Usage Examples:**

33

34

```scala

35

import akka.actor.{Actor, ActorSystem, Props}
import akka.testkit.{CallingThreadDispatcher, ImplicitSender, TestActorRef, TestKit}
import org.scalatest.BeforeAndAfterAll
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

import scala.collection.mutable

/**
 * Demonstrates CallingThreadDispatcher: actors configured with it process
 * messages on the thread that sends them.
 *
 * ImplicitSender is required: without it `sender()` inside the actors is
 * dead letters, and none of the replies would ever reach `testActor`.
 */
class CallingThreadDispatcherTest
    extends TestKit(ActorSystem("TestSystem"))
    with ImplicitSender
    with AnyWordSpecLike
    with Matchers
    with BeforeAndAfterAll {

  // Stop the actor system once the suite is done so its threads do not leak.
  override def afterAll(): Unit = TestKit.shutdownActorSystem(system)

  "CallingThreadDispatcher" should {
    "execute actors on current thread" in {
      val mainThreadId = Thread.currentThread().getId
      @volatile var actorThreadId: Long = -1

      val actor = system.actorOf(Props(new Actor {
        def receive: Receive = {
          case "get-thread" =>
            actorThreadId = Thread.currentThread().getId
            sender() ! actorThreadId
        }
      }).withDispatcher(CallingThreadDispatcher.Id))

      actor ! "get-thread"
      expectMsg(mainThreadId)

      actorThreadId should equal(mainThreadId)
    }

    "provide deterministic execution order" in {
      // Local mutable buffer is safe here: every actor runs on the test thread.
      val results = mutable.ListBuffer[Int]()

      val actors = (1 to 3).map { i =>
        system.actorOf(Props(new Actor {
          def receive: Receive = {
            case "execute" =>
              results += i
              sender() ! s"done-$i"
          }
        }).withDispatcher(CallingThreadDispatcher.Id))
      }

      // Messages processed in order sent (deterministic)
      actors.foreach(_ ! "execute")

      receiveN(3) should equal(Seq("done-1", "done-2", "done-3"))
      results.toList should equal(List(1, 2, 3))
    }

    "work with TestActorRef" in {
      val props = Props(new Actor {
        def receive: Receive = {
          case msg => sender() ! s"processed: $msg"
        }
      }).withDispatcher(CallingThreadDispatcher.Id)

      val actor = TestActorRef(props)

      // Direct message processing (synchronous)
      actor ! "test"
      expectMsg("processed: test")
    }
  }
}

92

```

93

94

### ExplicitlyTriggeredScheduler

95

96

Scheduler that requires manual time advancement for testing.

97

98

```scala { .api }

99

/**
 * Scheduler whose clock only moves when the test calls `timePasses`.
 *
 * Akka instantiates the class named in `akka.scheduler.implementation`
 * reflectively with these three constructor arguments; tests normally obtain
 * the instance via `system.scheduler` rather than constructing it.
 */
class ExplicitlyTriggeredScheduler(config: Config, log: LoggingAdapter, tf: ThreadFactory)
    extends Scheduler {

  // Time control methods

  // Advances the scheduler's clock by `amount`.
  def timePasses(amount: FiniteDuration): Unit

  // NOTE(review): confirm this accessor is public in the Akka version you target.
  def currentTimeMs: Long

  // Scheduler interface methods

  // Deprecated in Akka 2.6 in favour of scheduleWithFixedDelay / scheduleAtFixedRate.
  def schedule(
      initialDelay: FiniteDuration,
      interval: FiniteDuration,
      runnable: Runnable
  )(implicit executor: ExecutionContext): Cancellable

  def scheduleOnce(
      delay: FiniteDuration,
      runnable: Runnable
  )(implicit executor: ExecutionContext): Cancellable

  // Note the separate parameter list for the runnable.
  def scheduleWithFixedDelay(
      initialDelay: FiniteDuration,
      delay: FiniteDuration
  )(runnable: Runnable)(implicit executor: ExecutionContext): Cancellable

  // Note the separate parameter list for the runnable.
  def scheduleAtFixedRate(
      initialDelay: FiniteDuration,
      interval: FiniteDuration
  )(runnable: Runnable)(implicit executor: ExecutionContext): Cancellable
}

128

```

129

130

**Configuration and Usage:**

131

132

```scala

133

import akka.actor.{Actor, ActorRef, ActorSystem, Props, Timers}
import akka.testkit.{ExplicitlyTriggeredScheduler, ImplicitSender, TestKit}
import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfterAll
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

import scala.collection.mutable
import scala.concurrent.duration._

object ExplicitSchedulerTest {
  // Configuration for explicitly triggered scheduler
  val config = ConfigFactory.parseString("""
    akka.scheduler.implementation = "akka.testkit.ExplicitlyTriggeredScheduler"
  """)

  /**
   * Starts a single 3-second timer on "start-timer" and reports "timer-fired"
   * to whoever asked for it.
   *
   * Defined in the companion object (not inside a test method) so that
   * `Props[TimerActor]()` can instantiate it reflectively.
   */
  class TimerActor extends Actor with Timers {
    def receive: Receive = {
      case "start-timer" =>
        timers.startSingleTimer("test-timer", "tick", 3.seconds)
        // Remember the requester: the sender of the later "tick" is not the test.
        context.become(awaitingTick(sender()))
    }

    private def awaitingTick(replyTo: ActorRef): Receive = {
      case "tick" =>
        replyTo ! "timer-fired"
        context.unbecome()
    }
  }
}

/** Shows how scheduled work only runs once the test advances the clock. */
class ExplicitSchedulerTest
    extends TestKit(ActorSystem("TestSystem", ExplicitSchedulerTest.config))
    with ImplicitSender
    with AnyWordSpecLike
    with Matchers
    with BeforeAndAfterAll {

  import ExplicitSchedulerTest._
  // Execution context required by the scheduler's schedule* methods.
  import system.dispatcher

  private val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]

  override def afterAll(): Unit = TestKit.shutdownActorSystem(system)

  "ExplicitlyTriggeredScheduler" should {
    "allow manual time advancement" in {
      @volatile var executed = false
      @volatile var executionTime = 0L

      // Schedule task for 5 seconds from now
      scheduler.scheduleOnce(5.seconds, new Runnable {
        def run(): Unit = {
          executed = true
          executionTime = scheduler.currentTimeMs
        }
      })

      // Task not executed yet
      executed should be(false)

      // Advance time by 3 seconds - still not enough
      scheduler.timePasses(3.seconds)
      executed should be(false)

      // Advance time by 2 more seconds - now it executes
      scheduler.timePasses(2.seconds)
      executed should be(true)
      executionTime should equal(scheduler.currentTimeMs)
    }

    "handle repeated scheduling" in {
      val executions = mutable.ListBuffer[Long]()

      // Schedule repeated task every 2 seconds
      scheduler.scheduleAtFixedRate(initialDelay = 1.second, interval = 2.seconds)(new Runnable {
        def run(): Unit = executions += scheduler.currentTimeMs
      })

      executions should be(empty)

      // First execution after 1 second
      scheduler.timePasses(1.second)
      executions should have size 1

      // Second execution after 2 more seconds (3 total)
      scheduler.timePasses(2.seconds)
      executions should have size 2

      // Third execution after 2 more seconds (5 total)
      scheduler.timePasses(2.seconds)
      executions should have size 3

      // Verify execution times
      executions.toList should equal(List(1000, 3000, 5000))
    }

    "work with actor timers" in {
      val actor = system.actorOf(Props[TimerActor]())
      actor ! "start-timer"

      // Timer not fired yet
      expectNoMessage(100.millis)

      // Advance time to trigger timer
      scheduler.timePasses(3.seconds)
      expectMsg("timer-fired")
    }
  }
}

220

```

221

222

### Integration with TestKit

223

224

Both the CallingThreadDispatcher and the ExplicitlyTriggeredScheduler work seamlessly with TestKit:

225

226

```scala

227

import akka.actor.{Actor, ActorSystem, Props}
import akka.testkit.{CallingThreadDispatcher, ExplicitlyTriggeredScheduler, ImplicitSender, TestKit}
import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfterAll
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

import scala.concurrent.duration._

/**
 * Combines the test dispatcher and the test scheduler with TestKit
 * expectations (`receiveN`, `awaitCond`).
 */
class DispatcherIntegrationTest
    extends TestKit(ActorSystem("TestSystem"))
    with ImplicitSender
    with AnyWordSpecLike
    with Matchers
    with BeforeAndAfterAll {

  override def afterAll(): Unit = TestKit.shutdownActorSystem(system)

  "Dispatcher integration" should {
    "combine CallingThreadDispatcher with expectations" in {
      val actor = system.actorOf(Props(new Actor {
        def receive: Receive = {
          case x: Int => sender() ! (x * 2)
        }
      }).withDispatcher(CallingThreadDispatcher.Id))

      // Synchronous processing with deterministic order
      (1 to 5).foreach(actor ! _)
      receiveN(5) should equal(List(2, 4, 6, 8, 10))
    }

    "use ExplicitlyTriggeredScheduler with awaitCond" in {
      // Separate system with its own name, so it does not shadow the kit's `system`.
      val schedulerSystem = ActorSystem("ExplicitSchedulerSystem", ConfigFactory.parseString("""
        akka.scheduler.implementation = "akka.testkit.ExplicitlyTriggeredScheduler"
      """))

      try {
        // Execution context required by scheduleOnce.
        import schedulerSystem.dispatcher

        val scheduler = schedulerSystem.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]
        @volatile var condition = false

        scheduler.scheduleOnce(2.seconds, new Runnable {
          def run(): Unit = condition = true
        })

        // Condition not met yet. This suite's own awaitCond is used: importing a
        // second TestKit's members here would make the reference ambiguous.
        intercept[AssertionError] {
          awaitCond(condition, max = 100.millis)
        }

        // Advance time and condition becomes true
        scheduler.timePasses(2.seconds)
        awaitCond(condition, max = 100.millis) // Should succeed now
      } finally {
        // Always release the extra system, even when an assertion above fails.
        TestKit.shutdownActorSystem(schedulerSystem)
      }
    }
  }
}

269

```

270

271

### Testing Actor Scheduling Behavior

272

273

**Testing Periodic Tasks:**

274

275

```scala

276

import akka.actor.{Actor, ActorSystem, Cancellable, Props}
import akka.testkit.{ExplicitlyTriggeredScheduler, TestKit}
import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfterAll
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

import scala.concurrent.duration._

/** Verifies a periodically ticking actor by advancing the scheduler's clock by hand. */
class PeriodicTaskTest
    extends TestKit(ActorSystem("TestSystem", ConfigFactory.parseString("""
      akka.scheduler.implementation = "akka.testkit.ExplicitlyTriggeredScheduler"
    """)))
    with AnyWordSpecLike
    with Matchers
    with BeforeAndAfterAll {

  val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]

  override def afterAll(): Unit = TestKit.shutdownActorSystem(system)

  "Periodic task actor" should {
    "send messages at regular intervals" in {
      // Local class on purpose: it reports to this suite's `testActor`.
      class PeriodicActor extends Actor {
        import context.dispatcher

        private var ticker: Option[Cancellable] = None

        override def preStart(): Unit =
          ticker = Some(
            context.system.scheduler.scheduleAtFixedRate(
              initialDelay = 1.second,
              interval = 2.seconds,
              receiver = self,
              message = "tick"
            )
          )

        // Cancel the periodic task so it does not outlive the actor.
        override def postStop(): Unit = ticker.foreach(_.cancel())

        def receive: Receive = {
          case "tick" => testActor ! "periodic-message"
        }
      }

      // A method-local class cannot be created reflectively via Props[PeriodicActor]();
      // pass an explicit creator instead.
      system.actorOf(Props(new PeriodicActor))

      // No messages initially
      expectNoMessage(100.millis)

      // First message after 1 second
      scheduler.timePasses(1.second)
      expectMsg("periodic-message")

      // Second message after 2 more seconds
      scheduler.timePasses(2.seconds)
      expectMsg("periodic-message")

      // Third message after 2 more seconds
      scheduler.timePasses(2.seconds)
      expectMsg("periodic-message")
    }
  }
}

320

```

321

322

**Testing Timeout Behavior:**

323

324

```scala

325

import akka.actor.{Actor, ActorRef, ActorSystem, Cancellable, Props}
import akka.testkit.{ExplicitlyTriggeredScheduler, ImplicitSender, TestKit}
import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfterAll
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

import scala.concurrent.duration._

/**
 * Verifies timeout handling by advancing the scheduler's clock by hand.
 *
 * ImplicitSender makes `testActor` the sender of "start-with-timeout", so the
 * actor's replies come back to this suite.
 */
class TimeoutTest
    extends TestKit(ActorSystem("TestSystem", ConfigFactory.parseString("""
      akka.scheduler.implementation = "akka.testkit.ExplicitlyTriggeredScheduler"
    """)))
    with ImplicitSender
    with AnyWordSpecLike
    with Matchers
    with BeforeAndAfterAll {

  val scheduler = system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]

  override def afterAll(): Unit = TestKit.shutdownActorSystem(system)

  "Timeout actor" should {
    "handle timeouts correctly" in {
      class TimeoutActor extends Actor {
        import context.dispatcher

        def receive: Receive = {
          case "start-with-timeout" =>
            val originalSender = sender()
            // Send the timeout to this actor (not to the client) so that
            // waitingForResponse decides what the client is told.
            val cancellable = context.system.scheduler.scheduleOnce(5.seconds, self, "timeout")

            context.become(waitingForResponse(cancellable, originalSender))
        }

        // Behaviour while an operation is pending: first of response/timeout wins.
        def waitingForResponse(cancellable: Cancellable, client: ActorRef): Receive = {
          case "response" =>
            cancellable.cancel()
            client ! "success"
            context.unbecome()
          case "timeout" =>
            client ! "timeout-occurred"
            context.unbecome()
        }
      }

      // A method-local class cannot be created reflectively via Props[TimeoutActor]();
      // pass an explicit creator instead.
      val actor = system.actorOf(Props(new TimeoutActor))

      // Start operation with timeout
      actor ! "start-with-timeout"

      // No response yet
      expectNoMessage(100.millis)

      // Advance time but not enough for timeout
      scheduler.timePasses(3.seconds)
      expectNoMessage(100.millis)

      // Advance time to trigger timeout
      scheduler.timePasses(2.seconds)
      expectMsg("timeout-occurred")
    }
  }
}

375

```

376

377

### Configuration

378

379

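Configure the test dispatcher and scheduler in the configuration used by your tests (for example `src/test/resources/application.conf`), so that they are never active in production: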

380

381

```hocon

382

akka {

383

actor {

384

# Dispatcher settings for actors that do not select a dispatcher explicitly
default-dispatcher {

385

type = Dispatcher

386

executor = "thread-pool-executor"

387

}

388

}

389

390

test {

391

# Path "akka.test.calling-thread-dispatcher" is the value of CallingThreadDispatcher.Id
calling-thread-dispatcher {

392

type = akka.testkit.CallingThreadDispatcherConfigurator

393

}

394

}

395

396

# For explicit scheduler testing: scheduled work runs only when the test calls timePasses(...)

397

scheduler {

398

implementation = "akka.testkit.ExplicitlyTriggeredScheduler"

399

}

400

}

401

```

402

403

### Best Practices

404

405

1. **Use CallingThreadDispatcher for Unit Tests**: Provides deterministic, synchronous execution

406

2. **Use ExplicitlyTriggeredScheduler for Time-Dependent Tests**: Full control over time advancement

407

3. **Combine with TestActorRef**: TestActorRef processes messages synchronously on the calling thread, which makes it a natural fit for synchronous unit tests

408

4. **Configure Appropriately**: Use test dispatchers only in test environments

409

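5. **Clean Up Resources**: Both are lightweight, but every ActorSystem created in a test still owns threads; shut it down when the suite finishes (for example with `TestKit.shutdownActorSystem(system)`)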

410

411

```scala

412

// Good: Explicit dispatcher configuration for tests

413

val actor = system.actorOf(
  Props[MyActor]().withDispatcher(CallingThreadDispatcher.Id),
  "test-actor"
)

// Good: Manual time control for timing tests
scheduler.timePasses(expectedDelay)
expectMsg("scheduled-message")

// Good: Combine for comprehensive testing
// The explicit type parameter is what gives `underlyingActor` the type TimedActor;
// without it the member access below does not compile.
val testActorRef = TestActorRef[TimedActor](
  Props[TimedActor]().withDispatcher(CallingThreadDispatcher.Id)
)
scheduler.timePasses(triggerTime)
testActorRef.underlyingActor.timersFired should be(true)

428

```