Akka Streams integrates with file systems, TCP/TLS networking, actors, and external Reactive Streams publishers/subscribers. These APIs provide connectivity between Akka Streams and external systems.
Stream-based file reading and writing operations.
/**
* File I/O utilities for streaming file operations.
*
* Both factories expose a `Future[IOResult]` as their materialized value.
*/
object FileIO {
/**
* Create a source that reads from a file.
* @param f Path to the file to read
* @param chunkSize Size of chunks to read at a time (defaults to 8192; presumably bytes)
* @return Source of ByteString chunks with IOResult materialized value
*/
def fromPath(f: Path, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]]
/**
* Create a sink that writes to a file.
* @param f Path to the file to write
* @param options File open options (default: WRITE, TRUNCATE_EXISTING, CREATE)
* @return Sink that materializes to Future[IOResult]
*/
def toPath(f: Path, options: Set[OpenOption] = Set(WRITE, TRUNCATE_EXISTING, CREATE)): Sink[ByteString, Future[IOResult]]
}
/**
 * Result of file I/O operations.
 *
 * @param count Number of bytes processed
 * @param status Success or failure status
 */
final case class IOResult(count: Long, status: Try[Done]) {
  /** `true` when `status` is a `Success`. */
  def wasSuccessful: Boolean = status.isSuccess
}

Usage Examples:
import akka.stream.IOResult
import akka.stream.scaladsl.{FileIO, Sink, Source}
import akka.util.ByteString
import java.nio.file.Paths
import scala.concurrent.Future

// Assumes an implicit ActorSystem/Materializer is in scope for `runWith`, and an implicit
// ExecutionContext (e.g. `system.dispatcher`) for the Future callback below.

// Read file as stream
val filePath = Paths.get("input.txt")
val fileSource: Source[ByteString, Future[IOResult]] = FileIO.fromPath(filePath)
fileSource
  // Each element is one raw chunk, not one line: a chunk may end in the middle of a line or
  // of a multi-byte UTF-8 character. Re-frame (e.g. with Framing.delimiter) before decoding
  // when the input is not plain ASCII.
  .map(_.utf8String)
  .runWith(Sink.foreach(println))

// Write stream to file
val outputPath = Paths.get("output.txt")
Source(List("Hello", "World", "!"))
  .map(s => ByteString(s + "\n"))
  .runWith(FileIO.toPath(outputPath))
  // foreach rather than map: the callback is run only for its side effect
  .foreach { result =>
    println(s"Wrote ${result.count} bytes")
  }

// Copy file with transformation
// The same chunk-boundary caveat applies to the per-chunk decode/upper-case step.
FileIO.fromPath(Paths.get("input.txt"))
  .map(_.utf8String.toUpperCase)
  .map(ByteString(_))
  .runWith(FileIO.toPath(Paths.get("output.txt")))

TCP client and server streaming capabilities.
/**
 * TCP streaming utilities.
 *
 * NOTE(review): in Akka these operations are reached through the `Tcp` extension instance
 * (`Tcp()` / `Tcp(system)`) rather than statically — confirm before copying the signatures.
 */
object Tcp {
  /**
   * Create an outgoing TCP connection.
   * @param remoteAddress Address to connect to
   * @param localAddress Optional local address to bind to
   * @param options TCP socket options
   * @param halfClose Enable half-close for the connection (default: true)
   * @param connectTimeout Connection timeout duration (default: no timeout)
   * @param idleTimeout Idle timeout for the connection (default: no timeout)
   * @return Flow representing the TCP connection, materializing a Future[OutgoingConnection]
   */
  def outgoingConnection(
    remoteAddress: InetSocketAddress,
    localAddress: Option[InetSocketAddress] = None,
    options: immutable.Traversable[SocketOption] = Nil,
    halfClose: Boolean = true,
    connectTimeout: Duration = Duration.Inf,
    idleTimeout: Duration = Duration.Inf
  ): Flow[ByteString, ByteString, Future[OutgoingConnection]]

