Akka Cluster provides a fault-tolerant decentralized peer-to-peer based cluster membership service with no single point of failure or single point of bottleneck using gossip protocols and automatic failure detection.
—
Comprehensive event system for monitoring cluster state changes, member lifecycle events, and reachability updates. The Akka Cluster event system provides fine-grained notifications about all aspects of cluster membership and health.
Snapshot of the current cluster state containing all members, their status, and leadership information.
/**
* Current snapshot state of the cluster. Sent to new subscriber.
* @param leader leader of the data center of this node
*/
class CurrentClusterState(
val members: immutable.SortedSet[Member],
val unreachable: Set[Member],
val seenBy: Set[Address],
val leader: Option[Address],
val roleLeaderMap: Map[String, Option[Address]],
val unreachableDataCenters: Set[DataCenter]
) {
/** Get current leader for specific role */
def roleLeader(role: String): Option[Address]
/** All node roles in the cluster */
def allRoles: Set[String]
/** All data centers in the cluster */
def allDataCenters: Set[String]
/** Java API: get current member list */
def getMembers: java.lang.Iterable[Member]
/** Java API: get current unreachable set */
def getUnreachable: java.util.Set[Member]
/** Java API: get current leader address, or null if none */
def getLeader: Address
/** Java API: All data centers in the cluster */
def getAllDataCenters: java.util.Set[String]
/** Java API: All unreachable data centers in the cluster */
def getUnreachableDataCenters: java.util.Set[String]
}

Usage Example:
val state = cluster.state
println(s"Total members: ${state.members.size}")
println(s"Unreachable members: ${state.unreachable.size}")
println(s"Current leader: ${state.leader}")
println(s"All roles: ${state.allRoles}")
// Check for specific member
val targetAddress = Address("akka.tcp", "ClusterSystem", "127.0.0.1", 2551)
val targetMember = state.members.find(_.address == targetAddress)
targetMember.foreach(m => println(s"Target member status: ${m.status}"))

Core event interfaces that all cluster events implement.
/**
* Marker interface for cluster domain events.
* Not intended for user extension.
*/
trait ClusterDomainEvent extends DeadLetterSuppression
/**
* Marker interface for membership events.
* Published when the state change is first seen on a node.
*/
sealed trait MemberEvent extends ClusterDomainEvent {
def member: Member
}
/**
* Marker interface to facilitate subscription of both UnreachableMember and ReachableMember.
*/
sealed trait ReachabilityEvent extends ClusterDomainEvent {
def member: Member
}
/**
* Marker interface for data center reachability events.
*/
sealed trait DataCenterReachabilityEvent extends ClusterDomainEvent

Events fired when members join, leave, or change status in the cluster.
/** Member status changed to Joining */
case class MemberJoined(member: Member) extends MemberEvent
/**
* Member status changed to WeaklyUp.
* A joining member can be moved to WeaklyUp if convergence
* cannot be reached, i.e. there are unreachable nodes.
* It will be moved to Up when convergence is reached.
*/
case class MemberWeaklyUp(member: Member) extends MemberEvent
/** Member status changed to Up */
case class MemberUp(member: Member) extends MemberEvent
/** Member status changed to Leaving */
case class MemberLeft(member: Member) extends MemberEvent
/**
* Member status changed to MemberStatus.Exiting and will be removed
* when all members have seen the Exiting status.
*/
case class MemberExited(member: Member) extends MemberEvent
/**
* Member status changed to MemberStatus.Down and will be removed
* when all members have seen the Down status.
*/
case class MemberDowned(member: Member) extends MemberEvent
/**
* Member completely removed from the cluster.
* When previousStatus is MemberStatus.Down the node was removed
* after being detected as unreachable and downed.
* When previousStatus is MemberStatus.Exiting the node was removed
* after graceful leaving and exiting.
*/
case class MemberRemoved(member: Member, previousStatus: MemberStatus) extends MemberEvent

