Akka's supervision system provides hierarchical fault tolerance through configurable strategies that define how parent actors handle child actor failures, enabling resilient system design.
Configurable strategies for handling child actor failures with different scoping and retry policies.
/**
* Abstract contract for supervision strategies: exposes the decider, the
* logging flag, and the hooks invoked on child termination and child failure.
*/
abstract class SupervisorStrategy {
/**
* Decision function mapping a failure cause to the directive to apply.
* `Decider` is a `PartialFunction[Throwable, Directive]`, so it may be
* undefined for some causes.
*/
def decider: Decider
/**
* Whether failures handled by this strategy are logged.
*/
def loggingEnabled: Boolean
/**
* Hook for handling the termination of a child.
* @param context the actor context
* @param child the child that terminated
* @param children the current children, given as actor references
*/
def handleChildTerminated(
context: ActorContext,
child: ActorRef,
children: Iterable[ActorRef]
): Unit
/**
* Hook for processing the failure of a child.
*
* NOTE(review): the Akka classic API appears to declare this hook with a
* `Unit` result; confirm the `Directive` return type shown here against the
* Akka version in use.
*
* @param context the actor context
* @param restart whether the failure is handled as a restart
*                (presumably `false` means the child is stopped; TODO confirm)
* @param child the child that failed
* @param cause the failure cause
* @param stats the restart statistics of the failed child
* @param children the restart statistics of the current children (note: not
*                 plain actor references as in `handleChildTerminated`)
* @return the directive for handling the failure
*/
def processFailure(
context: ActorContext,
restart: Boolean,
child: ActorRef,
cause: Throwable,
stats: ChildRestartStats,
children: Iterable[ChildRestartStats]
): Directive
}
/**
 * Strategy that applies the decider's directive only to the child that failed.
 *
 * Declared as a case class so that it can be constructed without `new`
 * (`OneForOneStrategy(...) { case ... }`), and with `loggingEnabled` as an
 * overriding `val` so that it implements the abstract `loggingEnabled`
 * member of `SupervisorStrategy`.
 *
 * @param maxNrOfRetries  number of times a child may be restarted; a negative
 *                        value (the default `-1`) means no limit
 * @param withinTimeRange time window in which `maxNrOfRetries` is counted;
 *                        `Duration.Inf` (the default) means no window
 * @param loggingEnabled  whether failures are logged (default `true`)
 * @param decider         maps a failure cause to the directive to apply
 */
case class OneForOneStrategy(
  maxNrOfRetries: Int = -1,
  withinTimeRange: Duration = Duration.Inf,
  override val loggingEnabled: Boolean = true
)(val decider: Decider) extends SupervisorStrategy
/**
 * Strategy that applies the decider's directive to all children of the
 * supervisor when any one of them fails.
 *
 * Declared as a case class so that it can be constructed without `new`
 * (`AllForOneStrategy(...) { case ... }`), and with `loggingEnabled` as an
 * overriding `val` so that it implements the abstract `loggingEnabled`
 * member of `SupervisorStrategy`.
 *
 * @param maxNrOfRetries  number of times a child may be restarted; a negative
 *                        value (the default `-1`) means no limit
 * @param withinTimeRange time window in which `maxNrOfRetries` is counted;
 *                        `Duration.Inf` (the default) means no window
 * @param loggingEnabled  whether failures are logged (default `true`)
 * @param decider         maps a failure cause to the directive to apply
 */
case class AllForOneStrategy(
  maxNrOfRetries: Int = -1,
  withinTimeRange: Duration = Duration.Inf,
  override val loggingEnabled: Boolean = true
)(val decider: Decider) extends SupervisorStrategy
/**
 * Companion holding the predefined strategies and deciders.
 * Members are listed as signatures only; their definitions are not shown here.
 */
object SupervisorStrategy {
  /**
   * Default supervision strategy (OneForOneStrategy with default decider)
   */
  val defaultStrategy: SupervisorStrategy
  /**
   * Strategy that stops on any failure
   */
  val stoppingStrategy: SupervisorStrategy
  /**
   * Strategy that restarts on any failure
   */
  val restartingStrategy: SupervisorStrategy
  /**
   * Default decider handling common exceptions
   */
  val defaultDecider: Decider
  /**
   * Decider that stops on any exception
   */
  val stoppingDecider: Decider
  /**
   * Decider that restarts on any exception
   */
  val restartingDecider: Decider
}
Usage Examples:
import akka.actor._
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._
// Custom supervision strategy
/**
 * Example supervisor that maps specific failure types of its children to
 * directives with a one-for-one strategy, and creates a child on request.
 */
