JVM-specific implementation of kotlinx.coroutines core library providing coroutine primitives, builders, dispatchers, and synchronization primitives for asynchronous programming in Kotlin.
—
Thread-safe synchronization mechanisms designed for coroutines including mutexes, semaphores, and atomic operations with suspending behavior and cancellation support.
Non-reentrant mutual exclusion for protecting critical sections in coroutines.
/**
 * Mutual-exclusion lock for coroutines.
 *
 * The lock is non-reentrant. Every operation accepts an optional `owner` token; ownership is
 * expressed through that token rather than through the calling thread.
 */
interface Mutex {
    /** `true` while the mutex is locked. */
    val isLocked: Boolean

    /**
     * Suspends the caller until the lock is acquired.
     *
     * @param owner optional token identifying the holder; `null` means "no owner recorded".
     */
    suspend fun lock(owner: Any? = null)

    /**
     * Tries to acquire the lock immediately, without suspending.
     *
     * @param owner optional token identifying the holder.
     * @return `true` if the lock was acquired, `false` otherwise.
     */
    fun tryLock(owner: Any? = null): Boolean

    /**
     * Releases the lock.
     *
     * @param owner optional owner token.
     *   NOTE(review): presumably must match the token passed to [lock]/[tryLock]; confirm
     *   against the library documentation.
     */
    fun unlock(owner: Any? = null)

    /**
     * Checks whether the mutex is locked by the given owner.
     *
     * @param owner the (non-null) token to test.
     */
    fun holdsLock(owner: Any): Boolean
}
/**
 * Creates a new [Mutex].
 *
 * @param locked initial state flag; defaults to `false`.
 *   NOTE(review): presumably `true` creates the mutex already locked; confirm.
 */
fun Mutex(locked: Boolean = false): Mutex
/**
 * Executes [action] under the protection of this mutex.
 *
 * [action] is declared as a plain (non-`suspend`) lambda, matching the library signature:
 * because this function is `inline`, the lambda body is inlined into the caller's suspending
 * context and may still call suspending functions.
 *
 * @param owner optional token identifying the lock holder.
 * @param action the critical section to run while the lock is held.
 * @return the value produced by [action].
 */
suspend inline fun <T> Mutex.withLock(
    owner: Any? = null,
    action: () -> T
): T

Usage Examples:
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*
/** Integer counter whose every read and write runs while holding a [Mutex]. */
class Counter {
    private var total = 0
    private val guard = Mutex()

    /** Adds one under the lock; evaluates to the value held before the increment. */
    suspend fun increment(): Int {
        return guard.withLock { total++ }
    }

    /** Subtracts one under the lock; evaluates to the value held before the decrement. */
    suspend fun decrement(): Int {
        return guard.withLock { total-- }
    }

    /** Reads the current value under the lock. */
    suspend fun get(): Int {
        return guard.withLock { total }
    }
}
/**
 * Demo: increments a shared [Counter] from 100 coroutines, then lets two coroutines
 * contend for a manually locked [Mutex].
 */
fun main() = runBlocking {
    val counter = Counter()

    // 100 workers, 100 increments each.
    val workers = (0 until 100).map {
        launch {
            for (step in 0 until 100) {
                counter.increment()
            }
        }
    }
    for (worker in workers) {
        worker.join()
    }
    println("Final count: ${counter.get()}") // Should be 10000

    // Explicit lock()/unlock(): the finally block guarantees the release.
    val mutex = Mutex()

    // First contender: takes the lock right away and holds it for a second.
    launch {
        println("Acquiring lock...")
        mutex.lock()
        try {
            println("Critical section 1")
            delay(1000)
        } finally {
            mutex.unlock()
            println("Released lock")
        }
    }

    // Second contender: starts 100 ms later, so it has to wait for the first one.
    launch {
        delay(100)
        println("Trying to acquire lock...")
        mutex.lock()
        try {
            println("Critical section 2")
        } finally {
            mutex.unlock()
        }
    }

    delay(2000)
}

