JVM-specific implementation of kotlinx.coroutines core library providing coroutine primitives, builders, dispatchers, and synchronization primitives for asynchronous programming in Kotlin.
—
Structured exception handling with coroutine-aware propagation, cancellation semantics, and supervisor patterns for fault-tolerant concurrent programming.
Context element for handling uncaught exceptions in coroutines.
/**
 * Coroutine context element for handling uncaught exceptions in coroutines.
 *
 * The nested [Key] companion is the context key typed to this element.
 */
interface CoroutineExceptionHandler : CoroutineContext.Element {
/**
 * Handles uncaught exception in coroutine context.
 *
 * @param context the coroutine context passed along with the failure
 * @param exception the uncaught exception to handle
 */
fun handleException(context: CoroutineContext, exception: Throwable)
/** Context key typed to [CoroutineExceptionHandler]. */
companion object Key : CoroutineContext.Key<CoroutineExceptionHandler>
}
/** Creates exception handler from function */
inline fun CoroutineExceptionHandler(
crossinline handler: (CoroutineContext, Throwable) -> Unit
): CoroutineExceptionHandler
Usage Examples:
import kotlinx.coroutines.*
/**
 * Sample handler: reports the failing context and message, then prints a
 * one-line classification of the exception.
 */
val customExceptionHandler = CoroutineExceptionHandler { failedContext, error ->
    println("Caught exception in context $failedContext: ${error.message}")
    // A real handler would log here, forward to crash reporting, etc.
    // IllegalArgumentException is matched first because it is also a RuntimeException.
    val summary = when (error) {
        is IllegalArgumentException -> "Invalid argument provided"
        is RuntimeException -> "Runtime error occurred"
        else -> "Unexpected error: ${error::class.simpleName}"
    }
    println(summary)
}
/**
 * Demonstrates [customExceptionHandler] on a root coroutine and on a
 * supervisor-backed scope.
 *
 * The block is typed as `runBlocking<Unit>` because its last expression,
 * `supervisor.complete()`, yields a Boolean. Without the explicit type,
 * `main` would be inferred to return Boolean and would not be a valid
 * program entry point.
 */
fun main() = runBlocking<Unit> {
    // Exception handler for top-level coroutines.
    // GlobalScope is used on purpose: the coroutine must be a root coroutine
    // (not a child of runBlocking) for the handler to receive the exception.
    val job = GlobalScope.launch(customExceptionHandler) {
        throw RuntimeException("Something went wrong!")
    }
    job.join()

    // Exception handler with multiple coroutines
    val supervisor = SupervisorJob()
    val scope = CoroutineScope(Dispatchers.Default + supervisor + customExceptionHandler)
    scope.launch {
        throw IllegalArgumentException("Invalid input")
    }
    scope.launch {
        delay(100)
        println("This coroutine continues despite sibling failure")
    }
    delay(200)
    supervisor.complete()
}
Job that doesn't cancel when its children fail, enabling independent failure handling.
/** Creates supervisor job that isolates child failures */
fun SupervisorJob(parent: Job? = null): CompletableJob
/** Creates supervisor scope with isolated child failures */
suspend fun <T> supervisorScope(
block: suspend CoroutineScope.() -> T
): T
Usage Examples:
import kotlinx.coroutines.*
/**
 * Demonstrates that a [SupervisorJob] isolates failures between its direct children.
 *
 * Each child is launched directly in a scope whose job is the supervisor.
 * Launching them inside an intermediate `launch(supervisor) { ... }` would give
 * them a regular Job as parent, and the first failure would cancel its siblings.
 *
 * The block is typed as `runBlocking<Unit>` because `supervisor.complete()`
 * returns a Boolean, which would otherwise become the return type of `main`.
 */
