or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-testing.mddeterministic-execution.mdevent-filtering.mdfsm-testing.mdindex.mdsynchronization.mdsynchronous-testing.mdtest-utilities.md

synchronization.mddocs/

0

# Synchronization Utilities

1

2

Akka TestKit provides synchronization utilities including TestLatch for countdown coordination and TestBarrier for cyclic synchronization, enabling precise coordination of concurrent test scenarios and actor interactions.

3

4

## Capabilities

5

6

### TestLatch

7

8

Count-down latch for test synchronization with timeout support, useful for coordinating multiple threads or waiting for specific conditions.

9

10

```scala { .api }

11

/**

12

* Count-down latch for test synchronization with timeouts

13

* @param count Initial count value (defaults to 1)

14

* @param system Implicit ActorSystem for timeout configuration

15

*/

16

class TestLatch(count: Int = 1)(implicit system: ActorSystem) {

17

18

/**

19

* Decrement the latch counter

20

* When counter reaches zero, latch opens and waiting threads are released

21

*/

22

def countDown(): Unit

23

24

/**

25

* Check if latch is open (counter has reached zero)

26

* @return True if latch is open, false otherwise

27

*/

28

def isOpen: Boolean

29

30

/**

31

* Force latch open regardless of current counter value

32

* Releases all waiting threads immediately

33

*/

34

def open(): Unit

35

36

/**

37

* Reset latch to original count value

38

* Allows reuse of the same latch instance

39

*/

40

def reset(): Unit

41

42

/**

43

* Wait for latch to open with timeout

44

* @param atMost Maximum time to wait for latch to open

45

* @param permit Implicit CanAwait permission for blocking operation

46

* @return This TestLatch instance for method chaining

47

* @throws TimeoutException if latch doesn't open within timeout

48

*/

49

def ready(atMost: Duration)(implicit permit: CanAwait): TestLatch

50

}

51

```

52

53

### TestLatch Factory

54

55

Factory methods and constants for creating TestLatch instances.

56

57

```scala { .api }

58

object TestLatch {

59

/**

60

* Create TestLatch with specified count

61

* @param count Initial count value (defaults to 1)

62

* @param system Implicit ActorSystem

63

* @return New TestLatch instance

64

*/

65

def apply(count: Int = 1)(implicit system: ActorSystem): TestLatch

66

67

/**

68

* Default timeout for TestLatch operations

69

*/

70

val DefaultTimeout: Duration = 5.seconds

71

}

72

```

73

74

### TestBarrier

75

76

Cyclic barrier for coordinating multiple test threads, where all threads must reach the barrier before any can proceed.

77

78

```scala { .api }

79

/**

80

* Cyclic barrier for coordinating multiple test threads

81

* All specified number of threads must reach barrier before any can proceed

82

* @param count Number of threads that must reach barrier

83

*/

84

class TestBarrier(count: Int) {

85

86

/**

87

* Wait at barrier using default timeout

88

* Blocks until all threads reach the barrier

89

* @param system Implicit ActorSystem for timeout configuration

90

* @throws TestBarrierTimeoutException if timeout occurs

91

*/

92

def await()(implicit system: ActorSystem): Unit

93

94

/**

95

* Wait at barrier with specified timeout

96

* Blocks until all threads reach the barrier or timeout occurs

97

* @param timeout Maximum time to wait at barrier

98

* @param system Implicit ActorSystem

99

* @throws TestBarrierTimeoutException if timeout occurs

100

*/

101

def await(timeout: FiniteDuration)(implicit system: ActorSystem): Unit

102

103

/**

104

* Reset barrier for reuse

105

* Allows the same barrier to coordinate multiple rounds of synchronization

106

*/

107

def reset(): Unit

108

}

109

```

110

111

### TestBarrier Exception

112

113

Exception thrown when barrier operations timeout.

114

115

```scala { .api }

116

/**

117

* Exception thrown when TestBarrier operations timeout

118

* @param message Descriptive error message

119

*/

120

case class TestBarrierTimeoutException(message: String) extends RuntimeException(message)

121

```

