or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

core-stream-types.md, custom-stages.md, error-handling.md, index.md, integration.md, materialization.md, stream-combining.md, stream-control.md, stream-sinks.md, stream-sources.md, stream-transformations.md
tile.json

docs/materialization.md

Materialization and Execution

System for converting stream blueprints into running streams, managing resources, and controlling materialized values. Materialization is the process that transforms stream descriptions into executing stream processors.

Capabilities

Materializer

The core component responsible for turning stream graphs into running stream processors.

/**
 * Component responsible for materializing stream graphs into running streams.
 *
 * Besides materialization, it exposes task-scheduling helpers, the execution
 * context it uses, and a way to derive a materializer with a name prefix.
 * Every member is listed here as a signature only.
 */
trait Materializer {
  /**
   * Materialize a stream graph
   * @tparam Mat Type of the graph's materialized value
   * @param runnable Complete stream graph ready for execution
   * @return The materialized value
   */
  def materialize[Mat](runnable: Graph[ClosedShape, Mat]): Mat
  
  /**
   * Schedule a task to run once after a delay
   * @param delay Duration to wait before execution
   * @param task Task to execute
   * @return Cancellable to cancel the scheduled task
   */
  def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable
  
  /**
   * Schedule a task to run repeatedly with fixed delay
   * @param initialDelay Initial delay before first execution
   * @param delay Delay between executions
   * @param task Task to execute
   * @return Cancellable to cancel the scheduled task
   */
  def scheduleWithFixedDelay(initialDelay: FiniteDuration, delay: FiniteDuration, task: Runnable): Cancellable
  
  /**
   * Schedule a task to run repeatedly at fixed rate
   * @param initialDelay Initial delay before first execution
   * @param interval Interval between executions
   * @param task Task to execute
   * @return Cancellable to cancel the scheduled task
   */
  def scheduleAtFixedRate(initialDelay: FiniteDuration, interval: FiniteDuration, task: Runnable): Cancellable
  
  /**
   * Get the execution context used by this materializer
   * @return ExecutionContext for async operations
   */
  def executionContext: ExecutionContext
  
  /**
   * Create a materializer with a name prefix for debugging
   * @param name Prefix for materializer name
   * @return New materializer with the specified name prefix
   */
  def withNamePrefix(name: String): Materializer
}

// Factory methods for obtaining a Materializer (signatures only).
object Materializer {
  /**
   * Create a materializer from an ActorSystem
   * @param system ActorSystem to create materializer from
   * @return Materializer instance
   */
  def apply(system: ActorSystem): Materializer
  
  /**
   * Create a materializer with custom settings.
   * Note the argument order: settings first, then the ActorSystem.
   * NOTE(review): confirm this overload exists in the Akka version being documented.
   * @param settings Custom materializer settings
   * @param system ActorSystem to use
   * @return Materializer with custom configuration
   */
  def apply(settings: ActorMaterializerSettings, system: ActorSystem): Materializer
}

Usage Examples:

import akka.actor.ActorSystem
import akka.stream.Materializer
import akka.stream.scaladsl.{Source, Sink}

// Brings the `1.second` duration syntax used below into scope
import scala.concurrent.duration._

// Create materializer
implicit val system: ActorSystem = ActorSystem("stream-system")
implicit val materializer: Materializer = Materializer(system)

// Materialize and run stream; runWith picks up the implicit materializer declared above
val result = Source(1 to 10)
  .map(_ * 2)
  .runWith(Sink.seq)

// Schedule tasks; the returned Cancellable can be used to cancel the task
val cancellable = materializer.scheduleOnce(1.second, new Runnable {
  def run(): Unit = println("Delayed task executed")
})

Stream Execution Methods

Methods for running streams and controlling materialization.

/**
 * Run a complete stream graph.
 * `Mat` is not declared by this method; it is the materialized value type of the
 * enclosing graph (presumably `RunnableGraph[Mat]` - confirm).
 * @param materializer Implicit materializer for execution
 * @return The materialized value
 */
def run()(implicit materializer: Materializer): Mat

/**
 * Run this source with the given sink
 * @tparam Mat2 Type of the sink's materialized value
 * @param sink Sink to connect to this source
 * @param materializer Implicit materializer for execution
 * @return The materialized value from the sink
 */
def runWith[Mat2](sink: Graph[SinkShape[Out], Mat2])(implicit materializer: Materializer): Mat2