class SupervisingActor extends Actor {
  // Allow at most 10 restarts per minute; the decider picks the directive by
  // exception type, with the broad Exception case last as the fallback.
  override val supervisorStrategy = OneForOneStrategy(
    maxNrOfRetries = 10,
    withinTimeRange = 1.minute,
    loggingEnabled = true
  ) {
    case _: ArithmeticException      => Resume
    case _: NullPointerException     => Restart
    case _: IllegalArgumentException => Stop
    case _: Exception                => Escalate
  }

  def receive: Receive = {
    case "createChild" =>
      // Child names must be unique within a parent: creating "risky" a second
      // time would fail, so an already existing child is reused instead.
      val child = context
        .child("risky")
        .getOrElse(context.actorOf(Props[RiskyActor], "risky"))
      sender() ! child
  }
}
// AllForOne strategy example
/**
 * Example supervisor using an all-for-one strategy: a failure of any worker
 * restarts every worker, at most 5 times per minute.
 */
class TeamSupervisor extends Actor {
  override val supervisorStrategy = AllForOneStrategy(
    maxNrOfRetries = 5,
    withinTimeRange = 1.minute
  ) {
    case _: Exception => Restart // Restart all children
  }

  def receive: Receive = {
    case "createTeam" =>
      // Create the team of workers. Child names must be unique within a
      // parent, so a repeated "createTeam" reuses the workers that already
      // exist instead of failing on the duplicate names.
      val workers = (1 to 3).map { i =>
        val name = s"worker-$i"
        context.child(name).getOrElse(context.actorOf(Props[WorkerActor], name))
      }
      sender() ! workers
  }
}
Actions that can be taken when a child actor fails.
/**
 * Base trait for supervision directives. Sealed: the four case objects below
 * are the only directives.
 */
sealed trait Directive
/**
 * Resume the actor, keeping its accumulated internal state
 */
case object Resume extends Directive
/**
 * Restart the actor, discarding old instance and creating new one
 */
case object Restart extends Directive
/**
 * Stop the actor permanently
 */
case object Stop extends Directive
/**
 * Escalate the failure to parent supervisor
 */
case object Escalate extends Directive
Usage Examples:
import akka.actor.SupervisorStrategy._
// Decider examples
// Cases are tried top to bottom, so specific exception types must come
// before their supertypes. Throwables that are not Exceptions match no case,
// i.e. this partial function is not defined for them.
val customDecider: Decider = {
// IllegalArgumentException and IllegalStateException are both
// RuntimeExceptions; they have to precede the RuntimeException case.
case _: IllegalArgumentException => Stop
case _: IllegalStateException => Restart
case _: RuntimeException => Resume
// Every remaining Exception is handed to the parent supervisor.
case _: Exception => Escalate
}
// Combining with timeouts
/**
 * Decider for timeout and I/O failures: timeouts and failed connections
 * restart the child, other I/O errors resume it, anything else stops it.
 */
val timeoutAwareDecider: Decider = {
  case _: java.util.concurrent.TimeoutException => Restart
  // ConnectException is a subtype of IOException, so it has to be matched
  // before the more general IOException case.
  case _: java.net.ConnectException => Restart
  case _: java.io.IOException => Resume
  // Catch-all: every other Throwable (not only Exceptions) stops the child.
  case _ => Stop
}
Tracking and limiting restart attempts within time windows.
/**
 * Statistics for child restart attempts.
 *
 * NOTE(review): the member signatures below are listed without definitions;
 * verify them against the Akka version in use.
 *
 * @param child                       the child the statistics belong to
 * @param maxNrOfRetriesCount         presumably the number of restarts counted so far (TODO confirm)
 * @param restartTimeWindowStartNanos presumably the start of the current retry window, in nanoseconds (TODO confirm)
 */
