Functional, type-safe, composable streaming library built on ZIO's effect system for Scala
npx @tessl/cli install tessl/maven-dev-zio--zio-streams_2.13@1.0.00
# ZIO Streams

ZIO Streams is a functional streaming library for working with potentially infinite sequences of data in a type-safe, composable, and resource-safe manner. Built on ZIO's effect system, it provides operators for transformation, filtering, and composition, and it integrates with ZIO's concurrent and asynchronous programming model.

## Package Information

- **Package Name**: zio-streams_2.13
- **Package Type**: Maven
- **Language**: Scala
- **Group ID**: dev.zio
- **Artifact ID**: zio-streams_2.13
- **Version**: 1.0.18
- **Installation**: `libraryDependencies += "dev.zio" %% "zio-streams" % "1.0.18"`

## Core Imports

```scala
import zio.stream._
import zio._
```

For specific components:

```scala
import zio.stream.{ZStream, ZSink, ZTransducer}
import zio.stream.{Stream, UStream, Sink, Transducer} // Type aliases
```

## Basic Usage

```scala
import zio._
import zio.stream._

// Create a simple stream
val numbers: UStream[Int] = ZStream.range(1, 10)

// Transform the stream
val doubled: UStream[Int] = numbers.map(_ * 2)

// Consume the stream with a sink
val sumSink: Sink[Nothing, Int, Nothing, Int] = ZSink.sum[Int]

// Run the stream
val program: UIO[Int] = doubled.run(sumSink)
```
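The `program` value is only a description of the computation; nothing runs until a runtime executes it. The following is a minimal sketch of one way to run it, assuming ZIO 1.0.x's `Runtime.default`. The object name `BasicUsageMain` and the helper `total` are hypothetical names chosen for illustration.

```scala
import zio._
import zio.stream._

object BasicUsageMain {
  // Same pipeline as in Basic Usage. range(1, 10) emits 1 to 9,
  // because the upper bound is exclusive.
  val program: UIO[Int] =
    ZStream.range(1, 10).map(_ * 2).run(ZSink.sum[Int])

  // Execute the effect description on the default runtime (illustrative helper).
  def total(): Int = Runtime.default.unsafeRun(program)

  def main(args: Array[String]): Unit =
    println(s"sum of doubled values = ${total()}") // prints 90
}
```

In an application, extending `zio.App` is the more common entry point; `unsafeRun` is used here only to keep the sketch short.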

## Architecture

ZIO Streams is built around several key components:

- **ZStream[R, E, O]**: Core streaming abstraction representing programs that emit zero or more values of type `O`, may fail with errors of type `E`, and require an environment of type `R`
- **ZSink[R, E, I, L, Z]**: Consumes stream elements of type `I` and produces a result of type `Z`, with leftover elements of type `L`
- **ZTransducer[R, E, I, O]**: Transforms stream elements from type `I` to type `O`, possibly with effects
- **Resource Management**: Built on ZManaged for automatic resource cleanup and exception safety
- **Chunk-based Processing**: Uses ZIO's Chunk for efficient batch processing and memory management
- **Pull-based Model**: Inherent backpressure and lazy evaluation for memory efficiency
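
Two of the points above, lazy pull-based evaluation and resource management, can be illustrated with a small sketch. It assumes the ZIO 1.0.x operators `ZStream.iterate`, `ZStream.bracket`, `mapM`, `runCollect`, and `runDrain`, none of which are listed on this page; the object and value names are hypothetical.

```scala
import zio._
import zio.stream._

object ArchitectureSketch {
  // Pull-based and lazy: an infinite stream is safe to define,
  // because elements are only produced when a consumer asks for them.
  val naturals: UStream[Int] = ZStream.iterate(1)(_ + 1)
  val firstFive: UIO[Chunk[Int]] = naturals.take(5).runCollect

  // Resource safety: the release action runs once the stream is finished.
  // A Ref records the order of events so the lifecycle can be inspected.
  val lifecycle: UIO[List[String]] =
    for {
      log <- Ref.make(List.empty[String])
      _ <- ZStream
             .bracket(log.update("acquire" :: _))(_ => log.update("release" :: _))
             .flatMap(_ => ZStream(1, 2, 3))
             .mapM(n => log.update(s"emit $n" :: _))
             .runDrain
      events <- log.get
    } yield events.reverse

  def main(args: Array[String]): Unit = {
    println(Runtime.default.unsafeRun(firstFive))
    println(Runtime.default.unsafeRun(lifecycle))
  }
}
```

The recorded events should start with `acquire` and end with `release`, with the three emissions in between.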

## Capabilities

### Core Streaming Operations

Fundamental streaming operations: creating, transforming, combining, and running streams. These form the core of ZIO Streams.

```scala { .api }
abstract class ZStream[-R, +E, +O](
  val process: ZManaged[R, Nothing, ZIO[R, Option[E], Chunk[O]]]
)

// Core factory methods
object ZStream {
  def apply[A](as: A*): UStream[A]
  def succeed[A](a: => A): UStream[A]
  def fail[E](error: => E): Stream[E, Nothing]
  def fromIterable[O](as: => Iterable[O]): UStream[O]
  def fromEffect[R, E, A](fa: ZIO[R, E, A]): ZStream[R, E, A]
}

// Core transformation methods (instance methods on ZStream)
trait ZStreamOps[R, E, O] {
  def map[B](f: O => B): ZStream[R, E, B]
  def flatMap[R1 <: R, E1 >: E, O2](f: O => ZStream[R1, E1, O2]): ZStream[R1, E1, O2]
  def filter(predicate: O => Boolean): ZStream[R, E, O]
  def take(n: Long): ZStream[R, E, O]
  def run[R1 <: R, E1 >: E, B](sink: ZSink[R1, E1, O, Any, B]): ZIO[R1, E1, B]
}
```

[Core Streaming Operations](./zstream.md)
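
As a rough illustration of how the factory and transformation methods listed above compose, here is a short sketch. The object and value names are hypothetical, and `runCollect` (not listed above) is assumed from ZIO 1.0.x.

```scala
import zio._
import zio.stream._

object CoreOpsSketch {
  val words: UStream[String] =
    ZStream.fromIterable(List("zio", "streams", "are", "lazy"))

  // filter keeps "streams" and "lazy"; flatMap expands each word into its
  // characters; take(4) stops pulling after four elements.
  val letters: UIO[Chunk[Char]] =
    words
      .filter(_.length > 3)
      .flatMap(word => ZStream.fromIterable(word.toList))
      .take(4)
      .runCollect

  // fromEffect lifts a single effect into a one-element stream.
  val answer: UIO[Int] =
    ZStream.fromEffect(ZIO.succeed(21)).map(_ * 2).run(ZSink.sum[Int])

  def main(args: Array[String]): Unit = {
    println(Runtime.default.unsafeRun(letters)) // the characters s, t, r, e
    println(Runtime.default.unsafeRun(answer))  // 42
  }
}
```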

### Stream Consumption

Sinks consume streams and produce results. They include collectors, folds, and effectful processors.

```scala { .api }
abstract class ZSink[-R, +E, -I, +L, +Z](
  val push: ZManaged[R, Nothing, ZSink.Push[R, E, I, L, Z]]
)

// Core sink factory methods
object ZSink {
  def collectAll[A]: Sink[Nothing, A, Nothing, Chunk[A]]
  def fold[A, S](z: S)(contFn: S => Boolean)(f: (S, A) => S): Sink[Nothing, A, A, S]
  def foreach[R, E, A](f: A => ZIO[R, E, Any]): ZSink[R, E, A, Nothing, Unit]
  def head[A]: Sink[Nothing, A, A, Option[A]]
  def sum[A](implicit A: Numeric[A]): Sink[Nothing, A, Nothing, A]
}
```

[Stream Consumption](./zsink.md)
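
The sketch below runs the same small stream through several of the sinks listed above, to show how the choice of sink determines the result. The names are hypothetical. The `fold` example assumes that folding continues only while `contFn` holds for the accumulated state.

```scala
import zio._
import zio.stream._

object SinkSketch {
  val numbers: UStream[Int] = ZStream(1, 2, 3, 4, 5)

  // Collect every element into a Chunk.
  val all: UIO[Chunk[Int]] = numbers.run(ZSink.collectAll[Int])

  // Take only the first element, if any.
  val first: UIO[Option[Int]] = numbers.run(ZSink.head[Int])

  // Fold while the running total is below 6: 0 + 1 + 2 + 3 = 6, then stop.
  val partialSum: UIO[Int] =
    numbers.run(ZSink.fold[Int, Int](0)(_ < 6)(_ + _))

  // Run an effect per element; here the effect adds to a Ref.
  val viaForeach: UIO[Int] =
    for {
      total <- Ref.make(0)
      _     <- numbers.run(ZSink.foreach((n: Int) => total.update(_ + n)))
      sum   <- total.get
    } yield sum
}
```

Elements that `fold` does not consume (here `4` and `5`) are the sink's leftovers, which is what the `L` type parameter describes.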

### Stream Transformation

Transducers transform stream elements. They can be effectful, stateful, and composed with one another.

```scala { .api }
abstract class ZTransducer[-R, +E, -I, +O](
  val push: ZManaged[R, Nothing, Option[Chunk[I]] => ZIO[R, E, Chunk[O]]]
)

// Core transducer factory methods
object ZTransducer {
  def identity[A]: Transducer[Nothing, A, A]
  def filter[A](predicate: A => Boolean): Transducer[Nothing, A, A]
  def map[A, B](f: A => B): Transducer[Nothing, A, B]
  def fold[A, S](z: S)(f: (S, A) => S): Transducer[Nothing, A, S]
}
```

[Stream Transformation](./ztransducer.md)
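
A common use of stateful transducers is turning a byte stream into lines of text. The sketch below assumes three ZIO 1.0.x members that are not listed on this page: `ZTransducer.utf8Decode`, `ZTransducer.splitLines`, and the `>>>` composition operator, plus `ZStream#transduce` to apply the result. The object and value names are hypothetical.

```scala
import zio._
import zio.stream._

object TransducerSketch {
  // A byte stream holding three newline-separated words.
  val bytes: UStream[Byte] =
    ZStream.fromChunk(Chunk.fromArray("alpha\nbeta\ngamma".getBytes("UTF-8")))

  // Compose decoding (bytes to strings) with line splitting
  // (strings to lines, buffering partial lines between chunks).
  val decodeLines = ZTransducer.utf8Decode >>> ZTransducer.splitLines

  val lines: UIO[Chunk[String]] = bytes.transduce(decodeLines).runCollect

  def main(args: Array[String]): Unit =
    println(Runtime.default.unsafeRun(lines))
}
```

Line splitting needs state because a line may span several chunks, which is why it is expressed as a transducer rather than a plain `map`.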

### Platform Extensions

Platform-specific functionality including file I/O, networking, compression (JVM), and async integration for different runtime environments.

```scala { .api }
// JVM-specific extensions
object ZStream {
  def fromFile(file: => File, chunkSize: Int = DefaultChunkSize): ZStream[Any, IOException, Byte]
  def fromInputStream(is: => InputStream, chunkSize: Int = DefaultChunkSize): ZStream[Any, IOException, Byte]
  def fromSocketServer(port: Int, host: String = "localhost"): ZManaged[Blocking, IOException, ZStream[Blocking, IOException, Connection]]
}

object ZTransducer {
  def gzip(bufferSize: Int = 64 * 1024): Transducer[Nothing, Byte, Byte]
  def gunzip(bufferSize: Int = 64 * 1024): Transducer[Nothing, Byte, Byte]
}
```

[Platform Extensions](./platform-extensions.md)
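
As a rough sketch of how the JVM extensions fit together, the following reads bytes from an in-memory `InputStream`, compresses them with `gzip`, and decompresses them again with `gunzip`. Result types are left to inference on purpose, since the exact environment and error types of these constructors should be checked against the library version in use. The object and value names are hypothetical, and `transduce` and `runCollect` are assumed from ZIO 1.0.x.

```scala
import java.io.ByteArrayInputStream
import zio._
import zio.stream._

object GzipRoundTrip {
  val text: String = "hello, zio streams"

  // Read, compress, decompress, and decode back to a String.
  // No type annotation: environment and error types are inferred.
  val roundTrip =
    ZStream
      .fromInputStream(new ByteArrayInputStream(text.getBytes("UTF-8")))
      .transduce(ZTransducer.gzip())
      .transduce(ZTransducer.gunzip())
      .runCollect
      .map { bytes =>
        val array: Array[Byte] = bytes.toArray
        new String(array, "UTF-8")
      }

  def main(args: Array[String]): Unit =
    println(Runtime.default.unsafeRun(roundTrip))
}
```

`Runtime.default` supplies the standard ZIO environment, which covers any blocking service these constructors may require.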

## Type Aliases

```scala { .api }
// Convenience type aliases
type Stream[+E, +A] = ZStream[Any, E, A]                 // Environment-less streams
type UStream[+A] = ZStream[Any, Nothing, A]              // Infallible streams
type Sink[+E, A, +L, +B] = ZSink[Any, E, A, L, B]        // Environment-less sinks
type Transducer[+E, -A, +B] = ZTransducer[Any, E, A, B]  // Environment-less transducers
```
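
Because these are plain aliases, they can be mixed freely with the fully spelled-out types. The sketch below, with hypothetical names, checks two of the equalities at compile time and shows how variance lets a more specific stream be used where a more general one is expected.

```scala
import zio._
import zio.stream._

object AliasSketch {
  // Compile-time evidence that each alias equals its expanded form.
  val ustreamIsZStream = implicitly[UStream[Int] =:= ZStream[Any, Nothing, Int]]
  val streamIsZStream  = implicitly[Stream[String, Int] =:= ZStream[Any, String, Int]]

  val infallible: UStream[Int] = ZStream(1, 2, 3)

  // Covariance in E: an infallible stream can be used as a fallible one.
  val fallible: Stream[String, Int] = infallible

  // Contravariance in R: a stream that needs no environment can be used
  // where a stream requiring some environment is expected.
  val withClock: ZStream[clock.Clock, String, Int] = fallible
}
```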

## Supporting Types

```scala { .api }
// Stream element container
case class Take[+E, +A](exit: Exit[Option[E], Chunk[A]]) {
  def fold[Z](end: => Z, error: Cause[E] => Z, value: Chunk[A] => Z): Z
  def map[B](f: A => B): Take[E, B]
  def isDone: Boolean
  def isFailure: Boolean
  def isSuccess: Boolean
}

// Reactive reference with change stream
final class SubscriptionRef[A](
  val ref: RefM[A],
  val changes: Stream[Nothing, A]
)

object SubscriptionRef {
  def make[A](a: A): UIO[SubscriptionRef[A]]
}
```
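
A `Take` represents one of three outcomes of pulling from a stream: a chunk of values, a failure, or the end of the stream. The sketch below builds one of each and summarizes it with `fold`. It assumes the ZIO 1.0.x constructors `Take.chunk`, `Take.end`, and `Take.fail`, which are not listed above; `describe` and the object name are hypothetical.

```scala
import zio._
import zio.stream._

object TakeSketch {
  // Summarize a Take. The error handler ignores its argument, so it does
  // not depend on how the failure is represented.
  def describe[E, A](take: Take[E, A]): String =
    take.fold(
      "end of stream",
      _ => "failed",
      chunk => s"values: ${chunk.mkString(", ")}"
    )

  val values: Take[Nothing, Int]  = Take.chunk(Chunk(1, 2, 3)).map(_ * 2)
  val ended: Take[Nothing, Nothing] = Take.end
  val failed: Take[String, Nothing] = Take.fail("boom")

  def main(args: Array[String]): Unit = {
    println(describe(values)) // values: 2, 4, 6
    println(describe(ended))  // end of stream
    println(describe(failed)) // failed
  }
}
```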

## API Coverage

This Knowledge Tile documents the core public API of ZIO Streams 1.0.18. The documentation focuses on the most commonly used methods and capabilities. For additional methods and advanced functionality, consult the source code or use IDE auto-completion to explore the full API surface.