Arrow Fx Coroutines provides functional effect types and utilities for managing side effects in Kotlin coroutines
—
Arrow Fx Coroutines integrates seamlessly with Arrow's Raise system to provide structured error handling capabilities. This enables functional error management patterns including error accumulation, typed error handling, and composable error strategies.
/**
 * ExitCase provides detailed information about how an operation terminated, enabling precise
 * error handling and cleanup strategies.
 */
sealed class ExitCase {
    /** Exit case that carries no payload; presumably the operation finished normally. */
    object Completed : ExitCase()

    /** Exit case carrying the [CancellationException] that ended the operation. */
    data class Cancelled(val exception: CancellationException) : ExitCase()

    /** Exit case carrying the [Throwable] the operation failed with. */
    data class Failure(val failure: Throwable) : ExitCase()

    companion object {
        /**
         * Classifies [error] as an [ExitCase].
         *
         * @param error the throwable that ended the operation.
         * @return [Cancelled] when [error] is a [CancellationException], otherwise [Failure].
         */
        fun ExitCase(error: Throwable): ExitCase =
            if (error is CancellationException) Cancelled(error) else Failure(error)
    }
}
/**
 * Register a handler that executes only when the operation is cancelled.
 *
 * API signature only; the implementation is provided by the library.
 *
 * @param fa the operation to run.
 * @param onCancel the handler to execute on cancellation.
 * @return a value of the type produced by [fa].
 */
suspend fun <A> onCancel(
    fa: suspend () -> A,
    onCancel: suspend () -> Unit,
): A

// Example: clean up resources when the long-running task is cancelled.
val result = onCancel(
    fa = { performLongRunningTask() },
    onCancel = {
        println("Task was cancelled, cleaning up...")
        cleanupResources()
    },
)

/**
 * Guarantee that cleanup code runs regardless of how the operation exits.
 *
 * API signature only; the implementation is provided by the library.
 *
 * @param fa the operation to run.
 * @param finalizer the cleanup code.
 */
suspend fun <A> guarantee(
    fa: suspend () -> A,
    finalizer: suspend () -> Unit,
): A

/**
 * Guarantee that cleanup code runs regardless of how the operation exits.
 *
 * Unlike [guarantee], the [finalizer] here receives an [ExitCase].
 *
 * @param fa the operation to run.
 * @param finalizer the cleanup code, given the [ExitCase] of the operation.
 */
suspend fun <A> guaranteeCase(
    fa: suspend () -> A,
    finalizer: suspend (ExitCase) -> Unit,
): A
// Example: pick a log call according to how processData() terminated.
val result = guaranteeCase(
    fa = { processData() },
    finalizer = { exit ->
        when (exit) {
            is ExitCase.Completed -> logSuccess()
            is ExitCase.Cancelled -> logCancellation()
            is ExitCase.Failure -> logError(exit.failure)
        }
    },
)

/**
 * Variant of [bracketCase] whose [release] receives only the acquired value, not an [ExitCase].
 *
 * API signature only; the implementation is provided by the library.
 */
suspend fun <A, B> bracket(
    acquire: suspend () -> A,
    use: suspend (A) -> B,
    release: suspend (A) -> Unit,
): B

/**
 * Safe resource management with detailed exit case handling.
 *
 * API signature only; the implementation is provided by the library.
 *
 * @param acquire produces the resource.
 * @param use consumes the resource and produces the result.
 * @param release receives the resource together with an [ExitCase].
 */
suspend fun <A, B> bracketCase(
    acquire: suspend () -> A,
    use: suspend (A) -> B,
    release: suspend (A, ExitCase) -> Unit,
): B
// Example: commit on success, roll back on cancellation or failure, and always close.
val result = bracketCase(
    acquire = { openDatabaseConnection() },
    use = { connection ->
        connection.executeQuery("SELECT * FROM users")
    },
    release = { connection, exitCase ->
        try {
            when (exitCase) {
                is ExitCase.Completed -> connection.commit()
                is ExitCase.Cancelled -> connection.rollback()
                is ExitCase.Failure -> {
                    connection.rollback()
                    logError("Database operation failed", exitCase.failure)
                }
            }
        } finally {
            // close() lives in `finally` so a throwing commit()/rollback() cannot leak the connection.
            connection.close()
        }
    },
)

