or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

index.mdplatform-extensions.mdzsink.mdzstream.mdztransducer.md
tile.json

zsink.mddocs/

Stream Consumption

Comprehensive sink operations for consuming streams and producing results. ZSink provides powerful abstractions for collecting, folding, and processing stream elements.

Capabilities

Basic Collectors

Essential sinks for collecting stream elements.

object ZSink {
  /** Collect all elements into a chunk */
  def collectAll[A]: Sink[Nothing, A, Nothing, Chunk[A]]
  
  /** Collect all elements into a map by key */
  def collectAllToMap[A, K](key: A => K)(f: (A, A) => A): Sink[Nothing, A, Nothing, Map[K, A]]
  
  /** Collect all elements into a set */
  def collectAllToSet[A]: Sink[Nothing, A, Nothing, Set[A]]
  
  /** Collect first n elements */
  def take[A](n: Int): Sink[Nothing, A, A, Chunk[A]]
  
  /** Get first element */
  def head[A]: Sink[Nothing, A, A, Option[A]]
  
  /** Get last element */
  def last[A]: Sink[Nothing, A, Nothing, Option[A]]
}

Folding Operations

Sinks that fold stream elements into a single result.

object ZSink {
  /** Fold with continuation function */
  def fold[A, S](z: S)(contFn: S => Boolean)(f: (S, A) => S): Sink[Nothing, A, A, S]
  
  /** Left fold without continuation */
  def foldLeft[A, S](z: S)(f: (S, A) => S): Sink[Nothing, A, Nothing, S]
  
  /** Effectful fold */
  def foldM[R, E, A, S](z: S)(contFn: S => Boolean)(f: (S, A) => ZIO[R, E, S]): ZSink[R, E, A, A, S]
  
  /** Effectful left fold */
  def foldLeftM[R, E, A, S](z: S)(f: (S, A) => ZIO[R, E, S]): ZSink[R, E, A, Nothing, S]
  
  /** Fold chunks */
  def foldChunks[A, S](z: S)(contFn: S => Boolean)(f: (S, Chunk[A]) => S): Sink[Nothing, A, A, S]
  
  /** Effectful chunk folding */
  def foldChunksM[R, E, A, S](z: S)(contFn: S => Boolean)(f: (S, Chunk[A]) => ZIO[R, E, S]): ZSink[R, E, A, A, S]
  
  /** Reduce elements (requires non-empty stream) */
  def foldUntil[A, S](z: S, max: Int)(f: (S, A) => S): Sink[Nothing, A, A, S]
  
  /** Weighted fold with cost function */
  def foldWeighted[A, S](z: S)(costFn: A => Long, max: Long)(f: (S, A) => S): Sink[Nothing, A, A, S]
}

Numeric Operations

Specialized sinks for numeric computations.

object ZSink {
  /** Sum all numeric elements */
  def sum[A](implicit A: Numeric[A]): Sink[Nothing, A, Nothing, A]
}

Effect-Based Sinks

Sinks that perform effects on stream elements.

object ZSink {
  /** Create sink from effect */
  def fromEffect[R, E, Z](b: => ZIO[R, E, Z]): ZSink[R, E, Any, Nothing, Z]
  
  /** Apply effect to each element */
  def foreach[R, E, A](f: A => ZIO[R, E, Any]): ZSink[R, E, A, Nothing, Unit]
  
  /** Apply effect while predicate holds */
  def foreachWhile[R, E, A](f: A => ZIO[R, E, Boolean]): ZSink[R, E, A, A, Unit]
  
  /** Apply effectful function to chunks */
  def foreachChunk[R, E, A](f: Chunk[A] => ZIO[R, E, Any]): ZSink[R, E, A, Nothing, Unit]
  
  /** Apply effectful function while chunk predicate holds */
  def foreachChunkWhile[R, E, A](f: Chunk[A] => ZIO[R, E, Boolean]): ZSink[R, E, A, A, Unit]
}

Resource-Based Sinks

Sinks that work with managed resources.

object ZSink {
  /** Create sink from managed resource */
  def managed[R, E, A, Z](managed: ZManaged[R, E, ZSink[R, E, A, Any, Z]]): ZSink[R, E, A, Any, Z]
  
