CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-jetbrains-kotlinx--kotlinx-coroutines-core-jvm

JVM-specific implementation of kotlinx.coroutines core library providing coroutine primitives, builders, dispatchers, and synchronization primitives for asynchronous programming in Kotlin.

Pending
Overview
Eval results
Files

flow-api.mddocs/

Flow API

Reactive streams implementation providing cold flows for asynchronous sequences and hot flows for shared state management with backpressure and exception transparency.

Capabilities

Flow Interface

Base interface for cold asynchronous data streams that emit values sequentially.

interface Flow<out T> {
    /** Accepts collector and emits values into it */
    suspend fun collect(collector: FlowCollector<T>)
}

interface FlowCollector<in T> {
    /** Emits a value to the flow */
    suspend fun emit(value: T)
}

Usage Examples:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    // Basic flow collection
    val simpleFlow = flow {
        repeat(3) { i ->
            emit(i)
            delay(100)
        }
    }
    
    simpleFlow.collect { value ->
        println("Collected: $value")
    }
    
    // Flow with transformation
    simpleFlow
        .map { it * 2 }
        .filter { it > 0 }
        .collect { value ->
            println("Transformed: $value")
        }
}

Flow Builders

Functions for creating flows from various sources.

/** Creates a flow from a builder block */
fun <T> flow(block: suspend FlowCollector<T>.() -> Unit): Flow<T>

/** Creates a flow from fixed values */
fun <T> flowOf(vararg elements: T): Flow<T>

/** Creates an empty flow */
fun <T> emptyFlow(): Flow<T>

/** Creates a flow from a channel */
fun <T> channelFlow(block: suspend ProducerScope<T>.() -> Unit): Flow<T>

/** Converts various types to flows */
fun <T> Iterable<T>.asFlow(): Flow<T>
fun <T> Sequence<T>.asFlow(): Flow<T>
fun <T> Array<T>.asFlow(): Flow<T>

Usage Examples:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    // Flow from builder
    val countFlow = flow {
        repeat(5) { i ->
            emit(i)
            delay(100)
        }
    }
    
    // Flow from values
    val valuesFlow = flowOf("a", "b", "c", "d")
    
    // Flow from collection
    val listFlow = listOf(1, 2, 3, 4, 5).asFlow()
    
    // Channel flow for concurrent emission
    val channelFlow = channelFlow {
        launch {
            repeat(3) { i ->
                send(i * 10)
                delay(50)
            }
        }
        launch {
            repeat(3) { i ->
                send(i * 100)
                delay(75)
            }
        }
    }
    
    channelFlow.collect { println("Channel flow: $it") }
}

Flow Operators

Intermediate operators for transforming flows.

/** Transform each value */
fun <T, R> Flow<T>.map(transform: suspend (T) -> R): Flow<R>

/** Filter values based on predicate */
fun <T> Flow<T>.filter(predicate: suspend (T) -> Boolean): Flow<T>

/** Transform to flows and flatten */
fun <T, R> Flow<T>.flatMapConcat(transform: suspend (T) -> Flow<R>): Flow<R>
fun <T, R> Flow<T>.flatMapMerge(
    concurrency: Int = DEFAULT_CONCURRENCY,
    transform: suspend (T) -> Flow<R>
): Flow<R>

/** Take first N elements */
fun <T> Flow<T>.take(count: Int): Flow<T>

/** Drop first N elements */
fun <T> Flow<T>.drop(count: Int): Flow<T>

/** Execute on different context */
fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T>

/** Buffer emissions */
fun <T> Flow<T>.buffer(
    capacity: Int = BUFFERED,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): Flow<T>

/** Combine with another flow */
fun <T1, T2, R> Flow<T1>.combine(
    flow: Flow<T2>,
    transform: suspend (T1, T2) -> R
): Flow<R>

/** Zip with another flow */
fun <T1, T2, R> Flow<T1>.zip(
    other: Flow<T2>,
    transform: suspend (T1, T2) -> R
): Flow<R>

