Powerful transducers for transforming stream elements with effects, stateful processing, and composition capabilities. ZTransducer provides efficient stream-to-stream transformations.
Fundamental transducers for element transformation.
/**
 * Constructors for element-level and chunk-level transformations.
 *
 * NOTE(review): `Transducer[E, A, B]` is not defined in this listing; it is presumably the
 * environment-free alias of `ZTransducer` -- confirm against the library.
 */
object ZTransducer {
/** Identity transducer (pass-through): input and output element types are both `A`. */
def identity[A]: Transducer[Nothing, A, A]
/** Transform elements with the pure function `f`. */
def map[A, B](f: A => B): Transducer[Nothing, A, B]
/** Effectful element transformation: `f` returns a `ZIO`, whose `R` and `E` become the transducer's environment and error types. */
def mapM[R, E, A, B](f: A => ZIO[R, E, B]): ZTransducer[R, E, A, B]
/** Transform whole chunks with the pure function `f`. */
def mapChunks[A, B](f: Chunk[A] => Chunk[B]): Transducer[Nothing, A, B]
/** Effectful chunk transformation: same shape as `mapChunks`, but `f` returns its output chunk inside a `ZIO`. */
def mapChunksM[R, E, A, B](f: Chunk[A] => ZIO[R, E, Chunk[B]]): ZTransducer[R, E, A, B]
/** Filter elements with `predicate`; the element type is unchanged. */
def filter[A](predicate: A => Boolean): Transducer[Nothing, A, A]
/** Effectful filtering: `predicate` returns its `Boolean` inside a `ZIO`. */
def filterM[R, E, A](predicate: A => ZIO[R, E, Boolean]): ZTransducer[R, E, A, A]
/** Create a transducer from a pure chunk-to-chunk function. */
def apply[I, O](f: Chunk[I] => Chunk[O]): Transducer[Nothing, I, O]
}

Transducers for collecting and grouping elements.
/** Constructors that collect input elements into collections or select them with partial functions. */
object ZTransducer {
/**
 * Collect first n elements into a `Chunk`.
 *
 * NOTE(review): `n` is an `Int` here but a `Long` in `collectAllToMapN` and `collectAllToSetN`.
 * Whether collection starts over after `n` elements is not visible from the signature -- confirm.
 */
def collectAllN[I](n: Int): Transducer[Nothing, I, Chunk[I]]
/**
 * Collect n elements into a `Map`, keyed by `key`.
 *
 * NOTE(review): this signature takes no merge function, so how elements with equal keys are
 * combined is not visible here -- confirm.
 */
def collectAllToMapN[K, A](n: Long)(key: A => K): Transducer[Nothing, A, Map[K, A]]
/** Collect n elements into a `Set`. */
def collectAllToSetN[A](n: Long): Transducer[Nothing, A, Set[A]]
/** Collect all elements while `predicate` holds; the output element is a `List[A]`. */
def collectAllWhile[A](predicate: A => Boolean): Transducer[Nothing, A, List[A]]
/** Collect all elements matching the partial function `pf`; the output type is `pf`'s result type `B`. */
def collect[A, B](pf: PartialFunction[A, B]): Transducer[Nothing, A, B]
/** Effectful collection: `pf` yields a `ZIO` that produces the output element. */
def collectM[R, E, A, B](pf: PartialFunction[A, ZIO[R, E, B]]): ZTransducer[R, E, A, B]
}

