CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-io-arrow-kt--arrow-fx-coroutines-jvm

Arrow Fx Coroutines provides functional effect types and utilities for managing side effects in Kotlin coroutines

Pending
Overview
Eval results
Files

racing.mddocs/

Racing Operations

Arrow FX Coroutines provides racing operations that allow multiple coroutine blocks to compete, with the first successful completion providing the result while all other blocks are cancelled. This enables powerful patterns for timeouts, fallbacks, and performance optimization.

Binary Racing

Two-Way Race with Either Result

suspend fun <A, B> raceN(crossinline fa: suspend CoroutineScope.() -> A, crossinline fb: suspend CoroutineScope.() -> B): Either<A, B>
suspend fun <A, B> raceN(ctx: CoroutineContext = EmptyCoroutineContext, crossinline fa: suspend CoroutineScope.() -> A, crossinline fb: suspend CoroutineScope.() -> B): Either<A, B>

Race two operations, returning an Either indicating which operation won.

val result = raceN(
    { slowDatabaseQuery() },
    { fastCacheQuery() }
)

when (result) {
    is Either.Left -> println("Database won: ${result.value}")
    is Either.Right -> println("Cache won: ${result.value}")
}

Ternary Racing

Three-Way Race Result Type

sealed class Race3<out A, out B, out C> {
    data class First<A>(val winner: A) : Race3<A, Nothing, Nothing>()
    data class Second<B>(val winner: B) : Race3<Nothing, B, Nothing>()
    data class Third<C>(val winner: C) : Race3<Nothing, Nothing, C>()
    
    fun <D> fold(
        ifFirst: (A) -> D,
        ifSecond: (B) -> D,
        ifThird: (C) -> D
    ): D
}

Result type for three-way races with pattern matching capability.

Three-Way Race Operations

suspend fun <A, B, C> raceN(
    crossinline fa: suspend CoroutineScope.() -> A,
    crossinline fb: suspend CoroutineScope.() -> B,
    crossinline fc: suspend CoroutineScope.() -> C
): Race3<A, B, C>

suspend fun <A, B, C> raceN(
    ctx: CoroutineContext = EmptyCoroutineContext,
    crossinline fa: suspend CoroutineScope.() -> A,
    crossinline fb: suspend CoroutineScope.() -> B,
    crossinline fc: suspend CoroutineScope.() -> C
): Race3<A, B, C>

Race three operations with structured result handling.

val result = raceN(
    { primaryService.getData() },
    { secondaryService.getData() },
    { fallbackService.getData() }
)

val data = result.fold(
    ifFirst = { primary -> "Primary: $primary" },
    ifSecond = { secondary -> "Secondary: $secondary" },
    ifThird = { fallback -> "Fallback: $fallback" }
)

Advanced Racing with RacingScope

RacingScope Interface

interface RacingScope<in Result> : CoroutineScope {
    fun raceOrThrow(
        context: CoroutineContext = EmptyCoroutineContext,
        block: suspend CoroutineScope.() -> Result
    )
}

Advanced racing scope that allows building complex racing scenarios.

Racing Scope Execution

suspend fun <A> racing(block: RacingScope<A>.() -> Unit): A

Create a racing scope where multiple operations can compete.

val winner = racing<String> {
    raceOrThrow { "Fast operation completed" }
    raceOrThrow { 
        delay(1000)
        "Slow operation completed" 
    }
    raceOrThrow {
        delay(500)
        "Medium operation completed"
    }
}

Racing with Exception Handling

fun <Result> RacingScope<Result>.race(
    context: CoroutineContext = EmptyCoroutineContext,
    condition: (Result) -> Boolean = { true },
    block: suspend CoroutineScope.() -> Result
)

Race with custom exception handling and success conditions.

val result = racing<String?> {
    race(condition = { it != null }) {
        unreliableService.getData() // May return null
    }
    
    race {
        delay(5000)
        "Timeout fallback"
    }
}

Racing with Error Handling

RaiseScope Integration

interface RaiseScope<in Error> : CoroutineScope, Raise<Error>
typealias RaiseHandler<Error> = (context: CoroutineContext, error: Error) -> Unit

