CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-com-typesafe-akka--akka-stream-2-13-0-m5

Akka Streams is a reactive streams implementation for processing and transferring sequences of elements using bounded buffer space with automatic backpressure management.

Pending
Overview
Eval results
Files

graph-building.mddocs/

Graph Building and Composition

Advanced graph construction using GraphDSL for complex stream topologies including fan-in, fan-out, and custom shapes.

GraphDSL Overview

GraphDSL provides a type-safe DSL for building complex stream graphs with multiple inputs, outputs, and custom topologies.

object GraphDSL {
  def create[S <: Shape, Mat](buildBlock: GraphDSL.Builder[Mat] => S): Graph[S, Mat]
  def create[S <: Shape](buildBlock: GraphDSL.Builder[NotUsed] => S): Graph[S, NotUsed]
  
  class Builder[+M] {
    def add[S <: Shape](graph: Graph[S, _]): S
    def materializedValue: Outlet[M @uncheckedVariance]
  }
}

GraphDSL Builder

The Builder provides methods to add components and connect them.

class Builder[+M] {
  def add[S <: Shape](graph: Graph[S, _]): S
  def materializedValue: Outlet[M @uncheckedVariance]
}

Connection Operators (Scala)

object GraphDSL {
  object Implicits {
    // Forward connections
    implicit class CombinerBase[+T](val outlet: Outlet[T]) {
      def ~>[U >: T](inlet: Inlet[U])(implicit b: Builder[_]): Unit
      def ~>[Out](flow: Graph[FlowShape[T, Out], Any])(implicit b: Builder[_]): PortOps[Out]
      def ~>(sink: Graph[SinkShape[T], _])(implicit b: Builder[_]): Unit
    }
    
    // Reverse connections  
    implicit class ReverseCombinerBase[T](val inlet: Inlet[T]) {
      def <~[U <: T](outlet: Outlet[U])(implicit b: Builder[_]): Unit
      def <~[In](flow: Graph[FlowShape[In, T], _])(implicit b: Builder[_]): ReversePortOps[In]
      def <~(source: Graph[SourceShape[T], _])(implicit b: Builder[_]): Unit
    }
  }
}

Usage Examples

Basic Graph Construction:

import akka.stream.{ClosedShape, SourceShape, SinkShape}
import akka.stream.scaladsl.{Source, Sink, Flow, GraphDSL, RunnableGraph}
import akka.stream.scaladsl.GraphDSL.Implicits._

// Simple linear graph
val linearGraph = GraphDSL.create() { implicit builder =>
  val source = builder.add(Source(1 to 10))
  val flow = builder.add(Flow[Int].map(_ * 2))
  val sink = builder.add(Sink.foreach[Int](println))
  
  source ~> flow ~> sink
  
  ClosedShape
}

val runnable = RunnableGraph.fromGraph(linearGraph)
runnable.run()

Fan-out Example:

import akka.stream.scaladsl.Broadcast

val fanOutGraph = GraphDSL.create() { implicit builder =>
  val source = builder.add(Source(1 to 10))
  val broadcast = builder.add(Broadcast[Int](2))
  val sink1 = builder.add(Sink.foreach[Int](x => println(s"Sink1: $x")))
  val sink2 = builder.add(Sink.foreach[Int](x => println(s"Sink2: $x")))
  
  source ~> broadcast
  broadcast ~> sink1
  broadcast ~> sink2
  
  ClosedShape
}

Fan-in Example:

import akka.stream.scaladsl.Merge

val fanInGraph = GraphDSL.create() { implicit builder =>
  val source1 = builder.add(Source(1 to 5))
  val source2 = builder.add(Source(6 to 10)) 
  val merge = builder.add(Merge[Int](2))
  val sink = builder.add(Sink.foreach[Int](println))
  
  source1 ~> merge
  source2 ~> merge
  merge ~> sink
  
  ClosedShape
}

Custom Shapes

Creating reusable graph components with custom shapes.

// Example: Custom shape with 2 inputs, 1 output
case class Add2Shape(in1: Inlet[Int], in2: Inlet[Int], out: Outlet[Int]) extends Shape {
  val inlets: immutable.Seq[Inlet[_]] = immutable.Seq(in1, in2)
  val outlets: immutable.Seq[Outlet[_]] = immutable.Seq(out)
  
  def deepCopy(): Shape = Add2Shape(in1.carbonCopy(), in2.carbonCopy(), out.carbonCopy())
}

Usage Example

// Create a reusable adder component
val adder = GraphDSL.create() { implicit builder =>
  val zip = builder.add(Zip[Int, Int]())
  val add = builder.add(Flow[(Int, Int)].map { case (a, b) => a + b })
  
  zip.out ~> add
  
  Add2Shape(zip.in0, zip.in1, add.out)
}

