or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

index.md, platform-extensions.md, zsink.md, zstream.md, ztransducer.md
tile.json

docs/zstream.md

Core Streaming Operations

Comprehensive streaming operations for creating, transforming, combining, and executing streams. ZStream provides the foundational streaming abstraction in ZIO Streams.

Capabilities

Stream Creation

Factory methods for creating streams from various sources.

object ZStream {
  /** Create a stream from the given varargs values. */
  def apply[A](as: A*): UStream[A]
  
  /** Create a single-value stream; `a` is passed by name. */
  def succeed[A](a: => A): UStream[A]
  
  /** Create a stream that fails with `error`; it emits no elements (`Nothing`). */
  def fail[E](error: => E): Stream[E, Nothing]
  
  /** Empty stream. */
  def empty: UStream[Nothing]
  
  /** Never-ending stream that emits no elements. */
  def never: UStream[Nothing]
  
  /** Stream from a chunk; `c` is passed by name. */
  def fromChunk[O](c: => Chunk[O]): UStream[O]
  
  /** Stream from an iterable; `as` is passed by name. */
  def fromIterable[O](as: => Iterable[O]): UStream[O]
  
  /**
   * Stream from a Scala iterator.
   *
   * The error channel is `Throwable` (not infallible): advancing an arbitrary
   * iterator may throw, and ZIO surfaces that as a stream failure.
   */
  def fromIterator[A](iterator: => Iterator[A]): Stream[Throwable, A]
  
  /**
   * Stream from a Java iterator.
   *
   * As with `fromIterator`, the error channel is `Throwable`.
   */
  def fromJavaIterator[A](iterator: => java.util.Iterator[A]): Stream[Throwable, A]
  
  /** Integer range stream from `min` to `max`, emitted in chunks of `chunkSize`. */
  def range(min: Int, max: Int, chunkSize: Int = DefaultChunkSize): UStream[Int]
}

Effect-Based Creation

Create streams from ZIO effects and managed resources.

object ZStream {
  /** Single effect as a stream: the stream's environment, error and element types are those of `fa`. */
  def fromEffect[R, E, A](fa: ZIO[R, E, A]): ZStream[R, E, A]
  
  /** Repeat the effect `fa` as a stream. */
  def repeatEffect[R, E, A](fa: ZIO[R, E, A]): ZStream[R, E, A]
  
  /** Unwrap an effect that produces a stream into that stream. */
  def unwrap[R, E, A](fa: ZIO[R, E, ZStream[R, E, A]]): ZStream[R, E, A]
  
  /** Stream from a managed resource. */
  def managed[R, E, A](managed: ZManaged[R, E, A]): ZStream[R, E, A]
  
  /**
   * Stream from a schedule, emitting the schedule's outputs.
   *
   * Requires `Clock` in addition to the schedule's environment. The stream
   * cannot fail, so its error type is `Nothing` (there is no `Never` type).
   */
  def fromSchedule[R, A](schedule: Schedule[R, Any, A]): ZStream[R with Clock, Nothing, A]
  
  /**
   * Periodic ticks: emits `Unit` values spaced by `interval`.
   *
   * Requires `Clock`; cannot fail, so its error type is `Nothing`.
   */
  def tick(interval: Duration): ZStream[Clock, Nothing, Unit]
}

Generators and Iteration

Functional generators for creating streams.

object ZStream {
  /** Iterate: build a stream from the seed `a` by applying `f` to produce further values. */
  def iterate[A](a: A)(f: A => A): UStream[A]
  
  /**
   * Unfold: starting from state `s`, `f` optionally yields an element together
   * with the next state (presumably `None` ends the stream — confirm).
   */
  def unfold[S, A](s: S)(f: S => Option[(A, S)]): UStream[A]
  
  /** Effectful unfold: like `unfold`, but each step is a ZIO effect that may fail with `E`. */
  def unfoldM[R, E, S, A](s: S)(f: S => ZIO[R, E, Option[(A, S)]]): ZStream[R, E, A]
  
