Akka Cluster provides a fault-tolerant decentralized peer-to-peer based cluster membership service with no single point of failure or single point of bottleneck using gossip protocols and automatic failure detection.
—
Core cluster operations for joining, leaving, and managing cluster membership. This provides the fundamental functionality for setting up and managing distributed actor systems using Akka Cluster.
Access the cluster extension instance from an ActorSystem.
/**
* Cluster Extension Id and factory for creating Cluster extension
*/
object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider {
/** Get cluster extension instance for given actor system */
def get(system: ActorSystem): Cluster
/** Get cluster extension instance for typed actor system */
def get(system: ClassicActorSystemProvider): Cluster
}

Usage Example:
import akka.cluster.Cluster
implicit val system = ActorSystem("ClusterSystem")
val cluster = Cluster(system)
// or
val cluster = Cluster.get(system)

The primary interface for cluster membership management and operations.
/**
* Main cluster extension responsible for cluster membership information.
* Changes to cluster information are retrieved through subscribe().
* Commands to operate the cluster are available through methods like join(), down(), and leave().
*/
class Cluster(val system: ExtendedActorSystem) extends Extension {
/** The address including a uid of this cluster member */
val selfUniqueAddress: UniqueAddress
/** The address of this cluster member */
def selfAddress: Address
/** Data center to which this node belongs */
def selfDataCenter: DataCenter
/** Roles that this member has */
def selfRoles: Set[String]
/** Current snapshot state of the cluster */
def state: CurrentClusterState
/** Current snapshot of the member itself */
def selfMember: Member
/** Returns true if this cluster instance has been shutdown */
def isTerminated: Boolean
/** Java API: roles that this member has */
def getSelfRoles: java.util.Set[String]
}

Join the cluster by connecting to existing members or seed nodes.
/**
* Try to join this cluster node with the node specified by 'address'.
* An actor system can only join a cluster once. Additional attempts will be ignored.
* When it has successfully joined it must be restarted to be able to join another
* cluster or to join the same cluster again.
*/
def join(address: Address): Unit
/**
* Join the specified seed nodes without defining them in config.
* Especially useful from tests when Addresses are unknown before startup time.
*/
def joinSeedNodes(seedNodes: immutable.Seq[Address]): Unit
/** Java API for joining seed nodes */
def joinSeedNodes(seedNodes: java.util.List[Address]): Unit

Usage Examples:
import akka.actor.Address
// Join a specific node
val seedAddress = Address("akka.tcp", "ClusterSystem", "127.0.0.1", 2551)
cluster.join(seedAddress)
// Join using multiple seed nodes
val seedNodes = List(
Address("akka.tcp", "ClusterSystem", "127.0.0.1", 2551),
Address("akka.tcp", "ClusterSystem", "127.0.0.1", 2552)
)
cluster.joinSeedNodes(seedNodes)
// Self-join (form single-node cluster)
cluster.join(cluster.selfAddress)

Gracefully leave the cluster or forcefully remove nodes.
/**
* Send command to issue state transition to LEAVING for the node specified by 'address'.
* The member will go through the status changes MemberStatus Leaving (not published to
* subscribers) followed by MemberStatus Exiting and finally MemberStatus Removed.
*/
def leave(address: Address): Unit
/**
* Send command to DOWN the node specified by 'address'.
* When a member is considered by the failure detector to be unreachable the leader is not
* allowed to perform its duties. The status of the unreachable member must be changed to 'Down'.
*/
def down(address: Address): Unit

Usage Examples:
// Graceful leave
cluster.leave(cluster.selfAddress)
// Force down an unreachable node
val unreachableAddress = Address("akka.tcp", "ClusterSystem", "127.0.0.1", 2553)
cluster.down(unreachableAddress)

