# Platform Extensions

Platform-specific functionality for ZIO Streams: file I/O, networking, compression, and iterator integration on the JVM, plus async callback integration for each runtime environment (JVM, JavaScript, and Scala Native).

## JVM Platform Extensions

### File I/O Operations

Stream operations for reading and writing files on the JVM platform.

```scala { .api }
// ZStream JVM extensions for file operations
object ZStream {

  /**
   * Reads a file as a stream of bytes.
   *
   * @param file      the file to read (by-name)
   * @param chunkSize chunk size; defaults to `DefaultChunkSize`
   */
  def fromFile(file: => File, chunkSize: Int = DefaultChunkSize): ZStream[Any, IOException, Byte]

  /**
   * Reads the classpath resource called `name` as a stream of bytes.
   *
   * @param name      resource name (by-name)
   * @param chunkSize chunk size; defaults to `DefaultChunkSize`
   */
  def fromResource(name: => String, chunkSize: Int = DefaultChunkSize): ZStream[Any, IOException, Byte]

  /**
   * Reads the given `InputStream` as a stream of bytes.
   *
   * @param is        the input stream to read (by-name)
   * @param chunkSize chunk size; defaults to `DefaultChunkSize`
   */
  def fromInputStream(is: => InputStream, chunkSize: Int = DefaultChunkSize): ZStream[Any, IOException, Byte]

  /**
   * Reads, as a stream of bytes, the `InputStream` produced by the effect `is`.
   *
   * @param is        effect yielding the input stream
   * @param chunkSize chunk size; defaults to `DefaultChunkSize`
   */
  def fromInputStreamEffect[R](is: ZIO[R, IOException, InputStream], chunkSize: Int = DefaultChunkSize): ZStream[R, IOException, Byte]

  /**
   * Reads, as a stream of bytes, the `InputStream` provided by the managed resource `is`.
   *
   * @param is        managed resource yielding the input stream
   * @param chunkSize chunk size; defaults to `DefaultChunkSize`
   */
  def fromInputStreamManaged[R](is: ZManaged[R, IOException, InputStream], chunkSize: Int = DefaultChunkSize): ZStream[R, IOException, Byte]

  /**
   * Reads the given `Reader` as a stream of `String` values.
   *
   * @param reader    the reader to consume (by-name)
   * @param chunkSize chunk size; defaults to `DefaultChunkSize`
   */
  def fromReader[R](reader: => Reader, chunkSize: Int = DefaultChunkSize): ZStream[R, IOException, String]

  /**
   * Produces a stream of `String` values from an `OutputStreamWriter`.
   *
   * NOTE(review): despite the writer-oriented name, the declared result is a
   * stream rather than a sink - confirm this signature against the ZIO
   * Streams API before relying on it.
   *
   * @param writer    the writer (by-name)
   * @param chunkSize chunk size; defaults to `DefaultChunkSize`
   */
  def fromOutputStreamWriter[R](writer: => OutputStreamWriter, chunkSize: Int = DefaultChunkSize): ZStream[R, IOException, String]
}

// ZSink JVM extensions for file operations
object ZSink {

  /**
   * Sink that writes the bytes it receives to `file`.
   *
   * @param file the destination file (by-name)
   */
  def fromFile(file: => File): Sink[IOException, Byte, Nothing, Unit]

  /**
   * Sink that writes the bytes it receives to the given `OutputStream`.
   *
   * @param os the destination output stream (by-name)
   */
  def fromOutputStream(os: => OutputStream): Sink[IOException, Byte, Nothing, Unit]

  /**
   * Sink that writes the bytes it receives to an `OutputStream` provided by a managed resource.
   *
   * @param os managed resource yielding the destination output stream
   */
  def fromOutputStreamManaged(os: ZManaged[Any, IOException, OutputStream]): Sink[IOException, Byte, Nothing, Unit]

  /**
   * Sink that feeds the bytes it receives into a `MessageDigest` and yields an `Array[Byte]`.
   *
   * @param createDigest the digest instance to use (by-name)
   */
  def digest(createDigest: => MessageDigest): Sink[IOException, Byte, Nothing, Array[Byte]]
}
```

### Network Operations

Networking capabilities for socket-based communication.