  /**
   * Paginate: `f` returns a value plus an optional next input
   * (presumably the stream ends after the value paired with `None` — confirm).
   */
  def paginate[A](a: A)(f: A => (A, Option[A])): UStream[A]
  
  /** Paginate with effects: like `paginate`, but each step is a ZIO effect. */
  def paginateM[R, E, A](a: A)(f: A => ZIO[R, E, (A, Option[A])]): ZStream[R, E, A]
}

Core Transformations

Essential stream transformation operations.

trait ZStreamOps[-R, +E, +O] {
  /** Transform each element with the pure function `f`. */
  def map[B](f: O => B): ZStream[R, E, B]
  
  /** Effectful element transformation; the result may require `R1` and fail with `E1`. */
  def mapM[R1 <: R, E1 >: E, B](f: O => ZIO[R1, E1, B]): ZStream[R1, E1, B]
  
  /** Transform whole chunks of elements rather than individual elements. */
  def mapChunks[O2](f: Chunk[O] => Chunk[O2]): ZStream[R, E, O2]
  
  /** Monadic bind: map each element to a stream and flatten the results into one stream. */
  def flatMap[R1 <: R, E1 >: E, O2](f: O => ZStream[R1, E1, O2]): ZStream[R1, E1, O2]
  
  /** Collect with a partial function `pf`, yielding elements of type `B`. */
  def collect[B](pf: PartialFunction[O, B]): ZStream[R, E, B]
  
  /** Effectful collect: the partial function returns a ZIO effect per matched element. */
  def collectM[R1 <: R, E1 >: E, B](pf: PartialFunction[O, ZIO[R1, E1, B]]): ZStream[R1, E1, B]
  
  /** Filter elements using `predicate`; the element type is unchanged. */
  def filter(predicate: O => Boolean): ZStream[R, E, O]
  
  /** Effectful filter: the predicate is a ZIO effect producing a `Boolean`. */
  def filterM[R1 <: R, E1 >: E](predicate: O => ZIO[R1, E1, Boolean]): ZStream[R1, E1, O]
}

Stateful Operations

Operations that maintain state across stream elements.

trait ZStreamOps[-R, +E, +O] {
  /** Stateful scanning: folds elements into a state seeded with `s`; the result is a stream of states. */
  def scan[S](s: S)(f: (S, O) => S): ZStream[R, E, S]
  
  /** Effectful stateful scanning: like `scan`, but each step is a ZIO effect. */
  def scanM[R1 <: R, E1 >: E, S](s: S)(f: (S, O) => ZIO[R1, E1, S]): ZStream[R1, E1, S]
  
  /**
   * Scan without a seed value: the accumulator has the element (super)type `O1`
   * (presumably the first element seeds the scan — confirm).
   * NOTE(review): the signature has no early-termination hook.
   */
  def scanReduce[O1 >: O](f: (O1, O1) => O1): ZStream[R, E, O1]
  
  /** Effectful variant of `scanReduce`: each combining step is a ZIO effect. */
  def scanReduceM[R1 <: R, E1 >: E, O1 >: O](f: (O1, O1) => ZIO[R1, E1, O1]): ZStream[R1, E1, O1]
  
  /** Stateful map: `f` takes the current state and an element and returns the next state plus a `B`. */
  def mapAccum[S, B](s: S)(f: (S, O) => (S, B)): ZStream[R, E, B]
  
  /** Effectful accumulation: like `mapAccum`, but each step is a ZIO effect. */
  def mapAccumM[R1 <: R, E1 >: E, S, B](s: S)(f: (S, O) => ZIO[R1, E1, (S, B)]): ZStream[R1, E1, B]
}

Stream Combination

Combine multiple streams in various ways.

trait ZStreamOps[-R, +E, +O] {
  /** Concatenate streams sequentially; `that` is passed by name. */
  def ++[R1 <: R, E1 >: E, O1 >: O](that: => ZStream[R1, E1, O1]): ZStream[R1, E1, O1]
  
