or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

core-stream-types.mdcustom-stages.mderror-handling.mdindex.mdintegration.mdmaterialization.mdstream-combining.mdstream-control.mdstream-sinks.mdstream-sources.mdstream-transformations.md
tile.json

integration.mddocs/

Integration

Integration with file systems, TCP/TLS networking, actors, and external reactive streams publishers/subscribers. This provides connectivity between Akka Stream and external systems.

Capabilities

File I/O Integration

Stream-based file reading and writing operations.

/**
 * File I/O utilities for streaming file operations
 */
object FileIO {
  /**
   * Create a source that reads from a file
   * @param f Path to the file to read
   * @param chunkSize Size of chunks to read at a time
   * @return Source of ByteString chunks with IOResult materialized value
   */
  def fromPath(f: Path, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]]
  
  /**
   * Create a sink that writes to a file
   * @param f Path to the file to write
   * @param options File open options (default: write, truncate, create)
   * @return Sink that materializes to Future[IOResult]
   */
  def toPath(f: Path, options: Set[OpenOption] = Set(WRITE, TRUNCATE_EXISTING, CREATE)): Sink[ByteString, Future[IOResult]]
}

/**
 * Result of file I/O operations
 * @param count Number of bytes processed
 * @param status Success or failure status
 */
final case class IOResult(count: Long, status: Try[Done]) {
  def wasSuccessful: Boolean = status.isSuccess
}

Usage Examples:

import akka.stream.scaladsl.FileIO
import akka.util.ByteString
import java.nio.file.Paths

// Read file as stream
val filePath = Paths.get("input.txt")
val fileSource: Source[ByteString, Future[IOResult]] = FileIO.fromPath(filePath)

fileSource
  .map(_.utf8String)
  .runWith(Sink.foreach(println))

// Write stream to file
val outputPath = Paths.get("output.txt")
Source(List("Hello", "World", "!"))
  .map(s => ByteString(s + "\n"))
  .runWith(FileIO.toPath(outputPath))
  .map { result =>
    println(s"Wrote ${result.count} bytes")
  }

// Copy file with transformation
FileIO.fromPath(Paths.get("input.txt"))
  .map(_.utf8String.toUpperCase)
  .map(ByteString(_))
  .runWith(FileIO.toPath(Paths.get("output.txt")))

TCP Networking

TCP client and server streaming capabilities.

/**
 * TCP streaming utilities
 */
object Tcp {
  /**
   * Create an outgoing TCP connection
   * @param remoteAddress Address to connect to
   * @param localAddress Optional local address to bind to
   * @param options TCP socket options
   * @param halfClose Enable half-close for the connection
   * @param connectTimeout Connection timeout duration
   * @param idleTimeout Idle timeout for the connection
   * @return Flow representing the TCP connection
   */
  def outgoingConnection(
    remoteAddress: InetSocketAddress,
    localAddress: Option[InetSocketAddress] = None,
    options: immutable.Traversable[SocketOption] = Nil,
    halfClose: Boolean = true,
    connectTimeout: Duration = Duration.Inf,
    idleTimeout: Duration = Duration.Inf
  ): Flow[ByteString, ByteString, Future[OutgoingConnection]]
  
  /**
   * Bind to a TCP port to accept incoming connections
   * @param interface Interface to bind to
   * @param port Port to bind to
   * @param backlog TCP backlog size
   * @param options TCP socket options
   * @param halfClose Enable half-close for connections
   * @param idleTimeout Idle timeout for connections
   * @return Source of incoming connections
   */
  def bind(
    interface: String,
    port: Int,
    backlog: Int = 100,
    options: immutable.Traversable[SocketOption] = Nil,
    halfClose: Boolean = true,
    idleTimeout: Duration = Duration.Inf
  ): Source[IncomingConnection, Future[ServerBinding]]
}

/**
 * Represents an outgoing TCP connection
 */
final case class OutgoingConnection(remoteAddress: InetSocketAddress, localAddress: InetSocketAddress)

/**
 * Represents an incoming TCP connection
 */
final case class IncomingConnection(
  remoteAddress: InetSocketAddress,
  localAddress: InetSocketAddress,
  flow: Flow[ByteString, ByteString, NotUsed]
) {
  /**
   * Handle this connection with the given flow
   */
  def handleWith[Mat](handler: Flow[ByteString, ByteString, Mat]): Mat
}

/**
 * Represents a bound TCP server
 */
trait ServerBinding {
  def localAddress: InetSocketAddress
  def unbind(): Future[Done]
}

Usage Examples:

import akka.stream.scaladsl.{Tcp, Flow}
import akka.util.ByteString
import java.net.InetSocketAddress

// TCP client
val connection = Tcp.outgoingConnection(new InetSocketAddress("example.com", 80))

Source.single(ByteString("GET / HTTP/1.1\r\nHost: example.com\r\n\r\n"))
  .via(connection)
  .runWith(Sink.foreach(response => println(response.utf8String)))