  /** Sink from Hub */
  def fromHub[A](hub: Hub[A]): Sink[Nothing, A, Nothing, Unit]
  
  /** Sink from Queue */
  def fromQueue[A](queue: Queue[A]): Sink[Nothing, A, Nothing, Unit]
  
  /** Bracketed sink creation */
  def bracket[R, E, A, Z](acquire: ZIO[R, E, A])(release: A => ZIO[R, Nothing, Any])(use: A => ZSink[R, E, Any, Any, Z]): ZSink[R, E, Any, Any, Z]
}

Sink Transformations

Transform sink behavior and results.

trait ZSinkOps[-R, +E, -I, +L, +Z] {
  /** Transform result */
  def map[Z2](f: Z => Z2): ZSink[R, E, I, L, Z2]
  
  /** Effectful result transformation */
  def mapM[R1 <: R, E1 >: E, Z2](f: Z => ZIO[R1, E1, Z2]): ZSink[R1, E1, I, L, Z2]
  
  /** Transform errors */
  def mapError[E2](f: E => E2): ZSink[R, E2, I, L, Z]
  
  /** Transform input */
  def contramap[I2](f: I2 => I): ZSink[R, E, I2, L, Z]
  
  /** Effectful input transformation */
  def contramapM[R1 <: R, E1 >: E, I2](f: I2 => ZIO[R1, E1, I]): ZSink[R1, E1, I2, L, Z]
  
  /** Transform chunks */
  def contramapChunks[I2](f: Chunk[I2] => Chunk[I]): ZSink[R, E, I2, L, Z]
  
  /** Dimap both input and output */
  def dimap[I2, Z2](f: I2 => I)(g: Z => Z2): ZSink[R, E, I2, L, Z2]
}

Sink Combination

Combine multiple sinks in various ways.

trait ZSinkOps[-R, +E, -I, +L, +Z] {
  /** Zip sinks sequentially */
  def zip[R1 <: R, E1 >: E, I1 <: I, L1 >: L, Z2](that: ZSink[R1, E1, I1, L1, Z2]): ZSink[R1, E1, I1, L1, (Z, Z2)]
  
  /** Zip sinks with function */
  def zipWith[R1 <: R, E1 >: E, I1 <: I, L1 >: L, Z2, Z3](that: ZSink[R1, E1, I1, L1, Z2])(f: (Z, Z2) => Z3): ZSink[R1, E1, I1, L1, Z3]
  
  /** Zip sinks in parallel */
  def zipPar[R1 <: R, E1 >: E, I1 <: I, L1 >: L, Z2](that: ZSink[R1, E1, I1, L1, Z2]): ZSink[R1, E1, I1, L1, (Z, Z2)]
  
  /** Zip keeping left result */
  def zipLeft[R1 <: R, E1 >: E, I1 <: I, L1 >: L, Z2](that: ZSink[R1, E1, I1, L1, Z2]): ZSink[R1, E1, I1, L1, Z]
  
  /** Zip keeping right result */
  def zipRight[R1 <: R, E1 >: E, I1 <: I, L1 >: L, Z2](that: ZSink[R1, E1, I1, L1, Z2]): ZSink[R1, E1, I1, L1, Z2]
  
  /** Race sinks (first to complete wins) */
  def race[R1 <: R, E1 >: E, I1 <: I, L1 >: L, Z1 >: Z](that: ZSink[R1, E1, I1, L1, Z1]): ZSink[R1, E1, I1, L1, Z1]
  
  /** Fallback sink */
  def orElse[R1 <: R, E1 >: E, I1 <: I, L1 >: L, Z1 >: Z](that: ZSink[R1, E1, I1, L1, Z1]): ZSink[R1, E1, I1, L1, Z1]
}

Monadic Operations

Monadic composition for sink chaining.

trait ZSinkOps[-R, +E, -I, +L, +Z] {
  /** Monadic bind */
  def flatMap[R1 <: R, E1 >: E, I1 <: I, L1 >: L, Z2](f: Z => ZSink[R1, E1, I1, L1, Z2]): ZSink[R1, E1, I1, L1, Z2]
  
