or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

control-flow.md · core-components.md · error-handling.md · graph-building.md · index.md · io-integration.md · junction-operations.md · materialization.md · stream-operations.md

docs/junction-operations.md

0

# Junction Operations

1

2

Stream junction operators for merging, broadcasting, zipping, and partitioning multiple streams.

3

4

## Merge Operations

5

6

### Merge - Basic Multi-Input Merge

7

8

```scala { .api }

9

class Merge[T](val inputPorts: Int, val eagerComplete: Boolean = false)

10

extends GraphStage[UniformFanInShape[T, T]]

11

```

12

13

**Factory Methods:**

14

```scala { .api }

15

object Merge {

16

def apply[T](inputPorts: Int, eagerComplete: Boolean = false): Merge[T]

17

}

18

```

19

20

**Usage Example:**

21

```scala

22

import akka.stream.scaladsl.{Source, Sink, Merge, GraphDSL, RunnableGraph}
import akka.stream.scaladsl.GraphDSL.Implicits._ // brings the ~> operator into scope

23

import akka.stream.ClosedShape

24

25

val graph = GraphDSL.create() { implicit builder =>

26

val source1 = builder.add(Source(1 to 3))

27

val source2 = builder.add(Source(4 to 6))

28

val source3 = builder.add(Source(7 to 9))

29

val merge = builder.add(Merge[Int](3))

30

val sink = builder.add(Sink.foreach[Int](println))

31

32

source1 ~> merge

33

source2 ~> merge

34

source3 ~> merge

35

merge ~> sink

36

37

ClosedShape

38

}

39

```

40

41

### MergePreferred - Priority Merge

42

43

```scala { .api }

44

class MergePreferred[T](val secondaryPorts: Int, val eagerComplete: Boolean = false)

45

extends GraphStage[MergePreferred.MergePreferredShape[T]]

46

```

47

48

**Usage Example:**

49

```scala

50

import akka.stream.scaladsl.MergePreferred

51

52

val priorityMerge = GraphDSL.create() { implicit builder =>

53

val highPriority = builder.add(Source.single("URGENT"))

54

val normalSource = builder.add(Source(List("normal1", "normal2")))

55

val merge = builder.add(MergePreferred[String](1)) // 1 secondary port

56

val sink = builder.add(Sink.foreach[String](println))

57

58

highPriority ~> merge.preferred

59

normalSource ~> merge.in(0)

60

merge ~> sink

61

62

ClosedShape

63

}

64

```

65

66

### MergePrioritized - Weighted Priority Merge

67

68

```scala { .api }

69

class MergePrioritized[T](val priorities: Seq[Int], val eagerComplete: Boolean = false)

70

extends GraphStage[UniformFanInShape[T, T]]

71

```

72

73

## Broadcast Operations

74

75

### Broadcast - Multi-Output Distribution

76

77

```scala { .api }

78

class Broadcast[T](val outputPorts: Int, val eagerCancel: Boolean = false)

79

extends GraphStage[UniformFanOutShape[T, T]]

80

```

81

82

**Factory Methods:**

83

```scala { .api }

84

object Broadcast {

85

def apply[T](outputPorts: Int, eagerCancel: Boolean = false): Broadcast[T]

86

}

87

```

88

89

**Usage Example:**

90

```scala

91

import akka.stream.scaladsl.Broadcast

92

93

val broadcastGraph = GraphDSL.create() { implicit builder =>

94

val source = builder.add(Source(1 to 10))

95

val broadcast = builder.add(Broadcast[Int](3))

96

val sink1 = builder.add(Sink.foreach[Int](x => println(s"Sink1: $x")))

97

val sink2 = builder.add(Sink.foreach[Int](x => println(s"Sink2: $x")))

98

val sink3 = builder.add(Sink.foreach[Int](x => println(s"Sink3: $x")))

99

100

source ~> broadcast

101

broadcast ~> sink1

102

broadcast ~> sink2

103

broadcast ~> sink3

104

105

ClosedShape

106

}

107

```

108

109

### Balance - Load Balancing Distribution

110

111

```scala { .api }

112

class Balance[T](

113

val outputPorts: Int,

114

val waitForAllDownstreams: Boolean = false,

115

val eagerCancel: Boolean = false

116

) extends GraphStage[UniformFanOutShape[T, T]]

117

```

118

119

**Usage Example:**

120

```scala

121

import akka.stream.scaladsl.{Balance, Flow}

122

123

val balanceGraph = GraphDSL.create() { implicit builder =>

124

val source = builder.add(Source(1 to 20))

125

val balance = builder.add(Balance[Int](3)) // Distribute across 3 workers

126

val worker1 = builder.add(Flow[Int].map(x => s"Worker1: $x"))

127

val worker2 = builder.add(Flow[Int].map(x => s"Worker2: $x"))

128

val worker3 = builder.add(Flow[Int].map(x => s"Worker3: $x"))

129

val merge = builder.add(Merge[String](3))

130

val sink = builder.add(Sink.foreach[String](println))

131

132

source ~> balance

133

balance ~> worker1 ~> merge

134

balance ~> worker2 ~> merge

135

balance ~> worker3 ~> merge

136

merge ~> sink

137

138

ClosedShape

139

}

140

```

141

142

### Partition - Conditional Distribution

143

144

```scala { .api }

145

class Partition[T](

146

val outputPorts: Int,

147

val partitioner: T => Int,

148

val eagerCancel: Boolean = false

149

) extends GraphStage[UniformFanOutShape[T, T]]

150

```

151

152

**Usage Example:**

153

```scala

154

import akka.stream.scaladsl.Partition

155

156

val partitionGraph = GraphDSL.create() { implicit builder =>

157

val source = builder.add(Source(1 to 20))

158

val partition = builder.add(Partition[Int](3, _ % 3)) // Partition by modulo 3

159

val sink0 = builder.add(Sink.foreach[Int](x => println(s"Mod0: $x")))

160

val sink1 = builder.add(Sink.foreach[Int](x => println(s"Mod1: $x")))

161

val sink2 = builder.add(Sink.foreach[Int](x => println(s"Mod2: $x")))

162

163

source ~> partition

164

partition ~> sink0

165

partition ~> sink1

166

partition ~> sink2

167

168

ClosedShape

169

}

170

```

171

172

## Zip Operations

173

174

### Zip - Combine Two Streams

175

176

```scala { .api }

177

final class Zip[A, B] extends ZipWith2[A, B, (A, B)](Tuple2.apply)

178

```

179

180

**Factory Methods:**

181

```scala { .api }

182

object Zip {

183

def apply[A, B](): Zip[A, B]

184

}

185

```

186

187

**Usage Example:**

188

```scala

189

import akka.stream.scaladsl.Zip

190

191

val zipGraph = GraphDSL.create() { implicit builder =>

192

val source1 = builder.add(Source(1 to 5))

193

val source2 = builder.add(Source(List("a", "b", "c", "d", "e")))

194

val zip = builder.add(Zip[Int, String]())

195

val sink = builder.add(Sink.foreach[(Int, String)](println))

196

197

source1 ~> zip.in0

198

source2 ~> zip.in1

199

zip.out ~> sink

200

201

ClosedShape

202

}

203

// Output: (1,a), (2,b), (3,c), (4,d), (5,e)

204

```

205

206

### ZipWith - Custom Zip Function

207

208

```scala { .api }

209

class ZipWith2[A, B, C](zipper: (A, B) => C) extends GraphStage[FanInShape2[A, B, C]]

210

```

211

212

**Usage Example:**

213

```scala

214

import akka.stream.scaladsl.ZipWith

215

216

val zipWithGraph = GraphDSL.create() { implicit builder =>

217

val numbers = builder.add(Source(1 to 5))

218

val strings = builder.add(Source(List("one", "two", "three", "four", "five")))

219

val zipWith = builder.add(ZipWith[Int, String, String]((n, s) => s"$n-$s"))

220

val sink = builder.add(Sink.foreach[String](println))

221

222

numbers ~> zipWith.in0

223

strings ~> zipWith.in1

224

zipWith.out ~> sink

225

226

ClosedShape

227

}

228

// Output: 1-one, 2-two, 3-three, 4-four, 5-five

229

```

230

231

### ZipN - Multiple Stream Zip

232

233

```scala { .api }

234

class ZipN[A](n: Int) extends GraphStage[UniformFanInShape[A, immutable.Seq[A]]]

235

```

236

237

**Usage Example:**

238

```scala

239

import akka.stream.scaladsl.ZipN

240

241

val zipNGraph = GraphDSL.create() { implicit builder =>

242

val source1 = builder.add(Source(1 to 3))

243

val source2 = builder.add(Source(4 to 6))

244

val source3 = builder.add(Source(7 to 9))

245

val zipN = builder.add(ZipN[Int](3))

246

val sink = builder.add(Sink.foreach[Seq[Int]](println))

247

248

source1 ~> zipN.in(0)

249

source2 ~> zipN.in(1)

250

source3 ~> zipN.in(2)

251

zipN.out ~> sink

252

253

ClosedShape

254

}

255

// Output: Vector(1, 4, 7), Vector(2, 5, 8), Vector(3, 6, 9) (an immutable.Seq; the concrete collection printed may vary by version)

256

```

257

258

### ZipLatest - Latest Value Zip

259

260

```scala { .api }

261

final class ZipLatest[A, B] extends ZipLatestWith2[A, B, (A, B)](Tuple2.apply)

262

```

263

264

**Usage Example:**

265

```scala

266

import akka.stream.scaladsl.ZipLatest

267

import scala.concurrent.duration._

268

269

val zipLatestGraph = GraphDSL.create() { implicit builder =>

270

val fastSource = builder.add(Source.tick(100.millis, 100.millis, "fast"))

271

val slowSource = builder.add(Source.tick(300.millis, 300.millis, "slow"))

272

val zipLatest = builder.add(ZipLatest[String, String]())

273

val sink = builder.add(Sink.foreach[(String, String)](println))

274

275

fastSource ~> zipLatest.in0

276

slowSource ~> zipLatest.in1

277

zipLatest.out ~> sink

278

279

ClosedShape

280

}

281

```

282

283

## Unzip Operations

284

285

### Unzip - Split Tuples

286

287

```scala { .api }

288

final class Unzip[A, B]() extends UnzipWith2[(A, B), A, B](ConstantFun.scalaIdentityFunction)

289

```

290

291

**Usage Example:**

292

```scala

293

import akka.stream.scaladsl.Unzip

294

295

val unzipGraph = GraphDSL.create() { implicit builder =>

296

val source = builder.add(Source(List((1, "a"), (2, "b"), (3, "c"))))

297

val unzip = builder.add(Unzip[Int, String]())

298

val sink1 = builder.add(Sink.foreach[Int](x => println(s"Numbers: $x")))

299

val sink2 = builder.add(Sink.foreach[String](x => println(s"Strings: $x")))

300

301

source ~> unzip.in

302

unzip.out0 ~> sink1

303

unzip.out1 ~> sink2

304

305

ClosedShape

306

}

307

```

308

309

### UnzipWith - Custom Unzip Function

310

311

```scala { .api }

312

class UnzipWith2[In, A, B](val unzipper: In => (A, B)) extends GraphStage[FanOutShape2[In, A, B]]

313

```

314

315

**Usage Example:**

316

```scala

317

import akka.stream.scaladsl.UnzipWith

318

319

case class Person(name: String, age: Int)

320

321

val unzipWithGraph = GraphDSL.create() { implicit builder =>

322

val people = builder.add(Source(List(

323

Person("Alice", 25),

324

Person("Bob", 30),

325

Person("Charlie", 35)

326

)))

327

val unzipWith = builder.add(UnzipWith[Person, String, Int](p => (p.name, p.age)))

328

val namesSink = builder.add(Sink.foreach[String](name => println(s"Name: $name")))

329

val agesSink = builder.add(Sink.foreach[Int](age => println(s"Age: $age")))

330

331

people ~> unzipWith.in

332

unzipWith.out0 ~> namesSink

333

unzipWith.out1 ~> agesSink

334

335

ClosedShape

336

}

337

```

338

339

## Usage Patterns

340

341

### Fan-out then Fan-in

342

343

```scala

344

val fanOutInGraph = GraphDSL.create() { implicit builder =>

345

val source = builder.add(Source(1 to 10))

346

val broadcast = builder.add(Broadcast[Int](2))

347

val evenFilter = builder.add(Flow[Int].filter(_ % 2 == 0))

348

val oddFilter = builder.add(Flow[Int].filter(_ % 2 == 1))

349

val merge = builder.add(Merge[Int](2))

350

val sink = builder.add(Sink.foreach[Int](println))

351

352

source ~> broadcast

353

broadcast ~> evenFilter ~> merge

354

broadcast ~> oddFilter ~> merge

355

merge ~> sink

356

357

ClosedShape

358

}

359

```

360

361

### Processing Pipeline with Parallel Workers

362

363

```scala

364

val parallelProcessing = GraphDSL.create() { implicit builder =>

365

val source = builder.add(Source(1 to 100))

366

val balance = builder.add(Balance[Int](4)) // 4 parallel workers

367

val merge = builder.add(Merge[Int](4))

368

val sink = builder.add(Sink.foreach[Int](println))

369

370

// Heavy processing flow

371

val heavyProcess = Flow[Int].map { x =>

372

Thread.sleep(100) // Simulate work

373

x * x

374

}

375

376

source ~> balance

377

378

// Connect 4 parallel workers

379

for (i <- 0 until 4) {

380

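balance ~> builder.add(heavyProcess.async) ~> merge // .async gives each worker its own execution island; without it the fused graph runs them sequentially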

381

}

382

383

merge ~> sink

384

385

ClosedShape

386

}

387

```

388

389

Junction operations enable complex stream topologies while maintaining backpressure and type safety throughout the graph.