Akka Streams is a reactive streams implementation for processing and transferring sequences of elements using bounded buffer space with automatic backpressure management.
—
Comprehensive error handling with supervision strategies, recovery operations, and stream resilience patterns.
object Supervision {
sealed trait Directive
case object Stop extends Directive // Stop the processing stage
case object Resume extends Directive // Skip the failing element and continue
case object Restart extends Directive // Restart the processing stage
type Decider = Function[Throwable, Directive]
}object Supervision {
val stoppingDecider: Decider = _ => Stop
val resumingDecider: Decider = _ => Resume
val restartingDecider: Decider = _ => Restart
}Usage Examples:
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
// Custom supervision strategy
val customDecider: Supervision.Decider = {
case _: NumberFormatException => Supervision.Resume
case _: IllegalArgumentException => Supervision.Restart
case _ => Supervision.Stop
}
// Configure materializer with supervision
val settings = ActorMaterializerSettings(system)
.withSupervisionStrategy(customDecider)
implicit val materializer = ActorMaterializer(settings)
// Apply supervision to specific stream
val source = Source(List("1", "2", "bad", "4", "5"))
.map(_.toInt) // Will fail on "bad"
.withAttributes(ActorAttributes.supervisionStrategy(customDecider))
.runWith(Sink.foreach(println))
// Output: 1, 2, 4, 5 (skips "bad")trait FlowOps[+Out, +Mat] {
def recover[T >: Out](pf: PartialFunction[Throwable, T]): Repr[T]
def recoverWithRetries[T >: Out](
attempts: Int,
pf: PartialFunction[Throwable, Graph[SourceShape[T], NotUsed]]
): Repr[T]
def mapError(f: Throwable => Throwable): Repr[Out]
}Usage Examples:
// Basic recovery with fallback value
val recoveredStream = Source(List("1", "2", "abc", "4"))
.map(_.toInt)
.recover {
case _: NumberFormatException => -1
}
.runWith(Sink.seq)
// Result: List(1, 2, -1, 4)
// Recovery with retries using alternative source
val withRetries = Source(List("1", "2", "abc", "4"))
.map(_.toInt)
.recoverWithRetries(3, {
case _: NumberFormatException => Source.single(0)
})
// Transform errors
val transformedErrors = source
.mapError {
case nfe: NumberFormatException =>
new IllegalArgumentException(s"Invalid number: ${nfe.getMessage}")
}object RestartSource {
def withBackoff[T](
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double
)(sourceFactory: () => Source[T, _]): Source[T, NotUsed]
def onFailuresWithBackoff[T](
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double
)(sourceFactory: () => Source[T, _]): Source[T, NotUsed]
}Usage Example:
import akka.stream.scaladsl.RestartSource
import scala.concurrent.duration._
// Restart source with exponential backoff
val resilientSource = RestartSource.withBackoff(
minBackoff = 1.second,
maxBackoff = 30.seconds,
randomFactor = 0.2
) { () =>
Source.tick(1.second, 1.second, "tick")
.map { _ =>
if (scala.util.Random.nextDouble() < 0.1) throw new RuntimeException("Random failure")
else "success"
}
}
resilientSource.runWith(Sink.foreach(println))object RestartFlow {
def withBackoff[In, Out](
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double
)(flowFactory: () => Flow[In, Out, _]): Flow[In, Out, NotUsed]
}
object RestartSink {
def withBackoff[T](
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
randomFactor: Double
)(sinkFactory: () => Sink[T, _]): Sink[T, NotUsed]
}sealed abstract class OverflowStrategy extends DelayOverflowStrategy
object OverflowStrategy {
def dropHead: OverflowStrategy // Drop oldest element
def dropTail: OverflowStrategy // Drop newest element
def dropBuffer: OverflowStrategy // Drop entire buffer
def dropNew: OverflowStrategy // Drop incoming element
def backpressure: OverflowStrategy // Apply backpressure (block)
def fail: OverflowStrategy // Fail the stream
}Usage Examples:
import akka.stream.OverflowStrategy
// Buffer with overflow handling
val bufferedStream = Source(1 to 1000)
.buffer(10, OverflowStrategy.dropHead)
.throttle(1, 100.millis) // Slow consumer
.runWith(Sink.seq)
// Queue source with overflow strategy
val (queueSource, queue) = Source.queue[Int](5, OverflowStrategy.dropTail)
.toMat(Sink.seq)(Keep.both)
.run()
// Offer elements that may be dropped
queue.offer(1)
queue.offer(2)
// ... more elements than buffer sizeimport scala.util.{Try, Success, Failure}
val safeParsing = Source(List("1", "2", "abc", "4"))
.map(s => Try(s.toInt))
.collect {
case Success(value) => value
}
.runWith(Sink.seq)
// Result: List(1, 2, 4)
// Process both successes and failures
val withFailureHandling = Source(List("1", "2", "abc", "4"))
.map(s => Try(s.toInt))
.map {
case Success(value) => s"Parsed: $value"
case Failure(ex) => s"Failed: ${ex.getMessage}"
}
.runWith(Sink.foreach(println))import scala.concurrent.Future
val asyncProcessing = Source(1 to 10)
.mapAsync(4) { n =>
Future {
if (n == 5) throw new RuntimeException("Five is bad!")
n * 2
}.recover {
case _: RuntimeException => -1
}
}
.runWith(Sink.seq)
// Result includes -1 for the failed elementtrait FlowOps[+Out, +Mat] {
def watchTermination[Mat2]()(matF: (Mat, Future[Done]) => Mat2): ReprMat[Out, Mat2]
}Usage Example:
val monitoredStream = Source(1 to 10)
.map { n =>
if (n == 5) throw new RuntimeException("Boom!")
n
}
.watchTermination() { (_, done) =>
done.onComplete {
case Success(_) => println("Stream completed successfully")
case Failure(ex) => println(s"Stream failed with: ${ex.getMessage}")
}
}
.runWith(Sink.ignore)val resilientProcessing = Source(1 to 100)
.mapAsync(4) { n =>
Future {
if (scala.util.Random.nextDouble() < 0.1) {
throw new RuntimeException(s"Random failure at $n")
}
n * 2
}
}
.recover {
case ex: RuntimeException =>
println(s"Recovered from: ${ex.getMessage}")
-1
}
.filter(_ > 0) // Remove recovered elements
.runWith(Sink.seq)import akka.pattern.CircuitBreaker
import scala.concurrent.duration._
val circuitBreaker = new CircuitBreaker(
scheduler = system.scheduler,
maxFailures = 5,
callTimeout = 10.seconds,
resetTimeout = 1.minute
)
val protectedProcessing = Source(1 to 100)
.mapAsync(4) { n =>
circuitBreaker.withCircuitBreaker {
Future {
// Potentially failing operation
if (scala.util.Random.nextDouble() < 0.2) {
throw new RuntimeException("Service unavailable")
}
s"Processed: $n"
}
}.recover {
case _: akka.pattern.CircuitBreakerOpenException => "Circuit breaker open"
case ex => s"Error: ${ex.getMessage}"
}
}
.runWith(Sink.foreach(println))val safeBatchProcessing = Source(1 to 20)
.grouped(5)
.map { batch =>
Source(batch)
.map { n =>
if (n % 7 == 0) throw new RuntimeException(s"Seven divisor: $n")
n * 2
}
.recover {
case _: RuntimeException => -1
}
.runWith(Sink.seq)
}
.mapAsync(1)(identity) // Materialize each batch
.runWith(Sink.seq)This error handling approach ensures stream resilience while providing fine-grained control over failure scenarios and recovery strategies.
Install with Tessl CLI
npx tessl i tessl/maven-com-typesafe-akka--akka-stream-2-13-0-m5