```scala { .api }
object ZStream {

  /**
   * Creates a server socket and exposes the accepted connections as a stream.
   *
   * The stream of [[Connection]] values is wrapped in a `ZManaged`.
   *
   * @param port the server port
   * @param host the server host; defaults to `"localhost"`
   */
  def fromSocketServer(port: Int, host: String = "localhost"): ZManaged[Blocking, IOException, ZStream[Blocking, IOException, Connection]]
}

/**
 * Represents a socket connection, exposing a byte stream for reading and a
 * byte sink for writing.
 *
 * @param socket the underlying `Socket`
 */
final class Connection(socket: Socket) {

  /**
   * Reads bytes from the connection.
   *
   * @param chunkSize chunk size; defaults to `DefaultChunkSize`
   */
  def read(chunkSize: Int = DefaultChunkSize): ZStream[Blocking, IOException, Byte]

  /** Sink that writes the bytes it receives to the connection. */
  def write: ZSink[Blocking, IOException, Byte, Nothing, Unit]

  /** Effect that closes the connection. */
  def close: ZIO[Any, IOException, Unit]

  /** Effect that yields the remote socket address. */
  def remoteAddress: ZIO[Any, IOException, SocketAddress]

  /** Effect that yields the local socket address. */
  def localAddress: ZIO[Any, IOException, SocketAddress]
}
```

### Compression Operations

Data compression and decompression transducers.

```scala { .api }
object ZTransducer {

  /**
   * Deflate compression transducer.
   *
   * @param bufferSize buffer size; defaults to 64 KiB (`64 * 1024`)
   * @param noWrap     defaults to `false`; presumably the `nowrap` flag of
   *                   `java.util.zip.Deflater` - TODO confirm
   */
  def deflate(bufferSize: Int = 64 * 1024, noWrap: Boolean = false): Transducer[Nothing, Byte, Byte]

  /**
   * Inflate decompression transducer.
   *
   * NOTE(review): the declared error type is `Nothing` - confirm how
   * malformed input is reported.
   *
   * @param bufferSize buffer size; defaults to 64 KiB (`64 * 1024`)
   * @param noWrap     defaults to `false`; presumably the `nowrap` flag of
   *                   `java.util.zip.Inflater` - TODO confirm
   */
  def inflate(bufferSize: Int = 64 * 1024, noWrap: Boolean = false): Transducer[Nothing, Byte, Byte]

  /**
   * Gzip compression transducer.
   *
   * @param bufferSize buffer size; defaults to 64 KiB (`64 * 1024`)
   */
  def gzip(bufferSize: Int = 64 * 1024): Transducer[Nothing, Byte, Byte]

  /**
   * Gunzip decompression transducer.
   *
   * NOTE(review): the declared error type is `Nothing` - confirm how
   * malformed input is reported.
   *
   * @param bufferSize buffer size; defaults to 64 KiB (`64 * 1024`)
   */
  def gunzip(bufferSize: Int = 64 * 1024): Transducer[Nothing, Byte, Byte]
}
```

### Compression Configuration

Configuration classes for compression operations.

