CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-com-typesafe-akka--akka-cluster

Akka Cluster provides a fault-tolerant, decentralized, peer-to-peer cluster membership service with no single point of failure or bottleneck. It uses gossip protocols and automatic failure detection.

Pending
Overview
Eval results
Files

docs/extensibility.md

Extensibility

Service Provider Interfaces for extending cluster behavior including custom downing strategies and join validation. These SPIs allow customization of critical cluster behaviors to meet specific application requirements.

Capabilities

Downing Provider SPI

Interface for implementing custom downing strategies that determine when and how to remove unreachable members from the cluster.

/**
 * API for plugins that will handle downing of cluster nodes.
 *
 * Concrete plugins must subclass this class and have a public one-argument constructor
 * accepting an `ActorSystem`. The class to use is configured with
 * `akka.cluster.downing-provider-class`.
 */
abstract class DowningProvider {
  /**
   * Time margin after which shards or singletons that belonged to a downed/removed
   * partition are created in the surviving partition. This is useful if you implement
   * downing strategies that handle network partitions.
   */
  def downRemovalMargin: FiniteDuration
  
  /**
   * If a `Props` is returned, it is created as a child of the core cluster daemon on
   * cluster startup, and that actor should then handle downing using the regular
   * Cluster APIs. If no dispatcher is configured, the actor runs on the same dispatcher
   * as the cluster actor. Returning `None` means no downing actor is started.
   *
   * May throw an exception, which immediately leads to the Cluster stopping, because
   * the downing provider is vital to a working cluster.
   */
  def downingActorProps: Option[Props]
}

Usage Examples:

// Custom downing provider implementation
class QuorumBasedDowningProvider(system: ActorSystem) extends DowningProvider {

  /**
   * Cluster settings built directly from the system configuration.
   * NOTE(review): presumably built with `new ClusterSettings(...)` rather than
   * `Cluster(system).settings` because the provider is instantiated while the Cluster
   * extension is still starting — confirm before changing.
   */
  val settings = new ClusterSettings(system.settings.config, system.name)

  /** Starts the quorum-evaluating actor as a child of the cluster daemon. */
  override def downingActorProps: Option[Props] = {
    val quorumActor = Props(new QuorumBasedDowningActor(settings))
    Some(quorumActor)
  }

  /** A fixed 30 seconds, leaving persistence time to catch up after a member is downed. */
  override def downRemovalMargin: FiniteDuration = 30.seconds
}

/**
 * Downs every member this node sees as unreachable, but only while the reachable part of
 * this node's view is a strict majority of all members currently in `cluster.state`.
 *
 * The quorum is computed from this node's local view of the cluster state, so it
 * reflects only what this node observes.
 *
 * @param settings cluster settings supplied by `QuorumBasedDowningProvider`; not read by
 *                 this actor at the moment
 */
class QuorumBasedDowningActor(settings: ClusterSettings) extends Actor with ActorLogging {
  val cluster = Cluster(context.system)

  /** Members currently reported unreachable and not yet removed from the cluster. */
  var unreachableMembers = Set.empty[Member]

  override def preStart(): Unit = {
    // InitialStateAsEvents replays the current cluster state as individual events, so
    // members that were already unreachable before this actor started are evaluated too.
    // With the default snapshot mode they would only arrive inside a CurrentClusterState
    // message, which `receive` does not handle.
    cluster.subscribe(self,
      initialStateMode = InitialStateAsEvents,
      classOf[UnreachableMember],
      classOf[ReachableMember],
      classOf[MemberRemoved])
  }

  def receive: Receive = {
    case UnreachableMember(member) =>
      unreachableMembers += member
      evaluateDowning()

    case ReachableMember(member) =>
      unreachableMembers -= member

    case MemberRemoved(member, _) =>
      unreachableMembers -= member
  }

  /**
   * Downs all tracked unreachable members if the reachable ones form a strict majority
   * (`members / 2 + 1`) of the current membership; otherwise logs a warning and downs
   * nobody. Downed members stay tracked until their `MemberRemoved` event arrives.
   */
  def evaluateDowning(): Unit = {
    val currentMembers = cluster.state.members.size
    val reachableMembers = currentMembers - unreachableMembers.size
    val quorumSize = (currentMembers / 2) + 1

    if (reachableMembers >= quorumSize) {
      // We have quorum, so it is safe to down the unreachable members.
      unreachableMembers.foreach { member =>
        log.info("Downing unreachable member: {}", member.address)
        cluster.down(member.address)
      }
    } else {
      log.warning("Cannot down members - would lose quorum ({} reachable, need {})",
        reachableMembers, quorumSize)
    }
  }

