0
# Materialization and Execution
1
2
The materialization system converts stream blueprints into running streams, manages the resources those streams use, and controls their materialized values. Materialization is the process that transforms a stream description (a graph) into executing stream processors.
3
4
## Capabilities
5
6
### Materializer
7
8
The core component responsible for turning stream graphs into running stream processors.
9
10
```scala { .api }
11
/**
12
* Component responsible for materializing stream graphs into running streams
13
*/
14
trait Materializer {
15
/**
16
* Materialize a stream graph
17
* @param runnable Complete stream graph ready for execution
18
* @return The materialized value
19
*/
20
def materialize[Mat](runnable: Graph[ClosedShape, Mat]): Mat
21
22
/**
23
* Schedule a task to run once after a delay
24
* @param delay Duration to wait before execution
25
* @param task Task to execute
26
* @return Cancellable to cancel the scheduled task
27
*/
28
def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable
29
30
/**
31
* Schedule a task to run repeatedly with fixed delay
32
* @param initialDelay Initial delay before first execution
33
* @param delay Delay between executions
34
* @param task Task to execute
35
* @return Cancellable to cancel the scheduled task
36
*/
37
def scheduleWithFixedDelay(initialDelay: FiniteDuration, delay: FiniteDuration, task: Runnable): Cancellable
38
39
/**
40
* Schedule a task to run repeatedly at fixed rate
41
* @param initialDelay Initial delay before first execution
42
* @param interval Interval between executions
43
* @param task Task to execute
44
* @return Cancellable to cancel the scheduled task
45
*/
46
def scheduleAtFixedRate(initialDelay: FiniteDuration, interval: FiniteDuration, task: Runnable): Cancellable
47
48
/**
49
* Get the execution context used by this materializer
50
* @return ExecutionContext for async operations
51
*/
52
def executionContext: ExecutionContext
53
54
/**
55
* Create a materializer with a name prefix for debugging
56
* @param name Prefix for materializer name
57
* @return New materializer with the specified name prefix
58
*/
59
def withNamePrefix(name: String): Materializer
60
}
61
62
object Materializer {
63
/**
64
* Create a materializer from an ActorSystem
65
* @param system ActorSystem to create materializer from
66
* @return Materializer instance
67
*/
68
def apply(system: ActorSystem): Materializer
69
70
/**
71
* Create a materializer with custom settings
72
* @param settings Custom materializer settings
73
* @param system ActorSystem to use
74
* @return Materializer with custom configuration
75
*/
76
def apply(settings: ActorMaterializerSettings, system: ActorSystem): Materializer
77
}
78
```
79
80
**Usage Examples:**
81
82
```scala
83
import akka.actor.ActorSystem
84
import akka.stream.Materializer
85
import akka.stream.scaladsl.{Source, Sink}
import scala.concurrent.duration._
86
87
// Create materializer
88
implicit val system: ActorSystem = ActorSystem("stream-system")
89
implicit val materializer: Materializer = Materializer(system)
90
91
// Materialize and run stream
92
val result = Source(1 to 10)
93
.map(_ * 2)
94
.runWith(Sink.seq)
95
96
// Schedule tasks
97
val cancellable = materializer.scheduleOnce(1.second, new Runnable {
98
def run(): Unit = println("Delayed task executed")
99
})
100
```
101
102
### Stream Execution Methods
103
104
Methods for running streams and controlling materialization.
105
106
```scala { .api }
107
/**
108
* Run a complete stream graph
109
* @param materializer Implicit materializer for execution
110
* @return The materialized value
111
*/
112
def run()(implicit materializer: Materializer): Mat
113
114
/**
115
* Run this source with the given sink
116
* @param sink Sink to connect to this source
117
* @param materializer Implicit materializer for execution
118
* @return The materialized value from the sink
119
*/
120
def runWith[Mat2](sink: Graph[SinkShape[Out], Mat2])(implicit materializer: Materializer): Mat2
121
122
/**
123
* Connect this source to a sink and get a runnable graph
124
* @param sink Sink to connect to
125
* @return RunnableGraph ready for materialization
126
*/
127
def to[Mat2](sink: Graph[SinkShape[Out], Mat2]): RunnableGraph[Mat]
128
129
/**
130
* Connect this source to a sink and combine materialized values
131
* @param sink Sink to connect to
132
* @param combine Function to combine materialized values
133
* @return RunnableGraph with combined materialized value
134
*/
135
def toMat[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2])(combine: (Mat, Mat2) => Mat3): RunnableGraph[Mat3]
136
```
137
138
**Usage Examples:**
139
140
```scala
141
// Direct execution
142
val future1 = Source(1 to 5).runWith(Sink.seq)
143
144
// Create runnable graph first
145
val graph = Source(1 to 5).toMat(Sink.seq)(Keep.right)
146
val future2 = graph.run()
147
148
// Combine materialized values
149
val (killSwitch, future3) = Source(1 to 5)
150
.viaMat(KillSwitches.single)(Keep.right)
151
.toMat(Sink.seq)(Keep.both)
152
.run()
153
```
154
155
### Materialized Value Control
156
157
Operations for controlling and transforming materialized values.
158
159
```scala { .api }
160
/**
161
* Transform the materialized value of a graph
162
* @param f Function to transform the materialized value
163
* @return Graph with transformed materialized value
164
*/
165
def mapMaterializedValue[Mat2](f: Mat => Mat2): Source[Out, Mat2]
166
167
/**
168
* Pre-materialize a source to get both materialized value and new source
169
* @param materializer Materializer to use for pre-materialization
170
* @return Tuple of materialized value and equivalent source
171
*/
172
def preMaterialize()(implicit materializer: Materializer): (Mat, Source[Out, NotUsed])
173
174
/**
175
* Utility functions for combining materialized values
176
*/
177
object Keep {
178
/**
179
* Keep the left materialized value
180
*/
181
def left[L, R]: (L, R) => L = (l, _) => l
182
183
/**
184
* Keep the right materialized value
185
*/
186
def right[L, R]: (L, R) => R = (_, r) => r
187
188
/**
189
* Keep both materialized values as a tuple
190
*/
191
def both[L, R]: (L, R) => (L, R) = (l, r) => (l, r)
192
193
/**
194
* Discard both materialized values
195
*/
196
def none[L, R]: (L, R) => NotUsed = (_, _) => NotUsed
197
}
198
```
199
200
**Usage Examples:**
201
202
```scala
203
// Transform materialized value
204
val countGraph: RunnableGraph[Future[Int]] = Source(List("a", "bb", "ccc"))
205
.toMat(Sink.seq)(Keep.right)
206
.mapMaterializedValue(_.map(_.length))
207
208
// Pre-materialize for reuse
209
val (promise: Promise[Option[Int]], reusableSource: Source[Int, NotUsed]) =
210
Source
211
.maybe[Int]
212
.preMaterialize()
213
214
// Use Keep combinators
215
val (actorRef: ActorRef, done: Future[Done]) =
216
Source.actorRef[String](100, OverflowStrategy.fail)
217
.toMat(Sink.foreach(println))(Keep.both)
218
.run()
219
```
220
221
### System Materializer
222
223
Utility for getting a system-wide materializer instance.
224
225
```scala { .api }
226
/**
227
* System-wide materializer that uses the guardian actor's dispatcher
228
*/
229
object SystemMaterializer {
230
/**
231
* Get the SystemMaterializer extension for an ActorSystem
232
* @param system ActorSystem to get materializer for
233
* @return SystemMaterializer extension; its `materializer` member is the shared Materializer
234
*/
235
def get(system: ActorSystem): SystemMaterializer
236
237
/**
238
* Alias for get()
239
*/
240
def apply(system: ActorSystem): SystemMaterializer = get(system)
241
}
242
```
243
244
**Usage Examples:**
245
246
```scala
247
// Get system materializer
248
val system = ActorSystem("my-system")
249
val materializer = SystemMaterializer.get(system).materializer
250
251
// Use in stream operations
252
Source(1 to 10).runWith(Sink.seq)(materializer)
253
```
254
255
### Materializer Settings
256
257
Configuration options for customizing materializer behavior.
258
259
```scala { .api }
260
/**
261
* Settings for configuring materializer behavior
262
*/
263
final class ActorMaterializerSettings private (
264
val initialInputBufferSize: Int,
265
val maxInputBufferSize: Int,
266
val dispatcher: String,
267
val supervisionDecider: Supervision.Decider,
268
val subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings,
269
val debugLogging: Boolean,
270
val outputBurstLimit: Int,
271
val fuzzingMode: Boolean,
272
val autoFusing: Boolean,
273
val maxFixedBufferSize: Int,
274
val syncProcessingLimit: Int,
275
val blockingIoDispatcher: String
276
) {
277
/**
278
* Create new settings with different buffer sizes
279
*/
280
def withInputBuffer(initialSize: Int, maxSize: Int): ActorMaterializerSettings
281
282
/**
283
* Create new settings with different dispatcher
284
*/
285
def withDispatcher(dispatcher: String): ActorMaterializerSettings
286
287
/**
288
* Create new settings with different supervision decider
289
*/
290
def withSupervisionStrategy(decider: Supervision.Decider): ActorMaterializerSettings
291
292
/**
293
* Enable or disable debug logging
294
*/
295
def withDebugLogging(enable: Boolean): ActorMaterializerSettings
296
}
297
298
object ActorMaterializerSettings {
299
/**
300
* Create settings from ActorSystem configuration
301
*/
302
def apply(system: ActorSystem): ActorMaterializerSettings
303
304
/**
305
* Create settings from custom configuration
306
*/
307
def apply(config: Config): ActorMaterializerSettings
308
}
309
```
310
311
**Usage Examples:**
312
313
```scala
314
// Custom materializer settings
315
val settings = ActorMaterializerSettings(system)
316
.withInputBuffer(initialSize = 4, maxSize = 16)
317
.withDispatcher("my-custom-dispatcher")
318
.withDebugLogging(true)
319
320
val customMaterializer = Materializer(settings, system)
321
322
// Use custom materializer
323
Source(1 to 100).runWith(Sink.seq)(customMaterializer)
324
```
325
326
### Resource Management
327
328
Controlling materializer lifecycle and resource cleanup.
329
330
```scala { .api }
331
/**
332
* Shutdown the materializer and cleanup resources
333
* Also stops all operators that were materialized through this materializer; returns Unit
334
*/
335
def shutdown(): Unit
336
337
/**
338
* Check if the materializer has been shut down
339
* @return True if shut down, false otherwise
340
*/
341
def isShutdown: Boolean
342
```
343
344
**Usage Examples:**
345
346
```scala
347
// Proper resource cleanup
348
val materializer = Materializer(system)
349
350
// Use materializer for stream processing
351
Source(1 to 10).runWith(Sink.seq)(materializer)
352
353
// Shut down when done (shutdown() returns Unit, so there is no Future to wait on)
354
materializer.shutdown()
355
if (materializer.isShutdown) println("Materializer shut down")
356
// Terminate the ActorSystem once no more streams need to run
357
system.terminate()
358
```
359
360
### Attributes
361
362
Configuration metadata that can be attached to stream operators.
363
364
```scala { .api }
365
/**
366
* Metadata container for stream operators
367
*/
368
final class Attributes private (val attributeList: List[Attributes.Attribute]) {
369
/**
370
* Get an attribute of the specified type
371
* @param c Class of the attribute type
372
* @return Optional attribute value
373
*/
374
def get[T <: Attributes.Attribute](c: Class[T]): Option[T]
375
376
/**
377
* Get an attribute with a default value
378
* @param default Default value if attribute not found
379
* @return Attribute value or default
380
*/
381
def getAttribute[T <: Attributes.Attribute](default: T): T
382
383
/**
384
* Combine with other attributes
385
* @param other Other attributes to combine with
386
* @return Combined attributes
387
*/
388
def and(other: Attributes): Attributes
389
}
390
391
object Attributes {
392
/**
393
* Empty attributes
394
*/
395
val none: Attributes
396
397
/**
398
* Create attributes with name
399
*/
400
def name(name: String): Attributes
401
402
/**
403
* Create async boundary attribute
404
*/
405
def asyncBoundary: Attributes
406
407
/**
408
* Create dispatcher attribute
409
*/
410
def dispatcher(dispatcher: String): Attributes
411
412
/**
413
* Create input buffer attribute
414
*/
415
def inputBuffer(initial: Int, max: Int): Attributes
416
}
417
```
418
419
**Usage Examples:**
420
421
```scala
422
// Add attributes to operators
423
Source(1 to 10)
424
.map(_ * 2)
425
.withAttributes(Attributes.name("doubler"))
426
.async // Add async boundary
427
.filter(_ > 10)
428
.withAttributes(Attributes.dispatcher("my-dispatcher"))
429
.runWith(Sink.seq)
430
```
431
432
## Types
433
434
```scala { .api }
435
// Essential types for materialization
436
type NotUsed = akka.NotUsed
437
438
// Completion marker
439
sealed abstract class Done
440
case object Done extends Done
441
442
// Cancellable interface for scheduled tasks
443
trait Cancellable {
444
def cancel(): Boolean
445
def isCancelled: Boolean
446
}
447
448
// Stream subscription timeout settings
449
final class StreamSubscriptionTimeoutSettings(
450
val mode: StreamSubscriptionTimeoutTerminationMode,
451
val timeout: FiniteDuration
452
)
453
454
sealed abstract class StreamSubscriptionTimeoutTerminationMode
455
case object NoopTermination extends StreamSubscriptionTimeoutTerminationMode
456
case object WarnTermination extends StreamSubscriptionTimeoutTerminationMode
457
case object CancelTermination extends StreamSubscriptionTimeoutTerminationMode
458
```