Akka Cluster provides a fault-tolerant decentralized peer-to-peer based cluster membership service with no single point of failure or single point of bottleneck using gossip protocols and automatic failure detection.
—
Service Provider Interfaces for extending cluster behavior including custom downing strategies and join validation. These SPIs allow customization of critical cluster behaviors to meet specific application requirements.
Interface for implementing custom downing strategies that determine when and how to remove unreachable members from the cluster.
/**
 * API for plugins that will handle downing of cluster nodes.
 *
 * Concrete plugins must subclass this class and have a public one-argument
 * constructor accepting an ActorSystem.
 */
abstract class DowningProvider {

  /**
   * Time margin after which shards or singletons that belonged to a downed/removed
   * partition are created in the surviving partition. This is useful if you implement
   * downing strategies that handle network partitions.
   */
  def downRemovalMargin: FiniteDuration

  /**
   * If a props is returned it is created as a child of the core cluster daemon on cluster startup.
   * It should then handle downing using the regular Cluster APIs.
   * The actor will run on the same dispatcher as the cluster actor if no dispatcher is configured.
   *
   * May throw an exception, which will then immediately lead to Cluster stopping,
   * as the downing provider is vital to a working cluster.
   */
  def downingActorProps: Option[Props]
}
Usage Examples:
// Custom downing provider implementation
/**
 * Downing provider that delegates the downing decision to a
 * [[QuorumBasedDowningActor]] and uses a fixed 30 second removal margin.
 */
class QuorumBasedDowningProvider(system: ActorSystem) extends DowningProvider {

  // Built directly from the system config and name; handed to the downing actor below.
  val settings = new ClusterSettings(system.settings.config, system.name)

  /** Fixed margin, intended to allow time for persistence to catch up after downing. */
  override def downRemovalMargin: FiniteDuration = 30.seconds

  /** Props of the actor that implements the quorum strategy. */
  override def downingActorProps: Option[Props] = {
    val quorumActorProps = Props(new QuorumBasedDowningActor(settings))
    Some(quorumActorProps)
  }
}
/**
 * Downing actor that downs unreachable members only while the members this node
 * still considers reachable form a strict majority of the current membership.
 *
 * The actor tracks unreachable members from cluster events and re-evaluates the
 * quorum every time a new member becomes unreachable.
 *
 * @param settings cluster settings handed over by the provider (not read by this actor)
 */
class QuorumBasedDowningActor(settings: ClusterSettings) extends Actor with ActorLogging {
  val cluster = Cluster(context.system)

  // Members currently reported unreachable; only touched from within `receive`.
  var unreachableMembers = Set.empty[Member]

  override def preStart(): Unit = {
    // Subscribe with InitialStateAsEvents so that the current state is replayed as
    // individual events. The default mode sends a single CurrentClusterState snapshot,
    // which `receive` does not handle, so members that are already unreachable when this
    // actor starts would otherwise never be tracked.
    cluster.subscribe(
      self,
      akka.cluster.ClusterEvent.InitialStateAsEvents,
      classOf[UnreachableMember],
      classOf[ReachableMember],
      classOf[MemberRemoved])
  }

  def receive = {
    case UnreachableMember(member) =>
      unreachableMembers += member
      evaluateDowning()
    case ReachableMember(member) =>
      unreachableMembers -= member
    case MemberRemoved(member, _) =>
      unreachableMembers -= member
  }

  /**
   * Downs every tracked unreachable member if the reachable members are a strict
   * majority (`n / 2 + 1`) of the current membership; otherwise only logs a warning.
   */
  def evaluateDowning(): Unit = {
    val currentMembers = cluster.state.members.size
    val reachableMembers = currentMembers - unreachableMembers.size
    val quorumSize = (currentMembers / 2) + 1

    if (reachableMembers >= quorumSize) {
      // We have quorum, safe to down unreachable members
      unreachableMembers.foreach { member =>
        log.info("Downing unreachable member: {}", member.address)
        cluster.down(member.address)
      }
    } else {
      log.warning("Cannot down members - would lose quorum ({} reachable, need {})",
        reachableMembers, quorumSize)
    }
  }

