Coroutines support libraries for Kotlin providing structured concurrency primitives, Flow API for reactive streams, channels for communication, and synchronization utilities across all Kotlin platforms
—
Thread-safe synchronization utilities for coordinating access to shared resources in concurrent coroutine applications. These primitives provide cooperative synchronization without blocking threads.
A mutual exclusion synchronization primitive that ensures only one coroutine can access a critical section at a time.
/**
 * A coroutine-friendly mutual-exclusion lock.
 *
 * At any moment the mutex is in exactly one of two states: locked or unlocked.
 */
interface Mutex {
    /** `true` while some owner currently holds this mutex. */
    val isLocked: Boolean

    /**
     * Attempts to take the lock without suspending.
     *
     * @return `false` when the mutex is already locked.
     */
    fun tryLock(owner: Any? = null): Boolean

    /** Takes the lock, suspending the caller until it has been acquired. */
    suspend fun lock(owner: Any? = null)

    /**
     * Releases the lock.
     *
     * @throws IllegalStateException if the mutex is not locked, or if [owner]
     * differs from the owner that was used to lock it.
     */
    fun unlock(owner: Any? = null)

    /** Runs [action] while this mutex's lock is held. */
    suspend fun <T> withLock(owner: Any? = null, action: suspend () -> T): T
}
/**
 * Creates a new mutex instance. The mutex starts unlocked unless `locked` is `true`.
 */
fun Mutex(locked: Boolean = false): Mutex
Usage Examples:
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*
val scope = MainScope()

// Basic mutex usage
val mutex = Mutex()
var counter = 0

// Keep the Job of every worker so that completion can be awaited explicitly.
val increments = List(100) {
    scope.launch {
        mutex.withLock {
            counter++ // Only one coroutine at a time executes this increment
        }
    }
}

scope.launch {
    // Wait for the workers themselves rather than guessing a duration with delay(1000):
    // the printed value is then correct no matter how long the increments take.
    increments.joinAll()
    println("Final counter value: $counter") // Will be 100
}
// Manual lock/unlock
// The unlock() call sits in a finally block so the mutex is released even when
// the guarded code throws.
val manualMutex = Mutex()
var sharedResource = 0
scope.launch {
manualMutex.lock()
try {
// NOTE(review): expensiveComputation() is not defined in this snippet — assumed to be supplied by the reader.
sharedResource = expensiveComputation()
delay(100) // Simulate work while holding lock
} finally {
manualMutex.unlock()
}
}
// Try lock (non-blocking)
// tryLock() is a regular (non-suspending) call that reports failure by returning
// false; in that case this coroutine takes the else branch and skips the update.
scope.launch {
if (manualMutex.tryLock()) {
try {
println("Got the lock!")
sharedResource += 10
} finally {
// Unlock only on the path where tryLock() succeeded.
manualMutex.unlock()
}
} else {
println("Could not acquire lock")
}
}
// Mutex with owner (for debugging)
/**
 * Collects strings and hands them out in batches, guarding the backing list with a [Mutex].
 *
 * Every lock is taken with `owner = this`, so the mutex records which
 * `DataProcessor` instance holds it.
 */
class DataProcessor {
    private val mutex = Mutex()

    // The list is mutated in place and the reference is never reassigned, hence `val`.
    private val data = mutableListOf<String>()

    /** Appends [item] while holding the lock and prints the resulting size. */
    suspend fun addData(item: String) {
        mutex.withLock(owner = this) {
            data.add(item)
            println("Added: $item, size: ${data.size}")
        }
    }

    /**
     * Returns a copy of everything collected so far and empties the internal list.
     * Both steps happen under the same lock acquisition.
     */
    suspend fun processData(): List<String> =
        mutex.withLock(owner = this) {
            // Copy first, then clear, so the caller receives an independent snapshot.
            data.toList().also { data.clear() }
        }
}
A counting synchronization primitive that maintains a set of permits, allowing multiple coroutines to access a resource up to a specified limit.
/**
 * A counting semaphore for coroutines.
 *
 * The semaphore holds a number of permits: every acquire takes one permit away
 * and every release puts one back.
 */
