0
# Stream Consumption
1
2
Comprehensive sink operations for consuming streams and producing results. ZSink provides powerful abstractions for collecting, folding, and processing stream elements.
3
4
## Capabilities
5
6
### Basic Collectors
7
8
Essential sinks for collecting stream elements.
9
10
```scala { .api }
11
object ZSink {
12
/** Collect all elements into a chunk */
13
def collectAll[A]: Sink[Nothing, A, Nothing, Chunk[A]]
14
15
/** Collect all elements into a map by key */
16
def collectAllToMap[A, K](key: A => K)(f: (A, A) => A): Sink[Nothing, A, Nothing, Map[K, A]]
17
18
/** Collect all elements into a set */
19
def collectAllToSet[A]: Sink[Nothing, A, Nothing, Set[A]]
20
21
/** Collect first n elements */
22
def take[A](n: Int): Sink[Nothing, A, A, Chunk[A]]
23
24
/** Get first element */
25
def head[A]: Sink[Nothing, A, A, Option[A]]
26
27
/** Get last element */
28
def last[A]: Sink[Nothing, A, Nothing, Option[A]]
29
}
30
```
31
32
### Folding Operations
33
34
Sinks that fold stream elements into a single result.
35
36
```scala { .api }
37
object ZSink {
38
/** Fold with continuation function */
39
def fold[A, S](z: S)(contFn: S => Boolean)(f: (S, A) => S): Sink[Nothing, A, A, S]
40
41
/** Left fold without continuation */
42
def foldLeft[A, S](z: S)(f: (S, A) => S): Sink[Nothing, A, Nothing, S]
43
44
/** Effectful fold */
45
def foldM[R, E, A, S](z: S)(contFn: S => Boolean)(f: (S, A) => ZIO[R, E, S]): ZSink[R, E, A, A, S]
46
47
/** Effectful left fold */
48
def foldLeftM[R, E, A, S](z: S)(f: (S, A) => ZIO[R, E, S]): ZSink[R, E, A, Nothing, S]
49
50
/** Fold chunks */
51
def foldChunks[A, S](z: S)(contFn: S => Boolean)(f: (S, Chunk[A]) => S): Sink[Nothing, A, A, S]
52
53
/** Effectful chunk folding */
54
def foldChunksM[R, E, A, S](z: S)(contFn: S => Boolean)(f: (S, Chunk[A]) => ZIO[R, E, S]): ZSink[R, E, A, A, S]
55
56
/** Fold starting from `z` until `max` elements have been folded */
57
def foldUntil[A, S](z: S, max: Int)(f: (S, A) => S): Sink[Nothing, A, A, S]
58
59
/** Weighted fold with cost function */
60
def foldWeighted[A, S](z: S)(costFn: A => Long, max: Long)(f: (S, A) => S): Sink[Nothing, A, A, S]
61
}
62
```
63
64
### Numeric Operations
65
66
Specialized sinks for numeric computations.
67
68
```scala { .api }
69
object ZSink {
70
/** Sum all numeric elements */
71
def sum[A](implicit A: Numeric[A]): Sink[Nothing, A, Nothing, A]
72
}
73
```
74
75
### Effect-Based Sinks
76
77
Sinks that perform effects on stream elements.
78
79
```scala { .api }
80
object ZSink {
81
/** Create sink from effect */
82
def fromEffect[R, E, Z](b: => ZIO[R, E, Z]): ZSink[R, E, Any, Nothing, Z]
83
84
/** Apply effect to each element */
85
def foreach[R, E, A](f: A => ZIO[R, E, Any]): ZSink[R, E, A, Nothing, Unit]
86
87
/** Apply effect to each element for as long as it returns true */
88
def foreachWhile[R, E, A](f: A => ZIO[R, E, Boolean]): ZSink[R, E, A, A, Unit]
89
90
/** Apply effectful function to chunks */
91
def foreachChunk[R, E, A](f: Chunk[A] => ZIO[R, E, Any]): ZSink[R, E, A, Nothing, Unit]
92
93
/** Apply effectful function to each chunk for as long as it returns true */
94
def foreachChunkWhile[R, E, A](f: Chunk[A] => ZIO[R, E, Boolean]): ZSink[R, E, A, A, Unit]
95
}
96
```
97
98
### Resource-Based Sinks
99
100
Sinks that work with managed resources.
101
102
```scala { .api }
103
object ZSink {
104
/** Create sink from managed resource */
105
def managed[R, E, A, Z](managed: ZManaged[R, E, ZSink[R, E, A, Any, Z]]): ZSink[R, E, A, Any, Z]
106
107
/** Sink from Hub */
108
def fromHub[A](hub: Hub[A]): Sink[Nothing, A, Nothing, Unit]
109
110
/** Sink from Queue */
111
def fromQueue[A](queue: Queue[A]): Sink[Nothing, A, Nothing, Unit]
112
113
/** Bracketed sink creation */
114
def bracket[R, E, A, Z](acquire: ZIO[R, E, A])(release: A => ZIO[R, Nothing, Any])(use: A => ZSink[R, E, Any, Any, Z]): ZSink[R, E, Any, Any, Z]
115
}
116
```
117
118
### Sink Transformations
119
120
Transform sink behavior and results.
121
122
```scala { .api }
123
trait ZSinkOps[-R, +E, -I, +L, +Z] {
124
/** Transform result */
125
def map[Z2](f: Z => Z2): ZSink[R, E, I, L, Z2]
126
127
/** Effectful result transformation */
128
def mapM[R1 <: R, E1 >: E, Z2](f: Z => ZIO[R1, E1, Z2]): ZSink[R1, E1, I, L, Z2]
129
130
/** Transform errors */
131
def mapError[E2](f: E => E2): ZSink[R, E2, I, L, Z]
132
133
/** Transform input */
134
def contramap[I2](f: I2 => I): ZSink[R, E, I2, L, Z]
135
136
/** Effectful input transformation */
137
def contramapM[R1 <: R, E1 >: E, I2](f: I2 => ZIO[R1, E1, I]): ZSink[R1, E1, I2, L, Z]
138
139
/** Transform chunks */
140
def contramapChunks[I2](f: Chunk[I2] => Chunk[I]): ZSink[R, E, I2, L, Z]
141
142
/** Dimap both input and output */
143
def dimap[I2, Z2](f: I2 => I)(g: Z => Z2): ZSink[R, E, I2, L, Z2]
144
}
145
```
146
147
### Sink Combination
148
149
Combine multiple sinks in various ways.
150
151
```scala { .api }
152
trait ZSinkOps[-R, +E, -I, +L, +Z] {
153
/** Zip sinks sequentially */
154
def zip[R1 <: R, E1 >: E, I1 <: I, L1 >: L, Z2](that: ZSink[R1, E1, I1, L1, Z2]): ZSink[R1, E1, I1, L1, (Z, Z2)]
155
156
/** Zip sinks with function */
157
def zipWith[R1 <: R, E1 >: E, I1 <: I, L1 >: L, Z2, Z3](that: ZSink[R1, E1, I1, L1, Z2])(f: (Z, Z2) => Z3): ZSink[R1, E1, I1, L1, Z3]
158
159
/** Zip sinks in parallel */
160
def zipPar[R1 <: R, E1 >: E, I1 <: I, L1 >: L, Z2](that: ZSink[R1, E1, I1, L1, Z2]): ZSink[R1, E1, I1, L1, (Z, Z2)]
161
162
/** Zip keeping left result */
163
def zipLeft[R1 <: R, E1 >: E, I1 <: I, L1 >: L, Z2](that: ZSink[R1, E1, I1, L1, Z2]): ZSink[R1, E1, I1, L1, Z]
164
165
/** Zip keeping right result */
166
def zipRight[R1 <: R, E1 >: E, I1 <: I, L1 >: L, Z2](that: ZSink[R1, E1, I1, L1, Z2]): ZSink[R1, E1, I1, L1, Z2]
167
168
/** Race sinks (first to complete wins) */
169
def race[R1 <: R, E1 >: E, I1 <: I, L1 >: L, Z1 >: Z](that: ZSink[R1, E1, I1, L1, Z1]): ZSink[R1, E1, I1, L1, Z1]
170
171
/** Fallback sink */
172
def orElse[R1 <: R, E1 >: E, I1 <: I, L1 >: L, Z1 >: Z](that: ZSink[R1, E1, I1, L1, Z1]): ZSink[R1, E1, I1, L1, Z1]
173
}
174
```
175
176
### Monadic Operations
177
178
Monadic composition for sink chaining.
179
180
```scala { .api }
181
trait ZSinkOps[-R, +E, -I, +L, +Z] {
182
/** Monadic bind */
183
def flatMap[R1 <: R, E1 >: E, I1 <: I, L1 >: L, Z2](f: Z => ZSink[R1, E1, I1, L1, Z2]): ZSink[R1, E1, I1, L1, Z2]
184
185
/** Chain with function */
186
def andThen[R1 <: R, E1 >: E, Z2](f: Z => ZSink[R1, E1, L, Any, Z2]): ZSink[R1, E1, I, Any, Z2]
187
188
/** Provide sink result as input to function */
189
def foldM[R1 <: R, E1 >: E, I1 <: I, L1 >: L, Z2](z: Z2)(contFn: Z2 => Boolean)(f: (Z2, Z) => ZIO[R1, E1, Z2]): ZSink[R1, E1, I1, L1, Z2]
190
}
191
```
192
193
### Error Handling
194
195
Error handling operations for sinks.
196
197
```scala { .api }
198
trait ZSinkOps[-R, +E, -I, +L, +Z] {
199
/** Handle all errors */
200
def catchAll[R1 <: R, E2, I1 <: I, L1 >: L, Z1 >: Z](h: E => ZSink[R1, E2, I1, L1, Z1]): ZSink[R1, E2, I1, L1, Z1]
201
202
/** Retry sink on failure */
203
def retry[R1 <: R](schedule: Schedule[R1, E, Any]): ZSink[R1, E, I, L, Z]
204
205
/** Convert failure to None and success to Some, yielding a sink that cannot fail */
206
def option: ZSink[R, Nothing, I, L, Option[Z]]
207
208
/** Surface failure as Left and success as Right, yielding a sink that cannot fail */
209
def either: ZSink[R, Nothing, I, L, Either[E, Z]]
210
}
211
```
212
213
### Resource Management
214
215
Resource management for sinks.
216
217
```scala { .api }
218
trait ZSinkOps[-R, +E, -I, +L, +Z] {
219
/** Provide environment */
220
def provide[R1](env: R1)(implicit ev: R1 <:< R): ZSink[Any, E, I, L, Z]
221
222
/** Provide layer */
223
def provideLayer[R0, R1](layer: ZLayer[R0, E, R1])(implicit ev: R1 <:< R): ZSink[R0, E, I, L, Z]
224
225
/** Time sink execution */
226
def timed: ZSink[R with Clock, E, I, L, (Duration, Z)]
227
228
/** Summarize sink execution */
229
def summarized[R1 <: R, E1 >: E, B, C](summary: ZIO[R1, E1, B])(f: (B, B) => C): ZSink[R1, E1, I, L, (C, Z)]
230
231
/** Convert to transducer */
232
def toTransducer: ZTransducer[R, E, I, L]
233
}
234
```
235
236
### Type Definitions
237
238
Core type alias and basic constructors used by sinks.
239
240
```scala { .api }
241
object ZSink {
242
/** Push interface for sink implementation */
243
type Push[R, E, I, L, Z] = Option[Chunk[I]] => ZIO[R, (Either[E, Z], Chunk[L]), Chunk[L]]
244
245
/** Sink that ignores input and produces unit */
246
val drain: Sink[Nothing, Any, Nothing, Unit] = ZSink.foreach(_ => ZIO.unit)
247
248
/** Identity sink that collects all input into a chunk and returns it unchanged */
249
def identity[A]: Sink[Nothing, A, Nothing, Chunk[A]] = collectAll[A]
250
251
/** Never-completing sink */
252
def never: Sink[Nothing, Any, Nothing, Nothing] = ZSink.fromEffect(ZIO.never)
253
254
/** Immediately succeeding sink */
255
def succeed[Z](z: => Z): Sink[Nothing, Any, Nothing, Z] = fromEffect(ZIO.succeed(z))
256
257
/** Immediately failing sink */
258
def fail[E](e: => E): Sink[E, Any, Nothing, Nothing] = fromEffect(ZIO.fail(e))
259
}
260
```
261
262
**Usage Examples:**
263
264
```scala
265
import zio._
266
import zio.stream._
267
268
// Basic collectors
269
val numbers = ZStream.range(1, 10)
270
val allNumbers: UIO[Chunk[Int]] = numbers.run(ZSink.collectAll)
271
val sum: UIO[Int] = numbers.run(ZSink.sum)
272
273
// Folding operations
274
val evenSum: UIO[Int] = numbers.run(
275
ZSink.foldLeft(0)((acc, n) => if (n % 2 == 0) acc + n else acc)
276
)
277
278
// Take operations
279
val firstThree: UIO[Chunk[Int]] = numbers.run(ZSink.take(3))
280
val firstEven: UIO[Option[Int]] = numbers.filter(_ % 2 == 0).run(ZSink.head)
281
282
// Effectful processing
283
val logged: UIO[Unit] = numbers.run(
284
ZSink.foreach(n => Console.printLine(s"Processing: $n"))
285
)
286
287
// Sink combination
288
val combined: UIO[(Int, Long)] = numbers.run(
289
ZSink.sum.zipPar(ZSink.count)
290
)
291
292
// Error handling
293
val safeSum: UIO[Either[String, Int]] =
294
numbers.run(ZSink.sum.either)
295
```