or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

channels.md · coroutine-builders.md · dispatchers.md · exception-handling.md · flow-api.md · index.md · job-management.md · jvm-integration.md · synchronization.md

synchronization.mddocs/

0

# Synchronization Primitives

1

2

Thread-safe synchronization mechanisms designed for coroutines: mutexes and semaphores that suspend instead of blocking a thread and that support cancellation, plus lock-free atomic operations.

3

4

## Capabilities

5

6

### Mutex

7

8

Non-reentrant mutual exclusion for protecting critical sections in coroutines.

9

10

```kotlin { .api }

11

interface Mutex {

12

/** True if mutex is currently locked */

13

val isLocked: Boolean

14

15

/** Suspends until lock is acquired */

16

suspend fun lock(owner: Any? = null)

17

18

/** Tries to acquire lock immediately without suspending */

19

fun tryLock(owner: Any? = null): Boolean

20

21

/** Releases the lock */

22

fun unlock(owner: Any? = null)

23

24

/** Checks if locked by specific owner */

25

fun holdsLock(owner: Any): Boolean

26

}

27

28

/** Creates a new Mutex */

29

fun Mutex(locked: Boolean = false): Mutex

30

31

/**
 * Executes [action] while holding this mutex and releases the lock afterwards,
 * whether [action] returns normally or throws.
 *
 * [action] is a plain `() -> T` lambda, not `suspend () -> T`: because `withLock`
 * is `inline`, the lambda body may still call suspending functions.
 *
 * @param owner optional token identifying the lock owner (see [Mutex.lock])
 * @return the value returned by [action]
 */
suspend inline fun <T> Mutex.withLock(
    owner: Any? = null,
    action: () -> T
): T

36

```

37

38

**Usage Examples:**

39

40

```kotlin

41

import kotlinx.coroutines.*

42

import kotlinx.coroutines.sync.*

43

44

class Counter {

45

private var count = 0

46

private val mutex = Mutex()

47

48

suspend fun increment() = mutex.withLock {

49

count++

50

}

51

52

suspend fun decrement() = mutex.withLock {

53

count--

54

}

55

56

suspend fun get(): Int = mutex.withLock {

57

count

58

}

59

}

60

61

fun main() = runBlocking {

62

val counter = Counter()

63

64

// Launch multiple coroutines that modify counter

65

val jobs = List(100) {

66

launch {

67

repeat(100) {

68

counter.increment()

69

}

70

}

71

}

72

73

jobs.forEach { it.join() }

74

75

println("Final count: ${counter.get()}") // Should be 10000

76

77

// Manual lock/unlock example

78

val mutex = Mutex()

79

80

launch {

81

println("Acquiring lock...")

82

mutex.lock()

83

try {

84

println("Critical section 1")

85

delay(1000)

86

} finally {

87

mutex.unlock()

88

println("Released lock")

89

}

90

}

91

92

launch {

93

delay(100)

94

println("Trying to acquire lock...")

95

mutex.lock()

96

try {

97

println("Critical section 2")

98

} finally {

99

mutex.unlock()

100

}

101

}

102

103

delay(2000)

104

}

105

```

106

107

### Semaphore

108

109

Counting semaphore for limiting the number of concurrent accesses to a resource.

110

111

```kotlin { .api }

112

interface Semaphore {

113

/** Number of permits currently available */

114

val availablePermits: Int

115

116

/** Suspends until permit is acquired */

117

suspend fun acquire()

118

119

/** Tries to acquire permit immediately without suspending */

120

fun tryAcquire(): Boolean

121

122

/** Releases a permit */

123

fun release()

124

}

125

126

/** Creates a new Semaphore */

127

fun Semaphore(

128

permits: Int,

129

acquiredPermits: Int = 0

130

): Semaphore

131

132

/**
 * Acquires a permit, executes [action], and releases the permit afterwards,
 * whether [action] returns normally or throws.
 *
 * [action] is a plain `() -> T` lambda, not `suspend () -> T`: because `withPermit`
 * is `inline`, the lambda body may still call suspending functions.
 *
 * @return the value returned by [action]
 */
suspend inline fun <T> Semaphore.withPermit(action: () -> T): T

134

```

135

136

**Usage Examples:**

137

138

```kotlin

139

import kotlinx.coroutines.*

140

import kotlinx.coroutines.sync.*

141

142

// Connection pool example

143

class ConnectionPool(maxConnections: Int) {

144

private val semaphore = Semaphore(maxConnections)

145

private var connectionId = 0

146

147

suspend fun <T> useConnection(block: suspend (connectionId: Int) -> T): T {

148

return semaphore.withPermit {

149

val id = ++connectionId

150

println("Acquired connection $id (${semaphore.availablePermits} remaining)")

151

try {

152

block(id)

153

} finally {

154

println("Released connection $id (${semaphore.availablePermits + 1} will be available)")

155

}

156

}

157

}

158

}

159

160

fun main() = runBlocking {

161

val connectionPool = ConnectionPool(maxConnections = 3)

162

163

// Launch more coroutines than available connections

164

val jobs = List(10) { taskId ->

165

launch {

166

connectionPool.useConnection { connectionId ->

167

println("Task $taskId using connection $connectionId")

168

delay((100..500).random().toLong())

169

"Result from task $taskId"

170

}

171

}

172

}

173

174

jobs.forEach { it.join() }

175

176

// Manual acquire/release example

177

val semaphore = Semaphore(2)

178

179

repeat(5) { i ->

180

launch {

181

println("Task $i waiting for permit...")

182

semaphore.acquire()

183

try {

184

println("Task $i acquired permit (${semaphore.availablePermits} remaining)")

185

delay(1000)

186

} finally {

187

semaphore.release()

188

println("Task $i released permit")

189

}

190

}

191

}

192

193

delay(6000)

194

}

195

```

196

197

### Atomic Operations

198

199

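Thread-safe atomic operations and references. The usage example below relies on the separate `kotlinx.atomicfu` library (its `atomic(...)` factory), which must be added as its own dependency; atomic operations never suspend.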

200

201

```kotlin { .api }

202

/** Atomic reference with compare-and-swap operations */

203

class AtomicReference<V>(value: V) {

204

var value: V

205

fun getAndSet(newValue: V): V

206

fun compareAndSet(expected: V, newValue: V): Boolean

207

fun lazySet(newValue: V)

208

}

209

210

/** Atomic integer operations */

211

class AtomicInteger(value: Int = 0) {

212

var value: Int

213

fun getAndIncrement(): Int

214

fun incrementAndGet(): Int

215

fun getAndDecrement(): Int

216

fun decrementAndGet(): Int

217

fun getAndAdd(delta: Int): Int

218

fun addAndGet(delta: Int): Int

219

fun compareAndSet(expected: Int, newValue: Int): Boolean

220

}

221

222

/** Atomic long operations */

223

class AtomicLong(value: Long = 0L) {

224

var value: Long

225

fun getAndIncrement(): Long

226

fun incrementAndGet(): Long

227

fun getAndAdd(delta: Long): Long

228

fun addAndGet(delta: Long): Long

229

fun compareAndSet(expected: Long, newValue: Long): Boolean

230

}

231

232

/** Atomic boolean operations */

233

class AtomicBoolean(value: Boolean = false) {

234

var value: Boolean

235

fun getAndSet(newValue: Boolean): Boolean

236

fun compareAndSet(expected: Boolean, newValue: Boolean): Boolean

237

}

238

```

239

240

**Usage Examples:**

241

242

```kotlin

243

import kotlinx.coroutines.*

244

import kotlinx.atomicfu.*

245

246

/**
 * Lock-free integer counter backed by an atomicfu `atomic(0)` cell.
 *
 * None of the operations suspend, so they are plain (non-`suspend`) functions and
 * can be called from both suspending and regular code.
 */
class AtomicCounter {
    private val count = atomic(0)

    /** Atomically adds one and returns the updated value. */
    fun increment(): Int = count.incrementAndGet()

    /** Atomically subtracts one and returns the updated value. */
    fun decrement(): Int = count.decrementAndGet()

    /** Returns the current value. */
    fun get(): Int = count.value

    /**
     * Atomically adds [delta] if the current value is strictly below [limit].
     *
     * Only the value *before* the addition is compared with [limit], so the result
     * may exceed [limit] when [delta] is larger than the remaining headroom.
     *
     * @return `true` if [delta] was added, `false` if the value was already at or above [limit]
     */
    fun addIfLessThan(delta: Int, limit: Int): Boolean {
        // Classic CAS retry loop: re-read and retry whenever another caller won the race.
        while (true) {
            val current = count.value
            if (current >= limit) return false
            if (count.compareAndSet(current, current + delta)) return true
        }
    }
}

263

264

class AtomicState<T>(initialValue: T) {

265

private val state = atomic(initialValue)

266

267

fun get(): T = state.value

268

269

fun update(transform: (T) -> T): T {

270

while (true) {

271

val current = state.value

272

val new = transform(current)

273

if (state.compareAndSet(current, new)) {

274

return new

275

}

276

}

277

}

278

279

fun compareAndSet(expected: T, newValue: T): Boolean {

280

return state.compareAndSet(expected, newValue)

281

}

282

}

283

284

fun main() = runBlocking {

285

val counter = AtomicCounter()

286

287

// Concurrent increment operations

288

val jobs = List(1000) {

289

launch {

290

counter.increment()

291

}

292

}

293

294

jobs.forEach { it.join() }

295

println("Final count: ${counter.get()}")

296

297

// Conditional atomic operations

298

val success = counter.addIfLessThan(5, 1010)

299

println("Add operation successful: $success")

300

301

// Atomic state example

302

data class UserState(val name: String, val count: Int)

303

val userState = AtomicState(UserState("John", 0))

304

305

repeat(10) {

306

launch {

307

userState.update { current ->

308

current.copy(count = current.count + 1)

309

}

310

}

311

}

312

313

delay(100)

314

println("User state: ${userState.get()}")

315

}

316

```

317

318

### Channel-based Synchronization

319

320

Using channels for synchronization patterns.

321

322

```kotlin { .api }

323

/** Token-based synchronization using channels */

324

class TokenBucket(capacity: Int) {

325

private val tokens = Channel<Unit>(capacity)

326

327

init {

328

// Fill with initial tokens

329

repeat(capacity) {

330

tokens.trySend(Unit)

331

}

332

}

333

334

suspend fun acquire() {

335

tokens.receive()

336

}

337

338

fun tryAcquire(): Boolean {

339

return tokens.tryReceive().isSuccess

340

}

341

342

fun release() {

343

tokens.trySend(Unit)

344

}

345

}

346

```

347

348

**Usage Examples:**

349

350

```kotlin

351

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.sync.*

353

354

// Rate limiting with token bucket

355

/**
 * Rate limiter built on [TokenBucket]: every [execute] call consumes one token and a
 * background coroutine puts one token back every `1000 / tokensPerSecond` milliseconds.
 *
 * The refill coroutine runs in a scope owned by this instance (instead of `GlobalScope`),
 * so it can be stopped with [close] once the limiter is no longer needed.
 *
 * @param tokensPerSecond bucket capacity and refill rate; must be positive
 * @throws IllegalArgumentException if [tokensPerSecond] is not positive
 */
class RateLimiter(tokensPerSecond: Int) {
    init {
        // Validate first: a zero rate would otherwise divide by zero in the refill loop.
        require(tokensPerSecond > 0) { "tokensPerSecond must be positive, was $tokensPerSecond" }
    }

    private val bucket = TokenBucket(tokensPerSecond)

    // Private scope so the refill loop has an owner and can be cancelled.
    private val refillScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)

    // At least 1 ms between refills; rates above 1000/s would otherwise yield delay(0) and spin.
    private val refillIntervalMillis = (1000L / tokensPerSecond).coerceAtLeast(1L)

    init {
        // Refill tokens periodically until close() cancels the scope.
        refillScope.launch {
            while (isActive) {
                delay(refillIntervalMillis)
                bucket.release()
            }
        }
    }

    /** Suspends until a token is available, then runs [action] and returns its result. */
    suspend fun <T> execute(action: suspend () -> T): T {
        bucket.acquire()
        return action()
    }

    /** Stops the background refill coroutine. Tokens already in the bucket remain usable. */
    fun close() {
        refillScope.cancel()
    }
}

373

374

// Barrier synchronization

375

/**
 * Reusable barrier for coroutines: [await] suspends until [parties] callers have
 * arrived, then releases all of them and resets itself for the next round.
 *
 * Arrival bookkeeping is done under a [Mutex], and each round has its own
 * [CompletableDeferred] gate, so every waiter of a round observes the same gate and
 * a late waiter can never miss the release.
 *
 * Note: a waiter that is cancelled while suspended still counts as arrived for its round.
 *
 * @param parties number of [await] calls needed to trip the barrier; must be positive
 * @throws IllegalArgumentException if [parties] is not positive
 */
class CyclicBarrier(private val parties: Int) {
    init {
        require(parties > 0) { "parties must be positive, was $parties" }
    }

    // Guards `arrived` and `gate`; only held for short, non-suspending sections.
    private val lock = Mutex()
    private var arrived = 0
    private var gate = CompletableDeferred<Unit>()

    /** Suspends until [parties] coroutines (including this one) have called [await]. */
    suspend fun await() {
        val roundGate = lock.withLock {
            val current = gate
            arrived++
            if (arrived == parties) {
                // Last party: start a fresh round, then open the gate of the finished one.
                arrived = 0
                gate = CompletableDeferred()
                current.complete(Unit)
            }
            current
        }
        // Wait outside the lock so other parties can still arrive.
        roundGate.await()
    }
}

407

408

fun main() = runBlocking {

409

// Rate limiter example

410

val rateLimiter = RateLimiter(tokensPerSecond = 2)

411

412

repeat(5) { i ->

413

launch {

414

rateLimiter.execute {

415

println("Request $i processed at ${System.currentTimeMillis()}")

416

delay(100)

417

}

418

}

419

}

420

421

delay(5000)

422

423

// Barrier example

424

val barrier = CyclicBarrier(3)

425

426

repeat(3) { i ->

427

launch {

428

println("Task $i started")

429

delay((100..300).random().toLong())

430

println("Task $i waiting at barrier")

431

barrier.await()

432

println("Task $i passed barrier")

433

}

434

}

435

436

delay(2000)

437

}

438

```

439

440

### Select-based Synchronization

441

442

Using select expressions for complex synchronization patterns.

443

444

```kotlin { .api }

445

/** Select among multiple suspending operations */

446

suspend fun <R> select(builder: SelectBuilder<R>.() -> Unit): R

447

448

interface SelectBuilder<in R> {

449

/** Select on channel receive */

450

fun <E> ReceiveChannel<E>.onReceive(block: suspend (E) -> R)

451

452

/** Select on channel send */

453

fun <E> SendChannel<E>.onSend(element: E, block: suspend () -> R)

454

455

/** Select on job completion */

456

fun Job.onJoin(block: suspend () -> R)

457

458

/** Select on deferred completion */

459

fun <T> Deferred<T>.onAwait(block: suspend (T) -> R)

460

461

/** Select with timeout */

462

fun onTimeout(timeMillis: Long, block: suspend () -> R)

463

}

464

```

465

466

**Usage Examples:**

467

468

```kotlin

469

import kotlinx.coroutines.*

470

import kotlinx.coroutines.channels.*

471

import kotlinx.coroutines.selects.*

472

473

/**
 * Receives the first message available on either of two channels, or reports a
 * timeout after 300 ms, and prints the outcome.
 *
 * The producer that loses the race is cancelled and joined before the channels are
 * closed; otherwise its pending `send` would hit a closed channel, throw
 * `ClosedSendChannelException`, and fail the whole scope.
 */
suspend fun selectExample() = coroutineScope {
    val channel1 = Channel<String>()
    val channel2 = Channel<String>()

    val producers = listOf(
        launch {
            delay(100)
            channel1.send("Message from channel 1")
        },
        launch {
            delay(200)
            channel2.send("Message from channel 2")
        },
    )

    // Select first available message
    val result = select<String> {
        channel1.onReceive { "Received from channel1: $it" }
        channel2.onReceive { "Received from channel2: $it" }
        onTimeout(300) { "Timeout occurred" }
    }

    println(result)

    // Stop producers that did not win the select before closing their channels.
    producers.forEach { it.cancelAndJoin() }

    channel1.close()
    channel2.close()
}

499

500

// Fan-in pattern with select

501

/**
 * Merges several input channels into one output channel.
 *
 * Declared as a [CoroutineScope] extension because `produce` needs a scope to launch
 * the forwarding coroutine in; calls made inside `runBlocking { }` or
 * `coroutineScope { }` pick the scope up implicitly.
 *
 * An input that closes is dropped from the selection, and the output closes only
 * after every input has closed, so messages pending on the remaining inputs are
 * not lost.
 *
 * @param channels the input channels to merge
 * @return a channel that emits every message received from any of [channels]
 */
fun CoroutineScope.fanIn(vararg channels: ReceiveChannel<String>): ReceiveChannel<String> =
    produce {
        val open = channels.toMutableList()
        while (open.isNotEmpty()) {
            // Pair each result with its source so a closed input can be identified.
            val (source, result) = select<Pair<ReceiveChannel<String>, ChannelResult<String>>> {
                open.forEach { channel ->
                    channel.onReceiveCatching { received -> channel to received }
                }
            }

            val message = result.getOrNull()
            if (message != null) {
                send(message)
            } else {
                // This input is closed; keep serving the remaining ones.
                open.remove(source)
            }
        }
    }

519

520

fun main() = runBlocking {

521

selectExample()

522

523

// Fan-in example

524

val producer1 = produce {

525

repeat(3) { i ->

526

send("Producer1-$i")

527

delay(100)

528

}

529

}

530

531

val producer2 = produce {

532

repeat(3) { i ->

533

send("Producer2-$i")

534

delay(150)

535

}

536

}

537

538

val combined = fanIn(producer1, producer2)

539

540

for (message in combined) {

541

println("Fan-in received: $message")

542

}

543

}

544

```

545

546

## Types

547

548

### Synchronization Exceptions

549

550

Exceptions related to synchronization operations.

551

552

```kotlin { .api }

553

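/** Exception thrown when a mutex is unlocked while it is not locked, or with an owner token that does not match the one it was locked with (ownership is by token, not by thread) */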

554

class IllegalStateException : RuntimeException

555

556

/** Exception thrown on cancellation */

557

class CancellationException : IllegalStateException

558

```