  override def postStop(): Unit = {
    cluster.unsubscribe(self)
  }
}
// Configuration
// akka.cluster.downing-provider-class = "com.myapp.QuorumBasedDowningProvider"
Built-in downing provider implementations.
/**
 * Default downing provider, used when no provider is configured and
 * 'auto-down-unreachable-after' is not enabled.
 *
 * It supplies no downing actor and takes its removal margin from the cluster settings.
 */
final class NoDowning(system: ActorSystem) extends DowningProvider {

  /** Removal margin read from the settings of the Cluster extension on each call. */
  override def downRemovalMargin: FiniteDuration = {
    val clusterSettings = Cluster(system).settings
    clusterSettings.DownRemovalMargin
  }

  /** No downing actor is started by this provider. */
  override val downingActorProps: Option[Props] = None
}
/**
 * Downing provider used when auto-down is enabled.
 * Automatically downs unreachable members after the configured timeout.
 *
 * NOTE: this listing shows the member signatures only; the implementations
 * are not part of this document.
 */
class AutoDowning(system: ActorSystem) extends DowningProvider {
  /** Removal margin reported by this provider (implementation not shown). */
  override def downRemovalMargin: FiniteDuration
  /** Props of the downing actor, if any (implementation not shown). */
  override def downingActorProps: Option[Props]
}
Auto-Down Configuration:
# Enable automatic downing (use with caution in production)
akka.cluster.auto-down-unreachable-after = 10s
# Or use custom downing provider
akka.cluster {
downing-provider-class = "com.myapp.CustomDowningProvider"
down-removal-margin = 30s
}
Examples of more sophisticated downing strategies.
/**
 * Split-brain resolver based on the oldest member: the downing decision is
 * delegated to an [[OldestMemberDowningActor]], with a fixed 20 second removal margin.
 */
class OldestMemberDowningProvider(system: ActorSystem) extends DowningProvider {

  override def downRemovalMargin: FiniteDuration = 20.seconds

  override def downingActorProps: Option[Props] = {
    val oldestActorProps = Props(new OldestMemberDowningActor())
    Some(oldestActorProps)
  }
}
/**
 * Downing actor in which only the oldest reachable member downs unreachable nodes.
 *
 * On every `UnreachableMember` event the actor determines the oldest member that is
 * still reachable; if that is this node, it downs the reported member, otherwise it
 * only logs which node is expected to take the decision.
 */
class OldestMemberDowningActor extends Actor with ActorLogging {
  val cluster = Cluster(context.system)

  override def preStart(): Unit = {
    // Replay the current state as events; the default snapshot mode would deliver a
    // CurrentClusterState message that `receive` does not handle, so members that
    // are already unreachable at startup would be ignored.
    cluster.subscribe(
      self,
      akka.cluster.ClusterEvent.InitialStateAsEvents,
      classOf[UnreachableMember])
  }

  def receive = {
    case UnreachableMember(member) =>
      // Read the state once so that `members` and `unreachable` come from the same snapshot.
      val state = cluster.state
      val reachableMembers = state.members -- state.unreachable

      // `Member.upNumber` is not accessible outside Akka; `isOlderThan` is the public
      // way to compare member age. `reduceOption` avoids throwing on an empty set.
      val oldestReachable =
        reachableMembers.reduceOption((a, b) => if (a.isOlderThan(b)) a else b)

      oldestReachable match {
        case Some(oldest) if cluster.selfMember == oldest =>
          // I'm the oldest reachable member, I decide who to down
          log.info("As oldest member, downing unreachable: {}", member.address)
          cluster.down(member.address)
        case Some(oldest) =>
          log.info("Not oldest member, waiting for {} to make downing decision",
            oldest.address)
        case None =>
          log.warning("No reachable members known, not downing unreachable: {}",
            member.address)
      }
  }

