CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-io-arrow-kt--arrow-fx-coroutines-jvm

Arrow Fx Coroutines provides functional effect types and utilities for managing side effects in Kotlin coroutines

Pending
Overview
Eval results
Files

synchronization-flow.mddocs/

Synchronization and Flow Processing

Arrow FX Coroutines provides advanced synchronization primitives and Flow extensions for coordinating concurrent operations and processing streams of data with timing and parallel capabilities.

Synchronization Primitives

CountDownLatch

class CountDownLatch(private val initial: Long) {
    fun count(): Long
    suspend fun await()
    fun countDown()
}

A synchronization primitive that allows coroutines to wait until a specified number of countdown signals have been received.

CountDownLatch Usage

val latch = CountDownLatch(3)

// Start multiple coroutines
launch { 
    performTask1()
    latch.countDown()
}

launch { 
    performTask2()
    latch.countDown()
}

launch { 
    performTask3()
    latch.countDown()
}

// Wait for all tasks to complete
latch.await()
println("All tasks completed!")

Producer-Consumer Pattern

class DataProcessor {
    private val latch = CountDownLatch(1)
    private var processedData: String? = null
    
    suspend fun processData(input: String) {
        // Simulate processing
        delay(1000)
        processedData = "Processed: $input"
        latch.countDown()
    }
    
    suspend fun getResult(): String {
        latch.await()
        return processedData!!
    }
}

val processor = DataProcessor()
launch { processor.processData("important data") }
val result = processor.getResult()

CyclicBarrier

class CyclicBarrier(val capacity: Int, barrierAction: () -> Unit = {}) {
    val capacity: Int
    suspend fun reset()
    suspend fun await()
}

class CyclicBarrierCancellationException : CancellationException

A synchronization primitive that allows a set of coroutines to wait for each other to reach a common barrier point.

CyclicBarrier Usage

val barrier = CyclicBarrier(3) {
    println("All workers reached the barrier!")
}

// Start workers
repeat(3) { workerId ->
    launch {
        repeat(5) { phase ->
            performWork(workerId, phase)
            println("Worker $workerId completed phase $phase")
            
            barrier.await() // Wait for all workers
            
            println("Worker $workerId starting next phase")
        }
    }
}

Batch Processing Pattern

class BatchProcessor<T>(private val batchSize: Int) {
    private val barrier = CyclicBarrier(batchSize) {
        println("Batch of $batchSize items ready for processing")
    }
    
    suspend fun submitItem(item: T) {
        // Add item to batch
        addToBatch(item)
        
        // Wait for batch to fill
        barrier.await()
        
        // Process batch collectively
        processBatch()
        
        // Reset for next batch
        barrier.reset()
    }
}

Experimental AwaitAll API

AwaitAllScope

@ExperimentalAwaitAllApi
class AwaitAllScope {
    fun <A> async(
        context: CoroutineContext = EmptyCoroutineContext,
        start: CoroutineStart = CoroutineStart.DEFAULT,
        block: suspend CoroutineScope.() -> A
    ): Deferred<A>
}

A scope that automatically awaits all async operations created within it.

AwaitAll Functions

@ExperimentalAwaitAllApi
suspend fun <A> awaitAll(block: suspend AwaitAllScope.() -> A): A

@ExperimentalAwaitAllApi
suspend fun <A> awaitAll(context: CoroutineContext, block: suspend AwaitAllScope.() -> A): A

Execute a block where all async operations are automatically awaited.

@OptIn(ExperimentalAwaitAllApi::class)
val results = awaitAll {
    val deferred1 = async { fetchData1() }
    val deferred2 = async { fetchData2() }
    val deferred3 = async { fetchData3() }
    
    // All deferreds are automatically awaited
    // Results are available immediately
    combineResults(deferred1.await(), deferred2.await(), deferred3.await())
}

Flow Extensions

Parallel Flow Processing

