Coroutines support libraries for Kotlin, providing structured concurrency primitives, the Flow API for reactive streams, channels for communication, and synchronization utilities across all Kotlin platforms.
—
Comprehensive error handling and cancellation mechanisms for robust coroutine applications. This covers exception propagation, cancellation semantics, error recovery patterns, and custom exception handling strategies.
The fundamental exception types for coroutine error handling and cancellation.
/**
* Indicates that the operation was cancelled.
* This is a special exception that is used to cancel coroutines cooperatively.
*
* As declared here it is an open subclass of [IllegalStateException]; both
* constructor arguments are forwarded unchanged to that superclass.
*
* @param message optional detail message; defaults to `null`.
* @param cause optional underlying cause; defaults to `null`.
*/
open class CancellationException(
message: String? = null,
cause: Throwable? = null
) : IllegalStateException(message, cause)
/**
* Exception thrown by withTimeout when the timeout is exceeded.
*
* Declared as a subclass of [CancellationException]; only [message] is
* forwarded to the superclass constructor.
*
* @param message detail message passed to [CancellationException]; may be `null`.
* @param coroutine a [Job] supplied at construction; presumably the job that
* timed out (this signature listing does not show how it is used).
*/
class TimeoutCancellationException(
message: String?,
coroutine: Job
) : CancellationException(message)
/**
 * Exception thrown when a coroutine is cancelled while suspended in a cancellable suspending function.
 *
 * Declared as a subclass of [CancellationException]; [message] and [cause] are
 * forwarded to the superclass constructor.
 *
 * @param message detail message (non-null for this type).
 * @param cause optional underlying cause; defaults to `null`.
 * @param job a [Job] supplied at construction; presumably the cancelled job
 * (this signature listing does not show how it is used).
 */
class JobCancellationException(
    message: String,
    cause: Throwable? = null,
    job: Job
) : CancellationException(message, cause)

// A context element for handling uncaught exceptions in coroutines.
/**
* An optional element in the coroutine context to handle uncaught exceptions.
* Normally, uncaught exceptions can only result from root coroutines created using
* launch builder. A child coroutine that fails cancels its parent and thus all siblings.
*
* The companion object [Key] is the context key under which an instance of this
* element is stored, so it can be looked up as `context[CoroutineExceptionHandler]`.
*/
interface CoroutineExceptionHandler : CoroutineContext.Element {
companion object Key : CoroutineContext.Key<CoroutineExceptionHandler>
/**
* Handles uncaught exception in the given context.
*
* @param context the coroutine context the exception was raised in.
* @param exception the uncaught exception to handle.
*/
fun handleException(context: CoroutineContext, exception: Throwable)
}
/**
 * Creates a CoroutineExceptionHandler with the specified handler function.
 *
 * @param handler callback receiving the coroutine context and the uncaught
 * exception; declared `crossinline`, so it may not use non-local returns.
 * @return a [CoroutineExceptionHandler] backed by [handler].
 */
inline fun CoroutineExceptionHandler(
    crossinline handler: (CoroutineContext, Throwable) -> Unit
): CoroutineExceptionHandler

// Usage Examples:
import kotlinx.coroutines.*
val scope = MainScope()

// Basic exception handler
val exceptionHandler = CoroutineExceptionHandler { context, exception ->
    // `context[CoroutineName]` is the context element itself, whose string form
    // is "CoroutineName(DataProcessor)"; read `.name` to print just the name.
    println("Caught exception in ${context[CoroutineName]?.name}: ${exception.message}")
    logException(exception)
}

// The handler is attached to a root coroutine, so it receives the uncaught exception.
scope.launch(exceptionHandler + CoroutineName("DataProcessor")) {
    throw RuntimeException("Something went wrong!")
}
// Output: Caught exception in DataProcessor: Something went wrong!
// Exception handler for specific coroutine types:
// dispatches each uncaught failure to a handler chosen by its runtime type.
val networkExceptionHandler = CoroutineExceptionHandler { _, failure ->
    when (failure) {
        is java.net.ConnectException -> handleNetworkError(failure)
        is java.net.SocketTimeoutException -> handleTimeoutError(failure)
        else -> handleGenericError(failure)
    }
}

