Fault-tolerant decentralized peer-to-peer cluster membership service with no single point of failure for Akka distributed systems
Akka Cluster configuration is managed through the ClusterSettings class and Typesafe Config. This covers all aspects of cluster behavior including membership, failure detection, gossip protocols, and multi-data center operations.
class ClusterSettings(config: Config, systemName: String) {
// Basic cluster settings
def SeedNodes: immutable.IndexedSeq[Address]
def Roles: Set[String]
def SelfDataCenter: DataCenter
def MinNrOfMembers: Int
def MinNrOfMembersOfRole: Map[String, Int]
def AppVersion: Version
// Logging and monitoring
def LogInfo: Boolean
def LogInfoVerbose: Boolean
def JmxEnabled: Boolean
def JmxMultiMbeansInSameJvmEnabled: Boolean
def PublishStatsInterval: Duration
// Failure detection settings
def FailureDetectorImplementationClass: String
def FailureDetectorConfig: Config
def HeartbeatInterval: Duration
def HeartbeatExpectedResponseAfter: Duration
def MonitoredByNrOfMembers: Int
// Downing provider
def DowningProviderClassName: String
// Timing and lifecycle settings
def SeedNodeTimeout: Duration
def RetryUnsuccessfulJoinAfter: Duration
def WeaklyUpAfter: Duration
def AllowWeaklyUpMembers: Boolean
def DownRemovalMargin: Duration
def QuarantineRemovedNodeAfter: Duration
def PruneGossipTombstonesAfter: Duration
// Gossip protocol settings
def GossipInterval: FiniteDuration
def GossipTimeToLive: FiniteDuration
def GossipDifferentViewProbability: Double
def ReduceGossipDifferentViewProbability: Int
def LeaderActionsInterval: FiniteDuration
def UnreachableNodesReaperInterval: FiniteDuration
// Scheduler settings
def PeriodicTasksInitialDelay: Duration
def SchedulerTickDuration: Duration
def SchedulerTicksPerWheel: Int
// Configuration management
def UseDispatcher: String
def RunCoordinatedShutdownWhenDown: Boolean
def ByPassConfigCompatCheck: Boolean
def ConfigCompatCheckers: List[String]
def SensitiveConfigPaths: List[String]
// Debug settings
def Debug: ClusterSettings.Debug
}
object ClusterSettings {
type DataCenter = String
val DefaultDataCenter: DataCenter = "default"
object Debug {
def VerboseHeartbeatLogging: Boolean
def VerboseGossipLogging: Boolean
}
}

val cluster = Cluster(system)
val settings = cluster.settings
println(s"Seed nodes: ${settings.SeedNodes}")
println(s"Roles: ${settings.Roles}")
println(s"Min members: ${settings.MinNrOfMembers}")
println(s"Data center: ${settings.SelfDataCenter}")
println(s"Gossip interval: ${settings.GossipInterval}")

akka {
actor {
provider = cluster
}
cluster {
# Seed nodes for cluster discovery
seed-nodes = [
"akka://ClusterSystem@127.0.0.1:2551",
"akka://ClusterSystem@127.0.0.1:2552"
]
# Minimum number of members before leader actions
min-nr-of-members = 3
# Node roles for role-based operations
roles = ["backend", "compute"]
# Application version for compatibility checking
app-version = "1.0.0"
}
remote.artery {
canonical.hostname = "127.0.0.1"
canonical.port = 2551
}
}

akka.cluster {
role {
# Minimum 2 backend nodes before any backend node becomes Up
backend.min-nr-of-members = 2
# Minimum 1 frontend node
frontend.min-nr-of-members = 1
}
}

akka.cluster {
# Failure detector implementation
failure-detector {
implementation-class = "akka.remote.PhiAccrualFailureDetector"
# How often keep-alive heartbeat messages should be sent to each connection
heartbeat-interval = 1s
# Defines the failure detector threshold
threshold = 8.0
# Number of samples of inter-heartbeat arrival times kept in the
# history window used by the phi-accrual calculation
max-sample-size = 1000
# Minimum standard deviation to use for the normal distribution in AccrualFailureDetector
min-std-deviation = 100ms
# Number of member nodes that each member will send heartbeat messages to
monitored-by-nr-of-members = 5
}
}

class CustomFailureDetector extends FailureDetector {
def isAvailable: Boolean = ??? // custom logic
def isMonitoring: Boolean = ??? // custom logic
def heartbeat(): Unit = ??? // custom logic
}

akka.cluster.failure-detector {
implementation-class = "com.example.CustomFailureDetector"
# Custom failure detector specific configuration
custom-setting = "value"
}

akka.cluster {
# How often cluster nodes gossip membership information
gossip-interval = 1s
# Time to live for gossip messages
gossip-time-to-live = 2s
# How often leader performs maintenance tasks
leader-actions-interval = 1s
# How often unreachable nodes reaper runs
unreachable-nodes-reaper-interval = 1s
# Probability of gossiping to a node with a different (conflicting) view
gossip-different-view-probability = 0.8
# Cluster size above which that probability is gradually reduced
reduce-gossip-different-view-probability = 400
}

