# Stream Operations and Transformations

Akka Streams provides a comprehensive set of stream processing operations, including mapping, filtering, grouping, timing, and error handling. They are available on all stream components through the FlowOps trait.

## FlowOps Trait

All stream operations are available through the FlowOps trait, which is mixed into the Source, Flow, and SubFlow classes.

```scala { .api }
trait FlowOps[+Out, +Mat] {
  type Repr[+O] <: FlowOps[O, Mat]

  // Core transformation methods
  def map[T](f: Out => T): Repr[T]
  def filter(p: Out => Boolean): Repr[Out]
  def mapAsync[T](parallelism: Int)(f: Out => Future[T]): Repr[T]
  def mapConcat[T](f: Out => immutable.Iterable[T]): Repr[T]
}
```
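
Because the operators live on the shared trait, the same chain can be written on a `Source` or on a standalone `Flow`. The following is a minimal sketch, assuming Akka 2.6 or later (where an implicit `ActorSystem` supplies the materializer); the object and value names are hypothetical.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object FlowOpsReuseExample {
  // filter and map chained on a Flow: a reusable step not yet attached to a source
  val evenSquares: Flow[Int, Int, NotUsed] =
    Flow[Int].filter(_ % 2 == 0).map(n => n * n)

  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("flowops-reuse")
    try {
      // map is called directly on the Source, then the reusable Flow is attached
      val stream = Source(1 to 6).map(_ + 1).via(evenSquares)
      Await.result(stream.runWith(Sink.seq), 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Here `Source(1 to 6).map(_ + 1)` yields 2 to 7, and the flow keeps the even values and squares them.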

## Element Transformation

### Basic Transformations

**Map and Filter:**

```scala { .api }
trait FlowOps[+Out, +Mat] {
  def map[T](f: Out => T): Repr[T]
  def mapAsync[T](parallelism: Int)(f: Out => Future[T]): Repr[T]
  def mapAsyncUnordered[T](parallelism: Int)(f: Out => Future[T]): Repr[T]
  def mapConcat[T](f: Out => immutable.Iterable[T]): Repr[T]
  def filter(p: Out => Boolean): Repr[Out]
  def filterNot(p: Out => Boolean): Repr[Out]
}
```

**Collect with Partial Functions:**

```scala { .api }
trait FlowOps[+Out, +Mat] {
  def collect[T](pf: PartialFunction[Out, T]): Repr[T]
  def collectType[T](implicit m: ClassTag[T]): Repr[T]
}
```

### Usage Examples

```scala
import akka.stream.scaladsl.{Source, Flow}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val source = Source(1 to 10)

// Basic map
val doubled = source.map(_ * 2)

// Async map with parallelism
val asyncMapped = source.mapAsync(4) { n =>
  Future {
    Thread.sleep(100)
    n * n
  }
}

// Flat map with mapConcat
val exploded = source.mapConcat(n => List.fill(n)(n))

// Filter
val evenOnly = source.filter(_ % 2 == 0)

// Collect with partial function
val strings = Source(List(1, "hello", 2, "world", 3))
val onlyStrings = strings.collect {
  case s: String => s.toUpperCase
}
```
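
The difference between `mapAsync` and `mapAsyncUnordered` is easiest to see when earlier elements take longer than later ones. The sketch below, assuming Akka 2.6 or later and using a hypothetical object name, runs three futures in parallel where the first element is the slowest; `mapAsync` still emits results in upstream order.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object MapAsyncOrderExample {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("map-async-order")
    try {
      val stream = Source(List(3, 2, 1)).mapAsync(parallelism = 3) { n =>
        Future {
          // Simulated work: element 3 finishes last, element 1 first
          Thread.sleep(n * 50L)
          n * 10
        }
      }
      // Results are emitted in the order of the input elements
      Await.result(stream.runWith(Sink.seq), 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Swapping in `mapAsyncUnordered` would allow results to be emitted as soon as each future completes, so the output order would then depend on timing.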

## Element Selection and Limiting

### Take and Drop Operations

```scala { .api }
trait FlowOps[+Out, +Mat] {
  def take(n: Long): Repr[Out]
  def takeWhile(p: Out => Boolean): Repr[Out]
  def takeWithin(d: FiniteDuration): Repr[Out]
  def drop(n: Long): Repr[Out]
  def dropWhile(p: Out => Boolean): Repr[Out]
  def dropWithin(d: FiniteDuration): Repr[Out]
}
```

### Sampling and Limiting

```scala { .api }
trait FlowOps[+Out, +Mat] {
  def limit(n: Long): Repr[Out]
  def limitWeighted(n: Long)(costFn: Out => Long): Repr[Out]
  def throttle(elements: Int, per: FiniteDuration): Repr[Out]
  def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int, mode: ThrottleMode): Repr[Out]
}
```

### Usage Examples

```scala
import akka.stream.ThrottleMode
import scala.concurrent.duration._

val source = Source(1 to 100)

// Take first 10 elements
val first10 = source.take(10)

// Take while condition is true
val whileLessThan50 = source.takeWhile(_ < 50)

// Take within time window
val within5Seconds = source.takeWithin(5.seconds)

// Rate limiting - max 10 elements per second
val throttled = source.throttle(10, 1.second)

// Burst throttling
val burstThrottled = source.throttle(
  elements = 10,
  per = 1.second,
  maximumBurst = 5,
  mode = ThrottleMode.Shaping
)
```
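
`take` and `limit` look similar but behave differently when the bound is exceeded: `take` completes the stream, while `limit` fails it. The following sketch illustrates this under the assumption of Akka 2.6 or later; the object name and return shape are hypothetical.

```scala
import akka.actor.ActorSystem
import akka.stream.StreamLimitReachedException
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.{Failure, Try}

object TakeVsLimitExample {
  // Returns the truncated elements and whether `limit` failed the stream
  def run(): (Seq[Int], Boolean) = {
    implicit val system: ActorSystem = ActorSystem("take-vs-limit")
    try {
      // drop(5) skips 1..5, takeWhile would stop at the first element >= 50,
      // and take(3) completes the stream after three elements
      val taken = Await.result(
        Source(1 to 100).drop(5).takeWhile(_ < 50).take(3).runWith(Sink.seq),
        5.seconds)

      // limit(10) does not truncate: the 11th element fails the stream
      val limited = Try(Await.result(
        Source(1 to 100).limit(10).runWith(Sink.seq), 5.seconds))
      val limitFailed = limited match {
        case Failure(_: StreamLimitReachedException) => true
        case _                                       => false
      }
      (taken, limitFailed)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```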

## Grouping and Batching

### Grouping Operations

```scala { .api }
trait FlowOps[+Out, +Mat] {
  def grouped(n: Int): Repr[immutable.Seq[Out]]
  def groupedWithin(n: Int, d: FiniteDuration): Repr[immutable.Seq[Out]]
  def groupedWeighted(minWeight: Long)(costFn: Out => Long): Repr[List[Out]]
  def groupBy[K](maxSubstreams: Int, f: Out => K): SubFlow[Out, Mat, Repr, Closed]
}
```

### Batching Operations

```scala { .api }
trait FlowOps[+Out, +Mat] {
  def batch[S](max: Long, seed: Out => S)(aggregate: (S, Out) => S): Repr[S]
  def batchWeighted[S](max: Long, costFn: Out => Long, seed: Out => S)(aggregate: (S, Out) => S): Repr[S]
}
```

### Usage Examples

```scala
import scala.concurrent.duration._

val source = Source(1 to 100)

// Group into batches of 10
val groups = source.grouped(10)

// Group by time or count (whichever comes first)
val timeGroups = source.groupedWithin(10, 1.second)

// Group by key into substreams
val byParity = source.groupBy(2, _ % 2)
  .map(_ * 2)
  .mergeSubstreams

// Batch with custom aggregation
// (aggregate is a second parameter list, matching the batch signature above)
val batched = source.batch(
  max = 10,
  seed = n => List(n)
)((acc, n) => n :: acc)
```
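
To make the grouping behavior concrete, here is a small runnable sketch, assuming Akka 2.6 or later, that chunks a stream with `grouped` and sums each `groupBy` substream before merging. The object name is hypothetical, and the sums are sorted because `mergeSubstreams` does not guarantee an order across substreams.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object GroupingExample {
  def run(): (Seq[Seq[Int]], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("grouping")
    try {
      // Fixed-size chunks; the last chunk holds the remaining elements
      val chunks = Await.result(
        Source(1 to 10).grouped(4).runWith(Sink.seq), 5.seconds)

      // One substream per key (odd/even), each folded to its sum, then merged
      val sums = Await.result(
        Source(1 to 10)
          .groupBy(2, _ % 2)
          .fold(0)(_ + _)
          .mergeSubstreams
          .runWith(Sink.seq),
        5.seconds)

      (chunks, sums.sorted)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```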

## Timing Operations

### Delays and Timeouts

```scala { .api }
trait FlowOps[+Out, +Mat] {
  def delay(d: FiniteDuration): Repr[Out]
  def delayWith(delayStrategySupplier: () => DelayStrategy[Out]): Repr[Out]
  def initialDelay(d: FiniteDuration): Repr[Out]
  def idleTimeout(timeout: FiniteDuration): Repr[Out]
  def completionTimeout(timeout: FiniteDuration): Repr[Out]
  def backpressureTimeout(timeout: FiniteDuration): Repr[Out]
}
```

### Keep Alive

```scala { .api }
trait FlowOps[+Out, +Mat] {
  def keepAlive(maxIdle: FiniteDuration, injectedElem: () => Out): Repr[Out]
}
```

### Usage Examples

```scala
import scala.concurrent.duration._

val source = Source(1 to 10)

// Delay each element by 1 second
val delayed = source.delay(1.second)

// Initial delay before first element
val withInitialDelay = source.initialDelay(5.seconds)

// Timeout if no elements for 30 seconds
val withIdleTimeout = source.idleTimeout(30.seconds)

// Keep alive by injecting elements
val keepAlive = source.keepAlive(10.seconds, () => 0)
```
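
A timeout operator fails the stream rather than completing it, so it is often paired with `recover`. The sketch below assumes Akka 2.6 or later and uses `Source.maybe`, which emits nothing unless its promise is completed, to trigger `idleTimeout`; the object name and the `-1` fallback are illustrative choices.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import java.util.concurrent.TimeoutException
import scala.concurrent.Await
import scala.concurrent.duration._

object IdleTimeoutExample {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("idle-timeout")
    try {
      val stream = Source.maybe[Int]      // stays silent: its promise is never completed
        .idleTimeout(200.millis)          // fails the stream once it has been idle too long
        .recover { case _: TimeoutException => -1 } // turn the failure into a final element
      Await.result(stream.runWith(Sink.seq), 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```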

## Accumulation Operations

### Scanning and Folding

```scala { .api }
trait FlowOps[+Out, +Mat] {
  def scan[T](zero: T)(f: (T, Out) => T): Repr[T]
  def scanAsync[T](zero: T)(f: (T, Out) => Future[T]): Repr[T]
  def fold[T](zero: T)(f: (T, Out) => T): Repr[T]
  def foldAsync[T](zero: T)(f: (T, Out) => Future[T]): Repr[T]
  def reduce(f: (Out, Out) => Out): Repr[Out]
}
```

### Usage Examples

```scala
import scala.concurrent.Future

val source = Source(1 to 10)

// Running sum (scan emits intermediate results)
val runningSums = source.scan(0)(_ + _)
// Emits: 0, 1, 3, 6, 10, 15, 21, 28, 36, 45, 55

// Final sum only (fold emits only final result)
val totalSum = source.fold(0)(_ + _)
// Emits: 55

// Reduce without initial value
val product = source.reduce(_ * _)
// Emits: 3628800

// Async accumulation
val asyncSum = source.scanAsync(0) { (acc, n) =>
  Future.successful(acc + n)
}
```
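
The operators also differ on an empty stream, which matters when choosing between `fold` and `reduce`. This sketch, assuming Akka 2.6 or later and a hypothetical object name, runs `scan` on a short stream and then compares `fold` and `reduce` on `Source.empty`: `fold` still emits its zero value, whereas `reduce` has no initial value and fails the stream.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.{Failure, Try}

object AccumulationExample {
  def run(): (Seq[Int], Seq[Int], Boolean) = {
    implicit val system: ActorSystem = ActorSystem("accumulation")
    try {
      // scan emits the zero value first, then every intermediate result
      val running = Await.result(
        Source(1 to 4).scan(0)(_ + _).runWith(Sink.seq), 5.seconds)

      // fold on an empty stream emits the zero value
      val emptyFold = Await.result(
        Source.empty[Int].fold(0)(_ + _).runWith(Sink.seq), 5.seconds)

      // reduce on an empty stream fails with NoSuchElementException
      val emptyReduce = Try(Await.result(
        Source.empty[Int].reduce(_ + _).runWith(Sink.seq), 5.seconds))
      val reduceFailed = emptyReduce match {
        case Failure(_: NoSuchElementException) => true
        case _                                  => false
      }
      (running, emptyFold, reduceFailed)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```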

## Buffer Management

### Buffering Operations

```scala { .api }
trait FlowOps[+Out, +Mat] {
  def buffer(size: Int, overflowStrategy: OverflowStrategy): Repr[Out]
  def conflate[O2 >: Out](aggregate: (O2, O2) => O2): Repr[O2]
  def conflateWithSeed[S](seed: Out => S)(aggregate: (S, Out) => S): Repr[S]
  def expand[U](extrapolate: Out => Iterator[U]): Repr[U]
}
```

### Usage Examples

```scala
import akka.stream.OverflowStrategy

val source = Source(1 to 100)

// Buffer with overflow strategy
val buffered = source.buffer(10, OverflowStrategy.dropHead)

// Conflate (combine) when downstream is slow
val conflated = source.conflate(_ + _)

// Same, but with an explicit seed that can change the element type
val conflatedWithSeed = source.conflateWithSeed(_.toLong)(_ + _)

// Expand elements when downstream is fast
val expanded = source.expand(n => Iterator.fill(3)(n))
```
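
How many elements `conflate` emits depends on timing, but with a summing aggregate the total is preserved. The sketch below, assuming Akka 2.6 or later, puts a `throttle` downstream to act as a slow consumer; the object name and the rate are hypothetical, and only the sum (not the number of emitted elements) is deterministic.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object ConflateExample {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("conflate")
    try {
      val stream = Source(1 to 100)
        .conflate(_ + _)        // sums elements while downstream is not ready
        .throttle(1, 10.millis) // slow downstream: at most one element per 10 ms
      Await.result(stream.runWith(Sink.seq), 10.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = {
    val emitted = run()
    println(s"${emitted.size} elements emitted, total ${emitted.sum}")
  }
}
```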

## Error Handling and Recovery

### Recovery Operations

```scala { .api }
trait FlowOps[+Out, +Mat] {
  def recover[T >: Out](pf: PartialFunction[Throwable, T]): Repr[T]
  def recoverWithRetries[T >: Out](attempts: Int, pf: PartialFunction[Throwable, Graph[SourceShape[T], NotUsed]]): Repr[T]
  def mapError(f: Throwable => Throwable): Repr[Out]
}
```

### Usage Examples

```scala
val source = Source(List("1", "2", "abc", "4"))

// Recover from parsing errors: -1 is emitted as the final element and the
// stream then completes, so "4" is never processed
val safeParseInt = source
  .map(_.toInt) // This will fail on "abc"
  .recover {
    case _: NumberFormatException => -1
  }

// On failure, switch to an alternative source (at most 3 times)
val withRetries = source
  .map(_.toInt)
  .recoverWithRetries(3, {
    case _: NumberFormatException => Source.single(0)
  })
```
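
Running both recovery styles shows that the failing element ends the original stream in either case. This is a minimal sketch under the assumption of Akka 2.6 or later; the object name and the fallback source `Source(List(10, 20))` are illustrative.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object RecoveryExample {
  def run(): (Seq[Int], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("recovery")
    try {
      val input = Source(List("1", "2", "abc", "4"))

      // recover: replace the failure with one last element, then complete
      val recovered = Await.result(
        input.map(_.toInt)
          .recover { case _: NumberFormatException => -1 }
          .runWith(Sink.seq),
        5.seconds)

      // recoverWithRetries: continue with the elements of a fallback source
      val switched = Await.result(
        input.map(_.toInt)
          .recoverWithRetries(1, {
            case _: NumberFormatException => Source(List(10, 20))
          })
          .runWith(Sink.seq),
        5.seconds)

      (recovered, switched)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```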

## Stream Splitting and Substreams

### Splitting Operations

```scala { .api }
trait FlowOps[+Out, +Mat] {
  type Closed

  def splitWhen(p: Out => Boolean): SubFlow[Out, Mat, Repr, Closed]
  def splitAfter(p: Out => Boolean): SubFlow[Out, Mat, Repr, Closed]
  def splitWhen(substreamCancelStrategy: SubstreamCancelStrategy)(p: Out => Boolean): SubFlow[Out, Mat, Repr, Closed]
}
```

### SubFlow Operations

```scala { .api }
trait SubFlow[+Out, +Mat, +F[+_], C] extends FlowOps[Out, Mat] {
  def mergeSubstreams: F[Out]
  def mergeSubstreamsWithParallelism(parallelism: Int): F[Out]
  def concatSubstreams: F[Out]

  // All FlowOps methods are available on SubFlow
  def map[T](f: Out => T): SubFlow[T, Mat, F, C]
  def filter(p: Out => Boolean): SubFlow[Out, Mat, F, C]
  // ... etc
}
```

### Usage Examples

```scala
val source = Source(1 to 20)

// Split into substreams when element is divisible by 5
val substreams = source
  .splitWhen(_ % 5 == 0)
  .map(_ * 2)       // Process each substream
  .mergeSubstreams  // Merge back to single stream

// Split after condition and concatenate results
val splitAfter = source
  .splitAfter(_ % 7 == 0)
  .fold(0)(_ + _)   // Sum each substream
  .concatSubstreams
```
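
The two split operators differ in where the matching element lands: `splitWhen` starts a new substream with it, while `splitAfter` ends the current substream with it. The following sketch, assuming Akka 2.6 or later and a hypothetical object name, sums each substream and uses `concatSubstreams` so the output order is deterministic.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object SplitExample {
  def run(): (Seq[Int], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("split")
    try {
      // Substreams: 1..4, 5..9, 10..14, 15..19, 20
      val whenSums = Await.result(
        Source(1 to 20).splitWhen(_ % 5 == 0)
          .fold(0)(_ + _)
          .concatSubstreams
          .runWith(Sink.seq),
        5.seconds)

      // Substreams: 1..7, 8..14, 15..20
      val afterSums = Await.result(
        Source(1 to 20).splitAfter(_ % 7 == 0)
          .fold(0)(_ + _)
          .concatSubstreams
          .runWith(Sink.seq),
        5.seconds)

      (whenSums, afterSums)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```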

## Utility Operations

### Element Inspection and Side Effects

```scala { .api }
trait FlowOps[+Out, +Mat] {
  def log(name: String): Repr[Out]
  def log(name: String, extract: Out => Any): Repr[Out]
  def wireTap(sink: Graph[SinkShape[Out], _]): Repr[Out]
  def alsoTo(sink: Graph[SinkShape[Out], _]): Repr[Out]
}
```

### Element Interspersing

```scala { .api }
trait FlowOps[+Out, +Mat] {
  def intersperse[T >: Out](inject: T): Repr[T]
  def intersperse[T >: Out](start: T, inject: T, end: T): Repr[T]
}
```

### Usage Examples

```scala
import akka.stream.scaladsl.{Sink, Source}

val source = Source(List("a", "b", "c"))

// Add logging
val logged = source.log("my-stream")

// Intersperse with separator
val withSeparator = source.intersperse(",")
// Result: "a", ",", "b", ",", "c"

// With start and end
val withBrackets = source.intersperse("[", ",", "]")
// Result: "[", "a", ",", "b", ",", "c", "]"

// Wire tap for side effects without affecting main stream
val withSideEffect = source.wireTap(Sink.foreach(x => println(s"Side: $x")))
```
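
A common use of the three-argument `intersperse` is rendering a stream as a delimited string. This is a small sketch, assuming Akka 2.6 or later, that concatenates the interspersed elements with `runFold`; the object name is hypothetical.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import scala.concurrent.Await
import scala.concurrent.duration._

object IntersperseExample {
  def run(): String = {
    implicit val system: ActorSystem = ActorSystem("intersperse")
    try {
      val rendered = Source(List("a", "b", "c"))
        .intersperse("[", ",", "]") // start marker, separator, end marker
        .runFold("")(_ + _)         // concatenate everything into one string
      Await.result(rendered, 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```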

## Monitoring and Lifecycle

### Stream Monitoring

Operators that change the materialized value are declared on FlowOpsMat, which extends FlowOps and is mixed into Source and Flow.

```scala { .api }
trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
  type ReprMat[+O, +M] <: FlowOpsMat[O, M]
  type ClosedMat[+M]

  def watchTermination[Mat2]()(matF: (Mat, Future[Done]) => Mat2): ReprMat[Out, Mat2]
  def monitor[Mat2]()(matF: (Mat, FlowMonitor[Out]) => Mat2): ReprMat[Out, Mat2]
}
```

### Usage Examples

```scala
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

val source = Source(1 to 10)

// Watch for completion
val withTerminationWatcher = source
  .watchTermination() { (notUsed, done) =>
    done.onComplete {
      case Success(_)  => println("Stream completed successfully")
      case Failure(ex) => println(s"Stream failed: $ex")
    }
    notUsed
  }

// Monitor stream state
val withMonitoring = source
  .monitor() { (_, monitor) =>
    // FlowMonitor.state is a snapshot of the current state, not a collection:
    // keep the monitor as the materialized value and poll monitor.state later
    monitor
  }
```
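
When only the termination signal is needed, `watchTermination()(Keep.right)` exposes the `Future[Done]` as the materialized value so the caller can react to it. The sketch below assumes Akka 2.6 or later; the object and helper names are hypothetical, and the failing source exists only to show the failure path.

```scala
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.Try

object WatchTerminationExample {
  // Runs the source to completion and reports how it terminated
  private def terminationOf(source: Source[Int, NotUsed])(
      implicit system: ActorSystem): Try[Done] = {
    val done: Future[Done] =
      source.watchTermination()(Keep.right).to(Sink.ignore).run()
    Try(Await.result(done, 5.seconds))
  }

  // Returns (normal stream succeeded, failing stream failed)
  def run(): (Boolean, Boolean) = {
    implicit val system: ActorSystem = ActorSystem("watch-termination")
    try {
      val ok = terminationOf(Source(1 to 10))
      val failed = terminationOf(Source.failed[Int](new RuntimeException("boom")))
      (ok.isSuccess, failed.isFailure)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```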