CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-jetbrains-kotlinx--kotlinx-coroutines-core-iosx64

Kotlin coroutines library providing comprehensive asynchronous programming support with structured concurrency for iOS x64 platforms

Pending
Overview
Eval results
Files

channels.mddocs/

Channels

Communication primitives between coroutines with various buffering strategies and channel types. Channels provide a way to send values between coroutines with flow control and different delivery guarantees.

Capabilities

Channel Interfaces

Core interfaces for sending and receiving values between coroutines.

/**
 * Interface for sending values to a channel
 */
interface SendChannel<in E> {
    /** True if channel is closed for sending */
    val isClosedForSend: Boolean
    
    /** Send a value, suspending if channel is full */
    suspend fun send(element: E)
    
    /** Try to send a value immediately without suspending */
    fun trySend(element: E): ChannelResult<Unit>
    
    /** Close the channel optionally with a cause */
    fun close(cause: Throwable? = null): Boolean
    
    /** Register a handler for when channel is closed */
    fun invokeOnClose(handler: (cause: Throwable?) -> Unit): Unit
}

/**
 * Interface for receiving values from a channel
 */
interface ReceiveChannel<out E> {
    /** True if channel is closed for receiving and empty */
    val isClosedForReceive: Boolean
    
    /** True if channel is empty */
    val isEmpty: Boolean
    
    /** Receive a value, suspending if channel is empty */
    suspend fun receive(): E
    
    /** Try to receive a value immediately without suspending */
    fun tryReceive(): ChannelResult<E>
    
    /** Receive a value or null/failure if closed */
    suspend fun receiveCatching(): ChannelResult<E>
    
    /** Cancel the channel with optional cause */
    fun cancel(cause: CancellationException? = null)
    
    /** Get iterator for consuming values */
    operator fun iterator(): ChannelIterator<E>
}

/**
 * Bidirectional channel combining SendChannel and ReceiveChannel
 */
interface Channel<E> : SendChannel<E>, ReceiveChannel<E>

Channel Result

Result wrapper for non-blocking channel operations.

/**
 * Result of channel operations
 */
@JvmInline
value class ChannelResult<out T> {
    /** True if operation was successful */
    val isSuccess: Boolean
    
    /** True if channel was closed */
    val isClosed: Boolean
    
    /** True if operation failed (channel full/empty) */
    val isFailure: Boolean
    
    /** Get the value or throw if not successful */
    fun getOrThrow(): T
    
    /** Get the value or null if not successful */
    fun getOrNull(): T?
    
    /** Get the exception or null if successful */
    fun exceptionOrNull(): Throwable?
}

Channel Iterator

Iterator for consuming channel values.

/**
 * Iterator for channel values
 */
interface ChannelIterator<out E> {
    /** Check if there are more values (suspending) */
    suspend fun hasNext(): Boolean
    
    /** Get the next value */
    operator fun next(): E
}

Channel Factory

Factory function for creating channels with various configurations.

/**
 * Create a channel with specified capacity and behavior
 * @param capacity channel capacity (RENDEZVOUS, CONFLATED, UNLIMITED, or positive number)
 * @param onBufferOverflow behavior when buffer is full
 * @param onUndeliveredElement handler for undelivered elements
 */
