ZIO Streams provides composable, resource-safe streaming data processing with comprehensive backpressure handling and error recovery. Streams are pull-based, interruptible, and integrate seamlessly with the ZIO effect system.
`ZStream` is the foundational streaming type: it represents a potentially infinite sequence of values and is parameterized by its environment, error, and element types.
/**
 * A lazy, potentially infinite sequence of values.
 *
 * @tparam R environment required to run the stream (contravariant)
 * @tparam E error the stream can fail with (covariant)
 * @tparam A type of the emitted elements (covariant)
 */
sealed trait ZStream[-R, +E, +A] {

  /** Applies the pure function `f` to every element. */
  def map[B](f: A => B): ZStream[R, E, B]

  /**
   * Applies the effectful function `f` to every element. The effect may need
   * a narrower environment (`R1`) and may widen the error type (`E1`).
   */
  def mapZIO[R1 <: R, E1 >: E, B](f: A => ZIO[R1, E1, B]): ZStream[R1, E1, B]

  /** Keeps only the elements selected by the predicate `f`. */
  def filter(f: A => Boolean): ZStream[R, E, A]

  /** Keeps the first `n` elements; `n` is passed by name. */
  def take(n: => Long): ZStream[R, E, A]

  /** Skips the first `n` elements; `n` is passed by name. */
  def drop(n: => Long): ZStream[R, E, A]

  /** Concatenates this stream with `that`; `that` is passed by name. */
  def ++[R1 <: R, E1 >: E, A1 >: A](that: => ZStream[R1, E1, A1]): ZStream[R1, E1, A1]

  /** Maps the error type with `f`, leaving the elements untouched. */
  def mapError[E2](f: E => E2): ZStream[R, E2, A]

  /**
   * Recovers from an error by switching to the stream produced by `f`; the
   * resulting error type is that of the recovery stream (`E2`).
   */
  def catchAll[R1 <: R, E2, A1 >: A](f: E => ZStream[R1, E2, A1]): ZStream[R1, E2, A1]

  /** Merges this stream with `that` into a single stream. */
  def merge[R1 <: R, E1 >: E, A1 >: A](that: ZStream[R1, E1, A1]): ZStream[R1, E1, A1]

  /**
   * Broadcasts the elements to multiple streams. The result is a scoped
   * effect (`R with Scope`) yielding streams that need no environment.
   * NOTE(review): `n` is presumably the number of output streams; confirm.
   */
  def broadcast(n: Int): ZIO[R with Scope, Nothing, List[ZStream[Any, E, A]]]

  /** Groups elements into `Chunk`s; `n` (by name) is the group size. */
  def grouped(n: => Int): ZStream[R, E, Chunk[A]]

  /** Runs the stream into `sink` and returns the sink's result `Z` as an effect. */
  def run[R1 <: R, E1 >: E, Z](sink: => ZSink[R1, E1, A, Any, Z]): ZIO[R1, E1, Z]

  /** Runs the stream and collects every element into a `Chunk`. */
  def runCollect: ZIO[R, E, Chunk[A]]

  /** Runs the stream, executing the effect `f` for each element; yields `Unit`. */
  def runForeach[R1 <: R, E1 >: E](f: A => ZIO[R1, E1, Any]): ZIO[R1, E1, Unit]
}
// Type aliases for common patterns

/** A stream that requires no environment (`R = Any`). */
type Stream[+E, +A] = ZStream[Any, E, A]

/** A stream that requires no environment and cannot fail (`E = Nothing`). */
type UStream[+A] = ZStream[Any, Nothing, A]

Usage Examples:
import zio._
import zio.stream._
// Create streams from various sources
val numbers      = ZStream.range(1, 100)
val fromIterable = ZStream.fromIterable(List(1, 2, 3, 4, 5))
val fromEffect   = ZStream.fromZIO(ZIO.succeed(42))

// Transform and process streams.
// `runCollect` returns an effect; nothing is evaluated until that effect is run.
val processed: ZIO[Any, Nothing, Chunk[Int]] =
  ZStream
    .range(1, 1000)
    .filter(_ % 2 == 0)
    .map(_ * 2)
    .take(10)
    .runCollect

// Merge multiple streams (again an effect describing the collected result)
val merged: ZIO[Any, Nothing, Chunk[Int]] =
  ZStream
    .range(1, 10)
    .merge(ZStream.range(100, 110))
    .runCollect

Create streams from various data sources including iterables, effects, and external resources.
/**
 * Stream construction methods
 */
object ZStream {

  /** Create a stream from an iterable; `as` is passed by name. */
  def fromIterable[A](as: => Iterable[A]): UStream[A]

