CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-jetbrains-kotlinx--kotlinx-coroutines-core

Coroutines support libraries for Kotlin providing structured concurrency primitives, Flow API for reactive streams, channels for communication, and synchronization utilities across all Kotlin platforms

Pending
Overview
Eval results
Files

synchronization.mddocs/

Synchronization Primitives

Thread-safe synchronization utilities for coordinating access to shared resources in concurrent coroutine applications. These primitives provide cooperative synchronization without blocking threads.

Capabilities

Mutex - Mutual Exclusion

A mutual exclusion synchronization primitive that ensures only one coroutine can access a critical section at a time.

/**
 * Mutual exclusion for coroutines.
 * Mutex has two states: locked and unlocked.
 */
interface Mutex {
    /**
     * Returns true when this mutex is locked by some owner.
     */
    val isLocked: Boolean
    
    /**
     * Tries to lock this mutex, returning false if this mutex is already locked.
     */
    fun tryLock(owner: Any? = null): Boolean
    
    /**
     * Locks this mutex, suspending caller until the lock is acquired.
     */
    suspend fun lock(owner: Any? = null)
    
    /**
     * Unlocks this mutex. Throws IllegalStateException if not locked or
     * if the owner is different from the owner used to lock.
     */
    fun unlock(owner: Any? = null)
    
    /**
     * Executes the given action under this mutex's lock.
     */
    suspend fun <T> withLock(owner: Any? = null, action: suspend () -> T): T
}

/**
 * Creates a new mutex instance that is not locked initially.
 */
fun Mutex(locked: Boolean = false): Mutex

Usage Examples:

import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*

val scope = MainScope()

// Basic mutex usage
val mutex = Mutex()
var counter = 0

repeat(100) {
    scope.launch {
        mutex.withLock {
            counter++  // Thread-safe increment
        }
    }
}

scope.launch {
    delay(1000)
    println("Final counter value: $counter")  // Will be 100
}

// Manual lock/unlock
val manualMutex = Mutex()
var sharedResource = 0

scope.launch {
    manualMutex.lock()
    try {
        sharedResource = expensiveComputation()
        delay(100)  // Simulate work while holding lock
    } finally {
        manualMutex.unlock()
    }
}

// Try lock (non-blocking)
scope.launch {
    if (manualMutex.tryLock()) {
        try {
            println("Got the lock!")
            sharedResource += 10
        } finally {
            manualMutex.unlock()
        }
    } else {
        println("Could not acquire lock")
    }
}

// Mutex with owner (for debugging)
class DataProcessor {
    private val mutex = Mutex()
    private var data = mutableListOf<String>()
    
    suspend fun addData(item: String) {
        mutex.withLock(owner = this) {
            data.add(item)
            println("Added: $item, size: ${data.size}")
        }
    }
    
    suspend fun processData(): List<String> {
        return mutex.withLock(owner = this) {
            val result = data.toList()
            data.clear()
            result
        }
    }
}

Semaphore - Counting Synchronization

A counting synchronization primitive that maintains a set of permits, allowing multiple coroutines to access a resource up to a specified limit.

/**
 * A counting semaphore for coroutines.
 * A semaphore has a number of permits. Each acquire takes a permit; each release adds a permit.
 */
interface Semaphore {
    /**
     * The number of permits currently available in this semaphore.
     */
    val availablePermits: Int
    
    /**
     * Acquires a permit from this semaphore, suspending until one is available.
     */
    suspend fun acquire()
    
    /**
     * Tries to acquire a permit from this semaphore without suspending.
     * Returns true if a permit was acquired, false otherwise.
     */
    fun tryAcquire(): Boolean
    
    /**
     * Releases a permit, returning it to the semaphore.
     */
    fun release()
    
    /**
     * Executes the given action, acquiring a permit before and releasing it after.
     */
    suspend fun <T> withPermit(action: suspend () -> T): T
}

/**
 * Creates a new semaphore instance.
 * @param permits the number of permits available in this semaphore
 * @param acquiredPermits the number of permits already acquired
 */
fun Semaphore(permits: Int, acquiredPermits: Int = 0): Semaphore

Usage Examples:

import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*

val scope = MainScope()

// Limit concurrent network requests
val networkSemaphore = Semaphore(3)  // Allow max 3 concurrent requests

suspend fun makeNetworkRequest(url: String): String {
    return networkSemaphore.withPermit {
        println("Making request to $url (available permits: ${networkSemaphore.availablePermits})")
        delay(1000)  // Simulate network call
        "Response from $url"
    }
}

scope.launch {
    // Launch 10 requests, but only 3 will run concurrently
    val requests = (1..10).map { i ->
        async {
            makeNetworkRequest("https://api.example.com/data$i")
        }
    }
    
    val responses = requests.awaitAll()
    println("All requests completed: ${responses.size}")
}

