or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

channels.md coroutine-builders.md coroutine-management.md dispatchers.md error-handling.md flow-api.md index.md select-expression.md synchronization.md

channels.md · docs/

0

# Channels

1

2

Communication primitives between coroutines with various buffering strategies and channel types. Channels provide a way to send values between coroutines with flow control and different delivery guarantees.

3

4

## Capabilities

5

6

### Channel Interfaces

7

8

Core interfaces for sending and receiving values between coroutines.

9

10

```kotlin { .api }

11

/**

12

* Interface for sending values to a channel

13

*/

14

interface SendChannel<in E> {

15

/** True if channel is closed for sending */

16

val isClosedForSend: Boolean

17

18

/** Send a value, suspending if channel is full */

19

suspend fun send(element: E)

20

21

/** Try to send a value immediately without suspending */

22

fun trySend(element: E): ChannelResult<Unit>

23

24

/** Close the channel optionally with a cause */

25

fun close(cause: Throwable? = null): Boolean

26

27

/** Register a handler for when channel is closed */

28

fun invokeOnClose(handler: (cause: Throwable?) -> Unit): Unit

29

}

30

31

/**

32

* Interface for receiving values from a channel

33

*/

34

interface ReceiveChannel<out E> {

35

/** True if channel is closed for receiving and empty */

36

val isClosedForReceive: Boolean

37

38

/** True if channel is empty */

39

val isEmpty: Boolean

40

41

/** Receive a value, suspending if channel is empty */

42

suspend fun receive(): E

43

44

/** Try to receive a value immediately without suspending */

45

fun tryReceive(): ChannelResult<E>

46

47

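/** Receive a value wrapped in a ChannelResult, suspending if channel is empty; returns a closed result (instead of throwing) if the channel is closed */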

48

suspend fun receiveCatching(): ChannelResult<E>

49

50

/** Cancel the channel with optional cause */

51

fun cancel(cause: CancellationException? = null)

52

53

/** Get iterator for consuming values */

54

operator fun iterator(): ChannelIterator<E>

55

}

56

57

/**

58

* Bidirectional channel combining SendChannel and ReceiveChannel

59

*/

60

interface Channel<E> : SendChannel<E>, ReceiveChannel<E>

61

```

62

63

### Channel Result

64

65

Result wrapper for non-blocking channel operations.

66

67

```kotlin { .api }

68

/**

69

* Result of channel operations

70

*/

71

@JvmInline

72

value class ChannelResult<out T> {

73

/** True if operation was successful */

74

val isSuccess: Boolean

75

76

/** True if channel was closed */

77

val isClosed: Boolean

78

79

/** True if operation failed (channel full/empty); also true whenever isClosed is true */

80

val isFailure: Boolean

81

82

/** Get the value or throw if not successful */

83

fun getOrThrow(): T

84

85

/** Get the value or null if not successful */

86

fun getOrNull(): T?

87

88

/** Get the close cause if the channel was closed with an exception; null on success, on ordinary failure, or if closed without a cause */

89

fun exceptionOrNull(): Throwable?

90

}

91

```

92

93

### Channel Iterator

94

95

Iterator for consuming channel values.

96

97

```kotlin { .api }

98

/**

99

* Iterator for channel values

100

*/

101

interface ChannelIterator<out E> {

102

/** Check if there are more values (suspending) */

103

suspend operator fun hasNext(): Boolean

104

105

/** Get the next value */

106

operator fun next(): E

107

}

108

```

109

110

### Channel Factory

111

112

Factory function for creating channels with various configurations.

113

114

```kotlin { .api }

115

/**

116

* Create a channel with specified capacity and behavior

117

* @param capacity channel capacity (RENDEZVOUS, CONFLATED, UNLIMITED, BUFFERED, or positive number)

118

* @param onBufferOverflow behavior when buffer is full

119

* @param onUndeliveredElement handler for undelivered elements

120

*/

121

fun <E> Channel(

122

capacity: Int = RENDEZVOUS,

123

onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,

124

onUndeliveredElement: ((E) -> Unit)? = null

125

): Channel<E>

126

127

// Capacity constants

128

const val RENDEZVOUS = 0 // No buffering, direct handoff

129

const val CONFLATED = -1 // Keep only latest value

130

const val UNLIMITED = Int.MAX_VALUE // Unlimited buffering

131

const val BUFFERED = -2 // Use default buffer size

132

```

133

134

**Usage Examples:**

135

136

```kotlin

137

import kotlinx.coroutines.*

138

import kotlinx.coroutines.channels.*

139

140

// Rendezvous channel (no buffering)

141

val rendezvousChannel = Channel<Int>()

142

143

// Buffered channel

144

val bufferedChannel = Channel<String>(capacity = 10)

145

146

// Conflated channel (only latest value)

147

val conflatedChannel = Channel<Data>(capacity = Channel.CONFLATED)

148

149

// Unlimited channel

150

val unlimitedChannel = Channel<Event>(capacity = Channel.UNLIMITED)

151

152

// Producer-consumer example

153

launch {

154

// Producer

155

for (i in 1..5) {

156

rendezvousChannel.send(i)

157

println("Sent: $i")

158

}

159

rendezvousChannel.close()

160

}

161

162

launch {

163

// Consumer

164

for (value in rendezvousChannel) {

165

println("Received: $value")

166

delay(100) // Simulate processing

167

}

168

}

169

```

170

171

### Buffer Overflow Strategies

172

173

Configuration for how channels handle buffer overflow.

174

175

```kotlin { .api }

176

/**

177

* Strategy for handling buffer overflow

178

*/

179

enum class BufferOverflow {

180

/** Suspend sender when buffer is full */

181

SUSPEND,

182

/** Drop oldest values when buffer is full */

183

DROP_OLDEST,

184

/** Drop the value being added (the newest) when buffer is full, leaving the buffer unchanged */

185

DROP_LATEST

186

}

187

```

188

189

**Usage Examples:**

190

191

```kotlin

192

import kotlinx.coroutines.*

193

import kotlinx.coroutines.channels.*

194

195

// Channel that drops oldest values when full

196

val dropOldestChannel = Channel<Int>(

197

capacity = 3,

198

onBufferOverflow = BufferOverflow.DROP_OLDEST

199

)

200

201

// Channel that drops latest values when full

202

val dropLatestChannel = Channel<Int>(

203

capacity = 3,

204

onBufferOverflow = BufferOverflow.DROP_LATEST

205

)

206

207

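// Fast producer, slow consumer. Note: with DROP_OLDEST, trySend never fails on a full buffer (the oldest buffered value is evicted instead), so the else branch below only runs if the channel has been closed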

208

launch {

209

repeat(10) { i ->

210

val result = dropOldestChannel.trySend(i)

211

if (result.isSuccess) {

212

println("Sent: $i")

213

} else {

214

println("Dropped: $i")

215

}

216

}

217

dropOldestChannel.close()

218

}

219

220

launch {

221

delay(500) // Slow consumer

222

for (value in dropOldestChannel) {

223

println("Received: $value")

224

}

225

}

226

```

227

228

### Producer Function

229

230

Creates a receive channel from a coroutine that produces values.

231

232

```kotlin { .api }

233

/**

234

* Creates a receive channel from a producer coroutine

235

* @param context additional context for the producer

236

* @param capacity channel capacity

237

* @param block producer coroutine code

238

*/

239

fun <E> CoroutineScope.produce(

240

context: CoroutineContext = EmptyCoroutineContext,

241

capacity: Int = 0,

242

block: suspend ProducerScope<E>.() -> Unit

243

): ReceiveChannel<E>

244

245

/**

246

* Scope for producer coroutines

247

*/

248

interface ProducerScope<in E> : CoroutineScope, SendChannel<E> {

249

/** The channel being produced to */

250

val channel: SendChannel<E>

251

}

252

```

253

254

**Usage Examples:**

255

256

```kotlin

257

import kotlinx.coroutines.*

258

import kotlinx.coroutines.channels.*

259

260

// Simple producer

261

val numbers = produce {

262

for (i in 1..10) {

263

send(i * i)

264

delay(100)

265

}

266

}

267

268

// Consume the values

269

for (square in numbers) {

270

println("Square: $square")

271

}

272

273

// Producer with error handling

274

val dataProducer = produce<String> {

275

try {

276

while (true) {

277

val data = fetchData() // May throw exception

278

send(data)

279

delay(1000)

280

}

281

} catch (e: Exception) {

282

// Producer will close channel with this exception

283

throw e

284

}

285

}

286

```

287

288

### Select Clauses for Channels

289

290

Channel operations can be used in select expressions for non-blocking multi-way selection.

291

292

```kotlin { .api }

293

/**

294

* Select clauses for channels

295

*/

296

interface SendChannel<in E> {

297

/** Select clause for sending */

298

val onSend: SelectClause2<E, SendChannel<E>>

299

}

300

301

interface ReceiveChannel<out E> {

302

/** Select clause for receiving */

303

val onReceive: SelectClause1<E>

304

305

/** Select clause for receiving with result */

306

val onReceiveCatching: SelectClause1<ChannelResult<E>>

307

}

308

```

309

310

**Usage Examples:**

311

312

```kotlin

313

import kotlinx.coroutines.*

314

import kotlinx.coroutines.channels.*

315

import kotlinx.coroutines.selects.*

316

317

suspend fun selectChannelOperations() {

318

val channel1 = Channel<String>()

319

val channel2 = Channel<String>()

320

321

// Select between multiple channel operations

322

val result = select<String> {

323

channel1.onReceive { value ->

324

"From channel1: $value"

325

}

326

channel2.onReceive { value ->

327

"From channel2: $value"

328

}

329

onTimeout(1000) {

330

"Timeout"

331

}

332

}

333

334

println(result)

335

}

336

337

suspend fun selectSend() {

338

val channel1 = Channel<Int>(1)

339

val channel2 = Channel<Int>(1)

340

341

select<Unit> {

342

channel1.onSend(42) {

343

println("Sent to channel1")

344

}

345

channel2.onSend(24) {

346

println("Sent to channel2")

347

}

348

}

349

}

350

```

351

352

### Channel Extensions

353

354

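Utility functions for working with channels. Note: in current kotlinx.coroutines releases the `map`, `filter`, and `take` operators on `ReceiveChannel` are deprecated and no longer part of the public API; prefer the equivalent `Flow` operators.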

355

356

```kotlin { .api }

357

/**

358

* Perform the action for each received value, then cancel (consume) the channel; use a plain for loop to iterate without consuming

359

*/

360

suspend fun <E> ReceiveChannel<E>.consumeEach(action: suspend (E) -> Unit)

361

362

/**

363

* Convert channel to list

364

*/

365

suspend fun <E> ReceiveChannel<E>.toList(): List<E>

366

367

/**

368

* Map channel values

369

*/

370

fun <E, R> ReceiveChannel<E>.map(transform: suspend (E) -> R): ReceiveChannel<R>

371

372

/**

373

* Filter channel values

374

*/

375

fun <E> ReceiveChannel<E>.filter(predicate: suspend (E) -> Boolean): ReceiveChannel<E>

376

377

/**

378

* Take first n values from channel

379

*/

380

fun <E> ReceiveChannel<E>.take(n: Int): ReceiveChannel<E>

381

```

382

383

### Broadcast Channel (Deprecated)

384

385

Legacy broadcasting channel replaced by SharedFlow.

386

387

```kotlin { .api }

388

/**

389

* @deprecated Use SharedFlow instead

390

* Channel that broadcasts values to multiple subscribers

391

*/

392

@Deprecated("Use SharedFlow instead", level = DeprecationLevel.WARNING)

393

interface BroadcastChannel<E> : SendChannel<E> {

394

/** Subscribe to the broadcast channel */

395

fun openSubscription(): ReceiveChannel<E>

396

397

/** Cancel all subscriptions */

398

fun cancel(cause: CancellationException? = null)

399

}

400

```

401

402

## Channel Patterns

403

404

### Fan-out Pattern

405

406

Multiple consumers processing values from a single channel.

407

408

```kotlin

409

val jobs = List(3) { workerId ->

410

launch {

411

for (work in workChannel) {

412

processWork(work, workerId)

413

}

414

}

415

}

416

```

417

418

### Fan-in Pattern

419

420

Multiple producers sending values to a single channel.

421

422

```kotlin

423

val outputChannel = Channel<Result>()

424

425

// Multiple producers

426

repeat(3) { producerId ->

427

launch {

428

repeat(10) {

429

outputChannel.send(produceResult(producerId, it))

430

}

431

}

432

}

433

```

434

435

### Pipeline Pattern

436

437

Chaining channels for multi-stage processing.

438

439

```kotlin

440

val rawData = produce { /* generate raw data */ }

441

val processed = produce {

442

for (data in rawData) {

443

send(processStage1(data))

444

}

445

}

446

val final = produce {

447

for (data in processed) {

448

send(processStage2(data))

449

}

450

}

451

```

452

453

## Channel vs Flow

454

455

| Feature | Channel | Flow |

456

|---------|---------|------|

457

| Nature | Hot (always active) | Cold (starts on collect) |

458

| Consumers | Multiple concurrent (each value is delivered to exactly one consumer) | Multiple independent collectors (each collection re-runs the flow) |

459

| Buffering | Built-in buffering | Operator-based buffering |

460

| Backpressure | Send suspension | Collector-driven |

461

| Use Case | Producer-consumer | Data transformation pipelines |