or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

channels.md · coroutine-builders.md · dispatchers.md · exception-handling.md · flow-api.md · index.md · jobs-deferreds.md · structured-concurrency.md · synchronization.md

docs/synchronization.md

0

# Synchronization Primitives

1

2

Thread-safe synchronization utilities for coordinating access to shared resources in concurrent coroutine applications. These primitives provide cooperative synchronization without blocking threads.

3

4

## Capabilities

5

6

### Mutex - Mutual Exclusion

7

8

A mutual exclusion synchronization primitive that ensures only one coroutine can access a critical section at a time.

9

10

```kotlin { .api }

11

/**
 * Mutual exclusion for coroutines.
 *
 * A mutex has two states: locked and unlocked. It is non-reentrant: locking it
 * again from the coroutine that already holds it suspends (or, when the same
 * non-null owner token is supplied, throws) instead of succeeding.
 */
interface Mutex {
    /**
     * Returns `true` when this mutex is locked by some owner.
     */
    val isLocked: Boolean

    /**
     * Tries to lock this mutex without suspending, returning `false` if it is already locked.
     *
     * @param owner optional owner token, compared by identity; intended for debugging.
     * @throws IllegalStateException if [owner] is non-null and this mutex is
     *   already locked with the same owner token.
     */
    fun tryLock(owner: Any? = null): Boolean

    /**
     * Locks this mutex, suspending the caller until the lock is acquired.
     * The suspension is cancellable.
     *
     * @param owner optional owner token, compared by identity; intended for debugging.
     * @throws IllegalStateException if [owner] is non-null and this mutex is
     *   already locked with the same owner token.
     */
    suspend fun lock(owner: Any? = null)

    /**
     * Unlocks this mutex.
     *
     * @param owner the owner token that was passed when locking, if any.
     * @throws IllegalStateException if the mutex is not locked, or if it was
     *   locked with a different owner token.
     */
    fun unlock(owner: Any? = null)

    /**
     * Returns `true` when this mutex is currently locked with the given [owner] token.
     */
    fun holdsLock(owner: Any): Boolean
}

/**
 * Executes the given [action] under this mutex's lock and always unlocks afterwards.
 *
 * This is an inline extension function, not an interface member, so [action]
 * is a plain lambda that may still call suspending functions from the
 * surrounding suspend context.
 */
suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T

/**
 * Creates a new mutex instance.
 *
 * @param locked initial state of the mutex; it is unlocked by default.
 */
fun Mutex(locked: Boolean = false): Mutex

47

```

48

49

**Usage Examples:**

50

51

```kotlin

52

import kotlinx.coroutines.*

53

import kotlinx.coroutines.sync.*

54

55

val scope = MainScope()

// Basic mutex usage
val mutex = Mutex()
var counter = 0

// Keep the Job of every increment so the reader can wait for all of them.
val incrementJobs = List(100) {
    scope.launch {
        mutex.withLock {
            counter++ // Only one coroutine at a time executes this increment
        }
    }
}

scope.launch {
    // Join the increments instead of guessing how long they take with delay().
    incrementJobs.joinAll()
    println("Final counter value: $counter") // Will be 100
}

73

74

// Manual lock/unlock
val manualMutex = Mutex()
var sharedResource = 0

scope.launch {
    manualMutex.lock()
    try {
        // The lock is held across both the computation and the delay.
        sharedResource = expensiveComputation()
        delay(100) // Simulate work while holding lock
    } finally {
        // Hand the lock back on every exit path.
        manualMutex.unlock()
    }
}

// Try lock (never suspends)
scope.launch {
    val acquired = manualMutex.tryLock()
    if (!acquired) {
        println("Could not acquire lock")
        return@launch
    }
    try {
        println("Got the lock!")
        sharedResource += 10
    } finally {
        manualMutex.unlock()
    }
}

101

102

// Mutex with owner (for debugging)
/**
 * Accumulates strings and hands them out in batches; every access to the
 * backing list happens while holding [mutex].
 */
class DataProcessor {
    private val mutex = Mutex()
    private val data = mutableListOf<String>()

    /**
     * Appends [item] to the pending data while holding the lock.
     */
    suspend fun addData(item: String) {
        // The owner token must be a fresh object per acquisition: owners are
        // compared by identity, and locking with a token that already holds
        // the mutex throws IllegalStateException instead of suspending.
        mutex.withLock(owner = LockOwner("addData")) {
            data.add(item)
            println("Added: $item, size: ${data.size}")
        }
    }

    /**
     * Returns a copy of everything collected so far and clears the pending data.
     */
    suspend fun processData(): List<String> =
        mutex.withLock(owner = LockOwner("processData")) {
            val snapshot = data.toList()
            data.clear()
            snapshot
        }

    /** Debug-only owner token; a new instance is created for each lock acquisition. */
    private class LockOwner(private val operation: String) {
        override fun toString(): String = "DataProcessor.$operation"
    }
}

122

```

