CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-jetbrains-kotlinx--kotlinx-coroutines-core-iosx64

Kotlin coroutines library providing comprehensive asynchronous programming support with structured concurrency for iOS x64 platforms

Pending
Overview
Eval results
Files

docs/flow-api.md

Flow API

The Flow API provides cold reactive streams with a comprehensive set of transformation operators, plus hot flows (SharedFlow and StateFlow) for broadcasting events and state. Together they form a complete solution for asynchronous data streams with structured concurrency support.

Capabilities

Cold Flow Interface

Base interfaces for cold streams that are created fresh for each collector.

/**
 * Represents an asynchronous flow of values of type [T].
 *
 * The type parameter is declared `out`, so a `Flow<String>` can be used
 * wherever a `Flow<Any>` is expected.
 */
interface Flow<out T> {
    /**
     * Collects the flow values with the provided collector.
     *
     * This is a suspending function, so it has to be called from a coroutine
     * or from another suspending function.
     *
     * @param collector receiver of the values produced by this flow
     */
    suspend fun collect(collector: FlowCollector<T>)
}

/**
 * Collector for flow values.
 *
 * Declared as a functional (`fun`) interface, so a collector can be supplied
 * as a lambda, e.g. `someFlow.collect { value -> println(value) }`.
 */
fun interface FlowCollector<in T> {
    /**
     * Emit a value to this collector.
     *
     * Suspending: the caller is suspended until the collector has accepted
     * the value.
     *
     * @param value the value being emitted
     */
    suspend fun emit(value: T)
}

/**
 * Base class for flow implementations.
 *
 * Subclasses do not override [collect] (it is `final`); they provide the
 * collection logic by implementing [collectSafely] instead.
 */
abstract class AbstractFlow<T> : Flow<T> {
    /**
     * Collect implementation that ensures proper flow context.
     * Declared `final`, so it cannot be overridden by subclasses.
     *
     * @param collector receiver of the values produced by this flow
     */
    final override suspend fun collect(collector: FlowCollector<T>)
    /**
     * Abstract method for subclasses to implement.
     *
     * @param collector receiver the implementation emits its values to
     */
    abstract suspend fun collectSafely(collector: FlowCollector<T>)
}

Flow Builders

Functions for creating flows from various sources.

/**
 * Creates a flow with a builder block.
 *
 * Inside [block] the receiver is a [FlowCollector], so values are produced by
 * calling `emit`. The block is a suspending lambda and may call other
 * suspending functions between emissions.
 *
 * @param block the flow builder that emits values
 * @return Flow that emits values from the builder
 */
fun <T> flow(block: suspend FlowCollector<T>.() -> Unit): Flow<T>

/**
 * Creates a flow from a fixed set of values passed as arguments.
 *
 * @param elements values to emit
 * @return Flow that emits the provided values
 */
fun <T> flowOf(vararg elements: T): Flow<T>

/**
 * Creates an empty flow.
 *
 * @return Flow of element type [T] that emits no values
 */
fun <T> emptyFlow(): Flow<T>

/**
 * Creates a flow from an [Iterable], such as a list or a set.
 *
 * @receiver the iterable whose elements become the values of the flow
 * @return Flow over the elements of the iterable
 */
fun <T> Iterable<T>.asFlow(): Flow<T>

/**
 * Creates a flow from a [Sequence].
 *
 * @receiver the sequence whose elements become the values of the flow
 * @return Flow over the elements of the sequence
 */
fun <T> Sequence<T>.asFlow(): Flow<T>

Usage Examples:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Basic flow creation: the builder block emits 1..5,
// suspending for 100 ms before each emission
val numberFlow = flow {
    for (i in 1..5) {
        delay(100)
        emit(i)
    }
}

// Flow from values passed directly as arguments
val valueFlow = flowOf(1, 2, 3, 4, 5)

// Flow from collection (any Iterable can be converted with asFlow())
val listFlow = listOf("a", "b", "c").asFlow()

// Collect the flow.
// collect is a suspending function, so this call has to be made from a
// coroutine or from another suspending function.
numberFlow.collect { value ->
    println("Received: $value")
}

