or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

channels.mdcoroutine-builders.mddispatchers.mdexception-handling.mdflow-api.mdindex.mdjob-management.mdjvm-integration.mdsynchronization.md

flow-api.mddocs/

0

# Flow API

1

2

Reactive streams implementation providing cold flows for asynchronous sequences and hot flows for shared state management with backpressure and exception transparency.

3

4

## Capabilities

5

6

### Flow Interface

7

8

Base interface for cold asynchronous data streams that emit values sequentially.

9

10

```kotlin { .api }

11

interface Flow<out T> {

12

/** Accepts collector and emits values into it */

13

suspend fun collect(collector: FlowCollector<T>)

14

}

15

16

interface FlowCollector<in T> {

17

/** Emits a value to the flow */

18

suspend fun emit(value: T)

19

}

20

```

21

22

**Usage Examples:**

23

24

```kotlin

25

import kotlinx.coroutines.*

26

import kotlinx.coroutines.flow.*

27

28

fun main() = runBlocking {

29

// Basic flow collection

30

val simpleFlow = flow {

31

repeat(3) { i ->

32

emit(i)

33

delay(100)

34

}

35

}

36

37

simpleFlow.collect { value ->

38

println("Collected: $value")

39

}

40

41

// Flow with transformation

42

simpleFlow

43

.map { it * 2 }

44

.filter { it > 0 }

45

.collect { value ->

46

println("Transformed: $value")

47

}

48

}

49

```

50

51

### Flow Builders

52

53

Functions for creating flows from various sources.

54

55

```kotlin { .api }

56

/** Creates a flow from a builder block */

57

fun <T> flow(block: suspend FlowCollector<T>.() -> Unit): Flow<T>

58

59

/** Creates a flow from fixed values */

60

fun <T> flowOf(vararg elements: T): Flow<T>

61

62

/** Creates an empty flow */

63

fun <T> emptyFlow(): Flow<T>

64

65

/** Creates a flow from a channel */

66

fun <T> channelFlow(block: suspend ProducerScope<T>.() -> Unit): Flow<T>

67

68

/** Converts various types to flows */

69

fun <T> Iterable<T>.asFlow(): Flow<T>

70

fun <T> Sequence<T>.asFlow(): Flow<T>

71

fun <T> Array<T>.asFlow(): Flow<T>

72

```

73

74

**Usage Examples:**

75

76

```kotlin

77

import kotlinx.coroutines.*

78

import kotlinx.coroutines.flow.*

79

80

fun main() = runBlocking {

81

// Flow from builder

82

val countFlow = flow {

83

repeat(5) { i ->

84

emit(i)

85

delay(100)

86

}

87

}

88

89

// Flow from values

90

val valuesFlow = flowOf("a", "b", "c", "d")

91

92

// Flow from collection

93

val listFlow = listOf(1, 2, 3, 4, 5).asFlow()

94

95

// Channel flow for concurrent emission

96

val channelFlow = channelFlow {

97

launch {

98

repeat(3) { i ->

99

send(i * 10)

100

delay(50)

101

}

102

}

103

launch {

104

repeat(3) { i ->

105

send(i * 100)

106

delay(75)

107

}

108

}

109

}

110

111

channelFlow.collect { println("Channel flow: $it") }

112

}

113

```

114

115

### Flow Operators

116

117

Intermediate operators for transforming flows.

118

119

```kotlin { .api }

120

/** Transform each value */

121

fun <T, R> Flow<T>.map(transform: suspend (T) -> R): Flow<R>

122

123

/** Filter values based on predicate */

124

fun <T> Flow<T>.filter(predicate: suspend (T) -> Boolean): Flow<T>

125

126

/** Transform to flows and flatten */

127

fun <T, R> Flow<T>.flatMapConcat(transform: suspend (T) -> Flow<R>): Flow<R>

128

fun <T, R> Flow<T>.flatMapMerge(

129

concurrency: Int = DEFAULT_CONCURRENCY,

130

transform: suspend (T) -> Flow<R>

131

): Flow<R>

132

133

/** Take first N elements */

134

fun <T> Flow<T>.take(count: Int): Flow<T>

135

136

/** Drop first N elements */

137

fun <T> Flow<T>.drop(count: Int): Flow<T>

138

139

/** Execute on different context */

140

fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T>

141

142

/** Buffer emissions */

143

fun <T> Flow<T>.buffer(

144

capacity: Int = BUFFERED,

145

onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND

146

): Flow<T>

147

148

/** Combine with another flow */

149

fun <T1, T2, R> Flow<T1>.combine(

150

flow: Flow<T2>,

151

transform: suspend (T1, T2) -> R

152

): Flow<R>

153

154

/** Zip with another flow */

155

fun <T1, T2, R> Flow<T1>.zip(

156

other: Flow<T2>,

157

transform: suspend (T1, T2) -> R

158

): Flow<R>

159

```

160

161

**Usage Examples:**

162

163

