0
# Flow API - Reactive Streams
1
2
Asynchronous data streams with rich operator support for reactive programming patterns. Flow provides a cold stream implementation that enables declarative handling of asynchronous data sequences.
3
4
## Capabilities
5
6
### Core Flow Types
7
8
The foundational interfaces for reactive stream processing in kotlinx-coroutines.
9
10
```kotlin { .api }
11
/**
12
* An asynchronous data stream that sequentially emits values
13
* and completes normally or with an exception.
14
*/
15
interface Flow<out T> {
16
/**
17
* Accepts the given collector and emits values into it.
18
* This method should never be called directly.
19
*/
20
suspend fun collect(collector: FlowCollector<T>)
21
}
22
23
/**
24
* FlowCollector is used as an intermediate or a terminal collector of flow.
25
*/
26
interface FlowCollector<in T> {
27
/**
28
* Collects the value emitted by the upstream.
29
*/
30
suspend fun emit(value: T)
31
}
32
```
33
34
### Flow Builders
35
36
Functions to create flows from various sources and patterns.
37
38
```kotlin { .api }
39
/**
40
* Creates a cold flow from the given suspendable block.
41
* The flow is cold: the block runs sequentially and is invoked anew
42
* each time a terminal operator is applied to the resulting flow.
43
*/
44
fun <T> flow(block: suspend FlowCollector<T>.() -> Unit): Flow<T>
45
46
/**
47
* Creates a flow that produces the given values.
48
*/
49
fun <T> flowOf(vararg elements: T): Flow<T>
50
51
/**
52
* Creates a flow that produces no values.
53
*/
54
fun <T> emptyFlow(): Flow<T>
55
56
/**
57
* Creates a flow from Iterable, Iterator, or arrays.
58
*/
59
fun <T> Iterable<T>.asFlow(): Flow<T>
60
fun <T> Iterator<T>.asFlow(): Flow<T>
61
fun <T> Array<out T>.asFlow(): Flow<T>
62
63
/**
64
* Creates a flow using channels which can be used from multiple coroutines.
65
* The resulting flow is a cold flow.
66
*/
67
fun <T> channelFlow(block: suspend ProducerScope<T>.() -> Unit): Flow<T>
68
69
/**
70
* Creates a flow suitable for use with callback-based APIs.
71
*/
72
fun <T> callbackFlow(block: suspend ProducerScope<T>.() -> Unit): Flow<T>
73
```
74
75
**Usage Examples:**
76
77
```kotlin
78
import kotlinx.coroutines.*
79
import kotlinx.coroutines.flow.*
80
81
// Basic flow creation
82
val simpleFlow = flow {
83
for (i in 1..5) {
84
emit(i)
85
delay(100)
86
}
87
}
88
89
// Flow from values
90
val valuesFlow = flowOf(1, 2, 3, 4, 5)
91
92
// Flow from collections
93
val listFlow = listOf("a", "b", "c").asFlow()
94
95
// Channel flow for concurrent emissions
96
val channelBasedFlow = channelFlow {
97
launch {
98
repeat(3) {
99
send("From coroutine 1: $it")
100
delay(100)
101
}
102
}
103
launch {
104
repeat(3) {
105
send("From coroutine 2: $it")
106
delay(150)
107
}
108
}
109
}
110
111
// Callback flow for bridging callback APIs
112
val callbackBasedFlow = callbackFlow {
113
val callback = object : EventCallback {
114
override fun onEvent(data: String) {
115
trySend(data)
116
}
117
override fun onError(error: Exception) {
118
close(error)
119
}
120
}
121
122
eventSource.registerCallback(callback)
123
124
awaitClose {
125
eventSource.unregisterCallback(callback)
126
}
127
}
128
```
129
130
### Intermediate Operators
131
132
Transform, filter, and manipulate flow emissions.
133
134
```kotlin { .api }
135
/**
136
* Returns a flow containing the results of applying the given transform
137
* function to each value of the original flow.
138
*/
139
fun <T, R> Flow<T>.map(transform: suspend (value: T) -> R): Flow<R>
140
141
/**
142
* Returns a flow containing only values of the original flow that match
143
* the given predicate.
144
*/
145
fun <T> Flow<T>.filter(predicate: suspend (T) -> Boolean): Flow<T>
146
147
/**
148
* Applies the given transform function to each value and emits all
149
* elements returned by transform function.
150
*/
151
fun <T, R> Flow<T>.transform(transform: suspend FlowCollector<R>.(value: T) -> Unit): Flow<R>
152
153
/**
154
* Returns a flow that performs the given action on each value of the
155
* original flow as it passes through.
156
*/
157
fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T>
158
159
/**
160
* Returns a flow that invokes the given action before this flow starts
161
* to be collected.
162
*/
163
fun <T> Flow<T>.onStart(action: suspend FlowCollector<T>.() -> Unit): Flow<T>
164
165
/**
166
* Returns a flow that invokes the given action after the flow is completed
167
* or cancelled.
168
*/
169
fun <T> Flow<T>.onCompletion(action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit): Flow<T>
170
```
171
172
**Usage Examples:**
173
174
```kotlin
175
import kotlinx.coroutines.*
176
import kotlinx.coroutines.flow.*
177
178
val scope = MainScope()
179
180
scope.launch {
181
flowOf(1, 2, 3, 4, 5)
182
.map { it * 2 } // Transform each value
183
.filter { it > 4 } // Keep only values > 4
184
.onEach { println("Processing: $it") } // Side effect
185
.collect { value ->
186
println("Collected: $value")
187
}
188
// Output: Processing: 6, Collected: 6, Processing: 8, Collected: 8, Processing: 10, Collected: 10
189
}
190
191
// Complex transformation
192
scope.launch {
193
flow {
194
emit("hello")
195
emit("world")
196
}
197
.transform { value ->
198
emit(value.uppercase())
199
emit(value.length)
200
}
201
.collect { println(it) }
202
// Output: HELLO, 5, WORLD, 5
203
}
204
205
// Flow lifecycle hooks
206
scope.launch {
207
flowOf(1, 2, 3)
208
.onStart { emit(0) } // Emit 0 before starting
209
.onCompletion { emit(-1) } // Emit -1 after completion
210
.collect { println("Value: $it") }
211
// Output: Value: 0, Value: 1, Value: 2, Value: 3, Value: -1
212
}
213
```
214
215
### Flow Context and Threading
216
217
Control the execution context and threading behavior of flows.
218
219
```kotlin { .api }
220
/**
221
* Changes the context where this flow is executed to the given context.
222
* All operations upstream of flowOn are executed in the provided context.
223
*/
224
fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T>
225
226
/**
227
* Buffers flow emissions via channel with given capacity and runs collector
228
* in a separate coroutine.
229
*/
230
fun <T> Flow<T>.buffer(
231
capacity: Int = Channel.BUFFERED,
232
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
233
): Flow<T>
234
235
/**
236
* Conflates flow emissions via conflated channel and runs collector in a
237
* separate coroutine. Latest emitted value overwrites previous values.
238
*/
239
fun <T> Flow<T>.conflate(): Flow<T>
240
```
241
242
**Usage Examples:**
243
244
```kotlin
245
import kotlinx.coroutines.*
246
import kotlinx.coroutines.flow.*
247
248
val scope = MainScope()
249
250
scope.launch {
251
flow {
252
println("Emitting on: ${Thread.currentThread().name}")
253
for (i in 1..3) {
254
emit(i)
255
delay(100)
256
}
257
}
258
.flowOn(Dispatchers.IO) // Upstream operations run on IO dispatcher
259
.collect { value ->
260
println("Collecting $value on: ${Thread.currentThread().name}")
261
}
262
}
263
264
// Buffering for performance
265
scope.launch {
266
flow {
267
repeat(100) {
268
emit(it)
269
delay(10) // Slow producer
270
}
271
}
272
.buffer(10) // Buffer up to 10 items
273
.collect { value ->
274
delay(50) // Slow consumer
275
println("Processed: $value")
276
}
277
}
278
279
// Conflation for latest-value semantics
280
scope.launch {
281
flow {
282
repeat(100) {
283
emit(it)
284
delay(10)
285
}
286
}
287
.conflate() // Only keep latest value when consumer is slow
288
.collect { value ->
289
delay(100) // Very slow consumer
290
println("Latest value: $value")
291
}
292
}
293
```
294
295
### Terminal Operators
296
297
Consume flow values and produce final results.
298
299
```kotlin { .api }
300
/**
301
* Terminal flow operator that collects the given flow with a provided
302
* action that is applied to each emitted element.
303
*/
304
suspend fun <T> Flow<T>.collect(action: suspend (value: T) -> Unit)
305
306
/**
307
* Terminal operator that collects the given flow with the provided action.
308
* When a new value is emitted, the action for the previous value is cancelled.
309
*/
310
suspend fun <T> Flow<T>.collectLatest(action: suspend (value: T) -> Unit)
311
312
/**
313
* Collects all elements from the flow and returns them as a List.
314
*/
315
suspend fun <T> Flow<T>.toList(): List<T>
316
317
/**
318
* Collects all elements from the flow and returns them as a Set.
319
*/
320
suspend fun <T> Flow<T>.toSet(): Set<T>
321
322
/**
323
* Returns the first element emitted by the flow and then cancels flow's collection. Throws NoSuchElementException if the flow is empty.
324
*/
325
suspend fun <T> Flow<T>.first(): T
326
327
/**
328
* Returns the first element emitted by the flow matching the predicate.
329
*/
330
suspend fun <T> Flow<T>.first(predicate: suspend (T) -> Boolean): T
331
332
/**
333
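* Returns the single element emitted by the flow. Throws NoSuchElementException for an empty flow and IllegalArgumentException for a flow that contains more than one element.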
334
*/
335
suspend fun <T> Flow<T>.single(): T
336
337
/**
338
* Accumulates value starting with initial value and applying operation
339
* from left to right to current accumulator value and each element.
340
*/
341
suspend fun <T, R> Flow<T>.fold(initial: R, operation: suspend (acc: R, value: T) -> R): R
342
343
/**
344
* Accumulates value starting with the first element and applying operation
345
* from left to right to current accumulator value and each element.
346
*/
347
suspend fun <S, T : S> Flow<T>.reduce(operation: suspend (accumulator: S, value: T) -> S): S
348
349
/**
350
* Returns the number of elements in this flow.
351
*/
352
suspend fun <T> Flow<T>.count(): Int
353
354
/**
355
* Returns the number of elements matching the given predicate.
356
*/
357
suspend fun <T> Flow<T>.count(predicate: suspend (T) -> Boolean): Int
358
```
359
360
**Usage Examples:**
361
362
```kotlin
363
import kotlinx.coroutines.*
364
import kotlinx.coroutines.flow.*
365
366
val scope = MainScope()
367
368
scope.launch {
369
val numbers = flowOf(1, 2, 3, 4, 5)
370
371
// Collect to list
372
val list = numbers.toList()
373
println("List: $list")
374
375
// Get first element
376
val first = numbers.first()
377
println("First: $first")
378
379
// Find first matching predicate
380
val firstEven = numbers.first { it % 2 == 0 }
381
println("First even: $firstEven")
382
383
// Fold operation
384
val sum = numbers.fold(0) { acc, value -> acc + value }
385
println("Sum: $sum")
386
387
// Reduce operation
388
val product = numbers.reduce { acc, value -> acc * value }
389
println("Product: $product")
390
391
// Count elements
392
val evenCount = numbers.count { it % 2 == 0 }
393
println("Even numbers: $evenCount")
394
}
395
396
// collectLatest cancels previous collection
397
scope.launch {
398
flow {
399
repeat(5) {
400
emit(it)
401
delay(100)
402
}
403
}
404
.collectLatest { value ->
405
println("Processing $value")
406
delay(200) // Slow processing
407
println("Finished $value") // Only last value will finish
408
}
409
}
410
```
411
412
### SharedFlow and StateFlow
413
414
Hot flows that share emissions among multiple collectors.
415
416
```kotlin { .api }
417
/**
418
* A hot Flow that shares emitted values among all its collectors in a broadcast fashion.
419
*/
420
interface SharedFlow<out T> : Flow<T> {
421
/**
422
* A snapshot of the most recently emitted values into this shared flow.
423
*/
424
val replayCache: List<T>
425
426
/**
427
* The number of subscribers (active collectors) to this shared flow.
428
*/
429
val subscriptionCount: StateFlow<Int>
430
}
431
432
/**
433
* A mutable SharedFlow that provides functions to emit values to the flow.
434
*/
435
interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
436
/**
437
* Tries to emit a value to this flow without suspending the caller.
438
*/
439
fun tryEmit(value: T): Boolean
440
441
/**
442
* Resets the replayCache of this flow to an empty state.
443
*/
444
fun resetReplayCache()
445
446
/**
447
* The number of subscribers (active collectors) to this shared flow.
448
*/
449
override val subscriptionCount: StateFlow<Int>
450
}
451
452
/**
453
* Creates a MutableSharedFlow with the given configuration.
454
*/
455
fun <T> MutableSharedFlow(
456
replay: Int = 0,
457
extraBufferCapacity: Int = 0,
458
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
459
): MutableSharedFlow<T>
460
461
/**
462
* A SharedFlow that represents a read-only state with a single updatable value.
463
*/
464
interface StateFlow<out T> : SharedFlow<T> {
465
/**
466
* The current value of this state flow.
467
*/
468
val value: T
469
}
470
471
/**
472
* A mutable StateFlow that provides a setter for value.
473
*/
474
interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> {
475
/**
476
* The current value of this state flow.
477
*/
478
override var value: T
479
480
/**
481
* Atomically compares the current value with expect and sets it to update if they are the same.
482
*/
483
fun compareAndSet(expect: T, update: T): Boolean
484
}
485
486
/**
487
* Creates a MutableStateFlow with the given initial value.
488
*/
489
fun <T> MutableStateFlow(value: T): MutableStateFlow<T>
490
491
/**
492
* Updates the MutableStateFlow.value atomically using the specified function of its value.
493
*/
494
fun <T> MutableStateFlow<T>.update(function: (T) -> T)
495
496
/**
497
* Updates the MutableStateFlow.value atomically using the specified function of its value and returns the new value.
498
*/
499
fun <T> MutableStateFlow<T>.updateAndGet(function: (T) -> T): T
500
501
/**
502
* Updates the MutableStateFlow.value atomically using the specified function of its value and returns its prior value.
503
*/
504
fun <T> MutableStateFlow<T>.getAndUpdate(function: (T) -> T): T
505
```
506
507
**Usage Examples:**
508
509
```kotlin
510
import kotlinx.coroutines.*
511
import kotlinx.coroutines.flow.*
512
513
val scope = MainScope()
514
515
// SharedFlow example
516
val sharedFlow = MutableSharedFlow<String>(replay = 2)
517
518
// Multiple collectors
519
scope.launch {
520
sharedFlow.collect { value ->
521
println("Collector 1: $value")
522
}
523
}
524
525
scope.launch {
526
sharedFlow.collect { value ->
527
println("Collector 2: $value")
528
}
529
}
530
531
// Emit values
532
scope.launch {
533
sharedFlow.emit("First")
534
delay(100)
535
sharedFlow.emit("Second")
536
delay(100)
537
sharedFlow.emit("Third")
538
}
539
540
// StateFlow example
541
val stateFlow = MutableStateFlow("Initial")
542
543
// Observe state changes
544
scope.launch {
545
stateFlow.collect { state ->
546
println("Current state: $state")
547
}
548
}
549
550
// Update state
551
scope.launch {
552
delay(1000)
553
stateFlow.value = "Updated"
554
delay(1000)
555
stateFlow.value = "Final"
556
}
557
558
// Atomic updates using update functions
559
val counter = MutableStateFlow(0)
560
scope.launch {
561
// Safe concurrent increment
562
counter.update { it + 1 }
563
564
// Get new value after update
565
val newValue = counter.updateAndGet { it * 2 }
566
println("New value: $newValue")
567
568
// Get old value before update
569
val oldValue = counter.getAndUpdate { it - 5 }
570
println("Old value: $oldValue")
571
}
572
573
// StateFlow always has current value
574
println("Immediate value: ${stateFlow.value}")
575
```
576
577
### Flow Sharing Operators
578
579
Convert cold flows to hot flows with sharing behavior.
580
581
```kotlin { .api }
582
/**
583
* Converts this cold Flow into a hot SharedFlow that is started in the given
584
* coroutine scope, sharing emissions from a single running instance of the upstream flow.
585
*/
586
fun <T> Flow<T>.shareIn(
587
scope: CoroutineScope,
588
started: SharingStarted,
589
replay: Int = 0
590
): SharedFlow<T>
591
592
/**
593
* Converts this cold Flow into a hot StateFlow that is started in the given
594
* coroutine scope, sharing the most recently emitted value from a single running
595
* instance of the upstream flow.
596
*/
597
fun <T> Flow<T>.stateIn(
598
scope: CoroutineScope,
599
started: SharingStarted,
600
initialValue: T
601
): StateFlow<T>
602
603
/**
604
* A policy for starting and stopping the sharing coroutine in shareIn and stateIn operators.
605
*/
606
interface SharingStarted {
607
companion object {
608
val Eagerly: SharingStarted
609
val Lazily: SharingStarted
610
611
fun WhileSubscribed(
612
stopTimeoutMillis: Long = 0,
613
replayExpirationMillis: Long = Long.MAX_VALUE
614
): SharingStarted
615
}
616
}
617
```
618
619
**Usage Examples:**
620
621
```kotlin
622
import kotlinx.coroutines.*
623
import kotlinx.coroutines.flow.*
624
625
val scope = MainScope()
626
627
// Cold flow
628
val coldFlow = flow {
629
println("Flow started")
630
repeat(5) {
631
emit(it)
632
delay(1000)
633
}
634
}
635
636
// Convert to hot SharedFlow
637
val hotSharedFlow = coldFlow.shareIn(
638
scope = scope,
639
started = SharingStarted.WhileSubscribed(),
640
replay = 1
641
)
642
643
// Multiple collectors share the same flow instance
644
scope.launch {
645
delay(2000) // Start collecting after 2 seconds
646
hotSharedFlow.collect { value ->
647
println("Late collector: $value")
648
}
649
}
650
651
scope.launch {
652
hotSharedFlow.collect { value ->
653
println("Early collector: $value")
654
}
655
}
656
657
// Convert to StateFlow
658
val dataFlow = flow {
659
emit("Loading...")
660
delay(2000)
661
emit("Data loaded")
662
}
663
664
val dataState = dataFlow.stateIn(
665
scope = scope,
666
started = SharingStarted.Lazily,
667
initialValue = "Initial"
668
)
669
670
// State is immediately available
671
println("Current state: ${dataState.value}")
672
673
scope.launch {
674
dataState.collect { state ->
675
println("State changed: $state")
676
}
677
}
678
```
679
680
### Flow Combination Operators
681
682
Combine multiple flows into single flows with various strategies.
683
684
```kotlin { .api }
685
/**
686
* Zips values from the current flow with other flow using provided transform
687
* function applied to each pair of values.
688
*/
689
fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R>
690
691
/**
692
* Returns a flow whose values are generated by transform function by combining
693
* the most recently emitted values by each flow.
694
*/
695
fun <T1, T2, R> Flow<T1>.combine(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R>
696
697
/**
698
* Flattens the given flow of flows into a single flow by merging emissions.
699
*/
700
fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = DEFAULT_CONCURRENCY): Flow<T>
701
702
/**
703
* Transforms elements emitted by the original flow by applying transform that
704
* returns another flow, and then merging the resulting flows.
705
*/
706
fun <T, R> Flow<T>.flatMapMerge(
707
concurrency: Int = DEFAULT_CONCURRENCY,
708
transform: suspend (value: T) -> Flow<R>
709
): Flow<R>
710
711
/**
712
* Transforms elements emitted by the original flow by applying transform that
713
* returns another flow, and then concatenating the resulting flows.
714
*/
715
fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R>
716
717
/**
718
* Transforms elements emitted by the original flow by applying transform that
719
* returns another flow, and then flattening these flows by switching to new flows.
720
*/
721
fun <T, R> Flow<T>.flatMapLatest(transform: suspend (value: T) -> Flow<R>): Flow<R>
722
```
723
724
**Usage Examples:**
725
726
```kotlin
727
import kotlinx.coroutines.*
728
import kotlinx.coroutines.flow.*
729
730
val scope = MainScope()
731
732
scope.launch {
733
// Zip - waits for both flows to emit
734
val flow1 = flowOf(1, 2, 3).onEach { delay(100) }
735
val flow2 = flowOf("A", "B", "C").onEach { delay(150) }
736
737
flow1.zip(flow2) { num, letter ->
738
"$num$letter"
739
}.collect { println("Zipped: $it") }
740
// Output: Zipped: 1A, Zipped: 2B, Zipped: 3C
741
}
742
743
scope.launch {
744
// Combine - uses latest values from both
745
val numbers = flow {
746
repeat(3) {
747
emit(it)
748
delay(100)
749
}
750
}
751
val letters = flow {
752
repeat(3) {
753
emit('A' + it)
754
delay(150)
755
}
756
}
757
758
numbers.combine(letters) { num, letter ->
759
"$num$letter"
760
}.collect { println("Combined: $it") }
761
}
762
763
scope.launch {
764
// FlatMap merge - run multiple flows concurrently
765
flowOf(1, 2, 3)
766
.flatMapMerge { value ->
767
flow {
768
repeat(2) {
769
emit("$value-$it")
770
delay(100)
771
}
772
}
773
}
774
.collect { println("Merged: $it") }
775
}
776
777
scope.launch {
778
// FlatMap latest - cancel previous when new value arrives
779
flow {
780
emit(1)
781
delay(200)
782
emit(2)
783
delay(200)
784
emit(3)
785
}
786
.flatMapLatest { value ->
787
flow {
788
repeat(5) {
789
emit("$value-$it")
790
delay(100)
791
}
792
}
793
}
794
.collect { println("Latest: $it") }
795
}
796
```