Stateful transducers that fold elements.
/** Stateful constructors: each starts from an initial state `z` and combines it with input elements via `f`. */
object ZTransducer {
/** Fold elements into an accumulator of type `S`, starting from `z`. */
def fold[A, S](z: S)(f: (S, A) => S): Transducer[Nothing, A, S]
/**
 * Left fold.
 *
 * NOTE(review): this signature is identical to `fold`; how the two differ is not visible in this listing.
 */
def foldLeft[A, S](z: S)(f: (S, A) => S): Transducer[Nothing, A, S]
/** Effectful fold: the step function `f` returns the next state inside a `ZIO`. */
def foldM[R, E, A, S](z: S)(f: (S, A) => ZIO[R, E, S]): ZTransducer[R, E, A, S]
/**
 * Fold until a limit is reached.
 *
 * The signature takes no predicate, only the bound `max` (presumably the number of elements
 * folded into one result -- confirm).
 */
def foldUntil[A, S](z: S, max: Int)(f: (S, A) => S): Transducer[Nothing, A, S]
/**
 * Weighted fold with cost function: `costFn` assigns each element a `Long` cost and `max` is the cost bound.
 *
 * NOTE(review): what happens when the bound is reached is not visible from the signature -- confirm.
 */
def foldWeighted[A, S](z: S)(costFn: A => Long, max: Long)(f: (S, A) => S): Transducer[Nothing, A, S]
/** Effectful weighted fold: both `costFn` and the step function `f` return their results inside a `ZIO`. */
def foldWeightedM[R, E, A, S](z: S)(costFn: A => ZIO[R, E, Long], max: Long)(f: (S, A) => ZIO[R, E, S]): ZTransducer[R, E, A, S]
/**
 * Effectful weighted fold with decomposition: `decompose` turns one element into a `Chunk` of elements.
 *
 * NOTE(review): presumably used to split elements whose cost exceeds `max` -- confirm.
 */
def foldWeightedDecompose[R, E, A, S](z: S)(costFn: A => ZIO[R, E, Long], max: Long, decompose: A => ZIO[R, E, Chunk[A]])(f: (S, A) => ZIO[R, E, S]): ZTransducer[R, E, A, S]
/** Scanning (emit intermediate results): same parameters as `fold`. */
def scan[A, S](z: S)(f: (S, A) => S): Transducer[Nothing, A, S]
/** Effectful scanning: the step function `f` returns the next state inside a `ZIO`. */
def scanM[R, E, A, S](z: S)(f: (S, A) => ZIO[R, E, S]): ZTransducer[R, E, A, S]
}

Transducers that partition or slice streams.
/** Constructors that drop leading elements or split string input. */
object ZTransducer {
/** Drop first n elements; the element type is unchanged. */
def drop[A](n: Long): Transducer[Nothing, A, A]
/** Drop elements while `predicate` holds. */
def dropWhile[A](predicate: A => Boolean): Transducer[Nothing, A, A]
/** Effectful drop while: `predicate` returns its `Boolean` inside a `ZIO`. */
def dropWhileM[R, E, A](predicate: A => ZIO[R, E, Boolean]): ZTransducer[R, E, A, A]
/** Split strings on `delimiter`; both input and output elements are `String`. */
def splitOn(delimiter: String): Transducer[Nothing, String, String]
}

Transducers for grouping adjacent or related elements.
/** Constructors that group or batch elements by key, count, weight, or time. */
object ZTransducer {
/** Group adjacent elements by the key `f` computes; each output pairs a key with a `NonEmptyChunk` of its elements. */
def groupAdjacentBy[A, K](f: A => K): Transducer[Nothing, A, (K, NonEmptyChunk[A])]
/**
 * Group elements by key into a `Map[K, NonEmptyChunk[A]]`.
 *
 * This signature takes no duration, so no time window is involved here (contrast `groupWithin`).
 */
def groupByKey[A, K](f: A => K): Transducer[Nothing, A, Map[K, NonEmptyChunk[A]]]
/** Group elements within a time window: bounded by the count `n` and by `duration`; requires a `Clock` in the environment. */
def groupWithin[A](n: Int, duration: Duration): ZTransducer[Clock, Nothing, A, Chunk[A]]
/** Batch elements by count `n` into `Chunk`s. */
def batch[A](n: Long): Transducer[Nothing, A, Chunk[A]]
/**
 * Batch elements by count `n`; this signature takes no duration.
 *
 * NOTE(review): identical in shape to `batch`; how the two differ is not visible in this listing.
 */
def batchN[A](n: Long): Transducer[Nothing, A, Chunk[A]]
/** Batch elements with weighted grouping: `costFn` assigns each element a `Long` cost and `max` is the cost bound. */
def batchWeighted[A](costFn: A => Long)(max: Long): Transducer[Nothing, A, Chunk[A]]
/** Effectful weighted batching: `costFn` returns the cost inside a `ZIO`. */
def batchWeightedM[R, E, A](costFn: A => ZIO[R, E, Long])(max: Long): ZTransducer[R, E, A, Chunk[A]]
}