Channel Flow Builder

Flow with concurrent emission capabilities for complex producers.

/**
 * Creates a flow with a channel-based producer.
 *
 * Inside [block] the receiver is a ProducerScope, so values are sent with
 * `send` / `trySend` rather than `emit`.
 *
 * @param block producer block with ProducerScope
 * @return Flow backed by a channel
 */
fun <T> channelFlow(block: suspend ProducerScope<T>.() -> Unit): Flow<T>

/**
 * Creates a flow from callback-based APIs.
 *
 * Has the same signature as [channelFlow]; the block is expected to register
 * a callback that forwards values into the flow.
 *
 * @param block producer block with callback registration
 * @return Flow that emits callback values
 */
fun <T> callbackFlow(block: suspend ProducerScope<T>.() -> Unit): Flow<T>

Usage Examples:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Channel flow for concurrent emission:
// two coroutines are launched inside the builder and both send into the same flow
val concurrentFlow = channelFlow {
    // First producer: sends 1..5, pausing 100 ms after each value
    launch {
        for (i in 1..5) {
            send(i)
            delay(100)
        }
    }
    // Second producer: sends 6..10, pausing 150 ms after each value
    launch {
        for (i in 6..10) {
            send(i)
            delay(150)
        }
    }
}

// Callback flow for integrating with callback APIs.
// DataListener and dataSource are placeholders for the callback-based API
// being wrapped; they are not defined in this snippet.
val callbackBasedFlow = callbackFlow {
    val listener = object : DataListener {
        override fun onData(data: String) {
            // onData is not a suspending function, so the non-suspending
            // trySend is used here instead of send
            trySend(data)
        }
        override fun onComplete() {
            // Close the flow once the source reports completion
            close()
        }
    }
    
    // Register listener
    dataSource.addListener(listener)
    
    // Cleanup when flow is cancelled
    awaitClose {
        dataSource.removeListener(listener)
    }
}

Transformation Operators

Operators for transforming flow values.

/**
 * Transform each value emitted by the flow.
 *
 * @param transform suspending function applied to every upstream value
 * @return Flow of the transformed values
 */
fun <T, R> Flow<T>.map(transform: suspend (value: T) -> R): Flow<R>

/**
 * Pair each value with its zero-based index.
 *
 * kotlinx.coroutines does not provide a `mapIndexed` operator for Flow.
 * To transform values together with their index, chain `withIndex()` with
 * `map`, e.g. `flow.withIndex().map { (index, value) -> ... }`.
 *
 * @return Flow of IndexedValue wrappers carrying the index and the original value
 */
fun <T> Flow<T>.withIndex(): Flow<IndexedValue<T>>

/**
 * Transform values and filter out nulls.
 *
 * @param transform suspending function that may return null for a value
 * @return Flow of the non-null transformation results (`R` is bounded by `Any`)
 */
fun <T, R : Any> Flow<T>.mapNotNull(transform: suspend (value: T) -> R?): Flow<R>

/**
 * Transform to flows and flatten concatenating.
 *
 * @param transform suspending function mapping each value to an inner flow
 * @return Flow of the values of the inner flows
 */
fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R>

/**
 * Transform to flows and flatten merging concurrently.
 *
 * @param concurrency concurrency setting for the merge; defaults to DEFAULT_CONCURRENCY
 * @param transform suspending function mapping each value to an inner flow
 * @return Flow of the values of the inner flows
 */
fun <T, R> Flow<T>.flatMapMerge(
    concurrency: Int = DEFAULT_CONCURRENCY,
    transform: suspend (value: T) -> Flow<R>
): Flow<R>

/**
 * Transform with general transformation block.
 *
 * The block runs with a [FlowCollector] of the result type as receiver, so
 * for each upstream value it may call `emit` any number of times.
 *
 * @param transform block invoked for every upstream value
 * @return Flow of the values emitted by the block
 */
fun <T, R> Flow<T>.transform(transform: suspend FlowCollector<R>.(value: T) -> Unit): Flow<R>

