or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

index.md · platform-extensions.md · zsink.md · zstream.md · ztransducer.md
tile.json

docs/platform-extensions.md

Platform Extensions

Platform-specific functionality for ZIO Streams including file I/O, networking, compression (JVM), and async integration for different runtime environments.

JVM Platform Extensions

File I/O Operations

Stream operations for reading and writing files on the JVM platform.

// ZStream JVM extensions for file operations.
// API listing only: members are shown as signatures without implementation bodies.
object ZStream {
  /**
   * Read file as byte stream.
   *
   * @param file      by-name `File` to read
   * @param chunkSize chunk size; defaults to `DefaultChunkSize`
   */
  def fromFile(file: => File, chunkSize: Int = DefaultChunkSize): ZStream[Any, IOException, Byte]
  
  /**
   * Read classpath resource as byte stream.
   *
   * @param name      by-name resource name
   * @param chunkSize chunk size; defaults to `DefaultChunkSize`
   */
  def fromResource(name: => String, chunkSize: Int = DefaultChunkSize): ZStream[Any, IOException, Byte]
  
  /**
   * Read from InputStream.
   *
   * @param is        by-name `InputStream` to read
   * @param chunkSize chunk size; defaults to `DefaultChunkSize`
   */
  def fromInputStream(is: => InputStream, chunkSize: Int = DefaultChunkSize): ZStream[Any, IOException, Byte]
  
  /**
   * Read from an InputStream produced by the effect `is`.
   *
   * The effect may require environment `R` and may fail with `IOException`;
   * the resulting stream carries the same environment and error types.
   */
  def fromInputStreamEffect[R](is: ZIO[R, IOException, InputStream], chunkSize: Int = DefaultChunkSize): ZStream[R, IOException, Byte]
  
  /**
   * Read from an InputStream supplied as a `ZManaged` resource.
   *
   * Same shape as `fromInputStreamEffect`, with the stream source given as a
   * `ZManaged[R, IOException, InputStream]` instead of a plain effect.
   */
  def fromInputStreamManaged[R](is: ZManaged[R, IOException, InputStream], chunkSize: Int = DefaultChunkSize): ZStream[R, IOException, Byte]
  
  /**
   * Read from Reader as String stream.
   *
   * @param reader    by-name `Reader` to read
   * @param chunkSize chunk size; defaults to `DefaultChunkSize`
   */
  def fromReader[R](reader: => Reader, chunkSize: Int = DefaultChunkSize): ZStream[R, IOException, String]
  
  /**
   * Build a `String` stream from an `OutputStreamWriter`.
   *
   * NOTE(review): the `from...` name and the `ZStream` return type indicate a
   * stream constructor rather than a write operation; how the writer is used
   * to produce elements is not shown here. Confirm against the ZIO Streams API.
   */
  def fromOutputStreamWriter[R](writer: => OutputStreamWriter, chunkSize: Int = DefaultChunkSize): ZStream[R, IOException, String]
}

// ZSink JVM extensions for file operations.
// API listing only: members are shown as signatures without implementation bodies.
// NOTE(review): `Sink[IOException, Byte, Nothing, Unit]` presumably reads as
// (error, input, leftover, result) — confirm against the ZIO Streams version in use.
object ZSink {
  /**
   * Write bytes to file.
   *
   * @param file by-name `File` to write to
   */
  def fromFile(file: => File): Sink[IOException, Byte, Nothing, Unit]
  
  /**
   * Write bytes to OutputStream.
   *
   * @param os by-name `OutputStream` to write to
   */
  def fromOutputStream(os: => OutputStream): Sink[IOException, Byte, Nothing, Unit]
  
  /**
   * Write to an OutputStream supplied as a `ZManaged` resource.
   *
   * @param os managed `OutputStream`; acquiring it may fail with `IOException`
   */
  def fromOutputStreamManaged(os: ZManaged[Any, IOException, OutputStream]): Sink[IOException, Byte, Nothing, Unit]
  
  /**
   * Create message digest sink.
   *
   * @param createDigest by-name `MessageDigest` used by the sink
   * @return a sink over `Byte` input whose last type argument is `Array[Byte]`
   */
  def digest(createDigest: => MessageDigest): Sink[IOException, Byte, Nothing, Array[Byte]]
}

Network Operations

Networking capabilities for socket-based communication.