Usage Examples:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val sourceFlow = (1..10).asFlow()
    
    // Chain multiple operators
    sourceFlow
        .filter { it % 2 == 0 }
        .map { it * it }
        .take(3)
        .flowOn(Dispatchers.Default)
        .collect { println("Result: $it") }
    
    // FlatMap example
    sourceFlow
        .take(3)
        .flatMapConcat { value ->
            flow {
                repeat(2) {
                    emit("$value-$it")
                    delay(50)
                }
            }
        }
        .collect { println("FlatMap: $it") }
    
    // Combine flows
    val flow1 = (1..3).asFlow().onEach { delay(100) }
    val flow2 = (10..12).asFlow().onEach { delay(150) }
    
    flow1.combine(flow2) { a, b -> "$a+$b" }
        .collect { println("Combined: $it") }
}

Terminal Operators

Operators that consume the flow and produce results.

/** Collect all values */
suspend fun <T> Flow<T>.collect(action: suspend (T) -> Unit)

/** Collect to list */
suspend fun <T> Flow<T>.toList(): List<T>

/** Collect to set */
suspend fun <T> Flow<T>.toSet(): Set<T>

/** Get first value */
suspend fun <T> Flow<T>.first(): T
suspend fun <T> Flow<T>.firstOrNull(): T?

/** Get single value */
suspend fun <T> Flow<T>.single(): T
suspend fun <T> Flow<T>.singleOrNull(): T?

/** Reduce values */
suspend fun <S, T : S> Flow<T>.reduce(operation: suspend (S, T) -> S): S

/** Fold with initial value */
suspend fun <T, R> Flow<T>.fold(
    initial: R,
    operation: suspend (R, T) -> R
): R

/** Count elements */
suspend fun <T> Flow<T>.count(): Int
suspend fun <T> Flow<T>.count(predicate: suspend (T) -> Boolean): Int

Usage Examples:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val numberFlow = (1..5).asFlow()
    
    // Collect to collections
    val list = numberFlow.toList()
    println("List: $list")
    
    // Reduce operations
    val sum = numberFlow.fold(0) { acc, value -> acc + value }
    println("Sum: $sum")
    
    val product = numberFlow.reduce { acc, value -> acc * value }
    println("Product: $product")
    
    // Single values
    val first = numberFlow.first()
    val count = numberFlow.count { it > 3 }
    println("First: $first, Count > 3: $count")
}

SharedFlow Interface

Hot flow that shares emitted values among multiple collectors.

interface SharedFlow<out T> : Flow<T> {
    /** Most recent values available to new subscribers */
    val replayCache: List<T>
    
    /** Current number of subscribers */
    val subscriptionCount: StateFlow<Int>
}

interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
    /** Number of subscribers */
    val subscriptionCount: StateFlow<Int>
    
    /** Emits value to all subscribers */
    override suspend fun emit(value: T)
    
    /** Tries to emit value immediately */
    fun tryEmit(value: T): Boolean
    
    /** Resets replay cache */
    fun resetReplayCache()
}

/** Creates MutableSharedFlow */
fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T>

Usage Examples:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val sharedFlow = MutableSharedFlow<Int>(replay = 2)
    
    // Start collectors
    val job1 = launch {
        sharedFlow.collect { value ->
            println("Collector 1: $value")
        }
    }
    
    val job2 = launch {
        sharedFlow.collect { value ->
            println("Collector 2: $value")
        }
    }
    
    delay(100)
    
    // Emit values
    sharedFlow.emit(1)
    sharedFlow.emit(2)
    sharedFlow.emit(3)
    
    delay(100)
    
    // New collector gets replay
    val job3 = launch {
        sharedFlow.collect { value ->
            println("Collector 3 (late): $value")
        }
    }
    
    sharedFlow.emit(4)
    
    delay(100)
    listOf(job1, job2, job3).forEach { it.cancel() }
}

StateFlow Interface

Hot flow representing a state with current value.

interface StateFlow<out T> : SharedFlow<T> {
    /** Current state value */
    val value: T
}

interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> {
    /** Current state value (mutable) */
    override var value: T
    
    /** Atomically compares and sets value */
    fun compareAndSet(expect: T, update: T): Boolean
    
    /** Updates value atomically with function */
    inline fun update(function: (T) -> T)
    
    /** Updates value atomically and returns new value */
    inline fun updateAndGet(function: (T) -> T): T
    
    /** Updates value atomically and returns old value */
    inline fun getAndUpdate(function: (T) -> T): T
}

/** Creates MutableStateFlow */
fun <T> MutableStateFlow(value: T): MutableStateFlow<T>

Usage Examples:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

data class UiState(val loading: Boolean, val data: String)

class ViewModel {
    private val _uiState = MutableStateFlow(UiState(loading = false, data = ""))
    val uiState: StateFlow<UiState> = _uiState.asStateFlow()
    
    fun loadData() {
        _uiState.value = _uiState.value.copy(loading = true)
        
        // Simulate async operation
        GlobalScope.launch {
            delay(1000)
            _uiState.value = UiState(loading = false, data = "Loaded data")
        }
    }
    
    fun updateData(newData: String) {
        _uiState.update { currentState ->
            currentState.copy(data = newData)
        }
    }
}

fun main() = runBlocking {
    val viewModel = ViewModel()
    
    // Collect state changes
    val job = launch {
        viewModel.uiState.collect { state ->
            println("UI State: $state")
        }
    }
    
    delay(100)
    viewModel.loadData()
    delay(1500)
    viewModel.updateData("Updated data")
    
    delay(100)
    job.cancel()
}

Exception Handling

Flow exception handling with catch and error recovery.

/** Catches upstream exceptions */
fun <T> Flow<T>.catch(action: suspend FlowCollector<T>.(Throwable) -> Unit): Flow<T>

/** Handles completion with optional exception */
fun <T> Flow<T>.onCompletion(
    action: suspend FlowCollector<T>.(Throwable?) -> Unit
): Flow<T>

/** Retries on exception */
fun <T> Flow<T>.retry(
    retries: Long = Long.MAX_VALUE,
    predicate: suspend (Throwable) -> Boolean = { true }
): Flow<T>

/** Retries with when predicate */
fun <T> Flow<T>.retryWhen(
    predicate: suspend FlowCollector<T>.(Throwable, attempt: Long) -> Boolean
): Flow<T>

Usage Examples:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun createFailingFlow() = flow {
    repeat(5) { i ->
        if (i == 3) throw RuntimeException("Simulated error")
        emit(i)
        delay(100)
    }
}

fun main() = runBlocking {
    // Exception handling with catch
    createFailingFlow()
        .catch { e ->
            println("Caught exception: ${e.message}")
            emit(-1) // Emit recovery value
        }
        .onCompletion { cause ->
            if (cause == null) {
                println("Flow completed successfully")
            } else {
                println("Flow completed with exception: $cause")
            }
        }
        .collect { value ->
            println("Value: $value")
        }
    
    // Retry example
    createFailingFlow()
        .retry(retries = 2) { exception ->
            println("Retrying due to: ${exception.message}")
            true
        }
        .catch { e ->
            println("All retries failed: ${e.message}")
        }
        .collect { value ->
            println("Retry value: $value")
        }
}

Types

BufferOverflow

Strategy for handling buffer overflow.

enum class BufferOverflow {
    /** Suspend on buffer overflow (default) */
    SUSPEND,
    /** Drop oldest values */
    DROP_OLDEST,
    /** Drop latest values */
    DROP_LATEST
}

FlowCollector

Interface for collecting flow values.

fun interface FlowCollector<in T> {
    suspend fun emit(value: T)
}

Install with Tessl CLI

npx tessl i tessl/maven-org-jetbrains-kotlinx--kotlinx-coroutines-core-jvm

docs

channels.md

coroutine-builders.md

dispatchers.md

exception-handling.md

flow-api.md

index.md

job-management.md

jvm-integration.md

synchronization.md

tile.json