or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

application.mdconcurrency.mdcore-effects.mddependency-injection.mderror-handling.mdindex.mdmetrics.mdresource-management.mdservices.mdstm.mdstreams.mdtesting.md
tile.json

streams.mddocs/

ZIO Streams

ZIO Streams provides composable, resource-safe streaming data processing with comprehensive backpressure handling and error recovery. Streams are pull-based, interruptible, and integrate seamlessly with the ZIO effect system.

Capabilities

ZStream - Core Streaming Type

The foundational streaming type representing potentially infinite sequences of values with environment, error, and element types.

/**
 * A ZStream represents a lazy, potentially infinite sequence of values of type A
 * that can fail with E and requires environment R
 */
sealed trait ZStream[-R, +E, +A] {
  /** Transform each element with a function */
  def map[B](f: A => B): ZStream[R, E, B]
  
  /** Transform each element with an effect */
  def mapZIO[R1 <: R, E1 >: E, B](f: A => ZIO[R1, E1, B]): ZStream[R1, E1, B]
  
  /** Filter elements based on a predicate */
  def filter(f: A => Boolean): ZStream[R, E, A]
  
  /** Take the first n elements */
  def take(n: => Long): ZStream[R, E, A]
  
  /** Skip the first n elements */
  def drop(n: => Long): ZStream[R, E, A]
  
  /** Concatenate with another stream */
  def ++[R1 <: R, E1 >: E, A1 >: A](that: => ZStream[R1, E1, A1]): ZStream[R1, E1, A1]
  
  /** Transform errors */
  def mapError[E2](f: E => E2): ZStream[R, E2, A]
  
  /** Recover from errors */
  def catchAll[R1 <: R, E2, A1 >: A](f: E => ZStream[R1, E2, A1]): ZStream[R1, E2, A1]
  
  /** Merge with another stream */
  def merge[R1 <: R, E1 >: E, A1 >: A](that: ZStream[R1, E1, A1]): ZStream[R1, E1, A1]
  
  /** Broadcast elements to multiple streams */
  def broadcast(n: Int): ZIO[R with Scope, Nothing, List[ZStream[Any, E, A]]]
  
  /** Group elements into chunks */
  def grouped(n: => Int): ZStream[R, E, Chunk[A]]
  
  /** Run the stream with a sink */
  def run[R1 <: R, E1 >: E, Z](sink: => ZSink[R1, E1, A, Any, Z]): ZIO[R1, E1, Z]
  
  /** Convert to a ZIO effect collecting all elements */
  def runCollect: ZIO[R, E, Chunk[A]]
  
  /** Execute foreach on each element */
  def runForeach[R1 <: R, E1 >: E](f: A => ZIO[R1, E1, Any]): ZIO[R1, E1, Unit]
}

// Type aliases for common patterns
type Stream[+E, +A] = ZStream[Any, E, A]
type UStream[+A] = ZStream[Any, Nothing, A]

Usage Examples:

import zio._
import zio.stream._

// Create streams from various sources
val numbers = ZStream.range(1, 100)
val fromIterable = ZStream.fromIterable(List(1, 2, 3, 4, 5))
val fromEffect = ZStream.fromZIO(ZIO.succeed(42))

// Transform and process streams
val processed = ZStream.range(1, 1000)
  .filter(_ % 2 == 0)
  .map(_ * 2)
  .take(10)
  .runCollect

// Merge multiple streams
val merged = ZStream.range(1, 10)
  .merge(ZStream.range(100, 110))
  .runCollect

Stream Construction

Create streams from various data sources including iterables, effects, and external resources.

/**
 * Stream construction methods
 */
object ZStream {
  /** Create a stream from an iterable */
  def fromIterable[A](as: => Iterable[A]): UStream[A]
  
  /** Create a stream from a single ZIO effect */
  def fromZIO[R, E, A](zio: => ZIO[R, E, A]): ZStream[R, E, A]
  
  /** Create a stream from a range of integers */
  def range(min: Int, max: Int): UStream[Int]
  
  /** Create a stream that repeats a value forever */
  def repeat[A](a: => A): UStream[A]
  
  /** Create a stream that repeats an effect forever */
  def repeatZIO[R, E, A](zio: => ZIO[R, E, A]): ZStream[R, E, A]
  
  /** Create a stream with a single value */
  def succeed[A](a: => A): UStream[A]
  
  /** Create a failing stream */
  def fail[E](error: => E): Stream[E, Nothing]
  
  /** Create an empty stream */
  val empty: UStream[Nothing]
  