Racing operations that integrate with Arrow's Raise error handling system.

Racing with Raise Error Handling

fun <Result> RacingScope<Result>.raceOrThrow(
    raise: RaiseHandler<Error>,
    condition: (Result) -> Boolean = { true },
    block: suspend RaiseScope<Error>.() -> Result
)

fun <Result> RacingScope<Result>.race(
    context: CoroutineContext = EmptyCoroutineContext,
    raise: RaiseHandler<Error>,
    condition: (Result) -> Boolean = { true },
    block: suspend RaiseScope<Error>.() -> Result
)

Race operations with structured error handling.

val result = either<ServiceError, String> {
    racing<String> {
        raceOrThrow(raise = { error -> raise(error) }) {
            serviceA.getData() // Can raise ServiceError
        }
        
        raceOrThrow(raise = { error -> raise(error) }) {
            serviceB.getData() // Can raise ServiceError
        }
    }
}

Common Racing Patterns

Timeout Pattern

suspend fun <T> withTimeout(timeoutMs: Long, operation: suspend () -> T): T? {
    return raceN(
        { operation() },
        { 
            delay(timeoutMs)
            null
        }
    ).fold(
        { result -> result },
        { null }
    )
}

val result = withTimeout(5000) {
    slowNetworkCall()
}

Fallback Pattern

suspend fun <T> withFallback(
    primary: suspend () -> T,
    fallback: suspend () -> T
): T {
    return raceN(
        { primary() },
        { 
            delay(3000) // Wait before trying fallback
            fallback()
        }
    ).fold(
        { it },
        { it }
    )
}

Circuit Breaker Pattern

class CircuitBreaker<T> {
    suspend fun executeWithFallback(
        primary: suspend () -> T,
        fallback: suspend () -> T
    ): T = racing {
        raceOrThrow(condition = { isServiceHealthy() }) {
            primary()
        }
        
        race {
            delay(100) // Small delay before fallback
            fallback()
        }
    }
}

Multiple Service Racing

suspend fun <T> raceMultipleServices(
    services: List<suspend () -> T>
): T = racing {
    services.forEach { service ->
        raceOrThrow { service() }
    }
}

val data = raceMultipleServices(
    listOf(
        { serviceA.getData() },
        { serviceB.getData() },
        { serviceC.getData() }
    )
)

Performance and Cancellation

Cancellation Behavior

All racing operations properly handle cancellation:

  • When one operation completes, all other operations are immediately cancelled
  • Resources are properly cleaned up in cancelled operations
  • Parent scope cancellation is propagated to all racing operations

Resource Management in Racing

val result = racing<String> {
    raceOrThrow {
        resourceScope {
            val connection = connectionResource.bind()
            connection.query("SELECT data FROM table1")
        }
    }
    
    raceOrThrow {
        resourceScope {
            val cache = cacheResource.bind()
            cache.get("data_key")
        }
    }
}
// Losing operation's resources are automatically cleaned up

Exception Propagation

Racing operations handle exceptions according to structured concurrency principles:

val result = try {
    racing<String> {
        raceOrThrow { 
            throw IllegalStateException("Service A failed")
        }
        
        raceOrThrow {
            delay(100)
            "Service B success"
        }
    }
} catch (e: IllegalStateException) {
    "Handled failure: ${e.message}"
}

Integration with Other Patterns

Racing with Parallel Processing

val results = urls.parMap { url ->
    raceN(
        { primaryClient.get(url) },
        { 
            delay(2000)
            fallbackClient.get(url)
        }
    ).fold({ it }, { it })
}

Racing with Resource Management

resourceScope {
    val config = configResource.bind()
    
    val data = racing<String> {
        config.endpoints.forEach { endpoint ->
            raceOrThrow {
                httpClient.get(endpoint)
            }
        }
    }
    
    processData(data)
}

Install with Tessl CLI

npx tessl i tessl/maven-io-arrow-kt--arrow-fx-coroutines-jvm

docs

error-handling.md

index.md

parallel-processing.md

racing.md

resource-management.md

synchronization-flow.md

tile.json