0
# Synchronization and Flow Processing
1
2
Arrow FX Coroutines provides advanced synchronization primitives and Flow extensions for coordinating concurrent operations and processing streams of data with timing and parallel capabilities.
3
4
## Synchronization Primitives
5
6
### CountDownLatch
7
8
```kotlin { .api }
9
class CountDownLatch(private val initial: Long) {
10
fun count(): Long
11
suspend fun await()
12
fun countDown()
13
}
14
```
15
16
A synchronization primitive that allows coroutines to wait until a specified number of countdown signals have been received.
17
18
#### CountDownLatch Usage
19
20
```kotlin
21
val latch = CountDownLatch(3)
22
23
// Start multiple coroutines
24
launch {
25
performTask1()
26
latch.countDown()
27
}
28
29
launch {
30
performTask2()
31
latch.countDown()
32
}
33
34
launch {
35
performTask3()
36
latch.countDown()
37
}
38
39
// Wait for all tasks to complete
40
latch.await()
41
println("All tasks completed!")
42
```
43
44
#### Producer-Consumer Pattern
45
46
```kotlin
47
class DataProcessor {
48
private val latch = CountDownLatch(1)
49
private var processedData: String? = null
50
51
suspend fun processData(input: String) {
52
// Simulate processing
53
delay(1000)
54
processedData = "Processed: $input"
55
latch.countDown()
56
}
57
58
suspend fun getResult(): String {
59
latch.await()
60
return processedData!!
61
}
62
}
63
64
val processor = DataProcessor()
65
launch { processor.processData("important data") }
66
val result = processor.getResult()
67
```
68
69
### CyclicBarrier
70
71
```kotlin { .api }
72
class CyclicBarrier(val capacity: Int, barrierAction: () -> Unit = {}) {
73
val capacity: Int
74
suspend fun reset()
75
suspend fun await()
76
}
77
78
class CyclicBarrierCancellationException : CancellationException
79
```
80
81
A synchronization primitive that allows a set of coroutines to wait for each other to reach a common barrier point.
82
83
#### CyclicBarrier Usage
84
85
```kotlin
86
val barrier = CyclicBarrier(3) {
87
println("All workers reached the barrier!")
88
}
89
90
// Start workers
91
repeat(3) { workerId ->
92
launch {
93
repeat(5) { phase ->
94
performWork(workerId, phase)
95
println("Worker $workerId completed phase $phase")
96
97
barrier.await() // Wait for all workers
98
99
println("Worker $workerId starting next phase")
100
}
101
}
102
}
103
```
104
105
#### Batch Processing Pattern
106
107
```kotlin
108
class BatchProcessor<T>(private val batchSize: Int) {
109
private val barrier = CyclicBarrier(batchSize) {
110
println("Batch of $batchSize items ready for processing")
111
}
112
113
suspend fun submitItem(item: T) {
114
// Add item to batch
115
addToBatch(item)
116
117
// Wait for batch to fill
118
barrier.await()
119
120
// Process batch collectively
121
processBatch()
122
123
// Reset for next batch
124
barrier.reset()
125
}
126
}
127
```
128
129
## Experimental AwaitAll API
130
131
### AwaitAllScope
132
133
```kotlin { .api }
134
@ExperimentalAwaitAllApi
135
class AwaitAllScope {
136
fun <A> async(
137
context: CoroutineContext = EmptyCoroutineContext,
138
start: CoroutineStart = CoroutineStart.DEFAULT,
139
block: suspend CoroutineScope.() -> A
140
): Deferred<A>
141
}
142
```
143
144
A scope that automatically awaits all async operations created within it.
145
146
### AwaitAll Functions
147
148
```kotlin { .api }
149
@ExperimentalAwaitAllApi
150
suspend fun <A> awaitAll(block: suspend AwaitAllScope.() -> A): A
151
152
@ExperimentalAwaitAllApi
153
suspend fun <A> awaitAll(context: CoroutineContext, block: suspend AwaitAllScope.() -> A): A
154
```
155
156
Execute a block where all async operations are automatically awaited.
157
158
```kotlin
159
@OptIn(ExperimentalAwaitAllApi::class)
160
val results = awaitAll {
161
val deferred1 = async { fetchData1() }
162
val deferred2 = async { fetchData2() }
163
val deferred3 = async { fetchData3() }
164
165
// All deferreds are automatically awaited
166
// Results are available immediately
167
combineResults(deferred1.await(), deferred2.await(), deferred3.await())
168
}
169
```
170
171
## Flow Extensions
172
173
### Parallel Flow Processing
174
175
```kotlin { .api }
176
fun <A, B> Flow<A>.parMap(concurrency: Int = DEFAULT_CONCURRENCY, crossinline transform: suspend CoroutineScope.(A) -> B): Flow<B>
177
fun <A, B> Flow<A>.parMapUnordered(concurrency: Int = DEFAULT_CONCURRENCY, crossinline transform: suspend (A) -> B): Flow<B>
178
fun <A, B> Flow<A>.parMapNotNullUnordered(concurrency: Int = DEFAULT_CONCURRENCY, crossinline transform: suspend (A) -> B?): Flow<B>
179
```
180
181
Process Flow elements in parallel while controlling concurrency.
182
183
#### Ordered Parallel Processing
184
185
```kotlin
186
val processedFlow = sourceFlow
187
.parMap(concurrency = 10) { item ->
188
expensiveOperation(item)
189
}
190
.collect { processedItem ->
191
// Items arrive in original order
192
println("Processed: $processedItem")
193
}
194
```
195
196
#### Unordered Parallel Processing
197
198
```kotlin
199
val processedFlow = sourceFlow
200
.parMapUnordered(concurrency = 5) { item ->
201
asyncOperation(item)
202
}
203
.collect { processedItem ->
204
// Items arrive as soon as they're processed
205
println("Completed: $processedItem")
206
}
207
```
208
209
### Flow Repetition
210
211
```kotlin { .api }
212
fun <A> Flow<A>.repeat(): Flow<A>
213
```
214
215
Repeat a Flow forever.
216
217
```kotlin
218
val heartbeatFlow = flowOf("ping")
219
.repeat()
220
.collect {
221
println("Heartbeat: $it")
222
delay(1000)
223
}
224
```
225
226
### Timed Flow Operations
227
228
```kotlin { .api }
229
fun <A> Flow<A>.metered(period: Duration): Flow<A>
230
fun <A> Flow<A>.metered(periodInMillis: Long): Flow<A>
231
fun <A, B> Flow<A>.mapIndexed(crossinline f: suspend (index: Int, value: A) -> B): Flow<B>
232
```
233
234
Control the timing and indexing of Flow emissions.
235
236
#### Rate-Limited Processing
237
238
```kotlin
239
val rateLimitedFlow = dataFlow
240
.metered(Duration.ofSeconds(1)) // One item per second
241
.collect { item ->
242
processItem(item)
243
}
244
```
245
246
#### Indexed Processing
247
248
```kotlin
249
val indexedResults = sourceFlow
250
.mapIndexed { index, item ->
251
"Item $index: $item"
252
}
253
.collect { indexedItem ->
254
println(indexedItem)
255
}
256
```
257
258
### Fixed Rate Flow Generation
259
260
```kotlin { .api }
261
fun fixedRate(period: Duration, dampen: Boolean = true, timeStamp: () -> ComparableTimeMark = { TimeSource.Monotonic.markNow() }): Flow<Unit>
262
fun fixedRate(periodInMillis: Long, dampen: Boolean = true, timeStamp: () -> ComparableTimeMark = { TimeSource.Monotonic.markNow() }): Flow<Unit>
263
```
264
265
Create a Flow that emits at fixed intervals.
266
267
#### Periodic Tasks
268
269
```kotlin
270
val periodicTask = fixedRate(Duration.ofMinutes(5))
271
.collect {
272
performMaintenanceTask()
273
}
274
```
275
276
#### With Dampening
277
278
```kotlin
279
// Dampen = true: Delays if processing takes longer than period
280
val dampenedFlow = fixedRate(Duration.ofSeconds(10), dampen = true)
281
.collect {
282
longRunningTask() // Won't overlap if it takes > 10 seconds
283
}
284
285
// Dampen = false: Strict timing regardless of processing time
286
val strictFlow = fixedRate(Duration.ofSeconds(10), dampen = false)
287
.collect {
288
quickTask() // Overlapping execution possible
289
}
290
```
291
292
## Advanced Synchronization Patterns
293
294
### Multi-Stage Pipeline
295
296
```kotlin
297
class PipelineStage<T>(private val capacity: Int) {
298
private val inputBarrier = CyclicBarrier(capacity)
299
private val outputBarrier = CyclicBarrier(capacity)
300
301
suspend fun process(items: List<T>): List<T> {
302
// Wait for all inputs
303
inputBarrier.await()
304
305
// Process in parallel
306
val results = items.parMap { item ->
307
processItem(item)
308
}
309
310
// Wait for all processing to complete
311
outputBarrier.await()
312
313
return results
314
}
315
}
316
```
317
318
### Coordinated Resource Access
319
320
```kotlin
321
class CoordinatedResourcePool<T>(
322
private val resources: List<T>,
323
private val maxConcurrentUsers: Int
324
) {
325
private val accessBarrier = CyclicBarrier(maxConcurrentUsers)
326
327
suspend fun <R> useResource(operation: suspend (T) -> R): R {
328
accessBarrier.await() // Wait for access slot
329
330
return try {
331
val resource = acquireResource()
332
operation(resource)
333
} finally {
334
releaseResource()
335
accessBarrier.reset()
336
}
337
}
338
}
339
```
340
341
### Flow-Based Event Processing
342
343
```kotlin
344
class EventProcessor {
345
fun processEvents(eventFlow: Flow<Event>) = eventFlow
346
.parMapUnordered(concurrency = 20) { event ->
347
when (event.type) {
348
EventType.HIGH_PRIORITY -> processImmediately(event)
349
EventType.NORMAL -> processNormal(event)
350
EventType.BATCH -> processBatch(event)
351
}
352
}
353
.metered(Duration.ofMillis(100)) // Rate limit output
354
.collect { result ->
355
publishResult(result)
356
}
357
}
358
```
359
360
## Integration Examples
361
362
### Synchronization with Resource Management
363
364
```kotlin
365
resourceScope {
366
val database = databaseResource.bind()
367
val latch = CountDownLatch(3)
368
369
// Start parallel database operations
370
launch {
371
database.updateUsers()
372
latch.countDown()
373
}
374
375
launch {
376
database.updateProducts()
377
latch.countDown()
378
}
379
380
launch {
381
database.updateOrders()
382
latch.countDown()
383
}
384
385
// Wait for all updates to complete
386
latch.await()
387
388
// Perform final operation
389
database.generateReport()
390
}
391
```
392
393
### Flow Processing with Error Handling
394
395
```kotlin
396
val processedResults = either<ProcessingError, List<Result>> {
397
dataFlow
398
.parMapUnordered(concurrency = 10) { item ->
399
validateAndProcess(item).bind()
400
}
401
.metered(Duration.ofSeconds(1))
402
.toList()
403
}
404
```
405
406
### Complex Coordination Example
407
408
```kotlin
409
class WorkflowCoordinator(private val stages: Int) {
410
private val stageBarriers = (0 until stages).map {
411
CyclicBarrier(10) // 10 workers per stage
412
}
413
414
suspend fun executeWorkflow(data: List<WorkItem>) {
415
data.chunked(10).forEachIndexed { stageIndex, batch ->
416
batch.parMap { item ->
417
processAtStage(item, stageIndex)
418
stageBarriers[stageIndex].await()
419
}
420
}
421
}
422
}
423
```
424
425
## Experimental APIs
426
427
### AwaitAllScope (Experimental)
428
429
```kotlin { .api }
430
@ExperimentalAwaitAllApi
431
class AwaitAllScope(scope: CoroutineScope) : CoroutineScope by scope {
432
fun <T> async(
433
context: CoroutineContext = EmptyCoroutineContext,
434
start: CoroutineStart = CoroutineStart.DEFAULT,
435
block: suspend CoroutineScope.() -> T
436
): Deferred<T>
437
}
438
439
@ExperimentalAwaitAllApi
440
suspend fun <A> awaitAll(block: suspend AwaitAllScope.() -> A): A
441
442
@ExperimentalAwaitAllApi
443
suspend fun <A> CoroutineScope.awaitAll(block: suspend AwaitAllScope.() -> A): A
444
445
@RequiresOptIn(level = RequiresOptIn.Level.WARNING, message = "This API is work-in-progress and is subject to change.")
446
@Retention(AnnotationRetention.BINARY)
447
@Target(AnnotationTarget.FUNCTION, AnnotationTarget.CLASS)
448
annotation class ExperimentalAwaitAllApi
449
```
450
451
Experimental scope for automatic await management of async operations.
452
453
#### AwaitAllScope Usage
454
455
```kotlin
456
@OptIn(ExperimentalAwaitAllApi::class)
457
suspend fun fetchDataFromMultipleSources(): CombinedData = awaitAll {
458
// All async calls within this scope are automatically awaited
459
val userData = async { userService.getData() }
460
val settingsData = async { settingsService.getData() }
461
val notificationData = async { notificationService.getData() }
462
463
// Results are automatically awaited when accessed
464
CombinedData(
465
user = userData.await(),
466
settings = settingsData.await(),
467
notifications = notificationData.await()
468
)
469
}
470
```
471
472
**⚠️ Warning**: This API is experimental and subject to change in future versions.