Akka Cluster Tools provides utilities for building clustered applications including cluster singleton, distributed publish-subscribe, and cluster client functionality
npx @tessl/cli install tessl/maven-com-typesafe-akka--akka-cluster-tools-2-13@2.8.0
Akka Cluster Tools provides essential utilities for building distributed applications with Akka Cluster. It includes cluster singleton management for exactly-one-instance patterns, distributed publish-subscribe for decoupled messaging, and cluster client functionality for external system integration.
libraryDependencies += "com.typesafe.akka" %% "akka-cluster-tools" % "2.8.8"
import akka.cluster.singleton.{ClusterSingletonManager, ClusterSingletonProxy}
import akka.cluster.pubsub.{DistributedPubSub, DistributedPubSubMediator}
import akka.cluster.client.{ClusterClient, ClusterClientReceptionist}
import akka.actor.{ActorSystem, Props}
import akka.cluster.singleton.{ClusterSingletonManager, ClusterSingletonManagerSettings}
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.{Publish, Subscribe}
// Usage sketch: creates an actor system, starts a cluster singleton manager,
// then obtains the pub-sub mediator and subscribes to the "news" topic.
implicit val system: ActorSystem = ActorSystem("cluster-system")
// Create a cluster singleton manager actor named "workerSingleton".
// MyWorkerActor is not defined in this snippet; it stands in for your own actor class.
val singletonManager = system.actorOf(
ClusterSingletonManager.props(
singletonProps = Props[MyWorkerActor](),
terminationMessage = "stop",
settings = ClusterSingletonManagerSettings(system)
),
name = "workerSingleton"
)
// Use distributed pub-sub: the mediator is obtained from the DistributedPubSub extension.
val mediator = DistributedPubSub(system).mediator
// NOTE(review): `self` is not defined anywhere in this snippet — presumably this line is
// meant to run inside an actor; otherwise pass the subscriber's ActorRef explicitly.
mediator ! Subscribe("news", self)
mediator ! Publish("news", "Breaking: Akka cluster is operational!")
Akka Cluster Tools is built around three core patterns for distributed systems:
- Cluster Singleton
- Distributed Publish-Subscribe
- Cluster Client
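Each pattern addresses a different distributed-system challenge: the cluster singleton covers exactly-one-instance patterns, distributed publish-subscribe covers decoupled messaging, and the cluster client covers external system integration.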
Manages a singleton actor instance across cluster nodes with automatic failover and hand-over coordination.
/**
 * Manages a singleton actor instance across cluster nodes, with automatic
 * failover and hand-over coordination.
 *
 * @param singletonProps `Props` for the singleton actor (the usage example in this
 *   document passes `Props[MyWorkerActor]()`)
 * @param terminationMessage message of type `Any` (the usage example passes `"stop"`);
 *   NOTE(review): presumably delivered to the singleton to stop it during hand-over — confirm
 * @param settings manager configuration, a `ClusterSingletonManagerSettings`
 */
class ClusterSingletonManager(
singletonProps: Props,
terminationMessage: Any,
settings: ClusterSingletonManagerSettings
)
/** Companion holding the `Props` factory that the usage example passes to `system.actorOf`. */
object ClusterSingletonManager {
/** Returns `Props` for a manager; takes the same three arguments as the class constructor. */
def props(
singletonProps: Props,
terminationMessage: Any,
settings: ClusterSingletonManagerSettings
): Props
}
/**
 * Proxy for a cluster singleton.
 * NOTE(review): this document gives no description of the proxy; presumably it routes
 * messages to the current singleton instance — confirm against the Akka reference.
 *
 * @param singletonManagerPath path of the singleton manager, given as a `String`
 *   (meaning inferred from the name — confirm the expected path format)
 * @param settings proxy configuration, a `ClusterSingletonProxySettings`
 */
class ClusterSingletonProxy(
singletonManagerPath: String,
settings: ClusterSingletonProxySettings
)
/** Companion holding the `Props` factory for the proxy. */
object ClusterSingletonProxy {
/** Returns `Props` for a proxy; takes the same two arguments as the class constructor. */
def props(
singletonManagerPath: String,
settings: ClusterSingletonProxySettings
): Props
}
Location-transparent publish-subscribe messaging system that works across cluster nodes.
/**
 * Mediator of the location-transparent publish-subscribe messaging system
 * that works across cluster nodes.
 *
 * @param settings mediator configuration, a `DistributedPubSubSettings`
 */
class DistributedPubSubMediator(settings: DistributedPubSubSettings)
/** Companion holding the `Props` factory and the message types sent to the mediator. */
object DistributedPubSubMediator {
/** Returns `Props` for a mediator built from the given settings. */
def props(settings: DistributedPubSubSettings): Props
// NOTE(review): the usage example in this document calls `Subscribe("news", self)` and
// `Publish("news", ...)` with two arguments, but only the three-field forms are listed
// below — confirm the shorter overloads against the Akka API docs and list them here.
/** Subscription request carrying a topic name, an optional group, and the subscribing `ActorRef`. */
case class Subscribe(topic: String, group: Option[String], ref: ActorRef)
/**
 * Publishes `msg` to `topic`.
 * NOTE(review): `sendOneMessageToEachGroup` presumably restricts delivery to one
 * subscriber per group — confirm.
 */
case class Publish(topic: String, msg: Any, sendOneMessageToEachGroup: Boolean)
/**
 * Message addressed by `path` rather than by topic.
 * NOTE(review): `localAffinity` presumably prefers a recipient on the local node — confirm.
 */
case class Send(path: String, msg: Any, localAffinity: Boolean)
/**
 * Message addressed by `path`.
 * NOTE(review): presumably delivered to every matching recipient, with `allButSelf`
 * excluding the sender's own node — confirm.
 */
case class SendToAll(path: String, msg: Any, allButSelf: Boolean)
}
/**
 * Extension id for `DistributedPubSub`. The usage example in this document obtains the
 * extension with `DistributedPubSub(system)`; `get(system)` is listed as an alternative.
 */
object DistributedPubSub extends ExtensionId[DistributedPubSub] {
/** Returns the `DistributedPubSub` extension for the given actor system. */
def get(system: ActorSystem): DistributedPubSub
}
/** Extension that exposes the pub-sub mediator of one actor system. */
class DistributedPubSub(system: ExtendedActorSystem) extends Extension {
/** `ActorRef` of the mediator; the usage example sends `Subscribe` and `Publish` messages to it. */
def mediator: ActorRef
}
Client-side gateway for external systems to communicate with cluster actors. Deprecated since Akka 2.6.0 in favor of Akka gRPC.
/**
 * Client-side gateway for external systems to communicate with cluster actors.
 * Deprecated since Akka 2.6.0 in favor of Akka gRPC.
 *
 * @param settings client configuration, a `ClusterClientSettings` (not listed in this document)
 */
@deprecated("Use Akka gRPC instead", since = "2.6.0")
class ClusterClient(settings: ClusterClientSettings)
/** Companion holding the `Props` factory and message types; deprecated together with the class. */
@deprecated("Use Akka gRPC instead", since = "2.6.0")
object ClusterClient {
/** Returns `Props` for a client built from the given settings. */
def props(settings: ClusterClientSettings): Props
// Message types share their names with those on `DistributedPubSubMediator`, but
// `SendToAll` and `Publish` are listed here with fewer fields (no `allButSelf`, no
// `sendOneMessageToEachGroup`). NOTE(review): presumably same delivery semantics — confirm.
case class Send(path: String, msg: Any, localAffinity: Boolean)
case class SendToAll(path: String, msg: Any)
case class Publish(topic: String, msg: Any)
}
// Configuration classes
/**
 * Configuration for `ClusterSingletonManager`.
 *
 * Every `with*` method returns a `ClusterSingletonManagerSettings`
 * (NOTE(review): presumably an updated copy rather than an in-place change — confirm).
 * Parameter meanings below are inferred from their names; confirm against the Akka
 * reference configuration.
 *
 * @param singletonName name used for the singleton
 * @param role optional cluster role
 * @param removalMargin a `FiniteDuration` margin
 * @param handOverRetryInterval a `FiniteDuration` interval between hand-over retries
 * @param leaseSettings optional `LeaseUsageSettings`
 */
final class ClusterSingletonManagerSettings(
singletonName: String,
role: Option[String],
removalMargin: FiniteDuration,
handOverRetryInterval: FiniteDuration,
leaseSettings: Option[LeaseUsageSettings]
) {
def withSingletonName(name: String): ClusterSingletonManagerSettings
// `withRole` is overloaded: one form takes a plain `String`, the other an `Option[String]`.
def withRole(role: String): ClusterSingletonManagerSettings
def withRole(role: Option[String]): ClusterSingletonManagerSettings
def withRemovalMargin(removalMargin: FiniteDuration): ClusterSingletonManagerSettings
def withHandOverRetryInterval(retryInterval: FiniteDuration): ClusterSingletonManagerSettings
def withLeaseSettings(leaseSettings: LeaseUsageSettings): ClusterSingletonManagerSettings
}
/**
 * Factories that build settings from an `ActorSystem` or from a `Config`.
 * The usage example in this document calls `ClusterSingletonManagerSettings(system)`.
 * NOTE(review): `create` presumably mirrors `apply` for Java callers — confirm.
 */
object ClusterSingletonManagerSettings {
def apply(system: ActorSystem): ClusterSingletonManagerSettings
def apply(config: Config): ClusterSingletonManagerSettings
def create(system: ActorSystem): ClusterSingletonManagerSettings
def create(config: Config): ClusterSingletonManagerSettings
}
/**
 * Configuration for `ClusterSingletonProxy`.
 *
 * Every `with*` method returns a `ClusterSingletonProxySettings`
 * (NOTE(review): presumably an updated copy rather than an in-place change — confirm).
 * Parameter meanings below are inferred from their names; confirm against the Akka
 * reference configuration.
 *
 * @param singletonName name used for the singleton
 * @param role optional cluster role
 * @param dataCenter optional `DataCenter`
 * @param singletonIdentificationInterval a `FiniteDuration` interval
 * @param bufferSize an `Int` buffer size
 */
final class ClusterSingletonProxySettings(
singletonName: String,
role: Option[String],
dataCenter: Option[DataCenter],
singletonIdentificationInterval: FiniteDuration,
bufferSize: Int
) {
def withSingletonName(name: String): ClusterSingletonProxySettings
// `withRole` and `withDataCenter` are each overloaded with a plain and an `Option` form.
def withRole(role: String): ClusterSingletonProxySettings
def withRole(role: Option[String]): ClusterSingletonProxySettings
def withDataCenter(dataCenter: DataCenter): ClusterSingletonProxySettings
def withDataCenter(dataCenter: Option[DataCenter]): ClusterSingletonProxySettings
def withSingletonIdentificationInterval(interval: FiniteDuration): ClusterSingletonProxySettings
def withBufferSize(bufferSize: Int): ClusterSingletonProxySettings
}
/**
 * Factories that build settings from an `ActorSystem` or from a `Config`.
 * NOTE(review): `create` presumably mirrors `apply` for Java callers — confirm.
 */
object ClusterSingletonProxySettings {
def apply(system: ActorSystem): ClusterSingletonProxySettings
def apply(config: Config): ClusterSingletonProxySettings
def create(system: ActorSystem): ClusterSingletonProxySettings
def create(config: Config): ClusterSingletonProxySettings
}
/**
 * Configuration for `DistributedPubSubMediator`.
 *
 * No `with*` methods or companion factories are listed for this class in this document.
 * Parameter meanings below are inferred from their names; confirm against the Akka
 * reference configuration.
 *
 * @param role optional cluster role
 * @param routingLogic a `RoutingLogic`
 * @param gossipInterval a `FiniteDuration` interval
 * @param removedTimeToLive a `FiniteDuration` time-to-live
 * @param maxDeltaElements an `Int` limit
 * @param sendToDeadLettersWhenNoSubscribers `Boolean` flag; NOTE(review): presumably routes
 *   messages with no subscribers to dead letters — confirm
 */
final class DistributedPubSubSettings(
role: Option[String],
routingLogic: RoutingLogic,
gossipInterval: FiniteDuration,
removedTimeToLive: FiniteDuration,
maxDeltaElements: Int,
sendToDeadLettersWhenNoSubscribers: Boolean
)
// Imports for the types referenced above (Akka, Typesafe Config, and the Scala standard library)
import akka.actor.{ActorRef, ActorSystem, Props}
import akka.cluster.ClusterSettings.DataCenter
import akka.coordination.lease.LeaseUsageSettings
import akka.routing.RoutingLogic
import com.typesafe.config.Config
import scala.concurrent.duration.FiniteDuration