Counting semaphore for limiting the number of concurrent accesses to a resource.
/**
 * Counting semaphore for coroutines: a pool of permits that callers acquire and release.
 */
interface Semaphore {
    /** Number of permits currently available. */
    val availablePermits: Int

    /** Suspends the caller until a permit is acquired. */
    suspend fun acquire()

    /**
     * Tries to acquire a permit immediately, without suspending.
     *
     * @return `true` if a permit was acquired, `false` otherwise.
     */
    fun tryAcquire(): Boolean

    /** Releases a permit. */
    fun release()
}
/**
 * Creates a new [Semaphore].
 *
 * @param permits number of permits managed by the semaphore.
 * @param acquiredPermits defaults to `0`.
 *   NOTE(review): presumably the number of permits treated as already taken at creation;
 *   confirm against the library documentation.
 */
fun Semaphore(
    permits: Int,
    acquiredPermits: Int = 0
): Semaphore
/**
 * Executes [action] while holding one permit of this semaphore.
 *
 * [action] is declared as a plain (non-`suspend`) lambda, matching the library signature:
 * because this function is `inline`, the lambda body is inlined into the caller's suspending
 * context and may still call suspending functions.
 *
 * @param action the work to run while the permit is held.
 * @return the value produced by [action].
 */
suspend inline fun <T> Semaphore.withPermit(action: () -> T): T

Usage Examples:
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*
// Connection pool example
/**
 * Limits how many callers may use a "connection" at the same time.
 *
 * A [Semaphore] with [maxConnections] permits bounds concurrency; each use is tagged with a
 * fresh, monotonically increasing id.
 *
 * @param maxConnections maximum number of simultaneously running [useConnection] blocks.
 */
class ConnectionPool(maxConnections: Int) {
    private val semaphore = Semaphore(maxConnections)

    // Up to maxConnections callers run inside withPermit concurrently, so the id counter
    // must be atomic; a plain `var` with `++` can hand out duplicate ids.
    private val connectionId = java.util.concurrent.atomic.AtomicInteger(0)

    /**
     * Waits for a free permit, runs [block] with a newly assigned connection id, and gives
     * the permit back when [block] finishes or throws.
     *
     * @param block the work to perform; receives the connection id assigned to this use.
     * @return whatever [block] returns.
     */
    suspend fun <T> useConnection(block: suspend (connectionId: Int) -> T): T {
        return semaphore.withPermit {
            val id = connectionId.incrementAndGet()
            println("Acquired connection $id (${semaphore.availablePermits} remaining)")
            try {
                block(id)
            } finally {
                // The permit is still held at this point, hence the "+ 1" in the message.
                println("Released connection $id (${semaphore.availablePermits + 1} will be available)")
            }
        }
    }
}
/**
 * Demo: runs ten tasks through a three-connection [ConnectionPool], then shows manual
 * acquire()/release() on a two-permit [Semaphore].
 */
fun main() = runBlocking {
    val connectionPool = ConnectionPool(maxConnections = 3)

    // More tasks than connections: the surplus tasks wait for a permit.
    val tasks = (0 until 10).map { taskId ->
        launch {
            connectionPool.useConnection { connectionId ->
                println("Task $taskId using connection $connectionId")
                delay((100..500).random().toLong())
                "Result from task $taskId"
            }
        }
    }
    for (task in tasks) {
        task.join()
    }

    // Manual acquire/release: the finally block guarantees the permit is returned.
    val semaphore = Semaphore(2)
    for (i in 0 until 5) {
        launch {
            println("Task $i waiting for permit...")
            semaphore.acquire()
            try {
                println("Task $i acquired permit (${semaphore.availablePermits} remaining)")
                delay(1000)
            } finally {
                semaphore.release()
                println("Task $i released permit")
            }
        }
    }

    delay(6000)
}

Thread-safe atomic operations and references.
/**
 * Atomic reference with compare-and-swap operations.
 *
 * NOTE(review): signature-only sketch; no implementation is shown here. Presumably mirrors
 * `java.util.concurrent.atomic.AtomicReference`. Confirm which library provides this type.
 */