  /** Zip with another stream, pairing elements into tuples. */
  def zip[R1 <: R, E1 >: E, O2](that: ZStream[R1, E1, O2]): ZStream[R1, E1, (O, O2)]
  
  /** Zip with another stream, combining each pair of elements with `f`. */
  def zipWith[R1 <: R, E1 >: E, O2, C](that: ZStream[R1, E1, O2])(f: (O, O2) => C): ZStream[R1, E1, C]
  
  /** Zip keeping only this (left) stream's elements. */
  def zipLeft[R1 <: R, E1 >: E, O2](that: ZStream[R1, E1, O2]): ZStream[R1, E1, O]
  
  /** Zip keeping only the other (right) stream's elements. */
  def zipRight[R1 <: R, E1 >: E, O2](that: ZStream[R1, E1, O2]): ZStream[R1, E1, O2]
  
  /** Zip each element with its index, typed as `Long`. */
  def zipWithIndex: ZStream[R, E, (O, Long)]
  
  /** Merge streams concurrently into one stream of the common supertype `O1`. */
  def merge[R1 <: R, E1 >: E, O1 >: O](that: ZStream[R1, E1, O1]): ZStream[R1, E1, O1]
  
  /** Interleave the elements of this stream and `that`. */
  def interleave[R1 <: R, E1 >: E, O1 >: O](that: ZStream[R1, E1, O1]): ZStream[R1, E1, O1]
  
  /** Cross product: a stream of pairs of elements from this stream and `that`. */
  def cross[R1 <: R, E1 >: E, O2](that: ZStream[R1, E1, O2]): ZStream[R1, E1, (O, O2)]
}

Partitioning and Slicing

Operations for taking subsets of stream elements.

trait ZStreamOps[-R, +E, +O] {
  /** Take the first `n` elements. */
  def take(n: Long): ZStream[R, E, O]
  
  /** Take elements while `predicate` holds. */
  def takeWhile(predicate: O => Boolean): ZStream[R, E, O]
  
  /** Effectful take while: the predicate is a ZIO effect. */
  def takeWhileM[R1 <: R, E1 >: E](predicate: O => ZIO[R1, E1, Boolean]): ZStream[R1, E1, O]
  
  /** Drop the first `n` elements. */
  def drop(n: Long): ZStream[R, E, O]
  
  /** Drop elements while `predicate` holds. */
  def dropWhile(predicate: O => Boolean): ZStream[R, E, O]
  
  /** Effectful drop while: the predicate is a ZIO effect. */
  def dropWhileM[R1 <: R, E1 >: E](predicate: O => ZIO[R1, E1, Boolean]): ZStream[R1, E1, O]
  
  /** Group elements into chunks, sized by `chunkSize`. */
  def grouped(chunkSize: Int): ZStream[R, E, Chunk[O]]
  
  /**
   * Group by the key computed with `f`.
   * NOTE(review): ZIO 1.x appears to declare this as returning
   * `ZStream.GroupBy[R, E, K, O]` rather than a stream of pairs — confirm.
   */
  def groupByKey[K](f: O => K): ZStream[R, E, (K, Chunk[O])]
  
  /** Group adjacent elements by key; each group is a non-empty chunk paired with its key. */
  def groupAdjacentBy[K](f: O => K): ZStream[R, E, (K, NonEmptyChunk[O])]
}

Error Handling

Comprehensive error handling capabilities.

trait ZStreamOps[-R, +E, +O] {
  /** Handle all errors: `h` maps each error to a recovery stream, which may fail with a new type `E2`. */
  def catchAll[R1 <: R, E2, O1 >: O](h: E => ZStream[R1, E2, O1]): ZStream[R1, E2, O1]
  
  /** Handle some errors: only errors on which `pf` is defined are recovered. */
  def catchSome[R1 <: R, E1 >: E, O1 >: O](pf: PartialFunction[E, ZStream[R1, E1, O1]]): ZStream[R1, E1, O1]
  
  /** Fallback stream; `that` is passed by name. */
  def orElse[R1 <: R, E2, O1 >: O](that: => ZStream[R1, E2, O1]): ZStream[R1, E2, O1]
  
