Platform-specific functionality for ZIO Streams including file I/O, networking, compression (JVM), and async integration for different runtime environments.
Stream operations for reading and writing files on the JVM platform.
/**
 * ZStream JVM extensions for file operations.
 *
 * Conventions visible in the signatures below:
 *  - `=> T` parameters are by-name, so the argument expression is not evaluated at the call site.
 *  - `chunkSize` always defaults to `DefaultChunkSize` (presumably the number of elements pulled
 *    per read -- confirm against the implementation).
 *  - every constructor reports failures as `IOException` in the stream's error channel.
 */
object ZStream {
/**
 * Read file as byte stream.
 *
 * @param file      the file to read (by-name)
 * @param chunkSize chunk size; defaults to `DefaultChunkSize`
 * @return a `Byte` stream with no environment requirement
 */
def fromFile(file: => File, chunkSize: Int = DefaultChunkSize): ZStream[Any, IOException, Byte]
/**
 * Read classpath resource as byte stream.
 *
 * @param name      the resource name (by-name)
 * @param chunkSize chunk size; defaults to `DefaultChunkSize`
 * @return a `Byte` stream with no environment requirement
 */
def fromResource(name: => String, chunkSize: Int = DefaultChunkSize): ZStream[Any, IOException, Byte]
/**
 * Read from an `InputStream`.
 *
 * @param is        the input stream to read (by-name)
 * @param chunkSize chunk size; defaults to `DefaultChunkSize`
 * @return a `Byte` stream with no environment requirement
 */
def fromInputStream(is: => InputStream, chunkSize: Int = DefaultChunkSize): ZStream[Any, IOException, Byte]
/**
 * Read from an `InputStream` produced by an effect.
 *
 * @tparam R        environment required by the effect; also the environment of the resulting stream
 * @param is        effect that yields the input stream, or fails with `IOException`
 * @param chunkSize chunk size; defaults to `DefaultChunkSize`
 */
def fromInputStreamEffect[R](is: ZIO[R, IOException, InputStream], chunkSize: Int = DefaultChunkSize): ZStream[R, IOException, Byte]
/**
 * Read from an `InputStream` provided as a managed resource.
 *
 * @tparam R        environment required by the managed resource; also the environment of the stream
 * @param is        managed resource that yields the input stream, or fails with `IOException`
 * @param chunkSize chunk size; defaults to `DefaultChunkSize`
 */
def fromInputStreamManaged[R](is: ZManaged[R, IOException, InputStream], chunkSize: Int = DefaultChunkSize): ZStream[R, IOException, Byte]
/**
 * Read from a `Reader` as a `String` stream.
 *
 * NOTE(review): `R` appears only in the result type and is not constrained by any parameter --
 * confirm whether the type parameter is intended.
 *
 * @param reader    the reader to consume (by-name)
 * @param chunkSize chunk size; defaults to `DefaultChunkSize`
 */
def fromReader[R](reader: => Reader, chunkSize: Int = DefaultChunkSize): ZStream[R, IOException, String]
/**
 * Creates a `String` stream from an `OutputStreamWriter`.
 *
 * NOTE(review): the previous summary read "Write to OutputStreamWriter", but the signature
 * returns a `ZStream` (a source), not a sink -- confirm the intended semantics. `R` is likewise
 * unconstrained by the parameters.
 *
 * @param writer    the writer (by-name)
 * @param chunkSize chunk size; defaults to `DefaultChunkSize`
 */
def fromOutputStreamWriter[R](writer: => OutputStreamWriter, chunkSize: Int = DefaultChunkSize): ZStream[R, IOException, String]
}
/**
 * ZSink JVM extensions for file operations.
 *
 * Every sink below is typed `Sink[IOException, Byte, Nothing, _]`: it accepts `Byte` input and
 * reports failures as `IOException` (the four type arguments are presumably
 * error / input / leftover / result -- confirm against the `Sink` alias).
 */
