CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-jetbrains-kotlinx--kotlinx-coroutines-core

Coroutines support libraries for Kotlin providing structured concurrency primitives, Flow API for reactive streams, channels for communication, and synchronization utilities across all Kotlin platforms

Pending
Overview
Eval results
Files

docs/structured-concurrency.md

Structured Concurrency Functions

Scoping functions and cancellation utilities for coordinating coroutine lifecycles. These functions provide structured patterns for coroutine execution and ensure proper resource cleanup and cancellation propagation.

Capabilities

Coroutine Scope Functions

Functions that create new scopes with specific lifecycle and error handling behaviors.

/**
 * Creates a new coroutine scope, runs [block] in it, and does not return until
 * [block] and all children launched in the scope have completed.
 *
 * Failure of any child cancels the scope and all other children, and the failure
 * is rethrown to the caller. A child that is merely cancelled (completes with
 * CancellationException) does not fail the scope.
 *
 * @param block the suspending code to run with the new scope as its receiver.
 * @return the value returned by [block].
 */
suspend fun <T> coroutineScope(block: suspend CoroutineScope.() -> T): T

/**
 * Creates a new supervisor scope, runs [block] in it, and does not return until
 * [block] and all children launched in the scope have completed.
 *
 * Unlike coroutineScope, failure of a child does not cancel the scope or the other
 * children; each child's failure has to be handled individually (for example by
 * catching the exception rethrown from `await()`). An exception thrown by [block]
 * itself still fails the scope and cancels its children.
 *
 * @param block the suspending code to run with the new scope as its receiver.
 * @return the value returned by [block].
 */
suspend fun <T> supervisorScope(block: suspend CoroutineScope.() -> T): T

/**
 * Calls the specified suspending block with a given coroutine context,
 * suspends until it completes, and returns the result.
 *
 * The block runs in the caller's context combined with [context], so elements
 * supplied here (for example a dispatcher or a CoroutineName) override the
 * caller's elements of the same kind for the duration of the block.
 *
 * @param context context elements to apply while [block] runs.
 * @param block the suspending code to execute.
 * @return the value returned by [block].
 */
suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T

Usage Examples:

import kotlinx.coroutines.*

// Scope that owns the example coroutines launched below.
val scope = MainScope()

scope.launch {
    // coroutineScope - all children must complete successfully
    try {
        val result = coroutineScope {
            val task1 = async { computeValue1() }
            val task2 = async { computeValue2() }
            val task3 = async { computeValue3() }

            // If any task fails, all are cancelled
            listOf(task1.await(), task2.await(), task3.await())
        }
        println("All tasks completed: $result")
    } catch (e: CancellationException) {
        // Cancellation of this coroutine is not a task failure: propagate it
        // instead of reporting it, so the coroutine stops cooperatively.
        throw e
    } catch (e: Exception) {
        println("Some task failed: ${e.message}")
    }
}

scope.launch {
    // supervisorScope - a failing child leaves its siblings running
    val results = supervisorScope {
        val task1 = async {
            delay(100)
            "Task 1 success"
        }
        val task2 = async {
            delay(200)
            throw RuntimeException("Task 2 failed")
        }
        val task3 = async {
            delay(300)
            "Task 3 success"
        }

        // Await each task in order, turning an individual failure into a message
        val first = try {
            task1.await()
        } catch (e: Exception) {
            "Task 1 failed: ${e.message}"
        }
        val second = try {
            task2.await()
        } catch (e: Exception) {
            "Task 2 failed: ${e.message}"
        }
        val third = try {
            task3.await()
        } catch (e: Exception) {
            "Task 3 failed: ${e.message}"
        }

        listOf(first, second, third)
    }
    println("Supervisor results: $results")
    // Output: [Task 1 success, Task 2 failed: Task 2 failed, Task 3 success]
}

// Switching context with withContext
scope.launch {
    println("Starting on: ${Thread.currentThread().name}")

    // Run the heavy part on the Default dispatcher under a descriptive coroutine name
    val processingContext = Dispatchers.Default + CoroutineName("DataProcessor")
    val result = withContext(processingContext) {
        println("Processing on: ${Thread.currentThread().name}")
        println("Coroutine name: ${coroutineContext[CoroutineName]?.name}")

        processLargeDataset().summary
    }

    // withContext has returned, so execution continues in the launching context
    println("Back on: ${Thread.currentThread().name}")
    displayResult(result)
}

