CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-jetbrains-kotlinx--kotlinx-coroutines-core-jvm

JVM-specific implementation of kotlinx.coroutines core library providing coroutine primitives, builders, dispatchers, and synchronization primitives for asynchronous programming in Kotlin.

Pending
Overview
Eval results
Files

docs/synchronization.md

Synchronization Primitives

Thread-safe synchronization mechanisms designed for coroutines: mutexes and semaphores that suspend the caller instead of blocking a thread and that support cancellation, plus non-suspending, lock-free atomic operations.

Capabilities

Mutex

Non-reentrant mutual exclusion for protecting critical sections in coroutines.

/**
 * Mutual-exclusion lock for coroutines (API summary; declarations only).
 *
 * Every locking operation accepts an optional `owner` token. The lock is
 * described in this document as non-reentrant.
 */
interface Mutex {
    /** True if mutex is currently locked */
    val isLocked: Boolean
    
    /**
     * Suspends until lock is acquired.
     *
     * @param owner optional token identifying the holder; defaults to `null`
     */
    suspend fun lock(owner: Any? = null)
    
    /**
     * Tries to acquire lock immediately without suspending.
     *
     * @param owner optional token identifying the holder; defaults to `null`
     * @return `true` when the lock was acquired by this call — presumably `false` when it is already held; confirm against the API reference
     */
    fun tryLock(owner: Any? = null): Boolean
    
    /**
     * Releases the lock.
     *
     * @param owner optional token; NOTE(review): expected to match the token passed when locking — confirm
     */
    fun unlock(owner: Any? = null)
    
    /**
     * Checks if locked by specific owner.
     *
     * @param owner non-null token to compare against the current holder
     */
    fun holdsLock(owner: Any): Boolean
}

/**
 * Creates a new Mutex.
 *
 * @param locked initial state of the mutex; defaults to `false` (unlocked)
 */
fun Mutex(locked: Boolean = false): Mutex

/**
 * Executes [action] under mutex protection and returns its result.
 *
 * The function is `inline`, so [action] is a plain (non-`suspend`) lambda: it
 * is inlined into the suspending caller and may therefore still call
 * suspending functions.
 *
 * @param owner optional token identifying the lock holder; defaults to `null`
 * @param action critical section executed while the mutex is held
 * @return the value produced by [action]
 */
suspend inline fun <T> Mutex.withLock(
    owner: Any? = null,
    action: () -> T
): T

Usage Examples:

import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*

/**
 * Integer counter whose state is read and written only while [lock] is held.
 */
class Counter {
    private val lock = Mutex()
    private var count = 0

    /** Adds one under the lock and yields the value held before the change. */
    suspend fun increment(): Int = lock.withLock {
        val before = count
        count = before + 1
        before
    }

    /** Subtracts one under the lock and yields the value held before the change. */
    suspend fun decrement(): Int = lock.withLock {
        val before = count
        count = before - 1
        before
    }

    /** Reads the current value under the lock. */
    suspend fun get(): Int = lock.withLock { count }
}

/**
 * Demo: 100 coroutines each bump a shared [Counter] 100 times, followed by a
 * manual lock/unlock hand-over between two coroutines.
 */
fun main() = runBlocking {
    val counter = Counter()

    // Start the workers; every one performs 100 increments
    val workers = (1..100).map {
        launch {
            for (step in 1..100) {
                counter.increment()
            }
        }
    }

    // Wait for all workers before reading the total
    workers.joinAll()

    println("Final count: ${counter.get()}") // Should be 10000

    // Manual lock/unlock example
    val mutex = Mutex()

    // First coroutine: takes the lock and keeps it for one second
    launch {
        println("Acquiring lock...")
        mutex.lock()
        try {
            println("Critical section 1")
            delay(1000)
        } finally {
            // Unlock in finally so the lock is released even on failure
            mutex.unlock()
            println("Released lock")
        }
    }

    // Second coroutine: starts 100 ms later and suspends in lock() until the first one unlocks
    launch {
        delay(100)
        println("Trying to acquire lock...")
        mutex.lock()
        try {
            println("Critical section 2")
        } finally {
            mutex.unlock()
        }
    }

    delay(2000)
}

Semaphore

Counting semaphore for limiting the number of concurrent accesses to a resource.

/**
 * Counting semaphore for coroutines (API summary; declarations only).
 *
 * A permit is taken with [acquire] or [tryAcquire] and handed back with [release].
 */
interface Semaphore {
    /** Number of permits currently available */
    val availablePermits: Int
    
    /** Suspends until permit is acquired */
    suspend fun acquire()
    
    /**
     * Tries to acquire permit immediately without suspending.
     *
     * @return `true` when a permit was taken by this call — presumably `false` when none is available; confirm against the API reference
     */
    fun tryAcquire(): Boolean
    
    /** Releases a permit */
    fun release()
}

/**
 * Creates a new Semaphore.
 *
 * @param permits total number of permits managed by the semaphore
 * @param acquiredPermits number of permits treated as already acquired at creation; defaults to 0
 *        (NOTE(review): presumably must lie in `0..permits` — confirm against the API reference)
 */
fun Semaphore(
    permits: Int,
    acquiredPermits: Int = 0
): Semaphore

/**
 * Executes [action] with an acquired permit and returns its result.
 *
 * The function is `inline`, so [action] is a plain (non-`suspend`) lambda: it
 * is inlined into the suspending caller and may therefore still call
 * suspending functions.
 *
 * @param action code executed while the permit is held
 * @return the value produced by [action]
 */
suspend inline fun <T> Semaphore.withPermit(action: () -> T): T

Usage Examples:

import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*

// Connection pool example
/**
 * Limits how many callers may use a "connection" at the same time by guarding
 * each use with a semaphore permit.
 *
 * @param maxConnections number of permits the internal semaphore is created with
 */
class ConnectionPool(maxConnections: Int) {
    private var connectionId = 0
    private val semaphore = Semaphore(maxConnections)

    /**
     * Runs [block] while holding one permit.
     *
     * @param block receives the sequence number assigned to this use
     * @return whatever [block] returns
     */
    suspend fun <T> useConnection(block: suspend (connectionId: Int) -> T): T =
        semaphore.withPermit {
            // Assign the next sequence number to this use
            connectionId += 1
            val id = connectionId
            println("Acquired connection $id (${semaphore.availablePermits} remaining)")
            try {
                block(id)
            } finally {
                // Printed before the permit is handed back, hence the "+ 1"
                println("Released connection $id (${semaphore.availablePermits + 1} will be available)")
            }
        }
}

/**
 * Demo: ten tasks share a three-connection [ConnectionPool], then five tasks
 * compete for two permits using explicit acquire/release.
 */
fun main() = runBlocking {
    val connectionPool = ConnectionPool(maxConnections = 3)

    // Ten tasks compete for three connections
    val tasks = (0 until 10).map { taskId ->
        launch {
            connectionPool.useConnection { connectionId ->
                println("Task $taskId using connection $connectionId")
                val workMillis = (100..500).random().toLong()
                delay(workMillis)
                "Result from task $taskId"
            }
        }
    }

    tasks.joinAll()

    // Manual acquire/release example
    val semaphore = Semaphore(2)

    for (i in 0 until 5) {
        launch {
            println("Task $i waiting for permit...")
            semaphore.acquire()
            try {
                println("Task $i acquired permit (${semaphore.availablePermits} remaining)")
                delay(1000)
            } finally {
                // Release in finally so the permit is returned even on failure
                semaphore.release()
                println("Task $i released permit")
            }
        }
    }

    delay(6000)
}

Atomic Operations

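Thread-safe atomic operations and references. Note: the types summarized below are not declared by kotlinx-coroutines-core itself; equivalent types come from `java.util.concurrent.atomic` on the JVM, and the usage examples rely on the separate `kotlinx.atomicfu` library (`atomic(...)`).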

/**
 * Atomic reference with compare-and-swap operations (declaration summary).
 *
 * NOTE(review): the member names match `java.util.concurrent.atomic.AtomicReference`
 * plus a Kotlin-style `value` property — confirm which library supplies this type.
 *
 * @param V type of the referenced value
 */
class AtomicReference<V>(value: V) {
    /** Current value; declared `var`, so it is both readable and writable. */
    var value: V
    /** Stores [newValue]; by its name, returns the previously held value — TODO confirm. */
    fun getAndSet(newValue: V): V
    /** Conditional update against [expected]; the Boolean presumably reports whether the swap happened — TODO confirm. */
    fun compareAndSet(expected: V, newValue: V): Boolean
    /** Stores [newValue]; NOTE(review): presumably a relaxed-ordering write as in java.util.concurrent — verify. */
    fun lazySet(newValue: V)
}

/**
 * Atomic integer operations (declaration summary).
 *
 * NOTE(review): the member names match `java.util.concurrent.atomic.AtomicInteger`;
 * the semantics below are inferred from those names — confirm against the providing library.
 *
 * @param value initial value; defaults to 0
 */
class AtomicInteger(value: Int = 0) {
    /** Current value; declared `var`, so it is both readable and writable. */
    var value: Int
    /** Presumably adds one and returns the value held before. */
    fun getAndIncrement(): Int
    /** Presumably adds one and returns the updated value. */
    fun incrementAndGet(): Int
    /** Presumably subtracts one and returns the value held before. */
    fun getAndDecrement(): Int
    /** Presumably subtracts one and returns the updated value. */
    fun decrementAndGet(): Int
    /** Presumably adds [delta] and returns the value held before. */
    fun getAndAdd(delta: Int): Int
    /** Presumably adds [delta] and returns the updated value. */
    fun addAndGet(delta: Int): Int
    /** Conditional update against [expected]; the Boolean presumably reports whether the swap happened. */
    fun compareAndSet(expected: Int, newValue: Int): Boolean
}

/**
 * Atomic long operations (declaration summary).
 *
 * NOTE(review): the member names match `java.util.concurrent.atomic.AtomicLong`;
 * the semantics below are inferred from those names — confirm against the providing library.
 * Unlike the integer summary, no decrement members are declared here.
 *
 * @param value initial value; defaults to 0L
 */
class AtomicLong(value: Long = 0L) {
    /** Current value; declared `var`, so it is both readable and writable. */
    var value: Long
    /** Presumably adds one and returns the value held before. */
    fun getAndIncrement(): Long
    /** Presumably adds one and returns the updated value. */
    fun incrementAndGet(): Long
    /** Presumably adds [delta] and returns the value held before. */
    fun getAndAdd(delta: Long): Long
    /** Presumably adds [delta] and returns the updated value. */
    fun addAndGet(delta: Long): Long
    /** Conditional update against [expected]; the Boolean presumably reports whether the swap happened. */
    fun compareAndSet(expected: Long, newValue: Long): Boolean
}

/**
 * Atomic boolean operations (declaration summary).
 *
 * NOTE(review): the member names match `java.util.concurrent.atomic.AtomicBoolean`;
 * the semantics below are inferred from those names — confirm against the providing library.
 *
 * @param value initial value; defaults to `false`
 */
class AtomicBoolean(value: Boolean = false) {
    /** Current value; declared `var`, so it is both readable and writable. */
    var value: Boolean
    /** Stores [newValue]; by its name, returns the previously held value — TODO confirm. */
    fun getAndSet(newValue: Boolean): Boolean
    /** Conditional update against [expected]; the Boolean result presumably reports whether the swap happened. */
    fun compareAndSet(expected: Boolean, newValue: Boolean): Boolean
}

Usage Examples:

import kotlinx.coroutines.*
import kotlinx.atomicfu.*

/**
 * Lock-free integer counter backed by an atomicfu `atomic(0)` cell.
 */
class AtomicCounter {
    private val count = atomic(0)

    /** Adds one and returns the updated value. */
    suspend fun increment(): Int {
        return count.incrementAndGet()
    }

    /** Subtracts one and returns the updated value. */
    suspend fun decrement(): Int {
        return count.decrementAndGet()
    }

    /** Returns the current value. */
    suspend fun get(): Int {
        return count.value
    }

    /**
     * Adds [delta] as long as the observed value is below [limit].
     *
     * The check applies to the value before the addition, so the result may
     * end up at or above [limit].
     *
     * @return `true` once the addition was applied, `false` if the observed
     *         value had already reached [limit]
     */
    suspend fun addIfLessThan(delta: Int, limit: Int): Boolean {
        var observed = count.value
        while (observed < limit) {
            if (count.compareAndSet(observed, observed + delta)) {
                return true
            }
            // Another writer got in first: re-read and try again
            observed = count.value
        }
        return false
    }
}

/**
 * Holder for a single value of type [T] that is replaced via compare-and-set.
 *
 * @param initialValue value stored at construction
 */
class AtomicState<T>(initialValue: T) {
    private val state = atomic(initialValue)

    /** Returns the value currently stored. */
    fun get(): T {
        return state.value
    }

    /**
     * Replaces the stored value with `transform(current)`, retrying until the
     * compare-and-set succeeds; [transform] may therefore run more than once.
     *
     * @return the value that was finally stored
     */
    fun update(transform: (T) -> T): T {
        var candidate: T
        do {
            val snapshot = state.value
            candidate = transform(snapshot)
        } while (!state.compareAndSet(snapshot, candidate))
        return candidate
    }

    /** Delegates to the underlying cell's compare-and-set and returns its result. */
    fun compareAndSet(expected: T, newValue: T): Boolean =
        state.compareAndSet(expected, newValue)
}

/**
 * Demo: 1000 concurrent increments on an [AtomicCounter], one conditional add,
 * and ten concurrent updates of an [AtomicState].
 */
fun main() = runBlocking {
    val counter = AtomicCounter()

    // Concurrent increment operations
    val jobs = List(1000) {
        launch {
            counter.increment()
        }
    }

    jobs.joinAll()
    println("Final count: ${counter.get()}")

    // Conditional atomic operations
    val success = counter.addIfLessThan(5, 1010)
    println("Add operation successful: $success")

    // Atomic state example
    data class UserState(val name: String, val count: Int)
    val userState = AtomicState(UserState("John", 0))

    val updates = List(10) {
        launch {
            userState.update { current ->
                current.copy(count = current.count + 1)
            }
        }
    }

    // Join the updaters explicitly rather than sleeping for a fixed time,
    // so the printed state does not depend on scheduling or timing
    updates.joinAll()
    println("User state: ${userState.get()}")
}

Channel-based Synchronization

Using channels for synchronization patterns.

/**
 * Token-based synchronization using channels.
 *
 * Tokens are `Unit` elements kept in a buffered channel; the bucket starts out
 * filled with up to [capacity] tokens.
 *
 * @param capacity buffer size of the underlying channel
 */
class TokenBucket(capacity: Int) {
    // Create the channel and pre-fill it in one step
    private val tokens = Channel<Unit>(capacity).also { channel ->
        repeat(capacity) {
            channel.trySend(Unit)
        }
    }

    /** Takes one token, suspending while none is available. */
    suspend fun acquire() {
        tokens.receive()
    }

    /** Takes one token if one is available right now; never suspends. */
    fun tryAcquire(): Boolean = tokens.tryReceive().isSuccess

    /** Offers one token back; the result of the non-suspending send is ignored. */
    fun release() {
        tokens.trySend(Unit)
    }
}

Usage Examples:

import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Rate limiting with token bucket
/**
 * Limits how often [execute] may run by taking one token from a [TokenBucket]
 * per call; a background coroutine puts tokens back at a fixed interval.
 *
 * The refill coroutine runs in a scope owned by this instance (instead of
 * `GlobalScope`), so it can be stopped with [close].
 *
 * @param tokensPerSecond bucket capacity and refill rate; must be positive
 * @throws IllegalArgumentException if [tokensPerSecond] is not positive
 */
class RateLimiter(tokensPerSecond: Int) {
    init {
        // Guards the division below and rejects a zero-capacity bucket
        require(tokensPerSecond > 0) { "tokensPerSecond must be positive, was $tokensPerSecond" }
    }

    private val bucket = TokenBucket(tokensPerSecond)

    // Same dispatcher GlobalScope would use, but cancellable through close()
    private val refillScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)

    // At least 1 ms: delay(0) returns immediately, which would turn the loop
    // into a non-suspending busy loop for rates above 1000 per second
    private val refillIntervalMillis = (1000L / tokensPerSecond).coerceAtLeast(1L)

    init {
        // Refill tokens periodically
        refillScope.launch {
            while (true) {
                delay(refillIntervalMillis)
                bucket.release()
            }
        }
    }

    /**
     * Waits for a token, then runs [action].
     *
     * @return the value produced by [action]
     */
    suspend fun <T> execute(action: suspend () -> T): T {
        bucket.acquire()
        return action()
    }

    /** Stops the background refill coroutine; tokens are no longer replenished afterwards. */
    fun close() {
        refillScope.cancel()
    }
}

// Barrier synchronization
/**
 * Reusable barrier: [await] suspends each caller until [parties] callers have
 * arrived, then releases all of them and starts a new generation.
 *
 * The arrival count and the channel used as the release gate live in a single
 * immutable [Generation] object that is replaced with compare-and-set. Every
 * waiter of a generation therefore waits on the same channel, and a waiter can
 * never attach to a gate after that generation has been tripped.
 *
 * Limitation: a waiter that is cancelled while suspended stays counted as
 * arrived for its generation.
 *
 * @param parties number of callers that must arrive before the barrier trips; must be positive
 * @throws IllegalArgumentException if [parties] is not positive
 */
class CyclicBarrier(private val parties: Int) {
    /** Immutable snapshot of one barrier generation. */
    private class Generation(val arrived: Int, val gate: Channel<Unit>)

    init {
        require(parties > 0) { "parties must be positive, was $parties" }
    }

    private val state = atomic(Generation(0, Channel<Unit>()))

    /** Registers the caller's arrival and suspends until the current generation trips. */
    suspend fun await() {
        while (true) {
            val current = state.value
            val arrived = current.arrived + 1
            if (arrived == parties) {
                // Last party: install a fresh generation, then open the old gate
                if (state.compareAndSet(current, Generation(0, Channel<Unit>()))) {
                    current.gate.close()
                    return
                }
            } else if (state.compareAndSet(current, Generation(arrived, current.gate))) {
                // Nothing is ever sent on the gate; receiveCatching returns once it is closed
                current.gate.receiveCatching()
                return
            }
            // Lost the race against another arrival: re-read the state and retry
        }
    }
}

/**
 * Demo: five requests pushed through a [RateLimiter], then three tasks meeting
 * at a [CyclicBarrier].
 */
fun main() = runBlocking {
    // Rate limiter example
    val rateLimiter = RateLimiter(tokensPerSecond = 2)

    for (i in 0 until 5) {
        launch {
            rateLimiter.execute {
                println("Request $i processed at ${System.currentTimeMillis()}")
                delay(100)
            }
        }
    }

    // Give the requests time to pass through the limiter
    delay(5000)

    // Barrier example
    val barrier = CyclicBarrier(3)

    for (i in 0 until 3) {
        launch {
            println("Task $i started")
            // Random amount of simulated work before reaching the barrier
            val workMillis = (100..300).random().toLong()
            delay(workMillis)
            println("Task $i waiting at barrier")
            barrier.await()
            println("Task $i passed barrier")
        }
    }

    delay(2000)
}

Select-based Synchronization

Using select expressions for complex synchronization patterns.

/**
 * Select among multiple suspending operations: suspends until one of the
 * clauses registered in [builder] is selected and returns that clause's result.
 *
 * The function is `inline` and takes a `crossinline` builder, so the builder
 * lambda cannot use non-local `return`.
 *
 * @param builder registers the clauses to choose from
 * @return the value produced by the block of the selected clause
 */
suspend inline fun <R> select(crossinline builder: SelectBuilder<R>.() -> Unit): R

/**
 * Scope in which the clauses of a `select` expression are registered
 * (simplified summary).
 *
 * NOTE(review): in the library the clauses are exposed as properties
 * (`onReceive`, `onSend`, `onJoin`, `onAwait`) that are invoked inside the
 * builder; the member shapes below summarize how they are called — confirm the
 * exact declarations against the API reference.
 *
 * @param R result type of the `select` expression
 */
interface SelectBuilder<in R> {
    /** Select on channel receive; [block] gets the received element. */
    fun <E> ReceiveChannel<E>.onReceive(block: suspend (E) -> R)

    /**
     * Select on channel send.
     *
     * @param element value to send
     * @param block invoked with the channel the element was sent to
     */
    fun <E> SendChannel<E>.onSend(element: E, block: suspend (SendChannel<E>) -> R)

    /** Select on job completion. */
    fun Job.onJoin(block: suspend () -> R)

    /** Select on deferred completion; [block] gets the deferred value. */
    fun <T> Deferred<T>.onAwait(block: suspend (T) -> R)

    /**
     * Select with timeout.
     *
     * @param timeMillis timeout in milliseconds
     */
    fun onTimeout(timeMillis: Long, block: suspend () -> R)
}

Usage Examples:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*

/**
 * Demo: two senders race on separate channels; `select` takes whichever
 * message arrives first, or gives up after 300 ms.
 *
 * @return the result of closing the second channel (last expression of the scope)
 */
suspend fun selectExample() = coroutineScope {
    val channel1 = Channel<String>()
    val channel2 = Channel<String>()

    launch {
        delay(100)
        channel1.send("Message from channel 1")
    }

    launch {
        delay(200)
        channel2.send("Message from channel 2")
    }

    // Select first available message
    val result = select<String> {
        channel1.onReceive { "Received from channel1: $it" }
        channel2.onReceive { "Received from channel2: $it" }
        onTimeout(300) { "Timeout occurred" }
    }

    println(result)

    // Cancel the sender that lost the race before closing its channel.
    // Otherwise its later send() hits a closed channel, throws
    // ClosedSendChannelException and fails the whole coroutineScope.
    coroutineContext.cancelChildren()

    channel1.close()
    channel2.close()
}

// Fan-in pattern with select
/**
 * Merges [channels] into one channel that stays open until every input has
 * been closed.
 *
 * Declared as a [CoroutineScope] extension because `produce` needs a scope to
 * launch in; call it from inside a scope such as `runBlocking { ... }` or
 * `coroutineScope { ... }`.
 *
 * An input that closes normally is dropped from the selection while the
 * remaining inputs keep being forwarded. An input that was closed with a cause
 * makes the merged channel fail with that cause.
 *
 * @param channels inputs to merge
 * @return channel emitting every element received from any input
 */
fun CoroutineScope.fanIn(vararg channels: ReceiveChannel<String>): ReceiveChannel<String> =
    produce {
        val open = channels.toMutableList()
        while (open.isNotEmpty()) {
            // Remember which input answered so a closed one can be removed
            val (source, result) = select<Pair<ReceiveChannel<String>, ChannelResult<String>>> {
                open.forEach { channel ->
                    channel.onReceiveCatching { received -> channel to received }
                }
            }

            if (result.isSuccess) {
                send(result.getOrThrow())
            } else {
                // Closed: propagate a failure cause if there is one, else just stop selecting on it
                result.exceptionOrNull()?.let { throw it }
                open.remove(source)
            }
        }
    }

/**
 * Demo: runs [selectExample], then merges two producers with [fanIn] and
 * prints every merged message.
 */
fun main() = runBlocking {
    selectExample()

    // Fan-in example
    // First source: three messages, pausing 100 ms after each
    val firstSource = produce {
        for (i in 0 until 3) {
            send("Producer1-$i")
            delay(100)
        }
    }

    // Second source: three messages, pausing 150 ms after each
    val secondSource = produce {
        for (i in 0 until 3) {
            send("Producer2-$i")
            delay(150)
        }
    }

    val merged = fanIn(firstSource, secondSource)

    for (message in merged) {
        println("Fan-in received: $message")
    }
}

Types

Synchronization Exceptions

Exceptions related to synchronization operations.

/**
 * Exception thrown when a mutex is unlocked while it is not locked, or with an
 * owner token different from the one it was locked with.
 *
 * NOTE(review): mutex ownership is identified by the optional `owner` token
 * passed to lock/unlock, not by the calling thread.
 */
class IllegalStateException : RuntimeException

/**
 * Exception thrown on cancellation.
 *
 * Declared here as a subtype of IllegalStateException, so a handler for
 * IllegalStateException also catches it.
 */
class CancellationException : IllegalStateException

Install with Tessl CLI

npx tessl i tessl/maven-org-jetbrains-kotlinx--kotlinx-coroutines-core-jvm

docs

channels.md

coroutine-builders.md

dispatchers.md

exception-handling.md

flow-api.md

index.md

job-management.md

jvm-integration.md

synchronization.md

tile.json