Akka Cluster provides a fault-tolerant decentralized peer-to-peer based cluster membership service with no single point of failure or single point of bottleneck using gossip protocols and automatic failure detection.
—
Cluster configuration management, settings, and JMX integration for monitoring and management. This covers how to configure cluster behavior, monitor cluster health, and manage clusters programmatically and through JMX.
Core cluster configuration loaded from application.conf with comprehensive settings for all cluster aspects.
/**
* Cluster configuration settings loaded from config.
* Provides access to all cluster-related configuration values.
*/
class ClusterSettings(val config: Config, val systemName: String) {
/** Configured seed nodes for initial cluster joining */
val SeedNodes: immutable.IndexedSeq[Address]
/** Roles assigned to this cluster node */
val Roles: Set[String]
/** Data center this node belongs to */
val SelfDataCenter: DataCenter
/** Heartbeat interval between cluster nodes */
val HeartbeatInterval: FiniteDuration
/** Expected heartbeat response time */
val HeartbeatExpectedResponseAfter: FiniteDuration
/** Number of members that monitor each member for failure detection */
val MonitoredByNrOfMembers: Int
/** Enable info level cluster logging */
val LogInfo: Boolean
/** Enable JMX monitoring and management */
val JmxEnabled: Boolean
/** Auto-down unreachable members after this duration (if configured) */
val AutoDownUnreachableAfter: FiniteDuration
/** Minimum number of members before leader actions */
val MinNrOfMembers: Int
/** Minimum number of members per role before leader actions */
val MinNrOfMembersOfRole: Map[String, Int]
/** Gossip interval for cluster state dissemination */
val GossipInterval: FiniteDuration
/** Failure detector implementation class */
val FailureDetectorImplementationClass: String
/** Failure detector configuration */
val FailureDetectorConfig: Config
/** Downing provider class name */
val DowningProviderClassName: String
/** Dispatcher to use for cluster actors */
val UseDispatcher: String
}Configuration Example (application.conf):
akka {
actor {
provider = "cluster"
}
cluster {
# Seed nodes for joining the cluster
seed-nodes = [
"akka.tcp://ClusterSystem@127.0.0.1:2551",
"akka.tcp://ClusterSystem@127.0.0.1:2552"
]
# Node roles
roles = ["frontend", "backend"]
# Data center (for multi-DC clusters)
multi-data-center.self-data-center = "dc1"
# Minimum cluster size before leader actions
min-nr-of-members = 3
min-nr-of-members-of-role {
backend = 2
frontend = 1
}
# Failure detection
failure-detector {
implementation-class = "akka.remote.PhiAccrualFailureDetector"
heartbeat-interval = 1s
threshold = 8.0
max-sample-size = 1000
min-std-deviation = 100ms
acceptable-heartbeat-pause = 3s
monitored-by-nr-of-members = 5
}
# Auto-down unreachable (use with caution in production)
auto-down-unreachable-after = off
# Downing provider
downing-provider-class = "akka.cluster.NoDowning"
# Gossip settings
gossip-interval = 1s
gossip-time-to-live = 2s
# JMX monitoring
jmx.enabled = on
# Logging
log-info = on
}
}Usage Examples:
val cluster = Cluster(system)
val settings = cluster.settings
println(s"Seed nodes: ${settings.SeedNodes}")
println(s"My roles: ${settings.Roles}")
println(s"Data center: ${settings.SelfDataCenter}")
println(s"Heartbeat interval: ${settings.HeartbeatInterval}")
println(s"Min cluster size: ${settings.MinNrOfMembers}")
println(s"JMX enabled: ${settings.JmxEnabled}")
// Check role-specific minimums
settings.MinNrOfMembersOfRole.foreach { case (role, minCount) =>
println(s"Minimum $role nodes: $minCount")
}JMX interface for cluster node management and monitoring providing operational control.
/**
* JMX management interface for cluster operations and monitoring.
* Accessible via JMX clients like JConsole, VisualVM.
*/
trait ClusterNodeMBean {
/** Get current member status as string */
def getMemberStatus: String
/** Get comma-separated member addresses */
def getMembers: String
/** Get comma-separated unreachable member addresses */
def getUnreachable: String
/** Get comprehensive cluster status as JSON string */
def getClusterStatus: String
/** Get current cluster leader address */
def getLeader: String
/** Check if cluster has only one member */
def isSingleton: Boolean
/** Check if this node is available (Up status) */
def isAvailable: Boolean
/** Join cluster at specified address */
def join(address: String): Unit
/** Leave cluster for specified address */
def leave(address: String): Unit
/** Mark specified address as down */
def down(address: String): Unit
}JMX Usage Examples:
// JMX is automatically enabled if akka.cluster.jmx.enabled = on
// Access via JConsole at: akka:type=Cluster
// Programmatic JMX access
import javax.management.{MBeanServer, ObjectName}
import java.lang.management.ManagementFactory
val server: MBeanServer = ManagementFactory.getPlatformMBeanServer
val objectName = new ObjectName("akka:type=Cluster")
// Get cluster information
val memberStatus = server.getAttribute(objectName, "MemberStatus").asInstanceOf[String]
val members = server.getAttribute(objectName, "Members").asInstanceOf[String]
val leader = server.getAttribute(objectName, "Leader").asInstanceOf[String]
val isAvailable = server.getAttribute(objectName, "Available").asInstanceOf[Boolean]
println(s"Status: $memberStatus")
println(s"Members: $members")
println(s"Leader: $leader")
println(s"Available: $isAvailable")
// Perform cluster operations
server.invoke(objectName, "join", Array("akka.tcp://System@host:2551"), Array("java.lang.String"))
server.invoke(objectName, "leave", Array("akka.tcp://System@host:2552"), Array("java.lang.String"))Constants and type definitions used throughout cluster configuration.
object ClusterSettings {
/** Default data center name */
val DefaultDataCenter: String = "default"
/** Prefix for data center role names */
val DcRolePrefix: String = "dc-"
/** Type alias for data center identifiers */
type DataCenter = String
/** Multi-data center specific settings */
object MultiDataCenter {
/** Cross data center failure detector settings */
val CrossDcFailureDetectorSettings: FailureDetectorSettings
/** Cross data center connections configuration */
val CrossDcConnections: Int
}
/** Debug logging settings */
object Debug {
/** Enable verbose logging of cluster events */
val LogInfo: Boolean
/** Enable debug logging of gossip */
val LogGossip: Boolean
}
}Usage Examples:
import akka.cluster.ClusterSettings._
// Check if using default data center
if (cluster.selfDataCenter == DefaultDataCenter) {
println("Using default data center")
}
// Work with data center roles
val dcRole = s"$DcRolePrefix${cluster.selfDataCenter}"
if (cluster.selfRoles.contains(dcRole)) {
println(s"Node has data center role: $dcRole")
}
// Multi-DC settings
val settings = cluster.settings
println(s"Cross-DC connections: ${settings.MultiDataCenter.CrossDcConnections}")Read-only view of cluster state for monitoring and inspection without modification capabilities.
/**
* Read-only view of the cluster state.
* Thread-safe access to current cluster information.
*/
class ClusterReadView(cluster: Cluster) {
/** Current cluster state snapshot */
def state: CurrentClusterState
/** This member's information */
def self: Member
/** Current cluster members */
def members: immutable.SortedSet[Member]
/** Unreachable members */
def unreachable: Set[Member]
/** Current leader address */
def leader: Option[Address]
/** Check if cluster has converged (all members seen latest gossip) */
def isConverged: Boolean
/** Check if this node is the leader */
def isLeader: Boolean
}Usage Examples:
val readView = cluster.readView
// Monitor cluster health
def printClusterHealth(): Unit = {
val state = readView.state
println(s"=== Cluster Health ===")
println(s"Total members: ${state.members.size}")
println(s"Unreachable: ${state.unreachable.size}")
println(s"Leader: ${state.leader.getOrElse("None")}")
println(s"Converged: ${readView.isConverged}")
println(s"Am I leader: ${readView.isLeader}")
// Members by status
val membersByStatus = state.members.groupBy(_.status)
membersByStatus.foreach { case (status, members) =>
println(s"$status: ${members.size}")
}
}
// Scheduled health check
import scala.concurrent.duration._
system.scheduler.scheduleWithFixedDelay(30.seconds, 30.seconds) { () =>
printClusterHealth()
}Configuration compatibility checking when nodes join the cluster.
/**
* Configuration validation result
*/
sealed trait ConfigValidation
case object Valid extends ConfigValidation
case class Invalid(errorMessages: immutable.Seq[String]) extends ConfigValidation
/**
* Service provider interface for join configuration validation
*/
abstract class JoinConfigCompatChecker {
/** Configuration keys that must be validated */
def requiredKeys: immutable.Seq[String]
/** Check configuration compatibility */
def check(toCheck: Config, actualConfig: Config): ConfigValidation
}
object JoinConfigCompatChecker {
/** Check if required keys exist */
def exists(requiredKeys: immutable.Seq[String], toCheck: Config): ConfigValidation
/** Check if configurations match exactly */
def fullMatch(requiredKeys: immutable.Seq[String], toCheck: Config, actualConfig: Config): ConfigValidation
}Usage Examples:
// Custom configuration validator
class MyConfigChecker(system: ActorSystem, settings: ClusterSettings) extends JoinConfigCompatChecker {
val requiredKeys = List(
"akka.actor.provider",
"akka.cluster.roles",
"my-app.version"
)
def check(toCheck: Config, actualConfig: Config): ConfigValidation = {
// Check that joining node has compatible configuration
val existsCheck = JoinConfigCompatChecker.exists(requiredKeys, toCheck)
existsCheck match {
case Valid =>
// Additional custom validation
val joiningVersion = toCheck.getString("my-app.version")
val myVersion = actualConfig.getString("my-app.version")
if (joiningVersion == myVersion) Valid
else Invalid(List(s"Version mismatch: $joiningVersion != $myVersion"))
case invalid => invalid
}
}
}
// Configuration in application.conf
// akka.cluster.configuration-compatibility-check.checker-class = "com.myapp.MyConfigChecker"Configurable failure detection for monitoring cluster member health.
// Failure detector configuration options
// akka.cluster.failure-detector.implementation-class
// akka.cluster.failure-detector.heartbeat-interval
// akka.cluster.failure-detector.threshold
// akka.cluster.failure-detector.max-sample-size
// akka.cluster.failure-detector.min-std-deviation
// akka.cluster.failure-detector.acceptable-heartbeat-pause
// akka.cluster.failure-detector.monitored-by-nr-of-membersFailure Detection Examples:
# Phi Accrual Failure Detector (default)
akka.cluster.failure-detector {
implementation-class = "akka.remote.PhiAccrualFailureDetector"
heartbeat-interval = 1s
threshold = 8.0 # Higher = more tolerant to network issues
max-sample-size = 1000
min-std-deviation = 100ms
acceptable-heartbeat-pause = 3s # Allow GC pauses
monitored-by-nr-of-members = 5 # Members monitoring each member
}
# Deadline Failure Detector (simpler, less adaptive)
akka.cluster.failure-detector {
implementation-class = "akka.remote.DeadlineFailureDetector"
heartbeat-interval = 1s
acceptable-heartbeat-pause = 10s # Fixed timeout
}// Configuration types
class ClusterSettings(config: Config, systemName: String)
type DataCenter = String
// JMX management interface
trait ClusterNodeMBean
// Configuration validation
sealed trait ConfigValidation
case object Valid extends ConfigValidation
case class Invalid(errorMessages: immutable.Seq[String]) extends ConfigValidation
// Read-only cluster view
class ClusterReadView(cluster: Cluster)
// Settings constants
object ClusterSettings {
val DefaultDataCenter: String
val DcRolePrefix: String
type DataCenter = String
}Install with Tessl CLI
npx tessl i tessl/maven-com-typesafe-akka--akka-cluster