// API listing only: the member is shown as a signature without an implementation body.
object ZStream {
  /**
   * Create server socket accepting connections.
   *
   * @param port port number for the server socket
   * @param host host name; defaults to "localhost"
   * @return a `ZManaged` wrapping a stream of `Connection` values; both the
   *         managed resource and the stream require `Blocking` and may fail
   *         with `IOException`
   */
  def fromSocketServer(port: Int, host: String = "localhost"): ZManaged[Blocking, IOException, ZStream[Blocking, IOException, Connection]]
}

/**
 * Represents a socket connection with read/write streams.
 *
 * Constructed from a `Socket`. API listing only: members are shown as
 * signatures without implementation bodies.
 */
final class Connection(socket: Socket) {
  /**
   * Read bytes from connection.
   *
   * @param chunkSize chunk size; defaults to `DefaultChunkSize`
   */
  def read(chunkSize: Int = DefaultChunkSize): ZStream[Blocking, IOException, Byte]
  
  /** Sink that writes bytes to the connection; requires `Blocking`. */
  def write: ZSink[Blocking, IOException, Byte, Nothing, Unit]
  
  /** Effect that closes the connection; may fail with `IOException`. */
  def close: ZIO[Any, IOException, Unit]
  
  /** Effect yielding the remote `SocketAddress`. */
  def remoteAddress: ZIO[Any, IOException, SocketAddress]
  
  /** Effect yielding the local `SocketAddress`. */
  def localAddress: ZIO[Any, IOException, SocketAddress]
}

Compression Operations

Data compression and decompression transducers.

// API listing only: members are shown as signatures without implementation bodies.
// Every transducer below maps `Byte` input to `Byte` output and declares
// `Nothing` as its error type. The default buffer size is 64 * 1024 (65536).
object ZTransducer {
  /**
   * Deflate compression transducer.
   *
   * @param bufferSize buffer size; defaults to 64 * 1024
   * @param noWrap     defaults to `false`
   *                   NOTE(review): presumably corresponds to the `nowrap` flag of
   *                   `java.util.zip.Deflater` — confirm
   */
  def deflate(bufferSize: Int = 64 * 1024, noWrap: Boolean = false): Transducer[Nothing, Byte, Byte]
  
  /**
   * Inflate decompression transducer.
   *
   * @param bufferSize buffer size; defaults to 64 * 1024
   * @param noWrap     defaults to `false`
   *                   NOTE(review): presumably corresponds to the `nowrap` flag of
   *                   `java.util.zip.Inflater` — confirm
   */
  def inflate(bufferSize: Int = 64 * 1024, noWrap: Boolean = false): Transducer[Nothing, Byte, Byte]
  
  /**
   * Gzip compression transducer.
   *
   * @param bufferSize buffer size; defaults to 64 * 1024
   */
  def gzip(bufferSize: Int = 64 * 1024): Transducer[Nothing, Byte, Byte]
  
  /**
   * Gunzip decompression transducer.
   *
   * @param bufferSize buffer size; defaults to 64 * 1024
   */
  def gunzip(bufferSize: Int = 64 * 1024): Transducer[Nothing, Byte, Byte]
}

Compression Configuration

Configuration classes for compression operations.

/**
 * Exception thrown during compression/decompression.
 *
 * Extends `IOException`, forwarding both constructor arguments to it.
 *
 * @param message error message
 * @param cause   underlying cause; defaults to `null` (no cause), matching the
 *                Java exception constructor convention
 */
class CompressionException(message: String, cause: Throwable = null) extends IOException(message, cause)

/**
 * Compression parameters for configuring compression behavior.
 *
 * NOTE(review): none of the `ZTransducer` signatures listed in this document
 * accept a `CompressionParameters` argument — confirm where it is consumed.
 *
 * @param level     compression level; defaults to `CompressionLevel.Default`
 * @param strategy  compression strategy; defaults to `CompressionStrategy.Default`
 * @param flushMode flush mode; defaults to `FlushMode.NoFlush`
 */
final case class CompressionParameters(
  level: CompressionLevel = CompressionLevel.Default,
  strategy: CompressionStrategy = CompressionStrategy.Default,
  flushMode: FlushMode = FlushMode.NoFlush
)

/**
 * Compression level, carrying an integer code exposed as `javaValue`.
 *
 * The hierarchy is sealed: the only instances are the presets and the
 * `Level` wrapper defined in the companion object.
 */
