Supervision strategies, fault handling, and the "let it crash" philosophy for building resilient systems. This covers supervision trees, error handling directives, and backoff supervision patterns.
Core supervision strategies for handling child actor failures.
/**
 * Abstract base class for supervision strategies.
 *
 * A concrete strategy supplies the [[Decider]] used to pick a directive for a
 * failure, together with the logging and retry-limit settings declared below.
 */
abstract class SupervisorStrategy {
  /** Function that decides how to handle exceptions. */
  def decider: Decider

  /**
   * Called when a child actor terminates.
   *
   * NOTE(review): whether `children` still contains `child` at this point is
   * not shown in this section — confirm before relying on it.
   */
  def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit

  /** Whether to log exceptions. */
  def loggingEnabled: Boolean

  /** Restart statistics for rate limiting: the time window in which restarts are counted. */
  def withinTimeRange: Duration

  /** Restart statistics for rate limiting: the maximum number of retries within `withinTimeRange`. */
  def maxNrOfRetries: Int
}
/**
 * Supervision directives for exception handling.
 *
 * The trait is sealed, so the four case objects below are the only directives.
 *
 * NOTE(review): the usage examples bring these names into scope with
 * `import akka.actor.SupervisorStrategy._`, which suggests they are members of
 * the `SupervisorStrategy` companion rather than top-level definitions — confirm.
 */
sealed trait Directive

/** Continue processing, ignore failure. */
case object Resume extends Directive

/** Restart the actor. */
case object Restart extends Directive

/** Stop the actor permanently. */
case object Stop extends Directive

/** Escalate to parent supervisor. */
case object Escalate extends Directive
/**
 * Decider function type for mapping exceptions to directives.
 *
 * Because it is a `PartialFunction`, a decider may leave some throwables unmatched.
 * NOTE(review): what happens to an unmatched throwable is not shown in this
 * section — confirm against the strategy implementation.
 */
type Decider = PartialFunction[Throwable, Directive]
object SupervisorStrategy {
  /**
   * Default decider: stops the child on `ActorInitializationException`,
   * `ActorKilledException` and `DeathPactException`, and restarts it on any
   * other `Exception`.
   *
   * Declared before `defaultStrategy` on purpose: `val`s in an object are
   * initialized in declaration order, so a strategy built from this decider
   * before it is assigned would capture `null`.
   */
  val defaultDecider: Decider = {
    case _: ActorInitializationException => Stop
    case _: ActorKilledException         => Stop
    case _: DeathPactException           => Stop
    case _: Exception                    => Restart
  }

  /** Default supervision strategy: one-for-one, using `defaultDecider`. */
  val defaultStrategy: SupervisorStrategy = OneForOneStrategy()(defaultDecider)

  /**
   * Stopping strategy: one-for-one, so only the failing child is affected; it is
   * stopped on any failure, with no retries and with failure logging disabled.
   */
  val stoppingStrategy: SupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 0, loggingEnabled = false) {
    case _ => Stop
  }
}

Usage Examples:
import akka.actor.{Actor, OneForOneStrategy, Props, SupervisorStrategy}
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._

/**
 * Supervisor with a custom one-for-one strategy.
 *
 * Handles "create-child" by replying with the "worker" child (created on first
 * use) and passes every other message on to all current children.
 */
class CustomSupervisor extends Actor {
  // Custom supervision strategy: at most 3 retries within one minute, with logging.
  // Case order matters: ArithmeticException and IllegalArgumentException are both
  // RuntimeExceptions, so they have to be matched before the RuntimeException case.
  override val supervisorStrategy: SupervisorStrategy = OneForOneStrategy(
    maxNrOfRetries = 3,
    withinTimeRange = 1.minute,
    loggingEnabled = true
  ) {
    case _: ArithmeticException      => Resume   // Continue on math errors
    case _: IllegalArgumentException => Restart  // Restart on invalid args
    case _: RuntimeException         => Stop     // Stop on runtime errors
    case _                           => Escalate // Escalate unknown exceptions
  }

  def receive: Receive = {
    case "create-child" =>
      // Child names must be unique among siblings; creating "worker" a second
      // time would fail, so an existing child is reused instead.
      val child = context.child("worker").getOrElse(context.actorOf(Props[WorkerActor](), "worker"))
      sender() ! child
    case msg =>
      // forward keeps the original sender, so replies from children go back to
      // the requester instead of arriving here and being broadcast again.
      context.children.foreach(_.forward(msg))
  }
}