Timeout Functions

Functions for executing operations with time limits and cancellation.

/**
 * Runs a given suspending block of code inside a coroutine with a specified timeout
 * and throws TimeoutCancellationException if the timeout was exceeded.
 *
 * Because TimeoutCancellationException is a CancellationException, a timeout that
 * is not caught is treated as cancellation of the calling coroutine rather than as
 * a failure; catch it explicitly when the timeout needs dedicated handling.
 *
 * @param timeoutMillis the time limit in milliseconds.
 * @param block the suspending code to run under the time limit.
 * @return the value returned by [block] when it finishes in time.
 * @throws TimeoutCancellationException if [block] does not finish within the limit.
 */
suspend fun <T> withTimeout(timeoutMillis: Long, block: suspend CoroutineScope.() -> T): T

/**
 * Runs a given suspending block of code inside a coroutine with a specified timeout
 * and returns null if the timeout was exceeded.
 *
 * Note that a null result is ambiguous when [block] itself can return null.
 *
 * @param timeoutMillis the time limit in milliseconds.
 * @param block the suspending code to run under the time limit.
 * @return the value returned by [block], or null if the limit was exceeded.
 */
suspend fun <T> withTimeoutOrNull(timeoutMillis: Long, block: suspend CoroutineScope.() -> T): T?

/**
 * Exception thrown by withTimeout when the timeout is exceeded.
 *
 * It is a subclass of CancellationException, so code that catches
 * CancellationException also catches timeouts.
 *
 * NOTE(review): the published library does not appear to expose a public
 * constructor for this class (instances come from the timeout functions) —
 * confirm against the API reference before constructing it directly.
 *
 * @param message description of the timeout.
 * @param coroutine presumably the job of the timed-out block — TODO confirm.
 */
class TimeoutCancellationException(
    message: String?,
    coroutine: Job
) : CancellationException(message)

Usage Examples:

import kotlinx.coroutines.*

// Scope that owns the timeout examples below.
val scope = MainScope()

scope.launch {
    // withTimeout signals a timeout by throwing
    try {
        val result = withTimeout(1000) {
            // Finishes well inside the one-second limit
            delay(500)
            "Operation completed"
        }
        println("Result: $result")
    } catch (e: TimeoutCancellationException) {
        println("Operation timed out")
    }
}

scope.launch {
    // withTimeoutOrNull signals a timeout by returning null
    val result = withTimeoutOrNull(500) {
        // Takes longer than the limit, so the block is cancelled
        delay(1000)
        "This won't complete"
    }

    if (result == null) {
        println("Operation timed out, using default value")
        handleTimeout()
        return@launch
    }
    println("Result: $result")
}

// Network request with timeout
/**
 * Simulates fetching [url] under a five-second limit.
 *
 * @return the simulated payload, or null when the limit is exceeded.
 * @throws RuntimeException when the simulated request fails.
 */
suspend fun fetchDataWithTimeout(url: String): String? = withTimeoutOrNull(5000) {
    // Simulated latency; anything above the limit produces a timeout
    val simulatedLatency = kotlin.random.Random.nextLong(1000, 10000)
    delay(simulatedLatency)

    val succeeded = kotlin.random.Random.nextBoolean()
    if (!succeeded) {
        throw RuntimeException("Network error")
    }
    "Data from $url"
}

scope.launch {
    val data = fetchDataWithTimeout("https://api.example.com/data")
    if (data == null) {
        println("Request timed out")
    } else {
        println("Received: $data")
    }
}

// Handling a timeout with custom cleanup
scope.launch {
    try {
        val result = withTimeout(2000) {
            val task1 = async { longRunningTask1() }
            val task2 = async { longRunningTask2() }

            // The limit applies to both tasks together
            task1.await() to task2.await()
        }
        println("Both tasks completed: $result")
    } catch (e: TimeoutCancellationException) {
        println("Tasks timed out, cleaning up...")
        cleanup()
    }
}

