CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-com-typesafe-akka--akka-stream-2-13-0-m5

Akka Streams is a reactive streams implementation for processing and transferring sequences of elements using bounded buffer space with automatic backpressure management.

Pending
Overview
Eval results
Files

error-handling.mddocs/

Error Handling and Supervision

Comprehensive error handling with supervision strategies, recovery operations, and stream resilience patterns.

Supervision Strategies

Supervision Directives

object Supervision {
  sealed trait Directive
  case object Stop extends Directive      // Stop the processing stage
  case object Resume extends Directive    // Skip the failing element and continue
  case object Restart extends Directive   // Restart the processing stage
  
  type Decider = Function[Throwable, Directive]
}

Predefined Supervision Deciders

object Supervision {
  val stoppingDecider: Decider = _ => Stop
  val resumingDecider: Decider = _ => Resume  
  val restartingDecider: Decider = _ => Restart
}

Usage Examples:

import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}

// Custom supervision strategy
val customDecider: Supervision.Decider = {
  case _: NumberFormatException => Supervision.Resume
  case _: IllegalArgumentException => Supervision.Restart
  case _ => Supervision.Stop
}

// Configure materializer with supervision
val settings = ActorMaterializerSettings(system)
  .withSupervisionStrategy(customDecider)

implicit val materializer = ActorMaterializer(settings)

// Apply supervision to specific stream
val source = Source(List("1", "2", "bad", "4", "5"))
  .map(_.toInt) // Will fail on "bad"
  .withAttributes(ActorAttributes.supervisionStrategy(customDecider))
  .runWith(Sink.foreach(println))
// Output: 1, 2, 4, 5 (skips "bad")

Recovery Operations

Stream-Level Recovery

trait FlowOps[+Out, +Mat] {
  def recover[T >: Out](pf: PartialFunction[Throwable, T]): Repr[T]
  def recoverWithRetries[T >: Out](
    attempts: Int, 
    pf: PartialFunction[Throwable, Graph[SourceShape[T], NotUsed]]
  ): Repr[T]
  def mapError(f: Throwable => Throwable): Repr[Out]
}

Usage Examples:

// Basic recovery with fallback value
val recoveredStream = Source(List("1", "2", "abc", "4"))
  .map(_.toInt)
  .recover {
    case _: NumberFormatException => -1
  }
  .runWith(Sink.seq)
// Result: List(1, 2, -1, 4)

// Recovery with retries using alternative source
val withRetries = Source(List("1", "2", "abc", "4"))
  .map(_.toInt)
  .recoverWithRetries(3, {
    case _: NumberFormatException => Source.single(0)
  })

// Transform errors
val transformedErrors = source
  .mapError {
    case nfe: NumberFormatException => 
      new IllegalArgumentException(s"Invalid number: ${nfe.getMessage}")
  }

Restart Patterns

RestartSource

object RestartSource {
  def withBackoff[T](
    minBackoff: FiniteDuration,
    maxBackoff: FiniteDuration, 
    randomFactor: Double
  )(sourceFactory: () => Source[T, _]): Source[T, NotUsed]
  
  def onFailuresWithBackoff[T](
    minBackoff: FiniteDuration,
    maxBackoff: FiniteDuration,
    randomFactor: Double
  )(sourceFactory: () => Source[T, _]): Source[T, NotUsed]
}

Usage Example:

import akka.stream.scaladsl.RestartSource
import scala.concurrent.duration._

// Restart source with exponential backoff
val resilientSource = RestartSource.withBackoff(
  minBackoff = 1.second,
  maxBackoff = 30.seconds,
  randomFactor = 0.2
) { () =>
  Source.tick(1.second, 1.second, "tick")
    .map { _ =>
      if (scala.util.Random.nextDouble() < 0.1) throw new RuntimeException("Random failure")
      else "success"
    }
}

resilientSource.runWith(Sink.foreach(println))

RestartFlow and RestartSink

object RestartFlow {
  def withBackoff[In, Out](
    minBackoff: FiniteDuration,
    maxBackoff: FiniteDuration,
    randomFactor: Double
  )(flowFactory: () => Flow[In, Out, _]): Flow[In, Out, NotUsed]
}

object RestartSink {
  def withBackoff[T](
    minBackoff: FiniteDuration,
    maxBackoff: FiniteDuration,
    randomFactor: Double  
  )(sinkFactory: () => Sink[T, _]): Sink[T, NotUsed]
}

Overflow Strategies

OverflowStrategy Types

sealed abstract class OverflowStrategy extends DelayOverflowStrategy