Helpful utility transducers for common patterns.
/** Utility constructors for common stream-shaping patterns. */
object ZTransducer {
/** Get first element, as an `Option[A]`. */
def head[A]: Transducer[Nothing, A, Option[A]]
/** Get last element, as an `Option[A]`. */
def last[A]: Transducer[Nothing, A, Option[A]]
/** Prepend the given `values` (varargs) to the stream. */
def prepend[A](values: A*): Transducer[Nothing, A, A]
/** Append the given `values` (varargs) to the stream. */
def append[A](values: A*): Transducer[Nothing, A, A]
/** Intersperse `separator` between elements. */
def intersperse[A](separator: A): Transducer[Nothing, A, A]
/** Add index to elements: each output pairs an element with a `Long`. */
def zipWithIndex[A]: Transducer[Nothing, A, (A, Long)]
/** Deduplicate consecutive elements. */
def deduplicateAdjacent[A]: Transducer[Nothing, A, A]
/** Deduplicate adjacent elements by the key `f` computes; the output still carries the elements, not the keys. */
def deduplicateAdjacentBy[A, K](f: A => K): Transducer[Nothing, A, A]
/** Remove `None` values from a stream of `Option[A]`, emitting plain `A` elements. */
def collectSome[A]: Transducer[Nothing, Option[A], A]
/** Flatten nested chunks: input elements are `Chunk[A]`, output elements are `A`. */
def flatten[A]: Transducer[Nothing, Chunk[A], A]
}

Transducers that incorporate effects.
/** Constructors that build transducers from effects and functions, plus tapping, tracing, and timing helpers. */
object ZTransducer {
/** Create from effect: the input type is `Any` and the output type is the effect's result type `A`. */
def fromEffect[R, E, A](effect: ZIO[R, E, A]): ZTransducer[R, E, Any, A]
/** Create from the pure function `f`. */
def fromFunction[A, B](f: A => B): Transducer[Nothing, A, B]
/** Create from the effectful function `f`, whose `R` and `E` become the transducer's environment and error types. */
def fromFunctionM[R, E, A, B](f: A => ZIO[R, E, B]): ZTransducer[R, E, A, B]
/** Apply effect to each element (tap): the element type is unchanged (`A` in, `A` out); `f`'s result type is `Any`. */
def tap[R, E, A](f: A => ZIO[R, E, Any]): ZTransducer[R, E, A, A]
/** Trace elements for debugging; `prefix` defaults to the empty string. */
def debug[A](prefix: String = ""): Transducer[Nothing, A, A]
/** Time each element processing: each output pairs a `Duration` with the element; requires a `Clock` in the environment. */
def timed[A]: ZTransducer[Clock, Nothing, A, (Duration, A)]
}

Transform transducer behavior and composition.
/**
 * Operators that transform a transducer's input, output, or error type.
 *
 * Type parameters: `R` environment (contravariant), `E` error (covariant),
 * `I` input element (contravariant), `O` output element (covariant).
 */