object ZSink {
/**
 * Write bytes to a file.
 *
 * @param file the destination file (by-name)
 * @return a sink whose result is `Unit`
 */
def fromFile(file: => File): Sink[IOException, Byte, Nothing, Unit]
/**
 * Write bytes to an `OutputStream`.
 *
 * @param os the destination stream (by-name)
 * @return a sink whose result is `Unit`
 */
def fromOutputStream(os: => OutputStream): Sink[IOException, Byte, Nothing, Unit]
/**
 * Write bytes to an `OutputStream` provided as a managed resource.
 *
 * @param os managed resource that yields the destination stream, or fails with `IOException`
 * @return a sink whose result is `Unit`
 */
def fromOutputStreamManaged(os: ZManaged[Any, IOException, OutputStream]): Sink[IOException, Byte, Nothing, Unit]
/**
 * Create a message digest sink.
 *
 * @param createDigest the `MessageDigest` to use (by-name)
 * @return a sink whose result is an `Array[Byte]` (presumably the computed digest -- confirm)
 */
def digest(createDigest: => MessageDigest): Sink[IOException, Byte, Nothing, Array[Byte]]
}

Networking capabilities for socket-based communication.
/** ZStream JVM extension for accepting socket connections. */
object ZStream {
/**
 * Create a server socket accepting connections.
 *
 * @param port the port to listen on
 * @param host the host to bind to; defaults to `"localhost"`
 * @return a managed resource (requiring `Blocking`, failing with `IOException`) that yields a
 *         stream of `Connection` values
 */
def fromSocketServer(port: Int, host: String = "localhost"): ZManaged[Blocking, IOException, ZStream[Blocking, IOException, Connection]]
}
/**
 * Represents a socket connection with read/write streams.
 *
 * Reading and writing require the `Blocking` environment; every operation reports failures as
 * `IOException`.
 *
 * @param socket the underlying `java.net.Socket` (presumably -- confirm the import)
 */
final class Connection(socket: Socket) {
/**
 * Read bytes from the connection.
 *
 * @param chunkSize chunk size; defaults to `DefaultChunkSize`
 * @return a `Byte` stream
 */
def read(chunkSize: Int = DefaultChunkSize): ZStream[Blocking, IOException, Byte]
/**
 * Write bytes to the connection.
 *
 * @return a sink that accepts `Byte` input and results in `Unit`
 */
def write: ZSink[Blocking, IOException, Byte, Nothing, Unit]
/** Close the connection; the effect may fail with `IOException`. */
def close: ZIO[Any, IOException, Unit]
/** Get the remote socket address as an effect that may fail with `IOException`. */
def remoteAddress: ZIO[Any, IOException, SocketAddress]
/** Get the local socket address as an effect that may fail with `IOException`. */
def localAddress: ZIO[Any, IOException, SocketAddress]
}

Data compression and decompression transducers.
/**
 * Compression and decompression transducers from `Byte` to `Byte`.
 *
 * `bufferSize` defaults to `64 * 1024` (65536) in every method.
 *
 * NOTE(review): all four signatures list the error type as `Nothing`, yet this document also
 * declares a `CompressionException` for compression/decompression failures -- confirm whether
 * the decompressing transducers (`inflate`, `gunzip`) should surface that error type.
 */
object ZTransducer {
/**
 * Deflate compression transducer.
 *
 * @param bufferSize buffer size; defaults to `64 * 1024`
 * @param noWrap     defaults to `false` (presumably forwarded to the `nowrap` flag of
 *                   `java.util.zip.Deflater` -- confirm)
 */
def deflate(bufferSize: Int = 64 * 1024, noWrap: Boolean = false): Transducer[Nothing, Byte, Byte]
/**
 * Inflate decompression transducer.
 *
 * @param bufferSize buffer size; defaults to `64 * 1024`
 * @param noWrap     defaults to `false` (presumably forwarded to the `nowrap` flag of
 *                   `java.util.zip.Inflater` -- confirm)
 */
def inflate(bufferSize: Int = 64 * 1024, noWrap: Boolean = false): Transducer[Nothing, Byte, Byte]
/**
 * Gzip compression transducer.
 *
 * @param bufferSize buffer size; defaults to `64 * 1024`
 */
def gzip(bufferSize: Int = 64 * 1024): Transducer[Nothing, Byte, Byte]
/**
 * Gunzip decompression transducer.
 *
 * @param bufferSize buffer size; defaults to `64 * 1024`
 */
def gunzip(bufferSize: Int = 64 * 1024): Transducer[Nothing, Byte, Byte]
}

