or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

channels.md coroutine-builders.md dispatchers.md exception-handling.md flow-api.md index.md job-management.md jvm-integration.md synchronization.md

docs/channels.md

0

# Channels

1

2

Message passing primitives for communication between coroutines with configurable capacity, buffering strategies, and bi-directional communication patterns.

3

4

## Capabilities

5

6

### Channel Interface

7

8

Combines sending and receiving capabilities for bi-directional communication.

9

10

```kotlin { .api }

11

interface Channel<E> : SendChannel<E>, ReceiveChannel<E> {

12

/** Channel capacity constants */

13

companion object {

14

const val UNLIMITED: Int = Int.MAX_VALUE

15

const val CONFLATED: Int = -1

16

const val RENDEZVOUS: Int = 0

17

const val BUFFERED: Int = -2

18

}

19

}

20

21

/** Creates a channel with specified capacity */

22

fun <E> Channel(

23

capacity: Int = RENDEZVOUS,

24

onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,

25

onUndeliveredElement: ((E) -> Unit)? = null

26

): Channel<E>

27

```

28

29

**Usage Examples:**

30

31

```kotlin

32

import kotlinx.coroutines.*

33

import kotlinx.coroutines.channels.*

34

35

fun main() = runBlocking {

36

// Rendezvous channel (capacity = 0)

37

val rendezvousChannel = Channel<Int>()

38

39

launch {

40

repeat(3) { i ->

41

println("Sending $i")

42

rendezvousChannel.send(i)

43

println("Sent $i")

44

}

45

rendezvousChannel.close()

46

}

47

48

launch {

49

for (value in rendezvousChannel) {

50

println("Received $value")

51

delay(100) // Simulate processing

52

}

53

}

54

55

delay(1000)

56

57

// Buffered channel

58

val bufferedChannel = Channel<String>(capacity = 3)

59

60

// Can send up to capacity without suspending

61

bufferedChannel.send("Message 1")

62

bufferedChannel.send("Message 2")

63

bufferedChannel.send("Message 3")

64

65

println("Sent 3 messages without suspending")

66

67

// This would suspend until space is available

68

launch {

69

bufferedChannel.send("Message 4")

70

println("Sent message 4")

71

}

72

73

// Receive messages

74

repeat(4) {

75

val message = bufferedChannel.receive()

76

println("Received: $message")

77

}

78

79

bufferedChannel.close()

80

}

81

```

82

83

### SendChannel Interface

84

85

Sending side of a channel for message production.

86

87

```kotlin { .api }

88

interface SendChannel<in E> {

89

/** True if channel is closed for sending */

90

val isClosedForSend: Boolean

91

92

/** Suspends until element can be sent */

93

suspend fun send(element: E)

94

95

/** Tries to send element immediately */

96

fun trySend(element: E): ChannelResult<Unit>

97

98

/** Closes the channel with optional cause */

99

fun close(cause: Throwable? = null): Boolean

100

101

/** Registers handler for when channel is closed */

102

fun invokeOnClose(handler: (cause: Throwable?) -> Unit)

103

}

104

```

105

106

**Usage Examples:**

107

108

```kotlin

109

import kotlinx.coroutines.*

110

import kotlinx.coroutines.channels.*

111

112

fun main() = runBlocking {

113

val channel = Channel<Int>(capacity = 2)

114

115

// Producer coroutine

116

val producer = launch {

117

try {

118

repeat(5) { i ->

119

println("Trying to send $i")

120

121

// Try to send without suspending first

122

val result = channel.trySend(i)

123

if (result.isSuccess) {

124

println("Sent $i immediately")

125

} else {

126

println("Buffer full, suspending to send $i")

127

channel.send(i)

128

println("Sent $i after suspending")

129

}

130

131

delay(100)

132

}

133

} catch (e: Exception) {

134

println("Producer failed: ${e.message}")

135

} finally {

136

channel.close()

137

println("Channel closed by producer")

138

}

139

}

140

141

// Consumer coroutine (slower than producer)

142

launch {

143

try {

144

while (!channel.isClosedForReceive) {

145

val value = channel.receive()

146

println("Received $value")

147

delay(300) // Slower consumer

148

}

149

} catch (e: ClosedReceiveChannelException) {

150

println("Channel was closed")

151

}

152

}

153

154

producer.join()

155

delay(1000)

156

}

157

```

158

159

### ReceiveChannel Interface

160

161

Receiving side of a channel for message consumption.

162

163

