CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-com-typesafe-akka--akka-stream-2-13-0-m5

Akka Streams is a reactive streams implementation for processing and transferring sequences of elements using bounded buffer space with automatic backpressure management.

Pending
Overview
Eval results
Files

io-integration.mddocs/

I/O Integration

File I/O, TCP networking, and integration with Java streams and other I/O systems.

File I/O

FileIO Operations

object FileIO {
  def fromPath(path: Path, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]]
  def fromFile(file: File, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]]
  def toPath(path: Path, options: Set[OpenOption] = Set(CREATE, WRITE, TRUNCATE_EXISTING)): Sink[ByteString, Future[IOResult]]
  def toFile(file: File, append: Boolean = false): Sink[ByteString, Future[IOResult]]
}

Usage Examples:

import akka.stream.scaladsl.FileIO
import java.nio.file.Paths
import akka.util.ByteString

// Read from file
val source: Source[ByteString, Future[IOResult]] = 
  FileIO.fromPath(Paths.get("input.txt"))

// Write to file
val sink: Sink[ByteString, Future[IOResult]] = 
  FileIO.toPath(Paths.get("output.txt"))

// Copy file
val copyResult: Future[IOResult] = source.runWith(sink)

// Process text lines
val textProcessing = FileIO.fromPath(Paths.get("data.txt"))
  .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1024))
  .map(_.utf8String)
  .map(_.toUpperCase)
  .map(line => ByteString(line + "\n"))
  .runWith(FileIO.toPath(Paths.get("processed.txt")))

IOResult

case class IOResult(count: Long, status: Try[Done]) {
  def wasSuccessful: Boolean = status.isSuccess
}

TCP Networking

TCP Operations

object Tcp {
  def outgoingConnection(remoteAddress: InetSocketAddress): Flow[ByteString, ByteString, Future[OutgoingConnection]]
  def outgoingConnection(host: String, port: Int): Flow[ByteString, ByteString, Future[OutgoingConnection]]
  def bind(interface: String, port: Int): Source[IncomingConnection, Future[ServerBinding]]
}

Connection Types:

final case class IncomingConnection(
  localAddress: InetSocketAddress,
  remoteAddress: InetSocketAddress,
  flow: Flow[ByteString, ByteString, NotUsed]
)

final case class OutgoingConnection(
  localAddress: InetSocketAddress,
  remoteAddress: InetSocketAddress
)

trait ServerBinding {
  def localAddress: InetSocketAddress
  def unbind(): Future[Done]
}

Usage Examples:

import akka.stream.scaladsl.Tcp
import java.net.InetSocketAddress

// TCP Client
val connection = Tcp().outgoingConnection("example.com", 80)
val request = ByteString("GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")

val responseFlow = Source.single(request)
  .via(connection)
  .runFold(ByteString.empty)(_ ++ _)

// TCP Server  
val binding = Tcp().bind("localhost", 8080)
val serverFlow = Flow[ByteString].map { request =>
  ByteString("HTTP/1.1 200 OK\r\n\r\nHello World!")
}

binding.runForeach { connection =>
  println(s"New connection from: ${connection.remoteAddress}")
  connection.handleWith(serverFlow)
}

Stream Converters

Java Stream Integration

object StreamConverters {
  def fromInputStream(in: () => InputStream, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]]
  def fromOutputStream(out: () => OutputStream, autoFlush: Boolean = true): Sink[ByteString, Future[IOResult]]
  def asInputStream(readTimeout: FiniteDuration = 5.minutes): Sink[ByteString, InputStream]
  def asOutputStream(writeTimeout: FiniteDuration = 5.minutes): Source[ByteString, OutputStream]
}

Usage Examples:

import akka.stream.scaladsl.StreamConverters
import java.io.{FileInputStream, FileOutputStream}

// From InputStream
val inputStreamSource = StreamConverters.fromInputStream(
  () => new FileInputStream("input.txt")
)

// To OutputStream  
val outputStreamSink = StreamConverters.fromOutputStream(
  () => new FileOutputStream("output.txt")
)

// Bridge to blocking I/O
val inputStream: InputStream = Source(List("hello", "world"))
  .map(s => ByteString(s + "\n"))
  .runWith(StreamConverters.asInputStream())

Framing

Delimiter-based Framing

object Framing {
  def delimiter(
    delimiter: ByteString,
    maximumFrameLength: Int,
    allowTruncation: Boolean = false
  ): Flow[ByteString, ByteString, NotUsed]
  
  def lengthField(
    lengthFieldLength: Int,
    lengthFieldOffset: Int = 0,
    maximumFrameLength: Int,
    byteOrder: ByteOrder = ByteOrder.LITTLE_ENDIAN
  ): Flow[ByteString, ByteString, NotUsed]
}