```kotlin

164

import kotlinx.coroutines.*

165

import kotlinx.coroutines.flow.*

166

167

fun main() = runBlocking {

168

val sourceFlow = (1..10).asFlow()

169

170

// Chain multiple operators

171

sourceFlow

172

.filter { it % 2 == 0 }

173

.map { it * it }

174

.take(3)

175

.flowOn(Dispatchers.Default)

176

.collect { println("Result: $it") }

177

178

// FlatMap example

179

sourceFlow

180

.take(3)

181

.flatMapConcat { value ->

182

flow {

183

repeat(2) {

184

emit("$value-$it")

185

delay(50)

186

}

187

}

188

}

189

.collect { println("FlatMap: $it") }

190

191

// Combine flows

192

val flow1 = (1..3).asFlow().onEach { delay(100) }

193

val flow2 = (10..12).asFlow().onEach { delay(150) }

194

195

flow1.combine(flow2) { a, b -> "$a+$b" }

196

.collect { println("Combined: $it") }

197

}

198

```

199

200

### Terminal Operators

201

202

Operators that consume the flow and produce results.

203

204

```kotlin { .api }

205

/** Collect all values */

206

suspend fun <T> Flow<T>.collect(action: suspend (T) -> Unit)

207

208

/** Collect to list */

209

suspend fun <T> Flow<T>.toList(): List<T>

210

211

/** Collect to set */

212

suspend fun <T> Flow<T>.toSet(): Set<T>

213

214

/** Get first value */

215

suspend fun <T> Flow<T>.first(): T

216

suspend fun <T> Flow<T>.firstOrNull(): T?

217

218

/** Get single value */

219

suspend fun <T> Flow<T>.single(): T

220

suspend fun <T> Flow<T>.singleOrNull(): T?

221

222

/** Reduce values */

223

suspend fun <S, T : S> Flow<T>.reduce(operation: suspend (S, T) -> S): S

224

225

/** Fold with initial value */

226

suspend fun <T, R> Flow<T>.fold(

227

initial: R,

228

operation: suspend (R, T) -> R

229

): R

230

231

/** Count elements */

232

suspend fun <T> Flow<T>.count(): Int

233

suspend fun <T> Flow<T>.count(predicate: suspend (T) -> Boolean): Int

234

```

235

236

**Usage Examples:**

237

238

```kotlin

239

import kotlinx.coroutines.*

240

import kotlinx.coroutines.flow.*

241

242

fun main() = runBlocking {

243

val numberFlow = (1..5).asFlow()

244

245

// Collect to collections

246

val list = numberFlow.toList()

247

println("List: $list")

248

249

// Reduce operations

250

val sum = numberFlow.fold(0) { acc, value -> acc + value }

251

println("Sum: $sum")

252

253

val product = numberFlow.reduce { acc, value -> acc * value }

254

println("Product: $product")

255

256

// Single values

257

val first = numberFlow.first()

258

val count = numberFlow.count { it > 3 }

259

println("First: $first, Count > 3: $count")

260

}

261

```

262

263

### SharedFlow Interface

264

265

Hot flow that shares emitted values among multiple collectors.

266

267

```kotlin { .api }

268

interface SharedFlow<out T> : Flow<T> {

269

/** Most recent values available to new subscribers */

270

val replayCache: List<T>

271

272

/** Current number of subscribers */

273

val subscriptionCount: StateFlow<Int>

274

}

275

276

interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {

277

/** Number of subscribers */

278

val subscriptionCount: StateFlow<Int>

279

280

/** Emits value to all subscribers */

281

override suspend fun emit(value: T)

282

283

/** Tries to emit value immediately */

284

fun tryEmit(value: T): Boolean

285

286

/** Resets replay cache */

287

fun resetReplayCache()

288

}

289

290

/** Creates MutableSharedFlow */

291

fun <T> MutableSharedFlow(

292

replay: Int = 0,

293

extraBufferCapacity: Int = 0,

294

onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND

295

): MutableSharedFlow<T>

296

```

297

298

**Usage Examples:**

299

300

```kotlin

301

import kotlinx.coroutines.*

302

import kotlinx.coroutines.flow.*

303

304

fun main() = runBlocking {

305

val sharedFlow = MutableSharedFlow<Int>(replay = 2)

306

307

// Start collectors

308

val job1 = launch {

309

sharedFlow.collect { value ->

310

println("Collector 1: $value")

311

}

312

}

313

314

val job2 = launch {

315

sharedFlow.collect { value ->

316

println("Collector 2: $value")

317

}

318

}

319

320

delay(100)

321

322

// Emit values

323

sharedFlow.emit(1)

324

sharedFlow.emit(2)

325

sharedFlow.emit(3)

326

327

delay(100)

328

329

// New collector gets replay

330

val job3 = launch {

331

sharedFlow.collect { value ->

332

println("Collector 3 (late): $value")

333

}

334

}

335

336

sharedFlow.emit(4)

337

338

delay(100)

339

listOf(job1, job2, job3).forEach { it.cancel() }

340

}

341

```

