or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

index.mdplatform-extensions.mdzsink.mdzstream.mdztransducer.md
tile.json

ztransducer.mddocs/

Stream Transformation

Powerful transducers for transforming stream elements with effects, stateful processing, and composition capabilities. ZTransducer provides efficient stream-to-stream transformations.

Capabilities

Basic Transformations

Fundamental transducers for element transformation.

object ZTransducer {
  /** Identity transducer (pass-through) */
  def identity[A]: Transducer[Nothing, A, A]
  
  /** Transform elements with function */
  def map[A, B](f: A => B): Transducer[Nothing, A, B]
  
  /** Effectful element transformation */
  def mapM[R, E, A, B](f: A => ZIO[R, E, B]): ZTransducer[R, E, A, B]
  
  /** Transform chunks */
  def mapChunks[A, B](f: Chunk[A] => Chunk[B]): Transducer[Nothing, A, B]
  
  /** Effectful chunk transformation */
  def mapChunksM[R, E, A, B](f: Chunk[A] => ZIO[R, E, Chunk[B]]): ZTransducer[R, E, A, B]
  
  /** Filter elements with predicate */
  def filter[A](predicate: A => Boolean): Transducer[Nothing, A, A]
  
  /** Effectful filtering */
  def filterM[R, E, A](predicate: A => ZIO[R, E, Boolean]): ZTransducer[R, E, A, A]
  
  /** Create from chunk transformation function */
  def apply[I, O](f: Chunk[I] => Chunk[O]): Transducer[Nothing, I, O]
}

Collection Operations

Transducers for collecting and grouping elements.

object ZTransducer {
  /** Collect first n elements */
  def collectAllN[I](n: Int): Transducer[Nothing, I, Chunk[I]]
  
  /** Collect n elements into map by key */
  def collectAllToMapN[K, A](n: Long)(key: A => K): Transducer[Nothing, A, Map[K, A]]
  
  /** Collect n elements into set */
  def collectAllToSetN[A](n: Long): Transducer[Nothing, A, Set[A]]
  
  /** Collect all elements while predicate holds */
  def collectAllWhile[A](predicate: A => Boolean): Transducer[Nothing, A, List[A]]
  
  /** Collect all elements matching partial function */
  def collect[A, B](pf: PartialFunction[A, B]): Transducer[Nothing, A, B]
  
  /** Effectful collection with partial function */
  def collectM[R, E, A, B](pf: PartialFunction[A, ZIO[R, E, B]]): ZTransducer[R, E, A, B]
}

Folding Operations

Stateful transducers that fold elements.

object ZTransducer {
  /** Fold elements into accumulator */
  def fold[A, S](z: S)(f: (S, A) => S): Transducer[Nothing, A, S]
  
  /** Left fold */
  def foldLeft[A, S](z: S)(f: (S, A) => S): Transducer[Nothing, A, S]
  
  /** Effectful fold */
  def foldM[R, E, A, S](z: S)(f: (S, A) => ZIO[R, E, S]): ZTransducer[R, E, A, S]
  
  /** Fold until condition */
  def foldUntil[A, S](z: S, max: Int)(f: (S, A) => S): Transducer[Nothing, A, S]
  
  /** Weighted fold with cost function */
  def foldWeighted[A, S](z: S)(costFn: A => Long, max: Long)(f: (S, A) => S): Transducer[Nothing, A, S]
  
  /** Effectful weighted fold */
  def foldWeightedM[R, E, A, S](z: S)(costFn: A => ZIO[R, E, Long], max: Long)(f: (S, A) => ZIO[R, E, S]): ZTransducer[R, E, A, S]
  
  /** Fold with decomposition */
  def foldWeightedDecompose[R, E, A, S](z: S)(costFn: A => ZIO[R, E, Long], max: Long, decompose: A => ZIO[R, E, Chunk[A]])(f: (S, A) => ZIO[R, E, S]): ZTransducer[R, E, A, S]
  
  /** Scanning (emit intermediate results) */
  def scan[A, S](z: S)(f: (S, A) => S): Transducer[Nothing, A, S]
  
  /** Effectful scanning */
  def scanM[R, E, A, S](z: S)(f: (S, A) => ZIO[R, E, S]): ZTransducer[R, E, A, S]
}

Partitioning Transducers

Transducers that partition or slice streams.

object ZTransducer {
  /** Drop first n elements */
  def drop[A](n: Long): Transducer[Nothing, A, A]
  
  /** Drop while predicate holds */
  def dropWhile[A](predicate: A => Boolean): Transducer[Nothing, A, A]
  
  /** Effectful drop while */
  def dropWhileM[R, E, A](predicate: A => ZIO[R, E, Boolean]): ZTransducer[R, E, A, A]
  
  /** Split strings on delimiter */
  def splitOn(delimiter: String): Transducer[Nothing, String, String]
}

Grouping Operations

