Strategies for handling failures, implementing supervision, and recovering from errors in stream processing pipelines. Akka Streams provides comprehensive error handling mechanisms to build resilient streaming applications.
Core supervision system for controlling how streams handle errors.
/**
* Supervision utilities for error handling in streams.
*
* Groups the `Decider` function type, the `Directive` values a decider may return,
* and three ready-made deciders that map every exception to a single directive.
*/
object Supervision {
/**
* Function type for making supervision decisions.
* A decider receives the error that occurred (a `Throwable`) and returns the
* `Directive` indicating how to handle it.
*/
type Decider = Throwable => Directive
/**
* Built-in supervision directives. The hierarchy is sealed: `Resume`, `Restart`
* and `Stop` below are the only values.
*/
sealed abstract class Directive
/**
* Resume processing, skipping (dropping) the failed element
*/
case object Resume extends Directive
/**
* Restart the operator, losing its current accumulated state.
* The failed element is dropped, as with `Resume`.
*/
case object Restart extends Directive
/**
* Stop the stream, completing it with the error
*/
case object Stop extends Directive
/**
* Decider that returns `Resume` for every exception (never fails the stream)
*/
val resumingDecider: Decider = _ => Resume
/**
* Decider that returns `Restart` for every exception
*/
val restartingDecider: Decider = _ => Restart
/**
* Decider that returns `Stop` for every exception (default behavior)
*/
val stoppingDecider: Decider = _ => Stop
}Usage Examples:
import akka.stream.{ActorAttributes, ActorMaterializer, ActorMaterializerSettings, Supervision}
import akka.stream.scaladsl.{Source, Sink}
// Custom supervision strategy: pick a directive per exception type
val customDecider: Supervision.Decider = {
  case _: ArithmeticException => Supervision.Resume
  case _: IllegalArgumentException => Supervision.Restart
  case _ => Supervision.Stop
}
// Apply supervision to operators
Source(1 to 10)
  .map(x => 10 / (x - 5)) // Throws ArithmeticException at x=5; Resume drops that element
  .withAttributes(ActorAttributes.supervisionStrategy(customDecider))
  .runWith(Sink.seq)
// Global supervision strategy
// NOTE: ActorMaterializerSettings and ActorMaterializer are deprecated since Akka 2.6;
// on newer versions attach ActorAttributes.supervisionStrategy to the graph being run instead.
val settings = ActorMaterializerSettings(system)
  .withSupervisionStrategy(Supervision.resumingDecider)
// The settings-based factory is ActorMaterializer(settings)(actorRefFactory);
// there is no Materializer(settings, system) overload.
val materializer = ActorMaterializer(settings)(system)
Operators for recovering from errors and providing fallback values.
/**
* Recover from an upstream failure by emitting one final fallback element.
* When upstream fails with an exception matched by `pf`, the value returned by `pf`
* is emitted as the last element and the stream then completes normally; elements
* that upstream had not yet produced are never processed. Failures not matched by
* `pf` are propagated unchanged.
* @param pf Partial function mapping specific exceptions to the final fallback element
* @return Stream that ends with the fallback element when a matched failure occurs
*/
def recover[U >: Out](pf: PartialFunction[Throwable, U]): Source[U, Mat]
/**
* Recover from an upstream failure by switching to an alternative stream.
* When upstream fails with an exception matched by `pf`, the source returned by `pf`
* takes over from the point of failure; the failed upstream is not resumed.
* @param pf Partial function that provides the alternative stream for matched exceptions
* @return Stream that switches to the alternative stream on a matched error
*/
def recoverWith[U >: Out](pf: PartialFunction[Throwable, Graph[SourceShape[U], _]]): Source[U, Mat]
/**
* Recover from failures by switching to an alternative stream, a limited number of times.
* Each time the currently running stream fails with an exception matched by `pf`, it is
* replaced by the source `pf` returns, until `attempts` replacements have been used.
* The original upstream is not re-run; `pf` must supply whatever should run next.
* Note that `attempts` and `pf` share a single parameter list, so a call looks like
* `recoverWithRetries(3, { case ... => ... })`.
* @param attempts Maximum number of retry attempts (-1 for unlimited)
* @param pf Partial function that provides alternative streams for retries
* @return Stream that switches to alternatives on error up to the specified attempts
*/
def recoverWithRetries[U >: Out](attempts: Int, pf: PartialFunction[Throwable, Graph[SourceShape[U], _]]): Source[U, Mat]
/**
* Handle both success and failure cases
* NOTE(review): `handleWith` does not appear to be an operator of Akka Streams' `Source`;
* verify against the Akka version in use. The same effect is obtained by wrapping each
* result in a `Try` (e.g. `.map(x => Try(f(x)))`) and then pattern matching on
* `Success` / `Failure` in a regular `map`.
* @param pf Function that handles both Success and Failure cases
* @return Stream that processes both successful and failed outcomes
*/
def handleWith[U >: Out](pf: PartialFunction[Try[Out], U]): Source[U, Mat]Usage Examples:
import scala.util.{Try, Success, Failure}
// Simple recovery with a fallback value
Source(List("1", "2", "invalid", "4"))
.map(_.toInt)
.recover {
case _: NumberFormatException => -1 // Emitted as the final element in place of the failure
}
.runWith(Sink.seq) // Result: [1, 2, -1] -- recover completes the stream, so "4" is never processed
// Recovery with alternative stream
// (fetchData and HttpException are not defined in this snippet; they stand in for application code)
Source(List("http://good-url", "http://bad-url"))
.map(url => fetchData(url)) // May throw HttpException
.recoverWith {
case _: HttpException => Source.single("default-data") // Stream continues from this source after the failure
}
.runWith(Sink.seq)
// Recovery with retries
// `attempts` and the partial function share one parameter list, so both go inside
// the same parentheses (the curried form `recoverWithRetries(3) { ... }` does not compile).
// (performUnreliableOperation and TransientException stand in for application code.)
Source.single("unreliable-operation")
  .map(performUnreliableOperation)
  .recoverWithRetries(3, {
    case _: TransientException =>
      // Replacement source; if it fails as well it is replaced again, at most 3 times in total
      Source.single("retry-operation").map(performUnreliableOperation)
  })
  .runWith(Sink.head)