scope.launch(networkExceptionHandler) {
    // fetchFromNetwork() may throw network exceptions
    val payload = fetchFromNetwork()
    processData(payload)
}
// Global exception handler
val globalHandler = CoroutineExceptionHandler { context, exception ->
    when (exception) {
        is CancellationException -> {
            // Don't report cancellation as an error: it must reach neither the
            // crash reporter nor the error dialog.
            println("Operation was cancelled")
        }
        else -> {
            // Log to crash reporting service
            crashReporter.logException(exception, context)
            // Show user-friendly error message
            showErrorDialog("An unexpected error occurred: ${exception.message}")
        }
    }
}

// Use with scope
val applicationScope = CoroutineScope(
    SupervisorJob() + Dispatchers.Main + globalHandler
)

// Understanding how exceptions propagate through coroutine hierarchies.
import kotlinx.coroutines.*
val scope = MainScope()

// Exception propagation in regular scope:
// the failure of one child cancels the whole coroutineScope, and the
// exception is rethrown to the enclosing try/catch.
scope.launch {
    println("Parent started")
    try {
        coroutineScope {
            // Child 1: finishes before the failure happens.
            launch {
                delay(100)
                println("Child 1 completed")
            }
            // Child 2: fails after 200 ms.
            launch {
                delay(200)
                throw RuntimeException("Child 2 failed")
            }
            // Child 3: still delayed when child 2 fails.
            launch {
                delay(300)
                println("Child 3 completed") // Won't execute
            }
        }
    } catch (failure: Exception) {
        println("Caught exception: ${failure.message}")
    }
    println("Parent completed")
}
// Exception isolation with supervisor scope:
// each child runs independently, and the failing child handles its own error.
scope.launch {
    println("Supervisor parent started")
    supervisorScope {
        // Child 1: completes normally.
        launch {
            delay(100)
            println("Supervisor child 1 completed")
        }
        // Child 2: throws, but catches the exception itself.
        launch {
            try {
                delay(200)
                throw RuntimeException("Supervisor child 2 failed")
            } catch (failure: Exception) {
                println("Child caught its own exception: ${failure.message}")
            }
        }
        // Child 3: unaffected by child 2.
        launch {
            delay(300)
            println("Supervisor child 3 completed") // Will execute
        }
    }
    println("Supervisor parent completed")
}
// Exception handling with async
scope.launch {
    supervisorScope {
        val deferred1 = async {
            delay(100)
            "Success 1"
        }
        val deferred2 = async {
            delay(200)
            throw RuntimeException("Async task failed")
        }
        val deferred3 = async {
            delay(300)
            "Success 3"
        }

        // Awaits one result, turning a failure into a "Failed: ..." string.
        // Cancellation is rethrown so that it is never reported as a failed
        // task and the coroutine can stop promptly.
        suspend fun Deferred<String>.awaitOrFailure(): String =
            try {
                await()
            } catch (e: CancellationException) {
                throw e
            } catch (e: Exception) {
                "Failed: ${e.message}"
            }

        // Handle each async result individually
        val result1 = deferred1.awaitOrFailure()
        val result2 = deferred2.awaitOrFailure()
        val result3 = deferred3.awaitOrFailure()
        println("Results: [$result1, $result2, $result3]")
    }
}

// Proper handling of cancellation in coroutines and cleanup operations.
import kotlinx.coroutines.*
val scope = MainScope()

// Basic cancellation handling:
// the worker prints progress until it is cancelled, then runs its cleanup.
val job = scope.launch {
    try {
        for (i in 0 until 100) {
            println("Working on item $i")
            delay(100)
        }
    } catch (cancellation: CancellationException) {
        println("Work was cancelled")
        throw cancellation // Always re-throw CancellationException
    } finally {
        println("Cleanup completed")
    }
}