/**
 * Connect this source to a sink and get a runnable graph.
 * The result is typed with this source's `Mat`, not the sink's `Mat2`.
 * @tparam Mat2 Type of the sink's materialized value
 * @param sink Sink to connect to
 * @return RunnableGraph ready for materialization
 */
def to[Mat2](sink: Graph[SinkShape[Out], Mat2]): RunnableGraph[Mat]

/**
 * Connect this source to a sink and combine materialized values
 * @tparam Mat2 Type of the sink's materialized value
 * @tparam Mat3 Type produced by `combine`, carried by the resulting graph
 * @param sink Sink to connect to
 * @param combine Function to combine materialized values (source's first, sink's second)
 * @return RunnableGraph with combined materialized value
 */
def toMat[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2])(combine: (Mat, Mat2) => Mat3): RunnableGraph[Mat3]

Usage Examples:

// Direct execution: runWith connects the sink and runs in a single call
// (an implicit Materializer must be in scope)
val future1 = Source(1 to 5).runWith(Sink.seq)

// Create runnable graph first; Keep.right selects the sink's materialized value
val graph = Source(1 to 5).toMat(Sink.seq)(Keep.right)
// run() materializes the graph built above
val future2 = graph.run()

// Combine materialized values:
// viaMat(...)(Keep.right) keeps the kill switch stage's value,
// toMat(...)(Keep.both) pairs it with the sink's value
val (killSwitch, future3) = Source(1 to 5)
  .viaMat(KillSwitches.single)(Keep.right)
  .toMat(Sink.seq)(Keep.both)
  .run()

Materialized Value Control

Operations for controlling and transforming materialized values.

/**
 * Transform the materialized value of a graph.
 * As listed here the method is the `Source` variant: it returns a `Source` with the
 * same element type `Out` and the new materialized value type.
 * @tparam Mat2 Type of the transformed materialized value
 * @param f Function to transform the materialized value
 * @return Graph with transformed materialized value
 */
def mapMaterializedValue[Mat2](f: Mat => Mat2): Source[Out, Mat2]

/**
 * Pre-materialize a source to get both materialized value and new source.
 * The returned source is typed `Source[Out, NotUsed]`, i.e. it no longer carries `Mat`.
 * @param materializer Materializer to use for pre-materialization
 * @return Tuple of materialized value and equivalent source
 */
def preMaterialize()(implicit materializer: Materializer): (Mat, Source[Out, NotUsed])

/**
 * Combinators that choose which materialized value survives when two
 * graph parts are connected.
 */
object Keep {
  /** Selects the left-hand materialized value and ignores the right-hand one. */
  def left[L, R]: (L, R) => L = (leftMat: L, _: R) => leftMat

  /** Selects the right-hand materialized value and ignores the left-hand one. */
  def right[L, R]: (L, R) => R = (_: L, rightMat: R) => rightMat

  /** Pairs the two materialized values in a tuple, left-hand value first. */
  def both[L, R]: (L, R) => (L, R) = (leftMat: L, rightMat: R) => Tuple2(leftMat, rightMat)

  /** Ignores both materialized values and yields `NotUsed`. */
  def none[L, R]: (L, R) => NotUsed = (_: L, _: R) => NotUsed
}

Usage Examples:

// Transform materialized value.
// toMat(...) produces a RunnableGraph, so the value is a RunnableGraph[Future[Int]]
// rather than a Source. Future.map needs an ExecutionContext; the ActorSystem's
// dispatcher is passed explicitly (assumes an ActorSystem named `system` is in scope).
val countGraph: RunnableGraph[Future[Int]] = Source(List("a", "bb", "ccc"))
  .toMat(Sink.seq)(Keep.right)
  .mapMaterializedValue(_.map(_.length)(system.dispatcher))

// Pre-materialize for reuse.
// preMaterialize() is called on a Source; the RunnableGraph returned by toMat(...)
// does not offer it. Source.maybe materializes a Promise that can later be completed
// to emit one element (or none). An implicit Materializer must be in scope.
val (promise: scala.concurrent.Promise[Option[Int]], reusableSource: Source[Int, NotUsed]) =
  Source.maybe[Int].preMaterialize()

