0
# Synchronization
1
2
Non-blocking synchronization primitives including mutex and semaphore for coroutine coordination. These primitives provide thread-safe coordination without blocking OS threads.
3
4
## Capabilities
5
6
### Mutex
7
8
Non-reentrant mutual exclusion primitive for protecting shared resources.
9
10
```kotlin { .api }
11
/**
12
* Mutual exclusion primitive that is not reentrant
13
*/
14
interface Mutex {
15
/** True if mutex is currently locked */
16
val isLocked: Boolean
17
18
/** Lock the mutex, suspending if already locked */
19
suspend fun lock(owner: Any? = null)
20
21
/** Unlock the mutex */
22
fun unlock(owner: Any? = null)
23
24
/** Try to lock immediately without suspending */
25
fun tryLock(owner: Any? = null): Boolean
26
27
/** Check whether the given owner currently holds the lock */
28
fun holdsLock(owner: Any): Boolean
29
}
30
31
/**
32
* Create a new Mutex
33
* @param locked initial lock state
34
*/
35
fun Mutex(locked: Boolean = false): Mutex
36
37
/**
 * Execute [action] with the mutex locked, always unlocking afterwards.
 *
 * The function is `inline`, so [action] is a plain (non-suspending) lambda type
 * that may still call suspending functions from the enclosing coroutine.
 *
 * @param owner optional owner token, used for debugging
 * @return the result of [action]
 */
suspend inline fun <T> Mutex.withLock(
    owner: Any? = null,
    action: () -> T
): T
44
```
45
46
**Usage Examples:**
47
48
```kotlin
49
import kotlinx.coroutines.*
50
import kotlinx.coroutines.sync.*
51
52
/**
 * Integer counter whose every read and write goes through a single [Mutex].
 */
class Counter {
    private val mutex = Mutex()
    private var count = 0

    /** Adds one to the counter under the lock; returns the value before the increment. */
    suspend fun increment(): Int = mutex.withLock { count++ }

    /** Subtracts one from the counter under the lock; returns the value before the decrement. */
    suspend fun decrement(): Int = mutex.withLock { count-- }

    /** Reads the current value under the lock. */
    suspend fun get(): Int = mutex.withLock { count }
}
68
69
// Usage
70
val counter = Counter()
71
72
// Launch multiple coroutines
73
// coroutineScope returns only after every child launched inside it has completed,
// so the count is read after all 1000 increments have run.
coroutineScope {
    repeat(1000) {
        launch {
            counter.increment()
        }
    }
}

println("Final count: ${counter.get()}") // Always 1000
80
```
81
82
### Semaphore
83
84
Counting semaphore for controlling access to a limited number of resources.
85
86
```kotlin { .api }
87
/**
88
* Counting semaphore for resource access control
89
*/
90
interface Semaphore {
91
/** Number of permits currently available */
92
val availablePermits: Int
93
94
/** Acquire a permit, suspending if none available */
95
suspend fun acquire()
96
97
/** Release a permit */
98
fun release()
99
100
/** Try to acquire a permit immediately without suspending */
101
fun tryAcquire(): Boolean
102
}
103
104
/**
105
* Create a new Semaphore
106
* @param permits number of permits available
107
* @param acquiredPermits number of permits already acquired
108
*/
109
fun Semaphore(
110
permits: Int,
111
acquiredPermits: Int = 0
112
): Semaphore
113
114
/**
 * Execute [action] while holding one permit, always releasing it afterwards.
 *
 * The function is `inline`, so [action] is a plain (non-suspending) lambda type
 * that may still call suspending functions from the enclosing coroutine.
 *
 * @return the result of [action]
 */
suspend inline fun <T> Semaphore.withPermit(action: () -> T): T
118
```
119
120
**Usage Examples:**
121
122
```kotlin
123
import kotlinx.coroutines.*
124
import kotlinx.coroutines.sync.*
125
126
// Limit concurrent network requests to 3
127
val networkSemaphore = Semaphore(3)
128
129
/**
 * Fetches [url] while holding one permit of `networkSemaphore`.
 */
suspend fun fetchData(url: String): String =
    networkSemaphore.withPermit {
        // Only 3 coroutines can execute this block concurrently
        httpClient.get(url)
    }
135
136
// Database connection pool
137
/**
 * Limits the number of connections in use at once to [maxConnections] with a semaphore.
 *
 * `getConnection` and `releaseConnection` are not shown in this snippet.
 */
class DatabasePool(private val maxConnections: Int = 10) {
    private val connections = mutableListOf<Connection>()
    private val semaphore = Semaphore(maxConnections)

    /**
     * Runs [block] with a connection while holding one permit.
     * The connection is handed back even if [block] throws.
     */
    suspend fun <T> useConnection(block: suspend (Connection) -> T): T =
        semaphore.withPermit {
            val conn = getConnection()
            try {
                block(conn)
            } finally {
                releaseConnection(conn)
            }
        }
}
152
153
// Rate limiting example
154
/**
 * Token-bucket style rate limiter built on a [Semaphore].
 *
 * The bucket starts full with [requestsPerSecond] permits. A background coroutine
 * returns one permit every `1000 / requestsPerSecond` milliseconds, never exceeding
 * the initial capacity. Callers only [acquire]; they never release.
 *
 * Call [close] when the limiter is no longer needed to stop the refill coroutine.
 *
 * @param requestsPerSecond bucket capacity and refill rate; must be positive
 * @throws IllegalArgumentException if [requestsPerSecond] is not positive
 */
class RateLimiter(requestsPerSecond: Int) {
    init {
        // Fail fast with a clear message instead of dividing by zero below.
        require(requestsPerSecond > 0) { "requestsPerSecond must be positive, was $requestsPerSecond" }
    }

    private val semaphore = Semaphore(requestsPerSecond)

    // At least 1 ms: integer division would give 0 for rates above 1000/s and turn the
    // refill loop into a busy spin. The refill rate is therefore capped at 1000 permits/s.
    private val refillIntervalMs = (1000L / requestsPerSecond).coerceAtLeast(1L)

    // Replenish one permit per interval
    private val refillJob: Job = CoroutineScope(Dispatchers.Default).launch {
        while (isActive) {
            delay(refillIntervalMs)
            // This coroutine is the only caller of release(), so the available count
            // cannot rise between the check and the release.
            if (semaphore.availablePermits < requestsPerSecond) {
                semaphore.release()
            }
        }
    }

    /** Suspends until a permit is available, then consumes it. */
    suspend fun acquire() = semaphore.acquire()

    /** Stops the background refill coroutine. */
    fun close() {
        refillJob.cancel()
    }
}
171
```
172
173
## Advanced Usage
174
175
### Multiple Resource Access
176
177
Using multiple synchronization primitives together.
178
179
```kotlin
180
import kotlinx.coroutines.*
181
import kotlinx.coroutines.sync.*
182
183
/**
 * Readers-writer coordination built from two mutexes and a semaphore.
 *
 * - `writeMutex` is held either by one writer or collectively by the active readers
 *   (the first reader locks it, the last reader unlocks it).
 * - `readerCountMutex` guards `readerCount`.
 * - `readers` caps the number of concurrent readers at 5.
 *
 * This scheme prefers readers: a continuous stream of readers can delay writers.
 */
class SharedResource {
    private val writeMutex = Mutex()
    private val readerCountMutex = Mutex()
    private val readers = Semaphore(5) // Allow up to 5 concurrent readers
    private var readerCount = 0

    /** Runs [block] concurrently with other readers but never alongside a writer. */
    suspend fun read(block: suspend () -> Unit) {
        readers.withPermit {
            readerCountMutex.withLock {
                if (readerCount == 0) {
                    // First reader blocks writers. Lock before incrementing so that a
                    // cancelled lock() leaves the count untouched.
                    writeMutex.lock()
                }
                readerCount++
            }

            try {
                block() // Reading happens here
            } finally {
                // The bookkeeping must run even if this coroutine was cancelled;
                // otherwise writeMutex would stay locked forever.
                withContext(NonCancellable) {
                    readerCountMutex.withLock {
                        readerCount--
                        if (readerCount == 0) {
                            // Last reader lets writers in again
                            writeMutex.unlock()
                        }
                    }
                }
            }
        }
    }

    /** Runs [block] with exclusive access: no readers and no other writer. */
    suspend fun write(block: suspend () -> Unit) {
        writeMutex.withLock {
            block()
        }
    }
}
217
```
218
219
### Timeout with Synchronization
220
221
Using synchronization primitives with timeouts.
222
223
```kotlin
224
import kotlinx.coroutines.*
225
import kotlinx.coroutines.sync.*
226
227
/**
 * Tries to lock [mutex], giving up after [timeoutMs] milliseconds.
 *
 * @return `true` if the mutex is now locked by the caller (who must unlock it),
 *         `false` if the timeout elapsed first
 */
suspend fun tryLockWithTimeout(mutex: Mutex, timeoutMs: Long): Boolean {
    // The timeout is asynchronous and may fire after lock() has already succeeded.
    // Track the acquisition explicitly so a held lock is never reported as a failure.
    var locked = false
    try {
        withTimeout(timeoutMs) {
            mutex.lock()
            locked = true
        }
    } catch (e: TimeoutCancellationException) {
        // Timed out; `locked` tells whether the lock was obtained anyway.
    }
    return locked
}
237
238
/**
 * Tries to acquire a permit from [semaphore], giving up after [timeoutMs] milliseconds.
 *
 * @return `true` if a permit was acquired (the caller must release it),
 *         `false` if the timeout elapsed first
 */
suspend fun tryAcquireWithTimeout(semaphore: Semaphore, timeoutMs: Long): Boolean {
    // The timeout is asynchronous and may fire after acquire() has already succeeded.
    // Track the acquisition explicitly so a held permit is never reported as a failure.
    var acquired = false
    try {
        withTimeout(timeoutMs) {
            semaphore.acquire()
            acquired = true
        }
    } catch (e: TimeoutCancellationException) {
        // Timed out; `acquired` tells whether the permit was obtained anyway.
    }
    return acquired
}
248
249
// Usage
250
val mutex = Mutex()
251
if (tryLockWithTimeout(mutex, 1000)) {
252
try {
253
// Critical section
254
performCriticalOperation()
255
} finally {
256
mutex.unlock()
257
}
258
} else {
259
// Handle timeout
260
println("Could not acquire lock within timeout")
261
}
262
```
263
264
### Fair vs Unfair Locking
265
266
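Mutex and Semaphore in kotlinx.coroutines are fair: coroutines suspended in `lock()` or `acquire()` are resumed in first-in-first-out order. There is no separate unfair mode to configure.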
267
268
```kotlin
269
// Fair locking - coroutines acquire locks in FIFO order
270
val fairMutex = Mutex()
271
val fairSemaphore = Semaphore(1)
272
273
// Multiple coroutines waiting for the same resource
274
// will acquire it in the order they requested it
275
repeat(10) { i ->
276
launch {
277
fairMutex.withLock {
278
println("Coroutine $i acquired lock")
279
delay(100)
280
}
281
}
282
}
283
```
284
285
## Best Practices
286
287
### Resource Management
288
289
Always use `withLock` and `withPermit` to ensure proper cleanup:
290
291
```kotlin
292
// Good - automatic cleanup
293
mutex.withLock {
294
// Critical section
295
riskyOperation()
296
}
297
298
// Avoid - manual lock management
299
mutex.lock()
300
try {
301
riskyOperation()
302
} finally {
303
mutex.unlock() // Easy to forget the try/finally, which leaves the mutex locked on exception
304
}
305
```
306
307
### Avoiding Deadlocks
308
309
Be careful with multiple locks to avoid deadlocks:
310
311
```kotlin
312
// Potential deadlock - different lock ordering
313
suspend fun transfer1(from: Account, to: Account, amount: Int) {
314
from.mutex.withLock {
315
to.mutex.withLock {
316
from.balance -= amount
317
to.balance += amount
318
}
319
}
320
}
321
322
suspend fun transfer2(from: Account, to: Account, amount: Int) {
323
to.mutex.withLock { // Different order - potential deadlock!
324
from.mutex.withLock {
325
from.balance -= amount
326
to.balance += amount
327
}
328
}
329
}
330
331
// Solution - consistent lock ordering
332
/**
 * Moves [amount] from [from] to [to], always locking the account with the smaller
 * id first so that concurrent transfers cannot deadlock on lock order.
 *
 * Assumes account ids are unique; two distinct accounts sharing an id would defeat the ordering.
 */
suspend fun safeTransfer(from: Account, to: Account, amount: Int) {
    // Mutex is not reentrant: locking the same account twice would suspend forever.
    // A transfer to the same account leaves the balance unchanged, so there is nothing to do.
    if (from === to) return

    val (first, second) = if (from.id < to.id) Pair(from, to) else Pair(to, from)

    first.mutex.withLock {
        second.mutex.withLock {
            from.balance -= amount
            to.balance += amount
        }
    }
}
342
```
343
344
### Performance Considerations
345
346
- Mutex and Semaphore are lightweight and don't block OS threads
347
- Use them instead of thread-blocking synchronization primitives
348
- Consider using actors or channels for more complex coordination
349
- Profile your application to ensure synchronization isn't a bottleneck