  /**
   * Bind to a TCP port to accept incoming connections.
   * @param interface Interface to bind to
   * @param port Port to bind to
   * @param backlog TCP backlog size
   * @param options TCP socket options
   * @param halfClose Enable half-close for accepted connections. Unlike `outgoingConnection`,
   *                  Akka's default for `bind` is false.
   * @param idleTimeout Idle timeout for connections (default: no timeout)
   * @return Source of incoming connections, materializing a Future[ServerBinding]
   */
  def bind(
    interface: String,
    port: Int,
    backlog: Int = 100,
    options: immutable.Traversable[SocketOption] = Nil,
    halfClose: Boolean = false,
    idleTimeout: Duration = Duration.Inf
  ): Source[IncomingConnection, Future[ServerBinding]]
}
/**
* Represents an outgoing TCP connection.
* @param remoteAddress Address of the remote end of the connection
* @param localAddress Address of the local end of the connection
*/
final case class OutgoingConnection(remoteAddress: InetSocketAddress, localAddress: InetSocketAddress)
/**
* Represents an incoming TCP connection.
* @param remoteAddress Address of the remote end of the connection
* @param localAddress Address of the local end of the connection
* @param flow Flow of ByteStrings carried by this connection
*/
final case class IncomingConnection(
remoteAddress: InetSocketAddress,
localAddress: InetSocketAddress,
flow: Flow[ByteString, ByteString, NotUsed]
) {
/**
* Handle this connection with the given flow.
* @param handler Flow applied to the connection's bytes
* @return The handler's materialized value
*/
def handleWith[Mat](handler: Flow[ByteString, ByteString, Mat]): Mat
}
/**
 * Represents a bound TCP server.
 */
trait ServerBinding {
  /** Address the server is bound to. */
  def localAddress: InetSocketAddress

  /** Unbind the server; the returned future signals the outcome. */
  def unbind(): Future[Done]
}

Usage Examples:
import akka.stream.scaladsl.{Flow, Sink, Source, Tcp}
import akka.util.ByteString
import java.net.InetSocketAddress

// Assumes an implicit ActorSystem/Materializer and ExecutionContext are in scope.

// TCP client
val connection = Tcp.outgoingConnection(new InetSocketAddress("example.com", 80))
Source.single(ByteString("GET / HTTP/1.1\r\nHost: example.com\r\n\r\n"))
  .via(connection)
  .runWith(Sink.foreach(response => println(response.utf8String)))

// TCP server
// `bind` materializes a Future[ServerBinding]. `runForeach` would discard it and return
// Future[Done], so connect the sink with `to`, which keeps the source's materialized value.
val serverBinding = Tcp.bind("localhost", 8080)
  .to(Sink.foreach { incoming =>
    println(s"New connection from: ${incoming.remoteAddress}")
    incoming.handleWith(
      Flow[ByteString]
        .map(_.utf8String)
        .map(_.toUpperCase)
        .map(ByteString(_))
    )
  })
  .run()

// Shutdown server
serverBinding.flatMap(_.unbind())

TLS encryption and decryption for secure communications.
/**
* TLS/SSL utilities for secure communications
*/
object TLS {
/**
* Create a TLS BidiFlow for the given role (client by default).
*
* NOTE(review): the Scala DSL is usually invoked as `TLS(...)`; `create` looks like the
* Java DSL spelling — confirm against the Akka version in use.
*
* @param sslContext SSL context for TLS
* @param firstSession Optional settings for negotiating the first session (default: None)
* @param role TLS role (client or server; default: Client)
* @param closing Closing behavior (default: IgnoreComplete)
* @return BidiFlow for TLS encryption/decryption
*/
def create(
sslContext: SSLContext,
firstSession: Option[NegotiateNewSession] = None,
role: TLSRole = Client,
closing: TLSClosing = IgnoreComplete
): BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed]
}
/**
* TLS protocol messages.
*
* `SslTlsOutbound` and `SslTlsInbound` are the element types on the two non-ByteString ports
* of the BidiFlow returned by `TLS.create`.
*/
sealed trait SslTlsInbound
sealed trait SslTlsOutbound
// Inbound element: bytes together with the SSLSession they belong to.
final case class SessionBytes(session: SSLSession, bytes: ByteString) extends SslTlsInbound
// Outbound element: bytes handed to the TLS stage.
final case class SendBytes(bytes: ByteString) extends SslTlsOutbound
/**
* TLS roles: which side of the connection a TLS stage acts as.
* Passed as the `role` argument of `TLS.create`.
*/
sealed trait TLSRole
case object Client extends TLSRole
case object Server extends TLSRole
/**
 * TLS closing behaviors, passed as the `closing` argument of `TLS.create`.
 *
 * Akka defines four variants; `IgnoreBoth` was missing from this listing.
 */
sealed trait TLSClosing
case object EagerClose extends TLSClosing
case object IgnoreCancel extends TLSClosing
case object IgnoreComplete extends TLSClosing
case object IgnoreBoth extends TLSClosing

