JVM-specific implementation of kotlinx.coroutines core library providing coroutine primitives, builders, dispatchers, and synchronization primitives for asynchronous programming in Kotlin.
—
Message passing primitives for communication between coroutines with configurable capacity, buffering strategies, and bi-directional communication patterns.
Combines sending and receiving capabilities for bi-directional communication.
interface Channel<E> : SendChannel<E>, ReceiveChannel<E> {
/** Channel capacity constants */
companion object {
const val UNLIMITED: Int = Int.MAX_VALUE
const val CONFLATED: Int = -1
const val RENDEZVOUS: Int = 0
const val BUFFERED: Int = -2
}
}
/** Creates a channel with specified capacity */
fun <E> Channel(
capacity: Int = RENDEZVOUS,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E>Usage Examples:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking {
// Rendezvous channel (capacity = 0)
val rendezvousChannel = Channel<Int>()
launch {
repeat(3) { i ->
println("Sending $i")
rendezvousChannel.send(i)
println("Sent $i")
}
rendezvousChannel.close()
}
launch {
for (value in rendezvousChannel) {
println("Received $value")
delay(100) // Simulate processing
}
}
delay(1000)
// Buffered channel
val bufferedChannel = Channel<String>(capacity = 3)
// Can send up to capacity without suspending
bufferedChannel.send("Message 1")
bufferedChannel.send("Message 2")
bufferedChannel.send("Message 3")
println("Sent 3 messages without suspending")
// This would suspend until space is available
launch {
bufferedChannel.send("Message 4")
println("Sent message 4")
}
// Receive messages
repeat(4) {
val message = bufferedChannel.receive()
println("Received: $message")
}
bufferedChannel.close()
}Sending side of a channel for message production.
interface SendChannel<in E> {
/** True if channel is closed for sending */
val isClosedForSend: Boolean
/** Suspends until element can be sent */
suspend fun send(element: E)
/** Tries to send element immediately */
fun trySend(element: E): ChannelResult<Unit>
/** Closes the channel with optional cause */
fun close(cause: Throwable? = null): Boolean
/** Registers handler for when channel is closed */
fun invokeOnClose(handler: (cause: Throwable?) -> Unit)
}Usage Examples:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking {
val channel = Channel<Int>(capacity = 2)
// Producer coroutine
val producer = launch {
try {
repeat(5) { i ->
println("Trying to send $i")
// Try to send without suspending first
val result = channel.trySend(i)
if (result.isSuccess) {
println("Sent $i immediately")
} else {
println("Buffer full, suspending to send $i")
channel.send(i)
println("Sent $i after suspending")
}
delay(100)
}
} catch (e: Exception) {
println("Producer failed: ${e.message}")
} finally {
channel.close()
println("Channel closed by producer")
}
}
// Consumer coroutine (slower than producer)
launch {
try {
while (!channel.isClosedForReceive) {
val value = channel.receive()
println("Received $value")
delay(300) // Slower consumer
}
} catch (e: ClosedReceiveChannelException) {
println("Channel was closed")
}
}
producer.join()
delay(1000)
}Receiving side of a channel for message consumption.
interface ReceiveChannel<out E> {
/** True if channel is closed for receiving */
val isClosedForReceive: Boolean
/** True if channel is empty */
val isEmpty: Boolean
/** Suspends until element is available */
suspend fun receive(): E
/** Tries to receive element immediately */
fun tryReceive(): ChannelResult<E>
/** Receives element or close/failure result */
suspend fun receiveCatching(): ChannelResult<E>
/** Iterator for consuming channel */
operator fun iterator(): ChannelIterator<E>
/** Cancels reception from channel */
fun cancel(cause: CancellationException? = null)
}
interface ChannelIterator<out E> {
suspend fun hasNext(): Boolean
suspend fun next(): E
}Usage Examples:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking {
val channel = Channel<String>(capacity = Channel.UNLIMITED)
// Fill channel with data
launch {
repeat(5) { i ->
channel.send("Item $i")
}
channel.close()
}
// Different ways to consume
// 1. Using iterator (for-in loop)
println("Method 1: for-in loop")
for (item in channel) {
println("Received: $item")
}
// Refill for next example
val channel2 = Channel<String>()
launch {
repeat(3) { i ->
channel2.send("Data $i")
}
channel2.close()
}
// 2. Using receiveCatching for error handling
println("Method 2: receiveCatching")
while (true) {
val result = channel2.receiveCatching()
if (result.isSuccess) {
println("Received: ${result.getOrNull()}")
} else {
println("Channel closed or failed")
break
}
}
// 3. Using tryReceive for non-blocking
val channel3 = Channel<Int>(capacity = 3)
channel3.trySend(1)
channel3.trySend(2)
println("Method 3: tryReceive")
while (true) {
val result = channel3.tryReceive()
if (result.isSuccess) {
println("Received immediately: ${result.getOrNull()}")
} else {
println("No data available")
break
}
}
channel3.close()
}Functions for creating channels with different characteristics.
/** Creates unlimited capacity channel */
fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E>
/** Creates produce-consumer pattern */
fun <E> CoroutineScope.produce(
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = 0,
onCompletion: CompletionHandler? = null,
block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E>
/** Creates actor pattern */
fun <E> CoroutineScope.actor(
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = 0,
start: CoroutineStart = CoroutineStart.DEFAULT,
onCompletion: CompletionHandler? = null,
block: suspend ActorScope<E>.() -> Unit
): SendChannel<E>Usage Examples:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking {
// Producer pattern
val receiveChannel = produce {
repeat(5) { i ->
send("Produced $i")
delay(100)
}
}
for (item in receiveChannel) {
println("Consumed: $item")
}
// Actor pattern
val sendChannel = actor<String> {
for (message in channel) {
println("Actor processing: $message")
delay(50)
}
}
repeat(3) { i ->
sendChannel.send("Message $i")
}
sendChannel.close()
delay(200)
}
// Advanced producer example
fun CoroutineScope.numberProducer(max: Int) = produce<Int> {
for (i in 1..max) {
send(i)
delay(100)
}
}
fun CoroutineScope.squareProcessor(input: ReceiveChannel<Int>) = produce<Int> {
for (number in input) {
send(number * number)
}
}
suspend fun pipelineExample() = coroutineScope {
val numbers = numberProducer(5)
val squares = squareProcessor(numbers)
for (square in squares) {
println("Square: $square")
}
}Legacy broadcast functionality (deprecated in favor of SharedFlow).
@Deprecated("Use SharedFlow instead")
interface BroadcastChannel<E> : SendChannel<E> {
fun openSubscription(): ReceiveChannel<E>
fun cancel(cause: CancellationException? = null)
}
@Deprecated("Use SharedFlow instead")
fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E>Different capacity configurations for channels.
companion object Channel {
/** Unlimited capacity - never suspends sends */
const val UNLIMITED: Int = Int.MAX_VALUE
/** Conflated - keeps only the latest value */
const val CONFLATED: Int = -1
/** Rendezvous - zero capacity, direct handoff */
const val RENDEZVOUS: Int = 0
/** Default buffered capacity */
const val BUFFERED: Int = -2
}Usage Examples:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking {
// Unlimited channel - never blocks sends
val unlimitedChannel = Channel<Int>(Channel.UNLIMITED)
launch {
repeat(1000) { i ->
unlimitedChannel.send(i) // Never suspends
}
unlimitedChannel.close()
}
var count = 0
for (value in unlimitedChannel) {
count++
}
println("Received $count items from unlimited channel")
// Conflated channel - only latest value
val conflatedChannel = Channel<String>(Channel.CONFLATED)
launch {
repeat(5) { i ->
conflatedChannel.send("Value $i")
println("Sent: Value $i")
}
conflatedChannel.close()
}
delay(100) // Let all sends complete
// Will only receive the latest value
for (value in conflatedChannel) {
println("Conflated received: $value")
}
}Result type for non-blocking channel operations.
value class ChannelResult<out T> {
val isSuccess: Boolean
val isClosed: Boolean
val isFailure: Boolean
fun getOrNull(): T?
fun getOrThrow(): T
fun exceptionOrNull(): Throwable?
}Usage Examples:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking {
val channel = Channel<Int>(capacity = 1)
// Non-blocking send
val sendResult1 = channel.trySend(1)
println("Send 1 success: ${sendResult1.isSuccess}")
val sendResult2 = channel.trySend(2)
println("Send 2 success: ${sendResult2.isSuccess}") // false - buffer full
// Non-blocking receive
val receiveResult1 = channel.tryReceive()
if (receiveResult1.isSuccess) {
println("Received: ${receiveResult1.getOrNull()}")
}
val receiveResult2 = channel.tryReceive()
println("Receive 2 success: ${receiveResult2.isSuccess}") // false - empty
channel.close()
// Receive from closed channel
val receiveResult3 = channel.tryReceive()
println("Receive from closed: ${receiveResult3.isClosed}")
}Scopes for producer and actor patterns.
interface ProducerScope<in E> : CoroutineScope, SendChannel<E>
interface ActorScope<E> : CoroutineScope, ReceiveChannel<E> {
val channel: Channel<E>
}Strategy for handling buffer overflow in channels.
enum class BufferOverflow {
/** Suspend sender when buffer is full */
SUSPEND,
/** Drop oldest element when buffer is full */
DROP_OLDEST,
/** Drop latest element when buffer is full */
DROP_LATEST
}Install with Tessl CLI
npx tessl i tessl/maven-org-jetbrains-kotlinx--kotlinx-coroutines-core-jvm