Coroutines support libraries for Kotlin providing structured concurrency primitives, Flow API for reactive streams, channels for communication, and synchronization utilities across all Kotlin platforms
—
Asynchronous data streams with rich operator support for reactive programming patterns. Flow provides a cold stream implementation that enables declarative handling of asynchronous data sequences.
The foundational interfaces for reactive stream processing in kotlinx-coroutines.
/**
* An asynchronous data stream that sequentially emits values
* and completes normally or with an exception.
*/
interface Flow<out T> {
/**
* Accepts the given collector and emits values into it.
* This method should never be called directly.
*/
suspend fun collect(collector: FlowCollector<T>)
}
/**
* FlowCollector is used as an intermediate or a terminal collector of flow.
*/
interface FlowCollector<in T> {
/**
* Collects the value emitted by the upstream.
*/
suspend fun emit(value: T)
}
Functions to create flows from various sources and patterns.
/**
* Creates a cold flow from the given suspendable block.
* The flow being cold means that the block is invoked each time
* a terminal operator is applied to the resulting flow.
*/
fun <T> flow(block: suspend FlowCollector<T>.() -> Unit): Flow<T>
/**
* Creates a flow that produces the given values.
*/
fun <T> flowOf(vararg elements: T): Flow<T>
/**
* Creates a flow that produces no values.
*/
fun <T> emptyFlow(): Flow<T>
/**
* Creates a flow from Iterable, Iterator, or arrays.
*/
fun <T> Iterable<T>.asFlow(): Flow<T>
fun <T> Iterator<T>.asFlow(): Flow<T>
fun <T> Array<out T>.asFlow(): Flow<T>
/**
* Creates a flow using channels which can be used from multiple coroutines.
* The resulting flow is a cold flow.
*/
fun <T> channelFlow(block: suspend ProducerScope<T>.() -> Unit): Flow<T>
/**
* Creates a flow suitable for use with callback-based APIs.
*/
fun <T> callbackFlow(block: suspend ProducerScope<T>.() -> Unit): Flow<T>
Usage Examples:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
// Basic flow creation
val simpleFlow = flow {
for (i in 1..5) {
emit(i)
delay(100)
}
}
// Flow from values
val valuesFlow = flowOf(1, 2, 3, 4, 5)
// Flow from collections
val listFlow = listOf("a", "b", "c").asFlow()
// Channel flow for concurrent emissions
val channelBasedFlow = channelFlow {
launch {
repeat(3) {
send("From coroutine 1: $it")
delay(100)
}
}
launch {
repeat(3) {
send("From coroutine 2: $it")
delay(150)
}
}
}
// Callback flow for bridging callback APIs
val callbackBasedFlow = callbackFlow {
val callback = object : EventCallback {
override fun onEvent(data: String) {
trySend(data)
}
override fun onError(error: Exception) {
close(error)
}
}
eventSource.registerCallback(callback)
awaitClose {
eventSource.unregisterCallback(callback)
}
}
Transform, filter, and manipulate flow emissions.
/**
* Returns a flow containing the results of applying the given transform
* function to each value of the original flow.
*/
fun <T, R> Flow<T>.map(transform: suspend (value: T) -> R): Flow<R>
/**
* Returns a flow containing only values of the original flow that match
* the given predicate.
*/
fun <T> Flow<T>.filter(predicate: suspend (T) -> Boolean): Flow<T>
/**
* Applies the given transform function to each value and emits all
* elements returned by transform function.
*/
fun <T, R> Flow<T>.transform(transform: suspend FlowCollector<R>.(value: T) -> Unit): Flow<R>
/**
* Returns a flow that performs the given action on each value of the
* original flow as it passes through.
*/
fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T>
/**
* Returns a flow that invokes the given action before this flow starts
* to be collected.
*/
fun <T> Flow<T>.onStart(action: suspend FlowCollector<T>.() -> Unit): Flow<T>
/**
* Returns a flow that invokes the given action after the flow is completed
* or cancelled.
*/
fun <T> Flow<T>.onCompletion(action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit): Flow<T>
Usage Examples:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
val scope = MainScope()
scope.launch {
flowOf(1, 2, 3, 4, 5)
.map { it * 2 } // Transform each value
.filter { it > 4 } // Keep only values > 4
.onEach { println("Processing: $it") } // Side effect
.collect { value ->
println("Collected: $value")
}
// Output: Processing: 6, Collected: 6, Processing: 8, Collected: 8, Processing: 10, Collected: 10
}
// Complex transformation
scope.launch {
flow {
emit("hello")
emit("world")
}
.transform { value ->
emit(value.uppercase())
emit(value.length)
}
.collect { println(it) }
// Output: HELLO, 5, WORLD, 5
}
// Flow lifecycle hooks
scope.launch {
flowOf(1, 2, 3)
.onStart { emit(0) } // Emit 0 before starting
.onCompletion { emit(-1) } // Emit -1 after completion
.collect { println("Value: $it") }
// Output: Value: 0, Value: 1, Value: 2, Value: 3, Value: -1
}
Control the execution context and threading behavior of flows.
/**
* Changes the context where this flow is executed to the given context.
* All operations upstream of flowOn are executed in the provided context.
*/
fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T>
/**
* Buffers flow emissions via channel with given capacity and runs collector
* in a separate coroutine.
*/
fun <T> Flow<T>.buffer(
capacity: Int = Channel.BUFFERED,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): Flow<T>
/**
* Conflates flow emissions via conflated channel and runs collector in a
* separate coroutine. Latest emitted value overwrites previous values.
*/
fun <T> Flow<T>.conflate(): Flow<T>
Usage Examples:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
val scope = MainScope()
scope.launch {
flow {
println("Emitting on: ${Thread.currentThread().name}")
for (i in 1..3) {
emit(i)
delay(100)
}
}
.flowOn(Dispatchers.IO) // Upstream operations run on IO dispatcher
.collect { value ->
println("Collecting $value on: ${Thread.currentThread().name}")
}
}
// Buffering for performance: the producer below is faster than the consumer,
// so buffer() lets it run ahead instead of waiting for each value to be processed
scope.launch {
flow {
repeat(100) {
emit(it)
delay(10) // Producer emits a value every 10 ms
}
}
.buffer(10) // Buffer up to 10 items
.collect { value ->
delay(50) // Slow consumer: takes 50 ms per value
println("Processed: $value")
}
}
// Conflation for latest-value semantics
scope.launch {
flow {
repeat(100) {
emit(it)
delay(10)
}
}
.conflate() // Only keep latest value when consumer is slow
.collect { value ->
delay(100) // Very slow consumer
println("Latest value: $value")
}
}
Consume flow values and produce final results.
/**
* Terminal flow operator that collects the given flow with a provided
* action that is applied to each emitted element.
*/
suspend fun <T> Flow<T>.collect(action: suspend (value: T) -> Unit)
/**
* Terminal flow operator that collects the given flow with a provided action.
* Unlike collect, when the upstream emits a new value while the action for the
* previous value is still running, that action is cancelled.
*/
suspend fun <T> Flow<T>.collectLatest(action: suspend (value: T) -> Unit)
/**
* Collects all elements from the flow and returns them as a List.
*/
suspend fun <T> Flow<T>.toList(): List<T>
/**
* Collects all elements from the flow and returns them as a Set.
*/
suspend fun <T> Flow<T>.toSet(): Set<T>
/**
* Returns the first element emitted by the flow and cancels flow's collection.
*/
suspend fun <T> Flow<T>.first(): T
/**
* Returns the first element emitted by the flow matching the predicate.
*/
suspend fun <T> Flow<T>.first(predicate: suspend (T) -> Boolean): T
/**
* Returns the single element emitted by the flow.
*/
suspend fun <T> Flow<T>.single(): T
/**
* Accumulates value starting with initial value and applying operation
* from left to right to current accumulator value and each element.
*/
suspend fun <T, R> Flow<T>.fold(initial: R, operation: suspend (acc: R, value: T) -> R): R
/**
* Accumulates value starting with the first element and applying operation
* from left to right to current accumulator value and each element.
*/
suspend fun <S, T : S> Flow<T>.reduce(operation: suspend (accumulator: S, value: T) -> S): S
/**
* Returns the number of elements in this flow.
*/
suspend fun <T> Flow<T>.count(): Int
/**
* Returns the number of elements matching the given predicate.
*/
suspend fun <T> Flow<T>.count(predicate: suspend (T) -> Boolean): Int
Usage Examples:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
val scope = MainScope()
scope.launch {
val numbers = flowOf(1, 2, 3, 4, 5)
// Collect to list
val list = numbers.toList()
println("List: $list")
// Get first element
val first = numbers.first()
println("First: $first")
// Find first matching predicate
val firstEven = numbers.first { it % 2 == 0 }
println("First even: $firstEven")
// Fold operation
val sum = numbers.fold(0) { acc, value -> acc + value }
println("Sum: $sum")
// Reduce operation
val product = numbers.reduce { acc, value -> acc * value }
println("Product: $product")
// Count elements
val evenCount = numbers.count { it % 2 == 0 }
println("Even numbers: $evenCount")
}
// collectLatest cancels previous collection
scope.launch {
flow {
repeat(5) {
emit(it)
delay(100)
}
}
.collectLatest { value ->
println("Processing $value")
delay(200) // Slow processing
println("Finished $value") // Only last value will finish
}
}
Hot flows that share emissions among multiple collectors.
/**
 * A hot Flow that shares emitted values among all its collectors in a broadcast fashion.
 *
 * This is the read-only view: it exposes the replay cache but no way to emit
 * values or to observe the subscriber count.
 */
