CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-com-typesafe-akka--akka-stream-2-13-0-m5

Akka Streams is a reactive streams implementation for processing and transferring sequences of elements using bounded buffer space with automatic backpressure management.

Pending
Overview
Eval results
Files

junction-operations.mddocs/

Junction Operations

Stream junction operators for merging, broadcasting, zipping, and partitioning multiple streams.

Merge Operations

Merge - Basic Multi-Input Merge

class Merge[T](val inputPorts: Int, val eagerComplete: Boolean = false) 
  extends GraphStage[UniformFanInShape[T, T]]

Factory Methods:

object Merge {
  def apply[T](inputPorts: Int, eagerComplete: Boolean = false): Merge[T]
}

Usage Example:

import akka.stream.scaladsl.{Source, Merge, GraphDSL, RunnableGraph}
import akka.stream.ClosedShape

val graph = GraphDSL.create() { implicit builder =>
  val source1 = builder.add(Source(1 to 3))
  val source2 = builder.add(Source(4 to 6))
  val source3 = builder.add(Source(7 to 9))
  val merge = builder.add(Merge[Int](3))
  val sink = builder.add(Sink.foreach[Int](println))
  
  source1 ~> merge
  source2 ~> merge  
  source3 ~> merge
  merge ~> sink
  
  ClosedShape
}

MergePreferred - Priority Merge

class MergePreferred[T](val secondaryPorts: Int, val eagerComplete: Boolean = false)
  extends GraphStage[MergePreferred.MergePreferredShape[T]]

Usage Example:

import akka.stream.scaladsl.MergePreferred

val priorityMerge = GraphDSL.create() { implicit builder =>
  val highPriority = builder.add(Source.single("URGENT"))
  val normalSource = builder.add(Source(List("normal1", "normal2")))
  val merge = builder.add(MergePreferred[String](1)) // 1 secondary port
  val sink = builder.add(Sink.foreach[String](println))
  
  highPriority ~> merge.preferred
  normalSource ~> merge.in(0)
  merge ~> sink
  
  ClosedShape
}

MergePrioritized - Weighted Priority Merge

class MergePrioritized[T](val priorities: Seq[Int], val eagerComplete: Boolean = false)
  extends GraphStage[UniformFanInShape[T, T]]

Broadcast Operations

Broadcast - Multi-Output Distribution

class Broadcast[T](val outputPorts: Int, val eagerCancel: Boolean = false)
  extends GraphStage[UniformFanOutShape[T, T]]

Factory Methods:

object Broadcast {
  def apply[T](outputPorts: Int, eagerCancel: Boolean = false): Broadcast[T]
}

Usage Example:

import akka.stream.scaladsl.Broadcast

val broadcastGraph = GraphDSL.create() { implicit builder =>
  val source = builder.add(Source(1 to 10))
  val broadcast = builder.add(Broadcast[Int](3))
  val sink1 = builder.add(Sink.foreach[Int](x => println(s"Sink1: $x")))
  val sink2 = builder.add(Sink.foreach[Int](x => println(s"Sink2: $x")))
  val sink3 = builder.add(Sink.foreach[Int](x => println(s"Sink3: $x")))
  
  source ~> broadcast
  broadcast ~> sink1
  broadcast ~> sink2
  broadcast ~> sink3
  
  ClosedShape
}

Balance - Load Balancing Distribution

class Balance[T](
  val outputPorts: Int, 
  val waitForAllDownstreams: Boolean = false,
  val eagerCancel: Boolean = false
) extends GraphStage[UniformFanOutShape[T, T]]

Usage Example:

import akka.stream.scaladsl.Balance

val balanceGraph = GraphDSL.create() { implicit builder =>
  val source = builder.add(Source(1 to 20))
  val balance = builder.add(Balance[Int](3)) // Distribute across 3 workers
  val worker1 = builder.add(Flow[Int].map(x => s"Worker1: $x"))
  val worker2 = builder.add(Flow[Int].map(x => s"Worker2: $x"))  
  val worker3 = builder.add(Flow[Int].map(x => s"Worker3: $x"))
  val merge = builder.add(Merge[String](3))
  val sink = builder.add(Sink.foreach[String](println))
  
  source ~> balance
  balance ~> worker1 ~> merge
  balance ~> worker2 ~> merge
  balance ~> worker3 ~> merge
  merge ~> sink
  
  ClosedShape
}