fun main() = runBlocking<Unit> {
    // SupervisorJob prevents one child's failure from cancelling its siblings
    val supervisor = SupervisorJob()
    // Children of a supervisor report failures to the handler in their context.
    val handler = CoroutineExceptionHandler { _, exception ->
        println("Child failure handled: ${exception.message}")
    }
    val scope = CoroutineScope(coroutineContext + supervisor + handler)

    // Child 1 - will fail
    val child1 = scope.launch {
        delay(100)
        throw RuntimeException("Child 1 failed")
    }
    // Child 2 - will complete successfully
    val child2 = scope.launch {
        delay(200)
        println("Child 2 completed successfully")
    }
    // Child 3 - will also complete
    val child3 = scope.launch {
        delay(300)
        println("Child 3 completed successfully")
    }

    // join() does not rethrow a child's failure, so waiting on child1 is safe.
    joinAll(child1, child2, child3)
    println("Parent completed - children failures were isolated")
    supervisor.complete()
}
// supervisorScope example for parallel processing
/**
 * Processes every item in its own `async` child of a `supervisorScope` and
 * returns the results of the items that succeeded, in input order.
 *
 * Items equal to `"fail"` and empty items fail; their failure is printed and
 * the item is dropped from the result.
 *
 * @param items the items to process
 * @return the `"Processed: …"` strings for the items that did not fail
 * @throws CancellationException if awaiting a result is interrupted by cancellation
 */
suspend fun processItemsSafely(items: List<String>): List<String> = supervisorScope {
    val pending = items.map { item ->
        async {
            when {
                item == "fail" -> throw RuntimeException("Processing failed for $item")
                item.isEmpty() -> throw IllegalArgumentException("Empty item")
                else -> "Processed: $item"
            }
        }
    }
    pending.mapNotNull { deferred ->
        try {
            deferred.await()
        } catch (e: CancellationException) {
            // Cancellation is not an item failure; never swallow it.
            throw e
        } catch (e: Exception) {
            println("Failed to process item: ${e.message}")
            null // Continue with other items
        }
    }
}
/** Runs [processItemsSafely] on a sample list containing two failing items and prints what succeeded. */
suspend fun supervisorScopeExample() {
    val results = processItemsSafely(listOf("item1", "fail", "item3", "", "item5"))
    println("Successfully processed: $results")
}
Special exception used for cooperative cancellation that doesn't propagate to parent.
/**
 * Exception for cooperative cancellation.
 *
 * @param message optional detail message
 * @param cause optional underlying cause; forwarded to the superclass so it is
 *   reported by [Throwable.cause] instead of being silently dropped
 */
open class CancellationException(
    message: String? = null,
    cause: Throwable? = null
) : IllegalStateException(message, cause)
/** Throws CancellationException if job is cancelled */
fun Job.ensureActive()
/** Checks if context is active (not cancelled) */
val CoroutineContext.isActive: Boolean
/** Gets current job from context */
val CoroutineContext.job: Job
Usage Examples:
import kotlinx.coroutines.*
/**
 * Runs 1000 iterations of simulated work while cooperating with cancellation.
 *
 * Shows three ways of observing cancellation: polling `isActive`, calling
 * `ensureActive()`, and calling a suspending function (`yield()`).
 *
 * The context is obtained through `currentCoroutineContext()`, which comes from
 * `kotlinx.coroutines`. The bare `coroutineContext` property lives in
 * `kotlin.coroutines` and is not in scope in a plain suspend function that
 * only imports `kotlinx.coroutines.*`.
 */
suspend fun cooperativeCancellation() {
    val context = currentCoroutineContext()
    repeat(1000) { i ->
        // Check for cancellation periodically
        if (!context.isActive) {
            println("Detected cancellation at iteration $i")
            return
        }
        // Alternative: throws CancellationException if cancelled
        context.ensureActive()
        // Simulate work
        if (i % 100 == 0) {
            println("Processing iteration $i")
        }
        // Suspending functions automatically check for cancellation
        yield() // Yields execution and checks cancellation
    }
}
/**
 * Cancels two coroutines: one running [cooperativeCancellation] with a
 * message, and one suspended in `delay` with an explicit CancellationException.
 */