sealed abstract class CompressionLevel(val javaValue: Int)

object CompressionLevel {

  /** Preset with code -1. */
  case object Default extends CompressionLevel(-1)

  /** Preset with code 0. */
  case object NoCompression extends CompressionLevel(0)

  /** Preset with code 1. */
  case object BestSpeed extends CompressionLevel(1)

  /** Preset with code 9. */
  case object BestCompression extends CompressionLevel(9)

  /**
   * Explicit level in the inclusive range 0..9.
   *
   * Construction throws `IllegalArgumentException` (via `require`) when
   * `level` falls outside that range.
   */
  final case class Level(level: Int) extends CompressionLevel(level) {
    require(0 <= level && level <= 9, "Compression level must be between 0 and 9")
  }
}

/**
 * Compression strategies, each carrying an integer code exposed as `javaValue`.
 *
 * NOTE(review): the codes coincide with `java.util.zip.Deflater.DEFAULT_STRATEGY` (0),
 * `FILTERED` (1) and `HUFFMAN_ONLY` (2); presumably they are handed to a `Deflater` —
 * confirm against the implementation.
 */
sealed abstract class CompressionStrategy(val javaValue: Int)
object CompressionStrategy {
  /** Strategy with code 0. */
  case object Default extends CompressionStrategy(0)
  /** Strategy with code 1. */
  case object Filtered extends CompressionStrategy(1)
  /** Strategy with code 2. */
  case object HuffmanOnly extends CompressionStrategy(2)
}

/**
 * Flush modes for compression, each carrying an integer code exposed as `javaValue`.
 *
 * NOTE(review): the codes coincide with `java.util.zip.Deflater.NO_FLUSH` (0),
 * `SYNC_FLUSH` (2) and `FULL_FLUSH` (3); code 1 is intentionally absent from that
 * set. Presumably these are handed to a `Deflater` — confirm against the implementation.
 */
sealed abstract class FlushMode(val javaValue: Int)
object FlushMode {
  /** Flush mode with code 0. */
  case object NoFlush extends FlushMode(0)
  /** Flush mode with code 2. */
  case object SyncFlush extends FlushMode(2)
  /** Flush mode with code 3. */
  case object FullFlush extends FlushMode(3)
}

Iterator Integration

Integration with Java and Scala iterators for blocking I/O.

// API listing only: members are shown as signatures without implementation bodies.
// Every stream below requires `Blocking` and uses `Throwable` as its error type.
object ZStream {
  /**
   * Create stream from blocking Scala iterator.
   *
   * @param iterator by-name Scala `Iterator[A]` supplying the elements
   */
  def fromBlockingIterator[A](iterator: => Iterator[A]): ZStream[Blocking, Throwable, A]
  
  /**
   * Create stream from blocking Java iterator.
   *
   * @param iterator by-name `java.util.Iterator[A]` supplying the elements
   */
  def fromBlockingJavaIterator[A](iterator: => java.util.Iterator[A]): ZStream[Blocking, Throwable, A]
  
  /**
   * Create stream from Java Stream.
   *
   * @param stream by-name `java.util.stream.Stream[A]` supplying the elements
   */
  def fromJavaStream[A](stream: => java.util.stream.Stream[A]): ZStream[Blocking, Throwable, A]
  
  /**
   * Create stream from a Java Stream produced by the effect `stream`.
   *
   * The result requires the effect's environment `R` in addition to `Blocking`.
   */
  def fromJavaStreamEffect[R, A](stream: ZIO[R, Throwable, java.util.stream.Stream[A]]): ZStream[R with Blocking, Throwable, A]
}

Async Callback Integration

Async callback integration for the JVM platform.

// API listing only: members are shown as signatures without implementation bodies.
// In each method `register` receives an emit callback of type
// `ZIO[R, Option[E], A] => Unit`.
// NOTE(review): a `None` in the callback's error channel presumably signals end of
// stream — confirm against the ZIO Streams documentation.
object ZStream {
  /**
   * Create stream from async callback.
   *
   * NOTE(review): `register` is typed to return `Any` here, while the feature
   * matrix and usage example in this document describe a `Unit` return — confirm.
   */
  def effectAsync[R, E, A](register: (ZIO[R, Option[E], A] => Unit) => Any): ZStream[R, E, A]
  
