Akka Cluster Tools provides utilities for building clustered applications including cluster singleton, distributed publish-subscribe, and cluster client functionality
—
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Pending
The risk profile of this skill
Akka Cluster Tools provides essential utilities for building distributed applications with Akka Cluster. It includes cluster singleton management for exactly-one-instance patterns, distributed publish-subscribe for decoupled messaging, and cluster client functionality for external system integration.
libraryDependencies += "com.typesafe.akka" %% "akka-cluster-tools" % "2.8.8"import akka.cluster.singleton.{ClusterSingletonManager, ClusterSingletonProxy}
import akka.cluster.pubsub.{DistributedPubSub, DistributedPubSubMediator}
import akka.cluster.client.{ClusterClient, ClusterClientReceptionist}import akka.actor.{ActorSystem, Props}
import akka.cluster.singleton.{ClusterSingletonManager, ClusterSingletonManagerSettings}
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.{Publish, Subscribe}
implicit val system: ActorSystem = ActorSystem("cluster-system")
// Create a cluster singleton
val singletonManager = system.actorOf(
ClusterSingletonManager.props(
singletonProps = Props[MyWorkerActor](),
terminationMessage = "stop",
settings = ClusterSingletonManagerSettings(system)
),
name = "workerSingleton"
)
// Use distributed pub-sub
val mediator = DistributedPubSub(system).mediator
mediator ! Subscribe("news", self)
mediator ! Publish("news", "Breaking: Akka cluster is operational!")Akka Cluster Tools is built around three core patterns for distributed systems:
Each pattern addresses different distributed system challenges:
Manages a singleton actor instance across cluster nodes with automatic failover and hand-over coordination.
class ClusterSingletonManager(
singletonProps: Props,
terminationMessage: Any,
settings: ClusterSingletonManagerSettings
)
object ClusterSingletonManager {
def props(
singletonProps: Props,
terminationMessage: Any,
settings: ClusterSingletonManagerSettings
): Props
}
class ClusterSingletonProxy(
singletonManagerPath: String,
settings: ClusterSingletonProxySettings
)
object ClusterSingletonProxy {
def props(
singletonManagerPath: String,
settings: ClusterSingletonProxySettings
): Props
}Location-transparent publish-subscribe messaging system that works across cluster nodes.
class DistributedPubSubMediator(settings: DistributedPubSubSettings)
object DistributedPubSubMediator {
def props(settings: DistributedPubSubSettings): Props
case class Subscribe(topic: String, group: Option[String], ref: ActorRef)
case class Publish(topic: String, msg: Any, sendOneMessageToEachGroup: Boolean)
case class Send(path: String, msg: Any, localAffinity: Boolean)
case class SendToAll(path: String, msg: Any, allButSelf: Boolean)
}
object DistributedPubSub extends ExtensionId[DistributedPubSub] {
def get(system: ActorSystem): DistributedPubSub
}
class DistributedPubSub(system: ExtendedActorSystem) extends Extension {
def mediator: ActorRef
}Client-side gateway for external systems to communicate with cluster actors. Deprecated since Akka 2.6.0 in favor of Akka gRPC.
@deprecated("Use Akka gRPC instead", since = "2.6.0")
class ClusterClient(settings: ClusterClientSettings)
@deprecated("Use Akka gRPC instead", since = "2.6.0")
object ClusterClient {
def props(settings: ClusterClientSettings): Props
case class Send(path: String, msg: Any, localAffinity: Boolean)
case class SendToAll(path: String, msg: Any)
case class Publish(topic: String, msg: Any)
}// Configuration classes
final class ClusterSingletonManagerSettings(
singletonName: String,
role: Option[String],
removalMargin: FiniteDuration,
handOverRetryInterval: FiniteDuration,
leaseSettings: Option[LeaseUsageSettings]
) {
def withSingletonName(name: String): ClusterSingletonManagerSettings
def withRole(role: String): ClusterSingletonManagerSettings
def withRole(role: Option[String]): ClusterSingletonManagerSettings
def withRemovalMargin(removalMargin: FiniteDuration): ClusterSingletonManagerSettings
def withHandOverRetryInterval(retryInterval: FiniteDuration): ClusterSingletonManagerSettings
def withLeaseSettings(leaseSettings: LeaseUsageSettings): ClusterSingletonManagerSettings
}
object ClusterSingletonManagerSettings {
def apply(system: ActorSystem): ClusterSingletonManagerSettings
def apply(config: Config): ClusterSingletonManagerSettings
def create(system: ActorSystem): ClusterSingletonManagerSettings
def create(config: Config): ClusterSingletonManagerSettings
}
final class ClusterSingletonProxySettings(
singletonName: String,
role: Option[String],
dataCenter: Option[DataCenter],
singletonIdentificationInterval: FiniteDuration,
bufferSize: Int
) {
def withSingletonName(name: String): ClusterSingletonProxySettings
def withRole(role: String): ClusterSingletonProxySettings
def withRole(role: Option[String]): ClusterSingletonProxySettings
def withDataCenter(dataCenter: DataCenter): ClusterSingletonProxySettings
def withDataCenter(dataCenter: Option[DataCenter]): ClusterSingletonProxySettings
def withSingletonIdentificationInterval(interval: FiniteDuration): ClusterSingletonProxySettings
def withBufferSize(bufferSize: Int): ClusterSingletonProxySettings
}
object ClusterSingletonProxySettings {
def apply(system: ActorSystem): ClusterSingletonProxySettings
def apply(config: Config): ClusterSingletonProxySettings
def create(system: ActorSystem): ClusterSingletonProxySettings
def create(config: Config): ClusterSingletonProxySettings
}
final class DistributedPubSubSettings(
role: Option[String],
routingLogic: RoutingLogic,
gossipInterval: FiniteDuration,
removedTimeToLive: FiniteDuration,
maxDeltaElements: Int,
sendToDeadLettersWhenNoSubscribers: Boolean
)
// Import types from Akka core
import akka.actor.{ActorRef, ActorSystem, Props}
import akka.cluster.ClusterSettings.DataCenter
import akka.coordination.lease.LeaseUsageSettings
import akka.routing.RoutingLogic
import com.typesafe.config.Config
import scala.concurrent.duration.FiniteDuration