// Cancel after 3 seconds
scope.launch {
    delay(3000)
    job.cancel("User requested cancellation")
}
// Non-cancellable cleanup:
// on cancellation the cleanup runs inside NonCancellable, then the
// cancellation is rethrown.
scope.launch {
    try {
        println("Starting work...")
        performWork()
    } catch (cancellation: CancellationException) {
        println("Work cancelled, performing cleanup...")
        // Critical cleanup that must complete
        withContext(NonCancellable) {
            saveStateToFile()
            closeResources()
            notifyServer()
        }
        throw cancellation
    }
}
// Cancellation with timeout
/**
 * Runs `longRunningOperation()` under a 5000 ms `withTimeout`.
 *
 * On [TimeoutCancellationException] it prints a message, calls
 * `cleanupAfterTimeout()`, and rethrows the same exception.
 *
 * @return the result of `longRunningOperation()`.
 */
suspend fun operationWithTimeout(): String =
    try {
        withTimeout(5000) { longRunningOperation() }
    } catch (timeout: TimeoutCancellationException) {
        println("Operation timed out, performing cleanup...")
        cleanupAfterTimeout()
        throw timeout
    }
// Cooperative cancellation checking
/**
 * Performs 10000 work items, checking for cancellation every 100 iterations.
 *
 * A plain `suspend fun` has no `CoroutineScope` receiver, so the check goes
 * through the caller's context via `currentCoroutineContext()`.
 *
 * @throws CancellationException if the calling coroutine is no longer active
 * at one of the periodic checks.
 */
suspend fun longRunningTask() {
    for (i in 0 until 10000) {
        // Check for cancellation every 100 iterations
        if (i % 100 == 0) {
            currentCoroutineContext().ensureActive() // Throws CancellationException if cancelled
        }
        performWorkItem(i)
    }
}
// Cancellation with resource management
/**
 * Tracks resources acquired during [doWork] and releases them in reverse
 * acquisition order when [close] is called.
 */
class ResourceManager : java.io.Closeable {
    private val resources = mutableListOf<Resource>()

    /**
     * Acquires two resources, records them, and performs work with them.
     * [close] runs in `finally`, so it executes on success, failure, and
     * cancellation alike; a [CancellationException] is logged and rethrown.
     */
    suspend fun doWork() {
        try {
            val resource1 = acquireResource("Resource1")
            resources.add(resource1)
            val resource2 = acquireResource("Resource2")
            resources.add(resource2)
            // Perform work that may be cancelled
            performWorkWithResources(resource1, resource2)
        } catch (e: CancellationException) {
            println("Work cancelled, cleaning up resources...")
            throw e
        } finally {
            close() // Always cleanup resources
        }
    }

    /**
     * Releases every tracked resource, last acquired first, and clears the
     * list. A failure to release one resource is printed and does not stop
     * the remaining resources from being released.
     */
    override fun close() {
        resources.asReversed().forEach { resource ->
            try {
                resource.release()
            } catch (e: Exception) {
                println("Error releasing resource: ${e.message}")
            }
        }
        resources.clear()
    }
}

// Strategies for recovering from failures and implementing resilient systems.
import kotlinx.coroutines.*
import kotlin.time.Duration.Companion.seconds
// Retry with exponential backoff
/**
 * Runs [operation], retrying on failure with an exponentially growing delay.
 *
 * @param maxRetries total number of attempts; must be at least 1.
 * @param initialDelay delay in milliseconds before the second attempt.
 * @param factor multiplier applied to the delay after each failed attempt.
 * @param operation the suspending action to run.
 * @return the first successful result of [operation].
 * @throws IllegalArgumentException if [maxRetries] is less than 1 (previously
 * the operation was silently never run in that case).
 * @throws CancellationException immediately, without retrying, if [operation]
 * is cancelled.
 * @throws Exception the failure of the last attempt once all attempts are used.
 */