123

124

### Semaphore - Counting Synchronization

125

126

A counting synchronization primitive that maintains a set of permits, allowing multiple coroutines to access a resource up to a specified limit.

127

128

```kotlin { .api }

129

/**
 * A counting semaphore for coroutines.
 *
 * A semaphore has a number of permits. Each acquire takes a permit; each release adds a permit.
 */
interface Semaphore {
    /**
     * The number of permits currently available in this semaphore.
     */
    val availablePermits: Int

    /**
     * Acquires a permit from this semaphore, suspending until one is available.
     * The suspension is cancellable.
     */
    suspend fun acquire()

    /**
     * Tries to acquire a permit from this semaphore without suspending.
     *
     * @return `true` if a permit was acquired, `false` otherwise.
     */
    fun tryAcquire(): Boolean

    /**
     * Releases a permit, returning it to the semaphore.
     *
     * @throws IllegalStateException if more permits are released than were acquired.
     */
    fun release()
}

/**
 * Executes the given [action], acquiring a permit before and releasing it after.
 *
 * This is an inline extension function, not an interface member, so [action]
 * is a plain lambda that may still call suspending functions from the
 * surrounding suspend context.
 */
suspend inline fun <T> Semaphore.withPermit(action: () -> T): T

/**
 * Creates a new semaphore instance.
 *
 * @param permits the number of permits available in this semaphore; must be greater than 0
 * @param acquiredPermits the number of permits already acquired; must be in `0..permits`
 * @throws IllegalArgumentException if either constraint is violated. To start
 *   with no permit available, pass `acquiredPermits = permits`.
 */
fun Semaphore(permits: Int, acquiredPermits: Int = 0): Semaphore

167

```

168

169

**Usage Examples:**

170

171

```kotlin

172

import kotlinx.coroutines.*

173

import kotlinx.coroutines.sync.*

174

175

val scope = MainScope()

176

177

// Limit concurrent network requests
val networkSemaphore = Semaphore(3) // Allow max 3 concurrent requests

/**
 * Simulates a request to [url] while holding one permit of [networkSemaphore].
 */
suspend fun makeNetworkRequest(url: String): String =
    networkSemaphore.withPermit {
        println("Making request to $url (available permits: ${networkSemaphore.availablePermits})")
        delay(1000) // Simulate network call
        "Response from $url"
    }

187

188

scope.launch {
    // Start 10 requests; the semaphore lets only 3 of them run at once.
    val pending = (1..10).map { i ->
        async { makeNetworkRequest("https://api.example.com/data$i") }
    }

    val responses = pending.awaitAll()
    println("All requests completed: ${responses.size}")
}

199

200

// Database connection pool simulation
/**
 * Simulated pool that lets at most `maxConnections` callers hold a connection at once.
 */
class DatabasePool(maxConnections: Int) {
    private val semaphore = Semaphore(maxConnections)

    /**
     * Runs [block] with a connection while holding one permit; the connection
     * is released when [block] finishes, normally or exceptionally.
     */
    suspend fun <T> withConnection(block: suspend (Connection) -> T): T =
        semaphore.withPermit {
            val conn = getConnection()
            try {
                block(conn)
            } finally {
                releaseConnection(conn)
            }
        }

    /** Simulates the cost of opening a connection. */
    private suspend fun getConnection(): Connection {
        println("Acquiring database connection")
        delay(50) // Simulate connection setup
        return Connection()
    }

    /** Simulated release: only logs. */
    private fun releaseConnection(connection: Connection) {
        println("Releasing database connection")
    }
}

225

226

/**
 * Stand-in for a database connection used by the pool example.
 */
class Connection {
    /**
     * Simulates running [sql]: suspends for 200 ms, then returns two fixed rows.
     */
    suspend fun query(sql: String): List<String> {
        delay(200) // Simulate query execution
        val rows = listOf("result1", "result2")
        return rows
    }
}

232

233

// Usage
val dbPool = DatabasePool(maxConnections = 2)

scope.launch {
    // Five queries compete for the pool's two permits.
    for (i in 0 until 5) {
        launch {
            val results = dbPool.withConnection { conn -> conn.query("SELECT * FROM table$i") }
            println("Query $i results: $results")
        }
    }
}

246

247

// Manual acquire/release
val resourceSemaphore = Semaphore(2)

scope.launch {
    // Fast path first; fall back to a suspending acquire when no permit is free.
    val gotImmediately = resourceSemaphore.tryAcquire()
    if (!gotImmediately) {
        println("No permits available")
        resourceSemaphore.acquire() // Wait for permit
    }

    // From here on a permit is held on both paths, so one try/finally covers them.
    try {
        if (gotImmediately) {
            println("Got permit immediately, available: ${resourceSemaphore.availablePermits}")
        } else {
            println("Got permit after waiting, available: ${resourceSemaphore.availablePermits}")
        }
        delay(1000)
    } finally {
        resourceSemaphore.release()
    }
}

269

```

