0
# Core Streaming Operations
1
2
Comprehensive streaming operations for creating, transforming, combining, and executing streams. ZStream provides the foundational streaming abstraction in ZIO Streams.
3
4
## Capabilities
5
6
### Stream Creation
7
8
Factory methods for creating streams from various sources.
9
10
```scala { .api }
11
object ZStream {
12
/** Create stream from varargs values */
13
def apply[A](as: A*): UStream[A]
14
15
/** Create single-value stream */
16
def succeed[A](a: => A): UStream[A]
17
18
/** Create failed stream */
19
def fail[E](error: => E): Stream[E, Nothing]
20
21
/** Empty stream */
22
def empty: UStream[Nothing]
23
24
/** Never-ending stream */
25
def never: UStream[Nothing]
26
27
/** Stream from chunk */
28
def fromChunk[O](c: => Chunk[O]): UStream[O]
29
30
/** Stream from iterable */
31
def fromIterable[O](as: => Iterable[O]): UStream[O]
32
33
/** Stream from iterator */
34
def fromIterator[A](iterator: => Iterator[A]): UStream[A]
35
36
/** Stream from Java iterator */
37
def fromJavaIterator[A](iterator: => java.util.Iterator[A]): UStream[A]
38
39
/** Integer range stream */
40
def range(min: Int, max: Int, chunkSize: Int = DefaultChunkSize): UStream[Int]
41
}
42
```
43
44
### Effect-Based Creation
45
46
Create streams from ZIO effects and managed resources.
47
48
```scala { .api }
49
object ZStream {
50
/** Single effect as stream */
51
def fromEffect[R, E, A](fa: ZIO[R, E, A]): ZStream[R, E, A]
52
53
/** Repeat effect as stream */
54
def repeatEffect[R, E, A](fa: ZIO[R, E, A]): ZStream[R, E, A]
55
56
/** Unwrap effect containing stream */
57
def unwrap[R, E, A](fa: ZIO[R, E, ZStream[R, E, A]]): ZStream[R, E, A]
58
59
/** Stream from managed resource */
60
def managed[R, E, A](managed: ZManaged[R, E, A]): ZStream[R, E, A]
61
62
/** Stream from schedule */
63
def fromSchedule[R, A](schedule: Schedule[R, Any, A]): ZStream[R with Clock, Never, A]
64
65
/** Periodic ticks */
66
def tick(interval: Duration): ZStream[Clock, Never, Unit]
67
}
68
```
69
70
### Generators and Iteration
71
72
Functional generators for creating streams.
73
74
```scala { .api }
75
object ZStream {
76
/** Iterate function over seed value */
77
def iterate[A](a: A)(f: A => A): UStream[A]
78
79
/** Unfold state function */
80
def unfold[S, A](s: S)(f: S => Option[(A, S)]): UStream[A]
81
82
/** Effectful unfold */
83
def unfoldM[R, E, S, A](s: S)(f: S => ZIO[R, E, Option[(A, S)]]): ZStream[R, E, A]
84
85
/** Paginate values */
86
def paginate[A](a: A)(f: A => (A, Option[A])): UStream[A]
87
88
/** Paginate with effects */
89
def paginateM[R, E, A](a: A)(f: A => ZIO[R, E, (A, Option[A])]): ZStream[R, E, A]
90
}
91
```
92
93
### Core Transformations
94
95
Essential stream transformation operations.
96
97
```scala { .api }
98
trait ZStreamOps[-R, +E, +O] {
99
/** Transform each element */
100
def map[B](f: O => B): ZStream[R, E, B]
101
102
/** Effectful element transformation */
103
def mapM[R1 <: R, E1 >: E, B](f: O => ZIO[R1, E1, B]): ZStream[R1, E1, B]
104
105
/** Transform chunks */
106
def mapChunks[O2](f: Chunk[O] => Chunk[O2]): ZStream[R, E, O2]
107
108
/** Monadic bind */
109
def flatMap[R1 <: R, E1 >: E, O2](f: O => ZStream[R1, E1, O2]): ZStream[R1, E1, O2]
110
111
/** Collect with partial function */
112
def collect[B](pf: PartialFunction[O, B]): ZStream[R, E, B]
113
114
/** Effectful collect */
115
def collectM[R1 <: R, E1 >: E, B](pf: PartialFunction[O, ZIO[R1, E1, B]]): ZStream[R1, E1, B]
116
117
/** Filter elements */
118
def filter(predicate: O => Boolean): ZStream[R, E, O]
119
120
/** Effectful filter */
121
def filterM[R1 <: R, E1 >: E](predicate: O => ZIO[R1, E1, Boolean]): ZStream[R1, E1, O]
122
}
123
```
124
125
### Stateful Operations
126
127
Operations that maintain state across stream elements.
128
129
```scala { .api }
130
trait ZStreamOps[-R, +E, +O] {
131
/** Stateful scanning */
132
def scan[S](s: S)(f: (S, O) => S): ZStream[R, E, S]
133
134
/** Effectful stateful scanning */
135
def scanM[R1 <: R, E1 >: E, S](s: S)(f: (S, O) => ZIO[R1, E1, S]): ZStream[R1, E1, S]
136
137
/** Scan with early termination */
138
def scanReduce[O1 >: O](f: (O1, O1) => O1): ZStream[R, E, O1]
139
140
/** Effectful scan with early termination */
141
def scanReduceM[R1 <: R, E1 >: E, O1 >: O](f: (O1, O1) => ZIO[R1, E1, O1]): ZStream[R1, E1, O1]
142
143
/** Accumulate elements */
144
def mapAccum[S, B](s: S)(f: (S, O) => (S, B)): ZStream[R, E, B]
145
146
/** Effectful accumulation */
147
def mapAccumM[R1 <: R, E1 >: E, S, B](s: S)(f: (S, O) => ZIO[R1, E1, (S, B)]): ZStream[R1, E1, B]
148
}
149
```
150
151
### Stream Combination
152
153
Combine multiple streams in various ways.
154
155
```scala { .api }
156
trait ZStreamOps[-R, +E, +O] {
157
/** Concatenate streams sequentially */
158
def ++[R1 <: R, E1 >: E, O1 >: O](that: => ZStream[R1, E1, O1]): ZStream[R1, E1, O1]
159
160
/** Zip with another stream */
161
def zip[R1 <: R, E1 >: E, O2](that: ZStream[R1, E1, O2]): ZStream[R1, E1, (O, O2)]
162
163
/** Zip with function */
164
def zipWith[R1 <: R, E1 >: E, O2, C](that: ZStream[R1, E1, O2])(f: (O, O2) => C): ZStream[R1, E1, C]
165
166
/** Zip keeping left */
167
def zipLeft[R1 <: R, E1 >: E, O2](that: ZStream[R1, E1, O2]): ZStream[R1, E1, O]
168
169
/** Zip keeping right */
170
def zipRight[R1 <: R, E1 >: E, O2](that: ZStream[R1, E1, O2]): ZStream[R1, E1, O2]
171
172
/** Zip with index */
173
def zipWithIndex: ZStream[R, E, (O, Long)]
174
175
/** Merge streams concurrently */
176
def merge[R1 <: R, E1 >: E, O1 >: O](that: ZStream[R1, E1, O1]): ZStream[R1, E1, O1]
177
178
/** Interleave elements */
179
def interleave[R1 <: R, E1 >: E, O1 >: O](that: ZStream[R1, E1, O1]): ZStream[R1, E1, O1]
180
181
/** Cross product */
182
def cross[R1 <: R, E1 >: E, O2](that: ZStream[R1, E1, O2]): ZStream[R1, E1, (O, O2)]
183
}
184
```
185
186
### Partitioning and Slicing
187
188
Operations for taking subsets of stream elements.
189
190
```scala { .api }
191
trait ZStreamOps[-R, +E, +O] {
192
/** Take first n elements */
193
def take(n: Long): ZStream[R, E, O]
194
195
/** Take while predicate holds */
196
def takeWhile(predicate: O => Boolean): ZStream[R, E, O]
197
198
/** Effectful take while */
199
def takeWhileM[R1 <: R, E1 >: E](predicate: O => ZIO[R1, E1, Boolean]): ZStream[R1, E1, O]
200
201
/** Drop first n elements */
202
def drop(n: Long): ZStream[R, E, O]
203
204
/** Drop while predicate holds */
205
def dropWhile(predicate: O => Boolean): ZStream[R, E, O]
206
207
/** Effectful drop while */
208
def dropWhileM[R1 <: R, E1 >: E](predicate: O => ZIO[R1, E1, Boolean]): ZStream[R1, E1, O]
209
210
/** Group into chunks */
211
def grouped(chunkSize: Int): ZStream[R, E, Chunk[O]]
212
213
/** Group by key */
214
def groupByKey[K](f: O => K): ZStream[R, E, (K, Chunk[O])]
215
216
/** Group adjacent by key */
217
def groupAdjacentBy[K](f: O => K): ZStream[R, E, (K, NonEmptyChunk[O])]
218
}
219
```
220
221
### Error Handling
222
223
Comprehensive error handling capabilities.
224
225
```scala { .api }
226
trait ZStreamOps[-R, +E, +O] {
227
/** Handle all errors */
228
def catchAll[R1 <: R, E2, O1 >: O](h: E => ZStream[R1, E2, O1]): ZStream[R1, E2, O1]
229
230
/** Handle some errors */
231
def catchSome[R1 <: R, E1 >: E, O1 >: O](pf: PartialFunction[E, ZStream[R1, E1, O1]]): ZStream[R1, E1, O1]
232
233
/** Fallback stream */
234
def orElse[R1 <: R, E2, O1 >: O](that: => ZStream[R1, E2, O1]): ZStream[R1, E2, O1]
235
236
/** Transform errors */
237
def mapError[E2](f: E => E2): ZStream[R, E2, O]
238
239
/** Effectful error transformation */
240
def mapErrorM[R1 <: R, E2](f: E => ZIO[R1, Nothing, E2]): ZStream[R1, E2, O]
241
242
/** Retry with schedule */
243
def retry[R1 <: R](schedule: Schedule[R1, E, Any]): ZStream[R1, E, O]
244
245
/** Ignore errors */
246
def ignore: ZStream[R, Nothing, O]
247
248
/** Convert to option on error */
249
def option: ZStream[R, Nothing, Option[O]]
250
251
/** Use default value on error */
252
def orElse[O1 >: O](default: O1): ZStream[R, Nothing, O1]
253
}
254
```
255
256
### Timing and Scheduling
257
258
Time-based stream operations.
259
260
```scala { .api }
261
trait ZStreamOps[-R, +E, +O] {
262
/** Rate limiting with backpressure */
263
def throttleEnforce(units: Long, duration: Duration): ZStream[R with Clock, E, O]
264
265
/** Traffic shaping */
266
def throttleShape(units: Long, duration: Duration): ZStream[R with Clock, E, O]
267
268
/** Debounce elements */
269
def debounce(duration: Duration): ZStream[R with Clock, E, O]
270
271
/** Timeout stream */
272
def timeout(duration: Duration): ZStream[R with Clock, E, O]
273
274
/** Delay elements */
275
def delay(duration: Duration): ZStream[R with Clock, E, O]
276
277
/** Schedule elements */
278
def schedule[R1 <: R](schedule: Schedule[R1, O, Any]): ZStream[R1 with Clock, E, O]
279
280
/** Repeat on schedule */
281
def repeat[R1 <: R](schedule: Schedule[R1, O, Any]): ZStream[R1 with Clock, E, O]
282
283
/** Time execution */
284
def timed: ZStream[R with Clock, E, (Duration, O)]
285
}
286
```
287
288
### Buffering
289
290
Stream buffering strategies for performance optimization.
291
292
```scala { .api }
293
trait ZStreamOps[-R, +E, +O] {
294
/** Buffer with backpressure */
295
def buffer(capacity: Int): ZStream[R, E, O]
296
297
/** Dropping buffer (drop oldest when full) */
298
def bufferDropping(capacity: Int): ZStream[R, E, O]
299
300
/** Sliding window buffer */
301
def bufferSliding(capacity: Int): ZStream[R, E, O]
302
303
/** Unbounded buffer */
304
def bufferUnbounded: ZStream[R, E, O]
305
306
/** Buffer chunks */
307
def bufferChunks(capacity: Int): ZStream[R, E, O]
308
}
309
```
310
311
### Stream Execution
312
313
Terminal operations for consuming streams.
314
315
```scala { .api }
316
trait ZStreamOps[-R, +E, +O] {
317
/** Run stream with sink */
318
def run[R1 <: R, E1 >: E, B](sink: ZSink[R1, E1, O, Any, B]): ZIO[R1, E1, B]
319
320
/** Collect all elements */
321
def runCollect: ZIO[R, E, List[O]]
322
323
/** Get first element */
324
def runHead: ZIO[R, E, Option[O]]
325
326
/** Get last element */
327
def runLast: ZIO[R, E, Option[O]]
328
329
/** Run and discard results */
330
def runDrain: ZIO[R, E, Unit]
331
332
/** Count elements */
333
def runCount: ZIO[R, E, Long]
334
335
/** Sum numeric elements */
336
def runSum[O1 >: O](implicit ev: Numeric[O1]): ZIO[R, E, O1]
337
338
/** Apply effect to each element */
339
def foreach[R1 <: R, E1 >: E](f: O => ZIO[R1, E1, Any]): ZIO[R1, E1, Unit]
340
341
/** Apply effect while predicate holds */
342
def foreachWhile[R1 <: R, E1 >: E](f: O => ZIO[R1, E1, Boolean]): ZIO[R1, E1, Unit]
343
}
344
```
345
346
### Resource Management
347
348
Safe resource handling and cleanup.
349
350
```scala { .api }
351
trait ZStreamOps[-R, +E, +O] {
352
/** Ensure finalizer runs */
353
def ensuring[R1 <: R](finalizer: ZIO[R1, Nothing, Any]): ZStream[R1, E, O]
354
355
/** Bracket resource acquisition/release */
356
def bracket[R1 <: R, A](acquire: ZIO[R1, E, A])(release: A => ZIO[R1, Nothing, Any]): ZStream[R1, E, A]
357
358
/** Provide environment layer */
359
def provideLayer[R0, R1](layer: ZLayer[R0, E, R1])(implicit ev: R1 <:< R): ZStream[R0, E, O]
360
361
/** Provide environment */
362
def provide[R1](env: R1)(implicit ev: R1 <:< R): ZStream[Any, E, O]
363
364
/** Access environment */
365
def access[R1 <: R, B](f: R1 => B): ZStream[R1, E, B]
366
367
/** Timed execution with duration */
368
def timed: ZStream[R with Clock, E, (Duration, O)]
369
370
/** Summarized execution */
371
def summarized[R1 <: R, E1 >: E, B, C](summary: ZIO[R1, E1, B])(f: (B, B) => C): ZStream[R1, E1, (C, O)]
372
}
373
```
374
375
**Usage Examples:**
376
377
```scala
378
import zio._
379
import zio.stream._
380
import zio.duration._
381
382
// Create and transform streams
383
val numbers = ZStream.range(1, 100)
384
val evens = numbers.filter(_ % 2 == 0)
385
val doubled = evens.map(_ * 2)
386
387
// Combine streams
388
val stream1 = ZStream(1, 2, 3)
389
val stream2 = ZStream(4, 5, 6)
390
val combined = stream1 ++ stream2
391
392
// Error handling
393
val safeStream = ZStream.fail("error").catchAll(_ => ZStream.succeed(0))
394
395
// Timing operations
396
val throttled = numbers.throttleShape(10, 1.second)
397
val debounced = numbers.debounce(100.millis)
398
399
// Resource management
400
val managed = ZStream.managed(ZManaged.make(openFile)(closeFile))
401
```