Fault-tolerant decentralized peer-to-peer cluster membership service with no single point of failure for Akka distributed systems
npx @tessl/cli install tessl/maven-com-typesafe-akka--akka-cluster_2-12@2.8.0

Akka Cluster provides fault-tolerant, decentralized, peer-to-peer cluster membership management with no single point of failure. It enables building resilient distributed systems using the Actor Model, with features such as gossip-based cluster membership, leader election, unreachable-member detection, and cluster state management.
build.sbt: libraryDependencies += "com.typesafe.akka" %% "akka-cluster" % "2.8.8"

import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.cluster.Member
import akka.actor.{ActorSystem, Address}

import akka.actor.ActorSystem
import akka.cluster.{Cluster, ClusterEvent}
import akka.cluster.ClusterEvent._
// Create the actor system and look up the Cluster extension for it.
// The implicit carries an explicit type so its type does not depend on inference.
implicit val system: ActorSystem = ActorSystem("ClusterSystem")
val cluster = Cluster(system)

// Join cluster via seed nodes
val seedNodes = List(
  Address("akka", "ClusterSystem", "127.0.0.1", 2551),
  Address("akka", "ClusterSystem", "127.0.0.1", 2552)
)
cluster.joinSeedNodes(seedNodes)

// `self` and `receive` only exist inside an actor, so the subscription and the
// event handling live in a dedicated listener actor.
class ClusterListener extends akka.actor.Actor {
  private val listenedCluster = Cluster(context.system)

  // Subscribe to cluster events
  override def preStart(): Unit =
    listenedCluster.subscribe(self, initialStateMode = InitialStateAsSnapshot,
      classOf[MemberEvent], classOf[UnreachableMember])

  // Drop the subscription when the listener stops.
  override def postStop(): Unit = listenedCluster.unsubscribe(self)

  // React to events
  def receive: Receive = {
    case state: CurrentClusterState =>
      // With InitialStateAsSnapshot the current state arrives as the first message;
      // without this case it would be reported as unhandled.
      println(s"Current members: ${state.members}")
    case MemberUp(member) =>
      println(s"Member is Up: ${member.address}")
    case UnreachableMember(member) =>
      println(s"Member detected as unreachable: ${member}")
    case MemberRemoved(member, previousStatus) =>
      println(s"Member is Removed: ${member.address} after $previousStatus")
    case _: MemberEvent => // ignore
  }
}

// Start the listener so that it actually subscribes.
system.actorOf(akka.actor.Props(new ClusterListener), "clusterListener")
// Graceful shutdown
cluster.leave(cluster.selfAddress)

Akka Cluster is built around several key components:
- Cluster extension (`Cluster`) providing cluster operations and state access

Core cluster operations including joining, leaving, and managing cluster membership. Provides the main API for interacting with the cluster.
// Cluster extension: the main API for interacting with the cluster (joining, leaving,
// managing membership, reading state and subscribing to events).
// This is a signature listing only; members are shown without bodies.
// Members tagged "Java API" are overloads that take Java types (java.util collections,
// Runnable, CompletionStage) in place of the Scala ones.
class Cluster(system: ExtendedActorSystem) extends Extension {
// System and Configuration Access
val system: ExtendedActorSystem
val settings: ClusterSettings
val failureDetector: FailureDetectorRegistry[Address]
val crossDcFailureDetector: FailureDetectorRegistry[Address]
lazy val downingProvider: DowningProvider
// Basic Cluster Operations
def join(address: Address): Unit
def joinSeedNodes(seedNodes: immutable.Seq[Address]): Unit
def joinSeedNodes(seedNodes: java.util.List[Address]): Unit // Java API
def leave(address: Address): Unit
def down(address: Address): Unit
def prepareForFullClusterShutdown(): Unit
def isTerminated: Boolean
// State Access
// The self* accessors describe this node: its Member, Address, UniqueAddress,
// data center and roles.
def state: CurrentClusterState
def selfMember: Member
def selfAddress: Address
def selfUniqueAddress: UniqueAddress
def selfDataCenter: DataCenter
def selfRoles: Set[String]
def getSelfRoles: java.util.Set[String] // Java API
def sendCurrentClusterState(receiver: ActorRef): Unit
// Event Subscription
// `to` lists the event classes of interest; the second subscribe overload additionally
// takes a SubscriptionInitialStateMode.
def subscribe(subscriber: ActorRef, to: Class[_]*): Unit
def subscribe(subscriber: ActorRef, initialStateMode: SubscriptionInitialStateMode, to: Class[_]*): Unit
def unsubscribe(subscriber: ActorRef): Unit
def unsubscribe(subscriber: ActorRef, to: Class[_]): Unit
// Lifecycle Callbacks
// The Scala variants take the callback as a by-name block (`code: => T`).
def registerOnMemberUp[T](code: => T): Unit
def registerOnMemberUp(callback: Runnable): Unit // Java API
def registerOnMemberRemoved[T](code: => T): Unit
def registerOnMemberRemoved(callback: Runnable): Unit // Java API
// Utilities
def setAppVersionLater(appVersion: Future[Version]): Unit
def setAppVersionLater(appVersion: CompletionStage[Version]): Unit // Java API
def remotePathOf(actorRef: ActorRef): ActorPath
}
object Cluster extends ExtensionId[Cluster] {
def apply(system: ActorSystem): Cluster
def get(system: ActorSystem): Cluster
def get(system: ClassicActorSystemProvider): Cluster
}

