0
# ZIO Streams
1
2
ZIO Streams provides composable, resource-safe streaming data processing with comprehensive backpressure handling and error recovery. Streams are pull-based, interruptible, and integrate seamlessly with the ZIO effect system.
3
4
## Capabilities
5
6
### ZStream - Core Streaming Type
7
8
The foundational streaming type representing potentially infinite sequences of values with environment, error, and element types.
9
10
```scala { .api }
11
/**
12
* A ZStream represents a lazy, potentially infinite sequence of values of type A
13
* that can fail with E and requires environment R
14
*/
15
sealed trait ZStream[-R, +E, +A] {
16
/** Transform each element with a function */
17
def map[B](f: A => B): ZStream[R, E, B]
18
19
/** Transform each element with an effect */
20
def mapZIO[R1 <: R, E1 >: E, B](f: A => ZIO[R1, E1, B]): ZStream[R1, E1, B]
21
22
/** Filter elements based on a predicate */
23
def filter(f: A => Boolean): ZStream[R, E, A]
24
25
/** Take the first n elements */
26
def take(n: => Long): ZStream[R, E, A]
27
28
/** Skip the first n elements */
29
def drop(n: => Long): ZStream[R, E, A]
30
31
/** Concatenate with another stream */
32
def ++[R1 <: R, E1 >: E, A1 >: A](that: => ZStream[R1, E1, A1]): ZStream[R1, E1, A1]
33
34
/** Transform errors */
35
def mapError[E2](f: E => E2): ZStream[R, E2, A]
36
37
/** Recover from errors */
38
def catchAll[R1 <: R, E2, A1 >: A](f: E => ZStream[R1, E2, A1]): ZStream[R1, E2, A1]
39
40
/** Merge with another stream */
41
def merge[R1 <: R, E1 >: E, A1 >: A](that: ZStream[R1, E1, A1]): ZStream[R1, E1, A1]
42
43
/** Broadcast elements to multiple streams */
44
def broadcast(n: Int): ZIO[R with Scope, Nothing, List[ZStream[Any, E, A]]]
45
46
/** Group elements into chunks */
47
def grouped(n: => Int): ZStream[R, E, Chunk[A]]
48
49
/** Run the stream with a sink */
50
def run[R1 <: R, E1 >: E, Z](sink: => ZSink[R1, E1, A, Any, Z]): ZIO[R1, E1, Z]
51
52
/** Convert to a ZIO effect collecting all elements */
53
def runCollect: ZIO[R, E, Chunk[A]]
54
55
/** Execute foreach on each element */
56
def runForeach[R1 <: R, E1 >: E](f: A => ZIO[R1, E1, Any]): ZIO[R1, E1, Unit]
57
}
58
59
// Type aliases for common patterns
60
type Stream[+E, +A] = ZStream[Any, E, A]
61
type UStream[+A] = ZStream[Any, Nothing, A]
62
```
63
64
**Usage Examples:**
65
66
```scala
67
import zio._
68
import zio.stream._
69
70
// Create streams from various sources
71
val numbers = ZStream.range(1, 100)
72
val fromIterable = ZStream.fromIterable(List(1, 2, 3, 4, 5))
73
val fromEffect = ZStream.fromZIO(ZIO.succeed(42))
74
75
// Transform and process streams
76
val processed = ZStream.range(1, 1000)
77
.filter(_ % 2 == 0)
78
.map(_ * 2)
79
.take(10)
80
.runCollect
81
82
// Merge multiple streams
83
val merged = ZStream.range(1, 10)
84
.merge(ZStream.range(100, 110))
85
.runCollect
86
```
87
88
### Stream Construction
89
90
Create streams from various data sources including iterables, effects, and external resources.
91
92
```scala { .api }
93
/**
94
* Stream construction methods
95
*/
96
object ZStream {
97
/** Create a stream from an iterable */
98
def fromIterable[A](as: => Iterable[A]): UStream[A]
99
100
/** Create a stream from a single ZIO effect */
101
def fromZIO[R, E, A](zio: => ZIO[R, E, A]): ZStream[R, E, A]
102
103
/** Create a stream from a range of integers */
104
def range(min: Int, max: Int): UStream[Int]
105
106
/** Create a stream that repeats a value forever */
107
def repeat[A](a: => A): UStream[A]
108
109
/** Create a stream that repeats an effect forever */
110
def repeatZIO[R, E, A](zio: => ZIO[R, E, A]): ZStream[R, E, A]
111
112
/** Create a stream with a single value */
113
def succeed[A](a: => A): UStream[A]
114
115
/** Create a failing stream */
116
def fail[E](error: => E): Stream[E, Nothing]
117
118
/** Create an empty stream */
119
val empty: UStream[Nothing]
120
121
/** Create a stream from a Java InputStream */
122
def fromInputStream(is: InputStream, chunkSize: => Int): Stream[IOException, Byte]
123
124
/** Create a stream from file lines */
125
def fromPath(path: Path): Stream[IOException, String]
126
127
/** Create a stream that emits at regular intervals */
128
def tick(interval: => Duration): ZStream[Any, Nothing, Unit]
129
130
/** Create a stream from an async callback */
131
def async[R, E, A](
132
register: (ZIO[R, Option[E], Chunk[A]] => Unit) => Unit
133
): ZStream[R, E, A]
134
}
135
```
136
137
**Usage Examples:**
138
139
```scala
140
// File streaming
141
val fileContent = ZStream.fromPath(Paths.get("data.txt"))
142
.via(ZPipeline.utf8Decode)
143
.runForeach(Console.printLine(_))
144
145
// Periodic emissions
146
val heartbeat = ZStream.tick(1.second)
147
.zipWith(ZStream.range(1, Int.MaxValue))((_, n) => s"Heartbeat $n")
148
.runForeach(Console.printLine(_))
149
150
// Async stream from callback
151
val asyncStream = ZStream.async[Any, Nothing, String] { callback =>
152
// Register callback with external system
153
externalSystem.onData(data => callback(ZIO.succeed(Chunk(data))))
154
}
155
```
156
157
### ZSink - Stream Consumer
158
159
Sinks consume streams and produce results, providing backpressure and resource management.
160
161
```scala { .api }
162
/**
163
* A ZSink consumes values from a stream and produces a result
164
*/
165
sealed trait ZSink[-R, +E, -In, +L, +Z] {
166
/** Transform the result of the sink */
167
def map[Z2](f: Z => Z2): ZSink[R, E, In, L, Z2]
168
169
/** Transform the result with an effect */
170
def mapZIO[R1 <: R, E1 >: E, Z2](f: Z => ZIO[R1, E1, Z2]): ZSink[R1, E1, In, L, Z2]
171
172
/** Transform errors */
173
def mapError[E2](f: E => E2): ZSink[R, E2, In, L, Z]
174
175
/** Recover from errors */
176
def orElse[R1 <: R, E2, In1 <: In, L1 >: L, Z1 >: Z](
177
that: => ZSink[R1, E2, In1, L1, Z1]
178
): ZSink[R1, E2, In1, L1, Z1]
179
180
/** Combine with another sink in parallel */
181
def zipPar[R1 <: R, E1 >: E, In1 <: In, L1 >: L, Z2](
182
that: => ZSink[R1, E1, In1, L1, Z2]
183
): ZSink[R1, E1, In1, L1, (Z, Z2)]
184
185
/** Execute the sink on a stream */
186
def apply[R1 <: R, E1 >: E, In1 <: In](
187
stream: ZStream[R1, E1, In1]
188
): ZIO[R1, E1, Z]
189
}
190
191
/**
192
* Common sink constructors
193
*/
194
object ZSink {
195
/** Collect all elements into a Chunk */
196
def collectAll[A]: ZSink[Any, Nothing, A, Nothing, Chunk[A]]
197
198
/** Count the number of elements */
199
val count: ZSink[Any, Nothing, Any, Nothing, Long]
200
201
/** Take the first n elements */
202
def take[A](n: => Long): ZSink[Any, Nothing, A, A, Chunk[A]]
203
204
/** Fold over elements */
205
def fold[S, A](s: => S)(f: (S, A) => S): ZSink[Any, Nothing, A, Nothing, S]
206
207
/** Execute a side effect for each element */
208
def foreach[R, E, A](f: A => ZIO[R, E, Any]): ZSink[R, E, A, A, Unit]
209
210
/** Write to an OutputStream */
211
def fromOutputStream(os: OutputStream): ZSink[Any, IOException, Byte, Nothing, Unit]
212
213
/** Write to a file */
214
def fromPath(path: Path): ZSink[Any, IOException, Byte, Byte, Unit]
215
}
216
```
217
218
**Usage Examples:**
219
220
```scala
221
// Collect stream results
222
val collected = ZStream.range(1, 100)
223
.run(ZSink.collectAll)
224
225
// Count elements
226
val elementCount = ZStream.fromIterable(someList)
227
.run(ZSink.count)
228
229
// Fold/reduce stream
230
val sum = ZStream.range(1, 101)
231
.run(ZSink.fold(0)(_ + _))
232
233
// Side effects during consumption
234
val logged = ZStream.range(1, 10)
235
.run(ZSink.foreach(n => Console.printLine(s"Processing: $n")))
236
```
237
238
### ZPipeline - Stream Transformation
239
240
Pipelines provide reusable stream transformations that can be composed and applied to multiple streams.
241
242
```scala { .api }
243
/**
244
* A ZPipeline transforms one stream into another
245
*/
246
sealed trait ZPipeline[-R, +E, -In, +Out] {
247
/** Compose with another pipeline */
248
def >>>[R1 <: R, E1 >: E, Out2](
249
that: ZPipeline[R1, E1, Out, Out2]
250
): ZPipeline[R1, E1, In, Out2]
251
252
/** Apply this pipeline to a stream */
253
def apply[R1 <: R, E1 >: E, In1 <: In](
254
stream: ZStream[R1, E1, In1]
255
): ZStream[R1, E1, Out]
256
}
257
258
/**
259
* Common pipeline constructors
260
*/
261
object ZPipeline {
262
/** Identity pipeline that passes through all elements */
263
def identity[A]: ZPipeline[Any, Nothing, A, A]
264
265
/** Map each element */
266
def map[In, Out](f: In => Out): ZPipeline[Any, Nothing, In, Out]
267
268
/** Filter elements */
269
def filter[A](f: A => Boolean): ZPipeline[Any, Nothing, A, A]
270
271
/** Take the first n elements */
272
def take[A](n: => Long): ZPipeline[Any, Nothing, A, A]
273
274
/** Drop the first n elements */
275
def drop[A](n: => Long): ZPipeline[Any, Nothing, A, A]
276
277
/** Group elements into chunks */
278
def grouped[A](n: => Int): ZPipeline[Any, Nothing, A, Chunk[A]]
279
280
/** Decode UTF-8 bytes to strings */
281
val utf8Decode: ZPipeline[Any, CharacterCodingException, Byte, String]
282
283
/** Encode strings to UTF-8 bytes */
284
val utf8Encode: ZPipeline[Any, CharacterCodingException, String, Byte]
285
286
/** Split strings by lines */
287
val splitLines: ZPipeline[Any, Nothing, String, String]
288
289
/** Compress using gzip */
290
val gzip: ZPipeline[Any, Nothing, Byte, Byte]
291
292
/** Decompress gzip */
293
val gunzip: ZPipeline[Any, IOException, Byte, Byte]
294
}
295
```
296
297
**Usage Examples:**
298
299
```scala
300
// Compose pipelines
301
val textProcessor = ZPipeline.utf8Decode >>>
302
ZPipeline.splitLines >>>
303
ZPipeline.filter(_.nonEmpty)
304
305
// Process file with pipeline
306
val processedFile = ZStream.fromPath(Paths.get("input.txt"))
307
.via(textProcessor)
308
.runForeach(Console.printLine(_))
309
310
// Reusable transformation
311
val numberProcessor = ZPipeline.map[String, Int](_.toInt)
312
.filter(_ > 0)
313
.map(_ * 2)
314
315
val processedNumbers = ZStream.fromIterable(List("1", "2", "3"))
316
.via(numberProcessor)
317
.runCollect
318
```
319
320
### Advanced Stream Operations
321
322
Sophisticated stream operations for complex data processing scenarios.
323
324
```scala { .api }
325
/**
326
* Advanced stream transformations and combinations
327
*/
328
trait ZStream[-R, +E, +A] {
329
/** Buffer elements up to a maximum size */
330
def buffer(capacity: => Int): ZStream[R, E, A]
331
332
/** Throttle stream to emit at most n elements per duration */
333
def throttleEnforce(n: => Long, duration: => Duration): ZStream[R, E, A]
334
335
/** Debounce rapid emissions */
336
def debounce(d: => Duration): ZStream[R, E, A]
337
338
/** Sliding window of elements */
339
def sliding(n: => Int): ZStream[R, E, Chunk[A]]
340
341
/** Aggregate elements with a schedule */
342
def aggregateAsync[R1 <: R, E1 >: E, B, C](
343
sink: => ZSink[R1, E1, A, A, B]
344
): ZStream[R1, E1, B]
345
346
/** Partition elements into multiple streams */
347
def partition[A1 >: A](
348
p: A1 => Boolean
349
): ZIO[R with Scope, Nothing, (ZStream[Any, E, A1], ZStream[Any, E, A1])]
350
351
/** Zip with another stream */
352
def zip[R1 <: R, E1 >: E, B](
353
that: ZStream[R1, E1, B]
354
): ZStream[R1, E1, (A, B)]
355
356
/** Interleave with another stream */
357
def interleave[R1 <: R, E1 >: E, A1 >: A](
358
that: ZStream[R1, E1, A1]
359
): ZStream[R1, E1, A1]
360
}
361
```
362
363
**Usage Examples:**
364
365
```scala
366
// Rate limiting and buffering
367
val rateLimited = dataStream
368
.buffer(1000)
369
.throttleEnforce(100, 1.second)
370
.runForeach(processData)
371
372
// Windowing operations
373
val slidingAverage = numberStream
374
.sliding(5)
375
.map(chunk => chunk.sum / chunk.size.toDouble)
376
.runCollect
377
378
// Complex stream orchestration
379
val combined = for {
380
(evenStream, oddStream) <- numberStream.partition(_ % 2 == 0)
381
evens <- evenStream.runCollect.fork
382
odds <- oddStream.runCollect.fork
383
evenResults <- evens.join
384
oddResults <- odds.join
385
} yield (evenResults, oddResults)
386
```