Configuration classes for compression operations.
/**
 * Exception thrown during compression/decompression.
 *
 * Being an `IOException`, it can be handled wherever I/O failures are handled.
 *
 * @param message description of the failure
 * @param cause   underlying cause; defaults to `null`, i.e. no cause is recorded
 */
class CompressionException(
  message: String,
  cause: Throwable = null
) extends IOException(message, cause)
/**
 * Compression parameters for configuring compression behavior.
 *
 * Every field has a default, so `CompressionParameters()` is a valid configuration and single
 * settings can be overridden by name or via `copy`.
 *
 * @param level     compression level; defaults to `CompressionLevel.Default`
 * @param strategy  compression strategy; defaults to `CompressionStrategy.Default`
 * @param flushMode flush mode; defaults to `FlushMode.NoFlush`
 */
final case class CompressionParameters(
  level: CompressionLevel       = CompressionLevel.Default,
  strategy: CompressionStrategy = CompressionStrategy.Default,
  flushMode: FlushMode          = FlushMode.NoFlush
)
/**
 * Compression levels.
 *
 * @param javaValue the integer level, using the same numbering as `java.util.zip.Deflater`
 */
sealed abstract class CompressionLevel(val javaValue: Int)

object CompressionLevel {
  import java.util.zip.Deflater

  /** No compression (`javaValue` 0). */
  case object NoCompression extends CompressionLevel(Deflater.NO_COMPRESSION)

  /** Fastest compression (`javaValue` 1). */
  case object BestSpeed extends CompressionLevel(Deflater.BEST_SPEED)

  /** Best compression (`javaValue` 9). */
  case object BestCompression extends CompressionLevel(Deflater.BEST_COMPRESSION)

  /** Default compression level (`javaValue` -1). */
  case object Default extends CompressionLevel(Deflater.DEFAULT_COMPRESSION)

  /**
   * Custom compression level 0-9.
   *
   * Construction fails with `IllegalArgumentException` (via `require`) when `level` is
   * outside the inclusive range 0 to 9.
   *
   * @param level the requested level, also exposed as `javaValue`
   */
  final case class Level(level: Int) extends CompressionLevel(level) {
    require(0 <= level && level <= 9, "Compression level must be between 0 and 9")
  }
}
/**
 * Compression strategies.
 *
 * @param javaValue the integer strategy code, using the same numbering as
 *                  `java.util.zip.Deflater`
 */
sealed abstract class CompressionStrategy(val javaValue: Int)

object CompressionStrategy {
  import java.util.zip.Deflater

  /** Default strategy (`javaValue` 0). */
  case object Default extends CompressionStrategy(Deflater.DEFAULT_STRATEGY)

  /** Filtered strategy (`javaValue` 1). */
  case object Filtered extends CompressionStrategy(Deflater.FILTERED)

  /** Huffman-coding-only strategy (`javaValue` 2). */
  case object HuffmanOnly extends CompressionStrategy(Deflater.HUFFMAN_ONLY)
}
/**
 * Flush modes for compression.
 *
 * @param javaValue the integer flush code (presumably the `java.util.zip.Deflater` flush
 *                  constants `NO_FLUSH` = 0, `SYNC_FLUSH` = 2, `FULL_FLUSH` = 3 -- confirm)
 */
sealed abstract class FlushMode(val javaValue: Int)
object FlushMode {
/** No flush (`javaValue` 0). */
case object NoFlush extends FlushMode(0)
/** Sync flush (`javaValue` 2). */
case object SyncFlush extends FlushMode(2)
/** Full flush (`javaValue` 3). */
case object FullFlush extends FlushMode(3)
}

Integration with Java and Scala iterators for blocking I/O.
/**
 * ZStream constructors backed by iterators and Java streams.
 *
 * Every resulting stream requires the `Blocking` environment and reports failures as
 * `Throwable`.
 */