/**
 * Accumulate values with scan operation.
 *
 * @param initial starting value of the accumulator
 * @param operation combines the current accumulator with the next value
 * @return Flow of accumulator values
 */
fun <T, R> Flow<T>.scan(initial: R, operation: suspend (accumulator: R, value: T) -> R): Flow<R>

Usage Examples:

import kotlinx.coroutines.flow.*

val numbers = flowOf(1, 2, 3, 4, 5)

// Map transformation
val doubled = numbers.map { it * 2 }

// Map with index: Flow has no mapIndexed operator, so withIndex() wraps each
// value in an IndexedValue that is destructured into (index, value) inside map
val indexed = numbers
    .withIndex()
    .map { (index, value) -> "[$index] = $value" }

// Filter and transform
val evenSquares = numbers
    .filter { it % 2 == 0 }
    .map { it * it }

// Scan for running totals
val runningSum = numbers.scan(0) { acc, value -> acc + value }

// Transform with emission control: two values are emitted per input value
val expanded = numbers.transform { value ->
    emit(value)
    emit(value * 10)
}

Filtering Operators

Operators for filtering flow values based on conditions.

/**
 * Filter values based on predicate.
 *
 * @param predicate suspending test applied to every value
 * @return Flow of the values for which the predicate returned true
 */
fun <T> Flow<T>.filter(predicate: suspend (T) -> Boolean): Flow<T>

/**
 * Filter out null values.
 *
 * @receiver a flow of nullable values (`T?`)
 * @return Flow with a non-null element type
 */
fun <T : Any> Flow<T?>.filterNotNull(): Flow<T>

/**
 * Take first N values.
 *
 * @param count number of values to take
 */
fun <T> Flow<T>.take(count: Int): Flow<T>

/**
 * Take while predicate is true.
 *
 * @param predicate suspending test applied to the upstream values
 */
fun <T> Flow<T>.takeWhile(predicate: suspend (T) -> Boolean): Flow<T>

/**
 * Drop first N values.
 *
 * @param count number of values to drop
 */
fun <T> Flow<T>.drop(count: Int): Flow<T>

/**
 * Drop while predicate is true.
 *
 * @param predicate suspending test applied to the upstream values
 */
fun <T> Flow<T>.dropWhile(predicate: suspend (T) -> Boolean): Flow<T>

/**
 * Emit only distinct consecutive values: a value that repeats the one
 * immediately before it is not emitted again.
 */
fun <T> Flow<T>.distinctUntilChanged(): Flow<T>

/**
 * Emit only distinct consecutive values by key: a value is skipped when its
 * key equals the key of the value immediately before it.
 *
 * @param keySelector extracts the comparison key from a value; this is a
 *   regular (non-suspending) function
 * @return Flow without consecutive values that share the same key
 */
fun <T, K> Flow<T>.distinctUntilChangedBy(keySelector: (T) -> K): Flow<T>

Advanced Transformation Operators

Advanced operators for complex transformations and timing control.

/**
 * Transform values but cancel previous transformation on new emission.
 *
 * @param transform suspending function applied to each upstream value
 * @return Flow of the transformation results
 */
fun <T, R> Flow<T>.mapLatest(transform: suspend (value: T) -> R): Flow<R>

/**
 * Transform values with full control, cancelling on new emission.
 *
 * The block runs with a [FlowCollector] of the result type as receiver, so it
 * may call `emit` any number of times for each upstream value.
 *
 * @param transform block invoked for each upstream value
 * @return Flow of the values emitted by the block
 */
fun <T, R> Flow<T>.transformLatest(transform: suspend FlowCollector<R>.(value: T) -> Unit): Flow<R>

/**
 * Running fold that emits intermediate results.
 *
 * @param initial starting value of the accumulator
 * @param operation combines the current accumulator with the next value
 * @return Flow of accumulator values
 */
fun <T, R> Flow<T>.runningFold(initial: R, operation: suspend (accumulator: R, value: T) -> R): Flow<R>