Subscribe to cluster events to react to membership changes.
/**
* Subscribe to one or more cluster domain events.
* A snapshot of CurrentClusterState will be sent to the subscriber as the first message.
*/
def subscribe(subscriber: ActorRef, to: Class[_]*): Unit
/**
* Subscribe with specific initial state mode.
* If initialStateMode is InitialStateAsEvents the events corresponding
* to the current state will be sent to mimic past events.
* If InitialStateAsSnapshot a snapshot of CurrentClusterState will be sent.
*/
def subscribe(subscriber: ActorRef, initialStateMode: SubscriptionInitialStateMode, to: Class[_]*): Unit
/** Unsubscribe from all cluster domain events */
def unsubscribe(subscriber: ActorRef): Unit
/** Unsubscribe from specific type of cluster domain events */
def unsubscribe(subscriber: ActorRef, to: Class[_]): Unit
/** Send current cluster state to the specified receiver */
def sendCurrentClusterState(receiver: ActorRef): Unit

Usage Examples:
import akka.cluster.ClusterEvent._
// Subscribe to all member events
cluster.subscribe(listener, classOf[MemberEvent])
// Subscribe to specific events
cluster.subscribe(listener,
classOf[MemberUp],
classOf[MemberLeft],
classOf[UnreachableMember])
// Subscribe with event replay
cluster.subscribe(listener, InitialStateAsEvents, classOf[MemberEvent])
// Unsubscribe
cluster.unsubscribe(listener)

Register callbacks to execute when the cluster member reaches specific states.
/**
* The supplied thunk will be run, once, when current cluster member is Up.
* Typically used together with configuration option 'akka.cluster.min-nr-of-members'
* to defer some action, such as starting actors, until the cluster has reached
* a certain size.
*/
def registerOnMemberUp[T](code: => T): Unit
/**
* Java API: The supplied callback will be run, once, when current cluster member is Up.
* Typically used together with configuration option 'akka.cluster.min-nr-of-members'
* to defer some action, such as starting actors, until the cluster has reached
* a certain size.
*/
def registerOnMemberUp(callback: Runnable): Unit
/**
* The supplied thunk will be run, once, when current cluster member is Removed.
* If the cluster has already been shutdown the thunk will run on the caller thread immediately.
* Typically used together with cluster.leave(cluster.selfAddress) and then system.terminate().
*/
def registerOnMemberRemoved[T](code: => T): Unit
/**
* Java API: The supplied callback will be run, once, when current cluster member is Removed.
* If the cluster has already been shutdown the callback will run immediately.
* Typically used together with cluster.leave(cluster.selfAddress) and then system.terminate().
*/
def registerOnMemberRemoved(callback: Runnable): Unit

Usage Examples:
// Start application actors when cluster is ready
cluster.registerOnMemberUp(new Runnable {
def run(): Unit = {
println("Cluster member is UP - starting application actors")
system.actorOf(Props[MyApplicationActor], "app")
}
})
// Cleanup when leaving cluster
cluster.registerOnMemberRemoved(new Runnable {
def run(): Unit = {
println("Member removed from cluster - shutting down")
system.terminate()
}
})
// Scala-friendly syntax
cluster.registerOnMemberUp {
println("Member is UP!")
// Start application logic
}

Additional utility methods for cluster operations.
/**
* Generate the remote actor path by replacing the Address in the RootActorPath
* for the given ActorRef with the cluster's selfAddress, unless address' host is already defined
*/
def remotePathOf(actorRef: ActorRef): ActorPath

Usage Example:
val localActor = system.actorOf(Props[MyActor], "myActor")
val remotePath = cluster.remotePathOf(localActor)
// Use remotePath to reference this actor from other cluster nodes

// Data center type alias
type DataCenter = String
// Initial state subscription modes
sealed abstract class SubscriptionInitialStateMode
case object InitialStateAsSnapshot extends SubscriptionInitialStateMode
case object InitialStateAsEvents extends SubscriptionInitialStateMode

Install with Tessl CLI
npx tessl i tessl/maven-com-typesafe-akka--akka-cluster