object ZStream {
/**
 * Create a stream from a blocking Scala iterator.
 *
 * @tparam A       element type
 * @param iterator the iterator to consume (by-name)
 */
def fromBlockingIterator[A](iterator: => Iterator[A]): ZStream[Blocking, Throwable, A]
/**
 * Create a stream from a blocking Java iterator.
 *
 * @tparam A       element type
 * @param iterator the `java.util.Iterator` to consume (by-name)
 */
def fromBlockingJavaIterator[A](iterator: => java.util.Iterator[A]): ZStream[Blocking, Throwable, A]
/**
 * Create a stream from a Java `Stream`.
 *
 * @tparam A     element type
 * @param stream the `java.util.stream.Stream` to consume (by-name)
 */
def fromJavaStream[A](stream: => java.util.stream.Stream[A]): ZStream[Blocking, Throwable, A]
/**
 * Create a stream from a Java `Stream` produced by an effect.
 *
 * @tparam R     environment required by the effect; the resulting stream requires `R with Blocking`
 * @tparam A     element type
 * @param stream effect that yields the Java stream, or fails with `Throwable`
 */
def fromJavaStreamEffect[R, A](stream: ZIO[R, Throwable, java.util.stream.Stream[A]]): ZStream[R with Blocking, Throwable, A]
}

Async callback integration for the JVM platform.
/**
 * JVM constructors that build a stream from callback-based APIs.
 *
 * In every method `register` receives an emit callback of type `ZIO[R, Option[E], A] => Unit`;
 * calling it hands one effect to the stream. (The `Option[E]` error presumably distinguishes
 * end-of-stream from a real failure -- confirm against the implementation.)
 */
object ZStream {
/**
 * Create a stream from an async callback.
 *
 * NOTE(review): the result type of `register` is `Any` here, while the platform table and the
 * usage example in this document describe the JVM variant as returning `Unit` -- confirm.
 *
 * @param register function that is given the emit callback
 */
def effectAsync[R, E, A](register: (ZIO[R, Option[E], A] => Unit) => Any): ZStream[R, E, A]
/**
 * Create a stream from an async callback with interrupt support.
 *
 * @param register function that is given the emit callback and returns either a `Canceler[R]`
 *                 (`Left`) or a ready-made stream (`Right`)
 */
def effectAsyncInterrupt[R, E, A](register: (ZIO[R, Option[E], A] => Unit) => Either[Canceler[R], ZStream[R, E, A]]): ZStream[R, E, A]
/**
 * Create a stream from an async callback whose registration is a managed resource.
 *
 * @param register function that is given the emit callback and returns a `ZManaged[R, E, Any]`
 */
def effectAsyncManaged[R, E, A](register: (ZIO[R, Option[E], A] => Unit) => ZManaged[R, E, Any]): ZStream[R, E, A]
}
/** Canceler for async operations */
type Canceler[R] = ZIO[R, Nothing, Unit]

JavaScript-specific async integration using Futures.
/**
 * JavaScript constructors that build a stream from callback-based APIs.
 *
 * In every method `register` receives an emit callback of type `ZIO[R, Option[E], A] => Unit`.
 * Unlike the JVM listing in this document, registration results are expressed with
 * `Future[Boolean]` (the meaning of the `Boolean` is not shown here -- confirm).
 */
object ZStream {
/**
 * Create a stream from an async callback with `Future`.
 *
 * @param register function that is given the emit callback and returns a `Future[Boolean]`
 */
def effectAsync[R, E, A](register: (ZIO[R, Option[E], A] => Unit) => Future[Boolean]): ZStream[R, E, A]
/**
 * Create a stream from an async callback with interrupt support and `Future`.
 *
 * @param register function that is given the emit callback and returns either a
 *                 `Future[Boolean]` (`Left`) or a ready-made stream (`Right`)
 */
def effectAsyncInterrupt[R, E, A](register: (ZIO[R, Option[E], A] => Unit) => Either[Future[Boolean], ZStream[R, E, A]]): ZStream[R, E, A]
/**
 * Create a stream from an async callback whose registration is a managed resource.
 *
 * @param register function that is given the emit callback and returns a
 *                 `ZManaged[R, E, Future[Boolean]]`
 */
def effectAsyncManaged[R, E, A](register: (ZIO[R, Option[E], A] => Unit) => ZManaged[R, E, Future[Boolean]]): ZStream[R, E, A]
}