Cancellation Functions

Functions for cooperative cancellation and cancellation checking.

/**
 * Yields the thread (or thread pool) of the current coroutine dispatcher
 * to other coroutines on the same dispatcher to run if possible.
 *
 * It is also a cancellation point: if the current coroutine has been cancelled,
 * the call resumes with CancellationException.
 */
suspend fun yield()

/**
 * Suspends the current coroutine until it is cancelled.
 * This function never returns normally.
 *
 * Typically paired with try/finally so that cleanup runs when the owning scope
 * is cancelled.
 *
 * @throws CancellationException when the coroutine is cancelled.
 */
suspend fun awaitCancellation(): Nothing

/**
 * Throws CancellationException if the context is cancelled.
 *
 * The check looks at the Job in this context; a context without a Job is never
 * considered cancelled, so the call does nothing in that case.
 *
 * @throws CancellationException if the context's Job is no longer active.
 */
fun CoroutineContext.ensureActive()

/**
 * Throws CancellationException if this Job is no longer active
 * (it was cancelled, or it has already completed).
 *
 * @throws CancellationException if the job is not active.
 */
fun Job.ensureActive()

/**
 * Throws CancellationException if the Job of this scope is no longer active.
 * Usable without a qualifier inside coroutine builders such as `launch { }`,
 * where the scope is the implicit receiver.
 *
 * @throws CancellationException if the scope's job is not active.
 */
fun CoroutineScope.ensureActive()

Usage Examples:

import kotlinx.coroutines.*

// Scope that owns the cancellation examples below.
val scope = MainScope()

// Cooperative multitasking with yield()
scope.launch {
    for (i in 0 until 1000) {
        // Every 100th iteration, let other coroutines on this dispatcher run
        val atCheckpoint = i % 100 == 0
        if (atCheckpoint) {
            yield()
        }
        performWork(i)
    }
}

// ensureActive() for cancellation checking
/**
 * Processes every item of [data] in order, checking for cancellation once per
 * 1000 items so a long run can be stopped cooperatively.
 *
 * @param data the items to process.
 * @return the processed items, in the same order as [data].
 * @throws CancellationException if the calling coroutine is cancelled.
 */
suspend fun processLargeDataset(data: List<String>): List<String> {
    // A plain suspend function has no CoroutineScope or Job receiver, so an
    // unqualified ensureActive() does not resolve here; check the caller's context.
    val context = currentCoroutineContext()

    return data.mapIndexed { index, item ->
        // Check for cancellation periodically
        if (index % 1000 == 0) {
            context.ensureActive()  // Throws CancellationException if cancelled
        }

        processItem(item)
    }
}

// Long-running job that reports when it is cancelled
val job = scope.launch {
    try {
        val results = processLargeDataset(generateLargeDataset())
        println("Processing completed: ${results.size} items")
    } catch (cancellation: CancellationException) {
        println("Processing was cancelled")
        // Propagate so the coroutine still completes as cancelled
        throw cancellation
    }
}

// Stop the job after five seconds
scope.launch {
    delay(5000)
    job.cancel()
}

// awaitCancellation for services
/**
 * Service that runs a periodic task in its own scope and releases its resources
 * when that scope is cancelled through [stop].
 */
class BackgroundService {
    private val serviceScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)

    /** Launches the periodic worker and the lifecycle watcher. */
    fun start() {
        serviceScope.launch { runPeriodicTasks() }

        // Lifecycle management
        serviceScope.launch { releaseResourcesOnCancellation() }
    }

    /** Cancels the service scope and every coroutine started by [start]. */
    fun stop() {
        serviceScope.cancel()
    }

    /** Runs the task once a minute until cancelled, logging the shutdown. */
    private suspend fun runPeriodicTasks() {
        try {
            while (true) {
                performPeriodicTask()
                delay(60000)  // One minute between runs
            }
        } catch (e: CancellationException) {
            println("Service is shutting down")
            throw e
        }
    }

    /** Suspends until the scope is cancelled, then cleans up. */
    private suspend fun releaseResourcesOnCancellation() {
        try {
            awaitCancellation()
        } finally {
            cleanupResources()
        }
    }
}

