An implementation of Reactive Streams and a DSL for stream processing built on top of Akka actors
npx @tessl/cli install tessl/maven-com-typesafe-akka--akka-stream_2-12@2.8.0Akka Stream is a powerful reactive streaming library built on top of the Akka actor framework that implements the Reactive Streams specification. It provides a high-level DSL for building resilient, distributed, and concurrent stream processing applications with strong back-pressure support and composable stream processing operators.
libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.8.8"<dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-stream_2.12</artifactId><version>2.8.8</version></dependency>import akka.stream.scaladsl.{Source, Flow, Sink, RunnableGraph}
import akka.stream.Materializer
import akka.NotUsedimport akka.stream.javadsl.Source;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.Materializer;import akka.actor.ActorSystem
import akka.stream.scaladsl.{Source, Sink}
import akka.stream.Materializer
implicit val system: ActorSystem = ActorSystem("stream-system")
implicit val materializer: Materializer = Materializer(system)
// Create a source of numbers 1 to 10
val source = Source(1 to 10)
// Transform and process the stream
val result = source
.map(_ * 2)
.filter(_ > 10)
.runWith(Sink.seq)
// result is a Future[Seq[Int]] containing [12, 14, 16, 18, 20]Akka Stream is built around several key architectural concepts:
The fundamental building blocks for creating and composing stream processing pipelines. These types define the structure and flow of data through reactive streams.
// Source: Stream with one output, no inputs
trait Source[+Out, +Mat] extends Graph[SourceShape[Out], Mat]
// Flow: Stream processing step with one input and one output
trait Flow[-In, +Out, +Mat] extends Graph[FlowShape[In, Out], Mat]
// Sink: Stream endpoint that consumes elements
trait Sink[-In, +Mat] extends Graph[SinkShape[In], Mat]
// RunnableGraph: Complete stream ready for execution
trait RunnableGraph[+Mat] extends Graph[ClosedShape, Mat]Factory methods and utilities for creating stream sources from various data sources including collections, futures, actors, and external systems.
object Source {
def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed]
def fromIterator[T](f: () => Iterator[T]): Source[T, NotUsed]
def future[T](futureElement: Future[T]): Source[T, NotUsed]
def single[T](element: T): Source[T, NotUsed]
def empty[T]: Source[T, NotUsed]
def repeat[T](element: T): Source[T, NotUsed]
}Core transformation operators for manipulating, filtering, grouping, and routing stream elements with strong type safety and back-pressure support.
// Basic transformations
def map[T2](f: Out => T2): Source[T2, Mat]
def filter(p: Out => Boolean): Source[Out, Mat]
def collect[T2](pf: PartialFunction[Out, T2]): Source[T2, Mat]
// Async transformations
def mapAsync[T2](parallelism: Int)(f: Out => Future[T2]): Source[T2, Mat]
def mapAsyncUnordered[T2](parallelism: Int)(f: Out => Future[T2]): Source[T2, Mat]
// Grouping and batching
def grouped(n: Int): Source[immutable.Seq[Out], Mat]
def groupBy[K](maxSubstreams: Int, f: Out => K): SubFlow[Out, Mat, Source[Out, Mat]#Repr, RunnableGraph[Mat]]Operations for merging, zipping, concatenating, and broadcasting streams to create complex data flow topologies.
// Combining sources
def merge[U >: Out](other: Graph[SourceShape[U], _]): Source[U, Mat]
def concat[U >: Out](other: Graph[SourceShape[U], _]): Source[U, Mat]
def zip[U](other: Graph[SourceShape[U], _]): Source[(Out, U), Mat]
// Broadcasting and balancing
def broadcast(outputCount: Int): Graph[UniformFanOutShape[Out, Out], NotUsed]
def balance[T](outputCount: Int): Graph[UniformFanOutShape[T, T], NotUsed]Endpoints for consuming stream elements including collection sinks, side-effect sinks, and integration with external systems.
object Sink {
def seq[T]: Sink[T, Future[immutable.Seq[T]]]
def head[T]: Sink[T, Future[T]]
def foreach[T](f: T => Unit): Sink[T, Future[Done]]
def fold[U, T](zero: U)(f: (U, T) => U): Sink[T, Future[U]]
def ignore: Sink[Any, Future[Done]]
}System for converting stream blueprints into running streams, managing resources, and controlling materialized values.
trait Materializer {
def materialize[Mat](runnable: Graph[ClosedShape, Mat]): Mat
def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable
}
// Materialization control
def run(): Mat
def runWith[Mat2](sink: Graph[SinkShape[Out], Mat2]): Mat2
def mapMaterializedValue[Mat2](f: Mat => Mat2): Source[Out, Mat2]Strategies for handling failures, implementing supervision, and recovering from errors in stream processing pipelines.
object Supervision {
type Decider = Throwable => Directive
sealed abstract class Directive
case object Resume extends Directive
case object Restart extends Directive
case object Stop extends Directive
}
// Error handling operators
def recover[U >: Out](pf: PartialFunction[Throwable, U]): Source[U, Mat]
def recoverWithRetries[U >: Out](attempts: Int, pf: PartialFunction[Throwable, Graph[SourceShape[U], _]]): Source[U, Mat]Mechanisms for controlling stream lifecycle, implementing backpressure, rate limiting, and external stream termination.
// Flow control
def buffer(size: Int, overflowStrategy: OverflowStrategy): Source[Out, Mat]
def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int, mode: ThrottleMode): Source[Out, Mat]
// Kill switches for external termination
trait KillSwitch {
def shutdown(): Unit
def abort(ex: Throwable): Unit
}API for creating custom stream processing operators using GraphStage for advanced use cases requiring fine-grained control over stream behavior.
abstract class GraphStage[S <: Shape] extends Graph[S, NotUsed] {
def createLogic(inheritedAttributes: Attributes): GraphStageLogic
}
abstract class GraphStageLogic(val shape: Shape) {
def setHandler(in: Inlet[_], handler: InHandler): Unit
def setHandler(out: Outlet[_], handler: OutHandler): Unit
def push[T](out: Outlet[T], elem: T): Unit
def pull[T](in: Inlet[T]): Unit
}Integration with file systems, TCP/TLS networking, actors, and external reactive streams publishers/subscribers.
// File I/O
object FileIO {
def fromPath(f: Path): Source[ByteString, Future[IOResult]]
def toPath(f: Path): Sink[ByteString, Future[IOResult]]
}
// TCP networking
object Tcp {
def outgoingConnection(remoteAddress: InetSocketAddress): Flow[ByteString, ByteString, Future[OutgoingConnection]]
def bind(interface: String, port: Int): Source[IncomingConnection, Future[ServerBinding]]
}// Essential type for operations without materialized value
type NotUsed = akka.NotUsed
// Completion marker
sealed abstract class Done
case object Done extends Done
// Stream shapes
trait SourceShape[+T] extends Shape
trait FlowShape[-I, +O] extends Shape
trait SinkShape[-T] extends Shape
trait ClosedShape extends Shape
// Overflow strategies for buffering
sealed abstract class OverflowStrategy
object OverflowStrategy {
case object DropHead extends OverflowStrategy
case object DropTail extends OverflowStrategy
case object DropBuffer extends OverflowStrategy
case object DropNew extends OverflowStrategy
case object Backpressure extends OverflowStrategy
case object Fail extends OverflowStrategy
}