CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-jetbrains-kotlinx--kotlinx-coroutines-core

Coroutines support libraries for Kotlin providing structured concurrency primitives, Flow API for reactive streams, channels for communication, and synchronization utilities across all Kotlin platforms

Pending
Overview
Eval results
Files

docs/flow-api.md

Flow API - Reactive Streams

Asynchronous data streams with rich operator support for reactive programming patterns. Flow provides a cold stream implementation that enables declarative handling of asynchronous data sequences.

Capabilities

Core Flow Types

The foundational interfaces for reactive stream processing in kotlinx-coroutines.

/**
 * An asynchronous data stream that sequentially emits values
 * and completes normally or with an exception.
 *
 * Flows created with the builders in this document are cold: nothing runs
 * until a terminal operator (for example `collect`) is applied, and every
 * collection starts the producer from the beginning. Prefer those builders
 * over implementing this interface by hand.
 */
interface Flow<out T> {
    /**
     * Accepts the given collector and emits values into it.
     * This method should never be called directly.
     *
     * Application code normally uses the lambda-based `collect { ... }`
     * terminal operator or another terminal operator instead.
     *
     * @param collector receives every value emitted by this flow.
     */
    suspend fun collect(collector: FlowCollector<T>)
}

/**
 * FlowCollector is used as an intermediate or a terminal collector of flow.
 * It is the receiver of the `flow { ... }` builder block and of operators
 * such as `transform`, `onStart` and `onCompletion`.
 */
interface FlowCollector<in T> {
    /**
     * Collects the value emitted by the upstream.
     *
     * This method is not thread-safe and should not be invoked concurrently.
     *
     * @param value the element passed downstream.
     */
    suspend fun emit(value: T)
}

Flow Builders

Functions to create flows from various sources and patterns.

/**
 * Creates a cold flow from the given suspendable block.
 * Being cold means that the block is executed again, from the start, every
 * time a terminal operator is applied to the resulting flow.
 *
 * Emission is sequential, not concurrent: by default `emit` returns only after
 * the downstream collector has handled the value. `emit` must be called from
 * the block's own coroutine context; use `flowOn` to change that context or
 * `channelFlow` to emit from other coroutines.
 *
 * @param block the producer code; its receiver is the collector to emit into.
 */
fun <T> flow(block: suspend FlowCollector<T>.() -> Unit): Flow<T>

/**
 * Creates a flow that produces the given values, in argument order, and then
 * completes.
 *
 * @param elements the values to emit.
 */
fun <T> flowOf(vararg elements: T): Flow<T>

/**
 * Creates a flow that produces no values: collecting it completes immediately
 * without emitting anything.
 */
fun <T> emptyFlow(): Flow<T>

/**
 * Creates a flow from Iterable, Iterator, or arrays.
 * Each overload emits the elements of its receiver in iteration order.
 */
// Iterable receiver
fun <T> Iterable<T>.asFlow(): Flow<T>
// Iterator receiver
fun <T> Iterator<T>.asFlow(): Flow<T>
// Array receiver
fun <T> Array<out T>.asFlow(): Flow<T>

/**
 * Creates a cold flow whose elements are sent to the channel exposed by the
 * builder's ProducerScope. Unlike `flow { }`, elements may be produced from a
 * different context or concurrently, e.g. from several launched coroutines.
 *
 * The resulting flow is a cold flow: the block runs each time the flow is
 * collected. The flow completes once the block and all of its child
 * coroutines have completed.
 *
 * @param block producer code; use `send`/`trySend` to emit values.
 */
fun <T> channelFlow(block: suspend ProducerScope<T>.() -> Unit): Flow<T>

/**
 * Creates a flow suitable for use with callback-based APIs.
 *
 * The resulting flow is cold. The block must end with `awaitClose { ... }`,
 * which keeps the flow open while callbacks deliver values and is where the
 * callback is unregistered when the collector stops. If the block returns
 * while the channel is still open, an IllegalStateException is thrown to
 * surface the leaked callback.
 *
 * @param block registers the callback and forwards its events with `trySend`.
 */
