Operations for merging, zipping, concatenating, and broadcasting streams to create complex data flow topologies. These operations enable composition of multiple streams into sophisticated processing pipelines.
Combine multiple streams by merging their elements in arrival order or with specific strategies.
/**
* Merge this stream with another stream, emitting elements from either input as they arrive
* @tparam U Element type of the merged stream; must be a supertype of this stream's `Out`
* @param other Stream to merge with
* @param eagerComplete Complete when any input completes (default: true)
*   NOTE(review): upstream Akka declares this default as `false` — confirm which is intended
* @return Stream containing elements from both inputs in arrival order
*/
def merge[U >: Out](other: Graph[SourceShape[U], _], eagerComplete: Boolean = true): Source[U, Mat]
/**
* Merge with priority for this stream over the other
* @tparam U Element type of the merged stream; must be a supertype of this stream's `Out`
* @param other Stream to merge with (lower priority)
* @param preferred Number of elements to emit from this stream before considering other
*   NOTE(review): upstream Akka's `mergePreferred` takes a Boolean preference flag rather
*   than an element count — verify this signature against the Akka version in use
* @return Stream with preferred emission from this stream
*/
def mergePreferred[U >: Out](other: Graph[SourceShape[U], _], preferred: Int): Source[U, Mat]
/**
* Merge this stream with another stream, where both inputs are expected to be sorted
* already, so that the combined output stays in sorted order
* @tparam U Element type of the merged stream; must be a supertype of this stream's `Out`
* @param other Sorted stream to merge with
* @param ord Implicit ordering used to compare elements of the two inputs
* @return Stream containing the elements of both inputs in sorted order
*/
def mergeSorted[U >: Out](other: Graph[SourceShape[U], _])(implicit ord: Ordering[U]): Source[U, Mat]
Usage Examples:
import akka.stream.scaladsl.Source
import scala.concurrent.duration._ // provides the `.millis` duration syntax used below
// Simple merge
val stream1 = Source(1 to 5)
val stream2 = Source(6 to 10)
val merged = stream1.merge(stream2) // Elements in arrival order
// Merge with preference
val fastStream = Source.tick(100.millis, 100.millis, "fast")
val slowStream = Source.tick(500.millis, 500.millis, "slow")
val preferred = fastStream.mergePreferred(slowStream, 3) // 3 fast, then 1 slow
// Sorted merge: both inputs are already in ascending order
val sorted1 = Source(List(1, 3, 5, 7))
val sorted2 = Source(List(2, 4, 6, 8))
val sortedMerge = sorted1.mergeSorted(sorted2) // Maintains sorted order
Combine streams sequentially, processing one completely before the next.
/**
* Concatenate this stream with another stream
* @tparam U Element type of the resulting stream; must be a supertype of this stream's `Out`
* @param other Stream to append after this stream completes
* @return Stream that emits all elements from this stream, then all from other
*/
def concat[U >: Out](other: Graph[SourceShape[U], _]): Source[U, Mat]
/**
* Prepend another stream before this stream
* @tparam U Element type of the resulting stream; must be a supertype of this stream's `Out`
* @param other Stream to emit before this stream
* @return Stream that emits other stream first, then this stream
*/
def prepend[U >: Out](other: Graph[SourceShape[U], _]): Source[U, Mat]
/**
* Turn each element of this stream into a source and concatenate those sources in order
* @tparam T Element type emitted by the sources that `f` produces
* @tparam M Materialized value type of the sources that `f` produces
* @param f Function that maps each element of this stream to a source graph
* @return Stream that emits the elements of each produced source, one source after another
*/
def flatMapConcat[T, M](f: Out => Graph[SourceShape[T], M]): Source[T, Mat]
Usage Examples:
import akka.stream.scaladsl.{Sink, Source}
// Sequential concatenation
val first = Source(1 to 3)
val second = Source(4 to 6)
val concatenated = first.concat(second) // Emits: 1,2,3,4,5,6
// Prepending
val main = Source(List("World", "!"))
val greeting = Source.single("Hello ")
val combined = main.prepend(greeting) // Emits: "Hello ", "World", "!"
// Flat map concat: each file name becomes a source of its lines, consumed one file at a time
// (readFileLines is a placeholder for a caller-supplied helper returning an Iterator)
Source(List("file1.txt", "file2.txt"))
  .flatMapConcat(filename => Source.fromIterator(() => readFileLines(filename)))
  .runWith(Sink.seq)