/**
 * Execute parallel mapping operations while accumulating all errors instead of failing fast.
 *
 * API signature only; the implementation is provided by the library. `Error` is declared as a
 * type parameter, and the lambda receiver is [ScopedRaiseAccumulate] so that [f] can raise.
 * NOTE(review): confirm parameter naming against the Arrow version in use.
 *
 * @param f the mapping function applied to each element.
 * @return a `Left` of the accumulated errors, or a `Right` of the mapped values.
 */
suspend fun <Error, A, B> Iterable<A>.parMapOrAccumulate(
    f: suspend ScopedRaiseAccumulate<Error>.(A) -> B,
): Either<NonEmptyList<Error>, List<B>>

/** Overload of `parMapOrAccumulate` that additionally takes a [context]. */
suspend fun <Error, A, B> Iterable<A>.parMapOrAccumulate(
    context: CoroutineContext,
    f: suspend ScopedRaiseAccumulate<Error>.(A) -> B,
): Either<NonEmptyList<Error>, List<B>>

/** Overload of `parMapOrAccumulate` that additionally takes a [concurrency] value. */
suspend fun <Error, A, B> Iterable<A>.parMapOrAccumulate(
    concurrency: Int,
    f: suspend ScopedRaiseAccumulate<Error>.(A) -> B,
): Either<NonEmptyList<Error>, List<B>>

/** Overload of `parMapOrAccumulate` that takes both a [context] and a [concurrency] value. */
suspend fun <Error, A, B> Iterable<A>.parMapOrAccumulate(
    context: CoroutineContext,
    concurrency: Int,
    f: suspend ScopedRaiseAccumulate<Error>.(A) -> B,
): Either<NonEmptyList<Error>, List<B>>
/** A validation failure: the offending [field] and a human-readable [message]. */
data class ValidationError(val field: String, val message: String)

// Example: validate every user in parallel and report all errors at once.
val validationResults = users.parMapOrAccumulate { user ->
    validateUser(user) // Function that can raise ValidationError
}

when (validationResults) {
    is Either.Left -> {
        println("Validation failed with errors:")
        validationResults.value.forEach { error ->
            println("- ${error.field}: ${error.message}")
        }
    }
    is Either.Right -> {
        println("All users validated successfully")
        processValidUsers(validationResults.value)
    }
}

/**
 * Execute parallel operations within a Raise context, accumulating errors from all operations.
 *
 * API signature only; the implementation is provided by the library. The error type `E` is
 * declared as a type parameter. With a plain `Raise<E>` receiver, several errors have to be
 * merged into one `E`, hence the [combine] function.
 * NOTE(review): confirm the `combine` parameter against the Arrow version in use.
 *
 * @param combine merges two errors into one.
 * @param fa first operation.
 * @param fb second operation.
 * @param f combines both results.
 */
suspend fun <E, A, B, C> Raise<E>.parZipOrAccumulate(
    combine: (E, E) -> E,
    fa: suspend ScopedRaiseAccumulate<E>.() -> A,
    fb: suspend ScopedRaiseAccumulate<E>.() -> B,
    f: suspend CoroutineScope.(A, B) -> C,
): C

/**
 * Execute parallel operations within a Raise context, accumulating errors from all operations.
 *
 * API signature only. The receiver is `Raise<NonEmptyList<E>>`, so no combine function is needed.
 *
 * @param fa first operation.
 * @param fb second operation.
 * @param f combines both results.
 */
suspend fun <E, A, B, C> Raise<NonEmptyList<E>>.parZipOrAccumulate(
    fa: suspend ScopedRaiseAccumulate<E>.() -> A,
    fb: suspend ScopedRaiseAccumulate<E>.() -> B,
    f: suspend CoroutineScope.(A, B) -> C,
): C
// Example: validate four fields in parallel and build a UserProfile from the results.
val userProfile = either<NonEmptyList<ValidationError>, UserProfile> {
    parZipOrAccumulate(
        { validateName(userData.name) },
        { validateEmail(userData.email) },
        { validateAge(userData.age) },
        { validateAddress(userData.address) },
    ) { validName, validEmail, validAge, validAddress ->
        UserProfile(validName, validEmail, validAge, validAddress)
    }
}