Usage Examples:
import akka.stream.scaladsl.{Sink, Source, TLS, Tcp}
import akka.util.ByteString
import java.net.InetSocketAddress
import javax.net.ssl.SSLContext

// TLS client
val sslContext = SSLContext.getInstance("TLS")
// Passing null for all three arguments lets the JVM pick its default key managers,
// trust managers and source of randomness.
sslContext.init(null, null, null)
// `Client` is the case object declared next to TLSRole; there is no `TLSRole.Client` member.
val tlsFlow = TLS.create(sslContext, role = Client)

// Secure TCP connection
// The TLS BidiFlow sits on top of the transport: join the BidiFlow with the TCP flow
// (not the other way round) to get a Flow[SslTlsOutbound, SslTlsInbound, _].
val secureConnection = tlsFlow.join(
  Tcp.outgoingConnection(new InetSocketAddress("secure.example.com", 443))
)
Source.single(SendBytes(ByteString("GET / HTTP/1.1\r\nHost: secure.example.com\r\n\r\n")))
  .via(secureConnection)
  .runWith(Sink.foreach {
    case SessionBytes(_, bytes) =>
      println(s"Received: ${bytes.utf8String}")
  })

Integration with Akka actors for sending and receiving messages.
/**
* Create a source that emits the messages sent to its materialized ActorRef.
* @param bufferSize Size of the buffer for incoming messages
* @param overflowStrategy Strategy when buffer overflows
* @return Source materialized as ActorRef for sending messages
*/
def actorRef[T](
bufferSize: Int,
overflowStrategy: OverflowStrategy
): Source[T, ActorRef]
/**
* Create a source with backpressure-aware actor integration.
*
* Both matchers default to `PartialFunction.empty`, i.e. no message is treated as a
* completion or failure signal unless a matcher is supplied.
*
* @param ackMessage Message sent to acknowledge element processing
* @param completionMatcher Partial function to detect completion messages
* @param failureMatcher Partial function to detect failure messages
* @return Source materialized as ActorRef with backpressure support
*/
def actorRefWithBackpressure[T](
ackMessage: Any,
completionMatcher: PartialFunction[Any, CompletionStrategy] = PartialFunction.empty,
failureMatcher: PartialFunction[Any, Throwable] = PartialFunction.empty
): Source[T, ActorRef]
/**
* Create a sink that sends each stream element as a message to an actor.
* @param ref Target actor reference
* @param onCompleteMessage Message sent when stream completes
* @return Sink that sends elements as messages
*/
def actorRef[T](ref: ActorRef, onCompleteMessage: Any): Sink[T, NotUsed]
/**
 * Create a sink with backpressure support for actors.
 *
 * `ackMessage` and `onCompleteMessage` have no defaults although they follow the defaulted
 * `initMessage`, so pass arguments by name when `initMessage` is omitted.
 *
 * @param ref Target actor reference
 * @param messageAdapter Function to wrap elements in messages
 * @param initMessage Optional initialization message (default: None)
 * @param ackMessage Message that actor sends to acknowledge receipt
 * @param onCompleteMessage Message sent when stream completes
 * @param onFailureMessage Function to create failure message (default: wrap in Status.Failure)
 * @return Sink with backpressure control
 */
def actorRefWithBackpressure[T](
  ref: ActorRef,
  messageAdapter: T => Any,
  initMessage: Option[Any] = None,
  ackMessage: Any,
  onCompleteMessage: Any,
  onFailureMessage: Throwable => Any = Status.Failure(_)
): Sink[T, NotUsed]

Usage Examples:
import akka.actor.{Actor, ActorRef, Props}
import akka.stream.CompletionStrategy
import akka.stream.scaladsl.{Sink, Source}

// Assumes an ActorSystem named `system` (and an implicit Materializer) is in scope.

// Actor that processes stream elements
class ProcessingActor extends Actor {
  def receive: Receive = {
    // Must be matched before the generic String case: "complete" is itself a String and
    // would otherwise be treated as a regular element, so the actor would never stop.
    case "complete" =>
      println("Stream completed")
      context.stop(self)
    case element: String =>
      println(s"Processing: $element")
      sender() ! "ack" // Acknowledge processing
  }
}

// Actor source
val (actorRef, source) = Source.actorRefWithBackpressure[String](
  ackMessage = "ack",
  completionMatcher = {
    case "complete" => CompletionStrategy.immediately
  }
).preMaterialize()

