or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

error-handling.mdindex.mdparallel-processing.mdracing.mdresource-management.mdsynchronization-flow.md

synchronization-flow.mddocs/

0

# Synchronization and Flow Processing

1

2

Arrow FX Coroutines provides advanced synchronization primitives and Flow extensions for coordinating concurrent operations and processing streams of data with timing and parallel capabilities.

3

4

## Synchronization Primitives

5

6

### CountDownLatch

7

8

```kotlin { .api }

9

class CountDownLatch(private val initial: Long) {

10

fun count(): Long

11

suspend fun await()

12

fun countDown()

13

}

14

```

15

16

A synchronization primitive that allows coroutines to wait until a specified number of countdown signals have been received.

17

18

#### CountDownLatch Usage

19

20

```kotlin

21

val latch = CountDownLatch(3)

22

23

// Start multiple coroutines

24

launch {

25

performTask1()

26

latch.countDown()

27

}

28

29

launch {

30

performTask2()

31

latch.countDown()

32

}

33

34

launch {

35

performTask3()

36

latch.countDown()

37

}

38

39

// Wait for all tasks to complete

40

latch.await()

41

println("All tasks completed!")

42

```

43

44

#### Producer-Consumer Pattern

45

46

```kotlin

47

class DataProcessor {

48

private val latch = CountDownLatch(1)

49

private var processedData: String? = null

50

51

suspend fun processData(input: String) {

52

// Simulate processing

53

delay(1000)

54

processedData = "Processed: $input"

55

latch.countDown()

56

}

57

58

suspend fun getResult(): String {

59

latch.await()

60

return processedData!!

61

}

62

}

63

64

val processor = DataProcessor()

65

launch { processor.processData("important data") }

66

val result = processor.getResult()

67

```

68

69

### CyclicBarrier

70

71

```kotlin { .api }

72

class CyclicBarrier(val capacity: Int, barrierAction: () -> Unit = {}) {

73

val capacity: Int

74

suspend fun reset()

75

suspend fun await()

76

}

77

78

class CyclicBarrierCancellationException : CancellationException

79

```

80

81

A synchronization primitive that allows a set of coroutines to wait for each other to reach a common barrier point.

82

83

#### CyclicBarrier Usage

84

85

```kotlin

86

val barrier = CyclicBarrier(3) {

87

println("All workers reached the barrier!")

88

}

89

90

// Start workers

91

repeat(3) { workerId ->

92

launch {

93

repeat(5) { phase ->

94

performWork(workerId, phase)

95

println("Worker $workerId completed phase $phase")

96

97

barrier.await() // Wait for all workers

98

99

println("Worker $workerId starting next phase")

100

}

101

}

102

}

103

```

104

105

#### Batch Processing Pattern

106

107

```kotlin

108

class BatchProcessor<T>(private val batchSize: Int) {

109

private val barrier = CyclicBarrier(batchSize) {

110

println("Batch of $batchSize items ready for processing")

111

}

112

113

suspend fun submitItem(item: T) {

114

// Add item to batch

115

addToBatch(item)

116

117

// Wait for batch to fill

118

barrier.await()

119

120

// Process batch collectively

121

processBatch()

122

123

// Reset for next batch

124

barrier.reset()

125

}

126

}

127

```

128

129

## Experimental AwaitAll API

130

131

### AwaitAllScope

132

133

```kotlin { .api }

134

@ExperimentalAwaitAllApi

135

class AwaitAllScope {

136

fun <A> async(

137

context: CoroutineContext = EmptyCoroutineContext,

138

start: CoroutineStart = CoroutineStart.DEFAULT,

139

block: suspend CoroutineScope.() -> A

140

): Deferred<A>

141

}

142

```

143

144

A scope that automatically awaits all async operations created within it.

145

146

### AwaitAll Functions

147

148

```kotlin { .api }

149

@ExperimentalAwaitAllApi

150

suspend fun <A> awaitAll(block: suspend AwaitAllScope.() -> A): A

151

152

@ExperimentalAwaitAllApi

153

suspend fun <A> awaitAll(context: CoroutineContext, block: suspend AwaitAllScope.() -> A): A

154

```

155

156

Execute a block where all async operations are automatically awaited.

157

158

```kotlin

159

@OptIn(ExperimentalAwaitAllApi::class)

160

val results = awaitAll {

161

val deferred1 = async { fetchData1() }

162

val deferred2 = async { fetchData2() }

163

val deferred3 = async { fetchData3() }

164

165

// All deferreds are automatically awaited

166

// Results are available immediately

167

combineResults(deferred1.await(), deferred2.await(), deferred3.await())

168

}

169

```

170

