# Test Utilities and Coordination

Akka TestKit includes utilities for coordinating and synchronizing tests: ready-made test actors (`TestActors`), a cyclic barrier (`TestBarrier`), and a countdown latch (`TestLatch`). They help manage test scenarios that involve multiple actors and timing constraints.

## TestActors

### TestActors Object { .api }

```scala
object TestActors {
  val echoActorProps: Props
  val blackholeProps: Props
  def forwardActorProps(ref: ActorRef): Props
}
```

A collection of `Props` for common test actors that can be reused across testing scenarios.

### Built-in Test Actors

#### EchoActor { .api }

```scala
class EchoActor extends Actor {
  def receive = {
    case msg => sender() ! msg
  }
}
```

An actor that echoes back any message it receives to the sender.

#### BlackholeActor { .api }

```scala
class BlackholeActor extends Actor {
  def receive = {
    case _ => // ignore all messages
  }
}
```

An actor that ignores all messages it receives, useful for testing scenarios where you need to send messages but don't care about responses.

#### ForwardActor { .api }

```scala
class ForwardActor(target: ActorRef) extends Actor {
  def receive = {
    case msg => target.forward(msg)
  }
}
```

An actor that forwards all messages to a specified target actor.

### Usage Examples

#### EchoActor Usage

```scala
import akka.actor.Props
import akka.testkit.TestActors

// Assumes a TestKit spec that mixes in ImplicitSender, so replies arrive at testActor

// Create echo actor
val echo = system.actorOf(TestActors.echoActorProps, "echo")

// Test echoing behavior
echo ! "hello"
expectMsg("hello")

echo ! 42
expectMsg(42)

echo ! SomeMessage("data")
expectMsg(SomeMessage("data"))

// Use in testing actor interactions
// (MyActor, RegisterListener and TriggerNotification stand for your own code)
val myActor = system.actorOf(Props[MyActor])
myActor ! RegisterListener(echo)

myActor ! TriggerNotification("test")
// The echo actor replies to whoever sent it the message, so "test" reaches this
// test only if MyActor forwards the notification and testActor stays the sender
expectMsg("test")
```

#### BlackholeActor Usage

```scala
import scala.concurrent.duration._

// Create blackhole actor
val blackhole = system.actorOf(TestActors.blackholeProps, "blackhole")

// Test that messages are ignored
blackhole ! "message1"
blackhole ! "message2"
blackhole ! "message3"

expectNoMessage(1.second) // No messages should come back

// Use for testing fire-and-forget scenarios
// (EventProducer, SetDestination and GenerateEvents stand for your own code)
val producer = system.actorOf(Props[EventProducer])
producer ! SetDestination(blackhole)
producer ! GenerateEvents(100)

// Events are sent but not processed, testing producer behavior only
```

#### ForwardActor Usage

```scala
import akka.testkit.TestProbe

// Create probe to receive forwarded messages
val probe = TestProbe()
val forwarder = system.actorOf(TestActors.forwardActorProps(probe.ref), "forwarder")

// Test forwarding behavior
forwarder ! "test-message"
probe.expectMsg("test-message")

// Test that sender is preserved in forwarding
val sender = TestProbe()
forwarder.tell("forwarded", sender.ref)
probe.expectMsg("forwarded")
assert(probe.lastSender == sender.ref) // Sender is preserved

// Use for testing message routing
// (MessageRouter, AddRoute and RouteMessage stand for your own code)
val router = system.actorOf(Props[MessageRouter])
router ! AddRoute("test-channel", forwarder)
router ! RouteMessage("test-channel", "routed-message")
probe.expectMsg("routed-message")
```

## TestBarrier

### TestBarrier Class { .api }

```scala
class TestBarrier(count: Int) {
  def await()(implicit system: ActorSystem): Unit
  def await(timeout: FiniteDuration)(implicit system: ActorSystem): Unit
  def reset(): Unit
}
```

A thin wrapper around `java.util.concurrent.CyclicBarrier` for tests: `count` threads or test execution paths block in `await` until all of them have arrived. Waiting always uses a timeout (5 seconds by default) and throws `TestBarrierTimeoutException` when it expires.

### TestBarrier Object { .api }

```scala
object TestBarrier {
  val DefaultTimeout: FiniteDuration = 5.seconds
  def apply(count: Int): TestBarrier
}
```

### TestBarrierTimeoutException { .api }

```scala
class TestBarrierTimeoutException(message: String) extends RuntimeException(message)
```

Exception thrown when a barrier operation times out.

### Usage Examples

#### Basic Barrier Usage

```scala
import akka.testkit.TestBarrier
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// await() needs an implicit ActorSystem in scope (TestKit provides one)

// Create barrier for 3 parties
val barrier = TestBarrier(3)

// Simulate concurrent operations
val future1 = Future {
  // Do some work
  Thread.sleep(100)
  barrier.await() // Wait for others
  "task1 complete"
}

val future2 = Future {
  // Do different work
  Thread.sleep(200)
  barrier.await() // Wait for others
  "task2 complete"
}

val future3 = Future {
  // Do more work
  Thread.sleep(150)
  barrier.await() // Wait for others
  "task3 complete"
}

// No future completes until all three have reached the barrier
val results = Await.result(Future.sequence(List(future1, future2, future3)), 5.seconds)
println(results) // All tasks completed together
```

#### Barrier with Timeout

```scala
val barrier = TestBarrier(2)

val future1 = Future {
  barrier.await(3.seconds)
  "completed"
}

// Second party never arrives, so the wait times out and the future fails
intercept[TestBarrierTimeoutException] {
  Await.result(future1, 5.seconds)
}
```

#### Multiple Barrier Rounds

```scala
import scala.collection.mutable

val barrier = TestBarrier(2)

def worker(id: Int): Future[List[String]] = Future {
  val results = mutable.ListBuffer[String]()

  for (round <- 1 to 3) {
    // Do work for this round
    results += s"worker-$id-round-$round"

    // Synchronize with the other worker. The underlying CyclicBarrier
    // re-arms itself once both parties arrive, so no reset() is needed
    // between rounds; calling reset() while the other worker is already
    // waiting would break the barrier for that worker.
    barrier.await()
  }

  results.toList
}

val worker1 = worker(1)
val worker2 = worker(2)

val results = Await.result(Future.sequence(List(worker1, worker2)), 10.seconds)
// Both workers complete all rounds together
```
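
Reuse across rounds works because `TestBarrier` delegates to `java.util.concurrent.CyclicBarrier`, which re-arms itself each time all parties have arrived. The sketch below shows that behaviour with the JDK class alone, so it runs without an actor system. `CyclicReuseSketch` and its two-thread pool are illustrative names, not part of Akka.

```scala
import java.util.concurrent.{CyclicBarrier, Executors, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object CyclicReuseSketch {
  // Runs `rounds` rounds with two workers sharing one barrier; reset() is never called.
  def run(rounds: Int): List[List[String]] = {
    // A dedicated two-thread pool guarantees both workers can block at the same time.
    val pool = Executors.newFixedThreadPool(2)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    val barrier = new CyclicBarrier(2)

    def worker(id: Int): Future[List[String]] = Future {
      (1 to rounds).toList.map { round =>
        // Blocks until both workers arrive; the barrier is then ready for the next round.
        barrier.await(5, TimeUnit.SECONDS)
        s"worker-$id-round-$round"
      }
    }

    try Await.result(Future.sequence(List(worker(1), worker(2))), 10.seconds)
    finally pool.shutdown()
  }
}
```

Calling `CyclicReuseSketch.run(3)` returns one list of three entries per worker, which would not be possible if the barrier needed a manual reset between rounds.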

#### Testing Actor Coordination

```scala
class CoordinatedActor(barrier: TestBarrier) extends Actor {
  // await() needs an implicit ActorSystem
  implicit val system: akka.actor.ActorSystem = context.system

  def receive = {
    case "phase1" =>
      // Do phase 1 work
      sender() ! "phase1-done"
      barrier.await()

    case "phase2" =>
      // Do phase 2 work
      sender() ! "phase2-done"
      barrier.await()
  }
}

val barrier = TestBarrier(2)
val actor1 = system.actorOf(Props(new CoordinatedActor(barrier)))
val actor2 = system.actorOf(Props(new CoordinatedActor(barrier)))

// Both actors will synchronize at barrier
actor1 ! "phase1"
actor2 ! "phase1"

expectMsg("phase1-done")
expectMsg("phase1-done")

// Now phase 2. The barrier re-arms itself after both actors pass it, so it is
// not reset here; a reset() at this point could break the barrier for an actor
// that is still waiting in phase 1.
actor1 ! "phase2"
actor2 ! "phase2"

expectMsg("phase2-done")
expectMsg("phase2-done")
```

## TestLatch

### TestLatch Class { .api }

```scala
class TestLatch(count: Int = 1)(implicit system: ActorSystem) extends Awaitable[Unit] {
  def countDown(): Unit
  def isOpen: Boolean
  def open(): Unit
  def reset(): Unit
  def ready(atMost: Duration)(implicit permit: CanAwait): TestLatch
  def result(atMost: Duration)(implicit permit: CanAwait): Unit
}
```

A thin wrapper around `java.util.concurrent.CountDownLatch` for tests: the latch opens once `countDown()` has been called `count` times (1 by default), and a test waits for that with `Await.ready(latch, timeout)`.

### TestLatch Object { .api }

```scala
object TestLatch {
  val DefaultTimeout: FiniteDuration = 5.seconds
  def apply(count: Int = 1)(implicit system: ActorSystem): TestLatch
}
```

### Usage Examples

#### Basic Latch Usage

```scala
import akka.testkit.TestLatch
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Create latch that waits for 3 events
val latch = TestLatch(3)

// Simulate events happening
Future {
  Thread.sleep(100)
  latch.countDown() // Event 1
}

Future {
  Thread.sleep(200)
  latch.countDown() // Event 2
}

Future {
  Thread.sleep(300)
  latch.countDown() // Event 3
}

// Wait for all events to complete
Await.ready(latch, 5.seconds)
assert(latch.isOpen)
```

#### Testing Actor Completion

```scala
class WorkerActor(latch: TestLatch) extends Actor {
  def receive = {
    case "work" =>
      // Simulate work
      Thread.sleep(100)
      sender() ! "done"
      latch.countDown() // Signal completion

    case "fail" =>
      // The latch is not decremented on failure
      throw new RuntimeException("Work failed")
  }
}

val latch = TestLatch(3)
val workers = (1 to 3).map { i =>
  system.actorOf(Props(new WorkerActor(latch)), s"worker-$i")
}

// Send work to all workers
workers.foreach(_ ! "work")

// Wait for all to complete
Await.ready(latch, 5.seconds)
assert(latch.isOpen)

// Verify all completed
expectMsg("done")
expectMsg("done")
expectMsg("done")
```

#### Latch with Manual Open

```scala
val latch = TestLatch(5)

// Count down a few times
latch.countDown() // 4 remaining
latch.countDown() // 3 remaining
latch.countDown() // 2 remaining

assert(!latch.isOpen)

// Manually open without waiting for remaining countdowns
latch.open()
assert(latch.isOpen)

// Can wait on already open latch
Await.ready(latch, 1.second) // Returns immediately
```
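
As a rough mental model, `isOpen` and `open()` can be expressed in terms of a JDK `CountDownLatch`. `SketchLatch` below is a hypothetical stand-in written for this page, not the Akka class, and it leaves out the `ActorSystem`-based timeout handling.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

// Hypothetical stand-in for TestLatch, built directly on the JDK latch.
final class SketchLatch(count: Int) {
  private val latch = new CountDownLatch(count)

  def countDown(): Unit = latch.countDown()

  // The latch is open once nothing is left to count down.
  def isOpen: Boolean = latch.getCount == 0

  // Opening early is just counting down until the latch is open.
  def open(): Unit = while (!isOpen) countDown()

  // Returns true if the latch opened within the given time.
  def await(timeoutMillis: Long): Boolean =
    latch.await(timeoutMillis, TimeUnit.MILLISECONDS)
}

object SketchLatchDemo {
  // Returns (open before open(), open after open(), await succeeded).
  def run(): (Boolean, Boolean, Boolean) = {
    val latch = new SketchLatch(5)
    latch.countDown()
    latch.countDown()
    val before = latch.isOpen
    latch.open()
    (before, latch.isOpen, latch.await(100))
  }
}
```

In this model, waiting on an already open latch returns at once because the underlying count is zero.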

#### Resetting Latch

```scala
val latch = TestLatch(2)

// Use latch first time
latch.countDown()
latch.countDown()
assert(latch.isOpen)

// Reset for reuse
latch.reset()
assert(!latch.isOpen)

// Use again
Future { latch.countDown() }
Future { latch.countDown() }

Await.ready(latch, 5.seconds)
assert(latch.isOpen)
```

#### Testing System Initialization

```scala
import scala.concurrent.Future

class InitializableService(latch: TestLatch) extends Actor {
  // Execution context for the Future below
  import context.dispatcher

  override def preStart(): Unit = {
    // Simulate initialization work
    Future {
      Thread.sleep(500) // Simulate async initialization
      self ! "initialized"
    }
  }

  def receive = {
    case "initialized" =>
      latch.countDown() // Signal service is ready
      context.become(ready)
  }

  def ready: Receive = {
    case "request" => sender() ! "response"
  }
}

// Test system startup
val latch = TestLatch(3) // 3 services
val services = (1 to 3).map { i =>
  system.actorOf(Props(new InitializableService(latch)), s"service-$i")
}

// Wait for all services to initialize
Await.ready(latch, 10.seconds)

// Now services are ready for requests
services.head ! "request"
expectMsg("response")
```

## Combining Barriers and Latches

### Complex Coordination Scenarios

```scala
import scala.concurrent.Future
import scala.util.Random

// Test scenario: Initialize services, run coordinated work, cleanup
class CoordinatedService(
  initLatch: TestLatch,
  workBarrier: TestBarrier,
  cleanupLatch: TestLatch
) extends Actor {
  // Execution context for the Future below, and the ActorSystem that await() needs
  import context.dispatcher
  implicit val system: akka.actor.ActorSystem = context.system

  override def preStart(): Unit = {
    // Initialize
    Future {
      Thread.sleep(Random.nextInt(200))
      self ! "ready"
    }
  }

  def receive = {
    case "ready" =>
      initLatch.countDown()
      context.become(ready)
  }

  def ready: Receive = {
    case "do-work" =>
      // Wait for all services to start work together
      workBarrier.await(3.seconds)

      // Do coordinated work
      Thread.sleep(100)
      sender() ! "work-done"

      cleanupLatch.countDown()
  }
}

// Test setup
val initLatch = TestLatch(3)
val workBarrier = TestBarrier(3)
val cleanupLatch = TestLatch(3)

val services = (1 to 3).map { i =>
  system.actorOf(Props(new CoordinatedService(initLatch, workBarrier, cleanupLatch)))
}

// Wait for initialization
Await.ready(initLatch, 5.seconds)

// Trigger coordinated work
services.foreach(_ ! "do-work")

// Wait for cleanup
Await.ready(cleanupLatch, 5.seconds)

// Verify all completed
expectMsg("work-done")
expectMsg("work-done")
expectMsg("work-done")
```
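
The same initialize, work together, clean up shape can be tried without actors by using the JDK primitives directly. This is only an illustrative sketch: `PhasedWorkSketch` and the plain threads standing in for the services are assumptions, not Akka API.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, CyclicBarrier, TimeUnit}

object PhasedWorkSketch {
  // Starts `parties` threads and returns how many of them finished their work.
  def run(parties: Int): Int = {
    val initLatch    = new CountDownLatch(parties)
    val workBarrier  = new CyclicBarrier(parties)
    val cleanupLatch = new CountDownLatch(parties)
    val done         = new ConcurrentLinkedQueue[String]()

    val threads = (1 to parties).map { i =>
      new Thread(new Runnable {
        def run(): Unit = {
          initLatch.countDown()                  // phase 1: report ready
          workBarrier.await(3, TimeUnit.SECONDS) // phase 2: start work together
          done.add(s"work-done-$i")
          cleanupLatch.countDown()               // phase 3: report finished
        }
      })
    }
    threads.foreach(_.start())

    // The coordinating thread waits on the latches and never on the barrier.
    require(initLatch.await(5, TimeUnit.SECONDS), "initialization timed out")
    require(cleanupLatch.await(5, TimeUnit.SECONDS), "work timed out")
    done.size
  }
}
```

The latches signal one way, from the workers to the coordinating thread, while the barrier synchronizes the workers with each other.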

## Best Practices

### TestActors Best Practices

1. **Use appropriate test actor**: Choose EchoActor, BlackholeActor, or ForwardActor based on testing needs
2. **Name test actors**: Use meaningful names for easier debugging
3. **Clean up resources**: Stop test actors when no longer needed

```scala
// Good: Clear purpose and naming
val echoResponder = system.actorOf(TestActors.echoActorProps, "echo-responder")
val eventSink = system.actorOf(TestActors.blackholeProps, "event-sink")
```

### TestBarrier Best Practices

1. **Match party count**: Ensure barrier count matches number of participants
2. **Handle timeouts**: Always specify appropriate timeouts for barrier operations
3. **Reset after a failure**: The barrier re-arms itself once all parties have arrived, so call `reset()` only to recover after a timeout has left it broken
4. **Avoid deadlocks**: Ensure all parties will eventually reach the barrier

```scala
// Good: Proper timeout and error handling
try {
  barrier.await(10.seconds)
} catch {
  case _: TestBarrierTimeoutException =>
    fail("Not all parties reached barrier in time")
}
```
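
To see why a missing party matters, the JDK `CyclicBarrier` (used here directly as a stand-in, so no actor system is needed) can be driven into a timeout. After the timeout the barrier is marked broken, and `reset()` returns it to a usable state. `MissingPartySketch` is an illustrative name.

```scala
import java.util.concurrent.{CyclicBarrier, TimeUnit, TimeoutException}

object MissingPartySketch {
  // Returns (wait timed out, barrier broken afterwards, barrier intact after reset).
  def run(): (Boolean, Boolean, Boolean) = {
    val barrier = new CyclicBarrier(2) // two parties expected, only one arrives

    val timedOut =
      try { barrier.await(100, TimeUnit.MILLISECONDS); false }
      catch { case _: TimeoutException => true }

    val broken = barrier.isBroken // a timed-out wait leaves the barrier broken
    barrier.reset()               // reset() clears the broken state
    (timedOut, broken, !barrier.isBroken)
  }
}
```

Any other party that waits on a broken barrier fails immediately, which is why a test that continues after a timeout should reset or replace the barrier first.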

### TestLatch Best Practices

1. **Count accurately**: Set latch count to match expected number of events
2. **Use timeouts**: Always specify timeouts when waiting on latches
3. **Handle failures**: Consider what happens if some events fail to occur
4. **Reset when reusing**: Reset latches between test phases

```scala
// Good: Comprehensive latch usage
// (expectedEvents and reasonableTimeout are placeholders defined by the test)
import java.util.concurrent.TimeoutException

val latch = TestLatch(expectedEvents)

try {
  Await.ready(latch, reasonableTimeout)
} catch {
  case _: TimeoutException =>
    fail(s"Latch for $expectedEvents events did not open within $reasonableTimeout")
}
```
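
The `TestLatch` API listed earlier has no accessor for the remaining count, so a failure message that reports progress needs its own counter. One possible approach, sketched with a JDK `CountDownLatch` standing in for `TestLatch` and a hypothetical `EventCountSketch` helper, is to keep an `AtomicInteger` next to the latch.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object EventCountSketch {
  // Waits for `expected` events while only `delivered` arrive; returns a diagnostic message.
  def run(expected: Int, delivered: Int): String = {
    val latch = new CountDownLatch(expected) // stand-in for TestLatch
    val seen  = new AtomicInteger(0)         // counted separately for reporting

    (1 to delivered).foreach { _ =>
      seen.incrementAndGet()
      latch.countDown()
    }

    if (latch.await(100, TimeUnit.MILLISECONDS)) s"all $expected events completed"
    else s"only ${seen.get} of $expected events completed"
  }
}
```

With a real `TestLatch`, the same counter could be incremented wherever `countDown()` is called and read inside the `TimeoutException` handler.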