interface Semaphore {
    /** How many permits can currently be acquired from this semaphore. */
    val availablePermits: Int

    /** Takes one permit, suspending until a permit is available. */
    suspend fun acquire()

    /**
     * Attempts to take one permit without suspending.
     *
     * @return `true` if a permit was acquired, `false` otherwise.
     */
    fun tryAcquire(): Boolean

    /** Gives one permit back to the semaphore. */
    fun release()

    /** Runs [action] with a permit that is acquired beforehand and released afterwards. */
    suspend fun <T> withPermit(action: suspend () -> T): T
}
/**
 * Creates a new semaphore instance.
 * @param permits the number of permits available in this semaphore; must be greater than zero
 * @param acquiredPermits the number of permits already acquired; must be between 0 and `permits` (inclusive)
 */
fun Semaphore(permits: Int, acquiredPermits: Int = 0): Semaphore
Usage Examples:
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*
val scope = MainScope()

// Cap the number of network requests that are in flight
val networkSemaphore = Semaphore(3) // At most 3 requests hold a permit at once

/** Simulates fetching [url]; the body runs only while a permit is held. */
suspend fun makeNetworkRequest(url: String): String =
    networkSemaphore.withPermit {
        println("Making request to $url (available permits: ${networkSemaphore.availablePermits})")
        delay(1000) // Stand-in for the real network round trip
        "Response from $url"
    }

scope.launch {
    // Ten requests are started, yet only three can be inside withPermit at a time
    val pending = (1..10).map { i ->
        async { makeNetworkRequest("https://api.example.com/data$i") }
    }
    val responses = pending.awaitAll()
    println("All requests completed: ${responses.size}")
}
// Database connection pool simulation
/**
 * Simulated pool that lets at most `maxConnections` callers work with a
 * [Connection] at the same time.
 */
class DatabasePool(maxConnections: Int) {
    private val semaphore = Semaphore(maxConnections)

    /**
     * Obtains a connection, passes it to [block] and returns the block's result.
     * One permit is held for the whole call, and the connection is released
     * afterwards even when [block] throws.
     */
    suspend fun <T> withConnection(block: suspend (Connection) -> T): T =
        semaphore.withPermit {
            val conn = getConnection()
            try {
                block(conn)
            } finally {
                releaseConnection(conn)
            }
        }

    /** Prints a message, waits 50 ms to mimic connection setup, then creates the connection. */
    private suspend fun getConnection(): Connection {
        println("Acquiring database connection")
        delay(50) // Simulate connection setup
        return Connection()
    }

    /** Only logs; the simulated connection has nothing to clean up. */
    private fun releaseConnection(connection: Connection) {
        println("Releasing database connection")
    }
}
/** Stand-in database connection used by the pool example. */
class Connection {
    /** Ignores [sql], waits 200 ms to mimic query execution and returns two fixed rows. */
    suspend fun query(sql: String): List<String> {
        val rows = listOf("result1", "result2")
        delay(200) // Simulate query execution
        return rows
    }
}
// Usage: five queries share a pool of two connections
val dbPool = DatabasePool(maxConnections = 2)
scope.launch {
    for (i in 0 until 5) {
        // Each query runs in its own child coroutine
        launch {
            val results = dbPool.withConnection { conn ->
                conn.query("SELECT * FROM table$i")
            }
            println("Query $i results: $results")
        }
    }
}
// Manual acquire/release
val resourceSemaphore = Semaphore(2)
scope.launch {
    // Try the non-suspending path first; fall back to a suspending acquire().
    val gotImmediately = resourceSemaphore.tryAcquire()
    if (!gotImmediately) {
        println("No permits available")
        resourceSemaphore.acquire() // Wait for permit
    }
    // A permit is held from here on, whichever path obtained it.
    try {
        if (gotImmediately) {
            println("Got permit immediately, available: ${resourceSemaphore.availablePermits}")
        } else {
            println("Got permit after waiting, available: ${resourceSemaphore.availablePermits}")
        }
        delay(1000)
    } finally {
        resourceSemaphore.release()
    }
}
Complex synchronization scenarios using combinations of primitives.
Producer-Consumer with Bounded Buffer:
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*
/**
 * Fixed-capacity FIFO buffer shared between producer and consumer coroutines.
 *
 * Two semaphores do the counting: `notFull` tracks free slots and `notEmpty`
 * tracks stored items. The mutex protects the underlying queue.
 *
 * @param capacity maximum number of buffered items; must be positive
 * @throws IllegalArgumentException if [capacity] is not positive
 */