  override def postStop(): Unit = {
    cluster.unsubscribe(self)
  }
}

// Configuration
// akka.cluster.downing-provider-class = "com.myapp.QuorumBasedDowningProvider"

Default Downing Providers

Built-in downing provider implementations.

/**
 * Default downing provider used when no provider is configured and
 * 'auto-down-unreachable-after' is not enabled.
 *
 * `downingActorProps` is `None`, so no downing actor is started and this provider never
 * downs unreachable members itself. Downing must be done some other way, e.g. by calling
 * `Cluster.down`.
 */
final class NoDowning(system: ActorSystem) extends DowningProvider {
  // Reuses the cluster's configured margin (presumably akka.cluster.down-removal-margin).
  override def downRemovalMargin: FiniteDuration = Cluster(system).settings.DownRemovalMargin
  override val downingActorProps: Option[Props] = None
}

/**
 * Downing provider used when auto-down is enabled.
 * Automatically downs unreachable members after the configured timeout
 * (`akka.cluster.auto-down-unreachable-after`).
 *
 * NOTE(review): this is a signature-only listing. The members have no bodies here, so
 * it would not compile as a concrete class; the real implementation lives in Akka.
 * Auto-down is deprecated and removed in newer Akka versions (superseded by the Split
 * Brain Resolver) — confirm it is available in the Akka version in use.
 */
class AutoDowning(system: ActorSystem) extends DowningProvider {
  // Margin before shards/singletons of a downed partition are re-created elsewhere.
  override def downRemovalMargin: FiniteDuration
  // Props of the actor that downs unreachable members once the timeout expires.
  override def downingActorProps: Option[Props]
}

Auto-Down Configuration:

# Enable automatic downing. This is not recommended in production: during a network partition each side may down the other, leaving two separate clusters.
akka.cluster.auto-down-unreachable-after = 10s

# Or use custom downing provider
akka.cluster {
  downing-provider-class = "com.myapp.CustomDowningProvider"
  down-removal-margin = 30s
}

Advanced Downing Strategies

Examples of more sophisticated downing strategies.

// Split-brain resolver based on oldest member
class OldestMemberDowningProvider(system: ActorSystem) extends DowningProvider {

  /** Starts the oldest-member decision actor as a child of the cluster daemon. */
  override def downingActorProps: Option[Props] = {
    val decider = Props(new OldestMemberDowningActor())
    Some(decider)
  }

  /** A fixed 20-second margin before a downed partition's shards/singletons move. */
  override def downRemovalMargin: FiniteDuration = 20.seconds
}

/**
 * Lets only the oldest reachable `Up` member (in this node's view) down unreachable
 * members. Every other node just logs and defers the decision to that member.
 *
 * NOTE(review): age is compared with `Member.isOlderThan`, which is only meaningful
 * within a single data center (multi-DC Akka versions throw when comparing members of
 * different data centers). This example therefore assumes a single-DC cluster.
 */
class OldestMemberDowningActor extends Actor with ActorLogging {
  val cluster = Cluster(context.system)

  override def preStart(): Unit = {
    // Replay the current state as events so members that were already unreachable at
    // startup are handled too, instead of arriving in an unhandled CurrentClusterState.
    cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[UnreachableMember])
  }

  def receive: Receive = {
    case UnreachableMember(member) =>
      oldestReachableUpMember() match {
        case Some(oldest) if oldest == cluster.selfMember =>
          // I'm the oldest reachable member, so I decide who to down.
          log.info("As oldest member, downing unreachable: {}", member.address)
          cluster.down(member.address)
        case Some(oldest) =>
          log.info("Not oldest member, waiting for {} to make downing decision",
            oldest.address)
        case None =>
          // No reachable Up member exists in this view (e.g. this node is still joining),
          // so there is nobody to make the decision; previously `minBy` threw here.
          log.warning("No reachable Up member found, not downing {}", member.address)
      }
  }

  /**
   * The oldest `Up` member that this node does not currently consider unreachable.
   * `Member.upNumber` is internal to Akka, so age is compared with the public
   * `isOlderThan` instead.
   */
  private def oldestReachableUpMember(): Option[Member] = {
    val state = cluster.state
    val candidates = (state.members -- state.unreachable).filter(_.status == MemberStatus.Up)
    candidates.reduceOption((a, b) => if (b.isOlderThan(a)) b else a)
  }

  override def postStop(): Unit = {
    cluster.unsubscribe(self)
  }
}

