or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

channels.md coroutine-builders.md dispatchers.md exception-handling.md flow-api.md index.md jobs-deferreds.md structured-concurrency.md synchronization.md

flow-api.mddocs/

0

# Flow API - Reactive Streams

1

2

Asynchronous data streams with rich operator support for reactive programming patterns. Flow provides a cold stream implementation that enables declarative handling of asynchronous data sequences.

3

4

## Capabilities

5

6

### Core Flow Types

7

8

The foundational interfaces for reactive stream processing in kotlinx-coroutines.

9

10

```kotlin { .api }

11

/**

12

* An asynchronous data stream that sequentially emits values

13

* and completes normally or with an exception.

14

*/

15

interface Flow<out T> {

16

/**

17

* Accepts the given collector and emits values into it.

18

* This method should never be called directly.

19

*/

20

suspend fun collect(collector: FlowCollector<T>)

21

}

22

23

/**

24

* FlowCollector is used as an intermediate or a terminal collector of flow.

25

*/

26

interface FlowCollector<in T> {

27

/**

28

* Collects the value emitted by the upstream.

29

*/

30

suspend fun emit(value: T)

31

}

32

```

33

34

### Flow Builders

35

36

Functions to create flows from various sources and patterns.

37

38

```kotlin { .api }

39

/**

40

* Creates a cold flow from the given suspendable block.

41

* The flow is cold: the block is invoked each time a terminal

42

* operator is applied to the resulting flow.

43

*/

44

fun <T> flow(block: suspend FlowCollector<T>.() -> Unit): Flow<T>

45

46

/**

47

* Creates a flow that produces the given values.

48

*/

49

fun <T> flowOf(vararg elements: T): Flow<T>

50

51

/**

52

* Creates a flow that produces no values.

53

*/

54

fun <T> emptyFlow(): Flow<T>

55

56

/**

57

* Creates a flow from Iterable, Iterator, or arrays.

58

*/

59

fun <T> Iterable<T>.asFlow(): Flow<T>

60

fun <T> Iterator<T>.asFlow(): Flow<T>

61

fun <T> Array<out T>.asFlow(): Flow<T>

62

63

/**

64

* Creates a flow using channels which can be used from multiple coroutines.

65

* The resulting flow is a cold flow.

66

*/

67

fun <T> channelFlow(block: suspend ProducerScope<T>.() -> Unit): Flow<T>

68

69

/**

70

* Creates a flow suitable for use with callback-based APIs.

71

*/

72

fun <T> callbackFlow(block: suspend ProducerScope<T>.() -> Unit): Flow<T>

73

```

74

75

**Usage Examples:**

76

77

```kotlin

78

import kotlinx.coroutines.*

79

import kotlinx.coroutines.flow.*

80

81

// Basic flow creation

82

val simpleFlow = flow {

83

for (i in 1..5) {

84

emit(i)

85

delay(100)

86

}

87

}

88

89

// Flow from values

90

val valuesFlow = flowOf(1, 2, 3, 4, 5)

91

92

// Flow from collections

93

val listFlow = listOf("a", "b", "c").asFlow()

94

95

// Channel flow for concurrent emissions

96

val channelBasedFlow = channelFlow {

97

launch {

98

repeat(3) {

99

send("From coroutine 1: $it")

100

delay(100)

101

}

102

}

103

launch {

104

repeat(3) {

105

send("From coroutine 2: $it")

106

delay(150)

107

}

108

}

109

}

110

111

// Callback flow for bridging callback APIs

112

val callbackBasedFlow = callbackFlow {

113

val callback = object : EventCallback {

114

override fun onEvent(data: String) {

115

trySend(data)

116

}

117

override fun onError(error: Exception) {

118

close(error)

119

}

120

}

121

122

eventSource.registerCallback(callback)

123

124

awaitClose {

125

eventSource.unregisterCallback(callback)

126

}

127

}

128

```

129

130

### Intermediate Operators

131

132

Transform, filter, and manipulate flow emissions.

133

134

```kotlin { .api }

135

/**

136

* Returns a flow containing the results of applying the given transform

137

* function to each value of the original flow.

138

*/

139

fun <T, R> Flow<T>.map(transform: suspend (value: T) -> R): Flow<R>

140

141

/**

142

* Returns a flow containing only values of the original flow that match

143

* the given predicate.

144

*/

145

fun <T> Flow<T>.filter(predicate: suspend (T) -> Boolean): Flow<T>

146

147

/**

148

* Applies the given transform function to each value and emits all

149

* elements returned by transform function.

150

*/

151

fun <T, R> Flow<T>.transform(transform: suspend FlowCollector<R>.(value: T) -> Unit): Flow<R>

152

153

/**

154

* Returns a flow that performs the given action on each value of the

155

* original flow as it passes through.

156

*/

157

fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T>

158

159

/**

160

* Returns a flow that invokes the given action before this flow starts

161

* to be collected.

162

*/

163

fun <T> Flow<T>.onStart(action: suspend FlowCollector<T>.() -> Unit): Flow<T>

164

165

/**

166

* Returns a flow that invokes the given action after the flow is completed

167

* or cancelled.

168

*/

169

fun <T> Flow<T>.onCompletion(action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit): Flow<T>

170

```

171

172

**Usage Examples:**

173

174

```kotlin

175

import kotlinx.coroutines.*

176

import kotlinx.coroutines.flow.*

177

178

val scope = MainScope()

179

180

scope.launch {

181

flowOf(1, 2, 3, 4, 5)

182

.map { it * 2 } // Transform each value

183

.filter { it > 4 } // Keep only values > 4

184

.onEach { println("Processing: $it") } // Side effect

185

.collect { value ->

186

println("Collected: $value")

187

}

188

// Output: Processing: 6, Collected: 6, Processing: 8, Collected: 8, Processing: 10, Collected: 10

189

}

190

191

// Complex transformation

192

scope.launch {

193

flow {

194

emit("hello")

195

emit("world")

196

}

197

.transform { value ->

198

emit(value.uppercase())

199

emit(value.length)

200

}

201

.collect { println(it) }

202

// Output: HELLO, 5, WORLD, 5

203

}

204

205

// Flow lifecycle hooks

206

scope.launch {

207

flowOf(1, 2, 3)

208

.onStart { emit(0) } // Emit 0 before starting

209

.onCompletion { emit(-1) } // Emit -1 after completion

210

.collect { println("Value: $it") }

211

// Output: Value: 0, Value: 1, Value: 2, Value: 3, Value: -1

212

}

213

```

214

215

### Flow Context and Threading

216

217

Control the execution context and threading behavior of flows.

218

219

```kotlin { .api }

220

/**

221

* Changes the context where this flow is executed to the given context.

222

* All operations upstream of flowOn are executed in the provided context.

223

*/

224

fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T>

225

226

/**

227

* Buffers flow emissions via channel with given capacity and runs collector

228

* in a separate coroutine.

229

*/

230

fun <T> Flow<T>.buffer(

231

capacity: Int = Channel.BUFFERED,

232

onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND

233

): Flow<T>

234

235

/**

236

* Conflates flow emissions via conflated channel and runs collector in a

237

* separate coroutine. Latest emitted value overwrites previous values.

238

*/

239

fun <T> Flow<T>.conflate(): Flow<T>

240

```

241

242

**Usage Examples:**

243

244

```kotlin

245

import kotlinx.coroutines.*

246

import kotlinx.coroutines.flow.*

247

248

val scope = MainScope()

249

250

scope.launch {

251

flow {

252

println("Emitting on: ${Thread.currentThread().name}")

253

for (i in 1..3) {

254

emit(i)

255

delay(100)

256

}

257

}

258

.flowOn(Dispatchers.IO) // Upstream operations run on IO dispatcher

259

.collect { value ->

260

println("Collecting $value on: ${Thread.currentThread().name}")

261

}

262

}

263

264

// Buffering for performance

265

scope.launch {

266

flow {

267

repeat(100) {

268

emit(it)

269

delay(10) // Producer emits faster than the consumer can process

270

}

271

}

272

.buffer(10) // Buffer up to 10 items

273

.collect { value ->

274

delay(50) // Slow consumer

275

println("Processed: $value")

276

}

277

}

278

279

// Conflation for latest-value semantics

280

scope.launch {

281

flow {

282

repeat(100) {

283

emit(it)

284

delay(10)

285

}

286

}

287

.conflate() // Only keep latest value when consumer is slow

288

.collect { value ->

289

delay(100) // Very slow consumer

290

println("Latest value: $value")

291

}

292

}

293

```

294

295

### Terminal Operators

296

297

Consume flow values and produce final results.

298

299

```kotlin { .api }

300

/**

301

* Terminal flow operator that collects the given flow with a provided

302

* action that is applied to each emitted element.

303

*/

304

suspend fun <T> Flow<T>.collect(action: suspend (value: T) -> Unit)

305

306

/**

307

* Terminal operator that collects the given flow; when a new value is

307

* emitted, the action block for the previous value is cancelled.

309

*/

310

suspend fun <T> Flow<T>.collectLatest(action: suspend (value: T) -> Unit)

311

312

/**

313

* Collects all elements from the flow and returns them as a List.

314

*/

315

suspend fun <T> Flow<T>.toList(): List<T>

316

317

/**

318

* Collects all elements from the flow and returns them as a Set.

319

*/

320

suspend fun <T> Flow<T>.toSet(): Set<T>

321

322

/**

323

* Returns the first element emitted by the flow and cancels flow's collection.

324

*/

325

suspend fun <T> Flow<T>.first(): T

326

327

/**

328

* Returns the first element emitted by the flow matching the predicate.

329

*/

330

suspend fun <T> Flow<T>.first(predicate: suspend (T) -> Boolean): T

331

332

/**

333

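* Returns the single element emitted by the flow; throws NoSuchElementException if the flow is empty and IllegalArgumentException if it emits more than one element.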

334

*/

335

suspend fun <T> Flow<T>.single(): T

336

337

/**

338

* Accumulates value starting with initial value and applying operation

339

* from left to right to current accumulator value and each element.

340

*/

341

suspend fun <T, R> Flow<T>.fold(initial: R, operation: suspend (acc: R, value: T) -> R): R

342

343

/**

344

* Accumulates value starting with the first element and applying operation

345

* from left to right to current accumulator value and each element.

346

*/

347

suspend fun <S, T : S> Flow<T>.reduce(operation: suspend (accumulator: S, value: T) -> S): S

348

349

/**

350

* Returns the number of elements in this flow.

351

*/

352

suspend fun <T> Flow<T>.count(): Int

353

354

/**

355

* Returns the number of elements matching the given predicate.

356

*/

357

suspend fun <T> Flow<T>.count(predicate: suspend (T) -> Boolean): Int

358

```

359

360

**Usage Examples:**

361

362

```kotlin

363

import kotlinx.coroutines.*

364

import kotlinx.coroutines.flow.*

365

366

val scope = MainScope()

367

368

scope.launch {

369

val numbers = flowOf(1, 2, 3, 4, 5)

370

371

// Collect to list

372

val list = numbers.toList()

373

println("List: $list")

374

375

// Get first element

376

val first = numbers.first()

377

println("First: $first")

378

379

// Find first matching predicate

380

val firstEven = numbers.first { it % 2 == 0 }

381

println("First even: $firstEven")

382

383

// Fold operation

384

val sum = numbers.fold(0) { acc, value -> acc + value }

385

println("Sum: $sum")

386

387

// Reduce operation

388

val product = numbers.reduce { acc, value -> acc * value }

389

println("Product: $product")

390

391

// Count elements

392

val evenCount = numbers.count { it % 2 == 0 }

393

println("Even numbers: $evenCount")

394

}

395

396

// collectLatest cancels previous collection

397

scope.launch {

398

flow {

399

repeat(5) {

400

emit(it)

401

delay(100)

402

}

403

}

404

.collectLatest { value ->

405

println("Processing $value")

406

delay(200) // Slow processing

407

println("Finished $value") // Only last value will finish

408

}

409

}

410

```

411

412

### SharedFlow and StateFlow

413

414

Hot flows that share emissions among multiple collectors.

415

416

```kotlin { .api }

417

/**

418

* A hot Flow that shares emitted values among all its collectors in a broadcast fashion.

419

*/

420

interface SharedFlow<out T> : Flow<T> {

421

/**

422

* A snapshot of the most recently emitted values into this shared flow.

423

*/

424

val replayCache: List<T>

425

426

/**

427

* The number of subscribers (active collectors) to this shared flow.

428

*/

429

val subscriptionCount: StateFlow<Int>

430

}

431

432

/**

433

* A mutable SharedFlow that provides functions to emit values to the flow.

434

*/

435

interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {

436

/**

437

* Tries to emit a value to this flow without suspending the caller.

438

*/

439

fun tryEmit(value: T): Boolean

440

441

/**

442

* Resets the replayCache of this flow to an empty state.

443

*/

444

fun resetReplayCache()

445

446

/**

447

* The number of subscribers (active collectors) to this shared flow.

448

*/

449

override val subscriptionCount: StateFlow<Int>

450

}

451

452

/**

453

* Creates a MutableSharedFlow with the given configuration.

454

*/

455

fun <T> MutableSharedFlow(

456

replay: Int = 0,

457

extraBufferCapacity: Int = 0,

458

onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND

459

): MutableSharedFlow<T>

460

461

/**

462

* A SharedFlow that represents a read-only state with a single updatable value.

463

*/

464

interface StateFlow<out T> : SharedFlow<T> {

465

/**

466

* The current value of this state flow.

467

*/

468

val value: T

469

}

470

471

/**

472

* A mutable StateFlow that provides a setter for value.

473

*/

474

interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> {

475

/**

476

* The current value of this state flow.

477

*/

478

override var value: T

479

480

/**

481

* Atomically compares the current value with expect and sets it to update if they are the same.

482

*/

483

fun compareAndSet(expect: T, update: T): Boolean

484

}

485

486

/**

487

* Creates a MutableStateFlow with the given initial value.

488

*/

489

fun <T> MutableStateFlow(value: T): MutableStateFlow<T>

490

491

/**

492

* Updates the MutableStateFlow.value atomically using the specified function of its value.

493

*/

494

fun <T> MutableStateFlow<T>.update(function: (T) -> T)

495

496

/**

497

* Updates the MutableStateFlow.value atomically using the specified function of its value and returns the new value.

498

*/

499

fun <T> MutableStateFlow<T>.updateAndGet(function: (T) -> T): T

500

501

/**

502

* Updates the MutableStateFlow.value atomically using the specified function of its value and returns its prior value.

503

*/

504

fun <T> MutableStateFlow<T>.getAndUpdate(function: (T) -> T): T

505

```

506

507

**Usage Examples:**

508

509

```kotlin

510

import kotlinx.coroutines.*

511

import kotlinx.coroutines.flow.*

512

513

val scope = MainScope()

514

515

// SharedFlow example

516

val sharedFlow = MutableSharedFlow<String>(replay = 2)

517

518

// Multiple collectors

519

scope.launch {

520

sharedFlow.collect { value ->

521

println("Collector 1: $value")

522

}

523

}

524

525

scope.launch {

526

sharedFlow.collect { value ->

527

println("Collector 2: $value")

528

}

529

}

530

531

// Emit values

532

scope.launch {

533

sharedFlow.emit("First")

534

delay(100)

535

sharedFlow.emit("Second")

536

delay(100)

537

sharedFlow.emit("Third")

538

}

539

540

// StateFlow example

541

val stateFlow = MutableStateFlow("Initial")

542

543

// Observe state changes

544

scope.launch {

545

stateFlow.collect { state ->

546

println("Current state: $state")

547

}

548

}

549

550

// Update state

551

scope.launch {

552

delay(1000)

553

stateFlow.value = "Updated"

554

delay(1000)

555

stateFlow.value = "Final"

556

}

557

558

// Atomic updates using update functions

559

val counter = MutableStateFlow(0)

560

scope.launch {

561

// Safe concurrent increment

562

counter.update { it + 1 }

563

564

// Get new value after update

565

val newValue = counter.updateAndGet { it * 2 }

566

println("New value: $newValue")

567

568

// Get old value before update

569

val oldValue = counter.getAndUpdate { it - 5 }

570

println("Old value: $oldValue")

571

}

572

573

// StateFlow always has current value

574

println("Immediate value: ${stateFlow.value}")

575

```

576

577

### Flow Sharing Operators

578

579

Convert cold flows to hot flows with sharing behavior.

580

581

```kotlin { .api }

582

/**

583

* Converts this cold Flow into a hot SharedFlow that is started in the given

584

* coroutine scope, sharing emissions from a single running instance of the upstream flow.

585

*/

586

fun <T> Flow<T>.shareIn(

587

scope: CoroutineScope,

588

started: SharingStarted,

589

replay: Int = 0

590

): SharedFlow<T>

591

592

/**

593

* Converts this cold Flow into a hot StateFlow that is started in the given

594

* coroutine scope, sharing the most recently emitted value from a single running

595

* instance of the upstream flow.

596

*/

597

fun <T> Flow<T>.stateIn(

598

scope: CoroutineScope,

599

started: SharingStarted,

600

initialValue: T

601

): StateFlow<T>

602

603

/**

604

* A policy for starting and stopping the sharing coroutine in shareIn and stateIn operators.

605

*/

606

interface SharingStarted {

607

companion object {

608

val Eagerly: SharingStarted

609

val Lazily: SharingStarted

610

611

fun WhileSubscribed(

612

stopTimeoutMillis: Long = 0,

613

replayExpirationMillis: Long = Long.MAX_VALUE

614

): SharingStarted

615

}

616

}

617

```

618

619

**Usage Examples:**

620

621

```kotlin

622

import kotlinx.coroutines.*

623

import kotlinx.coroutines.flow.*

624

625

val scope = MainScope()

626

627

// Cold flow

628

val coldFlow = flow {

629

println("Flow started")

630

repeat(5) {

631

emit(it)

632

delay(1000)

633

}

634

}

635

636

// Convert to hot SharedFlow

637

val hotSharedFlow = coldFlow.shareIn(

638

scope = scope,

639

started = SharingStarted.WhileSubscribed(),

640

replay = 1

641

)

642

643

// Multiple collectors share the same flow instance

644

scope.launch {

645

delay(2000) // Start collecting after 2 seconds

646

hotSharedFlow.collect { value ->

647

println("Late collector: $value")

648

}

649

}

650

651

scope.launch {

652

hotSharedFlow.collect { value ->

653

println("Early collector: $value")

654

}

655

}

656

657

// Convert to StateFlow

658

val dataFlow = flow {

659

emit("Loading...")

660

delay(2000)

661

emit("Data loaded")

662

}

663

664

val dataState = dataFlow.stateIn(

665

scope = scope,

666

started = SharingStarted.Lazily,

667

initialValue = "Initial"

668

)

669

670

// State is immediately available

671

println("Current state: ${dataState.value}")

672

673

scope.launch {

674

dataState.collect { state ->

675

println("State changed: $state")

676

}

677

}

678

```

679

680

### Flow Combination Operators

681

682

Combine multiple flows into single flows with various strategies.

683

684

```kotlin { .api }

685

/**

686

* Zips values from the current flow with other flow using provided transform

687

* function applied to each pair of values.

688

*/

689

fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R>

690

691

/**

692

* Returns a flow whose values are generated by transform function by combining

693

* the most recently emitted values by each flow.

694

*/

695

fun <T1, T2, R> Flow<T1>.combine(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R>

696

697

/**

698

* Flattens the given flow of flows into a single flow by merging emissions.

699

*/

700

fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = DEFAULT_CONCURRENCY): Flow<T>

701

702

/**

703

* Transforms elements emitted by the original flow by applying transform that

704

* returns another flow, and then merging the resulting flows.

705

*/

706

fun <T, R> Flow<T>.flatMapMerge(

707

concurrency: Int = DEFAULT_CONCURRENCY,

708

transform: suspend (value: T) -> Flow<R>

709

): Flow<R>

710

711

/**

712

* Transforms elements emitted by the original flow by applying transform that

713

* returns another flow, and then concatenating the resulting flows.

714

*/

715

fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R>

716

717

/**

718

* Transforms elements emitted by the original flow by applying transform that

719

* returns another flow, and then flattening these flows by switching to new flows.

720

*/

721

fun <T, R> Flow<T>.flatMapLatest(transform: suspend (value: T) -> Flow<R>): Flow<R>

722

```

723

724

**Usage Examples:**

725

726

```kotlin

727

import kotlinx.coroutines.*

728

import kotlinx.coroutines.flow.*

729

730

val scope = MainScope()

731

732

scope.launch {

733

// Zip - waits for both flows to emit

734

val flow1 = flowOf(1, 2, 3).onEach { delay(100) }

735

val flow2 = flowOf("A", "B", "C").onEach { delay(150) }

736

737

flow1.zip(flow2) { num, letter ->

738

"$num$letter"

739

}.collect { println("Zipped: $it") }

740

// Output: Zipped: 1A, Zipped: 2B, Zipped: 3C

741

}

742

743

scope.launch {

744

// Combine - uses latest values from both

745

val numbers = flow {

746

repeat(3) {

747

emit(it)

748

delay(100)

749

}

750

}

751

val letters = flow {

752

repeat(3) {

753

emit('A' + it)

754

delay(150)

755

}

756

}

757

758

numbers.combine(letters) { num, letter ->

759

"$num$letter"

760

}.collect { println("Combined: $it") }

761

}

762

763

scope.launch {

764

// FlatMap merge - run multiple flows concurrently

765

flowOf(1, 2, 3)

766

.flatMapMerge { value ->

767

flow {

768

repeat(2) {

769

emit("$value-$it")

770

delay(100)

771

}

772

}

773

}

774

.collect { println("Merged: $it") }

775

}

776

777

scope.launch {

778

// FlatMap latest - cancel previous when new value arrives

779

flow {

780

emit(1)

781

delay(200)

782

emit(2)

783

delay(200)

784

emit(3)

785

}

786

.flatMapLatest { value ->

787

flow {

788

repeat(5) {

789

emit("$value-$it")

790

delay(100)

791

}

792

}

793

}

794

.collect { println("Latest: $it") }

795

}

796

```