The Cluster Singleton pattern ensures that at most one instance of a particular actor is running somewhere in the cluster at any point in time; during hand-over or failover there may briefly be no running instance. This is useful for central coordination, resource management, or any scenario that needs a single point of responsibility in a distributed system.
Manages a singleton actor instance, ensuring it runs on exactly one node with automatic failover.
/**
* Manages singleton actor instance among all cluster nodes or a group
* of nodes tagged with a specific role. At most one singleton instance
* is running at any point in time.
*
* @param singletonProps Props of the singleton actor instance
* @param terminationMessage Message sent to singleton during hand-over
* @param settings Configuration settings
*/
class ClusterSingletonManager(
singletonProps: Props,
terminationMessage: Any,
settings: ClusterSingletonManagerSettings
) extends Actor
object ClusterSingletonManager {
/**
* Scala API: Factory method for ClusterSingletonManager Props
*/
def props(
singletonProps: Props,
terminationMessage: Any,
settings: ClusterSingletonManagerSettings
): Props
}

Usage Example:
import akka.actor.{ActorSystem, Props, PoisonPill}
import akka.cluster.singleton.{ClusterSingletonManager, ClusterSingletonManagerSettings}
implicit val system: ActorSystem = ActorSystem("cluster-system")
// Create singleton manager
val singletonManager = system.actorOf(
ClusterSingletonManager.props(
singletonProps = Props[DatabaseCoordinator](),
terminationMessage = PoisonPill,
settings = ClusterSingletonManagerSettings(system)
.withRole("backend")
.withSingletonName("db-coordinator")
),
name = "database-singleton"
)

Provides a proxy to communicate with the singleton actor, handling location transparency and buffering.
/**
* Proxy that forwards messages to the singleton actor.
* Can be started on any node to provide location-transparent access.
* Buffers messages when singleton is unavailable during hand-over.
*
* @param singletonManagerPath Logical path to the singleton manager
* @param settings Configuration settings
*/
class ClusterSingletonProxy(
singletonManagerPath: String,
settings: ClusterSingletonProxySettings
) extends Actor
object ClusterSingletonProxy {
/**
* Scala API: Factory method for ClusterSingletonProxy Props
*
* @param singletonManagerPath The logical path of the singleton manager
* @param settings Configuration settings
*/
def props(
singletonManagerPath: String,
settings: ClusterSingletonProxySettings
): Props
}

Usage Example:
import akka.cluster.singleton.{ClusterSingletonProxy, ClusterSingletonProxySettings}
// Create proxy to communicate with singleton
val singletonProxy = system.actorOf(
ClusterSingletonProxy.props(
singletonManagerPath = "/user/database-singleton",
settings = ClusterSingletonProxySettings(system)
.withRole("backend")
.withSingletonName("db-coordinator")
.withBufferSize(1000)
),
name = "database-proxy"
)
// Send messages to singleton via proxy
singletonProxy ! "process-batch"
singletonProxy ! GetStatus

Configuration for the singleton manager behavior.
/**
* Configuration settings for ClusterSingletonManager
*
* @param singletonName The actor name of the child singleton actor
* @param role Singleton among nodes tagged with specified role. None means all nodes
* @param removalMargin Margin until singleton is created in surviving partition after split
* @param handOverRetryInterval Retry interval for hand-over requests
* @param leaseSettings Optional lease configuration for singleton coordination
*/
final class ClusterSingletonManagerSettings(
val singletonName: String,
val role: Option[String],
val removalMargin: FiniteDuration,
val handOverRetryInterval: FiniteDuration,
val leaseSettings: Option[LeaseUsageSettings]
) extends NoSerializationVerificationNeeded
object ClusterSingletonManagerSettings {
/**
* Create settings from the default configuration akka.cluster.singleton
*/
def apply(system: ActorSystem): ClusterSingletonManagerSettings
/**
* Create settings from a configuration with the same layout as
* the default configuration akka.cluster.singleton
*/
def apply(config: Config): ClusterSingletonManagerSettings
/**
* Java API: Create settings from the default configuration
*/
def create(system: ActorSystem): ClusterSingletonManagerSettings
/**
* Java API: Create settings from configuration
*/
def create(config: Config): ClusterSingletonManagerSettings
}

Settings Methods:
// Configuration methods for ClusterSingletonManagerSettings
def withSingletonName(name: String): ClusterSingletonManagerSettings
def withRole(role: String): ClusterSingletonManagerSettings
def withRole(role: Option[String]): ClusterSingletonManagerSettings
def withRemovalMargin(removalMargin: FiniteDuration): ClusterSingletonManagerSettings
def withHandOverRetryInterval(retryInterval: FiniteDuration): ClusterSingletonManagerSettings
def withLeaseSettings(leaseSettings: LeaseUsageSettings): ClusterSingletonManagerSettings