case class ChildRestartStats(child: ActorRef, maxNrOfRetriesCount: Int, restartTimeWindowStartNanos: Long) {
  /**
   * Check if restart is within allowed limits
   * @param maxNrOfRetries maximum retry count
   * @param withinTimeRange time window for retries
   * @return true if restart is allowed
   */
  def requestRestartPermission(maxNrOfRetries: Int, withinTimeRange: Duration): Boolean
  /**
   * Record restart attempt
   * @return updated statistics
   */
  def restart: ChildRestartStats
}
Monitoring actor lifecycle with termination notifications.
/**
* System message asking that `watcher` be notified when `subject` terminates.
* @param subject the actor to watch
* @param watcher the actor that will receive the Terminated message
*/
case class Watch(subject: ActorRef, watcher: ActorRef) extends SystemMessage
/**
* System message asking that `watcher` stop watching `subject`.
* @param subject the actor to stop watching
* @param watcher the actor that was watching
*/
case class Unwatch(subject: ActorRef, watcher: ActorRef) extends SystemMessage
/**
 * Notification sent when watched actor terminates.
 *
 * Only `actor` is in the first parameter list, so it is the only field that
 * takes part in pattern matching (`case Terminated(ref)`) and in the
 * generated equality; the two flags are plain members. The constructor is
 * `private[akka]`, so instances are created by Akka itself.
 *
 * @param actor              the terminated actor
 * @param existenceConfirmed whether termination was confirmed
 * @param addressTerminated  whether due to address failure
 */
final case class Terminated private[akka] (
  actor: ActorRef
)(
  val existenceConfirmed: Boolean,
  val addressTerminated: Boolean
) extends AutoReceivedMessage with PossiblyHarmful
Usage Examples:
/**
 * Example actor that watches the children it creates and replaces a child
 * with a freshly watched one whenever a Terminated notification arrives.
 */
class WatchingActor extends Actor {
  def receive = {
    case "watchChild" =>
      spawnWatchedChild()
    case "unwatchChild" =>
      // Stop watching every current child.
      for (child <- context.children) context.unwatch(child)
    case Terminated(terminated) =>
      println(s"Child actor terminated: ${terminated.path}")
      // React to the termination by creating a watched replacement.
      spawnWatchedChild()
  }

  /** Creates a new ChildActor and starts watching it. */
  private def spawnWatchedChild(): Unit = {
    val spawned = context.actorOf(Props[ChildActor])
    context.watch(spawned)
    ()
  }
}
// Death pact - terminate when watched actor dies
/**
 * Example of a death pact: this actor forwards every message to a critical
 * child and fails with a DeathPactException once that child has terminated.
 */
class DeathPactActor extends Actor {
  val criticalChild = context.actorOf(Props[CriticalActor])
  context.watch(criticalChild)

  def receive = {
    // The catch-all case below would otherwise swallow the Terminated
    // notification and forward it to the dead child. Passing it to
    // `unhandled` lets Akka raise the DeathPactException itself, so this
    // example does not need the exception's private[akka] constructor.
    case terminated @ Terminated(`criticalChild`) =>
      unhandled(terminated)
    case msg =>
      criticalChild ! msg
  }
}
Specific exceptions used in supervision and error handling.
/**
* Exception thrown when an actor receives the Kill message.
* Mixes in NoStackTrace; the constructor is `private[akka]`.
* @param message the exception message passed on to AkkaException
*/
final case class ActorKilledException private[akka] (message: String)
extends AkkaException(message) with NoStackTrace
/**
* Exception during actor initialization.
* The constructor is `protected`, so instances come from subclasses.
*
* NOTE(review): this document later pattern-matches on
* `ActorInitializationException(actor, message, cause)`, which requires an
* extractor (companion `unapply`) that is not shown in this declaration.
*
* @param actor the actor the failure relates to
* @param message the exception message passed on to AkkaException
* @param cause the underlying cause passed on to AkkaException
*/
class ActorInitializationException protected (
actor: ActorRef,
message: String,
cause: Throwable
) extends AkkaException(message, cause) {
/** Returns the actor passed to the constructor. */
def getActor: ActorRef = actor
}
/**
* Exception in the preRestart hook. Reported as an
* ActorInitializationException with the fixed message
* "exception in preRestart".
* @param actor the actor whose preRestart failed
* @param cause the exception passed on as the cause (presumably the one
*              thrown inside preRestart; TODO confirm)
* @param originalCause presumably the failure that triggered the restart in
*                      the first place (TODO confirm)
* @param messageOption presumably the message that was passed to preRestart,
*                      if any (TODO confirm)
*/
final case class PreRestartException private[akka] (
actor: ActorRef,
cause: Throwable,
originalCause: Throwable,
messageOption: Option[Any]
) extends ActorInitializationException(actor, "exception in preRestart", cause)
/**
* Exception in postRestart or in the constructor during a restart. Reported
* as an ActorInitializationException with the fixed message
* "exception post restart".
* @param actor the actor whose restart failed
* @param cause the exception passed on as the cause (presumably the one
*              thrown during the restart; TODO confirm)
* @param originalCause presumably the failure that triggered the restart in
*                      the first place (TODO confirm)
*/
final case class PostRestartException private[akka] (
actor: ActorRef,
cause: Throwable,
originalCause: Throwable
) extends ActorInitializationException(actor, "exception post restart", cause)
/**
 * Exception when monitored actor terminates unexpectedly.
 * Mixes in NoStackTrace; the constructor is `private[akka]`.
 * @param dead the monitored actor that terminated
 */
