or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

application.mdconcurrency.mdcore-effects.mddependency-injection.mderror-handling.mdindex.mdmetrics.mdresource-management.mdservices.mdstm.mdstreams.mdtesting.md

streams.mddocs/

0

# ZIO Streams

1

2

ZIO Streams provides composable, resource-safe streaming data processing with comprehensive backpressure handling and error recovery. Streams are pull-based, interruptible, and integrate seamlessly with the ZIO effect system.

3

4

## Capabilities

5

6

### ZStream - Core Streaming Type

7

8

The foundational streaming type representing potentially infinite sequences of values with environment, error, and element types.

9

10

```scala { .api }

11

/**

12

* A ZStream represents a lazy, potentially infinite sequence of values of type A

13

* that can fail with E and requires environment R

14

*/

15

sealed trait ZStream[-R, +E, +A] {

16

/** Transform each element with a function */

17

def map[B](f: A => B): ZStream[R, E, B]

18

19

/** Transform each element with an effect */

20

def mapZIO[R1 <: R, E1 >: E, B](f: A => ZIO[R1, E1, B]): ZStream[R1, E1, B]

21

22

/** Filter elements based on a predicate */

23

def filter(f: A => Boolean): ZStream[R, E, A]

24

25

/** Take the first n elements */

26

def take(n: => Long): ZStream[R, E, A]

27

28

/** Skip the first n elements */

29

def drop(n: => Long): ZStream[R, E, A]

30

31

/** Concatenate with another stream */

32

def ++[R1 <: R, E1 >: E, A1 >: A](that: => ZStream[R1, E1, A1]): ZStream[R1, E1, A1]

33

34

/** Transform errors */

35

def mapError[E2](f: E => E2): ZStream[R, E2, A]

36

37

/** Recover from errors */

38

def catchAll[R1 <: R, E2, A1 >: A](f: E => ZStream[R1, E2, A1]): ZStream[R1, E2, A1]

39

40

/** Merge with another stream */

41

def merge[R1 <: R, E1 >: E, A1 >: A](that: ZStream[R1, E1, A1]): ZStream[R1, E1, A1]

42

43

/** Broadcast elements to multiple streams */

44

def broadcast(n: Int): ZIO[R with Scope, Nothing, List[ZStream[Any, E, A]]]

45

46

/** Group elements into chunks */

47

def grouped(n: => Int): ZStream[R, E, Chunk[A]]

48

49

/** Run the stream with a sink */

50

def run[R1 <: R, E1 >: E, Z](sink: => ZSink[R1, E1, A, Any, Z]): ZIO[R1, E1, Z]

51

52

/** Convert to a ZIO effect collecting all elements */

53

def runCollect: ZIO[R, E, Chunk[A]]

54

55

/** Execute foreach on each element */

56

def runForeach[R1 <: R, E1 >: E](f: A => ZIO[R1, E1, Any]): ZIO[R1, E1, Unit]

57

}

58

59

// Type aliases for common patterns

60

type Stream[+E, +A] = ZStream[Any, E, A]

61

type UStream[+A] = ZStream[Any, Nothing, A]

62

```

63

64

**Usage Examples:**

65

66

```scala

67

import zio._

68

import zio.stream._

69

70

// Create streams from various sources

71

val numbers = ZStream.range(1, 100)

72

val fromIterable = ZStream.fromIterable(List(1, 2, 3, 4, 5))

73

val fromEffect = ZStream.fromZIO(ZIO.succeed(42))

74

75

// Transform and process streams

76

val processed = ZStream.range(1, 1000)

77

.filter(_ % 2 == 0)

78

.map(_ * 2)

79

.take(10)

80

.runCollect

81

82

// Merge multiple streams

83

val merged = ZStream.range(1, 10)

84

.merge(ZStream.range(100, 110))

85

.runCollect

86

```

87

88

### Stream Construction

89

90

Create streams from various data sources including iterables, effects, and external resources.

91

92

```scala { .api }

93

/**

94

* Stream construction methods

95

*/

96

object ZStream {

97

/** Create a stream from an iterable */

98

def fromIterable[A](as: => Iterable[A]): UStream[A]

99

100

/** Create a stream from a single ZIO effect */

101

def fromZIO[R, E, A](zio: => ZIO[R, E, A]): ZStream[R, E, A]

102

103

/** Create a stream from a range of integers */

104

def range(min: Int, max: Int): UStream[Int]

105

106

/** Create a stream that repeats a value forever */

107

def repeat[A](a: => A): UStream[A]

108

109

/** Create a stream that repeats an effect forever */

110

def repeatZIO[R, E, A](zio: => ZIO[R, E, A]): ZStream[R, E, A]

111

112

/** Create a stream with a single value */

113

def succeed[A](a: => A): UStream[A]

114

115

/** Create a failing stream */

116

def fail[E](error: => E): Stream[E, Nothing]

117

118

/** Create an empty stream */

119

val empty: UStream[Nothing]

120

121

/** Create a stream from a Java InputStream */

122

def fromInputStream(is: InputStream, chunkSize: => Int): Stream[IOException, Byte]

123

124

/** Create a stream from file lines */

125

def fromPath(path: Path): Stream[IOException, String]

126

127

/** Create a stream that emits at regular intervals */

128

def tick(interval: => Duration): ZStream[Any, Nothing, Unit]

129

130

/** Create a stream from an async callback */

131

def async[R, E, A](

132

register: (ZIO[R, Option[E], Chunk[A]] => Unit) => Unit

133

): ZStream[R, E, A]

134

}

135

```