270

271

### Advanced Synchronization Patterns

272

273

Complex synchronization scenarios using combinations of primitives.

274

275

**Producer-Consumer with Bounded Buffer:**

276

277

```kotlin

278

import kotlinx.coroutines.*

279

import kotlinx.coroutines.sync.*

280

281

/**
 * FIFO buffer holding at most `capacity` items.
 *
 * [put] suspends while the buffer is full and [take] suspends while it is empty.
 *
 * @throws IllegalArgumentException if `capacity` is not positive.
 */
class BoundedBuffer<T>(capacity: Int) {
    init {
        require(capacity > 0) { "capacity must be positive, but was $capacity" }
    }

    // ArrayDeque removes from the head in O(1); removeAt(0) on a list shifts every element.
    private val buffer = ArrayDeque<T>(capacity)
    private val mutex = Mutex()

    // Counts items available to take. Semaphore(0) is rejected by
    // kotlinx.coroutines (permits must be > 0), so "empty" is expressed as a
    // semaphore whose permits are all acquired up front.
    private val notEmpty = Semaphore(permits = capacity, acquiredPermits = capacity)

    // Counts free slots available to put.
    private val notFull = Semaphore(permits = capacity)

    /**
     * Appends [item], suspending while the buffer is full.
     */
    suspend fun put(item: T) {
        notFull.acquire() // Wait for space
        lockOrReturnPermit(notFull)
        try {
            buffer.addLast(item)
            println("Produced: $item, buffer size: ${buffer.size}")
        } finally {
            mutex.unlock()
        }
        notEmpty.release() // Signal that item is available
    }

    /**
     * Removes and returns the oldest item, suspending while the buffer is empty.
     */
    suspend fun take(): T {
        notEmpty.acquire() // Wait for item
        lockOrReturnPermit(notEmpty)
        val item = try {
            val head = buffer.removeFirst()
            println("Consumed: $head, buffer size: ${buffer.size}")
            head
        } finally {
            mutex.unlock()
        }
        notFull.release() // Signal that space is available
        return item
    }

    /**
     * Locks [mutex]; if the wait fails (for example because the caller is
     * cancelled) the permit already taken from [reserved] is handed back so
     * the slot or item it stands for is not lost.
     */
    private suspend fun lockOrReturnPermit(reserved: Semaphore) {
        try {
            mutex.lock()
        } catch (e: Throwable) {
            reserved.release()
            throw e
        }
    }
}

307

308

val scope = MainScope()
val buffer = BoundedBuffer<String>(capacity = 3)

// Producers: two coroutines, five items each
for (producerId in 0 until 2) {
    scope.launch {
        for (i in 0 until 5) {
            buffer.put("Item-$producerId-$i")
            delay(100)
        }
    }
}

// Consumers: two coroutines, five items each
for (consumerId in 0 until 2) {
    scope.launch {
        repeat(5) {
            val item = buffer.take()
            println("Consumer $consumerId got: $item")
            delay(200)
        }
    }
}

331

```

332

333

**Reader-Writer Lock Pattern:**

334

335

