AtomicFU JVM-specific artifact providing idiomatic and efficient atomic operations optimized for the JVM platform using AtomicXxxFieldUpdater or VarHandle.
—
Low-level thread parking support for building advanced synchronization primitives, providing fine-grained control over thread blocking and unblocking operations.
Warning: This is an experimental API marked with @ExperimentalThreadBlockingApi. The APIs and semantics can change in the future and are considered low-level. Unless the goal is to create a synchronization primitive like a mutex or semaphore, it is advised to use higher-level concurrency APIs like kotlinx.coroutines.
import kotlinx.atomicfu.locks.*
import kotlin.time.Duration
import kotlin.time.TimeMarkCentral API for thread parking operations with timeout and deadline support.
/**
* Experimental thread parking support object.
* Provides low-level thread blocking and unblocking operations.
*/
@ExperimentalThreadBlockingApi
object ParkingSupport {
/**
* Parks the current thread for the specified timeout duration.
* Wakes up when: unpark is called, timeout expires, spurious wakeup, or thread interrupted (JVM only).
*/
fun park(timeout: Duration)
/**
* Parks the current thread until the specified deadline is reached.
* Wakes up when: unpark is called, deadline passes, spurious wakeup, or thread interrupted (JVM only).
*/
fun parkUntil(deadline: TimeMark)
/**
* Unparks the thread corresponding to the given handle.
* If called while thread is not parked, next park call returns immediately.
*/
fun unpark(handle: ParkingHandle)
/**
* Returns the ParkingHandle for the current thread.
* Each thread has a unique handle for unparking operations.
*/
fun currentThreadHandle(): ParkingHandle
}
/**
* Handle for unparking a specific thread.
* On JVM, this is a typealias for Thread.
*/
@ExperimentalThreadBlockingApi
typealias ParkingHandle = ThreadUsage Examples:
import kotlinx.atomicfu.locks.*
import kotlinx.atomicfu.*
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
import kotlin.time.Duration.Companion.milliseconds
@OptIn(ExperimentalThreadBlockingApi::class)
class SimpleMutex {
private val owner = atomic<ParkingHandle?>(null)
private val waiters = mutableListOf<ParkingHandle>()
private val waitersLock = Any()
fun lock() {
val currentThread = ParkingSupport.currentThreadHandle()
while (!owner.compareAndSet(null, currentThread)) {
// Add to waiters list
synchronized(waitersLock) {
waiters.add(currentThread)
}
// Park until unlocked
ParkingSupport.park(Duration.INFINITE)
// Remove from waiters list after waking up
synchronized(waitersLock) {
waiters.remove(currentThread)
}
}
}
fun unlock() {
val currentThread = ParkingSupport.currentThreadHandle()
require(owner.compareAndSet(currentThread, null)) {
"Cannot unlock mutex not owned by current thread"
}
// Unpark one waiting thread
synchronized(waitersLock) {
if (waiters.isNotEmpty()) {
val waiter = waiters.removeFirst()
ParkingSupport.unpark(waiter)
}
}
}
fun tryLock(): Boolean {
val currentThread = ParkingSupport.currentThreadHandle()
return owner.compareAndSet(null, currentThread)
}
inline fun <T> withLock(block: () -> T): T {
lock()
try {
return block()
} finally {
unlock()
}
}
}Parking operations with configurable timeout support for responsive applications.
/**
* Parks current thread for specified timeout duration.
* @param timeout - Maximum time to park (Duration.INFINITE for indefinite parking)
*/
@ExperimentalThreadBlockingApi
fun park(timeout: Duration)Usage Examples:
import kotlinx.atomicfu.locks.*
import kotlinx.atomicfu.*
import kotlin.time.Duration.Companion.seconds
import kotlin.time.Duration.Companion.milliseconds
@OptIn(ExperimentalThreadBlockingApi::class)
class TimedLatch {
private val count = atomic(1)
private val waiters = mutableListOf<ParkingHandle>()
private val waitersLock = Any()
fun await(timeout: Duration): Boolean {
if (count.value == 0) return true
val currentThread = ParkingSupport.currentThreadHandle()
val startTime = System.currentTimeMillis()
synchronized(waitersLock) {
waiters.add(currentThread)
}
try {
while (count.value > 0) {
val elapsed = System.currentTimeMillis() - startTime
val remaining = timeout.inWholeMilliseconds - elapsed
if (remaining <= 0) {
return false // Timeout
}
ParkingSupport.park(remaining.milliseconds)
// Check for spurious wakeup
if (count.value == 0) return true
}
return true
} finally {
synchronized(waitersLock) {
waiters.remove(currentThread)
}
}
}
fun countDown() {
if (count.decrementAndGet() == 0) {
// Unpark all waiters
synchronized(waitersLock) {
waiters.forEach { waiter ->
ParkingSupport.unpark(waiter)
}
waiters.clear()
}
}
}
fun getCount(): Int = count.value
}
// Usage example
@OptIn(ExperimentalThreadBlockingApi::class)
fun demonstrateTimedLatch() {
val latch = TimedLatch()
// Start a worker thread
Thread {
Thread.sleep(1000) // Simulate work
latch.countDown()
}.start()
// Wait with timeout
val success = latch.await(2.seconds)
println("Latch completed: $success")
}Parking operations with absolute deadline support for time-based coordination.
/**
* Parks current thread until specified deadline is reached.
* @param deadline - Absolute time point to park until
*/
@ExperimentalThreadBlockingApi
fun parkUntil(deadline: TimeMark)Usage Examples:
import kotlinx.atomicfu.locks.*
import kotlinx.atomicfu.*
import kotlin.time.*
@OptIn(ExperimentalThreadBlockingApi::class)
class ScheduledTask {
private val isScheduled = atomic(false)
private val scheduledTime = atomic<TimeMark?>(null)
private val workerHandle = atomic<ParkingHandle?>(null)
fun scheduleAt(deadline: TimeMark, task: () -> Unit) {
require(isScheduled.compareAndSet(false, true)) {
"Task already scheduled"
}
scheduledTime.value = deadline
Thread {
val currentThread = ParkingSupport.currentThreadHandle()
workerHandle.value = currentThread
val targetTime = scheduledTime.value
if (targetTime != null && targetTime > TimeSource.Monotonic.markNow()) {
ParkingSupport.parkUntil(targetTime)
}
// Execute task if still scheduled
if (isScheduled.value) {
try {
task()
} finally {
isScheduled.value = false
workerHandle.value = null
}
}
}.start()
}
fun cancel(): Boolean {
if (isScheduled.compareAndSet(true, false)) {
workerHandle.value?.let { handle ->
ParkingSupport.unpark(handle)
}
return true
}
return false
}
fun isScheduled(): Boolean = isScheduled.value
}
// Usage example
@OptIn(ExperimentalThreadBlockingApi::class)
fun demonstrateScheduledTask() {
val task = ScheduledTask()
val futureTime = TimeSource.Monotonic.markNow() + 3.seconds
task.scheduleAt(futureTime) {
println("Scheduled task executed at ${TimeSource.Monotonic.markNow()}")
}
// Cancel after 1 second
Thread.sleep(1000)
val cancelled = task.cancel()
println("Task cancelled: $cancelled")
}Managing thread handles for unparking operations and thread coordination.
/**
* Returns the ParkingHandle for the current thread.
* @returns Unique handle for the current thread
*/
@ExperimentalThreadBlockingApi
fun currentThreadHandle(): ParkingHandle
/**
* Unparks the thread corresponding to the given handle.
* @param handle - ParkingHandle of the thread to unpark
*/
@ExperimentalThreadBlockingApi
fun unpark(handle: ParkingHandle)Usage Examples:
import kotlinx.atomicfu.locks.*
import kotlinx.atomicfu.*
import kotlin.time.Duration.Companion.seconds
@OptIn(ExperimentalThreadBlockingApi::class)
class WorkerPool(poolSize: Int) {
private val workers = Array(poolSize) { WorkerThread(it) }
private val tasks = mutableListOf<() -> Unit>()
private val tasksLock = Any()
private inner class WorkerThread(private val id: Int) {
private val handle = atomic<ParkingHandle?>(null)
private val isRunning = atomic(true)
fun start() {
Thread {
val currentHandle = ParkingSupport.currentThreadHandle()
handle.value = currentHandle
while (isRunning.value) {
val task = synchronized(tasksLock) {
if (tasks.isNotEmpty()) tasks.removeFirst() else null
}
if (task != null) {
try {
task()
} catch (e: Exception) {
println("Worker $id error: ${e.message}")
}
} else {
// No tasks available, park until work arrives
ParkingSupport.park(1.seconds)
}
}
}.apply {
name = "Worker-$id"
start()
}
}
fun wakeUp() {
handle.value?.let { ParkingSupport.unpark(it) }
}
fun stop() {
isRunning.value = false
wakeUp()
}
}
fun submit(task: () -> Unit) {
synchronized(tasksLock) {
tasks.add(task)
}
// Wake up one worker
workers.firstOrNull()?.wakeUp()
}
fun shutdown() {
workers.forEach { it.stop() }
}
fun start() {
workers.forEach { it.start() }
}
}
// Usage example
@OptIn(ExperimentalThreadBlockingApi::class)
fun demonstrateWorkerPool() {
val pool = WorkerPool(3)
pool.start()
// Submit some tasks
repeat(10) { taskId ->
pool.submit {
println("Executing task $taskId on ${Thread.currentThread().name}")
Thread.sleep(100) // Simulate work
}
}
// Let tasks complete
Thread.sleep(2000)
pool.shutdown()
}Using parking operations to build a counting semaphore:
import kotlinx.atomicfu.locks.*
import kotlinx.atomicfu.*
import kotlin.time.Duration
@OptIn(ExperimentalThreadBlockingApi::class)
class Semaphore(initialPermits: Int) {
private val permits = atomic(initialPermits)
private val waiters = mutableListOf<ParkingHandle>()
private val waitersLock = Any()
fun acquire(timeout: Duration = Duration.INFINITE): Boolean {
while (true) {
val currentPermits = permits.value
if (currentPermits > 0 && permits.compareAndSet(currentPermits, currentPermits - 1)) {
return true
}
if (timeout == Duration.ZERO) return false
val currentThread = ParkingSupport.currentThreadHandle()
synchronized(waitersLock) {
waiters.add(currentThread)
}
try {
ParkingSupport.park(timeout)
// Try again after waking up
val newPermits = permits.value
if (newPermits > 0 && permits.compareAndSet(newPermits, newPermits - 1)) {
return true
}
} finally {
synchronized(waitersLock) {
waiters.remove(currentThread)
}
}
}
}
fun release() {
permits.incrementAndGet()
// Unpark one waiter
synchronized(waitersLock) {
if (waiters.isNotEmpty()) {
val waiter = waiters.removeFirst()
ParkingSupport.unpark(waiter)
}
}
}
fun availablePermits(): Int = permits.value
}/**
* Duration from kotlin.time package representing time spans
*/
typealias Duration = kotlin.time.Duration
/**
* TimeMark from kotlin.time package representing absolute time points
*/
typealias TimeMark = kotlin.time.TimeMark
/**
* Experimental annotation marking thread parking APIs
*/
@Retention(AnnotationRetention.BINARY)
@Target(AnnotationTarget.CLASS, AnnotationTarget.FUNCTION, AnnotationTarget.PROPERTY, AnnotationTarget.TYPEALIAS)
@RequiresOptIn(level = RequiresOptIn.Level.ERROR, message = "This API is experimental. It is low-level and might change in the future.")
annotation class ExperimentalThreadBlockingApijava.util.concurrent.locks.LockSupport under the hoodkotlinx.coroutines for most use casesInstall with Tessl CLI
npx tessl i tessl/maven-org-jetbrains-kotlinx--atomicfu-jvm