or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.md  platform-extensions.md  zsink.md  zstream.md  ztransducer.md

ztransducer.md  docs/

0

# Stream Transformation

1

2

Powerful transducers for transforming stream elements with effects, stateful processing, and composition capabilities. ZTransducer provides efficient stream-to-stream transformations.

3

4

## Capabilities

5

6

### Basic Transformations

7

8

Fundamental transducers for element transformation.

9

10

```scala { .api }

11

object ZTransducer {

12

/** Identity transducer (pass-through) */

13

def identity[A]: Transducer[Nothing, A, A]

14

15

/** Transform elements with function */

16

def map[A, B](f: A => B): Transducer[Nothing, A, B]

17

18

/** Effectful element transformation */

19

def mapM[R, E, A, B](f: A => ZIO[R, E, B]): ZTransducer[R, E, A, B]

20

21

/** Transform chunks */

22

def mapChunks[A, B](f: Chunk[A] => Chunk[B]): Transducer[Nothing, A, B]

23

24

/** Effectful chunk transformation */

25

def mapChunksM[R, E, A, B](f: Chunk[A] => ZIO[R, E, Chunk[B]]): ZTransducer[R, E, A, B]

26

27

/** Filter elements with predicate */

28

def filter[A](predicate: A => Boolean): Transducer[Nothing, A, A]

29

30

/** Effectful filtering */

31

def filterM[R, E, A](predicate: A => ZIO[R, E, Boolean]): ZTransducer[R, E, A, A]

32

33

/** Create from chunk transformation function */

34

def apply[I, O](f: Chunk[I] => Chunk[O]): Transducer[Nothing, I, O]

35

}

36

```

37

38

### Collection Operations

39

40

Transducers for collecting and grouping elements.

41

42

```scala { .api }

43

object ZTransducer {

44

/** Collect incoming elements into chunks of at most n elements */

45

def collectAllN[I](n: Int): Transducer[Nothing, I, Chunk[I]]

46

47

/** Collect n elements into map by key */

48

def collectAllToMapN[K, A](n: Long)(key: A => K): Transducer[Nothing, A, Map[K, A]]

49

50

/** Collect n elements into set */

51

def collectAllToSetN[A](n: Long): Transducer[Nothing, A, Set[A]]

52

53

/** Collect all elements while predicate holds */

54

def collectAllWhile[A](predicate: A => Boolean): Transducer[Nothing, A, List[A]]

55

56

/** Collect all elements matching partial function */

57

def collect[A, B](pf: PartialFunction[A, B]): Transducer[Nothing, A, B]

58

59

/** Effectful collection with partial function */

60

def collectM[R, E, A, B](pf: PartialFunction[A, ZIO[R, E, B]]): ZTransducer[R, E, A, B]

61

}

62

```

63

64

### Folding Operations

65

66

Stateful transducers that fold elements.

67

68

```scala { .api }

69

object ZTransducer {

70

/** Fold elements into accumulator */

71

def fold[A, S](z: S)(f: (S, A) => S): Transducer[Nothing, A, S]

72

73

/** Left fold */

74

def foldLeft[A, S](z: S)(f: (S, A) => S): Transducer[Nothing, A, S]

75

76

/** Effectful fold */

77

def foldM[R, E, A, S](z: S)(f: (S, A) => ZIO[R, E, S]): ZTransducer[R, E, A, S]

78

79

/** Fold elements until `max` elements have been folded */

80

def foldUntil[A, S](z: S, max: Int)(f: (S, A) => S): Transducer[Nothing, A, S]

81

82

/** Weighted fold with cost function */

83

def foldWeighted[A, S](z: S)(costFn: A => Long, max: Long)(f: (S, A) => S): Transducer[Nothing, A, S]

84

85

/** Effectful weighted fold */

86

def foldWeightedM[R, E, A, S](z: S)(costFn: A => ZIO[R, E, Long], max: Long)(f: (S, A) => ZIO[R, E, S]): ZTransducer[R, E, A, S]

87

88

/** Fold with decomposition */

89

def foldWeightedDecompose[R, E, A, S](z: S)(costFn: A => ZIO[R, E, Long], max: Long, decompose: A => ZIO[R, E, Chunk[A]])(f: (S, A) => ZIO[R, E, S]): ZTransducer[R, E, A, S]

90

91

/** Scanning (emit intermediate results) */

92

def scan[A, S](z: S)(f: (S, A) => S): Transducer[Nothing, A, S]

93

94

/** Effectful scanning */

95

def scanM[R, E, A, S](z: S)(f: (S, A) => ZIO[R, E, S]): ZTransducer[R, E, A, S]

96

}

97

```