fun main() = runBlocking {
    val worker = launch {
        try {
            cooperativeCancellation()
            println("Completed all iterations")
        } catch (cancellation: CancellationException) {
            println("Coroutine was cancelled: ${cancellation.message}")
            throw cancellation // Re-throw to maintain cancellation semantics
        }
    }
    delay(250) // Let it run for a while
    worker.cancel("Manual cancellation")
    worker.join()

    // Custom cancellation exception
    val sleeper = launch {
        try {
            delay(1000)
        } catch (cancellation: CancellationException) {
            println("Custom cleanup before cancellation")
            throw cancellation
        }
    }
    delay(100)
    sleeper.cancel(CancellationException("Custom cancellation reason"))
    sleeper.join()
}
Understanding how exceptions propagate through coroutine hierarchies.
/** Handles coroutine exception with context */
fun handleCoroutineException(context: CoroutineContext, exception: Throwable)
/** Try-catch equivalents for coroutines */
suspend fun <T> runCatching(block: suspend () -> T): Result<T>
/** Non-cancellable execution block */
suspend fun <T> NonCancellable.run(block: suspend () -> T): T
Usage Examples:
import kotlinx.coroutines.*
// Exception propagation in regular job hierarchy
/**
 * Shows that a child failure cancels its parent and siblings and then surfaces
 * from the enclosing `coroutineScope`.
 *
 * The try/catch wraps the whole `coroutineScope` call. A catch placed inside the
 * scope cannot stop the failure: the scope itself is cancelled by the child and
 * rethrows the child's exception to its caller once all children have finished.
 *
 * @throws CancellationException if the calling coroutine itself is cancelled
 */
suspend fun regularJobExceptionPropagation() {
    try {
        // coroutineScope waits for its children, so no explicit join is needed.
        coroutineScope {
            launch {
                launch {
                    delay(100)
                    throw RuntimeException("Child exception")
                }
                launch {
                    delay(200)
                    println("This won't execute - parent gets cancelled")
                }
                delay(300)
                println("Parent won't reach here")
            }
        }
    } catch (e: CancellationException) {
        // Cancellation of the caller is not the demo failure; let it propagate.
        throw e
    } catch (e: Exception) {
        println("Caught propagated exception: ${e.message}")
    }
}
// Exception handling with async
/**
 * Shows handling a failed `async` at its `await()` call while a sibling
 * `async` still delivers its result.
 *
 * `supervisorScope` is required here. Inside a plain `coroutineScope` the
 * failing `async` would cancel the scope and `deferred2`, and the scope would
 * rethrow the failure even though `await()` was wrapped in try/catch.
 *
 * @throws CancellationException if the calling coroutine itself is cancelled
 */
suspend fun asyncExceptionHandling() = supervisorScope {
    val deferred1 = async {
        delay(100)
        throw RuntimeException("Async exception")
    }
    val deferred2 = async {
        delay(200)
        "Success result"
    }
    try {
        val result1 = deferred1.await() // Exception thrown here
        println("Won't reach: $result1")
    } catch (e: CancellationException) {
        throw e
    } catch (e: Exception) {
        println("Caught async exception: ${e.message}")
    }
    try {
        val result2 = deferred2.await()
        println("Result2: $result2") // This still works
    } catch (e: CancellationException) {
        throw e
    } catch (e: Exception) {
        println("Unexpected exception: ${e.message}")
    }
}
// Non-cancellable cleanup
/**
 * Performs ten steps of simulated work and always runs a suspending cleanup
 * afterwards, wrapping the cleanup in `withContext(NonCancellable)`.
 */
