Coroutines support libraries for Kotlin providing structured concurrency primitives, Flow API for reactive streams, channels for communication, and synchronization utilities across all Kotlin platforms
—
Producer-consumer communication channels for passing data between coroutines. Channels provide a way to transfer values between coroutines with various capacity and overflow strategies.
The fundamental interfaces for channel-based communication between coroutines.
/**
* Channel is a non-blocking primitive for communication between a sender and a receiver.
* Conceptually, a channel is similar to BlockingQueue, but with suspend operations
* instead of blocking ones.
*/
interface Channel<E> : SendChannel<E>, ReceiveChannel<E>
/**
* Sender's interface to a Channel.
*/
interface SendChannel<in E> {
/**
* Returns true if this channel was closed by an invocation of close or
* its receiving side was cancelled.
*/
val isClosedForSend: Boolean
/**
* Sends the specified element to this channel, suspending the caller
* while the buffer of this channel is full.
*/
suspend fun send(element: E)
/**
* Tries to send the specified element to this channel without blocking.
*/
fun trySend(element: E): ChannelResult<Unit>
/**
* Closes this channel with an optional cause exception.
*/
fun close(cause: Throwable? = null): Boolean
/**
* Registers a handler which is synchronously invoked once the channel is closed.
*/
fun invokeOnClose(handler: (cause: Throwable?) -> Unit)
}
/**
* Receiver's interface to a Channel.
*/
interface ReceiveChannel<out E> {
/**
* Returns true if this channel was closed and no more elements will ever be received.
*/
val isClosedForReceive: Boolean
/**
* Retrieves and removes an element from this channel, suspending the caller
* if this channel is empty.
*/
suspend fun receive(): E
/**
* Tries to retrieve and remove an element from this channel without blocking.
*/
fun tryReceive(): ChannelResult<E>
/**
* Cancels reception of remaining elements from this channel with an optional cause exception.
*/
fun cancel(cause: CancellationException? = null)
/**
* Returns a new iterator to receive elements from this channel using a for loop.
*/
operator fun iterator(): ChannelIterator<E>
}
/**
* Iterator for ReceiveChannel.
*/
interface ChannelIterator<out E> {
/**
* Returns true if the channel has more elements, suspending the caller
* if this channel is empty.
*/
suspend operator fun hasNext(): Boolean
/**
* Retrieves the next element from this channel.
*/
operator fun next(): E
}Functions to create channels with different configurations and behaviors.
/**
* Creates a channel with the specified buffer capacity.
*/
fun <E> Channel(
capacity: Int = Channel.RENDEZVOUS,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E>
/**
* Channel capacity constants.
*/
object Channel {
/**
* Requests a rendezvous channel: Channel() with capacity = RENDEZVOUS.
*/
const val RENDEZVOUS = 0
/**
* Requests a conflated channel: Channel() with capacity = CONFLATED.
*/
const val CONFLATED = -1
/**
* Requests an unlimited channel: Channel() with capacity = UNLIMITED.
*/
const val UNLIMITED = Int.MAX_VALUE
/**
* Requests a buffered channel with the default buffer size.
*/
const val BUFFERED = -2
/**
* Default buffer capacity used when BUFFERED is specified.
*/
const val CHANNEL_DEFAULT_CAPACITY = 64
}
/**
* Buffer overflow strategies for channels.
*/
enum class BufferOverflow {
/**
* Suspend on buffer overflow (default).
*/
SUSPEND,
/**
* Drop oldest elements on buffer overflow.
*/
DROP_OLDEST,
/**
* Drop latest (newly emitted) elements on buffer overflow.
*/
DROP_LATEST
}Usage Examples:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
val scope = MainScope()
// Rendezvous channel (capacity = 0)
val rendezvousChannel = Channel<Int>()
scope.launch {
println("Sending 1")
rendezvousChannel.send(1) // Suspends until received
println("Sent 1")
}
scope.launch {
delay(1000)
val value = rendezvousChannel.receive()
println("Received: $value")
}
// Buffered channel
val bufferedChannel = Channel<String>(capacity = 10)
scope.launch {
repeat(5) {
bufferedChannel.send("Message $it")
println("Sent: Message $it")
}
bufferedChannel.close()
}
scope.launch {
for (message in bufferedChannel) {
println("Received: $message")
delay(100)
}
}
// Conflated channel (only keeps latest)
val conflatedChannel = Channel<Int>(Channel.CONFLATED)
scope.launch {
repeat(10) {
conflatedChannel.send(it)
println("Sent: $it")
}
conflatedChannel.close()
}
scope.launch {
delay(500) // Let sender finish
for (value in conflatedChannel) {
println("Received: $value") // Will only receive the last value
}
}
// Channel with overflow strategy
val overflowChannel = Channel<Int>(
capacity = 3,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
scope.launch {
repeat(10) {
val result = overflowChannel.trySend(it)
println("Try send $it: ${result.isSuccess}")
}
overflowChannel.close()
}Type-safe result wrapper for non-blocking channel operations.
/**
* Represents the result of a channel operation.
*/
@JvmInline
value class ChannelResult<out T> {
/**
* Returns true if this instance represents a successful outcome.
*/
val isSuccess: Boolean
/**
* Returns true if this instance represents a closed channel.
*/
val isClosed: Boolean
/**
* Returns true if this instance represents a failed outcome.
*/
val isFailure: Boolean
/**
* Returns the encapsulated value if this instance represents success or null if closed/failed.
*/
fun getOrNull(): T?
/**
* Returns the encapsulated Throwable exception if this instance represents failure.
*/
fun exceptionOrNull(): Throwable?
/**
* Performs the given action on the encapsulated value if this instance represents success.
*/
inline fun onSuccess(action: (value: T) -> Unit): ChannelResult<T>
/**
* Performs the given action on the encapsulated Throwable if this instance represents failure.
*/
inline fun onFailure(action: (exception: Throwable) -> Unit): ChannelResult<T>
/**
* Performs the given action if this instance represents a closed channel.
*/
inline fun onClosed(action: (exception: Throwable?) -> Unit): ChannelResult<T>
}Usage Examples:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
val scope = MainScope()
val channel = Channel<String>(capacity = 2)
scope.launch {
// Non-blocking send attempts
repeat(5) { i ->
val result = channel.trySend("Message $i")
result
.onSuccess { println("Successfully sent: Message $i") }
.onFailure { println("Failed to send: Message $i") }
.onClosed { println("Channel closed, cannot send: Message $i") }
}
channel.close()
}
scope.launch {
delay(100)
// Non-blocking receive attempts
repeat(10) {
val result = channel.tryReceive()
result
.onSuccess { value -> println("Successfully received: $value") }
.onFailure { println("No value available") }
.onClosed { println("Channel closed, no more values") }
delay(50)
}
}Create receive channels using a producer coroutine pattern.
/**
* Launches a new coroutine to produce a stream of values by sending them
* to a channel and returns a ReceiveChannel that yields these values.
*/
fun <E> CoroutineScope.produce(
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = 0,
onCompletion: CompletionHandler? = null,
block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E>
/**
* Scope for produce builder.
*/
interface ProducerScope<in E> : CoroutineScope, SendChannel<E> {
/**
* The channel that this producer is sending to.
*/
val channel: SendChannel<E>
}Usage Examples:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
val scope = MainScope()
// Basic producer
val numbersChannel = scope.produce {
for (i in 1..5) {
send(i)
delay(100)
}
}
scope.launch {
for (number in numbersChannel) {
println("Received number: $number")
}
}
// Producer with capacity
val bufferedProducer = scope.produce(capacity = 10) {
repeat(20) {
send("Item $it")
println("Produced: Item $it")
}
}
scope.launch {
delay(500) // Let producer get ahead
for (item in bufferedProducer) {
println("Consumed: $item")
delay(100)
}
}
// Producer with custom context
val ioProducer = scope.produce(context = Dispatchers.IO) {
repeat(3) {
val data = fetchDataFromNetwork() // IO operation
send(data)
}
}
scope.launch {
for (data in ioProducer) {
processData(data) // Process on main context
}
}
// Producer with exception handling
val robustProducer = scope.produce<String>(
onCompletion = { exception ->
println("Producer completed with: $exception")
}
) {
try {
repeat(5) {
if (it == 3) throw RuntimeException("Simulated error")
send("Value $it")
}
} catch (e: Exception) {
println("Producer caught exception: ${e.message}")
throw e // Re-throw to signal completion with exception
}
}Utility functions for working with channels and converting between channels and other types.
/**
* Performs the given action for each received element.
*/
suspend fun <E> ReceiveChannel<E>.consumeEach(action: suspend (E) -> Unit)
/**
* Receives all elements from the channel and returns them as a List.
*/
suspend fun <E> ReceiveChannel<E>.toList(): List<E>
/**
* Creates a produce block from this channel.
*/
fun <E> ReceiveChannel<E>.consumeAsFlow(): Flow<E>
/**
* Converts this Flow to a ReceiveChannel.
*/
fun <T> Flow<T>.produceIn(scope: CoroutineScope): ReceiveChannel<T>
/**
* Returns a channel of [SendChannel] that feeds all elements from this channel.
*/
fun <E> ReceiveChannel<E>.broadcast(capacity: Int = 1): BroadcastChannel<E>Usage Examples:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
val scope = MainScope()
// Channel to list
scope.launch {
val channel = produce {
repeat(5) { send(it) }
}
val list = channel.toList()
println("Channel as list: $list")
}
// Channel to flow
scope.launch {
val channel = produce {
repeat(3) {
send("Item $it")
delay(100)
}
}
channel.consumeAsFlow()
.map { it.uppercase() }
.collect { println("From channel flow: $it") }
}
// Flow to channel
scope.launch {
val flow = flowOf(1, 2, 3, 4, 5)
val channel = flow.produceIn(scope)
for (value in channel) {
println("From flow channel: $value")
}
}
// ConsumeEach
scope.launch {
val channel = produce {
repeat(3) { send("Message $it") }
}
channel.consumeEach { message ->
println("Processing: $message")
delay(50)
}
}Use channels in select expressions for awaiting multiple channel operations.
/**
* Select clause using the send suspending function as a select clause.
*/
val <E> SendChannel<E>.onSend: SelectClause2<E, SendChannel<E>>
/**
* Select clause using the receive suspending function as a select clause.
*/
val <E> ReceiveChannel<E>.onReceive: SelectClause1<E>
/**
* Select clause using the receiveCatching suspending function as a select clause.
*/
val <E> ReceiveChannel<E>.onReceiveCatching: SelectClause1<ChannelResult<E>>Usage Examples:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*
val scope = MainScope()
scope.launch {
val channel1 = Channel<String>()
val channel2 = Channel<String>()
// Producer for channel1
launch {
delay(100)
channel1.send("From channel 1")
}
// Producer for channel2
launch {
delay(200)
channel2.send("From channel 2")
}
// Select from multiple channels
val result = select<String> {
channel1.onReceive { value ->
"Received from channel1: $value"
}
channel2.onReceive { value ->
"Received from channel2: $value"
}
}
println(result) // Will print result from channel1 (faster)
channel1.close()
channel2.close()
}
// Select with send operations
scope.launch {
val fastChannel = Channel<Int>(Channel.UNLIMITED)
val slowChannel = Channel<Int>()
// Try to send to whichever channel can accept first
val sendResult = select<String> {
fastChannel.onSend(1) { channel ->
"Sent to fast channel"
}
slowChannel.onSend(2) { channel ->
"Sent to slow channel"
}
}
println(sendResult) // Will likely send to fast channel
fastChannel.close()
slowChannel.close()
}Common patterns and best practices for channel usage.
Fan-out Pattern:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
val scope = MainScope()
// Single producer, multiple consumers
fun fanOut() = scope.produce {
var x = 1
while (true) {
send(x++)
delay(100)
}
}
scope.launch {
val producer = fanOut()
repeat(3) { id ->
launch {
for (value in producer) {
println("Consumer $id received: $value")
if (value >= 10) break
}
}
}
}Fan-in Pattern:
// Multiple producers, single consumer
suspend fun fanIn(
input1: ReceiveChannel<String>,
input2: ReceiveChannel<String>
): ReceiveChannel<String> = scope.produce {
var input1Active = true
var input2Active = true
while (input1Active || input2Active) {
select<Unit> {
if (input1Active) {
input1.onReceiveCatching { result ->
result.onSuccess { send("From input1: $it") }
.onClosed { input1Active = false }
}
}
if (input2Active) {
input2.onReceiveCatching { result ->
result.onSuccess { send("From input2: $it") }
.onClosed { input2Active = false }
}
}
}
}
}Pipeline Pattern:
// Chain of processing stages
fun numbers() = scope.produce {
var x = 1
while (true) send(x++)
}
fun square(numbers: ReceiveChannel<Int>) = scope.produce {
for (x in numbers) send(x * x)
}
fun print(numbers: ReceiveChannel<Int>) = scope.launch {
for (x in numbers) println(x)
}
// Usage
val numbersPipeline = numbers()
val squaredPipeline = square(numbersPipeline)
print(squaredPipeline)Install with Tessl CLI
npx tessl i tessl/maven-org-jetbrains-kotlinx--kotlinx-coroutines-core