Coroutines support libraries for Kotlin providing structured concurrency primitives, Flow API for reactive streams, channels for communication, and synchronization utilities across all Kotlin platforms
—
Scoping functions and cancellation management for coordinated lifecycle management. These functions provide structured patterns for coroutine execution and ensure proper resource cleanup and cancellation propagation.
Functions that create new scopes with specific lifecycle and error handling behaviors.
/**
* Creates a new coroutineScope that does not complete until all launched children complete.
* Cancellation or failure of any child cancels the scope and all other children.
*/
suspend fun <T> coroutineScope(block: suspend CoroutineScope.() -> T): T
/**
* Creates a new supervisorScope that does not complete until all launched children complete.
* Unlike coroutineScope, failure of a child does not cancel other children.
*/
suspend fun <T> supervisorScope(block: suspend CoroutineScope.() -> T): T
/**
* Calls the specified suspending block with a given coroutine context,
* suspends until it completes, and returns the result.
*/
suspend fun <T> withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): TUsage Examples:
import kotlinx.coroutines.*
val scope = MainScope()
scope.launch {
// coroutineScope - all children must complete successfully
try {
val result = coroutineScope {
val task1 = async { computeValue1() }
val task2 = async { computeValue2() }
val task3 = async { computeValue3() }
// If any task fails, all are cancelled
listOf(task1.await(), task2.await(), task3.await())
}
println("All tasks completed: $result")
} catch (e: Exception) {
println("Some task failed: ${e.message}")
}
}
scope.launch {
// supervisorScope - child failures are independent
val results = supervisorScope {
val task1 = async {
delay(100)
"Task 1 success"
}
val task2 = async {
delay(200)
throw RuntimeException("Task 2 failed")
}
val task3 = async {
delay(300)
"Task 3 success"
}
// Collect results, handling failures individually
listOf(
try { task1.await() } catch (e: Exception) { "Task 1 failed: ${e.message}" },
try { task2.await() } catch (e: Exception) { "Task 2 failed: ${e.message}" },
try { task3.await() } catch (e: Exception) { "Task 3 failed: ${e.message}" }
)
}
println("Supervisor results: $results")
// Output: [Task 1 success, Task 2 failed: Task 2 failed, Task 3 success]
}
// withContext for context switching
scope.launch {
println("Starting on: ${Thread.currentThread().name}")
val result = withContext(Dispatchers.Default + CoroutineName("DataProcessor")) {
println("Processing on: ${Thread.currentThread().name}")
println("Coroutine name: ${coroutineContext[CoroutineName]?.name}")
val data = processLargeDataset()
data.summary
}
println("Back on: ${Thread.currentThread().name}")
displayResult(result)
}Functions for executing operations with time limits and cancellation.
/**
* Runs a given suspending block of code inside a coroutine with a specified timeout
* and throws TimeoutCancellationException if the timeout was exceeded.
*/
suspend fun <T> withTimeout(timeoutMillis: Long, block: suspend CoroutineScope.() -> T): T
/**
* Runs a given suspending block of code inside a coroutine with a specified timeout
* and returns null if the timeout was exceeded.
*/
suspend fun <T> withTimeoutOrNull(timeoutMillis: Long, block: suspend CoroutineScope.() -> T): T?
/**
* Exception thrown by withTimeout when the timeout is exceeded.
*/
class TimeoutCancellationException(
message: String?,
coroutine: Job
) : CancellationException(message)Usage Examples:
import kotlinx.coroutines.*
val scope = MainScope()
scope.launch {
// withTimeout throws exception on timeout
try {
val result = withTimeout(1000) {
delay(500) // Completes within timeout
"Operation completed"
}
println("Result: $result")
} catch (e: TimeoutCancellationException) {
println("Operation timed out")
}
}
scope.launch {
// withTimeoutOrNull returns null on timeout
val result = withTimeoutOrNull(500) {
delay(1000) // Exceeds timeout
"This won't complete"
}
if (result != null) {
println("Result: $result")
} else {
println("Operation timed out, using default value")
handleTimeout()
}
}
// Network request with timeout
suspend fun fetchDataWithTimeout(url: String): String? {
return withTimeoutOrNull(5000) { // 5 second timeout
// Simulate network request
delay(kotlin.random.Random.nextLong(1000, 10000))
if (kotlin.random.Random.nextBoolean()) {
"Data from $url"
} else {
throw RuntimeException("Network error")
}
}
}
scope.launch {
val data = fetchDataWithTimeout("https://api.example.com/data")
when (data) {
null -> println("Request timed out")
else -> println("Received: $data")
}
}
// Timeout with custom handling
scope.launch {
try {
val result = withTimeout(2000) {
val task1 = async { longRunningTask1() }
val task2 = async { longRunningTask2() }
// Both tasks must complete within timeout
Pair(task1.await(), task2.await())
}
println("Both tasks completed: $result")
} catch (e: TimeoutCancellationException) {
println("Tasks timed out, cleaning up...")
cleanup()
}
}Functions for cooperative cancellation and cancellation checking.
/**
* Yields the thread (or thread pool) of the current coroutine dispatcher
* to other coroutines on the same dispatcher to run if possible.
*/
suspend fun yield()
/**
* Suspends the current coroutine until it is cancelled.
* This function never returns normally.
*/
suspend fun awaitCancellation(): Nothing
/**
* Throws CancellationException if the context is cancelled.
*/
fun CoroutineContext.ensureActive()
/**
* Throws CancellationException if the current Job is cancelled.
*/
fun ensureActive()Usage Examples:
import kotlinx.coroutines.*
val scope = MainScope()
// yield() for cooperative multitasking
scope.launch {
repeat(1000) { i ->
if (i % 100 == 0) {
yield() // Give other coroutines a chance to run
}
performWork(i)
}
}
// ensureActive() for cancellation checking
suspend fun processLargeDataset(data: List<String>): List<String> {
val results = mutableListOf<String>()
for ((index, item) in data.withIndex()) {
// Check for cancellation periodically
if (index % 1000 == 0) {
ensureActive() // Throws CancellationException if cancelled
}
results.add(processItem(item))
}
return results
}
val job = scope.launch {
try {
val largeDataset = generateLargeDataset()
val results = processLargeDataset(largeDataset)
println("Processing completed: ${results.size} items")
} catch (e: CancellationException) {
println("Processing was cancelled")
throw e // Re-throw to maintain cancellation semantics
}
}
// Cancel after 5 seconds
scope.launch {
delay(5000)
job.cancel()
}
// awaitCancellation for services
class BackgroundService {
private val serviceScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
fun start() {
serviceScope.launch {
try {
while (true) {
performPeriodicTask()
delay(60000) // Wait 1 minute
}
} catch (e: CancellationException) {
println("Service is shutting down")
throw e
}
}
// Lifecycle management
serviceScope.launch {
try {
awaitCancellation() // Suspend until cancelled
} finally {
cleanupResources()
}
}
}
fun stop() {
serviceScope.cancel()
}
}
// Context cancellation checking
suspend fun robustOperation() {
coroutineContext.ensureActive() // Check at start
performSetup()
coroutineContext.ensureActive() // Check before expensive operation
val result = expensiveComputation()
coroutineContext.ensureActive() // Check before final step
saveResults(result)
}Patterns for managing resources with structured concurrency and proper cleanup.
/**
* Ensure proper resource cleanup using try-finally or use patterns.
*/
suspend inline fun <T : Closeable, R> T.use(block: (T) -> R): RUsage Examples:
import kotlinx.coroutines.*
import java.io.Closeable
val scope = MainScope()
// Resource management with coroutines
class DatabaseConnection : Closeable {
private var isOpen = true
fun query(sql: String): List<String> {
check(isOpen) { "Connection is closed" }
// Simulate database query
return listOf("result1", "result2")
}
override fun close() {
println("Closing database connection")
isOpen = false
}
}
scope.launch {
// Automatic resource cleanup
DatabaseConnection().use { connection ->
coroutineScope {
val query1 = async { connection.query("SELECT * FROM users") }
val query2 = async { connection.query("SELECT * FROM orders") }
val results = listOf(query1.await(), query2.await())
println("Query results: $results")
}
// Connection automatically closed even if coroutines are cancelled
}
}
// Manual resource management with cancellation handling
class ResourceManager {
private val resources = mutableListOf<Closeable>()
private val resourceMutex = Mutex()
suspend fun <T : Closeable> manage(resource: T): T {
resourceMutex.withLock {
resources.add(resource)
}
return resource
}
suspend fun cleanup() {
resourceMutex.withLock {
resources.reversed().forEach { resource ->
try {
resource.close()
} catch (e: Exception) {
println("Error closing resource: ${e.message}")
}
}
resources.clear()
}
}
}
scope.launch {
val resourceManager = ResourceManager()
try {
coroutineScope {
val connection1 = resourceManager.manage(DatabaseConnection())
val connection2 = resourceManager.manage(DatabaseConnection())
launch {
val results1 = connection1.query("SELECT 1")
println("Connection 1 results: $results1")
}
launch {
val results2 = connection2.query("SELECT 2")
println("Connection 2 results: $results2")
}
}
} finally {
resourceManager.cleanup()
}
}Implement error boundaries to contain failures within specific scopes.
class ErrorBoundary {
suspend fun <T> withBoundary(
name: String,
onError: suspend (Throwable) -> T,
block: suspend CoroutineScope.() -> T
): T {
return try {
supervisorScope {
block()
}
} catch (e: Exception) {
println("Error in boundary '$name': ${e.message}")
onError(e)
}
}
}
val errorBoundary = ErrorBoundary()
scope.launch {
val result = errorBoundary.withBoundary(
name = "DataProcessing",
onError = { "Default value due to error" }
) {
val task1 = async { riskyOperation1() }
val task2 = async { riskyOperation2() }
"${task1.await()} + ${task2.await()}"
}
println("Final result: $result")
}
// Hierarchical error boundaries
class ServiceLayer {
private val errorBoundary = ErrorBoundary()
suspend fun processUserRequest(userId: String): UserResponse {
return errorBoundary.withBoundary(
name = "UserRequest-$userId",
onError = { UserResponse.error("Service temporarily unavailable") }
) {
val userData = async { fetchUserData(userId) }
val userPrefs = async { fetchUserPreferences(userId) }
val userPosts = async { fetchUserPosts(userId) }
UserResponse.success(
user = userData.await(),
preferences = userPrefs.await(),
posts = userPosts.await()
)
}
}
}Guidelines for effective use of structured concurrency patterns.
// GOOD: Proper nesting and error handling
suspend fun processData(): ProcessingResult {
return coroutineScope {
val validationResult = async { validateInput() }
if (!validationResult.await().isValid) {
return@coroutineScope ProcessingResult.Invalid
}
supervisorScope {
val processing1 = async { processChunk1() }
val processing2 = async { processChunk2() }
val processing3 = async { processChunk3() }
// Collect all results, handling individual failures
val results = listOf(processing1, processing2, processing3)
.map { deferred ->
try {
deferred.await()
} catch (e: Exception) {
ChunkResult.Failed(e.message ?: "Unknown error")
}
}
ProcessingResult.Completed(results)
}
}
}
// BAD: Mixing structured and unstructured concurrency
suspend fun badExample() {
// Don't do this - mixing structured and unstructured
val job = GlobalScope.launch { // Unstructured
coroutineScope { // Structured inside unstructured
// This breaks structured concurrency guarantees
}
}
}
// GOOD: Consistent structured approach
suspend fun goodExample() {
coroutineScope {
val backgroundTask = async(Dispatchers.Default) {
longRunningComputation()
}
val uiTask = async(Dispatchers.Main) {
updateUserInterface()
}
// Both tasks are properly structured
backgroundTask.await()
uiTask.await()
}
}
// Exception handling best practices
suspend fun robustDataProcessing(): Result<String> {
return try {
coroutineScope {
val data = async { fetchData() }
val processed = async { processData(data.await()) }
val validated = async { validateResult(processed.await()) }
Result.Success(validated.await())
}
} catch (e: CancellationException) {
// Always re-throw cancellation
throw e
} catch (e: Exception) {
// Handle other exceptions
Result.Failure(e.message ?: "Processing failed")
}
}
sealed class Result<T> {
data class Success<T>(val value: T) : Result<T>()
data class Failure<T>(val error: String) : Result<T>()
}Approaches for testing structured concurrency patterns.
import kotlinx.coroutines.test.*
@Test
fun testStructuredConcurrency() = runTest {
var cleanupCalled = false
try {
coroutineScope {
launch {
try {
delay(1000)
fail("Should have been cancelled")
} finally {
cleanupCalled = true
}
}
launch {
delay(500)
throw RuntimeException("Simulated failure")
}
}
} catch (e: RuntimeException) {
assertEquals("Simulated failure", e.message)
}
// Verify cleanup was called due to structured cancellation
assertTrue(cleanupCalled)
}
@Test
fun testSupervisorScope() = runTest {
val results = mutableListOf<String>()
supervisorScope {
launch {
delay(100)
results.add("Task 1 completed")
}
launch {
delay(200)
throw RuntimeException("Task 2 failed")
}
launch {
delay(300)
results.add("Task 3 completed")
}
// Wait for all tasks to finish (supervisor doesn't cancel siblings)
delay(400)
}
// Tasks 1 and 3 should complete despite task 2 failing
assertEquals(listOf("Task 1 completed", "Task 3 completed"), results)
}Install with Tessl CLI
npx tessl i tessl/maven-org-jetbrains-kotlinx--kotlinx-coroutines-core