CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-io-arrow-kt--arrow-fx-coroutines-jvm

Arrow Fx Coroutines provides functional effect types and utilities for managing side effects in Kotlin coroutines

Pending
Overview
Eval results
Files

docs/error-handling.md

Error Handling

Arrow FX Coroutines integrates seamlessly with Arrow's Raise system to provide structured error handling capabilities. This enables functional error management patterns including error accumulation, typed error handling, and composable error strategies.

Bracket Operations with Error Handling

ExitCase for Error Information

/**
 * Describes how an operation terminated, so that finalizers and release
 * handlers can pick the appropriate cleanup.
 */
sealed class ExitCase {
    /** The operation finished without being cancelled and without failing. */
    object Completed : ExitCase()
    /** The operation was cancelled; [exception] carries the cancellation. */
    data class Cancelled(val exception: CancellationException) : ExitCase()
    /** The operation failed; [failure] carries the throwable. */
    data class Failure(val failure: Throwable) : ExitCase()
    
    companion object {
        // Factory that turns a throwable into an ExitCase.
        // NOTE(review): presumably maps a CancellationException to Cancelled and
        // anything else to Failure — confirm against the Arrow API docs.
        fun ExitCase(error: Throwable): ExitCase
    }
}

ExitCase provides detailed information about how an operation terminated, enabling precise error handling and cleanup strategies.

Cancellation Handling

/**
 * Runs [fa] and invokes the [onCancel] handler only when the operation is cancelled.
 *
 * @param fa the suspending operation to run.
 * @param onCancel the handler executed on cancellation.
 * @return the result of [fa].
 */
suspend fun <A> onCancel(fa: suspend () -> A, onCancel: suspend () -> Unit): A

Register a handler that executes only when the operation is cancelled.

// The trailing lambda is the cancellation handler: it runs only if the task is cancelled.
val result = onCancel({ performLongRunningTask() }) {
    println("Task was cancelled, cleaning up...")
    cleanupResources()
}

Guaranteed Execution

/**
 * Runs [fa] and then [finalizer], regardless of how [fa] exits.
 *
 * @return the result of [fa].
 */
suspend fun <A> guarantee(fa: suspend () -> A, finalizer: suspend () -> Unit): A
/**
 * Like [guarantee], but the [finalizer] also receives the [ExitCase]
 * describing how [fa] terminated.
 *
 * @return the result of [fa].
 */
suspend fun <A> guaranteeCase(fa: suspend () -> A, finalizer: suspend (ExitCase) -> Unit): A

Guarantee that cleanup code runs regardless of how the operation exits.

// The trailing lambda is the finalizer; it logs according to how processData() ended.
val result = guaranteeCase({ processData() }) { outcome ->
    when (outcome) {
        is ExitCase.Completed -> logSuccess()
        is ExitCase.Cancelled -> logCancellation()
        is ExitCase.Failure -> logError(outcome.failure)
    }
}

Bracket Pattern with Error Handling

/**
 * Acquires a resource, uses it, and releases it.
 *
 * @param acquire obtains the resource.
 * @param use consumes the resource and produces the result.
 * @param release cleans up the resource.
 * @return the value produced by [use].
 */
suspend fun <A, B> bracket(
    acquire: suspend () -> A,
    use: suspend (A) -> B,
    release: suspend (A) -> Unit
): B

/**
 * Like [bracket], but [release] also receives the [ExitCase] describing how
 * [use] terminated, so cleanup can differ per outcome.
 *
 * @return the value produced by [use].
 */
suspend fun <A, B> bracketCase(
    acquire: suspend () -> A,
    use: suspend (A) -> B,
    release: suspend (A, ExitCase) -> Unit
): B

Safe resource management with detailed exit case handling.

// Commit on success, roll back on cancellation or failure, and always close
// the connection afterwards.
val result = bracketCase(
    acquire = { openDatabaseConnection() },
    use = { connection ->
        connection.executeQuery("SELECT * FROM users")
    },
    release = { connection, exitCase ->
        try {
            when (exitCase) {
                is ExitCase.Completed -> connection.commit()
                is ExitCase.Cancelled -> connection.rollback()
                is ExitCase.Failure -> {
                    connection.rollback()
                    logError("Database operation failed", exitCase.failure)
                }
            }
        } finally {
            // Close in `finally` so the connection is not leaked when
            // commit() or rollback() itself throws.
            connection.close()
        }
    }
)

Error Accumulation in Parallel Operations

Parallel Map with Error Accumulation