// TCP server  
val serverBinding = Tcp.bind("localhost", 8080)
  .runForeach { connection =>
    println(s"New connection from: ${connection.remoteAddress}")
    
    connection.handleWith(Flow[ByteString]
      .map(_.utf8String)
      .map(_.toUpperCase)
      .map(ByteString(_))
    )
  }

// Shutdown server
serverBinding.flatMap(_.unbind())

TLS/SSL Support

TLS encryption and decryption for secure communications.

/**
 * TLS/SSL utilities for secure communications
 */
object TLS {
  /**
   * Create a TLS flow for client-side TLS
   * @param sslContext SSL context for TLS
   * @param firstSession Optional function to configure first session
   * @param role TLS role (client or server)
   * @param closing Closing behavior
   * @return BidiFlow for TLS encryption/decryption
   */
  def create(
    sslContext: SSLContext,
    firstSession: Option[NegotiateNewSession] = None,
    role: TLSRole = Client,
    closing: TLSClosing = IgnoreComplete
  ): BidiFlow[SslTlsOutbound, ByteString, ByteString, SslTlsInbound, NotUsed]
}

/**
 * TLS protocol messages
 */
sealed trait SslTlsInbound
sealed trait SslTlsOutbound

final case class SessionBytes(session: SSLSession, bytes: ByteString) extends SslTlsInbound
final case class SendBytes(bytes: ByteString) extends SslTlsOutbound

/**
 * TLS roles
 */
sealed trait TLSRole
case object Client extends TLSRole  
case object Server extends TLSRole

/**
 * TLS closing behaviors
 */
sealed trait TLSClosing
case object EagerClose extends TLSClosing
case object IgnoreCancel extends TLSClosing
case object IgnoreComplete extends TLSClosing

Usage Examples:

import akka.stream.scaladsl.TLS
import javax.net.ssl.SSLContext

// TLS client
val sslContext = SSLContext.getInstance("TLS")
sslContext.init(null, null, null)

val tlsFlow = TLS.create(sslContext, role = TLSRole.Client)

// Secure TCP connection
val secureConnection = Tcp.outgoingConnection(new InetSocketAddress("secure.example.com", 443))
  .join(tlsFlow)

Source.single(SendBytes(ByteString("GET / HTTP/1.1\r\nHost: secure.example.com\r\n\r\n")))
  .via(secureConnection)
  .runWith(Sink.foreach {
    case SessionBytes(session, bytes) =>
      println(s"Received: ${bytes.utf8String}")
  })

Actor Integration

Integration with Akka actors for sending and receiving messages.

/**
 * Create a source that receives messages from an actor
 * @param bufferSize Size of the buffer for incoming messages
 * @param overflowStrategy Strategy when buffer overflows
 * @return Source materialized as ActorRef for sending messages
 */
def actorRef[T](
  bufferSize: Int,
  overflowStrategy: OverflowStrategy
): Source[T, ActorRef]

/**
 * Create a source with backpressure-aware actor integration
 * @param ackMessage Message sent to acknowledge element processing
 * @param completionMatcher Partial function to detect completion messages
 * @param failureMatcher Partial function to detect failure messages
 * @return Source materialized as ActorRef with backpressure support
 */
def actorRefWithBackpressure[T](
  ackMessage: Any,
  completionMatcher: PartialFunction[Any, CompletionStrategy] = PartialFunction.empty,
  failureMatcher: PartialFunction[Any, Throwable] = PartialFunction.empty
): Source[T, ActorRef]

/**
 * Create a sink that sends messages to an actor
 * @param ref Target actor reference
 * @param onCompleteMessage Message sent when stream completes
 * @return Sink that sends elements as messages
 */
def actorRef[T](ref: ActorRef, onCompleteMessage: Any): Sink[T, NotUsed]

/**
 * Create a sink with backpressure support for actors
 * @param ref Target actor reference
 * @param messageAdapter Function to wrap elements in messages
 * @param initMessage Optional initialization message
 * @param ackMessage Message that actor sends to acknowledge receipt
 * @param onCompleteMessage Message sent when stream completes
 * @param onFailureMessage Function to create failure message
 * @return Sink with backpressure control
 */
def actorRefWithBackpressure[T](
  ref: ActorRef,
  messageAdapter: T => Any,
  initMessage: Option[Any] = None,
  ackMessage: Any,
  onCompleteMessage: Any,
  onFailureMessage: Throwable => Any = Status.Failure(_)
): Sink[T, NotUsed]

Usage Examples:

import akka.actor.{Actor, ActorRef, Props}

// Actor that processes stream elements
class ProcessingActor extends Actor {
  def receive = {
    case element: String =>
      println(s"Processing: $element")
      sender() ! "ack" // Acknowledge processing
    case "complete" =>
      println("Stream completed")
      context.stop(self)
  }
}

// Actor source
val (actorRef, source) = Source.actorRefWithBackpressure[String](
  ackMessage = "ack",
  completionMatcher = {
    case "complete" => CompletionStrategy.immediately
  }
).preMaterialize()

// Send messages to the source
actorRef ! "Hello"
actorRef ! "World"
actorRef ! "complete"

