Arrow Fx Coroutines provides functional effect types and utilities for managing side effects in Kotlin coroutines.
—
Arrow Fx Coroutines also provides advanced synchronization primitives and Flow extensions for coordinating concurrent operations and for processing streams of data with timing control and parallel execution.
/**
 * Latch that lets coroutines wait until a set number of countdown signals has arrived.
 *
 * @param initial value the latch is constructed with; the usage examples pass the
 *   number of signals they expect.
 */
class CountDownLatch(private val initial: Long) {
    /**
     * Returns the latch's count as a [Long].
     * NOTE(review): presumably the number of signals still outstanding — confirm against the Arrow API docs.
     */
    fun count(): Long

    /** Suspends the caller; per the description below, until the expected countdown signals have been received. */
    suspend fun await()

    /** Delivers one countdown signal to the latch. */
    fun countDown()
}

A synchronization primitive that allows coroutines to wait until a specified number of countdown signals have been received.
// The latch is created with 3: one countdown per coroutine launched below.
val latch = CountDownLatch(3)
// Start multiple coroutines
launch {
    performTask1()
    latch.countDown() // signal only after the task call has returned
}
launch {
    performTask2()
    latch.countDown()
}
launch {
    performTask3()
    latch.countDown()
}
// Wait for all tasks to complete
latch.await()
println("All tasks completed!")

/**
 * Example of handing a single computed value to waiting coroutines:
 * [processData] stores the result and then opens a one-count latch,
 * and [getResult] suspends on that latch before reading the value.
 */
class DataProcessor {
    // A count of 1: a single countDown() in processData releases every waiter.
    private val latch = CountDownLatch(1)
    private var processedData: String? = null

    /**
     * Builds the result for [input], stores it, and then signals the latch.
     * The value is assigned before countDown() so waiters never see it missing.
     */
    suspend fun processData(input: String) {
        // Simulate processing
        delay(1000)
        processedData = "Processed: $input"
        latch.countDown()
    }

    /**
     * Suspends until [processData] has signalled the latch, then returns the stored result.
     *
     * @throws IllegalStateException if the latch was released without a result being stored.
     */
    suspend fun getResult(): String {
        latch.await()
        // checkNotNull documents the invariant and fails with a readable message instead of a bare `!!`.
        return checkNotNull(processedData) { "latch was released before processedData was set" }
    }
}

val processor = DataProcessor()
launch { processor.processData("important data") }
val result = processor.getResult()

// capacity: the number of coroutines the barrier is created for (exposed as a read-only property).
// barrierAction: optional callback; in the example below it prints once all workers have reached the barrier.
class CyclicBarrier(val capacity: Int, barrierAction: () -> Unit = {}) {
    // NOTE(review): per the Arrow API docs, reset() cancels the coroutines currently
    // suspended in await() — confirm before calling it while others are waiting.
    suspend fun reset()

    // Suspends the caller at the barrier; per the description below, until the other coroutines arrive.
    suspend fun await()
}

// NOTE(review): presumably the cancellation observed by waiters when the barrier is reset — confirm against the Arrow API docs.
class CyclicBarrierCancellationException : CancellationException

A synchronization primitive that allows a set of coroutines to wait for each other to reach a common barrier point.
// Barrier for 3 workers; the trailing lambda is the barrier action.
val barrier = CyclicBarrier(3) {
    println("All workers reached the barrier!")
}
// Start workers
repeat(3) { workerId ->
    launch {
        repeat(5) { phase ->
            performWork(workerId, phase)
            println("Worker $workerId completed phase $phase")
            barrier.await() // Wait for all workers
            println("Worker $workerId starting next phase")
        }
    }
}

/**
 * Example that groups submitted items into batches of [batchSize] by making
 * every submitter wait at a shared [CyclicBarrier].
 *
 * @param batchSize number of submitters that must arrive before the batch is released.
 */
class BatchProcessor<T>(private val batchSize: Int) {
    private val barrier = CyclicBarrier(batchSize) {
        println("Batch of $batchSize items ready for processing")
    }

    /**
     * Adds [item] to the current batch, suspends until the batch is full,
     * and then runs the batch processing step.
     */
    suspend fun submitItem(item: T) {
        // Add item to batch
        addToBatch(item)
        // Wait for batch to fill
        barrier.await()
        // Process batch collectively
        processBatch()
        // No explicit reset: the same barrier is awaited again for the next batch
        // (as the worker example above does across phases), and reset() would
        // cancel submitters that are already waiting for that next batch.
    }
}