suspend fun nonCancellableCleanup() {
    val resource = "Important Resource"
    try {
        // Simulate work that gets cancelled
        for (i in 0 until 10) {
            println("Working on iteration $i")
            delay(100)
        }
    } finally {
        // Ensure cleanup runs even if cancelled
        withContext(NonCancellable) {
            println("Cleaning up $resource")
            delay(50) // Even suspending cleanup operations work
            println("Cleanup completed")
        }
    }
}
/** Runs the three propagation demos in order; the cleanup demo is cancelled after 250 ms. */
fun main() = runBlocking {
    println("Regular job exception propagation:")
    regularJobExceptionPropagation()

    println("\nAsync exception handling:")
    asyncExceptionHandling()

    println("\nNon-cancellable cleanup:")
    val cleanupJob = launch { nonCancellableCleanup() }
    delay(250)
    // cancelAndJoin() is cancel() followed by join()
    cleanupJob.cancelAndJoin()
}
Common patterns for error recovery and resilience.
/** Retry with exponential backoff */
suspend fun <T> retry(
retries: Int,
initialDelayMs: Long = 1000,
factor: Double = 2.0,
block: suspend () -> T
): T
/** Circuit breaker pattern */
class CircuitBreaker(
val failureThreshold: Int,
val recoveryTimeoutMs: Long
)
Usage Examples:
import kotlinx.coroutines.*
import kotlin.random.Random
// Retry with exponential backoff
/**
 * Runs [block] up to [retries] times, waiting between attempts with a delay
 * that is multiplied by [factor] after every failure.
 *
 * @param retries total number of attempts; must be at least 1
 * @param initialDelay delay in milliseconds before the second attempt
 * @param factor multiplier applied to the delay after each failed attempt
 * @param block the operation to attempt
 * @return the result of the first successful attempt
 * @throws IllegalArgumentException if [retries] is less than 1
 * @throws CancellationException immediately if [block] is cancelled; cancellation is never retried
 * @throws Exception the failure of the last attempt when every attempt fails
 */
suspend fun <T> retryWithBackoff(
    retries: Int,
    initialDelay: Long = 1000,
    factor: Double = 2.0,
    block: suspend () -> T
): T {
    require(retries >= 1) { "retries must be at least 1, was $retries" }
    var currentDelay = initialDelay
    // All attempts but the last one are retried on failure.
    repeat(retries - 1) { attempt ->
        try {
            return block()
        } catch (e: CancellationException) {
            // Cancellation is a request to stop, not a transient failure.
            throw e
        } catch (e: Exception) {
            println("Attempt ${attempt + 1} failed: ${e.message}. Retrying in ${currentDelay}ms")
            delay(currentDelay)
            currentDelay = (currentDelay * factor).toLong()
        }
    }
    // Final attempt: any failure propagates to the caller unchanged.
    return block()
}
// Circuit breaker implementation
/**
 * Minimal circuit breaker around a suspending operation.
 *
 * - CLOSED: calls pass through; [failureThreshold] consecutive failures open the circuit.
 * - OPEN: calls are rejected until [recoveryTimeoutMs] has elapsed since the last failure.
 * - HALF_OPEN: one trial call decides between closing and re-opening the circuit.
 *
 * A [CancellationException] thrown by the operation is rethrown without being
 * counted, so cancelling a caller never trips or re-opens the breaker.
 *
 * The class performs no synchronization on its mutable state, and elapsed time
 * is measured with `System.currentTimeMillis()`.
 *
 * @param failureThreshold number of consecutive failures that opens the circuit
 * @param recoveryTimeoutMs milliseconds to stay open before allowing a trial call
 */
