Akka Streams is a reactive streams implementation for processing and transferring sequences of elements using bounded buffer space with automatic backpressure management.
—
Stream junction operators for merging, broadcasting, zipping, and partitioning multiple streams.
class Merge[T](val inputPorts: Int, val eagerComplete: Boolean = false)
extends GraphStage[UniformFanInShape[T, T]]

Factory Methods:
object Merge {
def apply[T](inputPorts: Int, eagerComplete: Boolean = false): Merge[T]
}

Usage Example:
import akka.stream.ClosedShape
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, RunnableGraph, Sink, Source}
/** Fan-in example: three integer sources are merged into one sink that prints every element. */
val graph = GraphDSL.create() { implicit builder =>
  // Brings the `~>` edge operator into scope; the wiring below does not compile without it.
  import GraphDSL.Implicits._

  val source1 = builder.add(Source(1 to 3))
  val source2 = builder.add(Source(4 to 6))
  val source3 = builder.add(Source(7 to 9))
  val merge = builder.add(Merge[Int](3)) // one input port per source
  val sink = builder.add(Sink.foreach[Int](println))

  // Each `source ~> merge` occupies the next free input port of the merge junction.
  source1 ~> merge
  source2 ~> merge
  source3 ~> merge
  merge ~> sink

  // Every port is connected, so the graph is closed.
  ClosedShape
}

class MergePreferred[T](val secondaryPorts: Int, val eagerComplete: Boolean = false)
extends GraphStage[MergePreferred.MergePreferredShape[T]]

Usage Example:
import akka.stream.scaladsl.MergePreferred
/** Merges a preferred ("URGENT") input with one secondary input into a printing sink. */
val priorityMerge = GraphDSL.create() { implicit builder =>
  // Brings the `~>` edge operator into scope; the wiring below does not compile without it.
  import GraphDSL.Implicits._

  val highPriority = builder.add(Source.single("URGENT"))
  val normalSource = builder.add(Source(List("normal1", "normal2")))
  val merge = builder.add(MergePreferred[String](1)) // 1 secondary port
  val sink = builder.add(Sink.foreach[String](println))

  // The preferred port is wired explicitly; secondary ports are addressed by index.
  highPriority ~> merge.preferred
  normalSource ~> merge.in(0)
  merge ~> sink

  ClosedShape
}

class MergePrioritized[T](val priorities: Seq[Int], val eagerComplete: Boolean = false)
extends GraphStage[UniformFanInShape[T, T]]

class Broadcast[T](val outputPorts: Int, val eagerCancel: Boolean = false)
extends GraphStage[UniformFanOutShape[T, T]]

Factory Methods:
object Broadcast {
def apply[T](outputPorts: Int, eagerCancel: Boolean = false): Broadcast[T]
}

Usage Example:
import akka.stream.scaladsl.Broadcast
/** Fan-out example: one source of 1..10 is broadcast to three labelled printing sinks. */
val broadcastGraph = GraphDSL.create() { implicit builder =>
  // Brings the `~>` edge operator into scope; the wiring below does not compile without it.
  import GraphDSL.Implicits._

  val source = builder.add(Source(1 to 10))
  val broadcast = builder.add(Broadcast[Int](3)) // one output port per sink
  val sink1 = builder.add(Sink.foreach[Int](x => println(s"Sink1: $x")))
  val sink2 = builder.add(Sink.foreach[Int](x => println(s"Sink2: $x")))
  val sink3 = builder.add(Sink.foreach[Int](x => println(s"Sink3: $x")))

  source ~> broadcast
  // Each `broadcast ~> sink` occupies the next free output port.
  broadcast ~> sink1
  broadcast ~> sink2
  broadcast ~> sink3

  ClosedShape
}

class Balance[T](
val outputPorts: Int,
val waitForAllDownstreams: Boolean = false,
val eagerCancel: Boolean = false
) extends GraphStage[UniformFanOutShape[T, T]]

Usage Example:
import akka.stream.scaladsl.Balance
/** Distributes 1..20 across three labelling worker flows and merges their output into one sink. */
val balanceGraph = GraphDSL.create() { implicit builder =>
  // Brings the `~>` edge operator into scope; the wiring below does not compile without it.
  import GraphDSL.Implicits._

  val source = builder.add(Source(1 to 20))
  val balance = builder.add(Balance[Int](3)) // Distribute across 3 workers
  val worker1 = builder.add(Flow[Int].map(x => s"Worker1: $x"))
  val worker2 = builder.add(Flow[Int].map(x => s"Worker2: $x"))
  val worker3 = builder.add(Flow[Int].map(x => s"Worker3: $x"))
  val merge = builder.add(Merge[String](3)) // one input port per worker
  val sink = builder.add(Sink.foreach[String](println))

  source ~> balance
  // Each line takes the next free output of `balance` and the next free input of `merge`.
  balance ~> worker1 ~> merge
  balance ~> worker2 ~> merge
  balance ~> worker3 ~> merge
  merge ~> sink

  ClosedShape
}