// Send messages to the source
// NOTE(review): with a backpressured actor source the sender is expected to wait for the
// "ack" reply before sending the next element — confirm before using back-to-back sends.
actorRef ! "Hello"
actorRef ! "World"
actorRef ! "complete"

// Actor sink
val processingActor = system.actorOf(Props[ProcessingActor])
Source(List("msg1", "msg2", "msg3"))
  // The explicit element type keeps `identity` from being inferred at an unintended type
  .runWith(Sink.actorRefWithBackpressure[String](
    ref = processingActor,
    messageAdapter = identity,
    ackMessage = "ack",
    onCompleteMessage = "complete"
  ))

Integration with standard Reactive Streams publishers and subscribers.
/**
* Create a source from a Reactive Streams Publisher.
* @param publisher Publisher to wrap as a source
* @return Source that subscribes to the publisher; it materializes no useful value (NotUsed)
*/
def fromPublisher[T](publisher: Publisher[T]): Source[T, NotUsed]
/**
* Create a sink from a Reactive Streams Subscriber.
* @param subscriber Subscriber to wrap as a sink
* @return Sink that publishes to the subscriber; it materializes no useful value (NotUsed)
*/
def fromSubscriber[T](subscriber: Subscriber[T]): Sink[T, NotUsed]
/**
 * Create a sink that exposes the elements it receives through a Reactive Streams Publisher
 * (`Sink.asPublisher`).
 *
 * This entry was previously listed as `toPublisher` on Source, which is not an Akka Streams
 * operator; the publisher end of a stream is obtained from a sink.
 *
 * @param fanout Whether to support multiple subscribers
 * @return Sink materialized as Publisher
 */
def asPublisher[T](fanout: Boolean): Sink[T, Publisher[T]]

/**
 * Create a source that is fed through a Reactive Streams Subscriber (`Source.asSubscriber`).
 *
 * This entry was previously listed as `toSubscriber` on Sink, which is not an Akka Streams
 * operator; the subscriber end of a stream is obtained from a source.
 *
 * @return Source materialized as Subscriber
 */
def asSubscriber[T]: Source[T, Subscriber[T]]

Usage Examples:
import akka.stream.scaladsl.{Keep, Sink, Source}
import org.reactivestreams.{Publisher, Subscriber}

// Assumes an implicit ActorSystem/Materializer is in scope.

// From publisher
val publisher: Publisher[Int] = createSomePublisher()
val source = Source.fromPublisher(publisher)

// To publisher: running the stream into Sink.asPublisher materializes the Publisher
val publisher2: Publisher[Int] = Source(1 to 10)
  .runWith(Sink.asPublisher[Int](fanout = false))

// From subscriber
val subscriber: Subscriber[String] = createSomeSubscriber()
val sink = Sink.fromSubscriber(subscriber)

// To subscriber: Source.asSubscriber materializes the Subscriber that feeds the stream;
// Keep.both also returns the Future holding the collected elements.
val (subscriber2, collected) = Source.asSubscriber[Int]
  .toMat(Sink.seq)(Keep.both)
  .run()

Utilities for converting between different stream types and Java I/O.
/**
 * Conversion utilities for integrating with Java I/O and other stream types.
 *
 * Naming rule: `fromX` wraps an existing X created by the caller; `asX` materializes an X
 * that the caller then uses to talk to the stream.
 */
object StreamConverters {
  /**
   * Create a source that reads from an InputStream.
   * @param createInputStream Function that creates the InputStream
   * @param chunkSize Size of chunks to read (default: 8192)
   * @return Source of ByteString chunks, materializing a Future[IOResult]
   */
  def fromInputStream(createInputStream: () => InputStream, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]]

  /**
   * Create a sink that writes the incoming ByteStrings to an OutputStream.
   * (Previously documented as a Source; data flows INTO the OutputStream, so this is a Sink.)
   * @param createOutputStream Function that creates the OutputStream
   * @return Sink materializing a Future[IOResult]
   */
  def fromOutputStream(createOutputStream: () => OutputStream): Sink[ByteString, Future[IOResult]]

  /**
   * Create a sink whose materialized InputStream lets blocking code read the stream's bytes.
   * (Previously documented as a Source.)
   * @param readTimeout Timeout for read operations
   * @return Sink materialized as InputStream
   */
  def asInputStream(readTimeout: FiniteDuration = 5.seconds): Sink[ByteString, InputStream]

  /**
   * Create a source that emits whatever is written to its materialized OutputStream.
   * (Previously documented as a Sink.)
   * @param writeTimeout Timeout for write operations
   * @return Source materialized as OutputStream
   */
  def asOutputStream(writeTimeout: FiniteDuration = 5.seconds): Source[ByteString, OutputStream]

  /**
   * Create a sink that exposes the stream's elements as a Java 8 Stream.
   * (Previously documented as a Source, although the usage example runs it as a sink.)
   * @return Sink materialized as Java Stream
   */
  def asJavaStream[T](): Sink[T, java.util.stream.Stream[T]]
}

