tessl/maven-com-typesafe-akka--akka-stream-2-13-0-m5

Akka Streams is a reactive streams implementation for processing and transferring sequences of elements using bounded buffer space with automatic backpressure management.

  • Workspace: tessl
  • Visibility: Public
  • Describes: mavenpkg:maven/com.typesafe.akka/akka-stream_2.13.0-M5@2.5.x

To install, run

npx @tessl/cli install tessl/maven-com-typesafe-akka--akka-stream-2-13-0-m5@2.5.0


Akka Streams

Akka Streams is a reactive streams implementation for processing and transferring sequences of elements using bounded buffer space. This library provides a domain-specific language for expressing complex data transformation pipelines with automatic backpressure management, enabling developers to build robust, asynchronous stream processing applications.

Package Information

  • Package Name: com.typesafe.akka:akka-stream_2.13.0-M5
  • Package Type: maven
  • Language: Scala (with Java API)
  • Version: 2.5.23
  • Installation: Add to build.sbt: libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.5.23"

Core Imports

import akka.stream.scaladsl.{Source, Flow, Sink}
import akka.stream.{ActorMaterializer, Materializer}
import akka.actor.ActorSystem
import akka.{Done, NotUsed}

Java API:

import akka.stream.javadsl.*;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.actor.ActorSystem;

Basic Usage

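import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future

implicit val system: ActorSystem = ActorSystem("MySystem")
implicit val materializer: ActorMaterializer = ActorMaterializer()

// Create a simple stream: numbers 1 to 10, multiply by 2, print results
val source: Source[Int, NotUsed] = Source(1 to 10)
val sink: Sink[Any, Future[Done]] = Sink.foreach(println)

// runWith materializes and starts the stream; the Future completes when the sink has finished
val result: Future[Done] = source
  .map(_ * 2)
  .runWith(sink)

// Prints 2, 4, 6, 8, 10, 12, 14, 16, 18, 20 (one value per line)

// Shut the actor system down once the stream has completed
result.onComplete(_ => system.terminate())(system.dispatcher)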

Architecture

Akka Streams is built around several key abstractions:

  • Graph: The blueprint of a stream processing topology
  • Shape: Defines the inlets and outlets of a graph component
  • Materializer: Responsible for turning graph blueprints into running streams
  • Source: Stream component with one output (data producer)
  • Flow: Stream component with one input and one output (data transformer)
  • Sink: Stream component with one input (data consumer)

All stream processing is built using these composable, type-safe building blocks that automatically handle backpressure according to the Reactive Streams specification.
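To make the blueprint idea concrete, here is a minimal sketch of how a Source, a Flow, and a Sink might be combined into a reusable graph that is materialized twice. The object name `BlueprintExample`, the actor system name, and the 5-second timeout are illustrative assumptions, not part of the library.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical example object; names are for illustration only.
object BlueprintExample {
  // Each value below is an immutable description; nothing runs yet.
  val numbers: Source[Int, NotUsed] = Source(1 to 5)
  val doubler: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)
  val sum: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)

  // Source -> Flow -> Sink forms a closed graph; Keep.right keeps the sink's Future.
  val blueprint: RunnableGraph[Future[Int]] =
    numbers.via(doubler).toMat(sum)(Keep.right)

  def run(): (Int, Int) = {
    implicit val system: ActorSystem = ActorSystem("blueprint-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // The same blueprint is materialized twice; each run is an independent stream.
      val first = Await.result(blueprint.run(), 5.seconds)
      val second = Await.result(blueprint.run(), 5.seconds)
      (first, second)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Both runs sum the doubled values 2, 4, 6, 8, 10, so the program should print `(30,30)`.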

Capabilities

Core Stream Components

The fundamental building blocks for creating reactive streams with type-safe composition and automatic backpressure handling.

// Source - produces elements
final class Source[+Out, +Mat](traversalBuilder: LinearTraversalBuilder, shape: SourceShape[Out])

// Flow - transforms elements  
final class Flow[-In, +Out, +Mat](traversalBuilder: LinearTraversalBuilder, shape: FlowShape[In, Out])

// Sink - consumes elements
final class Sink[-In, +Mat](traversalBuilder: LinearTraversalBuilder, shape: SinkShape[In])
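As a rough illustration of type-safe composition, the sketch below shows that attaching a Flow to a Source yields another Source, and attaching a Flow to a Sink yields another Sink. The object and value names (`CompositionExample`, `lengths`, `collect`) are made up for this example.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import scala.collection.immutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical example object; names are for illustration only.
object CompositionExample {
  val words: Source[String, NotUsed] = Source(List("akka", "streams", "dsl"))
  val lengths: Flow[String, Int, NotUsed] = Flow[String].map(_.length)
  val collect: Sink[Int, Future[immutable.Seq[Int]]] = Sink.seq[Int]

  // Source + Flow is still a Source.
  val lengthSource: Source[Int, NotUsed] = words.via(lengths)
  // Flow + Sink is still a Sink, now accepting the Flow's input type.
  val wordSink: Sink[String, Future[immutable.Seq[Int]]] =
    lengths.toMat(collect)(Keep.right)

  def run(): (immutable.Seq[Int], immutable.Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("composition-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val viaSource = Await.result(lengthSource.runWith(collect), 5.seconds)
      val viaSink = Await.result(words.runWith(wordSink), 5.seconds)
      (viaSource, viaSink)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Both compositions describe the same pipeline, so each run should collect the lengths 4, 7, 3.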


Graph Building and Composition

Advanced graph construction using GraphDSL for complex stream topologies including fan-in, fan-out, and custom shapes.

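object GraphDSL {
  // Build a graph from scratch; the result has no materialized value
  def create[S <: Shape]()(buildBlock: GraphDSL.Builder[NotUsed] => S): Graph[S, NotUsed]

  // Import one existing graph and expose its materialized value
  def create[S <: Shape, Mat](g1: Graph[Shape, Mat])(buildBlock: GraphDSL.Builder[Mat] => (g1.Shape) => S): Graph[S, Mat]

  class Builder[+M] {
    def add[S <: Shape](graph: Graph[S, _]): S
  }

  // The connection operators ~> and <~ are provided by GraphDSL.Implicits
}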
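A small fan-out/fan-in topology could be sketched with GraphDSL as follows. This is one possible arrangement, assuming a Broadcast feeding two branches that are merged again; the object name and the multipliers are arbitrary.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, RunnableGraph, Sink, Source}
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object; names are for illustration only.
object FanOutFanInExample {
  def run(): immutable.Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("graph-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Passing the sink to create() exposes its materialized Future from the graph.
      val graph = RunnableGraph.fromGraph(GraphDSL.create(Sink.seq[Int]) { implicit builder => out =>
        import GraphDSL.Implicits._
        val broadcast = builder.add(Broadcast[Int](2))
        val merge = builder.add(Merge[Int](2))

        Source(1 to 3) ~> broadcast.in
        broadcast.out(0) ~> Flow[Int].map(_ * 10) ~> merge.in(0)
        broadcast.out(1) ~> Flow[Int].map(_ * 100) ~> merge.in(1)
        merge.out ~> out.in
        ClosedShape
      })
      // Merge does not guarantee ordering across branches, so sort for a stable result.
      Await.result(graph.run(), 5.seconds).sorted
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Every port must be connected before the builder block returns `ClosedShape`; leaving one unconnected causes graph construction to fail when the graph is created, not when it is run.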


Stream Operations and Transformations

Comprehensive set of stream processing operations including mapping, filtering, grouping, timing, and error handling.

trait FlowOps[+Out, +Mat] {
  def map[T](f: Out => T): Repr[T]
  def filter(p: Out => Boolean): Repr[Out]  
  def mapAsync[T](parallelism: Int)(f: Out => Future[T]): Repr[T]
  def groupBy[K](maxSubstreams: Int, f: Out => K): SubFlow[Out, Mat, Repr, Closed]
  def throttle(elements: Int, per: FiniteDuration): Repr[Out]
}
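The operators listed above chain naturally on a Source. The following sketch combines `filter`, `map`, and `mapAsync`; the object name and the `item-` prefix are illustrative choices.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.collection.immutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical example object; names are for illustration only.
object OperatorsExample {
  def run(): immutable.Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("operators-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher // ExecutionContext for the Futures created in mapAsync
    try {
      val labels = Source(1 to 10)
        .filter(_ % 2 == 0)  // keeps 2, 4, 6, 8, 10
        .map(_ * 3)          // 6, 12, 18, 24, 30
        // Up to two Futures run concurrently; results are emitted in upstream order.
        .mapAsync(parallelism = 2)(n => Future(s"item-$n"))
        .runWith(Sink.seq[String])
      Await.result(labels, 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Because `mapAsync` preserves upstream order, the collected labels come out as `item-6` through `item-30` in sequence even though the Futures may complete out of order.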


Materialization and Execution

Stream materialization with ActorMaterializer, lifecycle management, and execution control.

abstract class Materializer {
  def materialize[Mat](runnable: Graph[ClosedShape, Mat]): Mat
  def withNamePrefix(name: String): Materializer
}

object ActorMaterializer {
  def apply()(implicit context: ActorRefFactory): ActorMaterializer
}
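Materialization is also where materialized values are produced. As a sketch, the example below uses `Source.maybe` (which materializes a `Promise`) together with `Sink.head` (which materializes a `Future`) and keeps both values; the object name and the value 42 are arbitrary.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Hypothetical example object; names are for illustration only.
object MaterializedValueExample {
  // Keep.both retains the materialized values of both the source and the sink.
  val graph: RunnableGraph[(Promise[Option[Int]], Future[Int])] =
    Source.maybe[Int].toMat(Sink.head[Int])(Keep.both)

  def run(): Int = {
    implicit val system: ActorSystem = ActorSystem("materialization-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // run() hands the blueprint to the implicit Materializer and returns its values.
      val (promise, firstElement) = graph.run()
      // Completing the promise makes the source emit one element and then complete.
      promise.success(Some(42))
      Await.result(firstElement, 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

The stream produces nothing until the promise is completed from outside, which shows that a materialized value can act as a handle for interacting with a running stream.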


Junction Operations

Stream junction operators for merging, broadcasting, zipping, and partitioning multiple streams.

class Merge[T](inputPorts: Int, eagerComplete: Boolean)
class Broadcast[T](outputPorts: Int, eagerCancel: Boolean) 
class Zip[A, B] extends ZipWith2[A, B, (A, B)]
class Partition[T](outputPorts: Int, partitioner: T => Int)
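As one example of a junction in use, the sketch below routes integers to two sinks with `Partition`. The even/odd partitioner and the object name are assumptions made for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{GraphDSL, Partition, RunnableGraph, Sink, Source}
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object; names are for illustration only.
object PartitionExample {
  def run(): (immutable.Seq[Int], immutable.Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("junction-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Two sinks are imported so that both materialized Futures are returned as a pair.
      val graph = RunnableGraph.fromGraph(
        GraphDSL.create(Sink.seq[Int], Sink.seq[Int])((_, _)) { implicit builder => (evenOut, oddOut) =>
          import GraphDSL.Implicits._
          // The partitioner returns the index of the output port for each element.
          val partition = builder.add(Partition[Int](2, n => if (n % 2 == 0) 0 else 1))

          Source(1 to 6) ~> partition.in
          partition.out(0) ~> evenOut.in
          partition.out(1) ~> oddOut.in
          ClosedShape
        })
      val (evens, odds) = graph.run()
      (Await.result(evens, 5.seconds), Await.result(odds, 5.seconds))
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Each output port preserves the relative order of the elements routed to it, so the even values arrive as 2, 4, 6 and the odd values as 1, 3, 5.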


I/O Integration

File I/O, TCP networking, and integration with Java streams and other I/O systems.

object FileIO {
  def fromPath(path: Path, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]]
  def toPath(path: Path): Sink[ByteString, Future[IOResult]]
}

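// Tcp is an Akka extension: obtain the instance with Tcp(system), then call its methods
class Tcp(system: ExtendedActorSystem) extends Extension {
  def outgoingConnection(remoteAddress: InetSocketAddress): Flow[ByteString, ByteString, Future[OutgoingConnection]]
}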
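A file round trip with `FileIO` might look like the sketch below, which writes three lines to a temporary file and reads them back using `Framing.delimiter`. The temporary file name, the sample lines, and the frame length limit of 256 are all illustrative assumptions.

```scala
import java.nio.file.{Files, Path}
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, IOResult}
import akka.stream.scaladsl.{FileIO, Framing, Sink, Source}
import akka.util.ByteString
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object; names are for illustration only.
object FileRoundTripExample {
  def run(): (Long, immutable.Seq[String]) = {
    implicit val system: ActorSystem = ActorSystem("io-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    val path: Path = Files.createTempFile("akka-stream-example", ".txt")
    try {
      // Write: each line becomes a ByteString chunk terminated by a newline.
      val written: IOResult = Await.result(
        Source(List("alpha", "beta", "gamma"))
          .map(line => ByteString(line + "\n"))
          .runWith(FileIO.toPath(path)),
        5.seconds)

      // Read: chunks from the file are re-framed into lines and decoded as UTF-8.
      val lines = Await.result(
        FileIO.fromPath(path)
          .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256))
          .map(_.utf8String)
          .runWith(Sink.seq[String]),
        5.seconds)

      (written.count, lines)
    } finally {
      Files.deleteIfExists(path)
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

`FileIO.fromPath` emits arbitrary byte chunks rather than lines, which is why a framing stage is needed before decoding. `IOResult.count` reports the number of bytes written, 17 for these three lines.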


Error Handling and Supervision

Comprehensive error handling with supervision strategies, recovery operations, and stream resilience patterns.

object Supervision {
  sealed trait Directive
  case object Stop extends Directive
  case object Resume extends Directive
  case object Restart extends Directive
  
  type Decider = Function[Throwable, Directive]
}
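A `Decider` is typically attached through stream attributes. The sketch below, under the assumption that division by zero should simply drop the offending element, resumes on `ArithmeticException` and stops on anything else; the object name and input values are illustrative.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, ActorMaterializer, Supervision}
import akka.stream.scaladsl.{Sink, Source}
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object; names are for illustration only.
object SupervisionExample {
  // Resume drops the failing element and continues; Stop fails the stream.
  val decider: Supervision.Decider = {
    case _: ArithmeticException => Supervision.Resume
    case _                      => Supervision.Stop
  }

  def run(): immutable.Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("supervision-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val results = Source(List(1, 0, 2, 4))
        .map(n => 100 / n) // throws ArithmeticException for 0
        .withAttributes(ActorAttributes.supervisionStrategy(decider))
        .runWith(Sink.seq[Int])
      Await.result(results, 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Without the supervision attribute the default strategy is to stop, so the stream would fail at the element 0 instead of skipping it.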


Control Flow and Lifecycle

Stream lifecycle management with KillSwitch, StreamRefs for distribution, and queue integration.

trait KillSwitch {
  def shutdown(): Unit
  def abort(ex: Throwable): Unit
}

object KillSwitches {
  def single[T]: Graph[FlowShape[T, T], UniqueKillSwitch]
  def shared(name: String): SharedKillSwitch
}
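To show how a `KillSwitch` can end an otherwise unbounded stream, here is a minimal sketch using `Source.tick` and `KillSwitches.single`. The tick interval, the 100 ms pause, and the object name are arbitrary choices for this example.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, KillSwitches}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object; names are for illustration only.
object KillSwitchExample {
  def run(): Done = {
    implicit val system: ActorSystem = ActorSystem("killswitch-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Source.tick never completes on its own, so it needs an external stop signal.
      val (killSwitch, done) = Source
        .tick(0.millis, 10.millis, "tick")
        .viaMat(KillSwitches.single)(Keep.right) // keep the UniqueKillSwitch
        .toMat(Sink.ignore)(Keep.both)           // and the sink's Future[Done]
        .run()

      Thread.sleep(100)     // let a few ticks flow through
      killSwitch.shutdown() // completes downstream and cancels upstream
      Await.result(done, 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Calling `abort(ex)` instead of `shutdown()` would fail the downstream with the given exception, so the returned Future would fail rather than complete with `Done`.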


Types

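// Fundamental types (defined in package akka)
sealed abstract class NotUsed
case object NotUsed extends NotUsed

sealed abstract class Done extends Serializable
case object Done extends Done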

// Shape hierarchy
abstract class Shape {
  def inlets: immutable.Seq[Inlet[_]]
  def outlets: immutable.Seq[Outlet[_]]
}

case class SourceShape[+T](out: Outlet[T]) extends Shape
case class FlowShape[-I, +O](in: Inlet[I], out: Outlet[O]) extends Shape  
case class SinkShape[-T](in: Inlet[T]) extends Shape

// Ports
final class Inlet[T](s: String)
final class Outlet[T](s: String)

// Materialization
trait Graph[+S <: Shape, +M] {
  def shape: S
  def withAttributes(attr: Attributes): Graph[S, M]
}

// Results
case class IOResult(count: Long, status: Try[Done]) {
  def wasSuccessful: Boolean
}

// Queue integration
sealed abstract class QueueOfferResult
object QueueOfferResult {
  case object Enqueued extends QueueOfferResult
  case object Dropped extends QueueOfferResult  
  case class Failure(cause: Throwable) extends QueueOfferResult
  case object QueueClosed extends QueueOfferResult
}

// Strategies
sealed abstract class OverflowStrategy
object OverflowStrategy {
  def dropHead: OverflowStrategy
  def dropTail: OverflowStrategy
  def backpressure: OverflowStrategy
  def fail: OverflowStrategy
}

// Attributes
final case class Attributes(attributeList: List[Attributes.Attribute]) {
  def and(other: Attributes): Attributes
  def get[T <: Attributes.Attribute: ClassTag]: Option[T]
}
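The `QueueOfferResult` and `OverflowStrategy` types come together in `Source.queue`. The following sketch offers three elements to a queue-backed source and collects them; the buffer size of 8 and the object name are assumptions for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical example object; names are for illustration only.
object QueueExample {
  def run(): (immutable.Seq[QueueOfferResult], immutable.Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("queue-example")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // The source materializes a queue handle; the sink materializes the collected elements.
      val (queue, collected) = Source
        .queue[Int](8, OverflowStrategy.backpressure)
        .toMat(Sink.seq[Int])(Keep.both)
        .run()

      // With backpressure, wait for each offer to resolve before making the next one.
      val offers = (1 to 3).map(n => Await.result(queue.offer(n), 5.seconds))
      queue.complete() // signal that no more elements will be offered

      (offers, Await.result(collected, 5.seconds))
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

With a buffer of 8 and only three offers, each offer should resolve to `QueueOfferResult.Enqueued`. A dropping strategy could yield `Dropped` for some offers once the buffer is full.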