// Actor sink
val processingActor = system.actorOf(Props[ProcessingActor])
Source(List("msg1", "msg2", "msg3"))
  .runWith(Sink.actorRefWithBackpressure(
    ref = processingActor,
    messageAdapter = identity,
    ackMessage = "ack",
    onCompleteMessage = "complete"
  ))

Reactive Streams Integration

Integration with standard Reactive Streams publishers and subscribers.

/**
 * Create a source from a Reactive Streams Publisher
 * @param publisher Publisher to wrap as a source
 * @return Source that subscribes to the publisher
 */
def fromPublisher[T](publisher: Publisher[T]): Source[T, NotUsed]

/**
 * Create a sink from a Reactive Streams Subscriber  
 * @param subscriber Subscriber to wrap as a sink
 * @return Sink that publishes to the subscriber
 */
def fromSubscriber[T](subscriber: Subscriber[T]): Sink[T, NotUsed]

/**
 * Convert this source to a Reactive Streams Publisher
 * @param fanout Whether to support multiple subscribers
 * @return Source materialized as Publisher
 */
def toPublisher(fanout: Boolean): Source[T, Publisher[T]]

/**
 * Convert this sink to a Reactive Streams Subscriber
 * @return Sink materialized as Subscriber
 */
def toSubscriber[T]: Sink[T, Subscriber[T]]

Usage Examples:

import org.reactivestreams.{Publisher, Subscriber}

// From publisher
val publisher: Publisher[Int] = createSomePublisher()
val source = Source.fromPublisher(publisher)

// To publisher  
val (publisher2, source2) = Source(1 to 10)
  .toPublisher(fanout = false)
  .preMaterialize()

// From subscriber
val subscriber: Subscriber[String] = createSomeSubscriber()
val sink = Sink.fromSubscriber(subscriber)

// To subscriber
val (subscriber2, sink2) = Sink.seq[Int]
  .toSubscriber
  .preMaterialize()

Stream Converters

Utilities for converting between different stream types and Java I/O.

/**
 * Conversion utilities for integrating with Java I/O and other stream types
 */
object StreamConverters {
  /**
   * Create a source from an InputStream
   * @param createInputStream Function that creates the InputStream
   * @param chunkSize Size of chunks to read
   * @return Source of ByteString chunks
   */
  def fromInputStream(createInputStream: () => InputStream, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]]
  
  /**
   * Create a source from an OutputStream 
   * @param createOutputStream Function that creates the OutputStream
   * @return Source materialized as OutputStream for writing
   */
  def fromOutputStream(createOutputStream: () => OutputStream): Source[ByteString, OutputStream]
  
  /**
   * Convert this source to an InputStream
   * @param readTimeout Timeout for read operations
   * @return Source materialized as InputStream
   */
  def asInputStream(readTimeout: FiniteDuration = 5.seconds): Source[ByteString, InputStream]
  
  /**
   * Convert this sink to an OutputStream
   * @param writeTimeout Timeout for write operations
   * @return Sink materialized as OutputStream
   */
  def asOutputStream(writeTimeout: FiniteDuration = 5.seconds): Sink[ByteString, OutputStream]
  
  /**
   * Convert source to Java 8 Stream
   * @return Source materialized as Java Stream
   */
  def asJavaStream[T]: Source[T, java.util.stream.Stream[T]]
}

Usage Examples:

import akka.stream.scaladsl.StreamConverters
import java.io.{FileInputStream, FileOutputStream}

// From InputStream
val inputSource = StreamConverters.fromInputStream(() => new FileInputStream("input.txt"))
inputSource.runWith(Sink.foreach(chunk => println(chunk.utf8String)))

// To OutputStream  
Source(List("Hello", "World"))
  .map(s => ByteString(s + "\n"))
  .runWith(StreamConverters.asOutputStream())
  .map { outputStream =>
    // Use the OutputStream
    new PrintWriter(outputStream).println("Additional data")
  }

// Java Stream integration
val javaStream: java.util.stream.Stream[Int] = Source(1 to 100)
  .runWith(StreamConverters.asJavaStream())

Types

// I/O result
final case class IOResult(count: Long, status: Try[Done]) {
  def wasSuccessful: Boolean = status.isSuccess
}

// TCP connection types
final case class OutgoingConnection(remoteAddress: InetSocketAddress, localAddress: InetSocketAddress)
final case class IncomingConnection(
  remoteAddress: InetSocketAddress,
  localAddress: InetSocketAddress,
  flow: Flow[ByteString, ByteString, NotUsed]
)

trait ServerBinding {
  def localAddress: InetSocketAddress
  def unbind(): Future[Done]
}

// TLS types
sealed trait SslTlsInbound
sealed trait SslTlsOutbound
final case class SessionBytes(session: SSLSession, bytes: ByteString) extends SslTlsInbound
final case class SendBytes(bytes: ByteString) extends SslTlsOutbound

sealed trait TLSRole
case object Client extends TLSRole
case object Server extends TLSRole

// Actor integration
sealed abstract class CompletionStrategy
case object ImmediateCompletionStrategy extends CompletionStrategy
case object DrainAndCompletionStrategy extends CompletionStrategy