0
# Stream Transformation
1
2
Powerful transducers for transforming stream elements with effects, stateful processing, and composition capabilities. ZTransducer provides efficient stream-to-stream transformations.
3
4
## Capabilities
5
6
### Basic Transformations
7
8
Fundamental transducers for element transformation.
9
10
```scala { .api }
11
object ZTransducer {
12
/** Identity transducer (pass-through) */
13
def identity[A]: Transducer[Nothing, A, A]
14
15
/** Transform elements with function */
16
def map[A, B](f: A => B): Transducer[Nothing, A, B]
17
18
/** Effectful element transformation */
19
def mapM[R, E, A, B](f: A => ZIO[R, E, B]): ZTransducer[R, E, A, B]
20
21
/** Transform chunks */
22
def mapChunks[A, B](f: Chunk[A] => Chunk[B]): Transducer[Nothing, A, B]
23
24
/** Effectful chunk transformation */
25
def mapChunksM[R, E, A, B](f: Chunk[A] => ZIO[R, E, Chunk[B]]): ZTransducer[R, E, A, B]
26
27
/** Filter elements with predicate */
28
def filter[A](predicate: A => Boolean): Transducer[Nothing, A, A]
29
30
/** Effectful filtering */
31
def filterM[R, E, A](predicate: A => ZIO[R, E, Boolean]): ZTransducer[R, E, A, A]
32
33
/** Create from chunk transformation function */
34
def apply[I, O](f: Chunk[I] => Chunk[O]): Transducer[Nothing, I, O]
35
}
36
```
37
38
### Collection Operations
39
40
Transducers for collecting and grouping elements.
41
42
```scala { .api }
43
object ZTransducer {
44
/** Collect first n elements */
45
def collectAllN[I](n: Int): Transducer[Nothing, I, Chunk[I]]
46
47
/** Collect n elements into map by key */
48
def collectAllToMapN[K, A](n: Long)(key: A => K): Transducer[Nothing, A, Map[K, A]]
49
50
/** Collect n elements into set */
51
def collectAllToSetN[A](n: Long): Transducer[Nothing, A, Set[A]]
52
53
/** Collect all elements while predicate holds */
54
def collectAllWhile[A](predicate: A => Boolean): Transducer[Nothing, A, List[A]]
55
56
/** Collect all elements matching partial function */
57
def collect[A, B](pf: PartialFunction[A, B]): Transducer[Nothing, A, B]
58
59
/** Effectful collection with partial function */
60
def collectM[R, E, A, B](pf: PartialFunction[A, ZIO[R, E, B]]): ZTransducer[R, E, A, B]
61
}
62
```
63
64
### Folding Operations
65
66
Stateful transducers that fold elements.
67
68
```scala { .api }
69
object ZTransducer {
70
/** Fold elements into accumulator */
71
def fold[A, S](z: S)(f: (S, A) => S): Transducer[Nothing, A, S]
72
73
/** Left fold */
74
def foldLeft[A, S](z: S)(f: (S, A) => S): Transducer[Nothing, A, S]
75
76
/** Effectful fold */
77
def foldM[R, E, A, S](z: S)(f: (S, A) => ZIO[R, E, S]): ZTransducer[R, E, A, S]
78
79
/** Fold until condition */
80
def foldUntil[A, S](z: S, max: Int)(f: (S, A) => S): Transducer[Nothing, A, S]
81
82
/** Weighted fold with cost function */
83
def foldWeighted[A, S](z: S)(costFn: A => Long, max: Long)(f: (S, A) => S): Transducer[Nothing, A, S]
84
85
/** Effectful weighted fold */
86
def foldWeightedM[R, E, A, S](z: S)(costFn: A => ZIO[R, E, Long], max: Long)(f: (S, A) => ZIO[R, E, S]): ZTransducer[R, E, A, S]
87
88
/** Fold with decomposition */
89
def foldWeightedDecompose[R, E, A, S](z: S)(costFn: A => ZIO[R, E, Long], max: Long, decompose: A => ZIO[R, E, Chunk[A]])(f: (S, A) => ZIO[R, E, S]): ZTransducer[R, E, A, S]
90
91
/** Scanning (emit intermediate results) */
92
def scan[A, S](z: S)(f: (S, A) => S): Transducer[Nothing, A, S]
93
94
/** Effectful scanning */
95
def scanM[R, E, A, S](z: S)(f: (S, A) => ZIO[R, E, S]): ZTransducer[R, E, A, S]
96
}
97
```
98
99
### Partitioning Transducers
100
101
Transducers that partition or slice streams.
102
103
```scala { .api }
104
object ZTransducer {
105
/** Drop first n elements */
106
def drop[A](n: Long): Transducer[Nothing, A, A]
107
108
/** Drop while predicate holds */
109
def dropWhile[A](predicate: A => Boolean): Transducer[Nothing, A, A]
110
111
/** Effectful drop while */
112
def dropWhileM[R, E, A](predicate: A => ZIO[R, E, Boolean]): ZTransducer[R, E, A, A]
113
114
/** Split strings on delimiter */
115
def splitOn(delimiter: String): Transducer[Nothing, String, String]
116
}
117
```
118
119
### Grouping Operations
120
121
Transducers for grouping adjacent or related elements.
122
123
```scala { .api }
124
object ZTransducer {
125
/** Group adjacent elements by key */
126
def groupAdjacentBy[A, K](f: A => K): Transducer[Nothing, A, (K, NonEmptyChunk[A])]
127
128
/** Group by key with time window */
129
def groupByKey[A, K](f: A => K): Transducer[Nothing, A, Map[K, NonEmptyChunk[A]]]
130
131
/** Group elements within time window */
132
def groupWithin[A](n: Int, duration: Duration): ZTransducer[Clock, Nothing, A, Chunk[A]]
133
134
/** Batch elements by count */
135
def batch[A](n: Long): Transducer[Nothing, A, Chunk[A]]
136
137
/** Batch elements by count or time */
138
def batchN[A](n: Long): Transducer[Nothing, A, Chunk[A]]
139
140
/** Batch elements with weighted grouping */
141
def batchWeighted[A](costFn: A => Long)(max: Long): Transducer[Nothing, A, Chunk[A]]
142
143
/** Effectful weighted batching */
144
def batchWeightedM[R, E, A](costFn: A => ZIO[R, E, Long])(max: Long): ZTransducer[R, E, A, Chunk[A]]
145
}
146
```
147
148
### Utility Transducers
149
150
Helpful utility transducers for common patterns.
151
152
```scala { .api }
153
object ZTransducer {
154
/** Get first element */
155
def head[A]: Transducer[Nothing, A, Option[A]]
156
157
/** Get last element */
158
def last[A]: Transducer[Nothing, A, Option[A]]
159
160
/** Prepend values to stream */
161
def prepend[A](values: A*): Transducer[Nothing, A, A]
162
163
/** Append values to stream */
164
def append[A](values: A*): Transducer[Nothing, A, A]
165
166
/** Intersperse separator between elements */
167
def intersperse[A](separator: A): Transducer[Nothing, A, A]
168
169
/** Add index to elements */
170
def zipWithIndex[A]: Transducer[Nothing, A, (A, Long)]
171
172
/** Deduplicate consecutive elements */
173
def deduplicateAdjacent[A]: Transducer[Nothing, A, A]
174
175
/** Deduplicate by key */
176
def deduplicateAdjacentBy[A, K](f: A => K): Transducer[Nothing, A, A]
177
178
/** Remove None values from Option stream */
179
def collectSome[A]: Transducer[Nothing, Option[A], A]
180
181
/** Flatten nested chunks */
182
def flatten[A]: Transducer[Nothing, Chunk[A], A]
183
}
184
```
185
186
### Effect-Based Transducers
187
188
Transducers that incorporate effects.
189
190
```scala { .api }
191
object ZTransducer {
192
/** Create from effect */
193
def fromEffect[R, E, A](effect: ZIO[R, E, A]): ZTransducer[R, E, Any, A]
194
195
/** Create from function */
196
def fromFunction[A, B](f: A => B): Transducer[Nothing, A, B]
197
198
/** Create from effectful function */
199
def fromFunctionM[R, E, A, B](f: A => ZIO[R, E, B]): ZTransducer[R, E, A, B]
200
201
/** Apply effect to each element (tap) */
202
def tap[R, E, A](f: A => ZIO[R, E, Any]): ZTransducer[R, E, A, A]
203
204
/** Trace elements for debugging */
205
def debug[A](prefix: String = ""): Transducer[Nothing, A, A]
206
207
/** Time each element processing */
208
def timed[A]: ZTransducer[Clock, Nothing, A, (Duration, A)]
209
}
210
```
211
212
### Transducer Transformations
213
214
Transform transducer behavior and composition.
215
216
```scala { .api }
217
trait ZTransducerOps[-R, +E, -I, +O] {
218
/** Transform output elements */
219
def map[P](f: O => P): ZTransducer[R, E, I, P]
220
221
/** Effectful output transformation */
222
def mapM[R1 <: R, E1 >: E, P](f: O => ZIO[R1, E1, P]): ZTransducer[R1, E1, I, P]
223
224
/** Transform output chunks */
225
def mapChunks[P](f: Chunk[O] => Chunk[P]): ZTransducer[R, E, I, P]
226
227
/** Transform input elements */
228
def contramap[J](f: J => I): ZTransducer[R, E, J, O]
229
230
/** Effectful input transformation */
231
def contramapM[R1 <: R, E1 >: E, J](f: J => ZIO[R1, E1, I]): ZTransducer[R1, E1, J, O]
232
233
/** Transform input chunks */
234
def contramapChunks[J](f: Chunk[J] => Chunk[I]): ZTransducer[R, E, J, O]
235
236
/** Transform errors */
237
def mapError[E2](f: E => E2): ZTransducer[R, E2, I, O]
238
239
/** Filter output elements */
240
def filter(predicate: O => Boolean): ZTransducer[R, E, I, O]
241
242
/** Effectful output filtering */
243
def filterM[R1 <: R, E1 >: E](predicate: O => ZIO[R1, E1, Boolean]): ZTransducer[R1, E1, I, O]
244
}
245
```
246
247
### Transducer Composition
248
249
Compose transducers together.
250
251
```scala { .api }
252
trait ZTransducerOps[-R, +E, -I, +O] {
253
/** Compose with another transducer (andThen) */
254
def >>>[R1 <: R, E1 >: E, P](that: ZTransducer[R1, E1, O, P]): ZTransducer[R1, E1, I, P]
255
256
/** Compose (alias for >>>) */
257
def andThen[R1 <: R, E1 >: E, P](that: ZTransducer[R1, E1, O, P]): ZTransducer[R1, E1, I, P]
258
259
/** Compose before */
260
def compose[R1 <: R, E1 >: E, C](that: ZTransducer[R1, E1, C, I]): ZTransducer[R1, E1, C, O]
261
262
/** Zip with another transducer */
263
def zip[R1 <: R, E1 >: E, I1 <: I, P](that: ZTransducer[R1, E1, I1, P]): ZTransducer[R1, E1, I1, (O, P)]
264
265
/** Zip with function */
266
def zipWith[R1 <: R, E1 >: E, I1 <: I, P, Q](that: ZTransducer[R1, E1, I1, P])(f: (O, P) => Q): ZTransducer[R1, E1, I1, Q]
267
268
/** Race two transducers */
269
def race[R1 <: R, E1 >: E, I1 <: I, O1 >: O](that: ZTransducer[R1, E1, I1, O1]): ZTransducer[R1, E1, I1, O1]
270
}
271
```
272
273
### Error Handling
274
275
Error handling for transducers.
276
277
```scala { .api }
278
trait ZTransducerOps[-R, +E, -I, +O] {
279
/** Handle all errors */
280
def catchAll[R1 <: R, E2, I1 <: I, O1 >: O](h: E => ZTransducer[R1, E2, I1, O1]): ZTransducer[R1, E2, I1, O1]
281
282
/** Fallback transducer */
283
def orElse[R1 <: R, E2, I1 <: I, O1 >: O](that: ZTransducer[R1, E2, I1, O1]): ZTransducer[R1, E2, I1, O1]
284
285
/** Convert to option on error */
286
def option: ZTransducer[R, Nothing, I, Option[O]]
287
288
/** Convert to either */
289
def either: ZTransducer[R, Nothing, I, Either[E, O]]
290
291
/** Retry on failure */
292
def retry[R1 <: R](schedule: Schedule[R1, E, Any]): ZTransducer[R1, E, I, O]
293
}
294
```
295
296
### Resource Management
297
298
Resource management for transducers.
299
300
```scala { .api }
301
trait ZTransducerOps[-R, +E, -I, +O] {
302
/** Provide environment */
303
def provide[R1](env: R1)(implicit ev: R1 <:< R): ZTransducer[Any, E, I, O]
304
305
/** Provide environment layer */
306
def provideLayer[R0, R1](layer: ZLayer[R0, E, R1])(implicit ev: R1 <:< R): ZTransducer[R0, E, I, O]
307
308
/** Time transducer execution */
309
def timed: ZTransducer[R with Clock, E, I, (Duration, O)]
310
311
/** Summarize execution */
312
def summarized[R1 <: R, E1 >: E, B, C](summary: ZIO[R1, E1, B])(f: (B, B) => C): ZTransducer[R1, E1, I, (C, O)]
313
}
314
```
315
316
### Type Definitions
317
318
Core types used by transducers.
319
320
```scala { .api }
321
object ZTransducer {
322
/** Push function signature for transducer implementation */
323
type Push[R, E, I, O] = Option[Chunk[I]] => ZIO[R, E, Chunk[O]]
324
325
/** Transducer that drains input without output */
326
def drain[A]: Transducer[Nothing, A, Nothing] = ZTransducer(_ => Chunk.empty)
327
328
/** Never-completing transducer */
329
def never[I, O]: Transducer[Nothing, I, O] = ZTransducer.fromEffect(ZIO.never)
330
331
/** Immediately succeeding transducer */
332
def succeed[O](o: => O): Transducer[Nothing, Any, O] = fromEffect(ZIO.succeed(o))
333
334
/** Immediately failing transducer */
335
def fail[E](e: => E): ZTransducer[Any, E, Any, Nothing] = fromEffect(ZIO.fail(e))
336
}
337
```
338
339
**Usage Examples:**
340
341
```scala
342
import zio._
343
import zio.stream._
344
import zio.duration._
345
346
// Basic transformations
347
val numbers = ZStream.range(1, 10)
348
val doubled = numbers.transduce(ZTransducer.map(_ * 2))
349
val evens = numbers.transduce(ZTransducer.filter(_ % 2 == 0))
350
351
// Folding and scanning
352
val sum = numbers.transduce(ZTransducer.fold(0)(_ + _))
353
val runningSum = numbers.transduce(ZTransducer.scan(0)(_ + _))
354
355
// Collecting operations
356
val firstThree = numbers.transduce(ZTransducer.take(3))
357
val grouped = numbers.transduce(ZTransducer.batch(3))
358
359
// Composition
360
val composed = ZTransducer.filter[Int](_ > 5) >>> ZTransducer.map(_ * 2)
361
val result = numbers.transduce(composed)
362
363
// Effectful processing
364
val logged = ZTransducer.tap[Console, IOException, Int](n =>
365
Console.printLine(s"Processing: $n")
366
)
367
368
// Grouping operations
369
val adjacentGroups = ZStream("a", "a", "b", "b", "c")
370
.transduce(ZTransducer.groupAdjacentBy(identity))
371
372
// Timing operations
373
val timedProcessing = numbers.transduce(ZTransducer.timed)
374
```