CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-jetbrains-kotlinx--atomicfu-jvm

AtomicFU JVM-specific artifact providing idiomatic and efficient atomic operations optimized for the JVM platform using AtomicXxxFieldUpdater or VarHandle.

Pending
Overview
Eval results
Files

thread-parking.mddocs/

Experimental Thread Parking

Low-level thread parking support for building advanced synchronization primitives, providing fine-grained control over thread blocking and unblocking operations.

Warning: This is an experimental API marked with @ExperimentalThreadBlockingApi. The APIs and semantics can change in the future and are considered low-level. Unless the goal is to create a synchronization primitive like a mutex or semaphore, it is advised to use higher-level concurrency APIs like kotlinx.coroutines.

Required Imports

import kotlinx.atomicfu.locks.*
import kotlin.time.Duration
import kotlin.time.TimeMark

Capabilities

ParkingSupport Object

Central API for thread parking operations with timeout and deadline support.

/**
 * Experimental thread parking support object.
 * Provides low-level thread blocking and unblocking operations.
 */
@ExperimentalThreadBlockingApi
object ParkingSupport {
    /**
     * Parks the current thread for the specified timeout duration.
     * Wakes up when: unpark is called, timeout expires, spurious wakeup, or thread interrupted (JVM only).
     */
    fun park(timeout: Duration)
    
    /**
     * Parks the current thread until the specified deadline is reached.
     * Wakes up when: unpark is called, deadline passes, spurious wakeup, or thread interrupted (JVM only).
     */
    fun parkUntil(deadline: TimeMark)
    
    /**
     * Unparks the thread corresponding to the given handle.
     * If called while thread is not parked, next park call returns immediately.
     */
    fun unpark(handle: ParkingHandle)
    
    /**
     * Returns the ParkingHandle for the current thread.
     * Each thread has a unique handle for unparking operations.
     */
    fun currentThreadHandle(): ParkingHandle
}

/**
 * Handle for unparking a specific thread.
 * On JVM, this is a typealias for Thread.
 */
@ExperimentalThreadBlockingApi
typealias ParkingHandle = Thread

Usage Examples:

import kotlinx.atomicfu.locks.*
import kotlinx.atomicfu.*
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
import kotlin.time.Duration.Companion.milliseconds

@OptIn(ExperimentalThreadBlockingApi::class)
class SimpleMutex {
    private val owner = atomic<ParkingHandle?>(null) 
    private val waiters = mutableListOf<ParkingHandle>()
    private val waitersLock = Any()
    
    fun lock() {
        val currentThread = ParkingSupport.currentThreadHandle()
        
        while (!owner.compareAndSet(null, currentThread)) {
            // Add to waiters list
            synchronized(waitersLock) {
                waiters.add(currentThread)
            }
            
            // Park until unlocked
            ParkingSupport.park(Duration.INFINITE)
            
            // Remove from waiters list after waking up
            synchronized(waitersLock) {
                waiters.remove(currentThread)
            }
        }
    }
    
    fun unlock() {
        val currentThread = ParkingSupport.currentThreadHandle()
        require(owner.compareAndSet(currentThread, null)) {
            "Cannot unlock mutex not owned by current thread"
        }
        
        // Unpark one waiting thread
        synchronized(waitersLock) {
            if (waiters.isNotEmpty()) {
                val waiter = waiters.removeFirst()
                ParkingSupport.unpark(waiter)
            }
        }
    }
    
    fun tryLock(): Boolean {
        val currentThread = ParkingSupport.currentThreadHandle()
        return owner.compareAndSet(null, currentThread)
    }
    
    inline fun <T> withLock(block: () -> T): T {
        lock()
        try {
            return block()
        } finally {
            unlock()
        }
    }
}

Parking with Timeout

Parking operations with configurable timeout support for responsive applications.

/**
 * Parks current thread for specified timeout duration.
 * @param timeout - Maximum time to park (Duration.INFINITE for indefinite parking)
 */
@ExperimentalThreadBlockingApi
fun park(timeout: Duration)

Usage Examples:

import kotlinx.atomicfu.locks.*
import kotlinx.atomicfu.*
import kotlin.time.Duration.Companion.seconds
import kotlin.time.Duration.Companion.milliseconds

@OptIn(ExperimentalThreadBlockingApi::class)
class TimedLatch {
    private val count = atomic(1)
    private val waiters = mutableListOf<ParkingHandle>()
    private val waitersLock = Any()
    
