Arrow Fx Coroutines provides functional effect types and utilities for managing side effects in Kotlin coroutines
—
Arrow Fx Coroutines provides racing operations that allow multiple coroutine blocks to compete, with the first successful completion providing the result while all other blocks are cancelled. This enables powerful patterns for timeouts, fallbacks, and performance optimization.
// Two-way race: per the usage example, Either.Left carries the result of `fa` and Either.Right the result of `fb`.
// `inline` is required here: `crossinline` is only permitted on parameters of an inline function.
suspend inline fun <A, B> raceN(crossinline fa: suspend CoroutineScope.() -> A, crossinline fb: suspend CoroutineScope.() -> B): Either<A, B>
// Overload that additionally accepts a CoroutineContext `ctx` (defaults to EmptyCoroutineContext).
suspend inline fun <A, B> raceN(ctx: CoroutineContext = EmptyCoroutineContext, crossinline fa: suspend CoroutineScope.() -> A, crossinline fb: suspend CoroutineScope.() -> B): Either<A, B>

Race two operations, returning an Either indicating which operation won.
val result = raceN(
{ slowDatabaseQuery() },
{ fastCacheQuery() }
)
when (result) {
is Either.Left -> println("Database won: ${result.value}")
is Either.Right -> println("Cache won: ${result.value}")
}

sealed class Race3<out A, out B, out C> {
data class First<A>(val winner: A) : Race3<A, Nothing, Nothing>()
data class Second<B>(val winner: B) : Race3<Nothing, B, Nothing>()
data class Third<C>(val winner: C) : Race3<Nothing, Nothing, C>()
fun <D> fold(
ifFirst: (A) -> D,
ifSecond: (B) -> D,
ifThird: (C) -> D
): D
}

Result type for three-way races with pattern matching capability.
// Three-way race; the winner is reported as a Race3 (First / Second / Third).
// `inline` is required here: `crossinline` is only permitted on parameters of an inline function.
suspend inline fun <A, B, C> raceN(
    crossinline fa: suspend CoroutineScope.() -> A,
    crossinline fb: suspend CoroutineScope.() -> B,
    crossinline fc: suspend CoroutineScope.() -> C
): Race3<A, B, C>
// Overload that additionally accepts a CoroutineContext `ctx` (defaults to EmptyCoroutineContext).
suspend inline fun <A, B, C> raceN(
    ctx: CoroutineContext = EmptyCoroutineContext,
    crossinline fa: suspend CoroutineScope.() -> A,
    crossinline fb: suspend CoroutineScope.() -> B,
    crossinline fc: suspend CoroutineScope.() -> C
): Race3<A, B, C>

Race three operations with structured result handling.
val result = raceN(
{ primaryService.getData() },
{ secondaryService.getData() },
{ fallbackService.getData() }
)
val data = result.fold(
ifFirst = { primary -> "Primary: $primary" },
ifSecond = { secondary -> "Secondary: $secondary" },
ifThird = { fallback -> "Fallback: $fallback" }
)

interface RacingScope<in Result> : CoroutineScope {
fun raceOrThrow(
context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.() -> Result
)
}

Advanced racing scope that allows building complex racing scenarios.
suspend fun <A> racing(block: RacingScope<A>.() -> Unit): A

Create a racing scope where multiple operations can compete.
val winner = racing<String> {
raceOrThrow { "Fast operation completed" }
raceOrThrow {
delay(1000)
"Slow operation completed"
}
raceOrThrow {
delay(500)
"Medium operation completed"
}
}

fun <Result> RacingScope<Result>.race(
context: CoroutineContext = EmptyCoroutineContext,
condition: (Result) -> Boolean = { true },
block: suspend CoroutineScope.() -> Result
)

Race with custom exception handling and success conditions.
val result = racing<String?> {
race(condition = { it != null }) {
unreliableService.getData() // May return null
}
race {
delay(5000)
"Timeout fallback"
}
}

interface RaiseScope<in Error> : CoroutineScope, Raise<Error>
typealias RaiseHandler<Error> = (context: CoroutineContext, error: Error) -> Unit

Racing operations that integrate with Arrow's Raise error handling system.
// Raise-aware racer. `Error` is declared as a type parameter so that RaiseHandler<Error> and
// RaiseScope<Error> are generic over the raised error type, matching their own declarations;
// left undeclared, `Error` would resolve to the built-in kotlin.Error type instead.
fun <Error, Result> RacingScope<Result>.raceOrThrow(
    raise: RaiseHandler<Error>,
    condition: (Result) -> Boolean = { true },
    block: suspend RaiseScope<Error>.() -> Result
)
// Same as above, with an explicit CoroutineContext for the racer (defaults to EmptyCoroutineContext).
fun <Error, Result> RacingScope<Result>.race(
    context: CoroutineContext = EmptyCoroutineContext,
    raise: RaiseHandler<Error>,
    condition: (Result) -> Boolean = { true },
    block: suspend RaiseScope<Error>.() -> Result
)