suspend fun <T> retryWithBackoff(
    maxRetries: Int = 3,
    initialDelay: Long = 1000,
    factor: Double = 2.0,
    operation: suspend () -> T
): T {
    require(maxRetries >= 1) { "maxRetries must be at least 1, was $maxRetries" }
    // Named so that it does not shadow the delay() function called below.
    var backoffMs = initialDelay
    for (attempt in 1..maxRetries) {
        try {
            return operation()
        } catch (e: CancellationException) {
            throw e // Don't retry cancellation
        } catch (e: Exception) {
            println("Attempt $attempt failed: ${e.message}")
            // Out of attempts: surface the most recent failure to the caller.
            if (attempt == maxRetries) throw e
            println("Retrying in ${backoffMs}ms...")
            delay(backoffMs)
            backoffMs = (backoffMs * factor).toLong()
        }
    }
    // Every iteration either returns or throws; this only satisfies the compiler.
    throw RuntimeException("All retry attempts failed")
}
// Usage: up to three attempts at the unreliable call, reporting the outcome.
scope.launch {
    try {
        val outcome = retryWithBackoff(maxRetries = 3) { unreliableNetworkCall() }
        println("Success: $outcome")
    } catch (failure: Exception) {
        println("All retries failed: ${failure.message}")
    }
}
// Circuit breaker pattern
/**
 * Rejects calls once too many consecutive failures have been observed.
 *
 * The breaker starts CLOSED. After [failureThreshold] failures without an
 * intervening success it becomes OPEN and rejects calls with a
 * [RuntimeException] until [resetTimeoutMs] has elapsed since the last
 * failure; the next call then runs as a HALF_OPEN probe. Any success resets
 * the failure count and closes the breaker.
 *
 * State is held in plain fields with no synchronisation.
 *
 * @param failureThreshold number of counted failures that opens the breaker.
 * @param resetTimeoutMs milliseconds to stay OPEN before allowing a probe call.
 */
class CircuitBreaker(
    private val failureThreshold: Int = 5,
    private val resetTimeoutMs: Long = 60000
) {
    private enum class State { CLOSED, OPEN, HALF_OPEN }

    private var state = State.CLOSED
    private var failureCount = 0
    private var lastFailureTime = 0L

    /**
     * Runs [operation] through the breaker.
     *
     * @return the result of [operation].
     * @throws RuntimeException with message "Circuit breaker is OPEN" when the
     * breaker is open and the reset timeout has not elapsed.
     * @throws CancellationException rethrown as-is; cancellation of the caller
     * says nothing about the health of the service, so it is not counted as a
     * failure and cannot trip the breaker.
     * @throws Exception any other failure of [operation], after it is counted.
     */
    suspend fun <T> execute(operation: suspend () -> T): T {
        if (state == State.OPEN) {
            val sinceLastFailure = System.currentTimeMillis() - lastFailureTime
            if (sinceLastFailure < resetTimeoutMs) {
                throw RuntimeException("Circuit breaker is OPEN")
            }
            // Timeout elapsed: allow one request to test if service is back
            state = State.HALF_OPEN
        }
        return try {
            val result = operation()
            onSuccess()
            result
        } catch (e: CancellationException) {
            throw e
        } catch (e: Exception) {
            onFailure()
            throw e
        }
    }

    // A success clears the failure history and closes the breaker.
    private fun onSuccess() {
        failureCount = 0
        state = State.CLOSED
    }

    // Records a failure and opens the breaker once the threshold is reached.
    private fun onFailure() {
        failureCount++
        lastFailureTime = System.currentTimeMillis()
        if (failureCount >= failureThreshold) {
            state = State.OPEN
        }
    }
}
val circuitBreaker = CircuitBreaker()

// Route the unreliable service call through the breaker and report the outcome.
scope.launch {
    try {
        val response = circuitBreaker.execute { unreliableService() }
        println("Service call succeeded: $response")
    } catch (failure: Exception) {
        println("Service call failed: ${failure.message}")
    }
}
// Fallback pattern
/**
 * Fetches data for [id], trying the primary source, then the secondary
 * source, then the cache, and finally falling back to `Data.empty(id)`.
 *
 * Cancellation is never treated as a source failure: a
 * [CancellationException] from either source is rethrown immediately rather
 * than triggering the next fallback.
 *
 * @param id identifier of the data to fetch.
 * @return the data from the first source that succeeds, or an empty value.
 */
