Akka Streams is a reactive streams implementation for processing and transferring sequences of elements using bounded buffer space with automatic backpressure management.
npx @tessl/cli install tessl/maven-com-typesafe-akka--akka-stream-2-13-0-m5@2.5.0

Akka Streams is a reactive streams implementation for processing and transferring sequences of elements using bounded buffer space. This library provides a domain-specific language for expressing complex data transformation pipelines with automatic backpressure management, enabling developers to build robust, asynchronous stream processing applications.
build.sbt: libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.5.23"

import akka.stream.scaladsl.{Source, Flow, Sink}
import akka.stream.{ActorMaterializer, Materializer}
import akka.actor.ActorSystem
import akka.{Done, NotUsed}

Java API:
import akka.stream.javadsl.*;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.actor.ActorSystem;

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Source, Sink}
implicit val system = ActorSystem("MySystem")
implicit val materializer: ActorMaterializer = ActorMaterializer()
// Create a simple stream: numbers 1 to 10, multiply by 2, print results
val source: Source[Int, NotUsed] = Source(1 to 10)
val sink: Sink[Any, Future[Done]] = Sink.foreach(println)
val result: Future[Done] = source
.map(_ * 2)
.runWith(sink)
// Result will print: 2, 4, 6, 8, 10, 12, 14, 16, 18, 20

Akka Streams is built around several key abstractions:
- Source: produces elements
- Flow: transforms elements
- Sink: consumes elements
All stream processing is built using these composable, type-safe building blocks that automatically handle backpressure according to the Reactive Streams specification.
The fundamental building blocks for creating reactive streams with type-safe composition and automatic backpressure handling.
// Source - produces elements
final class Source[+Out, +Mat](traversalBuilder: LinearTraversalBuilder, shape: SourceShape[Out])
// Flow - transforms elements
final class Flow[-In, +Out, +Mat](traversalBuilder: LinearTraversalBuilder, shape: FlowShape[In, Out])
// Sink - consumes elements
final class Sink[-In, +Mat](traversalBuilder: LinearTraversalBuilder, shape: SinkShape[In])

Advanced graph construction using GraphDSL for complex stream topologies including fan-in, fan-out, and custom shapes.
object GraphDSL {
def create[S <: Shape, Mat](buildBlock: GraphDSL.Builder[Mat] => S): Graph[S, Mat]
class Builder[+M] {
def add[S <: Shape](graph: Graph[S, _]): S
// Connection operators: ~>, <~, via, to, from
}
}

Graph Building and Composition
Comprehensive set of stream processing operations including mapping, filtering, grouping, timing, and error handling.
trait FlowOps[+Out, +Mat] {
def map[T](f: Out => T): Repr[T]
def filter(p: Out => Boolean): Repr[Out]
def mapAsync[T](parallelism: Int)(f: Out => Future[T]): Repr[T]
def groupBy[K](maxSubstreams: Int, f: Out => K): SubFlow[Out, Mat, Repr, Closed]
def throttle(elements: Int, per: FiniteDuration): Repr[Out]
}

Stream Operations and Transformations
Stream materialization with ActorMaterializer, lifecycle management, and execution control.
abstract class Materializer {
def materialize[Mat](runnable: Graph[ClosedShape, Mat]): Mat
def withNamePrefix(name: String): Materializer
}
object ActorMaterializer {
def apply()(implicit context: ActorRefFactory): ActorMaterializer
}

Stream junction operators for merging, broadcasting, zipping, and partitioning multiple streams.
class Merge[T](inputPorts: Int, eagerComplete: Boolean)
class Broadcast[T](outputPorts: Int, eagerCancel: Boolean)
class Zip[A, B] extends ZipWith2[A, B, (A, B)]
class Partition[T](outputPorts: Int, partitioner: T => Int)

File I/O, TCP networking, and integration with Java streams and other I/O systems.
object FileIO {
def fromPath(path: Path, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]]
def toPath(path: Path): Sink[ByteString, Future[IOResult]]
}
object Tcp {
def outgoingConnection(remoteAddress: InetSocketAddress): Flow[ByteString, ByteString, Future[OutgoingConnection]]
}

Comprehensive error handling with supervision strategies, recovery operations, and stream resilience patterns.
object Supervision {
sealed trait Directive
case object Stop extends Directive
case object Resume extends Directive
case object Restart extends Directive
type Decider = Function[Throwable, Directive]
}

Error Handling and Supervision
Stream lifecycle management with KillSwitch, StreamRefs for distribution, and queue integration.
trait KillSwitch {
def shutdown(): Unit
def abort(ex: Throwable): Unit
}
object KillSwitches {
def single[T]: Graph[FlowShape[T, T], UniqueKillSwitch]
def shared(name: String): SharedKillSwitch
}

// Fundamental types
type NotUsed = akka.NotUsed.type
type Done = akka.Done.type
// Shape hierarchy
abstract class Shape {
def inlets: immutable.Seq[Inlet[_]]
def outlets: immutable.Seq[Outlet[_]]
}
case class SourceShape[+T](out: Outlet[T]) extends Shape
case class FlowShape[-I, +O](in: Inlet[I], out: Outlet[O]) extends Shape
case class SinkShape[-T](in: Inlet[T]) extends Shape
// Ports
final class Inlet[T](s: String)
final class Outlet[T](s: String)
// Materialization
trait Graph[+S <: Shape, +M] {
def shape: S
def withAttributes(attr: Attributes): Graph[S, M]
}
// Results
case class IOResult(count: Long, status: Try[Done]) {
def wasSuccessful: Boolean
}
// Queue integration
sealed abstract class QueueOfferResult
object QueueOfferResult {
case object Enqueued extends QueueOfferResult
case object Dropped extends QueueOfferResult
case class Failure(cause: Throwable) extends QueueOfferResult
case object QueueClosed extends QueueOfferResult
}
// Strategies
sealed abstract class OverflowStrategy
object OverflowStrategy {
def dropHead: OverflowStrategy
def dropTail: OverflowStrategy
def backpressure: OverflowStrategy
def fail: OverflowStrategy
}
// Attributes
final case class Attributes(attributeList: List[Attributes.Attribute]) {
def and(other: Attributes): Attributes
def get[T <: Attributes.Attribute: ClassTag]: Option[T]
}