  /** Create a stream from a single ZIO effect. */
  def fromZIO[R, E, A](zio: => ZIO[R, E, A]): ZStream[R, E, A]

  /** Create a stream of the integers from `min` (inclusive) up to `max` (exclusive). */
  def range(min: Int, max: Int): UStream[Int]

  /** Create a stream that repeats a value forever. */
  def repeat[A](a: => A): UStream[A]

  /** Create a stream that repeats an effect forever. */
  def repeatZIO[R, E, A](zio: => ZIO[R, E, A]): ZStream[R, E, A]

  /** Create a stream with a single value. */
  def succeed[A](a: => A): UStream[A]

  /** Create a failing stream. */
  def fail[E](error: => E): Stream[E, Nothing]

  /** Create an empty stream. */
  val empty: UStream[Nothing]

  /** Create a stream of bytes from a Java InputStream. */
  def fromInputStream(is: InputStream, chunkSize: => Int): Stream[IOException, Byte]

  /**
   * Create a stream of the raw bytes of the file at `path`.
   *
   * The stream emits `Byte`s, not lines: decode it with
   * `ZPipeline.utf8Decode` (and split with `ZPipeline.splitLines`) to obtain
   * text, as the usage examples do.
   */
  def fromPath(path: Path): Stream[Throwable, Byte]

  /** Create a stream that emits `Unit` at regular intervals. */
  def tick(interval: => Duration): ZStream[Any, Nothing, Unit]

  /**
   * Create a stream from an async callback.
   *
   * `register` receives an emit function; each call to it passes an effect
   * that yields a `Chunk[A]` or fails with `Option[E]`.
   */
  def async[R, E, A](
    register: (ZIO[R, Option[E], Chunk[A]] => Unit) => Unit
  ): ZStream[R, E, A]
}

Usage Examples:
// File streaming: read the file's bytes, decode them as UTF-8, print each decoded string
val fileContent = ZStream
  .fromPath(Paths.get("data.txt"))
  .via(ZPipeline.utf8Decode)
  .runForeach(Console.printLine(_))

// Periodic emissions: pair every tick with a counter
val heartbeat = ZStream
  .tick(1.second)
  .zipWith(ZStream.range(1, Int.MaxValue))((_, n) => s"Heartbeat $n")
  .runForeach(Console.printLine(_))

// Async stream from callback
val asyncStream = ZStream.async[Any, Nothing, String] { callback =>
  // Register callback with external system; each datum is emitted as a one-element chunk
  externalSystem.onData(data => callback(ZIO.succeed(Chunk(data))))
}

Sinks consume streams and produce results, providing backpressure and resource management.
/**
 * Consumes the values of a stream and produces a result.
 *
 * @tparam R  environment required by the sink (contravariant)
 * @tparam E  error the sink can fail with (covariant)
 * @tparam In type of the consumed elements (contravariant)
 * @tparam L  NOTE(review): presumably the leftover element type, following the ZIO convention; confirm
 * @tparam Z  type of the result (covariant)
 */
sealed trait ZSink[-R, +E, -In, +L, +Z] {

  /** Maps the sink's result with the pure function `f`. */
  def map[Z2](f: Z => Z2): ZSink[R, E, In, L, Z2]

  /** Maps the sink's result with the effectful function `f`. */
  def mapZIO[R1 <: R, E1 >: E, Z2](f: Z => ZIO[R1, E1, Z2]): ZSink[R1, E1, In, L, Z2]

  /** Maps the sink's error type with `f`. */
  def mapError[E2](f: E => E2): ZSink[R, E2, In, L, Z]

  /**
   * Recovers from errors by switching to `that` (passed by name); the
   * resulting error type is that of the fallback sink (`E2`).
   */
  def orElse[R1 <: R, E2, In1 <: In, L1 >: L, Z1 >: Z](
    that: => ZSink[R1, E2, In1, L1, Z1]
  ): ZSink[R1, E2, In1, L1, Z1]

  /** Combines this sink with `that` in parallel, pairing both results. */
  def zipPar[R1 <: R, E1 >: E, In1 <: In, L1 >: L, Z2](
    that: => ZSink[R1, E1, In1, L1, Z2]
  ): ZSink[R1, E1, In1, L1, (Z, Z2)]

  /** Executes the sink on `stream`, returning the result `Z` as an effect. */
  def apply[R1 <: R, E1 >: E, In1 <: In](
    stream: ZStream[R1, E1, In1]
  ): ZIO[R1, E1, Z]
}
/**
 * Common sink constructors
 */