```kotlin { .api }

164

interface ReceiveChannel<out E> {

165

/** True if channel is closed for receiving */

166

val isClosedForReceive: Boolean

167

168

/** True if channel is empty */

169

val isEmpty: Boolean

170

171

/** Suspends until element is available */

172

suspend fun receive(): E

173

174

/** Tries to receive element immediately */

175

fun tryReceive(): ChannelResult<E>

176

177

/** Receives element or close/failure result */

178

suspend fun receiveCatching(): ChannelResult<E>

179

180

/** Iterator for consuming channel */

181

operator fun iterator(): ChannelIterator<E>

182

183

/** Cancels reception from channel */

184

fun cancel(cause: CancellationException? = null)

185

}

186

187

interface ChannelIterator<out E> {

188

suspend fun hasNext(): Boolean

189

suspend fun next(): E

190

}

191

```

192

193

**Usage Examples:**

194

195

```kotlin

196

import kotlinx.coroutines.*

197

import kotlinx.coroutines.channels.*

198

199

fun main() = runBlocking {

200

val channel = Channel<String>(capacity = Channel.UNLIMITED)

201

202

// Fill channel with data

203

launch {

204

repeat(5) { i ->

205

channel.send("Item $i")

206

}

207

channel.close()

208

}

209

210

// Different ways to consume

211

212

// 1. Using iterator (for-in loop)

213

println("Method 1: for-in loop")

214

for (item in channel) {

215

println("Received: $item")

216

}

217

218

// Refill for next example

219

val channel2 = Channel<String>()

220

launch {

221

repeat(3) { i ->

222

channel2.send("Data $i")

223

}

224

channel2.close()

225

}

226

227

// 2. Using receiveCatching for error handling

228

println("Method 2: receiveCatching")

229

while (true) {

230

val result = channel2.receiveCatching()

231

if (result.isSuccess) {

232

println("Received: ${result.getOrNull()}")

233

} else {

234

println("Channel closed or failed")

235

break

236

}

237

}

238

239

// 3. Using tryReceive for a non-suspending receive

240

val channel3 = Channel<Int>(capacity = 3)

241

channel3.trySend(1)

242

channel3.trySend(2)

243

244

println("Method 3: tryReceive")

245

while (true) {

246

val result = channel3.tryReceive()

247

if (result.isSuccess) {

248

println("Received immediately: ${result.getOrNull()}")

249

} else {

250

println("No data available")

251

break

252

}

253

}

254

255

channel3.close()

256

}

257

```

258

259

### Channel Factory Functions

260

261

Functions for creating channels with different characteristics.

262

263

```kotlin { .api }

264

/** Creates a channel with the given capacity (rendezvous by default) */

265

fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E>

266

267

/** Launches a producer coroutine (producer-consumer pattern) and returns its output as a ReceiveChannel */

268

fun <E> CoroutineScope.produce(

269

context: CoroutineContext = EmptyCoroutineContext,

270

capacity: Int = 0,

271

onCompletion: CompletionHandler? = null,

272

block: suspend ProducerScope<E>.() -> Unit

273

): ReceiveChannel<E>

274

275

/** Launches an actor coroutine (actor pattern) and returns its mailbox as a SendChannel */

276

fun <E> CoroutineScope.actor(

277

context: CoroutineContext = EmptyCoroutineContext,

278

capacity: Int = 0,

279

start: CoroutineStart = CoroutineStart.DEFAULT,

280

onCompletion: CompletionHandler? = null,

281

block: suspend ActorScope<E>.() -> Unit

282

): SendChannel<E>

283

```

284

285

**Usage Examples:**

286

287

```kotlin

288

import kotlinx.coroutines.*

289

import kotlinx.coroutines.channels.*

290

291

fun main() = runBlocking {

292

// Producer pattern

293

val receiveChannel = produce {

294

repeat(5) { i ->

295

send("Produced $i")

296

delay(100)

297

}

298

}

299

300

for (item in receiveChannel) {

301

println("Consumed: $item")

302

}

303

304

// Actor pattern

305

val sendChannel = actor<String> {

306

for (message in channel) {

307

println("Actor processing: $message")

308

delay(50)

309

}

310

}

311

312

repeat(3) { i ->

313

sendChannel.send("Message $i")

314

}

315

316

sendChannel.close()

317

delay(200)

318

}

319

320

// Advanced producer example

321

fun CoroutineScope.numberProducer(max: Int) = produce<Int> {

322

for (i in 1..max) {

323

send(i)

324

delay(100)

325

}

326

}

327

328

fun CoroutineScope.squareProcessor(input: ReceiveChannel<Int>) = produce<Int> {

329

for (number in input) {

330

send(number * number)

331

}

332

}

333

334

suspend fun pipelineExample() = coroutineScope {

335

val numbers = numberProducer(5)

336

val squares = squareProcessor(numbers)

337

338

for (square in squares) {

339

println("Square: $square")

340

}

341

}

342

```

