or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

channels.md, coroutine-builders.md, coroutine-management.md, dispatchers.md, error-handling.md, flow-api.md, index.md, select-expression.md, synchronization.md

docs/flow-api.md

0

# Flow API

1

2

The Flow API offers cold reactive streams with a comprehensive set of transformation operators, plus hot flows (`SharedFlow` and `StateFlow`) for event broadcasting and state holding. Together they provide a complete solution for asynchronous data streams with structured concurrency support.

3

4

## Capabilities

5

6

### Cold Flow Interface

7

8

Base interfaces for cold streams that are created fresh for each collector.

9

10

```kotlin { .api }

11

/**

12

* Represents an asynchronous flow of values

13

*/

14

interface Flow<out T> {

15

/** Collects the flow values with the provided collector */

16

suspend fun collect(collector: FlowCollector<T>)

17

}

18

19

/**

20

* Collector for flow values

21

*/

22

interface FlowCollector<in T> {

23

/** Emit a value to the flow */

24

suspend fun emit(value: T)

25

}

26

27

/**

28

* Base class for flow implementations

29

*/

30

abstract class AbstractFlow<T> : Flow<T> {

31

/** Collect implementation that ensures proper flow context */

32

final override suspend fun collect(collector: FlowCollector<T>)

33

/** Abstract method for subclasses to implement */

34

abstract suspend fun collectSafely(collector: FlowCollector<T>)

35

}

36

```

37

38

### Flow Builders

39

40

Functions for creating flows from various sources.

41

42

```kotlin { .api }

43

/**

44

* Creates a flow with a builder block

45

* @param block the flow builder that emits values

46

* @return Flow that emits values from the builder

47

*/

48

fun <T> flow(block: suspend FlowCollector<T>.() -> Unit): Flow<T>

49

50

/**

51

* Creates a flow that emits a fixed set of values passed as arguments

52

* @param elements values to emit

53

* @return Flow that emits the provided values

54

*/

55

fun <T> flowOf(vararg elements: T): Flow<T>

56

57

/**

58

* Creates an empty flow

59

*/

60

fun <T> emptyFlow(): Flow<T>

61

62

/**

63

* Creates a flow from a collection

64

*/

65

fun <T> Iterable<T>.asFlow(): Flow<T>

66

67

/**

68

* Creates a flow from a sequence

69

*/

70

fun <T> Sequence<T>.asFlow(): Flow<T>

71

```

72

73

**Usage Examples:**

74

75

```kotlin

76

import kotlinx.coroutines.*

77

import kotlinx.coroutines.flow.*

78

79

// Basic flow creation

80

val numberFlow = flow {

81

for (i in 1..5) {

82

delay(100)

83

emit(i)

84

}

85

}

86

87

// Flow from values

88

val valueFlow = flowOf(1, 2, 3, 4, 5)

89

90

// Flow from collection

91

val listFlow = listOf("a", "b", "c").asFlow()

92

93

// Collect the flow

94

numberFlow.collect { value ->

95

println("Received: $value")

96

}

97

```

98

99

### Channel Flow Builder

100

101

Flow with concurrent emission capabilities for complex producers.

102

103

```kotlin { .api }

104

/**

105

* Creates a flow with a channel-based producer

106

* @param block producer block with ProducerScope

107

* @return Flow backed by a channel

108

*/

109

fun <T> channelFlow(block: suspend ProducerScope<T>.() -> Unit): Flow<T>

110

111

/**

112

* Creates a flow from callback-based APIs

113

* @param block producer block with callback registration

114

* @return Flow that emits callback values

115

*/

116

fun <T> callbackFlow(block: suspend ProducerScope<T>.() -> Unit): Flow<T>

117

```

118

119

**Usage Examples:**

120

121

```kotlin

122

import kotlinx.coroutines.*

123

import kotlinx.coroutines.flow.*

124

125

// Channel flow for concurrent emission

126

val concurrentFlow = channelFlow {

127

launch {

128

for (i in 1..5) {

129

send(i)

130

delay(100)

131

}

132

}

133

launch {

134

for (i in 6..10) {

135

send(i)

136

delay(150)

137

}

138

}

139

}

140

141

// Callback flow for integrating with callback APIs

142

val callbackBasedFlow = callbackFlow {

143

val listener = object : DataListener {

144

override fun onData(data: String) {

145

trySend(data)

146

}

147

override fun onComplete() {

148

close()

149

}

150

}

151

152

// Register listener

153

dataSource.addListener(listener)

154

155

// Cleanup when flow is cancelled

156

awaitClose {

157

dataSource.removeListener(listener)

158

}

159

}

160

```

161

162

### Transformation Operators

163

164

Operators for transforming flow values.

165

166

```kotlin { .api }

167

/**

168

* Transform each value emitted by the flow

169

*/

170

fun <T, R> Flow<T>.map(transform: suspend (value: T) -> R): Flow<R>

171

172

/**

173

* Transform each value with its index

174

*/

175

fun <T, R> Flow<T>.mapIndexed(transform: suspend (index: Int, value: T) -> R): Flow<R>

176

177

/**

178

* Transform values and filter out nulls

179

*/

180

fun <T, R : Any> Flow<T>.mapNotNull(transform: suspend (value: T) -> R?): Flow<R>

181

182

/**

183

* Transform to flows and flatten concatenating

184

*/

185

fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R>

186

187

/**

188

* Transform to flows and flatten merging concurrently

189

*/

190

fun <T, R> Flow<T>.flatMapMerge(

191

concurrency: Int = DEFAULT_CONCURRENCY,

192

transform: suspend (value: T) -> Flow<R>

193

): Flow<R>

194

195

/**

196

* Transform with general transformation block

197

*/

198

fun <T, R> Flow<T>.transform(transform: suspend FlowCollector<R>.(value: T) -> Unit): Flow<R>

199

200

/**

201

* Accumulate values, emitting the initial value followed by every intermediate result

202

*/

203

fun <T, R> Flow<T>.scan(initial: R, operation: suspend (accumulator: R, value: T) -> R): Flow<R>

204

```

205

206

**Usage Examples:**

207

208

```kotlin

209

import kotlinx.coroutines.flow.*

210

211

val numbers = flowOf(1, 2, 3, 4, 5)

212

213

// Map transformation

214

val doubled = numbers.map { it * 2 }

215

216

// Map with index

217

val indexed = numbers.mapIndexed { index, value -> "[$index] = $value" }

218

219

// Filter and transform

220

val evenSquares = numbers

221

.filter { it % 2 == 0 }

222

.map { it * it }

223

224

// Scan for running totals

225

val runningSum = numbers.scan(0) { acc, value -> acc + value }

226

227

// Transform with emission control

228

val expanded = numbers.transform { value ->

229

emit(value)

230

emit(value * 10)

231

}

232

```

233

234

### Filtering Operators

235

236

Operators for filtering flow values based on conditions.

237

238

```kotlin { .api }

239

/**

240

* Filter values based on predicate

241

*/

242

fun <T> Flow<T>.filter(predicate: suspend (T) -> Boolean): Flow<T>

243

244

/**

245

* Filter out null values

246

*/

247

fun <T : Any> Flow<T?>.filterNotNull(): Flow<T>

248

249

/**

250

* Take first N values

251

*/

252

fun <T> Flow<T>.take(count: Int): Flow<T>

253

254

/**

255

* Take while predicate is true

256

*/

257

fun <T> Flow<T>.takeWhile(predicate: suspend (T) -> Boolean): Flow<T>

258

259

/**

260

* Drop first N values

261

*/

262

fun <T> Flow<T>.drop(count: Int): Flow<T>

263

264

/**

265

* Drop while predicate is true

266

*/

267

fun <T> Flow<T>.dropWhile(predicate: suspend (T) -> Boolean): Flow<T>

268

269

/**

270

* Emit only distinct consecutive values

271

*/

272

fun <T> Flow<T>.distinctUntilChanged(): Flow<T>

273

274

/**

275

* Emit only distinct consecutive values by key

276

*/

277

fun <T, K> Flow<T>.distinctUntilChangedBy(keySelector: suspend (T) -> K): Flow<T>

278

```

279

280

### Advanced Transformation Operators

281

282

Advanced operators for complex transformations and timing control.

283

284

```kotlin { .api }

285

/**

286

* Transform values but cancel previous transformation on new emission

287

*/

288

fun <T, R> Flow<T>.mapLatest(transform: suspend (value: T) -> R): Flow<R>

289

290

/**

291

* Transform values with full control, cancelling on new emission

292

*/

293

fun <T, R> Flow<T>.transformLatest(transform: suspend FlowCollector<R>.(value: T) -> Unit): Flow<R>

294

295

/**

296

* Running fold that emits intermediate results

297

*/

298

fun <T, R> Flow<T>.runningFold(initial: R, operation: suspend (accumulator: R, value: T) -> R): Flow<R>

299

300

/**

301

* Running reduce that emits intermediate results

302

*/

303

fun <T> Flow<T>.runningReduce(operation: suspend (accumulator: T, value: T) -> T): Flow<T>

304

```

305

306

### Timing Operators

307

308

Operators for controlling timing of emissions.

309

310

```kotlin { .api }

311

/**

312

* Sample emissions at specified intervals

313

*/

314

fun <T> Flow<T>.sample(periodMillis: Long): Flow<T>

315

316

/**

317

* Debounce emissions - only emit if no new value within timeout

318

*/

319

fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T>

320

321

/**

322

* Fail with TimeoutCancellationException if the upstream does not emit a value within the given timeout

323

*/

324

fun <T> Flow<T>.timeout(timeout: Duration): Flow<T>

325

326

/**

327

* Delay each emission (deprecated in kotlinx.coroutines; prefer `onEach { delay(delayMillis) }`)

328

*/

329

fun <T> Flow<T>.delayEach(delayMillis: Long): Flow<T>

330

```

331

332

### Terminal Operators

333

334

Operators that consume the flow and produce a final result.

335

336

```kotlin { .api }

337

/**

338

* Collect all values with the provided action

339

*/

340

suspend fun <T> Flow<T>.collect(action: suspend (value: T) -> Unit)

341

342

/**

343

* Collect with index

344

*/

345

suspend fun <T> Flow<T>.collectIndexed(action: suspend (index: Int, value: T) -> Unit)

346

347

/**

348

* Launch collection in the provided scope

349

*/

350

fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job

351

352

/**

353

* Reduce flow to a single value

354

*/

355

suspend fun <S, T : S> Flow<T>.reduce(operation: suspend (accumulator: S, value: T) -> S): S

356

357

/**

358

* Fold flow values with initial value

359

*/

360

suspend fun <T, R> Flow<T>.fold(initial: R, operation: suspend (acc: R, value: T) -> R): R

361

362

/**

363

* Get the first value and stop collecting; throws NoSuchElementException if the flow is empty

364

*/

365

suspend fun <T> Flow<T>.first(): T

366

367

/**

368

* Get first value or null

369

*/

370

suspend fun <T> Flow<T>.firstOrNull(): T?

371

372

/**

373

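* Get the single value; throws NoSuchElementException if the flow is empty and IllegalArgumentException if it emits more than one value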

374

*/

375

suspend fun <T> Flow<T>.single(): T

376

377

/**

378

* Convert to list

379

*/

380

suspend fun <T> Flow<T>.toList(): List<T>

381

382

/**

383

* Convert to set

384

*/

385

suspend fun <T> Flow<T>.toSet(): Set<T>

386

387

/**

388

* Count values

389

*/

390

suspend fun <T> Flow<T>.count(): Int

391

```

392

393

### Hot Flows - SharedFlow

394

395

Hot flows that can have multiple collectors and emit values regardless of collectors.

396

397

```kotlin { .api }

398

/**

399

* A flow that can be shared among multiple collectors

400

*/

401

interface SharedFlow<out T> : Flow<T> {

402

/** Values currently available for replay */

403

val replayCache: List<T>

404

}

405

406

/**

407

* Mutable version of SharedFlow

408

*/

409

interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {

410

/** Number of active subscribers */

411

val subscriptionCount: StateFlow<Int>

412

413

/** Emit a value to all subscribers */

414

override suspend fun emit(value: T)

415

416

/** Try to emit a value without suspending */

417

fun tryEmit(value: T): Boolean

418

419

/** Reset the replay cache */

420

fun resetReplayCache()

421

}

422

423

/**

424

* Create a MutableSharedFlow

425

* @param replay number of values to replay to new subscribers

426

* @param extraBufferCapacity additional buffer capacity

427

* @param onBufferOverflow strategy when buffer overflows

428

*/

429

fun <T> MutableSharedFlow(

430

replay: Int = 0,

431

extraBufferCapacity: Int = 0,

432

onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND

433

): MutableSharedFlow<T>

434

```

435

436

### Hot Flows - StateFlow

437

438

SharedFlow specialized for holding current state.

439

440

```kotlin { .api }

441

/**

442

* A SharedFlow that represents a read-only state with a single, always-available current value

443

*/

444

interface StateFlow<out T> : SharedFlow<T> {

445

/** Current state value */

446

val value: T

447

}

448

449

/**

450

* Mutable version of StateFlow

451

*/

452

interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> {

453

/** Current state value (mutable) */

454

override var value: T

455

456

/** Atomically update the value */

457

fun update(function: (T) -> T)

458

459

/** Atomically update and return new value */

460

fun updateAndGet(function: (T) -> T): T

461

462

/** Atomically update and return old value */

463

fun getAndUpdate(function: (T) -> T): T

464

465

/** Compare and set the value */

466

fun compareAndSet(expect: T, update: T): Boolean

467

}

468

469

/**

470

* Create a MutableStateFlow

471

*/

472

fun <T> MutableStateFlow(value: T): MutableStateFlow<T>

473

```

474

475

**Usage Examples:**

476

477

```kotlin

478

import kotlinx.coroutines.*

479

import kotlinx.coroutines.flow.*

480

481

// SharedFlow for events

482

val eventFlow = MutableSharedFlow<String>()

483

484

// Multiple collectors

485

launch {

486

eventFlow.collect { event ->

487

println("Collector 1: $event")

488

}

489

}

490

491

launch {

492

eventFlow.collect { event ->

493

println("Collector 2: $event")

494

}

495

}

496

497

// Emit events

498

eventFlow.emit("Hello")

499

eventFlow.emit("World")

500

501

// StateFlow for state

502

val counterState = MutableStateFlow(0)

503

504

// Observe state changes

505

launch {

506

counterState.collect { count ->

507

println("Count: $count")

508

}

509

}

510

511

// Update state

512

counterState.value = 1

513

counterState.update { it + 1 }

514

```

515

516

### Flow Context and Threading

517

518

Operators for controlling flow execution context.

519

520

```kotlin { .api }

521

/**

522

* Change the context of upstream flow

523

*/

524

fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T>

525

526

/**

527

* Add buffering between producer and consumer

528

*/

529

fun <T> Flow<T>.buffer(

530

capacity: Int = BUFFERED,

531

onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND

532

): Flow<T>

533

534

/**

535

* Convert a cold flow into a hot SharedFlow that is started in the given scope and shared among multiple collectors

536

*/

537

fun <T> Flow<T>.shareIn(

538

scope: CoroutineScope,

539

started: SharingStarted,

540

replay: Int = 0

541

): SharedFlow<T>

542

543

/**

544

* Convert to StateFlow

545

*/

546

fun <T> Flow<T>.stateIn(

547

scope: CoroutineScope,

548

started: SharingStarted,

549

initialValue: T

550

): StateFlow<T>

551

```

552

553

### Error Handling

554

555

Operators for handling exceptions in flows.

556

557

```kotlin { .api }

558

/**

559

* Catch exceptions and handle them

560

*/

561

fun <T> Flow<T>.catch(action: suspend FlowCollector<T>.(cause: Throwable) -> Unit): Flow<T>

562

563

/**

564

* Retry on exceptions

565

*/

566

fun <T> Flow<T>.retry(retries: Long = Long.MAX_VALUE): Flow<T>

567

568

/**

569

* Retry with predicate

570

*/

571

fun <T> Flow<T>.retryWhen(predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long) -> Boolean): Flow<T>

572

```

573

574

## Flow Principles

575

576

1. **Cold Nature**: Flows are cold by default - they don't start producing values until collected

577

2. **Sequential Processing**: Flow operators process values sequentially unless explicit concurrency is used

578

3. **Context Preservation**: A flow emits in the coroutine context of its collector and respects structured concurrency; use `flowOn` to change the context of upstream operations

579

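4. **Exception Transparency**: Exceptions thrown upstream propagate downstream to the collector unless handled with `catch`, which handles only upstream exceptions and never those thrown by the collector itself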

580

5. **Cancellation Support**: Flows are cancellable and respect coroutine cancellation