Partition - Conditional Distribution

class Partition[T](
  val outputPorts: Int,
  val partitioner: T => Int,
  val eagerCancel: Boolean = false
) extends GraphStage[UniformFanOutShape[T, T]]

Usage Example:

import akka.stream.scaladsl.Partition

val partitionGraph = GraphDSL.create() { implicit builder =>
  val source = builder.add(Source(1 to 20))
  val partition = builder.add(Partition[Int](3, _ % 3)) // Partition by modulo 3
  val sink0 = builder.add(Sink.foreach[Int](x => println(s"Mod0: $x")))
  val sink1 = builder.add(Sink.foreach[Int](x => println(s"Mod1: $x")))
  val sink2 = builder.add(Sink.foreach[Int](x => println(s"Mod2: $x")))
  
  source ~> partition
  partition ~> sink0
  partition ~> sink1
  partition ~> sink2
  
  ClosedShape
}

Zip Operations

Zip - Combine Two Streams

final class Zip[A, B] extends ZipWith2[A, B, (A, B)](Tuple2.apply)

Factory Methods:

object Zip {
  def apply[A, B](): Zip[A, B]
}

Usage Example:

import akka.stream.scaladsl.Zip

val zipGraph = GraphDSL.create() { implicit builder =>
  val source1 = builder.add(Source(1 to 5))
  val source2 = builder.add(Source(List("a", "b", "c", "d", "e")))
  val zip = builder.add(Zip[Int, String]())
  val sink = builder.add(Sink.foreach[(Int, String)](println))
  
  source1 ~> zip.in0
  source2 ~> zip.in1
  zip.out ~> sink
  
  ClosedShape
}
// Output: (1,a), (2,b), (3,c), (4,d), (5,e)

ZipWith - Custom Zip Function

class ZipWith2[A, B, C](zipper: (A, B) => C) extends GraphStage[FanInShape2[A, B, C]]

Usage Example:

import akka.stream.scaladsl.ZipWith

val zipWithGraph = GraphDSL.create() { implicit builder =>
  val numbers = builder.add(Source(1 to 5))
  val strings = builder.add(Source(List("one", "two", "three", "four", "five")))
  val zipWith = builder.add(ZipWith[Int, String, String]((n, s) => s"$n-$s"))
  val sink = builder.add(Sink.foreach[String](println))
  
  numbers ~> zipWith.in0
  strings ~> zipWith.in1
  zipWith.out ~> sink
  
  ClosedShape
}
// Output: "1-one", "2-two", "3-three", "4-four", "5-five"

ZipN - Multiple Stream Zip

class ZipN[A](n: Int) extends GraphStage[UniformFanInShape[A, immutable.Seq[A]]]

Usage Example:

import akka.stream.scaladsl.ZipN

val zipNGraph = GraphDSL.create() { implicit builder =>
  val source1 = builder.add(Source(1 to 3))
  val source2 = builder.add(Source(4 to 6))
  val source3 = builder.add(Source(7 to 9))
  val zipN = builder.add(ZipN[Int](3))
  val sink = builder.add(Sink.foreach[Seq[Int]](println))
  
  source1 ~> zipN.in(0)
  source2 ~> zipN.in(1)
  source3 ~> zipN.in(2)
  zipN.out ~> sink
  
  ClosedShape
}
// Output: List(1,4,7), List(2,5,8), List(3,6,9)

ZipLatest - Latest Value Zip

final class ZipLatest[A, B] extends ZipLatestWith2[A, B, (A, B)](Tuple2.apply)

Usage Example:

import akka.stream.scaladsl.ZipLatest
import scala.concurrent.duration._