trait ZTransducerOps[-R, +E, -I, +O] {
/** Transform output elements with the pure function `f`. */
def map[P](f: O => P): ZTransducer[R, E, I, P]
/** Effectful output transformation; the result uses the environment `R1 <: R` and the error type `E1 >: E`. */
def mapM[R1 <: R, E1 >: E, P](f: O => ZIO[R1, E1, P]): ZTransducer[R1, E1, I, P]
/** Transform output chunks. */
def mapChunks[P](f: Chunk[O] => Chunk[P]): ZTransducer[R, E, I, P]
/** Transform input elements: `f` converts the new input type `J` into `I`. */
def contramap[J](f: J => I): ZTransducer[R, E, J, O]
/** Effectful input transformation; the result uses the environment `R1 <: R` and the error type `E1 >: E`. */
def contramapM[R1 <: R, E1 >: E, J](f: J => ZIO[R1, E1, I]): ZTransducer[R1, E1, J, O]
/** Transform input chunks: `f` converts chunks of `J` into chunks of `I`. */
def contramapChunks[J](f: Chunk[J] => Chunk[I]): ZTransducer[R, E, J, O]
/** Transform errors from `E` to `E2`. */
def mapError[E2](f: E => E2): ZTransducer[R, E2, I, O]
/** Filter output elements with `predicate`. */
def filter(predicate: O => Boolean): ZTransducer[R, E, I, O]
/** Effectful output filtering: `predicate` returns its `Boolean` inside a `ZIO`. */
def filterM[R1 <: R, E1 >: E](predicate: O => ZIO[R1, E1, Boolean]): ZTransducer[R1, E1, I, O]
}

Compose transducers together.
/** Operators that combine this transducer with another one. */
trait ZTransducerOps[-R, +E, -I, +O] {
/** Compose with another transducer (andThen): `that` consumes this transducer's output `O`, giving `I` in and `P` out. */
def >>>[R1 <: R, E1 >: E, P](that: ZTransducer[R1, E1, O, P]): ZTransducer[R1, E1, I, P]
/** Compose (alias for >>>). */
def andThen[R1 <: R, E1 >: E, P](that: ZTransducer[R1, E1, O, P]): ZTransducer[R1, E1, I, P]
/** Compose before: `that` produces this transducer's input `I`, giving `C` in and `O` out. */
def compose[R1 <: R, E1 >: E, C](that: ZTransducer[R1, E1, C, I]): ZTransducer[R1, E1, C, O]
/** Zip with another transducer over the same input; outputs are paired as `(O, P)`. */
def zip[R1 <: R, E1 >: E, I1 <: I, P](that: ZTransducer[R1, E1, I1, P]): ZTransducer[R1, E1, I1, (O, P)]
/** Zip with function: like `zip`, but `f` combines the two outputs into a `Q`. */
def zipWith[R1 <: R, E1 >: E, I1 <: I, P, Q](that: ZTransducer[R1, E1, I1, P])(f: (O, P) => Q): ZTransducer[R1, E1, I1, Q]
/**
 * Race two transducers; the output type is the common supertype `O1`.
 *
 * NOTE(review): how the winner is chosen is not visible from the signature -- confirm.
 */
def race[R1 <: R, E1 >: E, I1 <: I, O1 >: O](that: ZTransducer[R1, E1, I1, O1]): ZTransducer[R1, E1, I1, O1]
}

Error handling for transducers.
/** Operators for recovering from, or reifying, a transducer's errors. */
trait ZTransducerOps[-R, +E, -I, +O] {
/** Handle all errors: `h` maps each error `E` to a replacement transducer, and the error type becomes `E2`. */
def catchAll[R1 <: R, E2, I1 <: I, O1 >: O](h: E => ZTransducer[R1, E2, I1, O1]): ZTransducer[R1, E2, I1, O1]
/** Fallback transducer: `that` is the alternative, and the error type becomes its `E2`. */
def orElse[R1 <: R, E2, I1 <: I, O1 >: O](that: ZTransducer[R1, E2, I1, O1]): ZTransducer[R1, E2, I1, O1]
/** Convert to option on error: the error type becomes `Nothing` and the output becomes `Option[O]`. */
def option: ZTransducer[R, Nothing, I, Option[O]]
/** Convert to either: the error type becomes `Nothing` and the output becomes `Either[E, O]`. */
def either: ZTransducer[R, Nothing, I, Either[E, O]]
/** Retry on failure according to `schedule`, which takes the error `E` as its input. */
def retry[R1 <: R](schedule: Schedule[R1, E, Any]): ZTransducer[R1, E, I, O]
}

