0
# Channels
1
2
Channels are communication primitives for passing values between coroutines, available in several channel types with different buffering strategies. They provide flow control (a sender suspends while the channel is full) and different delivery guarantees.
3
4
## Capabilities
5
6
### Channel Interfaces
7
8
Core interfaces for sending and receiving values between coroutines.
9
10
```kotlin { .api }
11
/**
12
* Interface for sending values to a channel
13
*/
14
interface SendChannel<in E> {
15
/** True if channel is closed for sending */
16
val isClosedForSend: Boolean
17
18
/** Send a value, suspending if channel is full */
19
suspend fun send(element: E)
20
21
/** Try to send a value immediately without suspending */
22
fun trySend(element: E): ChannelResult<Unit>
23
24
/** Close the channel, optionally with a cause; returns true if this call closed it, false if it was already closed */
25
fun close(cause: Throwable? = null): Boolean
26
27
/** Register a handler for when channel is closed */
28
fun invokeOnClose(handler: (cause: Throwable?) -> Unit): Unit
29
}
30
31
/**
32
* Interface for receiving values from a channel
33
*/
34
interface ReceiveChannel<out E> {
35
/** True if channel is closed for receiving and empty */
36
val isClosedForReceive: Boolean
37
38
/** True if channel is empty */
39
val isEmpty: Boolean
40
41
/** Receive a value, suspending if channel is empty */
42
suspend fun receive(): E
43
44
/** Try to receive a value immediately without suspending */
45
fun tryReceive(): ChannelResult<E>
46
47
/** Receive a value, or a closed result (instead of throwing) if the channel is closed */
48
suspend fun receiveCatching(): ChannelResult<E>
49
50
/** Cancel the channel with optional cause */
51
fun cancel(cause: CancellationException? = null)
52
53
/** Get iterator for consuming values */
54
operator fun iterator(): ChannelIterator<E>
55
}
56
57
/**
58
* Bidirectional channel combining SendChannel and ReceiveChannel
59
*/
60
interface Channel<E> : SendChannel<E>, ReceiveChannel<E>
61
```
62
63
### Channel Result
64
65
Result wrapper returned by the non-suspending `trySend`/`tryReceive` operations and by `receiveCatching`.
66
67
```kotlin { .api }
68
/**
69
* Result of channel operations
70
*/
71
@JvmInline
72
value class ChannelResult<out T> {
73
/** True if operation was successful */
74
val isSuccess: Boolean
75
76
/** True if channel was closed */
77
val isClosed: Boolean
78
79
/** True if operation did not succeed (channel full/empty, or channel closed) */
80
val isFailure: Boolean
81
82
/** Get the value or throw if not successful */
83
fun getOrThrow(): T
84
85
/** Get the value or null if not successful */
86
fun getOrNull(): T?
87
88
/** Get the close cause if the channel was closed with an exception, or null otherwise */
89
fun exceptionOrNull(): Throwable?
90
}
91
```
92
93
### Channel Iterator
94
95
Iterator for consuming channel values.
96
97
```kotlin { .api }
98
/**
99
* Iterator for channel values
100
*/
101
interface ChannelIterator<out E> {
102
/** Check if there are more values (suspending) */
103
suspend fun hasNext(): Boolean
104
105
/** Get the next value */
106
operator fun next(): E
107
}
108
```
109
110
### Channel Factory
111
112
Factory function for creating channels with various configurations.
113
114
```kotlin { .api }
115
/**
116
* Create a channel with specified capacity and behavior
117
* @param capacity channel capacity (RENDEZVOUS, CONFLATED, UNLIMITED, BUFFERED, or a positive buffer size)
118
* @param onBufferOverflow behavior when buffer is full
119
* @param onUndeliveredElement handler for undelivered elements
120
*/
121
fun <E> Channel(
122
capacity: Int = RENDEZVOUS,
123
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
124
onUndeliveredElement: ((E) -> Unit)? = null
125
): Channel<E>
126
127
// Capacity constants
128
const val RENDEZVOUS = 0 // No buffering, direct handoff
129
const val CONFLATED = -1 // Keep only latest value
130
const val UNLIMITED = Int.MAX_VALUE // Unlimited buffering
131
const val BUFFERED = -2 // Use default buffer size
132
```
133
134
**Usage Examples:**
135
136
```kotlin
137
import kotlinx.coroutines.*
138
import kotlinx.coroutines.channels.*
139
140
// Rendezvous channel (no buffering)
141
val rendezvousChannel = Channel<Int>()
142
143
// Buffered channel
144
val bufferedChannel = Channel<String>(capacity = 10)
145
146
// Conflated channel (only latest value)
147
val conflatedChannel = Channel<Data>(capacity = Channel.CONFLATED)
148
149
// Unlimited channel
150
val unlimitedChannel = Channel<Event>(capacity = Channel.UNLIMITED)
151
152
// Producer-consumer example
153
launch {
154
// Producer
155
for (i in 1..5) {
156
rendezvousChannel.send(i)
157
println("Sent: $i")
158
}
159
rendezvousChannel.close()
160
}
161
162
launch {
163
// Consumer
164
for (value in rendezvousChannel) {
165
println("Received: $value")
166
delay(100) // Simulate processing
167
}
168
}
169
```
170
171
### Buffer Overflow Strategies
172
173
Configuration for how channels handle buffer overflow.
174
175
```kotlin { .api }
176
/**
177
* Strategy for handling buffer overflow
178
*/
179
enum class BufferOverflow {
180
/** Suspend sender when buffer is full */
181
SUSPEND,
182
/** Drop oldest values when buffer is full */
183
DROP_OLDEST,
184
/** Drop the value being sent (the newest) when buffer is full, leaving the buffer contents unchanged */
185
DROP_LATEST
186
}
187
```
188
189
**Usage Examples:**
190
191
```kotlin
192
import kotlinx.coroutines.*
193
import kotlinx.coroutines.channels.*
194
195
// Channel that drops oldest values when full
196
val dropOldestChannel = Channel<Int>(
197
capacity = 3,
198
onBufferOverflow = BufferOverflow.DROP_OLDEST
199
)
200
201
// Channel that drops latest values when full
202
val dropLatestChannel = Channel<Int>(
203
capacity = 3,
204
onBufferOverflow = BufferOverflow.DROP_LATEST
205
)
206
207
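// Fast producer, slow consumer. Note: with DROP_OLDEST, trySend succeeds while the channel is open (the oldest buffered value is evicted instead), so the else branch below is not reached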
208
launch {
209
repeat(10) { i ->
210
val result = dropOldestChannel.trySend(i)
211
if (result.isSuccess) {
212
println("Sent: $i")
213
} else {
214
println("Dropped: $i")
215
}
216
}
217
dropOldestChannel.close()
218
}
219
220
launch {
221
delay(500) // Slow consumer
222
for (value in dropOldestChannel) {
223
println("Received: $value")
224
}
225
}
226
```
227
228
### Producer Function
229
230
Creates a receive channel from a coroutine that produces values.
231
232
```kotlin { .api }
233
/**
234
* Creates a receive channel from a producer coroutine
235
* @param context additional context for the producer
236
* @param capacity channel capacity
237
* @param block producer coroutine code
238
*/
239
fun <E> CoroutineScope.produce(
240
context: CoroutineContext = EmptyCoroutineContext,
241
capacity: Int = 0,
242
block: suspend ProducerScope<E>.() -> Unit
243
): ReceiveChannel<E>
244
245
/**
246
* Scope for producer coroutines
247
*/
248
interface ProducerScope<in E> : CoroutineScope, SendChannel<E> {
249
/** The channel being produced to */
250
val channel: SendChannel<E>
251
}
252
```
253
254
**Usage Examples:**
255
256
```kotlin
257
import kotlinx.coroutines.*
258
import kotlinx.coroutines.channels.*
259
260
// Simple producer
261
val numbers = produce {
262
for (i in 1..10) {
263
send(i * i)
264
delay(100)
265
}
266
}
267
268
// Consume the values
269
for (square in numbers) {
270
println("Square: $square")
271
}
272
273
// Producer with error handling
274
val dataProducer = produce<String> {
275
try {
276
while (true) {
277
val data = fetchData() // May throw exception
278
send(data)
279
delay(1000)
280
}
281
} catch (e: Exception) {
282
// Producer will close channel with this exception
283
throw e
284
}
285
}
286
```
287
288
### Select Clauses for Channels
289
290
Channel operations can be used as clauses in `select` expressions, which suspend until the first of several operations becomes available.
291
292
```kotlin { .api }
293
/**
294
* Select clauses for channels
295
*/
296
interface SendChannel<in E> {
297
/** Select clause for sending */
298
val onSend: SelectClause2<E, SendChannel<E>>
299
}
300
301
interface ReceiveChannel<out E> {
302
/** Select clause for receiving */
303
val onReceive: SelectClause1<E>
304
305
/** Select clause for receiving with result */
306
val onReceiveCatching: SelectClause1<ChannelResult<E>>
307
}
308
```
309
310
**Usage Examples:**
311
312
```kotlin
313
import kotlinx.coroutines.*
314
import kotlinx.coroutines.channels.*
315
import kotlinx.coroutines.selects.*
316
317
suspend fun selectChannelOperations() {
318
val channel1 = Channel<String>()
319
val channel2 = Channel<String>()
320
321
// Select between multiple channel operations
322
val result = select<String> {
323
channel1.onReceive { value ->
324
"From channel1: $value"
325
}
326
channel2.onReceive { value ->
327
"From channel2: $value"
328
}
329
onTimeout(1000) {
330
"Timeout"
331
}
332
}
333
334
println(result)
335
}
336
337
suspend fun selectSend() {
338
val channel1 = Channel<Int>(1)
339
val channel2 = Channel<Int>(1)
340
341
select<Unit> {
342
channel1.onSend(42) {
343
println("Sent to channel1")
344
}
345
channel2.onSend(24) {
346
println("Sent to channel2")
347
}
348
}
349
}
350
```
351
352
### Channel Extensions
353
354
Utility functions for working with channels.
355
356
```kotlin { .api }
357
/**
358
* Consume all values from the channel
359
*/
360
suspend fun <E> ReceiveChannel<E>.consumeEach(action: suspend (E) -> Unit)
361
362
/**
363
* Convert channel to list
364
*/
365
suspend fun <E> ReceiveChannel<E>.toList(): List<E>
366
367
/**
368
* Map channel values
369
*/
370
fun <E, R> ReceiveChannel<E>.map(transform: suspend (E) -> R): ReceiveChannel<R>
371
372
/**
373
* Filter channel values
374
*/
375
fun <E> ReceiveChannel<E>.filter(predicate: suspend (E) -> Boolean): ReceiveChannel<E>
376
377
/**
378
* Take first n values from channel
379
*/
380
fun <E> ReceiveChannel<E>.take(n: Int): ReceiveChannel<E>
381
```
382
383
### Broadcast Channel (Deprecated)
384
385
Legacy broadcast channel, superseded by `SharedFlow`.
386
387
```kotlin { .api }
388
/**
389
* @deprecated Use SharedFlow instead
390
* Channel that broadcasts values to multiple subscribers
391
*/
392
@Deprecated("Use SharedFlow instead", level = DeprecationLevel.WARNING)
393
interface BroadcastChannel<E> : SendChannel<E> {
394
/** Subscribe to the broadcast channel */
395
fun openSubscription(): ReceiveChannel<E>
396
397
/** Cancel all subscriptions */
398
fun cancel(cause: CancellationException? = null)
399
}
400
```
401
402
## Channel Patterns
403
404
### Fan-out Pattern
405
406
Multiple consumers processing values from a single channel.
407
408
```kotlin
409
val jobs = List(3) { workerId ->
410
launch {
411
for (work in workChannel) {
412
processWork(work, workerId)
413
}
414
}
415
}
416
```
417
418
### Fan-in Pattern
419
420
Multiple producers sending values to a single channel.
421
422
```kotlin
423
val outputChannel = Channel<Result>()
424
425
// Multiple producers
426
repeat(3) { producerId ->
427
launch {
428
repeat(10) {
429
outputChannel.send(produceResult(producerId, it))
430
}
431
}
432
}
433
```
434
435
### Pipeline Pattern
436
437
Chaining channels for multi-stage processing.
438
439
```kotlin
440
val rawData = produce { /* generate raw data */ }
441
val processed = produce {
442
for (data in rawData) {
443
send(processStage1(data))
444
}
445
}
446
val final = produce {
447
for (data in processed) {
448
send(processStage2(data))
449
}
450
}
451
```
452
453
## Channel vs Flow
454
455
| Feature | Channel | Flow |
456
|---------|---------|------|
457
| Nature | Hot (always active) | Cold (starts on collect) |
458
| Consumers | Multiple concurrent; each value is delivered to one receiver | Any number of collectors; each collection runs the flow independently |
459
| Buffering | Built-in buffering | Operator-based buffering |
460
| Backpressure | Send suspension | Collector-driven |
461
| Use Case | Producer-consumer | Data transformation pipelines |