or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

core-stream-types.mdcustom-stages.mderror-handling.mdindex.mdintegration.mdmaterialization.mdstream-combining.mdstream-control.mdstream-sinks.mdstream-sources.mdstream-transformations.md
tile.json

stream-sinks.mddocs/

Stream Sinks

Endpoints for consuming stream elements including collection sinks, side-effect sinks, and integration with external systems. Sinks are the termination points of stream processing pipelines.

Capabilities

Collection Sinks

Sinks that collect stream elements into various collection types.

object Sink {
  /**
   * Collect all elements into an immutable sequence
   * @return Sink that materializes to Future[immutable.Seq[T]]
   */
  def seq[T]: Sink[T, Future[immutable.Seq[T]]]
  
  /**  
   * Collect elements into a collection using implicit Factory
   * @param cbf Factory for creating the collection type
   * @return Sink that materializes to Future of the collection type
   */
  def collection[T, That](implicit cbf: Factory[T, That with immutable.Iterable[_]]): Sink[T, Future[That]]
  
  /**
   * Get only the first element
   * @return Sink that materializes to Future[T] with the first element
   */
  def head[T]: Sink[T, Future[T]]
  
  /**
   * Get the first element if available
   * @return Sink that materializes to Future[Option[T]]
   */
  def headOption[T]: Sink[T, Future[Option[T]]]
  
  /**
   * Get only the last element  
   * @return Sink that materializes to Future[T] with the last element
   */
  def last[T]: Sink[T, Future[T]]
  
  /**
   * Get the last element if available
   * @return Sink that materializes to Future[Option[T]]
   */
  def lastOption[T]: Sink[T, Future[Option[T]]]
}

Usage Examples:

import akka.stream.scaladsl.{Source, Sink}
import scala.concurrent.Future

// Collect to sequence
val seqResult: Future[Seq[Int]] = Source(1 to 10).runWith(Sink.seq)

// Get first/last elements
val firstResult: Future[Int] = Source(1 to 10).runWith(Sink.head)
val lastResult: Future[Int] = Source(1 to 10).runWith(Sink.last)

// Optional first/last
val firstOptional: Future[Option[String]] = Source.empty[String].runWith(Sink.headOption)

// Custom collection
val listResult: Future[List[Int]] = Source(1 to 5).runWith(
  Sink.collection(() => List.newBuilder[Int])
)

Aggregation Sinks

Sinks that perform aggregation operations on stream elements.

/**
 * Fold all elements using an accumulator function
 * @param zero Initial accumulator value
 * @param f Function to combine accumulator with each element
 * @return Sink that materializes to Future with final accumulated value
 */
def fold[U, T](zero: U)(f: (U, T) => U): Sink[T, Future[U]]

/**
 * Fold all elements asynchronously
 * @param zero Initial accumulator value
 * @param f Async function to combine accumulator with each element
 * @return Sink that materializes to Future with final accumulated value
 */
def foldAsync[U, T](zero: U)(f: (U, T) => Future[U]): Sink[T, Future[U]]

/**
 * Reduce elements using a binary operator (requires at least one element)
 * @param f Binary operator for reduction
 * @return Sink that materializes to Future with reduced value
 */
def reduce[T](f: (T, T) => T): Sink[T, Future[T]]

/**
 * Find the minimum element
 * @param ord Ordering for comparison
 * @return Sink that materializes to Future with minimum element
 */
def min[T](implicit ord: Ordering[T]): Sink[T, Future[T]]

/**
 * Find the maximum element  
 * @param ord Ordering for comparison
 * @return Sink that materializes to Future with maximum element
 */
def max[T](implicit ord: Ordering[T]): Sink[T, Future[T]]

Usage Examples:

// Sum all numbers
val sum: Future[Int] = Source(1 to 10).runWith(Sink.fold(0)(_ + _))

// Concatenate strings
val combined: Future[String] = Source(List("Hello", " ", "World"))
  .runWith(Sink.reduce(_ + _))