  /** Transform errors with the pure function `f`. */
  def mapError[E2](f: E => E2): ZStream[R, E2, O]
  
  /** Effectful error transformation; the transforming effect itself cannot fail (`Nothing`). */
  def mapErrorM[R1 <: R, E2](f: E => ZIO[R1, Nothing, E2]): ZStream[R1, E2, O]
  
  /** Retry according to `schedule`, which receives the stream's errors as input. */
  def retry[R1 <: R](schedule: Schedule[R1, E, Any]): ZStream[R1, E, O]
  
  /** Ignore errors: the resulting stream cannot fail (`Nothing`). */
  def ignore: ZStream[R, Nothing, O]
  
  /** Convert to option on error: elements are wrapped in `Option` and the stream cannot fail. */
  def option: ZStream[R, Nothing, Option[O]]
  
  /**
   * Use a default value on error; the resulting stream cannot fail.
   *
   * Named `orElseSucceed` (as in ZIO) rather than a second `orElse` overload,
   * which would be ambiguous with the stream-valued `orElse` above.
   */
  def orElseSucceed[O1 >: O](default: O1): ZStream[R, Nothing, O1]
}

Timing and Scheduling

Time-based stream operations.

trait ZStreamOps[-R, +E, +O] {
  /**
   * Rate limiting by enforcement, allowing `units` per `duration`.
   * NOTE(review): ZIO's throttleEnforce is understood to drop elements that
   * exceed the rate rather than apply backpressure — confirm.
   */
  def throttleEnforce(units: Long, duration: Duration): ZStream[R with Clock, E, O]
  
  /** Traffic shaping to `units` per `duration` (presumably by delaying elements — confirm). */
  def throttleShape(units: Long, duration: Duration): ZStream[R with Clock, E, O]
  
  /** Debounce elements using the given `duration`. */
  def debounce(duration: Duration): ZStream[R with Clock, E, O]
  
  /** Timeout the stream after `duration`. */
  def timeout(duration: Duration): ZStream[R with Clock, E, O]
  
  /** Delay elements by `duration`. */
  def delay(duration: Duration): ZStream[R with Clock, E, O]
  
  /** Schedule elements: `schedule` receives the stream's elements as input. */
  def schedule[R1 <: R](schedule: Schedule[R1, O, Any]): ZStream[R1 with Clock, E, O]
  
  /** Repeat the stream according to `schedule`. */
  def repeat[R1 <: R](schedule: Schedule[R1, O, Any]): ZStream[R1 with Clock, E, O]
  
  /** Time execution: pairs each element with a `Duration`. */
  def timed: ZStream[R with Clock, E, (Duration, O)]
}

Buffering

Stream buffering strategies for performance optimization.

trait ZStreamOps[-R, +E, +O] {
  /** Buffer up to `capacity`, with backpressure. */
  def buffer(capacity: Int): ZStream[R, E, O]
  
  /**
   * Dropping buffer.
   * NOTE(review): the earlier wording "drop oldest when full" describes the
   * sliding buffer below; a dropping buffer presumably discards newly arriving
   * elements when full — confirm.
   */
  def bufferDropping(capacity: Int): ZStream[R, E, O]
  
  /** Sliding window buffer (presumably discards the oldest elements when full — confirm). */
  def bufferSliding(capacity: Int): ZStream[R, E, O]
  
  /** Unbounded buffer: takes no capacity argument. */
  def bufferUnbounded: ZStream[R, E, O]
  
  /** Buffer chunks rather than individual elements. */
  def bufferChunks(capacity: Int): ZStream[R, E, O]
}

Stream Execution

Terminal operations for consuming streams.

trait ZStreamOps[-R, +E, +O] {
  /** Run the stream with `sink`, producing the sink's result `B`. */
  def run[R1 <: R, E1 >: E, B](sink: ZSink[R1, E1, O, Any, B]): ZIO[R1, E1, B]
  
