Comprehensive sink operations for consuming streams and producing results. ZSink provides powerful abstractions for collecting, folding, and processing stream elements.
Essential sinks for collecting stream elements.
object ZSink {
/** Collects every input element into a single `Chunk[A]`. */
def collectAll[A]: Sink[Nothing, A, Nothing, Chunk[A]]
/**
 * Collects all elements into a `Map[K, A]`.
 *
 * @param key derives the map key from an element
 * @param f   merges two elements into one; presumably applied when two elements
 *            map to the same key (confirm against the implementation)
 */
def collectAllToMap[A, K](key: A => K)(f: (A, A) => A): Sink[Nothing, A, Nothing, Map[K, A]]
/** Collects all elements into a `Set[A]`. */
def collectAllToSet[A]: Sink[Nothing, A, Nothing, Set[A]]
/**
 * Collects the first `n` elements into a `Chunk[A]`.
 *
 * Note that the third type argument is `A` here, whereas the `collectAll*`
 * sinks use `Nothing` (presumably the type of unconsumed leftovers; confirm).
 */
def take[A](n: Int): Sink[Nothing, A, A, Chunk[A]]
/** Yields the first element as an `Option[A]`; presumably `None` when there is no input (confirm). */
def head[A]: Sink[Nothing, A, A, Option[A]]
/** Yields the last element as an `Option[A]`; presumably `None` when there is no input (confirm). */
def last[A]: Sink[Nothing, A, Nothing, Option[A]]
}Sinks that fold stream elements into a single result.
object ZSink {
/**
 * Folds elements into a state `S`, starting from `z`.
 *
 * @param contFn continuation predicate on the current state; presumably folding
 *               continues while it returns `true` (confirm)
 * @param f      combines the current state with the next element
 */
def fold[A, S](z: S)(contFn: S => Boolean)(f: (S, A) => S): Sink[Nothing, A, A, S]
/** Left fold over all elements, starting from `z`; takes no continuation predicate. */
def foldLeft[A, S](z: S)(f: (S, A) => S): Sink[Nothing, A, Nothing, S]
/** Effectful variant of `fold`: the step function `f` returns a `ZIO[R, E, S]`. */
def foldM[R, E, A, S](z: S)(contFn: S => Boolean)(f: (S, A) => ZIO[R, E, S]): ZSink[R, E, A, A, S]
/** Effectful variant of `foldLeft`: the step function `f` returns a `ZIO[R, E, S]`. */
def foldLeftM[R, E, A, S](z: S)(f: (S, A) => ZIO[R, E, S]): ZSink[R, E, A, Nothing, S]
/** Like `fold`, but the step function `f` receives a whole `Chunk[A]` at a time. */
def foldChunks[A, S](z: S)(contFn: S => Boolean)(f: (S, Chunk[A]) => S): Sink[Nothing, A, A, S]
/** Effectful variant of `foldChunks`: the step function `f` returns a `ZIO[R, E, S]`. */
def foldChunksM[R, E, A, S](z: S)(contFn: S => Boolean)(f: (S, Chunk[A]) => ZIO[R, E, S]): ZSink[R, E, A, A, S]
/**
 * Folds elements into a state `S`, starting from `z`, bounded by `max`.
 *
 * An initial state is supplied, so this is a fold rather than a reduce.
 *
 * @param max presumably the maximum number of elements folded before the sink
 *            completes (confirm against the implementation)
 */
def foldUntil[A, S](z: S, max: Int)(f: (S, A) => S): Sink[Nothing, A, A, S]
/**
 * Fold bounded by a cost budget.
 *
 * @param costFn assigns a `Long` cost to each element
 * @param max    upper bound on cost; presumably on the accumulated cost of the
 *               folded elements (confirm)
 */
def foldWeighted[A, S](z: S)(costFn: A => Long, max: Long)(f: (S, A) => S): Sink[Nothing, A, A, S]
}Specialized sinks for numeric computations.
object ZSink {
/**
 * Sums all elements of type `A`.
 *
 * Requires an implicit `Numeric[A]`; call sites with no expected type may need
 * the type argument spelled out, e.g. `ZSink.sum[Int]`.
 */
def sum[A](implicit A: Numeric[A]): Sink[Nothing, A, Nothing, A]
}Sinks that perform effects on stream elements.
object ZSink {
/** Creates a sink from the effect `b` (passed by name); the sink's result type is the effect's `Z`. */
def fromEffect[R, E, Z](b: => ZIO[R, E, Z]): ZSink[R, E, Any, Nothing, Z]
/** Applies the effectful function `f` to each element; the effect's result is discarded (`Any`). */
def foreach[R, E, A](f: A => ZIO[R, E, Any]): ZSink[R, E, A, Nothing, Unit]
/**
 * Applies the effectful predicate `f` to elements; presumably stops at the first
 * element for which `f` yields `false` (confirm).
 */
def foreachWhile[R, E, A](f: A => ZIO[R, E, Boolean]): ZSink[R, E, A, A, Unit]
/** Applies the effectful function `f` to each `Chunk[A]` of input. */
def foreachChunk[R, E, A](f: Chunk[A] => ZIO[R, E, Any]): ZSink[R, E, A, Nothing, Unit]
/**
 * Applies the effectful predicate `f` to chunks of input; presumably stops at the
 * first chunk for which `f` yields `false` (confirm).
 */
def foreachChunkWhile[R, E, A](f: Chunk[A] => ZIO[R, E, Boolean]): ZSink[R, E, A, A, Unit]
}Sinks that work with managed resources.
object ZSink {
/** Creates a sink from a `ZManaged` that produces a sink. */
def managed[R, E, A, Z](managed: ZManaged[R, E, ZSink[R, E, A, Any, Z]]): ZSink[R, E, A, Any, Z]
/** Sink backed by a `Hub[A]`; presumably publishes each element to the hub (confirm). */
def fromHub[A](hub: Hub[A]): Sink[Nothing, A, Nothing, Unit]
/** Sink backed by a `Queue[A]`; presumably offers each element to the queue (confirm). */
def fromQueue[A](queue: Queue[A]): Sink[Nothing, A, Nothing, Unit]
/**
 * Bracketed sink creation.
 *
 * @param acquire effect that produces the resource `A`
 * @param release finalizer for the resource; it cannot fail (error type `Nothing`)
 * @param use     builds the sink from the acquired resource
 */
def bracket[R, E, A, Z](acquire: ZIO[R, E, A])(release: A => ZIO[R, Nothing, Any])(use: A => ZSink[R, E, Any, Any, Z]): ZSink[R, E, Any, Any, Z]
}Transform sink behavior and results.
trait ZSinkOps[-R, +E, -I, +L, +Z] {
/** Transforms the sink's result with the pure function `f`. */
def map[Z2](f: Z => Z2): ZSink[R, E, I, L, Z2]
/** Transforms the sink's result with the effectful function `f`, which may widen `R` and `E`. */
def mapM[R1 <: R, E1 >: E, Z2](f: Z => ZIO[R1, E1, Z2]): ZSink[R1, E1, I, L, Z2]
/** Transforms the sink's error type from `E` to `E2`. */
def mapError[E2](f: E => E2): ZSink[R, E2, I, L, Z]
/** Adapts the sink to accept input of type `I2` by converting each value to `I` with `f`. */
def contramap[I2](f: I2 => I): ZSink[R, E, I2, L, Z]
/** Effectful variant of `contramap`: the conversion `f` returns a `ZIO[R1, E1, I]`. */
def contramapM[R1 <: R, E1 >: E, I2](f: I2 => ZIO[R1, E1, I]): ZSink[R1, E1, I2, L, Z]
/** Like `contramap`, but converts whole chunks (`Chunk[I2] => Chunk[I]`). */
def contramapChunks[I2](f: Chunk[I2] => Chunk[I]): ZSink[R, E, I2, L, Z]
/** Transforms both sides at once: input via `f` (`I2 => I`) and result via `g` (`Z => Z2`). */
def dimap[I2, Z2](f: I2 => I)(g: Z => Z2): ZSink[R, E, I2, L, Z2]
}Combine multiple sinks in various ways.
trait ZSinkOps[-R, +E, -I, +L, +Z] {
/** Zips this sink with `that` sequentially, producing both results as a tuple `(Z, Z2)`. */
def zip[R1 <: R, E1 >: E, I1 <: I, L1 >: L, Z2](that: ZSink[R1, E1, I1, L1, Z2]): ZSink[R1, E1, I1, L1, (Z, Z2)]
/** Zips this sink with `that` and combines the two results with `f`. */
def zipWith[R1 <: R, E1 >: E, I1 <: I, L1 >: L, Z2, Z3](that: ZSink[R1, E1, I1, L1, Z2])(f: (Z, Z2) => Z3): ZSink[R1, E1, I1, L1, Z3]
/** Zips this sink with `that` in parallel, producing both results as a tuple `(Z, Z2)`. */
def zipPar[R1 <: R, E1 >: E, I1 <: I, L1 >: L, Z2](that: ZSink[R1, E1, I1, L1, Z2]): ZSink[R1, E1, I1, L1, (Z, Z2)]
/** Zips this sink with `that`, keeping only this sink's result `Z`. */
def zipLeft[R1 <: R, E1 >: E, I1 <: I, L1 >: L, Z2](that: ZSink[R1, E1, I1, L1, Z2]): ZSink[R1, E1, I1, L1, Z]
/** Zips this sink with `that`, keeping only the other sink's result `Z2`. */
def zipRight[R1 <: R, E1 >: E, I1 <: I, L1 >: L, Z2](that: ZSink[R1, E1, I1, L1, Z2]): ZSink[R1, E1, I1, L1, Z2]
/** Races this sink against `that` (first to complete wins); the result type is the common supertype `Z1`. */
def race[R1 <: R, E1 >: E, I1 <: I, L1 >: L, Z1 >: Z](that: ZSink[R1, E1, I1, L1, Z1]): ZSink[R1, E1, I1, L1, Z1]
/** Fallback sink: presumably switches to `that` when this sink fails (confirm). */
def orElse[R1 <: R, E1 >: E, I1 <: I, L1 >: L, Z1 >: Z](that: ZSink[R1, E1, I1, L1, Z1]): ZSink[R1, E1, I1, L1, Z1]
}Monadic composition for sink chaining.
trait ZSinkOps[-R, +E, -I, +L, +Z] {
/** Monadic bind: uses this sink's result `Z` to choose the next sink, which consumes the same input type. */
def flatMap[R1 <: R, E1 >: E, I1 <: I, L1 >: L, Z2](f: Z => ZSink[R1, E1, I1, L1, Z2]): ZSink[R1, E1, I1, L1, Z2]
/**
 * Chains this sink with the sink returned by `f`.
 *
 * Unlike `flatMap`, the sink produced by `f` takes this sink's `L` type as its
 * input, and the combined sink's own `L` is widened to `Any`.
 */
def andThen[R1 <: R, E1 >: E, Z2](f: Z => ZSink[R1, E1, L, Any, Z2]): ZSink[R1, E1, I, Any, Z2]
/**
 * Effectfully folds this sink's results (`Z`) into an accumulator of type `Z2`.
 *
 * @param z      initial accumulator value
 * @param contFn continuation predicate on the accumulator; presumably folding
 *               continues while it returns `true` (confirm)
 * @param f      effectful step combining the accumulator with a result `Z`
 */
def foldM[R1 <: R, E1 >: E, I1 <: I, L1 >: L, Z2](z: Z2)(contFn: Z2 => Boolean)(f: (Z2, Z) => ZIO[R1, E1, Z2]): ZSink[R1, E1, I1, L1, Z2]
}Error handling operations for sinks.
trait ZSinkOps[-R, +E, -I, +L, +Z] {
/** Handles all errors by switching to the sink returned by `h`; the error type becomes `E2`. */
def catchAll[R1 <: R, E2, I1 <: I, L1 >: L, Z1 >: Z](h: E => ZSink[R1, E2, I1, L1, Z1]): ZSink[R1, E2, I1, L1, Z1]
/** Retries the sink on failure according to `schedule`, which takes the error `E` as its input. */
def retry[R1 <: R](schedule: Schedule[R1, E, Any]): ZSink[R1, E, I, L, Z]
/** Removes the error channel (`Nothing`), wrapping the result in `Option[Z]`; presumably `None` on failure (confirm). */
def option: ZSink[R, Nothing, I, L, Option[Z]]
/** Removes the error channel (`Nothing`), exposing the outcome as `Either[E, Z]`. */
def either: ZSink[R, Nothing, I, L, Either[E, Z]]
}Resource management for sinks.
trait ZSinkOps[-R, +E, -I, +L, +Z] {
/** Provides the environment `env`, eliminating the requirement (`R` becomes `Any`). */
def provide[R1](env: R1)(implicit ev: R1 <:< R): ZSink[Any, E, I, L, Z]
/** Provides the environment through `layer`; the resulting sink requires the layer's input `R0` instead. */
def provideLayer[R0, R1](layer: ZLayer[R0, E, R1])(implicit ev: R1 <:< R): ZSink[R0, E, I, L, Z]
/** Pairs the sink's result with a `Duration` measuring its execution; adds a `Clock` requirement. */
def timed: ZSink[R with Clock, E, I, L, (Duration, Z)]
/**
 * Summarizes sink execution.
 *
 * @param summary effect producing a measurement `B`; presumably run once before
 *                and once after the sink (confirm)
 * @param f       combines two measurements into the summary value `C`
 */
def summarized[R1 <: R, E1 >: E, B, C](summary: ZIO[R1, E1, B])(f: (B, B) => C): ZSink[R1, E1, I, L, (C, Z)]
/** Converts this sink to a `ZTransducer[R, E, I, L]`; note the result type `Z` does not appear in the output type. */
def toTransducer: ZTransducer[R, E, I, L]
}Core types used by sinks.
object ZSink {
/**
 * Push interface for sink implementation.
 *
 * A function from an optional chunk of input to an effect that either fails
 * with `(Either[E, Z], Chunk[L])` or succeeds with `Chunk[L]`.
 *
 * NOTE(review): upstream ZIO 1.x appears to declare the success type as `Unit`
 * rather than `Chunk[L]` — confirm against the library version being documented.
 */
type Push[R, E, I, L, Z] = Option[Chunk[I]] => ZIO[R, (Either[E, Z], Chunk[L]), Chunk[L]]
/** Sink that accepts any input, runs a no-op effect per element, and produces `Unit`. */
val drain: Sink[Nothing, Any, Nothing, Unit] = ZSink.foreach(_ => ZIO.unit)
/**
 * Sink that gathers all of its input, unchanged, into a `Chunk[A]`.
 *
 * Delegates to `collectAll`, the collector declared in this API with exactly
 * this type.
 */
def identity[A]: Sink[Nothing, A, Nothing, Chunk[A]] = collectAll[A]
/** Never-completing sink, built from `ZIO.never`. */
def never: Sink[Nothing, Any, Nothing, Nothing] = ZSink.fromEffect(ZIO.never)
/** Sink that ignores its input and succeeds with `z` (evaluated by name). */
def succeed[Z](z: => Z): Sink[Nothing, Any, Nothing, Z] = fromEffect(ZIO.succeed(z))
/** Sink that ignores its input and fails with `e` (evaluated by name). */
def fail[E](e: => E): Sink[E, Any, Nothing, Nothing] = fromEffect(ZIO.fail(e))
}Usage Examples:
import zio._
import zio.stream._
// Basic collectors
val numbers = ZStream.range(1, 10)
// collectAll yields a Chunk, not a List; call `.map(_.toList)` if a List is required.
val allNumbers: UIO[Chunk[Int]] = numbers.run(ZSink.collectAll[Int])
val sum: UIO[Int] = numbers.run(ZSink.sum[Int])
// Folding operations
val evenSum: UIO[Int] = numbers.run(
  ZSink.foldLeft(0)((acc, n) => if (n % 2 == 0) acc + n else acc)
)
// Take operations
// take also yields a Chunk of the collected elements.
val firstThree: UIO[Chunk[Int]] = numbers.run(ZSink.take(3))
val firstEven: UIO[Option[Int]] = numbers.filter(_ % 2 == 0).run(ZSink.head)
// Effectful processing
// Uses an infallible, environment-free effect so the result really is a UIO[Unit].
val logged: UIO[Unit] = numbers.run(
  ZSink.foreach(n => ZIO.effectTotal(println(s"Processing: $n")))
)
// Sink combination
// The element type is given explicitly: with no expected type, the implicit
// Numeric[A] required by `sum` cannot be resolved.
val combined: UIO[(Int, Long)] = numbers.run(
  ZSink.sum[Int].zip(ZSink.count)
)
// Error handling
// `sum` cannot fail (error type Nothing), so this is always a Right here;
// `either` is shown only to illustrate the shape of the result.
val safeSum: UIO[Either[String, Int]] =
  numbers.run(ZSink.sum[Int].either)