or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.mdplatform-extensions.mdzsink.mdzstream.mdztransducer.md

zstream.mddocs/

0

# Core Streaming Operations

1

2

Comprehensive streaming operations for creating, transforming, combining, and executing streams. ZStream provides the foundational streaming abstraction in ZIO Streams.

3

4

## Capabilities

5

6

### Stream Creation

7

8

Factory methods for creating streams from various sources.

9

10

```scala { .api }

11

object ZStream {

12

/** Create stream from varargs values */

13

def apply[A](as: A*): UStream[A]

14

15

/** Create single-value stream */

16

def succeed[A](a: => A): UStream[A]

17

18

/** Create failed stream */

19

def fail[E](error: => E): Stream[E, Nothing]

20

21

/** Empty stream */

22

def empty: UStream[Nothing]

23

24

/** Never-ending stream */

25

def never: UStream[Nothing]

26

27

/** Stream from chunk */

28

def fromChunk[O](c: => Chunk[O]): UStream[O]

29

30

/** Stream from iterable */

31

def fromIterable[O](as: => Iterable[O]): UStream[O]

32

33

/** Stream from iterator */

34

def fromIterator[A](iterator: => Iterator[A]): UStream[A]

35

36

/** Stream from Java iterator */

37

def fromJavaIterator[A](iterator: => java.util.Iterator[A]): UStream[A]

38

39

/** Integer range stream */

40

def range(min: Int, max: Int, chunkSize: Int = DefaultChunkSize): UStream[Int]

41

}

42

```

43

44

### Effect-Based Creation

45

46

Create streams from ZIO effects and managed resources.

47

48

```scala { .api }

49

object ZStream {

50

/** Single effect as stream */

51

def fromEffect[R, E, A](fa: ZIO[R, E, A]): ZStream[R, E, A]

52

53

/** Repeat effect as stream */

54

def repeatEffect[R, E, A](fa: ZIO[R, E, A]): ZStream[R, E, A]

55

56

/** Unwrap effect containing stream */

57

def unwrap[R, E, A](fa: ZIO[R, E, ZStream[R, E, A]]): ZStream[R, E, A]

58

59

/** Stream from managed resource */

60

def managed[R, E, A](managed: ZManaged[R, E, A]): ZStream[R, E, A]

61

62

/** Stream from schedule */

63

def fromSchedule[R, A](schedule: Schedule[R, Any, A]): ZStream[R with Clock, Never, A]

64

65

/** Periodic ticks */

66

def tick(interval: Duration): ZStream[Clock, Never, Unit]

67

}

68

```

69

70

### Generators and Iteration

71

72

Functional generators for creating streams.

73

74

```scala { .api }

75

object ZStream {

76

/** Iterate function over seed value */

77

def iterate[A](a: A)(f: A => A): UStream[A]

78

79

/** Unfold state function */

80

def unfold[S, A](s: S)(f: S => Option[(A, S)]): UStream[A]

81

82

/** Effectful unfold */

83

def unfoldM[R, E, S, A](s: S)(f: S => ZIO[R, E, Option[(A, S)]]): ZStream[R, E, A]

84

85

/** Paginate values */

86

def paginate[A](a: A)(f: A => (A, Option[A])): UStream[A]

87

88

/** Paginate with effects */

89

def paginateM[R, E, A](a: A)(f: A => ZIO[R, E, (A, Option[A])]): ZStream[R, E, A]

90

}

91

```

92

93

### Core Transformations

94

95

Essential stream transformation operations.

96

97

