0
# Channels - Communication Primitives
1
2
Channels are producer-consumer communication primitives for passing data between coroutines. A channel transfers values from senders to receivers, with configurable buffer capacity and overflow strategies.
3
4
## Capabilities
5
6
### Core Channel Interfaces
7
8
The fundamental interfaces for channel-based communication between coroutines.
9
10
```kotlin { .api }
11
/**
12
* Channel is a non-blocking primitive for communication between a sender and a receiver.
13
* Conceptually, a channel is similar to BlockingQueue, but with suspend operations
14
* instead of blocking ones.
15
*/
16
interface Channel<E> : SendChannel<E>, ReceiveChannel<E>
17
18
/**
19
* Sender's interface to a Channel.
20
*/
21
interface SendChannel<in E> {
22
/**
23
* Returns true if this channel was closed by an invocation of close or
24
* its receiving side was cancelled.
25
*/
26
val isClosedForSend: Boolean
27
28
/**
29
* Sends the specified element to this channel, suspending the caller
30
* while the buffer of this channel is full.
31
*/
32
suspend fun send(element: E)
33
34
/**
35
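* Tries to send the specified element to this channel immediately, without suspending; the returned ChannelResult reports success, failure (e.g. the buffer is full), or closure.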
36
*/
37
fun trySend(element: E): ChannelResult<Unit>
38
39
/**
40
* Closes this channel with an optional cause exception.
41
*/
42
fun close(cause: Throwable? = null): Boolean
43
44
/**
45
* Registers a handler which is synchronously invoked once the channel is closed.
46
*/
47
fun invokeOnClose(handler: (cause: Throwable?) -> Unit)
48
}
49
50
/**
51
* Receiver's interface to a Channel.
52
*/
53
interface ReceiveChannel<out E> {
54
/**
55
* Returns true if this channel was closed and no more elements will ever be received.
56
*/
57
val isClosedForReceive: Boolean
58
59
/**
60
* Retrieves and removes an element from this channel, suspending the caller
61
* if this channel is empty.
62
*/
63
suspend fun receive(): E
64
65
/**
66
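* Tries to retrieve and remove an element from this channel immediately, without suspending; the returned ChannelResult reports success, failure (the channel is empty), or closure.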
67
*/
68
fun tryReceive(): ChannelResult<E>
69
70
/**
71
* Cancels reception of remaining elements from this channel with an optional cause exception.
72
*/
73
fun cancel(cause: CancellationException? = null)
74
75
/**
76
* Returns a new iterator to receive elements from this channel using a for loop.
77
*/
78
operator fun iterator(): ChannelIterator<E>
79
}
80
81
/**
82
* Iterator for ReceiveChannel.
83
*/
84
interface ChannelIterator<out E> {
85
/**
86
* Returns true if the channel has more elements, suspending the caller
87
* if this channel is empty.
88
*/
89
suspend operator fun hasNext(): Boolean
90
91
/**
92
* Retrieves the next element from this channel.
93
*/
94
operator fun next(): E
95
}
96
```
97
98
### Channel Factory Functions
99
100
Functions to create channels with different configurations and behaviors.
101
102
```kotlin { .api }
103
/**
104
* Creates a channel with the specified buffer capacity.
105
*/
106
fun <E> Channel(
107
capacity: Int = Channel.RENDEZVOUS,
108
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
109
onUndeliveredElement: ((E) -> Unit)? = null
110
): Channel<E>
111
112
/**
113
* Channel capacity constants.
114
*/
115
object Channel {
116
/**
117
* Requests a rendezvous channel (no buffer) when passed as the capacity to Channel(): each send suspends until a receiver takes the element.
118
*/
119
const val RENDEZVOUS = 0
120
121
/**
122
* Requests a conflated channel when passed as the capacity to Channel(): only the most recently sent element is kept, and send never suspends.
123
*/
124
const val CONFLATED = -1
125
126
/**
127
* Requests a channel with an unlimited buffer when passed as the capacity to Channel(): send never suspends.
128
*/
129
const val UNLIMITED = Int.MAX_VALUE
130
131
/**
132
* Requests a buffered channel with the default buffer size.
133
*/
134
const val BUFFERED = -2
135
136
/**
137
* Default buffer capacity used when BUFFERED is specified.
138
*/
139
const val CHANNEL_DEFAULT_CAPACITY = 64
140
}
141
142
/**
143
* Buffer overflow strategies for channels.
144
*/
145
enum class BufferOverflow {
146
/**
147
* Suspend on buffer overflow (default).
148
*/
149
SUSPEND,
150
151
/**
152
* Drop oldest elements on buffer overflow.
153
*/
154
DROP_OLDEST,
155
156
/**
157
* Drop latest (newly emitted) elements on buffer overflow.
158
*/
159
DROP_LATEST
160
}
161
```
162
163
**Usage Examples:**
164
165
```kotlin
166
import kotlinx.coroutines.*
167
import kotlinx.coroutines.channels.*
168
169
val scope = MainScope()
170
171
// Rendezvous channel (capacity = 0)
172
val rendezvousChannel = Channel<Int>()
173
174
scope.launch {
175
println("Sending 1")
176
rendezvousChannel.send(1) // Suspends until received
177
println("Sent 1")
178
}
179
180
scope.launch {
181
delay(1000)
182
val value = rendezvousChannel.receive()
183
println("Received: $value")
184
}
185
186
// Buffered channel
187
val bufferedChannel = Channel<String>(capacity = 10)
188
189
scope.launch {
190
repeat(5) {
191
bufferedChannel.send("Message $it")
192
println("Sent: Message $it")
193
}
194
bufferedChannel.close()
195
}
196
197
scope.launch {
198
for (message in bufferedChannel) {
199
println("Received: $message")
200
delay(100)
201
}
202
}
203
204
// Conflated channel (only keeps latest)
205
val conflatedChannel = Channel<Int>(Channel.CONFLATED)
206
207
scope.launch {
208
repeat(10) {
209
conflatedChannel.send(it)
210
println("Sent: $it")
211
}
212
conflatedChannel.close()
213
}
214
215
scope.launch {
216
delay(500) // Let sender finish
217
for (value in conflatedChannel) {
218
println("Received: $value") // Will only receive the last value
219
}
220
}
221
222
// Channel with overflow strategy
223
val overflowChannel = Channel<Int>(
224
capacity = 3,
225
onBufferOverflow = BufferOverflow.DROP_OLDEST
226
)
227
228
scope.launch {
229
repeat(10) {
230
val result = overflowChannel.trySend(it)
231
println("Try send $it: ${result.isSuccess}")
232
}
233
overflowChannel.close()
234
}
235
```
236
237
### Channel Result Type
238
239
Type-safe result wrapper returned by channel operations that report failure or closure as a value instead of throwing, such as `trySend` and `tryReceive`.
240
241
```kotlin { .api }
242
/**
243
* Represents the result of a channel operation.
244
*/
245
@JvmInline
246
value class ChannelResult<out T> {
247
/**
248
* Returns true if this instance represents a successful outcome.
249
*/
250
val isSuccess: Boolean
251
252
/**
253
* Returns true if this instance represents a closed channel.
254
*/
255
val isClosed: Boolean
256
257
/**
258
* Returns true if this instance represents a failed outcome.
259
*/
260
val isFailure: Boolean
261
262
/**
263
* Returns the encapsulated value if this instance represents success or null if closed/failed.
264
*/
265
fun getOrNull(): T?
266
267
/**
268
* Returns the encapsulated Throwable if the channel was closed with a cause, or null otherwise (on success, or on failure of an operation on a channel that is not closed).
269
*/
270
fun exceptionOrNull(): Throwable?
271
272
/**
273
* Performs the given action on the encapsulated value if this instance represents success.
274
*/
275
inline fun onSuccess(action: (value: T) -> Unit): ChannelResult<T>
276
277
/**
278
* Performs the given action on the encapsulated Throwable if this instance represents failure.
279
*/
280
inline fun onFailure(action: (exception: Throwable) -> Unit): ChannelResult<T>
281
282
/**
283
* Performs the given action if this instance represents a closed channel.
284
*/
285
inline fun onClosed(action: (exception: Throwable?) -> Unit): ChannelResult<T>
286
}
287
```
288
289
**Usage Examples:**
290
291
```kotlin
292
import kotlinx.coroutines.*
293
import kotlinx.coroutines.channels.*
294
295
val scope = MainScope()
296
297
val channel = Channel<String>(capacity = 2)
298
299
scope.launch {
300
// Non-blocking send attempts
301
repeat(5) { i ->
302
val result = channel.trySend("Message $i")
303
result
304
.onSuccess { println("Successfully sent: Message $i") }
305
.onFailure { println("Failed to send: Message $i") }
306
.onClosed { println("Channel closed, cannot send: Message $i") }
307
}
308
309
channel.close()
310
}
311
312
scope.launch {
313
delay(100)
314
315
// Non-blocking receive attempts
316
repeat(10) {
317
val result = channel.tryReceive()
318
result
319
.onSuccess { value -> println("Successfully received: $value") }
320
.onFailure { println("No value available") }
321
.onClosed { println("Channel closed, no more values") }
322
323
delay(50)
324
}
325
}
326
```
327
328
### Producer Builder
329
330
Create receive channels using a producer coroutine pattern.
331
332
```kotlin { .api }
333
/**
334
* Launches a new coroutine to produce a stream of values by sending them
335
* to a channel and returns a ReceiveChannel that yields these values.
336
*/
337
fun <E> CoroutineScope.produce(
338
context: CoroutineContext = EmptyCoroutineContext,
339
capacity: Int = 0,
340
onCompletion: CompletionHandler? = null,
341
block: suspend ProducerScope<E>.() -> Unit
342
): ReceiveChannel<E>
343
344
/**
345
* Scope for produce builder.
346
*/
347
interface ProducerScope<in E> : CoroutineScope, SendChannel<E> {
348
/**
349
* The channel that this producer is sending to.
350
*/
351
val channel: SendChannel<E>
352
}
353
```
354
355
**Usage Examples:**
356
357
```kotlin
358
import kotlinx.coroutines.*
359
import kotlinx.coroutines.channels.*
360
361
val scope = MainScope()
362
363
// Basic producer
364
val numbersChannel = scope.produce {
365
for (i in 1..5) {
366
send(i)
367
delay(100)
368
}
369
}
370
371
scope.launch {
372
for (number in numbersChannel) {
373
println("Received number: $number")
374
}
375
}
376
377
// Producer with capacity
378
val bufferedProducer = scope.produce(capacity = 10) {
379
repeat(20) {
380
send("Item $it")
381
println("Produced: Item $it")
382
}
383
}
384
385
scope.launch {
386
delay(500) // Let producer get ahead
387
for (item in bufferedProducer) {
388
println("Consumed: $item")
389
delay(100)
390
}
391
}
392
393
// Producer with custom context
394
val ioProducer = scope.produce(context = Dispatchers.IO) {
395
repeat(3) {
396
val data = fetchDataFromNetwork() // IO operation
397
send(data)
398
}
399
}
400
401
scope.launch {
402
for (data in ioProducer) {
403
processData(data) // Process on main context
404
}
405
}
406
407
// Producer with exception handling
408
val robustProducer = scope.produce<String>(
409
onCompletion = { exception ->
410
println("Producer completed with: $exception")
411
}
412
) {
413
try {
414
repeat(5) {
415
if (it == 3) throw RuntimeException("Simulated error")
416
send("Value $it")
417
}
418
} catch (e: Exception) {
419
println("Producer caught exception: ${e.message}")
420
throw e // Re-throw to signal completion with exception
421
}
422
}
423
```
424
425
### Channel Extensions
426
427
Utility functions for working with channels and converting between channels and other types.
428
429
```kotlin { .api }
430
/**
431
* Performs the given action for each received element.
432
*/
433
suspend fun <E> ReceiveChannel<E>.consumeEach(action: suspend (E) -> Unit)
434
435
/**
436
* Receives all elements from the channel and returns them as a List.
437
*/
438
suspend fun <E> ReceiveChannel<E>.toList(): List<E>
439
440
/**
441
* Represents this channel as a hot Flow that consumes the channel on the first collection; the resulting flow can be collected only once.
442
*/
443
fun <E> ReceiveChannel<E>.consumeAsFlow(): Flow<E>
444
445
/**
446
* Converts this Flow to a ReceiveChannel.
447
*/
448
fun <T> Flow<T>.produceIn(scope: CoroutineScope): ReceiveChannel<T>
449
450
/**
451
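* Broadcasts all elements from this channel to multiple subscribers through the returned [BroadcastChannel]. Note: BroadcastChannel is deprecated in recent kotlinx.coroutines releases in favor of SharedFlow.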
452
*/
453
fun <E> ReceiveChannel<E>.broadcast(capacity: Int = 1): BroadcastChannel<E>
454
```
455
456
**Usage Examples:**
457
458
```kotlin
459
import kotlinx.coroutines.*
460
import kotlinx.coroutines.channels.*
461
import kotlinx.coroutines.flow.*
462
463
val scope = MainScope()
464
465
// Channel to list
466
scope.launch {
467
val channel = produce {
468
repeat(5) { send(it) }
469
}
470
471
val list = channel.toList()
472
println("Channel as list: $list")
473
}
474
475
// Channel to flow
476
scope.launch {
477
val channel = produce {
478
repeat(3) {
479
send("Item $it")
480
delay(100)
481
}
482
}
483
484
channel.consumeAsFlow()
485
.map { it.uppercase() }
486
.collect { println("From channel flow: $it") }
487
}
488
489
// Flow to channel
490
scope.launch {
491
val flow = flowOf(1, 2, 3, 4, 5)
492
val channel = flow.produceIn(scope)
493
494
for (value in channel) {
495
println("From flow channel: $value")
496
}
497
}
498
499
// ConsumeEach
500
scope.launch {
501
val channel = produce {
502
repeat(3) { send("Message $it") }
503
}
504
505
channel.consumeEach { message ->
506
println("Processing: $message")
507
delay(50)
508
}
509
}
510
```
511
512
### Select Support for Channels
513
514
Use channels in select expressions for awaiting multiple channel operations.
515
516
```kotlin { .api }
517
/**
518
* Select clause using the send suspending function as a select clause.
519
*/
520
val <E> SendChannel<E>.onSend: SelectClause2<E, SendChannel<E>>
521
522
/**
523
* Select clause using the receive suspending function as a select clause.
524
*/
525
val <E> ReceiveChannel<E>.onReceive: SelectClause1<E>
526
527
/**
528
* Select clause using the receiveCatching suspending function as a select clause.
529
*/
530
val <E> ReceiveChannel<E>.onReceiveCatching: SelectClause1<ChannelResult<E>>
531
```
532
533
**Usage Examples:**
534
535
```kotlin
536
import kotlinx.coroutines.*
537
import kotlinx.coroutines.channels.*
538
import kotlinx.coroutines.selects.*
539
540
val scope = MainScope()
541
542
scope.launch {
543
val channel1 = Channel<String>()
544
val channel2 = Channel<String>()
545
546
// Producer for channel1
547
launch {
548
delay(100)
549
channel1.send("From channel 1")
550
}
551
552
// Producer for channel2
553
launch {
554
delay(200)
555
channel2.send("From channel 2")
556
}
557
558
// Select from multiple channels
559
val result = select<String> {
560
channel1.onReceive { value ->
561
"Received from channel1: $value"
562
}
563
channel2.onReceive { value ->
564
"Received from channel2: $value"
565
}
566
}
567
568
println(result) // Will print result from channel1 (faster)
569
570
channel1.close()
571
channel2.close()
572
}
573
574
// Select with send operations
575
scope.launch {
576
val fastChannel = Channel<Int>(Channel.UNLIMITED)
577
val slowChannel = Channel<Int>()
578
579
// Try to send to whichever channel can accept first
580
val sendResult = select<String> {
581
fastChannel.onSend(1) { channel ->
582
"Sent to fast channel"
583
}
584
slowChannel.onSend(2) { channel ->
585
"Sent to slow channel"
586
}
587
}
588
589
println(sendResult) // Will likely send to fast channel
590
591
fastChannel.close()
592
slowChannel.close()
593
}
594
```
595
596
### Advanced Channel Patterns
597
598
Common patterns and best practices for channel usage.
599
600
**Fan-out Pattern:**
601
602
```kotlin
603
import kotlinx.coroutines.*
604
import kotlinx.coroutines.channels.*
605
606
val scope = MainScope()
607
608
// Single producer, multiple consumers
609
fun fanOut() = scope.produce {
610
var x = 1
611
while (true) {
612
send(x++)
613
delay(100)
614
}
615
}
616
617
scope.launch {
618
val producer = fanOut()
619
620
repeat(3) { id ->
621
launch {
622
for (value in producer) {
623
println("Consumer $id received: $value")
624
if (value >= 10) break
625
}
626
}
627
}
628
}
629
```
630
631
**Fan-in Pattern:**
632
633
```kotlin
634
// Multiple producers, single consumer
635
suspend fun fanIn(
636
input1: ReceiveChannel<String>,
637
input2: ReceiveChannel<String>
638
): ReceiveChannel<String> = scope.produce {
639
var input1Active = true
640
var input2Active = true
641
642
while (input1Active || input2Active) {
643
select<Unit> {
644
if (input1Active) {
645
input1.onReceiveCatching { result ->
646
result.onSuccess { send("From input1: $it") }
647
.onClosed { input1Active = false }
648
}
649
}
650
if (input2Active) {
651
input2.onReceiveCatching { result ->
652
result.onSuccess { send("From input2: $it") }
653
.onClosed { input2Active = false }
654
}
655
}
656
}
657
}
658
}
659
```
660
661
**Pipeline Pattern:**
662
663
```kotlin
664
// Chain of processing stages
665
fun numbers() = scope.produce {
666
var x = 1
667
while (true) send(x++)
668
}
669
670
fun square(numbers: ReceiveChannel<Int>) = scope.produce {
671
for (x in numbers) send(x * x)
672
}
673
674
fun print(numbers: ReceiveChannel<Int>) = scope.launch {
675
for (x in numbers) println(x)
676
}
677
678
// Usage
679
val numbersPipeline = numbers()
680
val squaredPipeline = square(numbersPipeline)
681
print(squaredPipeline)
682
```