// Handle all outcomes: capture each result as a Try, then map both cases to a value.
// A plain `map` is used because Source has no `handleWith` operator.
Source(List("1", "invalid", "3"))
  .map(s => Try(s.toInt))
  .map {
    case Success(value) => value
    case Failure(_) => 0 // Fallback for elements that failed to parse
  }
  .runWith(Sink.seq) // Result: [1, 0, 3]
Built-in operators that automatically restart streams with backoff strategies.
/**
* Factory for restart sources with exponential backoff.
* The wrapped source is created by calling `sourceFactory`, and created again through
* the same factory whenever a restart happens.
*/
object RestartSource {
/**
* Create a source that restarts with exponential backoff.
* In Akka Streams this variant restarts the wrapped source both when it fails and when
* it completes; use `onFailuresWithBackoff` to restart on failure only.
* NOTE(review): in Akka's scaladsl the factory is passed in a second parameter list,
* i.e. `withBackoff(minBackoff, maxBackoff, randomFactor)(sourceFactory)`; verify against
* the Akka version in use.
* @param minBackoff Minimum backoff duration
* @param maxBackoff Maximum backoff duration
* @param randomFactor Random factor for backoff jitter (e.g. 0.2 adds up to 20% extra delay)
* @param sourceFactory Function that creates the source
* @return Source that automatically restarts the wrapped source
*/
def withBackoff[T](
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double,
sourceFactory: () => Source[T, _]
): Source[T, NotUsed]
/**
* Create a source that restarts with backoff only when the wrapped source fails.
* Normal completion of the wrapped source completes the resulting source instead of
* triggering a restart.
* NOTE(review): as with `withBackoff`, Akka's scaladsl takes the factory in a second
* parameter list: `onFailuresWithBackoff(settings)(sourceFactory)`.
* @param restartSettings Restart configuration
* @param sourceFactory Function that creates the source
* @return Source that restarts the wrapped source on failure
*/
def onFailuresWithBackoff[T](
restartSettings: RestartSettings,
sourceFactory: () => Source[T, _]
): Source[T, NotUsed]
}
/**
* Factory for restart flows with exponential backoff.
* The wrapped flow is created by calling `flowFactory`, and created again through the
* same factory whenever a restart happens.
*/
object RestartFlow {
/**
* Create a flow that restarts with exponential backoff.
* NOTE(review): in Akka's scaladsl the factory is passed in a second parameter list,
* i.e. `withBackoff(minBackoff, maxBackoff, randomFactor)(flowFactory)`; verify against
* the Akka version in use. Elements in flight inside the wrapped flow at the moment of a
* restart may be lost -- confirm against the Akka documentation if that matters.
* @param minBackoff Minimum backoff duration
* @param maxBackoff Maximum backoff duration
* @param randomFactor Random factor for backoff jitter (e.g. 0.2 adds up to 20% extra delay)
* @param flowFactory Function that creates the flow
* @return Flow that automatically restarts the wrapped flow
*/
def withBackoff[In, Out](
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double,
flowFactory: () => Flow[In, Out, _]
): Flow[In, Out, NotUsed]
}
/**
* Factory for restart sinks with exponential backoff.
* The wrapped sink is created by calling `sinkFactory`, and created again through the
* same factory whenever a restart happens.
*/
object RestartSink {
/**
* Create a sink that restarts with exponential backoff.
* The materialized value of the wrapped sink is not exposed (the result materializes
* `NotUsed`).
* NOTE(review): in Akka's scaladsl the factory is passed in a second parameter list,
* i.e. `withBackoff(minBackoff, maxBackoff, randomFactor)(sinkFactory)`; verify against
* the Akka version in use.
* @param minBackoff Minimum backoff duration
* @param maxBackoff Maximum backoff duration
* @param randomFactor Random factor for backoff jitter (e.g. 0.2 adds up to 20% extra delay)
* @param sinkFactory Function that creates the sink
* @return Sink that automatically restarts the wrapped sink
*/
def withBackoff[T](
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double,
sinkFactory: () => Sink[T, _]
): Sink[T, NotUsed]
}Usage Examples:
import scala.concurrent.duration._
// Restart source with backoff
// The factory is passed in a second parameter list (Akka's Restart* scaladsl API);
// it is invoked for the first run and again on every restart.
val restartingSource = RestartSource.withBackoff(
  minBackoff = 1.second,
  maxBackoff = 30.seconds,
  randomFactor = 0.2
) { () =>
  Source.fromIterator(() => unreliableDataIterator())
}
// Restart flow for processing
val restartingFlow = RestartFlow.withBackoff(
  minBackoff = 500.millis,
  maxBackoff = 10.seconds,
  randomFactor = 0.1
) { () =>
  Flow[String].map(processString)
}
// Restart sink for output
val restartingSink = RestartSink.withBackoff(
  minBackoff = 1.second,
  maxBackoff = 20.seconds,
  randomFactor = 0.3
) { () =>
  Sink.foreach(writeToUnreliableService)
}
// Use in pipeline
restartingSource
  .via(restartingFlow)
  .runWith(restartingSink)