98

99

### Partitioning Transducers

100

101

Transducers that partition or slice streams.

102

103

```scala { .api }

104

object ZTransducer {

105

/** Drop first n elements */

106

def drop[A](n: Long): Transducer[Nothing, A, A]

107

108

/** Drop while predicate holds */

109

def dropWhile[A](predicate: A => Boolean): Transducer[Nothing, A, A]

110

111

/** Effectful drop while */

112

def dropWhileM[R, E, A](predicate: A => ZIO[R, E, Boolean]): ZTransducer[R, E, A, A]

113

114

/** Split strings on delimiter */

115

def splitOn(delimiter: String): Transducer[Nothing, String, String]

116

}

117

```

118

119

### Grouping Operations

120

121

Transducers for grouping adjacent or related elements.

122

123

```scala { .api }

124

object ZTransducer {

125

/** Group adjacent elements by key */

126

def groupAdjacentBy[A, K](f: A => K): Transducer[Nothing, A, (K, NonEmptyChunk[A])]

127

128

/** Group elements by key */

129

def groupByKey[A, K](f: A => K): Transducer[Nothing, A, Map[K, NonEmptyChunk[A]]]

130

131

/** Group elements within time window */

132

def groupWithin[A](n: Int, duration: Duration): ZTransducer[Clock, Nothing, A, Chunk[A]]

133

134

/** Batch elements by count */

135

def batch[A](n: Long): Transducer[Nothing, A, Chunk[A]]

136

137

/** Batch elements into chunks of n elements */

138

def batchN[A](n: Long): Transducer[Nothing, A, Chunk[A]]

139

140

/** Batch elements with weighted grouping */

141

def batchWeighted[A](costFn: A => Long)(max: Long): Transducer[Nothing, A, Chunk[A]]

142

143

/** Effectful weighted batching */

144

def batchWeightedM[R, E, A](costFn: A => ZIO[R, E, Long])(max: Long): ZTransducer[R, E, A, Chunk[A]]

145

}

146

```

147

148

### Utility Transducers

149

150

Helpful utility transducers for common patterns.

151

152

```scala { .api }

153

object ZTransducer {

154

/** Get first element */

155

def head[A]: Transducer[Nothing, A, Option[A]]

156

157

/** Get last element */

158

def last[A]: Transducer[Nothing, A, Option[A]]

159

160

/** Prepend values to stream */

161

def prepend[A](values: A*): Transducer[Nothing, A, A]

162

163

/** Append values to stream */

164

def append[A](values: A*): Transducer[Nothing, A, A]

165

166

/** Intersperse separator between elements */

167

def intersperse[A](separator: A): Transducer[Nothing, A, A]

168

169

/** Add index to elements */

170

def zipWithIndex[A]: Transducer[Nothing, A, (A, Long)]

171

172

/** Deduplicate consecutive elements */

173

def deduplicateAdjacent[A]: Transducer[Nothing, A, A]

174

175

/** Deduplicate consecutive elements that share the same key */

176

def deduplicateAdjacentBy[A, K](f: A => K): Transducer[Nothing, A, A]

177

178

/** Remove None values from Option stream */

179

def collectSome[A]: Transducer[Nothing, Option[A], A]

180

181

/** Flatten nested chunks */

182

def flatten[A]: Transducer[Nothing, Chunk[A], A]

183

}

184

```

185

186

### Effect-Based Transducers

187

188

Transducers that incorporate effects.

189

190

