0
# Synchronization Primitives
1
2
Thread-safe synchronization mechanisms designed for coroutines including mutexes, semaphores, and atomic operations with suspending behavior and cancellation support.
3
4
## Capabilities
5
6
### Mutex
7
8
Non-reentrant mutual exclusion for protecting critical sections in coroutines.
9
10
```kotlin { .api }
11
interface Mutex {
12
/** True if mutex is currently locked */
13
val isLocked: Boolean
14
15
/** Suspends until lock is acquired */
16
suspend fun lock(owner: Any? = null)
17
18
/** Tries to acquire lock immediately without suspending */
19
fun tryLock(owner: Any? = null): Boolean
20
21
/** Releases the lock */
22
fun unlock(owner: Any? = null)
23
24
/** Checks if locked by specific owner */
25
fun holdsLock(owner: Any): Boolean
26
}
27
28
/** Creates a new Mutex */
29
fun Mutex(locked: Boolean = false): Mutex
30
31
/** Executes block under mutex protection */
32
suspend inline fun <T> Mutex.withLock(
33
owner: Any? = null,
34
action: suspend () -> T
35
): T
36
```
37
38
**Usage Examples:**
39
40
```kotlin
41
import kotlinx.coroutines.*
42
import kotlinx.coroutines.sync.*
43
44
class Counter {
45
private var count = 0
46
private val mutex = Mutex()
47
48
suspend fun increment() = mutex.withLock {
49
count++
50
}
51
52
suspend fun decrement() = mutex.withLock {
53
count--
54
}
55
56
suspend fun get(): Int = mutex.withLock {
57
count
58
}
59
}
60
61
fun main() = runBlocking {
62
val counter = Counter()
63
64
// Launch multiple coroutines that modify counter
65
val jobs = List(100) {
66
launch {
67
repeat(100) {
68
counter.increment()
69
}
70
}
71
}
72
73
jobs.forEach { it.join() }
74
75
println("Final count: ${counter.get()}") // Should be 10000
76
77
// Manual lock/unlock example
78
val mutex = Mutex()
79
80
launch {
81
println("Acquiring lock...")
82
mutex.lock()
83
try {
84
println("Critical section 1")
85
delay(1000)
86
} finally {
87
mutex.unlock()
88
println("Released lock")
89
}
90
}
91
92
launch {
93
delay(100)
94
println("Trying to acquire lock...")
95
mutex.lock()
96
try {
97
println("Critical section 2")
98
} finally {
99
mutex.unlock()
100
}
101
}
102
103
delay(2000)
104
}
105
```
106
107
### Semaphore
108
109
Counting semaphore for limiting the number of concurrent accesses to a resource.
110
111
```kotlin { .api }
112
interface Semaphore {
113
/** Number of permits currently available */
114
val availablePermits: Int
115
116
/** Suspends until permit is acquired */
117
suspend fun acquire()
118
119
/** Tries to acquire permit immediately without suspending */
120
fun tryAcquire(): Boolean
121
122
/** Releases a permit */
123
fun release()
124
}
125
126
/** Creates a new Semaphore */
127
fun Semaphore(
128
permits: Int,
129
acquiredPermits: Int = 0
130
): Semaphore
131
132
/** Executes block with acquired permit */
133
suspend inline fun <T> Semaphore.withPermit(action: suspend () -> T): T
134
```
135
136
**Usage Examples:**
137
138
```kotlin
139
import kotlinx.coroutines.*
140
import kotlinx.coroutines.sync.*
141
142
// Connection pool example
143
class ConnectionPool(maxConnections: Int) {
144
private val semaphore = Semaphore(maxConnections)
145
private var connectionId = 0
146
147
suspend fun <T> useConnection(block: suspend (connectionId: Int) -> T): T {
148
return semaphore.withPermit {
149
val id = ++connectionId
150
println("Acquired connection $id (${semaphore.availablePermits} remaining)")
151
try {
152
block(id)
153
} finally {
154
println("Released connection $id (${semaphore.availablePermits + 1} will be available)")
155
}
156
}
157
}
158
}
159
160
fun main() = runBlocking {
161
val connectionPool = ConnectionPool(maxConnections = 3)
162
163
// Launch more coroutines than available connections
164
val jobs = List(10) { taskId ->
165
launch {
166
connectionPool.useConnection { connectionId ->
167
println("Task $taskId using connection $connectionId")
168
delay((100..500).random().toLong())
169
"Result from task $taskId"
170
}
171
}
172
}
173
174
jobs.forEach { it.join() }
175
176
// Manual acquire/release example
177
val semaphore = Semaphore(2)
178
179
repeat(5) { i ->
180
launch {
181
println("Task $i waiting for permit...")
182
semaphore.acquire()
183
try {
184
println("Task $i acquired permit (${semaphore.availablePermits} remaining)")
185
delay(1000)
186
} finally {
187
semaphore.release()
188
println("Task $i released permit")
189
}
190
}
191
}
192
193
delay(6000)
194
}
195
```
196
197
### Atomic Operations
198
199
Thread-safe atomic operations and references.
200
201
```kotlin { .api }
202
/** Atomic reference with compare-and-swap operations */
203
class AtomicReference<V>(value: V) {
204
var value: V
205
fun getAndSet(newValue: V): V
206
fun compareAndSet(expected: V, newValue: V): Boolean
207
fun lazySet(newValue: V)
208
}
209
210
/** Atomic integer operations */
211
class AtomicInteger(value: Int = 0) {
212
var value: Int
213
fun getAndIncrement(): Int
214
fun incrementAndGet(): Int
215
fun getAndDecrement(): Int
216
fun decrementAndGet(): Int
217
fun getAndAdd(delta: Int): Int
218
fun addAndGet(delta: Int): Int
219
fun compareAndSet(expected: Int, newValue: Int): Boolean
220
}
221
222
/** Atomic long operations */
223
class AtomicLong(value: Long = 0L) {
224
var value: Long
225
fun getAndIncrement(): Long
226
fun incrementAndGet(): Long
227
fun getAndAdd(delta: Long): Long
228
fun addAndGet(delta: Long): Long
229
fun compareAndSet(expected: Long, newValue: Long): Boolean
230
}
231
232
/** Atomic boolean operations */
233
class AtomicBoolean(value: Boolean = false) {
234
var value: Boolean
235
fun getAndSet(newValue: Boolean): Boolean
236
fun compareAndSet(expected: Boolean, newValue: Boolean): Boolean
237
}
238
```
239
240
**Usage Examples:**
241
242
```kotlin
243
import kotlinx.coroutines.*
244
import kotlinx.atomicfu.*
245
246
class AtomicCounter {
247
private val count = atomic(0)
248
249
suspend fun increment(): Int = count.incrementAndGet()
250
suspend fun decrement(): Int = count.decrementAndGet()
251
suspend fun get(): Int = count.value
252
253
suspend fun addIfLessThan(delta: Int, limit: Int): Boolean {
254
while (true) {
255
val current = count.value
256
if (current >= limit) return false
257
if (count.compareAndSet(current, current + delta)) {
258
return true
259
}
260
}
261
}
262
}
263
264
class AtomicState<T>(initialValue: T) {
265
private val state = atomic(initialValue)
266
267
fun get(): T = state.value
268
269
fun update(transform: (T) -> T): T {
270
while (true) {
271
val current = state.value
272
val new = transform(current)
273
if (state.compareAndSet(current, new)) {
274
return new
275
}
276
}
277
}
278
279
fun compareAndSet(expected: T, newValue: T): Boolean {
280
return state.compareAndSet(expected, newValue)
281
}
282
}
283
284
fun main() = runBlocking {
285
val counter = AtomicCounter()
286
287
// Concurrent increment operations
288
val jobs = List(1000) {
289
launch {
290
counter.increment()
291
}
292
}
293
294
jobs.forEach { it.join() }
295
println("Final count: ${counter.get()}")
296
297
// Conditional atomic operations
298
val success = counter.addIfLessThan(5, 1010)
299
println("Add operation successful: $success")
300
301
// Atomic state example
302
data class UserState(val name: String, val count: Int)
303
val userState = AtomicState(UserState("John", 0))
304
305
repeat(10) {
306
launch {
307
userState.update { current ->
308
current.copy(count = current.count + 1)
309
}
310
}
311
}
312
313
delay(100)
314
println("User state: ${userState.get()}")
315
}
316
```
317
318
### Channel-based Synchronization
319
320
Using channels for synchronization patterns.
321
322
```kotlin { .api }
323
/** Token-based synchronization using channels */
324
class TokenBucket(capacity: Int) {
325
private val tokens = Channel<Unit>(capacity)
326
327
init {
328
// Fill with initial tokens
329
repeat(capacity) {
330
tokens.trySend(Unit)
331
}
332
}
333
334
suspend fun acquire() {
335
tokens.receive()
336
}
337
338
fun tryAcquire(): Boolean {
339
return tokens.tryReceive().isSuccess
340
}
341
342
fun release() {
343
tokens.trySend(Unit)
344
}
345
}
346
```
347
348
**Usage Examples:**
349
350
```kotlin
351
import kotlinx.coroutines.*
352
import kotlinx.coroutines.channels.*
353
354
// Rate limiting with token bucket
355
class RateLimiter(tokensPerSecond: Int) {
356
private val bucket = TokenBucket(tokensPerSecond)
357
358
init {
359
// Refill tokens periodically
360
GlobalScope.launch {
361
while (true) {
362
delay(1000L / tokensPerSecond)
363
bucket.release()
364
}
365
}
366
}
367
368
suspend fun <T> execute(action: suspend () -> T): T {
369
bucket.acquire()
370
return action()
371
}
372
}
373
374
// Barrier synchronization
375
class CyclicBarrier(private val parties: Int) {
376
private val count = atomic(0)
377
private val generation = atomic(0)
378
private val channels = atomic<Channel<Unit>?>(null)
379
380
suspend fun await() {
381
val currentGeneration = generation.value
382
val currentCount = count.getAndIncrement()
383
384
if (currentCount == parties - 1) {
385
// Last party - release all waiting parties
386
val channel = channels.getAndSet(null)
387
channel?.close()
388
count.value = 0
389
generation.incrementAndGet()
390
} else {
391
// Wait for other parties
392
val channel = channels.value ?: Channel<Unit>().also {
393
channels.compareAndSet(null, it)
394
}
395
396
// Wait until barrier is tripped or generation changes
397
while (generation.value == currentGeneration && !channel.isClosedForReceive) {
398
try {
399
channel.receive()
400
} catch (e: ClosedReceiveChannelException) {
401
break
402
}
403
}
404
}
405
}
406
}
407
408
fun main() = runBlocking {
409
// Rate limiter example
410
val rateLimiter = RateLimiter(tokensPerSecond = 2)
411
412
repeat(5) { i ->
413
launch {
414
rateLimiter.execute {
415
println("Request $i processed at ${System.currentTimeMillis()}")
416
delay(100)
417
}
418
}
419
}
420
421
delay(5000)
422
423
// Barrier example
424
val barrier = CyclicBarrier(3)
425
426
repeat(3) { i ->
427
launch {
428
println("Task $i started")
429
delay((100..300).random().toLong())
430
println("Task $i waiting at barrier")
431
barrier.await()
432
println("Task $i passed barrier")
433
}
434
}
435
436
delay(2000)
437
}
438
```
439
440
### Select-based Synchronization
441
442
Using select expressions for complex synchronization patterns.
443
444
```kotlin { .api }
445
/** Select among multiple suspending operations */
446
suspend fun <R> select(builder: SelectBuilder<R>.() -> Unit): R
447
448
interface SelectBuilder<in R> {
449
/** Select on channel receive */
450
fun <E> ReceiveChannel<E>.onReceive(block: suspend (E) -> R)
451
452
/** Select on channel send */
453
fun <E> SendChannel<E>.onSend(element: E, block: suspend () -> R)
454
455
/** Select on job completion */
456
fun Job.onJoin(block: suspend () -> R)
457
458
/** Select on deferred completion */
459
fun <T> Deferred<T>.onAwait(block: suspend (T) -> R)
460
461
/** Select with timeout */
462
fun onTimeout(timeMillis: Long, block: suspend () -> R)
463
}
464
```
465
466
**Usage Examples:**
467
468
```kotlin
469
import kotlinx.coroutines.*
470
import kotlinx.coroutines.channels.*
471
import kotlinx.coroutines.selects.*
472
473
suspend fun selectExample() = coroutineScope {
474
val channel1 = Channel<String>()
475
val channel2 = Channel<String>()
476
477
launch {
478
delay(100)
479
channel1.send("Message from channel 1")
480
}
481
482
launch {
483
delay(200)
484
channel2.send("Message from channel 2")
485
}
486
487
// Select first available message
488
val result = select<String> {
489
channel1.onReceive { "Received from channel1: $it" }
490
channel2.onReceive { "Received from channel2: $it" }
491
onTimeout(300) { "Timeout occurred" }
492
}
493
494
println(result)
495
496
channel1.close()
497
channel2.close()
498
}
499
500
// Fan-in pattern with select
501
suspend fun fanIn(vararg channels: ReceiveChannel<String>): ReceiveChannel<String> =
502
produce {
503
while (true) {
504
val message = select<String?> {
505
channels.forEach { channel ->
506
channel.onReceiveCatching { result ->
507
result.getOrNull()
508
}
509
}
510
}
511
512
if (message != null) {
513
send(message)
514
} else {
515
break // All channels closed
516
}
517
}
518
}
519
520
fun main() = runBlocking {
521
selectExample()
522
523
// Fan-in example
524
val producer1 = produce {
525
repeat(3) { i ->
526
send("Producer1-$i")
527
delay(100)
528
}
529
}
530
531
val producer2 = produce {
532
repeat(3) { i ->
533
send("Producer2-$i")
534
delay(150)
535
}
536
}
537
538
val combined = fanIn(producer1, producer2)
539
540
for (message in combined) {
541
println("Fan-in received: $message")
542
}
543
}
544
```
545
546
## Types
547
548
### Synchronization Exceptions
549
550
Exceptions related to synchronization operations.
551
552
```kotlin { .api }
553
/** Exception thrown when trying to unlock mutex not owned by current thread */
554
class IllegalStateException : RuntimeException
555
556
/** Exception thrown on cancellation */
557
class CancellationException : IllegalStateException
558
```