Arrow Fx Coroutines provides functional effect types and utilities for managing side effects in Kotlin coroutines
npx @tessl/cli install tessl/maven-io-arrow-kt--arrow-fx-coroutines-jvm@2.2.0

Arrow FX Coroutines is a comprehensive Kotlin library that provides functional effect types and utilities for managing side effects in Kotlin coroutines. It offers structured concurrency patterns, resource management, parallel processing capabilities, and advanced synchronization primitives designed around functional programming principles.
implementation("io.arrow-kt:arrow-fx-coroutines:2.2.0-beta.3")

import arrow.fx.coroutines.*

For specific functionality:
import arrow.fx.coroutines.Resource
import arrow.fx.coroutines.parZip
import arrow.fx.coroutines.resourceScope

import arrow.fx.coroutines.*
/**
 * Quick-start example showing three features of the library:
 * scoped resource management, parallel execution with `parZip`,
 * and racing two operations with `raceN`.
 *
 * The helper functions called here (`openFile`, `openConnection`, `processData`,
 * `fetchUserData`, `fetchSettings`, `combineData`, `slowOperation`, `fastOperation`)
 * are placeholders that the reader is expected to supply.
 */
suspend fun main() {
    // Resource management with automatic cleanup
    resourceScope {
        // `release` has type `suspend (A, ExitCase) -> Unit`, so the lambda takes two
        // parameters and both must be named; the ExitCase is not needed here.
        val file = resource({ openFile() }, { handle, _ -> handle.close() }).bind()
        val connection = resource({ openConnection() }, { conn, _ -> conn.close() }).bind()
        // Use resources safely - cleanup guaranteed
        processData(file, connection)
    }

    // Parallel execution
    val result = parZip(
        { fetchUserData() },
        { fetchSettings() }
    ) { userData, settings ->
        combineData(userData, settings)
    }

    // Racing operations: the result is an Either holding the value of whichever side finished first
    val winner = raceN(
        { slowOperation() },
        { fastOperation() }
    )
}

Arrow FX Coroutines is built around several key concepts:
Comprehensive resource management with automatic cleanup and proper exception handling.
suspend fun <A> resourceScope(action: suspend ResourceScope.() -> A): A
fun <A> resource(block: suspend ResourceScope.() -> A): Resource<A>
fun <A> resource(acquire: suspend () -> A, release: suspend (A, ExitCase) -> Unit): Resource<A>
suspend fun <A, B> Resource<A>.use(f: suspend (A) -> B): B
typealias Resource<A> = suspend ResourceScope.() -> A

For detailed resource management patterns, lifecycle control, and advanced resource composition, see Resource Management.
High-performance parallel execution with concurrency control and error accumulation.
suspend fun <A, B> Iterable<A>.parMap(context: CoroutineContext = EmptyCoroutineContext, concurrency: Int, f: suspend CoroutineScope.(A) -> B): List<B>
suspend fun <A, B> Iterable<A>.parMap(context: CoroutineContext = EmptyCoroutineContext, transform: suspend CoroutineScope.(A) -> B): List<B>
suspend fun <A, B, C> parZip(crossinline fa: suspend CoroutineScope.() -> A, crossinline fb: suspend CoroutineScope.() -> B, crossinline f: suspend CoroutineScope.(A, B) -> C): C
suspend fun <A, B, C> parZip(ctx: CoroutineContext = EmptyCoroutineContext, crossinline fa: suspend CoroutineScope.() -> A, crossinline fb: suspend CoroutineScope.() -> B, crossinline f: suspend CoroutineScope.(A, B) -> C): C

For parallel mapping, zipping operations, and error handling strategies, see Parallel Processing.
Competitive execution patterns where multiple operations race to completion.
suspend fun <A, B> raceN(crossinline fa: suspend CoroutineScope.() -> A, crossinline fb: suspend CoroutineScope.() -> B): Either<A, B>
suspend fun <A, B> raceN(ctx: CoroutineContext = EmptyCoroutineContext, crossinline fa: suspend CoroutineScope.() -> A, crossinline fb: suspend CoroutineScope.() -> B): Either<A, B>
suspend fun <A> racing(block: RacingScope<A>.() -> Unit): A