// `Error` is a type parameter of each overload (not `kotlin.Error`). The
// mapping lambda runs with a ScopedRaiseAccumulate<Error> receiver, which is
// both a CoroutineScope and a Raise<Error>, so it can raise typed errors.

/** Maps every element in parallel, accumulating all raised errors. */
suspend fun <Error, A, B> Iterable<A>.parMapOrAccumulate(
    f: suspend ScopedRaiseAccumulate<Error>.(A) -> B
): Either<NonEmptyList<Error>, List<B>>

/** Same as above, running the mapping on the given [context]. */
suspend fun <Error, A, B> Iterable<A>.parMapOrAccumulate(
    context: CoroutineContext,
    f: suspend ScopedRaiseAccumulate<Error>.(A) -> B
): Either<NonEmptyList<Error>, List<B>>

/** Same as above, with [concurrency] passed as a concurrency setting. */
suspend fun <Error, A, B> Iterable<A>.parMapOrAccumulate(
    concurrency: Int,
    f: suspend ScopedRaiseAccumulate<Error>.(A) -> B
): Either<NonEmptyList<Error>, List<B>>

/** Same as above, with both a [context] and a [concurrency] setting. */
suspend fun <Error, A, B> Iterable<A>.parMapOrAccumulate(
    context: CoroutineContext,
    concurrency: Int,
    f: suspend ScopedRaiseAccumulate<Error>.(A) -> B
): Either<NonEmptyList<Error>, List<B>>

Execute parallel mapping operations while accumulating all errors instead of failing fast.

/** A single validation problem: the offending [field] and a human-readable [message]. */
data class ValidationError(val field: String, val message: String)

// Validate all users in parallel; every ValidationError is collected.
val validationResults = users.parMapOrAccumulate { user ->
    validateUser(user) // Function that can raise ValidationError
}

// Report every accumulated error, or continue with the validated users.
validationResults.fold(
    ifLeft = { errors ->
        println("Validation failed with errors:")
        errors.forEach { error ->
            println("- ${error.field}: ${error.message}")
        }
    },
    ifRight = { validUsers ->
        println("All users validated successfully")
        processValidUsers(validUsers)
    }
)

Parallel Zip with Error Accumulation

/**
 * Runs [fa] and [fb] in parallel inside a `Raise<E>` context. When both raise,
 * the two errors are merged into a single `E` with [combine].
 *
 * `E` is a type parameter; the operand lambdas run with a
 * ScopedRaiseAccumulate<E> receiver so that they can raise.
 */
suspend fun <E, A, B, C> Raise<E>.parZipOrAccumulate(
    combine: (E, E) -> E,
    fa: suspend ScopedRaiseAccumulate<E>.() -> A,
    fb: suspend ScopedRaiseAccumulate<E>.() -> B,
    f: suspend CoroutineScope.(A, B) -> C
): C

/**
 * Runs [fa] and [fb] in parallel inside a `Raise<NonEmptyList<E>>` context,
 * collecting every raised error into the non-empty list.
 */
suspend fun <E, A, B, C> Raise<NonEmptyList<E>>.parZipOrAccumulate(
    fa: suspend ScopedRaiseAccumulate<E>.() -> A,
    fb: suspend ScopedRaiseAccumulate<E>.() -> B,
    f: suspend CoroutineScope.(A, B) -> C
): C

Execute parallel operations within a Raise context, accumulating errors from all operations.

// Runs four validations in parallel inside an `either` block whose error type
// is NonEmptyList<ValidationError>, then builds the profile from the results.
// NOTE(review): `userData` is not defined in this snippet; assumed to be in scope.
val userProfile = either<NonEmptyList<ValidationError>, UserProfile> {
    parZipOrAccumulate(
        { validateName(userData.name) },
        { validateEmail(userData.email) },
        { validateAge(userData.age) },
        { validateAddress(userData.address) }
    ) { name, email, age, address ->
        // Combine the four validated values.
        UserProfile(name, email, age, address)
    }
}

Raise Integration Types

ScopedRaiseAccumulate

/** Combines [CoroutineScope] and [RaiseAccumulate] in one receiver type. */
interface ScopedRaiseAccumulate<Error> : CoroutineScope, RaiseAccumulate<Error>

Intersection type that combines CoroutineScope and RaiseAccumulate for advanced error handling scenarios.

RaiseScope for Racing

/** Scope that is both a [CoroutineScope] and a [Raise] of `Error`; used by racing operations. */
interface RaiseScope<in Error> : CoroutineScope, Raise<Error>
/** Suspending handler that receives a raised `Error` and never returns normally (`Nothing`). */
typealias RaiseHandler<Error> = suspend CoroutineScope.(Error) -> Nothing