Comprehensive event system for monitoring cluster state changes, member lifecycle events, and reachability status. Essential for building cluster-aware applications.
trait ClusterDomainEvent
// Cluster state value returned by Cluster.state.
// Holds the members as a sorted set, the members flagged unreachable, the addresses in
// seenBy, the optional leader address, and an optional leader address per role name.
// NOTE(review): `unreachable` is presumably a subset of `members` — confirm.
case class CurrentClusterState(
members: immutable.SortedSet[Member],
unreachable: Set[Member],
seenBy: Set[Address],
leader: Option[Address],
roleLeaderMap: Map[String, Option[Address]]
)
// Member Events
// Each event carries the Member it concerns; MemberRemoved additionally carries the
// status the member had before removal (previousStatus).
case class MemberUp(member: Member) extends MemberEvent
case class MemberJoined(member: Member) extends MemberEvent
case class MemberLeft(member: Member) extends MemberEvent
case class MemberRemoved(member: Member, previousStatus: MemberStatus) extends MemberEvent
// Reachability Events
// As listed here, this event extends ClusterDomainEvent directly rather than MemberEvent.
case class UnreachableMember(member: Member) extends ClusterDomainEvent
case class ReachableMember(member: Member) extends ClusterDomainEvent

Representation and management of cluster members, including status tracking, role-based operations, and member comparison capabilities.
// A cluster member: its unique address, current status, roles and application version.
// Signature listing only; method bodies are not shown.
class Member(
val uniqueAddress: UniqueAddress,
val status: MemberStatus,
val roles: Set[String],
val appVersion: Version
) {
def address: Address
def dataCenter: DataCenter
// Role membership test and member comparison.
def hasRole(role: String): Boolean
def isOlderThan(other: Member): Boolean
// NOTE(review): presumably returns this member with `status` replaced — confirm.
def copy(status: MemberStatus): Member
}
// Member statuses
// Sealed base class with one case object per status, so the set of statuses is closed.
sealed abstract class MemberStatus
case object Joining extends MemberStatus
case object WeaklyUp extends MemberStatus
case object Up extends MemberStatus
case object Leaving extends MemberStatus
case object Exiting extends MemberStatus
case object Down extends MemberStatus
case object Removed extends MemberStatus

Comprehensive configuration system for cluster behavior, failure detection, gossip settings, and multi-data center operations.
class ClusterSettings(config: Config, systemName: String) {
def SeedNodes: immutable.IndexedSeq[Address]
def Roles: Set[String]
def SelfDataCenter: DataCenter
def MinNrOfMembers: Int
def AppVersion: Version
def GossipInterval: FiniteDuration
def FailureDetectorImplementationClass: String
def DowningProviderClassName: String
}

Cluster-aware routing functionality for distributing work across cluster members, with support for both pool and group routing strategies.
// Group-style cluster router configuration: pairs a local Group with the
// cluster-specific ClusterRouterGroupSettings, and is itself a Group.
case class ClusterRouterGroup(
local: Group,
settings: ClusterRouterGroupSettings
) extends Group
// Pool-style cluster router configuration: pairs a local Pool with the
// cluster-specific ClusterRouterPoolSettings, and is itself a Pool.
case class ClusterRouterPool(
local: Pool,
settings: ClusterRouterPoolSettings
) extends Pool
class ClusterRouterGroupSettings(
totalInstances: Int,
routeesPaths: immutable.Seq[String],
allowLocalRoutees: Boolean,
useRoles: Set[String]
)

Advanced split-brain resolution strategies for handling network partitions and maintaining cluster consistency during network failures.
// Abstract downing provider: exposes a removal margin duration and optional Props
// for a downing actor.
// NOTE(review): None presumably means no downing actor is started — confirm.
abstract class DowningProvider {
def downRemovalMargin: FiniteDuration
def downingActorProps: Option[Props]
}
class SplitBrainResolverProvider(system: ActorSystem) extends DowningProvider

case class UniqueAddress(address: Address, uid: Long)
type DataCenter = String
// Mode passed to Cluster.subscribe to select how the initial state is delivered:
// as a snapshot or as events (per the case object names).
sealed abstract class SubscriptionInitialStateMode
case object InitialStateAsSnapshot extends SubscriptionInitialStateMode
case object InitialStateAsEvents extends SubscriptionInitialStateMode
// Base trait for cluster domain events that concern a single Member.
trait MemberEvent extends ClusterDomainEvent {
def member: Member
}
// Error value carrying a message and an optional underlying cause (defaults to None).
// NOTE(review): nothing else in this document references this type — confirm it belongs to this API.
class ValidationError(val message: String, val cause: Option[Throwable] = None)