CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-com-typesafe-akka--akka-stream-2-13-0-m5

Akka Streams is a reactive streams implementation for processing and transferring sequences of elements using bounded buffer space with automatic backpressure management.

Pending
Overview
Eval results
Files

stream-operations.mddocs/

Stream Operations and Transformations

Comprehensive set of stream processing operations including mapping, filtering, grouping, timing, and error handling available on all stream components through the FlowOps trait.

FlowOps Trait

All stream operations are available through the FlowOps trait, which is mixed into Source, Flow, and SubFlow classes.

trait FlowOps[+Out, +Mat] {
  type Repr[+O] <: FlowOps[O, Mat]
  
  // Core transformation methods
  def map[T](f: Out => T): Repr[T]
  def filter(p: Out => Boolean): Repr[Out]
  def mapAsync[T](parallelism: Int)(f: Out => Future[T]): Repr[T]
  def mapConcat[T](f: Out => immutable.Iterable[T]): Repr[T]
}

Element Transformation

Basic Transformations

Map and Filter:

trait FlowOps[+Out, +Mat] {
  def map[T](f: Out => T): Repr[T]
  def mapAsync[T](parallelism: Int)(f: Out => Future[T]): Repr[T]  
  def mapAsyncUnordered[T](parallelism: Int)(f: Out => Future[T]): Repr[T]
  def mapConcat[T](f: Out => immutable.Iterable[T]): Repr[T]
  def filter(p: Out => Boolean): Repr[Out]
  def filterNot(p: Out => Boolean): Repr[Out]
}

Collect with Partial Functions:

trait FlowOps[+Out, +Mat] {
  def collect[T](pf: PartialFunction[Out, T]): Repr[T]
  def collectType[T](implicit m: ClassTag[T]): Repr[T]
}

Usage Examples

import akka.stream.scaladsl.{Source, Flow}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

val source = Source(1 to 10)

// Basic map
val doubled = source.map(_ * 2)

// Async map with parallelism
val asyncMapped = source.mapAsync(4) { n =>
  Future {
    Thread.sleep(100)
    n * n
  }
}

// Flat map with mapConcat
val exploded = source.mapConcat(n => List.fill(n)(n))

// Filter 
val evenOnly = source.filter(_ % 2 == 0)

// Collect with partial function
val strings = Source(List(1, "hello", 2, "world", 3))
val onlyStrings = strings.collect {
  case s: String => s.toUpperCase
}

Element Selection and Limiting

Take and Drop Operations

trait FlowOps[+Out, +Mat] {
  def take(n: Long): Repr[Out]
  def takeWhile(p: Out => Boolean): Repr[Out]
  def takeWithin(d: FiniteDuration): Repr[Out]
  def drop(n: Long): Repr[Out]
  def dropWhile(p: Out => Boolean): Repr[Out]  
  def dropWithin(d: FiniteDuration): Repr[Out]
}

Sampling and Limiting

trait FlowOps[+Out, +Mat] {
  def limit(n: Long): Repr[Out]
  def limitWeighted(n: Long)(costFn: Out => Long): Repr[Out]
  def throttle(elements: Int, per: FiniteDuration): Repr[Out]
  def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int, mode: ThrottleMode): Repr[Out]
}

Usage Examples

import scala.concurrent.duration._

val source = Source(1 to 100)

// Take first 10 elements
val first10 = source.take(10)

// Take while condition is true
val whileLessThan50 = source.takeWhile(_ < 50)

// Take within time window
val within5Seconds = source.takeWithin(5.seconds)

// Rate limiting - max 10 elements per second
val throttled = source.throttle(10, 1.second)

// Burst throttling
val burstThrottled = source.throttle(
  elements = 10, 
  per = 1.second,
  maximumBurst = 5,
  mode = ThrottleMode.Shaping
)

Grouping and Batching

Grouping Operations

trait FlowOps[+Out, +Mat] {
  def grouped(n: Int): Repr[immutable.Seq[Out]]
  def groupedWithin(n: Int, d: FiniteDuration): Repr[immutable.Seq[Out]]
  def groupedWeighted(minWeight: Long)(costFn: Out => Long): Repr[List[Out]]
  def groupBy[K](maxSubstreams: Int, f: Out => K): SubFlow[Out, Mat, Repr, Closed]
}