// Use the custom component
val mainGraph = GraphDSL.create() { implicit builder =>
  val source1 = builder.add(Source(1 to 5))
  val source2 = builder.add(Source(6 to 10))
  val customAdder = builder.add(adder)
  val sink = builder.add(Sink.foreach[Int](println))
  
  source1 ~> customAdder.in1
  source2 ~> customAdder.in2
  customAdder.out ~> sink
  
  ClosedShape
}

Materialized Values in Graphs

Accessing and combining materialized values from graph components.

object GraphDSL {
  def create[S <: Shape, M1, M2, Mat](
    g1: Graph[_ <: Shape, M1], 
    g2: Graph[_ <: Shape, M2]
  )(combineMat: (M1, M2) => Mat)(buildBlock: GraphDSL.Builder[Mat] => (g1.Shape, g2.Shape) => S): Graph[S, Mat]
}

Usage Example

import akka.stream.scaladsl.Keep

// Graph that combines materialized values
val graphWithMat = GraphDSL.create(
  Source.queue[Int](10, OverflowStrategy.backpressure),
  Sink.head[Int]
)(Keep.both) { implicit builder => (queueSource, headSink) =>
  
  val flow = builder.add(Flow[Int].map(_ * 2))
  
  queueSource ~> flow ~> headSink
  
  ClosedShape
}

val (queue, firstElement) = RunnableGraph.fromGraph(graphWithMat).run()

// Use the queue and await first element
queue.offer(42)
queue.complete()
firstElement.foreach(println) // Prints: 84

Java GraphDSL

The Java API provides similar functionality with builder pattern.

object GraphDSL {
  // Java API
  def create[S <: Shape](buildBlock: function.Function[GraphDSL.Builder[NotUsed], S]): Graph[S, NotUsed]
  
  final class Builder[+Mat] {
    def add[S <: Shape](graph: Graph[S, _]): S
    def from[T](out: Outlet[T]): ForwardOps[T]
    def to[T](in: Inlet[T]): ReverseOps[T]
    
    final class ForwardOps[T] {
      def toInlet(in: Inlet[_ >: T]): Builder[Mat]
      def to(sink: SinkShape[_ >: T]): Builder[Mat]
      def via[U](flow: FlowShape[_ >: T, U]): ForwardOps[U]
    }
  }
}

Java Usage Example

import akka.stream.javadsl.*;
import akka.stream.ClosedShape;

Graph<ClosedShape, NotUsed> graph = GraphDSL.create(builder -> {
    SourceShape<Integer> source = builder.add(Source.range(1, 10));
    FlowShape<Integer, Integer> flow = builder.add(Flow.of(Integer.class).map(x -> x * 2));
    SinkShape<Integer> sink = builder.add(Sink.foreach(System.out::println));
    
    builder.from(source).via(flow).to(sink);
    
    return ClosedShape.getInstance();
});

RunnableGraph.fromGraph(graph).run(system);

Complex Graph Patterns

Pipeline with Bypass

val pipelineWithBypass = GraphDSL.create() { implicit builder =>
  val source = builder.add(Source(1 to 20))
  val broadcast = builder.add(Broadcast[Int](2))
  val merge = builder.add(Merge[Int](2))
  val sink = builder.add(Sink.foreach[Int](println))
  
  // Main processing path
  val processFlow = builder.add(Flow[Int].filter(_ % 2 == 0).map(_ * 2))
  
  // Bypass path for odd numbers
  val bypassFlow = builder.add(Flow[Int].filter(_ % 2 == 1))
  
  source ~> broadcast
  broadcast ~> processFlow ~> merge
  broadcast ~> bypassFlow ~> merge
  merge ~> sink
  
  ClosedShape
}

Feedback Loop

import akka.stream.scaladsl.MergePreferred

val feedbackGraph = GraphDSL.create() { implicit builder =>
  val source = builder.add(Source.single(1))
  val merge = builder.add(MergePreferred[Int](1))
  val flow = builder.add(Flow[Int].map { x =>
    println(s"Processing: $x")
    if (x < 10) x + 1 else x
  })
  val broadcast = builder.add(Broadcast[Int](2))
  val sink = builder.add(Sink.head[Int])
  
  source ~> merge ~> flow ~> broadcast
  broadcast ~> sink
  broadcast.filter(_ < 10) ~> merge.preferred
  
  ClosedShape
}

This covers the essential graph building capabilities. The GraphDSL allows for creating complex, reusable stream topologies while maintaining type safety and efficient materialization.

Install with Tessl CLI

npx tessl i tessl/maven-com-typesafe-akka--akka-stream-2-13-0-m5

docs

control-flow.md

core-components.md

error-handling.md

graph-building.md

index.md

io-integration.md

junction-operations.md

materialization.md

stream-operations.md

tile.json