fun <T> callbackFlow(block: suspend ProducerScope<T>.() -> Unit): Flow<T>

Usage Examples:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Basic flow creation: emits 1..5, pausing 100 ms after each value
val simpleFlow = flow {
    (1..5).forEach { number ->
        emit(number)
        delay(100)
    }
}

// Flow from a fixed set of values
val valuesFlow = flowOf(1, 2, 3, 4, 5)

// Flow from an existing collection
val listFlow = listOf("a", "b", "c").asFlow()

// Channel flow: two child coroutines send into the same flow concurrently
val channelBasedFlow = channelFlow {
    launch {
        for (index in 0 until 3) {
            send("From coroutine 1: $index")
            delay(100)
        }
    }
    launch {
        for (index in 0 until 3) {
            send("From coroutine 2: $index")
            delay(150)
        }
    }
}

// Callback flow: adapts a listener-style API into a Flow
val callbackBasedFlow = callbackFlow {
    val listener = object : EventCallback {
        // A failure from the source closes the flow with that error
        override fun onError(error: Exception) {
            close(error)
        }

        // Each event is forwarded without suspending the callback thread
        override fun onEvent(data: String) {
            trySend(data)
        }
    }

    eventSource.registerCallback(listener)

    // Keeps the flow open; runs the cleanup when the collector stops
    awaitClose { eventSource.unregisterCallback(listener) }
}

Intermediate Operators

Transform, filter, and manipulate flow emissions.

/**
 * Returns a flow containing the results of applying the given transform
 * function to each value of the original flow.
 *
 * @param transform suspending mapping applied to every upstream value.
 */
fun <T, R> Flow<T>.map(transform: suspend (value: T) -> R): Flow<R>

/**
 * Returns a flow containing only values of the original flow that match
 * the given predicate.
 *
 * @param predicate suspending test; values for which it returns false are dropped.
 */
fun <T> Flow<T>.filter(predicate: suspend (T) -> Boolean): Flow<T>

/**
 * Applies the given transform function to each value of the original flow.
 *
 * The receiver of transform is a FlowCollector, so for every upstream value
 * it may emit a transformed element, skip it, or emit several elements.
 *
 * @param transform called once per upstream value; emits into the result flow.
 */
fun <T, R> Flow<T>.transform(transform: suspend FlowCollector<R>.(value: T) -> Unit): Flow<R>

/**
 * Returns a flow that invokes the given action before each value of the
 * upstream flow is emitted downstream. Values pass through unchanged.
 *
 * @param action side effect run for every value.
 */
fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T>

/**
 * Returns a flow that invokes the given action before this flow starts
 * to be collected.
 *
 * The receiver of the action is a FlowCollector, so the action can emit
 * additional elements ahead of the upstream values.
 *
 * @param action run once per collection, before the upstream flow is started.
 */
fun <T> Flow<T>.onStart(action: suspend FlowCollector<T>.() -> Unit): Flow<T>

/**
 * Returns a flow that invokes the given action after the flow is completed
 * or cancelled.
 *
 * The action receives the failure or cancellation exception as cause, or
 * null when the flow completed successfully. Additional elements can be
 * emitted from the action only on successful completion.
 *
 * @param action run once when collection ends, for any reason.
 */
fun <T> Flow<T>.onCompletion(action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit): Flow<T>

Usage Examples:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

val scope = MainScope()

scope.launch {
    val doubledAboveFour = flowOf(1, 2, 3, 4, 5)
        .map { number -> number * 2 }          // 2, 4, 6, 8, 10
        .filter { doubled -> doubled > 4 }     // 6, 8, 10

    doubledAboveFour
        .onEach { item -> println("Processing: $item") }  // Side effect per value
        .collect { println("Collected: $it") }
    // Output: Processing: 6, Collected: 6, Processing: 8, Collected: 8, Processing: 10, Collected: 10
}

