0
# I/O Integration
1
2
This section covers file I/O, TCP networking, framing, compression, TLS, and integration with Java streams, Reactive Streams, and actors.
3
4
## File I/O
5
6
### FileIO Operations
7
8
```scala { .api }
9
object FileIO {
10
def fromPath(path: Path, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]]
11
def fromFile(file: File, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]]
12
def toPath(path: Path, options: Set[OpenOption] = Set(CREATE, WRITE, TRUNCATE_EXISTING)): Sink[ByteString, Future[IOResult]]
13
def toFile(file: File, append: Boolean = false): Sink[ByteString, Future[IOResult]]
14
}
15
```
16
17
**Usage Examples:**
18
```scala
19
import akka.stream.scaladsl.FileIO
20
import java.nio.file.Paths
21
import akka.util.ByteString
22
23
// Read from file
24
val source: Source[ByteString, Future[IOResult]] =
25
FileIO.fromPath(Paths.get("input.txt"))
26
27
// Write to file
28
val sink: Sink[ByteString, Future[IOResult]] =
29
FileIO.toPath(Paths.get("output.txt"))
30
31
// Copy file
32
val copyResult: Future[IOResult] = source.runWith(sink)
33
34
// Process text lines
35
val textProcessing = FileIO.fromPath(Paths.get("data.txt"))
36
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024))
37
.map(_.utf8String)
38
.map(_.toUpperCase)
39
.map(line => ByteString(line + "\n"))
40
.runWith(FileIO.toPath(Paths.get("processed.txt")))
41
```
42
43
### IOResult
44
45
```scala { .api }
46
case class IOResult(count: Long, status: Try[Done]) {
47
def wasSuccessful: Boolean = status.isSuccess
48
}
49
```
50
51
## TCP Networking
52
53
### TCP Operations
54
55
```scala { .api }
56
object Tcp {
57
def outgoingConnection(remoteAddress: InetSocketAddress): Flow[ByteString, ByteString, Future[OutgoingConnection]]
58
def outgoingConnection(host: String, port: Int): Flow[ByteString, ByteString, Future[OutgoingConnection]]
59
def bind(interface: String, port: Int): Source[IncomingConnection, Future[ServerBinding]]
60
}
61
```
62
63
**Connection Types:**
64
```scala { .api }
65
final case class IncomingConnection(
66
localAddress: InetSocketAddress,
67
remoteAddress: InetSocketAddress,
68
flow: Flow[ByteString, ByteString, NotUsed]
69
)
70
71
final case class OutgoingConnection(
72
localAddress: InetSocketAddress,
73
remoteAddress: InetSocketAddress
74
)
75
76
trait ServerBinding {
77
def localAddress: InetSocketAddress
78
def unbind(): Future[Done]
79
}
80
```
81
82
**Usage Examples:**
83
```scala
84
import akka.stream.scaladsl.Tcp
85
import java.net.InetSocketAddress
86
87
// TCP Client
88
val connection = Tcp().outgoingConnection("example.com", 80)
89
val request = ByteString("GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")
90
91
val responseFlow = Source.single(request)
92
.via(connection)
93
.runFold(ByteString.empty)(_ ++ _)
94
95
// TCP Server
96
val binding = Tcp().bind("localhost", 8080)
97
val serverFlow = Flow[ByteString].map { request =>
98
ByteString("HTTP/1.1 200 OK\r\n\r\nHello World!")
99
}
100
101
binding.runForeach { connection =>
102
println(s"New connection from: ${connection.remoteAddress}")
103
connection.handleWith(serverFlow)
104
}
105
```
106
107
## Stream Converters
108
109
### Java Stream Integration
110
111
```scala { .api }
112
object StreamConverters {
113
def fromInputStream(in: () => InputStream, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]]
114
def fromOutputStream(out: () => OutputStream, autoFlush: Boolean = true): Sink[ByteString, Future[IOResult]]
115
def asInputStream(readTimeout: FiniteDuration = 5.minutes): Sink[ByteString, InputStream]
116
def asOutputStream(writeTimeout: FiniteDuration = 5.minutes): Source[ByteString, OutputStream]
117
}
118
```
119
120
**Usage Examples:**
121
```scala
122
import akka.stream.scaladsl.StreamConverters
123
import java.io.{FileInputStream, FileOutputStream, InputStream}
124
125
// From InputStream
126
val inputStreamSource = StreamConverters.fromInputStream(
127
() => new FileInputStream("input.txt")
128
)
129
130
// To OutputStream
131
val outputStreamSink = StreamConverters.fromOutputStream(
132
() => new FileOutputStream("output.txt")
133
)
134
135
// Bridge to blocking I/O
136
val inputStream: InputStream = Source(List("hello", "world"))
137
.map(s => ByteString(s + "\n"))
138
.runWith(StreamConverters.asInputStream())
139
```
140
141
## Framing
142
143
### Delimiter- and Length-Field Framing
144
145
```scala { .api }
146
object Framing {
147
def delimiter(
148
delimiter: ByteString,
149
maximumFrameLength: Int,
150
allowTruncation: Boolean = false
151
): Flow[ByteString, ByteString, NotUsed]
152
153
def lengthField(
154
lengthFieldLength: Int,
155
lengthFieldOffset: Int = 0,
156
maximumFrameLength: Int,
157
byteOrder: ByteOrder = ByteOrder.LITTLE_ENDIAN
158
): Flow[ByteString, ByteString, NotUsed]
159
}
160
```
161
162
**Usage Examples:**
163
```scala
164
import akka.stream.scaladsl.Framing
165
import akka.util.ByteString
166
167
// Line-based framing
168
val lineFraming = Framing.delimiter(
169
ByteString("\n"),
170
maximumFrameLength = 1024
171
)
172
173
// Process text file line by line
174
FileIO.fromPath(Paths.get("data.txt"))
175
.via(lineFraming)
176
.map(_.utf8String.trim)
177
.filter(_.nonEmpty)
178
.runWith(Sink.foreach(println))
179
180
// Length-prefixed framing (4-byte length header)
181
val lengthFraming = Framing.lengthField(
182
lengthFieldLength = 4,
183
maximumFrameLength = 1024 * 1024
184
)
185
```
186
187
### JSON Framing
188
189
```scala { .api }
190
object JsonFraming {
191
def objectScanner(maximumObjectLength: Int): Flow[ByteString, ByteString, NotUsed]
192
}
193
```
194
195
**Usage Example:**
196
```scala
197
import akka.stream.scaladsl.JsonFraming
198
199
// Parse JSON objects from stream
200
val jsonSource = Source.single(ByteString("""{"a":1}{"b":2}{"c":3}"""))
201
202
jsonSource
203
.via(JsonFraming.objectScanner(1024))
204
.map(_.utf8String)
205
.runWith(Sink.foreach(println))
206
// Output: {"a":1}, {"b":2}, {"c":3}
207
```
208
209
## Compression
210
211
### Compression Operations
212
213
```scala { .api }
214
object Compression {
215
def gzip: Flow[ByteString, ByteString, NotUsed]
216
def gunzip(maxBytesPerChunk: Int = 65536): Flow[ByteString, ByteString, NotUsed]
217
def deflate: Flow[ByteString, ByteString, NotUsed]
218
def inflate(maxBytesPerChunk: Int = 65536): Flow[ByteString, ByteString, NotUsed]
219
}
220
```
221
222
**Usage Examples:**
223
```scala
224
import akka.stream.scaladsl.Compression
225
226
// Compress file
227
FileIO.fromPath(Paths.get("large-file.txt"))
228
.via(Compression.gzip)
229
.runWith(FileIO.toPath(Paths.get("compressed.gz")))
230
231
// Decompress file
232
FileIO.fromPath(Paths.get("data.gz"))
233
.via(Compression.gunzip())
234
.runWith(FileIO.toPath(Paths.get("decompressed.txt")))
235
236
// HTTP-style compression headers (note: this emits one header string per compressed chunk and drops the body)
237
val httpResponse = Source.single(ByteString("Hello World!"))
238
.via(Compression.gzip)
239
.map { compressed =>
240
s"Content-Encoding: gzip\r\nContent-Length: ${compressed.length}\r\n\r\n"
241
}
242
```
243
244
## TLS/SSL Support
245
246
### TLS Operations
247
248
```scala { .api }
249
object TLS {
250
def create(): BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed]
251
def create(sslContext: SSLContext): BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed]
252
}
253
254
sealed trait SslTlsInbound
255
case class SessionBytes(bytes: ByteString) extends SslTlsInbound
256
case object SessionTruncated extends SslTlsInbound
257
258
sealed trait SslTlsOutbound
259
case class SendBytes(bytes: ByteString) extends SslTlsOutbound
260
case object SessionClose extends SslTlsOutbound
261
```
262
263
**Usage Example:**
264
```scala
265
import akka.stream.scaladsl.TLS
266
import javax.net.ssl.SSLContext
267
268
// HTTPS client with TLS
269
val sslContext = SSLContext.getDefault
270
val tlsFlow = TLS.create(sslContext)
271
272
val httpsRequest = Source.single(SendBytes(
273
ByteString("GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")
274
))
275
276
val connection = Tcp().outgoingConnection("example.com", 443)
277
278
httpsRequest
279
.via(tlsFlow.join(connection))
282
.runWith(Sink.foreach {
283
case SessionBytes(bytes) => println(bytes.utf8String)
284
case SessionTruncated => println("Session truncated")
285
})
286
```
287
288
## Integration Patterns
289
290
### Reactive Streams Integration
291
292
```scala
293
import org.reactivestreams.{Publisher, Subscriber}
294
295
// From Reactive Streams Publisher
296
val publisherSource: Source[Int, NotUsed] =
297
Source.fromPublisher(somePublisher)
298
299
// To Reactive Streams Subscriber
300
val subscriberSink: Sink[Int, NotUsed] =
301
Sink.fromSubscriber(someSubscriber)
302
303
// As Publisher (for other Reactive Streams implementations)
304
val asPublisher: Sink[Int, Publisher[Int]] =
305
Sink.asPublisher(fanout = false)
306
```
307
308
### Actor Integration
309
310
```scala
311
import akka.actor.ActorRef
312
313
// Send to Actor
314
val actorSink: Sink[String, NotUsed] =
315
Sink.actorRef(actorRef, onCompleteMessage = "Done")
316
317
// From Actor (no backpressure: a full buffer is handled by the OverflowStrategy)
318
val actorSource: Source[String, ActorRef] =
319
Source.actorRef(bufferSize = 100, OverflowStrategy.dropHead)
320
```
321
322
This covers the main I/O integration capabilities, providing bridges between streams and external systems while maintaining backpressure semantics.