or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

control-flow.md core-components.md error-handling.md graph-building.md index.md io-integration.md junction-operations.md materialization.md stream-operations.md

docs/graph-building.md

0

# Graph Building and Composition

1

2

Advanced graph construction using GraphDSL for complex stream topologies including fan-in, fan-out, and custom shapes.

3

4

## GraphDSL Overview

5

6

GraphDSL provides a type-safe DSL for building complex stream graphs with multiple inputs, outputs, and custom topologies.

7

8

```scala { .api }

9

object GraphDSL {

10

def create[S <: Shape, Mat](buildBlock: GraphDSL.Builder[Mat] => S): Graph[S, Mat]

11

def create[S <: Shape](buildBlock: GraphDSL.Builder[NotUsed] => S): Graph[S, NotUsed]

12

13

class Builder[+M] {

14

def add[S <: Shape](graph: Graph[S, _]): S

15

def materializedValue: Outlet[M @uncheckedVariance]

16

}

17

}

18

```

19

20

## GraphDSL Builder

21

22

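The Builder exposes `add`, which imports a graph component into the graph under construction and returns its shape, and `materializedValue`, an outlet carrying the graph's materialized value. Connections between ports are made with the operators from `GraphDSL.Implicits`.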

23

24

```scala { .api }

25

class Builder[+M] {

26

def add[S <: Shape](graph: Graph[S, _]): S

27

def materializedValue: Outlet[M @uncheckedVariance]

28

}

29

```

30

31

### Connection Operators (Scala)

32

33

```scala { .api }

34

object GraphDSL {

35

object Implicits {

36

// Forward connections

37

implicit class CombinerBase[+T](val outlet: Outlet[T]) {

38

def ~>[U >: T](inlet: Inlet[U])(implicit b: Builder[_]): Unit

39

def ~>[Out](flow: Graph[FlowShape[T, Out], Any])(implicit b: Builder[_]): PortOps[Out]

40

def ~>(sink: Graph[SinkShape[T], _])(implicit b: Builder[_]): Unit

41

}

42

43

// Reverse connections

44

implicit class ReverseCombinerBase[T](val inlet: Inlet[T]) {

45

def <~[U <: T](outlet: Outlet[U])(implicit b: Builder[_]): Unit

46

def <~[In](flow: Graph[FlowShape[In, T], _])(implicit b: Builder[_]): ReversePortOps[In]

47

def <~(source: Graph[SourceShape[T], _])(implicit b: Builder[_]): Unit

48

}

49

}

50

}

51

```

52

53

### Usage Examples

54

55

**Basic Graph Construction:**

56

```scala

57

import akka.stream.{ClosedShape, SourceShape, SinkShape}

58

import akka.stream.scaladsl.{Source, Sink, Flow, GraphDSL, RunnableGraph}

59

import akka.stream.scaladsl.GraphDSL.Implicits._

60

61

// Simple linear graph

62

val linearGraph = GraphDSL.create() { implicit builder =>

63

val source = builder.add(Source(1 to 10))

64

val flow = builder.add(Flow[Int].map(_ * 2))

65

val sink = builder.add(Sink.foreach[Int](println))

66

67

source ~> flow ~> sink

68

69

ClosedShape

70

}

71

72

val runnable = RunnableGraph.fromGraph(linearGraph)

73

runnable.run()

74

```

75

76

**Fan-out Example:**

77

```scala

78

import akka.stream.scaladsl.Broadcast

79

80

val fanOutGraph = GraphDSL.create() { implicit builder =>

81

val source = builder.add(Source(1 to 10))

82

val broadcast = builder.add(Broadcast[Int](2))

83

val sink1 = builder.add(Sink.foreach[Int](x => println(s"Sink1: $x")))

84

val sink2 = builder.add(Sink.foreach[Int](x => println(s"Sink2: $x")))

85

86

source ~> broadcast

87

broadcast ~> sink1

88

broadcast ~> sink2

89

90

ClosedShape

91

}

92

```

93

94

**Fan-in Example:**

95

```scala

96

import akka.stream.scaladsl.Merge

97

98

val fanInGraph = GraphDSL.create() { implicit builder =>

99

val source1 = builder.add(Source(1 to 5))

100

val source2 = builder.add(Source(6 to 10))

101

val merge = builder.add(Merge[Int](2))

102

val sink = builder.add(Sink.foreach[Int](println))

103

104

source1 ~> merge

105

source2 ~> merge

106

merge ~> sink

107

108

ClosedShape

109

}

110

```

111

112

## Custom Shapes

113

114

Creating reusable graph components with custom shapes.

115

116

```scala { .api }

117

// Example: Custom shape with 2 inputs, 1 output

118

case class Add2Shape(in1: Inlet[Int], in2: Inlet[Int], out: Outlet[Int]) extends Shape {

119

val inlets: immutable.Seq[Inlet[_]] = immutable.Seq(in1, in2)

120

val outlets: immutable.Seq[Outlet[_]] = immutable.Seq(out)

121

122

def deepCopy(): Shape = Add2Shape(in1.carbonCopy(), in2.carbonCopy(), out.carbonCopy())

123

}

124

```

125

126

### Usage Example

127

128

```scala

129

// Create a reusable adder component

130

val adder = GraphDSL.create() { implicit builder =>

131

val zip = builder.add(Zip[Int, Int]())

132

val add = builder.add(Flow[(Int, Int)].map { case (a, b) => a + b })

133

134

zip.out ~> add

135

136

Add2Shape(zip.in0, zip.in1, add.out)

137

}

138

139

// Use the custom component

140

val mainGraph = GraphDSL.create() { implicit builder =>

141

val source1 = builder.add(Source(1 to 5))

142

val source2 = builder.add(Source(6 to 10))

143

val customAdder = builder.add(adder)

144

val sink = builder.add(Sink.foreach[Int](println))

145

146

source1 ~> customAdder.in1

147

source2 ~> customAdder.in2

148

customAdder.out ~> sink

149

150

ClosedShape

151

}

152

```

153

154

## Materialized Values in Graphs

155

156

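Graphs passed as arguments to `GraphDSL.create` have their materialized values combined by the `combineMat` function, and their shapes are handed to the build block so they can be wired into the graph.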

157

158

```scala { .api }

159

object GraphDSL {

160

def create[S <: Shape, M1, M2, Mat](

161

g1: Graph[_ <: Shape, M1],

162

g2: Graph[_ <: Shape, M2]

163

)(combineMat: (M1, M2) => Mat)(buildBlock: GraphDSL.Builder[Mat] => (g1.Shape, g2.Shape) => S): Graph[S, Mat]

164

}

165

```

166

167

### Usage Example

168

169

```scala

170

import akka.stream.scaladsl.Keep

171

172

// Graph that combines materialized values

173

val graphWithMat = GraphDSL.create(

174

Source.queue[Int](10, OverflowStrategy.backpressure),

175

Sink.head[Int]

176

)(Keep.both) { implicit builder => (queueSource, headSink) =>

177

178

val flow = builder.add(Flow[Int].map(_ * 2))

179

180

queueSource ~> flow ~> headSink

181

182

ClosedShape

183

}

184

185

val (queue, firstElement) = RunnableGraph.fromGraph(graphWithMat).run()

186

187

// Offer an element, complete the queue, then print the first element once it is available

188

queue.offer(42)

189

queue.complete()

190

firstElement.foreach(println) // Prints: 84

191

```

192

193

## Java GraphDSL

194

195

The Java API provides similar functionality through a fluent builder (`from` / `via` / `to`) instead of the `~>` operators.

196

197

```scala { .api }

198

object GraphDSL {

199

// Java API

200

def create[S <: Shape](buildBlock: function.Function[GraphDSL.Builder[NotUsed], S]): Graph[S, NotUsed]

201

202

final class Builder[+Mat] {

203

def add[S <: Shape](graph: Graph[S, _]): S

204

def from[T](out: Outlet[T]): ForwardOps[T]

205

def to[T](in: Inlet[T]): ReverseOps[T]

206

207

final class ForwardOps[T] {

208

def toInlet(in: Inlet[_ >: T]): Builder[Mat]

209

def to(sink: SinkShape[_ >: T]): Builder[Mat]

210

def via[U](flow: FlowShape[_ >: T, U]): ForwardOps[U]

211

}

212

}

213

}

214

```

215

216

### Java Usage Example

217

218

```java

219

import akka.stream.javadsl.*;

220

import akka.stream.ClosedShape;

221

222

Graph<ClosedShape, NotUsed> graph = GraphDSL.create(builder -> {

223

SourceShape<Integer> source = builder.add(Source.range(1, 10));

224

FlowShape<Integer, Integer> flow = builder.add(Flow.of(Integer.class).map(x -> x * 2));

225

SinkShape<Integer> sink = builder.add(Sink.foreach(System.out::println));

226

227

builder.from(source).via(flow).to(sink);

228

229

return ClosedShape.getInstance();

230

});

231

232

RunnableGraph.fromGraph(graph).run(system);

233

```

234

235

## Complex Graph Patterns

236

237

### Pipeline with Bypass

238

239

```scala

240

val pipelineWithBypass = GraphDSL.create() { implicit builder =>

241

val source = builder.add(Source(1 to 20))

242

val broadcast = builder.add(Broadcast[Int](2))

243

val merge = builder.add(Merge[Int](2))

244

val sink = builder.add(Sink.foreach[Int](println))

245

246

// Main processing path

247

val processFlow = builder.add(Flow[Int].filter(_ % 2 == 0).map(_ * 2))

248

249

// Bypass path for odd numbers

250

val bypassFlow = builder.add(Flow[Int].filter(_ % 2 == 1))

251

252

source ~> broadcast

253

broadcast ~> processFlow ~> merge

254

broadcast ~> bypassFlow ~> merge

255

merge ~> sink

256

257

ClosedShape

258

}

259

```

260

261

### Feedback Loop

262

263

```scala

264

import akka.stream.scaladsl.MergePreferred

265

266

val feedbackGraph = GraphDSL.create() { implicit builder =>

267

val source = builder.add(Source.single(1))

268

val merge = builder.add(MergePreferred[Int](1))

269

val flow = builder.add(Flow[Int].map { x =>

270

println(s"Processing: $x")

271

if (x < 10) x + 1 else x

272

})

273

val broadcast = builder.add(Broadcast[Int](2))

274

val sink = builder.add(Sink.head[Int])

275

276

source ~> merge ~> flow ~> broadcast

277

broadcast ~> sink

278

broadcast.filter(_ < 10) ~> merge.preferred

279

280

ClosedShape

281

}

282

```

283

284

This covers the essential graph-building capabilities. GraphDSL lets you create complex, reusable stream topologies while maintaining type safety and efficient materialization.