# Flow API

Reactive streams implementation providing cold flows (`Flow`) for asynchronous sequences and hot flows (`SharedFlow`, `StateFlow`) for shared state management, with backpressure and exception transparency.

## Capabilities

### Flow Interface

Base interface for cold asynchronous data streams that emit values sequentially.

```kotlin { .api }
/** Cold asynchronous stream of values; nothing is emitted until it is collected. */
interface Flow<out T> {
    /** Accepts the given [collector] and emits values into it */
    suspend fun collect(collector: FlowCollector<T>)
}

/** Receiver of the values produced by a [Flow]. */
fun interface FlowCollector<in T> {
    /** Collects a [value] emitted by the upstream flow */
    suspend fun emit(value: T)
}
```

**Usage Examples:**

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    // Basic flow collection: the builder block runs each time the flow is collected
    val simpleFlow = flow {
        repeat(3) { i ->
            emit(i)
            delay(100)
        }
    }

    simpleFlow.collect { value ->
        println("Collected: $value")
    }

    // Flow with transformation (collects the same cold flow a second time)
    simpleFlow
        .map { it * 2 }
        .filter { it > 0 }
        .collect { value ->
            println("Transformed: $value")
        }
}
```

### Flow Builders

Functions for creating flows from various sources.

```kotlin { .api }
/** Creates a cold flow from a suspending builder block */
fun <T> flow(block: suspend FlowCollector<T>.() -> Unit): Flow<T>

/** Creates a flow from fixed values */
fun <T> flowOf(vararg elements: T): Flow<T>

/** Creates an empty flow */
fun <T> emptyFlow(): Flow<T>

/** Creates a cold flow whose block sends elements to a channel, allowing concurrent producers */
fun <T> channelFlow(block: suspend ProducerScope<T>.() -> Unit): Flow<T>

/** Converts various types to flows */
fun <T> Iterable<T>.asFlow(): Flow<T>
fun <T> Sequence<T>.asFlow(): Flow<T>
fun <T> Array<T>.asFlow(): Flow<T>
```

**Usage Examples:**

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    // Flow from builder
    val countFlow = flow {
        repeat(5) { i ->
            emit(i)
            delay(100)
        }
    }

    // Flow from values
    val valuesFlow = flowOf("a", "b", "c", "d")

    // Flow from collection
    val listFlow = listOf(1, 2, 3, 4, 5).asFlow()

    // Channel flow for concurrent emission.
    // Named differently from the builder so the local does not shadow `channelFlow`.
    val concurrentFlow = channelFlow {
        launch {
            repeat(3) { i ->
                send(i * 10)
                delay(50)
            }
        }
        launch {
            repeat(3) { i ->
                send(i * 100)
                delay(75)
            }
        }
    }

    concurrentFlow.collect { println("Channel flow: $it") }
}
```

### Flow Operators

Intermediate operators for transforming flows. They return a new flow and do not start collection themselves.

```kotlin { .api }
/** Transform each value */
fun <T, R> Flow<T>.map(transform: suspend (T) -> R): Flow<R>

/** Filter values based on predicate */
fun <T> Flow<T>.filter(predicate: suspend (T) -> Boolean): Flow<T>

/** Transform each value to a flow and flatten (sequentially, or concurrently for flatMapMerge) */
fun <T, R> Flow<T>.flatMapConcat(transform: suspend (T) -> Flow<R>): Flow<R>
fun <T, R> Flow<T>.flatMapMerge(
    concurrency: Int = DEFAULT_CONCURRENCY,
    transform: suspend (T) -> Flow<R>
): Flow<R>

/** Take first N elements */
fun <T> Flow<T>.take(count: Int): Flow<T>

/** Drop first N elements */
fun <T> Flow<T>.drop(count: Int): Flow<T>

/** Change the context in which the upstream flow (everything before this operator) executes */
fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T>

/** Buffer emissions */
fun <T> Flow<T>.buffer(
    capacity: Int = BUFFERED,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): Flow<T>

/** Combine the most recent values of this flow and another flow */
fun <T1, T2, R> Flow<T1>.combine(
    flow: Flow<T2>,
    transform: suspend (T1, T2) -> R
): Flow<R>

/** Zip with another flow, pairing values one-to-one */
fun <T1, T2, R> Flow<T1>.zip(
    other: Flow<T2>,
    transform: suspend (T1, T2) -> R
): Flow<R>
```