Specialized retry operators for individual elements and operations.
/**
* Factory for retry flows that retry individual elements.
* NOTE(review): in Akka Streams, both factories take an additional second parameter list
* `(decideRetry: (In, Out) => Option[In])` (with context: a function over the
* `(In, CtxIn)` / `(Out, CtxOut)` pairs). An element is retried when `decideRetry`
* returns `Some(newInput)`; a failure of the wrapped flow fails the stream and is NOT
* retried. The signatures below omit that parameter list -- verify against the Akka
* version in use.
*/
object RetryFlow {
/**
* Create a flow that retries elements with backoff
* @param minBackoff Minimum backoff duration
* @param maxBackoff Maximum backoff duration
* @param randomFactor Random factor for backoff jitter
* @param maxRetries Maximum number of retries per element
* @param flow Flow to apply with retry logic
* @return Flow that retries elements, emitting the last result once retries are exhausted
*/
def withBackoff[In, Out](
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double,
maxRetries: Int,
flow: Flow[In, Out, _]
): Flow[In, Out, NotUsed]
/**
* Create a flow that retries elements while preserving context.
* Each element travels as an `(element, context)` pair on both the input and output side.
* @param minBackoff Minimum backoff duration
* @param maxBackoff Maximum backoff duration
* @param randomFactor Random factor for backoff jitter
* @param maxRetries Maximum number of retries per element
* @param flow Flow to apply with retry logic
* @return Flow that retries elements together with their context
*/
def withBackoffAndContext[In, CtxIn, Out, CtxOut](
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double,
maxRetries: Int,
flow: Flow[(In, CtxIn), (Out, CtxOut), _]
): Flow[(In, CtxIn), (Out, CtxOut), NotUsed]
}Usage Examples:
// Retry individual HTTP requests
// RetryFlow does not react to stream failures: it retries an element only when
// `decideRetry` (second parameter list) returns Some(input). Each request outcome is
// therefore captured as a Try element, so a failed request can be retried instead of
// failing the whole stream.
// Requires Success/Failure from scala.util plus an implicit ActorSystem/ExecutionContext.
val httpFlow = Flow[String].mapAsync(1) { url =>
  Http().singleRequest(HttpRequest(uri = url))
    .flatMap(response => response.entity.toStrict(5.seconds)) // flatMap: toStrict returns a Future
    .transform(outcome => Success(outcome)) // Future[T] => Future[Try[T]], never a failed Future
}
val retryingHttpFlow = RetryFlow.withBackoff(
  minBackoff = 100.millis,
  maxBackoff = 2.seconds,
  randomFactor = 0.2,
  maxRetries = 3,
  flow = httpFlow
) {
  case (url, Failure(_)) => Some(url) // request failed: retry the same URL
  case _ => None // request succeeded: emit the result downstream
}
// After maxRetries unsuccessful attempts the last Failure is emitted downstream.
Source(List("http://api1.com", "http://api2.com"))
  .via(retryingHttpFlow)
  .runWith(Sink.seq)