fun <A, B> Flow<A>.parMap(concurrency: Int = DEFAULT_CONCURRENCY, crossinline transform: suspend CoroutineScope.(A) -> B): Flow<B>
fun <A, B> Flow<A>.parMapUnordered(concurrency: Int = DEFAULT_CONCURRENCY, crossinline transform: suspend (A) -> B): Flow<B>
fun <A, B> Flow<A>.parMapNotNullUnordered(concurrency: Int = DEFAULT_CONCURRENCY, crossinline transform: suspend (A) -> B?): Flow<B>

Process Flow elements in parallel while controlling concurrency.

Ordered Parallel Processing

val processedFlow = sourceFlow
    .parMap(concurrency = 10) { item ->
        expensiveOperation(item)
    }
    .collect { processedItem ->
        // Items arrive in original order
        println("Processed: $processedItem")
    }

Unordered Parallel Processing

val processedFlow = sourceFlow
    .parMapUnordered(concurrency = 5) { item ->
        asyncOperation(item)
    }
    .collect { processedItem ->
        // Items arrive as soon as they're processed
        println("Completed: $processedItem")
    }

Flow Repetition

fun <A> Flow<A>.repeat(): Flow<A>

Repeat a Flow forever.

val heartbeatFlow = flowOf("ping")
    .repeat()
    .collect { 
        println("Heartbeat: $it")
        delay(1000)
    }

Timed Flow Operations

fun <A> Flow<A>.metered(period: Duration): Flow<A>
fun <A> Flow<A>.metered(periodInMillis: Long): Flow<A>
fun <A, B> Flow<A>.mapIndexed(crossinline f: suspend (index: Int, value: A) -> B): Flow<B>

Control the timing and indexing of Flow emissions.

Rate-Limited Processing

val rateLimitedFlow = dataFlow
    .metered(Duration.ofSeconds(1)) // One item per second
    .collect { item ->
        processItem(item)
    }

Indexed Processing

val indexedResults = sourceFlow
    .mapIndexed { index, item ->
        "Item $index: $item"
    }
    .collect { indexedItem ->
        println(indexedItem)
    }

Fixed Rate Flow Generation

fun fixedRate(period: Duration, dampen: Boolean = true, timeStamp: () -> ComparableTimeMark = { TimeSource.Monotonic.markNow() }): Flow<Unit>
fun fixedRate(periodInMillis: Long, dampen: Boolean = true, timeStamp: () -> ComparableTimeMark = { TimeSource.Monotonic.markNow() }): Flow<Unit>

Create a Flow that emits at fixed intervals.

Periodic Tasks

val periodicTask = fixedRate(Duration.ofMinutes(5))
    .collect {
        performMaintenanceTask()
    }

With Dampening

// Dampen = true: Delays if processing takes longer than period
val dampenedFlow = fixedRate(Duration.ofSeconds(10), dampen = true)
    .collect {
        longRunningTask() // Won't overlap if it takes > 10 seconds
    }

// Dampen = false: Strict timing regardless of processing time
val strictFlow = fixedRate(Duration.ofSeconds(10), dampen = false)
    .collect {
        quickTask() // Overlapping execution possible
    }

Advanced Synchronization Patterns

Multi-Stage Pipeline

class PipelineStage<T>(private val capacity: Int) {
    private val inputBarrier = CyclicBarrier(capacity)
    private val outputBarrier = CyclicBarrier(capacity)
    
    suspend fun process(items: List<T>): List<T> {
        // Wait for all inputs
        inputBarrier.await()
        
        // Process in parallel
        val results = items.parMap { item ->
            processItem(item)
        }
        
        // Wait for all processing to complete
        outputBarrier.await()
        
        return results
    }
}

Coordinated Resource Access