class AtomicReference<V>(value: V) {
    /** The referenced value. */
    var value: V
    /** Sets [newValue]; returns a `V` (presumably the previous value, per the name). */
    fun getAndSet(newValue: V): V
    /** Compare-and-swap from [expected] to [newValue]; the `Boolean` reports the outcome. */
    fun compareAndSet(expected: V, newValue: V): Boolean
    /** Sets [newValue]. NOTE(review): exact memory-ordering guarantees are not shown here. */
    fun lazySet(newValue: V)
}
/**
 * Atomic integer operations.
 *
 * NOTE(review): signature-only sketch; presumably mirrors
 * `java.util.concurrent.atomic.AtomicInteger`. Confirm which library provides this type.
 * Method names follow the "getAndX returns the old value, xAndGet returns the new value"
 * convention, which is assumed rather than shown here.
 */
class AtomicInteger(value: Int = 0) {
    /** The current value. */
    var value: Int
    fun getAndIncrement(): Int
    fun incrementAndGet(): Int
    fun getAndDecrement(): Int
    fun decrementAndGet(): Int
    /** @param delta amount to add. */
    fun getAndAdd(delta: Int): Int
    /** @param delta amount to add. */
    fun addAndGet(delta: Int): Int
    /** Compare-and-swap from [expected] to [newValue]; the `Boolean` reports the outcome. */
    fun compareAndSet(expected: Int, newValue: Int): Boolean
}
/**
 * Atomic long operations.
 *
 * NOTE(review): signature-only sketch; presumably mirrors
 * `java.util.concurrent.atomic.AtomicLong`. Confirm which library provides this type.
 * Unlike the integer sketch, no decrement operations are listed here.
 */
class AtomicLong(value: Long = 0L) {
    /** The current value. */
    var value: Long
    fun getAndIncrement(): Long
    fun incrementAndGet(): Long
    /** @param delta amount to add. */
    fun getAndAdd(delta: Long): Long
    /** @param delta amount to add. */
    fun addAndGet(delta: Long): Long
    /** Compare-and-swap from [expected] to [newValue]; the `Boolean` reports the outcome. */
    fun compareAndSet(expected: Long, newValue: Long): Boolean
}
/**
 * Atomic boolean operations.
 *
 * NOTE(review): signature-only sketch; presumably mirrors
 * `java.util.concurrent.atomic.AtomicBoolean`. Confirm which library provides this type.
 */
class AtomicBoolean(value: Boolean = false) {
    /** The current value. */
    var value: Boolean
    /** Sets [newValue]; returns a `Boolean` (presumably the previous value, per the name). */
    fun getAndSet(newValue: Boolean): Boolean
    /** Compare-and-swap from [expected] to [newValue]; the `Boolean` reports the outcome. */
    fun compareAndSet(expected: Boolean, newValue: Boolean): Boolean
}

Usage Examples:
import kotlinx.coroutines.*
import kotlinx.atomicfu.*
/**
 * Lock-free integer counter backed by a single atomic cell.
 *
 * All operations are declared `suspend` to keep the call shape of the example, although
 * none of them contains a suspension point.
 */
class AtomicCounter {
    private val count = atomic(0)

    /** Adds one and returns the updated value. */
    suspend fun increment(): Int {
        return count.incrementAndGet()
    }

    /** Subtracts one and returns the updated value. */
    suspend fun decrement(): Int {
        return count.decrementAndGet()
    }

    /** Returns the current value. */
    suspend fun get(): Int {
        return count.value
    }

    /**
     * Adds [delta] only while the current value is below [limit].
     *
     * The check applies to the value before the addition, so the result may exceed [limit].
     *
     * @return `true` if [delta] was added, `false` if the value was already at or above [limit].
     */
    suspend fun addIfLessThan(delta: Int, limit: Int): Boolean {
        // Optimistic retry: re-read and try again whenever another writer got in between.
        while (true) {
            val seen = count.value
            when {
                seen >= limit -> return false
                count.compareAndSet(seen, seen + delta) -> return true
            }
        }
    }
}
/**
 * Holder for a single value of type [T] that is read and replaced atomically.
 *
 * @param initialValue the value held right after construction.
 */
