Kotlin coroutines library providing comprehensive asynchronous programming support with structured concurrency for iOS x64 platforms
—
Cold reactive streams with comprehensive transformation operators, hot flows for state and event broadcasting. The Flow API provides a complete solution for asynchronous data streams with structured concurrency support.
Base interfaces for cold streams that are created fresh for each collector.
/**
* Represents an asynchronous flow of values
*/
interface Flow<out T> {
/** Collects the flow values with the provided collector */
suspend fun collect(collector: FlowCollector<T>)
}
/**
* Collector for flow values
*/
interface FlowCollector<in T> {
/** Emit a value to the flow */
suspend fun emit(value: T)
}
/**
* Base class for flow implementations
*/
abstract class AbstractFlow<T> : Flow<T> {
/** Collect implementation that ensures proper flow context */
final override suspend fun collect(collector: FlowCollector<T>)
/** Abstract method for subclasses to implement */
abstract suspend fun collectSafely(collector: FlowCollector<T>)
}Functions for creating flows from various sources.
/**
* Creates a flow with a builder block
* @param block the flow builder that emits values
* @return Flow that emits values from the builder
*/
fun <T> flow(block: suspend FlowCollector<T>.() -> Unit): Flow<T>
/**
* Creates a flow from a sequence of values
* @param elements values to emit
* @return Flow that emits the provided values
*/
fun <T> flowOf(vararg elements: T): Flow<T>
/**
* Creates an empty flow
*/
fun <T> emptyFlow(): Flow<T>
/**
* Creates a flow from a collection
*/
fun <T> Iterable<T>.asFlow(): Flow<T>
/**
* Creates a flow from a sequence
*/
fun <T> Sequence<T>.asFlow(): Flow<T>Usage Examples:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
// Basic flow creation
val numberFlow = flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}
// Flow from values
val valueFlow = flowOf(1, 2, 3, 4, 5)
// Flow from collection
val listFlow = listOf("a", "b", "c").asFlow()
// Collect the flow
numberFlow.collect { value ->
println("Received: $value")
}Flow with concurrent emission capabilities for complex producers.
/**
* Creates a flow with a channel-based producer
* @param block producer block with ProducerScope
* @return Flow backed by a channel
*/
fun <T> channelFlow(block: suspend ProducerScope<T>.() -> Unit): Flow<T>
/**
* Creates a flow from callback-based APIs
* @param block producer block with callback registration
* @return Flow that emits callback values
*/
fun <T> callbackFlow(block: suspend ProducerScope<T>.() -> Unit): Flow<T>Usage Examples:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
// Channel flow for concurrent emission
val concurrentFlow = channelFlow {
launch {
for (i in 1..5) {
send(i)
delay(100)
}
}
launch {
for (i in 6..10) {
send(i)
delay(150)
}
}
}
// Callback flow for integrating with callback APIs
val callbackBasedFlow = callbackFlow {
val listener = object : DataListener {
override fun onData(data: String) {
trySend(data)
}
override fun onComplete() {
close()
}
}
// Register listener
dataSource.addListener(listener)
// Cleanup when flow is cancelled
awaitClose {
dataSource.removeListener(listener)
}
}Operators for transforming flow values.
/**
* Transform each value emitted by the flow
*/
fun <T, R> Flow<T>.map(transform: suspend (value: T) -> R): Flow<R>
/**
* Transform each value with its index
*/
fun <T, R> Flow<T>.mapIndexed(transform: suspend (index: Int, value: T) -> R): Flow<R>
/**
* Transform values and filter out nulls
*/
fun <T, R : Any> Flow<T>.mapNotNull(transform: suspend (value: T) -> R?): Flow<R>
/**
* Transform to flows and flatten concatenating
*/
fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R>
/**
* Transform to flows and flatten merging concurrently
*/
fun <T, R> Flow<T>.flatMapMerge(
concurrency: Int = DEFAULT_CONCURRENCY,
transform: suspend (value: T) -> Flow<R>
): Flow<R>
/**
* Transform with general transformation block
*/
fun <T, R> Flow<T>.transform(transform: suspend FlowCollector<R>.(value: T) -> Unit): Flow<R>
/**
* Accumulate values with scan operation
*/
fun <T, R> Flow<T>.scan(initial: R, operation: suspend (accumulator: R, value: T) -> R): Flow<R>Usage Examples:
import kotlinx.coroutines.flow.*
val numbers = flowOf(1, 2, 3, 4, 5)
// Map transformation
val doubled = numbers.map { it * 2 }
// Map with index
val indexed = numbers.mapIndexed { index, value -> "[$index] = $value" }
// Filter and transform
val evenSquares = numbers
.filter { it % 2 == 0 }
.map { it * it }
// Scan for running totals
val runningSum = numbers.scan(0) { acc, value -> acc + value }
// Transform with emission control
val expanded = numbers.transform { value ->
emit(value)
emit(value * 10)
}Operators for filtering flow values based on conditions.
/**
* Filter values based on predicate
*/
fun <T> Flow<T>.filter(predicate: suspend (T) -> Boolean): Flow<T>
/**
* Filter out null values
*/
fun <T : Any> Flow<T?>.filterNotNull(): Flow<T>
/**
* Take first N values
*/
fun <T> Flow<T>.take(count: Int): Flow<T>
/**
* Take while predicate is true
*/
fun <T> Flow<T>.takeWhile(predicate: suspend (T) -> Boolean): Flow<T>
/**
* Drop first N values
*/
fun <T> Flow<T>.drop(count: Int): Flow<T>
/**
* Drop while predicate is true
*/
fun <T> Flow<T>.dropWhile(predicate: suspend (T) -> Boolean): Flow<T>
/**
* Emit only distinct consecutive values
*/
fun <T> Flow<T>.distinctUntilChanged(): Flow<T>
/**
* Emit only distinct consecutive values by key
*/
fun <T, K> Flow<T>.distinctUntilChangedBy(keySelector: suspend (T) -> K): Flow<T>Advanced operators for complex transformations and timing control.
/**
* Transform values but cancel previous transformation on new emission
*/
fun <T, R> Flow<T>.mapLatest(transform: suspend (value: T) -> R): Flow<R>
/**
* Transform values with full control, cancelling on new emission
*/
fun <T, R> Flow<T>.transformLatest(transform: suspend FlowCollector<R>.(value: T) -> Unit): Flow<R>
/**
* Running fold that emits intermediate results
*/
fun <T, R> Flow<T>.runningFold(initial: R, operation: suspend (accumulator: R, value: T) -> R): Flow<R>
/**
* Running reduce that emits intermediate results
*/
fun <T> Flow<T>.runningReduce(operation: suspend (accumulator: T, value: T) -> T): Flow<T>Operators for controlling timing of emissions.
/**
* Sample emissions at specified intervals
*/
fun <T> Flow<T>.sample(periodMillis: Long): Flow<T>
/**
* Debounce emissions - only emit if no new value within timeout
*/
fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T>
/**
* Add timeout to flow emissions
*/
fun <T> Flow<T>.timeout(timeoutMillis: Long): Flow<T>
/**
* Delay each emission
*/
fun <T> Flow<T>.delayEach(delayMillis: Long): Flow<T>Operators that consume the flow and produce a final result.
/**
* Collect all values with the provided action
*/
suspend fun <T> Flow<T>.collect(action: suspend (value: T) -> Unit)
/**
* Collect with index
*/
suspend fun <T> Flow<T>.collectIndexed(action: suspend (index: Int, value: T) -> Unit)
/**
* Launch collection in the provided scope
*/
fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job
/**
* Reduce flow to a single value
*/
suspend fun <S, T : S> Flow<T>.reduce(operation: suspend (accumulator: S, value: T) -> S): S
/**
* Fold flow values with initial value
*/
suspend fun <T, R> Flow<T>.fold(initial: R, operation: suspend (acc: R, value: T) -> R): R
/**
* Get first value
*/
suspend fun <T> Flow<T>.first(): T
/**
* Get first value or null
*/
suspend fun <T> Flow<T>.firstOrNull(): T?
/**
* Get single value
*/
suspend fun <T> Flow<T>.single(): T
/**
* Convert to list
*/
suspend fun <T> Flow<T>.toList(): List<T>
/**
* Convert to set
*/
suspend fun <T> Flow<T>.toSet(): Set<T>
/**
* Count values
*/
suspend fun <T> Flow<T>.count(): IntHot flows that can have multiple collectors and emit values regardless of collectors.
/**
* A flow that can be shared among multiple collectors
*/
interface SharedFlow<out T> : Flow<T> {
/** Values currently available for replay */
val replayCache: List<T>
}
/**
* Mutable version of SharedFlow
*/
interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
/** Number of active subscribers */
val subscriptionCount: StateFlow<Int>
/** Emit a value to all subscribers */
override suspend fun emit(value: T)
/** Try to emit a value without suspending */
fun tryEmit(value: T): Boolean
/** Reset the replay cache */
fun resetReplayCache()
}
/**
* Create a MutableSharedFlow
* @param replay number of values to replay to new subscribers
* @param extraBufferCapacity additional buffer capacity
* @param onBufferOverflow strategy when buffer overflows
*/
fun <T> MutableSharedFlow(
replay: Int = 0,
extraBufferCapacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T>SharedFlow specialized for holding current state.
/**
* A SharedFlow that represents a mutable state
*/
interface StateFlow<out T> : SharedFlow<T> {
/** Current state value */
val value: T
}
/**
* Mutable version of StateFlow
*/
interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> {
/** Current state value (mutable) */
override var value: T
/** Atomically update the value */
fun update(function: (T) -> T)
/** Atomically update and return new value */
fun updateAndGet(function: (T) -> T): T
/** Atomically update and return old value */
fun getAndUpdate(function: (T) -> T): T
/** Compare and set the value */
fun compareAndSet(expect: T, update: T): Boolean
}
/**
* Create a MutableStateFlow
*/
fun <T> MutableStateFlow(value: T): MutableStateFlow<T>Usage Examples:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
// SharedFlow for events
val eventFlow = MutableSharedFlow<String>()
// Multiple collectors
launch {
eventFlow.collect { event ->
println("Collector 1: $event")
}
}
launch {
eventFlow.collect { event ->
println("Collector 2: $event")
}
}
// Emit events
eventFlow.emit("Hello")
eventFlow.emit("World")
// StateFlow for state
val counterState = MutableStateFlow(0)
// Observe state changes
launch {
counterState.collect { count ->
println("Count: $count")
}
}
// Update state
counterState.value = 1
counterState.update { it + 1 }Operators for controlling flow execution context.
/**
* Change the context of upstream flow
*/
fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T>
/**
* Add buffering between producer and consumer
*/
fun <T> Flow<T>.buffer(
capacity: Int = BUFFERED,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): Flow<T>
/**
* Ensure single collector at a time
*/
fun <T> Flow<T>.shareIn(
scope: CoroutineScope,
started: SharingStarted,
replay: Int = 0
): SharedFlow<T>
/**
* Convert to StateFlow
*/
fun <T> Flow<T>.stateIn(
scope: CoroutineScope,
started: SharingStarted,
initialValue: T
): StateFlow<T>Operators for handling exceptions in flows.
/**
* Catch exceptions and handle them
*/
fun <T> Flow<T>.catch(action: suspend FlowCollector<T>.(cause: Throwable) -> Unit): Flow<T>
/**
* Retry on exceptions
*/
fun <T> Flow<T>.retry(retries: Long = Long.MAX_VALUE): Flow<T>
/**
* Retry with predicate
*/
fun <T> Flow<T>.retryWhen(predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long) -> Boolean): Flow<T>catchInstall with Tessl CLI
npx tessl i tessl/maven-org-jetbrains-kotlinx--kotlinx-coroutines-core-iosx64