class CircuitBreaker(
    private val failureThreshold: Int,
    private val recoveryTimeoutMs: Long
) {
    private var failureCount = 0
    private var lastFailureTime = 0L
    private var state = State.CLOSED

    enum class State { CLOSED, OPEN, HALF_OPEN }

    /**
     * Runs [block] under the breaker's current state.
     *
     * @return the result of [block]
     * @throws RuntimeException with message "Circuit breaker is OPEN" while the circuit is open
     * @throws Exception whatever [block] throws, after it has been recorded as a failure
     */
    suspend fun <T> execute(block: suspend () -> T): T = when (state) {
        State.CLOSED -> try {
            block().also { failureCount = 0 }
        } catch (e: CancellationException) {
            throw e
        } catch (e: Exception) {
            failureCount++
            if (failureCount >= failureThreshold) open()
            throw e
        }

        State.OPEN -> {
            if (System.currentTimeMillis() - lastFailureTime < recoveryTimeoutMs) {
                throw RuntimeException("Circuit breaker is OPEN")
            }
            // Recovery window elapsed: allow a single trial call.
            state = State.HALF_OPEN
            execute(block)
        }

        State.HALF_OPEN -> try {
            block().also {
                state = State.CLOSED
                failureCount = 0
            }
        } catch (e: CancellationException) {
            throw e
        } catch (e: Exception) {
            open()
            throw e
        }
    }

    /** Opens the circuit and records when it happened. */
    private fun open() {
        state = State.OPEN
        lastFailureTime = System.currentTimeMillis()
    }
}
// Fallback pattern
/**
 * Runs [primary] and, if it fails, reports the failure and runs [fallback] instead.
 *
 * @param primary the preferred operation
 * @param fallback the operation used when [primary] throws an [Exception]
 * @return the result of [primary], or of [fallback] when [primary] failed
 * @throws CancellationException if [primary] is cancelled; cancellation never triggers the fallback
 */
suspend fun <T> withFallback(
    primary: suspend () -> T,
    fallback: suspend () -> T
): T {
    return try {
        primary()
    } catch (e: CancellationException) {
        // A cancelled caller must stop, not start fallback work.
        throw e
    } catch (e: Exception) {
        println("Primary failed: ${e.message}, using fallback")
        fallback()
    }
}
/** Exercises the retry, circuit-breaker and fallback helpers against randomly failing operations. */
fun main() = runBlocking {
    // Retry example
    try {
        val retried = retryWithBackoff(retries = 3) {
            if (Random.nextFloat() < 0.7) throw RuntimeException("Random failure")
            "Success!"
        }
        println("Retry result: $retried")
    } catch (failure: Exception) {
        println("All retry attempts failed: ${failure.message}")
    }

    // Circuit breaker example
    val circuitBreaker = CircuitBreaker(failureThreshold = 3, recoveryTimeoutMs = 1000)
    for (i in 0 until 10) {
        try {
            val response = circuitBreaker.execute {
                // Only the first five calls may fail.
                if (i < 5 && Random.nextFloat() < 0.8) {
                    throw RuntimeException("Service failure")
                }
                "Service response $i"
            }
            println("Circuit breaker result: $response")
        } catch (failure: Exception) {
            println("Circuit breaker blocked: ${failure.message}")
        }
        delay(200)
    }

    // Fallback example
    val fallbackOutcome = withFallback(
        primary = {
            if (Random.nextBoolean()) throw RuntimeException("Primary service down")
            "Primary result"
        },
        fallback = { "Fallback result" }
    )
    println("Fallback result: $fallbackOutcome")
}
Core exception types for coroutine error handling.
/** Base exception for cancellation */
open class CancellationException : IllegalStateException
/** Exception for timeout operations */
class TimeoutCancellationException : CancellationException
/** Exception for job cancellation */
class JobCancellationException : CancellationException
Context elements related to exception handling.
/** Key for CoroutineExceptionHandler in context */
object CoroutineExceptionHandler : CoroutineContext.Key<CoroutineExceptionHandler>
/** Non-cancellable context element */
object NonCancellable : AbstractCoroutineContextElement(Job), Job
Install with Tessl CLI
npx tessl i tessl/maven-org-jetbrains-kotlinx--kotlinx-coroutines-core-jvm