  override def postStop(): Unit = {
    cluster.unsubscribe(self)
  }
}
/**
 * Role-based downing: only members that hold none of the critical roles are downed.
 * The decision is delegated to a [[RoleBasedDowningActor]]; the removal margin is
 * a fixed 15 seconds.
 */
class RoleBasedDowningProvider(system: ActorSystem) extends DowningProvider {

  // Roles whose members must never be downed by this strategy.
  val criticalRoles = Set("database", "master")

  override def downRemovalMargin: FiniteDuration = 15.seconds

  override def downingActorProps: Option[Props] = {
    val roleActorProps = Props(new RoleBasedDowningActor(criticalRoles))
    Some(roleActorProps)
  }
}
/**
 * Downing actor that downs an unreachable member only when it holds none of
 * the given critical roles; members with a critical role are left alone and a
 * warning is logged instead.
 *
 * @param criticalRoles roles that protect a member from being downed
 */
class RoleBasedDowningActor(criticalRoles: Set[String]) extends Actor with ActorLogging {
  val cluster = Cluster(context.system)

  override def preStart(): Unit =
    cluster.subscribe(self, classOf[UnreachableMember])

  def receive = {
    case UnreachableMember(member) =>
      // Critical roles actually held by the unreachable member.
      val heldCriticalRoles = member.roles.intersect(criticalRoles)
      if (heldCriticalRoles.isEmpty) {
        log.info("Downing non-critical member: {} (roles: {})",
          member.address, member.roles)
        cluster.down(member.address)
      } else {
        log.warning("NOT downing member {} with critical roles: {}",
          member.address, heldCriticalRoles)
      }
  }

  override def postStop(): Unit =
    cluster.unsubscribe(self)
}
SPI for validating configuration compatibility when nodes attempt to join the cluster.
/**
 * Service provider interface for validating configuration compatibility when nodes join.
 *
 * Implementations must have a constructor accepting an ActorSystem and ClusterSettings.
 */
abstract class JoinConfigCompatChecker {

  /** Configuration keys that must be present and validated. */
  def requiredKeys: immutable.Seq[String]

  /**
   * Check whether the joining node's configuration is compatible.
   *
   * @param toCheck configuration from the joining node
   * @param actualConfig this node's configuration
   * @return `Valid` if compatible, `Invalid` with error messages if not
   */
  def check(toCheck: Config, actualConfig: Config): ConfigValidation
}
/**
 * Configuration validation result: either [[Valid]] or [[Invalid]].
 */
sealed trait ConfigValidation
/** The checked configuration is compatible. */
case object Valid extends ConfigValidation
/** The checked configuration is not compatible; `errorMessages` describes why. */
case class Invalid(errorMessages: immutable.Seq[String]) extends ConfigValidation
// Companion utilities. Signatures only: the method bodies are not part of this listing.
object JoinConfigCompatChecker {
  /** Factory method to load the configured checker. */
  def load(system: ActorSystem, settings: ClusterSettings): JoinConfigCompatChecker
  /** Utility to check that the required keys exist in `toCheck`. */
  def exists(requiredKeys: immutable.Seq[String], toCheck: Config): ConfigValidation
  /** Utility to check for an exact value match between `toCheck` and `actualConfig`. */
  def fullMatch(requiredKeys: immutable.Seq[String], toCheck: Config, actualConfig: Config): ConfigValidation
}
Usage Examples:
/**
 * Application version compatibility checker.
 *
 * A joining node is accepted when all required keys are present, its `app.version`
 * has the same major version as this node, its `app.api-version` is identical, and
 * its cluster roles are a subset of one of the allowed role combinations.
 */
