or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

control-flow.md core-components.md error-handling.md graph-building.md index.md io-integration.md junction-operations.md materialization.md stream-operations.md

stream-operations.md docs/

0

# Stream Operations and Transformations

1

2

This document covers the comprehensive set of stream processing operations, including mapping, filtering, grouping, timing, and error handling, that the `FlowOps` trait makes available on all stream components.

3

4

## FlowOps Trait

5

6

All stream operations are available through the `FlowOps` trait, which is mixed into the `Source`, `Flow`, and `SubFlow` types.

7

8

```scala { .api }

9

trait FlowOps[+Out, +Mat] {

10

type Repr[+O] <: FlowOps[O, Mat]

11

12

// Core transformation methods

13

def map[T](f: Out => T): Repr[T]

14

def filter(p: Out => Boolean): Repr[Out]

15

def mapAsync[T](parallelism: Int)(f: Out => Future[T]): Repr[T]

16

def mapConcat[T](f: Out => immutable.Iterable[T]): Repr[T]

17

}

18

```

19

20

## Element Transformation

21

22

### Basic Transformations

23

24

**Map and Filter:**

25

```scala { .api }

26

trait FlowOps[+Out, +Mat] {

27

def map[T](f: Out => T): Repr[T]

28

def mapAsync[T](parallelism: Int)(f: Out => Future[T]): Repr[T]

29

def mapAsyncUnordered[T](parallelism: Int)(f: Out => Future[T]): Repr[T]

30

def mapConcat[T](f: Out => immutable.Iterable[T]): Repr[T]

31

def filter(p: Out => Boolean): Repr[Out]

32

def filterNot(p: Out => Boolean): Repr[Out]

33

}

34

```

35

36

**Collect with Partial Functions:**

37

```scala { .api }

38

trait FlowOps[+Out, +Mat] {

39

def collect[T](pf: PartialFunction[Out, T]): Repr[T]

40

def collectType[T](implicit m: ClassTag[T]): Repr[T]

41

}

42

```

43

44

### Usage Examples

45

46

```scala

47

import akka.stream.scaladsl.{Source, Flow}

48

import scala.concurrent.Future

49

import scala.concurrent.ExecutionContext.Implicits.global

50

51

val source = Source(1 to 10)

52

53

// Basic map

54

val doubled = source.map(_ * 2)

55

56

// Async map with parallelism

57

val asyncMapped = source.mapAsync(4) { n =>

58

Future {

59

Thread.sleep(100)

60

n * n

61

}

62

}

63

64

// Flat map with mapConcat

65

val exploded = source.mapConcat(n => List.fill(n)(n))

66

67

// Filter

68

val evenOnly = source.filter(_ % 2 == 0)

69

70

// Collect with partial function

71

val strings = Source(List(1, "hello", 2, "world", 3))

72

val onlyStrings = strings.collect {

73

case s: String => s.toUpperCase

74

}

75

```

76

77

## Element Selection and Limiting

78

79

### Take and Drop Operations

80

81

```scala { .api }

82

trait FlowOps[+Out, +Mat] {

83

def take(n: Long): Repr[Out]

84

def takeWhile(p: Out => Boolean): Repr[Out]

85

def takeWithin(d: FiniteDuration): Repr[Out]

86

def drop(n: Long): Repr[Out]

87

def dropWhile(p: Out => Boolean): Repr[Out]

88

def dropWithin(d: FiniteDuration): Repr[Out]

89

}

90

```

91

92

### Sampling and Limiting

93

94

```scala { .api }

95

trait FlowOps[+Out, +Mat] {

96

def limit(n: Long): Repr[Out]

97

def limitWeighted(n: Long)(costFn: Out => Long): Repr[Out]

98

def throttle(elements: Int, per: FiniteDuration): Repr[Out]

99

def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int, mode: ThrottleMode): Repr[Out]

100

}

101

```

102

103

### Usage Examples

104

105

```scala

106

import scala.concurrent.duration._

107

108

val source = Source(1 to 100)

109

110

// Take first 10 elements

111

val first10 = source.take(10)

112

113

// Take while condition is true

114

val whileLessThan50 = source.takeWhile(_ < 50)

115

116

// Take within time window

117

val within5Seconds = source.takeWithin(5.seconds)

118

119

// Rate limiting - max 10 elements per second

120

val throttled = source.throttle(10, 1.second)

121

122

// Burst throttling

123

val burstThrottled = source.throttle(

124

elements = 10,

125

per = 1.second,

126

maximumBurst = 5,

127

mode = ThrottleMode.Shaping

128

)

129

```

130

131

## Grouping and Batching

132

133

### Grouping Operations

134

135

```scala { .api }

136

trait FlowOps[+Out, +Mat] {

137

def grouped(n: Int): Repr[immutable.Seq[Out]]

138

def groupedWithin(n: Int, d: FiniteDuration): Repr[immutable.Seq[Out]]

139

def groupedWeighted(minWeight: Long)(costFn: Out => Long): Repr[immutable.Seq[Out]]

140

def groupBy[K](maxSubstreams: Int, f: Out => K): SubFlow[Out, Mat, Repr, Closed]

141

}

142

```