// Role-based downing - only down members not in critical roles
class RoleBasedDowningProvider(system: ActorSystem) extends DowningProvider {

  /** Members carrying any of these roles are never downed by this strategy. */
  val criticalRoles = Set("database", "master")

  /** A fixed 15-second margin before a downed partition's shards/singletons move. */
  override def downRemovalMargin: FiniteDuration = 15.seconds

  /** Starts the role-aware downing actor, handing it the protected role set. */
  override def downingActorProps: Option[Props] = {
    val protectedRoles = criticalRoles
    Some(Props(new RoleBasedDowningActor(protectedRoles)))
  }
}

/**
 * Downs unreachable members immediately unless they carry one of `criticalRoles`.
 * Members with a critical role are only logged and left for manual handling.
 *
 * @param criticalRoles roles that protect a member from being downed by this actor
 */
class RoleBasedDowningActor(criticalRoles: Set[String]) extends Actor with ActorLogging {
  val cluster = Cluster(context.system)

  override def preStart(): Unit = {
    // Replay the current state as events so members that were already unreachable at
    // startup are handled too, instead of arriving in an unhandled CurrentClusterState.
    cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[UnreachableMember])
  }

  def receive: Receive = {
    case UnreachableMember(member) =>
      // Computed once and reused for both the decision and the log message.
      val protectedRoles = member.roles.intersect(criticalRoles)

      if (protectedRoles.nonEmpty) {
        log.warning("NOT downing member {} with critical roles: {}",
          member.address, protectedRoles)
      } else {
        log.info("Downing non-critical member: {} (roles: {})",
          member.address, member.roles)
        cluster.down(member.address)
      }
  }

  override def postStop(): Unit = {
    cluster.unsubscribe(self)
  }
}

Join Configuration Compatibility Checker

SPI for validating configuration compatibility when nodes attempt to join the cluster.

/**
 * Service provider interface for validating configuration compatibility when nodes join.
 *
 * NOTE(review): the examples in this document use an `(ActorSystem, ClusterSettings)`
 * constructor, but Akka's built-in checkers (e.g. `JoinConfigCompatCheckCluster`) declare
 * a no-argument constructor. Confirm which constructor the Akka version in use
 * instantiates before relying on either.
 */
abstract class JoinConfigCompatChecker {
  /**
   * Configuration keys that must be present and validated.
   * NOTE(review): presumably only these keys of the other node's configuration are
   * available to `check` — verify against the Akka documentation.
   */
  def requiredKeys: immutable.Seq[String]
  
  /** 
   * Checks whether the joining node's configuration is compatible.
   * @param toCheck configuration received from the joining node
   * @param actualConfig this node's configuration
   * @return `Valid` if compatible, otherwise `Invalid` carrying the error messages
   */
  def check(toCheck: Config, actualConfig: Config): ConfigValidation
}

/**
 * Result of a configuration compatibility check: either `Valid` or `Invalid` with the
 * reasons. The trait is sealed, so pattern matches on it are checked for exhaustiveness.
 */
sealed trait ConfigValidation
// The checked configuration is compatible.
case object Valid extends ConfigValidation  
// The checked configuration is incompatible; one message per detected problem.
case class Invalid(errorMessages: immutable.Seq[String]) extends ConfigValidation

object JoinConfigCompatChecker {
  /**
   * Factory method that loads the configured checker(s).
   * NOTE(review): in Akka this is internal API (`private[cluster]`) — confirm before
   * calling it from application code.
   */
  def load(system: ActorSystem, settings: ClusterSettings): JoinConfigCompatChecker
  
  /** Checks that every key in `requiredKeys` exists in `toCheck`; `Invalid` if any is missing. */
  def exists(requiredKeys: immutable.Seq[String], toCheck: Config): ConfigValidation
  