class AppVersionCompatChecker(system: ActorSystem, settings: ClusterSettings)
    extends JoinConfigCompatChecker {

  val requiredKeys = List(
    "akka.actor.provider",
    "akka.cluster.roles",
    "app.version",
    "app.api-version"
  )

  /**
   * Verifies that the required keys exist in the joining configuration and, only
   * then, runs the version and role validation.
   *
   * @param toCheck configuration from the joining node
   * @param actualConfig this node's configuration
   * @return `Valid`, or `Invalid` carrying every problem that was found
   */
  def check(toCheck: Config, actualConfig: Config): ConfigValidation =
    JoinConfigCompatChecker.exists(requiredKeys, toCheck) match {
      case Valid   => validateVersions(toCheck, actualConfig)
      case invalid => invalid
    }

  /** Collects all version and role problems instead of stopping at the first one. */
  private def validateVersions(joining: Config, actual: Config): ConfigValidation = {
    val joiningAppVersion = joining.getString("app.version")
    val joiningApiVersion = joining.getString("app.api-version")
    val actualAppVersion = actual.getString("app.version")
    val actualApiVersion = actual.getString("app.api-version")

    // Allow same major version
    val appVersionError =
      if (isCompatibleVersion(joiningAppVersion, actualAppVersion)) None
      else Some(s"Incompatible app version: $joiningAppVersion vs $actualAppVersion")

    // API version must match exactly
    val apiVersionError =
      if (joiningApiVersion == actualApiVersion) None
      else Some(s"API version mismatch: $joiningApiVersion vs $actualApiVersion")

    // Check cluster roles compatibility: the joining roles must fit into one allowed set.
    val joiningRoles = joining.getStringList("akka.cluster.roles").asScala.toSet
    val allowedRoleCombinations = Set(
      Set("frontend", "api"),
      Set("backend", "worker"),
      Set("database")
    )
    val roleError =
      if (allowedRoleCombinations.exists(allowed => joiningRoles.subsetOf(allowed))) None
      else Some(s"Invalid role combination: ${joiningRoles.mkString(", ")}")

    val errors = List(appVersionError, apiVersionError, roleError).flatten
    if (errors.isEmpty) Valid else Invalid(errors)
  }

  /**
   * Two versions are compatible when their major component (the text before the
   * first dot) is equal. Unlike destructuring `split` into exactly three parts, this
   * does not throw a MatchError for versions such as "2", "2.1" or "2.1.0.1".
   */
  private def isCompatibleVersion(v1: String, v2: String): Boolean =
    majorOf(v1) == majorOf(v2)

  /** Text before the first '.', or the whole string when there is no dot. */
  private def majorOf(version: String): String =
    version.takeWhile(_ != '.')
}
// Configuration
// akka.cluster.configuration-compatibility-check.checker-class = "com.myapp.AppVersionCompatChecker"
// Environment-aware compatibility checker
/**
 * Compatibility checker that only admits nodes from the same environment, from a
 * compatible data center, and with an instance type allowed in their data center.
 */