suspend fun fetchDataWithFallback(id: String): Data {
    return try {
        primaryDataSource.fetch(id)
    } catch (e: CancellationException) {
        throw e
    } catch (e: Exception) {
        println("Primary source failed: ${e.message}")
        try {
            secondaryDataSource.fetch(id)
        } catch (e2: CancellationException) {
            throw e2
        } catch (e2: Exception) {
            println("Secondary source failed: ${e2.message}")
            // Final fallback to cache
            cacheDataSource.fetch(id) ?: Data.empty(id)
        }
    }
}
// Bulkhead pattern - isolate failures
/**
 * Runs each service's work in its own supervisor scope so that a failure in
 * one service cannot cancel work belonging to another.
 */
class ServiceManager {
    private val userServiceScope = CoroutineScope(
        SupervisorJob() + Dispatchers.Default + CoroutineName("UserService")
    )
    private val orderServiceScope = CoroutineScope(
        SupervisorJob() + Dispatchers.Default + CoroutineName("OrderService")
    )
    private val notificationServiceScope = CoroutineScope(
        SupervisorJob() + Dispatchers.Default + CoroutineName("NotificationService")
    )

    /**
     * Processes [userId] inside the user-service scope.
     *
     * @return the service result, or `UserResult.Failed` if processing failed.
     * @throws CancellationException if the calling coroutine itself was cancelled.
     */
    suspend fun processUser(userId: String): UserResult {
        return try {
            userServiceScope.async {
                userService.process(userId)
            }.await()
        } catch (e: CancellationException) {
            // Rethrows when the caller was cancelled. Otherwise the service
            // scope was cancelled (e.g. by shutdown()) and that is reported
            // as a failed result.
            currentCoroutineContext().ensureActive()
            UserResult.Failed(e.message ?: "User processing failed")
        } catch (e: Exception) {
            UserResult.Failed(e.message ?: "User processing failed")
        }
    }

    /**
     * Processes [orderId] inside the order-service scope.
     *
     * @return the service result, or `OrderResult.Failed` if processing failed.
     * @throws CancellationException if the calling coroutine itself was cancelled.
     */
    suspend fun processOrder(orderId: String): OrderResult {
        return try {
            orderServiceScope.async {
                orderService.process(orderId)
            }.await()
        } catch (e: CancellationException) {
            // Same distinction as in processUser: only caller cancellation propagates.
            currentCoroutineContext().ensureActive()
            OrderResult.Failed(e.message ?: "Order processing failed")
        } catch (e: Exception) {
            OrderResult.Failed(e.message ?: "Order processing failed")
        }
    }

    /** Cancels all three service scopes. */
    fun shutdown() {
        userServiceScope.cancel()
        orderServiceScope.cancel()
        notificationServiceScope.cancel()
    }
}

// Sophisticated error handling patterns for complex applications.
// Result-based error handling
/**
 * Outcome of an operation: either a [Success] carrying a value or a
 * [Failure] carrying the exception that was thrown.
 */
sealed class Result<out T> {
    /** Successful outcome holding [value]. */
    data class Success<T>(val value: T) : Result<T>()

    /** Failed outcome holding the [exception]. */
    data class Failure(val exception: Throwable) : Result<Nothing>()

    /** Transforms a success value with [transform]; a failure is returned unchanged. */
    inline fun <R> map(transform: (T) -> R): Result<R> {
        return when (this) {
            is Failure -> this
            is Success -> Success(transform(value))
        }
    }

    /** Chains another result-producing step onto a success; a failure is returned unchanged. */
    inline fun <R> flatMap(transform: (T) -> Result<R>): Result<R> {
        return when (this) {
            is Failure -> this
            is Success -> transform(value)
        }
    }

