0
# Stream Combining
1
2
Operations for merging, zipping, concatenating, and broadcasting streams to create complex data flow topologies. These operations enable composition of multiple streams into sophisticated processing pipelines.
3
4
## Capabilities
5
6
### Stream Merging
7
8
Combine multiple streams by merging their elements in arrival order or with specific strategies.
9
10
```scala { .api }
11
/**
12
* Merge this stream with another stream
13
* @param other Stream to merge with
14
* @param eagerComplete Complete when any input completes (default: true)
15
* @return Stream containing elements from both inputs in arrival order
16
*/
17
def merge[U >: Out](other: Graph[SourceShape[U], _], eagerComplete: Boolean = true): Source[U, Mat]
18
19
/**
20
* Merge with priority for this stream over the other
21
* @param other Stream to merge with (lower priority)
22
* @param preferred Number of elements to emit from this stream before considering other
23
* @return Stream with preferred emission from this stream
24
*/
25
def mergePreferred[U >: Out](other: Graph[SourceShape[U], _], preferred: Int): Source[U, Mat]
26
27
/**
28
* Merge multiple streams with a custom merge strategy
29
* @param those Additional streams to merge
30
* @param strategy Custom merging strategy
31
* @return Stream with elements merged according to strategy
32
*/
33
def mergeSorted[U >: Out](other: Graph[SourceShape[U], _])(implicit ord: Ordering[U]): Source[U, Mat]
34
```
35
36
**Usage Examples:**
37
38
```scala
39
import akka.stream.scaladsl.Source
40
41
// Simple merge
42
val stream1 = Source(1 to 5)
43
val stream2 = Source(6 to 10)
44
val merged = stream1.merge(stream2) // Elements in arrival order
45
46
// Merge with preference
47
val fastStream = Source.tick(100.millis, 100.millis, "fast")
48
val slowStream = Source.tick(500.millis, 500.millis, "slow")
49
val preferred = fastStream.mergePreferred(slowStream, 3) // 3 fast, then 1 slow
50
51
// Sorted merge
52
val sorted1 = Source(List(1, 3, 5, 7))
53
val sorted2 = Source(List(2, 4, 6, 8))
54
val sortedMerge = sorted1.mergeSorted(sorted2) // Maintains sorted order
55
```
56
57
### Stream Concatenation
58
59
Combine streams sequentially, processing one completely before the next.
60
61
```scala { .api }
62
/**
63
* Concatenate this stream with another stream
64
* @param other Stream to append after this stream completes
65
* @return Stream that emits all elements from this stream, then all from other
66
*/
67
def concat[U >: Out](other: Graph[SourceShape[U], _]): Source[U, Mat]
68
69
/**
70
* Prepend elements or another stream before this stream
71
* @param other Stream to emit before this stream
72
* @return Stream that emits other stream first, then this stream
73
*/
74
def prepend[U >: Out](other: Graph[SourceShape[U], _]): Source[U, Mat]
75
76
/**
77
* Concatenate streams from a source of sources
78
* @param sources Stream that emits source graphs
79
* @return Stream that concatenates all emitted sources
80
*/
81
def flatMapConcat[T, M](f: Out => Graph[SourceShape[T], M]): Source[T, Mat]
82
```
83
84
**Usage Examples:**
85
86
```scala
87
// Sequential concatenation
88
val first = Source(1 to 3)
89
val second = Source(4 to 6)
90
val concatenated = first.concat(second) // Emits: 1,2,3,4,5,6
91
92
// Prepending
93
val main = Source(List("World", "!"))
94
val greeting = Source.single("Hello ")
95
val combined = main.prepend(greeting) // Emits: "Hello ", "World", "!"
96
97
// Flat map concat
98
Source(List("file1.txt", "file2.txt"))
99
.flatMapConcat(filename => Source.fromIterator(() => readFileLines(filename)))
100
.runWith(Sink.seq)
101
```
102
103
### Stream Zipping
104
105
Combine streams by pairing elements from multiple inputs.
106
107
```scala { .api }
108
/**
109
* Zip this stream with another stream into tuples
110
* @param other Stream to zip with
111
* @return Stream of tuples containing paired elements
112
*/
113
def zip[U](other: Graph[SourceShape[U], _]): Source[(Out, U), Mat]
114
115
/**
116
* Zip streams using a custom combine function
117
* @param other Stream to zip with
118
* @param f Function to combine paired elements
119
* @return Stream with combined elements
120
*/
121
def zipWith[U, T](other: Graph[SourceShape[U], _])(f: (Out, U) => T): Source[T, Mat]
122
123
/**
124
* Zip with index, pairing each element with its position
125
* @return Stream of tuples with elements and their indices
126
*/
127
def zipWithIndex: Source[(Out, Long), Mat]
128
129
/**
130
* Zip but keep emitting from the longer stream after shorter completes
131
* @param other Stream to zip with
132
* @return Stream of tuples, with None for completed stream
133
*/
134
def zipAll[U, A >: Out, B >: U](other: Graph[SourceShape[U], _], thisElem: A, otherElem: B): Source[(A, B), Mat]
135
```
136
137
**Usage Examples:**
138
139
```scala
140
// Basic zipping
141
val numbers = Source(1 to 5)
142
val letters = Source(List("a", "b", "c", "d", "e"))
143
val zipped = numbers.zip(letters) // (1,"a"), (2,"b"), (3,"c"), (4,"d"), (5,"e")
144
145
// Zip with custom function
146
val combined = numbers.zipWith(letters)((n, l) => s"$n:$l")
147
148
// Zip with index
149
Source(List("apple", "banana", "cherry"))
150
.zipWithIndex // ("apple", 0), ("banana", 1), ("cherry", 2)
151
.runWith(Sink.seq)
152
153
// Zip all (handle different lengths)
154
val short = Source(1 to 3)
155
val long = Source(List("a", "b", "c", "d", "e"))
156
short.zipAll(long, 0, "") // (1,"a"), (2,"b"), (3,"c"), (0,"d"), (0,"e")
157
```
158
159
### Broadcasting and Fan-out
160
161
Split a single stream into multiple parallel streams.
162
163
```scala { .api }
164
/**
165
* Create a broadcast junction to split stream into multiple outputs
166
* @param outputCount Number of output streams
167
* @param eagerCancel Cancel upstream when any output cancels (default: false)
168
* @return Graph that broadcasts to multiple outputs
169
*/
170
def broadcast(outputCount: Int, eagerCancel: Boolean = false): Graph[UniformFanOutShape[Out, Out], NotUsed]
171
172
/**
173
* Balance elements across multiple outputs using round-robin
174
* @param outputCount Number of output streams
175
* @param waitForAllDownstreams Wait for all outputs to be ready (default: true)
176
* @return Graph that balances elements across outputs
177
*/
178
def balance[T](outputCount: Int, waitForAllDownstreams: Boolean = true): Graph[UniformFanOutShape[T, T], NotUsed]
179
180
/**
181
* Partition elements across outputs based on a predicate
182
* @param outputCount Number of output streams
183
* @param partitioner Function that returns output index for each element
184
* @return Graph that partitions elements to different outputs
185
*/
186
def partition[T](outputCount: Int, partitioner: T => Int): Graph[UniformFanOutShape[T, T], NotUsed]
187
```
188
189
**Usage Examples:**
190
191
```scala
192
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Broadcast, Balance}
193
import akka.stream.{UniformFanOutShape, ClosedShape}
194
195
// Broadcasting to multiple sinks
196
val broadcastGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
197
import GraphDSL.Implicits._
198
199
val source = Source(1 to 10)
200
val broadcast = builder.add(Broadcast[Int](2))
201
val sink1 = Sink.foreach[Int](x => println(s"Sink1: $x"))
202
val sink2 = Sink.foreach[Int](x => println(s"Sink2: $x"))
203
204
source ~> broadcast ~> sink1
205
broadcast ~> sink2
206
207
ClosedShape
208
})
209
210
// Load balancing across processors
211
val balanceGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
212
import GraphDSL.Implicits._
213
214
val source = Source(1 to 100)
215
val balance = builder.add(Balance[Int](3))
216
val sink = Sink.foreach[Int](x => println(s"Processed: $x"))
217
218
source ~> balance ~> Flow[Int].map(_ * 2) ~> sink
219
balance ~> Flow[Int].map(_ * 3) ~> sink
220
balance ~> Flow[Int].map(_ * 4) ~> sink
221
222
ClosedShape
223
})
224
```
225
226
### Interleaving and Alternating
227
228
Combine streams by alternating between sources or interleaving elements.
229
230
```scala { .api }
231
/**
232
* Interleave elements from this stream and another
233
* @param other Stream to interleave with
234
* @param segmentSize Number of elements to take from each stream in turn
235
* @param eagerClose Close when any stream closes (default: true)
236
* @return Stream with interleaved elements
237
*/
238
def interleave[U >: Out](other: Graph[SourceShape[U], _], segmentSize: Int, eagerClose: Boolean = true): Source[U, Mat]
239
240
/**
241
* Alternate between this stream and others based on a pattern
242
* @param others Other streams to alternate with
243
* @return Stream that alternates between all inputs
244
*/
245
def alsoTo[U >: Out](other: Graph[SinkShape[U], _]): Source[Out, Mat]
246
```
247
248
**Usage Examples:**
249
250
```scala
251
// Interleaving streams
252
val evens = Source(List(2, 4, 6, 8))
253
val odds = Source(List(1, 3, 5, 7))
254
val interleaved = evens.interleave(odds, 2) // 2,4,1,3,6,8,5,7
255
256
// Also to (tee/tap pattern)
257
Source(1 to 10)
258
.alsoTo(Sink.foreach(x => println(s"Logging: $x"))) // Side effect
259
.map(_ * 2)
260
.runWith(Sink.seq) // Main processing
261
```
262
263
### Complex Graph Construction
264
265
Build complex stream topologies using the GraphDSL for advanced fan-in/fan-out patterns.
266
267
```scala { .api }
268
/**
269
* GraphDSL for building complex stream graphs
270
*/
271
object GraphDSL {
272
def create[S <: Shape, Mat]()(buildBlock: GraphDSL.Builder[NotUsed] => S): Graph[S, NotUsed]
273
def create[S <: Shape, Mat, M1](g1: Graph[_, M1])(buildBlock: GraphDSL.Builder[M1] => S): Graph[S, M1]
274
275
// Builder provides graph construction operations
276
trait Builder[+Mat] {
277
def add[S <: Shape](graph: Graph[S, _]): S
278
279
// Implicit conversions for ~> operator
280
object Implicits {
281
implicit class PortOps[T](outlet: Outlet[T]) {
282
def ~>[U >: T](inlet: Inlet[U]): Unit
283
}
284
}
285
}
286
}
287
288
// Common graph shapes
289
trait UniformFanInShape[-T, +O] extends Shape
290
trait UniformFanOutShape[-I, +T] extends Shape
291
```
292
293
**Usage Examples:**
294
295
```scala
296
import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Merge, Zip}
297
import akka.stream.{UniformFanInShape, ClosedShape}
298
299
// Complex merge pattern
300
val complexGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
301
import GraphDSL.Implicits._
302
303
val source1 = Source(1 to 10)
304
val source2 = Source(11 to 20)
305
val source3 = Source(21 to 30)
306
307
val merge = builder.add(Merge[Int](3))
308
val sink = Sink.foreach[Int](println)
309
310
source1 ~> merge ~> sink
311
source2 ~> merge
312
source3 ~> merge
313
314
ClosedShape
315
})
316
317
// Fan-in with zip
318
val zipGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
319
import GraphDSL.Implicits._
320
321
val source1 = Source(1 to 5)
322
val source2 = Source(List("a", "b", "c", "d", "e"))
323
val zip = builder.add(Zip[Int, String])
324
val sink = Sink.foreach[(Int, String)](println)
325
326
source1 ~> zip.in0
327
source2 ~> zip.in1
328
zip.out ~> sink
329
330
ClosedShape
331
})
332
```
333
334
## Types
335
336
```scala { .api }
337
// Fan-out shapes
338
trait UniformFanOutShape[-I, +O] extends Shape {
339
def in: Inlet[I]
340
def outs: immutable.Seq[Outlet[O]]
341
}
342
343
// Fan-in shapes
344
trait UniformFanInShape[-I, +O] extends Shape {
345
def ins: immutable.Seq[Inlet[I]]
346
def out: Outlet[O]
347
}
348
349
// Specific shapes
350
final class BroadcastShape[T](val in: Inlet[T], val outs: immutable.Seq[Outlet[T]]) extends UniformFanOutShape[T, T]
351
final class MergeShape[T](val ins: immutable.Seq[Inlet[T]], val out: Outlet[T]) extends UniformFanInShape[T, T]
352
final class BalanceShape[T](val in: Inlet[T], val outs: immutable.Seq[Outlet[T]]) extends UniformFanOutShape[T, T]
353
final class ZipShape[A, B](val in0: Inlet[A], val in1: Inlet[B], val out: Outlet[(A, B)]) extends Shape
354
```