CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-com-typesafe-akka--akka-stream-2-13-0-m5

Akka Streams is a reactive streams implementation for processing and transferring sequences of elements using bounded buffer space with automatic backpressure management.

Pending
Overview
Eval results
Files

materialization.mddocs/

Materialization and Execution

Stream materialization with ActorMaterializer, lifecycle management, and execution control for running stream blueprints.

Materializer Abstract Class

The base abstraction for materializing stream graphs into running streams.

abstract class Materializer {
  def materialize[Mat](runnable: Graph[ClosedShape, Mat]): Mat
  def materialize[Mat](runnable: Graph[ClosedShape, Mat], defaultAttributes: Attributes): Mat
  def withNamePrefix(name: String): Materializer
  implicit def executionContext: ExecutionContextExecutor
  def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable
  def schedulePeriodically(
    initialDelay: FiniteDuration, 
    interval: FiniteDuration, 
    task: Runnable
  ): Cancellable
}

ActorMaterializer

The default materializer implementation that uses Akka actors to run streams.

abstract class ActorMaterializer extends Materializer with MaterializerLoggingProvider {
  def settings: ActorMaterializerSettings
  def shutdown(): Unit
  def isShutdown: Boolean
  def system: ActorSystem
}

Factory Methods

Scala API:

object ActorMaterializer {
  def apply(
    materializerSettings: Option[ActorMaterializerSettings] = None,
    namePrefix: Option[String] = None
  )(implicit context: ActorRefFactory): ActorMaterializer
  
  def apply(
    settings: ActorMaterializerSettings, 
    namePrefix: String
  )(implicit context: ActorRefFactory): ActorMaterializer
}

Java API:

object ActorMaterializer {
  def create(context: ActorRefFactory): ActorMaterializer
  def create(settings: ActorMaterializerSettings, context: ActorRefFactory): ActorMaterializer
  def create(settings: ActorMaterializerSettings, context: ActorRefFactory, namePrefix: String): ActorMaterializer
}

Usage Examples

Basic Materializer Setup:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Source, Sink}

// Create actor system (required for materializer)
implicit val system: ActorSystem = ActorSystem("MySystem")

// Create materializer with default settings
implicit val materializer: ActorMaterializer = ActorMaterializer()

// Alternative with custom settings
val customSettings = ActorMaterializerSettings(system)
  .withInputBuffer(initialSize = 64, maxSize = 64)
  .withDispatcher("my-dispatcher")

implicit val customMaterializer: ActorMaterializer = 
  ActorMaterializer(customSettings)

// Use materializer to run streams
val source = Source(1 to 10)
val sink = Sink.foreach[Int](println)
val result = source.runWith(sink) // Uses implicit materializer

Explicit Materialization:

import akka.stream.scaladsl.RunnableGraph

val graph: RunnableGraph[Future[Done]] = source.to(sink)

// Materialize and get materialized value
val materializedValue: Future[Done] = materializer.materialize(graph)

// With custom attributes
val withAttributes = graph.withAttributes(Attributes.name("my-stream"))
val result2 = materializer.materialize(withAttributes)

ActorMaterializerSettings

Configuration for the ActorMaterializer with various tuning options.

final class ActorMaterializerSettings(
  initialInputBufferSize: Int,
  maxInputBufferSize: Int,
  dispatcher: String,
  supervisionDecider: Supervision.Decider,
  subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings,
  debugLogging: Boolean,
  outputBurstLimit: Int,
  fuzzingMode: Boolean,
  autoFusing: Boolean,
  maxFixedBufferSize: Int,
  syncProcessingLimit: Int,
  blockingIoDispatcher: String
)

Configuration Methods

Buffer Configuration:

class ActorMaterializerSettings {
  def withInputBuffer(initialSize: Int, maxSize: Int): ActorMaterializerSettings
  def withMaxFixedBufferSize(maxSize: Int): ActorMaterializerSettings
  def withSyncProcessingLimit(limit: Int): ActorMaterializerSettings
}

Dispatcher Configuration:

class ActorMaterializerSettings {
  def withDispatcher(dispatcher: String): ActorMaterializerSettings
  def withBlockingIoDispatcher(dispatcher: String): ActorMaterializerSettings
}

Supervision and Error Handling:

class ActorMaterializerSettings {
  def withSupervisionStrategy(decider: Supervision.Decider): ActorMaterializerSettings
}

Debug and Performance:

class ActorMaterializerSettings {
  def withDebugLogging(enable: Boolean): ActorMaterializerSettings
  def withFuzzingMode(enable: Boolean): ActorMaterializerSettings
  def withAutoFusing(enable: Boolean): ActorMaterializerSettings
}

Usage Examples

import akka.stream.{ActorMaterializerSettings, Supervision}
import scala.concurrent.duration._

val settings = ActorMaterializerSettings(system)
  .withInputBuffer(initialSize = 16, maxSize = 128)
  .withDispatcher("stream-dispatcher")
  .withSupervisionStrategy(Supervision.restartingDecider)
  .withDebugLogging(true)
  .withSubscriptionTimeout(
    StreamSubscriptionTimeoutSettings(
      mode = StreamSubscriptionTimeoutTerminationMode.noop,
      timeout = 5.seconds
    )
  )

val materializer = ActorMaterializer(settings)

RunnableGraph