```scala { .api }

98

trait ZStreamOps[-R, +E, +O] {

99

/** Transform each element */

100

def map[B](f: O => B): ZStream[R, E, B]

101

102

/** Effectful element transformation */

103

def mapM[R1 <: R, E1 >: E, B](f: O => ZIO[R1, E1, B]): ZStream[R1, E1, B]

104

105

/** Transform chunks */

106

def mapChunks[O2](f: Chunk[O] => Chunk[O2]): ZStream[R, E, O2]

107

108

/** Monadic bind */

109

def flatMap[R1 <: R, E1 >: E, O2](f: O => ZStream[R1, E1, O2]): ZStream[R1, E1, O2]

110

111

/** Collect with partial function */

112

def collect[B](pf: PartialFunction[O, B]): ZStream[R, E, B]

113

114

/** Effectful collect */

115

def collectM[R1 <: R, E1 >: E, B](pf: PartialFunction[O, ZIO[R1, E1, B]]): ZStream[R1, E1, B]

116

117

/** Filter elements */

118

def filter(predicate: O => Boolean): ZStream[R, E, O]

119

120

/** Effectful filter */

121

def filterM[R1 <: R, E1 >: E](predicate: O => ZIO[R1, E1, Boolean]): ZStream[R1, E1, O]

122

}

123

```

124

125

### Stateful Operations

126

127

Operations that maintain state across stream elements.

128

129

```scala { .api }

130

trait ZStreamOps[-R, +E, +O] {

131

/** Stateful scanning */

132

def scan[S](s: S)(f: (S, O) => S): ZStream[R, E, S]

133

134

/** Effectful stateful scanning */

135

def scanM[R1 <: R, E1 >: E, S](s: S)(f: (S, O) => ZIO[R1, E1, S]): ZStream[R1, E1, S]

136

137

/** Scan with early termination */

138

def scanReduce[O1 >: O](f: (O1, O1) => O1): ZStream[R, E, O1]

139

140

/** Effectful scan with early termination */

141

def scanReduceM[R1 <: R, E1 >: E, O1 >: O](f: (O1, O1) => ZIO[R1, E1, O1]): ZStream[R1, E1, O1]

142

143

/** Accumulate elements */

144

def mapAccum[S, B](s: S)(f: (S, O) => (S, B)): ZStream[R, E, B]

145

146

/** Effectful accumulation */

147

def mapAccumM[R1 <: R, E1 >: E, S, B](s: S)(f: (S, O) => ZIO[R1, E1, (S, B)]): ZStream[R1, E1, B]

148

}

149

```

150

151

### Stream Combination

152

153

Combine multiple streams in various ways.

154

155

```scala { .api }

156

trait ZStreamOps[-R, +E, +O] {

157

/** Concatenate streams sequentially */

158

def ++[R1 <: R, E1 >: E, O1 >: O](that: => ZStream[R1, E1, O1]): ZStream[R1, E1, O1]

159

160

/** Zip with another stream */

161

def zip[R1 <: R, E1 >: E, O2](that: ZStream[R1, E1, O2]): ZStream[R1, E1, (O, O2)]

162

163

/** Zip with function */

164

def zipWith[R1 <: R, E1 >: E, O2, C](that: ZStream[R1, E1, O2])(f: (O, O2) => C): ZStream[R1, E1, C]

165

166

/** Zip keeping left */

167

def zipLeft[R1 <: R, E1 >: E, O2](that: ZStream[R1, E1, O2]): ZStream[R1, E1, O]

168

169

/** Zip keeping right */

170

def zipRight[R1 <: R, E1 >: E, O2](that: ZStream[R1, E1, O2]): ZStream[R1, E1, O2]

171

172

/** Zip with index */

173

def zipWithIndex: ZStream[R, E, (O, Long)]

174

175

/** Merge streams concurrently */

176

def merge[R1 <: R, E1 >: E, O1 >: O](that: ZStream[R1, E1, O1]): ZStream[R1, E1, O1]

177

178

/** Interleave elements */

179

def interleave[R1 <: R, E1 >: E, O1 >: O](that: ZStream[R1, E1, O1]): ZStream[R1, E1, O1]

180

181

/** Cross product */

182

def cross[R1 <: R, E1 >: E, O2](that: ZStream[R1, E1, O2]): ZStream[R1, E1, (O, O2)]

183

}

184

```

185

186

### Partitioning and Slicing

187

188

Operations for taking subsets of stream elements.

189

190