// Database connection pool simulation
class DatabasePool(maxConnections: Int) {
    private val semaphore = Semaphore(maxConnections)
    
    suspend fun <T> withConnection(block: suspend (Connection) -> T): T {
        return semaphore.withPermit {
            val connection = getConnection()
            try {
                block(connection)
            } finally {
                releaseConnection(connection)
            }
        }
    }
    
    private suspend fun getConnection(): Connection {
        println("Acquiring database connection")
        delay(50)  // Simulate connection setup
        return Connection()
    }
    
    private fun releaseConnection(connection: Connection) {
        println("Releasing database connection")
    }
}

class Connection {
    suspend fun query(sql: String): List<String> {
        delay(200)  // Simulate query execution
        return listOf("result1", "result2")
    }
}

// Usage
val dbPool = DatabasePool(maxConnections = 2)

scope.launch {
    repeat(5) { i ->
        launch {
            val results = dbPool.withConnection { conn ->
                conn.query("SELECT * FROM table$i")
            }
            println("Query $i results: $results")
        }
    }
}

// Manual acquire/release
val resourceSemaphore = Semaphore(2)

scope.launch {
    if (resourceSemaphore.tryAcquire()) {
        try {
            println("Got permit immediately, available: ${resourceSemaphore.availablePermits}")
            delay(1000)
        } finally {
            resourceSemaphore.release()
        }
    } else {
        println("No permits available")
        resourceSemaphore.acquire()  // Wait for permit
        try {
            println("Got permit after waiting, available: ${resourceSemaphore.availablePermits}")
            delay(1000)
        } finally {
            resourceSemaphore.release()
        }
    }
}

Advanced Synchronization Patterns

Complex synchronization scenarios using combinations of primitives.

Producer-Consumer with Bounded Buffer:

import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*

class BoundedBuffer<T>(capacity: Int) {
    private val buffer = mutableListOf<T>()
    private val mutex = Mutex()
    private val notEmpty = Semaphore(0)  // Signals when buffer has items
    private val notFull = Semaphore(capacity)  // Signals when buffer has space
    
    suspend fun put(item: T) {
        notFull.acquire()  // Wait for space
        mutex.withLock {
            buffer.add(item)
            println("Produced: $item, buffer size: ${buffer.size}")
        }
        notEmpty.release()  // Signal that item is available
    }
    
    suspend fun take(): T {
        notEmpty.acquire()  // Wait for item
        val item = mutex.withLock {
            val result = buffer.removeAt(0)
            println("Consumed: $result, buffer size: ${buffer.size}")
            result
        }
        notFull.release()  // Signal that space is available
        return item
    }
}

val scope = MainScope()
val buffer = BoundedBuffer<String>(capacity = 3)

// Producers
repeat(2) { producerId ->
    scope.launch {
        repeat(5) { i ->
            buffer.put("Item-$producerId-$i")
            delay(100)
        }
    }
}

// Consumers
repeat(2) { consumerId ->
    scope.launch {
        repeat(5) { i ->
            val item = buffer.take()
            println("Consumer $consumerId got: $item")
            delay(200)
        }
    }
}

Reader-Writer Lock Pattern:

class ReadWriteMutex {
    private val readerCountMutex = Mutex()
    private val writerMutex = Mutex()
    private var readerCount = 0
    
    suspend fun <T> withReadLock(action: suspend () -> T): T {
        // Acquire reader access
        readerCountMutex.withLock {
            readerCount++
            if (readerCount == 1) {
                writerMutex.lock()  // First reader blocks writers
            }
        }
        
        try {
            return action()
        } finally {
            // Release reader access
            readerCountMutex.withLock {
                readerCount--
                if (readerCount == 0) {
                    writerMutex.unlock()  // Last reader unblocks writers
                }
            }
        }
    }
    
    suspend fun <T> withWriteLock(action: suspend () -> T): T {
        return writerMutex.withLock {
            action()
        }
    }
}

class SharedData {
    private val rwMutex = ReadWriteMutex()
    private var data = "Initial data"
    
    suspend fun read(): String {
        return rwMutex.withReadLock {
            println("Reading: $data on ${Thread.currentThread().name}")
            delay(100)  // Simulate read time
            data
        }
    }
    
    suspend fun write(newData: String) {
        rwMutex.withWriteLock {
            println("Writing: $newData on ${Thread.currentThread().name}")
            delay(200)  // Simulate write time
            data = newData
        }
    }
}

val sharedData = SharedData()

scope.launch {
    // Multiple readers can run concurrently
    repeat(5) { i ->
        launch {
            val value = sharedData.read()
            println("Reader $i got: $value")
        }
    }
    
    // Writer blocks all readers
    launch {
        delay(300)
        sharedData.write("Updated data")
    }
}

