or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

control-flow.mdcore-components.mderror-handling.mdgraph-building.mdindex.mdio-integration.mdjunction-operations.mdmaterialization.mdstream-operations.md

materialization.mddocs/

0

# Materialization and Execution

1

2

Stream materialization with ActorMaterializer, lifecycle management, and execution control for running stream blueprints.

3

4

## Materializer Abstract Class

5

6

The base abstraction for materializing stream graphs into running streams.

7

8

```scala { .api }

9

abstract class Materializer {

10

def materialize[Mat](runnable: Graph[ClosedShape, Mat]): Mat

11

def materialize[Mat](runnable: Graph[ClosedShape, Mat], defaultAttributes: Attributes): Mat

12

def withNamePrefix(name: String): Materializer

13

implicit def executionContext: ExecutionContextExecutor

14

def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable

15

def schedulePeriodically(

16

initialDelay: FiniteDuration,

17

interval: FiniteDuration,

18

task: Runnable

19

): Cancellable

20

}

21

```

22

23

## ActorMaterializer

24

25

The default materializer implementation that uses Akka actors to run streams.

26

27

```scala { .api }

28

abstract class ActorMaterializer extends Materializer with MaterializerLoggingProvider {

29

def settings: ActorMaterializerSettings

30

def shutdown(): Unit

31

def isShutdown: Boolean

32

def system: ActorSystem

33

}

34

```

35

36

### Factory Methods

37

38

**Scala API:**

39

```scala { .api }

40

object ActorMaterializer {

41

def apply(

42

materializerSettings: Option[ActorMaterializerSettings] = None,

43

namePrefix: Option[String] = None

44

)(implicit context: ActorRefFactory): ActorMaterializer

45

46

def apply(

47

settings: ActorMaterializerSettings,

48

namePrefix: String

49

)(implicit context: ActorRefFactory): ActorMaterializer

50

}

51

```

52

53

**Java API:**

54

```scala { .api }

55

object ActorMaterializer {

56

def create(context: ActorRefFactory): ActorMaterializer

57

def create(settings: ActorMaterializerSettings, context: ActorRefFactory): ActorMaterializer

58

def create(settings: ActorMaterializerSettings, context: ActorRefFactory, namePrefix: String): ActorMaterializer

59

}

60

```

61

62

### Usage Examples

63

64

**Basic Materializer Setup:**

65

```scala

66

import akka.actor.ActorSystem

67

import akka.stream.ActorMaterializer

68

import akka.stream.scaladsl.{Source, Sink}

69

70

// Create actor system (required for materializer)

71

implicit val system: ActorSystem = ActorSystem("MySystem")

72

73

// Create materializer with default settings

74

implicit val materializer: ActorMaterializer = ActorMaterializer()

75

76

// Alternative with custom settings

77

val customSettings = ActorMaterializerSettings(system)

78

.withInputBuffer(initialSize = 64, maxSize = 64)

79

.withDispatcher("my-dispatcher")

80

81

implicit val customMaterializer: ActorMaterializer =

82

ActorMaterializer(customSettings)

83

84

// Use materializer to run streams

85

val source = Source(1 to 10)

86

val sink = Sink.foreach[Int](println)

87

val result = source.runWith(sink) // Uses implicit materializer

88

```

89

90

**Explicit Materialization:**

91

```scala

92

import akka.stream.scaladsl.RunnableGraph

93

94

val graph: RunnableGraph[Future[Done]] = source.to(sink)

95

96

// Materialize and get materialized value

97

val materializedValue: Future[Done] = materializer.materialize(graph)

98

99

// With custom attributes

100

val withAttributes = graph.withAttributes(Attributes.name("my-stream"))

101

val result2 = materializer.materialize(withAttributes)

102

```

103

104

## ActorMaterializerSettings

105

106

Configuration for the ActorMaterializer with various tuning options.

107

108

```scala { .api }

109

final class ActorMaterializerSettings(

110

initialInputBufferSize: Int,

111

maxInputBufferSize: Int,

112

dispatcher: String,

113

supervisionDecider: Supervision.Decider,

114

subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings,

115

debugLogging: Boolean,

116

outputBurstLimit: Int,

117

fuzzingMode: Boolean,

118

autoFusing: Boolean,

119

maxFixedBufferSize: Int,

120

syncProcessingLimit: Int,

121

blockingIoDispatcher: String

122

)

123

```

124

125

### Configuration Methods

126

127

**Buffer Configuration:**

128

```scala { .api }

129

class ActorMaterializerSettings {

130

def withInputBuffer(initialSize: Int, maxSize: Int): ActorMaterializerSettings

131

def withMaxFixedBufferSize(maxSize: Int): ActorMaterializerSettings

132

def withSyncProcessingLimit(limit: Int): ActorMaterializerSettings

133

}

134

```

135

136

**Dispatcher Configuration:**

137

```scala { .api }

138

class ActorMaterializerSettings {

139

def withDispatcher(dispatcher: String): ActorMaterializerSettings

140

def withBlockingIoDispatcher(dispatcher: String): ActorMaterializerSettings

141

}

142

```

143

144

**Supervision and Error Handling:**

145

```scala { .api }

146

class ActorMaterializerSettings {

147

def withSupervisionStrategy(decider: Supervision.Decider): ActorMaterializerSettings

148

}

149

```

150

151

**Debug and Performance:**

152

```scala { .api }

153

class ActorMaterializerSettings {

154

def withDebugLogging(enable: Boolean): ActorMaterializerSettings

155

def withFuzzingMode(enable: Boolean): ActorMaterializerSettings

156

def withAutoFusing(enable: Boolean): ActorMaterializerSettings

157

}

158

```

159

160

### Usage Examples

161

162

```scala

163

import akka.stream.{ActorMaterializerSettings, Supervision}

164

import scala.concurrent.duration._

165

166

val settings = ActorMaterializerSettings(system)

167

.withInputBuffer(initialSize = 16, maxSize = 128)

168

.withDispatcher("stream-dispatcher")

169

.withSupervisionStrategy(Supervision.restartingDecider)

170

.withDebugLogging(true)

171

.withSubscriptionTimeout(

172

StreamSubscriptionTimeoutSettings(

173

mode = StreamSubscriptionTimeoutTerminationMode.noop,

174

timeout = 5.seconds

175

)

176

)

177

178

val materializer = ActorMaterializer(settings)

179

```

180

181

## RunnableGraph

182

183

A graph with no open ports that can be materialized to run.

184

185

```scala { .api }

186

final class RunnableGraph[+Mat](

187

override val traversalBuilder: TraversalBuilder,

188

override val shape: ClosedShape

189

) extends Graph[ClosedShape, Mat]

190

```

191

192

### Key Methods

193

194

```scala { .api }

195

class RunnableGraph[+Mat] {

196

def run()(implicit materializer: Materializer): Mat

197

def runWith[Mat2](sink: Graph[SinkShape[Any], Mat2])(implicit materializer: Materializer): Mat2

198

def withAttributes(attr: Attributes): RunnableGraph[Mat]

199

def named(name: String): RunnableGraph[Mat]

200

}

201

```

202

203

### Usage Examples

204

205

```scala

206

import akka.stream.scaladsl.{Source, Sink, RunnableGraph}

207

208

val source = Source(1 to 10)

209

val sink = Sink.foreach[Int](println)

210

211

// Create runnable graph

212

val graph: RunnableGraph[Future[Done]] = source.to(sink)

213

214

// Run the graph

215

val result: Future[Done] = graph.run()

216

217

// Add attributes before running

218

val namedGraph = graph

219

.withAttributes(Attributes.name("numbered-printer"))

220

.addAttributes(ActorAttributes.dispatcher("my-dispatcher"))

221

222

val result2 = namedGraph.run()

223

```

224

225

## Materialized Values

226

227

Understanding how materialized values flow through stream composition.

228

229

### Keep Object

230

231

Controls which materialized values to keep when combining streams.

232

233

```scala { .api }

234

object Keep {

235

val left: (Any, Any) => Any

236

val right: (Any, Any) => Any

237

val both: (Any, Any) => (Any, Any)

238

val none: (Any, Any) => NotUsed

239

}

240

```

241

242

### Usage Examples

243

244

**Materialized Value Handling:**

245

```scala

246

import akka.stream.scaladsl.{Source, Sink, Keep}

247

248

val source: Source[Int, NotUsed] = Source(1 to 10)

249

val sink: Sink[Int, Future[Done]] = Sink.foreach(println)

250

251

// Keep left materialized value (NotUsed)

252

val graph1 = source.toMat(sink)(Keep.left)

253

val result1: NotUsed = graph1.run()

254

255

// Keep right materialized value (Future[Done])

256

val graph2 = source.toMat(sink)(Keep.right)

257

val result2: Future[Done] = graph2.run()

258

259

// Keep both materialized values

260

val graph3 = source.toMat(sink)(Keep.both)

261

val result3: (NotUsed, Future[Done]) = graph3.run()

262

263

// Custom combination

264

val graph4 = source.toMat(sink) { (left, right) =>

265

right.map(_ => "Completed!")

266

}

267

val result4: Future[String] = graph4.run()

268

```

269

270

**Complex Materialized Value Examples:**

271

```scala

272

val source = Source(1 to 100)

273

val throttledSource = source.throttle(10, 1.second)

274

275

// Source with queue for dynamic element injection

276

val queueSource: Source[Int, SourceQueueWithComplete[Int]] =

277

Source.queue(10, OverflowStrategy.backpressure)

278

279

// Sink that materializes to the first element

280

val headSink: Sink[Int, Future[Int]] = Sink.head

281

282

// Combine to get both queue and first element

283

val graph = queueSource.toMat(headSink)(Keep.both)

284

val (queue, firstElement) = graph.run()

285

286

// Use the queue to add elements

287

queue.offer(42)

288

queue.offer(84)

289

queue.complete()

290

291

// firstElement will complete with 42

292

```

293

294

## Stream Execution Lifecycle

295

296

### Starting and Stopping Streams

297

298

```scala

299

// Streams start automatically when materialized

300

val runningStream = source.runWith(sink)

301

302

// For ActorMaterializer, shutdown stops all streams

303

materializer.shutdown()

304

305

// Check if materializer is shutdown

306

if (materializer.isShutdown) {

307

println("Materializer has been shut down")

308

}

309

```

310

311

### Resource Management

312

313

```scala

314

import akka.Done

315

import scala.util.{Success, Failure}

316

317

// Proper resource cleanup

318

val system = ActorSystem("MySystem")

319

val materializer = ActorMaterializer()(system)

320

321

val streamResult = source.runWith(sink)(materializer)

322

323

streamResult.onComplete {

324

case Success(_) =>

325

println("Stream completed successfully")

326

materializer.shutdown()

327

system.terminate()

328

case Failure(ex) =>

329

println(s"Stream failed: $ex")

330

materializer.shutdown()

331

system.terminate()

332

}

333

```

334

335

## Performance Tuning

336

337

### Buffer Sizing

338

339

```scala

340

// Configure buffer sizes for throughput vs memory tradeoff

341

val settings = ActorMaterializerSettings(system)

342

.withInputBuffer(initialSize = 4, maxSize = 16) // Small buffers, low memory

343

.withInputBuffer(initialSize = 64, maxSize = 1024) // Large buffers, high throughput

344

345

val materializer = ActorMaterializer(settings)

346

```

347

348

### Dispatcher Configuration

349

350

```scala

351

// Use dedicated dispatcher for streams

352

val settings = ActorMaterializerSettings(system)

353

.withDispatcher("akka.stream.default-blocking-io-dispatcher")

354

355

// Or custom dispatcher

356

val streamSettings = settings.withDispatcher("my-stream-dispatcher")

357

```

358

359

### Async Boundaries

360

361

```scala

362

// Add async boundaries for better CPU utilization

363

val processedSource = source

364

.map(heavyComputation) // CPU intensive

365

.async // Async boundary

366

.map(anotherComputation) // Can run on different thread

367

.async // Another boundary

368

```

369

370

### Supervision Strategies

371

372

```scala

373

import akka.stream.Supervision

374

375

// Configure supervision for error handling

376

val settings = ActorMaterializerSettings(system)

377

.withSupervisionStrategy { ex =>

378

ex match {

379

case _: IllegalArgumentException => Supervision.Resume

380

case _: RuntimeException => Supervision.Restart

381

case _ => Supervision.Stop

382

}

383

}

384

```