Akka Streams is a reactive streams implementation for processing and transferring sequences of elements using bounded buffer space with automatic backpressure management.
—
Stream materialization with ActorMaterializer, lifecycle management, and execution control for running stream blueprints.
The base abstraction for materializing stream graphs into running streams.
abstract class Materializer {
def materialize[Mat](runnable: Graph[ClosedShape, Mat]): Mat
def materialize[Mat](runnable: Graph[ClosedShape, Mat], defaultAttributes: Attributes): Mat
def withNamePrefix(name: String): Materializer
implicit def executionContext: ExecutionContextExecutor
def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable
def schedulePeriodically(
initialDelay: FiniteDuration,
interval: FiniteDuration,
task: Runnable
): Cancellable
}The default materializer implementation that uses Akka actors to run streams.
abstract class ActorMaterializer extends Materializer with MaterializerLoggingProvider {
def settings: ActorMaterializerSettings
def shutdown(): Unit
def isShutdown: Boolean
def system: ActorSystem
}Scala API:
object ActorMaterializer {
def apply(
materializerSettings: Option[ActorMaterializerSettings] = None,
namePrefix: Option[String] = None
)(implicit context: ActorRefFactory): ActorMaterializer
def apply(
settings: ActorMaterializerSettings,
namePrefix: String
)(implicit context: ActorRefFactory): ActorMaterializer
}Java API:
object ActorMaterializer {
def create(context: ActorRefFactory): ActorMaterializer
def create(settings: ActorMaterializerSettings, context: ActorRefFactory): ActorMaterializer
def create(settings: ActorMaterializerSettings, context: ActorRefFactory, namePrefix: String): ActorMaterializer
}Basic Materializer Setup:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Source, Sink}
// Create actor system (required for materializer)
implicit val system: ActorSystem = ActorSystem("MySystem")
// Create materializer with default settings
implicit val materializer: ActorMaterializer = ActorMaterializer()
// Alternative with custom settings
val customSettings = ActorMaterializerSettings(system)
.withInputBuffer(initialSize = 64, maxSize = 64)
.withDispatcher("my-dispatcher")
implicit val customMaterializer: ActorMaterializer =
ActorMaterializer(customSettings)
// Use materializer to run streams
val source = Source(1 to 10)
val sink = Sink.foreach[Int](println)
val result = source.runWith(sink) // Uses implicit materializerExplicit Materialization:
import akka.stream.scaladsl.RunnableGraph
val graph: RunnableGraph[Future[Done]] = source.to(sink)
// Materialize and get materialized value
val materializedValue: Future[Done] = materializer.materialize(graph)
// With custom attributes
val withAttributes = graph.withAttributes(Attributes.name("my-stream"))
val result2 = materializer.materialize(withAttributes)Configuration for the ActorMaterializer with various tuning options.
final class ActorMaterializerSettings(
initialInputBufferSize: Int,
maxInputBufferSize: Int,
dispatcher: String,
supervisionDecider: Supervision.Decider,
subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings,
debugLogging: Boolean,
outputBurstLimit: Int,
fuzzingMode: Boolean,
autoFusing: Boolean,
maxFixedBufferSize: Int,
syncProcessingLimit: Int,
blockingIoDispatcher: String
)Buffer Configuration:
class ActorMaterializerSettings {
def withInputBuffer(initialSize: Int, maxSize: Int): ActorMaterializerSettings
def withMaxFixedBufferSize(maxSize: Int): ActorMaterializerSettings
def withSyncProcessingLimit(limit: Int): ActorMaterializerSettings
}Dispatcher Configuration:
class ActorMaterializerSettings {
def withDispatcher(dispatcher: String): ActorMaterializerSettings
def withBlockingIoDispatcher(dispatcher: String): ActorMaterializerSettings
}Supervision and Error Handling:
class ActorMaterializerSettings {
def withSupervisionStrategy(decider: Supervision.Decider): ActorMaterializerSettings
}Debug and Performance:
class ActorMaterializerSettings {
def withDebugLogging(enable: Boolean): ActorMaterializerSettings
def withFuzzingMode(enable: Boolean): ActorMaterializerSettings
def withAutoFusing(enable: Boolean): ActorMaterializerSettings
}import akka.stream.{ActorMaterializerSettings, Supervision}
import scala.concurrent.duration._
val settings = ActorMaterializerSettings(system)
.withInputBuffer(initialSize = 16, maxSize = 128)
.withDispatcher("stream-dispatcher")
.withSupervisionStrategy(Supervision.restartingDecider)
.withDebugLogging(true)
.withSubscriptionTimeout(
StreamSubscriptionTimeoutSettings(
mode = StreamSubscriptionTimeoutTerminationMode.noop,
timeout = 5.seconds
)
)
val materializer = ActorMaterializer(settings)A graph with no open ports that can be materialized to run.
final class RunnableGraph[+Mat](
override val traversalBuilder: TraversalBuilder,
override val shape: ClosedShape
) extends Graph[ClosedShape, Mat]class RunnableGraph[+Mat] {
def run()(implicit materializer: Materializer): Mat
def runWith[Mat2](sink: Graph[SinkShape[Any], Mat2])(implicit materializer: Materializer): Mat2
def withAttributes(attr: Attributes): RunnableGraph[Mat]
def named(name: String): RunnableGraph[Mat]
}import akka.stream.scaladsl.{Source, Sink, RunnableGraph}
val source = Source(1 to 10)
val sink = Sink.foreach[Int](println)
// Create runnable graph
val graph: RunnableGraph[Future[Done]] = source.to(sink)
// Run the graph
val result: Future[Done] = graph.run()
// Add attributes before running
val namedGraph = graph
.withAttributes(Attributes.name("numbered-printer"))
.addAttributes(ActorAttributes.dispatcher("my-dispatcher"))
val result2 = namedGraph.run()Understanding how materialized values flow through stream composition.
Controls which materialized values to keep when combining streams.
object Keep {
val left: (Any, Any) => Any
val right: (Any, Any) => Any
val both: (Any, Any) => (Any, Any)
val none: (Any, Any) => NotUsed
}Materialized Value Handling:
import akka.stream.scaladsl.{Source, Sink, Keep}
val source: Source[Int, NotUsed] = Source(1 to 10)
val sink: Sink[Int, Future[Done]] = Sink.foreach(println)
// Keep left materialized value (NotUsed)
val graph1 = source.toMat(sink)(Keep.left)
val result1: NotUsed = graph1.run()
// Keep right materialized value (Future[Done])
val graph2 = source.toMat(sink)(Keep.right)
val result2: Future[Done] = graph2.run()
// Keep both materialized values
val graph3 = source.toMat(sink)(Keep.both)
val result3: (NotUsed, Future[Done]) = graph3.run()
// Custom combination
val graph4 = source.toMat(sink) { (left, right) =>
right.map(_ => "Completed!")
}
val result4: Future[String] = graph4.run()Complex Materialized Value Examples:
val source = Source(1 to 100)
val throttledSource = source.throttle(10, 1.second)
// Source with queue for dynamic element injection
val queueSource: Source[Int, SourceQueueWithComplete[Int]] =
Source.queue(10, OverflowStrategy.backpressure)
// Sink that materializes to the first element
val headSink: Sink[Int, Future[Int]] = Sink.head
// Combine to get both queue and first element
val graph = queueSource.toMat(headSink)(Keep.both)
val (queue, firstElement) = graph.run()
// Use the queue to add elements
queue.offer(42)
queue.offer(84)
queue.complete()
// firstElement will complete with 42// Streams start automatically when materialized
val runningStream = source.runWith(sink)
// For ActorMaterializer, shutdown stops all streams
materializer.shutdown()
// Check if materializer is shutdown
if (materializer.isShutdown) {
println("Materializer has been shut down")
}import akka.Done
import scala.util.{Success, Failure}
// Proper resource cleanup
val system = ActorSystem("MySystem")
val materializer = ActorMaterializer()(system)
val streamResult = source.runWith(sink)(materializer)
streamResult.onComplete {
case Success(_) =>
println("Stream completed successfully")
materializer.shutdown()
system.terminate()
case Failure(ex) =>
println(s"Stream failed: $ex")
materializer.shutdown()
system.terminate()
}// Configure buffer sizes for throughput vs memory tradeoff
val settings = ActorMaterializerSettings(system)
.withInputBuffer(initialSize = 4, maxSize = 16) // Small buffers, low memory
.withInputBuffer(initialSize = 64, maxSize = 1024) // Large buffers, high throughput
val materializer = ActorMaterializer(settings)// Use dedicated dispatcher for streams
val settings = ActorMaterializerSettings(system)
.withDispatcher("akka.stream.default-blocking-io-dispatcher")
// Or custom dispatcher
val streamSettings = settings.withDispatcher("my-stream-dispatcher")// Add async boundaries for better CPU utilization
val processedSource = source
.map(heavyComputation) // CPU intensive
.async // Async boundary
.map(anotherComputation) // Can run on different thread
.async // Another boundaryimport akka.stream.Supervision
// Configure supervision for error handling
val settings = ActorMaterializerSettings(system)
.withSupervisionStrategy { ex =>
ex match {
case _: IllegalArgumentException => Supervision.Resume
case _: RuntimeException => Supervision.Restart
case _ => Supervision.Stop
}
}Install with Tessl CLI
npx tessl i tessl/maven-com-typesafe-akka--akka-stream-2-13-0-m5