Mechanisms for controlling stream lifecycle, implementing backpressure, rate limiting, and external stream termination. This includes flow control, buffering strategies, and dynamic stream management capabilities.
Operations for controlling element flow and managing backpressure.
/**
* Add a buffer with overflow strategy
* @param size Buffer size
* @param overflowStrategy Strategy when buffer is full
* @return Stream with buffering applied
*/
def buffer(size: Int, overflowStrategy: OverflowStrategy): Source[Out, Mat]
/**
* Async buffer that decouples upstream and downstream processing
* @param bufferSize Buffer size (defaults to 16)
* @param overflowStrategy Strategy when buffer is full (defaults to OverflowStrategy.backpressure)
* @return Stream with async buffering
*/
def async(bufferSize: Int = 16, overflowStrategy: OverflowStrategy = OverflowStrategy.backpressure): Source[Out, Mat]
/**
* Conflate elements when downstream is slower than upstream
* @param seed Function to create initial aggregate from first element
* @param aggregate Function to combine aggregate with new element
* @return Stream that conflates elements under backpressure
*/
def conflateWithSeed[S](seed: Out => S)(aggregate: (S, Out) => S): Source[S, Mat]
/**
* Expand elements when upstream is slower than downstream
* @param extrapolate Function to generate additional elements
* @return Stream that expands elements when needed
*/
def expand[U >: Out](extrapolate: Out => Iterator[U]): Source[U, Mat]
sealed abstract class OverflowStrategy
object OverflowStrategy {
case object DropHead extends OverflowStrategy // Drop oldest elements
case object DropTail extends OverflowStrategy // Drop newest elements
case object DropBuffer extends OverflowStrategy // Drop entire buffer
case object DropNew extends OverflowStrategy // Drop incoming elements
case object Backpressure extends OverflowStrategy // Apply backpressure
case object Fail extends OverflowStrategy // Fail the stream
}
Usage Examples:
import akka.stream.OverflowStrategy
// Buffer with backpressure
Source(1 to 100)
.buffer(10, OverflowStrategy.backpressure)
.map(expensiveOperation)
.runWith(Sink.seq)
// Drop elements when buffer full
Source.tick(10.millis, 10.millis, "tick")
.buffer(5, OverflowStrategy.dropHead)
.runWith(Sink.foreach(println))
// Conflate under backpressure
Source(1 to 1000)
.conflateWithSeed(identity)(_ + _) // Sum elements when backpressured
.runWith(Sink.foreach(println))
// Expand when upstream slow
Source(List(1, 2, 3))
.expand(n => Iterator.continually(n)) // Repeat each element
.take(10)
.runWith(Sink.seq)
Operations for controlling the rate of element emission.
/**
* Throttle the stream to a specific rate
* @param elements Number of elements per time period
* @param per Time period duration
* @param maximumBurst Maximum burst size
* @param mode Throttling mode (shaping or enforcing)
* @return Stream with rate limiting applied
*/
def throttle(elements: Int, per: FiniteDuration, maximumBurst: Int, mode: ThrottleMode): Source[Out, Mat]
/**
* Delay each element by a fixed duration
* @param delay Duration to delay each element
* @param strategy Strategy for handling delayed elements
* @return Stream with delayed elements
*/
def delay(delay: FiniteDuration, strategy: DelayOverflowStrategy = DelayOverflowStrategy.FixedDelay): Source[Out, Mat]
/**
* Delay elements using a custom strategy
* @param delayStrategySupplier Zero-argument function that supplies the DelayStrategy[Out] used to determine delays
* @param overFlowStrategy Overflow strategy for the delayed elements (defaults to DelayOverflowStrategy.FixedDelay)
* @return Stream with custom delay applied
*/
def delayWith(delayStrategySupplier: () => DelayStrategy[Out], overFlowStrategy: DelayOverflowStrategy = DelayOverflowStrategy.FixedDelay): Source[Out, Mat]
sealed abstract class ThrottleMode
object ThrottleMode {
case object Shaping extends ThrottleMode // Smooth out bursts
case object Enforcing extends ThrottleMode // Fail on rate violations
}
sealed abstract class DelayOverflowStrategy
object DelayOverflowStrategy {
case object EmitEarly extends DelayOverflowStrategy // Emit early under backpressure
case object DropHead extends DelayOverflowStrategy // Drop oldest delayed elements
case object DropTail extends DelayOverflowStrategy // Drop newest delayed elements
case object DropBuffer extends DelayOverflowStrategy // Drop all delayed elements
case object DropNew extends DelayOverflowStrategy // Drop new elements
case object Backpressure extends DelayOverflowStrategy // Apply backpressure
case object Fail extends DelayOverflowStrategy // Fail the stream
case object FixedDelay extends DelayOverflowStrategy // Fixed delay regardless
}
Usage Examples:
import scala.concurrent.duration._
// Rate limiting
Source(1 to 100)
.throttle(10, 1.second, 5, ThrottleMode.shaping) // 10 elements/second, burst of 5
.runWith(Sink.foreach(println))
// Fixed delay
Source(List("a", "b", "c"))
.delay(500.millis) // Delay each element by 500ms
.runWith(Sink.foreach(println))
// Custom delay strategy
val increasingDelay = DelayStrategy.linearIncreasingDelay(100.millis, 50.millis)
Source(1 to 10)
.delayWith(() => increasingDelay)
.runWith(Sink.foreach(println))
External controls for terminating streams.
/**
* Control interface for terminating streams
*/
trait KillSwitch {
/**
* Gracefully shutdown the stream
*/
def shutdown(): Unit
/**
* Abort the stream with an error
* @param ex Exception to fail the stream with
*/
def abort(ex: Throwable): Unit
}
/**
* Shared control for terminating multiple streams
*/
abstract class SharedKillSwitch extends KillSwitch {
/**
* Create a flow that can be killed by this switch
* @return Flow that respects this kill switch
*/
def flow[T]: Flow[T, T, NotUsed]
}
/**
* Factory for creating kill switches
*/
object KillSwitches {
/**
* Create a single-use kill switch
* @return Flow that materializes to a KillSwitch
*/
def single[T]: Flow[T, T, UniqueKillSwitch]
/**
* Create a shared kill switch for multiple streams
* @param name Name for the kill switch (for debugging)
* @return Shared kill switch that can control multiple streams
*/
def shared(name: String): SharedKillSwitch
/**
* Create a bidirectional kill switch
* @return BidiFlow that materializes to a KillSwitch
*/
def singleBidi[T1, T2]: BidiFlow[T1, T1, T2, T2, UniqueKillSwitch]
}
/**
* Kill switch for a single stream
*/
trait UniqueKillSwitch extends KillSwitch
Usage Examples:
// Single kill switch
val (killSwitch, done) = Source(1 to 1000)
.viaMat(KillSwitches.single)(Keep.right)
.toMat(Sink.foreach(println))(Keep.both)
.run()
// Kill after 5 seconds
system.scheduler.scheduleOnce(5.seconds) {
killSwitch.shutdown()
}
// Shared kill switch for multiple streams
val sharedKillSwitch = KillSwitches.shared("my-streams")
val stream1 = Source.repeat("stream1")
.via(sharedKillSwitch.flow)
.runWith(Sink.foreach(println))
val stream2 = Source.repeat("stream2")
.via(sharedKillSwitch.flow)
.runWith(Sink.foreach(println))
// Kill both streams
sharedKillSwitch.shutdown()
// Alternatively, abort with an error (once a switch has been shut down or aborted, later calls are ignored)
sharedKillSwitch.abort(new RuntimeException("Emergency stop"))
Operations for managing stream startup, completion, and cleanup.
/**
* Transform the materialized value of this stream
* @param f Function mapping the current materialized value (Mat) to a new one (Mat2)
* @return Stream with the same element type and a materialized value of type Mat2
*/
def mapMaterializedValue[Mat2](f: Mat => Mat2): Source[Out, Mat2]
/**
* Observe stream termination and combine it with the materialized value
* @param f Function receiving the current materialized value and a Future[Done] tied to the stream's termination;
*          its result becomes the new materialized value
* @return Stream with the same element type and a materialized value of type Mat2
*
* NOTE(review): the usage example calls this as `.watchTermination() { (mat, done) => ... }`, i.e. with an
* empty first argument list that this single-list signature does not show - confirm against the library.
*/
def watchTermination[Mat2](f: (Mat, Future[Done]) => Mat2): Source[Out, Mat2]
/**
* Add a finalizer that runs when stream terminates
* @param finalizer Function to execute on termination (success or failure)
* @return Stream with finalizer attached
*/
def finalizeWith[U >: Out](finalizer: () => Future[Done]): Source[U, Mat]
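/**
* Inject an element when no element has been emitted for longer than maxIdle
* @param maxIdle Maximum idle time before an element is injected
* @param injectedElem Function producing the element to inject (for example a heartbeat)
* @return Stream that emits injected elements during idle periods
*/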
def keepAlive[U >: Out](maxIdle: FiniteDuration, injectedElem: () => U): Source[U, Mat]
Usage Examples:
// Lifecycle management
Source(1 to 10)
.watchTermination() { (mat, done) =>
println("Stream started")
done.onComplete { result =>
println(s"Stream finished: $result")
// Cleanup resources
}
mat
}
.runWith(Sink.seq)
// Stream with finalizer
val resourceStream = Source.fromIterator(() => expensiveResourceIterator())
.finalizeWith(() => Future {
cleanupExpensiveResource()
Done
})
.runWith(Sink.seq)
// Keep alive stream
Source.maybe[String] // Never emits unless completed externally
.keepAlive(30.seconds, () => "heartbeat")
.runWith(Sink.foreach(println))
Operations for decoupling stream segments and managing boundaries.
/**
* Create an async boundary that decouples upstream and downstream processing
* @param bufferSize Size of the async buffer
* @return Stream with async boundary
*/
def async(bufferSize: Int = 16): Source[Out, Mat]
/**
* Detach upstream demand from downstream demand without detaching the stream rates (acts like a buffer of size 1)
* @return Stream whose upstream demand is decoupled from downstream demand
*/
def detach: Source[Out, Mat]
/**
* Add an explicit async boundary with custom attributes
* @param attrs Attributes to apply to the async boundary
* @return Stream with custom async boundary
*/
def asyncBoundary(attrs: Attributes = Attributes.none): Source[Out, Mat]
Usage Examples:
// Async boundaries for parallelism
Source(1 to 100)
.map(slowComputation1)
.async() // Process this stage independently
.map(slowComputation2)
.async() // And this stage independently
.runWith(Sink.seq)
// Detach for independent processing
val detachedStream = Source.repeat("data")
.take(100)
.detach // Decouple upstream demand from downstream demand (one-element buffer)
.map(processData)
.runWith(Sink.ignore)
Advanced operations for dynamic stream behavior modification.
/**
* Choose the processing flow for the stream based on its first elements
* @param n Number of leading elements collected into the prefix handed to f
*          (presumably fewer if the stream ends early - confirm against the library)
* @param f Function that receives the prefix and returns the Flow to process the stream with
* @return Stream of the elements (Out2) produced by the flow returned from f
*/
def flatMapPrefix[Out2, Mat2](n: Int)(f: immutable.Seq[Out] => Flow[Out, Out2, Mat2]): Source[Out2, Mat]
/**
* Divert elements that match a predicate to a separate sink
* @param to Sink that receives the diverted elements
* @param when Predicate selecting the elements to divert
* @return Stream of the elements for which the predicate is false
*/
def divertTo[Out2](to: Graph[SinkShape[Out], _], when: Out => Boolean): Source[Out, Mat]
Usage Examples:
// Dynamic processing based on initial elements
Source(1 to 100)
.flatMapPrefix(5) { initialElements =>
if (initialElements.forall(_ > 0)) {
Flow[Int].map(_ * 2) // Positive processing
} else {
Flow[Int].map(_.abs) // Negative processing
}
}
.runWith(Sink.seq)
// Divert elements based on condition
Source(1 to 10)
.divertTo(Sink.foreach(n => println(s"Even: $n")), _ % 2 == 0)
.runWith(Sink.foreach(n => println(s"Odd: $n")))
// Flow control strategies
sealed abstract class OverflowStrategy
sealed abstract class ThrottleMode
sealed abstract class DelayOverflowStrategy
// Kill switch interfaces
trait KillSwitch {
def shutdown(): Unit
def abort(ex: Throwable): Unit
}
trait UniqueKillSwitch extends KillSwitch
abstract class SharedKillSwitch extends KillSwitch {
def flow[T]: Flow[T, T, NotUsed]
}
// Delay strategies
/**
* Strategy that determines the delay applied to elements of type T
*/
trait DelayStrategy[T] {
/**
* Determine the delay for an element
* @param elem Element the delay is computed for
* @return Delay for that element
*/
def nextDelay(elem: T): FiniteDuration
}
/**
* Factory methods for DelayStrategy instances
*/
object DelayStrategy {
/**
* @param delay Delay duration; presumably applied unchanged to every element - confirm
* @return Delay strategy based on the given duration
*/
def fixedDelay[T](delay: FiniteDuration): DelayStrategy[T]
/**
* @param initialDelay Starting delay
* @param increment Amount by which the delay grows; presumably added per element - confirm
* @return Delay strategy with a linearly increasing delay
*
* NOTE(review): verify this parameter list against the library's DelayStrategy.linearIncreasingDelay;
* the real method may take additional or differently named parameters.
*/
def linearIncreasingDelay[T](initialDelay: FiniteDuration, increment: FiniteDuration): DelayStrategy[T]
}