```scala { .api }

191

trait ZStreamOps[-R, +E, +O] {

192

/** Take first n elements */

193

def take(n: Long): ZStream[R, E, O]

194

195

/** Take while predicate holds */

196

def takeWhile(predicate: O => Boolean): ZStream[R, E, O]

197

198

/** Effectful take while */

199

def takeWhileM[R1 <: R, E1 >: E](predicate: O => ZIO[R1, E1, Boolean]): ZStream[R1, E1, O]

200

201

/** Drop first n elements */

202

def drop(n: Long): ZStream[R, E, O]

203

204

/** Drop while predicate holds */

205

def dropWhile(predicate: O => Boolean): ZStream[R, E, O]

206

207

/** Effectful drop while */

208

def dropWhileM[R1 <: R, E1 >: E](predicate: O => ZIO[R1, E1, Boolean]): ZStream[R1, E1, O]

209

210

/** Group into chunks */

211

def grouped(chunkSize: Int): ZStream[R, E, Chunk[O]]

212

213

/** Group by key */

214

def groupByKey[K](f: O => K): ZStream[R, E, (K, Chunk[O])]

215

216

/** Group adjacent by key */

217

def groupAdjacentBy[K](f: O => K): ZStream[R, E, (K, NonEmptyChunk[O])]

218

}

219

```

220

221

### Error Handling

222

223

Comprehensive error handling capabilities.

224

225

```scala { .api }

226

trait ZStreamOps[-R, +E, +O] {

227

/** Handle all errors */

228

def catchAll[R1 <: R, E2, O1 >: O](h: E => ZStream[R1, E2, O1]): ZStream[R1, E2, O1]

229

230

/** Handle some errors */

231

def catchSome[R1 <: R, E1 >: E, O1 >: O](pf: PartialFunction[E, ZStream[R1, E1, O1]]): ZStream[R1, E1, O1]

232

233

/** Fallback stream */

234

def orElse[R1 <: R, E2, O1 >: O](that: => ZStream[R1, E2, O1]): ZStream[R1, E2, O1]

235

236

/** Transform errors */

237

def mapError[E2](f: E => E2): ZStream[R, E2, O]

238

239

/** Effectful error transformation */

240

def mapErrorM[R1 <: R, E2](f: E => ZIO[R1, Nothing, E2]): ZStream[R1, E2, O]

241

242

/** Retry with schedule */

243

def retry[R1 <: R](schedule: Schedule[R1, E, Any]): ZStream[R1, E, O]

244

245

/** Ignore errors */

246

def ignore: ZStream[R, Nothing, O]

247

248

/** Convert to option on error */

249

def option: ZStream[R, Nothing, Option[O]]

250

251

/** Use default value on error */

252

def orElse[O1 >: O](default: O1): ZStream[R, Nothing, O1]

253

}

254

```

255

256

### Timing and Scheduling

257

258

Time-based stream operations.

259

260

```scala { .api }

261

trait ZStreamOps[-R, +E, +O] {

262

/** Rate limiting with backpressure */

263

def throttleEnforce(units: Long, duration: Duration): ZStream[R with Clock, E, O]

264

265

/** Traffic shaping */

266

def throttleShape(units: Long, duration: Duration): ZStream[R with Clock, E, O]

267

268

/** Debounce elements */

269

def debounce(duration: Duration): ZStream[R with Clock, E, O]

270

271

/** Timeout stream */

272

def timeout(duration: Duration): ZStream[R with Clock, E, O]

273

274

/** Delay elements */

275

def delay(duration: Duration): ZStream[R with Clock, E, O]

276

277

/** Schedule elements */

278

def schedule[R1 <: R](schedule: Schedule[R1, O, Any]): ZStream[R1 with Clock, E, O]

279

280

/** Repeat on schedule */

281

def repeat[R1 <: R](schedule: Schedule[R1, O, Any]): ZStream[R1 with Clock, E, O]

282

283

/** Time execution */

284

def timed: ZStream[R with Clock, E, (Duration, O)]

285

}

286

```

287

288

### Buffering

289

290

Stream buffering strategies for performance optimization.

291

292

