Akka Streams is a reactive streams implementation for processing and transferring sequences of elements using bounded buffer space with automatic backpressure management.
—
The FlowOps trait provides a comprehensive set of stream processing operations, including mapping, filtering, grouping, timing, and error handling, that is available on all stream components.
All stream operations are available through the FlowOps trait, which is mixed into Source, Flow, and SubFlow classes.
trait FlowOps[+Out, +Mat] {
type Repr[+O] <: FlowOps[O, Mat]
// Core transformation methods
def map[T](f: Out => T): Repr[T]
def filter(p: Out => Boolean): Repr[Out]
def mapAsync[T](parallelism: Int)(f: Out => Future[T]): Repr[T]
def mapConcat[T](f: Out => immutable.Iterable[T]): Repr[T]
}

Map and Filter:
trait FlowOps[+Out, +Mat] {
def map[T](f: Out => T): Repr[T]
def mapAsync[T](parallelism: Int)(f: Out => Future[T]): Repr[T]
def mapAsyncUnordered[T](parallelism: Int)(f: Out => Future[T]): Repr[T]
def mapConcat[T](f: Out => immutable.Iterable[T]): Repr[T]
def filter(p: Out => Boolean): Repr[Out]
def filterNot(p: Out => Boolean): Repr[Out]
}

