or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.md platform-extensions.md zsink.md zstream.md ztransducer.md

zsink.mddocs/

0

# Stream Consumption

1

2

Comprehensive sink operations for consuming streams and producing results. ZSink provides powerful abstractions for collecting, folding, and processing stream elements.

3

4

## Capabilities

5

6

### Basic Collectors

7

8

Essential sinks for collecting stream elements.

9

10

```scala { .api }

11

object ZSink {

12

/** Collect all elements into a chunk */

13

def collectAll[A]: Sink[Nothing, A, Nothing, Chunk[A]]

14

15

/** Collect all elements into a map by key */

16

def collectAllToMap[A, K](key: A => K)(f: (A, A) => A): Sink[Nothing, A, Nothing, Map[K, A]]

17

18

/** Collect all elements into a set */

19

def collectAllToSet[A]: Sink[Nothing, A, Nothing, Set[A]]

20

21

/** Collect first n elements */

22

def take[A](n: Int): Sink[Nothing, A, A, Chunk[A]]

23

24

/** Get first element */

25

def head[A]: Sink[Nothing, A, A, Option[A]]

26

27

/** Get last element */

28

def last[A]: Sink[Nothing, A, Nothing, Option[A]]

29

}

30

```

31

32

### Folding Operations

33

34

Sinks that fold stream elements into a single result.

35

36

```scala { .api }

37

object ZSink {

38

/** Fold with continuation function */

39

def fold[A, S](z: S)(contFn: S => Boolean)(f: (S, A) => S): Sink[Nothing, A, A, S]

40

41

/** Left fold without continuation */

42

def foldLeft[A, S](z: S)(f: (S, A) => S): Sink[Nothing, A, Nothing, S]

43

44

/** Effectful fold */

45

def foldM[R, E, A, S](z: S)(contFn: S => Boolean)(f: (S, A) => ZIO[R, E, S]): ZSink[R, E, A, A, S]

46

47

/** Effectful left fold */

48

def foldLeftM[R, E, A, S](z: S)(f: (S, A) => ZIO[R, E, S]): ZSink[R, E, A, Nothing, S]

49

50

/** Fold chunks */

51

def foldChunks[A, S](z: S)(contFn: S => Boolean)(f: (S, Chunk[A]) => S): Sink[Nothing, A, A, S]

52

53

/** Effectful chunk folding */

54

def foldChunksM[R, E, A, S](z: S)(contFn: S => Boolean)(f: (S, Chunk[A]) => ZIO[R, E, S]): ZSink[R, E, A, A, S]

55

56

/** Fold elements, starting from z, until at most max elements have been folded */

57

def foldUntil[A, S](z: S, max: Int)(f: (S, A) => S): Sink[Nothing, A, A, S]

58

59

/** Weighted fold with cost function */

60

def foldWeighted[A, S](z: S)(costFn: A => Long, max: Long)(f: (S, A) => S): Sink[Nothing, A, A, S]

61

}

62

```

63

64

### Numeric Operations

65

66

Specialized sinks for numeric computations.

67

68

```scala { .api }

69

object ZSink {

70

/** Sum all numeric elements */

71

def sum[A](implicit A: Numeric[A]): Sink[Nothing, A, Nothing, A]

72

}

73

```

74

75

### Effect-Based Sinks

76

77

Sinks that perform effects on stream elements.

78

79

```scala { .api }

80

object ZSink {

81

/** Create sink from effect */

82

def fromEffect[R, E, Z](b: => ZIO[R, E, Z]): ZSink[R, E, Any, Nothing, Z]

83

84

/** Apply effect to each element */

85

def foreach[R, E, A](f: A => ZIO[R, E, Any]): ZSink[R, E, A, Nothing, Unit]

86

87

/** Apply effect while predicate holds */

88

def foreachWhile[R, E, A](f: A => ZIO[R, E, Boolean]): ZSink[R, E, A, A, Unit]

89

90

/** Apply effectful function to chunks */

91

def foreachChunk[R, E, A](f: Chunk[A] => ZIO[R, E, Any]): ZSink[R, E, A, Nothing, Unit]

92

93

/** Apply effectful function while chunk predicate holds */

94

def foreachChunkWhile[R, E, A](f: Chunk[A] => ZIO[R, E, Boolean]): ZSink[R, E, A, A, Unit]

95

}

96

```

97

98

### Resource-Based Sinks

99

100

Sinks that work with managed resources.

101

102

