or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

core-stream-types.md, custom-stages.md, error-handling.md, index.md, integration.md, materialization.md, stream-combining.md, stream-control.md, stream-sinks.md, stream-sources.md, stream-transformations.md

stream-combining.mddocs/

0

# Stream Combining

1

2

Operations for merging, zipping, concatenating, and broadcasting streams to create complex data flow topologies. These operations enable composition of multiple streams into sophisticated processing pipelines.

3

4

## Capabilities

5

6

### Stream Merging

7

8

Combine multiple streams by merging their elements in arrival order or with specific strategies.

9

10

```scala { .api }

11

/**

12

* Merge this stream with another stream

13

* @param other Stream to merge with

14

* @param eagerComplete Complete when any input completes (default: true)

15

* @return Stream containing elements from both inputs in arrival order

16

*/

17

def merge[U >: Out](other: Graph[SourceShape[U], _], eagerComplete: Boolean = true): Source[U, Mat]

18

19

/**

20

* Merge with priority for this stream over the other

21

* @param other Stream to merge with (lower priority)

22

* @param preferred Number of elements to emit from this stream before considering other

23

* @return Stream with preferred emission from this stream

24

*/

25

def mergePreferred[U >: Out](other: Graph[SourceShape[U], _], preferred: Int): Source[U, Mat]

26

27

/**

28

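* Merge this stream with another stream, assuming both are already sorted, so that the output stays sorted

29

* @param other Sorted stream to merge with

30

* @param ord Implicit ordering used to compare elements from the two streams

31

* @return Stream containing elements from both inputs in sorted order according to ord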

32

*/

33

def mergeSorted[U >: Out](other: Graph[SourceShape[U], _])(implicit ord: Ordering[U]): Source[U, Mat]

34

```

35

36

**Usage Examples:**

37

38

```scala

39

import akka.stream.scaladsl.Source

40

41

// Simple merge

42

val stream1 = Source(1 to 5)

43

val stream2 = Source(6 to 10)

44

val merged = stream1.merge(stream2) // Elements in arrival order

45

46

// Merge with preference

47

val fastStream = Source.tick(100.millis, 100.millis, "fast")

48

val slowStream = Source.tick(500.millis, 500.millis, "slow")

49

val preferred = fastStream.mergePreferred(slowStream, 3) // 3 fast, then 1 slow

50

51

// Sorted merge

52

val sorted1 = Source(List(1, 3, 5, 7))

53

val sorted2 = Source(List(2, 4, 6, 8))

54

val sortedMerge = sorted1.mergeSorted(sorted2) // Maintains sorted order

55

```

56

57

### Stream Concatenation

58

59

Combine streams sequentially, processing one completely before the next.

60

61

```scala { .api }

62

/**

63

* Concatenate this stream with another stream

64

* @param other Stream to append after this stream completes

65

* @return Stream that emits all elements from this stream, then all from other

66

*/

67

def concat[U >: Out](other: Graph[SourceShape[U], _]): Source[U, Mat]

68

69

/**

70

* Prepend another stream before this stream

71

* @param other Stream to emit before this stream

72

* @return Stream that emits other stream first, then this stream

73

*/

74

def prepend[U >: Out](other: Graph[SourceShape[U], _]): Source[U, Mat]

75

76

/**

77

* Transform each element into a source and concatenate the resulting sources in order

78

* @param f Function that maps each element to a source graph

79

* @return Stream that emits all elements of each produced source, one source at a time

80

*/

81

def flatMapConcat[T, M](f: Out => Graph[SourceShape[T], M]): Source[T, Mat]

82

```

83

84

**Usage Examples:**

85

86

```scala

87

// Sequential concatenation

88

val first = Source(1 to 3)

89

val second = Source(4 to 6)

90

val concatenated = first.concat(second) // Emits: 1,2,3,4,5,6

91

92

// Prepending

93

val main = Source(List("World", "!"))

94

val greeting = Source.single("Hello ")

95

val combined = main.prepend(greeting) // Emits: "Hello ", "World", "!"

96

97

// Flat map concat

98

Source(List("file1.txt", "file2.txt"))

99

.flatMapConcat(filename => Source.fromIterator(() => readFileLines(filename)))

100

.runWith(Sink.seq)

101

```

102

103

### Stream Zipping

104

105

Combine streams by pairing elements from multiple inputs.

106

107

```scala { .api }

108

/**

109

* Zip this stream with another stream into tuples

110

* @param other Stream to zip with

111

* @return Stream of tuples containing paired elements

112

*/

113

def zip[U](other: Graph[SourceShape[U], _]): Source[(Out, U), Mat]

114

115

/**

116

* Zip streams using a custom combine function

117

* @param other Stream to zip with

118

* @param f Function to combine paired elements

119

* @return Stream with combined elements

120

*/

121

def zipWith[U, T](other: Graph[SourceShape[U], _])(f: (Out, U) => T): Source[T, Mat]

122

123

/**

124

* Zip with index, pairing each element with its position

125

* @return Stream of tuples with elements and their indices

126

*/

127

def zipWithIndex: Source[(Out, Long), Mat]

128

129

/**

130

* Zip but keep emitting from the longer stream after shorter completes

131

* @param other Stream to zip with

132

* @return Stream of tuples, padded with thisElem or otherElem once the corresponding stream has completed

133

*/

134

def zipAll[U, A >: Out, B >: U](other: Graph[SourceShape[U], _], thisElem: A, otherElem: B): Source[(A, B), Mat]

135

```

136

137

**Usage Examples:**

138

139

```scala

140

// Basic zipping

141

val numbers = Source(1 to 5)

142

val letters = Source(List("a", "b", "c", "d", "e"))

143

val zipped = numbers.zip(letters) // (1,"a"), (2,"b"), (3,"c"), (4,"d"), (5,"e")

144

145

// Zip with custom function

146

val combined = numbers.zipWith(letters)((n, l) => s"$n:$l")

147

148

// Zip with index

149

Source(List("apple", "banana", "cherry"))

150

.zipWithIndex // ("apple", 0), ("banana", 1), ("cherry", 2)

151

.runWith(Sink.seq)

152

153

// Zip all (handle different lengths)

154

val short = Source(1 to 3)

155

val long = Source(List("a", "b", "c", "d", "e"))

156

short.zipAll(long, 0, "") // (1,"a"), (2,"b"), (3,"c"), (0,"d"), (0,"e")

157

```

158

159

### Broadcasting and Fan-out

160

161

Split a single stream into multiple parallel streams.

162

163

```scala { .api }

164

/**

165

* Create a broadcast junction to split stream into multiple outputs

166

* @param outputCount Number of output streams

167

* @param eagerCancel Cancel upstream when any output cancels (default: false)

168

* @return Graph that broadcasts to multiple outputs

169

*/

170

def broadcast(outputCount: Int, eagerCancel: Boolean = false): Graph[UniformFanOutShape[Out, Out], NotUsed]

171

172

/**

173

* Balance elements across multiple outputs, sending each element to the first output that has demand

174

* @param outputCount Number of output streams

175

* @param waitForAllDownstreams Wait for all outputs to be ready (default: true)

176

* @return Graph that balances elements across outputs

177

*/

178

def balance[T](outputCount: Int, waitForAllDownstreams: Boolean = true): Graph[UniformFanOutShape[T, T], NotUsed]

179

180

/**

181

* Partition elements across outputs using a function that selects the output index

182

* @param outputCount Number of output streams

183

* @param partitioner Function that returns output index for each element

184

* @return Graph that partitions elements to different outputs

185

*/

186

def partition[T](outputCount: Int, partitioner: T => Int): Graph[UniformFanOutShape[T, T], NotUsed]

187

```

188

189

**Usage Examples:**

190

191

```scala

192

import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Broadcast, Balance}

193

import akka.stream.{UniformFanOutShape, ClosedShape}

194

195

// Broadcasting to multiple sinks

196

val broadcastGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>

197

import GraphDSL.Implicits._

198

199

val source = Source(1 to 10)

200

val broadcast = builder.add(Broadcast[Int](2))

201

val sink1 = Sink.foreach[Int](x => println(s"Sink1: $x"))

202

val sink2 = Sink.foreach[Int](x => println(s"Sink2: $x"))

203

204

source ~> broadcast ~> sink1

205

broadcast ~> sink2

206

207

ClosedShape

208

})

209

210

// Load balancing across processors

211

val balanceGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>

212

import GraphDSL.Implicits._

213

214

val source = Source(1 to 100)

215

val balance = builder.add(Balance[Int](3))

216

val sink = Sink.foreach[Int](x => println(s"Processed: $x"))

217

218

source ~> balance ~> Flow[Int].map(_ * 2) ~> sink

219

balance ~> Flow[Int].map(_ * 3) ~> sink

220

balance ~> Flow[Int].map(_ * 4) ~> sink

221

222

ClosedShape

223

})

224

```

225

226

### Interleaving and Alternating

227

228

Combine streams by alternating between sources or interleaving elements.

229

230

```scala { .api }

231

/**

232

* Interleave elements from this stream and another

233

* @param other Stream to interleave with

234

* @param segmentSize Number of elements to take from each stream in turn

235

* @param eagerClose Close when any stream closes (default: true)

236

* @return Stream with interleaved elements

237

*/

238

def interleave[U >: Out](other: Graph[SourceShape[U], _], segmentSize: Int, eagerClose: Boolean = true): Source[U, Mat]

239

240

/**

241

* Send every element of this stream to the given sink in addition to passing it downstream

242

* @param other Sink that also receives each element (tee/tap pattern)

243

* @return Stream that emits the same elements as this stream

244

*/

245

def alsoTo[U >: Out](other: Graph[SinkShape[U], _]): Source[Out, Mat]

246

```

247

248

**Usage Examples:**

249

250

```scala

251

// Interleaving streams

252

val evens = Source(List(2, 4, 6, 8))

253

val odds = Source(List(1, 3, 5, 7))

254

val interleaved = evens.interleave(odds, 2) // 2,4,1,3,6,8,5,7

255

256

// Also to (tee/tap pattern)

257

Source(1 to 10)

258

.alsoTo(Sink.foreach(x => println(s"Logging: $x"))) // Side effect

259

.map(_ * 2)

260

.runWith(Sink.seq) // Main processing

261

```

262

263

### Complex Graph Construction

264

265

Build complex stream topologies using the GraphDSL for advanced fan-in/fan-out patterns.

266

267

```scala { .api }

268

/**

269

* GraphDSL for building complex stream graphs

270

*/

271

object GraphDSL {

272

def create[S <: Shape, Mat]()(buildBlock: GraphDSL.Builder[NotUsed] => S): Graph[S, NotUsed]

273

def create[S <: Shape, Mat, M1](g1: Graph[_, M1])(buildBlock: GraphDSL.Builder[M1] => S): Graph[S, M1]

274

275

// Builder provides graph construction operations

276

trait Builder[+Mat] {

277

def add[S <: Shape](graph: Graph[S, _]): S

278

279

// Implicit conversions for ~> operator

280

object Implicits {

281

implicit class PortOps[T](outlet: Outlet[T]) {

282

def ~>[U >: T](inlet: Inlet[U]): Unit

283

}

284

}

285

}

286

}

287

288

// Common graph shapes

289

trait UniformFanInShape[-T, +O] extends Shape

290

trait UniformFanOutShape[-I, +T] extends Shape

291

```

292

293

**Usage Examples:**

294

295

```scala

296

import akka.stream.scaladsl.{GraphDSL, RunnableGraph, Merge, Zip}

297

import akka.stream.{UniformFanInShape, ClosedShape}

298

299

// Complex merge pattern

300

val complexGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>

301

import GraphDSL.Implicits._

302

303

val source1 = Source(1 to 10)

304

val source2 = Source(11 to 20)

305

val source3 = Source(21 to 30)

306

307

val merge = builder.add(Merge[Int](3))

308

val sink = Sink.foreach[Int](println)

309

310

source1 ~> merge ~> sink

311

source2 ~> merge

312

source3 ~> merge

313

314

ClosedShape

315

})

316

317

// Fan-in with zip

318

val zipGraph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>

319

import GraphDSL.Implicits._

320

321

val source1 = Source(1 to 5)

322

val source2 = Source(List("a", "b", "c", "d", "e"))

323

val zip = builder.add(Zip[Int, String])

324

val sink = Sink.foreach[(Int, String)](println)

325

326

source1 ~> zip.in0

327

source2 ~> zip.in1

328

zip.out ~> sink

329

330

ClosedShape

331

})

332

```

333

334

## Types

335

336

```scala { .api }

337

// Fan-out shapes

338

trait UniformFanOutShape[-I, +O] extends Shape {

339

def in: Inlet[I]

340

def outs: immutable.Seq[Outlet[O]]

341

}

342

343

// Fan-in shapes

344

trait UniformFanInShape[-I, +O] extends Shape {

345

def ins: immutable.Seq[Inlet[I]]

346

def out: Outlet[O]

347

}

348

349

// Specific shapes

350

final class BroadcastShape[T](val in: Inlet[T], val outs: immutable.Seq[Outlet[T]]) extends UniformFanOutShape[T, T]

351

final class MergeShape[T](val ins: immutable.Seq[Inlet[T]], val out: Outlet[T]) extends UniformFanInShape[T, T]

352

final class BalanceShape[T](val in: Inlet[T], val outs: immutable.Seq[Outlet[T]]) extends UniformFanOutShape[T, T]

353

final class ZipShape[A, B](val in0: Inlet[A], val in1: Inlet[B], val out: Outlet[(A, B)]) extends Shape

354

```