Apply supervision directive only to the failing child actor.
/**
 * Supervision strategy affecting only the failing child.
 *
 * @param maxNrOfRetries  max restarts (-1 = unlimited)
 * @param withinTimeRange time window for restart counting
 * @param loggingEnabled  whether to log exceptions
 * @param decider         maps a failure to a directive; declared as a `val` so it
 *                        implements the abstract `decider` member of `SupervisorStrategy`
 */
case class OneForOneStrategy(
  maxNrOfRetries: Int = -1,
  withinTimeRange: Duration = Duration.Inf,
  loggingEnabled: Boolean = true
)(val decider: Decider) extends SupervisorStrategy

object OneForOneStrategy {
  /** Create with default parameters. */
  def apply(decider: Decider): OneForOneStrategy = OneForOneStrategy()(decider)
}

Usage Examples:
import akka.actor.{Actor, OneForOneStrategy, Props, SupervisorStrategy}
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._

/**
 * Supervisor of three workers using a one-for-one strategy: a directive is
 * applied only to the worker that failed.
 */
class OneForOneSupervisor extends Actor {
  override val supervisorStrategy: SupervisorStrategy = OneForOneStrategy(
    maxNrOfRetries = 5,        // Allow 5 restarts
    withinTimeRange = 1.minute // Within 1 minute window
  ) {
    case _: IllegalStateException    => Restart
    case _: IllegalArgumentException => Resume
    case _: Exception                => Escalate
  }

  override def preStart(): Unit = {
    // Create multiple children
    context.actorOf(Props[WorkerActor](), "worker1")
    context.actorOf(Props[WorkerActor](), "worker2")
    context.actorOf(Props[WorkerActor](), "worker3")
  }

  def receive: Receive = {
    // ("send", name, payload): deliver payload to the named child, if it exists.
    case ("send", childName: String, msg: Any) =>
      context.child(childName).foreach(_ ! msg)
    case msg =>
      // forward keeps the original sender; with `!` a worker's reply would come
      // back here and be broadcast to every worker again.
      context.children.foreach(_.forward(msg))
  }
}
/**
 * Example worker: counts "work" messages, fails on demand, and reports its
 * counter when asked with "count".
 */
class WorkerActor extends Actor {
  // Number of "work" messages handled by this instance.
  var processedCount = 0

  override def preStart(): Unit = {
    println(s"${self.path.name} starting")
  }

  override def postRestart(reason: Throwable): Unit = {
    println(s"${self.path.name} restarted due to: ${reason.getMessage}")
    super.postRestart(reason)
  }

  def receive: Receive = {
    case "work" =>
      processedCount += 1
      println(s"${self.path.name} processed $processedCount items")
    case "fail-state" =>
      throw new IllegalStateException("Invalid state")
    case "fail-arg" =>
      throw new IllegalArgumentException("Invalid argument")
    // Literal pattern: a bare lower-case `count` would be a variable pattern that
    // matches every message, turning this into an accidental catch-all.
    case "count" =>
      sender() ! processedCount
  }
}

Apply supervision directive to all child actors when one fails.
/**
 * Supervision strategy affecting all children when one fails.
 *
 * @param maxNrOfRetries  max restarts for any child (default -1)
 * @param withinTimeRange time window for restart counting
 * @param loggingEnabled  whether to log exceptions
 * @param decider         maps a failure to a directive; declared as a `val` so it
 *                        implements the abstract `decider` member of `SupervisorStrategy`
 */
case class AllForOneStrategy(
  maxNrOfRetries: Int = -1,
  withinTimeRange: Duration = Duration.Inf,
  loggingEnabled: Boolean = true
)(val decider: Decider) extends SupervisorStrategy

object AllForOneStrategy {
  /** Create with default parameters. */
  def apply(decider: Decider): AllForOneStrategy = AllForOneStrategy()(decider)
}

Usage Examples:
import akka.actor.{Actor, AllForOneStrategy, Props, SupervisorStrategy}
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._

/**
 * Supervisor of three cooperating children using an all-for-one strategy: the
 * directive chosen for one child's failure is applied to all of them.
 */
class AllForOneSupervisor extends Actor {
  // If any child fails, restart all children.
  // IllegalStateException is a RuntimeException, so it must be matched first.
  override val supervisorStrategy: SupervisorStrategy = AllForOneStrategy(
    maxNrOfRetries = 3,
    withinTimeRange = 30.seconds
  ) {
    case _: IllegalStateException => Restart // Restart all children
    case _: RuntimeException      => Stop    // Stop all children
    case _                        => Escalate
  }