**Usage Examples:**

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val sourceFlow = (1..10).asFlow()

    // Chain multiple operators; flowOn moves the upstream work to Dispatchers.Default
    sourceFlow
        .filter { it % 2 == 0 }
        .map { it * it }
        .take(3)
        .flowOn(Dispatchers.Default)
        .collect { println("Result: $it") }

    // FlatMap example
    sourceFlow
        .take(3)
        .flatMapConcat { value ->
            flow {
                repeat(2) {
                    emit("$value-$it")
                    delay(50)
                }
            }
        }
        .collect { println("FlatMap: $it") }

    // Combine flows
    val flow1 = (1..3).asFlow().onEach { delay(100) }
    val flow2 = (10..12).asFlow().onEach { delay(150) }

    flow1.combine(flow2) { a, b -> "$a+$b" }
        .collect { println("Combined: $it") }
}
```

### Terminal Operators

Suspending operators that collect the flow and produce a result.

```kotlin { .api }
/** Collect all values */
suspend fun <T> Flow<T>.collect(action: suspend (T) -> Unit)

/** Collect to list */
suspend fun <T> Flow<T>.toList(): List<T>

/** Collect to set */
suspend fun <T> Flow<T>.toSet(): Set<T>

/** Get first value (the OrNull variant returns null for an empty flow) */
suspend fun <T> Flow<T>.first(): T
suspend fun <T> Flow<T>.firstOrNull(): T?

/** Get the single value of a flow expected to emit exactly one element */
suspend fun <T> Flow<T>.single(): T
suspend fun <T> Flow<T>.singleOrNull(): T?

/** Reduce values, starting from the first element */
suspend fun <S, T : S> Flow<T>.reduce(operation: suspend (S, T) -> S): S

/** Fold with initial value */
suspend fun <T, R> Flow<T>.fold(
    initial: R,
    operation: suspend (R, T) -> R
): R

/** Count elements (optionally only those matching the predicate) */
suspend fun <T> Flow<T>.count(): Int
suspend fun <T> Flow<T>.count(predicate: suspend (T) -> Boolean): Int
```

**Usage Examples:**

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val numberFlow = (1..5).asFlow()

    // Collect to collections
    val list = numberFlow.toList()
    println("List: $list")

    // Reduce operations
    val sum = numberFlow.fold(0) { acc, value -> acc + value }
    println("Sum: $sum")

    val product = numberFlow.reduce { acc, value -> acc * value }
    println("Product: $product")

    // Single values
    val first = numberFlow.first()
    val count = numberFlow.count { it > 3 }
    println("First: $first, Count > 3: $count")
}
```

### SharedFlow Interface

Hot flow that shares emitted values among multiple collectors.

```kotlin { .api }
interface SharedFlow<out T> : Flow<T> {
    /** Most recent values available to new subscribers */
    val replayCache: List<T>
}

interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
    /** Current number of subscribers (declared only on the mutable interface) */
    val subscriptionCount: StateFlow<Int>

    /** Emits value to all subscribers */
    override suspend fun emit(value: T)

    /** Tries to emit value immediately, without suspending; returns false if that is not possible */
    fun tryEmit(value: T): Boolean

    /** Resets replay cache */
    fun resetReplayCache()
}

/** Creates MutableSharedFlow */
fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T>
```

**Usage Examples:**

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val sharedFlow = MutableSharedFlow<Int>(replay = 2)

    // Start collectors
    val job1 = launch {
        sharedFlow.collect { value ->
            println("Collector 1: $value")
        }
    }

    val job2 = launch {
        sharedFlow.collect { value ->
            println("Collector 2: $value")
        }
    }

    delay(100)

    // Emit values
    sharedFlow.emit(1)
    sharedFlow.emit(2)
    sharedFlow.emit(3)

    delay(100)

    // New collector gets replay (up to the 2 most recent values)
    val job3 = launch {
        sharedFlow.collect { value ->
            println("Collector 3 (late): $value")
        }
    }

    sharedFlow.emit(4)

    delay(100)
    // Collecting a shared flow never completes on its own, so cancel the collectors explicitly
    listOf(job1, job2, job3).forEach { it.cancel() }
}
```

### StateFlow Interface

Hot flow that represents a state and always holds a current value.

```kotlin { .api }
interface StateFlow<out T> : SharedFlow<T> {
    /** Current state value */
    val value: T
}

interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> {
    /** Current state value (mutable) */
    override var value: T

    /** Atomically compares and sets value */
    fun compareAndSet(expect: T, update: T): Boolean
}

