The Akka Cluster event system provides comprehensive monitoring of cluster state changes through a publisher-subscriber model. Applications can subscribe to specific event types to build cluster-aware behavior.
Subscribe to cluster events with automatic initial state delivery:
// Subscription methods on Cluster
def subscribe(subscriber: ActorRef, to: Class[_]*): Unit
def subscribe(subscriber: ActorRef, initialStateMode: SubscriptionInitialStateMode, to: Class[_]*): Unit
def unsubscribe(subscriber: ActorRef): Unit
def unsubscribe(subscriber: ActorRef, to: Class[_]): Unit
def sendCurrentClusterState(receiver: ActorRef): Unitsealed abstract class SubscriptionInitialStateMode
case object InitialStateAsSnapshot extends SubscriptionInitialStateMode
case object InitialStateAsEvents extends SubscriptionInitialStateMode
// Java API
def initialStateAsSnapshot: SubscriptionInitialStateMode
def initialStateAsEvents: SubscriptionInitialStateModeimport akka.cluster.ClusterEvent._
// Subscribe to all member events with snapshot
cluster.subscribe(self, InitialStateAsSnapshot, classOf[MemberEvent])
// Subscribe to specific events with event replay
cluster.subscribe(self, InitialStateAsEvents,
classOf[MemberUp], classOf[MemberRemoved], classOf[UnreachableMember])
// Subscribe to leadership changes
cluster.subscribe(self, classOf[LeaderChanged], classOf[RoleLeaderChanged])
// Unsubscribe from all events
cluster.unsubscribe(self)
// Unsubscribe from specific event type
cluster.unsubscribe(self, classOf[MemberEvent])case class CurrentClusterState(
members: immutable.SortedSet[Member],
unreachable: Set[Member],
seenBy: Set[Address],
leader: Option[Address],
roleLeaderMap: Map[String, Option[Address]],
unreachableDataCenters: Set[DataCenter],
memberTombstones: Set[UniqueAddress] // Internal API
) {
// Member queries
def roleMembers(role: String): immutable.SortedSet[Member]
def unreachableMembers(role: String): Set[Member]
def allDataCenters: Set[DataCenter]
// Utility methods
def copy(members: immutable.SortedSet[Member] = members,
unreachable: Set[Member] = unreachable,
seenBy: Set[Address] = seenBy,
leader: Option[Address] = leader,
roleLeaderMap: Map[String, Option[Address]] = roleLeaderMap,
unreachableDataCenters: Set[DataCenter] = unreachableDataCenters): CurrentClusterState
def copyUnreachable(unreachable: Set[Member]): CurrentClusterState
// Java API
def getMembers: java.lang.Iterable[Member]
def getUnreachable: java.util.Set[Member]
def getRoleLeaderMap: java.util.Map[String, Address]
def getAllDataCenters: java.util.Set[DataCenter]
def getUnreachableDataCenters: java.util.Set[DataCenter]
}// Receive current state as first message after subscription
def receive = {
case state: CurrentClusterState =>
println(s"Current members: ${state.members.size}")
println(s"Leader: ${state.leader}")
println(s"Unreachable: ${state.unreachable.size}")
// Check role leadership
state.roleLeaderMap.foreach { case (role, leaderOpt) =>
println(s"Leader for role '$role': ${leaderOpt.getOrElse("None")}")
}
case other => // Handle events
}trait MemberEvent extends ClusterDomainEvent {
def member: Member
}case class MemberJoined(member: Member) extends MemberEvent
case class MemberWeaklyUp(member: Member) extends MemberEvent
case class MemberUp(member: Member) extends MemberEvent
case class MemberLeft(member: Member) extends MemberEvent
case class MemberPreparingForShutdown(member: Member) extends MemberEvent
case class MemberReadyForShutdown(member: Member) extends MemberEvent
case class MemberExited(member: Member) extends MemberEvent
case class MemberDowned(member: Member) extends MemberEvent
case class MemberRemoved(member: Member, previousStatus: MemberStatus) extends MemberEventdef receive = {
case MemberJoined(member) =>
println(s"Member joined: ${member.address}")
case MemberUp(member) =>
println(s"Member is Up: ${member.address}")
if (member.hasRole("backend")) {
// Initialize backend-specific communication
}
case MemberLeft(member) =>
println(s"Member is leaving: ${member.address}")
// Cleanup resources for this member
case MemberRemoved(member, previousStatus) =>
println(s"Member removed: ${member.address}, was: $previousStatus")
// Final cleanup
case MemberDowned(member) =>
println(s"Member marked as down: ${member.address}")
// Handle failure scenario
}case class LeaderChanged(leader: Option[Address]) extends ClusterDomainEvent
case class RoleLeaderChanged(role: String, leader: Option[Address]) extends ClusterDomainEventdef receive = {
case LeaderChanged(Some(leader)) =>
println(s"New cluster leader: $leader")
if (leader == cluster.selfAddress) {
println("This node is now the leader")
// Start leader-specific tasks
}
case LeaderChanged(None) =>
println("No cluster leader currently")
case RoleLeaderChanged(role, Some(leader)) =>
println(s"New leader for role '$role': $leader")
if (cluster.selfMember.hasRole(role) && leader == cluster.selfAddress) {
// This node is now leader for this role
}
case RoleLeaderChanged(role, None) =>
println(s"No leader for role '$role'")
}case class UnreachableMember(member: Member) extends ClusterDomainEvent
case class ReachableMember(member: Member) extends ClusterDomainEvent
case class UnreachableDataCenter(dataCenter: DataCenter) extends ClusterDomainEvent
case class ReachableDataCenter(dataCenter: DataCenter) extends ClusterDomainEventdef receive = {
case UnreachableMember(member) =>
println(s"Member unreachable: ${member.address}")
// Stop sending work to this member
// Potentially trigger failure handling
case ReachableMember(member) =>
println(s"Member reachable again: ${member.address}")
// Resume sending work to this member
case UnreachableDataCenter(dc) =>
println(s"Data center unreachable: $dc")
// Handle cross-DC partition
case ReachableDataCenter(dc) =>
println(s"Data center reachable again: $dc")
// Resume cross-DC operations
}case object ClusterShuttingDown extends ClusterDomainEventdef receive = {
case ClusterShuttingDown =>
println("Cluster is shutting down")
// Prepare for shutdown, save state, etc.
// This is the last cluster event that will be delivered
}import akka.actor.{Actor, ActorLogging}
import akka.cluster.{Cluster, ClusterEvent}
import akka.cluster.ClusterEvent._
class ClusterListener extends Actor with ActorLogging {
val cluster = Cluster(context.system)
override def preStart(): Unit = {
cluster.subscribe(self, InitialStateAsSnapshot,
classOf[MemberEvent], classOf[UnreachableMember])
}
override def postStop(): Unit = {
cluster.unsubscribe(self)
}
def receive = {
case state: CurrentClusterState =>
log.info("Current members: {}", state.members.mkString(", "))
case MemberUp(member) =>
log.info("Member is Up: {}", member.address)
registerMember(member)
case MemberRemoved(member, previousStatus) =>
log.info("Member is Removed: {} after {}", member.address, previousStatus)
deregisterMember(member)
case UnreachableMember(member) =>
log.info("Member detected as unreachable: {}", member)
handleUnreachableMember(member)
case _: MemberEvent => // Ignore other member events
}
def registerMember(member: Member): Unit = {
// Application-specific member registration
}
def deregisterMember(member: Member): Unit = {
// Application-specific member cleanup
}
def handleUnreachableMember(member: Member): Unit = {
// Handle member unreachability
}
}def receive = {
case MemberUp(member) if member.hasRole("worker") =>
// Only handle worker nodes coming up
addWorkerNode(member)
case MemberRemoved(member, _) if member.hasRole("coordinator") =>
// Special handling for coordinator removal
handleCoordinatorRemoval(member)
}def receive = {
case MemberUp(member) if member.dataCenter == cluster.selfDataCenter =>
// Only handle members in same data center
handleLocalMemberUp(member)
case MemberUp(member) =>
// Handle remote data center members differently
handleRemoteMemberUp(member)
}CurrentClusterState is always delivered first (with InitialStateAsSnapshot)