
tessl/maven-com-typesafe-akka--akka-stream_2-12

An implementation of Reactive Streams and a DSL for stream processing built on top of Akka actors

Workspace: tessl
Visibility: Public
Describes: mavenpkg:maven/com.typesafe.akka/akka-stream_2.12@2.8.x

To install, run

npx @tessl/cli install tessl/maven-com-typesafe-akka--akka-stream_2-12@2.8.0


Akka Stream

Akka Stream is a reactive streaming library built on top of the Akka actor framework. It implements the Reactive Streams specification and provides a high-level DSL for building resilient, distributed, and concurrent stream processing applications from composable operators, with back-pressure handled by the library.

Package Information

  • Package Name: com.typesafe.akka:akka-stream_2.12
  • Package Type: maven
  • Language: Scala
  • Installation:
    • sbt: libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.8.8"
    • Maven: <dependency><groupId>com.typesafe.akka</groupId><artifactId>akka-stream_2.12</artifactId><version>2.8.8</version></dependency>

Core Imports

Scala DSL

import akka.stream.scaladsl.{Source, Flow, Sink, RunnableGraph}
import akka.stream.Materializer
import akka.NotUsed

Java DSL

import akka.stream.javadsl.Source;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.Materializer;

Basic Usage

import akka.actor.ActorSystem
import akka.stream.scaladsl.{Source, Sink}
import akka.stream.Materializer

implicit val system: ActorSystem = ActorSystem("stream-system")
implicit val materializer: Materializer = Materializer(system)

// Create a source of numbers 1 to 10
val source = Source(1 to 10)

// Transform and process the stream
val result = source
  .map(_ * 2)
  .filter(_ > 10)
  .runWith(Sink.seq)

// result is a Future[Seq[Int]] containing [12, 14, 16, 18, 20]

Architecture

Akka Stream is built around several key architectural concepts:

  • Reactive Streams: Full compliance with the Reactive Streams specification for asynchronous stream processing with back-pressure
  • Graph DSL: High-level declarative API for describing stream processing topologies as graphs with sources, flows, and sinks
  • Materialization: Two-phase execution where stream blueprints are first defined, then materialized into running streams
  • Back-pressure: Automatic flow control preventing overwhelming of downstream components
  • Actor Integration: Built on Akka actors for distribution, fault tolerance, and scalability
  • Type Safety: Strong typing throughout the stream processing pipeline with Scala's type system
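The two-phase model in the Materialization bullet can be sketched as follows. This is a minimal illustration, assuming Akka 2.8 on the classpath; `BlueprintSketch` and `runTwice` are hypothetical names, not part of the library.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical example object, not part of Akka.
object BlueprintSketch {
  // Phase 1: describe the topology. Nothing runs yet.
  val blueprint: RunnableGraph[Future[Int]] =
    Source(1 to 5).map(_ * 2).toMat(Sink.fold[Int, Int](0)(_ + _))(Keep.right)

  // Phase 2: materialize the same blueprint twice; each run is an independent stream.
  def runTwice()(implicit system: ActorSystem): (Int, Int) = {
    val first = Await.result(blueprint.run(), 3.seconds)
    val second = Await.result(blueprint.run(), 3.seconds)
    (first, second)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("blueprint-sketch")
    try println(runTwice())
    finally system.terminate()
  }
}
```

Because the blueprint is an immutable description, it can be shared and run any number of times; state only exists inside each materialized stream.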

Capabilities

Core Stream Types

The fundamental building blocks for creating and composing stream processing pipelines. These types define the structure and flow of data through reactive streams.

// Simplified signatures; constructor parameters and additional mixins are omitted

// Source: Stream with one output, no inputs
final class Source[+Out, +Mat] extends Graph[SourceShape[Out], Mat]

// Flow: Stream processing step with one input and one output
final class Flow[-In, +Out, +Mat] extends Graph[FlowShape[In, Out], Mat]

// Sink: Stream endpoint that consumes elements
final class Sink[-In, +Mat] extends Graph[SinkShape[In], Mat]

// RunnableGraph: Complete stream ready for execution
final case class RunnableGraph[+Mat] extends Graph[ClosedShape, Mat]

Full reference: core-stream-types.md
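As a rough illustration of how the four core types fit together, the sketch below connects a Source to a Sink through a Flow and obtains a RunnableGraph. `CoreTypesSketch` and its member names are hypothetical.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical example object, not part of Akka.
object CoreTypesSketch {
  val numbers: Source[Int, NotUsed] = Source(1 to 3)
  val render: Flow[Int, String, NotUsed] = Flow[Int].map(n => s"item-$n")
  val collect: Sink[String, Future[Seq[String]]] = Sink.seq[String]

  // Keep.right selects the sink's materialized value (the Future) for the whole graph.
  val graph: RunnableGraph[Future[Seq[String]]] =
    numbers.via(render).toMat(collect)(Keep.right)

  def run()(implicit system: ActorSystem): Seq[String] =
    Await.result(graph.run(), 3.seconds)
}
```

Calling `CoreTypesSketch.run()` with an implicit `ActorSystem` in scope should yield the three rendered strings in order.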

Stream Sources

Factory methods and utilities for creating stream sources from various data sources including collections, futures, actors, and external systems.

object Source {
  def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed]
  def fromIterator[T](f: () => Iterator[T]): Source[T, NotUsed]
  def future[T](futureElement: Future[T]): Source[T, NotUsed]
  def single[T](element: T): Source[T, NotUsed]
  def empty[T]: Source[T, NotUsed]
  def repeat[T](element: T): Source[T, NotUsed]
}

Full reference: stream-sources.md
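The factory methods listed above might be exercised like this. It is a small sketch rather than a complete tour; `SourcesSketch` is a hypothetical name, and the infinite sources are bounded with `take` so the streams complete.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical example object, not part of Akka.
object SourcesSketch {
  def run()(implicit system: ActorSystem): Seq[Seq[Int]] = {
    val sources: List[Source[Int, NotUsed]] = List(
      Source(List(1, 2, 3)),                               // from an immutable collection
      Source.fromIterator(() => Iterator.from(10)).take(2), // fresh iterator per materialization
      Source.future(Future.successful(42)),                // single element once the Future completes
      Source.single(7),
      Source.empty[Int],
      Source.repeat(0).take(3)                             // infinite, so bounded with take
    )
    // Run each source to completion and collect its elements.
    sources.map(s => Await.result(s.runWith(Sink.seq[Int]), 3.seconds))
  }
}
```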

Stream Transformations

Core transformation operators for manipulating, filtering, grouping, and routing stream elements with strong type safety and back-pressure support.

// Basic transformations
def map[T2](f: Out => T2): Source[T2, Mat]
def filter(p: Out => Boolean): Source[Out, Mat] 
def collect[T2](pf: PartialFunction[Out, T2]): Source[T2, Mat]

// Async transformations
def mapAsync[T2](parallelism: Int)(f: Out => Future[T2]): Source[T2, Mat]
def mapAsyncUnordered[T2](parallelism: Int)(f: Out => Future[T2]): Source[T2, Mat]

// Grouping and batching
def grouped(n: Int): Source[immutable.Seq[Out], Mat]
def groupBy[K](maxSubstreams: Int, f: Out => K): SubFlow[Out, Mat, Source[Out, Mat]#Repr, RunnableGraph[Mat]]

Full reference: stream-transformations.md
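A short sketch of chaining the operators above: filter, an asynchronous step with bounded parallelism, then batching. `TransformSketch` and `lookup` are hypothetical; `lookup` stands in for a remote call.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical example object, not part of Akka.
object TransformSketch {
  // Hypothetical async lookup standing in for a remote call.
  def lookup(n: Int)(implicit ec: ExecutionContext): Future[String] = Future(s"id-$n")

  def run()(implicit system: ActorSystem): Seq[Seq[String]] = {
    import system.dispatcher // ExecutionContext for the Futures created by lookup
    val result = Source(1 to 6)
      .filter(_ % 2 == 0)             // keeps 2, 4, 6
      .mapAsync(2)(n => lookup(n))    // at most 2 lookups in flight; output order is preserved
      .grouped(2)                     // batches of up to 2 elements
      .runWith(Sink.seq[Seq[String]])
    Await.result(result, 3.seconds)
  }
}
```

`mapAsyncUnordered` has the same shape but emits results as they complete, so it suits cases where ordering does not matter.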

Stream Combining

Operations for merging, zipping, concatenating, and broadcasting streams to create complex data flow topologies.

// Combining sources
def merge[U >: Out](other: Graph[SourceShape[U], _]): Source[U, Mat]
def concat[U >: Out](other: Graph[SourceShape[U], _]): Source[U, Mat]
def zip[U](other: Graph[SourceShape[U], _]): Source[(Out, U), Mat]

// Broadcasting and balancing
def broadcast(outputCount: Int): Graph[UniformFanOutShape[Out, Out], NotUsed]
def balance[T](outputCount: Int): Graph[UniformFanOutShape[T, T], NotUsed]

Full reference: stream-combining.md
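The difference between `zip`, `concat`, and `merge` can be seen in a small sketch. `CombineSketch` and its method names are hypothetical.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object, not part of Akka.
object CombineSketch {
  val letters = Source(List("a", "b", "c"))
  val numbers = Source(List(1, 2, 3))

  // zip pairs elements positionally and completes when either side completes.
  def zipped()(implicit system: ActorSystem): Seq[(String, Int)] =
    Await.result(letters.zip(numbers).runWith(Sink.seq[(String, Int)]), 3.seconds)

  // concat emits all of the first source, then all of the second.
  def concatenated()(implicit system: ActorSystem): Seq[Int] =
    Await.result(numbers.concat(Source(List(4, 5))).runWith(Sink.seq[Int]), 3.seconds)

  // merge emits elements as they become available, so the interleaving is not guaranteed.
  def merged()(implicit system: ActorSystem): Seq[Int] =
    Await.result(numbers.merge(Source(List(10, 20))).runWith(Sink.seq[Int]), 3.seconds)
}
```

Only the set of merged elements is deterministic, which is why any check on `merged()` should sort or otherwise ignore order.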

Stream Sinks

Endpoints for consuming stream elements including collection sinks, side-effect sinks, and integration with external systems.

object Sink {
  def seq[T]: Sink[T, Future[immutable.Seq[T]]]
  def head[T]: Sink[T, Future[T]]
  def foreach[T](f: T => Unit): Sink[T, Future[Done]]
  def fold[U, T](zero: U)(f: (U, T) => U): Sink[T, Future[U]]
  def ignore: Sink[Any, Future[Done]]
}

Full reference: stream-sinks.md
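Each sink above materializes a different value. The sketch below runs one source against three of them; `SinksSketch` is a hypothetical name.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object, not part of Akka.
object SinksSketch {
  def run()(implicit system: ActorSystem): (Int, Int, Done) = {
    val source = Source(1 to 4)
    // Each runWith materializes a separate stream from the same source blueprint.
    val sum = source.runWith(Sink.fold[Int, Int](0)(_ + _))             // Future[Int]
    val first = source.runWith(Sink.head[Int])                          // Future[Int]
    val done = source.runWith(Sink.foreach[Int](n => println(s"saw $n"))) // Future[Done]
    (Await.result(sum, 3.seconds), Await.result(first, 3.seconds), Await.result(done, 3.seconds))
  }
}
```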

Materialization and Execution

System for converting stream blueprints into running streams, managing resources, and controlling materialized values.

abstract class Materializer {
  def materialize[Mat](runnable: Graph[ClosedShape, Mat]): Mat
  def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable
}

// Materialization control
def run(): Mat
def runWith[Mat2](sink: Graph[SinkShape[Out], Mat2]): Mat2
def mapMaterializedValue[Mat2](f: Mat => Mat2): Source[Out, Mat2]

Full reference: materialization.md
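To illustrate how materialized values are selected, the sketch below keeps the values of both ends of a graph with `Keep.both`. `MatValueSketch` is a hypothetical name; `Source.maybe` is used only because its materialized `Promise` makes the effect visible.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Hypothetical example object, not part of Akka.
object MatValueSketch {
  // Source.maybe materializes a Promise; Sink.seq materializes a Future.
  // Keep.both exposes the two values as a tuple.
  val graph: RunnableGraph[(Promise[Option[Int]], Future[Seq[Int]])] =
    Source.maybe[Int].toMat(Sink.seq[Int])(Keep.both)

  def run()(implicit system: ActorSystem): Seq[Int] = {
    val (promise, result) = graph.run()
    promise.success(Some(42)) // emits 42, then completes the stream
    Await.result(result, 3.seconds)
  }
}
```

`Keep.left`, `Keep.right`, and `Keep.none` follow the same pattern when only one value, or neither, is needed.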

Error Handling and Supervision

Strategies for handling failures, implementing supervision, and recovering from errors in stream processing pipelines.

object Supervision {
  type Decider = Throwable => Directive

  sealed trait Directive
  case object Resume extends Directive
  case object Restart extends Directive
  case object Stop extends Directive
}

// Error handling operators
def recover[U >: Out](pf: PartialFunction[Throwable, U]): Source[U, Mat]
def recoverWithRetries[U >: Out](attempts: Int, pf: PartialFunction[Throwable, Graph[SourceShape[U], _]]): Source[U, Mat]

Full reference: error-handling.md
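The two approaches above behave differently on failure, as this sketch tries to show: a supervision decider that resumes drops the failing element, while `recover` emits one final element and completes. `ErrorHandlingSketch` and its method names are hypothetical.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, Supervision}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object, not part of Akka.
object ErrorHandlingSketch {
  // Skip elements that trigger ArithmeticException; stop the stream on anything else.
  val decider: Supervision.Decider = {
    case _: ArithmeticException => Supervision.Resume
    case _                      => Supervision.Stop
  }

  // Supervision: the element 0 fails in map and is dropped; the stream continues.
  def resumed()(implicit system: ActorSystem): Seq[Int] = {
    val result = Source(List(1, 0, 2))
      .map(100 / _)
      .withAttributes(ActorAttributes.supervisionStrategy(decider))
      .runWith(Sink.seq[Int])
    Await.result(result, 3.seconds)
  }

  // recover: the failure is replaced by a final element, then the stream completes.
  def recovered()(implicit system: ActorSystem): Seq[Int] = {
    val result = Source(List(1, 0, 2))
      .map(100 / _)
      .recover { case _: ArithmeticException => -1 }
      .runWith(Sink.seq[Int])
    Await.result(result, 3.seconds)
  }
}
```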

Stream Control and Lifecycle

Mechanisms for controlling stream lifecycle, implementing backpressure, rate limiting, and external stream termination.

// Flow control
def buffer(size: Int, overflowStrategy: OverflowStrategy): Source[Out, Mat]
def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int, mode: ThrottleMode): Source[Out, Mat]

// Kill switches for external termination
trait KillSwitch {
  def shutdown(): Unit
  def abort(ex: Throwable): Unit
}

Full reference: stream-control.md
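A possible way to combine rate limiting, buffering, and external termination is sketched below. `ControlSketch` is a hypothetical name, and the 300 ms sleep is an arbitrary choice for demonstration, so the number of collected elements varies between runs.

```scala
import akka.actor.ActorSystem
import akka.stream.{KillSwitches, OverflowStrategy, ThrottleMode}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object, not part of Akka.
object ControlSketch {
  def run()(implicit system: ActorSystem): Seq[Int] = {
    val (killSwitch, result) = Source
      .fromIterator(() => Iterator.from(1))            // unbounded source
      .throttle(1, 50.millis, 1, ThrottleMode.Shaping) // at most one element per 50 ms
      .buffer(16, OverflowStrategy.backpressure)       // bounded buffer; back-pressures when full
      .viaMat(KillSwitches.single)(Keep.right)         // materializes a UniqueKillSwitch
      .toMat(Sink.seq[Int])(Keep.both)
      .run()

    Thread.sleep(300)     // let a few elements through (arbitrary demo delay)
    killSwitch.shutdown() // complete the stream from outside
    Await.result(result, 3.seconds)
  }
}
```

`shutdown()` completes the stream normally, so the sink's Future succeeds with the elements seen so far; `abort(ex)` would fail it instead.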

Custom Stages

API for creating custom stream processing operators using GraphStage for advanced use cases requiring fine-grained control over stream behavior.

abstract class GraphStage[S <: Shape] extends Graph[S, NotUsed] {
  def createLogic(inheritedAttributes: Attributes): GraphStageLogic
}

abstract class GraphStageLogic(val shape: Shape) {
  def setHandler(in: Inlet[_], handler: InHandler): Unit
  def setHandler(out: Outlet[_], handler: OutHandler): Unit
  def push[T](out: Outlet[T], elem: T): Unit
  def pull[T](in: Inlet[T]): Unit
}

Full reference: custom-stages.md
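As a minimal sketch of the GraphStage API, the stage below drops elements equal to the one emitted just before. `Dedupe` and `CustomStageSketch` are hypothetical names chosen for illustration, not operators shipped with Akka.

```scala
import akka.actor.ActorSystem
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical stage: emits an element only if it differs from the previously emitted one.
final class Dedupe[A] extends GraphStage[FlowShape[A, A]] {
  val in: Inlet[A] = Inlet[A]("Dedupe.in")
  val out: Outlet[A] = Outlet[A]("Dedupe.out")
  override val shape: FlowShape[A, A] = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      // Mutable state lives inside the logic, so each materialization gets its own copy.
      private var last: Option[A] = None

      override def onPush(): Unit = {
        val elem = grab(in)
        if (last.contains(elem)) pull(in) // duplicate: request the next element instead
        else { last = Some(elem); push(out, elem) }
      }

      // Downstream demand is forwarded upstream.
      override def onPull(): Unit = pull(in)

      setHandlers(in, out, this)
    }
}

object CustomStageSketch {
  def run()(implicit system: ActorSystem): Seq[Int] =
    Await.result(
      Source(List(1, 1, 2, 2, 2, 3, 1)).via(new Dedupe[Int]).runWith(Sink.seq[Int]),
      3.seconds)
}
```

The default `onUpstreamFinish` and `onDownstreamFinish` handlers complete the stage, which is sufficient here because no elements are held back.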

Integration

Integration with file systems, TCP/TLS networking, actors, and external reactive streams publishers/subscribers.

// File I/O
object FileIO {
  def fromPath(f: Path): Source[ByteString, Future[IOResult]]
  def toPath(f: Path): Sink[ByteString, Future[IOResult]]
}

// TCP networking (methods of the Tcp extension, obtained with Tcp(system))
final class Tcp {
  def outgoingConnection(remoteAddress: InetSocketAddress): Flow[ByteString, ByteString, Future[OutgoingConnection]]
  def bind(interface: String, port: Int): Source[IncomingConnection, Future[ServerBinding]]
}

Full reference: integration.md
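The file I/O entry points might be used as in the sketch below, which writes a string to a temporary file and reads it back. `FileIoSketch` and `roundTrip` are hypothetical names; the temp-file prefix is arbitrary.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{FileIO, Sink, Source}
import akka.util.ByteString
import java.nio.file.Files
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object, not part of Akka.
object FileIoSketch {
  def roundTrip(text: String)(implicit system: ActorSystem): String = {
    val path = Files.createTempFile("akka-stream-sketch", ".txt")
    try {
      // Write: the materialized Future[IOResult] completes when the file is closed.
      val written = Source.single(ByteString(text)).runWith(FileIO.toPath(path))
      Await.result(written, 3.seconds)

      // Read: the file arrives as chunks of ByteString, concatenated here by a fold.
      val read = FileIO
        .fromPath(path)
        .runWith(Sink.fold[ByteString, ByteString](ByteString.empty)(_ ++ _))
      Await.result(read, 3.seconds).utf8String
    } finally Files.deleteIfExists(path)
  }
}
```

Blocking with `Await` keeps the sketch short; in application code the Futures would normally be composed instead.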

Types

Core Types

// Materialized value of operators that produce no useful value (akka.NotUsed)
sealed abstract class NotUsed
case object NotUsed extends NotUsed

// Completion marker
sealed abstract class Done
case object Done extends Done

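// Stream shapes (simplified; variance annotations on ports omitted)
final case class SourceShape[+T](out: Outlet[T]) extends Shape
final case class FlowShape[-I, +O](in: Inlet[I], out: Outlet[O]) extends Shape
final case class SinkShape[-T](in: Inlet[T]) extends Shape
sealed abstract class ClosedShape extends Shape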

// Overflow strategies for buffering, obtained through factory methods
// such as OverflowStrategy.dropHead
sealed abstract class OverflowStrategy
object OverflowStrategy {
  def dropHead: OverflowStrategy
  def dropTail: OverflowStrategy
  def dropBuffer: OverflowStrategy
  def dropNew: OverflowStrategy
  def backpressure: OverflowStrategy
  def fail: OverflowStrategy
}