/**
 * Running reduce that emits intermediate results.
 *
 * Takes no initial value; the accumulator has the same type as the flow's values.
 *
 * @param operation combines the current accumulator with the next value
 * @return Flow of accumulator values
 */
fun <T> Flow<T>.runningReduce(operation: suspend (accumulator: T, value: T) -> T): Flow<T>

Timing Operators

Operators for controlling timing of emissions.

/**
 * Sample emissions at specified intervals.
 *
 * @param periodMillis sampling period in milliseconds
 */
fun <T> Flow<T>.sample(periodMillis: Long): Flow<T>

/**
 * Debounce emissions - only emit if no new value within timeout.
 *
 * @param timeoutMillis quiet period in milliseconds that must pass without a
 *   new value before the latest value is emitted
 */
fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T>

/**
 * Add timeout to flow emissions.
 *
 * The resulting flow fails with a TimeoutCancellationException when the
 * upstream does not emit a value within [timeout]. The operator takes a
 * [kotlin.time.Duration] (for example `500.milliseconds`); there is no
 * overload taking a number of milliseconds.
 *
 * @param timeout maximum time allowed between emissions
 * @return Flow that fails when the upstream is silent for longer than [timeout]
 */
fun <T> Flow<T>.timeout(timeout: kotlin.time.Duration): Flow<T>

/**
 * Delay each emission.
 *
 * Deprecated in kotlinx.coroutines with level ERROR: calling it no longer
 * compiles. Use `onEach { delay(timeMillis) }` instead.
 *
 * @param timeMillis delay applied to every value, in milliseconds
 */
@Deprecated(
    message = "Use 'onEach { delay(timeMillis) }'",
    replaceWith = ReplaceWith("onEach { delay(timeMillis) }"),
    level = DeprecationLevel.ERROR
)
fun <T> Flow<T>.delayEach(timeMillis: Long): Flow<T>

Terminal Operators

Operators that consume the flow and produce a final result.

/**
 * Collect all values with the provided action.
 *
 * @param action suspending callback invoked for every value
 */
suspend fun <T> Flow<T>.collect(action: suspend (value: T) -> Unit)

/**
 * Collect with index.
 *
 * @param action suspending callback invoked with the index and the value
 */
suspend fun <T> Flow<T>.collectIndexed(action: suspend (index: Int, value: T) -> Unit)

/**
 * Launch collection in the provided scope.
 *
 * Unlike the other terminal operators listed here this is not a suspending
 * function: it returns immediately with the [Job] of the launched collection.
 *
 * @param scope coroutine scope the collection is launched in
 * @return Job of the launched coroutine
 */
fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job

/**
 * Reduce flow to a single value.
 *
 * Takes no initial value; the accumulator type `S` is a supertype of the
 * element type `T`.
 *
 * @param operation combines the current accumulator with the next value
 * @return the final accumulator value
 */
suspend fun <S, T : S> Flow<T>.reduce(operation: suspend (accumulator: S, value: T) -> S): S

/**
 * Fold flow values with initial value.
 *
 * @param initial starting value of the accumulator
 * @param operation combines the current accumulator with the next value
 * @return the final accumulator value
 */
suspend fun <T, R> Flow<T>.fold(initial: R, operation: suspend (acc: R, value: T) -> R): R

/**
 * Get first value.
 *
 * The return type is non-null `T`; see [firstOrNull] for the nullable variant.
 */
suspend fun <T> Flow<T>.first(): T

/**
 * Get first value or null.
 *
 * @return the first value, or null (the return type is `T?`)
 */
suspend fun <T> Flow<T>.firstOrNull(): T?

/**
 * Get single value.
 *
 * NOTE(review): the kotlinx.coroutines docs describe this as failing when the
 * flow is empty or has more than one value - confirm for the targeted version.
 */
suspend fun <T> Flow<T>.single(): T

/**
 * Convert to list.
 *
 * @return a [List] with the collected values
 */
suspend fun <T> Flow<T>.toList(): List<T>

/**
 * Convert to set.
 *
 * @return a [Set] with the collected values
 */
