or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

channels.mdcoroutine-builders.mddispatchers.mdexception-handling.mdflow-api.mdindex.mdjobs-deferreds.mdstructured-concurrency.mdsynchronization.md

structured-concurrency.mddocs/

0

# Structured Concurrency Functions

1

2

Scoping functions and cancellation management for coordinated lifecycle management. These functions provide structured patterns for coroutine execution and ensure proper resource cleanup and cancellation propagation.

3

4

## Capabilities

5

6

### Coroutine Scope Functions

7

8

Functions that create new scopes with specific lifecycle and error handling behaviors.

9

10

```kotlin { .api }

11

/**

12

* Creates a new coroutineScope that does not complete until all launched children complete.

13

* Cancellation or failure of any child cancels the scope and all other children.

14

*/

15

suspend fun <T> coroutineScope(block: suspend CoroutineScope.() -> T): T

16

17

/**

18

* Creates a new supervisorScope that does not complete until all launched children complete.

19

* Unlike coroutineScope, failure of a child does not cancel other children.

20

*/

21

suspend fun <T> supervisorScope(block: suspend CoroutineScope.() -> T): T

22

23

/**

24

* Calls the specified suspending block with a given coroutine context,

25

* suspends until it completes, and returns the result.

26

*/

27

suspend fun <T> withContext(

28

context: CoroutineContext,

29

block: suspend CoroutineScope.() -> T

30

): T

31

```

32

33

**Usage Examples:**

34

35

```kotlin

36

import kotlinx.coroutines.*

37

38

val scope = MainScope()

39

40

scope.launch {

41

// coroutineScope - all children must complete successfully

42

try {

43

val result = coroutineScope {

44

val task1 = async { computeValue1() }

45

val task2 = async { computeValue2() }

46

val task3 = async { computeValue3() }

47

48

// If any task fails, all are cancelled

49

listOf(task1.await(), task2.await(), task3.await())

50

}

51

println("All tasks completed: $result")

52

} catch (e: Exception) {

53

println("Some task failed: ${e.message}")

54

}

55

}

56

57

scope.launch {

58

// supervisorScope - child failures are independent

59

val results = supervisorScope {

60

val task1 = async {

61

delay(100)

62

"Task 1 success"

63

}

64

val task2 = async {

65

delay(200)

66

throw RuntimeException("Task 2 failed")

67

}

68

val task3 = async {

69

delay(300)

70

"Task 3 success"

71

}

72

73

// Collect results, handling failures individually

74

listOf(

75

try { task1.await() } catch (e: Exception) { "Task 1 failed: ${e.message}" },

76

try { task2.await() } catch (e: Exception) { "Task 2 failed: ${e.message}" },

77

try { task3.await() } catch (e: Exception) { "Task 3 failed: ${e.message}" }

78

)

79

}

80

println("Supervisor results: $results")

81

// Output: [Task 1 success, Task 2 failed: Task 2 failed, Task 3 success]

82

}

83

84

// withContext for context switching

85

scope.launch {

86

println("Starting on: ${Thread.currentThread().name}")

87

88

val result = withContext(Dispatchers.Default + CoroutineName("DataProcessor")) {

89

println("Processing on: ${Thread.currentThread().name}")

90

println("Coroutine name: ${coroutineContext[CoroutineName]?.name}")

91

92

val data = processLargeDataset()

93

data.summary

94

}

95

96

println("Back on: ${Thread.currentThread().name}")

97

displayResult(result)

98

}

99

```

100

101

### Timeout Functions

102

103

Functions for executing operations with time limits and cancellation.

104

105

```kotlin { .api }

106

/**

107

* Runs a given suspending block of code inside a coroutine with a specified timeout

108

* and throws TimeoutCancellationException if the timeout was exceeded.

109

*/

110

suspend fun <T> withTimeout(timeoutMillis: Long, block: suspend CoroutineScope.() -> T): T

111

112

/**

113

* Runs a given suspending block of code inside a coroutine with a specified timeout

114

* and returns null if the timeout was exceeded.

115

*/

116

suspend fun <T> withTimeoutOrNull(timeoutMillis: Long, block: suspend CoroutineScope.() -> T): T?

117

118

/**

119

* Exception thrown by withTimeout when the timeout is exceeded.

120

*/

121

class TimeoutCancellationException(

122

message: String?,

123

coroutine: Job

124

) : CancellationException(message)

125

```

126

127