Usage Example:
import akka.cluster.ClusterEvent._
/**
 * Example actor that logs the membership lifecycle events it receives.
 *
 * Only the message handling is shown here: this class does not subscribe
 * itself to any cluster events. Membership events without a dedicated case
 * are matched by the final `MemberEvent` clause and ignored.
 */
class ClusterListener extends Actor with ActorLogging {

  def receive: Receive = {
    case MemberJoined(node) =>
      log.info("Member joined: {}", node.address)
    case MemberUp(node) =>
      log.info("Member is Up: {}", node.address)
    case MemberLeft(node) =>
      log.info("Member left: {}", node.address)
    case MemberExited(node) =>
      log.info("Member exited: {}", node.address)
    case MemberRemoved(node, statusBefore) =>
      log.info("Member removed: {} (was: {})", node.address, statusBefore)
    case _: MemberEvent =>
      () // any other membership event is deliberately ignored
  }
}

Events related to cluster leadership changes at both cluster and role levels.
/**
* Leader of the cluster data center of this node changed.
* Published when the state change is first seen on a node.
*/
case class LeaderChanged(leader: Option[Address]) extends ClusterDomainEvent {
/** Java API: get address of current leader, or null if none */
def getLeader: Address = leader.orNull
}
/**
* First member (leader) of the members within a role set changed.
* Published when the state change is first seen on a node.
*/
case class RoleLeaderChanged(role: String, leader: Option[Address]) extends ClusterDomainEvent {
/** Java API: get address of current leader, or null if none */
def getLeader: Address = leader.orNull
}

Usage Example:
case LeaderChanged(newLeader) =>
newLeader match {
case Some(address) => log.info("New leader: {}", address)
case None => log.info("No leader currently")
}
case RoleLeaderChanged(role, newLeader) =>
log.info("Role '{}' leader changed to: {}", role, newLeader.getOrElse("none"))

Events indicating when cluster members become reachable or unreachable from the perspective of the failure detector.
/** A member is considered as unreachable by the failure detector */
case class UnreachableMember(member: Member) extends ReachabilityEvent
/**
* A member is considered as reachable by the failure detector
* after having been unreachable.
*/
case class ReachableMember(member: Member) extends ReachabilityEvent

Usage Example:
case UnreachableMember(member) =>
log.warning("Member became unreachable: {}", member.address)
// Maybe take action like redistributing work
case ReachableMember(member) =>
log.info("Member became reachable again: {}", member.address)
// Member is back, can route work to it again

Events for multi-data center clusters indicating when entire data centers become unreachable.
/** A data center is considered as unreachable when any members from the data center are unreachable */
case class UnreachableDataCenter(dataCenter: DataCenter) extends DataCenterReachabilityEvent
/** A data center is considered reachable when all members from the data center are reachable */
case class ReachableDataCenter(dataCenter: DataCenter) extends DataCenterReachabilityEvent

Usage Example:
case UnreachableDataCenter(dc) =>
log.warning("Data center became unreachable: {}", dc)
// Adjust routing to avoid unreachable DC
case ReachableDataCenter(dc) =>
log.info("Data center became reachable: {}", dc)
// Can route to this DC again

Events indicating cluster shutdown.
/**
* This event is published when the cluster node is shutting down,
* before the final MemberRemoved events are published.
*/
case object ClusterShuttingDown extends ClusterDomainEvent

Usage Example:
case ClusterShuttingDown =>
log.info("Cluster is shutting down")
// Perform cleanup before final shutdown

Methods for subscribing to cluster events with different initial state modes.
/**
* When using this subscription mode a snapshot of
* CurrentClusterState will be sent to the subscriber as the first message.
*/
case object InitialStateAsSnapshot extends SubscriptionInitialStateMode
/**
* When using this subscription mode the events corresponding
* to the current state will be sent to the subscriber to mimic what you would
* have seen if you were listening to the events when they occurred in the past.
*/
case object InitialStateAsEvents extends SubscriptionInitialStateMode