A graph with no open ports that can be materialized to run.

final class RunnableGraph[+Mat](
  override val traversalBuilder: TraversalBuilder,
  override val shape: ClosedShape
) extends Graph[ClosedShape, Mat]

Key Methods

class RunnableGraph[+Mat] {
  def run()(implicit materializer: Materializer): Mat
  def runWith[Mat2](sink: Graph[SinkShape[Any], Mat2])(implicit materializer: Materializer): Mat2
  def withAttributes(attr: Attributes): RunnableGraph[Mat]
  def named(name: String): RunnableGraph[Mat]
}

Usage Examples

import akka.stream.scaladsl.{Source, Sink, RunnableGraph}

val source = Source(1 to 10)
val sink = Sink.foreach[Int](println)

// Create runnable graph
val graph: RunnableGraph[Future[Done]] = source.to(sink)

// Run the graph
val result: Future[Done] = graph.run()

// Add attributes before running
val namedGraph = graph
  .withAttributes(Attributes.name("numbered-printer"))
  .addAttributes(ActorAttributes.dispatcher("my-dispatcher"))

val result2 = namedGraph.run()

Materialized Values

Understanding how materialized values flow through stream composition.

Keep Object

Controls which materialized values to keep when combining streams.

object Keep {
  val left: (Any, Any) => Any
  val right: (Any, Any) => Any  
  val both: (Any, Any) => (Any, Any)
  val none: (Any, Any) => NotUsed
}

Usage Examples

Materialized Value Handling:

import akka.stream.scaladsl.{Source, Sink, Keep}

val source: Source[Int, NotUsed] = Source(1 to 10)
val sink: Sink[Int, Future[Done]] = Sink.foreach(println)

// Keep left materialized value (NotUsed)
val graph1 = source.toMat(sink)(Keep.left)
val result1: NotUsed = graph1.run()

// Keep right materialized value (Future[Done])
val graph2 = source.toMat(sink)(Keep.right)  
val result2: Future[Done] = graph2.run()

// Keep both materialized values
val graph3 = source.toMat(sink)(Keep.both)
val result3: (NotUsed, Future[Done]) = graph3.run()

// Custom combination
val graph4 = source.toMat(sink) { (left, right) =>
  right.map(_ => "Completed!")
}
val result4: Future[String] = graph4.run()

Complex Materialized Value Examples:

val source = Source(1 to 100)
val throttledSource = source.throttle(10, 1.second)

// Source with queue for dynamic element injection
val queueSource: Source[Int, SourceQueueWithComplete[Int]] = 
  Source.queue(10, OverflowStrategy.backpressure)

// Sink that materializes to the first element
val headSink: Sink[Int, Future[Int]] = Sink.head

// Combine to get both queue and first element
val graph = queueSource.toMat(headSink)(Keep.both)
val (queue, firstElement) = graph.run()

// Use the queue to add elements
queue.offer(42)
queue.offer(84)
queue.complete()

// firstElement will complete with 42

Stream Execution Lifecycle

Starting and Stopping Streams

// Streams start automatically when materialized
val runningStream = source.runWith(sink)

// For ActorMaterializer, shutdown stops all streams
materializer.shutdown()

// Check if materializer is shutdown
if (materializer.isShutdown) {
  println("Materializer has been shut down")
}

Resource Management

import akka.Done
import scala.util.{Success, Failure}

// Proper resource cleanup
val system = ActorSystem("MySystem")
val materializer = ActorMaterializer()(system)

val streamResult = source.runWith(sink)(materializer)

streamResult.onComplete {
  case Success(_) => 
    println("Stream completed successfully")
    materializer.shutdown()
    system.terminate()
  case Failure(ex) =>
    println(s"Stream failed: $ex")
    materializer.shutdown()
    system.terminate()
}

Performance Tuning

Buffer Sizing

// Configure buffer sizes for throughput vs memory tradeoff
val settings = ActorMaterializerSettings(system)
  .withInputBuffer(initialSize = 4, maxSize = 16)    // Small buffers, low memory
  .withInputBuffer(initialSize = 64, maxSize = 1024) // Large buffers, high throughput
  
val materializer = ActorMaterializer(settings)

Dispatcher Configuration

// Use dedicated dispatcher for streams
val settings = ActorMaterializerSettings(system)
  .withDispatcher("akka.stream.default-blocking-io-dispatcher")
  
// Or custom dispatcher
val streamSettings = settings.withDispatcher("my-stream-dispatcher")

Async Boundaries

// Add async boundaries for better CPU utilization
val processedSource = source
  .map(heavyComputation)    // CPU intensive
  .async                    // Async boundary
  .map(anotherComputation)  // Can run on different thread
  .async                    // Another boundary

Supervision Strategies

import akka.stream.Supervision

// Configure supervision for error handling
val settings = ActorMaterializerSettings(system)
  .withSupervisionStrategy { ex =>
    ex match {
      case _: IllegalArgumentException => Supervision.Resume
      case _: RuntimeException => Supervision.Restart  
      case _ => Supervision.Stop
    }
  }

Install with Tessl CLI

npx tessl i tessl/maven-com-typesafe-akka--akka-stream-2-13-0-m5

docs

control-flow.md

core-components.md

error-handling.md

graph-building.md

index.md

io-integration.md

junction-operations.md

materialization.md

stream-operations.md

tile.json