```scala { .api }

293

trait ZStreamOps[-R, +E, +O] {

294

/** Buffer with backpressure */

295

def buffer(capacity: Int): ZStream[R, E, O]

296

297

/** Dropping buffer (drop oldest when full) */

298

def bufferDropping(capacity: Int): ZStream[R, E, O]

299

300

/** Sliding window buffer */

301

def bufferSliding(capacity: Int): ZStream[R, E, O]

302

303

/** Unbounded buffer */

304

def bufferUnbounded: ZStream[R, E, O]

305

306

/** Buffer chunks */

307

def bufferChunks(capacity: Int): ZStream[R, E, O]

308

}

309

```

310

311

### Stream Execution

312

313

Terminal operations for consuming streams.

314

315

```scala { .api }

316

trait ZStreamOps[-R, +E, +O] {

317

/** Run stream with sink */

318

def run[R1 <: R, E1 >: E, B](sink: ZSink[R1, E1, O, Any, B]): ZIO[R1, E1, B]

319

320

/** Collect all elements */

321

def runCollect: ZIO[R, E, List[O]]

322

323

/** Get first element */

324

def runHead: ZIO[R, E, Option[O]]

325

326

/** Get last element */

327

def runLast: ZIO[R, E, Option[O]]

328

329

/** Run and discard results */

330

def runDrain: ZIO[R, E, Unit]

331

332

/** Count elements */

333

def runCount: ZIO[R, E, Long]

334

335

/** Sum numeric elements */

336

def runSum[O1 >: O](implicit ev: Numeric[O1]): ZIO[R, E, O1]

337

338

/** Apply effect to each element */

339

def foreach[R1 <: R, E1 >: E](f: O => ZIO[R1, E1, Any]): ZIO[R1, E1, Unit]

340

341

/** Apply effect while predicate holds */

342

def foreachWhile[R1 <: R, E1 >: E](f: O => ZIO[R1, E1, Boolean]): ZIO[R1, E1, Unit]

343

}

344

```

345

346

### Resource Management

347

348

Safe resource handling and cleanup.

349

350

```scala { .api }

351

trait ZStreamOps[-R, +E, +O] {

352

/** Ensure finalizer runs */

353

def ensuring[R1 <: R](finalizer: ZIO[R1, Nothing, Any]): ZStream[R1, E, O]

354

355

/** Bracket resource acquisition/release */

356

def bracket[R1 <: R, A](acquire: ZIO[R1, E, A])(release: A => ZIO[R1, Nothing, Any]): ZStream[R1, E, A]

357

358

/** Provide environment layer */

359

def provideLayer[R0, R1](layer: ZLayer[R0, E, R1])(implicit ev: R1 <:< R): ZStream[R0, E, O]

360

361

/** Provide environment */

362

def provide[R1](env: R1)(implicit ev: R1 <:< R): ZStream[Any, E, O]

363

364

/** Access environment */

365

def access[R1 <: R, B](f: R1 => B): ZStream[R1, E, B]

366

367

/** Timed execution with duration */

368

def timed: ZStream[R with Clock, E, (Duration, O)]

369

370

/** Summarized execution */

371

def summarized[R1 <: R, E1 >: E, B, C](summary: ZIO[R1, E1, B])(f: (B, B) => C): ZStream[R1, E1, (C, O)]

372

}

373

```

374

375

**Usage Examples:**

376

377

```scala

378

import zio._

379

import zio.stream._

380

import zio.duration._

381

382

// Create and transform streams

383

val numbers = ZStream.range(1, 100)

384

val evens = numbers.filter(_ % 2 == 0)

385

val doubled = evens.map(_ * 2)

386

387

// Combine streams

388

val stream1 = ZStream(1, 2, 3)

389

val stream2 = ZStream(4, 5, 6)

390

val combined = stream1 ++ stream2

391

392

// Error handling

393

val safeStream = ZStream.fail("error").catchAll(_ => ZStream.succeed(0))

394

395

// Timing operations

396

val throttled = numbers.throttleShape(10, 1.second)

397

val debounced = numbers.debounce(100.millis)

398

399

// Resource management

400

val managed = ZStream.managed(ZManaged.make(openFile)(closeFile))

401

```