343

344

### Broadcast Channels (Deprecated)

345

346

Legacy broadcast functionality (deprecated in favor of SharedFlow).

347

348

```kotlin { .api }

349

@Deprecated("Use SharedFlow instead")

350

interface BroadcastChannel<E> : SendChannel<E> {

351

fun openSubscription(): ReceiveChannel<E>

352

fun cancel(cause: CancellationException? = null)

353

}

354

355

@Deprecated("Use SharedFlow instead")

356

fun <E> BroadcastChannel(capacity: Int): BroadcastChannel<E>

357

```

358

359

### Channel Capacity Types

360

361

Different capacity configurations for channels.

362

363

```kotlin { .api }

364

companion object Channel {

365

/** Unlimited capacity - never suspends sends */

366

const val UNLIMITED: Int = Int.MAX_VALUE

367

368

/** Conflated - keeps only the latest value */

369

const val CONFLATED: Int = -1

370

371

/** Rendezvous - zero capacity, direct handoff */

372

const val RENDEZVOUS: Int = 0

373

374

/** Default buffered capacity */

375

const val BUFFERED: Int = -2

376

}

377

```

378

379

**Usage Examples:**

380

381

```kotlin

382

import kotlinx.coroutines.*

383

import kotlinx.coroutines.channels.*

384

385

fun main() = runBlocking {

386

// Unlimited channel - never blocks sends

387

val unlimitedChannel = Channel<Int>(Channel.UNLIMITED)

388

389

launch {

390

repeat(1000) { i ->

391

unlimitedChannel.send(i) // Never suspends

392

}

393

unlimitedChannel.close()

394

}

395

396

var count = 0

397

for (value in unlimitedChannel) {

398

count++

399

}

400

println("Received $count items from unlimited channel")

401

402

// Conflated channel - only latest value

403

val conflatedChannel = Channel<String>(Channel.CONFLATED)

404

405

launch {

406

repeat(5) { i ->

407

conflatedChannel.send("Value $i")

408

println("Sent: Value $i")

409

}

410

conflatedChannel.close()

411

}

412

413

delay(100) // Let all sends complete

414

415

// Will only receive the latest value

416

for (value in conflatedChannel) {

417

println("Conflated received: $value")

418

}

419

}

420

```

421

422

### Channel Result

423

424

Result type returned by the non-suspending channel operations (`trySend`, `tryReceive`) and by `receiveCatching`.

425

426

```kotlin { .api }

427

value class ChannelResult<out T> {

428

val isSuccess: Boolean

429

val isClosed: Boolean

430

val isFailure: Boolean

431

432

fun getOrNull(): T?

433

fun getOrThrow(): T

434

fun exceptionOrNull(): Throwable?

435

}

436

```

437

438

**Usage Examples:**

439

440

```kotlin

441

import kotlinx.coroutines.*

442

import kotlinx.coroutines.channels.*

443

444

fun main() = runBlocking {

445

val channel = Channel<Int>(capacity = 1)

446

447

// Non-blocking send

448

val sendResult1 = channel.trySend(1)

449

println("Send 1 success: ${sendResult1.isSuccess}")

450

451

val sendResult2 = channel.trySend(2)

452

println("Send 2 success: ${sendResult2.isSuccess}") // false - buffer full

453

454

// Non-blocking receive

455

val receiveResult1 = channel.tryReceive()

456

if (receiveResult1.isSuccess) {

457

println("Received: ${receiveResult1.getOrNull()}")

458

}

459

460

val receiveResult2 = channel.tryReceive()

461

println("Receive 2 success: ${receiveResult2.isSuccess}") // false - empty

462

463

channel.close()

464

465

// Receive from closed channel

466

val receiveResult3 = channel.tryReceive()

467

println("Receive from closed: ${receiveResult3.isClosed}")

468

}

469

```

470

471

## Types

472

473

### ProducerScope and ActorScope

474

475

Scopes for producer and actor patterns.

476

477

```kotlin { .api }

478

interface ProducerScope<in E> : CoroutineScope, SendChannel<E>

479

480

interface ActorScope<E> : CoroutineScope, ReceiveChannel<E> {

481

val channel: Channel<E>

482

}

483

```

484

485

### BufferOverflow

486

487

Strategy for handling buffer overflow in channels.

488

489

```kotlin { .api }

490

enum class BufferOverflow {

491

/** Suspend sender when buffer is full */

492

SUSPEND,

493

/** Drop oldest element when buffer is full */

494

DROP_OLDEST,

495

/** Drop the element that is being added right now when buffer is full (buffer contents stay unchanged) */

496

DROP_LATEST

497

}

498

```