```scala { .api }

191

object ZTransducer {

192

/** Create from effect */

193

def fromEffect[R, E, A](effect: ZIO[R, E, A]): ZTransducer[R, E, Any, A]

194

195

/** Create from function */

196

def fromFunction[A, B](f: A => B): Transducer[Nothing, A, B]

197

198

/** Create from effectful function */

199

def fromFunctionM[R, E, A, B](f: A => ZIO[R, E, B]): ZTransducer[R, E, A, B]

200

201

/** Apply effect to each element (tap) */

202

def tap[R, E, A](f: A => ZIO[R, E, Any]): ZTransducer[R, E, A, A]

203

204

/** Trace elements for debugging */

205

def debug[A](prefix: String = ""): Transducer[Nothing, A, A]

206

207

/** Time each element processing */

208

def timed[A]: ZTransducer[Clock, Nothing, A, (Duration, A)]

209

}

210

```

211

212

### Transducer Transformations

213

214

Transform transducer behavior and composition.

215

216

```scala { .api }

217

trait ZTransducerOps[-R, +E, -I, +O] {

218

/** Transform output elements */

219

def map[P](f: O => P): ZTransducer[R, E, I, P]

220

221

/** Effectful output transformation */

222

def mapM[R1 <: R, E1 >: E, P](f: O => ZIO[R1, E1, P]): ZTransducer[R1, E1, I, P]

223

224

/** Transform output chunks */

225

def mapChunks[P](f: Chunk[O] => Chunk[P]): ZTransducer[R, E, I, P]

226

227

/** Transform input elements */

228

def contramap[J](f: J => I): ZTransducer[R, E, J, O]

229

230

/** Effectful input transformation */

231

def contramapM[R1 <: R, E1 >: E, J](f: J => ZIO[R1, E1, I]): ZTransducer[R1, E1, J, O]

232

233

/** Transform input chunks */

234

def contramapChunks[J](f: Chunk[J] => Chunk[I]): ZTransducer[R, E, J, O]

235

236

/** Transform errors */

237

def mapError[E2](f: E => E2): ZTransducer[R, E2, I, O]

238

239

/** Filter output elements */

240

def filter(predicate: O => Boolean): ZTransducer[R, E, I, O]

241

242

/** Effectful output filtering */

243

def filterM[R1 <: R, E1 >: E](predicate: O => ZIO[R1, E1, Boolean]): ZTransducer[R1, E1, I, O]

244

}

245

```

246

247

### Transducer Composition

248

249

Compose transducers together.

250

251

```scala { .api }

252

trait ZTransducerOps[-R, +E, -I, +O] {

253

/** Compose with another transducer (andThen) */

254

def >>>[R1 <: R, E1 >: E, P](that: ZTransducer[R1, E1, O, P]): ZTransducer[R1, E1, I, P]

255

256

/** Compose (alias for >>>) */

257

def andThen[R1 <: R, E1 >: E, P](that: ZTransducer[R1, E1, O, P]): ZTransducer[R1, E1, I, P]

258

259

/** Compose before */

260

def compose[R1 <: R, E1 >: E, C](that: ZTransducer[R1, E1, C, I]): ZTransducer[R1, E1, C, O]

261

262

/** Zip with another transducer */

263

def zip[R1 <: R, E1 >: E, I1 <: I, P](that: ZTransducer[R1, E1, I1, P]): ZTransducer[R1, E1, I1, (O, P)]

264

265

/** Zip with function */

266

def zipWith[R1 <: R, E1 >: E, I1 <: I, P, Q](that: ZTransducer[R1, E1, I1, P])(f: (O, P) => Q): ZTransducer[R1, E1, I1, Q]

267

268

/** Race two transducers */

269

def race[R1 <: R, E1 >: E, I1 <: I, O1 >: O](that: ZTransducer[R1, E1, I1, O1]): ZTransducer[R1, E1, I1, O1]

270

}

271

```

272

273

### Error Handling

274

275

Error handling for transducers.

276

277