143

144

### Batching Operations

145

146

```scala { .api }

147

trait FlowOps[+Out, +Mat] {

148

def batch[S](max: Long, seed: Out => S)(aggregate: (S, Out) => S): Repr[S]

149

def batchWeighted[S](max: Long, costFn: Out => Long, seed: Out => S)(aggregate: (S, Out) => S): Repr[S]

150

}

151

```

152

153

### Usage Examples

154

155

```scala

156

import scala.concurrent.duration._

157

158

val source = Source(1 to 100)

159

160

// Group into batches of 10

161

val groups = source.grouped(10)

162

163

// Group by time or count (whichever comes first)

164

val timeGroups = source.groupedWithin(10, 1.second)

165

166

// Group by key into substreams

167

val byParity = source.groupBy(2, _ % 2)

168

.map(_ * 2)

169

.mergeSubstreams

170

171

// Batch with custom aggregation

172

val batched = source.batch(

173

max = 10,

174

seed = n => List(n)

175

)((acc, n) => n :: acc // aggregate is passed in batch's second parameter list

176

)

177

```

178

179

## Timing Operations

180

181

### Delays and Timeouts

182

183

```scala { .api }

184

trait FlowOps[+Out, +Mat] {

185

def delay(d: FiniteDuration): Repr[Out]

186

def delayWith(delayStrategySupplier: () => DelayStrategy[Out]): Repr[Out]

187

def initialDelay(d: FiniteDuration): Repr[Out]

188

def idleTimeout(timeout: FiniteDuration): Repr[Out]

189

def completionTimeout(timeout: FiniteDuration): Repr[Out]

190

def backpressureTimeout(timeout: FiniteDuration): Repr[Out]

191

}

192

```

193

194

### Keep Alive

195

196

```scala { .api }

197

trait FlowOps[+Out, +Mat] {

198

def keepAlive(maxIdle: FiniteDuration, injectedElem: () => Out): Repr[Out]

199

}

200

```

201

202

### Usage Examples

203

204

```scala

205

import scala.concurrent.duration._

206

207

val source = Source(1 to 10)

208

209

// Delay each element by 1 second

210

val delayed = source.delay(1.second)

211

212

// Initial delay before first element

213

val withInitialDelay = source.initialDelay(5.seconds)

214

215

// Timeout if no elements for 30 seconds

216

val withIdleTimeout = source.idleTimeout(30.seconds)

217

218

// Keep alive by injecting elements

219

val keepAlive = source.keepAlive(10.seconds, () => 0)

220

```

221

222

## Accumulation Operations

223

224

### Scanning and Folding

225

226

```scala { .api }

227

trait FlowOps[+Out, +Mat] {

228

def scan[T](zero: T)(f: (T, Out) => T): Repr[T]

229

def scanAsync[T](zero: T)(f: (T, Out) => Future[T]): Repr[T]

230

def fold[T](zero: T)(f: (T, Out) => T): Repr[T]

231

def foldAsync[T](zero: T)(f: (T, Out) => Future[T]): Repr[T]

232

def reduce(f: (Out, Out) => Out): Repr[Out]

233

}

234

```

235

236

### Usage Examples

237

238

```scala

239

val source = Source(1 to 10)

240

241

// Running sum (scan emits intermediate results)

242

val runningSums = source.scan(0)(_ + _)

243

// Emits: 0, 1, 3, 6, 10, 15, 21, 28, 36, 45, 55

244

245

// Final sum only (fold emits only final result)

246

val totalSum = source.fold(0)(_ + _)

247

// Emits: 55

248

249

// Reduce without initial value

250

val product = source.reduce(_ * _)

251

// Emits: 3628800

252

253

// Async accumulation

254

val asyncSum = source.scanAsync(0) { (acc, n) =>

255

Future.successful(acc + n)

256

}

257

```

258

259

## Buffer Management

260

261

### Buffering Operations

262

263

```scala { .api }

264

trait FlowOps[+Out, +Mat] {

265

def buffer(size: Int, overflowStrategy: OverflowStrategy): Repr[Out]

266

def conflate[O2 >: Out](aggregate: (O2, O2) => O2): Repr[O2]

267

def conflateWithSeed[S](seed: Out => S)(aggregate: (S, Out) => S): Repr[S]

268

def expand[U](extrapolate: Out => Iterator[U]): Repr[U]

269

}

270

```

271

272

### Usage Examples

273

274

```scala

275

import akka.stream.OverflowStrategy

276

277

val source = Source(1 to 100)

278

279

// Buffer with overflow strategy

280

val buffered = source.buffer(10, OverflowStrategy.dropHead)

281

282

// Conflate (combine) when downstream is slow

283

val conflated = source.conflateWithSeed(identity)(_ + _)

284

285

// Expand elements when downstream is fast

286

val expanded = source.expand(n => Iterator.fill(3)(n))

287

```

288

289

## Error Handling and Recovery

290

291

### Recovery Operations

292

293