Operators for monitoring and reporting errors without stopping the stream.
/**
* Log what passes through this point of the stream without altering it.
* In Akka Streams this covers the elements themselves as well as completion and
* failure signals (not only errors); elements are passed on unchanged.
* @param name Name for logging
* @param extract Function to extract loggable information from elements (defaults to the element itself)
* @param log Logging adapter to use; the default `null` leaves the choice of logger to the
*            implementation -- NOTE(review): confirm the fallback logger in the Akka docs
* @return Stream that logs and otherwise behaves like the original
*/
def log(name: String, extract: Out => Any = ConstantFun.scalaIdentityFunction)(implicit log: LoggingAdapter = null): Source[Out, Mat]
/**
* Monitor stream completion and failure
* NOTE(review): Akka Streams' `monitor` operator materializes a `FlowMonitor` that can be
* polled for the stream state; a callback-based overload with this signature does not
* appear to exist -- verify against the Akka version in use. For callbacks on termination
* see `watchTermination`.
* @param onComplete Function called when stream completes successfully (default: no-op)
* @param onFailure Function called when stream fails (default: no-op)
* @return Stream that executes monitoring callbacks
*/
def monitor[U >: Out](onComplete: () => Unit = () => (), onFailure: Throwable => Unit = _ => ()): Source[U, Mat]
/**
* Observe termination of the stream through a `Future[Done]`.
* `f` is given the stream's materialized value together with a future that is
* completed when the stream terminates -- presumably successfully on normal completion
* and with the error on failure (NOTE(review): confirm against the Akka docs).
* Whatever `f` returns becomes the new materialized value; elements pass through unmodified.
* NOTE(review): in Akka's scaladsl this operator has an empty first parameter list,
* i.e. `watchTermination()(f)`, which is how it is invoked in the usage example.
* @param f Function combining the materialized value with the termination future
* @return Stream with the materialized value produced by `f`
*/
def watchTermination[Mat2](f: (Mat, Future[Done]) => Mat2): Source[Out, Mat2]Usage Examples:
// Log the failure, then end the stream with a fallback element
Source(1 to 10)
.map(x => 10 / (x - 5)) // Throws ArithmeticException at x=5
.log("division-errors") // Logs what passes this point, including the failure
.recover { case _: ArithmeticException => 0 } // Emits 0 as the last element and completes
.runWith(Sink.seq) // Result: [-2, -3, -5, -10, 0] -- elements 6 to 10 are never processed
// Monitor stream termination
// (onComplete needs an implicit ExecutionContext in scope; Success/Failure come from scala.util)
Source(1 to 100)
.watchTermination() { (mat, done) =>
// `done` is the termination future handed in by watchTermination
done.onComplete {
case Success(_) => println("Stream completed successfully")
case Failure(ex) => println(s"Stream failed: $ex")
}
// Return the original materialized value unchanged
mat
}
.runWith(Sink.ignore)Operators for handling timeouts and deadlines in stream processing.
/**
* Fail the stream if it has not completed within the specified duration.
* The failure is a `TimeoutException`, which can be handled downstream (e.g. with `recover`).
* @param timeout Maximum duration the whole stream may take to complete
* @return Stream that fails on timeout
*/
def completionTimeout(timeout: FiniteDuration): Source[Out, Mat]
/**
* Fail the stream if no elements are received within the specified duration.
* The failure is a `TimeoutException`, which can be handled downstream (e.g. with `recover`).
* @param timeout Maximum duration between two consecutive elements
* @return Stream that fails if idle too long
*/
def idleTimeout(timeout: FiniteDuration): Source[Out, Mat]
/**
* Fail the stream if backpressure lasts longer than the specified duration, i.e. if
* downstream does not signal demand for a new element within `timeout` of the previous one.
* NOTE(review): the failure is presumably a `TimeoutException`, as for the other timeout
* operators -- confirm against the Akka docs.
* @param timeout Maximum backpressure duration
* @return Stream that fails on prolonged backpressure
*/
def backpressureTimeout(timeout: FiniteDuration): Source[Out, Mat]
/**
* Fail the stream if the first element has not arrived within the specified duration.
* In Akka Streams this operator only guards the first element: once one element has
* passed, the timeout no longer applies. It does not complete the stream after the
* timeout; a late first element fails the stream with a `TimeoutException`.
* To end a stream after a fixed duration use `takeWithin` instead.
* @param timeout Maximum duration to wait for the first element
* @return Stream that fails if the first element is late
*/
def initialTimeout(timeout: FiniteDuration): Source[Out, Mat]Usage Examples:
import scala.concurrent.duration._
// Timeout for overall completion
// (slowDataIterator stands in for application code; TimeoutException must be imported,
// e.g. from java.util.concurrent)
Source.fromIterator(() => slowDataIterator)
.completionTimeout(30.seconds) // Fail if the stream has not completed after 30 seconds
.recover {
case _: TimeoutException => "timeout-fallback" // Final element emitted instead of the failure
}
.runWith(Sink.seq)
// Timeout between elements
// NOTE(review): ticks arrive every 2 seconds, so the 5-second idle timeout is not expected
// to fire here, and the tick source never completes on its own -- Sink.seq would keep
// collecting indefinitely. Treat this as an illustration of the operator placement only.
Source.tick(1.second, 2.seconds, "tick")
.idleTimeout(5.seconds) // Fail if no tick for 5 seconds
.recover {
case _: TimeoutException => "idle-timeout" // Final element emitted instead of the failure
}
.runWith(Sink.seq)
// Initial timeout: fail if the first element does not arrive in time.
// (Source.repeat would emit immediately, so the timeout would never fire and Sink.seq
// would grow without bound; a delayed first element shows the operator's actual effect.)
Source.single("element")
  .initialDelay(15.seconds)   // First element arrives after 15 seconds...
  .initialTimeout(10.seconds) // ...so the stream fails with a TimeoutException after 10 seconds
  .recover {
    case _: TimeoutException => "initial-timeout"
  }
  .runWith(Sink.seq) // Result: ["initial-timeout"]