Collect with Partial Functions:
trait FlowOps[+Out, +Mat] {
def collect[T](pf: PartialFunction[Out, T]): Repr[T]
def collectType[T](implicit m: ClassTag[T]): Repr[T]
}import akka.stream.scaladsl.{Source, Flow}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
val source = Source(1 to 10)
// Basic map
val doubled = source.map(_ * 2)
// Async map with parallelism
val asyncMapped = source.mapAsync(4) { n =>
Future {
Thread.sleep(100)
n * n
}
}
// Flat map with mapConcat
val exploded = source.mapConcat(n => List.fill(n)(n))
// Filter
val evenOnly = source.filter(_ % 2 == 0)
// Collect with partial function
val strings = Source(List(1, "hello", 2, "world", 3))
val onlyStrings = strings.collect {
case s: String => s.toUpperCase
}trait FlowOps[+Out, +Mat] {
def take(n: Long): Repr[Out]
def takeWhile(p: Out => Boolean): Repr[Out]
def takeWithin(d: FiniteDuration): Repr[Out]
def drop(n: Long): Repr[Out]
def dropWhile(p: Out => Boolean): Repr[Out]
def dropWithin(d: FiniteDuration): Repr[Out]
}trait FlowOps[+Out, +Mat] {
def limit(n: Long): Repr[Out]
def limitWeighted(n: Long)(costFn: Out => Long): Repr[Out]
def throttle(elements: Int, per: FiniteDuration): Repr[Out]
def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int, mode: ThrottleMode): Repr[Out]
}import scala.concurrent.duration._
val source = Source(1 to 100)
// Take first 10 elements
val first10 = source.take(10)
// Take while condition is true
val whileLessThan50 = source.takeWhile(_ < 50)
// Take within time window
val within5Seconds = source.takeWithin(5.seconds)
// Rate limiting - max 10 elements per second
val throttled = source.throttle(10, 1.second)
// Burst throttling
val burstThrottled = source.throttle(
elements = 10,
per = 1.second,
maximumBurst = 5,
mode = ThrottleMode.Shaping
)trait FlowOps[+Out, +Mat] {
def grouped(n: Int): Repr[immutable.Seq[Out]]
def groupedWithin(n: Int, d: FiniteDuration): Repr[immutable.Seq[Out]]
// Emits groups as immutable.Seq, matching grouped and groupedWithin.
def groupedWeighted(minWeight: Long)(costFn: Out => Long): Repr[immutable.Seq[Out]]
def groupBy[K](maxSubstreams: Int, f: Out => K): SubFlow[Out, Mat, Repr, Closed]
}trait FlowOps[+Out, +Mat] {
def batch[S](max: Long, seed: Out => S)(aggregate: (S, Out) => S): Repr[S]
def batchWeighted[S](max: Long, costFn: Out => Long, seed: Out => S)(aggregate: (S, Out) => S): Repr[S]
}import scala.concurrent.duration._
val source = Source(1 to 100)
// Group into batches of 10
val groups = source.grouped(10)
// Group by time or count (whichever comes first)
val timeGroups = source.groupedWithin(10, 1.second)
// Group by key into substreams
val byParity = source.groupBy(2, _ % 2)
.map(_ * 2)
.mergeSubstreams
// Batch with custom aggregation.
// batch declares `aggregate` in its own (second) parameter list, so it cannot be
// passed as a named argument alongside `max` and `seed`.
val batched = source.batch(max = 10, seed = n => List(n))((acc, n) => n :: acc)
trait FlowOps[+Out, +Mat] {
def delay(d: FiniteDuration): Repr[Out]
def delayWith(delayStrategySupplier: () => DelayStrategy[Out]): Repr[Out]
def initialDelay(d: FiniteDuration): Repr[Out]
def idleTimeout(timeout: FiniteDuration): Repr[Out]
def completionTimeout(timeout: FiniteDuration): Repr[Out]
def backpressureTimeout(timeout: FiniteDuration): Repr[Out]
}trait FlowOps[+Out, +Mat] {
// The injected element type is widened to U >: Out: the covariant Out cannot
// appear as the result type of a function parameter.
def keepAlive[U >: Out](maxIdle: FiniteDuration, injectedElem: () => U): Repr[U]
}import scala.concurrent.duration._
val source = Source(1 to 10)
// Delay each element by 1 second
val delayed = source.delay(1.second)
// Initial delay before first element
val withInitialDelay = source.initialDelay(5.seconds)
// Timeout if no elements for 30 seconds
val withIdleTimeout = source.idleTimeout(30.seconds)
// Keep alive by injecting elements
val keepAlive = source.keepAlive(10.seconds, () => 0)trait FlowOps[+Out, +Mat] {
def scan[T](zero: T)(f: (T, Out) => T): Repr[T]
def scanAsync[T](zero: T)(f: (T, Out) => Future[T]): Repr[T]
def fold[T](zero: T)(f: (T, Out) => T): Repr[T]
def foldAsync[T](zero: T)(f: (T, Out) => Future[T]): Repr[T]
// Widened to T >: Out because the covariant Out cannot be the result type of
// the function parameter; the reduced stream carries T.
def reduce[T >: Out](f: (T, T) => T): Repr[T]
}val source = Source(1 to 10)
// Running sum (scan emits intermediate results)
val runningSums = source.scan(0)(_ + _)
// Emits: 0, 1, 3, 6, 10, 15, 21, 28, 36, 45, 55
// Final sum only (fold emits only final result)
val totalSum = source.fold(0)(_ + _)
// Emits: 55
// Reduce without initial value
val product = source.reduce(_ * _)
// Emits: 3628800
// Async accumulation
val asyncSum = source.scanAsync(0) { (acc, n) =>
Future.successful(acc + n)
}trait FlowOps[+Out, +Mat] {
def buffer(size: Int, overflowStrategy: OverflowStrategy): Repr[Out]
// conflate takes no seed: it combines elements of the (widened) element type.
// Use conflateWithSeed when the summary type differs from the element type.
def conflate[O2 >: Out](aggregate: (O2, O2) => O2): Repr[O2]
def conflateWithSeed[S](seed: Out => S)(aggregate: (S, Out) => S): Repr[S]
def expand[U](extrapolate: Out => Iterator[U]): Repr[U]
}
import akka.stream.OverflowStrategy
val source = Source(1 to 100)
// Buffer with overflow strategy
val buffered = source.buffer(10, OverflowStrategy.dropHead)
// Conflate (combine) when downstream is slow
val conflated = source.conflate(_ + _)
// Expand elements when downstream is fast
val expanded = source.expand(n => Iterator.fill(3)(n))trait FlowOps[+Out, +Mat] {
def recover[T >: Out](pf: PartialFunction[Throwable, T]): Repr[T]
def recoverWithRetries[T >: Out](attempts: Int, pf: PartialFunction[Throwable, Graph[SourceShape[T], NotUsed]]): Repr[T]
def mapError(f: Throwable => Throwable): Repr[Out]
}val source = Source(List("1", "2", "abc", "4"))
// Recover from parsing errors
val safeParseInt = source
.map(_.toInt) // This will fail on "abc"
.recover {
case _: NumberFormatException => -1
}
// Recover with retries using alternative source
val withRetries = source
.map(_.toInt)
.recoverWithRetries(3, {
case _: NumberFormatException => Source.single(0)
})trait FlowOps[+Out, +Mat] {
type Closed
def splitWhen(p: Out => Boolean): SubFlow[Out, Mat, Repr, Closed]
def splitAfter(p: Out => Boolean): SubFlow[Out, Mat, Repr, Closed]
// Overload that additionally takes the substream cancellation strategy; it has no type parameter.
def splitWhen(substreamCancelStrategy: SubstreamCancelStrategy)(p: Out => Boolean): SubFlow[Out, Mat, Repr, Closed]
}
// SubFlow is a trait with abstract members. Its F parameter takes a single type
// argument, which is why FlowOps can pass its one-argument Repr as F in
// SubFlow[Out, Mat, Repr, Closed].
trait SubFlow[+Out, +Mat, +F[+_], C] extends FlowOps[Out, Mat] {
def mergeSubstreams: F[Out]
def mergeSubstreamsWithParallelism(parallelism: Int): F[Out]
def concatSubstreams: F[Out]
// All FlowOps methods are available on SubFlow
def map[T](f: Out => T): SubFlow[T, Mat, F, C]
def filter(p: Out => Boolean): SubFlow[Out, Mat, F, C]
// ... etc
}
val source = Source(1 to 20)
// Split into substreams when element is divisible by 5
val substreams = source
.splitWhen(_ % 5 == 0)
.map(_ * 2) // Process each substream
.mergeSubstreams // Merge back to single stream
// Split after condition and concatenate results
val splitAfter = source
.splitAfter(_ % 7 == 0)
.fold(0)(_ + _) // Sum each substream
.concatSubstreamstrait FlowOps[+Out, +Mat] {
def log(name: String): Repr[Out]
def log(name: String, extract: Out => Any): Repr[Out]
def wireTap(sink: Graph[SinkShape[Out], _]): Repr[Out]
def alsoTo(sink: Graph[SinkShape[Out], _]): Repr[Out]
}trait FlowOps[+Out, +Mat] {
def intersperse[T >: Out](inject: T): Repr[T]
def intersperse[T >: Out](start: T, inject: T, end: T): Repr[T]
}val source = Source(List("a", "b", "c"))
// Add logging
val logged = source.log("my-stream")
// Intersperse with separator
val withSeparator = source.intersperse(",")
// Result: "a", ",", "b", ",", "c"
// With start and end
val withBrackets = source.intersperse("[", ",", "]")
// Result: "[", "a", ",", "b", ",", "c", "]"
// Wire tap for side effects without affecting main stream
val withSideEffect = source.wireTap(Sink.foreach(x => println(s"Side: $x")))trait FlowOps[+Out, +Mat] {
// Result type that carries both the element type and the materialized value type;
// it is declared here because the signatures below refer to it.
// NOTE(review): in Akka these members appear to live on FlowOpsMat (a FlowOps subtype) — confirm against the API docs.
type ReprMat[+O, +M]
type ClosedMat[+M]
def watchTermination[Mat2]()(matF: (Mat, Future[Done]) => Mat2): ReprMat[Out, Mat2]
def monitor[Mat2]()(matF: (Mat, FlowMonitor[Out]) => Mat2): ReprMat[Out, Mat2]
}val source = Source(1 to 10)
// Watch for completion
// Success and Failure come from scala.util; onComplete also needs an implicit
// ExecutionContext in scope.
import scala.util.{Failure, Success}
val withTerminationWatcher = source
.watchTermination() { (notUsed, done) =>
done.onComplete {
case Success(_) => println("Stream completed successfully")
case Failure(ex) => println(s"Stream failed: $ex")
}
notUsed
}
// Monitor stream state
val withMonitoring = source
.monitor() { (mat, monitor) =>
// FlowMonitor.state is a single snapshot value, not a collection, so it is
// read directly instead of being iterated with foreach.
// NOTE(review): this function presumably runs at materialization time, so the
// printed state is the initial one — keep the monitor and poll it later for live state.
println(s"Stream state: ${monitor.state}")
mat
}

Install with Tessl CLI
npx tessl i tessl/maven-com-typesafe-akka--akka-stream-2-13-0-m5