Basic I/O operations available on JavaScript platform.
/**
 * `InputStream`-based constructors listed for the JavaScript platform.
 *
 * `chunkSize` defaults to `DefaultChunkSize` and failures are reported as `IOException`.
 */
object ZStream {
/**
 * Read from an `InputStream` (where available).
 *
 * NOTE(review): this variant takes a type parameter `R` that no parameter constrains, whereas
 * the JVM listing in this document returns `ZStream[Any, IOException, Byte]` -- confirm.
 *
 * @param is        the input stream to read (by-name)
 * @param chunkSize chunk size; defaults to `DefaultChunkSize`
 */
def fromInputStream[R](is: => InputStream, chunkSize: Int = DefaultChunkSize): ZStream[R, IOException, Byte]
/**
 * Read from an `InputStream` produced by an effect.
 *
 * @tparam R        environment required by the effect; also the environment of the stream
 * @param is        effect that yields the input stream, or fails with `IOException`
 * @param chunkSize chunk size; defaults to `DefaultChunkSize`
 */
def fromInputStreamEffect[R](is: ZIO[R, IOException, InputStream], chunkSize: Int = DefaultChunkSize): ZStream[R, IOException, Byte]
/**
 * Read from an `InputStream` provided as a managed resource.
 *
 * @tparam R        environment required by the managed resource; also the environment of the stream
 * @param is        managed resource that yields the input stream, or fails with `IOException`
 * @param chunkSize chunk size; defaults to `DefaultChunkSize`
 */
def fromInputStreamManaged[R](is: ZManaged[R, IOException, InputStream], chunkSize: Int = DefaultChunkSize): ZStream[R, IOException, Byte]
}

Native-specific async integration (similar to JavaScript).
/**
 * Scala Native constructors that build a stream from callback-based APIs.
 *
 * The signatures are the same as the JavaScript listing in this document: `register` receives
 * an emit callback of type `ZIO[R, Option[E], A] => Unit`, and registration results are
 * expressed with `Future[Boolean]` (the meaning of the `Boolean` is not shown here -- confirm).
 */
object ZStream {
/**
 * Create a stream from an async callback with `Future`.
 *
 * @param register function that is given the emit callback and returns a `Future[Boolean]`
 */
def effectAsync[R, E, A](register: (ZIO[R, Option[E], A] => Unit) => Future[Boolean]): ZStream[R, E, A]
/**
 * Create a stream from an async callback with interrupt support and `Future`.
 *
 * @param register function that is given the emit callback and returns either a
 *                 `Future[Boolean]` (`Left`) or a ready-made stream (`Right`)
 */
def effectAsyncInterrupt[R, E, A](register: (ZIO[R, Option[E], A] => Unit) => Either[Future[Boolean], ZStream[R, E, A]]): ZStream[R, E, A]
/**
 * Create a stream from an async callback whose registration is a managed resource.
 *
 * @param register function that is given the emit callback and returns a
 *                 `ZManaged[R, E, Future[Boolean]]`
 */
def effectAsyncManaged[R, E, A](register: (ZIO[R, Option[E], A] => Unit) => ZManaged[R, E, Future[Boolean]]): ZStream[R, E, A]
}

Basic I/O operations available on Scala Native platform.
/**
 * `InputStream`-based constructors listed for the Scala Native platform.
 *
 * `chunkSize` defaults to `DefaultChunkSize` and failures are reported as `IOException`.
 */
