or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.mdplatform-extensions.mdzsink.mdzstream.mdztransducer.md

platform-extensions.mddocs/

0

# Platform Extensions

1

2

Platform-specific functionality for ZIO Streams including file I/O, networking, compression (JVM), and async integration for different runtime environments.

3

4

## JVM Platform Extensions

5

6

### File I/O Operations

7

8

Stream operations for reading and writing files on the JVM platform.

9

10

```scala { .api }

11

// ZStream JVM extensions for file operations

12

object ZStream {

13

/** Read file as byte stream */

14

def fromFile(file: => File, chunkSize: Int = DefaultChunkSize): ZStream[Any, IOException, Byte]

15

16

/** Read classpath resource as byte stream */

17

def fromResource(name: => String, chunkSize: Int = DefaultChunkSize): ZStream[Any, IOException, Byte]

18

19

/** Read from InputStream */

20

def fromInputStream(is: => InputStream, chunkSize: Int = DefaultChunkSize): ZStream[Any, IOException, Byte]

21

22

/** Read from InputStream created by effect */

23

def fromInputStreamEffect[R](is: ZIO[R, IOException, InputStream], chunkSize: Int = DefaultChunkSize): ZStream[R, IOException, Byte]

24

25

/** Read from managed InputStream */

26

def fromInputStreamManaged[R](is: ZManaged[R, IOException, InputStream], chunkSize: Int = DefaultChunkSize): ZStream[R, IOException, Byte]

27

28

/** Read from Reader as String stream */

29

def fromReader[R](reader: => Reader, chunkSize: Int = DefaultChunkSize): ZStream[R, IOException, String]

30

31

/** Write to OutputStreamWriter */

32

def fromOutputStreamWriter[R](writer: => OutputStreamWriter, chunkSize: Int = DefaultChunkSize): ZStream[R, IOException, String]

33

}

34

35

// ZSink JVM extensions for file operations

36

object ZSink {

37

/** Write bytes to file */

38

def fromFile(file: => File): Sink[IOException, Byte, Nothing, Unit]

39

40

/** Write bytes to OutputStream */

41

def fromOutputStream(os: => OutputStream): Sink[IOException, Byte, Nothing, Unit]

42

43

/** Write to managed OutputStream */

44

def fromOutputStreamManaged(os: ZManaged[Any, IOException, OutputStream]): Sink[IOException, Byte, Nothing, Unit]

45

46

/** Create message digest sink */

47

def digest(createDigest: => MessageDigest): Sink[IOException, Byte, Nothing, Array[Byte]]

48

}

49

```

50

51

### Network Operations

52

53

Networking capabilities for socket-based communication.

54

55

```scala { .api }

56

object ZStream {

57

/** Create server socket accepting connections */

58

def fromSocketServer(port: Int, host: String = "localhost"): ZManaged[Blocking, IOException, ZStream[Blocking, IOException, Connection]]

59

}

60

61

/** Represents a socket connection with read/write streams */

62

final class Connection(socket: Socket) {

63

/** Read bytes from connection */

64

def read(chunkSize: Int = DefaultChunkSize): ZStream[Blocking, IOException, Byte]

65

66

/** Write bytes to connection */

67

def write: ZSink[Blocking, IOException, Byte, Nothing, Unit]

68

69

/** Close the connection */

70

def close: ZIO[Any, IOException, Unit]

71

72

/** Get remote socket address */

73

def remoteAddress: ZIO[Any, IOException, SocketAddress]

74

75

/** Get local socket address */

76

def localAddress: ZIO[Any, IOException, SocketAddress]

77

}

78

```

79

80

### Compression Operations

81

82

Data compression and decompression transducers.

83

84

```scala { .api }

85

object ZTransducer {

86

/** Deflate compression transducer */

87

def deflate(bufferSize: Int = 64 * 1024, noWrap: Boolean = false): Transducer[Nothing, Byte, Byte]

88

89

/** Inflate decompression transducer */

90

def inflate(bufferSize: Int = 64 * 1024, noWrap: Boolean = false): Transducer[Nothing, Byte, Byte]

91

92

/** Gzip compression transducer */

93

def gzip(bufferSize: Int = 64 * 1024): Transducer[Nothing, Byte, Byte]

94

95

/** Gunzip decompression transducer */

96

def gunzip(bufferSize: Int = 64 * 1024): Transducer[Nothing, Byte, Byte]

97

}

98

```

99

100

### Compression Configuration

101

102

Configuration classes for compression operations.

103

104

