CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-jetbrains-kotlinx--kotlinx-coroutines-core-jvm

JVM-specific implementation of kotlinx.coroutines core library providing coroutine primitives, builders, dispatchers, and synchronization primitives for asynchronous programming in Kotlin.

Pending
Overview
Eval results
Files

docs/channels.md

Channels

Message passing primitives for communication between coroutines with configurable capacity, buffering strategies, and bi-directional communication patterns.

Capabilities

Channel Interface

Combines sending and receiving capabilities for bi-directional communication.

/**
 * A channel that is both a [SendChannel] and a [ReceiveChannel] for elements
 * of type [E], so one object can be used for sending and for receiving.
 */
interface Channel<E> : SendChannel<E>, ReceiveChannel<E> {
    /** Channel capacity constants */
    companion object {
        /** Capacity marker for an unlimited buffer; its value is [Int.MAX_VALUE]. */
        const val UNLIMITED: Int = Int.MAX_VALUE
        /** Capacity marker for a conflated channel, which keeps only the latest value. */
        const val CONFLATED: Int = -1
        /** Capacity marker for a rendezvous channel: zero capacity, direct handoff. */
        const val RENDEZVOUS: Int = 0
        /** Capacity marker requesting the default buffered capacity. */
        const val BUFFERED: Int = -2
    }
}

/**
 * Creates a channel with specified capacity
 *
 * @param capacity either a buffer size or one of the capacity constants;
 *   defaults to RENDEZVOUS
 * @param onBufferOverflow strategy applied when the buffer is full; defaults
 *   to [BufferOverflow.SUSPEND]
 * @param onUndeliveredElement optional callback taking an element; `null` by
 *   default. NOTE(review): presumably invoked for elements that were sent but
 *   never received — confirm against upstream docs.
 * @return the new [Channel]
 */