  /** Create a stream from a Java InputStream */
  def fromInputStream(is: InputStream, chunkSize: => Int): Stream[IOException, Byte]
  
  /** Create a stream from file lines */
  def fromPath(path: Path): Stream[IOException, String]
  
  /** Create a stream that emits at regular intervals */
  def tick(interval: => Duration): ZStream[Any, Nothing, Unit]
  
  /** Create a stream from an async callback */
  def async[R, E, A](
    register: (ZIO[R, Option[E], Chunk[A]] => Unit) => Unit
  ): ZStream[R, E, A]
}

Usage Examples:

// File streaming
val fileContent = ZStream.fromPath(Paths.get("data.txt"))
  .via(ZPipeline.utf8Decode)
  .runForeach(Console.printLine(_))

// Periodic emissions
val heartbeat = ZStream.tick(1.second)
  .zipWith(ZStream.range(1, Int.MaxValue))((_, n) => s"Heartbeat $n")
  .runForeach(Console.printLine(_))

// Async stream from callback
val asyncStream = ZStream.async[Any, Nothing, String] { callback =>
  // Register callback with external system
  externalSystem.onData(data => callback(ZIO.succeed(Chunk(data))))
}

ZSink - Stream Consumer

Sinks consume streams and produce results, providing backpressure and resource management.

/**
 * A ZSink consumes values from a stream and produces a result
 */
sealed trait ZSink[-R, +E, -In, +L, +Z] {
  /** Transform the result of the sink */
  def map[Z2](f: Z => Z2): ZSink[R, E, In, L, Z2]
  
  /** Transform the result with an effect */
  def mapZIO[R1 <: R, E1 >: E, Z2](f: Z => ZIO[R1, E1, Z2]): ZSink[R1, E1, In, L, Z2]
  
  /** Transform errors */
  def mapError[E2](f: E => E2): ZSink[R, E2, In, L, Z]
  
  /** Recover from errors */
  def orElse[R1 <: R, E2, In1 <: In, L1 >: L, Z1 >: Z](
    that: => ZSink[R1, E2, In1, L1, Z1]
  ): ZSink[R1, E2, In1, L1, Z1]
  
  /** Combine with another sink in parallel */
  def zipPar[R1 <: R, E1 >: E, In1 <: In, L1 >: L, Z2](
    that: => ZSink[R1, E1, In1, L1, Z2]
  ): ZSink[R1, E1, In1, L1, (Z, Z2)]
  
  /** Execute the sink on a stream */
  def apply[R1 <: R, E1 >: E, In1 <: In](
    stream: ZStream[R1, E1, In1]
  ): ZIO[R1, E1, Z]
}

/**
 * Common sink constructors
 */
object ZSink {
  /** Collect all elements into a Chunk */
  def collectAll[A]: ZSink[Any, Nothing, A, Nothing, Chunk[A]]
  
  /** Count the number of elements */
  val count: ZSink[Any, Nothing, Any, Nothing, Long]
  
  /** Take the first n elements */
  def take[A](n: => Long): ZSink[Any, Nothing, A, A, Chunk[A]]
  
  /** Fold over elements */
  def fold[S, A](s: => S)(f: (S, A) => S): ZSink[Any, Nothing, A, Nothing, S]
  
  /** Execute a side effect for each element */
  def foreach[R, E, A](f: A => ZIO[R, E, Any]): ZSink[R, E, A, A, Unit]
  
  /** Write to an OutputStream */
  def fromOutputStream(os: OutputStream): ZSink[Any, IOException, Byte, Nothing, Unit]
  
  /** Write to a file */
  def fromPath(path: Path): ZSink[Any, IOException, Byte, Byte, Unit]
}

Usage Examples:

// Collect stream results
val collected = ZStream.range(1, 100)
  .run(ZSink.collectAll)

// Count elements
val elementCount = ZStream.fromIterable(someList)
  .run(ZSink.count)

// Fold/reduce stream
val sum = ZStream.range(1, 101)
  .run(ZSink.fold(0)(_ + _))

// Side effects during consumption
val logged = ZStream.range(1, 10)
  .run(ZSink.foreach(n => Console.printLine(s"Processing: $n")))

ZPipeline - Stream Transformation

Pipelines provide reusable stream transformations that can be composed and applied to multiple streams.

/**
 * A ZPipeline transforms one stream into another
 */
sealed trait ZPipeline[-R, +E, -In, +Out] {
  /** Compose with another pipeline */
  def >>>[R1 <: R, E1 >: E, Out2](
    that: ZPipeline[R1, E1, Out, Out2]
  ): ZPipeline[R1, E1, In, Out2]
  