@ExperimentalAwaitAllApi
class AwaitAllScope(scope: CoroutineScope) : CoroutineScope by scope {
    // Starts [block] as a Deferred inside this scope.
    // context and start default to EmptyCoroutineContext and CoroutineStart.DEFAULT.
    fun <A> async(
        context: CoroutineContext = EmptyCoroutineContext,
        start: CoroutineStart = CoroutineStart.DEFAULT,
        block: suspend CoroutineScope.() -> A
    ): Deferred<A>
}

A scope that automatically awaits all async operations created within it.
// Runs [block] with an AwaitAllScope receiver and returns the block's result.
@ExperimentalAwaitAllApi
suspend fun <A> awaitAll(block: suspend AwaitAllScope.() -> A): A

// Variant called on an existing CoroutineScope; there is no CoroutineContext parameter.
@ExperimentalAwaitAllApi
suspend fun <A> CoroutineScope.awaitAll(block: suspend AwaitAllScope.() -> A): A

Execute a block where all async operations are automatically awaited.
@OptIn(ExperimentalAwaitAllApi::class)
val results = awaitAll {
    val deferred1 = async { fetchData1() }
    val deferred2 = async { fetchData2() }
    val deferred3 = async { fetchData3() }
    // All deferreds are automatically awaited
    // Results are available immediately
    combineResults(deferred1.await(), deferred2.await(), deferred3.await())
}

// These operators take `crossinline` lambdas, which Kotlin only permits on `inline` functions.
// concurrency caps how many transforms run at once and defaults to DEFAULT_CONCURRENCY.
inline fun <A, B> Flow<A>.parMap(concurrency: Int = DEFAULT_CONCURRENCY, crossinline transform: suspend CoroutineScope.(A) -> B): Flow<B>
inline fun <A, B> Flow<A>.parMapUnordered(concurrency: Int = DEFAULT_CONCURRENCY, crossinline transform: suspend (A) -> B): Flow<B>
inline fun <A, B> Flow<A>.parMapNotNullUnordered(concurrency: Int = DEFAULT_CONCURRENCY, crossinline transform: suspend (A) -> B?): Flow<B>

Process Flow elements in parallel while controlling concurrency.
// Build the Flow first and collect it separately: `collect` returns Unit,
// so chaining it into the `val` would not leave a Flow in `processedFlow`.
val processedFlow = sourceFlow
    .parMap(concurrency = 10) { item ->
        expensiveOperation(item)
    }
processedFlow.collect { processedItem ->
    // Items arrive in original order
    println("Processed: $processedItem")
}

// Unordered variant, with at most 5 transforms in flight.
val unorderedFlow = sourceFlow
    .parMapUnordered(concurrency = 5) { item ->
        asyncOperation(item)
    }
unorderedFlow.collect { processedItem ->
    // Items arrive as soon as they're processed
    println("Completed: $processedItem")
}

fun <A> Flow<A>.repeat(): Flow<A>

Repeat a Flow forever.
// Keep the Flow and its collection separate: `collect` returns Unit, not a Flow.
val heartbeatFlow = flowOf("ping")
    .repeat()
// A repeated flow never completes, so this call suspends until its coroutine is cancelled.
heartbeatFlow.collect {
    println("Heartbeat: $it")
    delay(1000)
}

// metered comes in two forms: a Duration period or a period in milliseconds.
fun <A> Flow<A>.metered(period: Duration): Flow<A>
fun <A> Flow<A>.metered(periodInMillis: Long): Flow<A>
// `crossinline` is only permitted on `inline` functions.
inline fun <A, B> Flow<A>.mapIndexed(crossinline f: suspend (index: Int, value: A) -> B): Flow<B>

Control the timing and indexing of Flow emissions.
// requires: import kotlin.time.Duration.Companion.seconds
// metered(period: Duration) takes a kotlin.time.Duration; Duration.ofSeconds is java.time API.
val rateLimitedFlow = dataFlow
    .metered(1.seconds) // One item per second
// `collect` returns Unit, so it is called separately rather than assigned to the `val`.
rateLimitedFlow.collect { item ->
    processItem(item)
}

val indexedResults = sourceFlow
    .mapIndexed { index, item ->
        "Item $index: $item"
    }
indexedResults.collect { indexedItem ->
    println(indexedItem)
}