final case class DeathPactException private[akka] (dead: ActorRef)
  extends AkkaException(s"Monitored actor [$dead] terminated") with NoStackTrace
Usage Examples:
/**
 * Example actor that hooks into the restart lifecycle and can be made to
 * fail on demand.
 */
class FaultHandlingActor extends Actor {

  /** Runs custom cleanup, then delegates to the default preRestart. */
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    println(s"Actor restarting due to: ${reason}")
    super.preRestart(reason, message)
  }

  /** Runs custom initialization, then delegates to the default postRestart. */
  override def postRestart(reason: Throwable): Unit = {
    println(s"Actor restarted after: ${reason}")
    super.postRestart(reason)
  }

  def receive = {
    // Fail deliberately so that the supervisor has to react.
    case "fail" => throw new RuntimeException("Intentional failure")
    // Kill makes this actor fail with an ActorKilledException.
    case "kill" => self ! Kill
    // Normal processing: every other message is accepted and ignored.
    case _ => ()
  }
}
// Exception extraction pattern
/**
 * Example actor that guards message processing with try/catch and logs
 * failures instead of letting them reach the supervisor.
 *
 * Mixes in ActorLogging, which provides the `log` member used below.
 */
class ExceptionHandlingActor extends Actor with ActorLogging {
  def receive = {
    case msg =>
      try {
        riskyOperation(msg)
      } catch {
        case ActorInitializationException(actor, message, cause) =>
          // The cause goes first: error(cause, message) logs the throwable,
          // whereas error(message, cause) would treat it as a template argument.
          log.error(cause, s"Initialization failed for $actor: $message")
        case ex: Exception =>
          log.error(ex, "Unexpected error processing message")
      }
  }

  private def riskyOperation(msg: Any): Unit = {
    // Potentially failing operation
  }
}
/**
 * Function type for mapping exceptions to directives
 */
type Decider = PartialFunction[Throwable, Directive]
/**
* Base trait for system messages. Sealed and Serializable; Watch and Unwatch
* in this document extend it.
*/
sealed trait SystemMessage extends Serializable
/**
* Configuration for supervision behavior: a decider plus a failure handler.
*
* NOTE(review): confirm that this trait exists in the Akka version in use.
*/
sealed trait FaultHandlingStrategy {
/** Decision function mapping a failure cause to a directive. */
def decider: Decider
/**
* Handles the failure of a child.
* @param context the actor context
* @param child the child that failed
* @param cause the failure cause
* @param stats the restart statistics of the failed child
* @param children the restart statistics of the current children
* @return presumably whether the failure was handled (TODO confirm)
*/
def handleFailure(
context: ActorContext,
child: ActorRef,
cause: Throwable,
stats: ChildRestartStats,
children: Iterable[ChildRestartStats]
): Boolean
}
/**
* Marker trait for messages that suppress dead letter logging.
* Declares no members.
*/
trait DeadLetterSuppression
/**
* Marker trait for potentially harmful messages. Declares no members;
* Terminated in this document mixes it in.
*/
trait PossiblyHarmful
/**
* Marker trait for automatically handled messages. Visible only inside the
* akka package and Serializable; Terminated in this document mixes it in.
*/
private[akka] trait AutoReceivedMessage extends Serializable