// Complex transformation: each word yields two downstream elements
scope.launch {
    flowOf("hello", "world")
        .transform { word ->
            emit(word.uppercase())
            emit(word.length)
        }
        .collect { element -> println(element) }
    // Output: HELLO, 5, WORLD, 5
}

// Flow lifecycle hooks: wrap the upstream values with a leading and trailing marker
scope.launch {
    val numbers = flowOf(1, 2, 3)
    numbers
        .onStart { emit(0) }        // Runs first, so 0 precedes the upstream values
        .onCompletion { emit(-1) }  // Runs last, after normal completion
        .collect { number -> println("Value: $number") }
    // Output: Value: 0, Value: 1, Value: 2, Value: 3, Value: -1
}

Flow Context and Threading

Control the execution context and threading behavior of flows.

/**
 * Changes the context where this flow is executed to the given context.
 * All operations upstream of flowOn are executed in the provided context.
 *
 * Only the preceding operators are affected; the context does not leak
 * downstream, so the collector keeps running in its own context.
 *
 * @param context context for the upstream part of the flow; it must not
 *   contain a Job (an IllegalArgumentException is thrown otherwise).
 */
fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T>

/**
 * Buffers flow emissions via channel with given capacity and runs collector
 * in a separate coroutine, so the emitter and the collector run concurrently.
 *
 * @param capacity type or size of the buffer between the two coroutines.
 * @param onBufferOverflow what to do when the buffer is full; the default
 *   suspends the emitter until space is available.
 */
fun <T> Flow<T>.buffer(
    capacity: Int = Channel.BUFFERED,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): Flow<T>

/**
 * Conflates flow emissions via conflated channel and runs collector in a
 * separate coroutine. Latest emitted value overwrites previous values.
 *
 * The emitter is never suspended by a slow collector; the collector always
 * gets the most recent value and may skip intermediate ones. This is a
 * shortcut for `buffer` with a capacity of `Channel.CONFLATED`.
 */
fun <T> Flow<T>.conflate(): Flow<T>

Usage Examples:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

val scope = MainScope()

scope.launch {
    // The builder body is upstream of flowOn, so it runs on the IO dispatcher
    val ioBackedFlow = flow {
        println("Emitting on: ${Thread.currentThread().name}")
        (1..3).forEach { number ->
            emit(number)
            delay(100)
        }
    }.flowOn(Dispatchers.IO)

    // Collection stays in the context of the launching coroutine
    ioBackedFlow.collect { value ->
        println("Collecting $value on: ${Thread.currentThread().name}")
    }
}

// Buffering: lets the producer run ahead of the consumer
scope.launch {
    val producer = flow {
        for (index in 0 until 100) {
            emit(index)
            delay(10)  // Producer needs 10 ms per item
        }
    }

    producer
        .buffer(10)  // Up to 10 items may wait between producer and consumer
        .collect { value ->
            delay(50)  // Consumer needs 50 ms per item
            println("Processed: $value")
        }
}

// Conflation: a slow consumer skips stale values and sees only the latest
scope.launch {
    val producer = flow {
        for (index in 0 until 100) {
            emit(index)
            delay(10)
        }
    }

    producer
        .conflate()  // Values emitted while the consumer is busy are overwritten
        .collect { value ->
            delay(100)  // Consumer is ten times slower than the producer
            println("Latest value: $value")
        }
}

Terminal Operators

Consume flow values and produce final results.

/**
 * Terminal flow operator that collects the given flow with a provided
 * action that is applied to each emitted element.
 *
 * The call suspends until the flow completes. If the flow or the action
 * throws, the exception is rethrown from this call.
 *
 * @param action invoked sequentially for every emitted value.
 */
suspend fun <T> Flow<T>.collect(action: suspend (value: T) -> Unit)

/**
 * Terminal flow operator that collects the given flow with a provided action.
 * The crucial difference from collect is that when the original flow emits a
 * new value, the action block for the previous value is cancelled.
 *
 * With a producer faster than the action, only the action for the last value
 * is guaranteed to run to completion.
 *
 * @param action invoked for each value; cancelled if a newer value arrives
 *   before it finishes.
 */