342

343

### StateFlow Interface

344

345

Hot flow representing a state with current value.

346

347

```kotlin { .api }

348

interface StateFlow<out T> : SharedFlow<T> {

349

/** Current state value */

350

val value: T

351

}

352

353

interface MutableStateFlow<T> : StateFlow<T>, MutableSharedFlow<T> {

354

/** Current state value (mutable) */

355

override var value: T

356

357

/** Atomically compares and sets value */

358

fun compareAndSet(expect: T, update: T): Boolean

359

360

/** Updates value atomically with function */

361

inline fun update(function: (T) -> T)

362

363

/** Updates value atomically and returns new value */

364

inline fun updateAndGet(function: (T) -> T): T

365

366

/** Updates value atomically and returns old value */

367

inline fun getAndUpdate(function: (T) -> T): T

368

}

369

370

/** Creates MutableStateFlow */

371

fun <T> MutableStateFlow(value: T): MutableStateFlow<T>

372

```

373

374

**Usage Examples:**

375

376

```kotlin

377

import kotlinx.coroutines.*

378

import kotlinx.coroutines.flow.*

379

380

data class UiState(val loading: Boolean, val data: String)

381

382

class ViewModel {

383

private val _uiState = MutableStateFlow(UiState(loading = false, data = ""))

384

val uiState: StateFlow<UiState> = _uiState.asStateFlow()

385

386

fun loadData() {

387

_uiState.value = _uiState.value.copy(loading = true)

388

389

// Simulate async operation

390

GlobalScope.launch {

391

delay(1000)

392

_uiState.value = UiState(loading = false, data = "Loaded data")

393

}

394

}

395

396

fun updateData(newData: String) {

397

_uiState.update { currentState ->

398

currentState.copy(data = newData)

399

}

400

}

401

}

402

403

fun main() = runBlocking {

404

val viewModel = ViewModel()

405

406

// Collect state changes

407

val job = launch {

408

viewModel.uiState.collect { state ->

409

println("UI State: $state")

410

}

411

}

412

413

delay(100)

414

viewModel.loadData()

415

delay(1500)

416

viewModel.updateData("Updated data")

417

418

delay(100)

419

job.cancel()

420

}

421

```

422

423

### Exception Handling

424

425

Flow exception handling with catch and error recovery.

426

427

```kotlin { .api }

428

/** Catches upstream exceptions */

429

fun <T> Flow<T>.catch(action: suspend FlowCollector<T>.(Throwable) -> Unit): Flow<T>

430

431

/** Handles completion with optional exception */

432

fun <T> Flow<T>.onCompletion(

433

action: suspend FlowCollector<T>.(Throwable?) -> Unit

434

): Flow<T>

435

436

/** Retries on exception */

437

fun <T> Flow<T>.retry(

438

retries: Long = Long.MAX_VALUE,

439

predicate: suspend (Throwable) -> Boolean = { true }

440

): Flow<T>

441

442

/** Retries with when predicate */

443

fun <T> Flow<T>.retryWhen(

444

predicate: suspend FlowCollector<T>.(Throwable, attempt: Long) -> Boolean

445

): Flow<T>

446

```

447

448

**Usage Examples:**

449

450

```kotlin

451

import kotlinx.coroutines.*

452

import kotlinx.coroutines.flow.*

453

454

fun createFailingFlow() = flow {

455

repeat(5) { i ->

456

if (i == 3) throw RuntimeException("Simulated error")

457

emit(i)

458

delay(100)

459

}

460

}

461

462

fun main() = runBlocking {

463

// Exception handling with catch

464

createFailingFlow()

465

.catch { e ->

466

println("Caught exception: ${e.message}")

467

emit(-1) // Emit recovery value

468

}

469

.onCompletion { cause ->

470

if (cause == null) {

471

println("Flow completed successfully")

472

} else {

473

println("Flow completed with exception: $cause")

474

}

475

}

476

.collect { value ->

477

println("Value: $value")

478

}

479

480

// Retry example

481

createFailingFlow()

482

.retry(retries = 2) { exception ->

483

println("Retrying due to: ${exception.message}")

484

true

485

}

486

.catch { e ->

487

println("All retries failed: ${e.message}")

488

}

489

.collect { value ->

490

println("Retry value: $value")

491

}

492

}

493

```

494

495

## Types

496

497

### BufferOverflow

498

499

Strategy for handling buffer overflow.

500

501

```kotlin { .api }

502

enum class BufferOverflow {

503

/** Suspend on buffer overflow (default) */

504

SUSPEND,

505

/** Drop oldest values */

506

DROP_OLDEST,

507

/** Drop latest values */

508

DROP_LATEST

509

}

510

```

511

512

### FlowCollector

513

514

Interface for collecting flow values.

515

516

```kotlin { .api }

517

fun interface FlowCollector<in T> {

518

suspend fun emit(value: T)

519

}

520

```