tessl/maven-com-typesafe-akka--akka-cluster

Akka Cluster provides a fault-tolerant decentralized peer-to-peer based cluster membership service with no single point of failure or single point of bottleneck using gossip protocols and automatic failure detection.


Events and State

Event system for monitoring cluster state changes, member lifecycle transitions, and reachability updates. Subscribers receive fine-grained notifications about cluster membership, leadership, and health.

Capabilities

Current Cluster State

Snapshot of the current cluster state containing all members, their status, and leadership information.

/**
 * Current snapshot state of the cluster. Sent to new subscriber.
 * @param leader leader of the data center of this node
 */
class CurrentClusterState(
  val members: immutable.SortedSet[Member],
  val unreachable: Set[Member], 
  val seenBy: Set[Address],
  val leader: Option[Address],
  val roleLeaderMap: Map[String, Option[Address]],
  val unreachableDataCenters: Set[DataCenter]
) {
  /** Get current leader for specific role */
  def roleLeader(role: String): Option[Address]
  
  /** All node roles in the cluster */
  def allRoles: Set[String]
  
  /** All data centers in the cluster */  
  def allDataCenters: Set[String]
  
  /** Java API: get current member list */
  def getMembers: java.lang.Iterable[Member]
  
  /** Java API: get current unreachable set */
  def getUnreachable: java.util.Set[Member]
  
  /** Java API: get current leader address, or null if none */
  def getLeader: Address
  
  /** Java API: All data centers in the cluster */
  def getAllDataCenters: java.util.Set[String]
  
  /** Java API: All unreachable data centers in the cluster */
  def getUnreachableDataCenters: java.util.Set[String]
}

Usage Example:

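import akka.actor.Address
import akka.cluster.Cluster

// Assumes an ActorSystem named `system` configured with the cluster provider
val cluster = Cluster(system)
val state = cluster.state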
println(s"Total members: ${state.members.size}")
println(s"Unreachable members: ${state.unreachable.size}")
println(s"Current leader: ${state.leader}")
println(s"All roles: ${state.allRoles}")

// Check for specific member
// The protocol is "akka.tcp" with classic remoting; Artery remoting uses "akka"
val targetAddress = Address("akka.tcp", "ClusterSystem", "127.0.0.1", 2551)
val targetMember = state.members.find(_.address == targetAddress)
targetMember.foreach(m => println(s"Target member status: ${m.status}"))
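
The snapshot fields can also be combined into small query helpers. The following is a minimal sketch, not part of the Akka API: the object name `ClusterStateQueries` and its method names are hypothetical, and it relies only on the `members`, `unreachable`, and `roleLeader` members listed above plus `Member.hasRole`.

```scala
import akka.actor.Address
import akka.cluster.ClusterEvent.CurrentClusterState
import akka.cluster.{ Member, MemberStatus }

// Hypothetical helper object; the names are illustrative only.
object ClusterStateQueries {

  /** Members that are Up and not currently flagged as unreachable. */
  def reachableUpMembers(state: CurrentClusterState): Set[Member] =
    state.members
      .filter(m => m.status == MemberStatus.Up && !state.unreachable.contains(m))
      .toSet

  /** Addresses of reachable Up members that carry the given role. */
  def addressesWithRole(state: CurrentClusterState, role: String): Set[Address] =
    reachableUpMembers(state).filter(_.hasRole(role)).map(_.address)

  /** True when `address` is the leader for `role` in this snapshot. */
  def isRoleLeader(state: CurrentClusterState, role: String, address: Address): Boolean =
    state.roleLeader(role).contains(address)
}
```

Because `cluster.state` is a point-in-time snapshot, results from helpers like these can be stale by the time they are used; subscribing to events is usually the better fit for long-lived tracking.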

Base Event Types

Core event interfaces that all cluster events implement.

/**
 * Marker interface for cluster domain events.
 * Not intended for user extension.
 */
trait ClusterDomainEvent extends DeadLetterSuppression

/**
 * Marker interface for membership events.
 * Published when the state change is first seen on a node.
 */
sealed trait MemberEvent extends ClusterDomainEvent {
  def member: Member
}

/**
 * Marker interface to facilitate subscription of both UnreachableMember and ReachableMember.
 */
sealed trait ReachabilityEvent extends ClusterDomainEvent {
  def member: Member  
}

/**
 * Marker interface for data center reachability events.
 */
sealed trait DataCenterReachabilityEvent extends ClusterDomainEvent
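
Since the marker traits group related events, a handler can match on the trait rather than on each concrete event. The sketch below illustrates this with a hypothetical `EventCategories.category` function (the name and the returned labels are assumptions for illustration, not Akka API).

```scala
import akka.cluster.ClusterEvent._

// Hypothetical helper; the category labels are illustrative only.
object EventCategories {

  /** Classifies a cluster domain event by the marker trait it implements. */
  def category(event: ClusterDomainEvent): String = event match {
    case _: MemberEvent                          => "membership"
    case _: ReachabilityEvent                    => "reachability"
    case _: DataCenterReachabilityEvent          => "data-center-reachability"
    case _: LeaderChanged | _: RoleLeaderChanged => "leadership"
    case ClusterShuttingDown                     => "shutdown"
    case _                                       => "other"
  }
}
```

The final wildcard case is needed because `ClusterDomainEvent` itself is not sealed, so the compiler cannot check the match for exhaustiveness.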

Member Lifecycle Events

Events fired when members join, leave, or change status in the cluster.

/** Member status changed to Joining */
case class MemberJoined(member: Member) extends MemberEvent

/**
 * Member status changed to WeaklyUp.
 * A joining member can be moved to WeaklyUp if convergence
 * cannot be reached, i.e. there are unreachable nodes.
 * It will be moved to Up when convergence is reached.
 */
case class MemberWeaklyUp(member: Member) extends MemberEvent

/** Member status changed to Up */
case class MemberUp(member: Member) extends MemberEvent

/** Member status changed to Leaving */  
case class MemberLeft(member: Member) extends MemberEvent

/**
 * Member status changed to MemberStatus.Exiting and will be removed
 * when all members have seen the Exiting status.
 */
case class MemberExited(member: Member) extends MemberEvent

/**
 * Member status changed to MemberStatus.Down and will be removed
 * when all members have seen the Down status.
 */
case class MemberDowned(member: Member) extends MemberEvent

/**
 * Member completely removed from the cluster.
 * When previousStatus is MemberStatus.Down the node was removed
 * after being detected as unreachable and downed.
 * When previousStatus is MemberStatus.Exiting the node was removed
 * after graceful leaving and exiting.
 */
case class MemberRemoved(member: Member, previousStatus: MemberStatus) extends MemberEvent

Usage Example:

import akka.actor.{ Actor, ActorLogging }
import akka.cluster.ClusterEvent._

class ClusterListener extends Actor with ActorLogging {
  def receive = {
    case MemberUp(member) =>
      log.info("Member is Up: {}", member.address)
      
    case MemberJoined(member) =>
      log.info("Member joined: {}", member.address)
      
    case MemberLeft(member) =>
      log.info("Member left: {}", member.address)
      
    case MemberExited(member) =>
      log.info("Member exited: {}", member.address)
      
    case MemberRemoved(member, previousStatus) =>
      log.info("Member removed: {} (was: {})", member.address, previousStatus)
      
    case _: MemberEvent => // ignore
  }
}
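
The `MemberWeaklyUp` and `MemberDowned` events, and the `previousStatus` field of `MemberRemoved`, can be handled in the same way. This is a minimal sketch: `RemovalReasons` and `LifecycleListener` are hypothetical names, and the actor is assumed to be subscribed to `MemberEvent` elsewhere.

```scala
import akka.actor.{ Actor, ActorLogging }
import akka.cluster.MemberStatus
import akka.cluster.ClusterEvent._

// Hypothetical helper that turns previousStatus into a readable reason.
object RemovalReasons {
  def describe(previousStatus: MemberStatus): String = previousStatus match {
    case MemberStatus.Down    => "downed, for example after being detected as unreachable"
    case MemberStatus.Exiting => "left gracefully"
    case other                => s"removed from status $other"
  }
}

// Hypothetical listener; assumes a MemberEvent subscription made elsewhere.
class LifecycleListener extends Actor with ActorLogging {
  def receive: Receive = {
    case MemberWeaklyUp(member) =>
      // Convergence has not been reached yet; the member moves to Up later
      log.info("Member is WeaklyUp: {}", member.address)

    case MemberDowned(member) =>
      log.warning("Member was downed: {}", member.address)

    case MemberRemoved(member, previousStatus) =>
      log.info("Member removed: {} ({})", member.address, RemovalReasons.describe(previousStatus))

    case _: MemberEvent => // other lifecycle events are not of interest here
  }
}
```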

Leadership Events

Events related to cluster leadership changes at both cluster and role levels.

/**
 * Leader of the cluster data center of this node changed. 
 * Published when the state change is first seen on a node.
 */
case class LeaderChanged(leader: Option[Address]) extends ClusterDomainEvent {
  /** Java API: get address of current leader, or null if none */
  def getLeader: Address = leader.orNull
}

/**
 * First member (leader) of the members within a role set changed.
 * Published when the state change is first seen on a node.
 */
case class RoleLeaderChanged(role: String, leader: Option[Address]) extends ClusterDomainEvent {
  /** Java API: get address of current leader, or null if none */
  def getLeader: Address = leader.orNull
}

Usage Example:

case LeaderChanged(newLeader) =>
  newLeader match {
    case Some(address) => log.info("New leader: {}", address)
    case None => log.info("No leader currently")
  }
  
case RoleLeaderChanged(role, newLeader) =>
  log.info("Role '{}' leader changed to: {}", role, newLeader.getOrElse("none"))
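
A common follow-up question is whether the local node is the new leader. The sketch below shows one way to check this by comparing the `leader` field with the node's own address; `LeaderChecks` and its method names are hypothetical helpers, not Akka API.

```scala
import akka.actor.Address

// Hypothetical helper object for working with the Option[Address] leader field.
object LeaderChecks {

  /** True when the reported leader is the given node address. */
  def isSelfLeader(selfAddress: Address, leader: Option[Address]): Boolean =
    leader.contains(selfAddress)

  /** Readable form of the leader field for logging. */
  def describe(leader: Option[Address]): String =
    leader.fold("none")(_.toString)
}
```

Inside an actor, the local address is available as `Cluster(context.system).selfAddress`, so a handler could call `LeaderChecks.isSelfLeader(cluster.selfAddress, newLeader)` when a `LeaderChanged` event arrives.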

Reachability Events

Events indicating when cluster members become reachable or unreachable from the perspective of the failure detector.

/** A member is considered as unreachable by the failure detector */
case class UnreachableMember(member: Member) extends ReachabilityEvent

/**
 * A member is considered as reachable by the failure detector
 * after having been unreachable.
 */
case class ReachableMember(member: Member) extends ReachabilityEvent

Usage Example:

case UnreachableMember(member) =>
  log.warning("Member became unreachable: {}", member.address)
  // Maybe take action like redistributing work
  
case ReachableMember(member) =>  
  log.info("Member became reachable again: {}", member.address)
  // Member is back, can route work to it again

Data Center Events

Events for multi-data center clusters indicating when entire data centers become unreachable or reachable again.

/** A data center is considered as unreachable when any members from the data center are unreachable */
case class UnreachableDataCenter(dataCenter: DataCenter) extends DataCenterReachabilityEvent

/** A data center is considered reachable when all members from the data center are reachable */
case class ReachableDataCenter(dataCenter: DataCenter) extends DataCenterReachabilityEvent

Usage Example:

case UnreachableDataCenter(dc) =>
  log.warning("Data center became unreachable: {}", dc)
  // Adjust routing to avoid unreachable DC
  
case ReachableDataCenter(dc) =>
  log.info("Data center became reachable: {}", dc)
  // Can route to this DC again
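
To adjust routing, a subscriber typically keeps its own set of unreachable data centers and updates it on each event. The following is a minimal sketch of that bookkeeping as a pure function; `DataCenterTracker` is a hypothetical name, and data center names are plain strings as defined by the `DataCenter` type alias.

```scala
import akka.cluster.ClusterEvent.{
  DataCenterReachabilityEvent,
  ReachableDataCenter,
  UnreachableDataCenter
}

// Hypothetical helper; DataCenter is an alias for String.
object DataCenterTracker {

  /** Returns the updated set of unreachable data centers after applying `event`. */
  def update(unreachable: Set[String], event: DataCenterReachabilityEvent): Set[String] =
    event match {
      case UnreachableDataCenter(dc) => unreachable + dc
      case ReachableDataCenter(dc)   => unreachable - dc
    }
}
```

An actor could hold the set in a `var` and reassign it with `DataCenterTracker.update` whenever a `DataCenterReachabilityEvent` is received.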

Shutdown Events

Events indicating cluster shutdown.

/**
 * This event is published when the cluster node is shutting down,
 * before the final MemberRemoved events are published.
 */
case object ClusterShuttingDown extends ClusterDomainEvent

Usage Example:

case ClusterShuttingDown =>
  log.info("Cluster is shutting down")
  // Perform cleanup before final shutdown

Event Subscription

Methods for subscribing to cluster events with different initial state modes.

/**
 * When using this subscription mode a snapshot of
 * CurrentClusterState will be sent to the subscriber as the first message.
 */
case object InitialStateAsSnapshot extends SubscriptionInitialStateMode

/**
 * When using this subscription mode the events corresponding
 * to the current state will be sent to the subscriber to mimic what you would
 * have seen if you were listening to the events when they occurred in the past.
 */
case object InitialStateAsEvents extends SubscriptionInitialStateMode

Usage Examples:

// Subscribe with snapshot - get current state immediately
cluster.subscribe(listener, InitialStateAsSnapshot, classOf[MemberEvent])

// Subscribe with event replay - receive the events that correspond to the current state
cluster.subscribe(listener, InitialStateAsEvents, classOf[MemberEvent])

// Subscribe to multiple event types (defaults to InitialStateAsSnapshot)
cluster.subscribe(listener,
  classOf[MemberEvent],
  classOf[ReachabilityEvent], 
  classOf[LeaderChanged])

// Subscribe to all cluster events
cluster.subscribe(listener, classOf[ClusterDomainEvent])
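
With `InitialStateAsEvents`, a subscriber can build its view of the cluster from events alone, without a separate `CurrentClusterState` case. The sketch below shows this pattern under stated assumptions: `UpMemberTracker` is a hypothetical actor name, and it tracks only the addresses of members that have reached `Up`.

```scala
import akka.actor.{ Actor, ActorLogging, Address, Props }
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.{ InitialStateAsEvents, MemberRemoved, MemberUp }

// Hypothetical actor that tracks which member addresses are currently Up.
class UpMemberTracker extends Actor with ActorLogging {
  private val cluster = Cluster(context.system)
  private var upMembers = Set.empty[Address]

  // Members that are already Up arrive as MemberUp events after subscribing
  override def preStart(): Unit =
    cluster.subscribe(self, InitialStateAsEvents, classOf[MemberUp], classOf[MemberRemoved])

  override def postStop(): Unit =
    cluster.unsubscribe(self)

  def receive: Receive = {
    case MemberUp(member) =>
      upMembers += member.address
      log.info("Tracking {} Up members", upMembers.size)

    case MemberRemoved(member, _) =>
      upMembers -= member.address
      log.info("Tracking {} Up members", upMembers.size)
  }
}

object UpMemberTracker {
  def props: Props = Props(new UpMemberTracker)
}
```

With `InitialStateAsSnapshot` the same actor would need an extra `case state: CurrentClusterState` branch to seed `upMembers` from `state.members`.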

Complete Event Handling Example

import akka.actor.{ Actor, ActorLogging }
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._

class ComprehensiveClusterListener extends Actor with ActorLogging {
  override def preStart(): Unit = {
    val cluster = Cluster(context.system)
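    // ClusterShuttingDown is not covered by the other types, so subscribe to it explicitly
    cluster.subscribe(self, InitialStateAsSnapshot,
      classOf[MemberEvent],
      classOf[ReachabilityEvent],
      classOf[LeaderChanged],
      classOf[RoleLeaderChanged],
      classOf[DataCenterReachabilityEvent],
      ClusterShuttingDown.getClass)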
  }

  def receive = {
    // Initial state
    case state: CurrentClusterState =>
      log.info("Current cluster state: {} members", state.members.size)
      state.members.foreach(m => log.info("Member: {} - {}", m.address, m.status))
      
    // Member lifecycle  
    case MemberUp(member) =>
      log.info("Member UP: {}", member.address)
    case MemberJoined(member) =>
      log.info("Member JOINED: {}", member.address)
    case MemberLeft(member) =>
      log.info("Member LEFT: {}", member.address) 
    case MemberExited(member) =>
      log.info("Member EXITED: {}", member.address)
    case MemberRemoved(member, previousStatus) =>
      log.info("Member REMOVED: {} (was {})", member.address, previousStatus)
    case _: MemberEvent => // MemberWeaklyUp, MemberDowned, and any other member events
      
    // Reachability
    case UnreachableMember(member) =>
      log.warning("Member UNREACHABLE: {}", member.address)
    case ReachableMember(member) =>
      log.info("Member REACHABLE: {}", member.address)
      
    // Leadership
    case LeaderChanged(leader) =>
      log.info("Leader changed: {}", leader.getOrElse("none"))
    case RoleLeaderChanged(role, leader) =>
      log.info("Role '{}' leader changed: {}", role, leader.getOrElse("none"))
      
    // Data center events
    case UnreachableDataCenter(dc) =>
      log.warning("Data center UNREACHABLE: {}", dc)
    case ReachableDataCenter(dc) =>
      log.info("Data center REACHABLE: {}", dc)
      
    // Shutdown
    case ClusterShuttingDown =>
      log.info("Cluster shutting down")
  }
  
  override def postStop(): Unit = {
    val cluster = Cluster(context.system)
    cluster.unsubscribe(self)
  }
}

Types

// Subscription initial state modes
sealed abstract class SubscriptionInitialStateMode
case object InitialStateAsSnapshot extends SubscriptionInitialStateMode
case object InitialStateAsEvents extends SubscriptionInitialStateMode

// Data center type
type DataCenter = String