// Context cancellation checking
/**
 * Runs setup, an expensive computation and a save step, checking for
 * cancellation before each of them.
 */
suspend fun robustOperation() {
    // The context of this invocation does not change between steps
    val context = coroutineContext

    context.ensureActive()
    performSetup()

    // Avoid starting the expensive step once cancelled
    context.ensureActive()
    val result = expensiveComputation()

    // Do not persist anything once cancelled
    context.ensureActive()
    saveResults(result)
}

Resource Management

Patterns for managing resources with structured concurrency and proper cleanup.

/**
 * Kotlin standard library function (package `kotlin.io`), not part of
 * kotlinx.coroutines: executes [block] on this resource and then closes it,
 * whether [block] returns normally or throws.
 *
 * The function is `inline` and not `suspend`; because it is inlined, [block] may
 * still call suspending functions when `use` is invoked from a coroutine, and the
 * resource is closed when the coroutine is cancelled inside [block].
 *
 * @param block the code to run with the resource.
 * @return the value returned by [block].
 */
inline fun <T : Closeable?, R> T.use(block: (T) -> R): R

Usage Examples:

import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.io.Closeable

// Scope that owns the resource-management example coroutines launched below.
val scope = MainScope()

// Resource management with coroutines
/**
 * Simulated database connection that rejects queries once it has been closed.
 */
class DatabaseConnection : Closeable {
    private var closed = false

    /**
     * Runs [sql] against the simulated database.
     *
     * @return a fixed two-row result.
     * @throws IllegalStateException if the connection has been closed.
     */
    fun query(sql: String): List<String> {
        if (closed) {
            throw IllegalStateException("Connection is closed")
        }
        // Simulated query result
        return listOf("result1", "result2")
    }

    /** Marks the connection as closed; later queries fail. */
    override fun close() {
        println("Closing database connection")
        closed = true
    }
}

scope.launch {
    // use() closes the connection when the block exits, normally or not
    DatabaseConnection().use { db ->
        coroutineScope {
            val query1 = async { db.query("SELECT * FROM users") }
            val query2 = async { db.query("SELECT * FROM orders") }

            val results = listOf(query1.await(), query2.await())
            println("Query results: $results")
        }
        // coroutineScope has returned, so no query is still using the connection
    }
}

// Manual resource management with cancellation handling
/**
 * Tracks [Closeable] resources and closes them together, newest first.
 * Access to the tracked list is serialized with a [Mutex].
 */
class ResourceManager {
    private val resources = mutableListOf<Closeable>()
    private val resourceMutex = Mutex()

    /**
     * Registers [resource] for later cleanup.
     *
     * @return the same [resource], for convenient chaining.
     */
    suspend fun <T : Closeable> manage(resource: T): T {
        resourceMutex.withLock {
            resources.add(resource)
        }
        return resource
    }

    /**
     * Closes every tracked resource in reverse registration order and forgets them.
     * A failure to close one resource is reported and does not stop the others.
     *
     * Safe to call from a `finally` block of a cancelled coroutine: the work runs
     * in [NonCancellable], so waiting for the mutex cannot be interrupted by the
     * cancellation that triggered the cleanup.
     */
    suspend fun cleanup() {
        withContext(NonCancellable) {
            resourceMutex.withLock {
                resources.reversed().forEach { resource ->
                    try {
                        resource.close()
                    } catch (e: Exception) {
                        println("Error closing resource: ${e.message}")
                    }
                }
                resources.clear()
            }
        }
    }
}

scope.launch {
    val manager = ResourceManager()

    try {
        coroutineScope {
            // Register both connections before any query starts
            val firstConnection = manager.manage(DatabaseConnection())
            val secondConnection = manager.manage(DatabaseConnection())

            launch {
                val results1 = firstConnection.query("SELECT 1")
                println("Connection 1 results: $results1")
            }

            launch {
                val results2 = secondConnection.query("SELECT 2")
                println("Connection 2 results: $results2")
            }
        }
    } finally {
        // Runs after both queries have finished, failed or been cancelled
        manager.cleanup()
    }
}