val zipLatestGraph = GraphDSL.create() { implicit builder =>
  val fastSource = builder.add(Source.tick(100.millis, 100.millis, "fast"))
  val slowSource = builder.add(Source.tick(300.millis, 300.millis, "slow"))
  val zipLatest = builder.add(ZipLatest[String, String]())
  val sink = builder.add(Sink.foreach[(String, String)](println))
  
  fastSource ~> zipLatest.in0
  slowSource ~> zipLatest.in1  
  zipLatest.out ~> sink
  
  ClosedShape
}

Unzip Operations

Unzip - Split Tuples

final class Unzip[A, B]() extends UnzipWith[(A, B), A, B]

Usage Example:

import akka.stream.scaladsl.Unzip

val unzipGraph = GraphDSL.create() { implicit builder =>
  val source = builder.add(Source(List((1, "a"), (2, "b"), (3, "c"))))
  val unzip = builder.add(Unzip[Int, String]())
  val sink1 = builder.add(Sink.foreach[Int](x => println(s"Numbers: $x")))
  val sink2 = builder.add(Sink.foreach[String](x => println(s"Strings: $x")))
  
  source ~> unzip.in
  unzip.out0 ~> sink1
  unzip.out1 ~> sink2
  
  ClosedShape
}

UnzipWith - Custom Unzip Function

class UnzipWith[In, A, B](unzipper: In => (A, B)) extends GraphStage[FanOutShape2[In, A, B]]

Usage Example:

import akka.stream.scaladsl.UnzipWith

case class Person(name: String, age: Int)

val unzipWithGraph = GraphDSL.create() { implicit builder =>
  val people = builder.add(Source(List(
    Person("Alice", 25),
    Person("Bob", 30),
    Person("Charlie", 35)
  )))
  val unzipWith = builder.add(UnzipWith[Person, String, Int](p => (p.name, p.age)))
  val namesSink = builder.add(Sink.foreach[String](name => println(s"Name: $name")))
  val agesSink = builder.add(Sink.foreach[Int](age => println(s"Age: $age")))
  
  people ~> unzipWith.in
  unzipWith.out0 ~> namesSink
  unzipWith.out1 ~> agesSink
  
  ClosedShape
}

Usage Patterns

Fan-out then Fan-in

val fanOutInGraph = GraphDSL.create() { implicit builder =>
  val source = builder.add(Source(1 to 10))
  val broadcast = builder.add(Broadcast[Int](2))
  val evenFilter = builder.add(Flow[Int].filter(_ % 2 == 0))
  val oddFilter = builder.add(Flow[Int].filter(_ % 2 == 1))
  val merge = builder.add(Merge[Int](2))
  val sink = builder.add(Sink.foreach[Int](println))
  
  source ~> broadcast
  broadcast ~> evenFilter ~> merge
  broadcast ~> oddFilter ~> merge
  merge ~> sink
  
  ClosedShape
}

Processing Pipeline with Parallel Workers

val parallelProcessing = GraphDSL.create() { implicit builder =>
  val source = builder.add(Source(1 to 100))
  val balance = builder.add(Balance[Int](4)) // 4 parallel workers
  val merge = builder.add(Merge[Int](4))
  val sink = builder.add(Sink.foreach[Int](println))
  
  // Heavy processing flow
  val heavyProcess = Flow[Int].map { x =>
    Thread.sleep(100) // Simulate work
    x * x
  }
  
  source ~> balance
  
  // Connect 4 parallel workers
  for (i <- 0 until 4) {
    balance ~> builder.add(heavyProcess) ~> merge
  }
  
  merge ~> sink
  
  ClosedShape
}

Junction operations enable complex stream topologies while maintaining backpressure and type safety throughout the graph.

Install with Tessl CLI

npx tessl i tessl/maven-com-typesafe-akka--akka-stream-2-13-0-m5

docs

control-flow.md

core-components.md

error-handling.md

graph-building.md

index.md

io-integration.md

junction-operations.md

materialization.md

stream-operations.md

tile.json