  /** Apply this pipeline to a stream */
  def apply[R1 <: R, E1 >: E, In1 <: In](
    stream: ZStream[R1, E1, In1]
  ): ZStream[R1, E1, Out]
}

/**
 * Common pipeline constructors
 */
object ZPipeline {
  /** Identity pipeline that passes through all elements */
  def identity[A]: ZPipeline[Any, Nothing, A, A]
  
  /** Map each element */
  def map[In, Out](f: In => Out): ZPipeline[Any, Nothing, In, Out]
  
  /** Filter elements */
  def filter[A](f: A => Boolean): ZPipeline[Any, Nothing, A, A]
  
  /** Take the first n elements */
  def take[A](n: => Long): ZPipeline[Any, Nothing, A, A]
  
  /** Drop the first n elements */
  def drop[A](n: => Long): ZPipeline[Any, Nothing, A, A]
  
  /** Group elements into chunks */
  def grouped[A](n: => Int): ZPipeline[Any, Nothing, A, Chunk[A]]
  
  /** Decode UTF-8 bytes to strings */
  val utf8Decode: ZPipeline[Any, CharacterCodingException, Byte, String]
  
  /** Encode strings to UTF-8 bytes */
  val utf8Encode: ZPipeline[Any, CharacterCodingException, String, Byte]
  
  /** Split strings by lines */
  val splitLines: ZPipeline[Any, Nothing, String, String]
  
  /** Compress using gzip */
  val gzip: ZPipeline[Any, Nothing, Byte, Byte]
  
  /** Decompress gzip */
  val gunzip: ZPipeline[Any, IOException, Byte, Byte]
}

Usage Examples:

// Compose pipelines
val textProcessor = ZPipeline.utf8Decode >>> 
                   ZPipeline.splitLines >>> 
                   ZPipeline.filter(_.nonEmpty)

// Process file with pipeline
val processedFile = ZStream.fromPath(Paths.get("input.txt"))
  .via(textProcessor)
  .runForeach(Console.printLine(_))

// Reusable transformation
val numberProcessor = ZPipeline.map[String, Int](_.toInt)
  .filter(_ > 0)
  .map(_ * 2)

val processedNumbers = ZStream.fromIterable(List("1", "2", "3"))
  .via(numberProcessor)
  .runCollect

Advanced Stream Operations

Sophisticated stream operations for complex data processing scenarios.

/**
 * Advanced stream transformations and combinations
 */
trait ZStream[-R, +E, +A] {
  /** Buffer elements up to a maximum size */
  def buffer(capacity: => Int): ZStream[R, E, A]
  
  /** Throttle stream to emit at most n elements per duration */
  def throttleEnforce(n: => Long, duration: => Duration): ZStream[R, E, A]
  
  /** Debounce rapid emissions */
  def debounce(d: => Duration): ZStream[R, E, A]
  
  /** Sliding window of elements */
  def sliding(n: => Int): ZStream[R, E, Chunk[A]]
  
  /** Aggregate elements with a schedule */
  def aggregateAsync[R1 <: R, E1 >: E, B, C](
    sink: => ZSink[R1, E1, A, A, B]
  ): ZStream[R1, E1, B]
  
  /** Partition elements into multiple streams */
  def partition[A1 >: A](
    p: A1 => Boolean
  ): ZIO[R with Scope, Nothing, (ZStream[Any, E, A1], ZStream[Any, E, A1])]
  
  /** Zip with another stream */
  def zip[R1 <: R, E1 >: E, B](
    that: ZStream[R1, E1, B]
  ): ZStream[R1, E1, (A, B)]
  
  /** Interleave with another stream */
  def interleave[R1 <: R, E1 >: E, A1 >: A](
    that: ZStream[R1, E1, A1]
  ): ZStream[R1, E1, A1]
}

Usage Examples:

// Rate limiting and buffering
val rateLimited = dataStream
  .buffer(1000)
  .throttleEnforce(100, 1.second)
  .runForeach(processData)

// Windowing operations
val slidingAverage = numberStream
  .sliding(5)
  .map(chunk => chunk.sum / chunk.size.toDouble)
  .runCollect

// Complex stream orchestration
val combined = for {
  (evenStream, oddStream) <- numberStream.partition(_ % 2 == 0)
  evens <- evenStream.runCollect.fork
  odds  <- oddStream.runCollect.fork
  evenResults <- evens.join
  oddResults  <- odds.join
} yield (evenResults, oddResults)