# Experimental Thread Parking

Low-level thread parking support for building advanced synchronization primitives, with fine-grained control over blocking and unblocking threads.

**Warning**: This is an experimental API marked with `@ExperimentalThreadBlockingApi`. It is low-level, and its APIs and semantics may change in the future. Unless you are building a synchronization primitive such as a mutex or semaphore, prefer higher-level concurrency APIs such as `kotlinx.coroutines`.

## Required Imports

```kotlin
import kotlinx.atomicfu.locks.*
import kotlin.time.Duration
import kotlin.time.TimeMark
```

## Capabilities

### ParkingSupport Object

Central API for thread parking operations with timeout and deadline support.

```kotlin { .api }
/**
 * Experimental thread parking support object.
 * Provides low-level thread blocking and unblocking operations.
 */
@ExperimentalThreadBlockingApi
object ParkingSupport {
    /**
     * Parks the current thread for the specified timeout duration.
     * Wakes up when: unpark is called, timeout expires, spurious wakeup, or thread interrupted (JVM only).
     */
    fun park(timeout: Duration)

    /**
     * Parks the current thread until the specified deadline is reached.
     * Wakes up when: unpark is called, deadline passes, spurious wakeup, or thread interrupted (JVM only).
     */
    fun parkUntil(deadline: TimeMark)

    /**
     * Unparks the thread corresponding to the given handle.
     * If called while thread is not parked, next park call returns immediately.
     */
    fun unpark(handle: ParkingHandle)

    /**
     * Returns the ParkingHandle for the current thread.
     * Each thread has a unique handle for unparking operations.
     */
    fun currentThreadHandle(): ParkingHandle
}

/**
 * Handle for unparking a specific thread.
 * On JVM, this is a typealias for Thread.
 */
@ExperimentalThreadBlockingApi
typealias ParkingHandle = Thread
```
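
The pre-unpark rule documented on `unpark` can be observed directly. The following is a minimal sketch, assuming the JVM target and the `ParkingSupport` signatures listed above; `parkReturnsAfterEarlyUnpark` is a hypothetical name used only for illustration.

```kotlin
import kotlinx.atomicfu.locks.ExperimentalThreadBlockingApi
import kotlinx.atomicfu.locks.ParkingSupport
import kotlin.time.Duration.Companion.seconds
import kotlin.time.TimeSource

// Hypothetical helper: returns true if park() came back before its timeout.
@OptIn(ExperimentalThreadBlockingApi::class)
fun parkReturnsAfterEarlyUnpark(): Boolean {
    val self = ParkingSupport.currentThreadHandle()

    // Unpark the current thread before it has parked.
    ParkingSupport.unpark(self)

    val start = TimeSource.Monotonic.markNow()
    // Because an unpark is already pending, this call is expected to
    // return right away instead of waiting for the 5-second timeout.
    ParkingSupport.park(5.seconds)

    return start.elapsedNow() < 5.seconds
}
```

This is why a primitive can safely publish its state, call `unpark`, and let the waiter park afterwards: the wakeup is not lost as long as the waiter parks only once per pending unpark.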

**Usage Examples:**

```kotlin
import kotlinx.atomicfu.locks.*
import kotlinx.atomicfu.*
import kotlin.time.Duration

@OptIn(ExperimentalThreadBlockingApi::class)
class SimpleMutex {
    private val owner = atomic<ParkingHandle?>(null)
    private val waiters = mutableListOf<ParkingHandle>()
    private val waitersLock = Any()

    fun lock() {
        val currentThread = ParkingSupport.currentThreadHandle()

        while (!owner.compareAndSet(null, currentThread)) {
            // Register as a waiter before parking
            synchronized(waitersLock) {
                waiters.add(currentThread)
            }

            // Re-check after registering: if unlock() ran between the failed CAS
            // and the registration, no one would unpark this thread
            if (owner.value != null) {
                ParkingSupport.park(Duration.INFINITE)
            }

            // Deregister after waking up (possibly spuriously), then retry the CAS
            synchronized(waitersLock) {
                waiters.remove(currentThread)
            }
        }
    }

    fun unlock() {
        val currentThread = ParkingSupport.currentThreadHandle()
        require(owner.compareAndSet(currentThread, null)) {
            "Cannot unlock mutex not owned by current thread"
        }

        // Unpark one waiting thread
        synchronized(waitersLock) {
            if (waiters.isNotEmpty()) {
                val waiter = waiters.removeFirst()
                ParkingSupport.unpark(waiter)
            }
        }
    }

    fun tryLock(): Boolean {
        val currentThread = ParkingSupport.currentThreadHandle()
        return owner.compareAndSet(null, currentThread)
    }

    inline fun <T> withLock(block: () -> T): T {
        lock()
        try {
            return block()
        } finally {
            unlock()
        }
    }
}
```

### Parking with Timeout

Parking operations with configurable timeout support for responsive applications.

```kotlin { .api }
/**
 * Parks current thread for specified timeout duration.
 * @param timeout - Maximum time to park (Duration.INFINITE for indefinite parking)
 */
@ExperimentalThreadBlockingApi
fun park(timeout: Duration)
```

**Usage Examples:**

```kotlin
import kotlinx.atomicfu.locks.*
import kotlinx.atomicfu.*
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
import kotlin.time.Duration.Companion.milliseconds

@OptIn(ExperimentalThreadBlockingApi::class)
class TimedLatch {
    private val count = atomic(1)
    private val waiters = mutableListOf<ParkingHandle>()
    private val waitersLock = Any()

    fun await(timeout: Duration): Boolean {
        if (count.value == 0) return true

        val currentThread = ParkingSupport.currentThreadHandle()
        // System.currentTimeMillis() is JVM-only, like the Thread usage below
        val startTime = System.currentTimeMillis()

        synchronized(waitersLock) {
            waiters.add(currentThread)
        }

        try {
            while (count.value > 0) {
                val elapsed = System.currentTimeMillis() - startTime
                val remaining = timeout.inWholeMilliseconds - elapsed

                if (remaining <= 0) {
                    return false // Timeout
                }

                // Park only for the time left in the overall timeout
                ParkingSupport.park(remaining.milliseconds)

                // Re-check the condition: the wakeup may have been spurious
                if (count.value == 0) return true
            }
            return true
        } finally {
            synchronized(waitersLock) {
                waiters.remove(currentThread)
            }
        }
    }

    fun countDown() {
        if (count.decrementAndGet() == 0) {
            // Unpark all waiters
            synchronized(waitersLock) {
                waiters.forEach { waiter ->
                    ParkingSupport.unpark(waiter)
                }
                waiters.clear()
            }
        }
    }

    fun getCount(): Int = count.value
}

// Usage example
@OptIn(ExperimentalThreadBlockingApi::class)
fun demonstrateTimedLatch() {
    val latch = TimedLatch()

    // Start a worker thread
    Thread {
        Thread.sleep(1000) // Simulate work
        latch.countDown()
    }.start()

    // Wait with timeout
    val success = latch.await(2.seconds)
    println("Latch completed: $success")
}
```
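
The remaining-time bookkeeping in `await` can also be written against `kotlin.time.TimeSource.Monotonic`, which avoids the JVM-only wall clock. The following is a sketch rather than part of the library: `awaitCondition` and `demoAwaitCondition` are hypothetical names, and the helper assumes some other thread calls `unpark` when the condition changes (otherwise it only notices the change when a park times out or wakes spuriously).

```kotlin
import kotlinx.atomicfu.locks.ExperimentalThreadBlockingApi
import kotlinx.atomicfu.locks.ParkingSupport
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds
import kotlin.time.TimeSource

// Hypothetical helper: parks until condition() holds or the overall timeout elapses.
// Returns true if the condition became true, false on timeout.
@OptIn(ExperimentalThreadBlockingApi::class)
fun awaitCondition(timeout: Duration, condition: () -> Boolean): Boolean {
    val start = TimeSource.Monotonic.markNow()
    while (!condition()) {
        // Budget left from the overall timeout; stays infinite for Duration.INFINITE
        val remaining = timeout - start.elapsedNow()
        if (remaining <= Duration.ZERO) return false
        // May return early (unpark, spurious wakeup), so the loop re-checks the condition
        ParkingSupport.park(remaining)
    }
    return true
}

fun demoAwaitCondition(): Pair<Boolean, Boolean> {
    val neverTrue = awaitCondition(20.milliseconds) { false } // gives up after roughly 20 ms
    val alreadyTrue = awaitCondition(1.seconds) { true }      // returns without parking
    return neverTrue to alreadyTrue
}
```

Subtracting the elapsed time on every iteration keeps the total wait bounded by `timeout` even when the thread is woken several times.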

### Parking until Deadline

Parking operations with absolute deadline support for time-based coordination.

```kotlin { .api }
/**
 * Parks current thread until specified deadline is reached.
 * @param deadline - Absolute time point to park until
 */
@ExperimentalThreadBlockingApi
fun parkUntil(deadline: TimeMark)
```

**Usage Examples:**

```kotlin
import kotlinx.atomicfu.locks.*
import kotlinx.atomicfu.*
import kotlin.time.*
import kotlin.time.Duration.Companion.seconds

@OptIn(ExperimentalThreadBlockingApi::class)
class ScheduledTask {
    private val isScheduled = atomic(false)
    private val scheduledTime = atomic<TimeMark?>(null)
    private val workerHandle = atomic<ParkingHandle?>(null)

    fun scheduleAt(deadline: TimeMark, task: () -> Unit) {
        require(isScheduled.compareAndSet(false, true)) {
            "Task already scheduled"
        }

        scheduledTime.value = deadline

        Thread {
            val currentThread = ParkingSupport.currentThreadHandle()
            workerHandle.value = currentThread

            // Park in a loop so that a spurious wakeup cannot run the task early.
            // TimeMark is not Comparable, so the deadline is tested with hasNotPassedNow()
            val targetTime = scheduledTime.value
            while (targetTime != null && isScheduled.value && targetTime.hasNotPassedNow()) {
                ParkingSupport.parkUntil(targetTime)
            }

            // Execute task if still scheduled
            if (isScheduled.value) {
                try {
                    task()
                } finally {
                    isScheduled.value = false
                    workerHandle.value = null
                }
            }
        }.start()
    }

    fun cancel(): Boolean {
        if (isScheduled.compareAndSet(true, false)) {
            // Wake the worker so it can observe the cancellation and exit
            workerHandle.value?.let { handle ->
                ParkingSupport.unpark(handle)
            }
            return true
        }
        return false
    }

    fun isScheduled(): Boolean = isScheduled.value
}

// Usage example
@OptIn(ExperimentalThreadBlockingApi::class)
fun demonstrateScheduledTask() {
    val task = ScheduledTask()
    val futureTime = TimeSource.Monotonic.markNow() + 3.seconds

    task.scheduleAt(futureTime) {
        println("Scheduled task executed at ${TimeSource.Monotonic.markNow()}")
    }

    // Cancel after 1 second
    Thread.sleep(1000)
    val cancelled = task.cancel()
    println("Task cancelled: $cancelled")
}
```
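
Because `parkUntil` may return before the deadline, a caller that simply wants to block until a point in time has to loop. A minimal sketch, assuming the signatures above; `sleepUntil` and `demoSleepUntil` are hypothetical names, not library functions.

```kotlin
import kotlinx.atomicfu.locks.ExperimentalThreadBlockingApi
import kotlinx.atomicfu.locks.ParkingSupport
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.TimeMark
import kotlin.time.TimeSource

// Hypothetical helper: blocks the current thread until the deadline has passed.
@OptIn(ExperimentalThreadBlockingApi::class)
fun sleepUntil(deadline: TimeMark) {
    // Re-park after every early return (unpark or spurious wakeup)
    while (deadline.hasNotPassedNow()) {
        ParkingSupport.parkUntil(deadline)
    }
}

fun demoSleepUntil(): Boolean {
    val start = TimeSource.Monotonic.markNow()
    sleepUntil(start + 50.milliseconds)
    // The loop only exits once the deadline has been reached
    return start.elapsedNow() >= 50.milliseconds
}
```

On the JVM, a thread whose interrupted flag is set returns from parking immediately, so a loop like this would spin until the deadline; real code should check for interruption inside the loop.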

### Parking Handle Management

Managing thread handles for unparking operations and thread coordination.

```kotlin { .api }
/**
 * Returns the ParkingHandle for the current thread.
 * @return Unique handle for the current thread
 */
@ExperimentalThreadBlockingApi
fun currentThreadHandle(): ParkingHandle

/**
 * Unparks the thread corresponding to the given handle.
 * @param handle - ParkingHandle of the thread to unpark
 */
@ExperimentalThreadBlockingApi
fun unpark(handle: ParkingHandle)
```

**Usage Examples:**

```kotlin
import kotlinx.atomicfu.locks.*
import kotlinx.atomicfu.*
import kotlin.time.Duration.Companion.seconds

@OptIn(ExperimentalThreadBlockingApi::class)
class WorkerPool(poolSize: Int) {
    private val workers = Array(poolSize) { WorkerThread(it) }
    private val tasks = mutableListOf<() -> Unit>()
    private val tasksLock = Any()

    private inner class WorkerThread(private val id: Int) {
        private val handle = atomic<ParkingHandle?>(null)
        private val isRunning = atomic(true)

        fun start() {
            Thread {
                // Publish this thread's handle so other threads can unpark it
                val currentHandle = ParkingSupport.currentThreadHandle()
                handle.value = currentHandle

                while (isRunning.value) {
                    val task = synchronized(tasksLock) {
                        if (tasks.isNotEmpty()) tasks.removeFirst() else null
                    }

                    if (task != null) {
                        try {
                            task()
                        } catch (e: Exception) {
                            println("Worker $id error: ${e.message}")
                        }
                    } else {
                        // No tasks available: park until woken or until the 1-second timeout elapses
                        ParkingSupport.park(1.seconds)
                    }
                }
            }.apply {
                name = "Worker-$id"
                start()
            }
        }

        fun wakeUp() {
            handle.value?.let { ParkingSupport.unpark(it) }
        }

        fun stop() {
            isRunning.value = false
            wakeUp()
        }
    }

    fun submit(task: () -> Unit) {
        synchronized(tasksLock) {
            tasks.add(task)
        }

        // Wake the first worker; other idle workers pick up tasks when their 1-second park times out
        workers.firstOrNull()?.wakeUp()
    }

    fun shutdown() {
        workers.forEach { it.stop() }
    }

    fun start() {
        workers.forEach { it.start() }
    }
}

// Usage example
@OptIn(ExperimentalThreadBlockingApi::class)
fun demonstrateWorkerPool() {
    val pool = WorkerPool(3)
    pool.start()

    // Submit some tasks
    repeat(10) { taskId ->
        pool.submit {
            println("Executing task $taskId on ${Thread.currentThread().name}")
            Thread.sleep(100) // Simulate work
        }
    }

    // Let tasks complete
    Thread.sleep(2000)
    pool.shutdown()
}
```

## Advanced Patterns

### Building a Semaphore

Using parking operations to build a counting semaphore:

```kotlin
import kotlinx.atomicfu.locks.*
import kotlinx.atomicfu.*
import kotlin.time.Duration
import kotlin.time.TimeSource

@OptIn(ExperimentalThreadBlockingApi::class)
class Semaphore(initialPermits: Int) {
    private val permits = atomic(initialPermits)
    private val waiters = mutableListOf<ParkingHandle>()
    private val waitersLock = Any()

    // Takes one permit if available; retries the CAS when it loses a race
    private fun tryAcquire(): Boolean {
        while (true) {
            val currentPermits = permits.value
            if (currentPermits <= 0) return false
            if (permits.compareAndSet(currentPermits, currentPermits - 1)) return true
        }
    }

    fun acquire(timeout: Duration = Duration.INFINITE): Boolean {
        val start = TimeSource.Monotonic.markNow()
        val currentThread = ParkingSupport.currentThreadHandle()

        while (true) {
            if (tryAcquire()) return true

            // Time left from the overall timeout (stays infinite for Duration.INFINITE)
            val remaining = timeout - start.elapsedNow()
            if (remaining <= Duration.ZERO) return false

            synchronized(waitersLock) {
                waiters.add(currentThread)
            }

            try {
                // Re-check after registering so a release() that ran in between is not missed
                if (permits.value <= 0) {
                    ParkingSupport.park(remaining)
                }
            } finally {
                synchronized(waitersLock) {
                    waiters.remove(currentThread)
                }
            }
        }
    }

    fun release() {
        permits.incrementAndGet()

        // Unpark one waiter
        synchronized(waitersLock) {
            if (waiters.isNotEmpty()) {
                val waiter = waiters.removeFirst()
                ParkingSupport.unpark(waiter)
            }
        }
    }

    fun availablePermits(): Int = permits.value
}
```

## Types

```kotlin { .api }
/**
 * Duration from kotlin.time package representing time spans
 */
typealias Duration = kotlin.time.Duration

/**
 * TimeMark from kotlin.time package representing absolute time points
 */
typealias TimeMark = kotlin.time.TimeMark

/**
 * Experimental annotation marking thread parking APIs
 */
@Retention(AnnotationRetention.BINARY)
@Target(AnnotationTarget.CLASS, AnnotationTarget.FUNCTION, AnnotationTarget.PROPERTY, AnnotationTarget.TYPEALIAS)
@RequiresOptIn(level = RequiresOptIn.Level.ERROR, message = "This API is experimental. It is low-level and might change in the future.")
annotation class ExperimentalThreadBlockingApi
```

## Implementation Notes

### Platform Behavior

- **JVM**: Uses `java.util.concurrent.locks.LockSupport` under the hood
- **Thread Interruption**: On JVM, interrupted threads wake up from parking, and the interrupted flag remains set
- **Spurious Wakeups**: Park operations may wake up without explicit unpark calls
- **Pre-unpark**: Calling unpark before park makes the next park return immediately
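
The interruption behavior can be checked with a few lines of JVM-only code. This is a sketch under the assumptions stated in the list above (a `LockSupport`-backed implementation); `parkWhileInterrupted` is a hypothetical name.

```kotlin
import kotlinx.atomicfu.locks.ExperimentalThreadBlockingApi
import kotlinx.atomicfu.locks.ParkingSupport
import kotlin.time.Duration.Companion.seconds
import kotlin.time.TimeSource

// Hypothetical JVM-only check: returns (park returned early, interrupted flag still set).
@OptIn(ExperimentalThreadBlockingApi::class)
fun parkWhileInterrupted(): Pair<Boolean, Boolean> {
    // Set the interrupted flag on the current thread before parking
    Thread.currentThread().interrupt()

    val start = TimeSource.Monotonic.markNow()
    ParkingSupport.park(5.seconds)
    val returnedEarly = start.elapsedNow() < 5.seconds

    // Thread.interrupted() reports the flag and clears it,
    // so the thread is left in a clean state afterwards
    val flagStillSet = Thread.interrupted()
    return returnedEarly to flagStillSet
}
```

Since parking does not clear the flag, a wait loop that ignores interruption keeps returning from `park` immediately; checking `Thread.currentThread().isInterrupted` inside the loop avoids that busy spin.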

### Performance Considerations

- Parking operations can involve system call overhead
- Prefer atomic operations over parking-based synchronization when they are sufficient
- Consider using higher-level primitives like `kotlinx.coroutines` for most use cases
- Parking is most useful for building custom synchronization primitives
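
One common way to act on these points is to poll an atomic for a bounded number of iterations and park only if the condition still does not hold. The sketch below illustrates that idea for a one-shot flag with a single waiting thread; `SpinThenParkFlag`, its spin count of 100, and `demoSpinThenPark` are illustrative assumptions, not library API or tuned values.

```kotlin
import kotlinx.atomicfu.atomic
import kotlinx.atomicfu.locks.ExperimentalThreadBlockingApi
import kotlinx.atomicfu.locks.ParkingHandle
import kotlinx.atomicfu.locks.ParkingSupport
import kotlin.concurrent.thread
import kotlin.time.Duration

// Hypothetical one-shot flag that supports a single waiting thread.
@OptIn(ExperimentalThreadBlockingApi::class)
class SpinThenParkFlag {
    private val signalled = atomic(false)
    private val waiter = atomic<ParkingHandle?>(null)

    fun signal() {
        // Publish the state first, then wake the waiter if one has registered
        signalled.value = true
        waiter.value?.let { ParkingSupport.unpark(it) }
    }

    fun await(spins: Int = 100) {
        // Fast path: poll the atomic without parking
        repeat(spins) { if (signalled.value) return }

        // Slow path: register, then re-check before every park so a signal()
        // that ran before the registration is not missed
        waiter.value = ParkingSupport.currentThreadHandle()
        while (!signalled.value) {
            ParkingSupport.park(Duration.INFINITE)
        }
    }

    fun isSignalled(): Boolean = signalled.value
}

fun demoSpinThenPark(): Boolean {
    val flag = SpinThenParkFlag()
    thread {
        Thread.sleep(50) // Simulate work before signalling
        flag.signal()
    }
    flag.await()
    return flag.isSignalled()
}
```

Whether spinning pays off depends on how long waits typically last, so the spin budget is something to measure rather than assume.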

### Best Practices

- Always re-check the condition after waking up from park, since wakeups can be spurious
- Handle timeouts and interruptions appropriately
- Use proper exception handling around park/unpark operations
- Consider using existing high-level concurrency libraries before implementing custom primitives
- Test thoroughly on target platforms as behavior may vary