136

137

**Usage Examples:**

138

139

```scala

140

// File streaming

141

val fileContent = ZStream.fromPath(Paths.get("data.txt"))

142

.via(ZPipeline.utf8Decode)

143

.runForeach(Console.printLine(_))

144

145

// Periodic emissions

146

val heartbeat = ZStream.tick(1.second)

147

.zipWith(ZStream.range(1, Int.MaxValue))((_, n) => s"Heartbeat $n")

148

.runForeach(Console.printLine(_))

149

150

// Async stream from callback

151

val asyncStream = ZStream.async[Any, Nothing, String] { callback =>

152

// Register callback with external system

153

externalSystem.onData(data => callback(ZIO.succeed(Chunk(data))))

154

}

155

```

156

157

### ZSink - Stream Consumer

158

159

Sinks consume streams and produce results, providing backpressure and resource management.

160

161

```scala { .api }

162

/**

163

* A ZSink consumes values from a stream and produces a result

164

*/

165

sealed trait ZSink[-R, +E, -In, +L, +Z] {

166

/** Transform the result of the sink */

167

def map[Z2](f: Z => Z2): ZSink[R, E, In, L, Z2]

168

169

/** Transform the result with an effect */

170

def mapZIO[R1 <: R, E1 >: E, Z2](f: Z => ZIO[R1, E1, Z2]): ZSink[R1, E1, In, L, Z2]

171

172

/** Transform errors */

173

def mapError[E2](f: E => E2): ZSink[R, E2, In, L, Z]

174

175

/** Recover from errors */

176

def orElse[R1 <: R, E2, In1 <: In, L1 >: L, Z1 >: Z](

177

that: => ZSink[R1, E2, In1, L1, Z1]

178

): ZSink[R1, E2, In1, L1, Z1]

179

180

/** Combine with another sink in parallel */

181

def zipPar[R1 <: R, E1 >: E, In1 <: In, L1 >: L, Z2](

182

that: => ZSink[R1, E1, In1, L1, Z2]

183

): ZSink[R1, E1, In1, L1, (Z, Z2)]

184

185

/** Execute the sink on a stream */

186

def apply[R1 <: R, E1 >: E, In1 <: In](

187

stream: ZStream[R1, E1, In1]

188

): ZIO[R1, E1, Z]

189

}

190

191

/**

192

* Common sink constructors

193

*/

194

object ZSink {

195

/** Collect all elements into a Chunk */

196

def collectAll[A]: ZSink[Any, Nothing, A, Nothing, Chunk[A]]

197

198

/** Count the number of elements */

199

val count: ZSink[Any, Nothing, Any, Nothing, Long]

200

201

/** Take the first n elements */

202

def take[A](n: => Long): ZSink[Any, Nothing, A, A, Chunk[A]]

203

204

/** Fold over elements */

205

def fold[S, A](s: => S)(f: (S, A) => S): ZSink[Any, Nothing, A, Nothing, S]

206

207

/** Execute a side effect for each element */

208

def foreach[R, E, A](f: A => ZIO[R, E, Any]): ZSink[R, E, A, A, Unit]

209

210

/** Write to an OutputStream */

211

def fromOutputStream(os: OutputStream): ZSink[Any, IOException, Byte, Nothing, Unit]

212

213

/** Write to a file */

214

def fromPath(path: Path): ZSink[Any, IOException, Byte, Byte, Unit]

215

}

216

```

217

218

**Usage Examples:**

219

220

```scala

221

// Collect stream results

222

val collected = ZStream.range(1, 100)

223

.run(ZSink.collectAll)

224

225

// Count elements

226

val elementCount = ZStream.fromIterable(someList)

227

.run(ZSink.count)

228

229

// Fold/reduce stream

230

val sum = ZStream.range(1, 101)

231

.run(ZSink.fold(0)(_ + _))

232

233

// Side effects during consumption

234

val logged = ZStream.range(1, 10)

235

.run(ZSink.foreach(n => Console.printLine(s"Processing: $n")))

236

```

237

238

### ZPipeline - Stream Transformation

239

240

Pipelines provide reusable stream transformations that can be composed and applied to multiple streams.

241

242

