Comprehensive streaming operations for creating, transforming, combining, and executing streams. ZStream provides the foundational streaming abstraction in ZIO Streams.
Factory methods for creating streams from various sources.
/** Constructors that build a stream from values or collections already in hand. */
object ZStream {
/** Create a stream from the varargs values `as`. */
def apply[A](as: A*): UStream[A]
/** Create a single-value stream; `a` is passed by name. */
def succeed[A](a: => A): UStream[A]
/** Create a failed stream; `error` is passed by name and the element type is `Nothing`. */
def fail[E](error: => E): Stream[E, Nothing]
/** Empty stream (element type `Nothing`). */
def empty: UStream[Nothing]
/** Never-ending stream (element type `Nothing`). */
def never: UStream[Nothing]
/** Stream from chunk `c`, which is passed by name. */
def fromChunk[O](c: => Chunk[O]): UStream[O]
/** Stream from the iterable `as`, which is passed by name. */
def fromIterable[O](as: => Iterable[O]): UStream[O]
/**
 * Stream from a Scala iterator, which is passed by name.
 * NOTE(review): confirm the error type against the library version in use;
 * an iterator that throws may require a `Throwable` error channel.
 */
def fromIterator[A](iterator: => Iterator[A]): UStream[A]
/**
 * Stream from a Java iterator, which is passed by name.
 * NOTE(review): same error-type caveat as `fromIterator` — confirm.
 */
def fromJavaIterator[A](iterator: => java.util.Iterator[A]): UStream[A]
/**
 * Integer range stream from `min` to `max`; `chunkSize` defaults to `DefaultChunkSize`.
 * NOTE(review): confirm whether `max` is inclusive or exclusive.
 */
def range(min: Int, max: Int, chunkSize: Int = DefaultChunkSize): UStream[Int]
}Create streams from ZIO effects and managed resources.
/** Constructors that build a stream from ZIO effects, managed resources, and schedules. */
object ZStream {
/** Single effect as stream: the stream shares the effect's environment, error, and value types. */
def fromEffect[R, E, A](fa: ZIO[R, E, A]): ZStream[R, E, A]
/** Repeat effect as stream */
def repeatEffect[R, E, A](fa: ZIO[R, E, A]): ZStream[R, E, A]
/** Unwrap effect containing stream: flattens a `ZIO` that produces a `ZStream` into that stream. */
def unwrap[R, E, A](fa: ZIO[R, E, ZStream[R, E, A]]): ZStream[R, E, A]
/** Stream from managed resource */
def managed[R, E, A](managed: ZManaged[R, E, A]): ZStream[R, E, A]
/**
 * Stream from schedule: emits the schedule's outputs of type `A`.
 * The error type is `Nothing` (there is no `Never` type in Scala or ZIO),
 * and `Clock` is added to the required environment.
 */
def fromSchedule[R, A](schedule: Schedule[R, Any, A]): ZStream[R with Clock, Nothing, A]
/** Periodic ticks: emits `Unit` at the given `interval`; requires `Clock` and cannot fail. */
def tick(interval: Duration): ZStream[Clock, Nothing, Unit]
}
Functional generators for creating streams.
/** Generators that build a stream by repeatedly applying a function to a seed or state. */
object ZStream {
/** Iterate function `f` over the seed value `a`. */
def iterate[A](a: A)(f: A => A): UStream[A]
/**
 * Unfold from the initial state `s`: `f` returns either `Some((element, nextState))` or `None`.
 * NOTE(review): presumably `None` ends the stream — confirm.
 */
def unfold[S, A](s: S)(f: S => Option[(A, S)]): UStream[A]
/** Effectful unfold: like `unfold`, but `f` returns its result inside a `ZIO`. */
def unfoldM[R, E, S, A](s: S)(f: S => ZIO[R, E, Option[(A, S)]]): ZStream[R, E, A]
/**
 * Paginate values: `f` returns a value together with an optional next input.
 * NOTE(review): presumably the value is emitted even when the next input is `None` — confirm.
 */
def paginate[A](a: A)(f: A => (A, Option[A])): UStream[A]
/** Paginate with effects: like `paginate`, but `f` returns its result inside a `ZIO`. */
def paginateM[R, E, A](a: A)(f: A => ZIO[R, E, (A, Option[A])]): ZStream[R, E, A]
}Essential stream transformation operations.
/**
 * Element-level transformations of a stream with environment `R`, error `E`, and element `O`.
 * In the effectful variants, `R1 <: R` and `E1 >: E` let the supplied function
 * narrow the environment and widen the error type of the result.
 */
