
ZIO Streams

ZIO Streams is a functional streaming library for processing potentially infinite sequences of data in a type-safe, composable, and resource-safe way. It is built on ZIO's effect system, provides operators for transforming, filtering, and combining streams, and integrates with ZIO's concurrent and asynchronous programming model.

Package Information

  • Package Name: zio-streams_2.13
  • Package Type: Maven
  • Language: Scala
  • Group ID: dev.zio
  • Artifact ID: zio-streams_2.13
  • Version: 1.0.18
  • Installation: libraryDependencies += "dev.zio" %% "zio-streams" % "1.0.18"

Core Imports

import zio.stream._
import zio._

For specific components:

import zio.stream.{ZStream, ZSink, ZTransducer}
import zio.stream.{Stream, UStream, Sink, Transducer} // Type aliases

Basic Usage

import zio._
import zio.stream._

// Create a stream of the integers 1 through 9 (the upper bound of range is exclusive)
val numbers: UStream[Int] = ZStream.range(1, 10)

// Transform the stream
val doubled: UStream[Int] = numbers.map(_ * 2)

// Consume the stream with a sink
val sumSink: Sink[Nothing, Int, Nothing, Int] = ZSink.sum[Int]

// Describe running the stream into the sink; nothing executes until the resulting effect is run
val program: UIO[Int] = doubled.run(sumSink)
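
The `program` value above only describes the computation. A minimal sketch of executing it, assuming the ZIO 1.x default runtime (`Runtime.default.unsafeRun`) is acceptable for a small script; the object name `BasicUsageRun` is illustrative and not part of the library:

```scala
import zio._
import zio.stream._

object BasicUsageRun {
  // Same pipeline as above: range(1, 10) emits 1 through 9, each value is doubled, then summed
  val program: UIO[Int] =
    ZStream.range(1, 10).map(_ * 2).run(ZSink.sum[Int])

  def main(args: Array[String]): Unit = {
    // unsafeRun executes the effect synchronously; it is meant for the edge of an application
    val total: Int = Runtime.default.unsafeRun(program)
    println(total) // 90
  }
}
```

In a full application, the effect would more typically be returned from the `run` method of `zio.App` instead of being executed by hand.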

Architecture

ZIO Streams is built around several key components:

  • ZStream[R, E, O]: Core streaming abstraction representing programs that emit zero or more values of type O, may fail with errors of type E, and require an environment of type R
  • ZSink[R, E, I, L, Z]: Consumes stream elements of type I and produces a result of type Z, with leftover elements of type L
  • ZTransducer[R, E, I, O]: Transforms stream elements from type I to type O with effects
  • Resource Management: Built on ZManaged for automatic resource cleanup and exception safety
  • Chunk-based Processing: Uses ZIO's Chunk for efficient batch processing and memory management
  • Pull-based Model: Inherent backpressure and lazy evaluation for memory efficiency
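
The pull-based model in the last point can be illustrated with an infinite stream. This is a minimal sketch, assuming `ZStream.iterate` and `runCollect` from the ZIO 1.x API (neither appears in the listings in this document); the object name is illustrative:

```scala
import zio._
import zio.stream._

object PullBasedSketch {
  // An infinite stream 1, 2, 3, ... ; defining it computes nothing
  val naturals: UStream[Int] = ZStream.iterate(1)(_ + 1)

  // Downstream demand drives evaluation, so take(5) ends the stream after five elements
  val firstFive: UIO[Chunk[Int]] = naturals.take(5).runCollect

  def main(args: Array[String]): Unit =
    // Prints a chunk holding the elements 1 through 5
    println(Runtime.default.unsafeRun(firstFive))
}
```

Because the consumer requests elements from the producer, the infinite source never runs ahead of what `take(5)` asks for.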

Capabilities

Core Streaming Operations

Fundamental operations for creating, transforming, combining, and running streams. These form the core of ZIO Streams.

abstract class ZStream[-R, +E, +O](
  val process: ZManaged[R, Nothing, ZIO[R, Option[E], Chunk[O]]]
)

// Core factory methods
object ZStream {
  def apply[A](as: A*): UStream[A]
  def succeed[A](a: => A): UStream[A]
  def fail[E](error: => E): Stream[E, Nothing]
  def fromIterable[O](as: => Iterable[O]): UStream[O]
  def fromEffect[R, E, A](fa: ZIO[R, E, A]): ZStream[R, E, A]
}

// Core transformation methods (members of ZStream, listed here in trait form)
trait ZStreamOps[R, E, O] {
  def map[B](f: O => B): ZStream[R, E, B]
  def flatMap[R1 <: R, E1 >: E, O2](f: O => ZStream[R1, E1, O2]): ZStream[R1, E1, O2]
  def filter(predicate: O => Boolean): ZStream[R, E, O]
  def take(n: Long): ZStream[R, E, O]
  def run[R1 <: R, E1 >: E, B](sink: ZSink[R1, E1, O, Any, B]): ZIO[R1, E1, B]
}
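
As a rough illustration of how the factory and transformation methods listed above compose, here is a minimal sketch. The object and value names are illustrative; `ZSink.collectAll` is used as the sink passed to `run`:

```scala
import zio._
import zio.stream._

object CoreOpsSketch {
  val evensSquared: UStream[Int] =
    ZStream
      .fromIterable(1 to 10) // 1, 2, ..., 10
      .filter(_ % 2 == 0)    // 2, 4, 6, 8, 10
      .map(n => n * n)       // 4, 16, 36, 64, 100
      .take(3)               // 4, 16, 36

  // flatMap replaces each element with a sub-stream and concatenates the results in order
  val repeated: UStream[Int] =
    evensSquared.flatMap(n => ZStream(n, n))

  // Collect everything into a Chunk: 4, 4, 16, 16, 36, 36
  val result: UIO[Chunk[Int]] = repeated.run(ZSink.collectAll[Int])
}
```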

Stream Consumption

Sinks consume stream elements and produce a result. The built-in sinks include collectors, folds, and effectful processors.

abstract class ZSink[-R, +E, -I, +L, +Z](
  val push: ZManaged[R, Nothing, ZSink.Push[R, E, I, L, Z]]
)

// Core sink factory methods
object ZSink {
  def collectAll[A]: Sink[Nothing, A, Nothing, Chunk[A]]
  def fold[A, S](z: S)(contFn: S => Boolean)(f: (S, A) => S): Sink[Nothing, A, A, S]
  def foreach[R, E, A](f: A => ZIO[R, E, Any]): ZSink[R, E, A, Nothing, Unit]
  def head[A]: Sink[Nothing, A, A, Option[A]]
  def sum[A](implicit A: Numeric[A]): Sink[Nothing, A, Nothing, A]
}
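
The sink factories listed above can be exercised against a small stream. This is a sketch only; the object and value names are illustrative, and the comment on `fold` reflects my reading of its `contFn` parameter (consumption continues while the predicate holds for the accumulated state):

```scala
import zio._
import zio.stream._

object SinkSketch {
  val numbers: UStream[Int] = ZStream(1, 2, 3, 4, 5)

  val all: UIO[Chunk[Int]]    = numbers.run(ZSink.collectAll[Int]) // every element
  val first: UIO[Option[Int]] = numbers.run(ZSink.head[Int])       // first element, if any
  val total: UIO[Int]         = numbers.run(ZSink.sum[Int])        // 1 + 2 + 3 + 4 + 5

  // Keeps adding while the running total is below 10, then stops consuming
  val upToTen: UIO[Int] =
    numbers.run(ZSink.fold[Int, Int](0)(_ < 10)(_ + _))

  // Runs an effect for each element and yields Unit
  val printed: UIO[Unit] =
    numbers.run(ZSink.foreach((n: Int) => UIO(println(n))))
}
```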

Stream Transformation

Transducers transform stream elements, optionally with effects or internal state, and can be composed with one another.

abstract class ZTransducer[-R, +E, -I, +O](
  val push: ZManaged[R, Nothing, Option[Chunk[I]] => ZIO[R, E, Chunk[O]]]
)

// Core transducer factory methods
object ZTransducer {
  def identity[A]: Transducer[Nothing, A, A]
  def filter[A](predicate: A => Boolean): Transducer[Nothing, A, A]
  def map[A, B](f: A => B): Transducer[Nothing, A, B]
  def foldLeft[A, S](z: S)(f: (S, A) => S): Transducer[Nothing, A, S]
}
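
A minimal sketch of applying transducers to a stream. It assumes `ZStream#transduce`, `ZTransducer.splitLines`, `ZTransducer.foldLeft`, the instance method `map`, and the `>>>` composition operator from the ZIO 1.0.x API; most of these are not shown in the listing above, and the object name is illustrative:

```scala
import zio._
import zio.stream._

object TransducerSketch {
  // Text elements whose boundaries do not line up with line breaks
  val text: UStream[String] = ZStream("alpha\nbe", "ta\ngamma")

  // splitLines is stateful: it buffers a partial line until the rest arrives
  val lines: UIO[Chunk[String]] =
    text.transduce(ZTransducer.splitLines).runCollect

  // >>> feeds the output of one transducer into the next
  val lineLengths: UIO[Chunk[Int]] =
    text
      .transduce(ZTransducer.splitLines >>> ZTransducer.identity[String].map(_.length))
      .runCollect

  // foldLeft emits a single accumulated value once the stream ends
  val total: UIO[Chunk[Int]] =
    ZStream(1, 2, 3, 4)
      .transduce(ZTransducer.foldLeft[Int, Int](0)(_ + _))
      .runCollect
}
```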

Platform Extensions

Platform-specific functionality: file I/O, networking, and compression on the JVM, plus async integration for different runtime environments.

// JVM-specific extensions
object ZStream {
  def fromFile(file: => File, chunkSize: Int = DefaultChunkSize): ZStream[Any, IOException, Byte]
  def fromInputStream(is: => InputStream, chunkSize: Int = DefaultChunkSize): ZStream[Any, IOException, Byte]
  def fromSocketServer(port: Int, host: String = "localhost"): ZManaged[Blocking, IOException, ZStream[Blocking, IOException, Connection]]
}

object ZTransducer {
  def gzip(bufferSize: Int = 64 * 1024): Transducer[Nothing, Byte, Byte]
  def gunzip(bufferSize: Int = 64 * 1024): Transducer[Nothing, Byte, Byte]
}
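
A minimal JVM-only sketch that combines `fromInputStream` with the `gzip` and `gunzip` transducers in a compress-then-decompress round trip. Type annotations are deliberately omitted so the compiler infers the environment and error types; the object name and the sample text are illustrative:

```scala
import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets
import zio._
import zio.stream._

object GzipRoundTripSketch {
  val original: Array[Byte] =
    "hello, zio streams".getBytes(StandardCharsets.UTF_8)

  // Read bytes from an in-memory InputStream, compress them, then decompress again
  val roundTrip =
    ZStream
      .fromInputStream(new ByteArrayInputStream(original))
      .transduce(ZTransducer.gzip())
      .transduce(ZTransducer.gunzip())
      .runCollect

  def main(args: Array[String]): Unit = {
    // The default runtime supplies the standard ZIO environment for any services the stream needs
    val bytes = Runtime.default.unsafeRun(roundTrip)
    println(new String(bytes.toArray, StandardCharsets.UTF_8))
  }
}
```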

Type Aliases

// Convenience type aliases
type Stream[+E, +A] = ZStream[Any, E, A]      // Environment-less streams
type UStream[+A] = ZStream[Any, Nothing, A]   // Infallible streams
type Sink[+E, A, +L, +B] = ZSink[Any, E, A, L, B]  // Environment-less sinks
type Transducer[+E, -A, +B] = ZTransducer[Any, E, A, B]  // Environment-less transducers
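
A short sketch of how the aliases relate to the full types. It assumes `++` and `catchAll` on `ZStream` from the ZIO 1.x API, neither of which is listed in this document; the object and value names are illustrative:

```scala
import zio._
import zio.stream._

object AliasSketch {
  val viaAlias: UStream[Int] = ZStream(1, 2, 3)

  // The alias and its expansion are the same type, so no conversion is needed
  val expanded: ZStream[Any, Nothing, Int] = viaAlias

  // Stream[E, A] fixes only the environment to Any; the error type stays visible
  val failing: Stream[String, Int] = ZStream(1, 2) ++ ZStream.fail("boom")

  // Handling the error yields an infallible stream again
  val recovered: UStream[Int] = failing.catchAll(_ => ZStream.succeed(-1))
}
```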

Supporting Types

// Stream element container
case class Take[+E, +A](exit: Exit[Option[E], Chunk[A]]) {
  def fold[Z](end: => Z, error: Cause[E] => Z, value: Chunk[A] => Z): Z
  def map[B](f: A => B): Take[E, B]
  def isDone: Boolean
  def isFailure: Boolean
  def isSuccess: Boolean
}
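
A minimal sketch of the three states a `Take` can represent. It assumes the companion constructors `Take.single`, `Take.fail`, and `Take.end` from the ZIO 1.x API, which are not shown in the listing above; `describe` is a hypothetical helper written for this example:

```scala
import zio._
import zio.stream._

object TakeSketch {
  val value: Take[Nothing, Int]      = Take.single(1)    // a chunk with one element
  val failure: Take[String, Nothing] = Take.fail("boom") // a stream failure
  val end: Take[Nothing, Nothing]    = Take.end          // end-of-stream signal

  // fold handles end of stream, failure, and a chunk of values, in that order
  def describe[E, A](take: Take[E, A]): String =
    take.fold[String](
      "end of stream",
      _ => "failure",
      chunk => s"${chunk.size} element(s)"
    )
}
```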

// Reactive reference with change stream
final class SubscriptionRef[A](
  val ref: RefM[A],
  val changes: Stream[Nothing, A]
)

object SubscriptionRef {
  def make[A](a: A): UIO[SubscriptionRef[A]]
}

API Coverage

This Knowledge Tile documents the core public API of ZIO Streams 1.0.18. The documentation focuses on the most commonly used methods and capabilities. For additional methods and advanced functionality, consult the source code or use IDE auto-completion to explore the full API surface.