class Partition[T](
val outputPorts: Int,
val partitioner: T => Int,
val eagerCancel: Boolean = false
) extends GraphStage[UniformFanOutShape[T, T]]

Usage Example:
import akka.stream.scaladsl.Partition
/** Routes each element of 1..20 to one of three sinks according to its remainder modulo 3. */
val partitionGraph = GraphDSL.create() { implicit builder =>
  // Brings the `~>` edge operator into scope; the wiring below does not compile without it.
  import GraphDSL.Implicits._

  val source = builder.add(Source(1 to 20))
  // Partition by modulo 3. The partitioner must return an index in 0 until outputPorts;
  // `_ % 3` satisfies that here because every input is positive.
  val partition = builder.add(Partition[Int](3, _ % 3))
  val sink0 = builder.add(Sink.foreach[Int](x => println(s"Mod0: $x")))
  val sink1 = builder.add(Sink.foreach[Int](x => println(s"Mod1: $x")))
  val sink2 = builder.add(Sink.foreach[Int](x => println(s"Mod2: $x")))

  source ~> partition
  // Outputs are taken in order, so sinkN receives the elements whose partitioner result is N.
  partition ~> sink0
  partition ~> sink1
  partition ~> sink2

  ClosedShape
}

final class Zip[A, B] extends ZipWith2[A, B, (A, B)](Tuple2.apply)

Factory Methods:
object Zip {
def apply[A, B](): Zip[A, B]
}

Usage Example:
import akka.stream.scaladsl.Zip
/** Pairs the n-th integer with the n-th string and prints each resulting tuple. */
val zipGraph = GraphDSL.create() { implicit builder =>
  // Brings the `~>` edge operator into scope; the wiring below does not compile without it.
  import GraphDSL.Implicits._

  val source1 = builder.add(Source(1 to 5))
  val source2 = builder.add(Source(List("a", "b", "c", "d", "e")))
  val zip = builder.add(Zip[Int, String]())
  val sink = builder.add(Sink.foreach[(Int, String)](println))

  // Zip has two differently typed inputs, so its ports are addressed explicitly.
  source1 ~> zip.in0
  source2 ~> zip.in1
  zip.out ~> sink

  ClosedShape
}
// Output: (1,a), (2,b), (3,c), (4,d), (5,e)

class ZipWith2[A, B, C](zipper: (A, B) => C) extends GraphStage[FanInShape2[A, B, C]]

Usage Example:
import akka.stream.scaladsl.ZipWith
/** Combines the n-th number and the n-th word into a "number-word" string and prints it. */
val zipWithGraph = GraphDSL.create() { implicit builder =>
  // Brings the `~>` edge operator into scope; the wiring below does not compile without it.
  import GraphDSL.Implicits._

  val numbers = builder.add(Source(1 to 5))
  val strings = builder.add(Source(List("one", "two", "three", "four", "five")))
  // The combining function receives one element from each input.
  val zipWith = builder.add(ZipWith[Int, String, String]((n, s) => s"$n-$s"))
  val sink = builder.add(Sink.foreach[String](println))

  numbers ~> zipWith.in0
  strings ~> zipWith.in1
  zipWith.out ~> sink

  ClosedShape
}
// Output: "1-one", "2-two", "3-three", "4-four", "5-five"

class ZipN[A](n: Int) extends GraphStage[UniformFanInShape[A, immutable.Seq[A]]]

Usage Example:
import akka.stream.scaladsl.ZipN
/** Zips three integer sources into sequences holding one element from each input. */
val zipNGraph = GraphDSL.create() { implicit builder =>
  // Brings the `~>` edge operator into scope; the wiring below does not compile without it.
  import GraphDSL.Implicits._

  val source1 = builder.add(Source(1 to 3))
  val source2 = builder.add(Source(4 to 6))
  val source3 = builder.add(Source(7 to 9))
  val zipN = builder.add(ZipN[Int](3)) // three uniform input ports
  val sink = builder.add(Sink.foreach[Seq[Int]](println))

  // Input ports are addressed by index.
  source1 ~> zipN.in(0)
  source2 ~> zipN.in(1)
  source3 ~> zipN.in(2)
  zipN.out ~> sink

  ClosedShape
}
// Output: List(1,4,7), List(2,5,8), List(3,6,9)

final class ZipLatest[A, B] extends ZipLatestWith2[A, B, (A, B)](Tuple2.apply)

Usage Example:
import akka.stream.scaladsl.ZipLatest
import scala.concurrent.duration._
/** Combines a 100 ms ticker and a 300 ms ticker with ZipLatest and prints each emitted pair. */
val zipLatestGraph = GraphDSL.create() { implicit builder =>
  // Brings the `~>` edge operator into scope; the wiring below does not compile without it.
  import GraphDSL.Implicits._

  // Source.tick arguments: initial delay, interval, element to emit.
  val fastSource = builder.add(Source.tick(100.millis, 100.millis, "fast"))
  val slowSource = builder.add(Source.tick(300.millis, 300.millis, "slow"))
  val zipLatest = builder.add(ZipLatest[String, String]())
  val sink = builder.add(Sink.foreach[(String, String)](println))

  fastSource ~> zipLatest.in0
  slowSource ~> zipLatest.in1
  zipLatest.out ~> sink

  ClosedShape
}