Batching Operations

trait FlowOps[+Out, +Mat] {
  def batch[S](max: Long, seed: Out => S)(aggregate: (S, Out) => S): Repr[S]
  def batchWeighted[S](max: Long, costFn: Out => Long, seed: Out => S)(aggregate: (S, Out) => S): Repr[S]
}

Usage Examples

import scala.concurrent.duration._

val source = Source(1 to 100)

// Group into batches of 10
val groups = source.grouped(10)

// Group by time or count (whichever comes first)
val timeGroups = source.groupedWithin(10, 1.second)

// Group by key into substreams
val byParity = source.groupBy(2, _ % 2)
  .map(_ * 2)
  .mergeSubstreams

// Batch with custom aggregation
val batched = source.batch(
  max = 10,
  seed = n => List(n),
  aggregate = (acc, n) => n :: acc
)

Timing Operations

Delays and Timeouts

trait FlowOps[+Out, +Mat] {
  def delay(d: FiniteDuration): Repr[Out]
  def delayWith(delayStrategySupplier: () => DelayStrategy[Out]): Repr[Out]
  def initialDelay(d: FiniteDuration): Repr[Out]
  def idleTimeout(timeout: FiniteDuration): Repr[Out]
  def completionTimeout(timeout: FiniteDuration): Repr[Out]
  def backpressureTimeout(timeout: FiniteDuration): Repr[Out]
}

Keep Alive

trait FlowOps[+Out, +Mat] {
  def keepAlive(maxIdle: FiniteDuration, injectedElem: () => Out): Repr[Out]
}

Usage Examples

import scala.concurrent.duration._

val source = Source(1 to 10)

// Delay each element by 1 second
val delayed = source.delay(1.second)

// Initial delay before first element
val withInitialDelay = source.initialDelay(5.seconds)

// Timeout if no elements for 30 seconds
val withIdleTimeout = source.idleTimeout(30.seconds)

// Keep alive by injecting elements
val keepAlive = source.keepAlive(10.seconds, () => 0)

Accumulation Operations

Scanning and Folding

trait FlowOps[+Out, +Mat] {
  def scan[T](zero: T)(f: (T, Out) => T): Repr[T]
  def scanAsync[T](zero: T)(f: (T, Out) => Future[T]): Repr[T]
  def fold[T](zero: T)(f: (T, Out) => T): Repr[T]
  def foldAsync[T](zero: T)(f: (T, Out) => Future[T]): Repr[T]
  def reduce(f: (Out, Out) => Out): Repr[Out]
}

Usage Examples

val source = Source(1 to 10)

// Running sum (scan emits intermediate results)
val runningSums = source.scan(0)(_ + _)
// Emits: 0, 1, 3, 6, 10, 15, 21, 28, 36, 45, 55

// Final sum only (fold emits only final result)
val totalSum = source.fold(0)(_ + _)
// Emits: 55

// Reduce without initial value
val product = source.reduce(_ * _)
// Emits: 3628800

// Async accumulation
val asyncSum = source.scanAsync(0) { (acc, n) =>
  Future.successful(acc + n)
}

Buffer Management

Buffering Operations

trait FlowOps[+Out, +Mat] {
  def buffer(size: Int, overflowStrategy: OverflowStrategy): Repr[Out]
  def conflate[S](seed: Out => S)(aggregate: (S, Out) => S): Repr[S]
  def conflateWithSeed[S](seed: Out => S)(aggregate: (S, Out) => S): Repr[S]
  def expand[U](extrapolate: Out => Iterator[U]): Repr[U]
}

Usage Examples

import akka.stream.OverflowStrategy

val source = Source(1 to 100)

// Buffer with overflow strategy
val buffered = source.buffer(10, OverflowStrategy.dropHead)

// Conflate (combine) when downstream is slow
val conflated = source.conflate(identity)(_ + _)

// Expand elements when downstream is fast
val expanded = source.expand(n => Iterator.fill(3)(n))

Error Handling and Recovery

Recovery Operations

trait FlowOps[+Out, +Mat] {
  def recover[T >: Out](pf: PartialFunction[Throwable, T]): Repr[T]
  def recoverWithRetries[T >: Out](attempts: Int, pf: PartialFunction[Throwable, Graph[SourceShape[T], NotUsed]]): Repr[T]
  def mapError(f: Throwable => Throwable): Repr[Out]
}

