Fault-tolerant decentralized peer-to-peer cluster membership service with no single point of failure for Akka distributed systems
The core cluster management API provides operations for joining, leaving, and controlling cluster membership. The Cluster extension serves as the main entry point for all cluster operations.
class Cluster(val system: ExtendedActorSystem) extends Extension {
def join(address: Address): Unit
def joinSeedNodes(seedNodes: immutable.Seq[Address]): Unit
def joinSeedNodes(seedNodes: java.util.List[Address]): Unit
def leave(address: Address): Unit
def down(address: Address): Unit
def prepareForFullClusterShutdown(): Unit
def state: CurrentClusterState
def selfMember: Member
def selfAddress: Address
def selfUniqueAddress: UniqueAddress
def selfDataCenter: DataCenter
def selfRoles: Set[String]
def getSelfRoles: java.util.Set[String]
def remotePathOf(actorRef: ActorRef): ActorPath
def isTerminated: Boolean
}object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider {
def apply(system: ActorSystem): Cluster
def get(system: ActorSystem): Cluster
def get(system: ClassicActorSystemProvider): Cluster
}Join a cluster by specifying a single node address. The node will attempt to contact the specified address to join the cluster.
val cluster = Cluster(system)
cluster.join(Address("akka", "ClusterSystem", "127.0.0.1", 2551))Join a cluster using multiple seed nodes for improved reliability. The cluster will try each seed node until it successfully joins.
val seedNodes = immutable.Seq(
Address("akka", "ClusterSystem", "127.0.0.1", 2551),
Address("akka", "ClusterSystem", "127.0.0.1", 2552),
Address("akka", "ClusterSystem", "127.0.0.1", 2553)
)
cluster.joinSeedNodes(seedNodes)Java API:
List<Address> seedNodes = Arrays.asList(
Address.create("akka", "ClusterSystem", "127.0.0.1", 2551),
Address.create("akka", "ClusterSystem", "127.0.0.1", 2552)
);
cluster.joinSeedNodes(seedNodes);Set application version dynamically after system startup but before joining:
import scala.concurrent.Future
import akka.util.Version
val appVersionFuture: Future[Version] = loadVersionFromK8s()
cluster.setAppVersionLater(appVersionFuture)
cluster.joinSeedNodes(seedNodes) // Can be called immediatelyJava API:
CompletionStage<Version> appVersion = loadVersionFromK8s();
cluster.setAppVersionLater(appVersion);
cluster.joinSeedNodes(seedNodes);Request graceful removal of a member from the cluster. The member transitions through Leaving → Exiting → Removed states.
// Leave self
cluster.leave(cluster.selfAddress)
// Leave another member (can be called from any cluster member)
cluster.leave(Address("akka", "ClusterSystem", "192.168.1.100", 2551))Mark a member as down when it's unreachable. This allows the leader to continue cluster operations.
// Mark unreachable member as down
cluster.down(unreachableMemberAddress)Prepare all members for coordinated full cluster shutdown:
cluster.prepareForFullClusterShutdown()Get the current cluster state including all members, their statuses, and cluster leadership information:
val state: CurrentClusterState = cluster.state
println(s"Current leader: ${state.leader}")
println(s"Members: ${state.members.map(_.address).mkString(", ")}")
println(s"Unreachable: ${state.unreachable.map(_.address).mkString(", ")}")Access information about the current node:
val self: Member = cluster.selfMember
val address: Address = cluster.selfAddress
val uniqueAddress: UniqueAddress = cluster.selfUniqueAddress
val dataCenter: String = cluster.selfDataCenter
val roles: Set[String] = cluster.selfRoles
println(s"Self: ${address}, Status: ${self.status}, Roles: ${roles.mkString(", ")}")Execute code when the current member becomes Up:
cluster.registerOnMemberUp {
println("This node is now Up - starting application services")
// Start application-specific actors and services
}Java API:
cluster.registerOnMemberUp(() -> {
System.out.println("This node is now Up");
// Start application services
});Execute code when the current member is removed from the cluster:
cluster.registerOnMemberRemoved {
println("This node has been removed from cluster - shutting down")
// Cleanup resources
}Java API:
cluster.registerOnMemberRemoved(() -> {
System.out.println("Node removed from cluster");
// Cleanup resources
});Generate remote actor paths for cluster communication:
val localActorRef: ActorRef = context.actorOf(Props[MyActor], "myactor")
val remotePath: ActorPath = cluster.remotePathOf(localActorRef)
// Result: akka://ClusterSystem@127.0.0.1:2551/user/myactorCheck if the cluster extension has been shut down:
if (!cluster.isTerminated) {
// Cluster is still active
cluster.join(seedNodeAddress)
}// Will throw ConfigurationException if actor provider is not 'cluster'
val cluster = Cluster(system) // Requires akka.actor.provider = cluster
// Address validation - will validate host characters
cluster.join(Address("akka", "MySystem", "invalid-host!", 2551)) // May throw
// Joining with local address gets converted automatically
val localAddress = Address("akka", system.name) // No host/port
cluster.join(localAddress) // Converts to selfAddressKey configuration settings for cluster management:
akka.cluster {
# Seed nodes for automatic cluster joining
seed-nodes = [
"akka://ClusterSystem@127.0.0.1:2551",
"akka://ClusterSystem@127.0.0.1:2552"
]
# Minimum number of members before leader actions
min-nr-of-members = 1
# Application version for compatibility checking
app-version = "1.0.0"
# Node roles for targeted operations
roles = ["backend", "compute"]
# Data center designation for multi-DC clusters
multi-data-center.self-data-center = "dc1"
}Install with Tessl CLI
npx tessl i tessl/maven-com-typesafe-akka--akka-cluster-2-12