or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

control-flow.mdcore-components.mderror-handling.mdgraph-building.mdindex.mdio-integration.mdjunction-operations.mdmaterialization.mdstream-operations.md

io-integration.mddocs/

0

# I/O Integration

1

2

File I/O, TCP networking, and integration with Java streams and other I/O systems.

3

4

## File I/O

5

6

### FileIO Operations

7

8

```scala { .api }

9

object FileIO {

10

def fromPath(path: Path, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]]

11

def fromFile(file: File, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]]

12

def toPath(path: Path, options: Set[OpenOption] = Set(CREATE, WRITE, TRUNCATE_EXISTING)): Sink[ByteString, Future[IOResult]]

13

def toFile(file: File, append: Boolean = false): Sink[ByteString, Future[IOResult]]

14

}

15

```

16

17

**Usage Examples:**

18

```scala

19

import akka.stream.scaladsl.FileIO

20

import java.nio.file.Paths

21

import akka.util.ByteString

22

23

// Read from file

24

val source: Source[ByteString, Future[IOResult]] =

25

FileIO.fromPath(Paths.get("input.txt"))

26

27

// Write to file

28

val sink: Sink[ByteString, Future[IOResult]] =

29

FileIO.toPath(Paths.get("output.txt"))

30

31

// Copy file

32

val copyResult: Future[IOResult] = source.runWith(sink)

33

34

// Process text lines

35

val textProcessing = FileIO.fromPath(Paths.get("data.txt"))

36

.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024))

37

.map(_.utf8String)

38

.map(_.toUpperCase)

39

.map(line => ByteString(line + "\n"))

40

.runWith(FileIO.toPath(Paths.get("processed.txt")))

41

```

42

43

### IOResult

44

45

```scala { .api }

46

case class IOResult(count: Long, status: Try[Done]) {

47

def wasSuccessful: Boolean = status.isSuccess

48

}

49

```

50

51

## TCP Networking

52

53

### TCP Operations

54

55

```scala { .api }

56

object Tcp {

57

def outgoingConnection(remoteAddress: InetSocketAddress): Flow[ByteString, ByteString, Future[OutgoingConnection]]

58

def outgoingConnection(host: String, port: Int): Flow[ByteString, ByteString, Future[OutgoingConnection]]

59

def bind(interface: String, port: Int): Source[IncomingConnection, Future[ServerBinding]]

60

}

61

```

62

63

**Connection Types:**

64

```scala { .api }

65

final case class IncomingConnection(

66

localAddress: InetSocketAddress,

67

remoteAddress: InetSocketAddress,

68

flow: Flow[ByteString, ByteString, NotUsed]

69

)

70

71

final case class OutgoingConnection(

72

localAddress: InetSocketAddress,

73

remoteAddress: InetSocketAddress

74

)

75

76

trait ServerBinding {

77

def localAddress: InetSocketAddress

78

def unbind(): Future[Done]

79

}

80

```

81

82

**Usage Examples:**

83

```scala

84

import akka.stream.scaladsl.Tcp

85

import java.net.InetSocketAddress

86

87

// TCP Client

88

val connection = Tcp().outgoingConnection("example.com", 80)

89

val request = ByteString("GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")

90

91

val responseFlow = Source.single(request)

92

.via(connection)

93

.runFold(ByteString.empty)(_ ++ _)

94

95

// TCP Server

96

val binding = Tcp().bind("localhost", 8080)

97

val serverFlow = Flow[ByteString].map { request =>

98

ByteString("HTTP/1.1 200 OK\r\n\r\nHello World!")

99

}

100

101

binding.runForeach { connection =>

102

println(s"New connection from: ${connection.remoteAddress}")

103

connection.handleWith(serverFlow)

104

}

105

```

106

107

## Stream Converters

108

109

### Java Stream Integration

110

111

```scala { .api }

112

object StreamConverters {

113

def fromInputStream(in: () => InputStream, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]]

114

def fromOutputStream(out: () => OutputStream, autoFlush: Boolean = true): Sink[ByteString, Future[IOResult]]

115

def asInputStream(readTimeout: FiniteDuration = 5.minutes): Sink[ByteString, InputStream]

116

def asOutputStream(writeTimeout: FiniteDuration = 5.minutes): Source[ByteString, OutputStream]

117

}

118

```

119

120

**Usage Examples:**

121

```scala

122

import akka.stream.scaladsl.StreamConverters

123

import java.io.{FileInputStream, FileOutputStream}

124

125

// From InputStream

126

val inputStreamSource = StreamConverters.fromInputStream(

127

() => new FileInputStream("input.txt")

128

)

129

130

// To OutputStream

131

val outputStreamSink = StreamConverters.fromOutputStream(

132

() => new FileOutputStream("output.txt")

133

)

134

135

// Bridge to blocking I/O

136

val inputStream: InputStream = Source(List("hello", "world"))

137

.map(s => ByteString(s + "\n"))

138

.runWith(StreamConverters.asInputStream())

139

```

140

141

## Framing

142

143

### Delimiter-based Framing

144

145

```scala { .api }

146

object Framing {

147

def delimiter(

148

delimiter: ByteString,

149

maximumFrameLength: Int,

150

allowTruncation: Boolean = false

151

): Flow[ByteString, ByteString, NotUsed]

152

153

def lengthField(

154

lengthFieldLength: Int,

155

lengthFieldOffset: Int = 0,

156

maximumFrameLength: Int,

157

byteOrder: ByteOrder = ByteOrder.LITTLE_ENDIAN

158

): Flow[ByteString, ByteString, NotUsed]

159

}

160

```