object ZSink {

  /** Collect all elements into a Chunk. */
  def collectAll[A]: ZSink[Any, Nothing, A, Nothing, Chunk[A]]

  /** Count the number of elements. */
  val count: ZSink[Any, Nothing, Any, Nothing, Long]

  /** Take the first n elements; the remaining input is left over (`L = A`). */
  def take[A](n: => Long): ZSink[Any, Nothing, A, A, Chunk[A]]

  /**
   * Fold over elements, starting from `s` and combining with `f`.
   *
   * NOTE(review): in ZIO 2 this two-parameter-list shape corresponds to
   * `ZSink.foldLeft`; upstream `ZSink.fold` additionally takes a continuation
   * predicate (`S => Boolean`). Confirm against the zio-streams version in use.
   */
  def fold[S, A](s: => S)(f: (S, A) => S): ZSink[Any, Nothing, A, Nothing, S]

  /** Fold over all elements, starting from `s` and combining with `f`. */
  def foldLeft[A, S](s: => S)(f: (S, A) => S): ZSink[Any, Nothing, A, Nothing, S]

  /** Execute a side effect for each element. */
  def foreach[R, E, A](f: A => ZIO[R, E, Any]): ZSink[R, E, A, A, Unit]

  /**
   * Write to an OutputStream.
   * NOTE(review): upstream ZIO 2 appears to report the number of bytes written
   * (`Long`) rather than `Unit`; confirm.
   */
  def fromOutputStream(os: OutputStream): ZSink[Any, IOException, Byte, Nothing, Unit]

  /**
   * Write to a file.
   * NOTE(review): upstream ZIO 2 appears to report the number of bytes written
   * (`Long`) rather than `Unit`; confirm.
   */
  def fromPath(path: Path): ZSink[Any, IOException, Byte, Byte, Unit]
}

Usage Examples:
// Collect stream results
val collected = ZStream
  .range(1, 100)
  .run(ZSink.collectAll[Int])

// Count elements
val elementCount = ZStream
  .fromIterable(someList)
  .run(ZSink.count)

// Fold/reduce stream.
// `foldLeft` consumes every element; `ZSink.fold` in ZIO 2 also needs a
// continuation predicate, so `ZSink.fold(0)(_ + _)` would not type-check.
val sum = ZStream
  .range(1, 101)
  .run(ZSink.foldLeft[Int, Int](0)(_ + _))

// Side effects during consumption
val logged = ZStream
  .range(1, 10)
  .run(ZSink.foreach((n: Int) => Console.printLine(s"Processing: $n")))

Pipelines provide reusable stream transformations that can be composed and applied to multiple streams.
/**
 * Transforms one stream into another.
 *
 * @tparam R   environment required by the transformation (contravariant)
 * @tparam E   error the transformation can introduce (covariant)
 * @tparam In  element type of the input stream (contravariant)
 * @tparam Out element type of the output stream (covariant)
 */
sealed trait ZPipeline[-R, +E, -In, +Out] {

  /**
   * Composes this pipeline with `that`: the output of this pipeline (`Out`)
   * is the input of `that`, giving a pipeline from `In` to `Out2`.
   */
  def >>>[R1 <: R, E1 >: E, Out2](
    that: ZPipeline[R1, E1, Out, Out2]
  ): ZPipeline[R1, E1, In, Out2]

  /** Applies this pipeline to `stream`, producing a stream of `Out`. */
  def apply[R1 <: R, E1 >: E, In1 <: In](
    stream: ZStream[R1, E1, In1]
  ): ZStream[R1, E1, Out]
}
/**
 * Common pipeline constructors
 */
object ZPipeline {

  /** Identity pipeline that passes through all elements. */
  def identity[A]: ZPipeline[Any, Nothing, A, A]

  /** Map each element with `f`. */
  def map[In, Out](f: In => Out): ZPipeline[Any, Nothing, In, Out]

  /** Keep only the elements selected by the predicate `f`. */
  def filter[A](f: A => Boolean): ZPipeline[Any, Nothing, A, A]

  /** Take the first n elements. */
  def take[A](n: => Long): ZPipeline[Any, Nothing, A, A]

  /** Drop the first n elements. */
  def drop[A](n: => Long): ZPipeline[Any, Nothing, A, A]

  /** Group elements into chunks of size n. */
  def grouped[A](n: => Int): ZPipeline[Any, Nothing, A, Chunk[A]]

  /** Decode UTF-8 bytes to strings; can fail with `CharacterCodingException`. */
  val utf8Decode: ZPipeline[Any, CharacterCodingException, Byte, String]