suspend fun <T> Flow<T>.collectLatest(action: suspend (value: T) -> Unit)

/**
 * Collects all elements from the flow and returns them as a List.
 * Suspends until the flow completes, so the flow must be finite.
 */
suspend fun <T> Flow<T>.toList(): List<T>

/**
 * Collects all elements from the flow and returns them as a Set.
 * Suspends until the flow completes, so the flow must be finite.
 */
suspend fun <T> Flow<T>.toSet(): Set<T>

/**
 * Returns the first element emitted by the flow and cancels flow's collection.
 *
 * @throws NoSuchElementException if the flow was empty.
 */
suspend fun <T> Flow<T>.first(): T

/**
 * Returns the first element emitted by the flow matching the predicate and
 * then cancels the flow's collection.
 *
 * @throws NoSuchElementException if no element matched the predicate.
 */
suspend fun <T> Flow<T>.first(predicate: suspend (T) -> Boolean): T

/**
 * Returns the single element emitted by the flow, awaiting one and only one
 * value.
 *
 * @throws NoSuchElementException if the flow was empty.
 * @throws IllegalArgumentException if the flow emitted more than one element.
 */
suspend fun <T> Flow<T>.single(): T

/**
 * Accumulates value starting with initial value and applying operation
 * from left to right to current accumulator value and each element.
 * Returns initial unchanged when the flow is empty.
 *
 * @param initial starting accumulator value.
 * @param operation combines the accumulator with the next element.
 */
suspend fun <T, R> Flow<T>.fold(initial: R, operation: suspend (acc: R, value: T) -> R): R

/**
 * Accumulates value starting with the first element and applying operation
 * from left to right to current accumulator value and each element.
 *
 * @param operation combines the accumulator with the next element.
 * @throws NoSuchElementException if the flow was empty.
 */
suspend fun <S, T : S> Flow<T>.reduce(operation: suspend (accumulator: S, value: T) -> S): S

/**
 * Returns the number of elements in this flow.
 * Suspends until the flow completes, so the flow must be finite.
 */
suspend fun <T> Flow<T>.count(): Int

/**
 * Returns the number of elements matching the given predicate.
 * Suspends until the flow completes, so the flow must be finite.
 */
suspend fun <T> Flow<T>.count(predicate: suspend (T) -> Boolean): Int

Usage Examples:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

val scope = MainScope()

scope.launch {
    // flowOf is cold, so every terminal operator below re-collects it from the start
    val numbers = flowOf(1, 2, 3, 4, 5)

    // Collect to list
    println("List: ${numbers.toList()}")

    // Get first element
    println("First: ${numbers.first()}")

    // Find first matching predicate
    println("First even: ${numbers.first { n -> n % 2 == 0 }}")

    // Fold operation
    println("Sum: ${numbers.fold(0) { total, n -> total + n }}")

    // Reduce operation
    println("Product: ${numbers.reduce { running, n -> running * n }}")

    // Count elements
    println("Even numbers: ${numbers.count { n -> n % 2 == 0 }}")
}

// collectLatest cancels the action for the previous value when a new one arrives
scope.launch {
    val ticker = flow {
        for (tick in 0 until 5) {
            emit(tick)
            delay(100)
        }
    }

    ticker.collectLatest { tick ->
        println("Processing $tick")
        delay(200)  // Longer than the 100 ms emission interval
        println("Finished $tick")  // Reached only for the last value
    }
}

SharedFlow and StateFlow

Hot flows that share emissions among multiple collectors.

/**
 * A hot Flow that shares emitted values among all its collectors in a broadcast fashion.
 *
 * A shared flow never completes: collecting it does not return normally, so a
 * collector stops only when its coroutine is cancelled.
 */
interface SharedFlow<out T> : Flow<T> {
    /**
     * A snapshot of the most recently emitted values into this shared flow.
     * New collectors receive these values first.
     */
    val replayCache: List<T>
}

/**
 * A mutable SharedFlow that provides functions to emit values to the flow.
 */
interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
    /**
     * Tries to emit a value to this flow without suspending the caller.
     *
     * @return true if the value was emitted; false means a plain `emit` call
     *   would have to suspend until buffer space becomes available.
     */
    fun tryEmit(value: T): Boolean

    /**
     * Resets the replayCache of this flow to an empty state.
     * New subscribers then receive only values emitted after this call.
     */
    fun resetReplayCache()

    /**
     * The number of subscribers (active collectors) to this shared flow.
     * The value is never negative and starts at zero for a new shared flow.
     *
     * This property is declared here, not on the read-only SharedFlow.
     */
    val subscriptionCount: StateFlow<Int>
}

/**
 * Creates a MutableSharedFlow with the given configuration.
 *
 * @param replay number of values replayed to new subscribers; cannot be negative.
 * @param extraBufferCapacity number of values buffered in addition to replay;
 *   cannot be negative.
 * @param onBufferOverflow action taken when the buffer is full. Values other
 *   than SUSPEND are supported only when replay > 0 or extraBufferCapacity > 0.
 */
fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T>

/**
 * A SharedFlow that represents a read-only state with a single updatable value.
 *
 * A new collector immediately receives the current value. Updates are
 * conflated using equality, so consecutive equal values are not re-emitted
 * and a slow collector may skip intermediate values.
 */
interface StateFlow<out T> : SharedFlow<T> {
    /**
     * The current value of this state flow. Reading it never suspends.
     */
    val value: T
}

/**
 * A mutable StateFlow that provides a setter for value.
 */
interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> {
    /**
     * The current value of this state flow.
     * Setting a value equal to the current one does nothing.
     */
    override var value: T

    /**
     * Atomically compares the current value with expect and sets it to update if they are the same.
     * The comparison uses regular equality (equals).
     *
     * @return true if the value was set to update, false otherwise.
     */
    fun compareAndSet(expect: T, update: T): Boolean
}

/**
 * Creates a MutableStateFlow with the given initial value.
 *
 * @param value the initial state, available at once through the value property.
 */
fun <T> MutableStateFlow(value: T): MutableStateFlow<T>

/**
 * Updates the MutableStateFlow.value atomically using the specified function of its value.
 *
 * The function may be evaluated multiple times if the value is being
 * updated concurrently, so it should be free of side effects.
 */
fun <T> MutableStateFlow<T>.update(function: (T) -> T)

/**
 * Updates the MutableStateFlow.value atomically using the specified function of its value and returns the new value.
 *
 * The function may be evaluated multiple times if the value is being
 * updated concurrently, so it should be free of side effects.
 */
fun <T> MutableStateFlow<T>.updateAndGet(function: (T) -> T): T

/**
 * Updates the MutableStateFlow.value atomically using the specified function of its value and returns its prior value.
 *
 * The function may be evaluated multiple times if the value is being
 * updated concurrently, so it should be free of side effects.
 */
fun <T> MutableStateFlow<T>.getAndUpdate(function: (T) -> T): T

Usage Examples:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

val scope = MainScope()

// SharedFlow that replays its two most recent values to new collectors
val sharedFlow = MutableSharedFlow<String>(replay = 2)

// Two independent collectors; each one receives every emitted value
scope.launch {
    sharedFlow.collect { message -> println("Collector 1: $message") }
}

scope.launch {
    sharedFlow.collect { message -> println("Collector 2: $message") }
}

// Emit three values, 100 ms apart
scope.launch {
    listOf("First", "Second", "Third").forEachIndexed { index, message ->
        if (index > 0) delay(100)
        sharedFlow.emit(message)
    }
}

// StateFlow holding a single, always-available value
val stateFlow = MutableStateFlow("Initial")

// Observe state changes, starting with the current value
scope.launch {
    stateFlow.collect { current -> println("Current state: $current") }
}

// Update state twice, one second apart
scope.launch {
    for (nextState in listOf("Updated", "Final")) {
        delay(1000)
        stateFlow.value = nextState
    }
}