class AtomicState<T>(initialValue: T) {
    private val state = atomic(initialValue)

    /** Returns the value currently held. */
    fun get(): T {
        return state.value
    }

    /**
     * Replaces the held value with `transform(current)` and returns the value that was stored.
     *
     * [transform] may run more than once when other writers interfere, so it should be free
     * of side effects.
     */
    fun update(transform: (T) -> T): T {
        var candidate: T
        do {
            val seen = state.value
            candidate = transform(seen)
            // Retry from a fresh read if the held value changed since `seen` was taken.
        } while (!state.compareAndSet(seen, candidate))
        return candidate
    }

    /** Stores [newValue] only if the held value is [expected]; returns whether it was stored. */
    fun compareAndSet(expected: T, newValue: T): Boolean =
        state.compareAndSet(expected, newValue)
}
/**
 * Demo: concurrent increments on an [AtomicCounter], a conditional add, and concurrent
 * copy-and-replace updates on an [AtomicState].
 */
fun main() = runBlocking {
    val counter = AtomicCounter()

    // 1000 coroutines, one increment each.
    val incrementers = (0 until 1000).map {
        launch {
            counter.increment()
        }
    }
    for (job in incrementers) {
        job.join()
    }
    println("Final count: ${counter.get()}")

    // Conditional add: only applied while the counter is below the limit.
    val success = counter.addIfLessThan(5, 1010)
    println("Add operation successful: $success")

    // Atomic state: each update replaces the immutable snapshot with a modified copy.
    data class UserState(val name: String, val count: Int)

    val userState = AtomicState(UserState("John", 0))
    for (round in 0 until 10) {
        launch {
            userState.update { current ->
                current.copy(count = current.count + 1)
            }
        }
    }

    delay(100)
    println("User state: ${userState.get()}")
}

Using channels for synchronization patterns.
/**
 * Token-based synchronization using a buffered channel as the token store.
 *
 * The bucket starts full. Taking a token removes one element from the channel; returning a
 * token puts one back. A token returned while the buffer is already full is dropped, because
 * the result of `trySend` is ignored.
 *
 * @param capacity channel capacity, and the number of tokens offered at construction.
 */
class TokenBucket(capacity: Int) {
    private val tokens = Channel<Unit>(capacity)

    init {
        // Pre-load one token per slot.
        for (slot in 0 until capacity) {
            tokens.trySend(Unit)
        }
    }

    /** Suspends until a token can be taken. */
    suspend fun acquire() {
        tokens.receive()
    }

    /** Takes a token if one is available right now; returns whether one was taken. */
    fun tryAcquire(): Boolean {
        val attempt = tokens.tryReceive()
        return attempt.isSuccess
    }

    /** Offers one token back to the bucket without suspending. */
    fun release() {
        tokens.trySend(Unit)
    }
}

Usage Examples:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
// Rate limiting with token bucket
/**
 * Rate limiter built on a [TokenBucket] that a background coroutine refills.
 *
 * The bucket is created with [tokensPerSecond] slots and one token is offered back roughly
 * every `1000 / tokensPerSecond` milliseconds.
 *
 * @param tokensPerSecond refill rate and bucket size; must be positive.
 * @param scope scope that owns the refill coroutine. It defaults to [GlobalScope] for
 *   backward compatibility; pass a lifecycle-bound scope, or call [close], so the refill
 *   loop does not outlive the limiter.
 * @throws IllegalArgumentException if [tokensPerSecond] is not positive.
 */
class RateLimiter(tokensPerSecond: Int, scope: CoroutineScope = GlobalScope) {
    init {
        // Validate before anything is built: zero would divide by zero in the refill loop.
        require(tokensPerSecond > 0) { "tokensPerSecond must be positive, was $tokensPerSecond" }
    }

    private val bucket = TokenBucket(tokensPerSecond)