class BoundedBuffer<T>(capacity: Int) {
    init {
        require(capacity > 0) { "capacity must be positive, but was $capacity" }
    }

    // A deque gives O(1) removal from the head, unlike removeAt(0) on a list.
    private val buffer = ArrayDeque<T>()
    private val mutex = Mutex()

    // The Semaphore factory rejects a permit count of 0, so "no items yet" is
    // expressed as a semaphore whose permits are all acquired up front.
    private val notEmpty = Semaphore(permits = capacity, acquiredPermits = capacity) // Signals when buffer has items
    private val notFull = Semaphore(capacity) // Signals when buffer has space

    /** Adds [item] to the tail, suspending while the buffer is full. */
    suspend fun put(item: T) {
        notFull.acquire() // Wait for space
        lockOrReturnPermit(notFull)
        try {
            buffer.addLast(item)
            println("Produced: $item, buffer size: ${buffer.size}")
        } finally {
            mutex.unlock()
        }
        notEmpty.release() // Signal that item is available
    }

    /** Removes and returns the oldest item, suspending while the buffer is empty. */
    suspend fun take(): T {
        notEmpty.acquire() // Wait for item
        lockOrReturnPermit(notEmpty)
        val item = try {
            val result = buffer.removeFirst()
            println("Consumed: $result, buffer size: ${buffer.size}")
            result
        } finally {
            mutex.unlock()
        }
        notFull.release() // Signal that space is available
        return item
    }

    /**
     * Locks [mutex]. If locking fails (for example because the caller is cancelled
     * while waiting), the permit just taken from [reserved] is handed back before
     * the failure is rethrown, so no slot or item is lost.
     */
    private suspend fun lockOrReturnPermit(reserved: Semaphore) {
        try {
            mutex.lock()
        } catch (e: Throwable) {
            reserved.release()
            throw e
        }
    }
}
val scope = MainScope()
val buffer = BoundedBuffer<String>(capacity = 3)

// Producers: two coroutines, five items each, pausing 100 ms after every put
for (producerId in 0 until 2) {
    scope.launch {
        for (i in 0 until 5) {
            buffer.put("Item-$producerId-$i")
            delay(100)
        }
    }
}

// Consumers: two coroutines, five items each, pausing 200 ms after every take
for (consumerId in 0 until 2) {
    scope.launch {
        repeat(5) {
            val item = buffer.take()
            println("Consumer $consumerId got: $item")
            delay(200)
        }
    }
}
Reader-Writer Lock Pattern:
/**
 * Reader-writer lock built from two mutexes.
 *
 * Any number of readers may run together; a writer runs alone. The first
 * reader to arrive locks `writerMutex` on behalf of all readers and the last
 * reader to leave unlocks it.
 */
class ReadWriteMutex {
    private val readerCountMutex = Mutex()
    private val writerMutex = Mutex()
    private var readerCount = 0

    /** Runs [action] as a reader and returns its result. */
    suspend fun <T> withReadLock(action: suspend () -> T): T {
        // Acquire reader access
        readerCountMutex.withLock {
            if (readerCount == 0) {
                // First reader blocks writers. This may suspend and may be cancelled,
                // so the count is only bumped after the lock has actually been taken.
                writerMutex.lock()
            }
            readerCount++
        }
        try {
            return action()
        } finally {
            // Release reader access. The bookkeeping must run even if this coroutine
            // has been cancelled; otherwise the count is never decremented and
            // writers stay locked out forever.
            withContext(NonCancellable) {
                readerCountMutex.withLock {
                    readerCount--
                    if (readerCount == 0) {
                        writerMutex.unlock() // Last reader unblocks writers
                    }
                }
            }
        }
    }