suspend fun <T> Flow<T>.toSet(): Set<T>

/**
 * Count values.
 *
 * @return the number of values as an [Int]
 */
suspend fun <T> Flow<T>.count(): Int

Hot Flows - SharedFlow

Hot flows that broadcast their values to any number of collectors and emit whether or not any collector is currently subscribed.

/**
 * A flow that can be shared among multiple collectors.
 *
 * Read-only view: it exposes the replay cache but no way to emit values
 * (see [MutableSharedFlow] for that).
 */
interface SharedFlow<out T> : Flow<T> {
    /** Values currently available for replay */
    val replayCache: List<T>
}

/**
 * Mutable version of SharedFlow.
 *
 * It is both a [SharedFlow] (it can be collected) and a [FlowCollector]
 * (values can be emitted into it).
 */
interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
    /** Number of active subscribers, exposed as a [StateFlow] so it can be observed */
    val subscriptionCount: StateFlow<Int>
    
    /** Emit a value to all subscribers (suspending) */
    override suspend fun emit(value: T)
    
    /**
     * Try to emit a value without suspending.
     *
     * @return a Boolean result; presumably true when the value was accepted
     *   (TODO confirm against the library documentation)
     */
    fun tryEmit(value: T): Boolean
    
    /** Reset the replay cache */
    fun resetReplayCache()
}

/**
 * Create a MutableSharedFlow.
 *
 * All parameters are optional; the defaults are no replay, no extra buffer
 * and `BufferOverflow.SUSPEND`.
 *
 * @param replay number of values to replay to new subscribers
 * @param extraBufferCapacity additional buffer capacity
 * @param onBufferOverflow strategy when buffer overflows
 * @return a new [MutableSharedFlow]
 */
