tessl/maven-dev-zio--zio-streams_2.13

Functional, type-safe, composable streaming library built on ZIO's effect system for Scala

Workspace: tessl
Visibility: Public
Describes: mavenpkg:maven/dev.zio/zio-streams_2.13@1.0.x

To install, run

npx @tessl/cli install tessl/maven-dev-zio--zio-streams_2.13@1.0.0

# ZIO Streams

ZIO Streams is a functional streaming library for handling potentially infinite sequences of data in a type-safe, composable, and resource-safe manner. Built on top of ZIO's effect system, it offers operators for transformation, filtering, and composition, and integrates with ZIO's concurrent and asynchronous programming model.

## Package Information

- **Package Name**: zio-streams_2.13
- **Package Type**: Maven
- **Language**: Scala
- **Group ID**: dev.zio
- **Artifact ID**: zio-streams_2.13
- **Version**: 1.0.18
- **Installation**: `libraryDependencies += "dev.zio" %% "zio-streams" % "1.0.18"`

## Core Imports

```scala
import zio.stream._
import zio._
```

For specific components:

```scala
import zio.stream.{ZStream, ZSink, ZTransducer}
import zio.stream.{Stream, UStream, Sink, Transducer} // Type aliases
```

## Basic Usage

```scala
import zio._
import zio.stream._

// Create a simple stream
val numbers: UStream[Int] = ZStream.range(1, 10)

// Transform the stream
val doubled: UStream[Int] = numbers.map(_ * 2)

// Consume the stream with a sink
val sumSink: Sink[Nothing, Int, Nothing, Int] = ZSink.sum[Int]

// Run the stream
val program: UIO[Int] = doubled.run(sumSink)
```
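
The snippet above only describes an effect; nothing runs until a runtime executes it. As a minimal sketch, assuming the default ZIO 1.x runtime is acceptable for a quick experiment, the same pipeline can be executed like this (the object name `BasicUsageDemo` is made up for illustration):

```scala
import zio._
import zio.stream._

object BasicUsageDemo {
  // Same pipeline as above: the integers 1 to 9 (upper bound exclusive),
  // doubled, then summed by a sink.
  val program: UIO[Int] =
    ZStream.range(1, 10).map(_ * 2).run(ZSink.sum[Int])

  def main(args: Array[String]): Unit = {
    // unsafeRun executes the effect synchronously on the default runtime.
    val total: Int = Runtime.default.unsafeRun(program)
    println(total) // 2 * (1 + 2 + ... + 9) = 90
  }
}
```

In an application you would more likely extend `zio.App` and return the effect from `run` rather than call `unsafeRun` directly.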

## Architecture

ZIO Streams is built around several key components:

- **ZStream[R, E, O]**: Core streaming abstraction: a program that emits zero or more values of type `O`, may fail with an error of type `E`, and requires an environment of type `R`
- **ZSink[R, E, I, L, Z]**: Consumes stream elements of type `I` and produces a result of type `Z`, with leftover elements of type `L`
- **ZTransducer[R, E, I, O]**: Transforms stream elements of type `I` into elements of type `O`, possibly with effects
- **Resource Management**: Built on ZManaged for automatic resource cleanup and exception safety
- **Chunk-based Processing**: Uses ZIO's Chunk for efficient batch processing and memory management
- **Pull-based Model**: Consumers pull elements on demand, which gives inherent backpressure and lazy evaluation
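
Two of the points above, lazy pull-based evaluation and resource safety, can be illustrated with a short sketch. The object name `ArchitectureDemo` and the `Ref`-based event log are made up for this example; the stream constructors used (`ZStream.iterate`, `ZStream.bracket`) are from the ZIO 1.x API.

```scala
import zio._
import zio.stream._

object ArchitectureDemo {
  // An infinite stream: nothing is computed until a consumer pulls from it.
  val naturals: UStream[Int] = ZStream.iterate(1)(_ + 1)

  // take(5) stops pulling after five elements, so the run terminates.
  val firstFive: UIO[Chunk[Int]] = naturals.take(5).runCollect

  // bracket ties a resource to the stream's lifetime: the release action
  // runs once the stream has finished, whether it completed or failed.
  def withResource(log: Ref[List[String]]): UIO[Chunk[Int]] =
    ZStream
      .bracket(log.update("acquire" :: _))(_ => log.update("release" :: _))
      .flatMap(_ => ZStream(1, 2, 3))
      .runCollect

  // Records the order in which acquire and release happened.
  val events: UIO[List[String]] =
    for {
      log <- Ref.make(List.empty[String])
      _   <- withResource(log)
      es  <- log.get
    } yield es.reverse
}
```

Running `firstFive` yields the first five naturals even though `naturals` never ends, and `events` shows the release action firing after the acquire.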

## Capabilities

### Core Streaming Operations

Fundamental operations for creating, transforming, combining, and running streams. These form the heart of ZIO Streams.

```scala { .api }
abstract class ZStream[-R, +E, +O](
  val process: ZManaged[R, Nothing, ZIO[R, Option[E], Chunk[O]]]
)

// Core factory methods
object ZStream {
  def apply[A](as: A*): UStream[A]
  def succeed[A](a: => A): UStream[A]
  def fail[E](error: => E): Stream[E, Nothing]
  def fromIterable[O](as: => Iterable[O]): UStream[O]
  def fromEffect[R, E, A](fa: ZIO[R, E, A]): ZStream[R, E, A]
}

// Core transformation methods (all are members of ZStream)
trait ZStreamOps[R, E, O] {
  def map[B](f: O => B): ZStream[R, E, B]
  def flatMap[R1 <: R, E1 >: E, O2](f: O => ZStream[R1, E1, O2]): ZStream[R1, E1, O2]
  def filter(predicate: O => Boolean): ZStream[R, E, O]
  def take(n: Long): ZStream[R, E, O]
  def run[R1 <: R, E1 >: E, B](sink: ZSink[R1, E1, O, Any, B]): ZIO[R1, E1, B]
}
```

[Core Streaming Operations](./zstream.md)
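
As a rough illustration of how the factory and transformation methods listed above compose, the sketch below builds a few small streams and collects their output. The object and value names are made up for the example; `runCollect` and `++` are additional `ZStream` members not shown in the listing.

```scala
import zio._
import zio.stream._

object CoreOpsDemo {
  // filter keeps only the even numbers from 1 to 10.
  val evens: UStream[Int] =
    ZStream.fromIterable(1 to 10).filter(_ % 2 == 0)

  // flatMap replaces each element with a sub-stream, in order.
  val pairs: UStream[Int] =
    evens.flatMap(n => ZStream(n, n * 10))

  // take limits the output; runCollect gathers it into a Chunk.
  val firstFour: UIO[Chunk[Int]] = pairs.take(4).runCollect

  // A stream that emits one value and then fails with a typed error.
  val failing: Stream[String, Int] =
    ZStream.succeed(1) ++ ZStream.fail("boom")

  // either surfaces the typed error as a value instead of failing the effect.
  val outcome: UIO[Either[String, Chunk[Int]]] = failing.runCollect.either

  // fromEffect lifts a single effect into a one-element stream.
  val lifted: UIO[Chunk[Int]] =
    ZStream.fromEffect(UIO(42)).map(_ + 1).runCollect
}
```

Here `firstFour` produces `Chunk(2, 20, 4, 40)`, `outcome` produces `Left("boom")`, and `lifted` produces `Chunk(43)`.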

### Stream Consumption

Sinks consume streams and produce results. They include collectors, folds, and effectful processors.

```scala { .api }
abstract class ZSink[-R, +E, -I, +L, +Z](
  val push: ZManaged[R, Nothing, ZSink.Push[R, E, I, L, Z]]
)

// Core sink factory methods
object ZSink {
  def collectAll[A]: Sink[Nothing, A, Nothing, Chunk[A]]
  def fold[A, S](z: S)(contFn: S => Boolean)(f: (S, A) => S): Sink[Nothing, A, A, S]
  def foreach[R, E, A](f: A => ZIO[R, E, Any]): ZSink[R, E, A, Nothing, Unit]
  def head[A]: Sink[Nothing, A, A, Option[A]]
  def sum[A](implicit A: Numeric[A]): Sink[Nothing, A, Nothing, A]
}
```

[Stream Consumption](./zsink.md)
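
The following sketch runs one small stream through several of the sinks listed above to show what each produces. The object and value names are invented for illustration, and the `Ref` used with `foreach` is just one convenient way to observe its side effects.

```scala
import zio._
import zio.stream._

object SinkDemo {
  val source: UStream[Int] = ZStream(1, 2, 3, 4, 5)

  // collectAll gathers every element into a Chunk.
  val all: UIO[Chunk[Int]] = source.run(ZSink.collectAll[Int])

  // head returns the first element, if any, and stops consuming.
  val first: UIO[Option[Int]] = source.run(ZSink.head[Int])

  // fold keeps consuming while contFn holds for the accumulated state.
  // The running sums are 1, 3, 6, 10; the fold stops once the sum reaches 10.
  val untilTen: UIO[Int] =
    source.run(ZSink.fold[Int, Int](0)(_ < 10)(_ + _))

  // foreach runs an effect per element; here each element is pushed onto a Ref.
  val recorded: UIO[List[Int]] =
    for {
      seen <- Ref.make(List.empty[Int])
      _    <- source.run(ZSink.foreach((n: Int) => seen.update(n :: _)))
      ns   <- seen.get
    } yield ns.reverse
}
```

Note that `untilTen` finishes before the element `5` is consumed; that unconsumed input is what the leftover type parameter `L` of `ZSink` accounts for.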

### Stream Transformation

Transducers transform stream elements, optionally with effects, and support composition and stateful processing.

```scala { .api }
abstract class ZTransducer[-R, +E, -I, +O](
  val push: ZManaged[R, Nothing, Option[Chunk[I]] => ZIO[R, E, Chunk[O]]]
)

// Core transducer factory methods
object ZTransducer {
  def identity[A]: Transducer[Nothing, A, A]
  def filter[A](predicate: A => Boolean): Transducer[Nothing, A, A]
  def map[A, B](f: A => B): Transducer[Nothing, A, B]
  def fold[A, S](z: S)(f: (S, A) => S): Transducer[Nothing, A, S]
}
```

[Stream Transformation](./ztransducer.md)
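
To make the stateful aspect concrete, here is a small sketch that applies two built-in transducers with `ZStream#transduce`. It deliberately uses `ZTransducer.splitLines` and `ZTransducer.collectAllN`, which are not in the listing above but are part of the ZIO 1.x API; the object and value names are invented for the example.

```scala
import zio._
import zio.stream._

object TransducerDemo {
  // Input text whose line breaks do not line up with element boundaries.
  val text: UStream[String] = ZStream("alpha\nbe", "ta\ngamma")

  // splitLines carries partial lines across elements (stateful processing).
  val lines: UIO[Chunk[String]] =
    text.transduce(ZTransducer.splitLines).runCollect

  // collectAllN regroups elements into chunks of at most two.
  val batches: UIO[Chunk[Chunk[Int]]] =
    ZStream.range(0, 5).transduce(ZTransducer.collectAllN[Int](2)).runCollect
}
```

`lines` yields `alpha`, `beta`, and `gamma` even though `beta` was split across two input elements, which is the kind of state a plain `map` could not maintain.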

### Platform Extensions

Platform-specific functionality for different runtime environments, including file I/O, networking, compression (JVM), and async integration.

```scala { .api }
// JVM-specific extensions
object ZStream {
  def fromFile(file: => File, chunkSize: Int = DefaultChunkSize): ZStream[Any, IOException, Byte]
  def fromInputStream(is: => InputStream, chunkSize: Int = DefaultChunkSize): ZStream[Any, IOException, Byte]
  def fromSocketServer(port: Int, host: String = "localhost"): ZManaged[Blocking, IOException, ZStream[Blocking, IOException, Connection]]
}

object ZTransducer {
  def gzip(bufferSize: Int = 64 * 1024): Transducer[Nothing, Byte, Byte]
  def gunzip(bufferSize: Int = 64 * 1024): Transducer[Nothing, Byte, Byte]
}
```

[Platform Extensions](./platform-extensions.md)
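
A hedged sketch of the JVM extensions in use: it reads bytes from an in-memory `InputStream` and round-trips a payload through `gzip` and `gunzip`. Result types are left to inference on purpose, because the exact environment and error types of these constructors may differ from the abridged signatures above; the example assumes the default runtime supplies whatever environment they need. The object and value names are made up.

```scala
import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets
import zio._
import zio.stream._

object PlatformDemo {
  val payload: Array[Byte] =
    "hello\nstreams".getBytes(StandardCharsets.UTF_8)

  // Read an InputStream as bytes, decode UTF-8, then split into lines.
  val lines =
    ZStream
      .fromInputStream(new ByteArrayInputStream(payload))
      .transduce(ZTransducer.utf8Decode >>> ZTransducer.splitLines)
      .runCollect

  // Compress and immediately decompress; the output should equal the input.
  val roundTrip =
    ZStream
      .fromChunk(Chunk.fromArray(payload))
      .transduce(ZTransducer.gzip())
      .transduce(ZTransducer.gunzip())
      .runCollect

  def main(args: Array[String]): Unit = {
    println(Runtime.default.unsafeRun(lines))
    println(Runtime.default.unsafeRun(roundTrip).size == payload.length)
  }
}
```

For real files, `ZStream.fromFile` would take the place of the in-memory stream; it is omitted here so the sketch has no filesystem dependency.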

## Type Aliases

```scala { .api }
// Convenience type aliases
type Stream[+E, +A] = ZStream[Any, E, A] // Environment-less streams
type UStream[+A] = ZStream[Any, Nothing, A] // Infallible streams
type Sink[+E, A, +L, +B] = ZSink[Any, E, A, L, B] // Environment-less sinks
type Transducer[+E, -A, +B] = ZTransducer[Any, E, A, B] // Environment-less transducers
```

## Supporting Types

```scala { .api }
// Stream element container
case class Take[+E, +A](exit: Exit[Option[E], Chunk[A]]) {
  def fold[Z](end: => Z, error: E => Z, value: Chunk[A] => Z): Z
  def map[B](f: A => B): Take[E, B]
  def isDone: Boolean
  def isFailure: Boolean
  def isSuccess: Boolean
}

// Reactive reference with change stream
final class SubscriptionRef[A](
  val ref: RefM[A],
  val changes: Stream[Nothing, A]
)

object SubscriptionRef {
  def make[A](a: A): UIO[SubscriptionRef[A]]
}
```
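
As a brief illustration of `Take`, the sketch below builds the three kinds of value it can hold (a chunk of elements, a failure, and end-of-stream) and inspects them. The constructors `Take.chunk`, `Take.fail`, and `Take.end` come from the `Take` companion object in ZIO 1.x and are not shown in the listing above; the object name `TakeDemo` is made up.

```scala
import zio._
import zio.stream._

object TakeDemo {
  // A Take holding a chunk of successfully emitted elements.
  val values: Take[Nothing, Int] = Take.chunk(Chunk(1, 2, 3))

  // A Take signalling that the stream failed with a typed error.
  val failed: Take[String, Nothing] = Take.fail("boom")

  // A Take signalling that the stream ended normally.
  val end: Take[Nothing, Nothing] = Take.end

  // map transforms the elements inside a successful Take.
  val doubled: Take[Nothing, Int] = values.map(_ * 2)

  def main(args: Array[String]): Unit = {
    println(values.isSuccess) // true
    println(failed.isFailure) // true
    println(end.isDone)       // true
  }
}
```

This encoding lets a stream's full lifecycle travel through a queue as ordinary values, which is its main use in the library.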

## API Coverage

This Knowledge Tile documents the core public API of ZIO Streams 1.0.18. The documentation focuses on the most commonly used methods and capabilities. For additional methods and advanced functionality, consult the source code or use IDE auto-completion to explore the full API surface.