Transducers for grouping adjacent or related elements.

object ZTransducer {
  /** Group adjacent elements by key */
  def groupAdjacentBy[A, K](f: A => K): Transducer[Nothing, A, (K, NonEmptyChunk[A])]
  
  /** Group by key with time window */
  def groupByKey[A, K](f: A => K): Transducer[Nothing, A, Map[K, NonEmptyChunk[A]]]
  
  /** Group elements within time window */
  def groupWithin[A](n: Int, duration: Duration): ZTransducer[Clock, Nothing, A, Chunk[A]]
  
  /** Batch elements by count */
  def batch[A](n: Long): Transducer[Nothing, A, Chunk[A]]
  
  /** Batch elements by count or time */
  def batchN[A](n: Long): Transducer[Nothing, A, Chunk[A]]
  
  /** Batch elements with weighted grouping */
  def batchWeighted[A](costFn: A => Long)(max: Long): Transducer[Nothing, A, Chunk[A]]
  
  /** Effectful weighted batching */
  def batchWeightedM[R, E, A](costFn: A => ZIO[R, E, Long])(max: Long): ZTransducer[R, E, A, Chunk[A]]
}

Utility Transducers

Helpful utility transducers for common patterns.

object ZTransducer {
  /** Get first element */
  def head[A]: Transducer[Nothing, A, Option[A]]
  
  /** Get last element */
  def last[A]: Transducer[Nothing, A, Option[A]]
  
  /** Prepend values to stream */
  def prepend[A](values: A*): Transducer[Nothing, A, A]
  
  /** Append values to stream */
  def append[A](values: A*): Transducer[Nothing, A, A]
  
  /** Intersperse separator between elements */
  def intersperse[A](separator: A): Transducer[Nothing, A, A]
  
  /** Add index to elements */
  def zipWithIndex[A]: Transducer[Nothing, A, (A, Long)]
  
  /** Deduplicate consecutive elements */
  def deduplicateAdjacent[A]: Transducer[Nothing, A, A]
  
  /** Deduplicate by key */
  def deduplicateAdjacentBy[A, K](f: A => K): Transducer[Nothing, A, A]
  
  /** Remove None values from Option stream */
  def collectSome[A]: Transducer[Nothing, Option[A], A]
  
  /** Flatten nested chunks */
  def flatten[A]: Transducer[Nothing, Chunk[A], A]
}

Effect-Based Transducers

Transducers that incorporate effects.

object ZTransducer {
  /** Create from effect */
  def fromEffect[R, E, A](effect: ZIO[R, E, A]): ZTransducer[R, E, Any, A]
  
  /** Create from function */
  def fromFunction[A, B](f: A => B): Transducer[Nothing, A, B]
  
  /** Create from effectful function */
  def fromFunctionM[R, E, A, B](f: A => ZIO[R, E, B]): ZTransducer[R, E, A, B]
  
  /** Apply effect to each element (tap) */
  def tap[R, E, A](f: A => ZIO[R, E, Any]): ZTransducer[R, E, A, A]
  
  /** Trace elements for debugging */
  def debug[A](prefix: String = ""): Transducer[Nothing, A, A]
  
  /** Time each element processing */
  def timed[A]: ZTransducer[Clock, Nothing, A, (Duration, A)]
}

Transducer Transformations

Transform transducer behavior and composition.

trait ZTransducerOps[-R, +E, -I, +O] {
  /** Transform output elements */
  def map[P](f: O => P): ZTransducer[R, E, I, P]
  
  /** Effectful output transformation */
  def mapM[R1 <: R, E1 >: E, P](f: O => ZIO[R1, E1, P]): ZTransducer[R1, E1, I, P]
  
  /** Transform output chunks */
  def mapChunks[P](f: Chunk[O] => Chunk[P]): ZTransducer[R, E, I, P]
  
  /** Transform input elements */
  def contramap[J](f: J => I): ZTransducer[R, E, J, O]
  
  /** Effectful input transformation */
  def contramapM[R1 <: R, E1 >: E, J](f: J => ZIO[R1, E1, I]): ZTransducer[R1, E1, J, O]
  
  /** Transform input chunks */
  def contramapChunks[J](f: Chunk[J] => Chunk[I]): ZTransducer[R, E, J, O]
  
  /** Transform errors */
  def mapError[E2](f: E => E2): ZTransducer[R, E2, I, O]
  
  /** Filter output elements */
  def filter(predicate: O => Boolean): ZTransducer[R, E, I, O]
  
  /** Effectful output filtering */
  def filterM[R1 <: R, E1 >: E](predicate: O => ZIO[R1, E1, Boolean]): ZTransducer[R1, E1, I, O]
}

Transducer Composition

Compose transducers together.

trait ZTransducerOps[-R, +E, -I, +O] {
  /** Compose with another transducer (andThen) */
  def >>>[R1 <: R, E1 >: E, P](that: ZTransducer[R1, E1, O, P]): ZTransducer[R1, E1, I, P]
  