  /**
   * Create stream from async callback with interrupt.
   *
   * `register` returns either a `Canceler[R]` (Left) or a `ZStream[R, E, A]` (Right).
   */
  def effectAsyncInterrupt[R, E, A](register: (ZIO[R, Option[E], A] => Unit) => Either[Canceler[R], ZStream[R, E, A]]): ZStream[R, E, A]
  
  /**
   * Create stream from async callback with managed resource.
   *
   * `register` returns a `ZManaged[R, E, Any]`.
   */
  def effectAsyncManaged[R, E, A](register: (ZIO[R, Option[E], A] => Unit) => ZManaged[R, E, Any]): ZStream[R, E, A]
}

/**
 * Canceler for async operations: an effect requiring environment `R` that
 * cannot fail (`Nothing` error type) and yields `Unit`.
 */
type Canceler[R] = ZIO[R, Nothing, Unit]

JavaScript Platform Extensions

Async Integration

JavaScript-specific async integration using Futures.

// JavaScript variants. API listing only: members are shown as signatures without
// implementation bodies. `register` receives an emit callback of type
// `ZIO[R, Option[E], A] => Unit`; its return type involves `Future[Boolean]`.
// NOTE(review): the meaning of the `Boolean` carried by the Future is not shown
// here — confirm against the ZIO Streams documentation.
object ZStream {
  /** Create stream from async callback; `register` returns a `Future[Boolean]`. */
  def effectAsync[R, E, A](register: (ZIO[R, Option[E], A] => Unit) => Future[Boolean]): ZStream[R, E, A]
  
  /**
   * Create stream from async callback with interrupt and Future.
   *
   * `register` returns either a `Future[Boolean]` (Left) or a `ZStream[R, E, A]` (Right).
   */
  def effectAsyncInterrupt[R, E, A](register: (ZIO[R, Option[E], A] => Unit) => Either[Future[Boolean], ZStream[R, E, A]]): ZStream[R, E, A]
  
  /**
   * Create stream from async callback with managed resource and Future.
   *
   * `register` returns a `ZManaged[R, E, Future[Boolean]]`.
   */
  def effectAsyncManaged[R, E, A](register: (ZIO[R, Option[E], A] => Unit) => ZManaged[R, E, Future[Boolean]]): ZStream[R, E, A]
}

Limited I/O Operations

Basic I/O operations available on JavaScript platform.

// JavaScript variants. API listing only: members are shown as signatures without
// implementation bodies. Every stream below emits `Byte` and may fail with `IOException`.
object ZStream {
  /**
   * Read from InputStream (where available).
   *
   * @param is        by-name `InputStream` to read
   * @param chunkSize chunk size; defaults to `DefaultChunkSize`
   */
  def fromInputStream[R](is: => InputStream, chunkSize: Int = DefaultChunkSize): ZStream[R, IOException, Byte]
  
  /** Read from an InputStream produced by the effect `is`. */
  def fromInputStreamEffect[R](is: ZIO[R, IOException, InputStream], chunkSize: Int = DefaultChunkSize): ZStream[R, IOException, Byte]
  
  /** Read from an InputStream supplied as a `ZManaged` resource. */
  def fromInputStreamManaged[R](is: ZManaged[R, IOException, InputStream], chunkSize: Int = DefaultChunkSize): ZStream[R, IOException, Byte]
}

Scala Native Platform Extensions

Async Integration

Native-specific async integration (similar to JavaScript).

// Scala Native variants. API listing only: members are shown as signatures without
// implementation bodies. `register` receives an emit callback of type
// `ZIO[R, Option[E], A] => Unit`; its return type involves `Future[Boolean]`.
// NOTE(review): the meaning of the `Boolean` carried by the Future is not shown
// here — confirm against the ZIO Streams documentation.
object ZStream {
  /** Create stream from async callback; `register` returns a `Future[Boolean]`. */
  def effectAsync[R, E, A](register: (ZIO[R, Option[E], A] => Unit) => Future[Boolean]): ZStream[R, E, A]
  
  /**
   * Create stream from async callback with interrupt and Future.
   *
   * `register` returns either a `Future[Boolean]` (Left) or a `ZStream[R, E, A]` (Right).
   */
  def effectAsyncInterrupt[R, E, A](register: (ZIO[R, Option[E], A] => Unit) => Either[Future[Boolean], ZStream[R, E, A]]): ZStream[R, E, A]
  
