Endpoints for consuming stream elements including collection sinks, side-effect sinks, and integration with external systems. Sinks are the termination points of stream processing pipelines.
Sinks that collect stream elements into various collection types.
object Sink {
/**
* Collect all elements into an immutable sequence
* @return Sink that materializes to Future[immutable.Seq[T]]
*/
def seq[T]: Sink[T, Future[immutable.Seq[T]]]
/**
* Collect elements into a collection using implicit Factory
* @param cbf Factory for creating the collection type
* @return Sink that materializes to Future of the collection type
*/
def collection[T, That](implicit cbf: Factory[T, That with immutable.Iterable[_]]): Sink[T, Future[That]]
/**
* Get only the first element
* @return Sink that materializes to Future[T] with the first element
*/
def head[T]: Sink[T, Future[T]]
/**
* Get the first element if available
* @return Sink that materializes to Future[Option[T]]
*/
def headOption[T]: Sink[T, Future[Option[T]]]
/**
* Get only the last element
* @return Sink that materializes to Future[T] with the last element
*/
def last[T]: Sink[T, Future[T]]
/**
* Get the last element if available
* @return Sink that materializes to Future[Option[T]]
*/
def lastOption[T]: Sink[T, Future[Option[T]]]
}Usage Examples:
import akka.stream.scaladsl.{Source, Sink}
import scala.concurrent.Future
// Collect to sequence
val seqResult: Future[Seq[Int]] = Source(1 to 10).runWith(Sink.seq)
// Get first/last elements
val firstResult: Future[Int] = Source(1 to 10).runWith(Sink.head)
val lastResult: Future[Int] = Source(1 to 10).runWith(Sink.last)
// Optional first/last
val firstOptional: Future[Option[String]] = Source.empty[String].runWith(Sink.headOption)
// Custom collection
val listResult: Future[List[Int]] = Source(1 to 5).runWith(
Sink.collection(() => List.newBuilder[Int])
)Sinks that perform aggregation operations on stream elements.
/**
* Fold all elements using an accumulator function
* @param zero Initial accumulator value
* @param f Function to combine accumulator with each element
* @return Sink that materializes to Future with final accumulated value
*/
def fold[U, T](zero: U)(f: (U, T) => U): Sink[T, Future[U]]
/**
* Fold all elements asynchronously
* @param zero Initial accumulator value
* @param f Async function to combine accumulator with each element
* @return Sink that materializes to Future with final accumulated value
*/
def foldAsync[U, T](zero: U)(f: (U, T) => Future[U]): Sink[T, Future[U]]
/**
* Reduce elements using a binary operator (requires at least one element)
* @param f Binary operator for reduction
* @return Sink that materializes to Future with reduced value
*/
def reduce[T](f: (T, T) => T): Sink[T, Future[T]]
/**
* Find the minimum element
* @param ord Ordering for comparison
* @return Sink that materializes to Future with minimum element
*/
def min[T](implicit ord: Ordering[T]): Sink[T, Future[T]]
/**
* Find the maximum element
* @param ord Ordering for comparison
* @return Sink that materializes to Future with maximum element
*/
def max[T](implicit ord: Ordering[T]): Sink[T, Future[T]]Usage Examples:
// Sum all numbers
val sum: Future[Int] = Source(1 to 10).runWith(Sink.fold(0)(_ + _))
// Concatenate strings
val combined: Future[String] = Source(List("Hello", " ", "World"))
.runWith(Sink.reduce(_ + _))
// Find min/max
val minimum: Future[Int] = Source(List(5, 2, 8, 1, 9)).runWith(Sink.min)
val maximum: Future[Int] = Source(List(5, 2, 8, 1, 9)).runWith(Sink.max)
// Async aggregation
val asyncSum: Future[Int] = Source(1 to 10).runWith(
Sink.foldAsync(0) { (acc, elem) =>
Future.successful(acc + elem)
}
)Sinks that perform side effects without collecting elements.
/**
* Execute a side effect for each element
* @param f Function to execute for each element
* @return Sink that materializes to Future[Done] when complete
*/
def foreach[T](f: T => Unit): Sink[T, Future[Done]]
/**
* Execute an async side effect for each element
* @param parallelism Maximum number of concurrent operations
* @param f Async function to execute for each element
* @return Sink that materializes to Future[Done] when complete
*/
def foreachAsync[T](parallelism: Int)(f: T => Future[_]): Sink[T, Future[Done]]
/**
* Execute a side effect for each element in parallel
* @param parallelism Maximum number of concurrent operations
* @param f Function to execute for each element
* @return Sink that materializes to Future[Done] when complete
*/
def foreachParallel[T](parallelism: Int)(f: T => Unit): Sink[T, Future[Done]]
/**
* Ignore all elements
* @return Sink that materializes to Future[Done] and discards all elements
*/
def ignore: Sink[Any, Future[Done]]
/**
* Sink that is immediately cancelled
* @return Sink that cancels upstream immediately
*/
def cancelled[T]: Sink[T, NotUsed]
/**
* Execute function when stream completes or fails
* @param callback Function called when stream terminates
* @return Sink that materializes to Future[Done]
*/
def onComplete[T](callback: Try[Done] => Unit): Sink[T, Future[Done]]Usage Examples:
import akka.Done
import scala.util.{Success, Failure}
// Print each element
Source(1 to 5).runWith(Sink.foreach(println))
// Async processing
Source(List("url1", "url2", "url3")).runWith(
Sink.foreachAsync(2) { url =>
// Simulate async HTTP call
Future {
println(s"Processing $url")
Thread.sleep(100)
}
}
)
// Ignore all elements (useful for testing)
Source(1 to 100).runWith(Sink.ignore)
// Handle completion
Source(1 to 5).runWith(Sink.onComplete {
case Success(Done) => println("Stream completed successfully")
case Failure(ex) => println(s"Stream failed: $ex")
})Sinks that integrate with Akka actors for sending elements as messages.
/**
* Send elements as messages to an actor
* @param ref Target actor reference
* @return Sink that sends each element as a message
*/
def actorRef[T](ref: ActorRef, onCompleteMessage: Any): Sink[T, NotUsed]
/**
* Send elements to an actor with backpressure support
* @param ref Target actor reference
* @param messageAdapter Function to wrap elements in messages
* @param initMessage Optional initialization message
* @param ackMessage Message that actor sends to acknowledge receipt
* @param onCompleteMessage Message sent when stream completes
* @param onFailureMessage Function to create failure message
* @return Sink with backpressure control
*/
def actorRefWithBackpressure[T](
ref: ActorRef,
messageAdapter: T => Any,
initMessage: Option[Any] = None,
ackMessage: Any,
onCompleteMessage: Any,
onFailureMessage: Throwable => Any = Status.Failure(_)
): Sink[T, NotUsed]Usage Examples:
import akka.actor.{ActorRef, ActorSystem, Props}
// Simple actor sink
val actorRef: ActorRef = system.actorOf(Props[ProcessingActor])
Source(1 to 10).runWith(Sink.actorRef(actorRef, "complete"))
// Backpressure-aware actor sink
Source(1 to 100).runWith(
Sink.actorRefWithBackpressure(
ref = actorRef,
messageAdapter = (elem: Int) => ProcessElement(elem),
ackMessage = "ack",
onCompleteMessage = "complete"
)
)Sinks that provide dynamic pull-based consumption.
/**
* Create a sink that materializes to a queue for pulling elements
* @return Sink that materializes to SinkQueue for pulling elements on demand
*/
def queue[T](): Sink[T, SinkQueue[T]]
/**
* Interface for pulling elements from a queue-backed sink
*/
trait SinkQueue[T] {
/**
* Pull the next element from the stream
* @return Future with the next element or completion/failure
*/
def pull(): Future[Option[T]]
/**
* Cancel the sink and complete the stream
*/
def cancel(): Unit
}Usage Examples:
import akka.stream.SinkQueue
// Pull-based consumption
val (queue: SinkQueue[Int], source: Source[Int, NotUsed]) =
Source(1 to 10)
.toMat(Sink.queue())(Keep.both)
.preMaterialize()
// Pull elements on demand
def pullNext(): Unit = {
queue.pull().foreach {
case Some(element) =>
println(s"Got element: $element")
pullNext() // Pull next element
case None =>
println("Stream completed")
}
}
pullNext()Sinks for writing to files and other IO destinations.
object FileIO {
/**
* Write ByteString elements to a file
* @param f Path to the target file
* @param options File open options
* @return Sink that materializes to Future[IOResult]
*/
def toPath(f: Path, options: Set[OpenOption] = Set(WRITE, TRUNCATE_EXISTING, CREATE)): Sink[ByteString, Future[IOResult]]
}
/**
* Result of IO operations containing byte count and completion status
*/
final case class IOResult(count: Long, status: Try[Done])Usage Examples:
import akka.stream.scaladsl.FileIO
import akka.util.ByteString
import java.nio.file.Paths
// Write to file
val filePath = Paths.get("output.txt")
Source(List("Hello", "World", "!"))
.map(s => ByteString(s + "\n"))
.runWith(FileIO.toPath(filePath))
.map { result =>
println(s"Written ${result.count} bytes")
}Operations for creating custom sinks and transforming existing ones.
/**
* Transform the input type of a sink
* @param f Function to transform input elements
* @return Sink that accepts transformed input type
*/
def contramap[In2](f: In2 => In): Sink[In2, Mat]
/**
* Transform the materialized value of a sink
* @param f Function to transform materialized value
* @return Sink with transformed materialized value
*/
def mapMaterializedValue[Mat2](f: Mat => Mat2): Sink[In, Mat2]
/**
* Pre-materialize a sink to get both the materialized value and a new sink
* @return Tuple of materialized value and equivalent sink
*/
def preMaterialize()(implicit materializer: Materializer): (Mat, Sink[In, NotUsed])
/**
* Add attributes to a sink
* @param attrs Attributes to add
* @return Sink with added attributes
*/
def withAttributes(attrs: Attributes): Sink[In, Mat]Usage Examples:
// Transform input type
val intSink: Sink[Int, Future[Seq[Int]]] = Sink.seq[Int]
val stringSink: Sink[String, Future[Seq[Int]]] = intSink.contramap(_.toInt)
// Transform materialized value
val countSink: Sink[String, Future[Int]] = Sink.seq[String]
.mapMaterializedValue(_.map(_.length))
// Pre-materialize for reuse
val (future: Future[Seq[Int]], reusableSink: Sink[Int, NotUsed]) =
Sink.seq[Int].preMaterialize()// Queue interface for pull-based consumption
trait SinkQueue[T] {
def pull(): Future[Option[T]]
def cancel(): Unit
}
// IO operation result
final case class IOResult(count: Long, status: Try[Done]) {
def wasSuccessful: Boolean = status.isSuccess
}
// Completion marker
sealed abstract class Done
case object Done extends Done