// Atomic read-modify-write helpers on MutableStateFlow
val counter = MutableStateFlow(0)
scope.launch {
    // Safe concurrent increment
    counter.update { current -> current + 1 }

    // Doubles the counter and yields the value after the update
    counter.updateAndGet { current -> current * 2 }
        .also { newValue -> println("New value: $newValue") }

    // Subtracts 5 and yields the value from before the update
    counter.getAndUpdate { current -> current - 5 }
        .also { oldValue -> println("Old value: $oldValue") }
}

// A StateFlow can be read synchronously at any time
println("Immediate value: ${stateFlow.value}")

Flow Sharing Operators

Convert cold flows to hot flows with sharing behavior.

/**
 * Converts this cold Flow into a hot SharedFlow that is started in the given
 * coroutine scope, sharing emissions from a single running instance of the upstream flow.
 *
 * @param scope the coroutine scope in which sharing is started.
 * @param started the strategy that controls when sharing is started and stopped.
 * @param replay the number of values replayed to new subscribers; cannot be negative.
 */
fun <T> Flow<T>.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0
): SharedFlow<T>

/**
 * Converts this cold Flow into a hot StateFlow that is started in the given
 * coroutine scope, sharing the most recently emitted value from a single running
 * instance of the upstream flow.
 *
 * @param scope the coroutine scope in which sharing is started.
 * @param started the strategy that controls when sharing is started and stopped.
 * @param initialValue the value of the state flow until the upstream emits.
 */
fun <T> Flow<T>.stateIn(
    scope: CoroutineScope,
    started: SharingStarted,
    initialValue: T
): StateFlow<T>

/**
 * A policy for starting and stopping the sharing coroutine in shareIn and stateIn operators.
 */
interface SharingStarted {
    companion object {
        /** Sharing is started immediately and never stops. */
        val Eagerly: SharingStarted

        /** Sharing is started when the first subscriber appears and never stops. */
        val Lazily: SharingStarted

        /**
         * Sharing is started when the first subscriber appears and stops when
         * the last subscriber disappears.
         *
         * @param stopTimeoutMillis delay in milliseconds between the last
         *   subscriber disappearing and the sharing coroutine being stopped.
         *   Defaults to zero (stop immediately).
         * @param replayExpirationMillis delay in milliseconds between the
         *   sharing coroutine stopping and the replay cache being reset.
         *   Defaults to Long.MAX_VALUE (keep the replay cache forever).
         */
        fun WhileSubscribed(
            stopTimeoutMillis: Long = 0,
            replayExpirationMillis: Long = Long.MAX_VALUE
        ): SharingStarted
    }
}

Usage Examples:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

val scope = MainScope()

// Cold flow: emits 0..4, one value per second
val coldFlow = flow {
    println("Flow started")
    for (tick in 0 until 5) {
        emit(tick)
        delay(1000)
    }
}

// Convert to hot SharedFlow that replays the latest value to new collectors
val hotSharedFlow = coldFlow.shareIn(scope, SharingStarted.WhileSubscribed(), 1)

// Both collectors below observe one shared run of the upstream flow
scope.launch {
    delay(2000)  // Subscribes two seconds after the early collector
    hotSharedFlow.collect { tick -> println("Late collector: $tick") }
}

scope.launch {
    hotSharedFlow.collect { tick -> println("Early collector: $tick") }
}

// Convert to StateFlow: sharing starts lazily, on the first collector
val dataFlow = flow {
    emit("Loading...")
    delay(2000)
    emit("Data loaded")
}

val dataState = dataFlow.stateIn(scope, SharingStarted.Lazily, "Initial")

// The initial value is readable before any collector subscribes
println("Current state: ${dataState.value}")

scope.launch {
    dataState.collect { current -> println("State changed: $current") }
}

Flow Combination Operators

Combine multiple flows into single flows with various strategies.

/**
 * Zips values from the current flow with other flow using provided transform
 * function applied to each pair of values.
 *
 * Values are paired by position. The resulting flow completes as soon as
 * either flow completes, and the remaining flow is cancelled.
 *
 * @param other the flow to pair with.
 * @param transform combines one value from each flow into a result.
 */
fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R>

/**
 * Returns a flow whose values are generated by transform function by combining
 * the most recently emitted values by each flow.
 *
 * Nothing is emitted until both flows have produced at least one value;
 * after that, a new result is produced whenever either flow emits.
 *
 * @param other the flow to combine with.
 * @param transform combines the latest value of each flow into a result.
 */
fun <T1, T2, R> Flow<T1>.combine(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R>

/**
 * Flattens the given flow of flows into a single flow by merging emissions.
 *
 * @param concurrency limit on the number of inner flows collected at the
 *   same time; with a value of 1 the inner flows are collected one after
 *   another.
 */
fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = DEFAULT_CONCURRENCY): Flow<T>

/**
 * Transforms elements emitted by the original flow by applying transform that
 * returns another flow, and then merging the resulting flows.
 *
 * Inner flows are collected concurrently, so their values may interleave.
 *
 * @param concurrency limit on the number of inner flows collected at the same time.
 * @param transform maps each upstream value to an inner flow.
 */
fun <T, R> Flow<T>.flatMapMerge(
    concurrency: Int = DEFAULT_CONCURRENCY,
    transform: suspend (value: T) -> Flow<R>
): Flow<R>

/**
 * Transforms elements emitted by the original flow by applying transform that
 * returns another flow, and then concatenating the resulting flows.
 *
 * Inner flows are collected sequentially: the next one starts only after the
 * previous one has completed, so the order of values is preserved.
 *
 * @param transform maps each upstream value to an inner flow.
 */
fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R>

/**
 * Transforms elements emitted by the original flow by applying transform that
 * returns another flow, and then flattening these flows by switching to new flows.
 *
 * When the original flow emits a new value, the inner flow produced for the
 * previous value is cancelled.
 *
 * @param transform maps each upstream value to an inner flow.
 */
fun <T, R> Flow<T>.flatMapLatest(transform: suspend (value: T) -> Flow<R>): Flow<R>

Usage Examples:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

val scope = MainScope()

scope.launch {
    // Zip - pairs values by position, waiting for both flows to emit
    val numbers = flowOf(1, 2, 3).onEach { delay(100) }
    val letters = flowOf("A", "B", "C").onEach { delay(150) }

    val zipped = numbers.zip(letters) { number, letter -> "$number$letter" }
    zipped.collect { pair -> println("Zipped: $pair") }
    // Output: Zipped: 1A, Zipped: 2B, Zipped: 3C
}

scope.launch {
    // Combine - re-evaluates with the latest value of each flow
    val numbers = flow {
        for (index in 0 until 3) {
            emit(index)
            delay(100)
        }
    }
    val letters = flow {
        for (offset in 0 until 3) {
            emit('A' + offset)
            delay(150)
        }
    }

    val combined = numbers.combine(letters) { number, letter -> "$number$letter" }
    combined.collect { latest -> println("Combined: $latest") }
}

scope.launch {
    // FlatMap merge - inner flows run concurrently and their values interleave
    val merged = flowOf(1, 2, 3).flatMapMerge { outer ->
        flow {
            for (inner in 0 until 2) {
                emit("$outer-$inner")
                delay(100)
            }
        }
    }
    merged.collect { item -> println("Merged: $item") }
}

scope.launch {
    // FlatMap latest - a new outer value cancels the inner flow of the previous one
    val outerValues = flow {
        emit(1)
        delay(200)
        emit(2)
        delay(200)
        emit(3)
    }

    outerValues
        .flatMapLatest { outer ->
            flow {
                for (inner in 0 until 5) {
                    emit("$outer-$inner")
                    delay(100)
                }
            }
        }
        .collect { item -> println("Latest: $item") }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-jetbrains-kotlinx--kotlinx-coroutines-core

docs

channels.md

coroutine-builders.md

dispatchers.md

exception-handling.md

flow-api.md

index.md

jobs-deferreds.md

structured-concurrency.md

synchronization.md

tile.json