0
# Structured Concurrency Functions
1
2
Scoping functions and cancellation utilities for coordinated lifecycle management. These functions provide structured patterns for coroutine execution and ensure proper resource cleanup and cancellation propagation.
3
4
## Capabilities
5
6
### Coroutine Scope Functions
7
8
Functions that create new scopes with specific lifecycle and error handling behaviors.
9
10
```kotlin { .api }
11
/**
12
* Creates a new coroutineScope that does not complete until all launched children complete.
13
* Failure of any child cancels the scope and all other children; a child that is merely cancelled does not.
14
*/
15
suspend fun <T> coroutineScope(block: suspend CoroutineScope.() -> T): T
16
17
/**
18
* Creates a new supervisorScope that does not complete until all launched children complete.
19
* Unlike coroutineScope, failure of a child does not cancel other children.
20
*/
21
suspend fun <T> supervisorScope(block: suspend CoroutineScope.() -> T): T
22
23
/**
24
* Calls the specified suspending block with a given coroutine context,
25
* suspends until it completes, and returns the result.
26
*/
27
suspend fun <T> withContext(
28
context: CoroutineContext,
29
block: suspend CoroutineScope.() -> T
30
): T
31
```
32
33
**Usage Examples:**
34
35
```kotlin
36
import kotlinx.coroutines.*
37
38
val scope = MainScope()
39
40
scope.launch {
41
// coroutineScope - all children must complete successfully
42
try {
43
val result = coroutineScope {
44
val task1 = async { computeValue1() }
45
val task2 = async { computeValue2() }
46
val task3 = async { computeValue3() }
47
48
// If any task fails, all are cancelled
49
listOf(task1.await(), task2.await(), task3.await())
50
}
51
println("All tasks completed: $result")
52
} catch (e: Exception) {
53
println("Some task failed: ${e.message}")
54
}
55
}
56
57
scope.launch {
58
// supervisorScope - child failures are independent
59
val results = supervisorScope {
60
val task1 = async {
61
delay(100)
62
"Task 1 success"
63
}
64
val task2 = async {
65
delay(200)
66
throw RuntimeException("Task 2 failed")
67
}
68
val task3 = async {
69
delay(300)
70
"Task 3 success"
71
}
72
73
// Collect results, handling failures individually
74
listOf(
75
try { task1.await() } catch (e: Exception) { "Task 1 failed: ${e.message}" },
76
try { task2.await() } catch (e: Exception) { "Task 2 failed: ${e.message}" },
77
try { task3.await() } catch (e: Exception) { "Task 3 failed: ${e.message}" }
78
)
79
}
80
println("Supervisor results: $results")
81
// Output: Supervisor results: [Task 1 success, Task 2 failed: Task 2 failed, Task 3 success]
82
}
83
84
// withContext for context switching
85
scope.launch {
86
println("Starting on: ${Thread.currentThread().name}")
87
88
val result = withContext(Dispatchers.Default + CoroutineName("DataProcessor")) {
89
println("Processing on: ${Thread.currentThread().name}")
90
println("Coroutine name: ${coroutineContext[CoroutineName]?.name}")
91
92
val data = processLargeDataset()
93
data.summary
94
}
95
96
println("Back on: ${Thread.currentThread().name}")
97
displayResult(result)
98
}
99
```
100
101
### Timeout Functions
102
103
Functions for executing operations with time limits and cancellation.
104
105
```kotlin { .api }
106
/**
107
* Runs a given suspending block of code inside a coroutine with a specified timeout
108
* and throws TimeoutCancellationException if the timeout was exceeded.
109
*/
110
suspend fun <T> withTimeout(timeoutMillis: Long, block: suspend CoroutineScope.() -> T): T
111
112
/**
113
* Runs a given suspending block of code inside a coroutine with a specified timeout
114
* and returns null if the timeout was exceeded.
115
*/
116
suspend fun <T> withTimeoutOrNull(timeoutMillis: Long, block: suspend CoroutineScope.() -> T): T?
117
118
/**
119
* Exception thrown by withTimeout when the timeout is exceeded.
120
*/
121
class TimeoutCancellationException(
122
message: String?,
123
coroutine: Job
124
) : CancellationException(message)
125
```
126
127
**Usage Examples:**
128
129
```kotlin
130
import kotlinx.coroutines.*
131
132
val scope = MainScope()
133
134
scope.launch {
135
// withTimeout throws exception on timeout
136
try {
137
val result = withTimeout(1000) {
138
delay(500) // Completes within timeout
139
"Operation completed"
140
}
141
println("Result: $result")
142
} catch (e: TimeoutCancellationException) {
143
println("Operation timed out")
144
}
145
}
146
147
scope.launch {
148
// withTimeoutOrNull returns null on timeout
149
val result = withTimeoutOrNull(500) {
150
delay(1000) // Exceeds timeout
151
"This won't complete"
152
}
153
154
if (result != null) {
155
println("Result: $result")
156
} else {
157
println("Operation timed out, using default value")
158
handleTimeout()
159
}
160
}
161
162
// Network request with timeout
163
suspend fun fetchDataWithTimeout(url: String): String? {
164
return withTimeoutOrNull(5000) { // 5 second timeout
165
// Simulate network request
166
delay(kotlin.random.Random.nextLong(1000, 10000))
167
if (kotlin.random.Random.nextBoolean()) {
168
"Data from $url"
169
} else {
170
throw RuntimeException("Network error")
171
}
172
}
173
}
174
175
scope.launch {
176
val data = fetchDataWithTimeout("https://api.example.com/data")
177
when (data) {
178
null -> println("Request timed out")
179
else -> println("Received: $data")
180
}
181
}
182
183
// Timeout with custom handling
184
scope.launch {
185
try {
186
val result = withTimeout(2000) {
187
val task1 = async { longRunningTask1() }
188
val task2 = async { longRunningTask2() }
189
190
// Both tasks must complete within timeout
191
Pair(task1.await(), task2.await())
192
}
193
println("Both tasks completed: $result")
194
} catch (e: TimeoutCancellationException) {
195
println("Tasks timed out, cleaning up...")
196
cleanup()
197
}
198
}
199
```
200
201
### Cancellation Functions
202
203
Functions for cooperative cancellation and cancellation checking.
204
205
```kotlin { .api }
206
/**
207
* Yields the thread (or thread pool) of the current coroutine dispatcher
208
* to other coroutines on the same dispatcher to run if possible.
209
*/
210
suspend fun yield()
211
212
/**
213
* Suspends the current coroutine until it is cancelled.
214
* This function never returns normally.
215
*/
216
suspend fun awaitCancellation(): Nothing
217
218
/**
219
* Throws CancellationException if the context is cancelled.
220
*/
221
fun CoroutineContext.ensureActive()
222
223
/**
224
* Throws CancellationException if the current Job is cancelled.
225
*/
226
fun Job.ensureActive()
227
```
228
229
**Usage Examples:**
230
231
```kotlin
232
import kotlinx.coroutines.*
233
234
val scope = MainScope()
235
236
// yield() for cooperative multitasking
237
scope.launch {
238
repeat(1000) { i ->
239
if (i % 100 == 0) {
240
yield() // Give other coroutines a chance to run
241
}
242
performWork(i)
243
}
244
}
245
246
// ensureActive() for cancellation checking
247
/**
 * Processes every item of [data] in order, checking for cancellation every 1000 items.
 *
 * @param data items to process
 * @return one processed result per input item, in input order
 * @throws CancellationException if the calling coroutine is no longer active at a check point
 */
suspend fun processLargeDataset(data: List<String>): List<String> {
    val results = mutableListOf<String>()

    for ((index, item) in data.withIndex()) {
        // Check for cancellation periodically. A plain suspend function has no
        // CoroutineScope receiver, so the caller's context is queried explicitly.
        if (index % 1000 == 0) {
            currentCoroutineContext().ensureActive() // Throws CancellationException if cancelled
        }

        results.add(processItem(item))
    }

    return results
}
261
262
val job = scope.launch {
263
try {
264
val largeDataset = generateLargeDataset()
265
val results = processLargeDataset(largeDataset)
266
println("Processing completed: ${results.size} items")
267
} catch (e: CancellationException) {
268
println("Processing was cancelled")
269
throw e // Re-throw to maintain cancellation semantics
270
}
271
}
272
273
// Cancel after 5 seconds
274
scope.launch {
275
delay(5000)
276
job.cancel()
277
}
278
279
// awaitCancellation for services
280
class BackgroundService {
281
private val serviceScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
282
283
fun start() {
284
serviceScope.launch {
285
try {
286
while (true) {
287
performPeriodicTask()
288
delay(60000) // Wait 1 minute
289
}
290
} catch (e: CancellationException) {
291
println("Service is shutting down")
292
throw e
293
}
294
}
295
296
// Lifecycle management
297
serviceScope.launch {
298
try {
299
awaitCancellation() // Suspend until cancelled
300
} finally {
301
cleanupResources()
302
}
303
}
304
}
305
306
fun stop() {
307
serviceScope.cancel()
308
}
309
}
310
311
// Context cancellation checking
312
suspend fun robustOperation() {
313
coroutineContext.ensureActive() // Check at start
314
315
performSetup()
316
317
coroutineContext.ensureActive() // Check before expensive operation
318
319
val result = expensiveComputation()
320
321
coroutineContext.ensureActive() // Check before final step
322
323
saveResults(result)
324
}
325
```
326
327
### Resource Management
328
329
Patterns for managing resources with structured concurrency and proper cleanup.
330
331
```kotlin { .api }
332
/**
333
* Ensure proper resource cleanup using try-finally or use patterns.
334
*/
335
inline fun <T : Closeable?, R> T.use(block: (T) -> R): R
336
```
337
338
**Usage Examples:**
339
340
```kotlin
341
import kotlinx.coroutines.*
342
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.io.Closeable
343
344
val scope = MainScope()
345
346
// Resource management with coroutines
347
class DatabaseConnection : Closeable {
348
private var isOpen = true
349
350
fun query(sql: String): List<String> {
351
check(isOpen) { "Connection is closed" }
352
// Simulate database query
353
return listOf("result1", "result2")
354
}
355
356
override fun close() {
357
println("Closing database connection")
358
isOpen = false
359
}
360
}
361
362
scope.launch {
363
// Automatic resource cleanup
364
DatabaseConnection().use { connection ->
365
coroutineScope {
366
val query1 = async { connection.query("SELECT * FROM users") }
367
val query2 = async { connection.query("SELECT * FROM orders") }
368
369
val results = listOf(query1.await(), query2.await())
370
println("Query results: $results")
371
}
372
// Connection automatically closed even if coroutines are cancelled
373
}
374
}
375
376
// Manual resource management with cancellation handling
377
class ResourceManager {
378
private val resources = mutableListOf<Closeable>()
379
private val resourceMutex = Mutex()
380
381
suspend fun <T : Closeable> manage(resource: T): T {
382
resourceMutex.withLock {
383
resources.add(resource)
384
}
385
return resource
386
}
387
388
suspend fun cleanup() {
389
resourceMutex.withLock {
390
resources.reversed().forEach { resource ->
391
try {
392
resource.close()
393
} catch (e: Exception) {
394
println("Error closing resource: ${e.message}")
395
}
396
}
397
resources.clear()
398
}
399
}
400
}
401
402
scope.launch {
403
val resourceManager = ResourceManager()
404
405
try {
406
coroutineScope {
407
val connection1 = resourceManager.manage(DatabaseConnection())
408
val connection2 = resourceManager.manage(DatabaseConnection())
409
410
launch {
411
val results1 = connection1.query("SELECT 1")
412
println("Connection 1 results: $results1")
413
}
414
415
launch {
416
val results2 = connection2.query("SELECT 2")
417
println("Connection 2 results: $results2")
418
}
419
}
420
} finally {
421
resourceManager.cleanup()
422
}
423
}
424
```
425
426
### Error Boundary Patterns
427
428
Implement error boundaries to contain failures within specific scopes.
429
430
```kotlin
431
/**
 * Contains failures of a unit of work and substitutes a fallback result.
 */
class ErrorBoundary {
    /**
     * Runs [block] inside a [supervisorScope] and returns its result.
     *
     * If the scope fails with a non-cancellation [Exception], the failure is
     * logged together with [name] and the value produced by [onError] is returned.
     *
     * @param name label included in the logged error message
     * @param onError produces the fallback result from the caught failure
     * @param block the work to run inside the boundary
     * @return the result of [block], or the result of [onError] on failure
     * @throws CancellationException if the work was cancelled; cancellation is
     *   never converted into a fallback value
     */
    suspend fun <T> withBoundary(
        name: String,
        onError: suspend (Throwable) -> T,
        block: suspend CoroutineScope.() -> T
    ): T {
        return try {
            supervisorScope {
                block()
            }
        } catch (e: CancellationException) {
            // Always re-throw cancellation so the boundary does not swallow it
            throw e
        } catch (e: Exception) {
            println("Error in boundary '$name': ${e.message}")
            onError(e)
        }
    }
}
447
448
val errorBoundary = ErrorBoundary()
449
450
scope.launch {
451
val result = errorBoundary.withBoundary(
452
name = "DataProcessing",
453
onError = { "Default value due to error" }
454
) {
455
val task1 = async { riskyOperation1() }
456
val task2 = async { riskyOperation2() }
457
458
"${task1.await()} + ${task2.await()}"
459
}
460
461
println("Final result: $result")
462
}
463
464
// Hierarchical error boundaries
465
class ServiceLayer {
466
private val errorBoundary = ErrorBoundary()
467
468
suspend fun processUserRequest(userId: String): UserResponse {
469
return errorBoundary.withBoundary(
470
name = "UserRequest-$userId",
471
onError = { UserResponse.error("Service temporarily unavailable") }
472
) {
473
val userData = async { fetchUserData(userId) }
474
val userPrefs = async { fetchUserPreferences(userId) }
475
val userPosts = async { fetchUserPosts(userId) }
476
477
UserResponse.success(
478
user = userData.await(),
479
preferences = userPrefs.await(),
480
posts = userPosts.await()
481
)
482
}
483
}
484
}
485
```
486
487
### Structured Concurrency Best Practices
488
489
Guidelines for effective use of structured concurrency patterns.
490
491
```kotlin
492
// GOOD: Proper nesting and error handling
493
suspend fun processData(): ProcessingResult {
494
return coroutineScope {
495
val validationResult = async { validateInput() }
496
497
if (!validationResult.await().isValid) {
498
return@coroutineScope ProcessingResult.Invalid
499
}
500
501
supervisorScope {
502
val processing1 = async { processChunk1() }
503
val processing2 = async { processChunk2() }
504
val processing3 = async { processChunk3() }
505
506
// Collect all results, handling individual failures
507
val results = listOf(processing1, processing2, processing3)
508
.map { deferred ->
509
try {
510
deferred.await()
511
} catch (e: Exception) {
512
ChunkResult.Failed(e.message ?: "Unknown error")
513
}
514
}
515
516
ProcessingResult.Completed(results)
517
}
518
}
519
}
520
521
// BAD: Mixing structured and unstructured concurrency
522
suspend fun badExample() {
523
// Don't do this - mixing structured and unstructured
524
val job = GlobalScope.launch { // Unstructured
525
coroutineScope { // Structured inside unstructured
526
// This breaks structured concurrency guarantees
527
}
528
}
529
}
530
531
// GOOD: Consistent structured approach
532
suspend fun goodExample() {
533
coroutineScope {
534
val backgroundTask = async(Dispatchers.Default) {
535
longRunningComputation()
536
}
537
538
val uiTask = async(Dispatchers.Main) {
539
updateUserInterface()
540
}
541
542
// Both tasks are properly structured
543
backgroundTask.await()
544
uiTask.await()
545
}
546
}
547
548
// Exception handling best practices
549
suspend fun robustDataProcessing(): Result<String> {
550
return try {
551
coroutineScope {
552
val data = async { fetchData() }
553
val processed = async { processData(data.await()) }
554
val validated = async { validateResult(processed.await()) }
555
556
Result.Success(validated.await())
557
}
558
} catch (e: CancellationException) {
559
// Always re-throw cancellation
560
throw e
561
} catch (e: Exception) {
562
// Handle other exceptions
563
Result.Failure(e.message ?: "Processing failed")
564
}
565
}
566
567
sealed class Result<T> {
568
data class Success<T>(val value: T) : Result<T>()
569
data class Failure<T>(val error: String) : Result<T>()
570
}
571
```
572
573
### Testing Structured Concurrency
574
575
Approaches for testing structured concurrency patterns.
576
577
```kotlin
578
import kotlinx.coroutines.test.*
579
580
@Test
581
fun testStructuredConcurrency() = runTest {
582
var cleanupCalled = false
583
584
try {
585
coroutineScope {
586
launch {
587
try {
588
delay(1000)
589
fail("Should have been cancelled")
590
} finally {
591
cleanupCalled = true
592
}
593
}
594
595
launch {
596
delay(500)
597
throw RuntimeException("Simulated failure")
598
}
599
}
600
} catch (e: RuntimeException) {
601
assertEquals("Simulated failure", e.message)
602
}
603
604
// Verify cleanup was called due to structured cancellation
605
assertTrue(cleanupCalled)
606
}
607
608
/**
 * Verifies that a failing child inside supervisorScope does not cancel its siblings.
 */
@Test
fun testSupervisorScope() = runTest {
    val results = mutableListOf<String>()
    val failures = mutableListOf<Throwable>()

    // A child started with launch directly inside supervisorScope reports its failure
    // to the CoroutineExceptionHandler of its own context. Without one, the exception
    // is treated as uncaught and runTest fails the test.
    val handler = CoroutineExceptionHandler { _, e -> failures.add(e) }

    supervisorScope {
        launch {
            delay(100)
            results.add("Task 1 completed")
        }

        launch(handler) {
            delay(200)
            throw RuntimeException("Task 2 failed")
        }

        launch {
            delay(300)
            results.add("Task 3 completed")
        }

        // No explicit wait needed: supervisorScope suspends until all children finish
    }

    // Tasks 1 and 3 should complete despite task 2 failing
    assertEquals(listOf("Task 1 completed", "Task 3 completed"), results)
    assertEquals("Task 2 failed", failures.single().message)
}
635
```