interface SharedFlow<out T> : Flow<T> {
    /**
     * A snapshot of the most recently emitted values into this shared flow.
     */
    val replayCache: List<T>
}

/**
 * A mutable SharedFlow that provides functions to emit values to the flow.
 */
interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
    /**
     * Tries to emit a value to this flow without suspending the caller.
     *
     * @return `true` if the value was emitted, `false` if a plain emit call
     * would have had to suspend.
     */
    fun tryEmit(value: T): Boolean

    /**
     * The number of subscribers (active collectors) to this shared flow.
     *
     * Declared on the mutable interface only; the read-only SharedFlow
     * does not expose it.
     */
    val subscriptionCount: StateFlow<Int>

    /**
     * Resets the replayCache of this flow to an empty state.
     */
    fun resetReplayCache()
}
/**
* Creates a MutableSharedFlow with the given configuration.
*/
fun <T> MutableSharedFlow(
replay: Int = 0,
extraBufferCapacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T>
/**
* A SharedFlow that represents a read-only state with a single updatable value.
*/
interface StateFlow<out T> : SharedFlow<T> {
/**
* The current value of this state flow.
*/
val value: T
}
/**
* A mutable StateFlow that provides a setter for value.
*/
interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> {
/**
* The current value of this state flow.
*/
override var value: T
/**
* Atomically compares the current value with expect and sets it to update if they are the same.
*/
fun compareAndSet(expect: T, update: T): Boolean
}
/**
* Creates a MutableStateFlow with the given initial value.
*/
fun <T> MutableStateFlow(value: T): MutableStateFlow<T>
/**
* Updates the MutableStateFlow.value atomically using the specified function of its value.
*/
fun <T> MutableStateFlow<T>.update(function: (T) -> T)
/**
* Updates the MutableStateFlow.value atomically using the specified function of its value and returns the new value.
*/
fun <T> MutableStateFlow<T>.updateAndGet(function: (T) -> T): T
/**
* Updates the MutableStateFlow.value atomically using the specified function of its value and returns its prior value.
*/
fun <T> MutableStateFlow<T>.getAndUpdate(function: (T) -> T): T
Usage Examples:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
val scope = MainScope()
// SharedFlow example
val sharedFlow = MutableSharedFlow<String>(replay = 2)
// Multiple collectors
scope.launch {
sharedFlow.collect { value ->
println("Collector 1: $value")
}
}
scope.launch {
sharedFlow.collect { value ->
println("Collector 2: $value")
}
}
// Emit values
scope.launch {
sharedFlow.emit("First")
delay(100)
sharedFlow.emit("Second")
delay(100)
sharedFlow.emit("Third")
}
// StateFlow example
val stateFlow = MutableStateFlow("Initial")
// Observe state changes
scope.launch {
stateFlow.collect { state ->
println("Current state: $state")
}
}
// Update state
scope.launch {
delay(1000)
stateFlow.value = "Updated"
delay(1000)
stateFlow.value = "Final"
}
// Atomic updates using update functions
val counter = MutableStateFlow(0)
scope.launch {
// Safe concurrent increment
counter.update { it + 1 }
// Get new value after update
val newValue = counter.updateAndGet { it * 2 }
println("New value: $newValue")
// Get old value before update
val oldValue = counter.getAndUpdate { it - 5 }
println("Old value: $oldValue")
}
// StateFlow always has current value
println("Immediate value: ${stateFlow.value}")
Convert cold flows to hot flows with sharing behavior.
/**
* Converts this cold Flow into a hot SharedFlow that is started in the given
* coroutine scope, sharing emissions from a single running instance of the upstream flow.
*/
fun <T> Flow<T>.shareIn(
scope: CoroutineScope,
started: SharingStarted,
replay: Int = 0
): SharedFlow<T>
/**
* Converts this cold Flow into a hot StateFlow that is started in the given
* coroutine scope, sharing the most recently emitted value from a single running
* instance of the upstream flow.
*/
fun <T> Flow<T>.stateIn(
scope: CoroutineScope,
started: SharingStarted,
initialValue: T
): StateFlow<T>
/**
 * A policy for starting and stopping the sharing coroutine in shareIn and stateIn operators.
 */
interface SharingStarted {
    companion object {
        /**
         * Sharing is started immediately and never stops.
         */
        val Eagerly: SharingStarted

        /**
         * Sharing is started when the first subscriber appears and never stops.
         */
        val Lazily: SharingStarted

        /**
         * Sharing is started when the first subscriber appears and stops when the
         * last subscriber disappears.
         *
         * @param stopTimeoutMillis delay between the disappearance of the last
         * subscriber and the stopping of the sharing coroutine; defaults to zero
         * (stop immediately).
         * @param replayExpirationMillis delay between the stopping of the sharing
         * coroutine and the resetting of the replay cache; defaults to
         * Long.MAX_VALUE (keep the replay cache forever).
         */
        fun WhileSubscribed(
            stopTimeoutMillis: Long = 0,
            replayExpirationMillis: Long = Long.MAX_VALUE
        ): SharingStarted
    }
}
Usage Examples:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
val scope = MainScope()
// Cold flow
val coldFlow = flow {
println("Flow started")
repeat(5) {
emit(it)
delay(1000)
}
}
// Convert to hot SharedFlow
val hotSharedFlow = coldFlow.shareIn(
scope = scope,
started = SharingStarted.WhileSubscribed(),
replay = 1
)
// Multiple collectors share the same flow instance
scope.launch {
delay(2000) // Start collecting after 2 seconds
hotSharedFlow.collect { value ->
println("Late collector: $value")
}
}
scope.launch {
hotSharedFlow.collect { value ->
println("Early collector: $value")
}
}
// Convert to StateFlow
val dataFlow = flow {
emit("Loading...")
delay(2000)
emit("Data loaded")
}
val dataState = dataFlow.stateIn(
scope = scope,
started = SharingStarted.Lazily,
initialValue = "Initial"
)
// State is immediately available
println("Current state: ${dataState.value}")
scope.launch {
dataState.collect { state ->
println("State changed: $state")
}
}
Combine multiple flows into single flows with various strategies.
/**
* Zips values from the current flow with other flow using provided transform
* function applied to each pair of values.
*/
fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R>
/**
* Returns a flow whose values are generated by transform function by combining
* the most recently emitted values by each flow.
*/
fun <T1, T2, R> Flow<T1>.combine(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R>
/**
* Flattens the given flow of flows into a single flow by merging emissions.
*/
fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = DEFAULT_CONCURRENCY): Flow<T>
/**
* Transforms elements emitted by the original flow by applying transform that
* returns another flow, and then merging the resulting flows.
*/
fun <T, R> Flow<T>.flatMapMerge(
concurrency: Int = DEFAULT_CONCURRENCY,
transform: suspend (value: T) -> Flow<R>
): Flow<R>
/**
* Transforms elements emitted by the original flow by applying transform that
* returns another flow, and then concatenating the resulting flows.
*/
fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R>
/**
* Transforms elements emitted by the original flow by applying transform that
* returns another flow, and then flattening these flows by switching to new flows.
*/
fun <T, R> Flow<T>.flatMapLatest(transform: suspend (value: T) -> Flow<R>): Flow<R>
Usage Examples:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
val scope = MainScope()
scope.launch {
// Zip - waits for both flows to emit
val flow1 = flowOf(1, 2, 3).onEach { delay(100) }
val flow2 = flowOf("A", "B", "C").onEach { delay(150) }
flow1.zip(flow2) { num, letter ->
"$num$letter"
}.collect { println("Zipped: $it") }
// Output: Zipped: 1A, Zipped: 2B, Zipped: 3C
}
scope.launch {
// Combine - uses latest values from both
val numbers = flow {
repeat(3) {
emit(it)
delay(100)
}
}
val letters = flow {
repeat(3) {
emit('A' + it)
delay(150)
}
}
numbers.combine(letters) { num, letter ->
"$num$letter"
}.collect { println("Combined: $it") }
}
scope.launch {
// FlatMap merge - run multiple flows concurrently
flowOf(1, 2, 3)
.flatMapMerge { value ->
flow {
repeat(2) {
emit("$value-$it")
delay(100)
}
}
}
.collect { println("Merged: $it") }
}
scope.launch {
// FlatMap latest - cancel previous when new value arrives
flow {
emit(1)
delay(200)
emit(2)
delay(200)
emit(3)
}
.flatMapLatest { value ->
flow {
repeat(5) {
emit("$value-$it")
delay(100)
}
}
}
.collect { println("Latest: $it") }
}
Install with Tessl CLI
npx tessl i tessl/maven-org-jetbrains-kotlinx--kotlinx-coroutines-core