// Find min/max
val minimum: Future[Int] = Source(List(5, 2, 8, 1, 9)).runWith(Sink.min)
val maximum: Future[Int] = Source(List(5, 2, 8, 1, 9)).runWith(Sink.max)

// Async aggregation
val asyncSum: Future[Int] = Source(1 to 10).runWith(
  Sink.foldAsync(0) { (acc, elem) =>
    Future.successful(acc + elem)
  }
)

Side-Effect Sinks

Sinks that perform side effects without collecting elements.

/**
 * Execute a side effect for each element
 * @param f Function to execute for each element
 * @return Sink that materializes to Future[Done] when complete
 */
def foreach[T](f: T => Unit): Sink[T, Future[Done]]

/**
 * Execute an async side effect for each element
 * @param parallelism Maximum number of concurrent operations
 * @param f Async function to execute for each element
 * @return Sink that materializes to Future[Done] when complete
 */
def foreachAsync[T](parallelism: Int)(f: T => Future[_]): Sink[T, Future[Done]]

/**
 * Execute a side effect for each element in parallel
 * @param parallelism Maximum number of concurrent operations
 * @param f Function to execute for each element
 * @return Sink that materializes to Future[Done] when complete
 */
def foreachParallel[T](parallelism: Int)(f: T => Unit): Sink[T, Future[Done]]

/**
 * Ignore all elements
 * @return Sink that materializes to Future[Done] and discards all elements
 */
def ignore: Sink[Any, Future[Done]]

/**
 * Sink that is immediately cancelled
 * @return Sink that cancels upstream immediately
 */
def cancelled[T]: Sink[T, NotUsed]

/**
 * Execute function when stream completes or fails
 * @param callback Function called when stream terminates
 * @return Sink that materializes to Future[Done]
 */
def onComplete[T](callback: Try[Done] => Unit): Sink[T, Future[Done]]

Usage Examples:

import akka.Done
import scala.util.{Success, Failure}

// Print each element
Source(1 to 5).runWith(Sink.foreach(println))

// Async processing
Source(List("url1", "url2", "url3")).runWith(
  Sink.foreachAsync(2) { url =>
    // Simulate async HTTP call
    Future {
      println(s"Processing $url")
      Thread.sleep(100)
    }
  }
)

// Ignore all elements (useful for testing)
Source(1 to 100).runWith(Sink.ignore)

// Handle completion
Source(1 to 5).runWith(Sink.onComplete {
  case Success(Done) => println("Stream completed successfully")
  case Failure(ex) => println(s"Stream failed: $ex")
})

Actor Integration Sinks

Sinks that integrate with Akka actors for sending elements as messages.

/**
 * Send elements as messages to an actor
 * @param ref Target actor reference
 * @return Sink that sends each element as a message
 */
def actorRef[T](ref: ActorRef, onCompleteMessage: Any): Sink[T, NotUsed]

/**
 * Send elements to an actor with backpressure support
 * @param ref Target actor reference
 * @param messageAdapter Function to wrap elements in messages
 * @param initMessage Optional initialization message
 * @param ackMessage Message that actor sends to acknowledge receipt
 * @param onCompleteMessage Message sent when stream completes
 * @param onFailureMessage Function to create failure message
 * @return Sink with backpressure control
 */
def actorRefWithBackpressure[T](
  ref: ActorRef,
  messageAdapter: T => Any,
  initMessage: Option[Any] = None,
  ackMessage: Any,
  onCompleteMessage: Any,
  onFailureMessage: Throwable => Any = Status.Failure(_)
): Sink[T, NotUsed]

Usage Examples:

import akka.actor.{ActorRef, ActorSystem, Props}

// Simple actor sink
val actorRef: ActorRef = system.actorOf(Props[ProcessingActor])
Source(1 to 10).runWith(Sink.actorRef(actorRef, "complete"))

// Backpressure-aware actor sink
Source(1 to 100).runWith(
  Sink.actorRefWithBackpressure(
    ref = actorRef,
    messageAdapter = (elem: Int) => ProcessElement(elem),
    ackMessage = "ack",
    onCompleteMessage = "complete"
  )
)