**Usage Examples:**

128

129

```kotlin

130

import kotlinx.coroutines.*

131

132

val scope = MainScope()

133

134

scope.launch {

135

// withTimeout throws exception on timeout

136

try {

137

val result = withTimeout(1000) {

138

delay(500) // Completes within timeout

139

"Operation completed"

140

}

141

println("Result: $result")

142

} catch (e: TimeoutCancellationException) {

143

println("Operation timed out")

144

}

145

}

146

147

scope.launch {

148

// withTimeoutOrNull returns null on timeout

149

val result = withTimeoutOrNull(500) {

150

delay(1000) // Exceeds timeout

151

"This won't complete"

152

}

153

154

if (result != null) {

155

println("Result: $result")

156

} else {

157

println("Operation timed out, using default value")

158

handleTimeout()

159

}

160

}

161

162

// Network request with timeout

163

suspend fun fetchDataWithTimeout(url: String): String? {

164

return withTimeoutOrNull(5000) { // 5 second timeout

165

// Simulate network request

166

delay(kotlin.random.Random.nextLong(1000, 10000))

167

if (kotlin.random.Random.nextBoolean()) {

168

"Data from $url"

169

} else {

170

throw RuntimeException("Network error")

171

}

172

}

173

}

174

175

scope.launch {

176

val data = fetchDataWithTimeout("https://api.example.com/data")

177

when (data) {

178

null -> println("Request timed out")

179

else -> println("Received: $data")

180

}

181

}

182

183

// Timeout with custom handling

184

scope.launch {

185

try {

186

val result = withTimeout(2000) {

187

val task1 = async { longRunningTask1() }

188

val task2 = async { longRunningTask2() }

189

190

// Both tasks must complete within timeout

191

Pair(task1.await(), task2.await())

192

}

193

println("Both tasks completed: $result")

194

} catch (e: TimeoutCancellationException) {

195

println("Tasks timed out, cleaning up...")

196

cleanup()

197

}

198

}

199

```

200

201

### Cancellation Functions

202

203

Functions for cooperative cancellation and cancellation checking.

204

205

```kotlin { .api }

206

/**

207

* Yields the thread (or thread pool) of the current coroutine dispatcher

208

* to other coroutines on the same dispatcher to run if possible.

209

*/

210

suspend fun yield()

211

212

/**

213

* Suspends the current coroutine until it is cancelled.

214

* This function never returns normally.

215

*/

216

suspend fun awaitCancellation(): Nothing

217

218

/**

219

* Throws CancellationException if the context is cancelled.

220

*/

221

fun CoroutineContext.ensureActive()

222

223

/**

224

* Throws CancellationException if the current Job is cancelled.

225

*/

226

fun ensureActive()

227

```

228

229

**Usage Examples:**

230

231

```kotlin

232

import kotlinx.coroutines.*

233

234

val scope = MainScope()

235

236

// yield() for cooperative multitasking

237

scope.launch {

238

repeat(1000) { i ->

239

if (i % 100 == 0) {

240

yield() // Give other coroutines a chance to run

241

}

242

performWork(i)

243

}

244

}

245

246

// ensureActive() for cancellation checking

247

suspend fun processLargeDataset(data: List<String>): List<String> {

248

val results = mutableListOf<String>()

249

250

for ((index, item) in data.withIndex()) {

251

// Check for cancellation periodically

252

if (index % 1000 == 0) {

253

ensureActive() // Throws CancellationException if cancelled

254

}

255

256

results.add(processItem(item))

257

}

258

259

return results

260

}

261

262

val job = scope.launch {

263

try {

264

val largeDataset = generateLargeDataset()

265

val results = processLargeDataset(largeDataset)

266

println("Processing completed: ${results.size} items")

267

} catch (e: CancellationException) {

268

println("Processing was cancelled")

269

throw e // Re-throw to maintain cancellation semantics

270

}

271

}

272

273

// Cancel after 5 seconds

274

scope.launch {

275

delay(5000)

276

job.cancel()

277

}

278

279

// awaitCancellation for services

280

class BackgroundService {

281

private val serviceScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)

282

283

fun start() {

284

serviceScope.launch {

285

try {

286

while (true) {

287

performPeriodicTask()

288

delay(60000) // Wait 1 minute

289

}

290

} catch (e: CancellationException) {

291

println("Service is shutting down")

292

throw e

293

}

294

}

295

296

// Lifecycle management

297

serviceScope.launch {

298

try {

299

awaitCancellation() // Suspend until cancelled

300

} finally {

301

cleanupResources()

302

}

303

}

304

}

305

306

fun stop() {

307

serviceScope.cancel()

308

}

309

}

310

311

// Context cancellation checking

312

suspend fun robustOperation() {

313

coroutineContext.ensureActive() // Check at start

314

315

performSetup()

316

317

coroutineContext.ensureActive() // Check before expensive operation

318

319

val result = expensiveComputation()

320

321

coroutineContext.ensureActive() // Check before final step

322

323

saveResults(result)

324

}

325

```