```scala { .api }

243

/**

244

* A ZPipeline transforms one stream into another

245

*/

246

sealed trait ZPipeline[-R, +E, -In, +Out] {

247

/** Compose with another pipeline */

248

def >>>[R1 <: R, E1 >: E, Out2](

249

that: ZPipeline[R1, E1, Out, Out2]

250

): ZPipeline[R1, E1, In, Out2]

251

252

/** Apply this pipeline to a stream */

253

def apply[R1 <: R, E1 >: E, In1 <: In](

254

stream: ZStream[R1, E1, In1]

255

): ZStream[R1, E1, Out]

256

}

257

258

/**

259

* Common pipeline constructors

260

*/

261

object ZPipeline {

262

/** Identity pipeline that passes through all elements */

263

def identity[A]: ZPipeline[Any, Nothing, A, A]

264

265

/** Map each element */

266

def map[In, Out](f: In => Out): ZPipeline[Any, Nothing, In, Out]

267

268

/** Filter elements */

269

def filter[A](f: A => Boolean): ZPipeline[Any, Nothing, A, A]

270

271

/** Take the first n elements */

272

def take[A](n: => Long): ZPipeline[Any, Nothing, A, A]

273

274

/** Drop the first n elements */

275

def drop[A](n: => Long): ZPipeline[Any, Nothing, A, A]

276

277

/** Group elements into chunks */

278

def grouped[A](n: => Int): ZPipeline[Any, Nothing, A, Chunk[A]]

279

280

/** Decode UTF-8 bytes to strings */

281

val utf8Decode: ZPipeline[Any, CharacterCodingException, Byte, String]

282

283

/** Encode strings to UTF-8 bytes */

284

val utf8Encode: ZPipeline[Any, CharacterCodingException, String, Byte]

285

286

/** Split strings by lines */

287

val splitLines: ZPipeline[Any, Nothing, String, String]

288

289

/** Compress using gzip */

290

val gzip: ZPipeline[Any, Nothing, Byte, Byte]

291

292

/** Decompress gzip */

293

val gunzip: ZPipeline[Any, IOException, Byte, Byte]

294

}

295

```

296

297

**Usage Examples:**

298

299

```scala

300

// Compose pipelines

301

val textProcessor = ZPipeline.utf8Decode >>>

302

ZPipeline.splitLines >>>

303

ZPipeline.filter(_.nonEmpty)

304

305

// Process file with pipeline

306

val processedFile = ZStream.fromPath(Paths.get("input.txt"))

307

.via(textProcessor)

308

.runForeach(Console.printLine(_))

309

310

// Reusable transformation

311

val numberProcessor = ZPipeline.map[String, Int](_.toInt)

312

.filter(_ > 0)

313

.map(_ * 2)

314

315

val processedNumbers = ZStream.fromIterable(List("1", "2", "3"))

316

.via(numberProcessor)

317

.runCollect

318

```

319

320

### Advanced Stream Operations

321

322

Sophisticated stream operations for complex data processing scenarios.

323

324

```scala { .api }

325

/**

326

* Advanced stream transformations and combinations

327

*/

328

trait ZStream[-R, +E, +A] {

329

/** Buffer elements up to a maximum size */

330

def buffer(capacity: => Int): ZStream[R, E, A]

331

332

/** Throttle stream to emit at most n elements per duration */

333

def throttleEnforce(n: => Long, duration: => Duration): ZStream[R, E, A]

334

335

/** Debounce rapid emissions */

336

def debounce(d: => Duration): ZStream[R, E, A]

337

338

/** Sliding window of elements */

339

def sliding(n: => Int): ZStream[R, E, Chunk[A]]

340

341

/** Aggregate elements with a schedule */

342

def aggregateAsync[R1 <: R, E1 >: E, B, C](

343

sink: => ZSink[R1, E1, A, A, B]

344

): ZStream[R1, E1, B]

345

346

/** Partition elements into multiple streams */

347

def partition[A1 >: A](

348

p: A1 => Boolean

349

): ZIO[R with Scope, Nothing, (ZStream[Any, E, A1], ZStream[Any, E, A1])]

350

351

/** Zip with another stream */

352

def zip[R1 <: R, E1 >: E, B](

353

that: ZStream[R1, E1, B]

354

): ZStream[R1, E1, (A, B)]

355

356

/** Interleave with another stream */

357

def interleave[R1 <: R, E1 >: E, A1 >: A](

358

that: ZStream[R1, E1, A1]

359

): ZStream[R1, E1, A1]

360

}

361

```

362

363

**Usage Examples:**

364

365

```scala

366

// Rate limiting and buffering

367

val rateLimited = dataStream

368

.buffer(1000)

369

.throttleEnforce(100, 1.second)

370

.runForeach(processData)

371

372

// Windowing operations

373

val slidingAverage = numberStream

374

.sliding(5)

375

.map(chunk => chunk.sum / chunk.size.toDouble)

376

.runCollect

377

378

// Complex stream orchestration

379

val combined = for {

380

(evenStream, oddStream) <- numberStream.partition(_ % 2 == 0)

381

evens <- evenStream.runCollect.fork

382

odds <- oddStream.runCollect.fork

383

evenResults <- evens.join

384

oddResults <- odds.join

385

} yield (evenResults, oddResults)

386

```