fun <E> Channel(
    capacity: Int = RENDEZVOUS,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E>

// Capacity constants
const val RENDEZVOUS = 0          // No buffering, direct handoff
const val CONFLATED = -1          // Keep only latest value
const val UNLIMITED = Int.MAX_VALUE  // Unlimited buffering
const val BUFFERED = -2           // Use default buffer size

Usage Examples:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Rendezvous channel (no buffering)
val rendezvousChannel = Channel<Int>()

// Buffered channel
val bufferedChannel = Channel<String>(capacity = 10)

// Conflated channel (only latest value)
val conflatedChannel = Channel<Data>(capacity = Channel.CONFLATED)

// Unlimited channel
val unlimitedChannel = Channel<Event>(capacity = Channel.UNLIMITED)

// Producer-consumer example
launch {
    // Producer
    for (i in 1..5) {
        rendezvousChannel.send(i)
        println("Sent: $i")
    }
    rendezvousChannel.close()
}

launch {
    // Consumer
    for (value in rendezvousChannel) {
        println("Received: $value")
        delay(100) // Simulate processing
    }
}

Buffer Overflow Strategies

Configuration for how channels handle buffer overflow.

/**
 * Strategy for handling buffer overflow
 */
enum class BufferOverflow {
    /** Suspend sender when buffer is full */
    SUSPEND,
    /** Drop oldest values when buffer is full */
    DROP_OLDEST,
    /** Drop latest values when buffer is full */
    DROP_LATEST
}

Usage Examples:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Channel that drops oldest values when full
val dropOldestChannel = Channel<Int>(
    capacity = 3,
    onBufferOverflow = BufferOverflow.DROP_OLDEST
)

// Channel that drops latest values when full
val dropLatestChannel = Channel<Int>(
    capacity = 3,
    onBufferOverflow = BufferOverflow.DROP_LATEST
)

// Fast producer, slow consumer
launch {
    repeat(10) { i ->
        val result = dropOldestChannel.trySend(i)
        if (result.isSuccess) {
            println("Sent: $i")
        } else {
            println("Dropped: $i")
        }
    }
    dropOldestChannel.close()
}

launch {
    delay(500) // Slow consumer
    for (value in dropOldestChannel) {
        println("Received: $value")
    }
}

Producer Function

Creates a receive channel from a coroutine that produces values.

/**
 * Creates a receive channel from a producer coroutine
 * @param context additional context for the producer
 * @param capacity channel capacity
 * @param block producer coroutine code
 */
fun <E> CoroutineScope.produce(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 0,
    block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E>

/**
 * Scope for producer coroutines
 */
interface ProducerScope<in E> : CoroutineScope, SendChannel<E> {
    /** The channel being produced to */
    val channel: SendChannel<E>
}

Usage Examples:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Simple producer
val numbers = produce {
    for (i in 1..10) {
        send(i * i)
        delay(100)
    }
}

// Consume the values
for (square in numbers) {
    println("Square: $square")
}

// Producer with error handling
val dataProducer = produce<String> {
    try {
        while (true) {
            val data = fetchData() // May throw exception
            send(data)
            delay(1000)
        }
    } catch (e: Exception) {
        // Producer will close channel with this exception
        throw e
    }
}

Select Clauses for Channels

Channel operations can be used in select expressions for non-blocking multi-way selection.

/**
 * Select clauses for channels
 */
interface SendChannel<in E> {
    /** Select clause for sending */
    val onSend: SelectClause2<E, SendChannel<E>>
}

interface ReceiveChannel<out E> {
    /** Select clause for receiving */
    val onReceive: SelectClause1<E>
    
    /** Select clause for receiving with result */
    val onReceiveCatching: SelectClause1<ChannelResult<E>>
}

Usage Examples:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*

suspend fun selectChannelOperations() {
    val channel1 = Channel<String>()
    val channel2 = Channel<String>()
    
    // Select between multiple channel operations
    val result = select<String> {
        channel1.onReceive { value ->
            "From channel1: $value"
        }
        channel2.onReceive { value ->
            "From channel2: $value"
        }
        onTimeout(1000) {
            "Timeout"
        }
    }
    
    println(result)
}

suspend fun selectSend() {
    val channel1 = Channel<Int>(1)
    val channel2 = Channel<Int>(1) 
    
    select<Unit> {
        channel1.onSend(42) {
            println("Sent to channel1")
        }
        channel2.onSend(24) {
            println("Sent to channel2")
        }
    }
}

Channel Extensions

Utility functions for working with channels.

/**
 * Consume all values from the channel
 */
suspend fun <E> ReceiveChannel<E>.consumeEach(action: suspend (E) -> Unit)

/**
 * Convert channel to list
 */
suspend fun <E> ReceiveChannel<E>.toList(): List<E>

/**
 * Map channel values
 */
fun <E, R> ReceiveChannel<E>.map(transform: suspend (E) -> R): ReceiveChannel<R>

/**
 * Filter channel values
 */
fun <E> ReceiveChannel<E>.filter(predicate: suspend (E) -> Boolean): ReceiveChannel<E>

/**
 * Take first n values from channel
 */
fun <E> ReceiveChannel<E>.take(n: Int): ReceiveChannel<E>

Broadcast Channel (Deprecated)

Legacy broadcasting channel replaced by SharedFlow.

/**
 * @deprecated Use SharedFlow instead
 * Channel that broadcasts values to multiple subscribers
 */
@Deprecated("Use SharedFlow instead", level = DeprecationLevel.WARNING)
interface BroadcastChannel<E> : SendChannel<E> {
    /** Subscribe to the broadcast channel */
    fun openSubscription(): ReceiveChannel<E>
    
    /** Cancel all subscriptions */
    fun cancel(cause: CancellationException? = null)
}

Channel Patterns

Fan-out Pattern

Multiple consumers processing values from a single channel.

val jobs = List(3) { workerId ->
    launch {
        for (work in workChannel) {
            processWork(work, workerId)
        }
    }
}

Fan-in Pattern

Multiple producers sending values to a single channel.

val outputChannel = Channel<Result>()

// Multiple producers
repeat(3) { producerId ->
    launch {
        repeat(10) { 
            outputChannel.send(produceResult(producerId, it))
        }
    }
}

Pipeline Pattern

Chaining channels for multi-stage processing.

val rawData = produce { /* generate raw data */ }
val processed = produce {
    for (data in rawData) {
        send(processStage1(data))
    }
}
val final = produce {
    for (data in processed) {
        send(processStage2(data))
    }
}

Channel vs Flow

FeatureChannelFlow
NatureHot (always active)Cold (starts on collect)
ConsumersMultiple concurrentSingle sequential
BufferingBuilt-in bufferingOperator-based buffering
BackpressureSend suspensionCollector-driven
Use CaseProducer-consumerData transformation pipelines

Install with Tessl CLI

npx tessl i tessl/maven-org-jetbrains-kotlinx--kotlinx-coroutines-core-iosx64

docs

channels.md

coroutine-builders.md

coroutine-management.md

dispatchers.md

error-handling.md

flow-api.md

index.md

select-expression.md

synchronization.md

tile.json