```kotlin

336

/**
 * Reader-writer lock built from two mutexes: any number of readers may run
 * together, while a writer runs alone.
 *
 * The first reader locks [writerMutex] and the last reader unlocks it, so
 * writers wait for as long as at least one reader is active.
 */
class ReadWriteMutex {
    private val readerCountMutex = Mutex()
    private val writerMutex = Mutex()
    private var readerCount = 0 // Guarded by readerCountMutex

    /**
     * Runs [action] while holding shared (read) access.
     */
    suspend fun <T> withReadLock(action: suspend () -> T): T {
        // Acquire reader access
        readerCountMutex.withLock {
            if (readerCount == 0) {
                // First reader blocks writers. lock() is cancellable, so the
                // count is only bumped after it succeeds; otherwise a
                // cancelled reader would leave a phantom reader behind.
                writerMutex.lock()
            }
            readerCount++
        }

        try {
            return action()
        } finally {
            // Release reader access. Locking a mutex is cancellable, so this
            // cleanup runs in NonCancellable: a cancelled reader must still
            // decrement the count and, if it is the last one, free the writers.
            withContext(NonCancellable) {
                readerCountMutex.withLock {
                    readerCount--
                    if (readerCount == 0) {
                        writerMutex.unlock() // Last reader unblocks writers
                    }
                }
            }
        }
    }

    /**
     * Runs [action] while holding exclusive (write) access.
     */
    suspend fun <T> withWriteLock(action: suspend () -> T): T =
        writerMutex.withLock { action() }
}

369

370

/**
 * A string value whose reads and writes go through a [ReadWriteMutex].
 */
class SharedData {
    private val rwMutex = ReadWriteMutex()
    private var data = "Initial data"

    /** Returns the current value under the read lock. */
    suspend fun read(): String =
        rwMutex.withReadLock {
            println("Reading: $data on ${Thread.currentThread().name}")
            delay(100) // Simulate read time
            data
        }

    /** Replaces the current value with [newData] under the write lock. */
    suspend fun write(newData: String) {
        rwMutex.withWriteLock {
            println("Writing: $newData on ${Thread.currentThread().name}")
            delay(200) // Simulate write time
            data = newData
        }
    }
}

390

391

val sharedData = SharedData()

scope.launch {
    // Multiple readers can run concurrently
    for (i in 0 until 5) {
        launch {
            val value = sharedData.read()
            println("Reader $i got: $value")
        }
    }

    // The writer needs exclusive access, so it excludes all readers while it runs
    launch {
        delay(300)
        sharedData.write("Updated data")
    }
}

408

```

409

410

**Rate Limiting Pattern:**

411

412

```kotlin

413

/**
 * Limits how many actions may *start* within any window of [periodMs] milliseconds.
 *
 * Each call to [execute] takes a permit, and that permit is handed back
 * [periodMs] after it was taken, regardless of how long the action itself
 * runs. At most `permits` actions can therefore begin in any period.
 *
 * @param permits maximum number of actions started per period
 * @param periodMs length of the period in milliseconds
 * @param releaseScope scope in which the delayed permit releases run; if it is
 *   cancelled, permits taken afterwards are never returned
 */
class RateLimiter(
    permits: Int,
    private val periodMs: Long,
    private val releaseScope: CoroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
) {
    private val semaphore = Semaphore(permits)

    /**
     * Waits for a free slot in the current period, then runs [action] and returns its result.
     */
    suspend fun <T> execute(action: suspend () -> T): T {
        semaphore.acquire()

        // Return the permit one period after it was taken. withPermit would
        // release it as soon as the action ends, which bounds concurrency, not rate.
        releaseScope.launch {
            delay(periodMs)
            semaphore.release()
        }

        return action()
    }
}

430

431

val rateLimiter = RateLimiter(permits = 3, periodMs = 1000) // 3 requests per second

/**
 * Simulates one API call routed through [rateLimiter].
 */
suspend fun apiCall(requestId: Int): String =
    rateLimiter.execute {
        println("Making API call $requestId at ${System.currentTimeMillis()}")
        delay(100) // Simulate API call
        "Response $requestId"
    }

scope.launch {
    // Make 10 API calls - they'll be rate limited
    for (i in 0 until 10) {
        launch {
            val response = apiCall(i)
            println("Got: $response")
        }
    }
}

450

```

451

452

### Deadlock Prevention

453

454

Best practices for avoiding deadlocks when using multiple synchronization primitives.

455

456