```scala { .api }
/**
 * Exception thrown during compression/decompression.
 *
 * @param message description of the failure
 * @param cause   underlying cause; defaults to `null`, which is passed
 *                straight to the `IOException(message, cause)` constructor
 */
class CompressionException(message: String, cause: Throwable = null) extends IOException(message, cause)

/**
 * Compression parameters for configuring compression behavior.
 *
 * @param level     compression level; defaults to `CompressionLevel.Default`
 * @param strategy  compression strategy; defaults to `CompressionStrategy.Default`
 * @param flushMode flush mode; defaults to `FlushMode.NoFlush`
 */
final case class CompressionParameters(
  level: CompressionLevel = CompressionLevel.Default,
  strategy: CompressionStrategy = CompressionStrategy.Default,
  flushMode: FlushMode = FlushMode.NoFlush
)

/**
 * Compression levels.
 *
 * @param javaValue integer encoding of the level; the values used below match
 *                  the level constants of `java.util.zip.Deflater`
 */
sealed abstract class CompressionLevel(val javaValue: Int)
object CompressionLevel {
  case object NoCompression   extends CompressionLevel(0)
  case object BestSpeed       extends CompressionLevel(1)
  case object BestCompression extends CompressionLevel(9)
  case object Default         extends CompressionLevel(-1)

  /**
   * Custom compression level 0-9.
   *
   * Construction fails with `IllegalArgumentException` (via `require`) when
   * `level` is outside that range.
   */
  final case class Level(level: Int) extends CompressionLevel(level) {
    require(level >= 0 && level <= 9, "Compression level must be between 0 and 9")
  }
}

/**
 * Compression strategies.
 *
 * @param javaValue integer encoding of the strategy; the values used below
 *                  match the strategy constants of `java.util.zip.Deflater`
 */
sealed abstract class CompressionStrategy(val javaValue: Int)
object CompressionStrategy {
  case object Default     extends CompressionStrategy(0)
  case object Filtered    extends CompressionStrategy(1)
  case object HuffmanOnly extends CompressionStrategy(2)
}

/**
 * Flush modes for compression.
 *
 * @param javaValue integer encoding of the flush mode; the values used below
 *                  match the flush constants of `java.util.zip.Deflater`
 */
sealed abstract class FlushMode(val javaValue: Int)
object FlushMode {
  case object NoFlush   extends FlushMode(0)
  case object SyncFlush extends FlushMode(2)
  case object FullFlush extends FlushMode(3)
}
```

### Iterator Integration

Integration with blocking Scala and Java iterators, and with Java streams.

```scala { .api }
object ZStream {

  /**
   * Creates a stream from a blocking Scala iterator.
   *
   * @param iterator the iterator to consume (by-name)
   */
  def fromBlockingIterator[A](iterator: => Iterator[A]): ZStream[Blocking, Throwable, A]

  /**
   * Creates a stream from a blocking Java iterator.
   *
   * @param iterator the `java.util.Iterator` to consume (by-name)
   */
  def fromBlockingJavaIterator[A](iterator: => java.util.Iterator[A]): ZStream[Blocking, Throwable, A]

  /**
   * Creates a stream from a Java `Stream`.
   *
   * @param stream the `java.util.stream.Stream` to consume (by-name)
   */
  def fromJavaStream[A](stream: => java.util.stream.Stream[A]): ZStream[Blocking, Throwable, A]

  /**
   * Creates a stream from a Java `Stream` produced by the effect `stream`.
   *
   * The resulting environment is the effect's `R` combined with `Blocking`.
   *
   * @param stream effect yielding the `java.util.stream.Stream` to consume
   */
  def fromJavaStreamEffect[R, A](stream: ZIO[R, Throwable, java.util.stream.Stream[A]]): ZStream[R with Blocking, Throwable, A]
}
```

### Async Callback Integration

Async callback integration for the JVM platform.

```scala { .api }
object ZStream {

  /**
   * Creates a stream from an async callback.
   *
   * `register` is handed a function of type `ZIO[R, Option[E], A] => Unit`
   * with which the callback-based code passes effects to the stream.
   * NOTE(review): presumably failing with `None` signals end of stream -
   * confirm against the ZIO Streams documentation.
   *
   * @param register function that wires the emit callback into the async API
   */
  def effectAsync[R, E, A](register: (ZIO[R, Option[E], A] => Unit) => Any): ZStream[R, E, A]

  /**
   * Creates a stream from an async callback with interrupt support.
   *
   * `register` returns either a [[Canceler]] (`Left`) or a ready-made stream (`Right`).
   *
   * @param register function that wires the emit callback into the async API
   */
  def effectAsyncInterrupt[R, E, A](register: (ZIO[R, Option[E], A] => Unit) => Either[Canceler[R], ZStream[R, E, A]]): ZStream[R, E, A]

  /**
   * Creates a stream from an async callback whose registration is a managed resource.
   *
   * @param register function that wires the emit callback into the async API
   *                 and returns a `ZManaged`
   */
  def effectAsyncManaged[R, E, A](register: (ZIO[R, Option[E], A] => Unit) => ZManaged[R, E, Any]): ZStream[R, E, A]
}

/** Canceler for async operations: an effect that cannot fail and yields `Unit`. */
type Canceler[R] = ZIO[R, Nothing, Unit]
```