final class Unzip[A, B]() extends UnzipWith[(A, B), A, B]

Usage Example:
import akka.stream.scaladsl.Unzip
/** Splits a stream of (Int, String) pairs into a stream of numbers and a stream of strings. */
val unzipGraph = GraphDSL.create() { implicit builder =>
  // Brings the `~>` edge operator into scope; the wiring below does not compile without it.
  import GraphDSL.Implicits._

  val source = builder.add(Source(List((1, "a"), (2, "b"), (3, "c"))))
  val unzip = builder.add(Unzip[Int, String]())
  val sink1 = builder.add(Sink.foreach[Int](x => println(s"Numbers: $x")))
  val sink2 = builder.add(Sink.foreach[String](x => println(s"Strings: $x")))

  source ~> unzip.in
  // out0 carries the first tuple component, out1 the second.
  unzip.out0 ~> sink1
  unzip.out1 ~> sink2

  ClosedShape
}

class UnzipWith[In, A, B](unzipper: In => (A, B)) extends GraphStage[FanOutShape2[In, A, B]]

Usage Example:
import akka.stream.scaladsl.UnzipWith
/** Simple record used by the UnzipWith example below. */
case class Person(name: String, age: Int)

/** Splits a stream of `Person` values into a stream of names and a stream of ages. */
val unzipWithGraph = GraphDSL.create() { implicit builder =>
  // Brings the `~>` edge operator into scope; the wiring below does not compile without it.
  import GraphDSL.Implicits._

  val people = builder.add(Source(List(
    Person("Alice", 25),
    Person("Bob", 30),
    Person("Charlie", 35)
  )))
  // The function decides which part of each element goes to which output.
  val unzipWith = builder.add(UnzipWith[Person, String, Int](p => (p.name, p.age)))
  val namesSink = builder.add(Sink.foreach[String](name => println(s"Name: $name")))
  val agesSink = builder.add(Sink.foreach[Int](age => println(s"Age: $age")))

  people ~> unzipWith.in
  unzipWith.out0 ~> namesSink
  unzipWith.out1 ~> agesSink

  ClosedShape
}

/** Fan-out / fan-in: broadcasts 1..10 to an even filter and an odd filter, then merges both. */
val fanOutInGraph = GraphDSL.create() { implicit builder =>
  // Brings the `~>` edge operator into scope; the wiring below does not compile without it.
  import GraphDSL.Implicits._

  val source = builder.add(Source(1 to 10))
  val broadcast = builder.add(Broadcast[Int](2))
  val evenFilter = builder.add(Flow[Int].filter(_ % 2 == 0))
  // `!= 0` rather than `== 1`: the remainder of a negative odd number is -1 on the JVM,
  // so `== 1` would silently drop negative odd values if the source ever contained them.
  val oddFilter = builder.add(Flow[Int].filter(_ % 2 != 0))
  val merge = builder.add(Merge[Int](2))
  val sink = builder.add(Sink.foreach[Int](println))

  source ~> broadcast
  broadcast ~> evenFilter ~> merge
  broadcast ~> oddFilter ~> merge
  merge ~> sink

  ClosedShape
}

/** Balances 1..100 across several copies of a slow squaring flow and merges the results. */
val parallelProcessing = GraphDSL.create() { implicit builder =>
  // Brings the `~>` edge operator into scope; the wiring below does not compile without it.
  import GraphDSL.Implicits._

  // Single place to change the degree of parallelism; Balance and Merge must agree on it.
  val workerCount = 4

  val source = builder.add(Source(1 to 100))
  val balance = builder.add(Balance[Int](workerCount)) // 4 parallel workers
  val merge = builder.add(Merge[Int](workerCount))
  val sink = builder.add(Sink.foreach[Int](println))

  // Heavy processing flow.
  // Thread.sleep only simulates work; real blocking calls belong on a dedicated dispatcher.
  val heavyProcess = Flow[Int].map { x =>
    Thread.sleep(100) // Simulate work
    x * x
  }

  source ~> balance

  // Connect the parallel workers. `.async` puts an asynchronous boundary around each copy;
  // without it the fused stages run one after another and nothing happens in parallel.
  for (_ <- 0 until workerCount) {
    balance ~> builder.add(heavyProcess.async) ~> merge
  }

  merge ~> sink

  ClosedShape
}

Junction operations enable complex stream topologies while maintaining backpressure and type safety throughout the graph.
Install with Tessl CLI
npx tessl i tessl/maven-com-typesafe-akka--akka-stream-2-13-0-m5