0
# Integration
1
2
This module integrates Akka Streams with file systems, TCP/TLS networking, actors, and external Reactive Streams publishers/subscribers, providing connectivity between streams and external systems.
3
4
## Capabilities
5
6
### File I/O Integration
7
8
Stream-based file reading and writing operations.
9
10
```scala { .api }
11
/**
12
* File I/O utilities for streaming file operations
13
*/
14
object FileIO {
15
/**
16
* Create a source that reads from a file
17
* @param f Path to the file to read
18
* @param chunkSize Size of chunks to read at a time
19
* @return Source of ByteString chunks with IOResult materialized value
20
*/
21
def fromPath(f: Path, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]]
22
23
/**
24
* Create a sink that writes to a file
25
* @param f Path to the file to write
26
* @param options File open options (default: write, truncate, create)
27
* @return Sink that materializes to Future[IOResult]
28
*/
29
def toPath(f: Path, options: Set[OpenOption] = Set(WRITE, TRUNCATE_EXISTING, CREATE)): Sink[ByteString, Future[IOResult]]
30
}
31
32
/**
33
* Result of file I/O operations
34
* @param count Number of bytes processed
35
* @param status Success or failure status
36
*/
37
final case class IOResult(count: Long, status: Try[Done]) {
38
def wasSuccessful: Boolean = status.isSuccess
39
}
40
```
41
42
**Usage Examples:**
43
44
```scala
45
import akka.stream.scaladsl.FileIO
46
import akka.util.ByteString
47
import java.nio.file.Paths
48
49
// Read file as stream
50
val filePath = Paths.get("input.txt")
51
val fileSource: Source[ByteString, Future[IOResult]] = FileIO.fromPath(filePath)
52
53
fileSource
54
.map(_.utf8String)
55
.runWith(Sink.foreach(println))
56
57
// Write stream to file
58
val outputPath = Paths.get("output.txt")
59
Source(List("Hello", "World", "!"))
60
.map(s => ByteString(s + "\n"))
61
.runWith(FileIO.toPath(outputPath))
62
.map { result =>
63
println(s"Wrote ${result.count} bytes")
64
}
65
66
// Copy file with transformation
67
FileIO.fromPath(Paths.get("input.txt"))
68
.map(_.utf8String.toUpperCase)
69
.map(ByteString(_))
70
.runWith(FileIO.toPath(Paths.get("output.txt")))
71
```
72
73
### TCP Networking
74
75
TCP client and server streaming capabilities.
76
77
```scala { .api }
78
/**
79
* TCP streaming utilities
80
*/
81
object Tcp {
82
/**
83
* Create an outgoing TCP connection
84
* @param remoteAddress Address to connect to
85
* @param localAddress Optional local address to bind to
86
* @param options TCP socket options
87
* @param halfClose Enable half-close for the connection
88
* @param connectTimeout Connection timeout duration
89
* @param idleTimeout Idle timeout for the connection
90
* @return Flow representing the TCP connection
91
*/
92
def outgoingConnection(
93
remoteAddress: InetSocketAddress,
94
localAddress: Option[InetSocketAddress] = None,
95
options: immutable.Traversable[SocketOption] = Nil,
96
halfClose: Boolean = true,
97
connectTimeout: Duration = Duration.Inf,
98
idleTimeout: Duration = Duration.Inf
99
): Flow[ByteString, ByteString, Future[OutgoingConnection]]
100
101
/**
102
* Bind to a TCP port to accept incoming connections
103
* @param interface Interface to bind to
104
* @param port Port to bind to
105
* @param backlog TCP backlog size
106
* @param options TCP socket options
107
* @param halfClose Enable half-close for connections
108
* @param idleTimeout Idle timeout for connections
109
* @return Source of incoming connections
110
*/
111
def bind(
112
interface: String,
113
port: Int,
114
backlog: Int = 100,
115
options: immutable.Traversable[SocketOption] = Nil,
116
halfClose: Boolean = true,
117
idleTimeout: Duration = Duration.Inf
118
): Source[IncomingConnection, Future[ServerBinding]]
119
}
120
121
/**
122
* Represents an outgoing TCP connection
123
*/
124
final case class OutgoingConnection(remoteAddress: InetSocketAddress, localAddress: InetSocketAddress)
125
126
/**
127
* Represents an incoming TCP connection
128
*/
129
final case class IncomingConnection(
130
remoteAddress: InetSocketAddress,
131
localAddress: InetSocketAddress,
132
flow: Flow[ByteString, ByteString, NotUsed]
133
) {
134
/**
135
* Handle this connection with the given flow
136
*/
137
def handleWith[Mat](handler: Flow[ByteString, ByteString, Mat]): Mat
138
}
139
140
/**
141
* Represents a bound TCP server
142
*/
143
trait ServerBinding {
144
def localAddress: InetSocketAddress
145
def unbind(): Future[Done]
146
}
147
```
148
149
**Usage Examples:**
150
151
```scala
152
import akka.stream.scaladsl.{Tcp, Flow}
153
import akka.util.ByteString
154
import java.net.InetSocketAddress
155
156
// TCP client
157
val connection = Tcp.outgoingConnection(new InetSocketAddress("example.com", 80))
158
159
Source.single(ByteString("GET / HTTP/1.1\r\nHost: example.com\r\n\r\n"))
160
.via(connection)
161
.runWith(Sink.foreach(response => println(response.utf8String)))
162
163
// TCP server
164
val serverBinding = Tcp.bind("localhost", 8080)
165
.to(Sink.foreach { connection =>
166
println(s"New connection from: ${connection.remoteAddress}")
167
168
connection.handleWith(Flow[ByteString]
169
.map(_.utf8String)
170
.map(_.toUpperCase)
171
.map(ByteString(_))
172
)
173
}).run() // `to` keeps the Future[ServerBinding]; runForeach would yield Future[Done]
174
175
// Shutdown server
176
serverBinding.flatMap(_.unbind())
177
```
178
179
### TLS/SSL Support
180
181
TLS encryption and decryption for secure communications.
182
183
```scala { .api }
184
/**
185
* TLS/SSL utilities for secure communications
186
*/
187
object TLS {
188
/**
189
* Create a TLS flow for the given role (client by default)
190
* @param sslContext SSL context for TLS
191
* @param firstSession Optional function to configure first session
192
* @param role TLS role (client or server)
193
* @param closing Closing behavior
194
* @return BidiFlow for TLS encryption/decryption
195
*/
196
def create(
197
sslContext: SSLContext,
198
firstSession: Option[NegotiateNewSession] = None,
199
role: TLSRole = Client,
200
closing: TLSClosing = IgnoreComplete
201
): BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed]
202
}
203
204
/**
205
* TLS protocol messages
206
*/
207
sealed trait SslTlsInbound
208
sealed trait SslTlsOutbound
209
210
final case class SessionBytes(session: SSLSession, bytes: ByteString) extends SslTlsInbound
211
final case class SendBytes(bytes: ByteString) extends SslTlsOutbound
212
213
/**
214
* TLS roles
215
*/
216
sealed trait TLSRole
217
case object Client extends TLSRole
218
case object Server extends TLSRole
219
220
/**
221
* TLS closing behaviors
222
*/
223
sealed trait TLSClosing
224
case object EagerClose extends TLSClosing
225
case object IgnoreCancel extends TLSClosing
226
case object IgnoreComplete extends TLSClosing
227
```
228
229
**Usage Examples:**
230
231
```scala
232
import akka.stream.scaladsl.TLS
233
import javax.net.ssl.SSLContext
234
235
// TLS client
236
val sslContext = SSLContext.getInstance("TLS")
237
sslContext.init(null, null, null)
238
239
val tlsFlow = TLS.create(sslContext, role = Client)
240
241
// Secure TCP connection
242
val secureConnection = tlsFlow
243
.join(Tcp.outgoingConnection(new InetSocketAddress("secure.example.com", 443)))
244
245
Source.single(SendBytes(ByteString("GET / HTTP/1.1\r\nHost: secure.example.com\r\n\r\n")))
246
.via(secureConnection)
247
.runWith(Sink.foreach {
248
case SessionBytes(session, bytes) =>
249
println(s"Received: ${bytes.utf8String}")
250
})
251
```
252
253
### Actor Integration
254
255
Integration with Akka actors for sending and receiving messages.
256
257
```scala { .api }
258
/**
259
* Create a source that receives messages from an actor
260
* @param bufferSize Size of the buffer for incoming messages
261
* @param overflowStrategy Strategy when buffer overflows
262
* @return Source materialized as ActorRef for sending messages
263
*/
264
def actorRef[T](
265
bufferSize: Int,
266
overflowStrategy: OverflowStrategy
267
): Source[T, ActorRef]
268
269
/**
270
* Create a source with backpressure-aware actor integration
271
* @param ackMessage Message sent to acknowledge element processing
272
* @param completionMatcher Partial function to detect completion messages
273
* @param failureMatcher Partial function to detect failure messages
274
* @return Source materialized as ActorRef with backpressure support
275
*/
276
def actorRefWithBackpressure[T](
277
ackMessage: Any,
278
completionMatcher: PartialFunction[Any, CompletionStrategy] = PartialFunction.empty,
279
failureMatcher: PartialFunction[Any, Throwable] = PartialFunction.empty
280
): Source[T, ActorRef]
281
282
/**
283
* Create a sink that sends messages to an actor
284
* @param ref Target actor reference
285
* @param onCompleteMessage Message sent when stream completes
286
* @return Sink that sends elements as messages
287
*/
288
def actorRef[T](ref: ActorRef, onCompleteMessage: Any): Sink[T, NotUsed]
289
290
/**
291
* Create a sink with backpressure support for actors
292
* @param ref Target actor reference
293
* @param messageAdapter Function to wrap elements in messages
294
* @param initMessage Optional initialization message
295
* @param ackMessage Message that actor sends to acknowledge receipt
296
* @param onCompleteMessage Message sent when stream completes
297
* @param onFailureMessage Function to create failure message
298
* @return Sink with backpressure control
299
*/
300
def actorRefWithBackpressure[T](
301
ref: ActorRef,
302
messageAdapter: T => Any,
303
initMessage: Option[Any] = None,
304
ackMessage: Any,
305
onCompleteMessage: Any,
306
onFailureMessage: Throwable => Any = Status.Failure(_)
307
): Sink[T, NotUsed]
308
```
309
310
**Usage Examples:**
311
312
```scala
313
import akka.actor.{Actor, ActorRef, Props}
314
315
// Actor that processes stream elements
316
class ProcessingActor extends Actor {
317
def receive = {
318
case element: String if element != "complete" =>
319
println(s"Processing: $element")
320
sender() ! "ack" // Acknowledge processing
321
case "complete" =>
322
println("Stream completed")
323
context.stop(self)
324
}
325
}
326
327
// Actor source
328
val (actorRef, source) = Source.actorRefWithBackpressure[String](
329
ackMessage = "ack",
330
completionMatcher = {
331
case "complete" => CompletionStrategy.immediately
332
}
333
).preMaterialize()
334
335
// Send messages to the source
336
actorRef ! "Hello"
337
actorRef ! "World"
338
actorRef ! "complete"
339
340
// Actor sink
341
val processingActor = system.actorOf(Props[ProcessingActor])
342
Source(List("msg1", "msg2", "msg3"))
343
.runWith(Sink.actorRefWithBackpressure(
344
ref = processingActor,
345
messageAdapter = identity,
346
ackMessage = "ack",
347
onCompleteMessage = "complete"
348
))
349
```
350
351
### Reactive Streams Integration
352
353
Integration with standard Reactive Streams publishers and subscribers.
354
355
```scala { .api }
356
/**
357
* Create a source from a Reactive Streams Publisher
358
* @param publisher Publisher to wrap as a source
359
* @return Source that subscribes to the publisher
360
*/
361
def fromPublisher[T](publisher: Publisher[T]): Source[T, NotUsed]
362
363
/**
364
* Create a sink from a Reactive Streams Subscriber
365
* @param subscriber Subscriber to wrap as a sink
366
* @return Sink that publishes to the subscriber
367
*/
368
def fromSubscriber[T](subscriber: Subscriber[T]): Sink[T, NotUsed]
369
370
/**
371
* Create a sink that exposes the stream as a Reactive Streams Publisher
372
* @param fanout Whether to support multiple subscribers
373
* @return Sink materialized as Publisher
374
*/
375
def asPublisher[T](fanout: Boolean): Sink[T, Publisher[T]]
376
377
/**
378
* Create a source that is fed through a Reactive Streams Subscriber
379
* @return Source materialized as Subscriber
380
*/
381
def asSubscriber[T]: Source[T, Subscriber[T]]
382
```
383
384
**Usage Examples:**
385
386
```scala
387
import org.reactivestreams.{Publisher, Subscriber}
388
389
// From publisher
390
val publisher: Publisher[Int] = createSomePublisher()
391
val source = Source.fromPublisher(publisher)
392
393
// To publisher
394
val publisher2: Publisher[Int] =
395
Source(1 to 10)
396
.runWith(Sink.asPublisher(fanout = false))
397
398
// From subscriber
399
val subscriber: Subscriber[String] = createSomeSubscriber()
400
val sink = Sink.fromSubscriber(subscriber)
401
402
// To subscriber
403
val subscriber2: Subscriber[Int] =
404
Source.asSubscriber[Int]
405
.to(Sink.seq[Int]).run()
406
```
407
408
### Stream Converters
409
410
Utilities for converting between different stream types and Java I/O.
411
412
```scala { .api }
413
/**
414
* Conversion utilities for integrating with Java I/O and other stream types
415
*/
416
object StreamConverters {
417
/**
418
* Create a source from an InputStream
419
* @param createInputStream Function that creates the InputStream
420
* @param chunkSize Size of chunks to read
421
* @return Source of ByteString chunks
422
*/
423
def fromInputStream(createInputStream: () => InputStream, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]]
424
425
/**
426
* Create a sink that writes to an OutputStream
427
* @param createOutputStream Function that creates the OutputStream
428
* @return Sink that materializes to Future[IOResult]
429
*/
430
def fromOutputStream(createOutputStream: () => OutputStream): Sink[ByteString, Future[IOResult]]
431
432
/**
433
* Create a sink that exposes the stream's bytes as an InputStream
434
* @param readTimeout Timeout for read operations
435
* @return Sink materialized as InputStream
436
*/
437
def asInputStream(readTimeout: FiniteDuration = 5.seconds): Sink[ByteString, InputStream]
438
439
/**
440
* Create a source that emits the bytes written to its materialized OutputStream
441
* @param writeTimeout Timeout for write operations
442
* @return Source materialized as OutputStream
443
*/
444
def asOutputStream(writeTimeout: FiniteDuration = 5.seconds): Source[ByteString, OutputStream]
445
446
/**
447
* Create a sink that exposes the stream's elements as a Java 8 Stream
448
* @return Sink materialized as Java Stream
449
*/
450
def asJavaStream[T](): Sink[T, java.util.stream.Stream[T]]
451
}
452
```
453
454
**Usage Examples:**
455
456
```scala
457
import akka.stream.scaladsl.StreamConverters
458
import java.io.{FileInputStream, FileOutputStream}
459
460
// From InputStream
461
val inputSource = StreamConverters.fromInputStream(() => new FileInputStream("input.txt"))
462
inputSource.runWith(Sink.foreach(chunk => println(chunk.utf8String)))
463
464
// To OutputStream
465
Source(List("Hello", "World"))
466
.map(s => ByteString(s + "\n"))
467
.runWith(StreamConverters.fromOutputStream(() => new FileOutputStream("output.txt")))
468
.map { result =>
469
// The materialized IOResult reports the number of bytes written
470
println(s"Wrote ${result.count} bytes")
471
}
472
473
// Java Stream integration
474
val javaStream: java.util.stream.Stream[Int] = Source(1 to 100)
475
.runWith(StreamConverters.asJavaStream())
476
```
477
478
## Types
479
480
```scala { .api }
481
// I/O result
482
final case class IOResult(count: Long, status: Try[Done]) {
483
def wasSuccessful: Boolean = status.isSuccess
484
}
485
486
// TCP connection types
487
final case class OutgoingConnection(remoteAddress: InetSocketAddress, localAddress: InetSocketAddress)
488
final case class IncomingConnection(
489
remoteAddress: InetSocketAddress,
490
localAddress: InetSocketAddress,
491
flow: Flow[ByteString, ByteString, NotUsed]
492
)
493
494
trait ServerBinding {
495
def localAddress: InetSocketAddress
496
def unbind(): Future[Done]
497
}
498
499
// TLS types
500
sealed trait SslTlsInbound
501
sealed trait SslTlsOutbound
502
final case class SessionBytes(session: SSLSession, bytes: ByteString) extends SslTlsInbound
503
final case class SendBytes(bytes: ByteString) extends SslTlsOutbound
504
505
sealed trait TLSRole
506
case object Client extends TLSRole
507
case object Server extends TLSRole
508
509
// Actor integration
510
sealed abstract class CompletionStrategy
511
case object ImmediateCompletionStrategy extends CompletionStrategy
512
case object DrainAndCompletionStrategy extends CompletionStrategy
513
```