122

123

**Usage Examples:**

124

125

```scala

126

import akka.actor.{Actor, ActorSystem, Props}

127

import akka.testkit.{TestKit, TestLatch, TestBarrier}

128

import scala.concurrent.{Future, ExecutionContext}

129

import scala.concurrent.duration._

130

131

class SynchronizationExample extends TestKit(ActorSystem("test")) {

132

implicit val ec: ExecutionContext = system.dispatcher

133

134

"TestLatch" should {

135

"coordinate single countdown" in {

136

val latch = TestLatch()

137

138

// Start background task

139

Future {

140

Thread.sleep(100)

141

latch.countDown()

142

}

143

144

// Wait for background task to complete

145

latch.ready(1.second)

146

assert(latch.isOpen)

147

}

148

149

"coordinate multiple countdowns" in {

150

val latch = TestLatch(3)

151

152

// Start multiple background tasks

153

(1 to 3).foreach { i =>

154

Future {

155

Thread.sleep(i * 50)

156

latch.countDown()

157

}

158

}

159

160

// Wait for all tasks to complete

161

latch.ready(1.second)

162

assert(latch.isOpen)

163

}

164

165

"support reset and reuse" in {

166

val latch = TestLatch(2)

167

168

// First use

169

Future { latch.countDown() }

170

Future { latch.countDown() }

171

latch.ready(1.second)

172

assert(latch.isOpen)

173

174

// Reset and reuse

175

latch.reset()

176

assert(!latch.isOpen)

177

178

Future { latch.countDown() }

179

Future { latch.countDown() }

180

latch.ready(1.second)

181

assert(latch.isOpen)

182

}

183

184

"support manual opening" in {

185

val latch = TestLatch(10)

186

187

// Force open without waiting for all countdowns

188

latch.open()

189

assert(latch.isOpen)

190

191

// ready() returns immediately

192

latch.ready(1.second)

193

}

194

}

195

196

"TestBarrier" should {

197

"coordinate multiple threads" in {

198

val barrier = TestBarrier(3)

199

val results = collection.mutable.ListBuffer[Int]()

200

201

// Start multiple threads that must synchronize

202

val futures = (1 to 3).map { i =>

203

Future {

204

// Do some work

205

Thread.sleep(i * 30)

206

207

// Wait at barrier

208

barrier.await(1.second)

209

210

// Continue after all threads reach barrier

211

results += i

212

}

213

}

214

215

// Wait for all threads to complete

216

Future.sequence(futures).ready(2.seconds)

217

218

// All threads should have added their results

219

assert(results.size == 3)

220

assert(results.contains(1))

221

assert(results.contains(2))

222

assert(results.contains(3))

223

}

224

225

"support barrier reset and reuse" in {

226

val barrier = TestBarrier(2)

227

228

// First round of synchronization

229

val round1 = (1 to 2).map { i =>

230

Future {

231

barrier.await(1.second)

232

i

233

}

234

}

235

236

Future.sequence(round1).ready(2.seconds)

237

238

// Reset barrier for second round

239

barrier.reset()

240

241

// Second round of synchronization

242

val round2 = (1 to 2).map { i =>

243

Future {

244

barrier.await(1.second)

245

i + 10

246

}

247

}

248

249

Future.sequence(round2).ready(2.seconds)

250

}

251

252

"timeout when not all threads arrive" in {

253

val barrier = TestBarrier(3)

254

255

// Only start 2 of 3 required threads

256

Future { barrier.await(500.millis) }

257

Future { barrier.await(500.millis) }

258

259

// Third thread never arrives, should timeout

260

intercept[TestBarrierTimeoutException] {

261

// This will timeout since only 2/3 threads arrived

262

Thread.sleep(1000)

263

}

264

}

265

}

266

}

267

```

268

269

### Actor Synchronization Patterns

270

271

Using synchronization utilities with actors for coordinated testing scenarios.

272

273