trait ZStreamOps[-R, +E, +O] {
/** Transform each element with the pure function `f`. */
def map[B](f: O => B): ZStream[R, E, B]
/** Effectful element transformation: `f` returns a `ZIO` for each element. */
def mapM[R1 <: R, E1 >: E, B](f: O => ZIO[R1, E1, B]): ZStream[R1, E1, B]
/** Transform chunks: `f` maps a whole `Chunk[O]` to a `Chunk[O2]`. */
def mapChunks[O2](f: Chunk[O] => Chunk[O2]): ZStream[R, E, O2]
/** Monadic bind: `f` maps each element to a stream. */
def flatMap[R1 <: R, E1 >: E, O2](f: O => ZStream[R1, E1, O2]): ZStream[R1, E1, O2]
/** Collect with the partial function `pf`. */
def collect[B](pf: PartialFunction[O, B]): ZStream[R, E, B]
/** Effectful collect: `pf` yields a `ZIO` for the elements it is defined on. */
def collectM[R1 <: R, E1 >: E, B](pf: PartialFunction[O, ZIO[R1, E1, B]]): ZStream[R1, E1, B]
/** Filter elements using `predicate`; the element type is unchanged. */
def filter(predicate: O => Boolean): ZStream[R, E, O]
/** Effectful filter: `predicate` returns its `Boolean` inside a `ZIO`. */
def filterM[R1 <: R, E1 >: E](predicate: O => ZIO[R1, E1, Boolean]): ZStream[R1, E1, O]
}Operations that maintain state across stream elements.
/** Operators that thread a state value through the stream's elements. */
trait ZStreamOps[-R, +E, +O] {
/** Stateful scanning: starts from `s` and combines the state with each element via `f`; emits states of type `S`. */
def scan[S](s: S)(f: (S, O) => S): ZStream[R, E, S]
/** Effectful stateful scanning: like `scan`, but `f` returns the next state inside a `ZIO`. */
def scanM[R1 <: R, E1 >: E, S](s: S)(f: (S, O) => ZIO[R1, E1, S]): ZStream[R1, E1, S]
/**
 * Scan without an explicit seed: `f` combines two values of the element type.
 * NOTE(review): presumably the first element acts as the initial state;
 * the signature shows no early-termination mechanism — confirm.
 */
def scanReduce[O1 >: O](f: (O1, O1) => O1): ZStream[R, E, O1]
/** Effectful variant of `scanReduce`: `f` returns the combined value inside a `ZIO`. */
def scanReduceM[R1 <: R, E1 >: E, O1 >: O](f: (O1, O1) => ZIO[R1, E1, O1]): ZStream[R1, E1, O1]
/** Accumulate elements: `f` takes the state and an element and returns the next state plus an output `B`. */
def mapAccum[S, B](s: S)(f: (S, O) => (S, B)): ZStream[R, E, B]
/** Effectful accumulation: like `mapAccum`, but `f` returns its pair inside a `ZIO`. */
def mapAccumM[R1 <: R, E1 >: E, S, B](s: S)(f: (S, O) => ZIO[R1, E1, (S, B)]): ZStream[R1, E1, B]
}Combine multiple streams in various ways.
/** Operators that combine this stream with another stream `that`. */
trait ZStreamOps[-R, +E, +O] {
/** Concatenate streams sequentially; `that` is passed by name. */
def ++[R1 <: R, E1 >: E, O1 >: O](that: => ZStream[R1, E1, O1]): ZStream[R1, E1, O1]
/** Zip with another stream, producing pairs `(O, O2)`. */
def zip[R1 <: R, E1 >: E, O2](that: ZStream[R1, E1, O2]): ZStream[R1, E1, (O, O2)]
/** Zip with function: `f` combines one element from each stream into a `C`. */
def zipWith[R1 <: R, E1 >: E, O2, C](that: ZStream[R1, E1, O2])(f: (O, O2) => C): ZStream[R1, E1, C]
/** Zip keeping left: the result carries this stream's element type `O`. */
def zipLeft[R1 <: R, E1 >: E, O2](that: ZStream[R1, E1, O2]): ZStream[R1, E1, O]
/** Zip keeping right: the result carries the other stream's element type `O2`. */
def zipRight[R1 <: R, E1 >: E, O2](that: ZStream[R1, E1, O2]): ZStream[R1, E1, O2]
/** Zip with index: pairs each element with a `Long` index. */
def zipWithIndex: ZStream[R, E, (O, Long)]
/** Merge streams concurrently; both streams share the widened element type `O1`. */
def merge[R1 <: R, E1 >: E, O1 >: O](that: ZStream[R1, E1, O1]): ZStream[R1, E1, O1]
/** Interleave elements of this stream and `that`. */
def interleave[R1 <: R, E1 >: E, O1 >: O](that: ZStream[R1, E1, O1]): ZStream[R1, E1, O1]
/** Cross product, producing pairs `(O, O2)`. */
def cross[R1 <: R, E1 >: E, O2](that: ZStream[R1, E1, O2]): ZStream[R1, E1, (O, O2)]
}Operations for taking subsets of stream elements.
/** Operators that select a subset of elements or group elements together. */
trait ZStreamOps[-R, +E, +O] {
/** Take first `n` elements; `n` is a `Long`. */
def take(n: Long): ZStream[R, E, O]
/** Take while `predicate` holds. */
def takeWhile(predicate: O => Boolean): ZStream[R, E, O]
/** Effectful take while: `predicate` returns its `Boolean` inside a `ZIO`. */
def takeWhileM[R1 <: R, E1 >: E](predicate: O => ZIO[R1, E1, Boolean]): ZStream[R1, E1, O]
/** Drop first `n` elements; `n` is a `Long`. */
def drop(n: Long): ZStream[R, E, O]
/** Drop while `predicate` holds. */
def dropWhile(predicate: O => Boolean): ZStream[R, E, O]
/** Effectful drop while: `predicate` returns its `Boolean` inside a `ZIO`. */
def dropWhileM[R1 <: R, E1 >: E](predicate: O => ZIO[R1, E1, Boolean]): ZStream[R1, E1, O]
/** Group into chunks: emits `Chunk[O]` values sized by `chunkSize`. */
def grouped(chunkSize: Int): ZStream[R, E, Chunk[O]]
/**
 * Group by the key computed by `f`.
 * NOTE(review): confirm the return type against the library; grouping operators
 * may return a dedicated grouping type rather than a plain stream of pairs.
 */
def groupByKey[K](f: O => K): ZStream[R, E, (K, Chunk[O])]
/** Group adjacent by key: emits each key with a `NonEmptyChunk` of elements. */
def groupAdjacentBy[K](f: O => K): ZStream[R, E, (K, NonEmptyChunk[O])]
}Comprehensive error handling capabilities.
/** Error-handling operators: recover from, transform, or retry after stream failures. */
trait ZStreamOps[-R, +E, +O] {
/** Handle all errors: `h` maps the error to a replacement stream with a new error type `E2`. */
def catchAll[R1 <: R, E2, O1 >: O](h: E => ZStream[R1, E2, O1]): ZStream[R1, E2, O1]
/** Handle some errors: only errors on which `pf` is defined are replaced by a stream. */
def catchSome[R1 <: R, E1 >: E, O1 >: O](pf: PartialFunction[E, ZStream[R1, E1, O1]]): ZStream[R1, E1, O1]
/** Fallback stream; `that` is passed by name. */
def orElse[R1 <: R, E2, O1 >: O](that: => ZStream[R1, E2, O1]): ZStream[R1, E2, O1]
/** Transform errors with the pure function `f`. */
def mapError[E2](f: E => E2): ZStream[R, E2, O]
/** Effectful error transformation: `f` computes the new error in a `ZIO` that cannot itself fail. */
def mapErrorM[R1 <: R, E2](f: E => ZIO[R1, Nothing, E2]): ZStream[R1, E2, O]
/** Retry with schedule: the schedule consumes errors of type `E`; running it requires `Clock`. */
def retry[R1 <: R](schedule: Schedule[R1, E, Any]): ZStream[R1 with Clock, E, O]
/** Ignore errors: the resulting error type is `Nothing`. */
def ignore: ZStream[R, Nothing, O]
/** Convert to option on error */
def option: ZStream[R, Nothing, Option[O]]
/**
 * Use default value on error; `default` is passed by name.
 * Named `orElseSucceed` rather than `orElse` so it does not clash with the
 * stream-valued `orElse` overload declared above.
 */
def orElseSucceed[O1 >: O](default: => O1): ZStream[R, Nothing, O1]
}
Time-based stream operations.
/** Time-based operators; each adds `Clock` to the stream's required environment. */
trait ZStreamOps[-R, +E, +O] {
/**
 * Rate limiting: chunks whose cost exceeds the allowance of `units` per
 * `duration` (plus `burst`) are dropped. `costFn` computes the cost of a chunk.
 */
def throttleEnforce(units: Long, duration: Duration, burst: Long = 0)(costFn: Chunk[O] => Long): ZStream[R with Clock, E, O]
/**
 * Traffic shaping: chunks are delayed, not dropped, so that throughput stays
 * within `units` per `duration` (plus `burst`). `costFn` computes the cost of a chunk.
 */
def throttleShape(units: Long, duration: Duration, burst: Long = 0)(costFn: Chunk[O] => Long): ZStream[R with Clock, E, O]
/** Debounce elements */
def debounce(duration: Duration): ZStream[R with Clock, E, O]
/** Timeout stream */
def timeout(duration: Duration): ZStream[R with Clock, E, O]
/** Delay elements */
def delay(duration: Duration): ZStream[R with Clock, E, O]
/** Schedule elements: the schedule receives each element `O` as its input. */
def schedule[R1 <: R](schedule: Schedule[R1, O, Any]): ZStream[R1 with Clock, E, O]
/** Repeat on schedule */
def repeat[R1 <: R](schedule: Schedule[R1, O, Any]): ZStream[R1 with Clock, E, O]
/** Time execution: pairs each element with a `Duration`. */
def timed: ZStream[R with Clock, E, (Duration, O)]
}
Stream buffering strategies for performance optimization.
/** Buffering operators that decouple a fast producer from a slower consumer. */
trait ZStreamOps[-R, +E, +O] {
/** Buffer with backpressure: holds up to `capacity` elements. */
def buffer(capacity: Int): ZStream[R, E, O]
/** Dropping buffer: when the buffer is full, newly arriving elements are dropped. */
def bufferDropping(capacity: Int): ZStream[R, E, O]
/** Sliding window buffer: when the buffer is full, the oldest elements are dropped to make room. */
def bufferSliding(capacity: Int): ZStream[R, E, O]
/** Unbounded buffer */
def bufferUnbounded: ZStream[R, E, O]
/** Buffer chunks */
def bufferChunks(capacity: Int): ZStream[R, E, O]
}
Terminal operations for consuming streams.
/** Terminal operators: each runs the stream and returns a `ZIO` effect rather than another stream. */
trait ZStreamOps[-R, +E, +O] {
/** Run stream with sink: the sink consumes elements of type `O` and produces a `B`. */
def run[R1 <: R, E1 >: E, B](sink: ZSink[R1, E1, O, Any, B]): ZIO[R1, E1, B]
/** Collect all elements into a `Chunk` (call `.toList` on the result if a `List` is needed). */
def runCollect: ZIO[R, E, Chunk[O]]
/** Get first element, or `None` if there is none. */
def runHead: ZIO[R, E, Option[O]]
/** Get last element, or `None` if there is none. */
def runLast: ZIO[R, E, Option[O]]
/** Run and discard results */
def runDrain: ZIO[R, E, Unit]
/** Count elements; the count is a `Long`. */
def runCount: ZIO[R, E, Long]
/** Sum numeric elements; requires an implicit `Numeric[O1]`. */
def runSum[O1 >: O](implicit ev: Numeric[O1]): ZIO[R, E, O1]
/** Apply effect to each element; the results of `f` are discarded. */
def foreach[R1 <: R, E1 >: E](f: O => ZIO[R1, E1, Any]): ZIO[R1, E1, Unit]
/** Apply effect while predicate holds: `f` returns a `Boolean` that decides whether to continue. */
def foreachWhile[R1 <: R, E1 >: E](f: O => ZIO[R1, E1, Boolean]): ZIO[R1, E1, Unit]
}
Safe resource handling and cleanup.
/** Operators for finalizers, resource acquisition, and supplying the stream's environment. */
trait ZStreamOps[-R, +E, +O] {
/** Ensure finalizer runs; the finalizer is an effect that cannot fail. */
def ensuring[R1 <: R](finalizer: ZIO[R1, Nothing, Any]): ZStream[R1, E, O]
/**
 * Bracket resource acquisition/release: `release` receives the acquired `A` and cannot fail.
 * NOTE(review): the result type does not mention `O`; confirm whether this belongs on the
 * companion object rather than on a stream instance.
 */
def bracket[R1 <: R, A](acquire: ZIO[R1, E, A])(release: A => ZIO[R1, Nothing, Any]): ZStream[R1, E, A]
/** Provide environment layer: the result requires the layer's input `R0` instead of `R`. */
def provideLayer[R0, R1](layer: ZLayer[R0, E, R1])(implicit ev: R1 <:< R): ZStream[R0, E, O]
/** Provide environment: the result requires only `Any`. */
def provide[R1](env: R1)(implicit ev: R1 <:< R): ZStream[Any, E, O]
/** Access environment: `f` maps the environment `R1` to a `B`. */
def access[R1 <: R, B](f: R1 => B): ZStream[R1, E, B]
/**
 * Timed execution with duration.
 * NOTE(review): `timed` is also listed, with the same signature, under the
 * time-based operations section of this document — confirm the duplication is intended.
 */
def timed: ZStream[R with Clock, E, (Duration, O)]
/** Summarized execution: `summary` produces values of type `B`, and `f` combines two of them into a `C`. */
def summarized[R1 <: R, E1 >: E, B, C](summary: ZIO[R1, E1, B])(f: (B, B) => C): ZStream[R1, E1, (C, O)]
}Usage Examples:
import zio._
import zio.stream._
import zio.duration._
// Create and transform streams
val numbers = ZStream.range(1, 100)
val evens = numbers.filter(_ % 2 == 0)
val doubled = evens.map(_ * 2)
// Combine streams
val stream1 = ZStream(1, 2, 3)
val stream2 = ZStream(4, 5, 6)
val combined = stream1 ++ stream2
// Error handling: recover from the failure by switching to a single-element stream
val safeStream = ZStream.fail("error").catchAll(_ => ZStream.succeed(0))
// Timing operations
// throttleShape takes a second parameter list with a cost function over chunks;
// here every element costs one unit, so at most 10 elements pass per second.
val throttled = numbers.throttleShape(10, 1.second)(_.size.toLong)
val debounced = numbers.debounce(100.millis)
// Resource management
// openFile and closeFile are placeholders that this snippet does not define:
// an acquire effect and a release function for the resource it produces.
val managed = ZStream.managed(ZManaged.make(openFile)(closeFile))