Specialized scope type for racing operations with error handling.

Advanced Error Handling Patterns

Error Recovery Strategies

/**
 * Runs [operation] up to [maxAttempts] times and returns the first successful result.
 *
 * Between attempts it waits with a linearly growing delay (1 s, 2 s, 3 s, ...);
 * there is no wait after the final attempt. Cancellation is never retried: a
 * [CancellationException] is rethrown immediately so the coroutine can stop.
 *
 * @param maxAttempts total number of attempts; must be at least 1.
 * @param operation the suspending action to attempt.
 * @return `Right` with the first successful result, or `Left` with the
 *   exception thrown by the last attempt.
 * @throws IllegalArgumentException if [maxAttempts] is less than 1.
 */
suspend fun <T> withRetry(
    maxAttempts: Int,
    operation: suspend () -> T
): Either<Exception, T> = either {
    require(maxAttempts > 0) { "maxAttempts must be at least 1, was $maxAttempts" }

    var lastException: Exception? = null

    for (attempt in 0 until maxAttempts) {
        try {
            return@either operation()
        } catch (e: CancellationException) {
            // Retrying a cancelled coroutine would break cooperative cancellation.
            throw e
        } catch (e: Exception) {
            lastException = e
            if (attempt < maxAttempts - 1) {
                // Long arithmetic: delay() takes milliseconds as a Long.
                delay(1000L * (attempt + 1)) // Linear backoff
            }
        }
    }

    // The loop ran at least once and every iteration either returned or
    // recorded an exception, so this is never null here.
    raise(checkNotNull(lastException) { "no attempt was made" })
}

Fallback Chains

/**
 * Tries each of [operations] in order and returns the first successful result.
 *
 * A [CancellationException] is rethrown immediately rather than treated as a
 * failed alternative.
 *
 * @param operations the alternatives to try, in priority order; must not be empty.
 * @return `Right` with the first successful result, or `Left` with the
 *   exceptions of all alternatives, in the order they were tried.
 * @throws IllegalArgumentException if [operations] is empty.
 */
suspend fun <T> withFallbackChain(
    operations: List<suspend () -> T>
): Either<NonEmptyList<Exception>, T> = either {
    require(operations.isNotEmpty()) { "operations must not be empty" }

    val errors = mutableListOf<Exception>()

    for (operation in operations) {
        try {
            return@either operation()
        } catch (e: CancellationException) {
            // Never swallow cancellation.
            throw e
        } catch (e: Exception) {
            errors.add(e)
        }
    }

    // Every operation failed and there was at least one, so `errors` is non-empty.
    raise(NonEmptyList(errors.first(), errors.drop(1)))
}

Validation Pipelines

/** A user record built from validated [name], [email] and [age] values. */
data class User(val name: String, val email: String, val age: Int)
/** A single validation problem: the offending [field] and a human-readable [message]. */
data class ValidationError(val field: String, val message: String)

/**
 * Validates the name, email and age of [userData] via `parZipOrAccumulate`
 * and builds a [User] from the three results.
 *
 * @return `Right` with the [User], or `Left` with the accumulated [ValidationError]s.
 */
suspend fun validateUser(userData: UserData): Either<NonEmptyList<ValidationError>, User> = 
    either {
        parZipOrAccumulate(
            { validateName(userData.name) },
            { validateEmail(userData.email) },
            { validateAge(userData.age) }
        ) { name, email, age ->
            User(name, email, age)
        }
    }

/**
 * Validates a user name inside a `Raise<ValidationError>` context.
 *
 * The `Raise` receiver is required for `raise` to be in scope.
 *
 * @return [name] trimmed of surrounding whitespace; raises a [ValidationError]
 *   for the "name" field when [name] is blank.
 */
suspend fun Raise<ValidationError>.validateName(name: String): String =
    if (name.isBlank()) raise(ValidationError("name", "Name cannot be blank"))
    else name.trim()

/**
 * Validates an email address inside a `Raise<ValidationError>` context.
 *
 * The `Raise` receiver is required for `raise` to be in scope.
 *
 * @return [email] in lowercase; raises a [ValidationError] for the "email"
 *   field when [email] contains no "@".
 */
suspend fun Raise<ValidationError>.validateEmail(email: String): String =
    if (!email.contains("@")) raise(ValidationError("email", "Invalid email format"))
    else email.lowercase()