```scala

274

class ActorSynchronizationExample extends TestKit(ActorSystem("test")) {

275

276

class WorkerActor(latch: TestLatch) extends Actor {

277

def receive = {

278

case "work" =>

279

// Simulate work

280

Thread.sleep(100)

281

sender() ! "done"

282

latch.countDown()

283

}

284

}

285

286

"Actor coordination with TestLatch" should {

287

"wait for multiple actors to complete work" in {

288

val latch = TestLatch(3)

289

val workers = (1 to 3).map { i =>

290

system.actorOf(Props(new WorkerActor(latch)), s"worker-$i")

291

}

292

293

// Send work to all workers

294

workers.foreach(_ ! "work")

295

296

// Wait for all workers to complete

297

latch.ready(2.seconds)

298

299

// All workers have completed their work

300

assert(latch.isOpen)

301

}

302

}

303

304

class BarrierActor(barrier: TestBarrier, id: Int) extends Actor {

305

def receive = {

306

case "sync" =>

307

try {

308

barrier.await(1.second)

309

sender() ! s"synchronized-$id"

310

} catch {

311

case _: TestBarrierTimeoutException =>

312

sender() ! s"timeout-$id"

313

}

314

}

315

}

316

317

"Actor coordination with TestBarrier" should {

318

"synchronize multiple actors" in {

319

val barrier = TestBarrier(2)

320

val actor1 = system.actorOf(Props(new BarrierActor(barrier, 1)))

321

val actor2 = system.actorOf(Props(new BarrierActor(barrier, 2)))

322

323

// Send sync message to both actors

324

actor1 ! "sync"

325

actor2 ! "sync"

326

327

// Both should synchronize and respond

328

expectMsgAnyOf(2.seconds, "synchronized-1", "synchronized-2")

329

expectMsgAnyOf(2.seconds, "synchronized-1", "synchronized-2")

330

}

331

}

332

}

333

```

334

335

### Integration with TestKit Timing

336

337

Combining synchronization utilities with TestKit timing controls for precise test coordination.

338

339

```scala

340

class TimingSynchronizationExample extends TestKit(ActorSystem("test")) {

341

342

"Combined timing and synchronization" should {

343

"coordinate complex scenarios" in {

344

val latch = TestLatch(2)

345

val barrier = TestBarrier(2)

346

347

within(3.seconds) {

348

// Start coordinated operations

349

Future {

350

barrier.await(1.second) // Synchronize start

351

Thread.sleep(500) // Simulate work

352

latch.countDown() // Signal completion

353

}

354

355

Future {

356

barrier.await(1.second) // Synchronize start

357

Thread.sleep(500) // Simulate work

358

latch.countDown() // Signal completion

359

}

360

361

// Wait for both operations to complete

362

latch.ready(2.seconds)

363

}

364

365

assert(latch.isOpen)

366

}

367

}

368

}

369

```

370

371

### Thread Safety and Best Practices

372

373

Guidelines for safe usage of synchronization utilities in concurrent test scenarios.

374

375

```scala

376

// SAFE: Proper timeout handling

377

val latch = TestLatch(3)

378

try {

379

latch.ready(5.seconds)

380

// Continue with test

381

} catch {

382

case _: TimeoutException =>

383

// Handle timeout appropriately

384

fail("Latch did not open within timeout")

385

}

386

387

// SAFE: Proper barrier usage

388

val barrier = TestBarrier(2)

389

try {

390

barrier.await(2.seconds)

391

// All threads synchronized

392

} catch {

393

case _: TestBarrierTimeoutException =>

394

// Handle barrier timeout

395

fail("Not all threads reached barrier")

396

}

397

398

// BEST PRACTICE: Reset for reuse

399

val reusableLatch = TestLatch(2)

400

// Use latch

401

reusableLatch.ready(1.second)

402

// Reset for next test

403

reusableLatch.reset()

404

405

// BEST PRACTICE: Appropriate timeouts

406

// Use timeouts longer than expected operation time

407

val conservativeLatch = TestLatch()

408

conservativeLatch.ready(expectedTime * 2)

409

```