    /** Runs [action] with exclusive access and returns its result. */
    suspend fun <T> withWriteLock(action: suspend () -> T): T {
        return writerMutex.withLock {
            action()
        }
    }
}
/** Holds one string that is read under the read lock and replaced under the write lock. */
class SharedData {
    private val rwMutex = ReadWriteMutex()
    private var data = "Initial data"

    /** Logs the current value and thread, waits 100 ms, then returns the value. */
    suspend fun read(): String =
        rwMutex.withReadLock {
            println("Reading: $data on ${Thread.currentThread().name}")
            delay(100) // Simulate read time
            data
        }

    /** Logs [newData] and thread, waits 200 ms, then stores [newData]. */
    suspend fun write(newData: String) {
        rwMutex.withWriteLock {
            println("Writing: $newData on ${Thread.currentThread().name}")
            delay(200) // Simulate write time
            data = newData
        }
    }
}
val sharedData = SharedData()
scope.launch {
    // Five readers are started together; read access may be shared
    for (i in 0 until 5) {
        launch {
            val value = sharedData.read()
            println("Reader $i got: $value")
        }
    }
    // One writer starts 300 ms later; write access is exclusive
    launch {
        delay(300)
        sharedData.write("Updated data")
    }
}
Rate Limiting Pattern:
/**
 * Limits how often actions may start: each call to [execute] takes a permit
 * and keeps it for [periodMs] milliseconds, so at most `permits` actions can
 * start within one period.
 *
 * The delayed release runs in the file-level `scope`; if that scope is
 * cancelled, permits that are still pending are not returned.
 *
 * @param permits number of actions allowed to start per period
 * @param periodMs length of the period in milliseconds
 */
class RateLimiter(
    permits: Int,
    private val periodMs: Long
) {
    private val semaphore = Semaphore(permits)

    /**
     * Waits for a permit, then runs [action] and returns its result.
     *
     * The permit is tied to the time window, not to how long [action] takes:
     * it is returned [periodMs] milliseconds after it was taken, whether the
     * action has finished, failed or is still running.
     */
    suspend fun <T> execute(action: suspend () -> T): T {
        semaphore.acquire()
        // Schedule permit release after period
        scope.launch {
            delay(periodMs)
            semaphore.release()
        }
        return action()
    }
}
val rateLimiter = RateLimiter(permits = 3, periodMs = 1000) // 3 permits, 1000 ms period

/** Simulated API call that is routed through [rateLimiter]. */
suspend fun apiCall(requestId: Int): String =
    rateLimiter.execute {
        println("Making API call $requestId at ${System.currentTimeMillis()}")
        delay(100) // Simulate API call
        "Response $requestId"
    }

scope.launch {
    // Start 10 API calls; every one of them goes through the limiter
    for (i in 0 until 10) {
        launch {
            val response = apiCall(i)
            println("Got: $response")
        }
    }
}
Best practices for avoiding deadlocks when using multiple synchronization primitives.
// BAD: Potential deadlock
// The two coroutines below take the same pair of locks in opposite order.
val mutex1 = Mutex()
val mutex2 = Mutex()
// Coroutine A
// Takes mutex1 first, then asks for mutex2 while still holding mutex1.
scope.launch {
mutex1.withLock {
delay(100)
mutex2.withLock {
println("A got both locks")
}
}
}
// Coroutine B
// Takes mutex2 first, then asks for mutex1 while still holding mutex2. If A already
// holds mutex1 at that point, each coroutine waits for the lock the other one holds.
scope.launch {
mutex2.withLock {
delay(100)
mutex1.withLock { // Potential deadlock here
println("B got both locks")
}
}
}
// GOOD: Consistent lock ordering
// Both coroutines take mutex1 before mutex2, so neither can hold mutex2 while
// waiting for mutex1 and the circular wait cannot form.
scope.launch {
// Both coroutines acquire locks in same order
mutex1.withLock {
mutex2.withLock {
println("A got both locks safely")
}
}
}
scope.launch {
mutex1.withLock {
mutex2.withLock {
println("B got both locks safely")
}
}
}
// BETTER: Use timeout with tryLock
/**
 * Runs [action] while holding both `mutex1` and `mutex2`, giving up instead of
 * waiting indefinitely for either lock.
 *
 * Only the wait for `mutex2` is limited to one second; [action] itself runs
 * without a time limit once both locks are held.
 *
 * @return `true` if both locks were obtained and [action] completed; `false` if
 * `mutex1` was already locked or `mutex2` could not be locked within one second
 * (in both cases [action] is not run).
 */