    /** Invokes [action] with the value when this is a success; always returns this result. */
    inline fun onSuccess(action: (T) -> Unit): Result<T> {
        if (this is Success) {
            action(value)
        }
        return this
    }

    /** Invokes [action] with the exception when this is a failure; always returns this result. */
    inline fun onFailure(action: (Throwable) -> Unit): Result<T> {
        if (this is Failure) {
            action(exception)
        }
        return this
    }
}
/**
 * Runs [operation] and captures its outcome as a [Result].
 *
 * @return `Result.Success` with the returned value, or `Result.Failure`
 * wrapping any [Exception] other than cancellation.
 * @throws CancellationException rethrown unwrapped.
 */
suspend fun <T> safeCall(operation: suspend () -> T): Result<T> =
    try {
        val value = operation()
        Result.Success(value)
    } catch (cancellation: CancellationException) {
        throw cancellation // Don't wrap cancellation
    } catch (failure: Exception) {
        Result.Failure(failure)
    }
// Usage: wrap the risky call, transform a success, then report either outcome.
scope.launch {
    val attempted = safeCall { riskyOperation() }
    val processed = attempted.map { value -> processResult(value) }
    val result = processed
        .onSuccess { value -> println("Operation succeeded: $value") }
        .onFailure { error -> println("Operation failed: ${error.message}") }
}
// Error aggregation
/**
 * Processes every item concurrently and summarises the outcomes.
 *
 * Each `async` child returns its own outcome and the outcomes are gathered
 * with `awaitAll()`, so no mutable list is shared between coroutines that may
 * run on different threads. Errors are therefore reported in input order.
 *
 * @param items the items to process.
 * @return a report with the total, success and failure counts and the errors.
 * @throws CancellationException if processing is cancelled.
 */
suspend fun processMultipleItems(items: List<String>): ProcessingReport {
    // Each entry pairs the item's result with its error, or null on success.
    val outcomes: List<Pair<ItemResult, ProcessingError?>> = supervisorScope {
        items.map { item ->
            async<Pair<ItemResult, ProcessingError?>> {
                try {
                    ItemResult.Success(item, processItem(item)) to null
                } catch (e: CancellationException) {
                    throw e
                } catch (e: Exception) {
                    val error = ProcessingError(item, e.message ?: "Unknown error")
                    ItemResult.Failed(item, error) to error
                }
            }
        }.awaitAll()
    }
    // Kept as a MutableList so the value passed to ProcessingReport has the
    // same type as before.
    val errors = outcomes.mapNotNullTo(mutableListOf()) { it.second }
    return ProcessingReport(
        totalItems = items.size,
        successCount = outcomes.count { it.first is ItemResult.Success },
        failureCount = errors.size,
        errors = errors
    )
}
// Custom exception hierarchy
/**
 * Closed family of domain-level failures. Every subtype forwards its message
 * (and cause, where it has one) to [Exception].
 */
sealed class BusinessException(
    message: String,
    cause: Throwable? = null
) : Exception(message, cause) {

    /** Input failed validation. */
    class ValidationException(message: String) : BusinessException(message)

    /** The caller could not be authenticated. */
    class AuthenticationException(message: String) : BusinessException(message)

    /** The caller is not allowed to perform the action. */
    class AuthorizationException(message: String) : BusinessException(message)

    /** The named [resource] does not exist. */
    class ResourceNotFoundException(resource: String) :
        BusinessException(message = "Resource not found: $resource")

    /** A call to the external [service] failed with [cause]. */
    class ExternalServiceException(service: String, cause: Throwable) :
        BusinessException(message = "External service error: $service", cause = cause)
}
// Business logic exception handler:
// maps each kind of business failure to its user-facing reaction; anything
// else is shown as a generic error and logged as unexpected.
val businessExceptionHandler = CoroutineExceptionHandler { context, exception ->
    when (exception) {
        is BusinessException.ValidationException -> {
            showValidationError(exception.message ?: "Validation failed")
        }
        is BusinessException.AuthenticationException -> {
            redirectToLogin()
        }
        is BusinessException.AuthorizationException -> {
            showAccessDeniedMessage()
        }
        is BusinessException.ResourceNotFoundException -> {
            showNotFoundMessage(exception.message ?: "Resource not found")
        }
        is BusinessException.ExternalServiceException -> {
            showServiceUnavailableMessage()
            logExternalServiceError(exception)
        }
        else -> {
            showGenericErrorMessage()
            logUnexpectedError(exception)
        }
    }
}