```scala { .api }

105

/** Exception thrown during compression/decompression */

106

class CompressionException(message: String, cause: Throwable = null) extends IOException(message, cause)

107

108

/** Compression parameters for configuring compression behavior */

109

final case class CompressionParameters(

110

level: CompressionLevel = CompressionLevel.Default,

111

strategy: CompressionStrategy = CompressionStrategy.Default,

112

flushMode: FlushMode = FlushMode.NoFlush

113

)

114

115

/** Compression levels */

116

sealed abstract class CompressionLevel(val javaValue: Int)

117

object CompressionLevel {

118

case object NoCompression extends CompressionLevel(0)

119

case object BestSpeed extends CompressionLevel(1)

120

case object BestCompression extends CompressionLevel(9)

121

case object Default extends CompressionLevel(-1)

122

123

/** Custom compression level 0-9 */

124

final case class Level(level: Int) extends CompressionLevel(level) {

125

require(level >= 0 && level <= 9, "Compression level must be between 0 and 9")

126

}

127

}

128

129

/** Compression strategies */

130

sealed abstract class CompressionStrategy(val javaValue: Int)

131

object CompressionStrategy {

132

case object Default extends CompressionStrategy(0)

133

case object Filtered extends CompressionStrategy(1)

134

case object HuffmanOnly extends CompressionStrategy(2)

135

}

136

137

/** Flush modes for compression */

138

sealed abstract class FlushMode(val javaValue: Int)

139

object FlushMode {

140

case object NoFlush extends FlushMode(0)

141

case object SyncFlush extends FlushMode(2)

142

case object FullFlush extends FlushMode(3)

143

}

144

```

145

146

### Iterator Integration

147

148

Integration with Java and Scala iterators for blocking I/O.

149

150

```scala { .api }

151

object ZStream {

152

/** Create stream from blocking Scala iterator */

153

def fromBlockingIterator[A](iterator: => Iterator[A]): ZStream[Blocking, Throwable, A]

154

155

/** Create stream from blocking Java iterator */

156

def fromBlockingJavaIterator[A](iterator: => java.util.Iterator[A]): ZStream[Blocking, Throwable, A]

157

158

/** Create stream from Java Stream */

159

def fromJavaStream[A](stream: => java.util.stream.Stream[A]): ZStream[Blocking, Throwable, A]

160

161

/** Create stream from Java Stream created by effect */

162

def fromJavaStreamEffect[R, A](stream: ZIO[R, Throwable, java.util.stream.Stream[A]]): ZStream[R with Blocking, Throwable, A]

163

}

164

```

165

166

### Async Callback Integration

167

168

Async callback integration for the JVM platform.

169

170

```scala { .api }

171

object ZStream {

172

/** Create stream from async callback */

173

def effectAsync[R, E, A](register: (ZIO[R, Option[E], A] => Unit) => Any): ZStream[R, E, A]

174

175

/** Create stream from async callback with interrupt */

176

def effectAsyncInterrupt[R, E, A](register: (ZIO[R, Option[E], A] => Unit) => Either[Canceler[R], ZStream[R, E, A]]): ZStream[R, E, A]

177

178

/** Create stream from async callback with managed resource */

179

def effectAsyncManaged[R, E, A](register: (ZIO[R, Option[E], A] => Unit) => ZManaged[R, E, Any]): ZStream[R, E, A]

180

}

181

182

/** Canceler for async operations */

183

type Canceler[R] = ZIO[R, Nothing, Unit]

184

```

185

186

## JavaScript Platform Extensions

187

188

### Async Integration

189

190

JavaScript-specific async integration using Futures.

191

192

```scala { .api }

193

object ZStream {

194

/** Create stream from async callback with Future */

195

def effectAsync[R, E, A](register: (ZIO[R, Option[E], A] => Unit) => Future[Boolean]): ZStream[R, E, A]

196

197

/** Create stream from async callback with interrupt and Future */

198

def effectAsyncInterrupt[R, E, A](register: (ZIO[R, Option[E], A] => Unit) => Either[Future[Boolean], ZStream[R, E, A]]): ZStream[R, E, A]

199

200

/** Create stream from async callback with managed resource and Future */

201

def effectAsyncManaged[R, E, A](register: (ZIO[R, Option[E], A] => Unit) => ZManaged[R, E, Future[Boolean]]): ZStream[R, E, A]

202

}

203

```

204

205

### Limited I/O Operations

206

207

Basic I/O operations available on JavaScript platform.

208

209

```scala { .api }

210

object ZStream {

211

/** Read from InputStream (where available) */

212

def fromInputStream[R](is: => InputStream, chunkSize: Int = DefaultChunkSize): ZStream[R, IOException, Byte]

213

214

/** Read from InputStream created by effect */

215

def fromInputStreamEffect[R](is: ZIO[R, IOException, InputStream], chunkSize: Int = DefaultChunkSize): ZStream[R, IOException, Byte]

216

217

/** Read from managed InputStream */

218

def fromInputStreamManaged[R](is: ZManaged[R, IOException, InputStream], chunkSize: Int = DefaultChunkSize): ZStream[R, IOException, Byte]

219

}

220

```