```scala { .api }

294

trait FlowOps[+Out, +Mat] {

295

def recover[T >: Out](pf: PartialFunction[Throwable, T]): Repr[T]

296

def recoverWithRetries[T >: Out](attempts: Int, pf: PartialFunction[Throwable, Graph[SourceShape[T], NotUsed]]): Repr[T]

297

def mapError(f: Throwable => Throwable): Repr[Out]

298

}

299

```

300

301

### Usage Examples

302

303

```scala

304

val source = Source(List("1", "2", "abc", "4"))

305

306

// Recover from parsing errors

307

val safeParseInt = source

308

.map(_.toInt) // Fails on "abc"; recover then emits -1 and completes the stream, so "4" is never processed

309

.recover {

310

case _: NumberFormatException => -1

311

}

312

313

// Recover with retries using alternative source

314

val withRetries = source

315

.map(_.toInt)

316

.recoverWithRetries(3, {

317

case _: NumberFormatException => Source.single(0)

318

})

319

```

320

321

## Stream Splitting and Substreams

322

323

### Splitting Operations

324

325

```scala { .api }

326

trait FlowOps[+Out, +Mat] {

327

type Closed

328

329

def splitWhen(p: Out => Boolean): SubFlow[Out, Mat, Repr, Closed]

330

def splitAfter(p: Out => Boolean): SubFlow[Out, Mat, Repr, Closed]

331

def splitWhen[U](substreamCancelStrategy: SubstreamCancelStrategy)(p: Out => Boolean): SubFlow[Out, Mat, Repr, Closed]

332

}

333

```

334

335

### SubFlow Operations

336

337

```scala { .api }

338

trait SubFlow[+Out, +Mat, +F[+_], C] extends FlowOps[Out, Mat] {

339

def mergeSubstreams: F[Out]

340

def mergeSubstreamsWithParallelism(parallelism: Int): F[Out]

341

def concatSubstreams: F[Out]

342

343

// All FlowOps methods are available on SubFlow

344

def map[T](f: Out => T): SubFlow[T, Mat, F, C]

345

def filter(p: Out => Boolean): SubFlow[Out, Mat, F, C]

346

// ... etc

347

}

348

```

349

350

### Usage Examples

351

352

```scala

353

val source = Source(1 to 20)

354

355

// Split into substreams when element is divisible by 5

356

val substreams = source

357

.splitWhen(_ % 5 == 0)

358

.map(_ * 2) // Process each substream

359

.mergeSubstreams // Merge back to single stream

360

361

// Split after condition and concatenate results

362

val splitAfter = source

363

.splitAfter(_ % 7 == 0)

364

.fold(0)(_ + _) // Sum each substream

365

.concatSubstreams

366

```

367

368

## Utility Operations

369

370

### Element Inspection and Side Effects

371

372

```scala { .api }

373

trait FlowOps[+Out, +Mat] {

374

def log(name: String): Repr[Out]

375

def log(name: String, extract: Out => Any): Repr[Out]

376

def wireTap(sink: Graph[SinkShape[Out], _]): Repr[Out]

377

def alsoTo(sink: Graph[SinkShape[Out], _]): Repr[Out]

378

}

379

```

380

381

### Element Interspersing

382

383

```scala { .api }

384

trait FlowOps[+Out, +Mat] {

385

def intersperse[T >: Out](inject: T): Repr[T]

386

def intersperse[T >: Out](start: T, inject: T, end: T): Repr[T]

387

}

388

```

389

390

### Usage Examples

391

392

```scala

393

val source = Source(List("a", "b", "c"))

394

395

// Add logging

396

val logged = source.log("my-stream")

397

398

// Intersperse with separator

399

val withSeparator = source.intersperse(",")

400

// Result: "a", ",", "b", ",", "c"

401

402

// With start and end

403

val withBrackets = source.intersperse("[", ",", "]")

404

// Result: "[", "a", ",", "b", ",", "c", "]"

405

406

// Wire tap for side effects without affecting main stream

407

val withSideEffect = source.wireTap(Sink.foreach(x => println(s"Side: $x")))

408

```

409

410

## Monitoring and Lifecycle

411

412

### Stream Monitoring

413

414

```scala { .api }

415

trait FlowOps[+Out, +Mat] {

416

type ClosedMat[+M]

417

418

def watchTermination[Mat2]()(matF: (Mat, Future[Done]) => Mat2): ReprMat[Out, Mat2]

419

def monitor[Mat2]()(matF: (Mat, FlowMonitor[Out]) => Mat2): ReprMat[Out, Mat2]

420

}

421

```

422

423

### Usage Examples

424

425

```scala

426

val source = Source(1 to 10)

427

428

// Watch for completion

429

val withTerminationWatcher = source

430

.watchTermination() { (notUsed, done) =>

431

done.onComplete {

432

case Success(_) => println("Stream completed successfully")

433

case Failure(ex) => println(s"Stream failed: $ex")

434

}

435

notUsed

436

}

437

438

// Monitor stream state

439

val withMonitoring = source

440

.monitor() { (mat, monitor) =>

441

// FlowMonitor.state returns the current StreamState snapshot, not a collection

442

val state = monitor.state

443

println(s"Stream state: $state")

444

mat

445

}

446

```