A functional, type-safe, composable streaming library for Scala, built on ZIO's effect system.
Comprehensive streaming operations for creating, transforming, combining, and executing streams. ZStream provides the foundational streaming abstraction in ZIO Streams.
Factory methods for creating streams from various sources.
/** Factory methods that build streams from plain values, collections and iterators. */
object ZStream {
  /** Create a stream from a variable number of values */
  def apply[A](as: A*): UStream[A]
  /** Create a single-value stream; `a` is passed by name */
  def succeed[A](a: => A): UStream[A]
  /** Create a failed stream; `error` is passed by name and the element type is `Nothing` */
  def fail[E](error: => E): Stream[E, Nothing]
  /** Empty stream */
  def empty: UStream[Nothing]
  /** Never-ending stream */
  def never: UStream[Nothing]
  /** Stream from a chunk (passed by name) */
  def fromChunk[O](c: => Chunk[O]): UStream[O]
  /** Stream from a Scala iterable (passed by name) */
  def fromIterable[O](as: => Iterable[O]): UStream[O]
  /** Stream from a Scala iterator (passed by name) */
  def fromIterator[A](iterator: => Iterator[A]): UStream[A]
  /** Stream from a Java iterator (passed by name) */
  def fromJavaIterator[A](iterator: => java.util.Iterator[A]): UStream[A]
  /** Integer range stream bounded by `min` and `max`; `chunkSize` defaults to `DefaultChunkSize` */
  def range(min: Int, max: Int, chunkSize: Int = DefaultChunkSize): UStream[Int]
}
Create streams from ZIO effects and managed resources.
/** Factory methods that build streams from ZIO effects, managed resources and schedules. */
object ZStream {
  /** Single effect as a stream */
  def fromEffect[R, E, A](fa: ZIO[R, E, A]): ZStream[R, E, A]
  /** Repeat an effect as a stream */
  def repeatEffect[R, E, A](fa: ZIO[R, E, A]): ZStream[R, E, A]
  /** Unwrap an effect that produces a stream */
  def unwrap[R, E, A](fa: ZIO[R, E, ZStream[R, E, A]]): ZStream[R, E, A]
  /** Stream from a managed resource */
  def managed[R, E, A](managed: ZManaged[R, E, A]): ZStream[R, E, A]
  /** Stream from a schedule; requires `Clock` and cannot fail (error type `Nothing`) */
  def fromSchedule[R, A](schedule: Schedule[R, Any, A]): ZStream[R with Clock, Nothing, A]
  /** Periodic ticks every `interval`; requires `Clock` and cannot fail (error type `Nothing`) */
  def tick(interval: Duration): ZStream[Clock, Nothing, Unit]
}
Functional generators for creating streams.
/** Generator-style factory methods that build streams from a seed and a step function. */
object ZStream {
  /** Iterate a function over a seed value */
  def iterate[A](a: A)(f: A => A): UStream[A]
  /** Unfold from state `s`; `f` returns an optional pair of element and next state */
  def unfold[S, A](s: S)(f: S => Option[(A, S)]): UStream[A]
  /** Effectful unfold; the step function returns its optional result inside a ZIO effect */
  def unfoldM[R, E, S, A](s: S)(f: S => ZIO[R, E, Option[(A, S)]]): ZStream[R, E, A]
  /** Paginate values; `f` returns a tuple `(A, Option[A])` */
  def paginate[A](a: A)(f: A => (A, Option[A])): UStream[A]
  /** Paginate with effects; `f` returns the tuple inside a ZIO effect */
  def paginateM[R, E, A](a: A)(f: A => ZIO[R, E, (A, Option[A])]): ZStream[R, E, A]
}
Essential stream transformation operations.
/** Element-wise transformation operations on a stream with environment `R`, error `E` and element `O`. */
trait ZStreamOps[-R, +E, +O] {
  /** Transform each element */
  def map[B](f: O => B): ZStream[R, E, B]
  /** Effectful element transformation; the result uses environment `R1` and error type `E1` */
  def mapM[R1 <: R, E1 >: E, B](f: O => ZIO[R1, E1, B]): ZStream[R1, E1, B]
  /** Transform whole chunks of elements */
  def mapChunks[O2](f: Chunk[O] => Chunk[O2]): ZStream[R, E, O2]
  /** Monadic bind: map each element to a stream */
  def flatMap[R1 <: R, E1 >: E, O2](f: O => ZStream[R1, E1, O2]): ZStream[R1, E1, O2]
  /** Collect with a partial function */
  def collect[B](pf: PartialFunction[O, B]): ZStream[R, E, B]
  /** Effectful collect: the partial function returns a ZIO effect */
  def collectM[R1 <: R, E1 >: E, B](pf: PartialFunction[O, ZIO[R1, E1, B]]): ZStream[R1, E1, B]
  /** Filter elements with a predicate */
  def filter(predicate: O => Boolean): ZStream[R, E, O]
  /** Effectful filter: the predicate returns a ZIO effect producing a Boolean */
  def filterM[R1 <: R, E1 >: E](predicate: O => ZIO[R1, E1, Boolean]): ZStream[R1, E1, O]
}
Operations that maintain state across stream elements.
/** Stateful operations that thread an accumulator through the elements of a stream. */
trait ZStreamOps[-R, +E, +O] {
  /** Stateful scan starting from the seed `s`; the result is a stream of states */
  def scan[S](s: S)(f: (S, O) => S): ZStream[R, E, S]
  /** Effectful stateful scan starting from the seed `s` */
  def scanM[R1 <: R, E1 >: E, S](s: S)(f: (S, O) => ZIO[R1, E1, S]): ZStream[R1, E1, S]
  /** Scan without a separate seed: the accumulator has the element type `O1` */
  def scanReduce[O1 >: O](f: (O1, O1) => O1): ZStream[R, E, O1]
  /** Effectful scan without a separate seed: the accumulator has the element type `O1` */
  def scanReduceM[R1 <: R, E1 >: E, O1 >: O](f: (O1, O1) => ZIO[R1, E1, O1]): ZStream[R1, E1, O1]
  /** Map with accumulated state: `f` returns the next state together with the output element */
  def mapAccum[S, B](s: S)(f: (S, O) => (S, B)): ZStream[R, E, B]
  /** Effectful map with accumulated state */
  def mapAccumM[R1 <: R, E1 >: E, S, B](s: S)(f: (S, O) => ZIO[R1, E1, (S, B)]): ZStream[R1, E1, B]
}
Combine multiple streams in various ways.
/** Operations that combine this stream with another stream. */
trait ZStreamOps[-R, +E, +O] {
  /** Concatenate streams sequentially; `that` is passed by name */
  def ++[R1 <: R, E1 >: E, O1 >: O](that: => ZStream[R1, E1, O1]): ZStream[R1, E1, O1]
  /** Zip with another stream into a stream of pairs */
  def zip[R1 <: R, E1 >: E, O2](that: ZStream[R1, E1, O2]): ZStream[R1, E1, (O, O2)]
  /** Zip with another stream, combining paired elements with `f` */
  def zipWith[R1 <: R, E1 >: E, O2, C](that: ZStream[R1, E1, O2])(f: (O, O2) => C): ZStream[R1, E1, C]
  /** Zip, keeping only the elements of this (left) stream */
  def zipLeft[R1 <: R, E1 >: E, O2](that: ZStream[R1, E1, O2]): ZStream[R1, E1, O]
  /** Zip, keeping only the elements of `that` (right) stream */
  def zipRight[R1 <: R, E1 >: E, O2](that: ZStream[R1, E1, O2]): ZStream[R1, E1, O2]
  /** Pair each element with a `Long` index */
  def zipWithIndex: ZStream[R, E, (O, Long)]
  /** Merge streams concurrently */
  def merge[R1 <: R, E1 >: E, O1 >: O](that: ZStream[R1, E1, O1]): ZStream[R1, E1, O1]
  /** Interleave the elements of both streams */
  def interleave[R1 <: R, E1 >: E, O1 >: O](that: ZStream[R1, E1, O1]): ZStream[R1, E1, O1]
  /** Cross product of both streams as a stream of pairs */
  def cross[R1 <: R, E1 >: E, O2](that: ZStream[R1, E1, O2]): ZStream[R1, E1, (O, O2)]
}
Operations for taking subsets of stream elements.
/** Operations that take, drop or group the elements of a stream. */
trait ZStreamOps[-R, +E, +O] {
  /** Take the first `n` elements */
  def take(n: Long): ZStream[R, E, O]
  /** Take elements while the predicate holds */
  def takeWhile(predicate: O => Boolean): ZStream[R, E, O]
  /** Effectful take while: the predicate returns a ZIO effect producing a Boolean */
  def takeWhileM[R1 <: R, E1 >: E](predicate: O => ZIO[R1, E1, Boolean]): ZStream[R1, E1, O]
  /** Drop the first `n` elements */
  def drop(n: Long): ZStream[R, E, O]
  /** Drop elements while the predicate holds */
  def dropWhile(predicate: O => Boolean): ZStream[R, E, O]
  /** Effectful drop while: the predicate returns a ZIO effect producing a Boolean */
  def dropWhileM[R1 <: R, E1 >: E](predicate: O => ZIO[R1, E1, Boolean]): ZStream[R1, E1, O]
  /** Group elements into chunks, parameterised by `chunkSize` */
  def grouped(chunkSize: Int): ZStream[R, E, Chunk[O]]
  /** Group by key: each output pairs a key with a chunk of elements */
  def groupByKey[K](f: O => K): ZStream[R, E, (K, Chunk[O])]
  /** Group adjacent elements by key: each output pairs a key with a non-empty chunk */
  def groupAdjacentBy[K](f: O => K): ZStream[R, E, (K, NonEmptyChunk[O])]
}
Comprehensive error handling capabilities.
/** Error handling and recovery operations on a stream. */
trait ZStreamOps[-R, +E, +O] {
  /** Handle all errors by switching to the stream returned by `h`; the error type becomes `E2` */
  def catchAll[R1 <: R, E2, O1 >: O](h: E => ZStream[R1, E2, O1]): ZStream[R1, E2, O1]
  /** Handle only the errors for which the partial function is defined */
  def catchSome[R1 <: R, E1 >: E, O1 >: O](pf: PartialFunction[E, ZStream[R1, E1, O1]]): ZStream[R1, E1, O1]
  /** Fallback stream; `that` is passed by name */
  def orElse[R1 <: R, E2, O1 >: O](that: => ZStream[R1, E2, O1]): ZStream[R1, E2, O1]
  /** Transform errors */
  def mapError[E2](f: E => E2): ZStream[R, E2, O]
  /** Effectful error transformation; the mapping effect itself cannot fail (error type `Nothing`) */
  def mapErrorM[R1 <: R, E2](f: E => ZIO[R1, Nothing, E2]): ZStream[R1, E2, O]
  /** Retry with a schedule that takes the error `E` as input */
  def retry[R1 <: R](schedule: Schedule[R1, E, Any]): ZStream[R1, E, O]
  /** Ignore errors; the result cannot fail (error type `Nothing`) */
  def ignore: ZStream[R, Nothing, O]
  /** Convert to a stream of `Option[O]` that cannot fail */
  def option: ZStream[R, Nothing, Option[O]]
  /**
   * Use a default value on error; the result cannot fail.
   * NOTE(review): this shares its name with the stream-taking `orElse` above; ZIO 1.x appears to
   * expose this operation as `orElseSucceed` — confirm against the targeted library version.
   */
  def orElse[O1 >: O](default: O1): ZStream[R, Nothing, O1]
}
Time-based stream operations.
/** Time-based operations; every result additionally requires `Clock` in its environment. */
trait ZStreamOps[-R, +E, +O] {
  /**
   * Rate limiting that drops elements exceeding `units` per `duration` (no backpressure).
   * NOTE(review): the ZIO 1.x method also takes a `burst` argument and a cost-function
   * parameter list — confirm against the targeted library version.
   */
  def throttleEnforce(units: Long, duration: Duration): ZStream[R with Clock, E, O]
  /**
   * Traffic shaping that delays elements so the stream fits `units` per `duration`.
   * NOTE(review): the ZIO 1.x method also takes a `burst` argument and a cost-function
   * parameter list — confirm against the targeted library version.
   */
  def throttleShape(units: Long, duration: Duration): ZStream[R with Clock, E, O]
  /** Debounce elements */
  def debounce(duration: Duration): ZStream[R with Clock, E, O]
  /** Timeout stream */
  def timeout(duration: Duration): ZStream[R with Clock, E, O]
  /** Delay elements */
  def delay(duration: Duration): ZStream[R with Clock, E, O]
  /** Schedule elements using a schedule that takes the element `O` as input */
  def schedule[R1 <: R](schedule: Schedule[R1, O, Any]): ZStream[R1 with Clock, E, O]
  /** Repeat on a schedule */
  def repeat[R1 <: R](schedule: Schedule[R1, O, Any]): ZStream[R1 with Clock, E, O]
  /** Time execution: pair each element with a `Duration` */
  def timed: ZStream[R with Clock, E, (Duration, O)]
}
Stream buffering strategies for performance optimization.
/** Buffering operations; none of them change the stream's environment, error or element type. */
trait ZStreamOps[-R, +E, +O] {
  /** Buffer with backpressure */
  def buffer(capacity: Int): ZStream[R, E, O]
  /** Dropping buffer (drops newly arriving elements when full) */
  def bufferDropping(capacity: Int): ZStream[R, E, O]
  /** Sliding buffer (drops the oldest buffered elements when full) */
  def bufferSliding(capacity: Int): ZStream[R, E, O]
  /** Unbounded buffer */
  def bufferUnbounded: ZStream[R, E, O]
  /** Buffer chunks */
  def bufferChunks(capacity: Int): ZStream[R, E, O]
}
Terminal operations for consuming streams.
/** Terminal operations: each one consumes the stream and returns a ZIO effect. */
trait ZStreamOps[-R, +E, +O] {
  /** Run the stream with a sink, returning the sink's result `B` */
  def run[R1 <: R, E1 >: E, B](sink: ZSink[R1, E1, O, Any, B]): ZIO[R1, E1, B]
  /**
   * Collect all elements.
   * NOTE(review): ZIO 1.0+ appears to return `Chunk[O]` here rather than `List[O]` — confirm
   * against the targeted library version.
   */
  def runCollect: ZIO[R, E, List[O]]
  /** Get the first element, if any */
  def runHead: ZIO[R, E, Option[O]]
  /** Get the last element, if any */
  def runLast: ZIO[R, E, Option[O]]
  /** Run and discard results */
  def runDrain: ZIO[R, E, Unit]
  /** Count elements */
  def runCount: ZIO[R, E, Long]
  /** Sum elements; requires an implicit `Numeric[O1]` */
  def runSum[O1 >: O](implicit ev: Numeric[O1]): ZIO[R, E, O1]
  /** Apply an effect to each element */
  def foreach[R1 <: R, E1 >: E](f: O => ZIO[R1, E1, Any]): ZIO[R1, E1, Unit]
  /** Apply an effect to each element while it produces `true` */
  def foreachWhile[R1 <: R, E1 >: E](f: O => ZIO[R1, E1, Boolean]): ZIO[R1, E1, Unit]
}
Safe resource handling and cleanup.
/** Resource-safety, environment and measurement operations on a stream. */
trait ZStreamOps[-R, +E, +O] {
  /** Ensure a finalizer runs; the finalizer cannot fail (error type `Nothing`) */
  def ensuring[R1 <: R](finalizer: ZIO[R1, Nothing, Any]): ZStream[R1, E, O]
  /** Bracket resource acquisition/release; `release` cannot fail (error type `Nothing`) */
  def bracket[R1 <: R, A](acquire: ZIO[R1, E, A])(release: A => ZIO[R1, Nothing, Any]): ZStream[R1, E, A]
  /** Provide the environment from a layer; the result requires only the layer's input `R0` */
  def provideLayer[R0, R1](layer: ZLayer[R0, E, R1])(implicit ev: R1 <:< R): ZStream[R0, E, O]
  /** Provide the environment; the result requires `Any` */
  def provide[R1](env: R1)(implicit ev: R1 <:< R): ZStream[Any, E, O]
  /** Access the environment */
  def access[R1 <: R, B](f: R1 => B): ZStream[R1, E, B]
  /** Timed execution: pair each element with a `Duration`; requires `Clock` */
  def timed: ZStream[R with Clock, E, (Duration, O)]
  /** Summarized execution: `f` combines two `summary` results into a `C` paired with each element */
  def summarized[R1 <: R, E1 >: E, B, C](summary: ZIO[R1, E1, B])(f: (B, B) => C): ZStream[R1, E1, (C, O)]
}
Usage Examples:
import zio._
import zio.stream._
import zio.duration._
// Create and transform streams: keep the even numbers of a range, then double them
val numbers = ZStream.range(1, 100)
val evens = numbers.filter(_ % 2 == 0)
val doubled = evens.map(_ * 2)
// Combine streams: concatenate two streams sequentially
val stream1 = ZStream(1, 2, 3)
val stream2 = ZStream(4, 5, 6)
val combined = stream1 ++ stream2
// Error handling: replace a failed stream with a single fallback value
val safeStream = ZStream.fail("error").catchAll(_ => ZStream.succeed(0))
// Timing operations
val throttled = numbers.throttleShape(10, 1.second)
val debounced = numbers.debounce(100.millis)
// Resource management (`openFile` and `closeFile` are placeholders not defined in this snippet)
val managed = ZStream.managed(ZManaged.make(openFile)(closeFile))
Install with Tessl CLI
npx tessl i tessl/maven-dev-zio--zio-streams-2-13