  /** Encode strings to UTF-8 bytes; can fail with `CharacterCodingException`. */
  val utf8Encode: ZPipeline[Any, CharacterCodingException, String, Byte]

  /** Split strings by lines. */
  val splitLines: ZPipeline[Any, Nothing, String, String]

  /** Compress bytes using gzip. */
  val gzip: ZPipeline[Any, Nothing, Byte, Byte]

  /** Decompress gzip bytes; can fail with `IOException`. */
  val gunzip: ZPipeline[Any, IOException, Byte, Byte]
}

Usage Examples:
// Compose pipelines: bytes -> decoded text -> lines -> non-empty lines.
// The element type of `filter` is given explicitly so the lambda's parameter
// type does not depend on inference through `>>>`.
val textProcessor =
  ZPipeline.utf8Decode >>>
    ZPipeline.splitLines >>>
    ZPipeline.filter[String](_.nonEmpty)

// Process file with pipeline
val processedFile = ZStream
  .fromPath(Paths.get("input.txt"))
  .via(textProcessor)
  .runForeach(Console.printLine(_))

// Reusable transformation, built only from the constructors and `>>>`
// shown above: parse, keep positives, double.
// NOTE: `_.toInt` throws on non-numeric input; acceptable only for this fixed example data.
val numberProcessor =
  ZPipeline.map[String, Int](_.toInt) >>>
    ZPipeline.filter[Int](_ > 0) >>>
    ZPipeline.map[Int, Int](_ * 2)

val processedNumbers = ZStream
  .fromIterable(List("1", "2", "3"))
  .via(numberProcessor)
  .runCollect

Sophisticated stream operations for complex data processing scenarios.
/**
 * Advanced stream transformations and combinations
 * (further members of the `ZStream` type introduced earlier in this document).
 */
trait ZStream[-R, +E, +A] {

  /** Buffer elements up to a maximum size (`capacity`). */
  def buffer(capacity: => Int): ZStream[R, E, A]

  /** Throttle stream to emit at most n elements per duration. */
  def throttleEnforce(n: => Long, duration: => Duration): ZStream[R, E, A]

  /** Debounce rapid emissions using the window `d`. */
  def debounce(d: => Duration): ZStream[R, E, A]

  /** Sliding window of `n` elements, emitted as chunks. */
  def sliding(n: => Int): ZStream[R, E, Chunk[A]]

  /**
   * Aggregate elements asynchronously with `sink`, emitting each result `B`.
   *
   * This variant takes no schedule parameter.
   * NOTE(review): type parameter `C` is not used anywhere in this signature;
   * it is kept only so the listed signature is unchanged.
   */
  def aggregateAsync[R1 <: R, E1 >: E, B, C](
    sink: => ZSink[R1, E1, A, A, B]
  ): ZStream[R1, E1, B]

  /**
   * Partition elements into two streams according to the predicate `p`.
   * The result is a scoped effect (`R with Scope`).
   */
  def partition[A1 >: A](
    p: A1 => Boolean
  ): ZIO[R with Scope, Nothing, (ZStream[Any, E, A1], ZStream[Any, E, A1])]

  /** Zip with another stream, pairing the elements. */
  def zip[R1 <: R, E1 >: E, B](
    that: ZStream[R1, E1, B]
  ): ZStream[R1, E1, (A, B)]

  /** Interleave with another stream. */
  def interleave[R1 <: R, E1 >: E, A1 >: A](
    that: ZStream[R1, E1, A1]
  ): ZStream[R1, E1, A1]
}

Usage Examples:
// Rate limiting and buffering
val rateLimited = dataStream
  .buffer(1000)
  .throttleEnforce(100, 1.second)
  .runForeach(processData)

// Windowing operations: average of each 5-element window
val slidingAverage = numberStream
  .sliding(5)
  .map(chunk => chunk.sum / chunk.size.toDouble)
  .runCollect

// Complex stream orchestration: split into evens and odds, collect both concurrently.
// The pair is destructured with `flatMap { case ... }` instead of a tuple
// pattern in a `for` generator: Scala 2 desugars generator patterns to
// `withFilter`, which ZIO only supports when the error type is a supertype of
// NoSuchElementException (here it is `Nothing`).
// `partition` yields a scoped effect, so `combined` still requires a `Scope`.
val combined = numberStream.partition(_ % 2 == 0).flatMap { case (evenStream, oddStream) =>
  for {
    evens       <- evenStream.runCollect.fork
    odds        <- oddStream.runCollect.fork
    evenResults <- evens.join
    oddResults  <- odds.join
  } yield (evenResults, oddResults)
}