326

327

### Resource Management

328

329

Patterns for managing resources with structured concurrency and proper cleanup.

330

331

```kotlin { .api }

332

/**

333

* Ensure proper resource cleanup using try-finally or use patterns.

334

*/

335

suspend inline fun <T : Closeable, R> T.use(block: (T) -> R): R

336

```

337

338

**Usage Examples:**

339

340

```kotlin

341

import kotlinx.coroutines.*

342

import java.io.Closeable

343

344

val scope = MainScope()

345

346

// Resource management with coroutines

347

class DatabaseConnection : Closeable {

348

private var isOpen = true

349

350

fun query(sql: String): List<String> {

351

check(isOpen) { "Connection is closed" }

352

// Simulate database query

353

return listOf("result1", "result2")

354

}

355

356

override fun close() {

357

println("Closing database connection")

358

isOpen = false

359

}

360

}

361

362

scope.launch {

363

// Automatic resource cleanup

364

DatabaseConnection().use { connection ->

365

coroutineScope {

366

val query1 = async { connection.query("SELECT * FROM users") }

367

val query2 = async { connection.query("SELECT * FROM orders") }

368

369

val results = listOf(query1.await(), query2.await())

370

println("Query results: $results")

371

}

372

// Connection automatically closed even if coroutines are cancelled

373

}

374

}

375

376

// Manual resource management with cancellation handling

377

class ResourceManager {

378

private val resources = mutableListOf<Closeable>()

379

private val resourceMutex = Mutex()

380

381

suspend fun <T : Closeable> manage(resource: T): T {

382

resourceMutex.withLock {

383

resources.add(resource)

384

}

385

return resource

386

}

387

388

suspend fun cleanup() {

389

resourceMutex.withLock {

390

resources.reversed().forEach { resource ->

391

try {

392

resource.close()

393

} catch (e: Exception) {

394

println("Error closing resource: ${e.message}")

395

}

396

}

397

resources.clear()

398

}

399

}

400

}

401

402

scope.launch {

403

val resourceManager = ResourceManager()

404

405

try {

406

coroutineScope {

407

val connection1 = resourceManager.manage(DatabaseConnection())

408

val connection2 = resourceManager.manage(DatabaseConnection())

409

410

launch {

411

val results1 = connection1.query("SELECT 1")

412

println("Connection 1 results: $results1")

413

}

414

415

launch {

416

val results2 = connection2.query("SELECT 2")

417

println("Connection 2 results: $results2")

418

}

419

}

420

} finally {

421

resourceManager.cleanup()

422

}

423

}

424

```

425

426

### Error Boundary Patterns

427

428

Implement error boundaries to contain failures within specific scopes.

429

430

```kotlin

431

class ErrorBoundary {

432

suspend fun <T> withBoundary(

433

name: String,

434

onError: suspend (Throwable) -> T,

435

block: suspend CoroutineScope.() -> T

436

): T {

437

return try {

438

supervisorScope {

439

block()

440

}

441

} catch (e: Exception) {

442

println("Error in boundary '$name': ${e.message}")

443

onError(e)

444

}

445

}

446

}

447

448

val errorBoundary = ErrorBoundary()

449

450

scope.launch {

451

val result = errorBoundary.withBoundary(

452

name = "DataProcessing",

453

onError = { "Default value due to error" }

454

) {

455

val task1 = async { riskyOperation1() }

456

val task2 = async { riskyOperation2() }

457

458

"${task1.await()} + ${task2.await()}"

459

}

460

461

println("Final result: $result")

462

}

463

464

// Hierarchical error boundaries

465

class ServiceLayer {

466

private val errorBoundary = ErrorBoundary()

467

468

suspend fun processUserRequest(userId: String): UserResponse {

469

return errorBoundary.withBoundary(

470

name = "UserRequest-$userId",

471

onError = { UserResponse.error("Service temporarily unavailable") }

472

) {

473

val userData = async { fetchUserData(userId) }

474

val userPrefs = async { fetchUserPreferences(userId) }

475

val userPosts = async { fetchUserPosts(userId) }

476

477

UserResponse.success(

478

user = userData.await(),

479

preferences = userPrefs.await(),

480

posts = userPosts.await()

481

)

482

}

483

}

484

}

485

```