  /**
   * Create stream from async callback with managed resource and Future.
   *
   * `register` returns a `ZManaged[R, E, Future[Boolean]]`.
   */
  def effectAsyncManaged[R, E, A](register: (ZIO[R, Option[E], A] => Unit) => ZManaged[R, E, Future[Boolean]]): ZStream[R, E, A]
}

Limited I/O Operations

Basic I/O operations available on Scala Native platform.

// Scala Native variants. API listing only: members are shown as signatures without
// implementation bodies. Every stream below emits `Byte` and may fail with `IOException`.
object ZStream {
  /**
   * Read from InputStream (where available).
   *
   * @param is        by-name `InputStream` to read
   * @param chunkSize chunk size; defaults to `DefaultChunkSize`
   */
  def fromInputStream[R](is: => InputStream, chunkSize: Int = DefaultChunkSize): ZStream[R, IOException, Byte]
  
  /** Read from an InputStream produced by the effect `is`. */
  def fromInputStreamEffect[R](is: ZIO[R, IOException, InputStream], chunkSize: Int = DefaultChunkSize): ZStream[R, IOException, Byte]
  
  /** Read from an InputStream supplied as a `ZManaged` resource. */
  def fromInputStreamManaged[R](is: ZManaged[R, IOException, InputStream], chunkSize: Int = DefaultChunkSize): ZStream[R, IOException, Byte]
}

Platform Differences

Feature Availability Matrix

| Feature | JVM | JavaScript | Scala Native |
| --- | --- | --- | --- |
| File I/O | ✅ Full | ❌ Not Available | ❌ Not Available |
| Network I/O | ✅ Full | ❌ Not Available | ❌ Not Available |
| Compression | ✅ Full | ❌ Not Available | ❌ Not Available |
| Async Callbacks | ✅ Unit Return | ✅ Future[Boolean] | ✅ Future[Boolean] |
| Basic InputStream | ✅ Full | ⚠️ Limited | ⚠️ Limited |
| Iterator Support | ✅ Full | ❌ Not Available | ❌ Not Available |

Platform-Specific Imports

// JVM-specific imports
import zio.stream.platform._           // All JVM extensions
import java.io._                       // File I/O classes
import java.net._                      // Networking classes  
import java.security.MessageDigest     // Cryptographic digests

// JavaScript/Native-specific imports  
import scala.concurrent.Future         // Future for async callbacks
import scala.scalajs.js               // (JS only) JavaScript interop

Usage Examples:

import zio._
import zio.blocking.Blocking // required for the `Blocking` environment type used by `server`
import zio.stream._
import java.io._

// JVM: File operations
// Stream the bytes of data.txt.
val readFile: ZStream[Any, IOException, Byte] = 
  ZStream.fromFile(new File("data.txt"))

// Run a byte stream into a file sink, writing "Hello World" to output.txt.
val writeFile: ZIO[Any, IOException, Unit] = 
  ZStream.fromIterable("Hello World".getBytes)
    .run(ZSink.fromFile(new File("output.txt")))

// JVM: Compression  
// Pipe the bytes through the gzip transducer...
val compressed: ZStream[Any, Nothing, Byte] = 
  ZStream.fromIterable("Hello World".getBytes)
    .transduce(ZTransducer.gzip())

// ...and back through gunzip.
val decompressed: ZStream[Any, Nothing, Byte] = 
  compressed.transduce(ZTransducer.gunzip())

// JVM: Network server
// Managed stream of incoming connections on localhost:8080.
val server: ZManaged[Blocking, IOException, ZStream[Blocking, IOException, Connection]] = 
  ZStream.fromSocketServer(8080, "localhost")

// All platforms: Async callbacks
// NOTE: `scheduleCallback` is a placeholder for an application-defined function
// that invokes the given thunk later; it is not defined in this snippet.
val asyncStream: ZStream[Any, Nothing, Int] = ZStream.effectAsync { emit =>
  // JVM: returns Unit  
  // JS/Native: returns Future[Boolean]  
  scheduleCallback(() => emit(ZIO.succeed(42)))
}

// Platform-specific error handling
// A missing file yields an empty stream; any other IOException is converted
// into a String error, so the resulting error type is String.
val platformSafeRead: ZStream[Any, String, Byte] = readFile.catchAll {
  case _: FileNotFoundException => ZStream.empty
  case ex: IOException => ZStream.fail(s"IO Error: ${ex.getMessage}")
}