  /** Compose (alias for >>>) */
  def andThen[R1 <: R, E1 >: E, P](that: ZTransducer[R1, E1, O, P]): ZTransducer[R1, E1, I, P]
  
  /** Compose before */
  def compose[R1 <: R, E1 >: E, C](that: ZTransducer[R1, E1, C, I]): ZTransducer[R1, E1, C, O]
  
  /** Zip with another transducer */
  def zip[R1 <: R, E1 >: E, I1 <: I, P](that: ZTransducer[R1, E1, I1, P]): ZTransducer[R1, E1, I1, (O, P)]
  
  /** Zip with function */
  def zipWith[R1 <: R, E1 >: E, I1 <: I, P, Q](that: ZTransducer[R1, E1, I1, P])(f: (O, P) => Q): ZTransducer[R1, E1, I1, Q]
  
  /** Race two transducers */
  def race[R1 <: R, E1 >: E, I1 <: I, O1 >: O](that: ZTransducer[R1, E1, I1, O1]): ZTransducer[R1, E1, I1, O1]
}

Error Handling

Error handling for transducers.

trait ZTransducerOps[-R, +E, -I, +O] {
  /** Handle all errors */
  def catchAll[R1 <: R, E2, I1 <: I, O1 >: O](h: E => ZTransducer[R1, E2, I1, O1]): ZTransducer[R1, E2, I1, O1]
  
  /** Fallback transducer */
  def orElse[R1 <: R, E2, I1 <: I, O1 >: O](that: ZTransducer[R1, E2, I1, O1]): ZTransducer[R1, E2, I1, O1]
  
  /** Convert to option on error */
  def option: ZTransducer[R, Nothing, I, Option[O]]
  
  /** Convert to either */
  def either: ZTransducer[R, Nothing, I, Either[E, O]]
  
  /** Retry on failure */
  def retry[R1 <: R](schedule: Schedule[R1, E, Any]): ZTransducer[R1, E, I, O]
}

Resource Management

Resource management for transducers.

trait ZTransducerOps[-R, +E, -I, +O] {
  /** Provide environment */
  def provide[R1](env: R1)(implicit ev: R1 <:< R): ZTransducer[Any, E, I, O]
  
  /** Provide environment layer */
  def provideLayer[R0, R1](layer: ZLayer[R0, E, R1])(implicit ev: R1 <:< R): ZTransducer[R0, E, I, O]
  
  /** Time transducer execution */
  def timed: ZTransducer[R with Clock, E, I, (Duration, O)]
  
  /** Summarize execution */
  def summarized[R1 <: R, E1 >: E, B, C](summary: ZIO[R1, E1, B])(f: (B, B) => C): ZTransducer[R1, E1, I, (C, O)]
}

Type Definitions

Core types used by transducers.

object ZTransducer {
  /** Push function signature for transducer implementation */
  type Push[R, E, I, O] = Option[Chunk[I]] => ZIO[R, E, Chunk[O]]
  
  /** Transducer that drains input without output */
  def drain[A]: Transducer[Nothing, A, Nothing] = ZTransducer(_ => Chunk.empty)
  
  /** Never-completing transducer */
  def never[I, O]: Transducer[Nothing, I, O] = ZTransducer.fromEffect(ZIO.never)
  
  /** Immediately succeeding transducer */
  def succeed[O](o: => O): Transducer[Nothing, Any, O] = fromEffect(ZIO.succeed(o))
  
  /** Immediately failing transducer */
  def fail[E](e: => E): ZTransducer[Any, E, Any, Nothing] = fromEffect(ZIO.fail(e))
}

Usage Examples:

import zio._
import zio.stream._
import zio.duration._

// Basic transformations
val numbers = ZStream.range(1, 10)
val doubled = numbers.transduce(ZTransducer.map(_ * 2))
val evens = numbers.transduce(ZTransducer.filter(_ % 2 == 0))

// Folding and scanning
val sum = numbers.transduce(ZTransducer.fold(0)(_ + _))
val runningSum = numbers.transduce(ZTransducer.scan(0)(_ + _))

// Collecting operations
val firstThree = numbers.transduce(ZTransducer.take(3))
val grouped = numbers.transduce(ZTransducer.batch(3))

// Composition
val composed = ZTransducer.filter[Int](_ > 5) >>> ZTransducer.map(_ * 2)
val result = numbers.transduce(composed)

// Effectful processing
val logged = ZTransducer.tap[Console, IOException, Int](n => 
  Console.printLine(s"Processing: $n")
)

// Grouping operations
val adjacentGroups = ZStream("a", "a", "b", "b", "c")
  .transduce(ZTransducer.groupAdjacentBy(identity))

// Timing operations
val timedProcessing = numbers.transduce(ZTransducer.timed)