// Strategies for testing error scenarios and exception handling logic.
import kotlinx.coroutines.test.*
import kotlin.test.*
/**
 * Verifies that a [CoroutineExceptionHandler] receives an uncaught exception.
 *
 * The failing coroutine is launched inside `supervisorScope`: a handler on an
 * ordinary child of the test scope is never invoked, because such a child
 * hands its exception to its parent, which would fail the test instead.
 */
@Test
fun testExceptionPropagation() = runTest {
    var handled: Throwable? = null
    val handler = CoroutineExceptionHandler { _, exception ->
        // Only record here; assertions are made in the test body so that a
        // failed assertion is reported by the test itself.
        handled = exception
    }
    supervisorScope {
        val job = launch(handler) {
            throw RuntimeException("Test exception")
        }
        job.join()
    }
    assertNotNull(handled)
    assertEquals("Test exception", handled?.message)
}
/**
 * Cancels a delayed coroutine and checks that its `finally` block ran and
 * that the job ends up cancelled.
 */
@Test
fun testCancellationHandling() = runTest {
    var cleanupCalled = false
    val worker = launch {
        try {
            delay(1000)
            fail("Should have been cancelled")
        } catch (cancellation: CancellationException) {
            // Expected: propagate the cancellation.
            throw cancellation
        } finally {
            cleanupCalled = true
        }
    }

    // Cancel while the worker is still suspended in its delay.
    delay(100)
    worker.cancel()
    worker.join()

    assertTrue(cleanupCalled)
    assertTrue(worker.isCancelled)
}
/**
 * The operation fails twice and succeeds on the third attempt;
 * `retryWithBackoff` must return that third result after exactly three calls.
 */
@Test
fun testRetryLogic() = runTest {
    var attemptCount = 0
    val outcome = retryWithBackoff(maxRetries = 3, initialDelay = 10) {
        attemptCount++
        when {
            attemptCount < 3 -> throw RuntimeException("Attempt $attemptCount failed")
            else -> "Success on attempt $attemptCount"
        }
    }
    assertEquals("Success on attempt 3", outcome)
    assertEquals(3, attemptCount)
}
/**
 * Two failures reach the threshold of 2; the following call must be rejected
 * even though its own operation would succeed.
 */
@Test
fun testCircuitBreakerOpen() = runTest {
    val breaker = CircuitBreaker(failureThreshold = 2)

    // Cause failures to open circuit
    for (round in 1..2) {
        assertFailsWith<RuntimeException> {
            breaker.execute { throw RuntimeException("Service down") }
        }
    }

    // Circuit should be open
    assertFailsWith<RuntimeException> {
        breaker.execute { "Should not execute" }
    }
}
/**
 * Verifies that a failing child of `supervisorScope` does not prevent its
 * siblings from completing.
 *
 * The failing child carries its own [CoroutineExceptionHandler]. Without one,
 * the uncaught exception is reported to the test scope and `runTest` fails
 * the test even though both siblings completed.
 */
@Test
fun testSupervisorScopeIsolation() = runTest {
    val results = mutableListOf<String>()
    var task2Failure: Throwable? = null
    supervisorScope {
        launch {
            results.add("Task 1 completed")
        }
        launch(CoroutineExceptionHandler { _, exception -> task2Failure = exception }) {
            throw RuntimeException("Task 2 failed")
        }
        launch {
            delay(100) // Ensure task 2 fails first
            results.add("Task 3 completed")
        }
    }
    assertEquals("Task 2 failed", task2Failure?.message)
    assertEquals(listOf("Task 1 completed", "Task 3 completed"), results)
}