fun <T> MutableSharedFlow(
    replay: Int = 0,
    extraBufferCapacity: Int = 0,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T>

Hot Flows - StateFlow

SharedFlow specialized for holding current state.

/**
 * A SharedFlow that represents a read-only state holding a single current value.
 *
 * The value is exposed as a `val`; it can only be changed through
 * [MutableStateFlow].
 */
interface StateFlow<out T> : SharedFlow<T> {
    /** Current state value */
    val value: T
}

/**
 * Mutable version of StateFlow.
 *
 * The interface itself only declares the mutable [value] and [compareAndSet].
 * The atomic `update`, `updateAndGet` and `getAndUpdate` helpers are inline
 * extension functions (listed below), not interface members.
 */
interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> {
    /** Current state value (mutable) */
    override var value: T

    /**
     * Compare and set the value.
     *
     * @param expect value the current state is compared against
     * @param update value to store when the comparison succeeds
     * @return true when the value was set to [update], false otherwise
     */
    fun compareAndSet(expect: T, update: T): Boolean
}

/**
 * Atomically update the value.
 *
 * [function] may be invoked more than once if the value is changed
 * concurrently, so it should be free of side effects.
 */
inline fun <T> MutableStateFlow<T>.update(function: (T) -> T)

/** Atomically update and return new value */
inline fun <T> MutableStateFlow<T>.updateAndGet(function: (T) -> T): T

/** Atomically update and return old value */
inline fun <T> MutableStateFlow<T>.getAndUpdate(function: (T) -> T): T

/**
 * Create a MutableStateFlow.
 *
 * @param value initial state value
 * @return a new [MutableStateFlow] holding [value]
 */
fun <T> MutableStateFlow(value: T): MutableStateFlow<T>

Usage Examples:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// SharedFlow for events (replay = 0: values are not kept for late subscribers)
val eventFlow = MutableSharedFlow<String>()

// Multiple collectors
launch {
    eventFlow.collect { event ->
        println("Collector 1: $event")
    }
}

launch {
    eventFlow.collect { event ->
        println("Collector 2: $event")
    }
}

// Wait until both collectors have actually subscribed. Without this the
// events below may be emitted before the launched coroutines start
// collecting, and with replay = 0 they would then be dropped.
eventFlow.subscriptionCount.first { it >= 2 }

// Emit events
eventFlow.emit("Hello")
eventFlow.emit("World")

// StateFlow for state
val counterState = MutableStateFlow(0)

// Observe state changes.
// A StateFlow always delivers the current value first; rapid updates may be
// conflated, so a collector is not guaranteed to see every intermediate value.
launch {
    counterState.collect { count ->
        println("Count: $count")
    }
}

// Update state
counterState.value = 1
counterState.update { it + 1 }

Flow Context and Threading

Operators for controlling the context a flow executes in, buffering between producer and consumer, and sharing a flow among multiple collectors.

/**
 * Change the context of upstream flow.
 *
 * @param context coroutine context the upstream part of the flow runs in
 */
fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T>

/**
 * Add buffering between producer and consumer.
 *
 * @param capacity buffer capacity; defaults to BUFFERED
 * @param onBufferOverflow strategy when the buffer is full; defaults to
 *   `BufferOverflow.SUSPEND`
 */
fun <T> Flow<T>.buffer(
    capacity: Int = BUFFERED,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): Flow<T>

/**
 * Convert to SharedFlow: turns this flow into a hot [SharedFlow] that is
 * shared among its collectors.
 *
 * @param scope coroutine scope in which the sharing is started
 * @param started strategy that controls when sharing is started
 * @param replay number of values replayed to new subscribers; defaults to 0
 * @return the resulting [SharedFlow]
 */
fun <T> Flow<T>.shareIn(
    scope: CoroutineScope,
    started: SharingStarted,
    replay: Int = 0
): SharedFlow<T>

/**
 * Convert to StateFlow.
 *
 * @param scope coroutine scope in which the sharing is started
 * @param started strategy that controls when sharing is started
 * @param initialValue initial value of the resulting state flow
 * @return the resulting [StateFlow]
 */
fun <T> Flow<T>.stateIn(
    scope: CoroutineScope,
    started: SharingStarted,
    initialValue: T
): StateFlow<T>

Error Handling

Operators for handling exceptions in flows.

/**
 * Catch exceptions and handle them.
 *
 * The handler runs with a [FlowCollector] as receiver, so it can call `emit`
 * (for example to emit a fallback value) in addition to inspecting the cause.
 *
 * @param action handler invoked with the caught exception
 */
fun <T> Flow<T>.catch(action: suspend FlowCollector<T>.(cause: Throwable) -> Unit): Flow<T>

/**
 * Retry on exceptions.
 *
 * When the upstream fails, collection is restarted as long as fewer than
 * [retries] retries have been made and [predicate] returns true for the
 * failure.
 *
 * @param retries maximum number of retries; defaults to `Long.MAX_VALUE`
 * @param predicate decides whether a given failure should be retried;
 *   defaults to retrying every exception
 */
fun <T> Flow<T>.retry(
    retries: Long = Long.MAX_VALUE,
    predicate: suspend (cause: Throwable) -> Boolean = { true }
): Flow<T>

/**
 * Retry with predicate.
 *
 * The predicate runs with a [FlowCollector] as receiver, so it can call
 * `emit` before deciding.
 *
 * @param predicate receives the failure cause and the attempt number and
 *   returns whether to retry
 */
fun <T> Flow<T>.retryWhen(predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long) -> Boolean): Flow<T>

Flow Principles

  1. Cold Nature: Flows are cold by default - they don't start producing values until collected
  2. Sequential Processing: Flow operators process values sequentially unless explicit concurrency is used
  3. Context Preservation: A flow emits its values in the coroutine context of its collector and respects structured concurrency; use flowOn to run upstream operators in a different context
  4. Exception Transparency: Exceptions in flows propagate to collectors unless handled with catch
  5. Cancellation Support: Flows are cancellable and respect coroutine cancellation

Install with Tessl CLI

npx tessl i tessl/maven-org-jetbrains-kotlinx--kotlinx-coroutines-core-iosx64

docs

channels.md

coroutine-builders.md

coroutine-management.md

dispatchers.md

error-handling.md

flow-api.md

index.md

select-expression.md

synchronization.md

tile.json