0
# Flow API
1
2
Cold reactive streams with comprehensive transformation operators, hot flows for state and event broadcasting. The Flow API provides a complete solution for asynchronous data streams with structured concurrency support.
3
4
## Capabilities
5
6
### Cold Flow Interface
7
8
Base interfaces for cold streams that are created fresh for each collector.
9
10
```kotlin { .api }
11
/**
12
* Represents an asynchronous flow of values
13
*/
14
interface Flow<out T> {
15
/** Collects the flow values with the provided collector */
16
suspend fun collect(collector: FlowCollector<T>)
17
}
18
19
/**
20
* Collector for flow values
21
*/
22
interface FlowCollector<in T> {
23
/** Emit a value to the flow */
24
suspend fun emit(value: T)
25
}
26
27
/**
28
* Base class for flow implementations
29
*/
30
abstract class AbstractFlow<T> : Flow<T> {
31
/** Collect implementation that ensures proper flow context */
32
final override suspend fun collect(collector: FlowCollector<T>)
33
/** Abstract method for subclasses to implement */
34
abstract suspend fun collectSafely(collector: FlowCollector<T>)
35
}
36
```
37
38
### Flow Builders
39
40
Functions for creating flows from various sources.
41
42
```kotlin { .api }
43
/**
44
* Creates a flow with a builder block
45
* @param block the flow builder that emits values
46
* @return Flow that emits values from the builder
47
*/
48
fun <T> flow(block: suspend FlowCollector<T>.() -> Unit): Flow<T>
49
50
/**
51
* Creates a flow from a sequence of values
52
* @param elements values to emit
53
* @return Flow that emits the provided values
54
*/
55
fun <T> flowOf(vararg elements: T): Flow<T>
56
57
/**
58
* Creates an empty flow
59
*/
60
fun <T> emptyFlow(): Flow<T>
61
62
/**
63
* Creates a flow from a collection
64
*/
65
fun <T> Iterable<T>.asFlow(): Flow<T>
66
67
/**
68
* Creates a flow from a sequence
69
*/
70
fun <T> Sequence<T>.asFlow(): Flow<T>
71
```
72
73
**Usage Examples:**
74
75
```kotlin
76
import kotlinx.coroutines.*
77
import kotlinx.coroutines.flow.*
78
79
// Basic flow creation
80
val numberFlow = flow {
81
for (i in 1..5) {
82
delay(100)
83
emit(i)
84
}
85
}
86
87
// Flow from values
88
val valueFlow = flowOf(1, 2, 3, 4, 5)
89
90
// Flow from collection
91
val listFlow = listOf("a", "b", "c").asFlow()
92
93
// Collect the flow
94
numberFlow.collect { value ->
95
println("Received: $value")
96
}
97
```
98
99
### Channel Flow Builder
100
101
Flow with concurrent emission capabilities for complex producers.
102
103
```kotlin { .api }
104
/**
105
* Creates a flow with a channel-based producer
106
* @param block producer block with ProducerScope
107
* @return Flow backed by a channel
108
*/
109
fun <T> channelFlow(block: suspend ProducerScope<T>.() -> Unit): Flow<T>
110
111
/**
112
* Creates a flow from callback-based APIs
113
* @param block producer block with callback registration
114
* @return Flow that emits callback values
115
*/
116
fun <T> callbackFlow(block: suspend ProducerScope<T>.() -> Unit): Flow<T>
117
```
118
119
**Usage Examples:**
120
121
```kotlin
122
import kotlinx.coroutines.*
123
import kotlinx.coroutines.flow.*
124
125
// Channel flow for concurrent emission
126
val concurrentFlow = channelFlow {
127
launch {
128
for (i in 1..5) {
129
send(i)
130
delay(100)
131
}
132
}
133
launch {
134
for (i in 6..10) {
135
send(i)
136
delay(150)
137
}
138
}
139
}
140
141
// Callback flow for integrating with callback APIs
142
val callbackBasedFlow = callbackFlow {
143
val listener = object : DataListener {
144
override fun onData(data: String) {
145
trySend(data)
146
}
147
override fun onComplete() {
148
close()
149
}
150
}
151
152
// Register listener
153
dataSource.addListener(listener)
154
155
// Cleanup when flow is cancelled
156
awaitClose {
157
dataSource.removeListener(listener)
158
}
159
}
160
```
161
162
### Transformation Operators
163
164
Operators for transforming flow values.
165
166
```kotlin { .api }
167
/**
168
* Transform each value emitted by the flow
169
*/
170
fun <T, R> Flow<T>.map(transform: suspend (value: T) -> R): Flow<R>
171
172
/**
173
* Transform each value with its index
174
*/
175
fun <T, R> Flow<T>.mapIndexed(transform: suspend (index: Int, value: T) -> R): Flow<R>
176
177
/**
178
* Transform values and filter out nulls
179
*/
180
fun <T, R : Any> Flow<T>.mapNotNull(transform: suspend (value: T) -> R?): Flow<R>
181
182
/**
183
* Transform to flows and flatten concatenating
184
*/
185
fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R>
186
187
/**
188
* Transform to flows and flatten merging concurrently
189
*/
190
fun <T, R> Flow<T>.flatMapMerge(
191
concurrency: Int = DEFAULT_CONCURRENCY,
192
transform: suspend (value: T) -> Flow<R>
193
): Flow<R>
194
195
/**
196
* Transform with general transformation block
197
*/
198
fun <T, R> Flow<T>.transform(transform: suspend FlowCollector<R>.(value: T) -> Unit): Flow<R>
199
200
/**
201
* Accumulate values with scan operation
202
*/
203
fun <T, R> Flow<T>.scan(initial: R, operation: suspend (accumulator: R, value: T) -> R): Flow<R>
204
```
205
206
**Usage Examples:**
207
208
```kotlin
209
import kotlinx.coroutines.flow.*
210
211
val numbers = flowOf(1, 2, 3, 4, 5)
212
213
// Map transformation
214
val doubled = numbers.map { it * 2 }
215
216
// Map with index
217
val indexed = numbers.mapIndexed { index, value -> "[$index] = $value" }
218
219
// Filter and transform
220
val evenSquares = numbers
221
.filter { it % 2 == 0 }
222
.map { it * it }
223
224
// Scan for running totals
225
val runningSum = numbers.scan(0) { acc, value -> acc + value }
226
227
// Transform with emission control
228
val expanded = numbers.transform { value ->
229
emit(value)
230
emit(value * 10)
231
}
232
```
233
234
### Filtering Operators
235
236
Operators for filtering flow values based on conditions.
237
238
```kotlin { .api }
239
/**
240
* Filter values based on predicate
241
*/
242
fun <T> Flow<T>.filter(predicate: suspend (T) -> Boolean): Flow<T>
243
244
/**
245
* Filter out null values
246
*/
247
fun <T : Any> Flow<T?>.filterNotNull(): Flow<T>
248
249
/**
250
* Take first N values
251
*/
252
fun <T> Flow<T>.take(count: Int): Flow<T>
253
254
/**
255
* Take while predicate is true
256
*/
257
fun <T> Flow<T>.takeWhile(predicate: suspend (T) -> Boolean): Flow<T>
258
259
/**
260
* Drop first N values
261
*/
262
fun <T> Flow<T>.drop(count: Int): Flow<T>
263
264
/**
265
* Drop while predicate is true
266
*/
267
fun <T> Flow<T>.dropWhile(predicate: suspend (T) -> Boolean): Flow<T>
268
269
/**
270
* Emit only distinct consecutive values
271
*/
272
fun <T> Flow<T>.distinctUntilChanged(): Flow<T>
273
274
/**
275
* Emit only distinct consecutive values by key
276
*/
277
fun <T, K> Flow<T>.distinctUntilChangedBy(keySelector: suspend (T) -> K): Flow<T>
278
```
279
280
### Advanced Transformation Operators
281
282
Advanced operators for complex transformations and timing control.
283
284
```kotlin { .api }
285
/**
286
* Transform values but cancel previous transformation on new emission
287
*/
288
fun <T, R> Flow<T>.mapLatest(transform: suspend (value: T) -> R): Flow<R>
289
290
/**
291
* Transform values with full control, cancelling on new emission
292
*/
293
fun <T, R> Flow<T>.transformLatest(transform: suspend FlowCollector<R>.(value: T) -> Unit): Flow<R>
294
295
/**
296
* Running fold that emits intermediate results
297
*/
298
fun <T, R> Flow<T>.runningFold(initial: R, operation: suspend (accumulator: R, value: T) -> R): Flow<R>
299
300
/**
301
* Running reduce that emits intermediate results
302
*/
303
fun <T> Flow<T>.runningReduce(operation: suspend (accumulator: T, value: T) -> T): Flow<T>
304
```
305
306
### Timing Operators
307
308
Operators for controlling timing of emissions.
309
310
```kotlin { .api }
311
/**
312
* Sample emissions at specified intervals
313
*/
314
fun <T> Flow<T>.sample(periodMillis: Long): Flow<T>
315
316
/**
317
* Debounce emissions - only emit if no new value within timeout
318
*/
319
fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T>
320
321
/**
322
* Add timeout to flow emissions
323
*/
324
fun <T> Flow<T>.timeout(timeoutMillis: Long): Flow<T>
325
326
/**
327
* Delay each emission
328
*/
329
fun <T> Flow<T>.delayEach(delayMillis: Long): Flow<T>
330
```
331
332
### Terminal Operators
333
334
Operators that consume the flow and produce a final result.
335
336
```kotlin { .api }
337
/**
338
* Collect all values with the provided action
339
*/
340
suspend fun <T> Flow<T>.collect(action: suspend (value: T) -> Unit)
341
342
/**
343
* Collect with index
344
*/
345
suspend fun <T> Flow<T>.collectIndexed(action: suspend (index: Int, value: T) -> Unit)
346
347
/**
348
* Launch collection in the provided scope
349
*/
350
fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job
351
352
/**
353
* Reduce flow to a single value
354
*/
355
suspend fun <S, T : S> Flow<T>.reduce(operation: suspend (accumulator: S, value: T) -> S): S
356
357
/**
358
* Fold flow values with initial value
359
*/
360
suspend fun <T, R> Flow<T>.fold(initial: R, operation: suspend (acc: R, value: T) -> R): R
361
362
/**
363
* Get first value
364
*/
365
suspend fun <T> Flow<T>.first(): T
366
367
/**
368
* Get first value or null
369
*/
370
suspend fun <T> Flow<T>.firstOrNull(): T?
371
372
/**
373
* Get single value
374
*/
375
suspend fun <T> Flow<T>.single(): T
376
377
/**
378
* Convert to list
379
*/
380
suspend fun <T> Flow<T>.toList(): List<T>
381
382
/**
383
* Convert to set
384
*/
385
suspend fun <T> Flow<T>.toSet(): Set<T>
386
387
/**
388
* Count values
389
*/
390
suspend fun <T> Flow<T>.count(): Int
391
```
392
393
### Hot Flows - SharedFlow
394
395
Hot flows that can have multiple collectors and emit values regardless of collectors.
396
397
```kotlin { .api }
398
/**
399
* A flow that can be shared among multiple collectors
400
*/
401
interface SharedFlow<out T> : Flow<T> {
402
/** Values currently available for replay */
403
val replayCache: List<T>
404
}
405
406
/**
407
* Mutable version of SharedFlow
408
*/
409
interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
410
/** Number of active subscribers */
411
val subscriptionCount: StateFlow<Int>
412
413
/** Emit a value to all subscribers */
414
override suspend fun emit(value: T)
415
416
/** Try to emit a value without suspending */
417
fun tryEmit(value: T): Boolean
418
419
/** Reset the replay cache */
420
fun resetReplayCache()
421
}
422
423
/**
424
* Create a MutableSharedFlow
425
* @param replay number of values to replay to new subscribers
426
* @param extraBufferCapacity additional buffer capacity
427
* @param onBufferOverflow strategy when buffer overflows
428
*/
429
fun <T> MutableSharedFlow(
430
replay: Int = 0,
431
extraBufferCapacity: Int = 0,
432
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
433
): MutableSharedFlow<T>
434
```
435
436
### Hot Flows - StateFlow
437
438
SharedFlow specialized for holding current state.
439
440
```kotlin { .api }
441
/**
442
* A SharedFlow that represents a mutable state
443
*/
444
interface StateFlow<out T> : SharedFlow<T> {
445
/** Current state value */
446
val value: T
447
}
448
449
/**
450
* Mutable version of StateFlow
451
*/
452
interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> {
453
/** Current state value (mutable) */
454
override var value: T
455
456
/** Atomically update the value */
457
fun update(function: (T) -> T)
458
459
/** Atomically update and return new value */
460
fun updateAndGet(function: (T) -> T): T
461
462
/** Atomically update and return old value */
463
fun getAndUpdate(function: (T) -> T): T
464
465
/** Compare and set the value */
466
fun compareAndSet(expect: T, update: T): Boolean
467
}
468
469
/**
470
* Create a MutableStateFlow
471
*/
472
fun <T> MutableStateFlow(value: T): MutableStateFlow<T>
473
```
474
475
**Usage Examples:**
476
477
```kotlin
478
import kotlinx.coroutines.*
479
import kotlinx.coroutines.flow.*
480
481
// SharedFlow for events
482
val eventFlow = MutableSharedFlow<String>()
483
484
// Multiple collectors
485
launch {
486
eventFlow.collect { event ->
487
println("Collector 1: $event")
488
}
489
}
490
491
launch {
492
eventFlow.collect { event ->
493
println("Collector 2: $event")
494
}
495
}
496
497
// Emit events
498
eventFlow.emit("Hello")
499
eventFlow.emit("World")
500
501
// StateFlow for state
502
val counterState = MutableStateFlow(0)
503
504
// Observe state changes
505
launch {
506
counterState.collect { count ->
507
println("Count: $count")
508
}
509
}
510
511
// Update state
512
counterState.value = 1
513
counterState.update { it + 1 }
514
```
515
516
### Flow Context and Threading
517
518
Operators for controlling flow execution context.
519
520
```kotlin { .api }
521
/**
522
* Change the context of upstream flow
523
*/
524
fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T>
525
526
/**
527
* Add buffering between producer and consumer
528
*/
529
fun <T> Flow<T>.buffer(
530
capacity: Int = BUFFERED,
531
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
532
): Flow<T>
533
534
/**
535
* Ensure single collector at a time
536
*/
537
fun <T> Flow<T>.shareIn(
538
scope: CoroutineScope,
539
started: SharingStarted,
540
replay: Int = 0
541
): SharedFlow<T>
542
543
/**
544
* Convert to StateFlow
545
*/
546
fun <T> Flow<T>.stateIn(
547
scope: CoroutineScope,
548
started: SharingStarted,
549
initialValue: T
550
): StateFlow<T>
551
```
552
553
### Error Handling
554
555
Operators for handling exceptions in flows.
556
557
```kotlin { .api }
558
/**
559
* Catch exceptions and handle them
560
*/
561
fun <T> Flow<T>.catch(action: suspend FlowCollector<T>.(cause: Throwable) -> Unit): Flow<T>
562
563
/**
564
* Retry on exceptions
565
*/
566
fun <T> Flow<T>.retry(retries: Long = Long.MAX_VALUE): Flow<T>
567
568
/**
569
* Retry with predicate
570
*/
571
fun <T> Flow<T>.retryWhen(predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long) -> Boolean): Flow<T>
572
```
573
574
## Flow Principles
575
576
1. **Cold Nature**: Flows are cold by default - they don't start producing values until collected
577
2. **Sequential Processing**: Flow operators process values sequentially unless explicit concurrency is used
578
3. **Context Preservation**: Flows preserve coroutine context and respect structured concurrency
579
4. **Exception Transparency**: Exceptions in flows propagate to collectors unless handled with `catch`
580
5. **Cancellation Support**: Flows are cancellable and respect coroutine cancellation