  /** Checks that every key in `requiredKeys` has exactly the same value in `toCheck` and `actualConfig`. */
  def fullMatch(requiredKeys: immutable.Seq[String], toCheck: Config, actualConfig: Config): ConfigValidation
}

Usage Examples:

// Application version compatibility checker
/**
 * Join checker that enforces application/API version compatibility and allowed role sets.
 *
 * A joining node is accepted when all of the following hold:
 *  - its `app.version` has the same major component as this node's;
 *  - its `app.api-version` equals this node's exactly;
 *  - its `akka.cluster.roles` fit entirely inside one allowed role combination (an empty
 *    role set is a subset of every combination and is therefore accepted).
 * Every failing rule contributes one message to the resulting `Invalid`.
 */
class AppVersionCompatChecker(system: ActorSystem, settings: ClusterSettings) 
    extends JoinConfigCompatChecker {

  /** Keys the joining node's configuration must provide; all are read by `validateVersions`. */
  val requiredKeys = List(
    "akka.actor.provider",
    "akka.cluster.roles", 
    "app.version",
    "app.api-version"
  )

  /**
   * @param toCheck      configuration received from the joining node
   * @param actualConfig this node's configuration
   * @return `Valid`, or `Invalid` listing every detected incompatibility
   */
  def check(toCheck: Config, actualConfig: Config): ConfigValidation = {
    // Check key presence first so the joining node's values below can be read safely.
    JoinConfigCompatChecker.exists(requiredKeys, toCheck) match {
      case Valid => validateVersions(toCheck, actualConfig)
      case invalid => invalid
    }
  }

  /** Applies the version and role rules, collecting one message per failed rule. */
  private def validateVersions(joining: Config, actual: Config): ConfigValidation = {
    val joiningAppVersion = joining.getString("app.version")
    val joiningApiVersion = joining.getString("app.api-version")
    val actualAppVersion = actual.getString("app.version")
    val actualApiVersion = actual.getString("app.api-version")
    val joiningRoles = joining.getStringList("akka.cluster.roles").asScala.toSet

    val allowedRoleCombinations = Set(
      Set("frontend", "api"),
      Set("backend", "worker"),
      Set("database")
    )

    val errors = List(
      // The same major app version is enough ...
      if (isCompatibleVersion(joiningAppVersion, actualAppVersion)) None
      else Some(s"Incompatible app version: $joiningAppVersion vs $actualAppVersion"),
      // ... but the API version must match exactly.
      if (joiningApiVersion == actualApiVersion) None
      else Some(s"API version mismatch: $joiningApiVersion vs $actualApiVersion"),
      // The joining node's roles must fit inside one permitted combination.
      if (allowedRoleCombinations.exists(allowed => joiningRoles.subsetOf(allowed))) None
      else Some(s"Invalid role combination: ${joiningRoles.mkString(", ")}")
    ).flatten

    if (errors.isEmpty) Valid else Invalid(errors)
  }

  /**
   * True when both versions have the same non-empty major component (the text before the
   * first '.'). Versions with any number of components are accepted; malformed or empty
   * versions are treated as incompatible. Previously the exact-three-part pattern threw a
   * MatchError for versions such as "1.2" or "1.2.3.4".
   */
  private def isCompatibleVersion(v1: String, v2: String): Boolean = {
    def major(version: String): Option[String] =
      version.trim.split('.').headOption.map(_.trim).filter(_.nonEmpty)

    (major(v1), major(v2)) match {
      case (Some(m1), Some(m2)) => m1 == m2
      case _ => false
    }
  }
}

// Configuration
// akka.cluster.configuration-compatibility-check.checkers {
//   app-version = "com.myapp.AppVersionCompatChecker"
// }

Environment-Specific Compatibility Checkers

// Environment-aware compatibility checker
/**
 * Join checker that keeps environments apart and restricts data centers and instance types.
 *
 * Rules are applied in order, and the first failure is returned:
 *  1. `app.environment` must be identical on both nodes.
 *  2. If this node's `app.datacenter` has an entry in `compatibleDcs`, the joining node's
 *     data center must be listed in it. Data centers without an entry accept any joiner.
 *  3. If the joining node's data center has an entry in `allowedTypes`, its
 *     `app.instance-type` must be listed in it. Other data centers accept any type.
 */