/**
 * Intersection type that combines CoroutineScope and RaiseAccumulate for advanced error handling
 * scenarios.
 */
interface ScopedRaiseAccumulate<Error> : CoroutineScope, RaiseAccumulate<Error>

/** Specialized scope type for racing operations with error handling. */
interface RaiseScope<in Error> : CoroutineScope, Raise<Error>

/** Suspending handler with a [CoroutineScope] receiver that takes an error and returns [Nothing]. */
typealias RaiseHandler<Error> = suspend CoroutineScope.(Error) -> Nothing
/**
 * Runs [operation] up to [maxAttempts] times and returns the first successful result.
 *
 * After each failed attempt except the last, waits 1s, 2s, 4s, ... (exponential backoff, capped
 * at 1024s) before trying again. Cancellation is never treated as a retryable failure.
 *
 * @param maxAttempts total number of attempts; must be positive.
 * @param operation the operation to attempt.
 * @return `Right` with the result, or `Left` with the exception thrown by the final attempt.
 * @throws IllegalArgumentException if [maxAttempts] is not positive.
 */
suspend fun <T> withRetry(
    maxAttempts: Int,
    operation: suspend () -> T,
): Either<Exception, T> {
    require(maxAttempts > 0) { "maxAttempts must be positive, was $maxAttempts" }
    return either {
        // Every attempt but the last: on failure, back off and try again.
        repeat(maxAttempts - 1) { attempt ->
            try {
                return@either operation()
            } catch (e: CancellationException) {
                // Rethrow so coroutine cancellation is not retried or swallowed.
                throw e
            } catch (e: Exception) {
                // Long arithmetic (delay takes a Long); the shift is capped to avoid overflow.
                delay(1_000L shl minOf(attempt, 10))
            }
        }
        // Final attempt: a failure here is the error reported to the caller.
        try {
            operation()
        } catch (e: CancellationException) {
            throw e
        } catch (e: Exception) {
            raise(e)
        }
    }
}

/**
 * Tries each of [operations] in order and returns the first successful result.
 *
 * @param operations the fallback chain, tried first to last; must not be empty.
 * @return `Right` with the first successful result, or `Left` with the exceptions of all
 *   operations (in order) when every one of them failed.
 * @throws IllegalArgumentException if [operations] is empty.
 */
suspend fun <T> withFallbackChain(
    operations: List<suspend () -> T>,
): Either<NonEmptyList<Exception>, T> {
    require(operations.isNotEmpty()) { "operations must not be empty" }
    return either {
        val errors = mutableListOf<Exception>()
        for (operation in operations) {
            try {
                return@either operation()
            } catch (e: CancellationException) {
                // Cancellation aborts the whole chain instead of falling through to the next entry.
                throw e
            } catch (e: Exception) {
                errors += e
            }
        }
        // Reached only when every operation failed, so `errors` has at least one element.
        raise(NonEmptyList(errors.first(), errors.drop(1)))
    }
}

data class User(val name: String, val email: String, val age: Int)
/** A validation failure: the offending [field] and a human-readable [message]. */
data class ValidationError(val field: String, val message: String)

/**
 * Validates name, email and age of [userData] via `parZipOrAccumulate` and builds a [User].
 *
 * @return `Right` with the [User], or `Left` with the accumulated [ValidationError]s.
 */
suspend fun validateUser(userData: UserData): Either<NonEmptyList<ValidationError>, User> =
    either {
        parZipOrAccumulate(
            { validateName(userData.name) },
            { validateEmail(userData.email) },
            { validateAge(userData.age) },
        ) { name, email, age ->
            User(name, email, age)
        }
    }

/** Raises a [ValidationError] when [name] is blank; otherwise returns it trimmed. */
suspend fun Raise<ValidationError>.validateName(name: String): String =
    if (name.isBlank()) raise(ValidationError("name", "Name cannot be blank"))
    else name.trim()