```scala { .api }

103

object ZSink {

104

/** Create sink from managed resource */

105

def managed[R, E, A, Z](managed: ZManaged[R, E, ZSink[R, E, A, Any, Z]]): ZSink[R, E, A, Any, Z]

106

107

/** Sink from Hub */

108

def fromHub[A](hub: Hub[A]): Sink[Nothing, A, Nothing, Unit]

109

110

/** Sink from Queue */

111

def fromQueue[A](queue: Queue[A]): Sink[Nothing, A, Nothing, Unit]

112

113

/** Bracketed sink creation */

114

def bracket[R, E, A, Z](acquire: ZIO[R, E, A])(release: A => ZIO[R, Nothing, Any])(use: A => ZSink[R, E, Any, Any, Z]): ZSink[R, E, Any, Any, Z]

115

}

116

```

117

118

### Sink Transformations

119

120

Transform sink behavior and results.

121

122

```scala { .api }

123

trait ZSinkOps[-R, +E, -I, +L, +Z] {

124

/** Transform result */

125

def map[Z2](f: Z => Z2): ZSink[R, E, I, L, Z2]

126

127

/** Effectful result transformation */

128

def mapM[R1 <: R, E1 >: E, Z2](f: Z => ZIO[R1, E1, Z2]): ZSink[R1, E1, I, L, Z2]

129

130

/** Transform errors */

131

def mapError[E2](f: E => E2): ZSink[R, E2, I, L, Z]

132

133

/** Transform input */

134

def contramap[I2](f: I2 => I): ZSink[R, E, I2, L, Z]

135

136

/** Effectful input transformation */

137

def contramapM[R1 <: R, E1 >: E, I2](f: I2 => ZIO[R1, E1, I]): ZSink[R1, E1, I2, L, Z]

138

139

/** Transform input chunks */

140

def contramapChunks[I2](f: Chunk[I2] => Chunk[I]): ZSink[R, E, I2, L, Z]

141

142

/** Transform both the input and the result */

143

def dimap[I2, Z2](f: I2 => I)(g: Z => Z2): ZSink[R, E, I2, L, Z2]

144

}

145

```

146

147

### Sink Combination

148

149

Combine multiple sinks in various ways.

150

151

```scala { .api }

152

trait ZSinkOps[-R, +E, -I, +L, +Z] {

153

/** Zip sinks sequentially */

154

def zip[R1 <: R, E1 >: E, I1 <: I, L1 >: L, Z2](that: ZSink[R1, E1, I1, L1, Z2]): ZSink[R1, E1, I1, L1, (Z, Z2)]

155

156

/** Zip sinks with function */

157

def zipWith[R1 <: R, E1 >: E, I1 <: I, L1 >: L, Z2, Z3](that: ZSink[R1, E1, I1, L1, Z2])(f: (Z, Z2) => Z3): ZSink[R1, E1, I1, L1, Z3]

158

159

/** Zip sinks in parallel */

160

def zipPar[R1 <: R, E1 >: E, I1 <: I, L1 >: L, Z2](that: ZSink[R1, E1, I1, L1, Z2]): ZSink[R1, E1, I1, L1, (Z, Z2)]

161

162

/** Zip keeping left result */

163

def zipLeft[R1 <: R, E1 >: E, I1 <: I, L1 >: L, Z2](that: ZSink[R1, E1, I1, L1, Z2]): ZSink[R1, E1, I1, L1, Z]

164

165

/** Zip keeping right result */

166

def zipRight[R1 <: R, E1 >: E, I1 <: I, L1 >: L, Z2](that: ZSink[R1, E1, I1, L1, Z2]): ZSink[R1, E1, I1, L1, Z2]

167

168

/** Race sinks (first to complete wins) */

169

def race[R1 <: R, E1 >: E, I1 <: I, L1 >: L, Z1 >: Z](that: ZSink[R1, E1, I1, L1, Z1]): ZSink[R1, E1, I1, L1, Z1]

170

171

/** Fallback sink */

172

def orElse[R1 <: R, E1 >: E, I1 <: I, L1 >: L, Z1 >: Z](that: ZSink[R1, E1, I1, L1, Z1]): ZSink[R1, E1, I1, L1, Z1]

173

}

174

```

175

176

### Monadic Operations

177

178

Monadic composition for sink chaining.

179

180

```scala { .api }

181

trait ZSinkOps[-R, +E, -I, +L, +Z] {

182

/** Monadic bind */

183

def flatMap[R1 <: R, E1 >: E, I1 <: I, L1 >: L, Z2](f: Z => ZSink[R1, E1, I1, L1, Z2]): ZSink[R1, E1, I1, L1, Z2]

184

185

/** Chain with function */

186

def andThen[R1 <: R, E1 >: E, Z2](f: Z => ZSink[R1, E1, L, Any, Z2]): ZSink[R1, E1, I, Any, Z2]

187

188

/** Effectfully fold this sink's results into an accumulator, using a continuation function */

189

def foldM[R1 <: R, E1 >: E, I1 <: I, L1 >: L, Z2](z: Z2)(contFn: Z2 => Boolean)(f: (Z2, Z) => ZIO[R1, E1, Z2]): ZSink[R1, E1, I1, L1, Z2]

190

}

191

```