// The atomic update helpers are extension functions, not interface members.

/** Updates value atomically with function */
inline fun <T> MutableStateFlow<T>.update(function: (T) -> T)

/** Updates value atomically and returns new value */
inline fun <T> MutableStateFlow<T>.updateAndGet(function: (T) -> T): T

/** Updates value atomically and returns old value */
inline fun <T> MutableStateFlow<T>.getAndUpdate(function: (T) -> T): T

/** Creates MutableStateFlow */
fun <T> MutableStateFlow(value: T): MutableStateFlow<T>
```

**Usage Examples:**

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

data class UiState(val loading: Boolean, val data: String)

/**
 * Holds the UI state and exposes it as a read-only [StateFlow].
 *
 * Background work is launched in the injected [scope] rather than GlobalScope,
 * so it is cancelled together with the owner of that scope.
 */
class ViewModel(private val scope: CoroutineScope) {
    private val _uiState = MutableStateFlow(UiState(loading = false, data = ""))
    val uiState: StateFlow<UiState> = _uiState.asStateFlow()

    fun loadData() {
        // update {} makes the read-modify-write atomic
        _uiState.update { it.copy(loading = true) }

        // Simulate async operation
        scope.launch {
            delay(1000)
            _uiState.value = UiState(loading = false, data = "Loaded data")
        }
    }

    fun updateData(newData: String) {
        _uiState.update { currentState ->
            currentState.copy(data = newData)
        }
    }
}

fun main() = runBlocking {
    // Tie the view model's background work to this runBlocking scope
    val viewModel = ViewModel(scope = this)

    // Collect state changes
    val job = launch {
        viewModel.uiState.collect { state ->
            println("UI State: $state")
        }
    }

    delay(100)
    viewModel.loadData()
    delay(1500)
    viewModel.updateData("Updated data")

    delay(100)
    job.cancel()
}
```

### Exception Handling

Exception handling for flows: catching upstream exceptions, reacting to completion, and retrying.

```kotlin { .api }
/** Catches upstream exceptions */
fun <T> Flow<T>.catch(action: suspend FlowCollector<T>.(cause: Throwable) -> Unit): Flow<T>

/** Handles completion with optional exception (cause is null on normal completion) */
fun <T> Flow<T>.onCompletion(
    action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit
): Flow<T>

/** Retries on exception, at most `retries` times and only while the predicate returns true */
fun <T> Flow<T>.retry(
    retries: Long = Long.MAX_VALUE,
    predicate: suspend (cause: Throwable) -> Boolean = { true }
): Flow<T>

/** Retries while the predicate, given the cause and the attempt number, returns true */
fun <T> Flow<T>.retryWhen(
    predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long) -> Boolean
): Flow<T>
```

**Usage Examples:**

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

/** Emits 0, 1, 2 and then fails with a RuntimeException. */
fun createFailingFlow() = flow {
    repeat(5) { i ->
        if (i == 3) throw RuntimeException("Simulated error")
        emit(i)
        delay(100)
    }
}

fun main() = runBlocking {
    // Exception handling with catch
    createFailingFlow()
        .catch { e ->
            println("Caught exception: ${e.message}")
            emit(-1) // Emit recovery value
        }
        .onCompletion { cause ->
            if (cause == null) {
                println("Flow completed successfully")
            } else {
                println("Flow completed with exception: $cause")
            }
        }
        .collect { value ->
            println("Value: $value")
        }

    // Retry example
    createFailingFlow()
        .retry(retries = 2) { exception ->
            println("Retrying due to: ${exception.message}")
            true
        }
        .catch { e ->
            println("All retries failed: ${e.message}")
        }
        .collect { value ->
            println("Retry value: $value")
        }
}
```

## Types

### BufferOverflow

Strategy for handling buffer overflow.

```kotlin { .api }
enum class BufferOverflow {
    /** Suspend on buffer overflow (default) */
    SUSPEND,

    /** Drop the oldest value in the buffer to make room for the new one */
    DROP_OLDEST,

    /** Drop the latest value, i.e. the one being added, and keep the buffer unchanged */
    DROP_LATEST
}
```
511
512
### FlowCollector
513
514
Interface for collecting flow values.
515
516
```kotlin { .api }
fun interface FlowCollector<in T> {
    /** Collects a [value] emitted by the upstream flow */
    suspend fun emit(value: T)
}
```