class CoordinatedResourcePool<T>(
    private val resources: List<T>,
    private val maxConcurrentUsers: Int
) {
    private val accessBarrier = CyclicBarrier(maxConcurrentUsers)
    
    suspend fun <R> useResource(operation: suspend (T) -> R): R {
        accessBarrier.await() // Wait for access slot
        
        return try {
            val resource = acquireResource()
            operation(resource)
        } finally {
            releaseResource()
            accessBarrier.reset()
        }
    }
}

Flow-Based Event Processing

class EventProcessor {
    fun processEvents(eventFlow: Flow<Event>) = eventFlow
        .parMapUnordered(concurrency = 20) { event ->
            when (event.type) {
                EventType.HIGH_PRIORITY -> processImmediately(event)
                EventType.NORMAL -> processNormal(event)
                EventType.BATCH -> processBatch(event)
            }
        }
        .metered(Duration.ofMillis(100)) // Rate limit output
        .collect { result ->
            publishResult(result)
        }
}

Integration Examples

Synchronization with Resource Management

resourceScope {
    val database = databaseResource.bind()
    val latch = CountDownLatch(3)
    
    // Start parallel database operations
    launch { 
        database.updateUsers()
        latch.countDown()
    }
    
    launch {
        database.updateProducts()
        latch.countDown()
    }
    
    launch {
        database.updateOrders()
        latch.countDown()
    }
    
    // Wait for all updates to complete
    latch.await()
    
    // Perform final operation
    database.generateReport()
}

Flow Processing with Error Handling

val processedResults = either<ProcessingError, List<Result>> {
    dataFlow
        .parMapUnordered(concurrency = 10) { item ->
            validateAndProcess(item).bind()
        }
        .metered(Duration.ofSeconds(1))
        .toList()
}

Complex Coordination Example

class WorkflowCoordinator(private val stages: Int) {
    private val stageBarriers = (0 until stages).map { 
        CyclicBarrier(10) // 10 workers per stage
    }
    
    suspend fun executeWorkflow(data: List<WorkItem>) {
        data.chunked(10).forEachIndexed { stageIndex, batch ->
            batch.parMap { item ->
                processAtStage(item, stageIndex)
                stageBarriers[stageIndex].await()
            }
        }
    }
}

Experimental APIs

AwaitAllScope (Experimental)

@ExperimentalAwaitAllApi
class AwaitAllScope(scope: CoroutineScope) : CoroutineScope by scope {
    fun <T> async(
        context: CoroutineContext = EmptyCoroutineContext,
        start: CoroutineStart = CoroutineStart.DEFAULT,
        block: suspend CoroutineScope.() -> T
    ): Deferred<T>
}

@ExperimentalAwaitAllApi
suspend fun <A> awaitAll(block: suspend AwaitAllScope.() -> A): A

@ExperimentalAwaitAllApi
suspend fun <A> CoroutineScope.awaitAll(block: suspend AwaitAllScope.() -> A): A

@RequiresOptIn(level = RequiresOptIn.Level.WARNING, message = "This API is work-in-progress and is subject to change.")
@Retention(AnnotationRetention.BINARY) 
@Target(AnnotationTarget.FUNCTION, AnnotationTarget.CLASS)
annotation class ExperimentalAwaitAllApi

Experimental scope for automatic await management of async operations.

AwaitAllScope Usage

@OptIn(ExperimentalAwaitAllApi::class)
suspend fun fetchDataFromMultipleSources(): CombinedData = awaitAll {
    // All async calls within this scope are automatically awaited
    val userData = async { userService.getData() }
    val settingsData = async { settingsService.getData() }
    val notificationData = async { notificationService.getData() }
    
    // Results are automatically awaited when accessed
    CombinedData(
        user = userData.await(),
        settings = settingsData.await(),
        notifications = notificationData.await()
    )
}

⚠️ Warning: This API is experimental and subject to change in future versions.

Install with Tessl CLI

npx tessl i tessl/maven-io-arrow-kt--arrow-fx-coroutines-jvm

docs

error-handling.md

index.md

parallel-processing.md

racing.md

resource-management.md

synchronization-flow.md

tile.json