class EnvironmentCompatChecker(system: ActorSystem, settings: ClusterSettings)
    extends JoinConfigCompatChecker {

  val requiredKeys = List(
    "app.environment",
    "app.datacenter",
    "app.instance-type"
  )

  /**
   * Verifies that the required keys exist in the joining configuration before
   * running the environment validation.
   *
   * @param toCheck configuration from the joining node
   * @param actualConfig this node's configuration
   */
  def check(toCheck: Config, actualConfig: Config): ConfigValidation =
    JoinConfigCompatChecker.exists(requiredKeys, toCheck) match {
      case Valid   => validateEnvironment(toCheck, actualConfig)
      case invalid => invalid
    }

  /**
   * Environment must match exactly; data center and instance type are only
   * inspected once the environments agree.
   */
  private def validateEnvironment(joining: Config, actual: Config): ConfigValidation = {
    val joiningEnv = joining.getString("app.environment")
    val actualEnv = actual.getString("app.environment")

    // Only allow same environment. Written as an expression rather than an early
    // `return`, so the remaining settings are still read only when the environments match.
    if (joiningEnv != actualEnv) {
      Invalid(List(s"Environment mismatch: $joiningEnv cannot join $actualEnv cluster"))
    } else {
      val joiningDc = joining.getString("app.datacenter")
      val actualDc = actual.getString("app.datacenter")
      val joiningInstanceType = joining.getString("app.instance-type")

      // Validate data center compatibility; a local data center without an entry
      // here places no restriction on the joining data center.
      val compatibleDcs = Map(
        "us-east-1" -> Set("us-east-1", "us-west-2"),
        "eu-west-1" -> Set("eu-west-1", "eu-central-1")
      )

      compatibleDcs.get(actualDc) match {
        case Some(allowed) if !allowed.contains(joiningDc) =>
          Invalid(List(s"Data center $joiningDc not compatible with $actualDc"))
        case _ =>
          // Additional instance type validation
          validateInstanceType(joiningInstanceType, joiningDc)
      }
    }
  }

  /** Data centers without an entry in the table accept every instance type. */
  private def validateInstanceType(instanceType: String, dc: String): ConfigValidation = {
    val allowedTypes = Map(
      "us-east-1" -> Set("m5.large", "m5.xlarge", "c5.large"),
      "eu-west-1" -> Set("m5.large", "c5.large")
    )

    allowedTypes.get(dc) match {
      case Some(allowed) if !allowed.contains(instanceType) =>
        Invalid(List(s"Instance type $instanceType not allowed in $dc"))
      case _ => Valid
    }
  }
}
/**
 * Development vs production checker: nodes running in different modes can never
 * share a cluster. In production every required key must match exactly; any other
 * mode is more permissive and accepts the node once the modes agree.
 */
class DevProdCompatChecker(system: ActorSystem, settings: ClusterSettings)
    extends JoinConfigCompatChecker {

  val requiredKeys = List("app.mode", "app.debug-enabled")

  /**
   * @param toCheck configuration from the joining node
   * @param actualConfig this node's configuration
   * @return `Invalid` when `app.mode` is missing from the joining configuration or
   *         differs from this node's mode, otherwise the result of the mode-specific check
   */
  def check(toCheck: Config, actualConfig: Config): ConfigValidation =
    // Make sure "app.mode" is present before reading it: a joining node without the
    // key must be rejected with `Invalid` rather than fail with a missing-key
    // exception from `getString`. Only this key is guarded here so that
    // non-production modes stay as permissive as before.
    JoinConfigCompatChecker.exists(List("app.mode"), toCheck) match {
      case Valid   => validateMode(toCheck, actualConfig)
      case invalid => invalid
    }

  /** Compares the modes and applies the production-only exact-match rule. */
  private def validateMode(toCheck: Config, actualConfig: Config): ConfigValidation = {
    val joiningMode = toCheck.getString("app.mode")
    val actualMode = actualConfig.getString("app.mode")

    // Strict separation of dev and prod
    if (joiningMode != actualMode) {
      Invalid(List(s"Cannot mix $joiningMode and $actualMode nodes in same cluster"))
    } else if (actualMode == "production") {
      // In production, require exact match of all required keys
      JoinConfigCompatChecker.fullMatch(requiredKeys, toCheck, actualConfig)
    } else {
      Valid // Dev mode is more permissive
    }
  }
}
Pattern for creating configurable downing providers.
/**
 * Factory for creating different downing strategies by name.
 *
 * Known names map to the providers in this document; any other value is treated
 * as the fully qualified class name of a custom [[DowningProvider]].
 */
object DowningProviderFactory {