For racing operations, timeout handling, and competitive execution patterns, see Racing Operations.
Integration with Arrow's Raise system for functional error handling and accumulation.
suspend fun <Error, A, B> Iterable<A>.parMapOrAccumulate(context: CoroutineContext = EmptyCoroutineContext, transform: suspend ScopedRaiseAccumulate<Error>.(A) -> B): Either<NonEmptyList<Error>, List<B>>
suspend fun <E, A, B, C> Raise<NonEmptyList<E>>.parZipOrAccumulate(crossinline fa: suspend ScopedRaiseAccumulate<E>.() -> A, crossinline fb: suspend ScopedRaiseAccumulate<E>.() -> B, crossinline transform: suspend CoroutineScope.(A, B) -> C): C

For error accumulation, structured error handling, and integration with Arrow's type system, see Error Handling.
Advanced synchronization primitives and Flow extensions for concurrent coordination.
/**
 * API outline of a synchronization primitive created with an [initial] count.
 * Member bodies are omitted in this listing.
 *
 * NOTE(review): presumably [await] suspends until [countDown] has been called
 * `initial` times; confirm against the Arrow API reference.
 */
class CountDownLatch(initial: Long) {
/** Returns a count as a [Long]. NOTE(review): presumably the remaining count; confirm. */
fun count(): Long
/** Suspending wait on this latch. */
suspend fun await()
/** Non-suspending signal to the latch. NOTE(review): presumably decrements the count by one; confirm. */
fun countDown()
}
/**
 * API outline of a synchronization primitive created with a [capacity] and an
 * optional `barrierAction` callback, which defaults to an empty lambda.
 * Member bodies are omitted in this listing.
 *
 * NOTE(review): presumably `barrierAction` runs each time `capacity` callers have
 * reached the barrier; confirm against the Arrow API reference.
 */
class CyclicBarrier(capacity: Int, barrierAction: () -> Unit = {}) {
/** The capacity this barrier exposes as a read-only property. */
val capacity: Int
/** Suspending wait on this barrier. NOTE(review): presumably resumes once `capacity` callers are waiting; confirm. */
suspend fun await()
/** Suspending reset of this barrier. NOTE(review): effect on callers already waiting is not shown here; confirm. */
suspend fun reset()
}
suspend fun <A, B> Flow<A>.parMap(concurrency: Int = DEFAULT_CONCURRENCY, crossinline transform: suspend CoroutineScope.(A) -> B): Flow<B>

For synchronization primitives, Flow extensions, and timing utilities, see Synchronization and Flow.
Fundamental bracket patterns for safe resource acquisition and guaranteed cleanup.
suspend fun <A, B> bracket(crossinline acquire: suspend () -> A, use: suspend (A) -> B, crossinline release: suspend (A) -> Unit): B
suspend fun <A, B> bracketCase(crossinline acquire: suspend () -> A, use: suspend (A) -> B, crossinline release: suspend (A, ExitCase) -> Unit): B
suspend fun <A> guarantee(fa: suspend () -> A, crossinline finalizer: suspend () -> Unit): A
suspend fun <A> guaranteeCase(fa: suspend () -> A, crossinline finalizer: suspend (ExitCase) -> Unit): A
suspend fun <A> onCancel(fa: suspend () -> A, crossinline onCancel: suspend () -> Unit): A

For fundamental resource safety patterns and bracket operations, see Resource Management.
/**
 * API outline of the closed set of ways an operation can finish. It is the type
 * received by the `release`/`finalizer` callbacks that take an `ExitCase` parameter.
 *
 * Because [ExitCase] is a sealed *class*, each subclass invokes its constructor
 * (`: ExitCase()`). The factory body is omitted, as in the other listings.
 */
sealed class ExitCase {
    /** Exit case that carries no additional data. */
    object Completed : ExitCase()

    /** Exit case that carries the [CancellationException] as [exception]. */
    data class Cancelled(val exception: CancellationException) : ExitCase()

    /** Exit case that carries a [Throwable] as [failure]. */
    data class Failure(val failure: Throwable) : ExitCase()

    companion object {
        /**
         * Builds an [ExitCase] from [error].
         * NOTE(review): presumably yields [Cancelled] for a CancellationException and
         * [Failure] otherwise; confirm against the Arrow API reference.
         */
        fun ExitCase(error: Throwable): ExitCase
    }
}

All operations in Arrow FX Coroutines properly handle cancellation and exceptions: