0
# Materialization and Execution
1
2
Stream materialization with ActorMaterializer, lifecycle management, and execution control for running stream blueprints.
3
4
## Materializer Abstract Class
5
6
The base abstraction for materializing stream graphs into running streams.
7
8
```scala { .api }
9
abstract class Materializer {
10
def materialize[Mat](runnable: Graph[ClosedShape, Mat]): Mat
11
def materialize[Mat](runnable: Graph[ClosedShape, Mat], defaultAttributes: Attributes): Mat
12
def withNamePrefix(name: String): Materializer
13
implicit def executionContext: ExecutionContextExecutor
14
def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable
15
def schedulePeriodically(
16
initialDelay: FiniteDuration,
17
interval: FiniteDuration,
18
task: Runnable
19
): Cancellable
20
}
21
```
22
23
## ActorMaterializer
24
25
The default materializer implementation that uses Akka actors to run streams.
26
27
```scala { .api }
28
abstract class ActorMaterializer extends Materializer with MaterializerLoggingProvider {
29
def settings: ActorMaterializerSettings
30
def shutdown(): Unit
31
def isShutdown: Boolean
32
def system: ActorSystem
33
}
34
```
35
36
### Factory Methods
37
38
**Scala API:**
39
```scala { .api }
40
object ActorMaterializer {
41
def apply(
42
materializerSettings: Option[ActorMaterializerSettings] = None,
43
namePrefix: Option[String] = None
44
)(implicit context: ActorRefFactory): ActorMaterializer
45
46
def apply(
47
settings: ActorMaterializerSettings,
48
namePrefix: String
49
)(implicit context: ActorRefFactory): ActorMaterializer
50
}
51
```
52
53
**Java API:**
54
```scala { .api }
55
object ActorMaterializer {
56
def create(context: ActorRefFactory): ActorMaterializer
57
def create(settings: ActorMaterializerSettings, context: ActorRefFactory): ActorMaterializer
58
def create(settings: ActorMaterializerSettings, context: ActorRefFactory, namePrefix: String): ActorMaterializer
59
}
60
```
61
62
### Usage Examples
63
64
**Basic Materializer Setup:**
65
```scala
66
import akka.actor.ActorSystem
67
import akka.stream.ActorMaterializer
68
import akka.stream.scaladsl.{Source, Sink}
69
70
// Create actor system (required for materializer)
71
implicit val system: ActorSystem = ActorSystem("MySystem")
72
73
// Create materializer with default settings
74
implicit val materializer: ActorMaterializer = ActorMaterializer()
75
76
// Alternative with custom settings
77
val customSettings = ActorMaterializerSettings(system)
78
.withInputBuffer(initialSize = 64, maxSize = 64)
79
.withDispatcher("my-dispatcher")
80
81
implicit val customMaterializer: ActorMaterializer =
82
ActorMaterializer(customSettings)
83
84
// Use materializer to run streams
85
val source = Source(1 to 10)
86
val sink = Sink.foreach[Int](println)
87
val result = source.runWith(sink) // Uses implicit materializer
88
```
89
90
**Explicit Materialization:**
91
```scala
92
import akka.stream.scaladsl.RunnableGraph
93
94
val graph: RunnableGraph[Future[Done]] = source.to(sink)
95
96
// Materialize and get materialized value
97
val materializedValue: Future[Done] = materializer.materialize(graph)
98
99
// With custom attributes
100
val withAttributes = graph.withAttributes(Attributes.name("my-stream"))
101
val result2 = materializer.materialize(withAttributes)
102
```
103
104
## ActorMaterializerSettings
105
106
Configuration for the ActorMaterializer with various tuning options.
107
108
```scala { .api }
109
final class ActorMaterializerSettings(
110
initialInputBufferSize: Int,
111
maxInputBufferSize: Int,
112
dispatcher: String,
113
supervisionDecider: Supervision.Decider,
114
subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings,
115
debugLogging: Boolean,
116
outputBurstLimit: Int,
117
fuzzingMode: Boolean,
118
autoFusing: Boolean,
119
maxFixedBufferSize: Int,
120
syncProcessingLimit: Int,
121
blockingIoDispatcher: String
122
)
123
```
124
125
### Configuration Methods
126
127
**Buffer Configuration:**
128
```scala { .api }
129
class ActorMaterializerSettings {
130
def withInputBuffer(initialSize: Int, maxSize: Int): ActorMaterializerSettings
131
def withMaxFixedBufferSize(maxSize: Int): ActorMaterializerSettings
132
def withSyncProcessingLimit(limit: Int): ActorMaterializerSettings
133
}
134
```
135
136
**Dispatcher Configuration:**
137
```scala { .api }
138
class ActorMaterializerSettings {
139
def withDispatcher(dispatcher: String): ActorMaterializerSettings
140
def withBlockingIoDispatcher(dispatcher: String): ActorMaterializerSettings
141
}
142
```
143
144
**Supervision and Error Handling:**
145
```scala { .api }
146
class ActorMaterializerSettings {
147
def withSupervisionStrategy(decider: Supervision.Decider): ActorMaterializerSettings
148
}
149
```
150
151
**Debug and Performance:**
152
```scala { .api }
153
class ActorMaterializerSettings {
154
def withDebugLogging(enable: Boolean): ActorMaterializerSettings
155
def withFuzzingMode(enable: Boolean): ActorMaterializerSettings
156
def withAutoFusing(enable: Boolean): ActorMaterializerSettings
157
}
158
```
159
160
### Usage Examples
161
162
```scala
163
import akka.stream.{ActorMaterializerSettings, Supervision}
164
import scala.concurrent.duration._
165
166
val settings = ActorMaterializerSettings(system)
167
.withInputBuffer(initialSize = 16, maxSize = 128)
168
.withDispatcher("stream-dispatcher")
169
.withSupervisionStrategy(Supervision.restartingDecider)
170
.withDebugLogging(true)
171
.withSubscriptionTimeout(
172
StreamSubscriptionTimeoutSettings(
173
mode = StreamSubscriptionTimeoutTerminationMode.noop,
174
timeout = 5.seconds
175
)
176
)
177
178
val materializer = ActorMaterializer(settings)
179
```
180
181
## RunnableGraph
182
183
A graph with no open ports that can be materialized to run.
184
185
```scala { .api }
186
final class RunnableGraph[+Mat](
187
override val traversalBuilder: TraversalBuilder,
188
override val shape: ClosedShape
189
) extends Graph[ClosedShape, Mat]
190
```
191
192
### Key Methods
193
194
```scala { .api }
195
class RunnableGraph[+Mat] {
196
def run()(implicit materializer: Materializer): Mat
197
def runWith[Mat2](sink: Graph[SinkShape[Any], Mat2])(implicit materializer: Materializer): Mat2
198
def withAttributes(attr: Attributes): RunnableGraph[Mat]
199
def named(name: String): RunnableGraph[Mat]
200
}
201
```
202
203
### Usage Examples
204
205
```scala
206
import akka.stream.scaladsl.{Source, Sink, RunnableGraph}
207
208
val source = Source(1 to 10)
209
val sink = Sink.foreach[Int](println)
210
211
// Create runnable graph
212
val graph: RunnableGraph[Future[Done]] = source.to(sink)
213
214
// Run the graph
215
val result: Future[Done] = graph.run()
216
217
// Add attributes before running
218
val namedGraph = graph
219
.withAttributes(Attributes.name("numbered-printer"))
220
.addAttributes(ActorAttributes.dispatcher("my-dispatcher"))
221
222
val result2 = namedGraph.run()
223
```
224
225
## Materialized Values
226
227
Understanding how materialized values flow through stream composition.
228
229
### Keep Object
230
231
Controls which materialized values to keep when combining streams.
232
233
```scala { .api }
234
object Keep {
235
val left: (Any, Any) => Any
236
val right: (Any, Any) => Any
237
val both: (Any, Any) => (Any, Any)
238
val none: (Any, Any) => NotUsed
239
}
240
```
241
242
### Usage Examples
243
244
**Materialized Value Handling:**
245
```scala
246
import akka.stream.scaladsl.{Source, Sink, Keep}
247
248
val source: Source[Int, NotUsed] = Source(1 to 10)
249
val sink: Sink[Int, Future[Done]] = Sink.foreach(println)
250
251
// Keep left materialized value (NotUsed)
252
val graph1 = source.toMat(sink)(Keep.left)
253
val result1: NotUsed = graph1.run()
254
255
// Keep right materialized value (Future[Done])
256
val graph2 = source.toMat(sink)(Keep.right)
257
val result2: Future[Done] = graph2.run()
258
259
// Keep both materialized values
260
val graph3 = source.toMat(sink)(Keep.both)
261
val result3: (NotUsed, Future[Done]) = graph3.run()
262
263
// Custom combination
264
val graph4 = source.toMat(sink) { (left, right) =>
265
right.map(_ => "Completed!")
266
}
267
val result4: Future[String] = graph4.run()
268
```
269
270
**Complex Materialized Value Examples:**
271
```scala
272
val source = Source(1 to 100)
273
val throttledSource = source.throttle(10, 1.second)
274
275
// Source with queue for dynamic element injection
276
val queueSource: Source[Int, SourceQueueWithComplete[Int]] =
277
Source.queue(10, OverflowStrategy.backpressure)
278
279
// Sink that materializes to the first element
280
val headSink: Sink[Int, Future[Int]] = Sink.head
281
282
// Combine to get both queue and first element
283
val graph = queueSource.toMat(headSink)(Keep.both)
284
val (queue, firstElement) = graph.run()
285
286
// Use the queue to add elements
287
queue.offer(42)
288
queue.offer(84)
289
queue.complete()
290
291
// firstElement will complete with 42
292
```
293
294
## Stream Execution Lifecycle
295
296
### Starting and Stopping Streams
297
298
```scala
299
// Streams start automatically when materialized
300
val runningStream = source.runWith(sink)
301
302
// For ActorMaterializer, shutdown stops all streams
303
materializer.shutdown()
304
305
// Check if materializer is shutdown
306
if (materializer.isShutdown) {
307
println("Materializer has been shut down")
308
}
309
```
310
311
### Resource Management
312
313
```scala
314
import akka.Done
315
import scala.util.{Success, Failure}
316
317
// Proper resource cleanup
318
val system = ActorSystem("MySystem")
319
val materializer = ActorMaterializer()(system)
320
321
val streamResult = source.runWith(sink)(materializer)
322
323
streamResult.onComplete {
324
case Success(_) =>
325
println("Stream completed successfully")
326
materializer.shutdown()
327
system.terminate()
328
case Failure(ex) =>
329
println(s"Stream failed: $ex")
330
materializer.shutdown()
331
system.terminate()
332
}
333
```
334
335
## Performance Tuning
336
337
### Buffer Sizing
338
339
```scala
340
// Configure buffer sizes for throughput vs memory tradeoff
341
val settings = ActorMaterializerSettings(system)
342
.withInputBuffer(initialSize = 4, maxSize = 16) // Small buffers, low memory
343
.withInputBuffer(initialSize = 64, maxSize = 1024) // Large buffers, high throughput
344
345
val materializer = ActorMaterializer(settings)
346
```
347
348
### Dispatcher Configuration
349
350
```scala
351
// Use dedicated dispatcher for streams
352
val settings = ActorMaterializerSettings(system)
353
.withDispatcher("akka.stream.default-blocking-io-dispatcher")
354
355
// Or custom dispatcher
356
val streamSettings = settings.withDispatcher("my-stream-dispatcher")
357
```
358
359
### Async Boundaries
360
361
```scala
362
// Add async boundaries for better CPU utilization
363
val processedSource = source
364
.map(heavyComputation) // CPU intensive
365
.async // Async boundary
366
.map(anotherComputation) // Can run on different thread
367
.async // Another boundary
368
```
369
370
### Supervision Strategies
371
372
```scala
373
import akka.stream.Supervision
374
375
// Configure supervision for error handling
376
val settings = ActorMaterializerSettings(system)
377
.withSupervisionStrategy { ex =>
378
ex match {
379
case _: IllegalArgumentException => Supervision.Resume
380
case _: RuntimeException => Supervision.Restart
381
case _ => Supervision.Stop
382
}
383
}
384
```