0
# Graph Building and Composition
1
2
Advanced graph construction using GraphDSL for complex stream topologies including fan-in, fan-out, and custom shapes.
3
4
## GraphDSL Overview
5
6
GraphDSL provides a type-safe DSL for building complex stream graphs with multiple inputs, outputs, and custom topologies.
7
8
```scala { .api }
9
object GraphDSL {
10
def create[S <: Shape, Mat](buildBlock: GraphDSL.Builder[Mat] => S): Graph[S, Mat]
11
def create[S <: Shape](buildBlock: GraphDSL.Builder[NotUsed] => S): Graph[S, NotUsed]
12
13
class Builder[+M] {
14
def add[S <: Shape](graph: Graph[S, _]): S
15
def materializedValue: Outlet[M @uncheckedVariance]
16
}
17
}
18
```
19
20
## GraphDSL Builder
21
22
The Builder provides methods to add components and connect them.
23
24
```scala { .api }
25
class Builder[+M] {
26
def add[S <: Shape](graph: Graph[S, _]): S
27
def materializedValue: Outlet[M @uncheckedVariance]
28
}
29
```
30
31
### Connection Operators (Scala)
32
33
```scala { .api }
34
object GraphDSL {
35
object Implicits {
36
// Forward connections
37
implicit class CombinerBase[+T](val outlet: Outlet[T]) {
38
def ~>[U >: T](inlet: Inlet[U])(implicit b: Builder[_]): Unit
39
def ~>[Out](flow: Graph[FlowShape[T, Out], Any])(implicit b: Builder[_]): PortOps[Out]
40
def ~>(sink: Graph[SinkShape[T], _])(implicit b: Builder[_]): Unit
41
}
42
43
// Reverse connections
44
implicit class ReverseCombinerBase[T](val inlet: Inlet[T]) {
45
def <~[U <: T](outlet: Outlet[U])(implicit b: Builder[_]): Unit
46
def <~[In](flow: Graph[FlowShape[In, T], _])(implicit b: Builder[_]): ReversePortOps[In]
47
def <~(source: Graph[SourceShape[T], _])(implicit b: Builder[_]): Unit
48
}
49
}
50
}
51
```
52
53
### Usage Examples
54
55
**Basic Graph Construction:**
56
```scala
57
import akka.stream.{ClosedShape, SourceShape, SinkShape}
58
import akka.stream.scaladsl.{Source, Sink, Flow, GraphDSL, RunnableGraph}
59
import akka.stream.scaladsl.GraphDSL.Implicits._
60
61
// Simple linear graph
62
val linearGraph = GraphDSL.create() { implicit builder =>
63
val source = builder.add(Source(1 to 10))
64
val flow = builder.add(Flow[Int].map(_ * 2))
65
val sink = builder.add(Sink.foreach[Int](println))
66
67
source ~> flow ~> sink
68
69
ClosedShape
70
}
71
72
val runnable = RunnableGraph.fromGraph(linearGraph)
73
runnable.run()
74
```
75
76
**Fan-out Example:**
77
```scala
78
import akka.stream.scaladsl.Broadcast
79
80
val fanOutGraph = GraphDSL.create() { implicit builder =>
81
val source = builder.add(Source(1 to 10))
82
val broadcast = builder.add(Broadcast[Int](2))
83
val sink1 = builder.add(Sink.foreach[Int](x => println(s"Sink1: $x")))
84
val sink2 = builder.add(Sink.foreach[Int](x => println(s"Sink2: $x")))
85
86
source ~> broadcast
87
broadcast ~> sink1
88
broadcast ~> sink2
89
90
ClosedShape
91
}
92
```
93
94
**Fan-in Example:**
95
```scala
96
import akka.stream.scaladsl.Merge
97
98
val fanInGraph = GraphDSL.create() { implicit builder =>
99
val source1 = builder.add(Source(1 to 5))
100
val source2 = builder.add(Source(6 to 10))
101
val merge = builder.add(Merge[Int](2))
102
val sink = builder.add(Sink.foreach[Int](println))
103
104
source1 ~> merge
105
source2 ~> merge
106
merge ~> sink
107
108
ClosedShape
109
}
110
```
111
112
## Custom Shapes
113
114
Creating reusable graph components with custom shapes.
115
116
```scala { .api }
117
// Example: Custom shape with 2 inputs, 1 output
118
case class Add2Shape(in1: Inlet[Int], in2: Inlet[Int], out: Outlet[Int]) extends Shape {
119
val inlets: immutable.Seq[Inlet[_]] = immutable.Seq(in1, in2)
120
val outlets: immutable.Seq[Outlet[_]] = immutable.Seq(out)
121
122
def deepCopy(): Shape = Add2Shape(in1.carbonCopy(), in2.carbonCopy(), out.carbonCopy())
123
}
124
```
125
126
### Usage Example
127
128
```scala
129
// Create a reusable adder component
130
val adder = GraphDSL.create() { implicit builder =>
131
val zip = builder.add(Zip[Int, Int]())
132
val add = builder.add(Flow[(Int, Int)].map { case (a, b) => a + b })
133
134
zip.out ~> add
135
136
Add2Shape(zip.in0, zip.in1, add.out)
137
}
138
139
// Use the custom component
140
val mainGraph = GraphDSL.create() { implicit builder =>
141
val source1 = builder.add(Source(1 to 5))
142
val source2 = builder.add(Source(6 to 10))
143
val customAdder = builder.add(adder)
144
val sink = builder.add(Sink.foreach[Int](println))
145
146
source1 ~> customAdder.in1
147
source2 ~> customAdder.in2
148
customAdder.out ~> sink
149
150
ClosedShape
151
}
152
```
153
154
## Materialized Values in Graphs
155
156
Accessing and combining materialized values from graph components.
157
158
```scala { .api }
159
object GraphDSL {
160
def create[S <: Shape, M1, M2, Mat](
161
g1: Graph[_ <: Shape, M1],
162
g2: Graph[_ <: Shape, M2]
163
)(combineMat: (M1, M2) => Mat)(buildBlock: GraphDSL.Builder[Mat] => (g1.Shape, g2.Shape) => S): Graph[S, Mat]
164
}
165
```
166
167
### Usage Example
168
169
```scala
170
import akka.stream.scaladsl.Keep
171
172
// Graph that combines materialized values
173
val graphWithMat = GraphDSL.create(
174
Source.queue[Int](10, OverflowStrategy.backpressure),
175
Sink.head[Int]
176
)(Keep.both) { implicit builder => (queueSource, headSink) =>
177
178
val flow = builder.add(Flow[Int].map(_ * 2))
179
180
queueSource ~> flow ~> headSink
181
182
ClosedShape
183
}
184
185
val (queue, firstElement) = RunnableGraph.fromGraph(graphWithMat).run()
186
187
// Use the queue and await first element
188
queue.offer(42)
189
queue.complete()
190
firstElement.foreach(println) // Prints: 84
191
```
192
193
## Java GraphDSL
194
195
The Java API provides similar functionality with builder pattern.
196
197
```scala { .api }
198
object GraphDSL {
199
// Java API
200
def create[S <: Shape](buildBlock: function.Function[GraphDSL.Builder[NotUsed], S]): Graph[S, NotUsed]
201
202
final class Builder[+Mat] {
203
def add[S <: Shape](graph: Graph[S, _]): S
204
def from[T](out: Outlet[T]): ForwardOps[T]
205
def to[T](in: Inlet[T]): ReverseOps[T]
206
207
final class ForwardOps[T] {
208
def toInlet(in: Inlet[_ >: T]): Builder[Mat]
209
def to(sink: SinkShape[_ >: T]): Builder[Mat]
210
def via[U](flow: FlowShape[_ >: T, U]): ForwardOps[U]
211
}
212
}
213
}
214
```
215
216
### Java Usage Example
217
218
```java
219
import akka.stream.javadsl.*;
220
import akka.stream.ClosedShape;
221
222
Graph<ClosedShape, NotUsed> graph = GraphDSL.create(builder -> {
223
SourceShape<Integer> source = builder.add(Source.range(1, 10));
224
FlowShape<Integer, Integer> flow = builder.add(Flow.of(Integer.class).map(x -> x * 2));
225
SinkShape<Integer> sink = builder.add(Sink.foreach(System.out::println));
226
227
builder.from(source).via(flow).to(sink);
228
229
return ClosedShape.getInstance();
230
});
231
232
RunnableGraph.fromGraph(graph).run(system);
233
```
234
235
## Complex Graph Patterns
236
237
### Pipeline with Bypass
238
239
```scala
240
val pipelineWithBypass = GraphDSL.create() { implicit builder =>
241
val source = builder.add(Source(1 to 20))
242
val broadcast = builder.add(Broadcast[Int](2))
243
val merge = builder.add(Merge[Int](2))
244
val sink = builder.add(Sink.foreach[Int](println))
245
246
// Main processing path
247
val processFlow = builder.add(Flow[Int].filter(_ % 2 == 0).map(_ * 2))
248
249
// Bypass path for odd numbers
250
val bypassFlow = builder.add(Flow[Int].filter(_ % 2 == 1))
251
252
source ~> broadcast
253
broadcast ~> processFlow ~> merge
254
broadcast ~> bypassFlow ~> merge
255
merge ~> sink
256
257
ClosedShape
258
}
259
```
260
261
### Feedback Loop
262
263
```scala
264
import akka.stream.scaladsl.MergePreferred
265
266
val feedbackGraph = GraphDSL.create() { implicit builder =>
267
val source = builder.add(Source.single(1))
268
val merge = builder.add(MergePreferred[Int](1))
269
val flow = builder.add(Flow[Int].map { x =>
270
println(s"Processing: $x")
271
if (x < 10) x + 1 else x
272
})
273
val broadcast = builder.add(Broadcast[Int](2))
274
val sink = builder.add(Sink.head[Int])
275
276
source ~> merge ~> flow ~> broadcast
277
broadcast ~> sink
278
broadcast.filter(_ < 10) ~> merge.preferred
279
280
ClosedShape
281
}
282
```
283
284
This covers the essential graph building capabilities. The GraphDSL allows for creating complex, reusable stream topologies while maintaining type safety and efficient materialization.