Combine streams by pairing elements from multiple inputs.
/**
* Zip this stream with another stream into tuples
* @tparam U Element type of the other stream
* @param other Stream to zip with
* @return Stream of tuples containing paired elements (this stream's element first)
*/
def zip[U](other: Graph[SourceShape[U], _]): Source[(Out, U), Mat]
/**
* Zip streams using a custom combine function
* @tparam U Element type of the other stream
* @tparam T Result type produced by the combine function
* @param other Stream to zip with
* @param f Function to combine paired elements
* @return Stream with combined elements
*/
def zipWith[U, T](other: Graph[SourceShape[U], _])(f: (Out, U) => T): Source[T, Mat]
/**
* Zip with index, pairing each element with its position
* @return Stream of tuples with elements and their indices (indices are of type Long)
*/
def zipWithIndex: Source[(Out, Long), Mat]
/**
* Zip this stream with another, continuing after the shorter one completes by
* substituting the supplied default element for the side that has run out
* @tparam U Element type of the other stream
* @tparam A Element type of the first tuple component; a supertype of `Out`
* @tparam B Element type of the second tuple component; a supertype of `U`
* @param other Stream to zip with
* @param thisElem Default used in place of this stream's element once this stream has completed
* @param otherElem Default used in place of the other stream's element once it has completed
* @return Stream of tuples, padded with the given defaults for whichever input completed first
*/
def zipAll[U, A >: Out, B >: U](other: Graph[SourceShape[U], _], thisElem: A, otherElem: B): Source[(A, B), Mat]
Usage Examples:
import akka.stream.scaladsl.{Sink, Source}
// Basic zipping
val numbers = Source(1 to 5)
val letters = Source(List("a", "b", "c", "d", "e"))
val zipped = numbers.zip(letters) // (1,"a"), (2,"b"), (3,"c"), (4,"d"), (5,"e")
// Zip with custom function
val combined = numbers.zipWith(letters)((n, l) => s"$n:$l")
// Zip with index
Source(List("apple", "banana", "cherry"))
  .zipWithIndex // ("apple", 0), ("banana", 1), ("cherry", 2)
  .runWith(Sink.seq)
// Zip all (handle different lengths): 0 and "" are the padding defaults
val short = Source(1 to 3)
val long = Source(List("a", "b", "c", "d", "e"))
short.zipAll(long, 0, "") // (1,"a"), (2,"b"), (3,"c"), (0,"d"), (0,"e")
Split a single stream into multiple parallel streams.
/**
* Create a broadcast junction to split stream into multiple outputs
* @param outputCount Number of output streams
* @param eagerCancel Cancel upstream when any output cancels (default: false)
* @return Graph that broadcasts to multiple outputs; it materializes `NotUsed`
*/
def broadcast(outputCount: Int, eagerCancel: Boolean = false): Graph[UniformFanOutShape[Out, Out], NotUsed]
/**
* Balance elements across multiple outputs using round-robin
* NOTE(review): upstream Akka describes Balance as routing each element to the first
* available output rather than strict round-robin — confirm the wording
* @tparam T Element type flowing through the junction
* @param outputCount Number of output streams
* @param waitForAllDownstreams Wait for all outputs to be ready (default: true)
*   NOTE(review): upstream Akka's default for this flag appears to be `false` — confirm
* @return Graph that balances elements across outputs
*/
def balance[T](outputCount: Int, waitForAllDownstreams: Boolean = true): Graph[UniformFanOutShape[T, T], NotUsed]
/**
* Partition elements across outputs based on a partitioning function
* @tparam T Element type flowing through the junction
* @param outputCount Number of output streams
* @param partitioner Function that returns output index for each element
* @return Graph that partitions elements to different outputs
*/
def partition[T](outputCount: Int, partitioner: T => Int): Graph[UniformFanOutShape[T, T], NotUsed]
Usage Examples:
import akka.stream.scaladsl.{Balance, Broadcast, Flow, GraphDSL, RunnableGraph, Sink, Source}
import akka.stream.{UniformFanOutShape, ClosedShape}
// Broadcasting to multiple sinks
val broadcastGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._
  val source = Source(1 to 10)
  val broadcast = builder.add(Broadcast[Int](2))
  val sink1 = Sink.foreach[Int](x => println(s"Sink1: $x"))
  val sink2 = Sink.foreach[Int](x => println(s"Sink2: $x"))
  // Each `broadcast ~>` line connects the next output port of the junction
  source ~> broadcast ~> sink1
  broadcast ~> sink2
  ClosedShape
})
// Load balancing across processors
val balanceGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._
  val source = Source(1 to 100)
  val balance = builder.add(Balance[Int](3))
  val sink = Sink.foreach[Int](x => println(s"Processed: $x"))
  // One processing branch per Balance output
  source ~> balance ~> Flow[Int].map(_ * 2) ~> sink
  balance ~> Flow[Int].map(_ * 3) ~> sink
  balance ~> Flow[Int].map(_ * 4) ~> sink
  ClosedShape
})
Combine streams by alternating between sources or interleaving elements.
/**
* Interleave elements from this stream and another
* @tparam U Element type of the resulting stream; must be a supertype of this stream's `Out`
* @param other Stream to interleave with
* @param segmentSize Number of elements to take from each stream in turn
* @param eagerClose Close when any stream closes (default: true)
*   NOTE(review): upstream Akka's two-argument `interleave` appears to behave as
*   `eagerClose = false` — confirm the default shown here
* @return Stream with interleaved elements
*/
def interleave[U >: Out](other: Graph[SourceShape[U], _], segmentSize: Int, eagerClose: Boolean = true): Source[U, Mat]
/**
* Attach a sink that receives every element of this stream in addition to the
* regular downstream (tee / tap pattern)
* @tparam U Element type accepted by the attached sink; a supertype of `Out`
* @param other Sink that additionally receives each element
* @return Stream that continues to emit this stream's elements downstream
*/
def alsoTo[U >: Out](other: Graph[SinkShape[U], _]): Source[Out, Mat]
Usage Examples:
import akka.stream.scaladsl.{Sink, Source}
// Interleaving streams: segments of 2 are taken from each input in turn
val evens = Source(List(2, 4, 6, 8))
val odds = Source(List(1, 3, 5, 7))
val interleaved = evens.interleave(odds, 2) // 2,4,1,3,6,8,5,7
// Also to (tee/tap pattern)
Source(1 to 10)
  .alsoTo(Sink.foreach(x => println(s"Logging: $x"))) // Side effect
  .map(_ * 2)
  .runWith(Sink.seq) // Main processing