    // Integer division yields 0 for rates above 1000/s; clamp to 1 ms so the loop cannot spin.
    private val refillIntervalMillis = (1000L / tokensPerSecond).coerceAtLeast(1L)

    // Refill tokens periodically until the owning scope is cancelled or close() is called.
    private val refillJob = scope.launch {
        while (true) {
            delay(refillIntervalMillis)
            bucket.release()
        }
    }

    /**
     * Waits for a token, then runs [action].
     *
     * @return whatever [action] returns.
     */
    suspend fun <T> execute(action: suspend () -> T): T {
        bucket.acquire()
        return action()
    }

    /** Stops the background refill; tokens still in the bucket remain usable. */
    fun close() {
        refillJob.cancel()
    }
}
// Barrier synchronization
/**
 * Reusable rendezvous point for a fixed number of coroutines.
 *
 * Each call to [await] suspends until [parties] callers have arrived. The last arrival
 * releases everyone waiting in that round and resets the barrier for the next round.
 *
 * All bookkeeping happens under a single lock and every round has its own gate. A waiter
 * therefore always holds a reference to the exact gate the last arrival completes, so it
 * cannot miss the release or wait on a gate nobody will open.
 *
 * Cancellation: a waiter cancelled while suspended still counts as arrived for its round.
 *
 * @param parties number of [await] calls required to trip the barrier; must be positive.
 * @throws IllegalArgumentException if [parties] is not positive.
 */
class CyclicBarrier(private val parties: Int) {
    init {
        require(parties > 0) { "parties must be positive, was $parties" }
    }

    // Guards `arrived` and `gate`. Fully qualified so this block needs no extra import.
    private val lock = kotlinx.coroutines.sync.Mutex()

    // Number of parties that have arrived in the current round.
    private var arrived = 0

    // Completed by the last arrival of the current round, then replaced for the next round.
    private var gate = CompletableDeferred<Unit>()

    /** Suspends until [parties] callers, including this one, have reached the barrier. */
    suspend fun await() {
        lock.lock()
        val gateToAwait: CompletableDeferred<Unit>? = try {
            val current = gate
            arrived++
            if (arrived == parties) {
                // Last party: start a fresh round first, then open the old gate.
                arrived = 0
                gate = CompletableDeferred()
                current.complete(Unit)
                null
            } else {
                current
            }
        } finally {
            lock.unlock()
        }
        // Wait outside the lock so later arrivals can still register.
        gateToAwait?.await()
    }
}
/**
 * Demo: pushes five requests through a two-per-second [RateLimiter], then lets three tasks
 * meet at a [CyclicBarrier].
 */
fun main() = runBlocking {
    // Rate limiter example
    val rateLimiter = RateLimiter(tokensPerSecond = 2)
    for (i in 0 until 5) {
        launch {
            rateLimiter.execute {
                println("Request $i processed at ${System.currentTimeMillis()}")
                delay(100)
            }
        }
    }
    delay(5000)

    // Barrier example: nobody passes until all three tasks have arrived.
    val barrier = CyclicBarrier(3)
    for (i in 0 until 3) {
        launch {
            println("Task $i started")
            delay((100..300).random().toLong())
            println("Task $i waiting at barrier")
            barrier.await()
            println("Task $i passed barrier")
        }
    }
    delay(2000)
}

Using select expressions for complex synchronization patterns.
/**
 * Selects among multiple suspending operations.
 *
 * @param builder registers the candidate clauses on a [SelectBuilder].
 * @return a value of type [R], as produced by the handler blocks registered in [builder].
 */
suspend fun <R> select(builder: SelectBuilder<R>.() -> Unit): R
/**
 * Receiver of the `select` builder lambda: each member registers one clause together with
 * the handler block that produces the result of type [R].
 *
 * NOTE(review): simplified sketch; the examples in this document also use an
 * `onReceiveCatching` clause that is not listed here.
 */
interface SelectBuilder<in R> {
    /** Select on channel receive; [block] gets the received element. */
    fun <E> ReceiveChannel<E>.onReceive(block: suspend (E) -> R)