/** Raises a [ValidationError] when [email] contains no '@'; otherwise returns it lowercased. */
suspend fun Raise<ValidationError>.validateEmail(email: String): String =
    if (!email.contains("@")) raise(ValidationError("email", "Invalid email format"))
    else email.lowercase()

/** Raises a [ValidationError] when [age] is negative; otherwise returns it unchanged (0 is accepted). */
suspend fun Raise<ValidationError>.validateAge(age: Int): Int =
    if (age < 0) raise(ValidationError("age", "Age must not be negative"))
    else age

// Example: transactional use of a connection resource inside an `either` block.
val result = either<DatabaseError, ProcessingResult> {
    resourceScope {
        val connection = databaseResource
            .releaseCase { conn, exitCase ->
                try {
                    when (exitCase) {
                        is ExitCase.Completed -> conn.commit()
                        // A cancelled transaction must not be committed.
                        // NOTE(review): Arrow appears to report a `raise` short-circuit to
                        // finalizers as Cancelled as well; confirm for the version in use.
                        is ExitCase.Cancelled -> conn.rollback()
                        is ExitCase.Failure -> {
                            conn.rollback()
                            logError("Database transaction failed", exitCase.failure)
                        }
                    }
                } finally {
                    // Close even when commit()/rollback() throws.
                    conn.close()
                }
            }
            .bind()
        // Use connection - errors automatically handled
        processDataWithConnection(connection)
    }
}

/**
 * Binds three resources, fetches from each of them in parallel and processes the combined input.
 */
suspend fun processWithMultipleResources(): Either<ProcessingError, Result> = either {
    resourceScope {
        val database = databaseResource.bind()
        val cache = cacheResource.bind()
        val httpClient = httpClientResource.bind()
        // parZipOrAccumulate is declared to return the combined value directly (not an Either),
        // so no bind() is applied to it.
        // NOTE(review): with a Raise<ProcessingError> receiver the library overload may require
        // a `combine` function as first argument; confirm against the Arrow version in use.
        val data = parZipOrAccumulate(
            { database.fetchUsers() },
            { cache.getConfiguration() },
            { httpClient.fetchMetadata() },
        ) { users, config, metadata ->
            ProcessingInput(users, config, metadata)
        }
        processData(data)
    }
}

/**
 * Races the primary service against a fallback service that starts after a 2000 ms delay.
 */
suspend fun fetchDataWithFallback(): Either<ServiceError, String> = either {
    racing<String> {
        raceOrThrow(raise = { error -> raise(error) }) {
            primaryService.getData() // Can raise ServiceError
        }
        raceOrThrow(raise = { error -> raise(error) }) {
            delay(2000)
            fallbackService.getData() // Can raise ServiceError
        }
    }
}

/** An error tagged with the pipeline [stage] in which it occurred. */
data class ProcessingError(val stage: String, val error: String)
/**
 * Three-stage pipeline over [inputs]: validation, processing, enrichment.
 *
 * Each stage maps its elements with `parMapOrAccumulate`, tagging failures with the stage name,
 * and the stage result is bound before the next stage starts.
 *
 * @param inputs the raw inputs.
 * @return `Right` with the enriched outputs, or `Left` with the errors of the first failing stage.
 */
suspend fun processDataPipeline(
    inputs: List<InputData>,
): Either<NonEmptyList<ProcessingError>, List<OutputData>> = either {
    val validInputs = inputs
        .parMapOrAccumulate { raw ->
            validateInput(raw)
                .mapLeft { failure -> ProcessingError("validation", failure.message) }
                .bind()
        }
        .bind()

    val processedInputs = validInputs
        .parMapOrAccumulate { valid ->
            processInput(valid)
                .mapLeft { failure -> ProcessingError("processing", failure.message) }
                .bind()
        }
        .bind()

    // The last stage is the value of the `either` block.
    processedInputs
        .parMapOrAccumulate { processed ->
            enrichData(processed)
                .mapLeft { failure -> ProcessingError("enrichment", failure.message) }
                .bind()
        }
        .bind()
}

Install with Tessl CLI
npx tessl i tessl/maven-io-arrow-kt--arrow-fx-coroutines-jvm