fun <E> Channel(
    capacity: Int = RENDEZVOUS,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E>

Usage Examples:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

/**
 * Demonstrates a rendezvous channel hand-off followed by a buffered channel
 * with room for three elements.
 *
 * `runBlocking<Unit>` is spelled out on purpose: the last expression of the
 * block is `close()`, which returns Boolean. Without the explicit type
 * argument `main` would be inferred as returning Boolean and would no longer
 * be a valid program entry point.
 */
fun main() = runBlocking<Unit> {
    // Rendezvous channel (capacity = 0)
    val rendezvousChannel = Channel<Int>()

    val sender = launch {
        repeat(3) { i ->
            println("Sending $i")
            rendezvousChannel.send(i) // suspends until the receiver takes the element
            println("Sent $i")
        }
        rendezvousChannel.close()
    }

    val receiver = launch {
        for (value in rendezvousChannel) {
            println("Received $value")
            delay(100) // Simulate processing
        }
    }

    // Wait for the rendezvous part to finish instead of sleeping for a guessed duration
    joinAll(sender, receiver)

    // Buffered channel
    val bufferedChannel = Channel<String>(capacity = 3)

    // Can send up to capacity without suspending
    bufferedChannel.send("Message 1")
    bufferedChannel.send("Message 2")
    bufferedChannel.send("Message 3")

    println("Sent 3 messages without suspending")

    // The buffer is full, so this send suspends until a receive frees a slot
    launch {
        bufferedChannel.send("Message 4")
        println("Sent message 4")
    }

    // Receive messages
    repeat(4) {
        val message = bufferedChannel.receive()
        println("Received: $message")
    }

    bufferedChannel.close()
}

SendChannel Interface

Sending side of a channel for message production.

/**
 * Sending side of a channel: the operations a producer uses to submit
 * elements of type [E] and to close the channel.
 */
interface SendChannel<in E> {
    /** True if channel is closed for sending */
    val isClosedForSend: Boolean
    
    /**
     * Suspends until element can be sent
     *
     * @param element the value to send
     */
    suspend fun send(element: E)
    
    /**
     * Tries to send element immediately, without suspending.
     *
     * @param element the value to send
     * @return a [ChannelResult] whose `isSuccess` tells whether the element was accepted
     */
    fun trySend(element: E): ChannelResult<Unit>
    
    /**
     * Closes the channel with optional cause
     *
     * @param cause optional reason for closing; `null` by default
     * @return a Boolean. NOTE(review): presumably `true` only for the call
     *   that actually closed the channel — confirm against upstream docs.
     */
    fun close(cause: Throwable? = null): Boolean
    
    /**
     * Registers handler for when channel is closed
     *
     * @param handler callback receiving the close cause, which may be `null`
     */
    fun invokeOnClose(handler: (cause: Throwable?) -> Unit)
}

Usage Examples:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

/**
 * Demonstrates `trySend` with a fallback to the suspending `send`, using a
 * small buffer and a consumer that is slower than the producer.
 */
fun main() = runBlocking {
    val channel = Channel<Int>(capacity = 2)

    // Producer coroutine
    val producer = launch {
        try {
            repeat(5) { i ->
                println("Trying to send $i")

                // Try to send without suspending first
                val result = channel.trySend(i)
                if (result.isSuccess) {
                    println("Sent $i immediately")
                } else {
                    println("Buffer full, suspending to send $i")
                    channel.send(i)
                    println("Sent $i after suspending")
                }

                delay(100)
            }
        } catch (e: CancellationException) {
            // Cancellation is not a failure; it must propagate to keep structured concurrency intact
            throw e
        } catch (e: Exception) {
            println("Producer failed: ${e.message}")
        } finally {
            channel.close()
            println("Channel closed by producer")
        }
    }

    // Consumer coroutine (slower than producer)
    launch {
        // Iterating ends exactly when the channel is closed and drained. Polling
        // isClosedForReceive before receive() is a check-then-act race: the channel
        // can close between the check and the call.
        for (value in channel) {
            println("Received $value")
            delay(300) // Slower consumer
        }
        println("Channel was closed")
    }

    // runBlocking itself waits for the consumer, so no trailing sleep is needed
    producer.join()
}

ReceiveChannel Interface

Receiving side of a channel for message consumption.

/**
 * Receiving side of a channel: the operations a consumer uses to take
 * elements of type [E] and to cancel reception.
 */
interface ReceiveChannel<out E> {
    /** True if channel is closed for receiving */
    val isClosedForReceive: Boolean
    
    /** True if channel is empty */
    val isEmpty: Boolean
    
    /**
     * Suspends until element is available
     *
     * @return the received element
     */
    suspend fun receive(): E
    
    /**
     * Tries to receive element immediately, without suspending.
     *
     * @return a [ChannelResult] that holds the element when `isSuccess` is true
     */
    fun tryReceive(): ChannelResult<E>
    
    /**
     * Receives element or close/failure result
     *
     * @return a [ChannelResult] describing either the received element or
     *   the close/failure outcome
     */
    suspend fun receiveCatching(): ChannelResult<E>
    
    /** Iterator for consuming channel; this is what enables `for (x in channel)`. */
    operator fun iterator(): ChannelIterator<E>
    
    /**
     * Cancels reception from channel
     *
     * @param cause optional cancellation cause; `null` by default
     */
    fun cancel(cause: CancellationException? = null)
}

/**
 * Iterator over the elements of a [ReceiveChannel], as used by
 * `for (x in channel)` loops.
 *
 * Both members are `operator` functions so the `for` loop convention can call
 * them. Only [hasNext] suspends; [next] is a regular function that hands out
 * the element the preceding [hasNext] call already retrieved.
 */
interface ChannelIterator<out E> {
    /**
     * Suspends until it is known whether another element is available.
     *
     * @return `true` if [next] can be called to obtain an element
     */
    suspend operator fun hasNext(): Boolean

    /**
     * Returns the element retrieved by the preceding [hasNext] call.
     * It does not suspend.
     */
    operator fun next(): E
}

Usage Examples:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

/**
 * Shows three ways to consume a channel: a `for` loop, `receiveCatching`,
 * and the non-suspending `tryReceive`.
 *
 * `runBlocking<Unit>` is spelled out on purpose: the last expression of the
 * block is `close()`, which returns Boolean. Without the explicit type
 * argument `main` would be inferred as returning Boolean and would no longer
 * be a valid program entry point.
 */
fun main() = runBlocking<Unit> {
    val channel = Channel<String>(capacity = Channel.UNLIMITED)

    // Fill channel with data
    launch {
        repeat(5) { i ->
            channel.send("Item $i")
        }
        channel.close()
    }

    // Different ways to consume

    // 1. Using iterator (for-in loop); the loop ends once the channel is closed and drained
    println("Method 1: for-in loop")
    for (item in channel) {
        println("Received: $item")
    }

    // Refill for next example
    val channel2 = Channel<String>()
    launch {
        repeat(3) { i ->
            channel2.send("Data $i")
        }
        channel2.close()
    }

    // 2. Using receiveCatching for error handling
    println("Method 2: receiveCatching")
    while (true) {
        val result = channel2.receiveCatching()
        if (result.isSuccess) {
            println("Received: ${result.getOrNull()}")
        } else {
            println("Channel closed or failed")
            break
        }
    }

    // 3. Using tryReceive for non-blocking
    val channel3 = Channel<Int>(capacity = 3)
    channel3.trySend(1)
    channel3.trySend(2)

    println("Method 3: tryReceive")
    while (true) {
        val result = channel3.tryReceive()
        if (result.isSuccess) {
            println("Received immediately: ${result.getOrNull()}")
        } else {
            println("No data available")
            break
        }
    }

    channel3.close()
}

Channel Factory Functions

Functions for creating channels with different characteristics.

/**
 * Creates a channel with the given capacity. The default shown here is
 * RENDEZVOUS, not unlimited; pass UNLIMITED explicitly for an unlimited channel.
 */
fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E>

/**
 * Launches a producer coroutine and returns the [ReceiveChannel] that yields
 * the elements it sends.
 *
 * This is the public builder overload. An `onCompletion` handler (together
 * with a `start` option) exists only on an overload marked
 * `@InternalCoroutinesApi`, so it is not part of this signature.
 *
 * @param context additional context elements for the producer coroutine
 * @param capacity buffer capacity of the resulting channel; 0 means rendezvous
 * @param block producer body, run with a [ProducerScope] receiver
 * @return the receiving side of the channel fed by [block]
 */
fun <E> CoroutineScope.produce(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 0,
    block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E>

/**
 * Creates actor pattern: starts a coroutine whose body runs with an
 * [ActorScope] receiver and returns the [SendChannel] used to send it messages.
 *
 * NOTE(review): upstream reportedly marks this builder as obsolete API —
 * confirm before recommending it for new code.
 *
 * @param context additional coroutine context; defaults to [EmptyCoroutineContext]
 * @param capacity channel capacity; defaults to 0
 * @param start coroutine start option; defaults to [CoroutineStart.DEFAULT]
 * @param onCompletion optional completion handler; `null` by default
 * @param block actor body, run with an [ActorScope] receiver
 * @return the sending side used to deliver messages to the actor
 */
fun <E> CoroutineScope.actor(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 0,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    onCompletion: CompletionHandler? = null,
    block: suspend ActorScope<E>.() -> Unit
): SendChannel<E>

Usage Examples:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

/**
 * Shows the `produce` builder feeding a consumer loop, followed by an `actor`
 * that processes the messages sent to it.
 */
fun main() = runBlocking {
    // Producer side: five strings, one every 100 ms
    val produced = produce {
        for (i in 0 until 5) {
            send("Produced $i")
            delay(100)
        }
    }

    // Drain the producer through an explicit iterator
    val items = produced.iterator()
    while (items.hasNext()) {
        val item = items.next()
        println("Consumed: $item")
    }

    // Actor side: reads its own mailbox until it is closed
    val mailbox = actor<String> {
        val inbox = channel.iterator()
        while (inbox.hasNext()) {
            val message = inbox.next()
            println("Actor processing: $message")
            delay(50)
        }
    }

    for (i in 0 until 3) {
        mailbox.send("Message $i")
    }

    mailbox.close()
    delay(200)
}

// Advanced producer example

/**
 * Produces the integers from 1 through [max], pausing 100 ms after each send.
 */
fun CoroutineScope.numberProducer(max: Int) = produce<Int> {
    (1..max).forEach { n ->
        send(n)
        delay(100)
    }
}

/**
 * Produces the square of every number received from [input], finishing when
 * [input] has no more elements.
 */
fun CoroutineScope.squareProcessor(input: ReceiveChannel<Int>) = produce<Int> {
    val source = input.iterator()
    while (source.hasNext()) {
        val n = source.next()
        send(n * n)
    }
}

/**
 * Chains [numberProducer] into [squareProcessor] and prints each squared value.
 */
suspend fun pipelineExample() = coroutineScope {
    // Stage 1 emits 1..5, stage 2 squares whatever stage 1 emits
    val squares = squareProcessor(numberProducer(5))

    val results = squares.iterator()
    while (results.hasNext()) {
        val square = results.next()
        println("Square: $square")
    }
}

Broadcast Channels (Deprecated)

Legacy broadcast functionality (deprecated in favor of SharedFlow).

/**
 * Legacy broadcast channel, deprecated in favor of SharedFlow. It is a
 * [SendChannel] to which receivers attach through [openSubscription].
 */
@Deprecated("Use SharedFlow instead")
interface BroadcastChannel<E> : SendChannel<E> {
    /** Opens a subscription and returns the [ReceiveChannel] to read from. */
    fun openSubscription(): ReceiveChannel<E>
    /**
     * Cancels this broadcast channel.
     *
     * @param cause optional cancellation cause; `null` by default
     */
    fun cancel(cause: CancellationException? = null)
}

/**
 * Creates a legacy [BroadcastChannel], deprecated in favor of SharedFlow.
 *
 * @param capacity capacity of the broadcast channel; there is no default
 */
@Deprecated("Use SharedFlow instead")
fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E>

Channel Capacity Types

Different capacity configurations for channels.

/**
 * Capacity constants of the channel API. This is an excerpt: the companion
 * object is declared inside `interface Channel<E>` and is named `Factory`,
 * so the constants are referenced as `Channel.UNLIMITED`, `Channel.CONFLATED`,
 * and so on.
 */
companion object Factory {
    /** Unlimited capacity - never suspends sends */
    const val UNLIMITED: Int = Int.MAX_VALUE

    /** Conflated - keeps only the latest value, so sends never suspend */
    const val CONFLATED: Int = -1

    /** Rendezvous - zero capacity, direct handoff between sender and receiver */
    const val RENDEZVOUS: Int = 0

    /**
     * Buffered - requests a buffer of the library's default size. The value is
     * a marker, not a size.
     */
    const val BUFFERED: Int = -2
}

Usage Examples:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

/**
 * Contrasts an unlimited channel, which buffers every element, with a
 * conflated channel, which retains only the most recent one.
 */
fun main() = runBlocking {
    // Unlimited capacity: the sender is never held back
    val unlimitedChannel = Channel<Int>(Channel.UNLIMITED)

    launch {
        for (i in 0 until 1000) {
            unlimitedChannel.send(i) // Never suspends
        }
        unlimitedChannel.close()
    }

    // Count everything that arrives until the channel is closed
    var count = 0
    val drained = unlimitedChannel.iterator()
    while (drained.hasNext()) {
        drained.next()
        count += 1
    }
    println("Received $count items from unlimited channel")

    // Conflated: each send replaces the previously stored value
    val conflatedChannel = Channel<String>(Channel.CONFLATED)

    launch {
        for (i in 0 until 5) {
            conflatedChannel.send("Value $i")
            println("Sent: Value $i")
        }
        conflatedChannel.close()
    }

    delay(100) // Let all sends complete

    // Only the value that survived conflation is left to read
    val latest = conflatedChannel.iterator()
    while (latest.hasNext()) {
        val value = latest.next()
        println("Conflated received: $value")
    }
}

Channel Result

Result type for non-blocking channel operations.

/**
 * Result type for non-blocking channel operations, as returned by `trySend`,
 * `tryReceive` and `receiveCatching`.
 */
value class ChannelResult<out T> {
    /** True when the operation succeeded. */
    val isSuccess: Boolean
    /** True when the operation did not succeed because the channel is closed. */
    val isClosed: Boolean
    /** True when the operation did not succeed, e.g. a full buffer on send or an empty channel on receive. */
    val isFailure: Boolean
    
    /** Returns the value, or `null` when there is none. */
    fun getOrNull(): T?
    /** Returns the value. NOTE(review): presumably throws when the result is not a success — confirm. */
    fun getOrThrow(): T
    /** Returns the associated exception, or `null` when there is none. */
    fun exceptionOrNull(): Throwable?
}

Usage Examples:

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

/**
 * Walks through the `ChannelResult` values produced by `trySend` and
 * `tryReceive` on a single-slot channel, before and after closing it.
 */
fun main() = runBlocking {
    val channel = Channel<Int>(capacity = 1)

    // First send fits into the single buffer slot
    val firstSend = channel.trySend(1)
    println("Send 1 success: ${firstSend.isSuccess}")

    // Second send finds the slot occupied
    val secondSend = channel.trySend(2)
    println("Send 2 success: ${secondSend.isSuccess}") // false - buffer full

    // First receive takes the buffered element
    val firstReceive = channel.tryReceive()
    if (firstReceive.isSuccess) {
        println("Received: ${firstReceive.getOrNull()}")
    }

    // Second receive finds nothing left
    val secondReceive = channel.tryReceive()
    println("Receive 2 success: ${secondReceive.isSuccess}") // false - empty

    channel.close()

    // After close, the result reports the closed state
    val afterClose = channel.tryReceive()
    println("Receive from closed: ${afterClose.isClosed}")
}

Types

ProducerScope and ActorScope

Scopes for producer and actor patterns.

/**
 * Receiver scope of a `produce` block: a [CoroutineScope] that is also the
 * [SendChannel] elements are sent to, so `send(...)` can be called directly.
 *
 * NOTE(review): upstream appears to also declare a `channel` property on
 * this interface — confirm and list it here if so.
 */
interface ProducerScope<in E> : CoroutineScope, SendChannel<E>

/**
 * Receiver scope of an `actor` block: a [CoroutineScope] that is also the
 * [ReceiveChannel] the actor reads its messages from.
 */
interface ActorScope<E> : CoroutineScope, ReceiveChannel<E> {
    /** The actor's channel; the usage example iterates it with `for (message in channel)`. */
    val channel: Channel<E>
}

BufferOverflow

Strategy for handling buffer overflow in channels.

/**
 * Strategy for handling buffer overflow in channels; passed as the
 * `onBufferOverflow` argument of the `Channel` factory function.
 */
enum class BufferOverflow {
    /** Suspend sender when buffer is full */
    SUSPEND,
    /** Drop oldest element when buffer is full */
    DROP_OLDEST,
    /**
     * Drop latest element when buffer is full.
     * NOTE(review): presumably "latest" is the element currently being added,
     * leaving the buffer contents unchanged — confirm against upstream docs.
     */
    DROP_LATEST
}

Install with Tessl CLI

npx tessl i tessl/maven-org-jetbrains-kotlinx--kotlinx-coroutines-core-jvm

docs

channels.md

coroutine-builders.md

dispatchers.md

exception-handling.md

flow-api.md

index.md

job-management.md

jvm-integration.md

synchronization.md

tile.json