akka.cluster {
# Lower the different-view gossip probability to reduce gossip load in large clusters
gossip-different-view-probability = 0.4
# For large clusters (>100 nodes)
gossip-interval = 2s
leader-actions-interval = 2s
# For high-latency networks
gossip-time-to-live = 5s
}

object MultiDataCenterSettings {
def CrossDcConnections: Int
def CrossDcGossipProbability: Double
def CrossDcFailureDetectorSettings: FailureDetectorSettings
}

akka.cluster {
multi-data-center {
# Data center this node belongs to
self-data-center = "dc-east"
# Number of connections to other data centers
cross-data-center-connections = 5
# Probability of gossiping to other data centers
cross-data-center-gossip-probability = 0.2
# Failure detector for cross data center connections
failure-detector {
heartbeat-interval = 3s
acceptable-heartbeat-pause = 10s
threshold = 12.0
}
}
}

akka.cluster {
# Automatically assigned based on self-data-center
roles = ["dc-east", "backend"]
multi-data-center {
self-data-center = "east"
}
}

akka.cluster {
downing-provider-class = "akka.cluster.NoDowning"
}

akka.cluster {
downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
split-brain-resolver {
# Strategy to use: keep-majority, lease-majority, static-quorum, keep-oldest, down-all
active-strategy = "keep-majority"
# Cluster membership must be stable for this long before the strategy decides
stable-after = 20s
# Down all nodes if failure detector observations stay unstable for longer
# than this (on = duration derived from stable-after)
down-all-when-unstable = "on"
}
}

class CustomDowningProvider(system: ActorSystem) extends DowningProvider {
override def downRemovalMargin: FiniteDuration = 10.seconds
override def downingActorProps: Option[Props] =
Some(Props(classOf[CustomDowningActor]))
}

akka.cluster.downing-provider-class = "com.example.CustomDowningProvider"

akka.cluster {
# Enable cluster info logging
log-info = on
# Enable verbose cluster info logging
log-info-verbose = off
# Log cluster state changes at DEBUG level
debug {
verbose-gossip-logging = off
verbose-heartbeat-logging = off
}
}

akka.cluster {
# Enable JMX monitoring
jmx.enabled = on
# Cluster MBean name
jmx.multi-mbeans-in-same-jvm = on
}

akka.cluster {
# Publish cluster metrics for monitoring
metrics {
enabled = on
native-library-extract-folder = ${user.dir}/target/native
# How often metrics are sampled
sample-interval = 3s
# How often metrics are gossiped
gossip-interval = 3s
# Metrics moving average window
moving-average-half-life = 12s
}
}

akka.cluster {
# Use dedicated scheduler for cluster operations
use-dispatcher = "akka.cluster.cluster-dispatcher"
# Cluster scheduler tick duration
scheduler {
tick-duration = 33ms
ticks-per-wheel = 512
}
}
akka.cluster.cluster-dispatcher {
type = "Dispatcher"
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 2
parallelism-max = 4
}
}

akka.cluster {
# Allow weakly up members in partitioned clusters
allow-weakly-up-members = on
# How often the unreachable-nodes reaper task runs
unreachable-nodes-reaper-interval = 1s
# Auto-down has been removed in Akka 2.6+
# Use split-brain-resolver instead
}

akka.management {
cluster.bootstrap {
# Bootstrap method (for cloud environments)
contact-point-discovery {
service-name = "my-service"
discovery-method = "kubernetes-api"
}
}
}

trait JoinConfigCompatChecker {
def check(toCheck: Config): ConfigValidation
}
class JoinConfigCompatCheckCluster extends JoinConfigCompatChecker

class CustomConfigChecker extends JoinConfigCompatChecker {
def check(toCheck: Config): ConfigValidation = {
// Custom validation logic
if (toCheck.getString("app.database-version") == expectedVersion) {
Valid
} else {
Invalid("Database version mismatch")
}
}
}

akka.cluster.configuration-compatibility-check {
checkers = ["com.example.CustomConfigChecker"]
}

akka.cluster {
seed-nodes = ["akka://dev@127.0.0.1:2551"]
min-nr-of-members = 1
log-info = on
log-info-verbose = on
}

akka.cluster {
seed-nodes = [
"akka://prod@node1.example.com:2551",
"akka://prod@node2.example.com:2551",
"akka://prod@node3.example.com:2551"
]
min-nr-of-members = 3
log-info = on
log-info-verbose = off
failure-detector.threshold = 12.0
gossip-interval = 1s
split-brain-resolver {
active-strategy = "keep-majority"
stable-after = 30s
}
}

akka {
cluster {
seed-nodes = [] # Use Cluster Bootstrap instead
}
management {
cluster.bootstrap {
contact-point-discovery {
service-name = ${?SERVICE_NAME}
discovery-method = "kubernetes-api"
}
}
}
discovery {
kubernetes-api {
pod-label-selector = "app=%s"
}
}
}

Install with Tessl CLI
npx tessl i tessl/maven-com-typesafe-akka--akka-cluster-2-12