Usage Examples:
import akka.stream.scaladsl.{Sink, Source, StreamConverters}
import akka.util.ByteString
import java.io.{FileInputStream, FileOutputStream, PrintWriter}

// Assumes an implicit ActorSystem/Materializer is in scope.

// From InputStream
val inputSource = StreamConverters.fromInputStream(() => new FileInputStream("input.txt"))
inputSource.runWith(Sink.foreach(chunk => println(chunk.utf8String)))

// To OutputStream: stream elements are written into the stream created by the factory
Source(List("Hello", "World"))
  .map(s => ByteString(s + "\n"))
  .runWith(StreamConverters.fromOutputStream(() => new FileOutputStream("output.txt")))

// As OutputStream: bytes written to the materialized OutputStream are emitted by the source
val outputStream = StreamConverters.asOutputStream()
  .to(Sink.foreach(chunk => print(chunk.utf8String)))
  .run()
// Use the OutputStream
val writer = new PrintWriter(outputStream)
try writer.println("Additional data")
finally writer.close() // PrintWriter buffers: closing flushes the text and ends the stream

// Java Stream integration
val javaStream: java.util.stream.Stream[Int] = Source(1 to 100)
  .runWith(StreamConverters.asJavaStream[Int]())

// I/O result
/**
 * Outcome of an I/O stage.
 *
 * @param count Number of bytes processed
 * @param status Success or failure status
 */
final case class IOResult(count: Long, status: Try[Done]) {
  /** `true` unless `status` is a `Failure`. */
  def wasSuccessful: Boolean = !status.isFailure
}
// TCP connection types
/**
* Addresses of an established outgoing TCP connection.
* @param remoteAddress Address of the remote end of the connection
* @param localAddress Address of the local end of the connection
*/
final case class OutgoingConnection(remoteAddress: InetSocketAddress, localAddress: InetSocketAddress)
/**
* An accepted incoming TCP connection.
* @param remoteAddress Address of the remote end of the connection
* @param localAddress Address of the local end of the connection
* @param flow Flow of ByteStrings carried by this connection
*/
final case class IncomingConnection(
remoteAddress: InetSocketAddress,
localAddress: InetSocketAddress,
flow: Flow[ByteString, ByteString, NotUsed]
)
/**
* Handle to a bound TCP server.
*/
trait ServerBinding {
// Address the server is bound to.
def localAddress: InetSocketAddress
// Unbind the server; the returned future signals the outcome.
def unbind(): Future[Done]
}
// TLS types
// Marker traits for the elements flowing out of (inbound) and into (outbound) a TLS stage.
sealed trait SslTlsInbound
sealed trait SslTlsOutbound
// Inbound element: bytes together with the SSLSession they belong to.
final case class SessionBytes(session: SSLSession, bytes: ByteString) extends SslTlsInbound
// Outbound element: bytes handed to the TLS stage.
final case class SendBytes(bytes: ByteString) extends SslTlsOutbound
// Which side of the connection a TLS stage acts as.
sealed trait TLSRole
case object Client extends TLSRole
case object Server extends TLSRole
// Actor integration
/**
 * Strategy produced by the `completionMatcher` of an actor-backed source to say how the
 * stream should complete.
 */
sealed abstract class CompletionStrategy
case object ImmediateCompletionStrategy extends CompletionStrategy
case object DrainAndCompletionStrategy extends CompletionStrategy

/**
 * Accessors for the two strategies, so that `CompletionStrategy.immediately` — the form
 * used in the actor-source usage example — resolves.
 */
object CompletionStrategy {
  /** The immediate-completion strategy. */
  def immediately: CompletionStrategy = ImmediateCompletionStrategy

  /** The drain-then-complete strategy. */
  def draining: CompletionStrategy = DrainAndCompletionStrategy
}