0
# Experimental Thread Parking
1
2
Low-level thread parking support for building advanced synchronization primitives, providing fine-grained control over thread blocking and unblocking operations.
3
4
**Warning**: This is an experimental API marked with `@ExperimentalThreadBlockingApi`. The APIs and semantics can change in the future and are considered low-level. Unless the goal is to create a synchronization primitive like a mutex or semaphore, it is advised to use higher-level concurrency APIs like `kotlinx.coroutines`.
5
6
## Required Imports
7
8
```kotlin
9
import kotlinx.atomicfu.locks.*
10
import kotlin.time.Duration
11
import kotlin.time.TimeMark
12
```
13
14
## Capabilities
15
16
### ParkingSupport Object
17
18
Central API for thread parking operations with timeout and deadline support.
19
20
```kotlin { .api }
21
/**
22
* Experimental thread parking support object.
23
* Provides low-level thread blocking and unblocking operations.
24
*/
25
@ExperimentalThreadBlockingApi
26
object ParkingSupport {
27
/**
28
* Parks the current thread for the specified timeout duration.
29
* Wakes up when: unpark is called, timeout expires, spurious wakeup, or thread interrupted (JVM only).
30
*/
31
fun park(timeout: Duration)
32
33
/**
34
* Parks the current thread until the specified deadline is reached.
35
* Wakes up when: unpark is called, deadline passes, spurious wakeup, or thread interrupted (JVM only).
36
*/
37
fun parkUntil(deadline: TimeMark)
38
39
/**
40
* Unparks the thread corresponding to the given handle.
41
* If called while thread is not parked, next park call returns immediately.
42
*/
43
fun unpark(handle: ParkingHandle)
44
45
/**
46
* Returns the ParkingHandle for the current thread.
47
* Each thread has a unique handle for unparking operations.
48
*/
49
fun currentThreadHandle(): ParkingHandle
50
}
51
52
/**
53
* Handle for unparking a specific thread.
54
* On JVM, this is a typealias for Thread.
55
*/
56
@ExperimentalThreadBlockingApi
57
typealias ParkingHandle = Thread
58
```
59
60
**Usage Examples:**
61
62
```kotlin
63
import kotlinx.atomicfu.locks.*
64
import kotlinx.atomicfu.*
65
import kotlin.time.Duration
66
import kotlin.time.Duration.Companion.seconds
67
import kotlin.time.Duration.Companion.milliseconds
68
69
@OptIn(ExperimentalThreadBlockingApi::class)
70
class SimpleMutex {
71
private val owner = atomic<ParkingHandle?>(null)
72
private val waiters = mutableListOf<ParkingHandle>()
73
private val waitersLock = Any()
74
75
fun lock() {
76
val currentThread = ParkingSupport.currentThreadHandle()
77
78
while (!owner.compareAndSet(null, currentThread)) {
79
// Add to waiters list
80
synchronized(waitersLock) {
81
waiters.add(currentThread)
82
}
83
84
// Park until unlocked
85
ParkingSupport.park(Duration.INFINITE)
86
87
// Remove from waiters list after waking up
88
synchronized(waitersLock) {
89
waiters.remove(currentThread)
90
}
91
}
92
}
93
94
fun unlock() {
95
val currentThread = ParkingSupport.currentThreadHandle()
96
require(owner.compareAndSet(currentThread, null)) {
97
"Cannot unlock mutex not owned by current thread"
98
}
99
100
// Unpark one waiting thread
101
synchronized(waitersLock) {
102
if (waiters.isNotEmpty()) {
103
val waiter = waiters.removeFirst()
104
ParkingSupport.unpark(waiter)
105
}
106
}
107
}
108
109
fun tryLock(): Boolean {
110
val currentThread = ParkingSupport.currentThreadHandle()
111
return owner.compareAndSet(null, currentThread)
112
}
113
114
inline fun <T> withLock(block: () -> T): T {
115
lock()
116
try {
117
return block()
118
} finally {
119
unlock()
120
}
121
}
122
}
123
```
124
125
### Parking with Timeout
126
127
Parking operations with configurable timeout support for responsive applications.
128
129
```kotlin { .api }
130
/**
131
* Parks current thread for specified timeout duration.
132
* @param timeout - Maximum time to park (Duration.INFINITE for indefinite parking)
133
*/
134
@ExperimentalThreadBlockingApi
135
fun park(timeout: Duration)
136
```
137
138
**Usage Examples:**
139
140
```kotlin
141
import kotlinx.atomicfu.locks.*
142
import kotlinx.atomicfu.*
143
import kotlin.time.Duration.Companion.seconds
144
import kotlin.time.Duration.Companion.milliseconds
145
146
@OptIn(ExperimentalThreadBlockingApi::class)
147
class TimedLatch {
148
private val count = atomic(1)
149
private val waiters = mutableListOf<ParkingHandle>()
150
private val waitersLock = Any()
151
152
fun await(timeout: Duration): Boolean {
153
if (count.value == 0) return true
154
155
val currentThread = ParkingSupport.currentThreadHandle()
156
val startTime = System.currentTimeMillis()
157
158
synchronized(waitersLock) {
159
waiters.add(currentThread)
160
}
161
162
try {
163
while (count.value > 0) {
164
val elapsed = System.currentTimeMillis() - startTime
165
val remaining = timeout.inWholeMilliseconds - elapsed
166
167
if (remaining <= 0) {
168
return false // Timeout
169
}
170
171
ParkingSupport.park(remaining.milliseconds)
172
173
// Check for spurious wakeup
174
if (count.value == 0) return true
175
}
176
return true
177
} finally {
178
synchronized(waitersLock) {
179
waiters.remove(currentThread)
180
}
181
}
182
}
183
184
fun countDown() {
185
if (count.decrementAndGet() == 0) {
186
// Unpark all waiters
187
synchronized(waitersLock) {
188
waiters.forEach { waiter ->
189
ParkingSupport.unpark(waiter)
190
}
191
waiters.clear()
192
}
193
}
194
}
195
196
fun getCount(): Int = count.value
197
}
198
199
// Usage example
200
@OptIn(ExperimentalThreadBlockingApi::class)
201
fun demonstrateTimedLatch() {
202
val latch = TimedLatch()
203
204
// Start a worker thread
205
Thread {
206
Thread.sleep(1000) // Simulate work
207
latch.countDown()
208
}.start()
209
210
// Wait with timeout
211
val success = latch.await(2.seconds)
212
println("Latch completed: $success")
213
}
214
```
215
216
### Parking until Deadline
217
218
Parking operations with absolute deadline support for time-based coordination.
219
220
```kotlin { .api }
221
/**
222
* Parks current thread until specified deadline is reached.
223
* @param deadline - Absolute time point to park until
224
*/
225
@ExperimentalThreadBlockingApi
226
fun parkUntil(deadline: TimeMark)
227
```
228
229
**Usage Examples:**
230
231
```kotlin
232
import kotlinx.atomicfu.locks.*
233
import kotlinx.atomicfu.*
234
import kotlin.time.*
235
236
@OptIn(ExperimentalThreadBlockingApi::class)
237
class ScheduledTask {
238
private val isScheduled = atomic(false)
239
private val scheduledTime = atomic<TimeMark?>(null)
240
private val workerHandle = atomic<ParkingHandle?>(null)
241
242
fun scheduleAt(deadline: TimeMark, task: () -> Unit) {
243
require(isScheduled.compareAndSet(false, true)) {
244
"Task already scheduled"
245
}
246
247
scheduledTime.value = deadline
248
249
Thread {
250
val currentThread = ParkingSupport.currentThreadHandle()
251
workerHandle.value = currentThread
252
253
val targetTime = scheduledTime.value
254
if (targetTime != null && targetTime > TimeSource.Monotonic.markNow()) {
255
ParkingSupport.parkUntil(targetTime)
256
}
257
258
// Execute task if still scheduled
259
if (isScheduled.value) {
260
try {
261
task()
262
} finally {
263
isScheduled.value = false
264
workerHandle.value = null
265
}
266
}
267
}.start()
268
}
269
270
fun cancel(): Boolean {
271
if (isScheduled.compareAndSet(true, false)) {
272
workerHandle.value?.let { handle ->
273
ParkingSupport.unpark(handle)
274
}
275
return true
276
}
277
return false
278
}
279
280
fun isScheduled(): Boolean = isScheduled.value
281
}
282
283
// Usage example
284
@OptIn(ExperimentalThreadBlockingApi::class)
285
fun demonstrateScheduledTask() {
286
val task = ScheduledTask()
287
val futureTime = TimeSource.Monotonic.markNow() + 3.seconds
288
289
task.scheduleAt(futureTime) {
290
println("Scheduled task executed at ${TimeSource.Monotonic.markNow()}")
291
}
292
293
// Cancel after 1 second
294
Thread.sleep(1000)
295
val cancelled = task.cancel()
296
println("Task cancelled: $cancelled")
297
}
298
```
299
300
### Parking Handle Management
301
302
Managing thread handles for unparking operations and thread coordination.
303
304
```kotlin { .api }
305
/**
306
* Returns the ParkingHandle for the current thread.
307
* @returns Unique handle for the current thread
308
*/
309
@ExperimentalThreadBlockingApi
310
fun currentThreadHandle(): ParkingHandle
311
312
/**
313
* Unparks the thread corresponding to the given handle.
314
* @param handle - ParkingHandle of the thread to unpark
315
*/
316
@ExperimentalThreadBlockingApi
317
fun unpark(handle: ParkingHandle)
318
```
319
320
**Usage Examples:**
321
322
```kotlin
323
import kotlinx.atomicfu.locks.*
324
import kotlinx.atomicfu.*
325
import kotlin.time.Duration.Companion.seconds
326
327
@OptIn(ExperimentalThreadBlockingApi::class)
328
class WorkerPool(poolSize: Int) {
329
private val workers = Array(poolSize) { WorkerThread(it) }
330
private val tasks = mutableListOf<() -> Unit>()
331
private val tasksLock = Any()
332
333
private inner class WorkerThread(private val id: Int) {
334
private val handle = atomic<ParkingHandle?>(null)
335
private val isRunning = atomic(true)
336
337
fun start() {
338
Thread {
339
val currentHandle = ParkingSupport.currentThreadHandle()
340
handle.value = currentHandle
341
342
while (isRunning.value) {
343
val task = synchronized(tasksLock) {
344
if (tasks.isNotEmpty()) tasks.removeFirst() else null
345
}
346
347
if (task != null) {
348
try {
349
task()
350
} catch (e: Exception) {
351
println("Worker $id error: ${e.message}")
352
}
353
} else {
354
// No tasks available, park until work arrives
355
ParkingSupport.park(1.seconds)
356
}
357
}
358
}.apply {
359
name = "Worker-$id"
360
start()
361
}
362
}
363
364
fun wakeUp() {
365
handle.value?.let { ParkingSupport.unpark(it) }
366
}
367
368
fun stop() {
369
isRunning.value = false
370
wakeUp()
371
}
372
}
373
374
fun submit(task: () -> Unit) {
375
synchronized(tasksLock) {
376
tasks.add(task)
377
}
378
379
// Wake up one worker
380
workers.firstOrNull()?.wakeUp()
381
}
382
383
fun shutdown() {
384
workers.forEach { it.stop() }
385
}
386
387
fun start() {
388
workers.forEach { it.start() }
389
}
390
}
391
392
// Usage example
393
@OptIn(ExperimentalThreadBlockingApi::class)
394
fun demonstrateWorkerPool() {
395
val pool = WorkerPool(3)
396
pool.start()
397
398
// Submit some tasks
399
repeat(10) { taskId ->
400
pool.submit {
401
println("Executing task $taskId on ${Thread.currentThread().name}")
402
Thread.sleep(100) // Simulate work
403
}
404
}
405
406
// Let tasks complete
407
Thread.sleep(2000)
408
pool.shutdown()
409
}
410
```
411
412
## Advanced Patterns
413
414
### Building a Semaphore
415
416
Using parking operations to build a counting semaphore:
417
418
```kotlin
419
import kotlinx.atomicfu.locks.*
420
import kotlinx.atomicfu.*
421
import kotlin.time.Duration
422
423
@OptIn(ExperimentalThreadBlockingApi::class)
424
class Semaphore(initialPermits: Int) {
425
private val permits = atomic(initialPermits)
426
private val waiters = mutableListOf<ParkingHandle>()
427
private val waitersLock = Any()
428
429
fun acquire(timeout: Duration = Duration.INFINITE): Boolean {
430
while (true) {
431
val currentPermits = permits.value
432
if (currentPermits > 0 && permits.compareAndSet(currentPermits, currentPermits - 1)) {
433
return true
434
}
435
436
if (timeout == Duration.ZERO) return false
437
438
val currentThread = ParkingSupport.currentThreadHandle()
439
synchronized(waitersLock) {
440
waiters.add(currentThread)
441
}
442
443
try {
444
ParkingSupport.park(timeout)
445
446
// Try again after waking up
447
val newPermits = permits.value
448
if (newPermits > 0 && permits.compareAndSet(newPermits, newPermits - 1)) {
449
return true
450
}
451
} finally {
452
synchronized(waitersLock) {
453
waiters.remove(currentThread)
454
}
455
}
456
}
457
}
458
459
fun release() {
460
permits.incrementAndGet()
461
462
// Unpark one waiter
463
synchronized(waitersLock) {
464
if (waiters.isNotEmpty()) {
465
val waiter = waiters.removeFirst()
466
ParkingSupport.unpark(waiter)
467
}
468
}
469
}
470
471
fun availablePermits(): Int = permits.value
472
}
473
```
474
475
## Types
476
477
```kotlin { .api }
478
/**
479
* Duration from kotlin.time package representing time spans
480
*/
481
typealias Duration = kotlin.time.Duration
482
483
/**
484
* TimeMark from kotlin.time package representing absolute time points
485
*/
486
typealias TimeMark = kotlin.time.TimeMark
487
488
/**
489
* Experimental annotation marking thread parking APIs
490
*/
491
@Retention(AnnotationRetention.BINARY)
492
@Target(AnnotationTarget.CLASS, AnnotationTarget.FUNCTION, AnnotationTarget.PROPERTY, AnnotationTarget.TYPEALIAS)
493
@RequiresOptIn(level = RequiresOptIn.Level.ERROR, message = "This API is experimental. It is low-level and might change in the future.")
494
annotation class ExperimentalThreadBlockingApi
495
```
496
497
## Implementation Notes
498
499
### Platform Behavior
500
501
- **JVM**: Uses `java.util.concurrent.locks.LockSupport` under the hood
502
- **Thread Interruption**: On JVM, interrupted threads wake up from parking, and the interrupted flag remains set
503
- **Spurious Wakeups**: Park operations may wake up without explicit unpark calls
504
- **Pre-unpark**: Calling unpark before park makes the next park return immediately
505
506
### Performance Considerations
507
508
- Parking operations have system call overhead
509
- Use atomic operations when possible instead of parking-based synchronization
510
- Consider using higher-level primitives like `kotlinx.coroutines` for most use cases
511
- Parking is most useful for building custom synchronization primitives
512
513
### Best Practices
514
515
- Always check the condition after waking up from park (spurious wakeups)
516
- Handle timeouts and interruptions appropriately
517
- Use proper exception handling around park/unpark operations
518
- Consider using existing high-level concurrency libraries before implementing custom primitives
519
- Test thoroughly on target platforms as behavior may vary