// Guidelines for robust error handling in coroutine applications.
// DO: Always re-throw CancellationException
/**
 * Runs `performWork()`. Cancellation triggers `performCleanup()` and is then
 * rethrown; any other exception goes to `handleBusinessException`.
 */
suspend fun goodCancellationHandling() {
    try {
        performWork()
    } catch (cancellation: CancellationException) {
        performCleanup()
        throw cancellation // CRITICAL: Always re-throw
    } catch (failure: Exception) {
        handleBusinessException(failure)
    }
}
// DON'T: Swallow CancellationException
/**
* Deliberate anti-pattern, shown for contrast with `goodCancellationHandling`.
*
* The single `catch (e: Exception)` also matches CancellationException, and
* nothing in the handler rethrows it, so cancellation is passed to
* `handleException` like any other error instead of propagating.
*/
suspend fun badCancellationHandling() {
try {
performWork()
} catch (e: Exception) { // This catches CancellationException too!
handleException(e)
// Missing re-throw of CancellationException
}
}
// DO: Use appropriate scope for error isolation
/**
 * Runs two risky operations concurrently inside `supervisorScope`, replacing
 * each failed result with a default before processing both.
 *
 * Cancellation is rethrown rather than replaced by a default, so a cancelled
 * caller does not go on to process placeholder results.
 */
suspend fun goodErrorIsolation() {
    supervisorScope { // Child failures don't cancel siblings
        val task1 = async { riskyOperation1() }
        val task2 = async { riskyOperation2() }
        val result1 = try {
            task1.await()
        } catch (e: CancellationException) {
            throw e
        } catch (e: Exception) {
            "Default1"
        }
        val result2 = try {
            task2.await()
        } catch (e: CancellationException) {
            throw e
        } catch (e: Exception) {
            "Default2"
        }
        processResults(result1, result2)
    }
}
// DO: Handle exceptions at the right level
/**
 * Example of layered error handling: the lowest layer translates I/O errors,
 * the middle layer falls back to cached data, and the top layer presents the
 * outcome to the user.
 */
class DataService {
    // Low-level: Convert specific exceptions
    private suspend fun fetchFromNetwork(): NetworkData =
        try {
            networkClient.fetch()
        } catch (ioFailure: IOException) {
            throw BusinessException.ExternalServiceException("Network", ioFailure)
        }

    // Mid-level: Aggregate and provide fallbacks
    suspend fun getData(): Result<Data> =
        try {
            val fetched = fetchFromNetwork()
            val processed = processData(fetched)
            Result.Success(processed)
        } catch (serviceFailure: BusinessException.ExternalServiceException) {
            // Try cache fallback
            getCachedData()
        }

    // High-level: Final error handling with user communication
    suspend fun displayData() {
        val outcome = getData()
        when (outcome) {
            is Result.Failure -> showErrorMessage(outcome.exception)
            is Result.Success -> showData(outcome.value)
        }
    }
}
// DO: Use structured error handling
/**
 * Runs three tasks concurrently and reports any failure to logging and
 * monitoring before propagating it.
 *
 * The error boundary is a `try`/`catch` around `coroutineScope`. A
 * `CoroutineExceptionHandler` installed with `withContext` would never be
 * invoked here: children of `coroutineScope` hand their exceptions to the
 * scope, which rethrows the first failure to its caller.
 *
 * @throws CancellationException if the caller is cancelled; not logged as an error.
 * @throws Exception the first task failure, rethrown after it has been
 * logged, so callers observe the same exception as before.
 */
suspend fun structuredErrorHandling() {
    try {
        coroutineScope {
            // The first child failure cancels the siblings and surfaces below.
            launch { task1() }
            launch { task2() }
            launch { task3() }
        }
    } catch (e: CancellationException) {
        // Don't log cancellation as error
        throw e
    } catch (e: Exception) {
        logError(e)
        notifyMonitoring(e)
        throw e
    }
}

// Install with Tessl CLI
npx tessl i tessl/maven-org-jetbrains-kotlinx--kotlinx-coroutines-core