// Supervision types
/**
* Summary of the supervision types: a `Decider` maps a `Throwable` to one of the three
* `Directive` values declared here.
*/
object Supervision {
// Function that picks a directive for a given error
type Decider = Throwable => Directive
// Sealed: Resume, Restart and Stop are the only directives
sealed abstract class Directive
case object Resume extends Directive
case object Restart extends Directive
case object Stop extends Directive
}
// Restart settings
/**
* Backoff and restart-limit configuration for the restart operators.
*
* NOTE(review): this is a simplified summary. Akka's own `RestartSettings` is created with
* `RestartSettings(minBackoff, maxBackoff, randomFactor)` and refined through `with...`
* methods; verify field names and defaults against the Akka version in use.
*
* @param minBackoff Minimum backoff duration
* @param maxBackoff Maximum backoff duration
* @param randomFactor Random factor for backoff jitter
* @param maxRestarts Maximum number of restarts; -1 (the default) means unlimited
* @param maxRestartsWithin Window in which `maxRestarts` is counted. Typed as `Duration`
*                          because the "no window" default is not a `FiniteDuration`
*                          (the former `Duration.Undefined` default did not type-check, and
*                          `Undefined` never equals itself, which would break case-class
*                          equality); `Duration.Inf` expresses an unbounded window.
*                          Any `FiniteDuration` is still accepted.
* @param onlyOnFailures Exception classes that trigger a restart; empty by default
*/
final case class RestartSettings(
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double,
maxRestarts: Int = -1,
maxRestartsWithin: Duration = Duration.Inf,
onlyOnFailures: Set[Class[_ <: Throwable]] = Set.empty
)
// Common exception types
// Each is an unchecked exception (extends RuntimeException) carrying only a message.
// NOTE(review): constructor shapes and parent classes are simplified here; check the
// Akka API docs before constructing or matching on these.

/** Presumably signals that a stream was detached from what it was communicating with -- TODO confirm. */
class StreamDetachedException(msg: String) extends RuntimeException(msg)
/** Presumably signals that a lazily created stream part was never materialized -- TODO confirm. */
class NeverMaterializedException(msg: String) extends RuntimeException(msg)
/** Presumably signals that the limit of concurrently open substreams was exceeded -- TODO confirm. */
class TooManySubstreamsOpenException(msg: String) extends RuntimeException(msg)