```kotlin

457

// BAD: Potential deadlock - the two coroutines take the locks in opposite order
val mutex1 = Mutex()
val mutex2 = Mutex()

// Coroutine A: mutex1, then mutex2
scope.launch {
    mutex1.withLock {
        delay(100)
        mutex2.withLock { println("A got both locks") }
    }
}

// Coroutine B: mutex2, then mutex1
scope.launch {
    mutex2.withLock {
        delay(100)
        // Potential deadlock here: A may hold mutex1 while waiting for mutex2
        mutex1.withLock { println("B got both locks") }
    }
}

// GOOD: Consistent lock ordering - both coroutines take mutex1 before mutex2
scope.launch {
    mutex1.withLock {
        mutex2.withLock { println("A got both locks safely") }
    }
}

scope.launch {
    mutex1.withLock {
        mutex2.withLock { println("B got both locks safely") }
    }
}

498

499

// BETTER: Use timeout with tryLock
/**
 * Runs [action] while holding both `mutex1` and `mutex2`, giving up instead of
 * waiting indefinitely.
 *
 * `mutex1` is taken with `tryLock` (no waiting); `mutex2` is awaited for at
 * most one second. Only the lock acquisition is time-limited, so [action]
 * itself may run for as long as it needs.
 *
 * @return `true` if both locks were obtained and [action] completed,
 *   `false` if either lock could not be obtained.
 */
suspend fun safeDoublelock(action: suspend () -> Unit): Boolean {
    if (!mutex1.tryLock()) return false
    try {
        // Tracked outside the timeout block: the timeout can fire right as the
        // block finishes, so the block's result alone cannot tell us whether
        // mutex2 is held.
        var secondLocked = false
        try {
            // Try to get second lock with timeout
            withTimeoutOrNull(1000) {
                mutex2.lock()
                secondLocked = true
            }
            if (!secondLocked) return false

            action()
            return true
        } finally {
            if (secondLocked) mutex2.unlock()
        }
    } finally {
        mutex1.unlock()
    }
}

516

```

517

518

### Performance Considerations

519

520

Tips for optimal performance with synchronization primitives.

521

522

```kotlin

523

// Minimize critical section size
val mutex = Mutex()
var counter = 0

// BAD: Long critical section
scope.launch {
    mutex.withLock {
        val data = expensiveOperation() // Don't do expensive work in lock
        counter += data.size
        processData(data) // This should be outside the lock
    }
}

// GOOD: Minimal critical section
scope.launch {
    // Do expensive work outside lock
    val data = expensiveOperation()
    val size = data.size

    // Only critical operation inside lock
    mutex.withLock { counter += size }

    // Non-critical work outside lock
    processData(data)
}

// Use appropriate synchronization primitive
// For simple counters, consider atomic operations or channels instead of mutex
val atomicCounter = AtomicInteger(0) // Better for simple counters

// For producer-consumer, consider channels instead of manual synchronization
val channel = Channel<String>(capacity = 10) // Often simpler than semaphore+mutex

554

```

555

556

### Testing Synchronization

557

558

Strategies for testing concurrent code with synchronization primitives.

559

560

```kotlin

561

import kotlinx.coroutines.test.*

562

563

/**
 * Verifies that increments guarded by a [Mutex] are never lost when the
 * incrementing coroutines really run in parallel.
 */
@Test
fun testMutexSafety() = runTest {
    val mutex = Mutex()
    var counter = 0
    val coroutines = 100
    val incrementsPerCoroutine = 1_000

    // Children of runTest otherwise share one single-threaded test dispatcher,
    // where this test would pass even without the mutex. Dispatchers.Default
    // runs them on several threads so the lock is actually exercised.
    val jobs = List(coroutines) {
        launch(Dispatchers.Default) {
            repeat(incrementsPerCoroutine) {
                mutex.withLock {
                    counter++
                }
            }
        }
    }

    jobs.joinAll()
    assertEquals(coroutines * incrementsPerCoroutine, counter)
}

581

582

/**
 * Verifies that a [Semaphore] with 3 permits lets exactly 3 of 10 coroutines
 * run the guarded section at the same time.
 */
@Test
fun testSemaphoreLimit() = runTest {
    val semaphore = Semaphore(3)
    var concurrentCount = 0
    var maxConcurrent = 0
    val maxConcurrentMutex = Mutex()

    val jobs = (1..10).map {
        launch {
            semaphore.withPermit {
                // Record entry and track the highest level of overlap seen so far.
                maxConcurrentMutex.withLock {
                    concurrentCount++
                    maxConcurrent = maxOf(maxConcurrent, concurrentCount)
                }

                delay(100) // Simulate work

                maxConcurrentMutex.withLock {
                    concurrentCount--
                }
            }
        }
    }

    jobs.joinAll()
    assertEquals(3, maxConcurrent)
}

610

```