0
# Synchronization Utilities
1
2
Thread-safe synchronization primitives for coordinating multi-threaded tests and ensuring deterministic test execution across concurrent operations.
3
4
## Capabilities
5
6
### TestBarrier Class
7
8
Cyclic barrier wrapper for multi-thread test synchronization.
9
10
```scala { .api }
11
class TestBarrier(count: Int) {
12
// Waiting methods
13
def await()(implicit system: ActorSystem): Unit
14
def await(timeout: FiniteDuration)(implicit system: ActorSystem): Unit
15
16
// State management
17
def reset(): Unit
18
def getNumberWaiting: Int
19
def getParties: Int
20
def isBroken: Boolean
21
}
22
23
object TestBarrier {
24
def apply(count: Int): TestBarrier
25
}
26
```
27
28
**Usage Example:**
29
30
```scala
31
import akka.testkit.TestBarrier
32
import scala.concurrent.duration._
33
import scala.concurrent.Future
34
import scala.concurrent.ExecutionContext.Implicits.global
35
36
// Create barrier for 3 threads
37
val barrier = TestBarrier(3)
38
39
// Start 3 concurrent operations
40
val futures = (1 to 3).map { i =>
41
Future {
42
println(s"Thread $i: Starting work")
43
Thread.sleep(scala.util.Random.nextInt(1000)) // Simulate work
44
45
println(s"Thread $i: Waiting at barrier")
46
barrier.await() // All threads wait here
47
48
println(s"Thread $i: Continuing after barrier")
49
s"Thread $i completed"
50
}
51
}
52
53
// All threads will continue together after barrier
54
val results = Await.result(Future.sequence(futures), 5.seconds)
55
56
// Reset barrier for reuse
57
barrier.reset()
58
```
59
60
### TestLatch Class
61
62
CountDownLatch wrapper for test synchronization.
63
64
```scala { .api }
65
class TestLatch(count: Int = 1)(implicit system: ActorSystem) {
66
// Countdown methods
67
def countDown(): Unit
68
def countDown(delta: Int): Unit
69
70
// State checking
71
def isOpen: Boolean
72
def getCount: Long
73
74
// Waiting methods
75
def ready(atMost: Duration)(implicit system: ActorSystem): Unit
76
def ready()(implicit system: ActorSystem): Unit
77
78
// Control methods
79
def open(): Unit // Opens latch completely
80
def reset(): Unit // Resets to original count
81
}
82
83
object TestLatch {
84
def apply(count: Int = 1)(implicit system: ActorSystem): TestLatch
85
}
86
```
87
88
**Usage Example:**
89
90
```scala
91
import akka.testkit.TestLatch
92
import scala.concurrent.Future
93
import scala.concurrent.ExecutionContext.Implicits.global
94
95
// Create latch that waits for 2 operations
96
val latch = TestLatch(2)
97
98
// Start first operation
99
Future {
100
Thread.sleep(500)
101
println("Operation 1 completed")
102
latch.countDown()
103
}
104
105
// Start second operation
106
Future {
107
Thread.sleep(800)
108
println("Operation 2 completed")
109
latch.countDown()
110
}
111
112
// Wait for both operations to complete
113
latch.ready(2.seconds)
114
println("Both operations completed!")
115
116
// Check if latch is open
117
assert(latch.isOpen)
118
assert(latch.getCount == 0)
119
```
120
121
### Advanced Synchronization Patterns
122
123
**Producer-Consumer with TestLatch:**
124
125
```scala
126
import akka.testkit.{TestKit, TestLatch}
127
import akka.actor.{Actor, Props}
128
129
class Producer(latch: TestLatch) extends Actor {
130
def receive = {
131
case "produce" =>
132
// Do some work
133
Thread.sleep(100)
134
println("Item produced")
135
latch.countDown()
136
}
137
}
138
139
class Consumer(latch: TestLatch) extends Actor {
140
def receive = {
141
case "consume" =>
142
latch.ready() // Wait for producer
143
println("Item consumed")
144
}
145
}
146
147
class ProducerConsumerTest extends TestKit(ActorSystem("TestSystem")) {
148
"Producer-Consumer pattern" should {
149
"synchronize correctly" in {
150
val latch = TestLatch(1)
151
152
val producer = system.actorOf(Props(new Producer(latch)))
153
val consumer = system.actorOf(Props(new Consumer(latch)))
154
155
// Start consumer first (it will wait)
156
consumer ! "consume"
157
158
// Then producer (which signals completion)
159
producer ! "produce"
160
161
// Test passes if consumer doesn't block forever
162
}
163
}
164
}
165
```
166
167
**Multi-Stage Pipeline with TestBarrier:**
168
169
```scala
170
import akka.testkit.{TestKit, TestBarrier}
171
import akka.actor.{Actor, Props}
172
173
class PipelineStage(stageId: Int, barrier: TestBarrier) extends Actor {
174
def receive = {
175
case "process" =>
176
println(s"Stage $stageId: Processing...")
177
Thread.sleep(scala.util.Random.nextInt(500))
178
println(s"Stage $stageId: Done, waiting for others")
179
180
barrier.await() // Wait for all stages
181
182
println(s"Stage $stageId: All stages complete, continuing")
183
sender() ! s"Stage $stageId completed"
184
}
185
}
186
187
class PipelineTest extends TestKit(ActorSystem("TestSystem")) {
188
"Pipeline stages" should {
189
"synchronize at barriers" in {
190
val barrier = TestBarrier(3)
191
192
val stages = (1 to 3).map { i =>
193
system.actorOf(Props(new PipelineStage(i, barrier)))
194
}
195
196
// Start all stages
197
stages.foreach(_ ! "process")
198
199
// All should complete around the same time
200
receiveN(3, 3.seconds)
201
}
202
}
203
}
204
```
205
206
### Integration with TestKit
207
208
Synchronization utilities work seamlessly with TestKit:
209
210
```scala
211
class SynchronizationIntegrationTest extends TestKit(ActorSystem("TestSystem")) {
212
213
"TestLatch with TestKit" should {
214
"coordinate actor testing" in {
215
val completionLatch = TestLatch(2)
216
217
class WorkerActor extends Actor {
218
def receive = {
219
case "work" =>
220
// Simulate work
221
Future {
222
Thread.sleep(200)
223
completionLatch.countDown()
224
}
225
}
226
}
227
228
val worker1 = system.actorOf(Props[WorkerActor]())
229
val worker2 = system.actorOf(Props[WorkerActor]())
230
231
worker1 ! "work"
232
worker2 ! "work"
233
234
// Wait for both workers to complete
235
completionLatch.ready(1.second)
236
237
// Now continue with test assertions
238
assert(completionLatch.isOpen)
239
}
240
}
241
242
"TestBarrier with TestKit" should {
243
"synchronize multiple test probes" in {
244
val barrier = TestBarrier(2)
245
val probe1 = TestProbe()
246
val probe2 = TestProbe()
247
248
// Simulate concurrent operations
249
Future {
250
probe1.send(testActor, "probe1 ready")
251
barrier.await()
252
probe1.send(testActor, "probe1 done")
253
}
254
255
Future {
256
probe2.send(testActor, "probe2 ready")
257
barrier.await()
258
probe2.send(testActor, "probe2 done")
259
}
260
261
// Both should send ready first
262
expectMsgAllOf("probe1 ready", "probe2 ready")
263
264
// Then both should send done (after barrier)
265
expectMsgAllOf("probe1 done", "probe2 done")
266
}
267
}
268
}
269
```
270
271
### Thread Safety and Best Practices
272
273
**Thread Safety:**
274
- Both TestBarrier and TestLatch are thread-safe
275
- Can be safely shared between actors and test code
276
- Use implicit ActorSystem for timeout handling
277
278
**Best Practices:**
279
280
```scala
281
// Good: Specify reasonable timeouts
282
latch.ready(5.seconds)
283
barrier.await(3.seconds)
284
285
// Good: Check state before waiting
286
if (!latch.isOpen) {
287
latch.ready(1.second)
288
}
289
290
// Good: Reset for reuse in multiple tests
291
override def beforeEach(): Unit = {
292
barrier.reset()
293
latch.reset()
294
}
295
296
// Good: Use in try-finally for cleanup
297
try {
298
latch.ready(1.second)
299
// test code
300
} finally {
301
latch.reset()
302
}
303
```
304
305
**Common Patterns:**
306
307
1. **Wait for Multiple Operations**: Use TestLatch with count > 1
308
2. **Synchronize Phases**: Use TestBarrier for multi-step coordination
309
3. **Signal Completion**: Use TestLatch(1) as a simple completion signal
310
4. **Batch Processing**: Use TestBarrier to synchronize batch boundaries
311
5. **Resource Coordination**: Use latches to ensure resources are ready
312
313
**Error Handling:**
314
315
```scala
316
// Handle timeout in latch waiting
317
try {
318
latch.ready(1.second)
319
} catch {
320
case _: java.util.concurrent.TimeoutException =>
321
fail("Operations did not complete in time")
322
}
323
324
// Check barrier state for debugging
325
if (barrier.isBroken) {
326
fail("Barrier was broken by an exception")
327
}
328
329
// Verify expected state
330
assert(barrier.getNumberWaiting == 0, "Some threads still waiting at barrier")
331
assert(latch.getCount == 0, s"Latch still has ${latch.getCount} remaining")
332
```