0
# Channels
1
2
Message passing primitives for communication between coroutines with configurable capacity, buffering strategies, and bi-directional communication patterns.
3
4
## Capabilities
5
6
### Channel Interface
7
8
Combines sending and receiving capabilities for bi-directional communication.
9
10
```kotlin { .api }
11
interface Channel<E> : SendChannel<E>, ReceiveChannel<E> {
12
/** Channel capacity constants */
13
companion object {
14
const val UNLIMITED: Int = Int.MAX_VALUE
15
const val CONFLATED: Int = -1
16
const val RENDEZVOUS: Int = 0
17
const val BUFFERED: Int = -2
18
}
19
}
20
21
/** Creates a channel with specified capacity */
22
fun <E> Channel(
23
capacity: Int = RENDEZVOUS,
24
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
25
onUndeliveredElement: ((E) -> Unit)? = null
26
): Channel<E>
27
```
28
29
**Usage Examples:**
30
31
```kotlin
32
import kotlinx.coroutines.*
33
import kotlinx.coroutines.channels.*
34
35
fun main() = runBlocking {
36
// Rendezvous channel (capacity = 0)
37
val rendezvousChannel = Channel<Int>()
38
39
launch {
40
repeat(3) { i ->
41
println("Sending $i")
42
rendezvousChannel.send(i)
43
println("Sent $i")
44
}
45
rendezvousChannel.close()
46
}
47
48
launch {
49
for (value in rendezvousChannel) {
50
println("Received $value")
51
delay(100) // Simulate processing
52
}
53
}
54
55
delay(1000)
56
57
// Buffered channel
58
val bufferedChannel = Channel<String>(capacity = 3)
59
60
// Can send up to capacity without suspending
61
bufferedChannel.send("Message 1")
62
bufferedChannel.send("Message 2")
63
bufferedChannel.send("Message 3")
64
65
println("Sent 3 messages without suspending")
66
67
// This would suspend until space is available
68
launch {
69
bufferedChannel.send("Message 4")
70
println("Sent message 4")
71
}
72
73
// Receive messages
74
repeat(4) {
75
val message = bufferedChannel.receive()
76
println("Received: $message")
77
}
78
79
bufferedChannel.close()
80
}
81
```
82
83
### SendChannel Interface
84
85
Sending side of a channel for message production.
86
87
```kotlin { .api }
88
interface SendChannel<in E> {
89
/** True if channel is closed for sending */
90
val isClosedForSend: Boolean
91
92
/** Suspends until element can be sent */
93
suspend fun send(element: E)
94
95
/** Tries to send element immediately */
96
fun trySend(element: E): ChannelResult<Unit>
97
98
/** Closes the channel with optional cause */
99
fun close(cause: Throwable? = null): Boolean
100
101
/** Registers handler for when channel is closed */
102
fun invokeOnClose(handler: (cause: Throwable?) -> Unit)
103
}
104
```
105
106
**Usage Examples:**
107
108
```kotlin
109
import kotlinx.coroutines.*
110
import kotlinx.coroutines.channels.*
111
112
fun main() = runBlocking {
113
val channel = Channel<Int>(capacity = 2)
114
115
// Producer coroutine
116
val producer = launch {
117
try {
118
repeat(5) { i ->
119
println("Trying to send $i")
120
121
// Try to send without suspending first
122
val result = channel.trySend(i)
123
if (result.isSuccess) {
124
println("Sent $i immediately")
125
} else {
126
println("Buffer full, suspending to send $i")
127
channel.send(i)
128
println("Sent $i after suspending")
129
}
130
131
delay(100)
132
}
133
} catch (e: Exception) {
134
println("Producer failed: ${e.message}")
135
} finally {
136
channel.close()
137
println("Channel closed by producer")
138
}
139
}
140
141
// Consumer coroutine (slower than producer)
142
launch {
143
try {
144
while (!channel.isClosedForReceive) {
145
val value = channel.receive()
146
println("Received $value")
147
delay(300) // Slower consumer
148
}
149
} catch (e: ClosedReceiveChannelException) {
150
println("Channel was closed")
151
}
152
}
153
154
producer.join()
155
delay(1000)
156
}
157
```
158
159
### ReceiveChannel Interface
160
161
Receiving side of a channel for message consumption.
162
163
```kotlin { .api }
164
interface ReceiveChannel<out E> {
165
/** True if channel is closed for receiving */
166
val isClosedForReceive: Boolean
167
168
/** True if channel is empty */
169
val isEmpty: Boolean
170
171
/** Suspends until element is available */
172
suspend fun receive(): E
173
174
/** Tries to receive element immediately */
175
fun tryReceive(): ChannelResult<E>
176
177
/** Receives element or close/failure result */
178
suspend fun receiveCatching(): ChannelResult<E>
179
180
/** Iterator for consuming channel */
181
operator fun iterator(): ChannelIterator<E>
182
183
/** Cancels reception from channel */
184
fun cancel(cause: CancellationException? = null)
185
}
186
187
interface ChannelIterator<out E> {
188
suspend fun hasNext(): Boolean
189
suspend fun next(): E
190
}
191
```
192
193
**Usage Examples:**
194
195
```kotlin
196
import kotlinx.coroutines.*
197
import kotlinx.coroutines.channels.*
198
199
fun main() = runBlocking {
200
val channel = Channel<String>(capacity = Channel.UNLIMITED)
201
202
// Fill channel with data
203
launch {
204
repeat(5) { i ->
205
channel.send("Item $i")
206
}
207
channel.close()
208
}
209
210
// Different ways to consume
211
212
// 1. Using iterator (for-in loop)
213
println("Method 1: for-in loop")
214
for (item in channel) {
215
println("Received: $item")
216
}
217
218
// Refill for next example
219
val channel2 = Channel<String>()
220
launch {
221
repeat(3) { i ->
222
channel2.send("Data $i")
223
}
224
channel2.close()
225
}
226
227
// 2. Using receiveCatching for error handling
228
println("Method 2: receiveCatching")
229
while (true) {
230
val result = channel2.receiveCatching()
231
if (result.isSuccess) {
232
println("Received: ${result.getOrNull()}")
233
} else {
234
println("Channel closed or failed")
235
break
236
}
237
}
238
239
// 3. Using tryReceive for non-blocking
240
val channel3 = Channel<Int>(capacity = 3)
241
channel3.trySend(1)
242
channel3.trySend(2)
243
244
println("Method 3: tryReceive")
245
while (true) {
246
val result = channel3.tryReceive()
247
if (result.isSuccess) {
248
println("Received immediately: ${result.getOrNull()}")
249
} else {
250
println("No data available")
251
break
252
}
253
}
254
255
channel3.close()
256
}
257
```
258
259
### Channel Factory Functions
260
261
Functions for creating channels with different characteristics.
262
263
```kotlin { .api }
264
/** Creates unlimited capacity channel */
265
fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E>
266
267
/** Creates produce-consumer pattern */
268
fun <E> CoroutineScope.produce(
269
context: CoroutineContext = EmptyCoroutineContext,
270
capacity: Int = 0,
271
onCompletion: CompletionHandler? = null,
272
block: suspend ProducerScope<E>.() -> Unit
273
): ReceiveChannel<E>
274
275
/** Creates actor pattern */
276
fun <E> CoroutineScope.actor(
277
context: CoroutineContext = EmptyCoroutineContext,
278
capacity: Int = 0,
279
start: CoroutineStart = CoroutineStart.DEFAULT,
280
onCompletion: CompletionHandler? = null,
281
block: suspend ActorScope<E>.() -> Unit
282
): SendChannel<E>
283
```
284
285
**Usage Examples:**
286
287
```kotlin
288
import kotlinx.coroutines.*
289
import kotlinx.coroutines.channels.*
290
291
fun main() = runBlocking {
292
// Producer pattern
293
val receiveChannel = produce {
294
repeat(5) { i ->
295
send("Produced $i")
296
delay(100)
297
}
298
}
299
300
for (item in receiveChannel) {
301
println("Consumed: $item")
302
}
303
304
// Actor pattern
305
val sendChannel = actor<String> {
306
for (message in channel) {
307
println("Actor processing: $message")
308
delay(50)
309
}
310
}
311
312
repeat(3) { i ->
313
sendChannel.send("Message $i")
314
}
315
316
sendChannel.close()
317
delay(200)
318
}
319
320
// Advanced producer example
321
fun CoroutineScope.numberProducer(max: Int) = produce<Int> {
322
for (i in 1..max) {
323
send(i)
324
delay(100)
325
}
326
}
327
328
fun CoroutineScope.squareProcessor(input: ReceiveChannel<Int>) = produce<Int> {
329
for (number in input) {
330
send(number * number)
331
}
332
}
333
334
suspend fun pipelineExample() = coroutineScope {
335
val numbers = numberProducer(5)
336
val squares = squareProcessor(numbers)
337
338
for (square in squares) {
339
println("Square: $square")
340
}
341
}
342
```
343
344
### Broadcast Channels (Deprecated)
345
346
Legacy broadcast functionality (deprecated in favor of SharedFlow).
347
348
```kotlin { .api }
349
@Deprecated("Use SharedFlow instead")
350
interface BroadcastChannel<E> : SendChannel<E> {
351
fun openSubscription(): ReceiveChannel<E>
352
fun cancel(cause: CancellationException? = null)
353
}
354
355
@Deprecated("Use SharedFlow instead")
356
fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E>
357
```
358
359
### Channel Capacity Types
360
361
Different capacity configurations for channels.
362
363
```kotlin { .api }
364
companion object Channel {
365
/** Unlimited capacity - never suspends sends */
366
const val UNLIMITED: Int = Int.MAX_VALUE
367
368
/** Conflated - keeps only the latest value */
369
const val CONFLATED: Int = -1
370
371
/** Rendezvous - zero capacity, direct handoff */
372
const val RENDEZVOUS: Int = 0
373
374
/** Default buffered capacity */
375
const val BUFFERED: Int = -2
376
}
377
```
378
379
**Usage Examples:**
380
381
```kotlin
382
import kotlinx.coroutines.*
383
import kotlinx.coroutines.channels.*
384
385
fun main() = runBlocking {
386
// Unlimited channel - never blocks sends
387
val unlimitedChannel = Channel<Int>(Channel.UNLIMITED)
388
389
launch {
390
repeat(1000) { i ->
391
unlimitedChannel.send(i) // Never suspends
392
}
393
unlimitedChannel.close()
394
}
395
396
var count = 0
397
for (value in unlimitedChannel) {
398
count++
399
}
400
println("Received $count items from unlimited channel")
401
402
// Conflated channel - only latest value
403
val conflatedChannel = Channel<String>(Channel.CONFLATED)
404
405
launch {
406
repeat(5) { i ->
407
conflatedChannel.send("Value $i")
408
println("Sent: Value $i")
409
}
410
conflatedChannel.close()
411
}
412
413
delay(100) // Let all sends complete
414
415
// Will only receive the latest value
416
for (value in conflatedChannel) {
417
println("Conflated received: $value")
418
}
419
}
420
```
421
422
### Channel Result
423
424
Result type for non-blocking channel operations.
425
426
```kotlin { .api }
427
value class ChannelResult<out T> {
428
val isSuccess: Boolean
429
val isClosed: Boolean
430
val isFailure: Boolean
431
432
fun getOrNull(): T?
433
fun getOrThrow(): T
434
fun exceptionOrNull(): Throwable?
435
}
436
```
437
438
**Usage Examples:**
439
440
```kotlin
441
import kotlinx.coroutines.*
442
import kotlinx.coroutines.channels.*
443
444
fun main() = runBlocking {
445
val channel = Channel<Int>(capacity = 1)
446
447
// Non-blocking send
448
val sendResult1 = channel.trySend(1)
449
println("Send 1 success: ${sendResult1.isSuccess}")
450
451
val sendResult2 = channel.trySend(2)
452
println("Send 2 success: ${sendResult2.isSuccess}") // false - buffer full
453
454
// Non-blocking receive
455
val receiveResult1 = channel.tryReceive()
456
if (receiveResult1.isSuccess) {
457
println("Received: ${receiveResult1.getOrNull()}")
458
}
459
460
val receiveResult2 = channel.tryReceive()
461
println("Receive 2 success: ${receiveResult2.isSuccess}") // false - empty
462
463
channel.close()
464
465
// Receive from closed channel
466
val receiveResult3 = channel.tryReceive()
467
println("Receive from closed: ${receiveResult3.isClosed}")
468
}
469
```
470
471
## Types
472
473
### ProducerScope and ActorScope
474
475
Scopes for producer and actor patterns.
476
477
```kotlin { .api }
478
interface ProducerScope<in E> : CoroutineScope, SendChannel<E>
479
480
interface ActorScope<E> : CoroutineScope, ReceiveChannel<E> {
481
val channel: Channel<E>
482
}
483
```
484
485
### BufferOverflow
486
487
Strategy for handling buffer overflow in channels.
488
489
```kotlin { .api }
490
enum class BufferOverflow {
491
/** Suspend sender when buffer is full */
492
SUSPEND,
493
/** Drop oldest element when buffer is full */
494
DROP_OLDEST,
495
/** Drop latest element when buffer is full */
496
DROP_LATEST
497
}
498
```