Error Boundary Patterns

Implement error boundaries to contain failures within specific scopes.

/**
 * Contains failures of a block of concurrent work and replaces them with a
 * caller-supplied fallback value.
 */
class ErrorBoundary {
    /**
     * Runs [block] in a supervisor scope and returns its result. If it fails with
     * an exception, logs the failure under [name] and returns the value produced
     * by [onError] instead.
     *
     * Cancellation is never treated as an error: a [CancellationException] is
     * rethrown so that cancelling the caller is not masked by a fallback value.
     *
     * @param name label used in the log message.
     * @param onError produces the fallback value from the caught exception.
     * @param block the work to protect.
     * @return the result of [block], or the fallback from [onError].
     */
    suspend fun <T> withBoundary(
        name: String,
        onError: suspend (Throwable) -> T,
        block: suspend CoroutineScope.() -> T
    ): T {
        return try {
            supervisorScope {
                block()
            }
        } catch (e: CancellationException) {
            // Always re-throw cancellation
            throw e
        } catch (e: Exception) {
            println("Error in boundary '$name': ${e.message}")
            onError(e)
        }
    }
}

// Boundary instance used by the example below.
val errorBoundary = ErrorBoundary()

scope.launch {
    val result = errorBoundary.withBoundary(
        name = "DataProcessing",
        onError = { "Default value due to error" },
        block = {
            // Both operations start concurrently; either failure triggers the fallback
            val task1 = async { riskyOperation1() }
            val task2 = async { riskyOperation2() }

            "${task1.await()} + ${task2.await()}"
        }
    )

    println("Final result: $result")
}

// Hierarchical error boundaries
/**
 * Service facade whose requests run inside an [ErrorBoundary], so a failing
 * lookup produces an error response instead of an exception.
 */
class ServiceLayer {
    private val errorBoundary = ErrorBoundary()

    /**
     * Loads the user's data, preferences and posts concurrently.
     *
     * @param userId identifier of the user to load.
     * @return a success response with all three parts, or an error response if
     * the boundary caught a failure.
     */
    suspend fun processUserRequest(userId: String): UserResponse =
        errorBoundary.withBoundary(
            name = "UserRequest-$userId",
            onError = { UserResponse.error("Service temporarily unavailable") },
            block = {
                // Start all three lookups before awaiting any of them
                val userData = async { fetchUserData(userId) }
                val userPrefs = async { fetchUserPreferences(userId) }
                val userPosts = async { fetchUserPosts(userId) }

                val user = userData.await()
                val preferences = userPrefs.await()
                val posts = userPosts.await()

                UserResponse.success(user = user, preferences = preferences, posts = posts)
            }
        )
}

Structured Concurrency Best Practices

Guidelines for effective use of structured concurrency patterns.

// GOOD: Proper nesting and error handling
/**
 * Validates the input and, when it is valid, processes three chunks
 * concurrently, recording a failed chunk instead of failing the whole run.
 *
 * @return [ProcessingResult.Invalid] when validation fails, otherwise
 * [ProcessingResult.Completed] with one entry per chunk.
 */
suspend fun processData(): ProcessingResult = coroutineScope {
    val validation = async { validateInput() }.await()

    if (!validation.isValid) {
        ProcessingResult.Invalid
    } else {
        // Supervisor scope: one failing chunk must not cancel the others
        supervisorScope {
            val chunks = listOf(
                async { processChunk1() },
                async { processChunk2() },
                async { processChunk3() }
            )

            // Await in order, converting each individual failure into a result entry
            val results = chunks.map { deferred ->
                try {
                    deferred.await()
                } catch (e: Exception) {
                    ChunkResult.Failed(e.message ?: "Unknown error")
                }
            }

            ProcessingResult.Completed(results)
        }
    }
}

// BAD: Mixing structured and unstructured concurrency
/**
 * Anti-pattern, shown for illustration only: the coroutine is started in
 * GlobalScope, so it is not a child of the caller of this function.
 */