class EnvironmentCompatChecker(system: ActorSystem, settings: ClusterSettings)
    extends JoinConfigCompatChecker {

  val requiredKeys = List(
    "app.environment",
    "app.datacenter", 
    "app.instance-type"
  )

  def check(toCheck: Config, actualConfig: Config): ConfigValidation = {
    // Check key presence first so the joining node's values below can be read safely.
    JoinConfigCompatChecker.exists(requiredKeys, toCheck) match {
      case Valid => validateEnvironment(toCheck, actualConfig)
      case invalid => invalid
    }
  }

  /** Rules 1 and 2; delegates rule 3 to `validateInstanceType`. */
  private def validateEnvironment(joining: Config, actual: Config): ConfigValidation = {
    val joiningEnv = joining.getString("app.environment")
    val actualEnv = actual.getString("app.environment")

    // Only allow the same environment. This is written as an if/else expression rather
    // than an early `return`.
    if (joiningEnv != actualEnv) {
      Invalid(List(s"Environment mismatch: $joiningEnv cannot join $actualEnv cluster"))
    } else {
      val joiningDc = joining.getString("app.datacenter")
      val actualDc = actual.getString("app.datacenter")
      val joiningInstanceType = joining.getString("app.instance-type")

      // Data centers a joiner may come from, keyed by this node's data center.
      val compatibleDcs = Map(
        "us-east-1" -> Set("us-east-1", "us-west-2"),
        "eu-west-1" -> Set("eu-west-1", "eu-central-1")
      )

      compatibleDcs.get(actualDc) match {
        case Some(allowed) if !allowed.contains(joiningDc) =>
          Invalid(List(s"Data center $joiningDc not compatible with $actualDc"))
        case _ =>
          validateInstanceType(joiningInstanceType, joiningDc)
      }
    }
  }

  /** Rule 3: restricts instance types only for data centers listed in `allowedTypes`. */
  private def validateInstanceType(instanceType: String, dc: String): ConfigValidation = {
    val allowedTypes = Map(
      "us-east-1" -> Set("m5.large", "m5.xlarge", "c5.large"),
      "eu-west-1" -> Set("m5.large", "c5.large")
    )

    allowedTypes.get(dc) match {
      case Some(allowed) if !allowed.contains(instanceType) =>
        Invalid(List(s"Instance type $instanceType not allowed in $dc"))
      case _ => Valid
    }
  }
}

// Development vs Production checker
/**
 * Join checker that never mixes modes (`app.mode`) in one cluster.
 *
 * In "production" mode, all `requiredKeys` (including `app.debug-enabled`) must match
 * exactly. In any other mode, only `app.mode` is compared.
 */
class DevProdCompatChecker(system: ActorSystem, settings: ClusterSettings)
    extends JoinConfigCompatChecker {

  val requiredKeys = List("app.mode", "app.debug-enabled")

  def check(toCheck: Config, actualConfig: Config): ConfigValidation = {
    // Check key presence first, like the other checkers. Otherwise a joining node without
    // `app.mode` would make getString throw instead of producing an `Invalid` result.
    JoinConfigCompatChecker.exists(requiredKeys, toCheck) match {
      case Valid => validateMode(toCheck, actualConfig)
      case invalid => invalid
    }
  }

  /** Applies the mode-separation rule, then the production-only exact-match rule. */
  private def validateMode(toCheck: Config, actualConfig: Config): ConfigValidation = {
    val joiningMode = toCheck.getString("app.mode")
    val actualMode = actualConfig.getString("app.mode")

    if (joiningMode != actualMode) {
      // Strict separation of dev and prod.
      Invalid(List(s"Cannot mix $joiningMode and $actualMode nodes in same cluster"))
    } else if (actualMode == "production") {
      // Production requires every required key to match exactly.
      JoinConfigCompatChecker.fullMatch(requiredKeys, toCheck, actualConfig)
    } else {
      // Outside production the remaining keys (e.g. app.debug-enabled) are not compared.
      Valid
    }
  }
}

Downing Provider Factory Pattern

Pattern for creating configurable downing providers.

// Factory for creating different downing strategies
object DowningProviderFactory {