Build complex stream topologies using the GraphDSL for advanced fan-in/fan-out patterns.
/**
* GraphDSL for building complex stream graphs
*
* Outline of the factory methods, the builder, and the `~>` wiring syntax
* that the usage examples in this document rely on.
*/
object GraphDSL {
// Build a graph from scratch; the result materializes NotUsed.
// The `Mat` type parameter is declared but not referenced in this signature.
def create[S <: Shape, Mat]()(buildBlock: GraphDSL.Builder[NotUsed] => S): Graph[S, NotUsed]
// Build a graph around an existing graph `g1`; the result carries g1's materialized type M1.
def create[S <: Shape, Mat, M1](g1: Graph[_, M1])(buildBlock: GraphDSL.Builder[M1] => S): Graph[S, M1]
// Builder provides graph construction operations
trait Builder[+Mat] {
// Registers `graph` with the builder and returns its shape so its ports can be wired.
def add[S <: Shape](graph: Graph[S, _]): S
// Implicit conversions for ~> operator
// NOTE(review): the examples import `GraphDSL.Implicits._`, which suggests Implicits
// sits directly inside object GraphDSL rather than inside Builder — confirm the nesting
object Implicits {
implicit class PortOps[T](outlet: Outlet[T]) {
// Connects this outlet to `inlet`; the inlet may accept a supertype of T.
def ~>[U >: T](inlet: Inlet[U]): Unit
}
}
}
}
// Common graph shapes
// Fan-in: several inlets of type T feeding a single outlet of type O
trait UniformFanInShape[-T, +O] extends Shape
// Fan-out: a single inlet of type I feeding several outlets of type O
trait UniformFanOutShape[-I, +O] extends Shape
Usage Examples:
import akka.stream.scaladsl.{GraphDSL, Merge, RunnableGraph, Sink, Source, Zip}
import akka.stream.{UniformFanInShape, ClosedShape}
// Complex merge pattern
val complexGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._
  val source1 = Source(1 to 10)
  val source2 = Source(11 to 20)
  val source3 = Source(21 to 30)
  val merge = builder.add(Merge[Int](3))
  val sink = Sink.foreach[Int](println)
  // Each `~> merge` line connects the next input port of the junction
  source1 ~> merge ~> sink
  source2 ~> merge
  source3 ~> merge
  ClosedShape
})
// Fan-in with zip
val zipGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._
  val source1 = Source(1 to 5)
  val source2 = Source(List("a", "b", "c", "d", "e"))
  val zip = builder.add(Zip[Int, String])
  val sink = Sink.foreach[(Int, String)](println)
  // Zip has two differently typed inputs, so its ports are wired explicitly
  source1 ~> zip.in0
  source2 ~> zip.in1
  zip.out ~> sink
  ClosedShape
})
// Fan-out shapes
// Shape with one inlet accepting I and a sequence of outlets emitting O.
trait UniformFanOutShape[-I, +O] extends Shape {
def in: Inlet[I] // the single input port
def outs: immutable.Seq[Outlet[O]] // the output ports
}
// Fan-in shapes
// Shape with a sequence of inlets accepting I and one outlet emitting O.
trait UniformFanInShape[-I, +O] extends Shape {
def ins: immutable.Seq[Inlet[I]] // the input ports
def out: Outlet[O] // the single output port
}
// Specific shapes
// NOTE(review): `immutable.Seq` presumes `scala.collection.immutable` is in scope, and these
// class names are illustrative — confirm them against the Akka version in use.
// One inlet, several outlets, all carrying T.
final class BroadcastShape[T](val in: Inlet[T], val outs: immutable.Seq[Outlet[T]]) extends UniformFanOutShape[T, T]
// Several inlets, one outlet, all carrying T.
final class MergeShape[T](val ins: immutable.Seq[Inlet[T]], val out: Outlet[T]) extends UniformFanInShape[T, T]
// One inlet, several outlets, all carrying T.
final class BalanceShape[T](val in: Inlet[T], val outs: immutable.Seq[Outlet[T]]) extends UniformFanOutShape[T, T]
// Two differently typed inlets (A and B) and one outlet emitting the pair (A, B).
final class ZipShape[A, B](val in0: Inlet[A], val in1: Inlet[B], val out: Outlet[(A, B)]) extends Shape