```scala { .api }

278

trait ZTransducerOps[-R, +E, -I, +O] {

279

/** Handle all errors */

280

def catchAll[R1 <: R, E2, I1 <: I, O1 >: O](h: E => ZTransducer[R1, E2, I1, O1]): ZTransducer[R1, E2, I1, O1]

281

282

/** Fallback transducer */

283

def orElse[R1 <: R, E2, I1 <: I, O1 >: O](that: ZTransducer[R1, E2, I1, O1]): ZTransducer[R1, E2, I1, O1]

284

285

/** Convert to option on error */

286

def option: ZTransducer[R, Nothing, I, Option[O]]

287

288

/** Convert to either */

289

def either: ZTransducer[R, Nothing, I, Either[E, O]]

290

291

/** Retry on failure */

292

def retry[R1 <: R](schedule: Schedule[R1, E, Any]): ZTransducer[R1, E, I, O]

293

}

294

```

295

296

### Resource Management

297

298

Environment provisioning, timing, and execution summaries for transducers.

299

300

```scala { .api }

301

trait ZTransducerOps[-R, +E, -I, +O] {

302

/** Provide environment */

303

def provide[R1](env: R1)(implicit ev: R1 <:< R): ZTransducer[Any, E, I, O]

304

305

/** Provide environment layer */

306

def provideLayer[R0, R1](layer: ZLayer[R0, E, R1])(implicit ev: R1 <:< R): ZTransducer[R0, E, I, O]

307

308

/** Time transducer execution */

309

def timed: ZTransducer[R with Clock, E, I, (Duration, O)]

310

311

/** Summarize execution */

312

def summarized[R1 <: R, E1 >: E, B, C](summary: ZIO[R1, E1, B])(f: (B, B) => C): ZTransducer[R1, E1, I, (C, O)]

313

}

314

```

315

316

### Type Definitions

317

318

Core types and basic constructors used by transducers.

319

320

```scala { .api }

321

object ZTransducer {

322

/** Push function signature for transducer implementation */

323

type Push[R, E, I, O] = Option[Chunk[I]] => ZIO[R, E, Chunk[O]]

324

325

/** Transducer that drains input without output */

326

def drain[A]: Transducer[Nothing, A, Nothing] = ZTransducer(_ => Chunk.empty)

327

328

/** Never-completing transducer */

329

def never[I, O]: Transducer[Nothing, I, O] = ZTransducer.fromEffect(ZIO.never)

330

331

/** Immediately succeeding transducer */

332

def succeed[O](o: => O): Transducer[Nothing, Any, O] = fromEffect(ZIO.succeed(o))

333

334

/** Immediately failing transducer */

335

def fail[E](e: => E): ZTransducer[Any, E, Any, Nothing] = fromEffect(ZIO.fail(e))

336

}

337

```

338

339

**Usage Examples:**

340

341

```scala

342

import zio._

343

import zio.stream._

344

import zio.duration._

345

346

// Basic transformations

347

val numbers = ZStream.range(1, 10)

348

val doubled = numbers.transduce(ZTransducer.map(_ * 2))

349

val evens = numbers.transduce(ZTransducer.filter(_ % 2 == 0))

350

351

// Folding and scanning

352

val sum = numbers.transduce(ZTransducer.fold(0)(_ + _))

353

val runningSum = numbers.transduce(ZTransducer.scan(0)(_ + _))

354

355

// Collecting operations

356

val firstThree = numbers.transduce(ZTransducer.take(3))

357

val grouped = numbers.transduce(ZTransducer.batch(3))

358

359

// Composition

360

val composed = ZTransducer.filter[Int](_ > 5) >>> ZTransducer.map(_ * 2)

361

val result = numbers.transduce(composed)

362

363

// Effectful processing

364

val logged = ZTransducer.tap[Console, IOException, Int](n =>

365

Console.printLine(s"Processing: $n")

366

)

367

368

// Grouping operations

369

val adjacentGroups = ZStream("a", "a", "b", "b", "c")

370

.transduce(ZTransducer.groupAdjacentBy(identity))

371

372

// Timing operations

373

val timedProcessing = numbers.transduce(ZTransducer.timed)

374

```