  /**
   * @param strategy "none", "auto", "quorum", "oldest", "role-based", or a class name
   * @param system actor system passed to the provider's constructor
   * @return the provider for the strategy; loading a custom class that fails rethrows the failure
   */
  def create(strategy: String, system: ActorSystem): DowningProvider =
    strategy match {
      case "none"       => new NoDowning(system)
      case "auto"       => new AutoDowning(system)
      case "quorum"     => new QuorumBasedDowningProvider(system)
      case "oldest"     => new OldestMemberDowningProvider(system)
      case "role-based" => new RoleBasedDowningProvider(system)
      case className    => loadCustom(className, system)
    }

  /** Loads a custom provider by class name through the system's dynamic access. */
  private def loadCustom(className: String, system: ActorSystem): DowningProvider = {
    val dynamicAccess = system.asInstanceOf[ExtendedActorSystem].dynamicAccess
    val constructorArgs = List(classOf[ActorSystem] -> system)
    val loaded = dynamicAccess.createInstanceFor[DowningProvider](className, constructorArgs)
    loaded.get
  }
}
/**
 * Configuration-driven provider: reads the strategy name from
 * `app.cluster.downing.strategy` and forwards both SPI methods to the provider
 * created by [[DowningProviderFactory]] for that strategy.
 */
class ConfigurableDowningProvider(system: ActorSystem) extends DowningProvider {

  val config = system.settings.config.getConfig("app.cluster.downing")
  val strategy = config.getString("strategy")

  // Created once at construction time; every call below is forwarded to it.
  val delegate = DowningProviderFactory.create(strategy, system)

  override def downRemovalMargin: FiniteDuration = {
    delegate.downRemovalMargin
  }

  override def downingActorProps: Option[Props] = {
    delegate.downingActorProps
  }
}
// Configuration
/*
app.cluster.downing {
strategy = "quorum" # or "auto", "oldest", "role-based", etc.
}
akka.cluster.downing-provider-class = "com.myapp.ConfigurableDowningProvider"
*/
// Downing provider SPI
/** SPI for plugins that handle downing of cluster nodes. */
abstract class DowningProvider {
  /** Time margin applied after a partition has been downed/removed. */
  def downRemovalMargin: FiniteDuration
  /** Props of the optional downing actor. */
  def downingActorProps: Option[Props]
}
// Built-in providers (declarations only; class bodies are omitted in this summary)
/** Default provider used when no provider is configured and auto-down is not enabled. */
final class NoDowning(system: ActorSystem) extends DowningProvider
/** Provider used when auto-down is enabled. */
class AutoDowning(system: ActorSystem) extends DowningProvider
// Configuration validation SPI
/** SPI for validating configuration compatibility when nodes join. */
abstract class JoinConfigCompatChecker {
  /** Configuration keys that must be present and validated. */
  def requiredKeys: immutable.Seq[String]
  /** Checks the joining node's configuration (`toCheck`) against this node's (`actualConfig`). */
  def check(toCheck: Config, actualConfig: Config): ConfigValidation
}
// Validation result types
/** Result of a configuration compatibility check. */
sealed trait ConfigValidation
/** The configuration is compatible. */
case object Valid extends ConfigValidation
/** The configuration is not compatible; carries the error messages. */
case class Invalid(errorMessages: immutable.Seq[String]) extends ConfigValidation
// Factory methods (signatures only; bodies are omitted in this summary)
object JoinConfigCompatChecker {
  /** Loads the configured checker. */
  def load(system: ActorSystem, settings: ClusterSettings): JoinConfigCompatChecker
  /** Checks that the required keys exist in `toCheck`. */
  def exists(requiredKeys: immutable.Seq[String], toCheck: Config): ConfigValidation
  /** Checks for an exact value match of the required keys between the two configurations. */
  def fullMatch(requiredKeys: immutable.Seq[String], toCheck: Config, actualConfig: Config): ConfigValidation
}
Install with Tessl CLI
npx tessl i tessl/maven-com-typesafe-akka--akka-cluster