or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

channels.md coroutine-builders.md dispatchers.md exception-handling.md flow-api.md index.md jobs-deferreds.md structured-concurrency.md synchronization.md

channels.mddocs/

0

# Channels - Communication Primitives

1

2

Producer-consumer communication channels for passing data between coroutines. Channels provide a way to transfer values between coroutines with various capacity and overflow strategies.

3

4

## Capabilities

5

6

### Core Channel Interfaces

7

8

The fundamental interfaces for channel-based communication between coroutines.

9

10

```kotlin { .api }

11

/**

12

* Channel is a non-blocking primitive for communication between a sender and a receiver.

13

* Conceptually, a channel is similar to BlockingQueue, but with suspend operations

14

* instead of blocking ones.

15

*/

16

interface Channel<E> : SendChannel<E>, ReceiveChannel<E>

17

18

/**

19

* Sender's interface to a Channel.

20

*/

21

interface SendChannel<in E> {

22

/**

23

* Returns true if this channel was closed by an invocation of close or

24

* its receiving side was cancelled.

25

*/

26

val isClosedForSend: Boolean

27

28

/**

29

* Sends the specified element to this channel, suspending the caller

30

* while the buffer of this channel is full.

31

*/

32

suspend fun send(element: E)

33

34

/**

35

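* Tries to send the specified element to this channel without suspending: returns a successful result if the element was added, a failed result if the buffer is full, or a closed result if the channel is closed.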

36

*/

37

fun trySend(element: E): ChannelResult<Unit>

38

39

/**

40

* Closes this channel with an optional cause exception.

41

*/

42

fun close(cause: Throwable? = null): Boolean

43

44

/**

45

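* Registers a handler which is synchronously invoked once the channel is closed or its receiving side is cancelled. Only one handler can be attached to a channel during its lifetime.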

46

*/

47

fun invokeOnClose(handler: (cause: Throwable?) -> Unit)

48

}

49

50

/**

51

* Receiver's interface to a Channel.

52

*/

53

interface ReceiveChannel<out E> {

54

/**

55

* Returns true if this channel was closed and no more elements will ever be received.

56

*/

57

val isClosedForReceive: Boolean

58

59

/**

60

* Retrieves and removes an element from this channel, suspending the caller

61

* if this channel is empty.

62

*/

63

suspend fun receive(): E

64

65

/**

66

* Tries to retrieve and remove an element from this channel without blocking.

67

*/

68

fun tryReceive(): ChannelResult<E>

69

70

/**

71

* Cancels reception of remaining elements from this channel with an optional cause exception.

72

*/

73

fun cancel(cause: CancellationException? = null)

74

75

/**

76

* Returns a new iterator to receive elements from this channel using a for loop.

77

*/

78

operator fun iterator(): ChannelIterator<E>

79

}

80

81

/**

82

* Iterator for ReceiveChannel.

83

*/

84

interface ChannelIterator<out E> {

85

/**

86

* Returns true if the channel has more elements, suspending the caller

87

* if this channel is empty.

88

*/

89

suspend operator fun hasNext(): Boolean

90

91

/**

92

* Retrieves the element removed from the channel by the preceding call to hasNext; throws IllegalStateException if hasNext was not invoked first.

93

*/

94

operator fun next(): E

95

}

96

```

97

98

### Channel Factory Functions

99

100

Functions to create channels with different configurations and behaviors.

101

102

```kotlin { .api }

103

/**

104

* Creates a channel with the specified buffer capacity.

105

*/

106

fun <E> Channel(

107

capacity: Int = Channel.RENDEZVOUS,

108

onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,

109

onUndeliveredElement: ((E) -> Unit)? = null

110

): Channel<E>

111

112

/**

113

* Channel capacity constants.

114

*/

115

object Channel {

116

/**

117

* Requests a rendezvous channel (no buffer): each send suspends until a receiver takes the element, and vice versa.

118

*/

119

const val RENDEZVOUS = 0

120

121

/**

122

* Requests a conflated channel: only the most recently sent element is kept, so send never suspends and older unreceived elements are overwritten.

123

*/

124

const val CONFLATED = -1

125

126

/**

127

* Requests an unlimited channel: Channel() with capacity = UNLIMITED.

128

*/

129

const val UNLIMITED = Int.MAX_VALUE

130

131

/**

132

* Requests a buffered channel with the default buffer size.

133

*/

134

const val BUFFERED = -2

135

136

/**

137

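* Default buffer capacity used when BUFFERED is specified (64 unless overridden on the JVM via the kotlinx.coroutines.channels.defaultBuffer system property).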

138

*/

139

const val CHANNEL_DEFAULT_CAPACITY = 64

140

}

141

142

/**

143

* Buffer overflow strategies for channels.

144

*/

145

enum class BufferOverflow {

146

/**

147

* Suspend on buffer overflow (default).

148

*/

149

SUSPEND,

150

151

/**

152

* Drop oldest elements on buffer overflow.

153

*/

154

DROP_OLDEST,

155

156

/**

157

* Drop latest (newly emitted) elements on buffer overflow.

158

*/

159

DROP_LATEST

160

}

161

```

162

163

**Usage Examples:**

164

165

```kotlin

166

import kotlinx.coroutines.*

167

import kotlinx.coroutines.channels.*

168

169

val scope = MainScope()

170

171

// Rendezvous channel (capacity = 0)

172

val rendezvousChannel = Channel<Int>()

173

174

scope.launch {

175

println("Sending 1")

176

rendezvousChannel.send(1) // Suspends until received

177

println("Sent 1")

178

}

179

180

scope.launch {

181

delay(1000)

182

val value = rendezvousChannel.receive()

183

println("Received: $value")

184

}

185

186

// Buffered channel

187

val bufferedChannel = Channel<String>(capacity = 10)

188

189

scope.launch {

190

repeat(5) {

191

bufferedChannel.send("Message $it")

192

println("Sent: Message $it")

193

}

194

bufferedChannel.close()

195

}

196

197

scope.launch {

198

for (message in bufferedChannel) {

199

println("Received: $message")

200

delay(100)

201

}

202

}

203

204

// Conflated channel (only keeps latest)

205

val conflatedChannel = Channel<Int>(Channel.CONFLATED)

206

207

scope.launch {

208

repeat(10) {

209

conflatedChannel.send(it)

210

println("Sent: $it")

211

}

212

conflatedChannel.close()

213

}

214

215

scope.launch {

216

delay(500) // Let sender finish

217

for (value in conflatedChannel) {

218

println("Received: $value") // Will only receive the last value

219

}

220

}

221

222

// Channel with overflow strategy

223

val overflowChannel = Channel<Int>(

224

capacity = 3,

225

onBufferOverflow = BufferOverflow.DROP_OLDEST

226

)

227

228

scope.launch {

229

repeat(10) {

230

val result = overflowChannel.trySend(it)

231

println("Try send $it: ${result.isSuccess}")

232

}

233

overflowChannel.close()

234

}

235

```

236

237

### Channel Result Type

238

239

Type-safe result wrapper for non-blocking channel operations.

240

241

```kotlin { .api }

242

/**

243

* Represents the result of a channel operation.

244

*/

245

@JvmInline

246

value class ChannelResult<out T> {

247

/**

248

* Returns true if this instance represents a successful outcome.

249

*/

250

val isSuccess: Boolean

251

252

/**

253

* Returns true if this instance represents a closed channel.

254

*/

255

val isClosed: Boolean

256

257

/**

258

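* Returns true if this instance represents an unsuccessful operation; this is also true when the channel is closed, so check isClosed to tell the two cases apart.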

259

*/

260

val isFailure: Boolean

261

262

/**

263

* Returns the encapsulated value if this instance represents success or null if closed/failed.

264

*/

265

fun getOrNull(): T?

266

267

/**

268

* Returns the exception the channel was closed with, or null if the operation succeeded, failed without the channel being closed, or the channel was closed without a cause.

269

*/

270

fun exceptionOrNull(): Throwable?

271

272

/**

273

* Performs the given action on the encapsulated value if this instance represents success.

274

*/

275

inline fun onSuccess(action: (value: T) -> Unit): ChannelResult<T>

276

277

/**

278

* Performs the given action if this instance represents failure; note that a closed channel also counts as failure, so this runs for closed results too.

279

*/

280

inline fun onFailure(action: (exception: Throwable) -> Unit): ChannelResult<T>

281

282

/**

283

* Performs the given action if this instance represents a closed channel.

284

*/

285

inline fun onClosed(action: (exception: Throwable?) -> Unit): ChannelResult<T>

286

}

287

```

288

289

**Usage Examples:**

290

291

```kotlin

292

import kotlinx.coroutines.*

293

import kotlinx.coroutines.channels.*

294

295

val scope = MainScope()

296

297

val channel = Channel<String>(capacity = 2)

298

299

scope.launch {

300

// Non-blocking send attempts

301

repeat(5) { i ->

302

val result = channel.trySend("Message $i")

303

result

304

.onSuccess { println("Successfully sent: Message $i") }

305

.onFailure { println("Failed to send: Message $i") }

306

.onClosed { println("Channel closed, cannot send: Message $i") }

307

}

308

309

channel.close()

310

}

311

312

scope.launch {

313

delay(100)

314

315

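// Non-blocking receive attempts (note: onFailure also fires for a closed channel, so once the channel is closed and drained both the onFailure and onClosed callbacks run)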

316

repeat(10) {

317

val result = channel.tryReceive()

318

result

319

.onSuccess { value -> println("Successfully received: $value") }

320

.onFailure { println("No value available") }

321

.onClosed { println("Channel closed, no more values") }

322

323

delay(50)

324

}

325

}

326

```

327

328

### Producer Builder

329

330

Create receive channels using a producer coroutine pattern.

331

332

```kotlin { .api }

333

/**

334

* Launches a new coroutine to produce a stream of values by sending them

335

* to a channel and returns a ReceiveChannel that yields these values.

336

*/

337

fun <E> CoroutineScope.produce(

338

context: CoroutineContext = EmptyCoroutineContext,

339

capacity: Int = 0,

340

onCompletion: CompletionHandler? = null,

341

block: suspend ProducerScope<E>.() -> Unit

342

): ReceiveChannel<E>

343

344

/**

345

* Scope for produce builder.

346

*/

347

interface ProducerScope<in E> : CoroutineScope, SendChannel<E> {

348

/**

349

* The channel that this producer is sending to.

350

*/

351

val channel: SendChannel<E>

352

}

353

```

354

355

**Usage Examples:**

356

357

```kotlin

358

import kotlinx.coroutines.*

359

import kotlinx.coroutines.channels.*

360

361

val scope = MainScope()

362

363

// Basic producer

364

val numbersChannel = scope.produce {

365

for (i in 1..5) {

366

send(i)

367

delay(100)

368

}

369

}

370

371

scope.launch {

372

for (number in numbersChannel) {

373

println("Received number: $number")

374

}

375

}

376

377

// Producer with capacity

378

val bufferedProducer = scope.produce(capacity = 10) {

379

repeat(20) {

380

send("Item $it")

381

println("Produced: Item $it")

382

}

383

}

384

385

scope.launch {

386

delay(500) // Let producer get ahead

387

for (item in bufferedProducer) {

388

println("Consumed: $item")

389

delay(100)

390

}

391

}

392

393

// Producer with custom context

394

val ioProducer = scope.produce(context = Dispatchers.IO) {

395

repeat(3) {

396

val data = fetchDataFromNetwork() // IO operation

397

send(data)

398

}

399

}

400

401

scope.launch {

402

for (data in ioProducer) {

403

processData(data) // Process on main context

404

}

405

}

406

407

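// Producer with exception handling (note: the onCompletion parameter exists only on the @InternalCoroutinesApi overload of produce and is not part of the stable public API)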

408

val robustProducer = scope.produce<String>(

409

onCompletion = { exception ->

410

println("Producer completed with: $exception")

411

}

412

) {

413

try {

414

repeat(5) {

415

if (it == 3) throw RuntimeException("Simulated error")

416

send("Value $it")

417

}

418

} catch (e: Exception) {

419

println("Producer caught exception: ${e.message}")

420

throw e // Re-throw to signal completion with exception

421

}

422

}

423

```

424

425

### Channel Extensions

426

427

Utility functions for working with channels and converting between channels and other types.

428

429

```kotlin { .api }

430

/**

431

* Performs the given action for each received element and cancels the channel once the action loop finishes, whether normally or with an exception.

432

*/

433

suspend fun <E> ReceiveChannel<E>.consumeEach(action: suspend (E) -> Unit)

434

435

/**

436

* Receives all elements from the channel and returns them as a List.

437

*/

438

suspend fun <E> ReceiveChannel<E>.toList(): List<E>

439

440

/**

441

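* Represents this receive channel as a hot Flow and consumes the channel on the first collection; the resulting flow can be collected only once.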

442

*/

443

fun <E> ReceiveChannel<E>.consumeAsFlow(): Flow<E>

444

445

/**

446

* Converts this Flow to a ReceiveChannel.

447

*/

448

fun <T> Flow<T>.produceIn(scope: CoroutineScope): ReceiveChannel<T>

449

450

/**

451

* Broadcasts all elements received from this channel to subscribers of the returned [BroadcastChannel]. Deprecated API: prefer SharedFlow (e.g. Flow.shareIn) in new code.

452

*/

453

fun <E> ReceiveChannel<E>.broadcast(capacity: Int = 1): BroadcastChannel<E>

454

```

455

456

**Usage Examples:**

457

458

```kotlin

459

import kotlinx.coroutines.*

460

import kotlinx.coroutines.channels.*

461

import kotlinx.coroutines.flow.*

462

463

val scope = MainScope()

464

465

// Channel to list

466

scope.launch {

467

val channel = produce {

468

repeat(5) { send(it) }

469

}

470

471

val list = channel.toList()

472

println("Channel as list: $list")

473

}

474

475

// Channel to flow

476

scope.launch {

477

val channel = produce {

478

repeat(3) {

479

send("Item $it")

480

delay(100)

481

}

482

}

483

484

channel.consumeAsFlow()

485

.map { it.uppercase() }

486

.collect { println("From channel flow: $it") }

487

}

488

489

// Flow to channel

490

scope.launch {

491

val flow = flowOf(1, 2, 3, 4, 5)

492

val channel = flow.produceIn(scope)

493

494

for (value in channel) {

495

println("From flow channel: $value")

496

}

497

}

498

499

// ConsumeEach

500

scope.launch {

501

val channel = produce {

502

repeat(3) { send("Message $it") }

503

}

504

505

channel.consumeEach { message ->

506

println("Processing: $message")

507

delay(50)

508

}

509

}

510

```

511

512

### Select Support for Channels

513

514

Use channels in select expressions for awaiting multiple channel operations.

515

516

```kotlin { .api }

517

/**

518

* Select clause using the send suspending function as a select clause.

519

*/

520

val <E> SendChannel<E>.onSend: SelectClause2<E, SendChannel<E>>

521

522

/**

523

* Select clause using the receive suspending function as a select clause.

524

*/

525

val <E> ReceiveChannel<E>.onReceive: SelectClause1<E>

526

527

/**

528

* Select clause using the receiveCatching suspending function as a select clause.

529

*/

530

val <E> ReceiveChannel<E>.onReceiveCatching: SelectClause1<ChannelResult<E>>

531

```

532

533

**Usage Examples:**

534

535

```kotlin

536

import kotlinx.coroutines.*

537

import kotlinx.coroutines.channels.*

538

import kotlinx.coroutines.selects.*

539

540

val scope = MainScope()

541

542

scope.launch {

543

val channel1 = Channel<String>()

544

val channel2 = Channel<String>()

545

546

// Producer for channel1

547

launch {

548

delay(100)

549

channel1.send("From channel 1")

550

}

551

552

// Producer for channel2

553

launch {

554

delay(200)

555

channel2.send("From channel 2")

556

}

557

558

// Select from multiple channels

559

val result = select<String> {

560

channel1.onReceive { value ->

561

"Received from channel1: $value"

562

}

563

channel2.onReceive { value ->

564

"Received from channel2: $value"

565

}

566

}

567

568

println(result) // Will print result from channel1 (faster)

569

570

channel1.close()

571

channel2.close()

572

}

573

574

// Select with send operations

575

scope.launch {

576

val fastChannel = Channel<Int>(Channel.UNLIMITED)

577

val slowChannel = Channel<Int>()

578

579

// Try to send to whichever channel can accept first

580

val sendResult = select<String> {

581

fastChannel.onSend(1) { channel ->

582

"Sent to fast channel"

583

}

584

slowChannel.onSend(2) { channel ->

585

"Sent to slow channel"

586

}

587

}

588

589

println(sendResult) // Will likely send to fast channel

590

591

fastChannel.close()

592

slowChannel.close()

593

}

594

```

595

596

### Advanced Channel Patterns

597

598

Common patterns and best practices for channel usage.

599

600

**Fan-out Pattern:**

601

602

```kotlin

603

import kotlinx.coroutines.*

604

import kotlinx.coroutines.channels.*

605

606

val scope = MainScope()

607

608

// Single producer, multiple consumers

609

fun fanOut() = scope.produce {

610

var x = 1

611

while (true) {

612

send(x++)

613

delay(100)

614

}

615

}

616

617

scope.launch {

618

val producer = fanOut()

619

620

repeat(3) { id ->

621

launch {

622

for (value in producer) {

623

println("Consumer $id received: $value")

624

if (value >= 10) break

625

}

626

}

627

}

628

}

629

```

630

631

**Fan-in Pattern:**

632

633

```kotlin

634

// Multiple producers, single consumer

635

suspend fun fanIn(

636

input1: ReceiveChannel<String>,

637

input2: ReceiveChannel<String>

638

): ReceiveChannel<String> = scope.produce {

639

var input1Active = true

640

var input2Active = true

641

642

while (input1Active || input2Active) {

643

select<Unit> {

644

if (input1Active) {

645

input1.onReceiveCatching { result ->

646

result.onSuccess { send("From input1: $it") }

647

.onClosed { input1Active = false }

648

}

649

}

650

if (input2Active) {

651

input2.onReceiveCatching { result ->

652

result.onSuccess { send("From input2: $it") }

653

.onClosed { input2Active = false }

654

}

655

}

656

}

657

}

658

}

659

```

660

661

**Pipeline Pattern:**

662

663

```kotlin

664

// Chain of processing stages

665

fun numbers() = scope.produce {

666

var x = 1

667

while (true) send(x++)

668

}

669

670

fun square(numbers: ReceiveChannel<Int>) = scope.produce {

671

for (x in numbers) send(x * x)

672

}

673

674

fun print(numbers: ReceiveChannel<Int>) = scope.launch {

675

for (x in numbers) println(x)

676

}

677

678

// Usage

679

val numbersPipeline = numbers()

680

val squaredPipeline = square(numbersPipeline)

681

print(squaredPipeline)

682

```