Usage Examples:

import akka.stream.scaladsl.Framing
import akka.util.ByteString

// Line-based framing
val lineFraming = Framing.delimiter(
  ByteString("\n"), 
  maximumFrameLength = 1024
)

// Process text file line by line
FileIO.fromPath(Paths.get("data.txt"))
  .via(lineFraming)
  .map(_.utf8String.trim)
  .filter(_.nonEmpty)
  .runWith(Sink.foreach(println))

// Length-prefixed framing (4-byte length header)
val lengthFraming = Framing.lengthField(
  lengthFieldLength = 4,
  maximumFrameLength = 1024 * 1024
)

JSON Framing

object JsonFraming {
  def objectScanner(maximumObjectLength: Int): Flow[ByteString, ByteString, NotUsed]
}

Usage Example:

import akka.stream.scaladsl.JsonFraming

// Parse JSON objects from stream
val jsonSource = Source.single(ByteString("""{"a":1}{"b":2}{"c":3}"""))

jsonSource
  .via(JsonFraming.objectScanner(1024))
  .map(_.utf8String)
  .runWith(Sink.foreach(println))
// Output: {"a":1}, {"b":2}, {"c":3}

Compression

Compression Operations

object Compression {
  def gzip: Flow[ByteString, ByteString, NotUsed]
  def gunzip(maxBytesPerChunk: Int = 65536): Flow[ByteString, ByteString, NotUsed]
  def deflate: Flow[ByteString, ByteString, NotUsed]
  def inflate(maxBytesPerChunk: Int = 65536): Flow[ByteString, ByteString, NotUsed]
}

Usage Examples:

import akka.stream.scaladsl.Compression

// Compress file
FileIO.fromPath(Paths.get("large-file.txt"))
  .via(Compression.gzip)
  .runWith(FileIO.toPath(Paths.get("compressed.gz")))

// Decompress file
FileIO.fromPath(Paths.get("data.gz"))
  .via(Compression.gunzip())
  .runWith(FileIO.toPath(Paths.get("decompressed.txt")))

// HTTP-style compression
val httpResponse = Source.single(ByteString("Hello World!"))
  .via(Compression.gzip)
  .map { compressed =>
    s"Content-Encoding: gzip\r\nContent-Length: ${compressed.length}\r\n\r\n"
  }

TLS/SSL Support

TLS Operations

object TLS {
  def create(): BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed]
  def create(sslContext: SSLContext): BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed]
}

sealed trait SslTlsInbound
case class SessionBytes(bytes: ByteString) extends SslTlsInbound
case class SessionTruncated extends SslTlsInbound

sealed trait SslTlsOutbound  
case class SendBytes(bytes: ByteString) extends SslTlsOutbound
case object SessionClose extends SslTlsOutbound

Usage Example:

import akka.stream.scaladsl.TLS
import javax.net.ssl.SSLContext

// HTTPS client with TLS
val sslContext = SSLContext.getDefault
val tlsFlow = TLS.create(sslContext)

val httpsRequest = Source.single(SendBytes(
  ByteString("GET / HTTP/1.1\r\nHost: example.com\r\n\r\n")
))

val connection = Tcp().outgoingConnection("example.com", 443)

httpsRequest
  .via(tlsFlow.reversed)
  .via(connection)
  .via(tlsFlow)
  .runWith(Sink.foreach {
    case SessionBytes(bytes) => println(bytes.utf8String)
    case SessionTruncated => println("Session truncated")
  })

Integration Patterns

Reactive Streams Integration

import org.reactivestreams.{Publisher, Subscriber}

// From Reactive Streams Publisher
val publisherSource: Source[Int, NotUsed] = 
  Source.fromPublisher(somePublisher)

// To Reactive Streams Subscriber  
val subscriberSink: Sink[Int, NotUsed] = 
  Sink.fromSubscriber(someSubscriber)

// As Publisher (for other Reactive Streams implementations)
val asPublisher: Sink[Int, Publisher[Int]] = 
  Sink.asPublisher(fanout = false)

Actor Integration

import akka.actor.ActorRef

// Send to Actor
val actorSink: Sink[String, NotUsed] = 
  Sink.actorRef(actorRef, onCompleteMessage = "Done")

// From Actor (with backpressure)
val actorSource: Source[String, ActorRef] = 
  Source.actorRef(bufferSize = 100, OverflowStrategy.dropHead)

This covers the main I/O integration capabilities, providing bridges between streams and external systems while maintaining backpressure semantics.

Install with Tessl CLI

npx tessl i tessl/maven-com-typesafe-akka--akka-stream-2-13-0-m5

docs

control-flow.md

core-components.md

error-handling.md

graph-building.md

index.md

io-integration.md

junction-operations.md

materialization.md

stream-operations.md

tile.json