  /** Chain with function */
  def andThen[R1 <: R, E1 >: E, Z2](f: Z => ZSink[R1, E1, L, Any, Z2]): ZSink[R1, E1, I, Any, Z2]
  
  /** Provide sink result as input to function */
  def foldM[R1 <: R, E1 >: E, I1 <: I, L1 >: L, Z2](z: Z2)(contFn: Z2 => Boolean)(f: (Z2, Z) => ZIO[R1, E1, Z2]): ZSink[R1, E1, I1, L1, Z2]
}

Error Handling

Error handling operations for sinks.

trait ZSinkOps[-R, +E, -I, +L, +Z] {
  /** Handle all errors */
  def catchAll[R1 <: R, E2, I1 <: I, L1 >: L, Z1 >: Z](h: E => ZSink[R1, E2, I1, L1, Z1]): ZSink[R1, E2, I1, L1, Z1]
  
  /** Retry sink on failure */
  def retry[R1 <: R](schedule: Schedule[R1, E, Any]): ZSink[R1, E, I, L, Z]
  
  /** Convert to option on error */
  def option: ZSink[R, Nothing, I, L, Option[Z]]
  
  /** Use either for error handling */
  def either: ZSink[R, Nothing, I, L, Either[E, Z]]
}

Resource Management

Resource management for sinks.

trait ZSinkOps[-R, +E, -I, +L, +Z] {
  /** Provide environment */
  def provide[R1](env: R1)(implicit ev: R1 <:< R): ZSink[Any, E, I, L, Z]
  
  /** Provide layer */
  def provideLayer[R0, R1](layer: ZLayer[R0, E, R1])(implicit ev: R1 <:< R): ZSink[R0, E, I, L, Z]
  
  /** Time sink execution */
  def timed: ZSink[R with Clock, E, I, L, (Duration, Z)]
  
  /** Summarize sink execution */
  def summarized[R1 <: R, E1 >: E, B, C](summary: ZIO[R1, E1, B])(f: (B, B) => C): ZSink[R1, E1, I, L, (C, Z)]
  
  /** Convert to transducer */
  def toTransducer: ZTransducer[R, E, I, L]
}

Type Definitions

Core types used by sinks.

object ZSink {
  /** Push interface for sink implementation */
  type Push[R, E, I, L, Z] = Option[Chunk[I]] => ZIO[R, (Either[E, Z], Chunk[L]), Chunk[L]]
  
  /** Sink that ignores input and produces unit */
  val drain: Sink[Nothing, Any, Nothing, Unit] = ZSink.foreach(_ => ZIO.unit)
  
  /** Identity sink that passes through all input */
  def identity[A]: Sink[Nothing, A, Nothing, Chunk[A]] = collectAllToChunk[A]
  
  /** Never-completing sink */
  def never: Sink[Nothing, Any, Nothing, Nothing] = ZSink.fromEffect(ZIO.never)
  
  /** Immediately succeeding sink */
  def succeed[Z](z: => Z): Sink[Nothing, Any, Nothing, Z] = fromEffect(ZIO.succeed(z))
  
  /** Immediately failing sink */
  def fail[E](e: => E): Sink[E, Any, Nothing, Nothing] = fromEffect(ZIO.fail(e))
}

Usage Examples:

import zio._
import zio.stream._

// Basic collectors
val numbers = ZStream.range(1, 10)
val allNumbers: UIO[List[Int]] = numbers.run(ZSink.collectAll)
val sum: UIO[Int] = numbers.run(ZSink.sum)

// Folding operations
val evenSum: UIO[Int] = numbers.run(
  ZSink.foldLeft(0)((acc, n) => if (n % 2 == 0) acc + n else acc)
)

// Take operations
val firstThree: UIO[List[Int]] = numbers.run(ZSink.take(3))
val firstEven: UIO[Option[Int]] = numbers.filter(_ % 2 == 0).run(ZSink.head)

// Effectful processing
val logged: UIO[Unit] = numbers.run(
  ZSink.foreach(n => Console.printLine(s"Processing: $n"))
)

// Sink combination
val combined: UIO[(Int, Long)] = numbers.run(
  ZSink.sum.zip(ZSink.count)
)

// Error handling
val safeSum: UIO[Either[String, Int]] = 
  numbers.run(ZSink.sum.either)