Functional, type-safe, composable streaming library built on ZIO's effect system for Scala
npx @tessl/cli install tessl/maven-dev-zio--zio-streams-2-13@1.0.0ZIO Streams is a functional streaming library that provides a powerful abstraction for handling potentially infinite sequences of data in a type-safe, composable, and resource-safe manner. Built on top of ZIO's effect system, it offers comprehensive operators for transformation, filtering, composition, and integrates seamlessly with ZIO's concurrent and asynchronous programming model.
libraryDependencies += "dev.zio" %% "zio-streams" % "1.0.18"import zio.stream._
import zio._For specific components:
import zio.stream.{ZStream, ZSink, ZTransducer}
import zio.stream.{Stream, UStream, Sink, Transducer} // Type aliasesimport zio._
import zio.stream._
// Create a simple stream
val numbers: UStream[Int] = ZStream.range(1, 10)
// Transform the stream
val doubled: UStream[Int] = numbers.map(_ * 2)
// Consume the stream with a sink
val sumSink: Sink[Nothing, Int, Nothing, Int] = ZSink.sum[Int]
// Run the stream
val program: UIO[Int] = doubled.run(sumSink)ZIO Streams is built around several key components:
O, may fail with errors of type E, and require environment of type RI and produces a result of type Z, with leftover elements of type LI to type O with effectsFundamental streaming operations including creation, transformation, combination, and execution of streams. The heart of ZIO Streams functionality.
abstract class ZStream[-R, +E, +O](
val process: ZManaged[R, Nothing, ZIO[R, Option[E], Chunk[O]]]
)
// Core factory methods
object ZStream {
def apply[A](as: A*): UStream[A]
def succeed[A](a: => A): UStream[A]
def fail[E](error: => E): Stream[E, Nothing]
def fromIterable[O](as: => Iterable[O]): UStream[O]
def fromEffect[R, E, A](fa: ZIO[R, E, A]): ZStream[R, E, A]
}
// Core transformation methods
trait ZStreamOps[R, E, O] {
def map[B](f: O => B): ZStream[R, E, B]
def flatMap[R1 <: R, E1 >: E, O2](f: O => ZStream[R1, E1, O2]): ZStream[R1, E1, O2]
def filter(predicate: O => Boolean): ZStream[R, E, O]
def take(n: Long): ZStream[R, E, O]
def run[R1 <: R, E1 >: E, B](sink: ZSink[R1, E1, O, Any, B]): ZIO[R1, E1, B]
}Powerful sink operations for consuming streams and producing results. Includes collectors, folders, and effectful processors.
abstract class ZSink[-R, +E, -I, +L, +Z](
val push: ZManaged[R, Nothing, ZSink.Push[R, E, I, L, Z]]
)
// Core sink factory methods
object ZSink {
def collectAll[A]: Sink[Nothing, A, Nothing, Chunk[A]]
def fold[A, S](z: S)(contFn: S => Boolean)(f: (S, A) => S): Sink[Nothing, A, A, S]
def foreach[R, E, A](f: A => ZIO[R, E, Any]): ZSink[R, E, A, Nothing, Unit]
def head[A]: Sink[Nothing, A, A, Option[A]]
def sum[A](implicit A: Numeric[A]): Sink[Nothing, A, Nothing, A]
}Transducers for transforming stream elements with effects, composition, and stateful processing capabilities.
abstract class ZTransducer[-R, +E, -I, +O](
val push: ZManaged[R, Nothing, Option[Chunk[I]] => ZIO[R, E, Chunk[O]]]
)
// Core transducer factory methods
object ZTransducer {
def identity[A]: Transducer[Nothing, A, A]
def filter[A](predicate: A => Boolean): Transducer[Nothing, A, A]
def map[A, B](f: A => B): Transducer[Nothing, A, B]
def fold[A, S](z: S)(f: (S, A) => S): Transducer[Nothing, A, S]
}Platform-specific functionality including file I/O, networking, compression (JVM), and async integration for different runtime environments.
// JVM-specific extensions
object ZStream {
def fromFile(file: => File, chunkSize: Int = DefaultChunkSize): ZStream[Any, IOException, Byte]
def fromInputStream(is: => InputStream, chunkSize: Int = DefaultChunkSize): ZStream[Any, IOException, Byte]
def fromSocketServer(port: Int, host: String = "localhost"): ZManaged[Blocking, IOException, ZStream[Blocking, IOException, Connection]]
}
object ZTransducer {
def gzip(bufferSize: Int = 64 * 1024): Transducer[Nothing, Byte, Byte]
def gunzip(bufferSize: Int = 64 * 1024): Transducer[Nothing, Byte, Byte]
}// Convenience type aliases
type Stream[+E, +A] = ZStream[Any, E, A] // Environment-less streams
type UStream[+A] = ZStream[Any, Nothing, A] // Infallible streams
type Sink[+E, A, +L, +B] = ZSink[Any, E, A, L, B] // Environment-less sinks
type Transducer[+E, -A, +B] = ZTransducer[Any, E, A, B] // Environment-less transducers// Stream element container
case class Take[+E, +A](exit: Exit[Option[E], Chunk[A]]) {
def fold[Z](end: => Z, error: E => Z, value: Chunk[A] => Z): Z
def map[B](f: A => B): Take[E, B]
def isDone: Boolean
def isFailure: Boolean
def isSuccess: Boolean
}
// Reactive reference with change stream
final class SubscriptionRef[A](
val ref: RefM[A],
val changes: Stream[Nothing, A]
)
object SubscriptionRef {
def make[A](a: A): UIO[SubscriptionRef[A]]
}This Knowledge Tile documents the core public API of ZIO Streams 1.0.18. The documentation focuses on the most commonly used methods and capabilities. For additional methods and advanced functionality, consult the source code or use IDE auto-completion to explore the full API surface.