161

162

**Usage Examples:**

163

```scala

164

import akka.stream.scaladsl.Framing

165

import akka.util.ByteString

166

167

// Line-based framing

168

val lineFraming = Framing.delimiter(

169

ByteString("\n"),

170

maximumFrameLength = 1024

171

)

172

173

// Process text file line by line

174

FileIO.fromPath(Paths.get("data.txt"))

175

.via(lineFraming)

176

.map(_.utf8String.trim)

177

.filter(_.nonEmpty)

178

.runWith(Sink.foreach(println))

179

180

// Length-prefixed framing (4-byte length header)

181

val lengthFraming = Framing.lengthField(

182

lengthFieldLength = 4,

183

maximumFrameLength = 1024 * 1024

184

)

185

```

186

187

### JSON Framing

188

189

```scala { .api }

190

object JsonFraming {

191

def objectScanner(maximumObjectLength: Int): Flow[ByteString, ByteString, NotUsed]

192

}

193

```

194

195

**Usage Example:**

196

```scala

197

import akka.stream.scaladsl.JsonFraming

198

199

// Parse JSON objects from stream

200

val jsonSource = Source.single(ByteString("""{"a":1}{"b":2}{"c":3}"""))

201

202

jsonSource

203

.via(JsonFraming.objectScanner(1024))

204

.map(_.utf8String)

205

.runWith(Sink.foreach(println))

206

// Output: {"a":1}, {"b":2}, {"c":3}

207

```

208

209

## Compression

210

211

### Compression Operations

212

213

```scala { .api }

214

object Compression {

215

def gzip: Flow[ByteString, ByteString, NotUsed]

216

def gunzip(maxBytesPerChunk: Int = 65536): Flow[ByteString, ByteString, NotUsed]

217

def deflate: Flow[ByteString, ByteString, NotUsed]

218

def inflate(maxBytesPerChunk: Int = 65536): Flow[ByteString, ByteString, NotUsed]

219

}

220

```

221

222

**Usage Examples:**

223

```scala

224

import akka.stream.scaladsl.Compression

225

226

// Compress file

227

FileIO.fromPath(Paths.get("large-file.txt"))

228

.via(Compression.gzip)

229

.runWith(FileIO.toPath(Paths.get("compressed.gz")))

230

231

// Decompress file

232

FileIO.fromPath(Paths.get("data.gz"))

233

.via(Compression.gunzip())

234

.runWith(FileIO.toPath(Paths.get("decompressed.txt")))

235

236

// HTTP-style compression

237

val httpResponse = Source.single(ByteString("Hello World!"))

238

.via(Compression.gzip)

239

.map { compressed =>

240

s"Content-Encoding: gzip\r\nContent-Length: ${compressed.length}\r\n\r\n"

241

}

242

```

243

244

## TLS/SSL Support

245

246

### TLS Operations

247

248

```scala { .api }

249

object TLS {

250

def create(): BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed]

251

def create(sslContext: SSLContext): BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed]

252

}

253

254

sealed trait SslTlsInbound

255

case class SessionBytes(bytes: ByteString) extends SslTlsInbound

256

case class SessionTruncated extends SslTlsInbound

257

258

sealed trait SslTlsOutbound

259

case class SendBytes(bytes: ByteString) extends SslTlsOutbound

260

case object SessionClose extends SslTlsOutbound

261

```

262

263

**Usage Example:**

264

```scala

265

import akka.stream.scaladsl.TLS

266

import javax.net.ssl.SSLContext

267

268

// HTTPS client with TLS

269

val sslContext = SSLContext.getDefault

270

val tlsFlow = TLS.create(sslContext)

271

272

val httpsRequest = Source.single(SendBytes(

273

ByteString("GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")

274

))

275

276

val connection = Tcp().outgoingConnection("example.com", 443)

277

278

httpsRequest

279

.via(tlsFlow.reversed)

280

.via(connection)

281

.via(tlsFlow)

282

.runWith(Sink.foreach {

283

case SessionBytes(bytes) => println(bytes.utf8String)

284

case SessionTruncated => println("Session truncated")

285

})

286

```

287

288

## Integration Patterns

289

290

### Reactive Streams Integration

291

292

```scala

293

import org.reactivestreams.{Publisher, Subscriber}

294

295

// From Reactive Streams Publisher

296

val publisherSource: Source[Int, NotUsed] =

297

Source.fromPublisher(somePublisher)

298

299

// To Reactive Streams Subscriber

300

val subscriberSink: Sink[Int, NotUsed] =

301

Sink.fromSubscriber(someSubscriber)

302

303

// As Publisher (for other Reactive Streams implementations)

304

val asPublisher: Sink[Int, Publisher[Int]] =

305

Sink.asPublisher(fanout = false)

306

```

307

308

### Actor Integration

309

310

```scala

311

import akka.actor.ActorRef

312

313

// Send to Actor

314

val actorSink: Sink[String, NotUsed] =

315

Sink.actorRef(actorRef, onCompleteMessage = "Done")

316

317

// From Actor (with backpressure)

318

val actorSource: Source[String, ActorRef] =

319

Source.actorRef(bufferSize = 100, OverflowStrategy.dropHead)

320

```

321

322

This covers the main I/O integration capabilities, providing bridges between streams and external systems while maintaining backpressure semantics.