171

## Flow Extensions

172

173

### Parallel Flow Processing

174

175

```kotlin { .api }

176

fun <A, B> Flow<A>.parMap(concurrency: Int = DEFAULT_CONCURRENCY, crossinline transform: suspend CoroutineScope.(A) -> B): Flow<B>

177

fun <A, B> Flow<A>.parMapUnordered(concurrency: Int = DEFAULT_CONCURRENCY, crossinline transform: suspend (A) -> B): Flow<B>

178

fun <A, B> Flow<A>.parMapNotNullUnordered(concurrency: Int = DEFAULT_CONCURRENCY, crossinline transform: suspend (A) -> B?): Flow<B>

179

```

180

181

Process Flow elements in parallel while controlling concurrency.

182

183

#### Ordered Parallel Processing

184

185

```kotlin

186

val processedFlow = sourceFlow

187

.parMap(concurrency = 10) { item ->

188

expensiveOperation(item)

189

}

190

.collect { processedItem ->

191

// Items arrive in original order

192

println("Processed: $processedItem")

193

}

194

```

195

196

#### Unordered Parallel Processing

197

198

```kotlin

199

val processedFlow = sourceFlow

200

.parMapUnordered(concurrency = 5) { item ->

201

asyncOperation(item)

202

}

203

.collect { processedItem ->

204

// Items arrive as soon as they're processed

205

println("Completed: $processedItem")

206

}

207

```

208

209

### Flow Repetition

210

211

```kotlin { .api }

212

fun <A> Flow<A>.repeat(): Flow<A>

213

```

214

215

Repeat a Flow forever.

216

217

```kotlin

218

val heartbeatFlow = flowOf("ping")

219

.repeat()

220

.collect {

221

println("Heartbeat: $it")

222

delay(1000)

223

}

224

```

225

226

### Timed Flow Operations

227

228

```kotlin { .api }

229

fun <A> Flow<A>.metered(period: Duration): Flow<A>

230

fun <A> Flow<A>.metered(periodInMillis: Long): Flow<A>

231

fun <A, B> Flow<A>.mapIndexed(crossinline f: suspend (index: Int, value: A) -> B): Flow<B>

232

```

233

234

Control the timing and indexing of Flow emissions.

235

236

#### Rate-Limited Processing

237

238

```kotlin

239

val rateLimitedFlow = dataFlow

240

.metered(Duration.ofSeconds(1)) // One item per second

241

.collect { item ->

242

processItem(item)

243

}

244

```

245

246

#### Indexed Processing

247

248

```kotlin

249

val indexedResults = sourceFlow

250

.mapIndexed { index, item ->

251

"Item $index: $item"

252

}

253

.collect { indexedItem ->

254

println(indexedItem)

255

}

256

```

257

258

### Fixed Rate Flow Generation

259

260

```kotlin { .api }

261

fun fixedRate(period: Duration, dampen: Boolean = true, timeStamp: () -> ComparableTimeMark = { TimeSource.Monotonic.markNow() }): Flow<Unit>

262

fun fixedRate(periodInMillis: Long, dampen: Boolean = true, timeStamp: () -> ComparableTimeMark = { TimeSource.Monotonic.markNow() }): Flow<Unit>

263

```

264

265

Create a Flow that emits at fixed intervals.

266

267

#### Periodic Tasks

268

269

```kotlin

270

val periodicTask = fixedRate(Duration.ofMinutes(5))

271

.collect {

272

performMaintenanceTask()

273

}

274

```

275

276

#### With Dampening

277

278

```kotlin

279

// Dampen = true: Delays if processing takes longer than period

280

val dampenedFlow = fixedRate(Duration.ofSeconds(10), dampen = true)

281

.collect {

282

longRunningTask() // Won't overlap if it takes > 10 seconds

283

}

284

285

// Dampen = false: Strict timing regardless of processing time

286

val strictFlow = fixedRate(Duration.ofSeconds(10), dampen = false)

287

.collect {

288

quickTask() // Overlapping execution possible

289

}

290

```

291

292

## Advanced Synchronization Patterns

293

294

### Multi-Stage Pipeline

295

296

```kotlin

297

class PipelineStage<T>(private val capacity: Int) {

298

private val inputBarrier = CyclicBarrier(capacity)

299

private val outputBarrier = CyclicBarrier(capacity)

300

301

suspend fun process(items: List<T>): List<T> {

302

// Wait for all inputs

303

inputBarrier.await()

304

305

// Process in parallel

306

val results = items.parMap { item ->

307

processItem(item)

308

}

309

310

// Wait for all processing to complete

311

outputBarrier.await()

312

313

return results

314

}

315

}

316

```

