Core transformation operators for manipulating, filtering, grouping, and routing stream elements with strong type safety and back-pressure support. These operators form the core of stream processing pipelines.
Essential element-wise transformations that preserve stream structure while modifying individual elements.
/**
* Transform each element using the provided function
* @param f Function to transform each element
* @return Stream with transformed elements
*/
def map[T2](f: Out => T2): Source[T2, Mat]
/**
* Filter elements based on a predicate
* @param p Predicate function returning boolean
* @return Stream containing only elements that satisfy the predicate
*/
def filter(p: Out => Boolean): Source[Out, Mat]
/**
* Transform elements using a partial function, dropping unmatched elements
* @param pf Partial function for transformation
* @return Stream with collected and transformed elements
*/
def collect[T2](pf: PartialFunction[Out, T2]): Source[T2, Mat]
/**
* Transform each element into zero or more elements
* @param f Function that returns an iterable of elements
* @return Stream with flattened results
*/
def mapConcat[T2](f: Out => IterableOnce[T2]): Source[T2, Mat]

Usage Examples:
import akka.stream.scaladsl.Source
// Basic transformations
Source(1 to 10)
.map(_ * 2) // Double each number
.filter(_ > 10) // Keep only numbers > 10
.collect { case x if x < 20 => s"Number: $x" } // Transform to strings
.runWith(Sink.foreach(println))
// Map concat for flattening
Source(List("hello", "world"))
.mapConcat(_.toCharArray.toList) // Flatten to individual characters
.runWith(Sink.seq)

Transformations that involve asynchronous operations while maintaining stream ordering and back-pressure.
/**
* Transform elements asynchronously while preserving order
* @param parallelism Maximum number of concurrent transformations
* @param f Async transformation function
* @return Stream with async-transformed elements in original order
*/
def mapAsync[T2](parallelism: Int)(f: Out => Future[T2]): Source[T2, Mat]
/**
* Transform elements asynchronously without preserving order
* @param parallelism Maximum number of concurrent transformations
* @param f Async transformation function
* @return Stream with async-transformed elements in completion order
*/
def mapAsyncUnordered[T2](parallelism: Int)(f: Out => Future[T2]): Source[T2, Mat]

Usage Examples:
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
// Async transformations
Source(1 to 10)
.mapAsync(4) { n =>
Future {
Thread.sleep(100) // Simulate async work
n * 2
}
}
.runWith(Sink.seq)
// Unordered for better throughput when order doesn't matter
Source(List("url1", "url2", "url3"))
.mapAsyncUnordered(3)(url => fetchFromUrl(url))
.runWith(Sink.seq)

Stateful transformations that maintain running state across elements.
/**
* Emit running state by applying function to previous state and current element
* @param zero Initial state
* @param f Function to compute next state
* @return Stream emitting each intermediate state
*/
def scan[T2](zero: T2)(f: (T2, Out) => T2): Source[T2, Mat]
/**
* Same as scan but async
* @param zero Initial state
* @param f Async function to compute next state
* @return Stream emitting each intermediate state
*/
def scanAsync[T2](zero: T2)(f: (T2, Out) => Future[T2]): Source[T2, Mat]
/**
* Fold all elements into a single result
* @param zero Initial accumulator value
* @param f Function to fold each element into accumulator
* @return Stream emitting single folded result
*/
def fold[T2](zero: T2)(f: (T2, Out) => T2): Source[T2, Mat]
/**
* Fold all elements into a single result asynchronously
* @param zero Initial accumulator value
* @param f Async function to fold each element
* @return Stream emitting single folded result
*/
def foldAsync[T2](zero: T2)(f: (T2, Out) => Future[T2]): Source[T2, Mat]
/**
* Reduce elements using a binary operator (requires at least one element)
* @param f Binary operator for reduction
* @return Stream emitting single reduced result
*/
def reduce(f: (Out, Out) => Out): Source[Out, Mat]Usage Examples:
// Running sum with scan
Source(1 to 10)
.scan(0)(_ + _) // Emits: 0, 1, 3, 6, 10, 15, 21, 28, 36, 45, 55
.runWith(Sink.seq)
// Final sum with fold
Source(1 to 10)
.fold(0)(_ + _) // Emits only: 55
.runWith(Sink.head)
// String concatenation with reduce
Source(List("Hello", " ", "World", "!"))
.reduce(_ + _) // Emits: "Hello World!"
.runWith(Sink.head)

Operations that control which elements pass through based on position or conditions.
/**
* Take the first n elements
* @param n Number of elements to take
* @return Stream with first n elements
*/
def take(n: Long): Source[Out, Mat]
/**
* Take elements while predicate is true
* @param p Predicate function
* @return Stream with elements until predicate fails
*/
def takeWhile(p: Out => Boolean): Source[Out, Mat]
/**
* Drop the first n elements
* @param n Number of elements to drop
* @return Stream without first n elements
*/
def drop(n: Long): Source[Out, Mat]
/**
* Drop elements while predicate is true
* @param p Predicate function
* @return Stream starting from first element where predicate fails
*/
def dropWhile(p: Out => Boolean): Source[Out, Mat]
/**
* Take elements within a time window
* @param d Duration of the window
* @return Stream with elements emitted within the time window
*/
def takeWithin(d: FiniteDuration): Source[Out, Mat]
/**
* Drop elements within a time window
* @param d Duration to drop elements
* @return Stream starting after the time window
*/
def dropWithin(d: FiniteDuration): Source[Out, Mat]