suspend fun badExample() {
    // Don't do this - the launched job escapes the caller's scope
    val detachedJob = GlobalScope.launch {  // Unstructured: nobody owns this job
        coroutineScope {  // Structured, but only inside the detached job
            // The caller neither waits for this work nor cancels it
        }
    }
}

// GOOD: Consistent structured approach
/**
 * Runs a background computation and a UI update concurrently as children of one
 * scope, returning only after both have completed.
 */
suspend fun goodExample() {
    coroutineScope {
        // Each child picks its own dispatcher but stays inside this scope
        val backgroundTask = async(Dispatchers.Default) { longRunningComputation() }
        val uiTask = async(Dispatchers.Main) { updateUserInterface() }

        // Await the background task first, then the UI task
        listOf(backgroundTask, uiTask).forEach { task -> task.await() }
    }
}

// Exception handling best practices
/**
 * Fetches, processes and validates data, reporting a failure as a
 * [Result.Failure] while letting cancellation propagate.
 */
suspend fun robustDataProcessing(): Result<String> =
    try {
        coroutineScope {
            val data = async { fetchData() }
            val processed = async { processData(data.await()) }
            val validated = async { validateResult(processed.await()) }

            val value = validated.await()
            Result.Success(value)
        }
    } catch (cancellation: CancellationException) {
        // Cancellation must reach the caller unchanged
        throw cancellation
    } catch (failure: Exception) {
        // Any other exception becomes a failure result
        Result.Failure(failure.message ?: "Processing failed")
    }

/**
 * Outcome of an operation: either a [Success] carrying a value or a [Failure]
 * carrying an error description.
 */
sealed class Result<T> {
    /** Successful outcome holding the produced [value]. */
    data class Success<T>(
        val value: T
    ) : Result<T>()

    /** Failed outcome holding a human-readable [error] message. */
    data class Failure<T>(
        val error: String
    ) : Result<T>()
}

Testing Structured Concurrency

Approaches for testing structured concurrency patterns.

import kotlinx.coroutines.test.*

@Test
fun testStructuredConcurrency() = runTest {
    var cleanupCalled = false

    // Capture whatever RuntimeException escapes the scope, if any
    val thrown: RuntimeException? = try {
        coroutineScope {
            // Slow child: must be cancelled by its sibling's failure
            launch {
                try {
                    delay(1000)
                    fail("Should have been cancelled")
                } finally {
                    cleanupCalled = true
                }
            }

            // Failing child: fails first and takes the scope down
            launch {
                delay(500)
                throw RuntimeException("Simulated failure")
            }
        }
        null
    } catch (e: RuntimeException) {
        e
    }

    if (thrown != null) {
        assertEquals("Simulated failure", thrown.message)
    }

    // The slow child's finally block ran because the scope cancelled it
    assertTrue(cleanupCalled)
}

@Test
fun testSupervisorScope() = runTest {
    val results = mutableListOf<String>()
    val failures = mutableListOf<Throwable>()

    // Children launched directly in a supervisor scope must handle their own
    // failures. Without a handler the exception from the failing `launch` is
    // reported as unhandled, and runTest fails the test because of it.
    val recordFailure = CoroutineExceptionHandler { _, exception ->
        failures.add(exception)
    }

    supervisorScope {
        launch {
            delay(100)
            results.add("Task 1 completed")
        }

        launch(recordFailure) {
            delay(200)
            throw RuntimeException("Task 2 failed")
        }

        launch {
            delay(300)
            results.add("Task 3 completed")
        }

        // No explicit wait is needed: supervisorScope returns only after all
        // of its children have completed.
    }

    // Tasks 1 and 3 should complete despite task 2 failing
    assertEquals(listOf("Task 1 completed", "Task 3 completed"), results)
    // The failure of task 2 was delivered to its handler, not to the siblings
    assertEquals(listOf<String?>("Task 2 failed"), failures.map { it.message })
}

Install with Tessl CLI

npx tessl i tessl/maven-org-jetbrains-kotlinx--kotlinx-coroutines-core

docs

channels.md

coroutine-builders.md

dispatchers.md

exception-handling.md

flow-api.md

index.md

jobs-deferreds.md

structured-concurrency.md

synchronization.md

tile.json