0
# Junction Operations
1
2
Stream junction operators for merging, broadcasting, zipping, and partitioning multiple streams.
3
4
## Merge Operations
5
6
### Merge - Basic Multi-Input Merge
7
8
```scala { .api }
9
class Merge[T](val inputPorts: Int, val eagerComplete: Boolean = false)
10
extends GraphStage[UniformFanInShape[T, T]]
11
```
12
13
**Factory Methods:**
14
```scala { .api }
15
object Merge {
16
def apply[T](inputPorts: Int, eagerComplete: Boolean = false): Merge[T]
17
}
18
```
19
20
**Usage Example:**
21
```scala
22
import akka.stream.scaladsl.{Source, Merge, GraphDSL, RunnableGraph}
23
import akka.stream.ClosedShape
24
25
val graph = GraphDSL.create() { implicit builder =>
26
val source1 = builder.add(Source(1 to 3))
27
val source2 = builder.add(Source(4 to 6))
28
val source3 = builder.add(Source(7 to 9))
29
val merge = builder.add(Merge[Int](3))
30
val sink = builder.add(Sink.foreach[Int](println))
31
32
source1 ~> merge
33
source2 ~> merge
34
source3 ~> merge
35
merge ~> sink
36
37
ClosedShape
38
}
39
```
40
41
### MergePreferred - Priority Merge
42
43
```scala { .api }
44
class MergePreferred[T](val secondaryPorts: Int, val eagerComplete: Boolean = false)
45
extends GraphStage[MergePreferred.MergePreferredShape[T]]
46
```
47
48
**Usage Example:**
49
```scala
50
import akka.stream.scaladsl.MergePreferred
51
52
val priorityMerge = GraphDSL.create() { implicit builder =>
53
val highPriority = builder.add(Source.single("URGENT"))
54
val normalSource = builder.add(Source(List("normal1", "normal2")))
55
val merge = builder.add(MergePreferred[String](1)) // 1 secondary port
56
val sink = builder.add(Sink.foreach[String](println))
57
58
highPriority ~> merge.preferred
59
normalSource ~> merge.in(0)
60
merge ~> sink
61
62
ClosedShape
63
}
64
```
65
66
### MergePrioritized - Weighted Priority Merge
67
68
```scala { .api }
69
class MergePrioritized[T](val priorities: Seq[Int], val eagerComplete: Boolean = false)
70
extends GraphStage[UniformFanInShape[T, T]]
71
```
72
73
## Broadcast Operations
74
75
### Broadcast - Multi-Output Distribution
76
77
```scala { .api }
78
class Broadcast[T](val outputPorts: Int, val eagerCancel: Boolean = false)
79
extends GraphStage[UniformFanOutShape[T, T]]
80
```
81
82
**Factory Methods:**
83
```scala { .api }
84
object Broadcast {
85
def apply[T](outputPorts: Int, eagerCancel: Boolean = false): Broadcast[T]
86
}
87
```
88
89
**Usage Example:**
90
```scala
91
import akka.stream.scaladsl.Broadcast
92
93
val broadcastGraph = GraphDSL.create() { implicit builder =>
94
val source = builder.add(Source(1 to 10))
95
val broadcast = builder.add(Broadcast[Int](3))
96
val sink1 = builder.add(Sink.foreach[Int](x => println(s"Sink1: $x")))
97
val sink2 = builder.add(Sink.foreach[Int](x => println(s"Sink2: $x")))
98
val sink3 = builder.add(Sink.foreach[Int](x => println(s"Sink3: $x")))
99
100
source ~> broadcast
101
broadcast ~> sink1
102
broadcast ~> sink2
103
broadcast ~> sink3
104
105
ClosedShape
106
}
107
```
108
109
### Balance - Load Balancing Distribution
110
111
```scala { .api }
112
class Balance[T](
113
val outputPorts: Int,
114
val waitForAllDownstreams: Boolean = false,
115
val eagerCancel: Boolean = false
116
) extends GraphStage[UniformFanOutShape[T, T]]
117
```
118
119
**Usage Example:**
120
```scala
121
import akka.stream.scaladsl.Balance
122
123
val balanceGraph = GraphDSL.create() { implicit builder =>
124
val source = builder.add(Source(1 to 20))
125
val balance = builder.add(Balance[Int](3)) // Distribute across 3 workers
126
val worker1 = builder.add(Flow[Int].map(x => s"Worker1: $x"))
127
val worker2 = builder.add(Flow[Int].map(x => s"Worker2: $x"))
128
val worker3 = builder.add(Flow[Int].map(x => s"Worker3: $x"))
129
val merge = builder.add(Merge[String](3))
130
val sink = builder.add(Sink.foreach[String](println))
131
132
source ~> balance
133
balance ~> worker1 ~> merge
134
balance ~> worker2 ~> merge
135
balance ~> worker3 ~> merge
136
merge ~> sink
137
138
ClosedShape
139
}
140
```
141
142
### Partition - Conditional Distribution
143
144
```scala { .api }
145
class Partition[T](
146
val outputPorts: Int,
147
val partitioner: T => Int,
148
val eagerCancel: Boolean = false
149
) extends GraphStage[UniformFanOutShape[T, T]]
150
```
151
152
**Usage Example:**
153
```scala
154
import akka.stream.scaladsl.Partition
155
156
val partitionGraph = GraphDSL.create() { implicit builder =>
157
val source = builder.add(Source(1 to 20))
158
val partition = builder.add(Partition[Int](3, _ % 3)) // Partition by modulo 3
159
val sink0 = builder.add(Sink.foreach[Int](x => println(s"Mod0: $x")))
160
val sink1 = builder.add(Sink.foreach[Int](x => println(s"Mod1: $x")))
161
val sink2 = builder.add(Sink.foreach[Int](x => println(s"Mod2: $x")))
162
163
source ~> partition
164
partition ~> sink0
165
partition ~> sink1
166
partition ~> sink2
167
168
ClosedShape
169
}
170
```
171
172
## Zip Operations
173
174
### Zip - Combine Two Streams
175
176
```scala { .api }
177
final class Zip[A, B] extends ZipWith2[A, B, (A, B)](Tuple2.apply)
178
```
179
180
**Factory Methods:**
181
```scala { .api }
182
object Zip {
183
def apply[A, B](): Zip[A, B]
184
}
185
```
186
187
**Usage Example:**
188
```scala
189
import akka.stream.scaladsl.Zip
190
191
val zipGraph = GraphDSL.create() { implicit builder =>
192
val source1 = builder.add(Source(1 to 5))
193
val source2 = builder.add(Source(List("a", "b", "c", "d", "e")))
194
val zip = builder.add(Zip[Int, String]())
195
val sink = builder.add(Sink.foreach[(Int, String)](println))
196
197
source1 ~> zip.in0
198
source2 ~> zip.in1
199
zip.out ~> sink
200
201
ClosedShape
202
}
203
// Output: (1,a), (2,b), (3,c), (4,d), (5,e)
204
```
205
206
### ZipWith - Custom Zip Function
207
208
```scala { .api }
209
class ZipWith2[A, B, C](zipper: (A, B) => C) extends GraphStage[FanInShape2[A, B, C]]
210
```
211
212
**Usage Example:**
213
```scala
214
import akka.stream.scaladsl.ZipWith
215
216
val zipWithGraph = GraphDSL.create() { implicit builder =>
217
val numbers = builder.add(Source(1 to 5))
218
val strings = builder.add(Source(List("one", "two", "three", "four", "five")))
219
val zipWith = builder.add(ZipWith[Int, String, String]((n, s) => s"$n-$s"))
220
val sink = builder.add(Sink.foreach[String](println))
221
222
numbers ~> zipWith.in0
223
strings ~> zipWith.in1
224
zipWith.out ~> sink
225
226
ClosedShape
227
}
228
// Output: "1-one", "2-two", "3-three", "4-four", "5-five"
229
```
230
231
### ZipN - Multiple Stream Zip
232
233
```scala { .api }
234
class ZipN[A](n: Int) extends GraphStage[UniformFanInShape[A, immutable.Seq[A]]]
235
```
236
237
**Usage Example:**
238
```scala
239
import akka.stream.scaladsl.ZipN
240
241
val zipNGraph = GraphDSL.create() { implicit builder =>
242
val source1 = builder.add(Source(1 to 3))
243
val source2 = builder.add(Source(4 to 6))
244
val source3 = builder.add(Source(7 to 9))
245
val zipN = builder.add(ZipN[Int](3))
246
val sink = builder.add(Sink.foreach[Seq[Int]](println))
247
248
source1 ~> zipN.in(0)
249
source2 ~> zipN.in(1)
250
source3 ~> zipN.in(2)
251
zipN.out ~> sink
252
253
ClosedShape
254
}
255
// Output: List(1,4,7), List(2,5,8), List(3,6,9)
256
```
257
258
### ZipLatest - Latest Value Zip
259
260
```scala { .api }
261
final class ZipLatest[A, B] extends ZipLatestWith2[A, B, (A, B)](Tuple2.apply)
262
```
263
264
**Usage Example:**
265
```scala
266
import akka.stream.scaladsl.ZipLatest
267
import scala.concurrent.duration._
268
269
val zipLatestGraph = GraphDSL.create() { implicit builder =>
270
val fastSource = builder.add(Source.tick(100.millis, 100.millis, "fast"))
271
val slowSource = builder.add(Source.tick(300.millis, 300.millis, "slow"))
272
val zipLatest = builder.add(ZipLatest[String, String]())
273
val sink = builder.add(Sink.foreach[(String, String)](println))
274
275
fastSource ~> zipLatest.in0
276
slowSource ~> zipLatest.in1
277
zipLatest.out ~> sink
278
279
ClosedShape
280
}
281
```
282
283
## Unzip Operations
284
285
### Unzip - Split Tuples
286
287
```scala { .api }
288
final class Unzip[A, B]() extends UnzipWith[(A, B), A, B]
289
```
290
291
**Usage Example:**
292
```scala
293
import akka.stream.scaladsl.Unzip
294
295
val unzipGraph = GraphDSL.create() { implicit builder =>
296
val source = builder.add(Source(List((1, "a"), (2, "b"), (3, "c"))))
297
val unzip = builder.add(Unzip[Int, String]())
298
val sink1 = builder.add(Sink.foreach[Int](x => println(s"Numbers: $x")))
299
val sink2 = builder.add(Sink.foreach[String](x => println(s"Strings: $x")))
300
301
source ~> unzip.in
302
unzip.out0 ~> sink1
303
unzip.out1 ~> sink2
304
305
ClosedShape
306
}
307
```
308
309
### UnzipWith - Custom Unzip Function
310
311
```scala { .api }
312
class UnzipWith[In, A, B](unzipper: In => (A, B)) extends GraphStage[FanOutShape2[In, A, B]]
313
```
314
315
**Usage Example:**
316
```scala
317
import akka.stream.scaladsl.UnzipWith
318
319
case class Person(name: String, age: Int)
320
321
val unzipWithGraph = GraphDSL.create() { implicit builder =>
322
val people = builder.add(Source(List(
323
Person("Alice", 25),
324
Person("Bob", 30),
325
Person("Charlie", 35)
326
)))
327
val unzipWith = builder.add(UnzipWith[Person, String, Int](p => (p.name, p.age)))
328
val namesSink = builder.add(Sink.foreach[String](name => println(s"Name: $name")))
329
val agesSink = builder.add(Sink.foreach[Int](age => println(s"Age: $age")))
330
331
people ~> unzipWith.in
332
unzipWith.out0 ~> namesSink
333
unzipWith.out1 ~> agesSink
334
335
ClosedShape
336
}
337
```
338
339
## Usage Patterns
340
341
### Fan-out then Fan-in
342
343
```scala
344
val fanOutInGraph = GraphDSL.create() { implicit builder =>
345
val source = builder.add(Source(1 to 10))
346
val broadcast = builder.add(Broadcast[Int](2))
347
val evenFilter = builder.add(Flow[Int].filter(_ % 2 == 0))
348
val oddFilter = builder.add(Flow[Int].filter(_ % 2 == 1))
349
val merge = builder.add(Merge[Int](2))
350
val sink = builder.add(Sink.foreach[Int](println))
351
352
source ~> broadcast
353
broadcast ~> evenFilter ~> merge
354
broadcast ~> oddFilter ~> merge
355
merge ~> sink
356
357
ClosedShape
358
}
359
```
360
361
### Processing Pipeline with Parallel Workers
362
363
```scala
364
val parallelProcessing = GraphDSL.create() { implicit builder =>
365
val source = builder.add(Source(1 to 100))
366
val balance = builder.add(Balance[Int](4)) // 4 parallel workers
367
val merge = builder.add(Merge[Int](4))
368
val sink = builder.add(Sink.foreach[Int](println))
369
370
// Heavy processing flow
371
val heavyProcess = Flow[Int].map { x =>
372
Thread.sleep(100) // Simulate work
373
x * x
374
}
375
376
source ~> balance
377
378
// Connect 4 parallel workers
379
for (i <- 0 until 4) {
380
balance ~> builder.add(heavyProcess) ~> merge
381
}
382
383
merge ~> sink
384
385
ClosedShape
386
}
387
```
388
389
Junction operations enable complex stream topologies while maintaining backpressure and type safety throughout the graph.