Rate Limiting Pattern:

class RateLimiter(
    permits: Int,
    private val periodMs: Long
) {
    private val semaphore = Semaphore(permits)
    
    suspend fun <T> execute(action: suspend () -> T): T {
        return semaphore.withPermit {
            // Schedule permit release after period
            scope.launch {
                delay(periodMs)
                // Permit is automatically released when withPermit block completes
            }
            action()
        }
    }
}

val rateLimiter = RateLimiter(permits = 3, periodMs = 1000)  // 3 requests per second

suspend fun apiCall(requestId: Int): String {
    return rateLimiter.execute {
        println("Making API call $requestId at ${System.currentTimeMillis()}")
        delay(100)  // Simulate API call
        "Response $requestId"
    }
}

scope.launch {
    // Make 10 API calls - they'll be rate limited
    repeat(10) { i ->
        launch {
            val response = apiCall(i)
            println("Got: $response")
        }
    }
}

Deadlock Prevention

Best practices for avoiding deadlocks when using multiple synchronization primitives.

// BAD: Potential deadlock
val mutex1 = Mutex()
val mutex2 = Mutex()

// Coroutine A
scope.launch {
    mutex1.withLock {
        delay(100)
        mutex2.withLock {
            println("A got both locks")
        }
    }
}

// Coroutine B
scope.launch {
    mutex2.withLock {
        delay(100)
        mutex1.withLock {  // Potential deadlock here
            println("B got both locks")
        }
    }
}

// GOOD: Consistent lock ordering
scope.launch {
    // Both coroutines acquire locks in same order
    mutex1.withLock {
        mutex2.withLock {
            println("A got both locks safely")
        }
    }
}

scope.launch {
    mutex1.withLock {
        mutex2.withLock {
            println("B got both locks safely")
        }
    }
}

// BETTER: Use timeout with tryLock
suspend fun safeDoublelock(action: suspend () -> Unit): Boolean {
    if (mutex1.tryLock()) {
        try {
            // Try to get second lock with timeout
            withTimeoutOrNull(1000) {
                mutex2.withLock {
                    action()
                }
            } ?: return false
            return true
        } finally {
            mutex1.unlock()
        }
    }
    return false
}

Performance Considerations

Tips for optimal performance with synchronization primitives.

// Minimize critical section size
val mutex = Mutex()
var counter = 0

// BAD: Long critical section
scope.launch {
    mutex.withLock {
        val data = expensiveOperation()  // Don't do expensive work in lock
        counter += data.size
        processData(data)  // This should be outside the lock
    }
}

// GOOD: Minimal critical section
scope.launch {
    val data = expensiveOperation()  // Do expensive work outside lock
    val size = data.size
    
    mutex.withLock {
        counter += size  // Only critical operation inside lock
    }
    
    processData(data)  // Non-critical work outside lock
}

// Use appropriate synchronization primitive
// For simple counters, consider atomic operations or channels instead of mutex
val atomicCounter = AtomicInteger(0)  // Better for simple counters

// For producer-consumer, consider channels instead of manual synchronization
val channel = Channel<String>(capacity = 10)  // Often simpler than semaphore+mutex

Testing Synchronization

Strategies for testing concurrent code with synchronization primitives.

import kotlinx.coroutines.test.*

@Test
fun testMutexSafety() = runTest {
    val mutex = Mutex()
    var counter = 0
    val jobs = mutableListOf<Job>()
    
    repeat(100) {
        val job = launch {
            mutex.withLock {
                counter++
            }
        }
        jobs.add(job)
    }
    
    jobs.joinAll()
    assertEquals(100, counter)
}

@Test
fun testSemaphoreLimit() = runTest {
    val semaphore = Semaphore(3)
    var concurrentCount = 0
    var maxConcurrent = 0
    val maxConcurrentMutex = Mutex()
    
    val jobs = (1..10).map {
        launch {
            semaphore.withPermit {
                val current = maxConcurrentMutex.withLock {
                    concurrentCount++
                    maxConcurrent = maxOf(maxConcurrent, concurrentCount)
                    concurrentCount
                }
                
                delay(100)  // Simulate work
                
                maxConcurrentMutex.withLock {
                    concurrentCount--
                }
            }
        }
    }
    
    jobs.joinAll()
    assertEquals(3, maxConcurrent)
}

Install with Tessl CLI

npx tessl i tessl/maven-org-jetbrains-kotlinx--kotlinx-coroutines-core

docs

channels.md

coroutine-builders.md

dispatchers.md

exception-handling.md

flow-api.md

index.md

jobs-deferreds.md

structured-concurrency.md

synchronization.md

tile.json