Race operations with structured error handling.
val result = either<ServiceError, String> {
    racing<String> {
        // RaiseHandler is (context, error) -> Unit, so the handler lambda must accept both
        // parameters; the coroutine context is unused here.
        raceOrThrow(raise = { _, error -> raise(error) }) {
            serviceA.getData() // Can raise ServiceError
        }
        raceOrThrow(raise = { _, error -> raise(error) }) {
            serviceB.getData() // Can raise ServiceError
        }
    }
}

/**
 * Runs [operation] in a race against a timer of [timeoutMs] milliseconds.
 *
 * @param timeoutMs how long the competing timer delays before it completes
 * @param operation the suspending work to run
 * @return the value produced by [operation] if it wins the race, or `null` if the timer wins.
 *   A `null` produced by [operation] itself is indistinguishable from a timeout.
 */
suspend fun <T> withTimeout(timeoutMs: Long, operation: suspend () -> T): T? {
    return raceN(
        { operation() },
        {
            // The timer branch: completing it means the operation did not finish in time.
            delay(timeoutMs)
            null
        }
    ).fold(
        { result -> result },
        { null }
    )
}
val result = withTimeout(5000) {
    slowNetworkCall()
}

/**
 * Races [primary] against [fallback], where the fallback only starts its work after a
 * fixed 3000 ms delay.
 *
 * @return the value of whichever branch completes first
 */
suspend fun <T> withFallback(
    primary: suspend () -> T,
    fallback: suspend () -> T
): T {
    return raceN(
        { primary() },
        {
            delay(3000) // Wait before trying fallback
            fallback()
        }
    ).fold(
        { it },
        { it }
    )
}

class CircuitBreaker<T> {
    /**
     * Races [primary] against a slightly delayed [fallback].
     *
     * The primary result is only accepted when `isServiceHealthy()` returns true.
     * `race` is used for the primary racer because it is the documented racer that takes a
     * `condition` without a `raise` handler; the plain `raceOrThrow` has no `condition`.
     * NOTE(review): `race` may treat exceptions thrown by [primary] differently from
     * `raceOrThrow` — confirm against the Arrow documentation.
     */
    suspend fun executeWithFallback(
        primary: suspend () -> T,
        fallback: suspend () -> T
    ): T = racing {
        race(condition = { isServiceHealthy() }) {
            primary()
        }
        race {
            delay(100) // Small delay before fallback
            fallback()
        }
    }
}

/**
 * Registers every entry of [services] as a racer and returns the result of the winner.
 *
 * NOTE(review): with an empty list no racer is registered — confirm how `racing` behaves then.
 */
suspend fun <T> raceMultipleServices(
    services: List<suspend () -> T>
): T = racing {
    services.forEach { service ->
        raceOrThrow { service() }
    }
}

val data = raceMultipleServices(
    listOf(
        { serviceA.getData() },
        { serviceB.getData() },
        { serviceC.getData() }
    )
)

All racing operations properly handle cancellation:
val result = racing<String> {
raceOrThrow {
resourceScope {
val connection = connectionResource.bind()
connection.query("SELECT data FROM table1")
}
}
raceOrThrow {
resourceScope {
val cache = cacheResource.bind()
cache.get("data_key")
}
}
}
// Losing operation's resources are automatically cleaned up

Racing operations handle exceptions according to structured concurrency principles:
val result = try {
racing<String> {
raceOrThrow {
throw IllegalStateException("Service A failed")
}
raceOrThrow {
delay(100)
"Service B success"
}
}
} catch (e: IllegalStateException) {
"Handled failure: ${e.message}"
}

val results = urls.parMap { url ->
raceN(
{ primaryClient.get(url) },
{
delay(2000)
fallbackClient.get(url)
}
).fold({ it }, { it })
}

resourceScope {
val config = configResource.bind()
val data = racing<String> {
config.endpoints.forEach { endpoint ->
raceOrThrow {
httpClient.get(endpoint)
}
}
}
processData(data)
}

Install with Tessl CLI
npx tessl i tessl/maven-io-arrow-kt--arrow-fx-coroutines-jvm