CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-jetbrains-kotlinx--kotlinx-coroutines-core

Coroutines support libraries for Kotlin providing structured concurrency primitives, Flow API for reactive streams, channels for communication, and synchronization utilities across all Kotlin platforms

Pending
Overview
Eval results
Files

channels.mddocs/

Channels - Communication Primitives

Producer-consumer communication channels for passing data between coroutines. Channels provide a way to transfer values between coroutines with various capacity and overflow strategies.

Capabilities

Core Channel Interfaces

The fundamental interfaces for channel-based communication between coroutines.

/**
 * Channel is a non-blocking primitive for communication between a sender and a receiver.
 * Conceptually, a channel is similar to BlockingQueue, but with suspend operations
 * instead of blocking ones.
 */
interface Channel<E> : SendChannel<E>, ReceiveChannel<E>

/**
 * Sender's interface to a Channel.
 */
interface SendChannel<in E> {
    /**
     * Returns true if this channel was closed by an invocation of close or
     * its receiving side was cancelled.
     */
    val isClosedForSend: Boolean
    
    /**
     * Sends the specified element to this channel, suspending the caller
     * while the buffer of this channel is full.
     */
    suspend fun send(element: E)
    
    /**
     * Tries to send the specified element to this channel without blocking.
     */
    fun trySend(element: E): ChannelResult<Unit>
    
    /**
     * Closes this channel with an optional cause exception.
     */
    fun close(cause: Throwable? = null): Boolean
    
    /**
     * Registers a handler which is synchronously invoked once the channel is closed.
     */
    fun invokeOnClose(handler: (cause: Throwable?) -> Unit)
}

/**
 * Receiver's interface to a Channel.
 */
interface ReceiveChannel<out E> {
    /**
     * Returns true if this channel was closed and no more elements will ever be received.
     */
    val isClosedForReceive: Boolean
    
    /**
     * Retrieves and removes an element from this channel, suspending the caller
     * if this channel is empty.
     */
    suspend fun receive(): E
    
    /**
     * Tries to retrieve and remove an element from this channel without blocking.
     */
    fun tryReceive(): ChannelResult<E>
    
    /**
     * Cancels reception of remaining elements from this channel with an optional cause exception.
     */
    fun cancel(cause: CancellationException? = null)
    
    /**
     * Returns a new iterator to receive elements from this channel using a for loop.
     */
    operator fun iterator(): ChannelIterator<E>
}

/**
 * Iterator for ReceiveChannel.
 */
interface ChannelIterator<out E> {
    /**
     * Returns true if the channel has more elements, suspending the caller
     * if this channel is empty.
     */
    suspend operator fun hasNext(): Boolean
    
    /**
     * Retrieves the next element from this channel.
     */
    operator fun next(): E
}

Channel Factory Functions

Functions to create channels with different configurations and behaviors.

/**
 * Creates a channel with the specified buffer capacity.
 */