Configuration for the singleton proxy behavior.
/**
* Configuration settings for ClusterSingletonProxy
*
* @param singletonName The actor name of the singleton actor started by ClusterSingletonManager
* @param role The role of cluster nodes where singleton can be deployed
* @param dataCenter The data center of cluster nodes where singleton is running
* @param singletonIdentificationInterval Interval at which proxy tries to resolve singleton
* @param bufferSize Number of messages to buffer when singleton location unknown
*/
final class ClusterSingletonProxySettings(
val singletonName: String,
val role: Option[String],
val dataCenter: Option[DataCenter],
val singletonIdentificationInterval: FiniteDuration,
val bufferSize: Int
) extends NoSerializationVerificationNeeded
object ClusterSingletonProxySettings {
/**
* Create settings from the default configuration akka.cluster.singleton-proxy
*/
def apply(system: ActorSystem): ClusterSingletonProxySettings
/**
* Create settings from configuration
*/
def apply(config: Config): ClusterSingletonProxySettings
/**
* Java API: Create settings from default configuration
*/
def create(system: ActorSystem): ClusterSingletonProxySettings
/**
* Java API: Create settings from configuration
*/
def create(config: Config): ClusterSingletonProxySettings
}

Settings Methods:
// Configuration methods for ClusterSingletonProxySettings
def withSingletonName(name: String): ClusterSingletonProxySettings
def withRole(role: String): ClusterSingletonProxySettings
def withRole(role: Option[String]): ClusterSingletonProxySettings
def withDataCenter(dataCenter: DataCenter): ClusterSingletonProxySettings
def withDataCenter(dataCenter: Option[DataCenter]): ClusterSingletonProxySettings
def withSingletonIdentificationInterval(interval: FiniteDuration): ClusterSingletonProxySettings
def withBufferSize(bufferSize: Int): ClusterSingletonProxySettings

/**
* Thrown when a consistent state can't be determined within the defined retry limits.
* Parent supervisor should typically restart the actor.
*/
class ClusterSingletonManagerIsStuck(message: String) extends AkkaException

// Singleton that coordinates database operations
/**
 * Example singleton actor that coordinates database operations.
 *
 * Messages handled:
 *  - `"process-batch"`: replies `"batch-processed"` to the sender.
 *  - `GetStatus`: replies with a `DatabaseStatus` snapshot.
 *  - `"shutdown"`: stops this actor.
 *
 * `GetStatus` and `DatabaseStatus` are not defined in this snippet; they are
 * assumed to be application message types declared elsewhere.
 */
class DatabaseCoordinator extends Actor {
  def receive: Receive = {
    case "process-batch" =>
      // Process batch operations
      sender() ! "batch-processed"
    case GetStatus =>
      sender() ! DatabaseStatus("active", processedBatches = 42)
    case "shutdown" =>
      // The manager setup in this example passes "shutdown" as the
      // terminationMessage, which is sent to the singleton during hand-over.
      // A custom termination message does not stop the actor by itself, so
      // stop explicitly; otherwise the hand-over cannot complete.
      context.stop(self)
  }
}
// Wire up the singleton: a manager that hosts DatabaseCoordinator on nodes
// with the "backend" role, plus a proxy used to send messages to it.
val manager = system.actorOf(
  ClusterSingletonManager.props(
    singletonProps = Props[DatabaseCoordinator](),
    terminationMessage = "shutdown",
    settings = ClusterSingletonManagerSettings(system).withRole("backend")
  ),
  name = "db-coordinator-singleton"
)

// The proxy is pointed at the manager's path, not at the singleton itself.
val proxy = system.actorOf(
  ClusterSingletonProxy.props(
    singletonManagerPath = "/user/db-coordinator-singleton",
    settings = ClusterSingletonProxySettings(system).withRole("backend")
  ),
  name = "db-coordinator-proxy"
)

// Use singleton for leader election
import akka.actor.ActorLogging

/**
 * Example singleton actor used for leader election.
 *
 * When run under a ClusterSingletonManager, at most one instance is running
 * at a time, so the instance that starts can treat itself as the leader.
 *
 * Mixes in `ActorLogging` because a plain `Actor` has no `log` member, so the
 * `log.info` call in `preStart` would not compile without it.
 *
 * `LeaderTask` and `processLeaderWork` are not defined in this snippet; they
 * are assumed to be supplied by the application.
 */
class ClusterLeader extends Actor with ActorLogging {
  override def preStart(): Unit = {
    super.preStart()
    log.info("I am the leader!")
    // Initialize leader-specific operations
  }

  def receive: Receive = {
    case "election-status" => sender() ! "leader"
    case LeaderTask(work) =>
      // Only the leader processes these tasks
      processLeaderWork(work)
  }
}

// Required imports for singleton functionality
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.cluster.ClusterSettings.DataCenter
import akka.coordination.lease.LeaseUsageSettings
import com.typesafe.config.Config
import scala.concurrent.duration.FiniteDuration