Usage Examples:
// Subscribe with snapshot - get current state immediately
cluster.subscribe(listener, InitialStateAsSnapshot, classOf[MemberEvent])
// Subscribe with event replay - replay all past events to current state
cluster.subscribe(listener, InitialStateAsEvents, classOf[MemberEvent])
// Subscribe to multiple event types
cluster.subscribe(listener,
classOf[MemberEvent],
classOf[ReachabilityEvent],
classOf[LeaderChanged])
// Subscribe to all cluster events
cluster.subscribe(listener, classOf[ClusterDomainEvent])

import akka.cluster.ClusterEvent._
/**
 * Example actor that subscribes itself to cluster events and logs each one.
 *
 * On start it subscribes with `InitialStateAsSnapshot`, so the first message
 * is a `CurrentClusterState`; afterwards it receives membership, reachability,
 * leadership, data center reachability and shutdown events. On stop it removes
 * its subscription again.
 */
class ComprehensiveClusterListener extends Actor with ActorLogging {

  // Looked up once and shared by preStart and postStop.
  private val cluster = Cluster(context.system)

  /** Subscribes this actor to every event type handled in `receive`. */
  override def preStart(): Unit =
    cluster.subscribe(
      self,
      InitialStateAsSnapshot,
      classOf[MemberEvent],
      classOf[ReachabilityEvent],
      classOf[LeaderChanged],
      classOf[RoleLeaderChanged],
      classOf[DataCenterReachabilityEvent],
      // ClusterShuttingDown only extends ClusterDomainEvent, so none of the
      // classes above cover it; it has to be subscribed to explicitly for the
      // shutdown case in receive to be reachable.
      ClusterShuttingDown.getClass)

  def receive: Receive = {
    // Initial state
    case state: CurrentClusterState =>
      log.info("Current cluster state: {} members", state.members.size)
      state.members.foreach(m => log.info("Member: {} - {}", m.address, m.status))

    // Member lifecycle
    case MemberJoined(member) =>
      log.info("Member JOINED: {}", member.address)
    case MemberWeaklyUp(member) =>
      log.info("Member WEAKLY UP: {}", member.address)
    case MemberUp(member) =>
      log.info("Member UP: {}", member.address)
    case MemberLeft(member) =>
      log.info("Member LEFT: {}", member.address)
    case MemberExited(member) =>
      log.info("Member EXITED: {}", member.address)
    case MemberDowned(member) =>
      log.info("Member DOWNED: {}", member.address)
    case MemberRemoved(member, previousStatus) =>
      log.info("Member REMOVED: {} (was {})", member.address, previousStatus)
    case _: MemberEvent =>
      () // any membership event without a dedicated case is ignored

    // Reachability
    case UnreachableMember(member) =>
      log.warning("Member UNREACHABLE: {}", member.address)
    case ReachableMember(member) =>
      log.info("Member REACHABLE: {}", member.address)

    // Leadership
    case LeaderChanged(leader) =>
      log.info("Leader changed: {}", leader.getOrElse("none"))
    case RoleLeaderChanged(role, leader) =>
      log.info("Role '{}' leader changed: {}", role, leader.getOrElse("none"))

    // Data center events
    case UnreachableDataCenter(dc) =>
      log.warning("Data center UNREACHABLE: {}", dc)
    case ReachableDataCenter(dc) =>
      log.info("Data center REACHABLE: {}", dc)

    // Shutdown
    case ClusterShuttingDown =>
      log.info("Cluster shutting down")
  }

  /** Removes this actor's cluster event subscription. */
  override def postStop(): Unit =
    cluster.unsubscribe(self)
}

// Subscription initial state modes
sealed abstract class SubscriptionInitialStateMode
case object InitialStateAsSnapshot extends SubscriptionInitialStateMode
case object InitialStateAsEvents extends SubscriptionInitialStateMode
// Data center type
type DataCenter = String

Install with Tessl CLI
npx tessl i tessl/maven-com-typesafe-akka--akka-cluster