// Use Keep combinators: Keep.both pairs the source's ActorRef with the sink's completion future.
// The second binding is named `done` so it does not redefine `future` from the preceding example.
// NOTE(review): the two-argument Source.actorRef overload may be deprecated in newer Akka versions - confirm.
val (actorRef: ActorRef, done: Future[Done]) = 
  Source.actorRef[String](100, OverflowStrategy.fail)
    .toMat(Sink.foreach(println))(Keep.both)
    .run()

System Materializer

Utility for getting a system-wide materializer instance.

/**
 * System-wide materializer that uses the guardian actor's dispatcher.
 * NOTE(review): in upstream Akka, `SystemMaterializer` is believed to be an ActorSystem
 * extension whose `get`/`apply` return the extension itself, with the Materializer
 * exposed as a `materializer` member - confirm the return types listed below.
 */
object SystemMaterializer {
  /**
   * Get the system materializer for an ActorSystem
   * @param system ActorSystem to get materializer for
   * @return System materializer instance
   */
  def get(system: ActorSystem): Materializer
  
  /**
   * Alias for get(); delegates directly to it
   * @param system ActorSystem to get materializer for
   * @return Same result as get(system)
   */
  def apply(system: ActorSystem): Materializer = get(system)
}

Usage Examples:

// Get system materializer
val system = ActorSystem("my-system")
// SystemMaterializer is an ActorSystem extension; the shared Materializer is its `materializer` member
val materializer: Materializer = SystemMaterializer(system).materializer

// Use in stream operations, passing the materializer explicitly
Source(1 to 10).runWith(Sink.seq)(materializer)

Materializer Settings

Configuration options for customizing materializer behavior.

/**
 * Settings for configuring materializer behavior.
 * The constructor is private; the `with*` methods each return an
 * ActorMaterializerSettings value with the given option applied.
 * NOTE(review): believed to be deprecated since Akka 2.6 in favour of attributes/config - confirm.
 */
final class ActorMaterializerSettings private (
  val initialInputBufferSize: Int,
  val maxInputBufferSize: Int,
  val dispatcher: String,
  val supervisionDecider: Supervision.Decider,
  val subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings,
  val debugLogging: Boolean,
  val outputBurstLimit: Int,
  val fuzzingMode: Boolean,
  val autoFusing: Boolean,
  val maxFixedBufferSize: Int,
  val syncProcessingLimit: Int,
  val blockingIoDispatcher: String
) {
  /**
   * Create new settings with different buffer sizes
   * @param initialSize Initial input buffer size
   * @param maxSize Maximum input buffer size
   */
  def withInputBuffer(initialSize: Int, maxSize: Int): ActorMaterializerSettings
  
  /**
   * Create new settings with different dispatcher
   * @param dispatcher Dispatcher identifier (presumably a configuration path - confirm)
   */
  def withDispatcher(dispatcher: String): ActorMaterializerSettings
  
  /**
   * Create new settings with different supervision decider
   * @param decider Supervision decider to use
   */
  def withSupervisionStrategy(decider: Supervision.Decider): ActorMaterializerSettings
  
  /**
   * Enable or disable debug logging
   * @param enable Whether debug logging is switched on
   */
  def withDebugLogging(enable: Boolean): ActorMaterializerSettings
}

// Factory methods for ActorMaterializerSettings (signatures only).
object ActorMaterializerSettings {
  /**
   * Create settings from ActorSystem configuration
   * @param system ActorSystem whose configuration is used
   */
  def apply(system: ActorSystem): ActorMaterializerSettings
  
  /**
   * Create settings from custom configuration
   * @param config Configuration to read the settings from
   */
  def apply(config: Config): ActorMaterializerSettings
}

Usage Examples:

// Custom materializer settings: start from the ActorSystem's settings and override
// individual options (assumes an ActorSystem named `system` is already in scope)
val settings = ActorMaterializerSettings(system)
  .withInputBuffer(initialSize = 4, maxSize = 16)
  .withDispatcher("my-custom-dispatcher")
  .withDebugLogging(true)

// Settings come first, then the ActorSystem
val customMaterializer = Materializer(settings, system)

// Use custom materializer by passing it explicitly instead of relying on an implicit
Source(1 to 100).runWith(Sink.seq)(customMaterializer)

Resource Management

Controlling materializer lifecycle and resource cleanup.

/**
 * Shutdown the materializer and cleanup resources
 * NOTE(review): upstream Akka's `Materializer.shutdown()` is believed to return `Unit`
 * rather than a Future - confirm before relying on the return type listed here.
 * @return Future that completes when shutdown is finished
 */
