Akka Streams is a reactive streams implementation for processing and transferring sequences of elements using bounded buffer space with automatic backpressure management.
—
Advanced graph construction using GraphDSL for complex stream topologies including fan-in, fan-out, and custom shapes.
GraphDSL provides a type-safe DSL for building complex stream graphs with multiple inputs, outputs, and custom topologies.
object GraphDSL {
def create[S <: Shape, Mat](buildBlock: GraphDSL.Builder[Mat] => S): Graph[S, Mat]
def create[S <: Shape](buildBlock: GraphDSL.Builder[NotUsed] => S): Graph[S, NotUsed]
class Builder[+M] {
def add[S <: Shape](graph: Graph[S, _]): S
def materializedValue: Outlet[M @uncheckedVariance]
}
}The Builder provides methods to add components and connect them.
class Builder[+M] {
def add[S <: Shape](graph: Graph[S, _]): S
def materializedValue: Outlet[M @uncheckedVariance]
}object GraphDSL {
object Implicits {
// Forward connections
implicit class CombinerBase[+T](val outlet: Outlet[T]) {
def ~>[U >: T](inlet: Inlet[U])(implicit b: Builder[_]): Unit
def ~>[Out](flow: Graph[FlowShape[T, Out], Any])(implicit b: Builder[_]): PortOps[Out]
def ~>(sink: Graph[SinkShape[T], _])(implicit b: Builder[_]): Unit
}
// Reverse connections
implicit class ReverseCombinerBase[T](val inlet: Inlet[T]) {
def <~[U <: T](outlet: Outlet[U])(implicit b: Builder[_]): Unit
def <~[In](flow: Graph[FlowShape[In, T], _])(implicit b: Builder[_]): ReversePortOps[In]
def <~(source: Graph[SourceShape[T], _])(implicit b: Builder[_]): Unit
}
}
}Basic Graph Construction:
import akka.stream.{ClosedShape, SourceShape, SinkShape}
import akka.stream.scaladsl.{Source, Sink, Flow, GraphDSL, RunnableGraph}
import akka.stream.scaladsl.GraphDSL.Implicits._
// Simple linear graph
val linearGraph = GraphDSL.create() { implicit builder =>
val source = builder.add(Source(1 to 10))
val flow = builder.add(Flow[Int].map(_ * 2))
val sink = builder.add(Sink.foreach[Int](println))
source ~> flow ~> sink
ClosedShape
}
val runnable = RunnableGraph.fromGraph(linearGraph)
runnable.run()Fan-out Example:
import akka.stream.scaladsl.Broadcast
val fanOutGraph = GraphDSL.create() { implicit builder =>
val source = builder.add(Source(1 to 10))
val broadcast = builder.add(Broadcast[Int](2))
val sink1 = builder.add(Sink.foreach[Int](x => println(s"Sink1: $x")))
val sink2 = builder.add(Sink.foreach[Int](x => println(s"Sink2: $x")))
source ~> broadcast
broadcast ~> sink1
broadcast ~> sink2
ClosedShape
}Fan-in Example:
import akka.stream.scaladsl.Merge
val fanInGraph = GraphDSL.create() { implicit builder =>
val source1 = builder.add(Source(1 to 5))
val source2 = builder.add(Source(6 to 10))
val merge = builder.add(Merge[Int](2))
val sink = builder.add(Sink.foreach[Int](println))
source1 ~> merge
source2 ~> merge
merge ~> sink
ClosedShape
}Creating reusable graph components with custom shapes.
// Example: Custom shape with 2 inputs, 1 output
case class Add2Shape(in1: Inlet[Int], in2: Inlet[Int], out: Outlet[Int]) extends Shape {
val inlets: immutable.Seq[Inlet[_]] = immutable.Seq(in1, in2)
val outlets: immutable.Seq[Outlet[_]] = immutable.Seq(out)
def deepCopy(): Shape = Add2Shape(in1.carbonCopy(), in2.carbonCopy(), out.carbonCopy())
}// Create a reusable adder component
val adder = GraphDSL.create() { implicit builder =>
val zip = builder.add(Zip[Int, Int]())
val add = builder.add(Flow[(Int, Int)].map { case (a, b) => a + b })
zip.out ~> add
Add2Shape(zip.in0, zip.in1, add.out)
}
// Use the custom component
val mainGraph = GraphDSL.create() { implicit builder =>
val source1 = builder.add(Source(1 to 5))
val source2 = builder.add(Source(6 to 10))
val customAdder = builder.add(adder)
val sink = builder.add(Sink.foreach[Int](println))
source1 ~> customAdder.in1
source2 ~> customAdder.in2
customAdder.out ~> sink
ClosedShape
}Accessing and combining materialized values from graph components.
object GraphDSL {
def create[S <: Shape, M1, M2, Mat](
g1: Graph[_ <: Shape, M1],
g2: Graph[_ <: Shape, M2]
)(combineMat: (M1, M2) => Mat)(buildBlock: GraphDSL.Builder[Mat] => (g1.Shape, g2.Shape) => S): Graph[S, Mat]
}import akka.stream.scaladsl.Keep
// Graph that combines materialized values
val graphWithMat = GraphDSL.create(
Source.queue[Int](10, OverflowStrategy.backpressure),
Sink.head[Int]
)(Keep.both) { implicit builder => (queueSource, headSink) =>
val flow = builder.add(Flow[Int].map(_ * 2))
queueSource ~> flow ~> headSink
ClosedShape
}
val (queue, firstElement) = RunnableGraph.fromGraph(graphWithMat).run()
// Use the queue and await first element
queue.offer(42)
queue.complete()
firstElement.foreach(println) // Prints: 84The Java API provides similar functionality with builder pattern.
object GraphDSL {
// Java API
def create[S <: Shape](buildBlock: function.Function[GraphDSL.Builder[NotUsed], S]): Graph[S, NotUsed]
final class Builder[+Mat] {
def add[S <: Shape](graph: Graph[S, _]): S
def from[T](out: Outlet[T]): ForwardOps[T]
def to[T](in: Inlet[T]): ReverseOps[T]
final class ForwardOps[T] {
def toInlet(in: Inlet[_ >: T]): Builder[Mat]
def to(sink: SinkShape[_ >: T]): Builder[Mat]
def via[U](flow: FlowShape[_ >: T, U]): ForwardOps[U]
}
}
}import akka.stream.javadsl.*;
import akka.stream.ClosedShape;
Graph<ClosedShape, NotUsed> graph = GraphDSL.create(builder -> {
SourceShape<Integer> source = builder.add(Source.range(1, 10));
FlowShape<Integer, Integer> flow = builder.add(Flow.of(Integer.class).map(x -> x * 2));
SinkShape<Integer> sink = builder.add(Sink.foreach(System.out::println));
builder.from(source).via(flow).to(sink);
return ClosedShape.getInstance();
});
RunnableGraph.fromGraph(graph).run(system);val pipelineWithBypass = GraphDSL.create() { implicit builder =>
val source = builder.add(Source(1 to 20))
val broadcast = builder.add(Broadcast[Int](2))
val merge = builder.add(Merge[Int](2))
val sink = builder.add(Sink.foreach[Int](println))
// Main processing path
val processFlow = builder.add(Flow[Int].filter(_ % 2 == 0).map(_ * 2))
// Bypass path for odd numbers
val bypassFlow = builder.add(Flow[Int].filter(_ % 2 == 1))
source ~> broadcast
broadcast ~> processFlow ~> merge
broadcast ~> bypassFlow ~> merge
merge ~> sink
ClosedShape
}import akka.stream.scaladsl.MergePreferred
val feedbackGraph = GraphDSL.create() { implicit builder =>
val source = builder.add(Source.single(1))
val merge = builder.add(MergePreferred[Int](1))
val flow = builder.add(Flow[Int].map { x =>
println(s"Processing: $x")
if (x < 10) x + 1 else x
})
val broadcast = builder.add(Broadcast[Int](2))
val sink = builder.add(Sink.head[Int])
source ~> merge ~> flow ~> broadcast
broadcast ~> sink
broadcast.filter(_ < 10) ~> merge.preferred
ClosedShape
}This covers the essential graph building capabilities. The GraphDSL allows for creating complex, reusable stream topologies while maintaining type safety and efficient materialization.
Install with Tessl CLI
npx tessl i tessl/maven-com-typesafe-akka--akka-stream-2-13-0-m5