or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

core-stream-types.md custom-stages.md error-handling.md index.md integration.md materialization.md stream-combining.md stream-control.md stream-sinks.md stream-sources.md stream-transformations.md
tile.json

stream-transformations.mddocs/

Stream Transformations

Core transformation operators for manipulating, filtering, grouping, and routing stream elements with strong type safety and back-pressure support. These operators form the core of stream processing pipelines.

Capabilities

Basic Transformations

Essential element-wise operators that transform, filter, or expand individual elements as they flow through the stream.

/**
 * Transform each element using the provided function
 * @param f Function to transform each element
 * @return Stream with transformed elements
 */
def map[T2](f: Out => T2): Source[T2, Mat]

/**
 * Filter elements based on a predicate
 * @param p Predicate function returning boolean
 * @return Stream containing only elements that satisfy the predicate
 */
def filter(p: Out => Boolean): Source[Out, Mat]

/**
 * Transform elements using a partial function, dropping unmatched elements
 * @param pf Partial function for transformation
 * @return Stream with collected and transformed elements
 */
def collect[T2](pf: PartialFunction[Out, T2]): Source[T2, Mat]

/**
 * Transform each element into zero or more elements
 * @param f Function that returns an iterable of elements
 * @return Stream with flattened results
 */
def mapConcat[T2](f: Out => IterableOnce[T2]): Source[T2, Mat]

Usage Examples:

import akka.stream.scaladsl.{Sink, Source}

// NOTE: running a stream needs an implicit Materializer in scope
// (for example, one provided by an implicit ActorSystem).

// Basic transformations
Source(1 to 10)
  .map(_ * 2)                    // Double each number
  .filter(_ > 10)                // Keep only numbers > 10
  .collect { case x if x < 20 => s"Number: $x" }  // Keep numbers < 20, formatted as strings
  .runWith(Sink.foreach(println))

// Map concat for flattening
Source(List("hello", "world"))
  .mapConcat(_.toCharArray.toList)  // Flatten to individual characters
  .runWith(Sink.seq)

Asynchronous Transformations

Transformations that run asynchronous operations under back-pressure, either preserving the order of elements (mapAsync) or emitting results as they complete (mapAsyncUnordered).

/**
 * Transform elements asynchronously while preserving order
 * @param parallelism Maximum number of concurrent transformations
 * @param f Async transformation function
 * @return Stream with async-transformed elements in original order
 */
def mapAsync[T2](parallelism: Int)(f: Out => Future[T2]): Source[T2, Mat]

/**
 * Transform elements asynchronously without preserving order
 * @param parallelism Maximum number of concurrent transformations  
 * @param f Async transformation function
 * @return Stream with async-transformed elements in completion order
 */
def mapAsyncUnordered[T2](parallelism: Int)(f: Out => Future[T2]): Source[T2, Mat]

Usage Examples:

import scala.concurrent.{blocking, Future}
import scala.concurrent.ExecutionContext.Implicits.global

// Async transformations
Source(1 to 10)
  .mapAsync(4) { n =>
    Future {
      // Simulated work; `blocking` signals the execution context that this thread will be parked
      blocking(Thread.sleep(100))
      n * 2
    }
  }
  .runWith(Sink.seq)  // Results keep the input order

// Unordered for better throughput when order doesn't matter
// (fetchFromUrl is a placeholder for any function of type String => Future[T])
Source(List("url1", "url2", "url3"))
  .mapAsyncUnordered(3)(url => fetchFromUrl(url))
  .runWith(Sink.seq)

Scanning and Folding

Stateful transformations that maintain running state across elements.

/**
 * Emit running state by applying function to previous state and current element
 * @param zero Initial state
 * @param f Function to compute next state
 * @return Stream emitting each intermediate state
 */
def scan[T2](zero: T2)(f: (T2, Out) => T2): Source[T2, Mat]

/**
 * Same as scan but async
 * @param zero Initial state
 * @param f Async function to compute next state
 * @return Stream emitting each intermediate state
 */
def scanAsync[T2](zero: T2)(f: (T2, Out) => Future[T2]): Source[T2, Mat]

/**
 * Fold all elements into a single result
 * @param zero Initial accumulator value
 * @param f Function to fold each element into accumulator
 * @return Stream emitting single folded result
 */
def fold[T2](zero: T2)(f: (T2, Out) => T2): Source[T2, Mat]

/**
 * Fold all elements into a single result asynchronously
 * @param zero Initial accumulator value
 * @param f Async function to fold each element
 * @return Stream emitting single folded result
 */
def foldAsync[T2](zero: T2)(f: (T2, Out) => Future[T2]): Source[T2, Mat]

/**
 * Reduce elements using a binary operator (requires at least one element)
 * @param f Binary operator for reduction
 * @return Stream emitting single reduced result
 */
def reduce(f: (Out, Out) => Out): Source[Out, Mat]

Usage Examples:

// Running sum with scan
Source(1 to 10)
  .scan(0)(_ + _)  // Emits: 0, 1, 3, 6, 10, 15, 21, 28, 36, 45, 55
  .runWith(Sink.seq)

// Final sum with fold
Source(1 to 10)
  .fold(0)(_ + _)  // Emits only: 55
  .runWith(Sink.head)

// String concatenation with reduce
Source(List("Hello", " ", "World", "!"))
  .reduce(_ + _)   // Emits: "Hello World!"
  .runWith(Sink.head)

Stream Slicing and Limiting

Operations that control which elements pass through based on position or conditions.

/**
 * Take the first n elements
 * @param n Number of elements to take
 * @return Stream with first n elements
 */
def take(n: Long): Source[Out, Mat]

/**
 * Take elements while predicate is true
 * @param p Predicate function
 * @return Stream with elements until predicate fails
 */