def shutdown(): Future[Done]

/**
 * Check if the materializer has been shutdown.
 * Declared without parentheses, so it is called as `materializer.isShutdown`.
 * @return True if shutdown, false otherwise
 */
def isShutdown: Boolean

Usage Examples:

// Proper resource cleanup
val materializer = Materializer(system)

// Use materializer for stream processing; keep the Future so cleanup can wait for the stream
val streamDone = Source(1 to 10).runWith(Sink.seq)(materializer)

// Shutdown only after the stream has finished, so the materializer is not stopped
// while the stream is still running. onComplete needs an ExecutionContext; the
// ActorSystem's dispatcher is passed explicitly.
streamDone.onComplete { _ =>
  materializer.shutdown()
  println("Materializer shut down")
  system.terminate()
}(system.dispatcher)

Attributes

Configuration metadata that can be attached to stream operators.

/**
 * Metadata container for stream operators.
 * Wraps a list of individual `Attributes.Attribute` values; the constructor is private.
 */
final class Attributes private (val attributeList: List[Attributes.Attribute]) {
  /**
   * Get an attribute of the specified type
   * @tparam T Attribute type to look up
   * @param c Class of the attribute type
   * @return Optional attribute value
   */
  def get[T <: Attributes.Attribute](c: Class[T]): Option[T]
  
  /**
   * Get an attribute with a default value
   * @tparam T Attribute type to look up
   * @param default Default value if attribute not found
   * @return Attribute value or default
   */
  def getAttribute[T <: Attributes.Attribute](default: T): T
  
  /**
   * Combine with other attributes
   * @param other Other attributes to combine with
   * @return Combined attributes
   */
  def and(other: Attributes): Attributes
}

// Factory members for building Attributes values (signatures only).
object Attributes {
  /**
   * Empty attributes
   */
  val none: Attributes
  
  /**
   * Create attributes with name
   * @param name Name to attach
   */
  def name(name: String): Attributes
  
  /**
   * Create async boundary attribute
   */
  def asyncBoundary: Attributes
  
  /**
   * Create dispatcher attribute
   * NOTE(review): upstream Akka is believed to expose this as `ActorAttributes.dispatcher` - confirm.
   * @param dispatcher Dispatcher identifier
   */
  def dispatcher(dispatcher: String): Attributes
  
  /**
   * Create input buffer attribute
   * @param initial Initial input buffer size
   * @param max Maximum input buffer size
   */
  def inputBuffer(initial: Int, max: Int): Attributes
}

Usage Examples:

// Add attributes to operators
// NOTE(review): withAttributes is applied to the graph built up to that point in the
// chain; confirm whether it replaces or merges previously set attributes.
Source(1 to 10)
  .map(_ * 2)
  .withAttributes(Attributes.name("doubler"))
  .async  // Add async boundary
  .filter(_ > 10)
  .withAttributes(Attributes.dispatcher("my-dispatcher"))
  .runWith(Sink.seq)

Types

// Essential types for materialization
// NotUsed is an alias for akka.NotUsed; the signatures on this page use it as the
// materialized value type when no materialized value is carried
type NotUsed = akka.NotUsed

// Completion marker: a sealed type whose only value is the `Done` case object
sealed abstract class Done
case object Done extends Done

// Cancellable interface for scheduled tasks
trait Cancellable {
  // Requests cancellation
  // NOTE(review): the Boolean result presumably reports whether this call cancelled the task - confirm
  def cancel(): Boolean
  // Reports whether the task has been cancelled
  def isCancelled: Boolean
}

// Stream subscription timeout settings: pairs a termination mode with a timeout duration
final class StreamSubscriptionTimeoutSettings(
  val mode: StreamSubscriptionTimeoutTerminationMode,
  val timeout: FiniteDuration
)

// Closed set of termination modes; the three case objects below are the only values.
// NOTE(review): judging by the names, the modes do nothing, log a warning, or cancel
// when the timeout fires - confirm against the library documentation.
sealed abstract class StreamSubscriptionTimeoutTerminationMode
case object NoopTermination extends StreamSubscriptionTimeoutTerminationMode
case object WarnTermination extends StreamSubscriptionTimeoutTerminationMode
case object CancelTermination extends StreamSubscriptionTimeoutTerminationMode