Kotlin coroutines library providing comprehensive asynchronous programming support with structured concurrency for iOS x64 platforms
—
Communication primitives between coroutines with various buffering strategies and channel types. Channels provide a way to send values between coroutines with flow control and different delivery guarantees.
Core interfaces for sending and receiving values between coroutines.
/**
 * Sending side of a channel: the operations a producer uses to hand values to a channel.
 *
 * @param E the type of elements accepted by the channel
 */
interface SendChannel<in E> {
    /** `true` once the channel has been closed for sending. */
    val isClosedForSend: Boolean

    /**
     * Sends [element] to the channel, suspending the caller while the channel is full.
     *
     * @param element the value to send
     */
    suspend fun send(element: E)

    /**
     * Attempts to send [element] immediately, without suspending.
     *
     * @param element the value to send
     * @return a [ChannelResult] describing whether the element was accepted
     */
    fun trySend(element: E): ChannelResult<Unit>

    /**
     * Closes the channel for sending.
     *
     * @param cause optional reason for closing; `null` (the default) means a normal close
     * @return `true` if this call closed the channel, `false` if it was already closed
     */
    fun close(cause: Throwable? = null): Boolean

    /**
     * Registers a [handler] to be invoked when the channel is closed.
     *
     * @param handler callback receiving the close cause, which may be `null`
     */
    fun invokeOnClose(handler: (cause: Throwable?) -> Unit)
}
/**
* Interface for receiving values from a channel
*
* Declares iterator() as an operator, so a channel can be consumed with a `for (value in channel)` loop.
*/
interface ReceiveChannel<out E> {
/** True if channel is closed for receiving and empty */
val isClosedForReceive: Boolean
/** True if channel is empty */
val isEmpty: Boolean
/** Receive a value, suspending if channel is empty */
suspend fun receive(): E
/** Try to receive a value immediately without suspending; the outcome is reported as a ChannelResult */
fun tryReceive(): ChannelResult<E>
/** Receive a value, suspending if channel is empty; returns a ChannelResult holding the value, or a failed ChannelResult if the channel is closed */
suspend fun receiveCatching(): ChannelResult<E>
/** Cancel the channel with optional cause (defaults to null) */
fun cancel(cause: CancellationException? = null)
/** Get iterator for consuming values; this operator backs `for` loops over the channel */
operator fun iterator(): ChannelIterator<E>
}
/**
* Bidirectional channel combining SendChannel and ReceiveChannel
*/
interface Channel<E> : SendChannel<E>, ReceiveChannel<E>

Result wrapper for non-blocking channel operations.
/**
* Result of channel operations
*/
@JvmInline
value class ChannelResult<out T> {
/** True if operation was successful */
val isSuccess: Boolean
/** True if channel was closed */
val isClosed: Boolean
/** True if operation did not succeed (channel full/empty, or channel closed) */
val isFailure: Boolean
/** Get the value or throw if not successful */
fun getOrThrow(): T
/** Get the value or null if not successful */
fun getOrNull(): T?
/** Get the exception or null if successful */
fun exceptionOrNull(): Throwable?
}

Iterator for consuming channel values.
/**
 * Iterator over the values of a channel.
 *
 * Both members are operators so that, together with `ReceiveChannel.iterator()`,
 * a channel can be consumed with a `for (value in channel)` loop.
 */
interface ChannelIterator<out E> {
    /**
     * Checks whether another value is available, suspending while the answer is not yet known.
     *
     * @return `true` if [next] can be called to obtain a value, `false` otherwise
     */
    suspend operator fun hasNext(): Boolean

    /**
     * Returns the next value.
     *
     * @return the value made available by the preceding [hasNext] call
     */
    operator fun next(): E
}

