or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

cluster-management.md cluster-routing.md configuration.md event-system.md index.md member-management.md split-brain-resolution.md
tile.json

docs/event-system.md

Event System

The Akka Cluster event system provides comprehensive monitoring of cluster state changes through a publisher-subscriber model. Applications can subscribe to specific event types to build cluster-aware behavior.

Event Subscription

Basic Subscription

Subscribe to cluster events with automatic initial state delivery:

// Subscription methods on Cluster

// Subscribes `subscriber` to one or more event classes (`to` is varargs).
// This overload takes no explicit initial-state mode.
def subscribe(subscriber: ActorRef, to: Class[_]*): Unit
// Same as above, with an explicit initial-state mode
// (InitialStateAsSnapshot or InitialStateAsEvents).
def subscribe(subscriber: ActorRef, initialStateMode: SubscriptionInitialStateMode, to: Class[_]*): Unit
// Removes all of `subscriber`'s event subscriptions.
def unsubscribe(subscriber: ActorRef): Unit  
// Removes only the subscription to the given event class.
def unsubscribe(subscriber: ActorRef, to: Class[_]): Unit
// Sends the current cluster state to `receiver`
// (presumably as a CurrentClusterState message — confirm).
def sendCurrentClusterState(receiver: ActorRef): Unit

Subscription Modes

// Selects how the cluster state that already exists at subscription time
// is delivered to a new subscriber.
sealed abstract class SubscriptionInitialStateMode
// The initial state is delivered first, as a CurrentClusterState message.
case object InitialStateAsSnapshot extends SubscriptionInitialStateMode
// The initial state is replayed to the subscriber as events.
case object InitialStateAsEvents extends SubscriptionInitialStateMode

// Java API
// Accessors returning a SubscriptionInitialStateMode
// (presumably the two case objects above — confirm).
def initialStateAsSnapshot: SubscriptionInitialStateMode
def initialStateAsEvents: SubscriptionInitialStateMode

Usage Examples

import akka.cluster.ClusterEvent._

// Subscribe to all member events with snapshot
// (the initial state arrives first, as a CurrentClusterState message)
cluster.subscribe(self, InitialStateAsSnapshot, classOf[MemberEvent])

// Subscribe to specific events with event replay
// (the initial state is replayed as events)
cluster.subscribe(self, InitialStateAsEvents, 
                 classOf[MemberUp], classOf[MemberRemoved], classOf[UnreachableMember])

// Subscribe to leadership changes
// (uses the overload that takes no explicit initial-state mode)
cluster.subscribe(self, classOf[LeaderChanged], classOf[RoleLeaderChanged])

// Unsubscribe from all events
cluster.unsubscribe(self)

// Unsubscribe from specific event type; other subscriptions are not named here
cluster.unsubscribe(self, classOf[MemberEvent])

Current Cluster State

State Snapshot

// Snapshot of the cluster state. Subscribers receive it as a message and
// match it with `case state: CurrentClusterState`.
case class CurrentClusterState(
  // Members contained in this snapshot.
  members: immutable.SortedSet[Member],
  // Members listed as unreachable in this snapshot.
  unreachable: Set[Member], 
  seenBy: Set[Address],
  // Address of the cluster leader, if there is one.
  leader: Option[Address],
  // Role name -> leader address for that role, if there is one.
  roleLeaderMap: Map[String, Option[Address]],
  unreachableDataCenters: Set[DataCenter],
  memberTombstones: Set[UniqueAddress] // Internal API
) {
  // Member queries
  // NOTE(review): presumably these filter `members` / `unreachable` by role — confirm.
  def roleMembers(role: String): immutable.SortedSet[Member]
  def unreachableMembers(role: String): Set[Member]
  def allDataCenters: Set[DataCenter]
  
  // Utility methods  
  // Every parameter defaults to the current field value; `memberTombstones`
  // is not a parameter of this copy.
  def copy(members: immutable.SortedSet[Member] = members,
          unreachable: Set[Member] = unreachable,
          seenBy: Set[Address] = seenBy,
          leader: Option[Address] = leader,
          roleLeaderMap: Map[String, Option[Address]] = roleLeaderMap,
          unreachableDataCenters: Set[DataCenter] = unreachableDataCenters): CurrentClusterState
  
  // Returns a state with the given unreachable set
  // (presumably leaving the other fields unchanged — confirm).
  def copyUnreachable(unreachable: Set[Member]): CurrentClusterState
  
  // Java API
  def getMembers: java.lang.Iterable[Member]
  def getUnreachable: java.util.Set[Member]
  // NOTE(review): values are plain Address here but Option[Address] in
  // `roleLeaderMap`; how a role without a leader is represented is not shown — confirm.
  def getRoleLeaderMap: java.util.Map[String, Address]
  def getAllDataCenters: java.util.Set[DataCenter]
  def getUnreachableDataCenters: java.util.Set[DataCenter]
}

State Access

// When subscribed with InitialStateAsSnapshot, the current state is the
// first message received after subscription
def receive = {
  case state: CurrentClusterState =>
    println(s"Current members: ${state.members.size}")
    println(s"Leader: ${state.leader}")
    println(s"Unreachable: ${state.unreachable.size}")

    // Check role leadership.
    // `fold` keeps the rendered value a String; calling getOrElse("None") on
    // an Option[Address] would widen the result to Any.
    state.roleLeaderMap.foreach { case (role, leaderOpt) =>
      val leaderText = leaderOpt.fold("None")(_.toString)
      println(s"Leader for role '$role': $leaderText")
    }

  case other => // Handle events
}

Member Events

Base Member Event

// Base type of the member lifecycle events; subscribing to
// classOf[MemberEvent] covers all of them.
trait MemberEvent extends ClusterDomainEvent {
  // The member this event is about.
  def member: Member
}

Member Lifecycle Events

// Member lifecycle events. Each one carries the affected member.
// A member joined the cluster.
case class MemberJoined(member: Member) extends MemberEvent
case class MemberWeaklyUp(member: Member) extends MemberEvent  
// The member is Up.
case class MemberUp(member: Member) extends MemberEvent
// The member is leaving.
case class MemberLeft(member: Member) extends MemberEvent
case class MemberPreparingForShutdown(member: Member) extends MemberEvent
case class MemberReadyForShutdown(member: Member) extends MemberEvent
case class MemberExited(member: Member) extends MemberEvent
// The member was marked as down.
case class MemberDowned(member: Member) extends MemberEvent
// The member was removed; `previousStatus` is the status it had before removal.
case class MemberRemoved(member: Member, previousStatus: MemberStatus) extends MemberEvent

Member Event Handling

// Reacts to the member lifecycle events this actor cares about.
def receive = {
  // A node joined the cluster
  case joined: MemberJoined =>
    val member = joined.member
    println(s"Member joined: ${member.address}")

  // A node reached Up; backend nodes get additional setup
  case up: MemberUp =>
    val member = up.member
    println(s"Member is Up: ${member.address}")
    if (member.hasRole("backend")) {
      // Initialize backend-specific communication
    }

  // A node is on its way out
  case left: MemberLeft =>
    val member = left.member
    println(s"Member is leaving: ${member.address}")
    // Cleanup resources for this member

  // A node was marked as down
  case downed: MemberDowned =>
    val member = downed.member
    println(s"Member marked as down: ${member.address}")
    // Handle failure scenario

  // A node is gone for good; its previous status is reported too
  case removed: MemberRemoved =>
    val member = removed.member
    val previousStatus = removed.previousStatus
    println(s"Member removed: ${member.address}, was: $previousStatus")
    // Final cleanup
}

Leadership Events

Leadership Change Events

// The cluster leader changed; `leader` is None when there is currently no leader.
case class LeaderChanged(leader: Option[Address]) extends ClusterDomainEvent
// The leader for `role` changed; `leader` is None when that role has no leader.
case class RoleLeaderChanged(role: String, leader: Option[Address]) extends ClusterDomainEvent

Leadership Event Handling

// Tracks cluster-wide and per-role leadership changes.
def receive = {
  // Cluster-wide leadership: the "no leader" case is handled first
  case LeaderChanged(None) =>
    println("No cluster leader currently")

  case LeaderChanged(Some(leader)) =>
    println(s"New cluster leader: $leader")
    val selfIsLeader = leader == cluster.selfAddress
    if (selfIsLeader) {
      println("This node is now the leader")
      // Start leader-specific tasks
    }

  // Per-role leadership
  case RoleLeaderChanged(role, None) =>
    println(s"No leader for role '$role'")

  case RoleLeaderChanged(role, Some(leader)) =>
    println(s"New leader for role '$role': $leader")
    val selfLeadsRole = cluster.selfMember.hasRole(role) && leader == cluster.selfAddress
    if (selfLeadsRole) {
      // This node is now leader for this role
    }
}

Reachability Events

Reachability Event Types

// A member became unreachable.
case class UnreachableMember(member: Member) extends ClusterDomainEvent
// A previously unreachable member is reachable again.
case class ReachableMember(member: Member) extends ClusterDomainEvent
// A data center became unreachable.
case class UnreachableDataCenter(dataCenter: DataCenter) extends ClusterDomainEvent  
// A previously unreachable data center is reachable again.
case class ReachableDataCenter(dataCenter: DataCenter) extends ClusterDomainEvent

Reachability Event Handling

// Reacts to reachability changes of single members and of whole data centers.
def receive = {
  // --- Something became unreachable ---
  case lost: UnreachableMember =>
    val member = lost.member
    println(s"Member unreachable: ${member.address}")
    // Stop sending work to this member
    // Potentially trigger failure handling

  case lostDc: UnreachableDataCenter =>
    val dc = lostDc.dataCenter
    println(s"Data center unreachable: $dc")
    // Handle cross-DC partition

  // --- Something became reachable again ---
  case back: ReachableMember =>
    val member = back.member
    println(s"Member reachable again: ${member.address}")
    // Resume sending work to this member

  case backDc: ReachableDataCenter =>
    val dc = backDc.dataCenter
    println(s"Data center reachable again: $dc")
    // Resume cross-DC operations
}

Shutdown Events

Cluster Shutdown Event

// Signals that the cluster is shutting down; described in this document as
// the last cluster event delivered to a subscriber.
case object ClusterShuttingDown extends ClusterDomainEvent

Shutdown Event Handling

// Handles the shutdown notification; any other message is left unmatched
// by this partial function.
def receive = {
  case ClusterShuttingDown =>
    println("Cluster is shutting down")
    // Prepare for shutdown, save state, etc.
    // This is the last cluster event that will be delivered
}

Complete Event Handler Example

import akka.actor.{Actor, ActorLogging}
// `Member` is used in the hook signatures below and is not provided by
// `ClusterEvent._`, so it must be imported explicitly.
import akka.cluster.{Cluster, ClusterEvent, Member}
import akka.cluster.ClusterEvent._

/**
 * Example actor that subscribes to member and unreachability events and
 * forwards them to application-specific hooks.
 *
 * The subscription is created in `preStart` and removed in `postStop`.
 */
class ClusterListener extends Actor with ActorLogging {
  val cluster = Cluster(context.system)

  /** Subscribes in snapshot mode, so a CurrentClusterState message is delivered first. */
  override def preStart(): Unit = {
    cluster.subscribe(self, InitialStateAsSnapshot,
                     classOf[MemberEvent], classOf[UnreachableMember])
  }

  /** Removes every subscription held by this actor. */
  override def postStop(): Unit = {
    cluster.unsubscribe(self)
  }

  def receive: Receive = {
    // Initial snapshot, delivered because of InitialStateAsSnapshot
    case state: CurrentClusterState =>
      log.info("Current members: {}", state.members.mkString(", "))

    case MemberUp(member) =>
      log.info("Member is Up: {}", member.address)
      registerMember(member)

    case MemberRemoved(member, previousStatus) =>
      log.info("Member is Removed: {} after {}", member.address, previousStatus)
      deregisterMember(member)

    case UnreachableMember(member) =>
      log.info("Member detected as unreachable: {}", member)
      handleUnreachableMember(member)

    // Must stay last: swallows the remaining MemberEvent subtypes so they
    // are not reported as unhandled messages.
    case _: MemberEvent => // Ignore other member events
  }

  /** Hook called when a member reaches Up. */
  def registerMember(member: Member): Unit = {
    // Application-specific member registration
  }

  /** Hook called when a member has been removed. */
  def deregisterMember(member: Member): Unit = {
    // Application-specific member cleanup
  }

  /** Hook called when a member is detected as unreachable. */
  def handleUnreachableMember(member: Member): Unit = {
    // Handle member unreachability
  }
}

Event Filtering Patterns

Role-Based Filtering

// Handles only the events whose member carries a particular role;
// events for members without a matching role are left unmatched.
def receive = {
  // Worker nodes coming up
  case up: MemberUp if up.member.hasRole("worker") =>
    addWorkerNode(up.member)

  // Coordinator removal gets special handling
  case gone: MemberRemoved if gone.member.hasRole("coordinator") =>
    handleCoordinatorRemoval(gone.member)
}

Data Center Filtering

// Routes MemberUp by whether the member is in this node's data center.
def receive = {
  case MemberUp(member) =>
    val inSameDataCenter = member.dataCenter == cluster.selfDataCenter
    if (inSameDataCenter) {
      // Only handle members in same data center
      handleLocalMemberUp(member)
    } else {
      // Handle remote data center members differently
      handleRemoteMemberUp(member)
    }
}

Event Delivery Guarantees

  • Events are delivered in cluster state order
  • No events are lost once subscription is established
  • When subscribing with InitialStateAsSnapshot, a CurrentClusterState message is always delivered first, before any subsequent events
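  • Events are delivered as ordinary messages to the subscriber's mailbox and are processed one at a time by the subscribing actor (an actor is not bound to a dedicated thread)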
  • Unsubscription stops all future event delivery
  • Events are not delivered to terminated actors