fun <E> Channel(
    capacity: Int = Channel.RENDEZVOUS,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E>

/**
 * Channel capacity constants.
 */
object Channel {
    /**
     * Requests a rendezvous channel: Channel() with capacity = RENDEZVOUS.
     */
    const val RENDEZVOUS = 0
    
    /**
     * Requests a conflated channel: Channel() with capacity = CONFLATED.
     */
    const val CONFLATED = -1
    
    /**
     * Requests an unlimited channel: Channel() with capacity = UNLIMITED.
     */
    const val UNLIMITED = Int.MAX_VALUE
    
    /**
     * Requests a buffered channel with the default buffer size.
     */
    const val BUFFERED = -2
    
    /**
     * Default buffer capacity used when BUFFERED is specified.
     */
    const val CHANNEL_DEFAULT_CAPACITY = 64
}

/**
 * Buffer overflow strategies for channels.
 */
enum class BufferOverflow {
    /**
     * Suspend on buffer overflow (default).
     */
    SUSPEND,
    
    /**
     * Drop oldest elements on buffer overflow.
     */
    DROP_OLDEST,
    
    /**
     * Drop latest (newly emitted) elements on buffer overflow.
     */
    DROP_LATEST
}

Usage Examples:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

val scope = MainScope()

// Rendezvous channel (capacity = 0)
val rendezvousChannel = Channel<Int>()

scope.launch {
    println("Sending 1")
    rendezvousChannel.send(1)  // Suspends until received
    println("Sent 1")
}

scope.launch {
    delay(1000)
    val value = rendezvousChannel.receive()
    println("Received: $value")
}

// Buffered channel
val bufferedChannel = Channel<String>(capacity = 10)

scope.launch {
    repeat(5) {
        bufferedChannel.send("Message $it")
        println("Sent: Message $it")
    }
    bufferedChannel.close()
}

scope.launch {
    for (message in bufferedChannel) {
        println("Received: $message")
        delay(100)
    }
}

// Conflated channel (only keeps latest)
val conflatedChannel = Channel<Int>(Channel.CONFLATED)

scope.launch {
    repeat(10) {
        conflatedChannel.send(it)
        println("Sent: $it")
    }
    conflatedChannel.close()
}

scope.launch {
    delay(500)  // Let sender finish
    for (value in conflatedChannel) {
        println("Received: $value")  // Will only receive the last value
    }
}

// Channel with overflow strategy
val overflowChannel = Channel<Int>(
    capacity = 3,
    onBufferOverflow = BufferOverflow.DROP_OLDEST
)

scope.launch {
    repeat(10) {
        val result = overflowChannel.trySend(it)
        println("Try send $it: ${result.isSuccess}")
    }
    overflowChannel.close()
}

Channel Result Type

Type-safe result wrapper for non-blocking channel operations.

/**
 * Represents the result of a channel operation.
 */
@JvmInline
value class ChannelResult<out T> {
    /**
     * Returns true if this instance represents a successful outcome.
     */
    val isSuccess: Boolean
    
    /**
     * Returns true if this instance represents a closed channel.
     */
    val isClosed: Boolean
    
    /**
     * Returns true if this instance represents a failed outcome.
     */
    val isFailure: Boolean
    
    /**
     * Returns the encapsulated value if this instance represents success or null if closed/failed.
     */
    fun getOrNull(): T?
    
    /**
     * Returns the encapsulated Throwable exception if this instance represents failure.
     */
    fun exceptionOrNull(): Throwable?
    
    /**
     * Performs the given action on the encapsulated value if this instance represents success.
     */
    inline fun onSuccess(action: (value: T) -> Unit): ChannelResult<T>
    
    /**
     * Performs the given action on the encapsulated Throwable if this instance represents failure.
     */
    inline fun onFailure(action: (exception: Throwable) -> Unit): ChannelResult<T>
    
    /**
     * Performs the given action if this instance represents a closed channel.
     */
    inline fun onClosed(action: (exception: Throwable?) -> Unit): ChannelResult<T>
}

Usage Examples:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

val scope = MainScope()

val channel = Channel<String>(capacity = 2)

scope.launch {
    // Non-blocking send attempts
    repeat(5) { i ->
        val result = channel.trySend("Message $i")
        result
            .onSuccess { println("Successfully sent: Message $i") }
            .onFailure { println("Failed to send: Message $i") }
            .onClosed { println("Channel closed, cannot send: Message $i") }
    }
    
    channel.close()
}

scope.launch {
    delay(100)
    
    // Non-blocking receive attempts
    repeat(10) {
        val result = channel.tryReceive()
        result
            .onSuccess { value -> println("Successfully received: $value") }
            .onFailure { println("No value available") }
            .onClosed { println("Channel closed, no more values") }
        
        delay(50)
    }
}

Producer Builder

Create receive channels using a producer coroutine pattern.

/**
 * Launches a new coroutine to produce a stream of values by sending them
 * to a channel and returns a ReceiveChannel that yields these values.
 */
fun <E> CoroutineScope.produce(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 0,
    onCompletion: CompletionHandler? = null,
    block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E>

/**
 * Scope for produce builder.
 */
interface ProducerScope<in E> : CoroutineScope, SendChannel<E> {
    /**
     * The channel that this producer is sending to.
     */
    val channel: SendChannel<E>
}

Usage Examples:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

val scope = MainScope()

// Basic producer
val numbersChannel = scope.produce {
    for (i in 1..5) {
        send(i)
        delay(100)
    }
}

scope.launch {
    for (number in numbersChannel) {
        println("Received number: $number")
    }
}

// Producer with capacity
val bufferedProducer = scope.produce(capacity = 10) {
    repeat(20) {
        send("Item $it")
        println("Produced: Item $it")
    }
}

scope.launch {
    delay(500)  // Let producer get ahead
    for (item in bufferedProducer) {
        println("Consumed: $item")
        delay(100)
    }
}

// Producer with custom context
val ioProducer = scope.produce(context = Dispatchers.IO) {
    repeat(3) {
        val data = fetchDataFromNetwork()  // IO operation
        send(data)
    }
}

scope.launch {
    for (data in ioProducer) {
        processData(data)  // Process on main context
    }
}

// Producer with exception handling
val robustProducer = scope.produce<String>(
    onCompletion = { exception ->
        println("Producer completed with: $exception")
    }
) {
    try {
        repeat(5) {
            if (it == 3) throw RuntimeException("Simulated error")
            send("Value $it")
        }
    } catch (e: Exception) {
        println("Producer caught exception: ${e.message}")
        throw e  // Re-throw to signal completion with exception
    }
}

Channel Extensions

Utility functions for working with channels and converting between channels and other types.

/**
 * Performs the given action for each received element.
 */
suspend fun <E> ReceiveChannel<E>.consumeEach(action: suspend (E) -> Unit)

/**
 * Receives all elements from the channel and returns them as a List.
 */
suspend fun <E> ReceiveChannel<E>.toList(): List<E>

/**
 * Creates a produce block from this channel.
 */
fun <E> ReceiveChannel<E>.consumeAsFlow(): Flow<E>

/**
 * Converts this Flow to a ReceiveChannel.
 */
fun <T> Flow<T>.produceIn(scope: CoroutineScope): ReceiveChannel<T>

/**
 * Returns a channel of [SendChannel] that feeds all elements from this channel.
 */
fun <E> ReceiveChannel<E>.broadcast(capacity: Int = 1): BroadcastChannel<E>

Usage Examples:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

val scope = MainScope()

// Channel to list
scope.launch {
    val channel = produce {
        repeat(5) { send(it) }
    }
    
    val list = channel.toList()
    println("Channel as list: $list")
}

// Channel to flow
scope.launch {
    val channel = produce {
        repeat(3) {
            send("Item $it")
            delay(100)
        }
    }
    
    channel.consumeAsFlow()
        .map { it.uppercase() }
        .collect { println("From channel flow: $it") }
}

// Flow to channel
scope.launch {
    val flow = flowOf(1, 2, 3, 4, 5)
    val channel = flow.produceIn(scope)
    
    for (value in channel) {
        println("From flow channel: $value")
    }
}

// ConsumeEach
scope.launch {
    val channel = produce {
        repeat(3) { send("Message $it") }
    }
    
    channel.consumeEach { message ->
        println("Processing: $message")
        delay(50)
    }
}

Select Support for Channels

Use channels in select expressions for awaiting multiple channel operations.

/**
 * Select clause using the send suspending function as a select clause.
 */
val <E> SendChannel<E>.onSend: SelectClause2<E, SendChannel<E>>

/**
 * Select clause using the receive suspending function as a select clause.
 */
val <E> ReceiveChannel<E>.onReceive: SelectClause1<E>

/**
 * Select clause using the receiveCatching suspending function as a select clause.
 */
val <E> ReceiveChannel<E>.onReceiveCatching: SelectClause1<ChannelResult<E>>

Usage Examples:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*

val scope = MainScope()

scope.launch {
    val channel1 = Channel<String>()
    val channel2 = Channel<String>()
    
    // Producer for channel1
    launch {
        delay(100)
        channel1.send("From channel 1")
    }
    
    // Producer for channel2
    launch {
        delay(200)
        channel2.send("From channel 2")
    }
    
    // Select from multiple channels
    val result = select<String> {
        channel1.onReceive { value ->
            "Received from channel1: $value"
        }
        channel2.onReceive { value ->
            "Received from channel2: $value"
        }
    }
    
    println(result)  // Will print result from channel1 (faster)
    
    channel1.close()
    channel2.close()
}

// Select with send operations
scope.launch {
    val fastChannel = Channel<Int>(Channel.UNLIMITED)
    val slowChannel = Channel<Int>()
    
    // Try to send to whichever channel can accept first
    val sendResult = select<String> {
        fastChannel.onSend(1) { channel ->
            "Sent to fast channel"
        }
        slowChannel.onSend(2) { channel ->
            "Sent to slow channel"
        }
    }
    
    println(sendResult)  // Will likely send to fast channel
    
    fastChannel.close()
    slowChannel.close()
}

Advanced Channel Patterns

Common patterns and best practices for channel usage.

Fan-out Pattern:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

val scope = MainScope()

// Single producer, multiple consumers
fun fanOut() = scope.produce {
    var x = 1
    while (true) {
        send(x++)
        delay(100)
    }
}

scope.launch {
    val producer = fanOut()
    
    repeat(3) { id ->
        launch {
            for (value in producer) {
                println("Consumer $id received: $value")
                if (value >= 10) break
            }
        }
    }
}

Fan-in Pattern:

// Multiple producers, single consumer
suspend fun fanIn(
    input1: ReceiveChannel<String>,
    input2: ReceiveChannel<String>
): ReceiveChannel<String> = scope.produce {
    var input1Active = true
    var input2Active = true
    
    while (input1Active || input2Active) {
        select<Unit> {
            if (input1Active) {
                input1.onReceiveCatching { result ->
                    result.onSuccess { send("From input1: $it") }
                        .onClosed { input1Active = false }
                }
            }
            if (input2Active) {
                input2.onReceiveCatching { result ->
                    result.onSuccess { send("From input2: $it") }
                        .onClosed { input2Active = false }
                }
            }
        }
    }
}

Pipeline Pattern:

// Chain of processing stages
fun numbers() = scope.produce {
    var x = 1
    while (true) send(x++)
}

fun square(numbers: ReceiveChannel<Int>) = scope.produce {
    for (x in numbers) send(x * x)
}

fun print(numbers: ReceiveChannel<Int>) = scope.launch {
    for (x in numbers) println(x)
}

// Usage
val numbersPipeline = numbers()
val squaredPipeline = square(numbersPipeline)
print(squaredPipeline)

Install with Tessl CLI

npx tessl i tessl/maven-org-jetbrains-kotlinx--kotlinx-coroutines-core

docs

channels.md

coroutine-builders.md

dispatchers.md

exception-handling.md

flow-api.md

index.md

jobs-deferreds.md

structured-concurrency.md

synchronization.md

tile.json