/**
 * Validates an age inside a `Raise<ValidationError>` context.
 *
 * The `Raise` receiver is required for `raise` to be in scope.
 *
 * @return [age] unchanged; raises a [ValidationError] for the "age" field when
 *   [age] is negative (zero is accepted).
 */
suspend fun Raise<ValidationError>.validateAge(age: Int): Int =
    if (age < 0) raise(ValidationError("age", "Age must be positive"))
    else age

Error Handling with Resources

Resource Error Propagation

// Bind a database connection whose release handler commits only on successful
// completion, rolls back on cancellation or failure, and always closes.
val result = either<DatabaseError, ProcessingResult> {
    resourceScope {
        val connection = databaseResource
            .releaseCase { conn, exitCase ->
                try {
                    // Exhaustive `when`: a cancelled transaction must not be
                    // committed, so only Completed commits.
                    when (exitCase) {
                        is ExitCase.Completed -> conn.commit()
                        is ExitCase.Cancelled -> conn.rollback()
                        is ExitCase.Failure -> {
                            conn.rollback()
                            logError("Database transaction failed", exitCase.failure)
                        }
                    }
                } finally {
                    // Close even when commit() or rollback() throws.
                    conn.close()
                }
            }
            .bind()

        // Use connection - errors automatically handled
        processDataWithConnection(connection)
    }
}

Nested Resource Error Handling

/**
 * Binds the database, cache and HTTP client resources, fetches users,
 * configuration and metadata in parallel, and processes the combined input.
 *
 * The error channel here is a single `ProcessingError`, so the three fetches
 * are combined with `parZip` rather than an accumulating variant. `parZip`
 * returns the combined value directly, so no `.bind()` is applied to it.
 *
 * @return `Right` with the processing result, or `Left` with the
 *   `ProcessingError` that was raised.
 */
suspend fun processWithMultipleResources(): Either<ProcessingError, Result> = either {
    resourceScope {
        val database = databaseResource.bind()
        val cache = cacheResource.bind()
        val httpClient = httpClientResource.bind()

        val data = parZip(
            { database.fetchUsers() },
            { cache.getConfiguration() },
            { httpClient.fetchMetadata() }
        ) { users, config, metadata ->
            ProcessingInput(users, config, metadata)
        }

        processData(data)
    }
}

Integration Examples

Error Handling with Racing

/**
 * Races the primary service against a delayed fallback service inside an
 * `either` block. Each racer is given a `raise` handler that forwards its
 * error to the enclosing `either`.
 */
suspend fun fetchDataWithFallback(): Either<ServiceError, String> = either {
    racing<String> {
        raceOrThrow(raise = { error -> raise(error) }) {
            primaryService.getData() // Can raise ServiceError
        }
        
        raceOrThrow(raise = { error -> raise(error) }) {
            // The fallback waits for delay(2000) before making its call.
            delay(2000)
            fallbackService.getData() // Can raise ServiceError
        }
    }
}

Complex Error Accumulation

/** An error from one pipeline [stage], with its description in [error]. */
data class ProcessingError(val stage: String, val error: String)

/**
 * Runs [inputs] through three parallel stages: validation, processing and
 * enrichment. Errors within a stage are accumulated and tagged with the stage
 * name; a failing stage short-circuits the stages after it.
 *
 * @return `Right` with the enriched outputs, or `Left` with the accumulated
 *   [ProcessingError]s of the first stage that failed.
 */
suspend fun processDataPipeline(
    inputs: List<InputData>
): Either<NonEmptyList<ProcessingError>, List<OutputData>> = either {
    // Stage 1: validation.
    val validInputs = inputs.parMapOrAccumulate { rawInput ->
        validateInput(rawInput)
            .mapLeft { cause -> ProcessingError("validation", cause.message) }
            .bind()
    }.bind()

    // Stage 2: processing.
    val processedInputs = validInputs.parMapOrAccumulate { valid ->
        processInput(valid)
            .mapLeft { cause -> ProcessingError("processing", cause.message) }
            .bind()
    }.bind()

    // Stage 3: enrichment; its result is the result of the whole pipeline.
    processedInputs.parMapOrAccumulate { processed ->
        enrichData(processed)
            .mapLeft { cause -> ProcessingError("enrichment", cause.message) }
            .bind()
    }.bind()
}

Install with Tessl CLI

npx tessl i tessl/maven-io-arrow-kt--arrow-fx-coroutines-jvm

docs

error-handling.md

index.md

parallel-processing.md

racing.md

resource-management.md

synchronization-flow.md

tile.json