Resource management for transducers.
/** Operators that supply the environment or measure execution. */
trait ZTransducerOps[-R, +E, -I, +O] {
/** Provide environment: the resulting transducer's environment type is `Any`. */
def provide[R1](env: R1)(implicit ev: R1 <:< R): ZTransducer[Any, E, I, O]
/** Provide environment layer: `layer` builds `R1` from `R0`, so the result requires only `R0`. */
def provideLayer[R0, R1](layer: ZLayer[R0, E, R1])(implicit ev: R1 <:< R): ZTransducer[R0, E, I, O]
/** Time transducer execution: adds `Clock` to the environment and pairs each output with a `Duration`. */
def timed: ZTransducer[R with Clock, E, I, (Duration, O)]
/**
 * Summarize execution: `summary` produces values of type `B`, `f` combines two of them into a `C`,
 * and each output is paired as `(C, O)`.
 *
 * NOTE(review): presumably `summary` runs before and after execution -- confirm.
 */
def summarized[R1 <: R, E1 >: E, B, C](summary: ZIO[R1, E1, B])(f: (B, B) => C): ZTransducer[R1, E1, I, (C, O)]
}

Core types used by transducers.
/** Core type alias and trivial constructors. */
object ZTransducer {
/**
 * Push function signature for transducer implementation: takes an optional input chunk and
 * effectfully returns an output chunk.
 *
 * NOTE(review): presumably `None` signals end of input -- confirm.
 */
type Push[R, E, I, O] = Option[Chunk[I]] => ZIO[R, E, Chunk[O]]
/** Transducer that drains input without output: built from a chunk function that returns `Chunk.empty` for every input chunk. */
def drain[A]: Transducer[Nothing, A, Nothing] = ZTransducer(_ => Chunk.empty)
/** Never-completing transducer, built from `ZIO.never`. */
def never[I, O]: Transducer[Nothing, I, O] = ZTransducer.fromEffect(ZIO.never)
/** Immediately succeeding transducer; `o` is a by-name parameter passed to `ZIO.succeed`. */
def succeed[O](o: => O): Transducer[Nothing, Any, O] = fromEffect(ZIO.succeed(o))
/** Immediately failing transducer; `e` is a by-name parameter passed to `ZIO.fail`. */
def fail[E](e: => E): ZTransducer[Any, E, Any, Nothing] = fromEffect(ZIO.fail(e))
}

Usage Examples:
import java.io.IOException

import zio._
import zio.console._
import zio.duration._
import zio.stream._

// Basic transformations
val numbers = ZStream.range(1, 10)
val doubled = numbers.transduce(ZTransducer.map(_ * 2))
val evens = numbers.transduce(ZTransducer.filter(_ % 2 == 0))

// Folding and scanning
val sum = numbers.transduce(ZTransducer.fold(0)(_ + _))
val runningSum = numbers.transduce(ZTransducer.scan(0)(_ + _))

// Collecting operations
// The API listed above has no `ZTransducer.take`, so the first three elements are collected
// with `collectAllN`; `take(1)` on the resulting stream keeps only the first collected chunk.
val firstThree = numbers.transduce(ZTransducer.collectAllN(3)).take(1)
val grouped = numbers.transduce(ZTransducer.batch(3))

// Composition
val composed = ZTransducer.filter[Int](_ > 5) >>> ZTransducer.map(_ * 2)
val result = numbers.transduce(composed)

// Effectful processing
// `putStrLn` (from zio.console) is the ZIO 1.x console call, matching the ZIO 1.x imports
// used here (`zio.duration._`); `Console.printLine` is the ZIO 2 API.
val logged = ZTransducer.tap[Console, IOException, Int](n =>
  putStrLn(s"Processing: $n")
)

// Grouping operations
val adjacentGroups = ZStream("a", "a", "b", "b", "c")
  .transduce(ZTransducer.groupAdjacentBy(identity))

// Timing operations
val timedProcessing = numbers.transduce(ZTransducer.timed)