Factory function for creating channels with various configurations.
/**
* Create a channel with specified capacity and behavior
* @param capacity channel capacity (RENDEZVOUS, CONFLATED, UNLIMITED, BUFFERED, or positive number); defaults to RENDEZVOUS
* @param onBufferOverflow behavior when buffer is full; defaults to BufferOverflow.SUSPEND
* @param onUndeliveredElement handler for undelivered elements; defaults to null (no handler)
* @return the newly created Channel<E>
*/
fun <E> Channel(
capacity: Int = RENDEZVOUS,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E>
// Capacity constants (the usage examples reference them through Channel, e.g. Channel.CONFLATED)
const val RENDEZVOUS = 0 // No buffering, direct handoff
const val CONFLATED = -1 // Keep only latest value
const val UNLIMITED = Int.MAX_VALUE // Unlimited buffering
const val BUFFERED = -2 // Use default buffer size

Usage Examples:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
// Rendezvous channel (no buffering)
val rendezvousChannel = Channel<Int>()
// Buffered channel
val bufferedChannel = Channel<String>(capacity = 10)
// Conflated channel (only latest value)
val conflatedChannel = Channel<Data>(capacity = Channel.CONFLATED)
// Unlimited channel
val unlimitedChannel = Channel<Event>(capacity = Channel.UNLIMITED)
// Producer-consumer example
launch {
// Producer
for (i in 1..5) {
rendezvousChannel.send(i)
println("Sent: $i")
}
rendezvousChannel.close()
}
launch {
// Consumer
for (value in rendezvousChannel) {
println("Received: $value")
delay(100) // Simulate processing
}
}

Configuration for how channels handle buffer overflow.
/**
* Strategy for handling buffer overflow
*/
enum class BufferOverflow {
/** Suspend sender when buffer is full */
SUSPEND,
/** Drop oldest values when buffer is full */
DROP_OLDEST,
/** Drop the value currently being sent when buffer is full, leaving the buffer contents unchanged */
DROP_LATEST
}

Usage Examples:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
// Channel that drops oldest values when full
val dropOldestChannel = Channel<Int>(
capacity = 3,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
// Channel that drops latest values when full
val dropLatestChannel = Channel<Int>(
capacity = 3,
onBufferOverflow = BufferOverflow.DROP_LATEST
)
// Fast producer, slow consumer
launch {
    repeat(10) { i ->
        // With DROP_OLDEST a full buffer never rejects a new element: the oldest buffered
        // value is evicted to make room, so trySend succeeds. A failed result therefore
        // means the channel was closed, not that this element was dropped for overflow.
        val result = dropOldestChannel.trySend(i)
        if (result.isSuccess) {
            println("Sent: $i")
        } else {
            println("Not sent (channel closed): $i")
        }
    }
    // Signal the consumer that no more values will arrive.
    dropOldestChannel.close()
}
launch {
delay(500) // Slow consumer
for (value in dropOldestChannel) {
println("Received: $value")
}
}

Creates a receive channel from a coroutine that produces values.
/**
* Creates a receive channel from a producer coroutine
* @param context additional context for the producer; defaults to EmptyCoroutineContext
* @param capacity channel capacity; defaults to 0, the value of the RENDEZVOUS constant (no buffering)
* @param block producer coroutine code, run with a ProducerScope<E> receiver
* @return the ReceiveChannel<E> created for the producer
*/
fun <E> CoroutineScope.produce(
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = 0,
block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E>
/**
* Scope for producer coroutines
*/
interface ProducerScope<in E> : CoroutineScope, SendChannel<E> {
/** The channel being produced to */
val channel: SendChannel<E>
}

Usage Examples:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
// Simple producer
val numbers = produce {
for (i in 1..10) {
send(i * i)
delay(100)
}
}
// Consume the values
for (square in numbers) {
println("Square: $square")
}
// Producer with error handling
val dataProducer = produce<String> {
try {
while (true) {
val data = fetchData() // May throw exception
send(data)
delay(1000)
}
} catch (e: Exception) {
// Producer will close channel with this exception
throw e
}
}

Channel operations can be used in select expressions for non-blocking multi-way selection.
/**
* Select clauses for channels
*
* NOTE: SendChannel is also declared earlier in this document with its core members;
* this snippet shows only its select clause.
*/
interface SendChannel<in E> {
/** Select clause for sending; typed as SelectClause2 over the element type E and SendChannel<E> */
val onSend: SelectClause2<E, SendChannel<E>>
}
interface ReceiveChannel<out E> {
/** Select clause for receiving */
val onReceive: SelectClause1<E>
/** Select clause for receiving with result */
val onReceiveCatching: SelectClause1<ChannelResult<E>>
}

Usage Examples:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*
/**
 * Demonstrates selecting across several channel operations at once.
 *
 * Creates two String channels, selects between receiving from either of them
 * and an onTimeout(1000) clause, and prints the text produced by the clause
 * that was chosen.
 */
suspend fun selectChannelOperations() {
    val first = Channel<String>()
    val second = Channel<String>()

    // The select expression is handed straight to println; clause order is unchanged.
    println(
        select<String> {
            first.onReceive { received -> "From channel1: $received" }
            second.onReceive { received -> "From channel2: $received" }
            onTimeout(1000) { "Timeout" }
        }
    )
}
suspend fun selectSend() {
val channel1 = Channel<Int>(1)
val channel2 = Channel<Int>(1)
select<Unit> {
channel1.onSend(42) {
println("Sent to channel1")
}
channel2.onSend(24) {
println("Sent to channel2")
}
}
}

Utility functions for working with channels.
/**
* Consume all values from the channel, passing each one to action
* @param action suspending callback invoked with each value
* NOTE(review): upstream kotlinx.coroutines appears to declare this as an inline function taking a plain (E) -> Unit — confirm the signature
*/
suspend fun <E> ReceiveChannel<E>.consumeEach(action: suspend (E) -> Unit)
/**
* Convert channel to list
* @return a List<E> built from the channel's values
*/
suspend fun <E> ReceiveChannel<E>.toList(): List<E>
/**
* Map channel values
* @param transform suspending function applied to each value of type E to produce a value of type R
* @return a ReceiveChannel<R> of the transformed values
* NOTE(review): channel operators such as this one are believed to be deprecated upstream in favour of Flow operators — confirm availability before relying on it
*/
fun <E, R> ReceiveChannel<E>.map(transform: suspend (E) -> R): ReceiveChannel<R>
/**
* Filter channel values
* @param predicate suspending test applied to each value; returns a Boolean
* @return a ReceiveChannel<E> of the filtered values
* NOTE(review): channel operators such as this one are believed to be deprecated upstream in favour of Flow operators — confirm availability before relying on it
*/
fun <E> ReceiveChannel<E>.filter(predicate: suspend (E) -> Boolean): ReceiveChannel<E>
/**
* Take first n values from channel
*/
fun <E> ReceiveChannel<E>.take(n: Int): ReceiveChannel<E>

Legacy broadcasting channel replaced by SharedFlow.
/**
* Channel that broadcasts values to multiple subscribers.
* Deprecated: use SharedFlow instead (KDoc has no @deprecated tag; the @Deprecated annotation below carries the deprecation).
*/
@Deprecated("Use SharedFlow instead", level = DeprecationLevel.WARNING)
interface BroadcastChannel<E> : SendChannel<E> {
/** Subscribe to the broadcast channel */
fun openSubscription(): ReceiveChannel<E>
/** Cancel all subscriptions */
fun cancel(cause: CancellationException? = null)
}

Multiple consumers processing values from a single channel.
val jobs = List(3) { workerId ->
launch {
for (work in workChannel) {
processWork(work, workerId)
}
}
}

Multiple producers sending values to a single channel.
val outputChannel = Channel<Result>()
// Multiple producers
repeat(3) { producerId ->
launch {
repeat(10) {
outputChannel.send(produceResult(producerId, it))
}
}
}

Chaining channels for multi-stage processing.
val rawData = produce { /* generate raw data */ }
val processed = produce {
for (data in rawData) {
send(processStage1(data))
}
}
val final = produce {
for (data in processed) {
send(processStage2(data))
}
}

| Feature | Channel | Flow |
|---|---|---|
| Nature | Hot (always active) | Cold (starts on collect) |
| Consumers | Multiple concurrent; each value goes to exactly one of them | Each collector triggers its own sequential run |
| Buffering | Built-in buffering | Operator-based buffering |
| Backpressure | Send suspension | Collector-driven |
| Use Case | Producer-consumer | Data transformation pipelines |
Install with Tessl CLI
npx tessl i tessl/maven-org-jetbrains-kotlinx--kotlinx-coroutines-core-iosx64