## JavaScript Platform Extensions

### Async Integration

JavaScript-specific async integration using Futures.

```scala { .api }
object ZStream {

  /**
   * Creates a stream from an async callback; `register` returns a `Future[Boolean]`.
   *
   * NOTE(review): the meaning of the `Boolean` is not visible here - confirm
   * against the ZIO Streams documentation.
   *
   * @param register function that wires the emit callback
   *                 (`ZIO[R, Option[E], A] => Unit`) into the async API
   */
  def effectAsync[R, E, A](register: (ZIO[R, Option[E], A] => Unit) => Future[Boolean]): ZStream[R, E, A]

  /**
   * Creates a stream from an async callback with interrupt support.
   *
   * `register` returns either a `Future[Boolean]` (`Left`) or a ready-made stream (`Right`).
   *
   * @param register function that wires the emit callback into the async API
   */
  def effectAsyncInterrupt[R, E, A](register: (ZIO[R, Option[E], A] => Unit) => Either[Future[Boolean], ZStream[R, E, A]]): ZStream[R, E, A]

  /**
   * Creates a stream from an async callback whose registration is a managed
   * resource yielding a `Future[Boolean]`.
   *
   * @param register function that wires the emit callback into the async API
   */
  def effectAsyncManaged[R, E, A](register: (ZIO[R, Option[E], A] => Unit) => ZManaged[R, E, Future[Boolean]]): ZStream[R, E, A]
}
```

### Limited I/O Operations

Basic I/O operations available on the JavaScript platform.

```scala { .api }
object ZStream {

  /**
   * Reads the given `InputStream` as a stream of bytes (where available).
   *
   * @param is        the input stream to read (by-name)
   * @param chunkSize chunk size; defaults to `DefaultChunkSize`
   */
  def fromInputStream[R](is: => InputStream, chunkSize: Int = DefaultChunkSize): ZStream[R, IOException, Byte]

  /**
   * Reads, as a stream of bytes, the `InputStream` produced by the effect `is`.
   *
   * @param is        effect yielding the input stream
   * @param chunkSize chunk size; defaults to `DefaultChunkSize`
   */
  def fromInputStreamEffect[R](is: ZIO[R, IOException, InputStream], chunkSize: Int = DefaultChunkSize): ZStream[R, IOException, Byte]

  /**
   * Reads, as a stream of bytes, the `InputStream` provided by the managed resource `is`.
   *
   * @param is        managed resource yielding the input stream
   * @param chunkSize chunk size; defaults to `DefaultChunkSize`
   */
  def fromInputStreamManaged[R](is: ZManaged[R, IOException, InputStream], chunkSize: Int = DefaultChunkSize): ZStream[R, IOException, Byte]
}
```

## Scala Native Platform Extensions

### Async Integration

Native-specific async integration (similar to JavaScript).

```scala { .api }
object ZStream {

  /**
   * Creates a stream from an async callback; `register` returns a `Future[Boolean]`.
   *
   * NOTE(review): the meaning of the `Boolean` is not visible here - confirm
   * against the ZIO Streams documentation.
   *
   * @param register function that wires the emit callback
   *                 (`ZIO[R, Option[E], A] => Unit`) into the async API
   */
  def effectAsync[R, E, A](register: (ZIO[R, Option[E], A] => Unit) => Future[Boolean]): ZStream[R, E, A]

  /**
   * Creates a stream from an async callback with interrupt support.
   *
   * `register` returns either a `Future[Boolean]` (`Left`) or a ready-made stream (`Right`).
   *
   * @param register function that wires the emit callback into the async API
   */
  def effectAsyncInterrupt[R, E, A](register: (ZIO[R, Option[E], A] => Unit) => Either[Future[Boolean], ZStream[R, E, A]]): ZStream[R, E, A]

  /**
   * Creates a stream from an async callback whose registration is a managed
   * resource yielding a `Future[Boolean]`.
   *
   * @param register function that wires the emit callback into the async API
   */
  def effectAsyncManaged[R, E, A](register: (ZIO[R, Option[E], A] => Unit) => ZManaged[R, E, Future[Boolean]]): ZStream[R, E, A]
}
```

