0
# Synchronization Primitives
1
2
Thread-safe synchronization utilities for coordinating access to shared resources in concurrent coroutine applications. These primitives provide cooperative synchronization without blocking threads.
3
4
## Capabilities
5
6
### Mutex - Mutual Exclusion
7
8
A mutual exclusion synchronization primitive that ensures only one coroutine can access a critical section at a time.
9
10
```kotlin { .api }
11
/**
12
* Mutual exclusion for coroutines.
13
* Mutex has two states: locked and unlocked.
14
*/
15
interface Mutex {
16
/**
17
* Returns true when this mutex is locked by some owner.
18
*/
19
val isLocked: Boolean
20
21
/**
22
* Tries to lock this mutex, returning false if this mutex is already locked.
23
*/
24
fun tryLock(owner: Any? = null): Boolean
25
26
/**
27
* Locks this mutex, suspending caller until the lock is acquired.
28
*/
29
suspend fun lock(owner: Any? = null)
30
31
/**
32
* Unlocks this mutex. Throws IllegalStateException if not locked or
33
* if the owner is different from the owner used to lock.
34
*/
35
fun unlock(owner: Any? = null)
36
37
/**
38
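* Executes the given action under this mutex's lock and releases the lock
* when the action completes or throws. (In kotlinx.coroutines this is an
* inline extension function on Mutex; it is listed here for convenience.)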
39
*/
40
suspend fun <T> withLock(owner: Any? = null, action: suspend () -> T): T
41
}
42
43
/**
44
* Creates a new mutex instance. The mutex starts unlocked by default;
* pass `locked = true` to create it in the locked state.
45
*/
46
fun Mutex(locked: Boolean = false): Mutex
47
```
48
49
**Usage Examples:**
50
51
```kotlin
52
import kotlinx.coroutines.*
53
import kotlinx.coroutines.sync.*
54
55
val scope = MainScope()
56
57
// Basic mutex usage
58
val mutex = Mutex()
59
var counter = 0
60
61
repeat(100) {
62
scope.launch {
63
mutex.withLock {
64
counter++ // Thread-safe increment
65
}
66
}
67
}
68
69
scope.launch {
70
delay(1000)
71
println("Final counter value: $counter") // Will be 100
72
}
73
74
// Manual lock/unlock
75
val manualMutex = Mutex()
76
var sharedResource = 0
77
78
scope.launch {
79
manualMutex.lock()
80
try {
81
sharedResource = expensiveComputation()
82
delay(100) // Simulate work while holding lock
83
} finally {
84
manualMutex.unlock()
85
}
86
}
87
88
// Try lock (non-blocking)
89
scope.launch {
90
if (manualMutex.tryLock()) {
91
try {
92
println("Got the lock!")
93
sharedResource += 10
94
} finally {
95
manualMutex.unlock()
96
}
97
} else {
98
println("Could not acquire lock")
99
}
100
}
101
102
// Mutex guarding a class's private state
/**
 * Collects string items and hands them out in batches. Every access to the
 * backing list happens while holding [mutex].
 *
 * No `owner` token is passed to `withLock`: locking a mutex that is already
 * held with the same owner throws IllegalStateException instead of suspending,
 * so passing `owner = this` would make concurrent calls on one instance fail
 * rather than wait for each other.
 */
class DataProcessor {
    private val mutex = Mutex()
    private val data = mutableListOf<String>()

    /** Appends [item] to the pending data and prints the new size. */
    suspend fun addData(item: String) {
        mutex.withLock {
            data.add(item)
            println("Added: $item, size: ${data.size}")
        }
    }

    /** Returns a snapshot of all pending items and empties the internal list. */
    suspend fun processData(): List<String> =
        mutex.withLock {
            val snapshot = data.toList()
            data.clear()
            snapshot
        }
}
122
```
123
124
### Semaphore - Counting Synchronization
125
126
A counting synchronization primitive that maintains a set of permits, allowing multiple coroutines to access a resource up to a specified limit.
127
128
```kotlin { .api }
129
/**
130
* A counting semaphore for coroutines.
131
* A semaphore has a number of permits. Each acquire takes a permit; each release adds a permit.
132
*/
133
interface Semaphore {
134
/**
135
* The number of permits currently available in this semaphore.
136
*/
137
val availablePermits: Int
138
139
/**
140
* Acquires a permit from this semaphore, suspending until one is available.
141
*/
142
suspend fun acquire()
143
144
/**
145
* Tries to acquire a permit from this semaphore without suspending.
146
* Returns true if a permit was acquired, false otherwise.
147
*/
148
fun tryAcquire(): Boolean
149
150
/**
151
* Releases a permit, returning it to the semaphore.
152
*/
153
fun release()
154
155
/**
156
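* Executes the given action, acquiring a permit before and releasing it after,
* even if the action throws. (In kotlinx.coroutines this is an inline
* extension function on Semaphore; it is listed here for convenience.)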
157
*/
158
suspend fun <T> withPermit(action: suspend () -> T): T
159
}
160
161
/**
162
* Creates a new semaphore instance.
163
* @param permits the total number of permits in this semaphore (must be at least 1)
164
* @param acquiredPermits the number of permits already acquired (between 0 and permits)
165
*/
166
fun Semaphore(permits: Int, acquiredPermits: Int = 0): Semaphore
167
```
168
169
**Usage Examples:**
170
171
```kotlin
172
import kotlinx.coroutines.*
173
import kotlinx.coroutines.sync.*
174
175
val scope = MainScope()
176
177
// Limit concurrent network requests
178
val networkSemaphore = Semaphore(3) // Allow max 3 concurrent requests
179
180
/**
 * Simulates a request to [url] while holding one permit of `networkSemaphore`,
 * so the number of simultaneous requests is bounded by that semaphore.
 */
suspend fun makeNetworkRequest(url: String): String =
    networkSemaphore.withPermit {
        val remaining = networkSemaphore.availablePermits
        println("Making request to $url (available permits: $remaining)")
        delay(1000) // Simulate network call
        "Response from $url"
    }
187
188
scope.launch {
189
// Launch 10 requests, but only 3 will run concurrently
190
val requests = (1..10).map { i ->
191
async {
192
makeNetworkRequest("https://api.example.com/data$i")
193
}
194
}
195
196
val responses = requests.awaitAll()
197
println("All requests completed: ${responses.size}")
198
}
199
200
// Database connection pool simulation
201
/**
 * Simulated connection pool: a semaphore created with `maxConnections`
 * permits bounds how many callers can hold a connection at the same time.
 */
class DatabasePool(maxConnections: Int) {
    private val semaphore = Semaphore(maxConnections)

    /**
     * Runs [block] with a freshly obtained connection. The connection is
     * released in a `finally`, so it is handed back even when [block] throws.
     */
    suspend fun <T> withConnection(block: suspend (Connection) -> T): T =
        semaphore.withPermit {
            val conn = getConnection()
            try {
                block(conn)
            } finally {
                releaseConnection(conn)
            }
        }

    /** Logs and simulates the setup latency of opening a connection. */
    private suspend fun getConnection(): Connection {
        println("Acquiring database connection")
        delay(50) // Simulate connection setup
        return Connection()
    }

    /** Logs the release; the simulated connection needs no real cleanup. */
    private fun releaseConnection(connection: Connection) {
        println("Releasing database connection")
    }
}
225
226
/** Stand-in for a database connection that returns canned rows. */
class Connection {
    /** Pretends to run [sql]; the statement text is not inspected. */
    suspend fun query(sql: String): List<String> {
        delay(200) // Simulate query execution
        val rows = listOf("result1", "result2")
        return rows
    }
}
232
233
// Usage
234
val dbPool = DatabasePool(maxConnections = 2)
235
236
scope.launch {
237
repeat(5) { i ->
238
launch {
239
val results = dbPool.withConnection { conn ->
240
conn.query("SELECT * FROM table$i")
241
}
242
println("Query $i results: $results")
243
}
244
}
245
}
246
247
// Manual acquire/release
248
val resourceSemaphore = Semaphore(2)
249
250
scope.launch {
251
if (resourceSemaphore.tryAcquire()) {
252
try {
253
println("Got permit immediately, available: ${resourceSemaphore.availablePermits}")
254
delay(1000)
255
} finally {
256
resourceSemaphore.release()
257
}
258
} else {
259
println("No permits available")
260
resourceSemaphore.acquire() // Wait for permit
261
try {
262
println("Got permit after waiting, available: ${resourceSemaphore.availablePermits}")
263
delay(1000)
264
} finally {
265
resourceSemaphore.release()
266
}
267
}
268
}
269
```
270
271
### Advanced Synchronization Patterns
272
273
Complex synchronization scenarios using combinations of primitives.
274
275
**Producer-Consumer with Bounded Buffer:**
276
277
```kotlin
278
import kotlinx.coroutines.*
279
import kotlinx.coroutines.sync.*
280
281
/**
 * Fixed-capacity FIFO buffer for suspending producers and consumers.
 *
 * `notFull` counts free slots, `notEmpty` counts stored items, and [mutex]
 * guards the backing deque itself.
 */
class BoundedBuffer<T>(capacity: Int) {
    private val buffer = ArrayDeque<T>()
    private val mutex = Mutex()

    // Semaphore(0) is rejected (a semaphore needs at least one permit), so
    // start with all permits already acquired: zero items are available.
    private val notEmpty = Semaphore(capacity, acquiredPermits = capacity)
    private val notFull = Semaphore(capacity) // Signals when buffer has space

    /** Appends [item], suspending while the buffer is full. */
    suspend fun put(item: T) {
        notFull.acquire() // Wait for space
        try {
            mutex.withLock {
                buffer.addLast(item)
                println("Produced: $item, buffer size: ${buffer.size}")
            }
        } catch (e: CancellationException) {
            // Cancelled while waiting for the mutex: nothing was stored,
            // so hand the reserved slot back instead of leaking it.
            notFull.release()
            throw e
        }
        notEmpty.release() // Signal that item is available
    }

    /** Removes and returns the oldest item, suspending while the buffer is empty. */
    suspend fun take(): T {
        notEmpty.acquire() // Wait for item
        val item = try {
            mutex.withLock {
                val result = buffer.removeFirst()
                println("Consumed: $result, buffer size: ${buffer.size}")
                result
            }
        } catch (e: CancellationException) {
            // Cancelled while waiting for the mutex: the item is still in
            // the buffer, so make it available to another consumer again.
            notEmpty.release()
            throw e
        }
        notFull.release() // Signal that space is available
        return item
    }
}
307
308
val scope = MainScope()
309
val buffer = BoundedBuffer<String>(capacity = 3)
310
311
// Producers
312
repeat(2) { producerId ->
313
scope.launch {
314
repeat(5) { i ->
315
buffer.put("Item-$producerId-$i")
316
delay(100)
317
}
318
}
319
}
320
321
// Consumers
322
repeat(2) { consumerId ->
323
scope.launch {
324
repeat(5) { i ->
325
val item = buffer.take()
326
println("Consumer $consumerId got: $item")
327
delay(200)
328
}
329
}
330
}
331
```
332
333
**Reader-Writer Lock Pattern:**
334
335
```kotlin
336
/**
 * Reader-writer lock built from two mutexes.
 *
 * Any number of readers may run together; the first reader locks
 * `writerMutex` on behalf of all readers and the last one unlocks it.
 * A writer holds `writerMutex` exclusively. Readers that keep arriving can
 * delay a waiting writer indefinitely (no writer preference).
 */
class ReadWriteMutex {
    private val readerCountMutex = Mutex()
    private val writerMutex = Mutex()
    private var readerCount = 0

    /** Runs [action] with shared (read) access. */
    suspend fun <T> withReadLock(action: suspend () -> T): T {
        // Acquire reader access
        readerCountMutex.withLock {
            if (readerCount == 0) {
                // First reader blocks writers. Lock BEFORE counting: lock()
                // can be cancelled while suspended, and the count must not
                // include a reader that never got in.
                writerMutex.lock()
            }
            readerCount++
        }

        try {
            return action()
        } finally {
            // Release reader access. This must run even if the coroutine was
            // cancelled, otherwise the count is never decremented and
            // writers stay locked out forever.
            withContext(NonCancellable) {
                readerCountMutex.withLock {
                    readerCount--
                    if (readerCount == 0) {
                        writerMutex.unlock() // Last reader unblocks writers
                    }
                }
            }
        }
    }

    /** Runs [action] with exclusive (write) access. */
    suspend fun <T> withWriteLock(action: suspend () -> T): T {
        return writerMutex.withLock {
            action()
        }
    }
}
369
370
/** String value protected by a [ReadWriteMutex]: shared reads, exclusive writes. */
class SharedData {
    private val rwMutex = ReadWriteMutex()
    private var data = "Initial data"

    /** Logs and returns the current value under the read lock. */
    suspend fun read(): String =
        rwMutex.withReadLock {
            val threadName = Thread.currentThread().name
            println("Reading: $data on $threadName")
            delay(100) // Simulate read time
            data
        }

    /** Logs and stores [newData] under the write lock. */
    suspend fun write(newData: String) {
        rwMutex.withWriteLock {
            val threadName = Thread.currentThread().name
            println("Writing: $newData on $threadName")
            delay(200) // Simulate write time
            data = newData
        }
    }
}
390
391
val sharedData = SharedData()
392
393
scope.launch {
394
// Multiple readers can run concurrently
395
repeat(5) { i ->
396
launch {
397
val value = sharedData.read()
398
println("Reader $i got: $value")
399
}
400
}
401
402
// Writer blocks all readers
403
launch {
404
delay(300)
405
sharedData.write("Updated data")
406
}
407
}
408
```
409
410
**Rate Limiting Pattern:**
411
412
```kotlin
413
/**
 * Lets at most `permits` actions start within any window of [periodMs]
 * milliseconds.
 *
 * Each call takes a permit and gives it back one full period later,
 * regardless of how long the action itself runs. Returning the permit when
 * the action finishes would only limit concurrency, not rate.
 */
class RateLimiter(
    permits: Int,
    private val periodMs: Long
) {
    private val semaphore = Semaphore(permits)

    /**
     * Suspends until a permit is free, then runs [action] and returns its result.
     *
     * The delayed release runs in the surrounding `scope`; if that scope is
     * cancelled before the delay elapses, the permit is not returned.
     */
    suspend fun <T> execute(action: suspend () -> T): T {
        semaphore.acquire()
        // Schedule permit release after period
        scope.launch {
            delay(periodMs)
            semaphore.release()
        }
        return action()
    }
}
430
431
val rateLimiter = RateLimiter(permits = 3, periodMs = 1000) // 3 requests per second
432
433
/** Runs a simulated API call for [requestId] through `rateLimiter`. */
suspend fun apiCall(requestId: Int): String =
    rateLimiter.execute {
        val startedAt = System.currentTimeMillis()
        println("Making API call $requestId at $startedAt")
        delay(100) // Simulate API call
        "Response $requestId"
    }
440
441
scope.launch {
442
// Make 10 API calls - they'll be rate limited
443
repeat(10) { i ->
444
launch {
445
val response = apiCall(i)
446
println("Got: $response")
447
}
448
}
449
}
450
```
451
452
### Deadlock Prevention
453
454
Best practices for avoiding deadlocks when using multiple synchronization primitives.
455
456
```kotlin
457
// BAD: Potential deadlock
458
val mutex1 = Mutex()
459
val mutex2 = Mutex()
460
461
// Coroutine A
462
scope.launch {
463
mutex1.withLock {
464
delay(100)
465
mutex2.withLock {
466
println("A got both locks")
467
}
468
}
469
}
470
471
// Coroutine B
472
scope.launch {
473
mutex2.withLock {
474
delay(100)
475
mutex1.withLock { // Potential deadlock here
476
println("B got both locks")
477
}
478
}
479
}
480
481
// GOOD: Consistent lock ordering
482
scope.launch {
483
// Both coroutines acquire locks in same order
484
mutex1.withLock {
485
mutex2.withLock {
486
println("A got both locks safely")
487
}
488
}
489
}
490
491
scope.launch {
492
mutex1.withLock {
493
mutex2.withLock {
494
println("B got both locks safely")
495
}
496
}
497
}
498
499
// ALTERNATIVE: when a fixed lock order is not possible, back off with tryLock and a timeout
500
/**
 * Runs [action] while holding both `mutex1` and `mutex2`, without waiting
 * indefinitely for either.
 *
 * Returns `false` when `mutex1` is busy, or when acquiring `mutex2` and
 * running [action] together take longer than one second; `true` otherwise.
 * The timeout covers [action] too, so a slow action is cancelled.
 */
suspend fun safeDoublelock(action: suspend () -> Unit): Boolean {
    // Give up immediately instead of suspending on the first lock.
    if (!mutex1.tryLock()) return false

    try {
        // Try to get second lock with timeout
        val completed = withTimeoutOrNull(1000) {
            mutex2.withLock {
                action()
            }
        }
        return completed != null
    } finally {
        mutex1.unlock()
    }
}
516
```
517
518
### Performance Considerations
519
520
Tips for optimal performance with synchronization primitives.
521
522
```kotlin
523
// Minimize critical section size
524
val mutex = Mutex()
525
var counter = 0
526
527
// BAD: Long critical section
528
scope.launch {
529
mutex.withLock {
530
val data = expensiveOperation() // Don't do expensive work in lock
531
counter += data.size
532
processData(data) // This should be outside the lock
533
}
534
}
535
536
// GOOD: Minimal critical section
537
scope.launch {
538
val data = expensiveOperation() // Do expensive work outside lock
539
val size = data.size
540
541
mutex.withLock {
542
counter += size // Only critical operation inside lock
543
}
544
545
processData(data) // Non-critical work outside lock
546
}
547
548
// Use appropriate synchronization primitive
549
// For simple counters, consider atomic operations or channels instead of mutex
550
val atomicCounter = AtomicInteger(0) // Better for simple counters
551
552
// For producer-consumer, consider channels instead of manual synchronization
553
val channel = Channel<String>(capacity = 10) // Often simpler than semaphore+mutex
554
```
555
556
### Testing Synchronization
557
558
Strategies for testing concurrent code with synchronization primitives.
559
560
```kotlin
561
import kotlinx.coroutines.test.*
562
563
/** 100 coroutines each increment a counter under the mutex; none may be lost. */
@Test
fun testMutexSafety() = runTest {
    val mutex = Mutex()
    var counter = 0

    // Build the job list directly instead of appending to a mutable list.
    val jobs = List(100) {
        launch {
            mutex.withLock { counter += 1 }
        }
    }

    jobs.joinAll()
    assertEquals(100, counter)
}
581
582
/**
 * Ten coroutines compete for a 3-permit semaphore; the highest number seen
 * inside the permit at once must be exactly 3.
 */
@Test
fun testSemaphoreLimit() = runTest {
    val semaphore = Semaphore(3)
    var concurrentCount = 0
    var maxConcurrent = 0
    val maxConcurrentMutex = Mutex()

    val jobs = (1..10).map {
        launch {
            semaphore.withPermit {
                // Count this coroutine in and record the high-water mark.
                maxConcurrentMutex.withLock {
                    concurrentCount++
                    maxConcurrent = maxOf(maxConcurrent, concurrentCount)
                }

                delay(100) // Simulate work

                maxConcurrentMutex.withLock {
                    concurrentCount--
                }
            }
        }
    }

    jobs.joinAll()
    assertEquals(3, maxConcurrent)
}
610
```