suspend fun safeDoublelock(action: suspend () -> Unit): Boolean {
    if (!mutex1.tryLock()) return false
    try {
        // Ownership of mutex2 is tracked in a flag instead of being derived from the
        // result of withTimeoutOrNull: the timeout may fire right after lock() has
        // succeeded, and the lock must still be released in that case.
        var holdsSecond = false
        try {
            // Try to get second lock with timeout
            withTimeoutOrNull(1000) {
                mutex2.lock()
                holdsSecond = true
            }
            if (!holdsSecond) return false
            action()
            return true
        } finally {
            if (holdsSecond) mutex2.unlock()
        }
    } finally {
        mutex1.unlock()
    }
}
Tips for optimal performance with synchronization primitives.
// Minimize critical section size
// Other coroutines that need the mutex have to wait for as long as the withLock
// body runs, so only the shared-state update belongs inside it.
// NOTE(review): expensiveOperation() and processData() are not defined in this snippet — assumed to be supplied by the reader.
val mutex = Mutex()
var counter = 0
// BAD: Long critical section
// Everything here, including the expensive call and the follow-up processing,
// runs while the mutex is held.
scope.launch {
mutex.withLock {
val data = expensiveOperation() // Don't do expensive work in lock
counter += data.size
processData(data) // This should be outside the lock
}
}
// GOOD: Minimal critical section
// The lock is held only for the update of the shared counter.
scope.launch {
val data = expensiveOperation() // Do expensive work outside lock
val size = data.size
mutex.withLock {
counter += size // Only critical operation inside lock
}
processData(data) // Non-critical work outside lock
}
// Use appropriate synchronization primitive
// For simple counters, consider atomic operations or channels instead of mutex
val atomicCounter = AtomicInteger(0) // Better for simple counters
// For producer-consumer, consider channels instead of manual synchronization
val channel = Channel<String>(capacity = 10) // Often simpler than semaphore+mutex
Strategies for testing concurrent code with synchronization primitives.
import kotlinx.coroutines.test.*
/** Checks that 100 increments guarded by one mutex all take effect. */
@Test
fun testMutexSafety() = runTest {
    val mutex = Mutex()
    var counter = 0

    // Build the job list directly instead of appending to a mutable list.
    val jobs = List(100) {
        launch {
            mutex.withLock {
                counter++
            }
        }
    }

    jobs.joinAll()
    assertEquals(100, counter)
}
/**
 * Checks that a semaphore with 3 permits lets exactly 3 of 10 workers be
 * inside `withPermit` at the same time.
 */
@Test
fun testSemaphoreLimit() = runTest {
    val semaphore = Semaphore(3)
    var concurrentCount = 0
    var maxConcurrent = 0
    val maxConcurrentMutex = Mutex()

    val jobs = (1..10).map {
        launch {
            semaphore.withPermit {
                // Record the entry and update the high-water mark. The result of the
                // block is not needed, so it is no longer stored in an unused local.
                maxConcurrentMutex.withLock {
                    concurrentCount++
                    maxConcurrent = maxOf(maxConcurrent, concurrentCount)
                }
                delay(100) // Simulate work
                maxConcurrentMutex.withLock {
                    concurrentCount--
                }
            }
        }
    }
    jobs.joinAll()
    assertEquals(3, maxConcurrent)
}
Install with Tessl CLI
npx tessl i tessl/maven-org-jetbrains-kotlinx--kotlinx-coroutines-core