object OverflowStrategy {
  def dropHead: OverflowStrategy      // Drop oldest element
  def dropTail: OverflowStrategy      // Drop newest element
  def dropBuffer: OverflowStrategy    // Drop entire buffer
  def dropNew: OverflowStrategy       // Drop incoming element
  def backpressure: OverflowStrategy  // Apply backpressure (block)
  def fail: OverflowStrategy          // Fail the stream
}

Usage Examples:

import akka.stream.OverflowStrategy

// Buffer with overflow handling
val bufferedStream = Source(1 to 1000)
  .buffer(10, OverflowStrategy.dropHead)
  .throttle(1, 100.millis) // Slow consumer
  .runWith(Sink.seq)

// Queue source with overflow strategy
val (queueSource, queue) = Source.queue[Int](5, OverflowStrategy.dropTail)
  .toMat(Sink.seq)(Keep.both)
  .run()

// Offer elements that may be dropped
queue.offer(1)
queue.offer(2)
// ... more elements than buffer size

Exception Handling Patterns

Try-based Processing

import scala.util.{Try, Success, Failure}

val safeParsing = Source(List("1", "2", "abc", "4"))
  .map(s => Try(s.toInt))
  .collect {
    case Success(value) => value
  }
  .runWith(Sink.seq)
// Result: List(1, 2, 4)

// Process both successes and failures
val withFailureHandling = Source(List("1", "2", "abc", "4"))
  .map(s => Try(s.toInt))
  .map {
    case Success(value) => s"Parsed: $value"
    case Failure(ex) => s"Failed: ${ex.getMessage}"
  }
  .runWith(Sink.foreach(println))

Future-based Error Handling

import scala.concurrent.Future

val asyncProcessing = Source(1 to 10)
  .mapAsync(4) { n =>
    Future {
      if (n == 5) throw new RuntimeException("Five is bad!")
      n * 2
    }.recover {
      case _: RuntimeException => -1
    }
  }
  .runWith(Sink.seq)
// Result includes -1 for the failed element

Stream Failure Monitoring

watchTermination

trait FlowOps[+Out, +Mat] {
  def watchTermination[Mat2]()(matF: (Mat, Future[Done]) => Mat2): ReprMat[Out, Mat2]
}

Usage Example:

val monitoredStream = Source(1 to 10)
  .map { n =>
    if (n == 5) throw new RuntimeException("Boom!")
    n
  }
  .watchTermination() { (_, done) =>
    done.onComplete {
      case Success(_) => println("Stream completed successfully")
      case Failure(ex) => println(s"Stream failed with: ${ex.getMessage}")
    }
  }
  .runWith(Sink.ignore)

Stream Monitoring with Custom Logic

val resilientProcessing = Source(1 to 100)
  .mapAsync(4) { n =>
    Future {
      if (scala.util.Random.nextDouble() < 0.1) {
        throw new RuntimeException(s"Random failure at $n")
      }
      n * 2
    }
  }
  .recover {
    case ex: RuntimeException => 
      println(s"Recovered from: ${ex.getMessage}")
      -1
  }
  .filter(_ > 0) // Remove recovered elements
  .runWith(Sink.seq)

Circuit Breaker Pattern

import akka.pattern.CircuitBreaker
import scala.concurrent.duration._

val circuitBreaker = new CircuitBreaker(
  scheduler = system.scheduler,
  maxFailures = 5,
  callTimeout = 10.seconds,
  resetTimeout = 1.minute
)

val protectedProcessing = Source(1 to 100)
  .mapAsync(4) { n =>
    circuitBreaker.withCircuitBreaker {
      Future {
        // Potentially failing operation
        if (scala.util.Random.nextDouble() < 0.2) {
          throw new RuntimeException("Service unavailable")
        }
        s"Processed: $n"
      }
    }.recover {
      case _: akka.pattern.CircuitBreakerOpenException => "Circuit breaker open"
      case ex => s"Error: ${ex.getMessage}"
    }
  }
  .runWith(Sink.foreach(println))

Error Propagation Control

Isolating Errors in Substreams

val safeBatchProcessing = Source(1 to 20)
  .grouped(5)
  .map { batch =>
    Source(batch)
      .map { n =>
        if (n % 7 == 0) throw new RuntimeException(s"Seven divisor: $n")
        n * 2
      }
      .recover {
        case _: RuntimeException => -1
      }
      .runWith(Sink.seq)
  }
  .mapAsync(1)(identity) // Materialize each batch
  .runWith(Sink.seq)

This error handling approach ensures stream resilience while providing fine-grained control over failure scenarios and recovery strategies.

Install with Tessl CLI

npx tessl i tessl/maven-com-typesafe-akka--akka-stream-2-13-0-m5

docs

control-flow.md

core-components.md

error-handling.md

graph-building.md

index.md

io-integration.md

junction-operations.md

materialization.md

stream-operations.md

tile.json