Queue Sinks

Sinks that provide dynamic pull-based consumption.

/**
 * Create a sink that materializes to a queue for pulling elements
 * @return Sink that materializes to SinkQueue for pulling elements on demand
 */
def queue[T](): Sink[T, SinkQueue[T]]

/**
 * Interface for pulling elements from a queue-backed sink
 */
trait SinkQueue[T] {
  /**
   * Pull the next element from the stream
   * @return Future with the next element or completion/failure
   */
  def pull(): Future[Option[T]]
  
  /**
   * Cancel the sink and complete the stream
   */
  def cancel(): Unit
}

Usage Examples:

import akka.stream.SinkQueue

// Pull-based consumption
val (queue: SinkQueue[Int], source: Source[Int, NotUsed]) = 
  Source(1 to 10)
    .toMat(Sink.queue())(Keep.both)
    .preMaterialize()

// Pull elements on demand
def pullNext(): Unit = {
  queue.pull().foreach {
    case Some(element) =>
      println(s"Got element: $element")
      pullNext() // Pull next element
    case None =>
      println("Stream completed")
  }
}
pullNext()

File and IO Sinks

Sinks for writing to files and other IO destinations.

object FileIO {
  /**
   * Write ByteString elements to a file
   * @param f Path to the target file
   * @param options File open options
   * @return Sink that materializes to Future[IOResult]
   */
  def toPath(f: Path, options: Set[OpenOption] = Set(WRITE, TRUNCATE_EXISTING, CREATE)): Sink[ByteString, Future[IOResult]]
}

/**
 * Result of IO operations containing byte count and completion status
 */
final case class IOResult(count: Long, status: Try[Done])

Usage Examples:

import akka.stream.scaladsl.FileIO
import akka.util.ByteString
import java.nio.file.Paths

// Write to file
val filePath = Paths.get("output.txt")
Source(List("Hello", "World", "!"))
  .map(s => ByteString(s + "\n"))
  .runWith(FileIO.toPath(filePath))
  .map { result =>
    println(s"Written ${result.count} bytes")
  }

Custom and Transformation Sinks

Operations for creating custom sinks and transforming existing ones.

/**
 * Transform the input type of a sink
 * @param f Function to transform input elements
 * @return Sink that accepts transformed input type
 */
def contramap[In2](f: In2 => In): Sink[In2, Mat]

/**
 * Transform the materialized value of a sink
 * @param f Function to transform materialized value
 * @return Sink with transformed materialized value
 */
def mapMaterializedValue[Mat2](f: Mat => Mat2): Sink[In, Mat2]

/**
 * Pre-materialize a sink to get both the materialized value and a new sink
 * @return Tuple of materialized value and equivalent sink
 */
def preMaterialize()(implicit materializer: Materializer): (Mat, Sink[In, NotUsed])

/**
 * Add attributes to a sink
 * @param attrs Attributes to add
 * @return Sink with added attributes
 */
def withAttributes(attrs: Attributes): Sink[In, Mat]

Usage Examples:

// Transform input type
val intSink: Sink[Int, Future[Seq[Int]]] = Sink.seq[Int]
val stringSink: Sink[String, Future[Seq[Int]]] = intSink.contramap(_.toInt)

// Transform materialized value
val countSink: Sink[String, Future[Int]] = Sink.seq[String]
  .mapMaterializedValue(_.map(_.length))

// Pre-materialize for reuse
val (future: Future[Seq[Int]], reusableSink: Sink[Int, NotUsed]) = 
  Sink.seq[Int].preMaterialize()

Types

// Queue interface for pull-based consumption
trait SinkQueue[T] {
  def pull(): Future[Option[T]]
  def cancel(): Unit
}

// IO operation result
final case class IOResult(count: Long, status: Try[Done]) {
  def wasSuccessful: Boolean = status.isSuccess
}

// Completion marker
sealed abstract class Done
case object Done extends Done