486

487

### Structured Concurrency Best Practices

488

489

Guidelines for effective use of structured concurrency patterns.

490

491

```kotlin

492

// GOOD: Proper nesting and error handling

493

suspend fun processData(): ProcessingResult {

494

return coroutineScope {

495

val validationResult = async { validateInput() }

496

497

if (!validationResult.await().isValid) {

498

return@coroutineScope ProcessingResult.Invalid

499

}

500

501

supervisorScope {

502

val processing1 = async { processChunk1() }

503

val processing2 = async { processChunk2() }

504

val processing3 = async { processChunk3() }

505

506

// Collect all results, handling individual failures

507

val results = listOf(processing1, processing2, processing3)

508

.map { deferred ->

509

try {

510

deferred.await()

511

} catch (e: Exception) {

512

ChunkResult.Failed(e.message ?: "Unknown error")

513

}

514

}

515

516

ProcessingResult.Completed(results)

517

}

518

}

519

}

520

521

// BAD: Mixing structured and unstructured concurrency

522

suspend fun badExample() {

523

// Don't do this - mixing structured and unstructured

524

val job = GlobalScope.launch { // Unstructured

525

coroutineScope { // Structured inside unstructured

526

// This breaks structured concurrency guarantees

527

}

528

}

529

}

530

531

// GOOD: Consistent structured approach

532

suspend fun goodExample() {

533

coroutineScope {

534

val backgroundTask = async(Dispatchers.Default) {

535

longRunningComputation()

536

}

537

538

val uiTask = async(Dispatchers.Main) {

539

updateUserInterface()

540

}

541

542

// Both tasks are properly structured

543

backgroundTask.await()

544

uiTask.await()

545

}

546

}

547

548

// Exception handling best practices

549

suspend fun robustDataProcessing(): Result<String> {

550

return try {

551

coroutineScope {

552

val data = async { fetchData() }

553

val processed = async { processData(data.await()) }

554

val validated = async { validateResult(processed.await()) }

555

556

Result.Success(validated.await())

557

}

558

} catch (e: CancellationException) {

559

// Always re-throw cancellation

560

throw e

561

} catch (e: Exception) {

562

// Handle other exceptions

563

Result.Failure(e.message ?: "Processing failed")

564

}

565

}

566

567

sealed class Result<T> {

568

data class Success<T>(val value: T) : Result<T>()

569

data class Failure<T>(val error: String) : Result<T>()

570

}

571

```

572

573

### Testing Structured Concurrency

574

575

Approaches for testing structured concurrency patterns.

576

577

```kotlin

578

import kotlinx.coroutines.test.*

579

580

@Test

581

fun testStructuredConcurrency() = runTest {

582

var cleanupCalled = false

583

584

try {

585

coroutineScope {

586

launch {

587

try {

588

delay(1000)

589

fail("Should have been cancelled")

590

} finally {

591

cleanupCalled = true

592

}

593

}

594

595

launch {

596

delay(500)

597

throw RuntimeException("Simulated failure")

598

}

599

}

600

} catch (e: RuntimeException) {

601

assertEquals("Simulated failure", e.message)

602

}

603

604

// Verify cleanup was called due to structured cancellation

605

assertTrue(cleanupCalled)

606

}

607

608

@Test

609

fun testSupervisorScope() = runTest {

610

val results = mutableListOf<String>()

611

612

supervisorScope {

613

launch {

614

delay(100)

615

results.add("Task 1 completed")

616

}

617

618

launch {

619

delay(200)

620

throw RuntimeException("Task 2 failed")

621

}

622

623

launch {

624

delay(300)

625

results.add("Task 3 completed")

626

}

627

628

// Wait for all tasks to finish (supervisor doesn't cancel siblings)

629

delay(400)

630

}

631

632

// Tasks 1 and 3 should complete despite task 2 failing

633

assertEquals(listOf("Task 1 completed", "Task 3 completed"), results)

634

}

635

```