// Both overloads emit Unit; dampen defaults to true and timeStamp defaults to the monotonic time source.
fun fixedRate(period: Duration, dampen: Boolean = true, timeStamp: () -> ComparableTimeMark = { TimeSource.Monotonic.markNow() }): Flow<Unit>
fun fixedRate(periodInMillis: Long, dampen: Boolean = true, timeStamp: () -> ComparableTimeMark = { TimeSource.Monotonic.markNow() }): Flow<Unit>

Create a Flow that emits at fixed intervals.
// requires: import kotlin.time.Duration.Companion.minutes
// requires: import kotlin.time.Duration.Companion.seconds
// fixedRate(period: Duration) takes a kotlin.time.Duration; Duration.ofMinutes/ofSeconds are java.time API.
val periodicTask = fixedRate(5.minutes)
// `collect` returns Unit, so each flow is collected separately rather than assigned to the `val`.
periodicTask.collect {
    performMaintenanceTask()
}

// dampen = true (default): if the collector takes longer than the period,
// the ticks it missed are collapsed into a single immediate emission.
val dampenedFlow = fixedRate(10.seconds, dampen = true)
dampenedFlow.collect {
    longRunningTask() // Runs again right away if it took > 10 seconds, but only once
}
// dampen = false: every missed tick is emitted back-to-back so the collector catches up.
// Collection is sequential either way, so invocations never overlap.
val strictFlow = fixedRate(10.seconds, dampen = false)
strictFlow.collect {
    quickTask() // One invocation per elapsed period, including missed ones
}

/**
 * Example pipeline stage that brackets parallel processing with two barriers,
 * both created for [capacity] participants.
 */
class PipelineStage<T>(private val capacity: Int) {
    // Awaited before processing starts.
    private val inputBarrier = CyclicBarrier(capacity)
    // Awaited after processing, before the results are returned.
    private val outputBarrier = CyclicBarrier(capacity)

    /**
     * Waits at the input barrier, maps every element of [items] with processItem
     * via parMap, waits at the output barrier, and returns the mapped list.
     */
    suspend fun process(items: List<T>): List<T> {
        // Wait for all inputs
        inputBarrier.await()
        // Process in parallel
        val results = items.parMap { item ->
            processItem(item)
        }
        // Wait for all processing to complete
        outputBarrier.await()
        return results
    }
}

/**
 * Example pool that lets at most [maxConcurrentUsers] callers work with a resource at once.
 *
 * A semaphore is used rather than a CyclicBarrier: a barrier suspends callers until
 * `capacity` of them have arrived and never limits how many run afterwards, and
 * resetting it from `finally` would cancel callers still waiting.
 */
class CoordinatedResourcePool<T>(
    private val resources: List<T>,
    private val maxConcurrentUsers: Int
) {
    // requires: import kotlinx.coroutines.sync.Semaphore
    // requires: import kotlinx.coroutines.sync.withPermit
    private val accessPermits = Semaphore(maxConcurrentUsers)

    /**
     * Runs [operation] with an acquired resource while holding one access permit.
     * The permit is returned when the block exits, whether it returns or throws.
     */
    suspend fun <R> useResource(operation: suspend (T) -> R): R =
        accessPermits.withPermit { // Wait for access slot
            val resource = acquireResource()
            try {
                operation(resource)
            } finally {
                // Release only after a resource was actually acquired.
                releaseResource()
            }
        }
}

/** Example that processes events concurrently and publishes the results at a limited rate. */
class EventProcessor {
    // requires: import kotlin.time.Duration.Companion.milliseconds

    /**
     * Processes [eventFlow] with up to 20 events in flight, rate-limits the results,
     * and publishes each one.
     *
     * Declared `suspend` because `collect` is a suspending call.
     */
    suspend fun processEvents(eventFlow: Flow<Event>) {
        eventFlow
            .parMapUnordered(concurrency = 20) { event ->
                when (event.type) {
                    EventType.HIGH_PRIORITY -> processImmediately(event)
                    EventType.NORMAL -> processNormal(event)
                    EventType.BATCH -> processBatch(event)
                }
            }
            // metered takes a kotlin.time.Duration; Duration.ofMillis is java.time API.
            .metered(100.milliseconds) // Rate limit output
            .collect { result ->
                publishResult(result)
            }
    }
}