object ZStream {
/**
 * Read from an `InputStream` (where available).
 *
 * NOTE(review): this variant takes a type parameter `R` that no parameter constrains, whereas
 * the JVM listing in this document returns `ZStream[Any, IOException, Byte]` -- confirm.
 *
 * @param is        the input stream to read (by-name)
 * @param chunkSize chunk size; defaults to `DefaultChunkSize`
 */
def fromInputStream[R](is: => InputStream, chunkSize: Int = DefaultChunkSize): ZStream[R, IOException, Byte]
/**
 * Read from an `InputStream` produced by an effect.
 *
 * @tparam R        environment required by the effect; also the environment of the stream
 * @param is        effect that yields the input stream, or fails with `IOException`
 * @param chunkSize chunk size; defaults to `DefaultChunkSize`
 */
def fromInputStreamEffect[R](is: ZIO[R, IOException, InputStream], chunkSize: Int = DefaultChunkSize): ZStream[R, IOException, Byte]
/**
 * Read from an `InputStream` provided as a managed resource.
 *
 * @tparam R        environment required by the managed resource; also the environment of the stream
 * @param is        managed resource that yields the input stream, or fails with `IOException`
 * @param chunkSize chunk size; defaults to `DefaultChunkSize`
 */
def fromInputStreamManaged[R](is: ZManaged[R, IOException, InputStream], chunkSize: Int = DefaultChunkSize): ZStream[R, IOException, Byte]
}

| Feature | JVM | JavaScript | Scala Native |
|---|---|---|---|
| File I/O | ✅ Full | ❌ Not Available | ❌ Not Available |
| Network I/O | ✅ Full | ❌ Not Available | ❌ Not Available |
| Compression | ✅ Full | ❌ Not Available | ❌ Not Available |
| Async Callbacks | ✅ Unit Return | ✅ Future[Boolean] | ✅ Future[Boolean] |
| Basic InputStream | ✅ Full | ⚠️ Limited | ⚠️ Limited |
| Iterator Support | ✅ Full | ❌ Not Available | ❌ Not Available |
// JVM-specific imports
import zio.stream.platform._ // All JVM extensions
import java.io._ // File I/O classes
import java.net._ // Networking classes
import java.security.MessageDigest // Cryptographic digests
// JavaScript/Native-specific imports
import scala.concurrent.Future // Future for async callbacks
import scala.scalajs.js // (JS only) JavaScript interop

Usage Examples:
import zio._
import zio.stream._
import java.io._
// NOTE(review): `Blocking` and `Connection` are used below but no import in this snippet names
// them explicitly -- confirm which import brings them into scope.

// JVM: File operations
// Stream the contents of data.txt as bytes.
val readFile: ZStream[Any, IOException, Byte] =
ZStream.fromFile(new File("data.txt"))
// Run a byte stream built from the bytes of "Hello World" into a file sink for output.txt.
val writeFile: ZIO[Any, IOException, Unit] =
ZStream.fromIterable("Hello World".getBytes)
.run(ZSink.fromFile(new File("output.txt")))
// JVM: Compression
// Pass the bytes of "Hello World" through the gzip transducer with its default buffer size.
val compressed: ZStream[Any, Nothing, Byte] =
ZStream.fromIterable("Hello World".getBytes)
.transduce(ZTransducer.gzip())
// Pass the gzip output through the gunzip transducer.
val decompressed: ZStream[Any, Nothing, Byte] =
compressed.transduce(ZTransducer.gunzip())
// JVM: Network server
// Managed server socket on localhost:8080 that yields a stream of accepted connections.
val server: ZManaged[Blocking, IOException, ZStream[Blocking, IOException, Connection]] =
ZStream.fromSocketServer(8080, "localhost")
// All platforms: Async callbacks
// `scheduleCallback` is not defined in this snippet; it presumably stands in for the
// callback-registration API of the caller's choice. Each invocation of the callback emits
// the effect `ZIO.succeed(42)` into the stream.
val asyncStream: ZStream[Any, Nothing, Int] = ZStream.effectAsync { emit =>
// JVM: returns Unit
// JS/Native: returns Future[Boolean]
scheduleCallback(() => emit(ZIO.succeed(42)))
}
// Platform-specific error handling
// A missing file is replaced by an empty stream; any other IOException is re-raised as a
// String message, so the error type of the resulting stream is presumably `String` rather
// than `IOException` -- confirm.
val platformSafeRead = readFile.catchAll {
case _: FileNotFoundException => ZStream.empty
case ex: IOException => ZStream.fail(s"IO Error: ${ex.getMessage}")
}