    fun await(timeout: Duration): Boolean {
        if (count.value == 0) return true
        
        val currentThread = ParkingSupport.currentThreadHandle()
        val startTime = System.currentTimeMillis()
        
        synchronized(waitersLock) {
            waiters.add(currentThread)
        }
        
        try {
            while (count.value > 0) {
                val elapsed = System.currentTimeMillis() - startTime
                val remaining = timeout.inWholeMilliseconds - elapsed
                
                if (remaining <= 0) {
                    return false // Timeout
                }
                
                ParkingSupport.park(remaining.milliseconds)
                
                // Check for spurious wakeup
                if (count.value == 0) return true
            }
            return true
        } finally {
            synchronized(waitersLock) {
                waiters.remove(currentThread)
            }
        }
    }
    
    fun countDown() {
        if (count.decrementAndGet() == 0) {
            // Unpark all waiters
            synchronized(waitersLock) {
                waiters.forEach { waiter ->
                    ParkingSupport.unpark(waiter)
                }
                waiters.clear()
            }
        }
    }
    
    fun getCount(): Int = count.value
}

// Usage example
@OptIn(ExperimentalThreadBlockingApi::class)
fun demonstrateTimedLatch() {
    val latch = TimedLatch()
    
    // Start a worker thread
    Thread {
        Thread.sleep(1000) // Simulate work
        latch.countDown()
    }.start()
    
    // Wait with timeout
    val success = latch.await(2.seconds)
    println("Latch completed: $success")
}

Parking until Deadline

Parking operations with absolute deadline support for time-based coordination.

/**
 * Parks current thread until specified deadline is reached.
 * @param deadline - Absolute time point to park until
 */
@ExperimentalThreadBlockingApi
fun parkUntil(deadline: TimeMark)

Usage Examples:

import kotlinx.atomicfu.locks.*
import kotlinx.atomicfu.*
import kotlin.time.*

@OptIn(ExperimentalThreadBlockingApi::class)
class ScheduledTask {
    private val isScheduled = atomic(false)
    private val scheduledTime = atomic<TimeMark?>(null)
    private val workerHandle = atomic<ParkingHandle?>(null)
    
    fun scheduleAt(deadline: TimeMark, task: () -> Unit) {
        require(isScheduled.compareAndSet(false, true)) {
            "Task already scheduled"
        }
        
        scheduledTime.value = deadline
        
        Thread {
            val currentThread = ParkingSupport.currentThreadHandle()
            workerHandle.value = currentThread
            
            val targetTime = scheduledTime.value
            if (targetTime != null && targetTime > TimeSource.Monotonic.markNow()) {
                ParkingSupport.parkUntil(targetTime)
            }
            
            // Execute task if still scheduled
            if (isScheduled.value) {
                try {
                    task()
                } finally {
                    isScheduled.value = false
                    workerHandle.value = null
                }
            }
        }.start()
    }
    
    fun cancel(): Boolean {
        if (isScheduled.compareAndSet(true, false)) {
            workerHandle.value?.let { handle ->
                ParkingSupport.unpark(handle)
            }
            return true
        }
        return false
    }
    
    fun isScheduled(): Boolean = isScheduled.value
}

// Usage example
@OptIn(ExperimentalThreadBlockingApi::class)
fun demonstrateScheduledTask() {
    val task = ScheduledTask()
    val futureTime = TimeSource.Monotonic.markNow() + 3.seconds
    
    task.scheduleAt(futureTime) {
        println("Scheduled task executed at ${TimeSource.Monotonic.markNow()}")
    }
    
    // Cancel after 1 second
    Thread.sleep(1000)
    val cancelled = task.cancel()
    println("Task cancelled: $cancelled")
}

Parking Handle Management

Managing thread handles for unparking operations and thread coordination.

/**
 * Returns the ParkingHandle for the current thread.
 * @returns Unique handle for the current thread
 */
@ExperimentalThreadBlockingApi
fun currentThreadHandle(): ParkingHandle

/**
 * Unparks the thread corresponding to the given handle.
 * @param handle - ParkingHandle of the thread to unpark
 */
@ExperimentalThreadBlockingApi
fun unpark(handle: ParkingHandle)

Usage Examples:

import kotlinx.atomicfu.locks.*
import kotlinx.atomicfu.*
import kotlin.time.Duration.Companion.seconds

@OptIn(ExperimentalThreadBlockingApi::class)
class WorkerPool(poolSize: Int) {
    private val workers = Array(poolSize) { WorkerThread(it) }
    private val tasks = mutableListOf<() -> Unit>()
    private val tasksLock = Any()
    