Usage Examples:
import scala.concurrent.duration._
// Position-based slicing
Source(1 to 100)
.drop(10) // Skip first 10
.take(20) // Take next 20
.runWith(Sink.seq)
// Condition-based slicing
Source(1 to 20)
.takeWhile(_ < 15) // Take while less than 15
.runWith(Sink.seq)
// Time-based slicing
Source.tick(100.millis, 100.millis, "tick")
.takeWithin(2.seconds) // Take for 2 seconds
.runWith(Sink.seq)

Operations that organize stream elements into groups or batches.
/**
* Group elements into fixed-size batches
* @param n Size of each group
* @return Stream of sequences containing n elements each
*/
def grouped(n: Int): Source[immutable.Seq[Out], Mat]
/**
* Group elements within a time window
* @param n Maximum number of elements per group
* @param d Maximum time to wait for a group
* @return Stream of sequences with elements grouped by time or count
*/
def groupedWithin(n: Int, d: FiniteDuration): Source[immutable.Seq[Out], Mat]
/**
* Group elements by key into substreams
* @param maxSubstreams Maximum number of concurrent substreams
* @param f Function to extract grouping key
* @return SubFlow representing grouped substreams
*/
def groupBy[K](maxSubstreams: Int, f: Out => K): SubFlow[Out, Mat, Source[Out, Mat]#Repr, RunnableGraph[Mat]]
/**
* Batch elements using a seed and aggregation function
* @param max Maximum batch size
* @param seed Function to create initial batch from first element
* @param aggregate Function to add element to existing batch
* @return Stream of batched results
*/
def batch[S](max: Long, seed: Out => S)(aggregate: (S, Out) => S): Source[S, Mat]
/**
* Batch elements weighted by a per-element cost function
* @param max Maximum accumulated cost (weight) per batch
* @param costFn Function computing the cost of each element
* @param seed Function to create initial batch from first element
* @param aggregate Function to add element to existing batch
* @return Stream of batched results
*/
def batchWeighted[S](max: Long, costFn: Out => Long, seed: Out => S)(aggregate: (S, Out) => S): Source[S, Mat]

Usage Examples:
import scala.concurrent.duration._
// Fixed-size grouping
Source(1 to 10)
.grouped(3) // Groups: [1,2,3], [4,5,6], [7,8,9], [10]
.runWith(Sink.foreach(println))
// Time-based grouping
Source.tick(100.millis, 50.millis, "tick")
.groupedWithin(5, 200.millis) // Group up to 5 elements or every 200ms
.runWith(Sink.foreach(group => println(s"Group: $group")))
// Key-based grouping into substreams
Source(1 to 20)
.groupBy(3, _ % 3) // Group by remainder when divided by 3
.mergeSubstreams // Merge substreams back together
.runWith(Sink.seq)

Advanced transformations that maintain custom state across elements.
/**
* Transform elements using a stateful function
* @param create Function invoked at stream start to create the initial state
* @param f Function that takes the current state and an element, returning the next state and the (possibly empty) elements to emit
* @return Stream with stateful transformations
*/
def statefulMapConcat[S, T2](create: () => S)(f: (S, Out) => (S, immutable.Iterable[T2])): Source[T2, Mat]
/**
* Transform the materialized value of this source, leaving the elements unchanged
* @param f Function applied to the materialized value
* @return Stream with the same elements but a transformed materialized value
*/
def mapMaterializedValue[Mat2](f: Mat => Mat2): Source[Out, Mat2]

Usage Examples:
// Stateful deduplication
Source(List(1, 1, 2, 2, 3, 1, 2))
.statefulMapConcat(() => Set.empty[Int]) { (seen, elem) =>
if (seen.contains(elem)) {
(seen, List.empty) // Skip duplicates
} else {
(seen + elem, List(elem)) // Emit new elements
}
}
.runWith(Sink.seq) // Result: [1, 2, 3]

// SubFlow for grouped operations
/**
 * A handle to the substreams produced by operators such as `groupBy`.
 * A `SubFlow` is not itself runnable; its substreams must first be folded
 * back into a single stream via `mergeSubstreams` or `concatSubstreams`,
 * or terminated with `to`.
 *
 * @tparam Out element type emitted by each substream
 * @tparam Mat materialized value type of the overall stream
 * @tparam F   representation of the underlying stream shape
 * @tparam C   result type produced when the substreams are attached to a sink
 */
trait SubFlow[+Out, +Mat, +F[+_], +C] {
// Merge all substreams back into a single Source, interleaving their elements.
def mergeSubstreams: Source[Out, Mat]
// Flatten substreams sequentially, one after another.
// NOTE(review): presumably each substream is drained before the next —
// confirm against the upstream operator's semantics.
def concatSubstreams: Source[Out, Mat]
// Attach the given sink to every substream, yielding the closed representation C.
def to[Mat2](sink: Graph[SinkShape[Out], Mat2]): C
// Transform each substream through the given flow, keeping the substream structure.
def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): SubFlow[T, Mat, F, C]
}