  override def preStart(): Unit = {
    // Create coordinated workers that depend on each other
    context.actorOf(Props[DatabaseActor](), "database")
    context.actorOf(Props[CacheActor](), "cache")
    context.actorOf(Props[ProcessorActor](), "processor")
  }

  def receive: Receive = {
    case "status" =>
      val children = context.children.map(_.path.name).mkString(", ")
      sender() ! s"Active children: $children"
    case msg =>
      context.children.foreach(_ ! msg)
  }
}
// Example actors that might need coordinated restart

/** Fails with an IllegalStateException on "corrupt"; prints any other message. */
class DatabaseActor extends Actor {
  override def receive: Receive = {
    case "corrupt" =>
      throw new IllegalStateException("Database corrupted")
    case msg =>
      println(s"Database: $msg")
  }
}
/** Prints every message it receives, prefixed with "Cache: ". */
class CacheActor extends Actor {
  override def receive: Receive = {
    case msg =>
      println(s"Cache: $msg")
  }
}
/** Prints every message it receives, prefixed with "Processor: ". */
class ProcessorActor extends Actor {
  override def receive: Receive = {
    case msg =>
      println(s"Processor: $msg")
  }
}

Monitor actor termination and handle failures through death watch.
/**
 * Death watch functionality in ActorContext.
 *
 * Only the two death-watch members are listed in this excerpt.
 */
trait ActorContext {
  /**
   * Watch an actor for termination.
   *
   * NOTE(review): the returned `ActorRef` is presumably `subject` itself — confirm.
   */
  def watch(subject: ActorRef): ActorRef

  /**
   * Stop watching an actor.
   *
   * NOTE(review): the returned `ActorRef` is presumably `subject` itself — confirm.
   */
  def unwatch(subject: ActorRef): ActorRef
}
/**
 * Termination notification message.
 *
 * Only `actor` is in the first parameter list, so pattern matches take the
 * form `Terminated(actor)`; the two flags are plain `val` members.
 *
 * @param actor              the actor this notification refers to
 * @param existenceConfirmed whether actor ever existed
 * @param addressTerminated  whether entire address terminated
 */
final case class Terminated(actor: ActorRef)(
  val existenceConfirmed: Boolean,
  val addressTerminated: Boolean
) extends AutoReceivedMessage with PossiblyHarmful
/**
 * Death pact exception when watched actor terminates.
 *
 * The exception message embeds the terminated actor reference.
 *
 * @param dead the watched actor that terminated
 */
final case class DeathPactException(dead: ActorRef) extends AkkaException(s"Monitored actor [$dead] terminated")

Usage Examples:
import akka.actor.{Actor, ActorRef, Terminated, Props}
/**
 * Keeps a set of actors under death watch and stops itself once a
 * `Terminated` notification leaves that set empty.
 */
class MonitoringActor extends Actor {
  // Actors this monitor currently has under death watch.
  var watchedActors = Set.empty[ActorRef]

  def receive = {
    case ("watch", actor: ActorRef) =>
      startWatching(actor)
      println(s"Now watching ${actor.path.name}")

    case ("unwatch", actor: ActorRef) =>
      context.unwatch(actor)
      watchedActors = watchedActors - actor
      println(s"Stopped watching ${actor.path.name}")

    case Terminated(actor) =>
      watchedActors = watchedActors - actor
      println(s"Actor ${actor.path.name} terminated")
      // Decide what to do about termination
      if (watchedActors.isEmpty) {
        println("All watched actors terminated, stopping self")
        context.stop(self)
      }

    case "create-worker" =>
      val worker = context.actorOf(Props[WorkerActor]())
      startWatching(worker)
      sender() ! worker

    case "status" =>
      sender() ! s"Watching ${watchedActors.size} actors"
  }

  /** Registers death watch on `ref` and records it in `watchedActors`. */
  private def startWatching(ref: ActorRef): Unit = {
    context.watch(ref)
    watchedActors = watchedActors + ref
  }
}
// Death pact - terminate when dependency terminates

/**
 * Relays every message to `dependency` and fails with a `DeathPactException`
 * once `dependency` terminates.
 *
 * @param dependency the actor this actor cannot work without
 */
class DependentActor(dependency: ActorRef) extends Actor {
  // Local import: the snippet's import line does not bring this name into scope.
  import akka.actor.DeathPactException

  context.watch(dependency)

