or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

actor-refs.md, configuration.md, core-testing.md, dispatchers.md, event-filtering.md, index.md, java-dsl.md, package-functions.md, synchronization.md, test-actors.md, utilities.md

docs/synchronization.md

0

# Synchronization Utilities

1

2

Thread-safe synchronization primitives for coordinating multi-threaded tests and ensuring deterministic test execution across concurrent operations.

3

4

## Capabilities

5

6

### TestBarrier Class

7

8

Cyclic barrier wrapper for multi-thread test synchronization.

9

10

```scala { .api }

11

class TestBarrier(count: Int) {

12

// Waiting methods

13

def await()(implicit system: ActorSystem): Unit

14

def await(timeout: FiniteDuration)(implicit system: ActorSystem): Unit

15

16

// State management

17

def reset(): Unit

18

def getNumberWaiting: Int

19

def getParties: Int

20

def isBroken: Boolean

21

}

22

23

object TestBarrier {

24

def apply(count: Int): TestBarrier

25

}

26

```

27

28

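**Usage Example** (`await` takes an implicit `ActorSystem`, so one must be in scope; `Await` must also be imported from `scala.concurrent`):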

29

30

```scala

31

import akka.testkit.TestBarrier

32

import scala.concurrent.duration._

33

import scala.concurrent.Future

34

import scala.concurrent.ExecutionContext.Implicits.global

35

36

// Create barrier for 3 threads

37

val barrier = TestBarrier(3)

38

39

// Start 3 concurrent operations

40

val futures = (1 to 3).map { i =>

41

Future {

42

println(s"Thread $i: Starting work")

43

Thread.sleep(scala.util.Random.nextInt(1000)) // Simulate work

44

45

println(s"Thread $i: Waiting at barrier")

46

barrier.await() // All threads wait here

47

48

println(s"Thread $i: Continuing after barrier")

49

s"Thread $i completed"

50

}

51

}

52

53

// All threads will continue together after barrier

54

val results = Await.result(Future.sequence(futures), 5.seconds)

55

56

// Reset barrier for reuse

57

barrier.reset()

58

```

59

60

### TestLatch Class

61

62

CountDownLatch wrapper for test synchronization.

63

64

```scala { .api }

65

class TestLatch(count: Int = 1)(implicit system: ActorSystem) {

66

// Countdown methods

67

def countDown(): Unit

68

def countDown(delta: Int): Unit

69

70

// State checking

71

def isOpen: Boolean

72

def getCount: Long

73

74

// Waiting methods

75

def ready(atMost: Duration)(implicit system: ActorSystem): Unit

76

def ready()(implicit system: ActorSystem): Unit

77

78

// Control methods

79

def open(): Unit // Opens latch completely

80

def reset(): Unit // Resets to original count

81

}

82

83

object TestLatch {

84

def apply(count: Int = 1)(implicit system: ActorSystem): TestLatch

85

}

86

```

87

88

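**Usage Example** (`TestLatch(...)` takes an implicit `ActorSystem`, so one must be in scope; `2.seconds` also needs `import scala.concurrent.duration._`):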

89

90

```scala

91

import akka.testkit.TestLatch

92

import scala.concurrent.Future

93

import scala.concurrent.ExecutionContext.Implicits.global

94

95

// Create latch that waits for 2 operations

96

val latch = TestLatch(2)

97

98

// Start first operation

99

Future {

100

Thread.sleep(500)

101

println("Operation 1 completed")

102

latch.countDown()

103

}

104

105

// Start second operation

106

Future {

107

Thread.sleep(800)

108

println("Operation 2 completed")

109

latch.countDown()

110

}

111

112

// Wait for both operations to complete

113

latch.ready(2.seconds)

114

println("Both operations completed!")

115

116

// Check if latch is open

117

assert(latch.isOpen)

118

assert(latch.getCount == 0)

119

```

120

121

### Advanced Synchronization Patterns

122

123

**Producer-Consumer with TestLatch:**

124

125

```scala

126

import akka.testkit.{TestKit, TestLatch}

127

import akka.actor.{Actor, Props}

128

129

class Producer(latch: TestLatch) extends Actor {

130

def receive = {

131

case "produce" =>

132

// Do some work

133

Thread.sleep(100)

134

println("Item produced")

135

latch.countDown()

136

}

137

}

138

139

class Consumer(latch: TestLatch) extends Actor {

140

def receive = {

141

case "consume" =>

142

latch.ready() // Wait for producer

143

println("Item consumed")

144

}

145

}

146

147

class ProducerConsumerTest extends TestKit(ActorSystem("TestSystem")) {

148

"Producer-Consumer pattern" should {

149

"synchronize correctly" in {

150

val latch = TestLatch(1)

151

152

val producer = system.actorOf(Props(new Producer(latch)))

153

val consumer = system.actorOf(Props(new Consumer(latch)))

154

155

// Start consumer first (it will wait)

156

consumer ! "consume"

157

158

// Then producer (which signals completion)

159

producer ! "produce"

160

161

// Test passes if consumer doesn't block forever

162

}

163

}

164

}

165

```

166

167

**Multi-Stage Pipeline with TestBarrier:**

168

169

```scala

170

import akka.testkit.{TestKit, TestBarrier}

171

import akka.actor.{Actor, Props}

172

173

class PipelineStage(stageId: Int, barrier: TestBarrier) extends Actor {

174

def receive = {

175

case "process" =>

176

println(s"Stage $stageId: Processing...")

177

Thread.sleep(scala.util.Random.nextInt(500))

178

println(s"Stage $stageId: Done, waiting for others")

179

180

barrier.await() // Wait for all stages

181

182

println(s"Stage $stageId: All stages complete, continuing")

183

sender() ! s"Stage $stageId completed"

184

}

185

}

186

187

class PipelineTest extends TestKit(ActorSystem("TestSystem")) {

188

"Pipeline stages" should {

189

"synchronize at barriers" in {

190

val barrier = TestBarrier(3)

191

192

val stages = (1 to 3).map { i =>

193

system.actorOf(Props(new PipelineStage(i, barrier)))

194

}

195

196

// Start all stages

197

stages.foreach(_ ! "process")

198

199

// All should complete around the same time

200

receiveN(3, 3.seconds)

201

}

202

}

203

}

204

```

205

206

### Integration with TestKit

207

208

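Both utilities can be used directly inside a `TestKit`-based test, alongside the kit's `system`, `testActor`, and `TestProbe` (imports omitted for brevity):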

209

210

```scala

211

class SynchronizationIntegrationTest extends TestKit(ActorSystem("TestSystem")) {

212

213

"TestLatch with TestKit" should {

214

"coordinate actor testing" in {

215

val completionLatch = TestLatch(2)

216

217

class WorkerActor extends Actor {

218

def receive = {

219

case "work" =>

220

// Simulate work

221

Future {

222

Thread.sleep(200)

223

completionLatch.countDown()

224

}

225

}

226

}

227

228

val worker1 = system.actorOf(Props[WorkerActor]())

229

val worker2 = system.actorOf(Props[WorkerActor]())

230

231

worker1 ! "work"

232

worker2 ! "work"

233

234

// Wait for both workers to complete

235

completionLatch.ready(1.second)

236

237

// Now continue with test assertions

238

assert(completionLatch.isOpen)

239

}

240

}

241

242

"TestBarrier with TestKit" should {

243

"synchronize multiple test probes" in {

244

val barrier = TestBarrier(2)

245

val probe1 = TestProbe()

246

val probe2 = TestProbe()

247

248

// Simulate concurrent operations

249

Future {

250

probe1.send(testActor, "probe1 ready")

251

barrier.await()

252

probe1.send(testActor, "probe1 done")

253

}

254

255

Future {

256

probe2.send(testActor, "probe2 ready")

257

barrier.await()

258

probe2.send(testActor, "probe2 done")

259

}

260

261

// Both should send ready first

262

expectMsgAllOf("probe1 ready", "probe2 ready")

263

264

// Then both should send done (after barrier)

265

expectMsgAllOf("probe1 done", "probe2 done")

266

}

267

}

268

}

269

```

270

271

### Thread Safety and Best Practices

272

273

**Thread Safety:**

274

- Both TestBarrier and TestLatch are thread-safe

275

- Can be safely shared between actors and test code

276

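- Both need an implicit `ActorSystem` in scope, which is used for timeout handling: `TestLatch` takes it at construction, `TestBarrier` on each `await` call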

277

278

**Best Practices:**

279

280

```scala

281

// Good: Specify reasonable timeouts

282

latch.ready(5.seconds)

283

barrier.await(3.seconds)

284

285

// Good: Check state before waiting

286

if (!latch.isOpen) {

287

latch.ready(1.second)

288

}

289

290

// Good: Reset for reuse in multiple tests

291

override def beforeEach(): Unit = {

292

barrier.reset()

293

latch.reset()

294

}

295

296

// Good: Use in try-finally for cleanup

297

try {

298

latch.ready(1.second)

299

// test code

300

} finally {

301

latch.reset()

302

}

303

```

304

305

**Common Patterns:**

306

307

1. **Wait for Multiple Operations**: Use TestLatch with count > 1

308

2. **Synchronize Phases**: Use TestBarrier for multi-step coordination

309

3. **Signal Completion**: Use TestLatch(1) as a simple completion signal

310

4. **Batch Processing**: Use TestBarrier to synchronize batch boundaries

311

5. **Resource Coordination**: Use latches to ensure resources are ready

312

313

**Error Handling:**

314

315

```scala

316

// Handle timeout in latch waiting

317

try {

318

latch.ready(1.second)

319

} catch {

320

case _: java.util.concurrent.TimeoutException =>

321

fail("Operations did not complete in time")

322

}

323

324

// Check barrier state for debugging

325

if (barrier.isBroken) {

326

fail("Barrier was broken by an exception")

327

}

328

329

// Verify expected state

330

assert(barrier.getNumberWaiting == 0, "Some threads still waiting at barrier")

331

assert(latch.getCount == 0, s"Latch still has ${latch.getCount} remaining")

332

```