The core cluster management API provides operations for joining, leaving, and controlling cluster membership. The Cluster extension serves as the main entry point for all cluster operations.
class Cluster(val system: ExtendedActorSystem) extends Extension {
  def join(address: Address): Unit
  def joinSeedNodes(seedNodes: immutable.Seq[Address]): Unit
  def joinSeedNodes(seedNodes: java.util.List[Address]): Unit
  def leave(address: Address): Unit
  def down(address: Address): Unit
  def prepareForFullClusterShutdown(): Unit
  def state: CurrentClusterState
  def selfMember: Member
  def selfAddress: Address
  def selfUniqueAddress: UniqueAddress
  def selfDataCenter: DataCenter
  def selfRoles: Set[String]
  def getSelfRoles: java.util.Set[String]
  def remotePathOf(actorRef: ActorRef): ActorPath
  def isTerminated: Boolean
}

object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider {
  def apply(system: ActorSystem): Cluster
  def get(system: ActorSystem): Cluster
  def get(system: ClassicActorSystemProvider): Cluster
}

Join a cluster by specifying a single node address. The node will attempt to contact the specified address to join the cluster.
import akka.actor.Address
import akka.cluster.Cluster
val cluster = Cluster(system)
cluster.join(Address("akka", "ClusterSystem", "127.0.0.1", 2551))

Join a cluster using multiple seed nodes for improved reliability. The joining node contacts the listed seed nodes and joins through the first one that responds.
import scala.collection.immutable
val seedNodes = immutable.Seq(
  Address("akka", "ClusterSystem", "127.0.0.1", 2551),
  Address("akka", "ClusterSystem", "127.0.0.1", 2552),
  Address("akka", "ClusterSystem", "127.0.0.1", 2553)
)
cluster.joinSeedNodes(seedNodes)

Java API:
import java.util.Arrays;
import java.util.List;
import akka.actor.Address;
List<Address> seedNodes = Arrays.asList(
  Address.create("akka", "ClusterSystem", "127.0.0.1", 2551),
  Address.create("akka", "ClusterSystem", "127.0.0.1", 2552)
);
cluster.joinSeedNodes(seedNodes);

Set the application version dynamically after system startup but before joining:
import scala.concurrent.Future
import akka.util.Version
// loadVersionFromK8s() stands in for application code that resolves the version asynchronously
val appVersionFuture: Future[Version] = loadVersionFromK8s()
cluster.setAppVersionLater(appVersionFuture)
cluster.joinSeedNodes(seedNodes) // Can be called immediately

Java API:
CompletionStage<Version> appVersion = loadVersionFromK8s();
cluster.setAppVersionLater(appVersion);
cluster.joinSeedNodes(seedNodes);

Request graceful removal of a member from the cluster. The member transitions through Leaving → Exiting → Removed states.
// Leave self
cluster.leave(cluster.selfAddress)
// Leave another member (can be called from any cluster member)
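cluster.leave(Address("akka", "ClusterSystem", "192.168.1.100", 2551))

Mark an unreachable member as down. While any member is unreachable, the leader cannot perform its duties, such as moving joining members to Up; downing the member removes it from the cluster so the leader can continue.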
// Mark unreachable member as down
cluster.down(unreachableMemberAddress)

Prepare all members for coordinated full cluster shutdown:
cluster.prepareForFullClusterShutdown()

Get the current cluster state including all members, their statuses, and cluster leadership information:
val state: CurrentClusterState = cluster.state
println(s"Current leader: ${state.leader}")
println(s"Members: ${state.members.map(_.address).mkString(", ")}")
println(s"Unreachable: ${state.unreachable.map(_.address).mkString(", ")}")

Access information about the current node:
val self: Member = cluster.selfMember
val address: Address = cluster.selfAddress
val uniqueAddress: UniqueAddress = cluster.selfUniqueAddress
val dataCenter: String = cluster.selfDataCenter
val roles: Set[String] = cluster.selfRoles
println(s"Self: ${address}, Status: ${self.status}, Roles: ${roles.mkString(", ")}")

Execute code when the current member becomes Up:
cluster.registerOnMemberUp {
  println("This node is now Up - starting application services")
  // Start application-specific actors and services
}

Java API:
cluster.registerOnMemberUp(() -> {
  System.out.println("This node is now Up");
  // Start application services
});

Execute code when the current member is removed from the cluster:
cluster.registerOnMemberRemoved {
  println("This node has been removed from cluster - shutting down")
  // Cleanup resources
}

Java API:
cluster.registerOnMemberRemoved(() -> {
  System.out.println("Node removed from cluster");
  // Cleanup resources
});

Generate remote actor paths for cluster communication:
// Created as a top-level actor so that its path is /user/myactor
val localActorRef: ActorRef = system.actorOf(Props[MyActor](), "myactor")
val remotePath: ActorPath = cluster.remotePathOf(localActorRef)
// Result: akka://ClusterSystem@127.0.0.1:2551/user/myactor

Check if the cluster extension has been shut down:
if (!cluster.isTerminated) {
  // Cluster is still active
  cluster.join(seedNodeAddress)
}

// Will throw ConfigurationException if actor provider is not 'cluster'
val cluster = Cluster(system) // Requires akka.actor.provider = cluster
// Address validation - will validate host characters
cluster.join(Address("akka", "MySystem", "invalid-host!", 2551)) // May throw
// Joining with local address gets converted automatically
val localAddress = Address("akka", system.name) // No host/port
cluster.join(localAddress) // Converts to selfAddress

Key configuration settings for cluster management:
akka.cluster {
  # Seed nodes for automatic cluster joining
  seed-nodes = [
    "akka://ClusterSystem@127.0.0.1:2551",
    "akka://ClusterSystem@127.0.0.1:2552"
  ]
  # Minimum number of members required before the leader moves Joining members to Up
  min-nr-of-members = 1
  # Application version for compatibility checking
  app-version = "1.0.0"
  # Node roles for targeted operations
  roles = ["backend", "compute"]
  # Data center designation for multi-DC clusters
  multi-data-center.self-data-center = "dc1"
}
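
The settings above can also be supplied programmatically, which is handy in tests or small demos. The following is a minimal sketch, not a definitive setup: the object name `ClusterConfigSketch`, the single seed node, and the `akka.actor.provider` and `akka.remote.artery.canonical.*` entries are assumptions added so that one node can start and join itself. Adjust them to match your own deployment.

```scala
import akka.actor.ActorSystem
import akka.cluster.Cluster
import com.typesafe.config.{Config, ConfigFactory}

object ClusterConfigSketch {
  // Cluster settings in the style of the section above. The provider, hostname,
  // and port entries are assumptions added so the node can start on its own.
  val config: Config = ConfigFactory.parseString(
    """
    akka.actor.provider = cluster
    akka.remote.artery.canonical.hostname = "127.0.0.1"
    akka.remote.artery.canonical.port = 2551
    akka.cluster {
      seed-nodes = ["akka://ClusterSystem@127.0.0.1:2551"]
      min-nr-of-members = 1
      roles = ["backend", "compute"]
      multi-data-center.self-data-center = "dc1"
    }
    """)

  def main(args: Array[String]): Unit = {
    // Inline settings take precedence; everything else falls back to
    // application.conf and the library reference.conf files.
    val system = ActorSystem("ClusterSystem", config.withFallback(ConfigFactory.load()))
    val cluster = Cluster(system)

    // Values read back through the Cluster extension
    println(s"Data center: ${cluster.selfDataCenter}")
    println(s"Roles: ${cluster.selfRoles.mkString(", ")}")

    // With seed-nodes configured the node joins automatically,
    // so no explicit join call is made here.
    cluster.registerOnMemberUp {
      println(s"Member ${cluster.selfAddress} is Up")
    }
  }
}
```

The actor system keeps running after `main` returns; call `system.terminate()` when the node should stop. Keeping the cluster settings in a separate `Config` value makes it easy to override individual entries, such as the port, per node.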