0
# Synchronization Utilities
1
2
Akka TestKit provides synchronization utilities including TestLatch for countdown coordination and TestBarrier for cyclic synchronization, enabling precise coordination of concurrent test scenarios and actor interactions.
3
4
## Capabilities
5
6
### TestLatch
7
8
Count-down latch for test synchronization with timeout support, useful for coordinating multiple threads or waiting for specific conditions.
9
10
```scala { .api }
11
/**
12
* Count-down latch for test synchronization with timeouts
13
* @param count Initial count value (defaults to 1)
14
* @param system Implicit ActorSystem for timeout configuration
15
*/
16
class TestLatch(count: Int = 1)(implicit system: ActorSystem) {
17
18
/**
19
* Decrement the latch counter
20
* When counter reaches zero, latch opens and waiting threads are released
21
*/
22
def countDown(): Unit
23
24
/**
25
* Check if latch is open (counter has reached zero)
26
* @return True if latch is open, false otherwise
27
*/
28
def isOpen: Boolean
29
30
/**
31
* Force latch open regardless of current counter value
32
* Releases all waiting threads immediately
33
*/
34
def open(): Unit
35
36
/**
37
* Reset latch to original count value
38
* Allows reuse of the same latch instance
39
*/
40
def reset(): Unit
41
42
/**
43
* Wait for latch to open with timeout
44
* @param atMost Maximum time to wait for latch to open
45
* @param permit Implicit CanAwait permission for blocking operation
46
* @return This TestLatch instance for method chaining
47
* @throws TimeoutException if latch doesn't open within timeout
48
*/
49
def ready(atMost: Duration)(implicit permit: CanAwait): TestLatch
50
}
51
```
52
53
### TestLatch Factory
54
55
Factory methods and constants for creating TestLatch instances.
56
57
```scala { .api }
58
object TestLatch {
59
/**
60
* Create TestLatch with specified count
61
* @param count Initial count value (defaults to 1)
62
* @param system Implicit ActorSystem
63
* @return New TestLatch instance
64
*/
65
def apply(count: Int = 1)(implicit system: ActorSystem): TestLatch
66
67
/**
68
* Default timeout for TestLatch operations
69
*/
70
val DefaultTimeout: Duration = 5.seconds
71
}
72
```
73
74
### TestBarrier
75
76
Cyclic barrier for coordinating multiple test threads, where all threads must reach the barrier before any can proceed.
77
78
```scala { .api }
79
/**
80
* Cyclic barrier for coordinating multiple test threads
81
* All specified number of threads must reach barrier before any can proceed
82
* @param count Number of threads that must reach barrier
83
*/
84
class TestBarrier(count: Int) {
85
86
/**
87
* Wait at barrier using default timeout
88
* Blocks until all threads reach the barrier
89
* @param system Implicit ActorSystem for timeout configuration
90
* @throws TestBarrierTimeoutException if timeout occurs
91
*/
92
def await()(implicit system: ActorSystem): Unit
93
94
/**
95
* Wait at barrier with specified timeout
96
* Blocks until all threads reach the barrier or timeout occurs
97
* @param timeout Maximum time to wait at barrier
98
* @param system Implicit ActorSystem
99
* @throws TestBarrierTimeoutException if timeout occurs
100
*/
101
def await(timeout: FiniteDuration)(implicit system: ActorSystem): Unit
102
103
/**
104
* Reset barrier for reuse
105
* Allows the same barrier to coordinate multiple rounds of synchronization
106
*/
107
def reset(): Unit
108
}
109
```
110
111
### TestBarrier Exception
112
113
Exception thrown when barrier operations timeout.
114
115
```scala { .api }
116
/**
117
* Exception thrown when TestBarrier operations timeout
118
* @param message Descriptive error message
119
*/
120
case class TestBarrierTimeoutException(message: String) extends RuntimeException(message)
121
```
122
123
**Usage Examples:**
124
125
```scala
126
import akka.actor.{Actor, ActorSystem, Props}
127
import akka.testkit.{TestKit, TestLatch, TestBarrier}
128
import scala.concurrent.{Future, ExecutionContext}
129
import scala.concurrent.duration._
130
131
class SynchronizationExample extends TestKit(ActorSystem("test")) {
132
implicit val ec: ExecutionContext = system.dispatcher
133
134
"TestLatch" should {
135
"coordinate single countdown" in {
136
val latch = TestLatch()
137
138
// Start background task
139
Future {
140
Thread.sleep(100)
141
latch.countDown()
142
}
143
144
// Wait for background task to complete
145
latch.ready(1.second)
146
assert(latch.isOpen)
147
}
148
149
"coordinate multiple countdowns" in {
150
val latch = TestLatch(3)
151
152
// Start multiple background tasks
153
(1 to 3).foreach { i =>
154
Future {
155
Thread.sleep(i * 50)
156
latch.countDown()
157
}
158
}
159
160
// Wait for all tasks to complete
161
latch.ready(1.second)
162
assert(latch.isOpen)
163
}
164
165
"support reset and reuse" in {
166
val latch = TestLatch(2)
167
168
// First use
169
Future { latch.countDown() }
170
Future { latch.countDown() }
171
latch.ready(1.second)
172
assert(latch.isOpen)
173
174
// Reset and reuse
175
latch.reset()
176
assert(!latch.isOpen)
177
178
Future { latch.countDown() }
179
Future { latch.countDown() }
180
latch.ready(1.second)
181
assert(latch.isOpen)
182
}
183
184
"support manual opening" in {
185
val latch = TestLatch(10)
186
187
// Force open without waiting for all countdowns
188
latch.open()
189
assert(latch.isOpen)
190
191
// ready() returns immediately
192
latch.ready(1.second)
193
}
194
}
195
196
"TestBarrier" should {
197
"coordinate multiple threads" in {
198
val barrier = TestBarrier(3)
199
val results = collection.mutable.ListBuffer[Int]()
200
201
// Start multiple threads that must synchronize
202
val futures = (1 to 3).map { i =>
203
Future {
204
// Do some work
205
Thread.sleep(i * 30)
206
207
// Wait at barrier
208
barrier.await(1.second)
209
210
// Continue after all threads reach barrier
211
results += i
212
}
213
}
214
215
// Wait for all threads to complete
216
Future.sequence(futures).ready(2.seconds)
217
218
// All threads should have added their results
219
assert(results.size == 3)
220
assert(results.contains(1))
221
assert(results.contains(2))
222
assert(results.contains(3))
223
}
224
225
"support barrier reset and reuse" in {
226
val barrier = TestBarrier(2)
227
228
// First round of synchronization
229
val round1 = (1 to 2).map { i =>
230
Future {
231
barrier.await(1.second)
232
i
233
}
234
}
235
236
Future.sequence(round1).ready(2.seconds)
237
238
// Reset barrier for second round
239
barrier.reset()
240
241
// Second round of synchronization
242
val round2 = (1 to 2).map { i =>
243
Future {
244
barrier.await(1.second)
245
i + 10
246
}
247
}
248
249
Future.sequence(round2).ready(2.seconds)
250
}
251
252
"timeout when not all threads arrive" in {
253
val barrier = TestBarrier(3)
254
255
// Only start 2 of 3 required threads
256
Future { barrier.await(500.millis) }
257
Future { barrier.await(500.millis) }
258
259
// Third thread never arrives, should timeout
260
intercept[TestBarrierTimeoutException] {
261
// This will timeout since only 2/3 threads arrived
262
Thread.sleep(1000)
263
}
264
}
265
}
266
}
267
```
268
269
### Actor Synchronization Patterns
270
271
Using synchronization utilities with actors for coordinated testing scenarios.
272
273
```scala
274
class ActorSynchronizationExample extends TestKit(ActorSystem("test")) {
275
276
class WorkerActor(latch: TestLatch) extends Actor {
277
def receive = {
278
case "work" =>
279
// Simulate work
280
Thread.sleep(100)
281
sender() ! "done"
282
latch.countDown()
283
}
284
}
285
286
"Actor coordination with TestLatch" should {
287
"wait for multiple actors to complete work" in {
288
val latch = TestLatch(3)
289
val workers = (1 to 3).map { i =>
290
system.actorOf(Props(new WorkerActor(latch)), s"worker-$i")
291
}
292
293
// Send work to all workers
294
workers.foreach(_ ! "work")
295
296
// Wait for all workers to complete
297
latch.ready(2.seconds)
298
299
// All workers have completed their work
300
assert(latch.isOpen)
301
}
302
}
303
304
class BarrierActor(barrier: TestBarrier, id: Int) extends Actor {
305
def receive = {
306
case "sync" =>
307
try {
308
barrier.await(1.second)
309
sender() ! s"synchronized-$id"
310
} catch {
311
case _: TestBarrierTimeoutException =>
312
sender() ! s"timeout-$id"
313
}
314
}
315
}
316
317
"Actor coordination with TestBarrier" should {
318
"synchronize multiple actors" in {
319
val barrier = TestBarrier(2)
320
val actor1 = system.actorOf(Props(new BarrierActor(barrier, 1)))
321
val actor2 = system.actorOf(Props(new BarrierActor(barrier, 2)))
322
323
// Send sync message to both actors
324
actor1 ! "sync"
325
actor2 ! "sync"
326
327
// Both should synchronize and respond
328
expectMsgAnyOf(2.seconds, "synchronized-1", "synchronized-2")
329
expectMsgAnyOf(2.seconds, "synchronized-1", "synchronized-2")
330
}
331
}
332
}
333
```
334
335
### Integration with TestKit Timing
336
337
Combining synchronization utilities with TestKit timing controls for precise test coordination.
338
339
```scala
340
class TimingSynchronizationExample extends TestKit(ActorSystem("test")) {
341
342
"Combined timing and synchronization" should {
343
"coordinate complex scenarios" in {
344
val latch = TestLatch(2)
345
val barrier = TestBarrier(2)
346
347
within(3.seconds) {
348
// Start coordinated operations
349
Future {
350
barrier.await(1.second) // Synchronize start
351
Thread.sleep(500) // Simulate work
352
latch.countDown() // Signal completion
353
}
354
355
Future {
356
barrier.await(1.second) // Synchronize start
357
Thread.sleep(500) // Simulate work
358
latch.countDown() // Signal completion
359
}
360
361
// Wait for both operations to complete
362
latch.ready(2.seconds)
363
}
364
365
assert(latch.isOpen)
366
}
367
}
368
}
369
```
370
371
### Thread Safety and Best Practices
372
373
Guidelines for safe usage of synchronization utilities in concurrent test scenarios.
374
375
```scala
376
// SAFE: Proper timeout handling
377
val latch = TestLatch(3)
378
try {
379
latch.ready(5.seconds)
380
// Continue with test
381
} catch {
382
case _: TimeoutException =>
383
// Handle timeout appropriately
384
fail("Latch did not open within timeout")
385
}
386
387
// SAFE: Proper barrier usage
388
val barrier = TestBarrier(2)
389
try {
390
barrier.await(2.seconds)
391
// All threads synchronized
392
} catch {
393
case _: TestBarrierTimeoutException =>
394
// Handle barrier timeout
395
fail("Not all threads reached barrier")
396
}
397
398
// BEST PRACTICE: Reset for reuse
399
val reusableLatch = TestLatch(2)
400
// Use latch
401
reusableLatch.ready(1.second)
402
// Reset for next test
403
reusableLatch.reset()
404
405
// BEST PRACTICE: Appropriate timeouts
406
// Use timeouts longer than expected operation time
407
val conservativeLatch = TestLatch()
408
conservativeLatch.ready(expectedTime * 2)
409
```