tessl/maven-com-typesafe-akka--akka-stream-2-13-0-m5

Akka Streams is a reactive streams implementation for processing and transferring sequences of elements using bounded buffer space with automatic backpressure management.

Akka Streams

Akka Streams is a reactive streams implementation for processing and transferring sequences of elements using bounded buffer space. This library provides a domain-specific language for expressing complex data transformation pipelines with automatic backpressure management, enabling developers to build robust, asynchronous stream processing applications.

Package Information

  • Package Name: com.typesafe.akka:akka-stream_2.13.0-M5
  • Package Type: maven
  • Language: Scala (with Java API)
  • Version: 2.5.23
  • Installation: Add to build.sbt: libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.5.23"

Core Imports

import akka.stream.scaladsl.{Source, Flow, Sink}
import akka.stream.{ActorMaterializer, Materializer}
import akka.actor.ActorSystem
import akka.{Done, NotUsed}

Java API:

import akka.stream.javadsl.*;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.actor.ActorSystem;

Basic Usage

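import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Source, Sink}
import scala.concurrent.Future

implicit val system: ActorSystem = ActorSystem("MySystem")
implicit val materializer: ActorMaterializer = ActorMaterializer()

// Create a simple stream: numbers 1 to 10, multiply by 2, print results
val source: Source[Int, NotUsed] = Source(1 to 10)
val sink: Sink[Int, Future[Done]] = Sink.foreach[Int](println)

// Running the stream materializes it; the Future completes when the sink is done
val result: Future[Done] = source
  .map(_ * 2)
  .runWith(sink)

// Prints 2, 4, 6, 8, 10, 12, 14, 16, 18, 20 (one per line)

// Shut the actor system down once the stream has finished
result.onComplete(_ => system.terminate())(system.dispatcher)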

Architecture

Akka Streams is built around several key abstractions:

  • Graph: The blueprint of a stream processing topology
  • Shape: Defines the inlets and outlets of a graph component
  • Materializer: Responsible for turning graph blueprints into running streams
  • Source: Stream component with one output (data producer)
  • Flow: Stream component with one input and one output (data transformer)
  • Sink: Stream component with one input (data consumer)

All stream processing is built using these composable, type-safe building blocks that automatically handle backpressure according to the Reactive Streams specification.

Capabilities

Core Stream Components

The fundamental building blocks for creating reactive streams with type-safe composition and automatic backpressure handling.

// Source - produces elements
final class Source[+Out, +Mat](traversalBuilder: LinearTraversalBuilder, shape: SourceShape[Out])

// Flow - transforms elements  
final class Flow[-In, +Out, +Mat](traversalBuilder: LinearTraversalBuilder, shape: FlowShape[In, Out])

// Sink - consumes elements
final class Sink[-In, +Mat](traversalBuilder: LinearTraversalBuilder, shape: SinkShape[In])
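
As a rough illustration of how these three components compose, the sketch below attaches two free-standing `Flow` values to a `Source` with `via` and then runs the result into a `Sink`. The object name `CoreComponentsSketch` and the flow names are hypothetical, and the blocking `Await` is there only to keep the example short.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical object name; not part of Akka.
object CoreComponentsSketch {
  // A Flow is a free-standing transformer that can be attached to any Source[Int, _].
  val double: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)
  val onlyLarge: Flow[Int, Int, NotUsed] = Flow[Int].filter(_ > 4)

  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("CoreComponentsSketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // A Source followed by Flows is still a Source; runWith attaches the Sink and runs it.
      val pipeline: Source[Int, NotUsed] = Source(1 to 5).via(double).via(onlyLarge)
      Await.result(pipeline.runWith(Sink.seq[Int]), 3.seconds)
    } finally system.terminate()
  }
}
```

Under these assumptions, `CoreComponentsSketch.run()` should return `Seq(6, 8, 10)`.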

Details: Core Stream Components (core-components.md)

Graph Building and Composition

Advanced graph construction using GraphDSL for complex stream topologies including fan-in, fan-out, and custom shapes.

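object GraphDSL {
  // Without an imported graph the materialized value is NotUsed
  def create[S <: Shape]()(buildBlock: GraphDSL.Builder[NotUsed] => S): Graph[S, NotUsed]

  // Importing a graph exposes its shape and materialized value to the build block
  def create[S <: Shape, Mat](g1: Graph[Shape, Mat])(buildBlock: GraphDSL.Builder[Mat] => g1.Shape => S): Graph[S, Mat]

  class Builder[+M] {
    def add[S <: Shape](graph: Graph[S, _]): S
    // Connection operators: ~>, <~, via, to, from (the arrows require `import GraphDSL.Implicits._`)
  }
}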
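
The following is a minimal sketch of a fan-out/fan-in topology built with `GraphDSL`: one source is broadcast to two branches, each branch is transformed, and the results are merged into a single sink. `GraphDslSketch` is a hypothetical name, and the output is sorted because `Merge` gives no ordering guarantee across its inputs.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, RunnableGraph, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical object name; not part of Akka.
object GraphDslSketch {
  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("GraphDslSketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    // Passing the sink to create() exposes its materialized Future to the caller.
    val graph = RunnableGraph.fromGraph(GraphDSL.create(Sink.seq[Int]) { implicit builder => out =>
      import GraphDSL.Implicits._
      val bcast = builder.add(Broadcast[Int](2))
      val merge = builder.add(Merge[Int](2))

      Source(1 to 3) ~> bcast
      bcast ~> Flow[Int].map(_ * 10) ~> merge
      bcast ~> Flow[Int].map(_ * 100) ~> merge
      merge ~> out
      ClosedShape // every port is connected, so the graph can be run
    })

    try Await.result(graph.run(), 3.seconds).sorted
    finally system.terminate()
  }
}
```

With these inputs, `GraphDslSketch.run()` should return `Seq(10, 20, 30, 100, 200, 300)`.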

Details: Graph Building and Composition (graph-building.md)

Stream Operations and Transformations

Stream processing operators shared by Source, Flow, and SubFlow, covering mapping, filtering, grouping, timing, and error handling.

trait FlowOps[+Out, +Mat] {
  def map[T](f: Out => T): Repr[T]
  def filter(p: Out => Boolean): Repr[Out]  
  def mapAsync[T](parallelism: Int)(f: Out => Future[T]): Repr[T]
  def groupBy[K](maxSubstreams: Int, f: Out => K): SubFlow[Out, Mat, Repr, Closed]
  def throttle(elements: Int, per: FiniteDuration): Repr[Out]
}
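
To make the operators listed above more concrete, here is a small sketch that chains `filter`, `mapAsync`, and `throttle` in one pipeline and uses `groupBy` in another. The object name `OperationsSketch` and the chosen numbers are illustrative assumptions; the grouped sums are sorted because merged substreams arrive in no fixed order.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical object name; not part of Akka.
object OperationsSketch {
  def run(): (Seq[Int], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("OperationsSketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher // ExecutionContext for the Future(...) calls below

    try {
      val evenSquares = Source(1 to 10)
        .filter(_ % 2 == 0)               // keep 2, 4, 6, 8, 10
        .mapAsync(2)(n => Future(n * n))  // up to 2 futures in flight, order preserved
        .throttle(10, 100.millis)         // at most 10 elements per 100 ms
        .runWith(Sink.seq[Int])

      val sumsByParity = Source(1 to 10)
        .groupBy(2, _ % 2)                // one substream for odd, one for even
        .fold(0)(_ + _)                   // sum each substream
        .mergeSubstreams
        .runWith(Sink.seq[Int])

      (Await.result(evenSquares, 3.seconds), Await.result(sumsByParity, 3.seconds).sorted)
    } finally system.terminate()
  }
}
```

`OperationsSketch.run()` should return `(Seq(4, 16, 36, 64, 100), Seq(25, 30))`.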

Details: Stream Operations and Transformations (stream-operations.md)

Materialization and Execution

Stream materialization with ActorMaterializer, lifecycle management, and execution control.

abstract class Materializer {
  def materialize[Mat](runnable: Graph[ClosedShape, Mat]): Mat
  def withNamePrefix(name: String): Materializer
}

object ActorMaterializer {
  def apply()(implicit context: ActorRefFactory): ActorMaterializer
}
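
A minimal sketch of what materialization means in practice: a `RunnableGraph` is only a description, and each call to `run()` (or, equivalently, `materializer.materialize`) starts an independent stream with its own materialized value. `MaterializationSketch` is a hypothetical name.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, RunnableGraph, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical object name; not part of Akka.
object MaterializationSketch {
  def run(): (Int, Int) = {
    implicit val system: ActorSystem = ActorSystem("MaterializationSketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Nothing runs yet: this is only a blueprint whose materialized value is the sink's Future.
      val blueprint: RunnableGraph[Future[Int]] =
        Source(1 to 4).toMat(Sink.fold[Int, Int](0)(_ + _))(Keep.right)

      val viaRun: Future[Int] = blueprint.run()                               // uses the implicit materializer
      val viaMaterializer: Future[Int] = materializer.materialize(blueprint) // explicit call

      (Await.result(viaRun, 3.seconds), Await.result(viaMaterializer, 3.seconds))
    } finally {
      materializer.shutdown() // stops any streams still running on this materializer
      system.terminate()
    }
  }
}
```

Both runs sum 1 to 4 independently, so `MaterializationSketch.run()` should return `(10, 10)`.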

Details: Materialization and Execution (materialization.md)

Junction Operations

Stream junction operators for merging, broadcasting, zipping, and partitioning multiple streams.

class Merge[T](inputPorts: Int, eagerComplete: Boolean)
class Broadcast[T](outputPorts: Int, eagerCancel: Boolean) 
class Zip[A, B] extends ZipWith2[A, B, (A, B)]
class Partition[T](outputPorts: Int, partitioner: T => Int)
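
As one possible use of a junction, the sketch below routes elements through `Partition` into two sinks, one for even numbers and one for odd numbers. The object name `JunctionSketch` and the partitioning rule are assumptions made for illustration.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{GraphDSL, Partition, RunnableGraph, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical object name; not part of Akka.
object JunctionSketch {
  def run(): (Seq[Int], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("JunctionSketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    // Two imported sinks; their materialized Futures are combined into a tuple.
    val graph = RunnableGraph.fromGraph(
      GraphDSL.create(Sink.seq[Int], Sink.seq[Int])((_, _)) { implicit builder => (evens, odds) =>
        import GraphDSL.Implicits._
        // The partitioner returns the index of the output port for each element.
        val partition = builder.add(Partition[Int](2, n => if (n % 2 == 0) 0 else 1))

        Source(1 to 6) ~> partition.in
        partition.out(0) ~> evens.in
        partition.out(1) ~> odds.in
        ClosedShape
      })

    try {
      val (evenResult, oddResult) = graph.run()
      (Await.result(evenResult, 3.seconds), Await.result(oddResult, 3.seconds))
    } finally system.terminate()
  }
}
```

`JunctionSketch.run()` should return `(Seq(2, 4, 6), Seq(1, 3, 5))`, since each output preserves the order of the elements routed to it.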

Details: Junction Operations (junction-operations.md)

I/O Integration

File I/O, TCP networking, and integration with Java streams and other I/O systems.

object FileIO {
  def fromPath(path: Path, chunkSize: Int = 8192): Source[ByteString, Future[IOResult]]
  def toPath(path: Path): Sink[ByteString, Future[IOResult]]
}

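// Tcp is an actor-system extension; obtain an instance with Tcp() (implicit ActorSystem in scope)
class Tcp extends akka.actor.Extension {
  def outgoingConnection(remoteAddress: InetSocketAddress): Flow[ByteString, ByteString, Future[OutgoingConnection]]
}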
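
A short sketch of the file API: it writes a string to a temporary file with `FileIO.toPath`, then reads it back with `FileIO.fromPath`. The object name `FileIoSketch`, the method `roundTrip`, and the temp-file prefix are hypothetical; a real application would normally avoid blocking with `Await`.

```scala
import java.nio.file.{Files, Path}
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, IOResult}
import akka.stream.scaladsl.{FileIO, Source}
import akka.util.ByteString
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical object name; not part of Akka.
object FileIoSketch {
  def roundTrip(text: String): (Long, String) = {
    implicit val system: ActorSystem = ActorSystem("FileIoSketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    val path: Path = Files.createTempFile("akka-stream-sketch", ".txt")
    try {
      // The sink materializes a Future[IOResult] carrying the number of bytes written.
      val written: IOResult =
        Await.result(Source.single(ByteString(text)).runWith(FileIO.toPath(path)), 3.seconds)

      // The source emits the file in chunks; concatenate them back into one ByteString.
      val bytes: ByteString =
        Await.result(FileIO.fromPath(path).runFold(ByteString.empty)(_ ++ _), 3.seconds)

      (written.count, bytes.utf8String)
    } finally {
      Files.deleteIfExists(path)
      system.terminate()
    }
  }
}
```

For example, `FileIoSketch.roundTrip("hello")` should return `(5L, "hello")`.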

Details: I/O Integration (io-integration.md)

Error Handling and Supervision

Error handling through supervision strategies, recovery operations, and stream resilience patterns.

object Supervision {
  sealed trait Directive
  case object Stop extends Directive
  case object Resume extends Directive
  case object Restart extends Directive
  
  type Decider = Function[Throwable, Directive]
}
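
To illustrate how a `Decider` is applied, the sketch below attaches a supervision strategy to a flow so that an element causing an `ArithmeticException` is dropped and the stream continues. The object name `SupervisionSketch` and the input values are assumptions for the example.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorAttributes, ActorMaterializer, Supervision}
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical object name; not part of Akka.
object SupervisionSketch {
  // Resume (drop the failing element) on division by zero, stop the stream on anything else.
  val decider: Supervision.Decider = {
    case _: ArithmeticException => Supervision.Resume
    case _                      => Supervision.Stop
  }

  def run(): Seq[Int] = {
    implicit val system: ActorSystem = ActorSystem("SupervisionSketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      val divide = Flow[Int]
        .map(n => 10 / n) // throws ArithmeticException for 0
        .withAttributes(ActorAttributes.supervisionStrategy(decider))

      Await.result(Source(List(1, 0, 2, 5)).via(divide).runWith(Sink.seq[Int]), 3.seconds)
    } finally system.terminate()
  }
}
```

The element `0` is skipped, so `SupervisionSketch.run()` should return `Seq(10, 5, 2)`. Without the attribute, the default behavior is to stop, and the stream would fail at the second element.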

Details: Error Handling and Supervision (error-handling.md)

Control Flow and Lifecycle

Stream lifecycle management with KillSwitch, StreamRefs for distribution, and queue integration.

trait KillSwitch {
  def shutdown(): Unit
  def abort(ex: Throwable): Unit
}

object KillSwitches {
  def single[T]: Graph[FlowShape[T, T], UniqueKillSwitch]
  def shared(name: String): SharedKillSwitch
}
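
A minimal sketch of stopping a stream from the outside: an endless tick source runs through `KillSwitches.single`, and calling `shutdown()` on the materialized switch completes the stream. The object name `KillSwitchSketch` and the tick interval are illustrative assumptions.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, KillSwitches}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical object name; not part of Akka.
object KillSwitchSketch {
  def run(): Done = {
    implicit val system: ActorSystem = ActorSystem("KillSwitchSketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Keep.right keeps the UniqueKillSwitch; Keep.both pairs it with the sink's Future[Done].
      val (killSwitch, done) = Source
        .tick(0.millis, 10.millis, "tick") // would otherwise emit forever
        .viaMat(KillSwitches.single)(Keep.right)
        .toMat(Sink.ignore)(Keep.both)
        .run()

      killSwitch.shutdown()           // completes downstream and cancels upstream
      Await.result(done, 3.seconds)   // the sink's future completes normally
    } finally system.terminate()
  }
}
```

Calling `abort(ex)` instead of `shutdown()` would fail the stream with the given exception rather than completing it.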

Details: Control Flow and Lifecycle (control-flow.md)

Types

// Fundamental types (both live in package akka; each has a single value of the same name)
sealed abstract class NotUsed
sealed abstract class Done

// Shape hierarchy
abstract class Shape {
  def inlets: immutable.Seq[Inlet[_]]
  def outlets: immutable.Seq[Outlet[_]]
}

case class SourceShape[+T](out: Outlet[T]) extends Shape
case class FlowShape[-I, +O](in: Inlet[I], out: Outlet[O]) extends Shape  
case class SinkShape[-T](in: Inlet[T]) extends Shape

// Ports
final class Inlet[T](s: String)
final class Outlet[T](s: String)

// Materialization
trait Graph[+S <: Shape, +M] {
  def shape: S
  def withAttributes(attr: Attributes): Graph[S, M]
}

// Results
case class IOResult(count: Long, status: Try[Done]) {
  def wasSuccessful: Boolean
}

// Queue integration
sealed abstract class QueueOfferResult
object QueueOfferResult {
  case object Enqueued extends QueueOfferResult
  case object Dropped extends QueueOfferResult  
  case class Failure(cause: Throwable) extends QueueOfferResult
  case object QueueClosed extends QueueOfferResult
}

// Strategies
sealed abstract class OverflowStrategy
object OverflowStrategy {
  def dropHead: OverflowStrategy
  def dropTail: OverflowStrategy
  def dropBuffer: OverflowStrategy
  def dropNew: OverflowStrategy
  def backpressure: OverflowStrategy
  def fail: OverflowStrategy
}

// Attributes
final case class Attributes(attributeList: List[Attributes.Attribute]) {
  def and(other: Attributes): Attributes
  def get[T <: Attributes.Attribute: ClassTag]: Option[T]
}
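
The `QueueOfferResult` and `OverflowStrategy` types listed above come together in `Source.queue`. The sketch below offers three elements to a queue-backed source and collects them; `QueueSketch` is a hypothetical name, and the buffer size of 8 is an arbitrary choice large enough that no offer has to wait.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical object name; not part of Akka.
object QueueSketch {
  def run(): (Seq[QueueOfferResult], Seq[Int]) = {
    implicit val system: ActorSystem = ActorSystem("QueueSketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // The source materializes a queue handle; the sink materializes the collected elements.
      val (queue, collected) = Source
        .queue[Int](8, OverflowStrategy.backpressure)
        .toMat(Sink.seq[Int])(Keep.both)
        .run()

      // Offer one element at a time, waiting for each result before the next offer.
      val offers: Seq[QueueOfferResult] =
        (1 to 3).map(n => Await.result(queue.offer(n), 3.seconds))

      queue.complete() // no more elements; lets the sink finish
      (offers, Await.result(collected, 3.seconds))
    } finally system.terminate()
  }
}
```

Under these assumptions every offer should resolve to `QueueOfferResult.Enqueued` and the collected elements should be `Seq(1, 2, 3)`. With a dropping strategy and a full buffer, an offer could instead resolve to `Dropped`.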