### Limited I/O Operations

Basic I/O operations available on the Scala Native platform.

```scala { .api }
object ZStream {

  /**
   * Reads the given `InputStream` as a stream of bytes (where available).
   *
   * @param is        the input stream to read (by-name)
   * @param chunkSize chunk size; defaults to `DefaultChunkSize`
   */
  def fromInputStream[R](is: => InputStream, chunkSize: Int = DefaultChunkSize): ZStream[R, IOException, Byte]

  /**
   * Reads, as a stream of bytes, the `InputStream` produced by the effect `is`.
   *
   * @param is        effect yielding the input stream
   * @param chunkSize chunk size; defaults to `DefaultChunkSize`
   */
  def fromInputStreamEffect[R](is: ZIO[R, IOException, InputStream], chunkSize: Int = DefaultChunkSize): ZStream[R, IOException, Byte]

  /**
   * Reads, as a stream of bytes, the `InputStream` provided by the managed resource `is`.
   *
   * @param is        managed resource yielding the input stream
   * @param chunkSize chunk size; defaults to `DefaultChunkSize`
   */
  def fromInputStreamManaged[R](is: ZManaged[R, IOException, InputStream], chunkSize: Int = DefaultChunkSize): ZStream[R, IOException, Byte]
}
```

## Platform Differences

### Feature Availability Matrix

| Feature | JVM | JavaScript | Scala Native |
|---------|-----|------------|--------------|
| File I/O | ✅ Full | ❌ Not Available | ❌ Not Available |
| Network I/O | ✅ Full | ❌ Not Available | ❌ Not Available |
| Compression | ✅ Full | ❌ Not Available | ❌ Not Available |
| Async Callbacks | ✅ Unit Return | ✅ Future[Boolean] | ✅ Future[Boolean] |
| Basic InputStream | ✅ Full | ⚠️ Limited | ⚠️ Limited |
| Iterator Support | ✅ Full | ❌ Not Available | ❌ Not Available |

### Platform-Specific Imports

```scala { .api }
// JVM-specific imports
import zio.stream.platform._ // All JVM extensions
import java.io._ // File I/O classes
import java.net._ // Networking classes
import java.security.MessageDigest // Cryptographic digests

// JavaScript/Native-specific imports
import scala.concurrent.Future // Future for async callbacks
import scala.scalajs.js // (JS only) JavaScript interop
```

**Usage Examples:**

```scala
import zio._
import zio.blocking.Blocking
import zio.stream._
import java.io._
import java.nio.charset.StandardCharsets

// JVM: File operations
val readFile: ZStream[Any, IOException, Byte] =
  ZStream.fromFile(new File("data.txt"))

// An explicit charset keeps the written bytes independent of the platform default.
val writeFile: ZIO[Any, IOException, Unit] =
  ZStream.fromIterable("Hello World".getBytes(StandardCharsets.UTF_8))
    .run(ZSink.fromFile(new File("output.txt")))

// JVM: Compression
val compressed: ZStream[Any, Nothing, Byte] =
  ZStream.fromIterable("Hello World".getBytes(StandardCharsets.UTF_8))
    .transduce(ZTransducer.gzip())

val decompressed: ZStream[Any, Nothing, Byte] =
  compressed.transduce(ZTransducer.gunzip())

// JVM: Network server
val server: ZManaged[Blocking, IOException, ZStream[Blocking, IOException, Connection]] =
  ZStream.fromSocketServer(8080, "localhost")

// All platforms: Async callbacks
// `scheduleCallback` is a placeholder for the callback-based API being wrapped;
// it is not defined in this snippet.
val asyncStream: ZStream[Any, Nothing, Int] = ZStream.effectAsync { emit =>
  // JVM: returns Unit
  // JS/Native: returns Future[Boolean]
  scheduleCallback(() => emit(ZIO.succeed(42)))
}

// Platform-specific error handling: a missing file becomes an empty stream,
// any other IOException is turned into a String error.
val platformSafeRead: ZStream[Any, String, Byte] = readFile.catchAll {
  case _: FileNotFoundException => ZStream.empty
  case ex: IOException          => ZStream.fail(s"IO Error: ${ex.getMessage}")
}
```