  /**
   * Builds the downing provider for a strategy name.
   *
   * "none", "auto", "quorum", "oldest" and "role-based" map to the providers above. Any
   * other value is treated as the fully qualified class name of a `DowningProvider` with a
   * public `(ActorSystem)` constructor. If instantiation fails, the failure is rethrown.
   */
  def create(strategy: String, system: ActorSystem): DowningProvider = {
    val builtIn: PartialFunction[String, DowningProvider] = {
      case "none"       => new NoDowning(system)
      case "auto"       => new AutoDowning(system)
      case "quorum"     => new QuorumBasedDowningProvider(system)
      case "oldest"     => new OldestMemberDowningProvider(system)
      case "role-based" => new RoleBasedDowningProvider(system)
    }
    builtIn.applyOrElse(strategy, (fqcn: String) => instantiateCustom(fqcn, system))
  }

  /** Reflectively creates `fqcn`, passing `system` as the sole constructor argument. */
  private def instantiateCustom(fqcn: String, system: ActorSystem): DowningProvider = {
    val dynamicAccess = system.asInstanceOf[ExtendedActorSystem].dynamicAccess
    val constructorArgs = List(classOf[ActorSystem] -> system)
    dynamicAccess.createInstanceFor[DowningProvider](fqcn, constructorArgs).get
  }
}

// Configuration-driven provider
/**
 * Downing provider whose strategy is chosen in configuration (`app.cluster.downing.strategy`)
 * and delegated to the provider built by `DowningProviderFactory`.
 *
 * NOTE: naming this class itself as the strategy would make the factory instantiate it
 * again, recursing without bound.
 */
class ConfigurableDowningProvider(system: ActorSystem) extends DowningProvider {

  /** The `app.cluster.downing` section of the system configuration. */
  val config = system.settings.config.getConfig("app.cluster.downing")

  /** The strategy name, or a provider class name, read from that section. */
  val strategy = config.getString("strategy")

  /** Provider for `strategy`, built once when this provider is constructed. */
  val delegate = DowningProviderFactory.create(strategy, system)

  override def downingActorProps: Option[Props] = delegate.downingActorProps

  override def downRemovalMargin: FiniteDuration = delegate.downRemovalMargin
}

// Configuration
/*
app.cluster.downing {
  strategy = "quorum"  # or "none", "auto", "oldest", "role-based", or the fully qualified class name of a DowningProvider
}
akka.cluster.downing-provider-class = "com.myapp.ConfigurableDowningProvider"
*/

Types

// Downing provider SPI: subclasses need a public (ActorSystem) constructor.
abstract class DowningProvider {
  // Margin before a downed partition's shards/singletons are re-created elsewhere.
  def downRemovalMargin: FiniteDuration
  // Optional Props of an actor, started under the cluster daemon, that performs downing.
  def downingActorProps: Option[Props]
}

// Built-in providers (signature listing only; implementations live in Akka).
final class NoDowning(system: ActorSystem) extends DowningProvider
class AutoDowning(system: ActorSystem) extends DowningProvider

// Configuration validation SPI, consulted when a node joins.
abstract class JoinConfigCompatChecker {
  // Keys that must be present and validated.
  def requiredKeys: immutable.Seq[String]
  // toCheck: the joining node's config; actualConfig: this node's config.
  def check(toCheck: Config, actualConfig: Config): ConfigValidation
}

// Validation result types: Valid, or Invalid with one message per problem.
sealed trait ConfigValidation
case object Valid extends ConfigValidation
case class Invalid(errorMessages: immutable.Seq[String]) extends ConfigValidation

// Factory and helper methods (NOTE(review): `load` is internal API in Akka — confirm).
object JoinConfigCompatChecker {
  def load(system: ActorSystem, settings: ClusterSettings): JoinConfigCompatChecker
  // Valid iff every required key exists in toCheck.
  def exists(requiredKeys: immutable.Seq[String], toCheck: Config): ConfigValidation
  // Valid iff every required key has the same value in toCheck and actualConfig.
  def fullMatch(requiredKeys: immutable.Seq[String], toCheck: Config, actualConfig: Config): ConfigValidation
}

Install with Tessl CLI

npx tessl i tessl/maven-com-typesafe-akka--akka-cluster

docs

cluster-management.md

cluster-routing.md

configuration-and-management.md

events-and-state.md

extensibility.md

index.md

members-and-status.md

tile.json