192

193

### Error Handling

194

195

Error handling operations for sinks.

196

197

```scala { .api }

198

trait ZSinkOps[-R, +E, -I, +L, +Z] {

199

/** Handle all errors */

200

def catchAll[R1 <: R, E2, I1 <: I, L1 >: L, Z1 >: Z](h: E => ZSink[R1, E2, I1, L1, Z1]): ZSink[R1, E2, I1, L1, Z1]

201

202

/** Retry sink on failure */

203

def retry[R1 <: R](schedule: Schedule[R1, E, Any]): ZSink[R1, E, I, L, Z]

204

205

/** Convert to option on error */

206

def option: ZSink[R, Nothing, I, L, Option[Z]]

207

208

/** Use either for error handling */

209

def either: ZSink[R, Nothing, I, L, Either[E, Z]]

210

}

211

```

212

213

### Resource Management

214

215

Resource management for sinks.

216

217

```scala { .api }

218

trait ZSinkOps[-R, +E, -I, +L, +Z] {

219

/** Provide environment */

220

def provide[R1](env: R1)(implicit ev: R1 <:< R): ZSink[Any, E, I, L, Z]

221

222

/** Provide layer */

223

def provideLayer[R0, R1](layer: ZLayer[R0, E, R1])(implicit ev: R1 <:< R): ZSink[R0, E, I, L, Z]

224

225

/** Time sink execution */

226

def timed: ZSink[R with Clock, E, I, L, (Duration, Z)]

227

228

/** Summarize sink execution */

229

def summarized[R1 <: R, E1 >: E, B, C](summary: ZIO[R1, E1, B])(f: (B, B) => C): ZSink[R1, E1, I, L, (C, Z)]

230

231

/** Convert to transducer */

232

def toTransducer: ZTransducer[R, E, I, L]

233

}

234

```

235

236

### Type Definitions

237

238

Core types used by sinks.

239

240

```scala { .api }

241

object ZSink {

242

/** Push interface for sink implementation */

243

type Push[R, E, I, L, Z] = Option[Chunk[I]] => ZIO[R, (Either[E, Z], Chunk[L]), Chunk[L]]

244

245

/** Sink that ignores input and produces unit */

246

val drain: Sink[Nothing, Any, Nothing, Unit] = ZSink.foreach(_ => ZIO.unit)

247

248

/** Identity sink that passes through all input */

249

def identity[A]: Sink[Nothing, A, Nothing, Chunk[A]] = collectAll[A]

250

251

/** Never-completing sink */

252

def never: Sink[Nothing, Any, Nothing, Nothing] = ZSink.fromEffect(ZIO.never)

253

254

/** Immediately succeeding sink */

255

def succeed[Z](z: => Z): Sink[Nothing, Any, Nothing, Z] = fromEffect(ZIO.succeed(z))

256

257

/** Immediately failing sink */

258

def fail[E](e: => E): Sink[E, Any, Nothing, Nothing] = fromEffect(ZIO.fail(e))

259

}

260

```

261

262

**Usage Examples:**

263

264

```scala

265

import zio._

266

import zio.stream._

267

268

// Basic collectors

269

val numbers = ZStream.range(1, 10)

270

val allNumbers: UIO[Chunk[Int]] = numbers.run(ZSink.collectAll)

271

val sum: UIO[Int] = numbers.run(ZSink.sum)

272

273

// Folding operations

274

val evenSum: UIO[Int] = numbers.run(

275

ZSink.foldLeft(0)((acc, n) => if (n % 2 == 0) acc + n else acc)

276

)

277

278

// Take operations

279

val firstThree: UIO[Chunk[Int]] = numbers.run(ZSink.take(3))

280

val firstEven: UIO[Option[Int]] = numbers.filter(_ % 2 == 0).run(ZSink.head)

281

282

// Effectful processing

283

val logged: UIO[Unit] = numbers.run(

284

ZSink.foreach(n => Console.printLine(s"Processing: $n"))

285

)

286

287

// Sink combination

288

val combined: UIO[(Int, Long)] = numbers.run(

289

ZSink.sum.zip(ZSink.count)

290

)

291

292

// Error handling

293

val safeSum: UIO[Either[String, Int]] =

294

numbers.run(ZSink.sum.either)

295

```