def takeWhile(p: Out => Boolean): Source[Out, Mat]

/**
 * Drop the first n elements
 * @param n Number of elements to drop
 * @return Stream without first n elements
 */
def drop(n: Long): Source[Out, Mat]

/**
 * Drop elements while predicate is true
 * @param p Predicate function
 * @return Stream starting from first element where predicate fails
 */
def dropWhile(p: Out => Boolean): Source[Out, Mat]

/**
 * Take elements within a time window
 * @param d Duration of the window
 * @return Stream with elements emitted within the time window
 */
def takeWithin(d: FiniteDuration): Source[Out, Mat]

/**
 * Drop elements within a time window
 * @param d Duration to drop elements
 * @return Stream starting after the time window
 */
def dropWithin(d: FiniteDuration): Source[Out, Mat]

Usage Examples:

import scala.concurrent.duration._

// Position-based slicing
Source(1 to 100)
  .drop(10)        // Skip first 10
  .take(20)        // Take next 20
  .runWith(Sink.seq)

// Condition-based slicing
Source(1 to 20)
  .takeWhile(_ < 15)  // Take while less than 15
  .runWith(Sink.seq)

// Time-based slicing
Source.tick(100.millis, 100.millis, "tick")
  .takeWithin(2.seconds)  // Take for 2 seconds
  .runWith(Sink.seq)

Grouping Operations

Operations that organize stream elements into groups or batches.

/**
 * Group elements into fixed-size batches
 * @param n Size of each group
 * @return Stream of sequences containing up to n elements each (the final group may be smaller)
 */
def grouped(n: Int): Source[immutable.Seq[Out], Mat]

/**
 * Group elements within a time window
 * @param n Maximum number of elements per group
 * @param d Maximum time to wait for a group
 * @return Stream of sequences with elements grouped by time or count
 */
def groupedWithin(n: Int, d: FiniteDuration): Source[immutable.Seq[Out], Mat]

/**
 * Group elements by key into substreams
 * @param maxSubstreams Maximum number of concurrent substreams
 * @param f Function to extract grouping key
 * @return SubFlow representing grouped substreams
 */
def groupBy[K](maxSubstreams: Int, f: Out => K): SubFlow[Out, Mat, Source[Out, Mat]#Repr, RunnableGraph[Mat]]

/**
 * Batch elements using a seed and aggregation function
 * @param max Maximum batch size
 * @param seed Function to create initial batch from first element
 * @param aggregate Function to add element to existing batch
 * @return Stream of batched results
 */
def batch[S](max: Long, seed: Out => S)(aggregate: (S, Out) => S): Source[S, Mat]

/**
 * Batch elements up to a maximum total weight, weighing each element with a cost function
 * @param max Maximum total weight of a batch
 * @param costFn Function computing the weight (cost) of each element
 * @param seed Function to create batch from first element
 * @param aggregate Function to add element to batch
 * @return Stream of batched results
 */
def batchWeighted[S](max: Long, costFn: Out => Long, seed: Out => S)(aggregate: (S, Out) => S): Source[S, Mat]

Usage Examples:

import scala.concurrent.duration._

// Fixed-size grouping
Source(1 to 10)
  .grouped(3)  // Groups: [1,2,3], [4,5,6], [7,8,9], [10]
  .runWith(Sink.foreach(println))

// Time-based grouping
Source.tick(100.millis, 50.millis, "tick")
  .groupedWithin(5, 200.millis)  // Group up to 5 elements or every 200ms
  .runWith(Sink.foreach(group => println(s"Group: $group")))

// Key-based grouping into substreams
Source(1 to 20)
  .groupBy(3, _ % 3)  // Group by remainder when divided by 3
  .mergeSubstreams    // Merge substreams back together
  .runWith(Sink.seq)

Stateful Transformations

Advanced transformations that maintain custom state across elements.

/**
 * Transform each element into zero or more elements using state that is private to
 * each materialization of the stream
 * @param f Factory called once per materialization; it returns the function applied to
 *          every element, and state lives in variables that function closes over
 * @return Stream with the flattened outputs of the stateful function
 */
def statefulMapConcat[T2](f: () => Out => IterableOnce[T2]): Source[T2, Mat]

/**
 * Transform the materialized value of the stream, leaving its elements unchanged
 * @param f Function converting the materialized value
 * @return Stream with the same elements and the converted materialized value
 */
def mapMaterializedValue[Mat2](f: Mat => Mat2): Source[Out, Mat2]

Usage Examples:

// Stateful deduplication
Source(List(1, 1, 2, 2, 3, 1, 2))
  .statefulMapConcat { () =>
    // Created once per materialization, then captured by the per-element function below
    var seen = Set.empty[Int]

    { elem =>
      if (seen.contains(elem)) {
        Nil  // Skip duplicates
      } else {
        seen += elem
        List(elem)  // Emit new elements
      }
    }
  }
  .runWith(Sink.seq)  // Result: [1, 2, 3]

Types

/**
 * Substreams produced by grouping operations such as groupBy.
 * Operators applied to a SubFlow apply to the substreams until they are merged back or closed.
 */
trait SubFlow[+Out, +Mat, +F[+_], +C] {
  /** Merge the substreams back together into a single stream */
  def mergeSubstreams: Source[Out, Mat]
  /** Flatten the substreams into a single stream by consuming them one after another */
  def concatSubstreams: Source[Out, Mat]
  /** Attach the sink to the substreams, producing the closed type C */
  def to[Mat2](sink: Graph[SinkShape[Out], Mat2]): C
  /** Pass the substreams' elements through the flow, yielding a SubFlow of its output type T */
  def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): SubFlow[T, Mat, F, C]
}