// Binds a database resource, runs three updates in launched coroutines, and
// uses a latch so the report is generated only after all three have signalled.
resourceScope {
    val database = databaseResource.bind()
    // Created with 3: one countdown per update coroutine below.
    val latch = CountDownLatch(3)
    // Start parallel database operations
    // NOTE(review): `launch` needs a CoroutineScope receiver; presumably this snippet runs inside one — confirm.
    launch {
        database.updateUsers()
        latch.countDown()
    }
    launch {
        database.updateProducts()
        latch.countDown()
    }
    launch {
        database.updateOrders()
        latch.countDown()
    }
    // Wait for all updates to complete
    latch.await()
    // Perform final operation
    database.generateReport()
}

// requires: import kotlin.time.Duration.Companion.seconds
// Collects the processed items into a list inside an `either` block, so a failed
// bind() surfaces as the ProcessingError side of the result.
val processedResults = either<ProcessingError, List<Result>> {
    dataFlow
        .parMapUnordered(concurrency = 10) { item ->
            validateAndProcess(item).bind()
        }
        // metered takes a kotlin.time.Duration; Duration.ofSeconds is java.time API.
        .metered(1.seconds)
        .toList()
}

/**
 * Example that runs work items through consecutive stages, with up to
 * 10 workers per stage meeting at a barrier before the next stage starts.
 *
 * @param stages maximum number of stages (batches of 10 items) a workflow may use.
 */
class WorkflowCoordinator(private val stages: Int) {
    /**
     * Splits [data] into batches of 10 and processes one batch per stage, in order.
     *
     * @throws IllegalArgumentException if [data] needs more batches than there are stages.
     */
    suspend fun executeWorkflow(data: List<WorkItem>) {
        val batches = data.chunked(WORKERS_PER_STAGE)
        // Fail up front instead of indexing past the configured number of stages mid-run.
        require(batches.size <= stages) {
            "${data.size} items need ${batches.size} stages but only $stages are configured"
        }
        batches.forEachIndexed { stageIndex, batch ->
            // Sized to the actual batch: a final batch with fewer than 10 items
            // would otherwise wait forever for workers that never arrive.
            val stageBarrier = CyclicBarrier(batch.size)
            batch.parMap { item ->
                processAtStage(item, stageIndex)
                stageBarrier.await()
            }
        }
    }

    private companion object {
        const val WORKERS_PER_STAGE = 10 // 10 workers per stage
    }
}

@ExperimentalAwaitAllApi
// Receiver type of the `awaitAll { }` block; implements CoroutineScope by delegating to [scope].
class AwaitAllScope(scope: CoroutineScope) : CoroutineScope by scope {
    // Starts [block] and returns its Deferred.
    // context and start default to EmptyCoroutineContext and CoroutineStart.DEFAULT.
    fun <T> async(
        context: CoroutineContext = EmptyCoroutineContext,
        start: CoroutineStart = CoroutineStart.DEFAULT,
        block: suspend CoroutineScope.() -> T
    ): Deferred<T>
}

// Runs [block] with an AwaitAllScope receiver and returns the block's result.
@ExperimentalAwaitAllApi
suspend fun <A> awaitAll(block: suspend AwaitAllScope.() -> A): A

// Same entry point, called on an existing CoroutineScope.
@ExperimentalAwaitAllApi
suspend fun <A> CoroutineScope.awaitAll(block: suspend AwaitAllScope.() -> A): A

// Opt-in marker for this API: usages produce a compiler warning unless opted in.
// It is retained in binaries and applicable to functions and classes.
@RequiresOptIn(level = RequiresOptIn.Level.WARNING, message = "This API is work-in-progress and is subject to change.")
@Retention(AnnotationRetention.BINARY)
@Target(AnnotationTarget.FUNCTION, AnnotationTarget.CLASS)
annotation class ExperimentalAwaitAllApi

Experimental scope for automatic await management of async operations.
// Starts three service calls with `async` inside an `awaitAll` block and
// combines their awaited results into a single CombinedData value.
@OptIn(ExperimentalAwaitAllApi::class)
suspend fun fetchDataFromMultipleSources(): CombinedData = awaitAll {
    // All async calls within this scope are automatically awaited
    val userData = async { userService.getData() }
    val settingsData = async { settingsService.getData() }
    val notificationData = async { notificationService.getData() }
    // Results are automatically awaited when accessed
    CombinedData(
        user = userData.await(),
        settings = settingsData.await(),
        notifications = notificationData.await()
    )
}

⚠️ Warning: This API is experimental and subject to change in future versions.
Install with Tessl CLI
npx tessl i tessl/maven-io-arrow-kt--arrow-fx-coroutines-jvm