  def receive: Receive = {
    case Terminated(`dependency`) =>
      // Fails this actor; it is stopped when its supervisor's decider maps
      // DeathPactException to Stop, as the default decider does.
      throw DeathPactException(dependency)
    case msg =>
      // forward keeps the original sender, so the dependency's reply goes to the
      // requester instead of coming back here and being relayed again.
      dependency.forward(msg)
  }
}

Advanced supervision with exponential backoff for resilient restart patterns.
/**
 * Backoff supervisor for handling repeated failures with exponential backoff.
 *
 * NOTE(review): the usage example calls `props` with a single `BackoffOptions`
 * argument; that overload is not listed in this excerpt.
 */
object BackoffSupervisor {
  /**
   * Create backoff supervisor props.
   *
   * @param childProps   props of the supervised child
   * @param childName    name of the supervised child
   * @param minBackoff   presumably the smallest delay before a restart — confirm
   * @param maxBackoff   presumably the upper bound the delay grows to — confirm
   * @param randomFactor jitter factor (the usage example passes 0.2 to "add jitter")
   */
  def props(childProps: Props, childName: String, minBackoff: FiniteDuration,
            maxBackoff: FiniteDuration, randomFactor: Double): Props
}
/**
 * Backoff options for configuring backoff behavior.
 *
 * The constructor is private; instances come from the factory methods on the
 * companion object. Each `with...` method returns a `BackoffOptions`.
 */
class BackoffOptions private (
  val childProps: Props,
  val childName: String,
  val minBackoff: FiniteDuration,
  val maxBackoff: FiniteDuration,
  val randomFactor: Double
) {
  /** Configure supervision strategy. */
  def withSupervisorStrategy(supervisorStrategy: SupervisorStrategy): BackoffOptions

  /**
   * Configure the default stopping strategy.
   *
   * NOTE(review): presumably this selects `SupervisorStrategy.stoppingStrategy`;
   * the earlier comment here described auto-reset, which does not match the
   * method name — confirm.
   */
  def withDefaultStoppingStrategy: BackoffOptions

  /** Configure manual reset. */
  def withManualReset: BackoffOptions
}
/**
 * Factory methods for [[BackoffOptions]]. Both take the same parameters as
 * `BackoffSupervisor.props` and differ in the event that triggers the backoff.
 */
object BackoffOptions {
  /** Create backoff options on failure. */
  def onFailure(childProps: Props, childName: String, minBackoff: FiniteDuration,
                maxBackoff: FiniteDuration, randomFactor: Double): BackoffOptions

  /** Create backoff options on stop. */
  def onStop(childProps: Props, childName: String, minBackoff: FiniteDuration,
             maxBackoff: FiniteDuration, randomFactor: Double): BackoffOptions
}

Usage Examples:
import akka.actor.{Actor, ActorRef, OneForOneStrategy, Props, SupervisorStrategy}
import akka.pattern.{BackoffOptions, BackoffSupervisor}
import scala.concurrent.duration._

/**
 * Parent that runs an unreliable child behind a backoff supervisor and relays
 * every message it receives to that supervisor.
 *
 * NOTE(review): the name of the options factory differs between Akka versions;
 * check the `akka.pattern` API of the version in use.
 */
class BackoffParent extends Actor {
  // Create backoff supervisor for unreliable child
  val backoffSupervisor: ActorRef = context.actorOf(
    BackoffSupervisor.props(
      BackoffOptions.onFailure(
        childProps = Props[UnreliableActor](),
        childName = "unreliable-child",
        minBackoff = 1.second,
        maxBackoff = 30.seconds,
        randomFactor = 0.2 // Add jitter
      ).withSupervisorStrategy(
        OneForOneStrategy() {
          case _: RuntimeException => SupervisorStrategy.Restart
          case _                   => SupervisorStrategy.Escalate
        }
      )
    ),
    "backoff-supervisor"
  )

  def receive: Receive = {
    // forward keeps the original sender, so the child's "done" reply reaches the
    // requester instead of arriving here and being sent to the child again.
    case msg => backoffSupervisor.forward(msg)
  }
}
object UnreliableActor {
  // Shared by all instances in this JVM: every (re)start builds a fresh actor
  // instance, so a per-instance counter would report "attempt 1" every time.
  private val startCount = new java.util.concurrent.atomic.AtomicInteger(0)
}

/**
 * Example actor whose "work" handling fails roughly 70% of the time, used to
 * demonstrate restarts with backoff.
 */
class UnreliableActor extends Actor {
  // Number of starts observed so far, including this one; set in preStart.
  var attempts = 0

  override def preStart(): Unit = {
    attempts = UnreliableActor.startCount.incrementAndGet()
    println(s"UnreliableActor starting (attempt $attempts)")
  }