221

222

## Scala Native Platform Extensions

223

224

### Async Integration

225

226

Native-specific async integration (similar to JavaScript).

227

228

```scala { .api }

229

object ZStream {

230

/** Create stream from async callback with Future */

231

def effectAsync[R, E, A](register: (ZIO[R, Option[E], A] => Unit) => Future[Boolean]): ZStream[R, E, A]

232

233

/** Create stream from async callback with interrupt and Future */

234

def effectAsyncInterrupt[R, E, A](register: (ZIO[R, Option[E], A] => Unit) => Either[Future[Boolean], ZStream[R, E, A]]): ZStream[R, E, A]

235

236

/** Create stream from async callback with managed resource and Future */

237

def effectAsyncManaged[R, E, A](register: (ZIO[R, Option[E], A] => Unit) => ZManaged[R, E, Future[Boolean]]): ZStream[R, E, A]

238

}

239

```

240

241

### Limited I/O Operations

242

243

Basic I/O operations available on Scala Native platform.

244

245

```scala { .api }

246

object ZStream {

247

/** Read from InputStream (where available) */

248

def fromInputStream[R](is: => InputStream, chunkSize: Int = DefaultChunkSize): ZStream[R, IOException, Byte]

249

250

/** Read from InputStream created by effect */

251

def fromInputStreamEffect[R](is: ZIO[R, IOException, InputStream], chunkSize: Int = DefaultChunkSize): ZStream[R, IOException, Byte]

252

253

/** Read from managed InputStream */

254

def fromInputStreamManaged[R](is: ZManaged[R, IOException, InputStream], chunkSize: Int = DefaultChunkSize): ZStream[R, IOException, Byte]

255

}

256

```

257

258

## Platform Differences

259

260

### Feature Availability Matrix

261

262

| Feature | JVM | JavaScript | Scala Native |

263

|---------|-----|------------|--------------|

264

| File I/O | ✅ Full | ❌ Not Available | ❌ Not Available |

265

| Network I/O | ✅ Full | ❌ Not Available | ❌ Not Available |

266

| Compression | ✅ Full | ❌ Not Available | ❌ Not Available |

267

| Async Callbacks | ✅ Unit Return | ✅ Future[Boolean] | ✅ Future[Boolean] |

268

| Basic InputStream | ✅ Full | ⚠️ Limited | ⚠️ Limited |

269

| Iterator Support | ✅ Full | ❌ Not Available | ❌ Not Available |

270

271

### Platform-Specific Imports

272

273

```scala { .api }

274

// JVM-specific imports

275

import zio.stream.platform._ // All JVM extensions

276

import java.io._ // File I/O classes

277

import java.net._ // Networking classes

278

import java.security.MessageDigest // Cryptographic digests

279

280

// JavaScript/Native-specific imports

281

import scala.concurrent.Future // Future for async callbacks

282

import scala.scalajs.js // (JS only) JavaScript interop

283

```

284

285

**Usage Examples:**

286

287

```scala

288

import zio._

289

import zio.stream._

290

import java.io._

291

292

// JVM: File operations

293

val readFile: ZStream[Any, IOException, Byte] =

294

ZStream.fromFile(new File("data.txt"))

295

296

val writeFile: ZIO[Any, IOException, Unit] =

297

ZStream.fromIterable("Hello World".getBytes)

298

.run(ZSink.fromFile(new File("output.txt")))

299

300

// JVM: Compression

301

val compressed: ZStream[Any, Nothing, Byte] =

302

ZStream.fromIterable("Hello World".getBytes)

303

.transduce(ZTransducer.gzip())

304

305

val decompressed: ZStream[Any, Nothing, Byte] =

306

compressed.transduce(ZTransducer.gunzip())

307

308

// JVM: Network server

309

val server: ZManaged[Blocking, IOException, ZStream[Blocking, IOException, Connection]] =

310

ZStream.fromSocketServer(8080, "localhost")

311

312

// All platforms: Async callbacks

313

val asyncStream: ZStream[Any, Nothing, Int] = ZStream.effectAsync { emit =>

314

// JVM: returns Unit

315

// JS/Native: returns Future[Boolean]

316

scheduleCallback(() => emit(ZIO.succeed(42)))

317

}

318

319

// Platform-specific error handling

320

val platformSafeRead = readFile.catchAll {

321

case _: FileNotFoundException => ZStream.empty

322

case ex: IOException => ZStream.fail(s"IO Error: ${ex.getMessage}")

323

}

324

```