    private inner class WorkerThread(private val id: Int) {
        private val handle = atomic<ParkingHandle?>(null)
        private val isRunning = atomic(true)
        
        fun start() {
            Thread {
                val currentHandle = ParkingSupport.currentThreadHandle()
                handle.value = currentHandle
                
                while (isRunning.value) {
                    val task = synchronized(tasksLock) {
                        if (tasks.isNotEmpty()) tasks.removeFirst() else null
                    }
                    
                    if (task != null) {
                        try {
                            task()
                        } catch (e: Exception) {
                            println("Worker $id error: ${e.message}")
                        }
                    } else {
                        // No tasks available, park until work arrives
                        ParkingSupport.park(1.seconds)
                    }
                }
            }.apply {
                name = "Worker-$id"
                start()
            }
        }
        
        fun wakeUp() {
            handle.value?.let { ParkingSupport.unpark(it) }
        }
        
        fun stop() {
            isRunning.value = false
            wakeUp()
        }
    }
    
    fun submit(task: () -> Unit) {
        synchronized(tasksLock) {
            tasks.add(task)
        }
        
        // Wake up one worker
        workers.firstOrNull()?.wakeUp()
    }
    
    fun shutdown() {
        workers.forEach { it.stop() }
    }
    
    fun start() {
        workers.forEach { it.start() }
    }
}

// Usage example
@OptIn(ExperimentalThreadBlockingApi::class)
fun demonstrateWorkerPool() {
    val pool = WorkerPool(3)
    pool.start()
    
    // Submit some tasks
    repeat(10) { taskId ->
        pool.submit {
            println("Executing task $taskId on ${Thread.currentThread().name}")
            Thread.sleep(100) // Simulate work
        }
    }
    
    // Let tasks complete
    Thread.sleep(2000)
    pool.shutdown()
}

Advanced Patterns

Building a Semaphore

Using parking operations to build a counting semaphore:

import kotlinx.atomicfu.locks.*
import kotlinx.atomicfu.*
import kotlin.time.Duration

@OptIn(ExperimentalThreadBlockingApi::class)
class Semaphore(initialPermits: Int) {
    private val permits = atomic(initialPermits)
    private val waiters = mutableListOf<ParkingHandle>()
    private val waitersLock = Any()
    
    fun acquire(timeout: Duration = Duration.INFINITE): Boolean {
        while (true) {
            val currentPermits = permits.value
            if (currentPermits > 0 && permits.compareAndSet(currentPermits, currentPermits - 1)) {
                return true
            }
            
            if (timeout == Duration.ZERO) return false
            
            val currentThread = ParkingSupport.currentThreadHandle()
            synchronized(waitersLock) {
                waiters.add(currentThread)
            }
            
            try {
                ParkingSupport.park(timeout)
                
                // Try again after waking up
                val newPermits = permits.value
                if (newPermits > 0 && permits.compareAndSet(newPermits, newPermits - 1)) {
                    return true
                }
            } finally {
                synchronized(waitersLock) {
                    waiters.remove(currentThread)
                }
            }
        }
    }
    
    fun release() {
        permits.incrementAndGet()
        
        // Unpark one waiter
        synchronized(waitersLock) {
            if (waiters.isNotEmpty()) {
                val waiter = waiters.removeFirst()
                ParkingSupport.unpark(waiter)
            }
        }
    }
    
    fun availablePermits(): Int = permits.value
}

Types

/**
 * Duration from kotlin.time package representing time spans
 */
typealias Duration = kotlin.time.Duration

/**
 * TimeMark from kotlin.time package representing absolute time points  
 */
typealias TimeMark = kotlin.time.TimeMark

/**
 * Experimental annotation marking thread parking APIs
 */
@Retention(AnnotationRetention.BINARY)
@Target(AnnotationTarget.CLASS, AnnotationTarget.FUNCTION, AnnotationTarget.PROPERTY, AnnotationTarget.TYPEALIAS)
@RequiresOptIn(level = RequiresOptIn.Level.ERROR, message = "This API is experimental. It is low-level and might change in the future.")
annotation class ExperimentalThreadBlockingApi

Implementation Notes

Platform Behavior

  • JVM: Uses java.util.concurrent.locks.LockSupport under the hood
  • Thread Interruption: On JVM, interrupted threads wake up from parking, and the interrupted flag remains set
  • Spurious Wakeups: Park operations may wake up without explicit unpark calls
  • Pre-unpark: Calling unpark before park makes the next park return immediately

Performance Considerations

  • Parking operations have system call overhead
  • Use atomic operations when possible instead of parking-based synchronization
  • Consider using higher-level primitives like kotlinx.coroutines for most use cases
  • Parking is most useful for building custom synchronization primitives

Best Practices

  • Always check the condition after waking up from park (spurious wakeups)
  • Handle timeouts and interruptions appropriately
  • Use proper exception handling around park/unpark operations
  • Consider using existing high-level concurrency libraries before implementing custom primitives
  • Test thoroughly on target platforms as behavior may vary

Install with Tessl CLI

npx tessl i tessl/maven-org-jetbrains-kotlinx--atomicfu-jvm

docs

atomic-arrays.md

atomic-operations.md

index.md

locks.md

thread-parking.md

tracing.md

tile.json