Akka Streams is a reactive streams implementation for processing and transferring sequences of elements using bounded buffer space with automatic backpressure management.
—
File I/O, TCP networking, and integration with Java streams and other I/O systems.
object FileIO {
def fromPath(path: Path, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]]
def fromFile(file: File, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]]
def toPath(path: Path, options: Set[OpenOption] = Set(CREATE, WRITE, TRUNCATE_EXISTING)): Sink[ByteString, Future[IOResult]]
def toFile(file: File, append: Boolean = false): Sink[ByteString, Future[IOResult]]
}Usage Examples:
import akka.stream.scaladsl.FileIO
import java.nio.file.Paths
import akka.util.ByteString
// Read from file
val source: Source[ByteString, Future[IOResult]] =
FileIO.fromPath(Paths.get("input.txt"))
// Write to file
val sink: Sink[ByteString, Future[IOResult]] =
FileIO.toPath(Paths.get("output.txt"))
// Copy file
val copyResult: Future[IOResult] = source.runWith(sink)
// Process text lines
val textProcessing = FileIO.fromPath(Paths.get("data.txt"))
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024))
.map(_.utf8String)
.map(_.toUpperCase)
.map(line => ByteString(line + "\n"))
.runWith(FileIO.toPath(Paths.get("processed.txt")))case class IOResult(count: Long, status: Try[Done]) {
def wasSuccessful: Boolean = status.isSuccess
}object Tcp {
def outgoingConnection(remoteAddress: InetSocketAddress): Flow[ByteString, ByteString, Future[OutgoingConnection]]
def outgoingConnection(host: String, port: Int): Flow[ByteString, ByteString, Future[OutgoingConnection]]
def bind(interface: String, port: Int): Source[IncomingConnection, Future[ServerBinding]]
}Connection Types:
final case class IncomingConnection(
localAddress: InetSocketAddress,
remoteAddress: InetSocketAddress,
flow: Flow[ByteString, ByteString, NotUsed]
)
final case class OutgoingConnection(
localAddress: InetSocketAddress,
remoteAddress: InetSocketAddress
)
trait ServerBinding {
def localAddress: InetSocketAddress
def unbind(): Future[Done]
}Usage Examples:
import akka.stream.scaladsl.Tcp
import java.net.InetSocketAddress
// TCP Client
val connection = Tcp().outgoingConnection("example.com", 80)
val request = ByteString("GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")
val responseFlow = Source.single(request)
.via(connection)
.runFold(ByteString.empty)(_ ++ _)
// TCP Server
val binding = Tcp().bind("localhost", 8080)
val serverFlow = Flow[ByteString].map { request =>
ByteString("HTTP/1.1 200 OK\r\n\r\nHello World!")
}
binding.runForeach { connection =>
println(s"New connection from: ${connection.remoteAddress}")
connection.handleWith(serverFlow)
}object StreamConverters {
def fromInputStream(in: () => InputStream, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]]
def fromOutputStream(out: () => OutputStream, autoFlush: Boolean = true): Sink[ByteString, Future[IOResult]]
def asInputStream(readTimeout: FiniteDuration = 5.minutes): Sink[ByteString, InputStream]
def asOutputStream(writeTimeout: FiniteDuration = 5.minutes): Source[ByteString, OutputStream]
}Usage Examples:
import akka.stream.scaladsl.StreamConverters
import java.io.{FileInputStream, FileOutputStream}
// From InputStream
val inputStreamSource = StreamConverters.fromInputStream(
() => new FileInputStream("input.txt")
)
// To OutputStream
val outputStreamSink = StreamConverters.fromOutputStream(
() => new FileOutputStream("output.txt")
)
// Bridge to blocking I/O
val inputStream: InputStream = Source(List("hello", "world"))
.map(s => ByteString(s + "\n"))
.runWith(StreamConverters.asInputStream())object Framing {
def delimiter(
delimiter: ByteString,
maximumFrameLength: Int,
allowTruncation: Boolean = false
): Flow[ByteString, ByteString, NotUsed]
def lengthField(
lengthFieldLength: Int,
lengthFieldOffset: Int = 0,
maximumFrameLength: Int,
byteOrder: ByteOrder = ByteOrder.LITTLE_ENDIAN
): Flow[ByteString, ByteString, NotUsed]
}Usage Examples:
import akka.stream.scaladsl.Framing
import akka.util.ByteString
// Line-based framing
val lineFraming = Framing.delimiter(
ByteString("\n"),
maximumFrameLength = 1024
)
// Process text file line by line
FileIO.fromPath(Paths.get("data.txt"))
.via(lineFraming)
.map(_.utf8String.trim)
.filter(_.nonEmpty)
.runWith(Sink.foreach(println))
// Length-prefixed framing (4-byte length header)
val lengthFraming = Framing.lengthField(
lengthFieldLength = 4,
maximumFrameLength = 1024 * 1024
)object JsonFraming {
def objectScanner(maximumObjectLength: Int): Flow[ByteString, ByteString, NotUsed]
}Usage Example:
import akka.stream.scaladsl.JsonFraming
// Parse JSON objects from stream
val jsonSource = Source.single(ByteString("""{"a":1}{"b":2}{"c":3}"""))
jsonSource
.via(JsonFraming.objectScanner(1024))
.map(_.utf8String)
.runWith(Sink.foreach(println))
// Output: {"a":1}, {"b":2}, {"c":3}object Compression {
def gzip: Flow[ByteString, ByteString, NotUsed]
def gunzip(maxBytesPerChunk: Int = 65536): Flow[ByteString, ByteString, NotUsed]
def deflate: Flow[ByteString, ByteString, NotUsed]
def inflate(maxBytesPerChunk: Int = 65536): Flow[ByteString, ByteString, NotUsed]
}Usage Examples:
import akka.stream.scaladsl.Compression
// Compress file
FileIO.fromPath(Paths.get("large-file.txt"))
.via(Compression.gzip)
.runWith(FileIO.toPath(Paths.get("compressed.gz")))
// Decompress file
FileIO.fromPath(Paths.get("data.gz"))
.via(Compression.gunzip())
.runWith(FileIO.toPath(Paths.get("decompressed.txt")))
// HTTP-style compression
val httpResponse = Source.single(ByteString("Hello World!"))
.via(Compression.gzip)
.map { compressed =>
s"Content-Encoding: gzip\r\nContent-Length: ${compressed.length}\r\n\r\n"
}object TLS {
def create(): BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed]
def create(sslContext: SSLContext): BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed]
}
sealed trait SslTlsInbound
case class SessionBytes(bytes: ByteString) extends SslTlsInbound
case class SessionTruncated extends SslTlsInbound
sealed trait SslTlsOutbound
case class SendBytes(bytes: ByteString) extends SslTlsOutbound
case object SessionClose extends SslTlsOutboundUsage Example:
import akka.stream.scaladsl.TLS
import javax.net.ssl.SSLContext
// HTTPS client with TLS
val sslContext = SSLContext.getDefault
val tlsFlow = TLS.create(sslContext)
val httpsRequest = Source.single(SendBytes(
ByteString("GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")
))
val connection = Tcp().outgoingConnection("example.com", 443)
httpsRequest
.via(tlsFlow.reversed)
.via(connection)
.via(tlsFlow)
.runWith(Sink.foreach {
case SessionBytes(bytes) => println(bytes.utf8String)
case SessionTruncated => println("Session truncated")
})import org.reactivestreams.{Publisher, Subscriber}
// From Reactive Streams Publisher
val publisherSource: Source[Int, NotUsed] =
Source.fromPublisher(somePublisher)
// To Reactive Streams Subscriber
val subscriberSink: Sink[Int, NotUsed] =
Sink.fromSubscriber(someSubscriber)
// As Publisher (for other Reactive Streams implementations)
val asPublisher: Sink[Int, Publisher[Int]] =
Sink.asPublisher(fanout = false)import akka.actor.ActorRef
// Send to Actor
val actorSink: Sink[String, NotUsed] =
Sink.actorRef(actorRef, onCompleteMessage = "Done")
// From Actor (with backpressure)
val actorSource: Source[String, ActorRef] =
Source.actorRef(bufferSize = 100, OverflowStrategy.dropHead)This covers the main I/O integration capabilities, providing bridges between streams and external systems while maintaining backpressure semantics.
Install with Tessl CLI
npx tessl i tessl/maven-com-typesafe-akka--akka-stream-2-13-0-m5