    /** Select on channel send of [element]. */
    fun <E> SendChannel<E>.onSend(element: E, block: suspend () -> R)

    /** Select on job completion. */
    fun Job.onJoin(block: suspend () -> R)

    /** Select on deferred completion; [block] gets the awaited value. */
    fun <T> Deferred<T>.onAwait(block: suspend (T) -> R)

    /** Select with timeout, given in milliseconds. */
    fun onTimeout(timeMillis: Long, block: suspend () -> R)
}

Usage Examples:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*
/**
 * Demonstrates `select`: two senders race on separate channels and the first available
 * message, or a 300 ms timeout, decides the printed result.
 *
 * After the selection the remaining sender is cancelled before the channels are closed.
 * Closing a channel while a sender is still about to call `send` makes that `send` throw
 * `ClosedSendChannelException`, which would fail the whole enclosing scope.
 *
 * @return the result of closing the second channel, kept as the scope's last expression.
 */
suspend fun selectExample() = coroutineScope {
    val channel1 = Channel<String>()
    val channel2 = Channel<String>()

    launch {
        delay(100)
        channel1.send("Message from channel 1")
    }
    launch {
        delay(200)
        channel2.send("Message from channel 2")
    }

    // Select first available message
    val result = select<String> {
        channel1.onReceive { "Received from channel1: $it" }
        channel2.onReceive { "Received from channel2: $it" }
        onTimeout(300) { "Timeout occurred" }
    }
    println(result)

    // Stop the sender(s) that lost the race so they never touch a closed channel.
    coroutineContext.cancelChildren()
    channel1.close()
    channel2.close()
}
// Fan-in pattern with select
/**
 * Merges several input channels into one output channel.
 *
 * Declared as a [CoroutineScope] extension because `produce` needs a scope to launch the
 * merging coroutine in; call it from inside a coroutine scope such as `runBlocking`.
 *
 * Inputs that close are dropped from the selection and merging continues with the rest.
 * The output channel closes once every input has closed. A close cause on an input is not
 * propagated to the output.
 *
 * @param channels the input channels to merge; may be empty, giving an immediately closed output.
 * @return a channel carrying every message received from any of [channels].
 */
fun CoroutineScope.fanIn(vararg channels: ReceiveChannel<String>): ReceiveChannel<String> =
    produce {
        // Inputs that have not reported "closed" yet.
        val open = channels.toMutableList()
        while (open.isNotEmpty()) {
            // Remember which input fired so a closed one can be removed below.
            val (source, result) = select<Pair<ReceiveChannel<String>, ChannelResult<String>>> {
                open.forEach { channel ->
                    channel.onReceiveCatching { received -> channel to received }
                }
            }
            if (result.isClosed) {
                open.remove(source)
            } else {
                send(result.getOrThrow())
            }
        }
    }
/**
 * Demo: runs [selectExample], then merges two producers with [fanIn] and prints every
 * message that arrives on the combined channel.
 */
fun main() = runBlocking {
    selectExample()

    // Fan-in example: two producers emitting at different paces.
    val producer1 = produce {
        for (i in 0 until 3) {
            send("Producer1-$i")
            delay(100)
        }
    }
    val producer2 = produce {
        for (i in 0 until 3) {
            send("Producer2-$i")
            delay(150)
        }
    }

    val combined = fanIn(producer1, producer2)
    for (message in combined) {
        println("Fan-in received: $message")
    }
}

Exceptions related to synchronization operations.
/**
 * Exception thrown when a mutex is unlocked incorrectly.
 *
 * Mutex ownership is expressed through the optional `owner` token passed to the lock and
 * unlock calls, not through the calling thread.
 * NOTE(review): presumably raised when unlocking with a different owner token than the one
 * used to lock; confirm against the library documentation.
 */
class IllegalStateException : RuntimeException
/** Exception thrown on cancellation; declared here as a subtype of [IllegalStateException]. */
class CancellationException : IllegalStateException

Install with Tessl CLI
npx tessl i tessl/maven-org-jetbrains-kotlinx--kotlinx-coroutines-core-jvm