Usage Examples

val source = Source(List("1", "2", "abc", "4"))

// Recover from parsing errors
val safeParseInt = source
  .map(_.toInt) // This will fail on "abc"
  .recover {
    case _: NumberFormatException => -1
  }

// Recover with retries using alternative source
val withRetries = source
  .map(_.toInt)
  .recoverWithRetries(3, {
    case _: NumberFormatException => Source.single(0)
  })

Stream Splitting and Substreams

Splitting Operations

trait FlowOps[+Out, +Mat] {
  type Closed
  
  def splitWhen(p: Out => Boolean): SubFlow[Out, Mat, Repr, Closed]
  def splitAfter(p: Out => Boolean): SubFlow[Out, Mat, Repr, Closed]  
  def splitWhen[U](substreamCancelStrategy: SubstreamCancelStrategy)(p: Out => Boolean): SubFlow[Out, Mat, Repr, Closed]
}

SubFlow Operations

final class SubFlow[+Out, +Mat, +F[+_, +_], +C] {
  def mergeSubstreams: F[Out, Mat]
  def mergeSubstreamsWithParallelism(parallelism: Int): F[Out, Mat]
  def concatSubstreams: F[Out, Mat]
  
  // All FlowOps methods are available on SubFlow
  def map[T](f: Out => T): SubFlow[T, Mat, F, C]
  def filter(p: Out => Boolean): SubFlow[Out, Mat, F, C]
  // ... etc
}

Usage Examples

val source = Source(1 to 20)

// Split into substreams when element is divisible by 5
val substreams = source
  .splitWhen(_ % 5 == 0)
  .map(_ * 2) // Process each substream
  .mergeSubstreams // Merge back to single stream

// Split after condition and concatenate results  
val splitAfter = source
  .splitAfter(_ % 7 == 0)
  .fold(0)(_ + _) // Sum each substream
  .concatSubstreams

Utility Operations

Element Inspection and Side Effects

trait FlowOps[+Out, +Mat] {
  def log(name: String): Repr[Out]
  def log(name: String, extract: Out => Any): Repr[Out]
  def wireTap(sink: Graph[SinkShape[Out], _]): Repr[Out]
  def alsoTo(sink: Graph[SinkShape[Out], _]): Repr[Out]
}

Element Interspersing

trait FlowOps[+Out, +Mat] {
  def intersperse[T >: Out](inject: T): Repr[T]
  def intersperse[T >: Out](start: T, inject: T, end: T): Repr[T]
}

Usage Examples

val source = Source(List("a", "b", "c"))

// Add logging
val logged = source.log("my-stream")

// Intersperse with separator
val withSeparator = source.intersperse(",")
// Result: "a", ",", "b", ",", "c"

// With start and end
val withBrackets = source.intersperse("[", ",", "]")  
// Result: "[", "a", ",", "b", ",", "c", "]"

// Wire tap for side effects without affecting main stream
val withSideEffect = source.wireTap(Sink.foreach(x => println(s"Side: $x")))

Monitoring and Lifecycle

Stream Monitoring

trait FlowOps[+Out, +Mat] {
  type ClosedMat[+M]
  
  def watchTermination[Mat2]()(matF: (Mat, Future[Done]) => Mat2): ReprMat[Out, Mat2]
  def monitor[Mat2]()(matF: (Mat, FlowMonitor[Out]) => Mat2): ReprMat[Out, Mat2]
}

Usage Examples

val source = Source(1 to 10)

// Watch for completion
val withTerminationWatcher = source
  .watchTermination() { (notUsed, done) =>
    done.onComplete {
      case Success(_) => println("Stream completed successfully")
      case Failure(ex) => println(s"Stream failed: $ex")
    }
    notUsed
  }

// Monitor stream state
val withMonitoring = source
  .monitor() { (mat, monitor) =>
    monitor.state.foreach { state =>
      println(s"Stream state: $state")
    }
    mat
  }

Install with Tessl CLI

npx tessl i tessl/maven-com-typesafe-akka--akka-stream-2-13-0-m5

docs

control-flow.md

core-components.md

error-handling.md

graph-building.md

index.md

io-integration.md

junction-operations.md

materialization.md

stream-operations.md

tile.json