317

318

### Coordinated Resource Access

319

320

```kotlin

321

class CoordinatedResourcePool<T>(

322

private val resources: List<T>,

323

private val maxConcurrentUsers: Int

324

) {

325

private val accessBarrier = CyclicBarrier(maxConcurrentUsers)

326

327

suspend fun <R> useResource(operation: suspend (T) -> R): R {

328

accessBarrier.await() // Wait for access slot

329

330

return try {

331

val resource = acquireResource()

332

operation(resource)

333

} finally {

334

releaseResource()

335

accessBarrier.reset()

336

}

337

}

338

}

339

```

340

341

### Flow-Based Event Processing

342

343

```kotlin

344

class EventProcessor {

345

fun processEvents(eventFlow: Flow<Event>) = eventFlow

346

.parMapUnordered(concurrency = 20) { event ->

347

when (event.type) {

348

EventType.HIGH_PRIORITY -> processImmediately(event)

349

EventType.NORMAL -> processNormal(event)

350

EventType.BATCH -> processBatch(event)

351

}

352

}

353

.metered(Duration.ofMillis(100)) // Rate limit output

354

.collect { result ->

355

publishResult(result)

356

}

357

}

358

```

359

360

## Integration Examples

361

362

### Synchronization with Resource Management

363

364

```kotlin

365

resourceScope {

366

val database = databaseResource.bind()

367

val latch = CountDownLatch(3)

368

369

// Start parallel database operations

370

launch {

371

database.updateUsers()

372

latch.countDown()

373

}

374

375

launch {

376

database.updateProducts()

377

latch.countDown()

378

}

379

380

launch {

381

database.updateOrders()

382

latch.countDown()

383

}

384

385

// Wait for all updates to complete

386

latch.await()

387

388

// Perform final operation

389

database.generateReport()

390

}

391

```

392

393

### Flow Processing with Error Handling

394

395

```kotlin

396

val processedResults = either<ProcessingError, List<Result>> {

397

dataFlow

398

.parMapUnordered(concurrency = 10) { item ->

399

validateAndProcess(item).bind()

400

}

401

.metered(Duration.ofSeconds(1))

402

.toList()

403

}

404

```

405

406

### Complex Coordination Example

407

408

```kotlin

409

class WorkflowCoordinator(private val stages: Int) {

410

private val stageBarriers = (0 until stages).map {

411

CyclicBarrier(10) // 10 workers per stage

412

}

413

414

suspend fun executeWorkflow(data: List<WorkItem>) {

415

data.chunked(10).forEachIndexed { stageIndex, batch ->

416

batch.parMap { item ->

417

processAtStage(item, stageIndex)

418

stageBarriers[stageIndex].await()

419

}

420

}

421

}

422

}

423

```

424

425

## Experimental APIs

426

427

### AwaitAllScope (Experimental)

428

429

```kotlin { .api }

430

@ExperimentalAwaitAllApi

431

class AwaitAllScope(scope: CoroutineScope) : CoroutineScope by scope {

432

fun <T> async(

433

context: CoroutineContext = EmptyCoroutineContext,

434

start: CoroutineStart = CoroutineStart.DEFAULT,

435

block: suspend CoroutineScope.() -> T

436

): Deferred<T>

437

}

438

439

@ExperimentalAwaitAllApi

440

suspend fun <A> awaitAll(block: suspend AwaitAllScope.() -> A): A

441

442

@ExperimentalAwaitAllApi

443

suspend fun <A> CoroutineScope.awaitAll(block: suspend AwaitAllScope.() -> A): A

444

445

@RequiresOptIn(level = RequiresOptIn.Level.WARNING, message = "This API is work-in-progress and is subject to change.")

446

@Retention(AnnotationRetention.BINARY)

447

@Target(AnnotationTarget.FUNCTION, AnnotationTarget.CLASS)

448

annotation class ExperimentalAwaitAllApi

449

```

450

451

Experimental scope for automatic await management of async operations.

452

453

#### AwaitAllScope Usage

454

455

```kotlin

456

@OptIn(ExperimentalAwaitAllApi::class)

457

suspend fun fetchDataFromMultipleSources(): CombinedData = awaitAll {

458

// All async calls within this scope are automatically awaited

459

val userData = async { userService.getData() }

460

val settingsData = async { settingsService.getData() }

461

val notificationData = async { notificationService.getData() }

462

463

// Results are automatically awaited when accessed

464

CombinedData(

465

user = userData.await(),

466

settings = settingsData.await(),

467

notifications = notificationData.await()

468

)

469

}

470

```

471

472

**⚠️ Warning**: This API is experimental and subject to change in future versions.