  /**
   * Collect all elements.
   *
   * The result is a `Chunk[O]` (ZIO's own collection type), not a `List[O]`;
   * call `.toList` on the result if a `List` is required.
   */
  def runCollect: ZIO[R, E, Chunk[O]]
  
  /** Get the first element, or `None` when there is none. */
  def runHead: ZIO[R, E, Option[O]]
  
  /** Get the last element, or `None` when there is none. */
  def runLast: ZIO[R, E, Option[O]]
  
  /** Run the stream and discard its results. */
  def runDrain: ZIO[R, E, Unit]
  
  /** Count elements, as a `Long`. */
  def runCount: ZIO[R, E, Long]
  
  /** Sum numeric elements; requires a `Numeric` instance for the element type. */
  def runSum[O1 >: O](implicit ev: Numeric[O1]): ZIO[R, E, O1]
  
  /** Apply the effect `f` to each element, discarding its result. */
  def foreach[R1 <: R, E1 >: E](f: O => ZIO[R1, E1, Any]): ZIO[R1, E1, Unit]
  
  /** Apply the effect `f` to elements while it returns `true`. */
  def foreachWhile[R1 <: R, E1 >: E](f: O => ZIO[R1, E1, Boolean]): ZIO[R1, E1, Unit]
}

Resource Management

Safe resource handling and cleanup.

trait ZStreamOps[-R, +E, +O] {
  /** Ensure `finalizer` runs; the finalizer cannot fail (`Nothing`). */
  def ensuring[R1 <: R](finalizer: ZIO[R1, Nothing, Any]): ZStream[R1, E, O]
  
  /**
   * Bracket resource acquisition/release: the stream emits the acquired `A`.
   * NOTE(review): in ZIO 1.x this appears to be a constructor on the `ZStream`
   * companion object rather than an instance method — confirm.
   */
  def bracket[R1 <: R, A](acquire: ZIO[R1, E, A])(release: A => ZIO[R1, Nothing, Any]): ZStream[R1, E, A]
  
  /** Provide an environment layer, leaving only the layer's own input `R0` as a requirement. */
  def provideLayer[R0, R1](layer: ZLayer[R0, E, R1])(implicit ev: R1 <:< R): ZStream[R0, E, O]
  
  /** Provide the environment, leaving a stream with no requirements (`Any`). */
  def provide[R1](env: R1)(implicit ev: R1 <:< R): ZStream[Any, E, O]
  
  /**
   * Access the environment and produce a `B` from it.
   * NOTE(review): in ZIO 1.x `access` appears to live on the `ZStream`
   * companion object — confirm.
   */
  def access[R1 <: R, B](f: R1 => B): ZStream[R1, E, B]
  
  /** Timed execution with duration (same signature as `timed` under "Timing and Scheduling"). */
  def timed: ZStream[R with Clock, E, (Duration, O)]
  
  /** Summarized execution: `f` combines two results of `summary` into a `C`, which is paired with elements. */
  def summarized[R1 <: R, E1 >: E, B, C](summary: ZIO[R1, E1, B])(f: (B, B) => C): ZStream[R1, E1, (C, O)]
}

Usage Examples:

import zio._
import zio.stream._
import zio.duration._

// Build a stream of integers, keep the even ones, then double each of them
val numbers = ZStream.range(1, 100)
val evens = numbers.filter(n => n % 2 == 0)
val doubled = evens.map(n => n * 2)

// Concatenate two small streams one after the other
val stream1 = ZStream(1, 2, 3)
val stream2 = ZStream(4, 5, 6)
val combined = stream1 ++ stream2

// Recover from a failed stream by switching to a fallback stream
val safeStream = ZStream.fail("error").catchAll(_ => ZStream.succeed(0))

// Time-based operators applied to the integer stream
val throttled = numbers.throttleShape(10, 1.second)
val debounced = numbers.debounce(100.millis)

// Wrap a managed resource in a stream; `openFile` and `closeFile` must be supplied by the caller
val managed = ZStream.managed(ZManaged.make(openFile)(closeFile))