  def receive: Receive = {
    case "work" =>
      if (scala.util.Random.nextDouble() < 0.7) {
        throw new RuntimeException("Random failure!")
      } else {
        println("Work completed successfully")
        sender() ! "done"
      }
    case msg =>
      println(s"Processing: $msg")
  }
}
// Backoff supervisor usage
// NOTE(review): `system` is not defined in this snippet; it is assumed to be an
// existing ActorSystem — confirm.
val parent = system.actorOf(Props[BackoffParent](), "backoff-parent")
parent ! "work" // May fail and restart with backoff

Error escalation patterns and exception handling in supervision trees.
/**
 * Exception types for different error scenarios.
 */

/** Actor initialization failure, carrying a message and the underlying cause. */
class ActorInitializationException(message: String, cause: Throwable) extends AkkaException(message, cause)

/** Failure type associated with the `Kill` message. */
final case class ActorKilledException(message: String) extends AkkaException(message)

/**
 * A supervised actor failed during restart.
 *
 * `cause` is passed on to `AkkaException` so the original failure stays in the
 * exception chain.
 */
final case class PreRestartException(restartedActor: ActorRef, cause: Throwable, originalMessage: Option[Any])
  extends AkkaException(s"Supervised actor ${restartedActor} failed during restart", cause)

/**
 * A supervised actor failed after restart.
 *
 * `cause` is passed on to `AkkaException` so the original failure stays in the
 * exception chain.
 */
final case class PostRestartException(restartedActor: ActorRef, cause: Throwable, originalMessage: Option[Any])
  extends AkkaException(s"Supervised actor ${restartedActor} failed after restart", cause)

/** An actor name was rejected as invalid; `message` describes the problem. */
final case class InvalidActorNameException(message: String) extends AkkaException(message)
/**
 * Kill message that causes ActorKilledException.
 *
 * It is an `AutoReceivedMessage` and is marked `PossiblyHarmful`, like `Terminated`.
 */
case object Kill extends AutoReceivedMessage with PossiblyHarmful

Usage Examples:
import akka.actor.{Actor, ActorKilledException, Kill, OneForOneStrategy, Props, SupervisorStrategy}
import akka.actor.SupervisorStrategy._

/**
 * Middle-tier supervisor: handles some failures of its "worker" child itself
 * and escalates any other RuntimeException to its own parent.
 */
class EscalatingActor extends Actor {
  // Case order matters: ArithmeticException and IllegalArgumentException are
  // RuntimeExceptions, so the specific cases must come before the general one.
  override val supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
    case _: ArithmeticException      => Resume
    case _: IllegalArgumentException => Restart
    case _: ActorKilledException     => Stop
    case ex: RuntimeException =>
      println(s"Escalating exception: ${ex.getMessage}")
      Escalate // Let parent handle it
  }

  override def preStart(): Unit = {
    context.actorOf(Props[FlakyWorker](), "worker")
  }

  def receive: Receive = {
    case "kill-worker" =>
      context.child("worker").foreach(_ ! Kill)
    case msg =>
      context.child("worker").foreach(_ ! msg)
  }
}
/**
 * Example worker that fails in a different way per message, to exercise each
 * branch of its supervisor's decider.
 *
 * There is deliberately no `case Kill`: `Kill` is an auto-received message, so
 * it raises ActorKilledException without ever being passed to `receive`.
 */
class FlakyWorker extends Actor {
  def receive: Receive = {
    case "divide" =>
      val result = 10 / 0 // ArithmeticException - will be resumed
      sender() ! result   // never reached: the division above always throws
    case "invalid" =>
      throw new IllegalArgumentException("Invalid input") // Will restart
    case "critical" =>
      throw new RuntimeException("Critical error") // Will escalate
    case msg =>
      println(s"Worker processing: $msg")
  }
}
// Top-level supervisor handling escalated exceptions

/**
 * Creates the "escalating" child on start, relays every message to it, and
 * restarts it when a RuntimeException reaches this level.
 */
class TopSupervisor extends Actor {
  // Only RuntimeException is matched here.
  override val supervisorStrategy = OneForOneStrategy()({
    case ex: RuntimeException =>
      println(s"Top supervisor handling: ${ex.getMessage}")
      Restart // Restart the escalating actor
  })

  override def preStart(): Unit = {
    context.actorOf(Props[EscalatingActor](), "escalating")
  }

  def receive = {
    case msg =>
      for (target <- context.child("escalating")) target ! msg
  }
}