Akka Cluster routing enables distributing work across cluster members using cluster-aware routers. These routers automatically manage routees based on cluster membership and can be configured to target specific roles or data centers.
Routes messages to actor groups deployed across the cluster.
case class ClusterRouterGroup(
local: Group,
settings: ClusterRouterGroupSettings
) extends Group with ClusterRouterConfigBaseCreates and manages actor pools distributed across cluster members.
case class ClusterRouterPool(
local: Pool,
settings: ClusterRouterPoolSettings
) extends Pool with ClusterRouterConfigBaseclass ClusterRouterGroupSettings(
val totalInstances: Int,
val routeesPaths: immutable.Seq[String],
val allowLocalRoutees: Boolean,
val useRoles: Set[String]
) {
def withTotalInstances(totalInstances: Int): ClusterRouterGroupSettings
def withRouteesPaths(routeesPaths: String*): ClusterRouterGroupSettings
def withAllowLocalRoutees(allowLocalRoutees: Boolean): ClusterRouterGroupSettings
def withUseRoles(useRoles: Set[String]): ClusterRouterGroupSettings
}class ClusterRouterPoolSettings(
val totalInstances: Int,
val maxInstancesPerNode: Int,
val allowLocalRoutees: Boolean,
val useRoles: Set[String]
) {
def withTotalInstances(totalInstances: Int): ClusterRouterPoolSettings
def withMaxInstancesPerNode(maxInstancesPerNode: Int): ClusterRouterPoolSettings
def withAllowLocalRoutees(allowLocalRoutees: Boolean): ClusterRouterPoolSettings
def withUseRoles(useRoles: Set[String]): ClusterRouterPoolSettings
}Routes to existing actors deployed across cluster members:
import akka.routing._
import akka.cluster.routing._
// Create group router settings
val groupSettings = ClusterRouterGroupSettings(
totalInstances = 100,
routeesPaths = List("/user/worker"),
allowLocalRoutees = true,
useRoles = Set("worker")
)
// Create cluster-aware round-robin group router
val groupRouter = system.actorOf(
ClusterRouterGroup(
RoundRobinGroup(Nil), // Local routing logic (paths come from settings)
groupSettings
).props(),
"workerRouter"
)
// Send messages to the router
groupRouter ! WorkMessage("process this")Route to multiple actor types on each node:
val multiPathSettings = ClusterRouterGroupSettings(
totalInstances = 50,
routeesPaths = List("/user/worker", "/user/processor", "/user/analyzer"),
allowLocalRoutees = true,
useRoles = Set("compute")
)
val multiPathRouter = system.actorOf(
ClusterRouterGroup(
RoundRobinGroup(Nil),
multiPathSettings
).props(),
"multiWorkerRouter"
)Route based on message content for stateful processing:
import akka.routing.ConsistentHashingGroup
val hashingSettings = ClusterRouterGroupSettings(
totalInstances = 20,
routeesPaths = List("/user/statefulWorker"),
allowLocalRoutees = false, // Only remote routees
useRoles = Set("stateful")
)
val hashingRouter = system.actorOf(
ClusterRouterGroup(
ConsistentHashingGroup(Nil),
hashingSettings
).props(),
"hashingRouter"
)
// Messages need consistent hashing key
case class HashedMessage(id: String, data: String) extends ConsistentHashingRouter.ConsistentHashable {
override def consistentHashKey: Any = id
}
hashingRouter ! HashedMessage("user123", "process user data")Creates and manages actor instances across cluster:
val poolSettings = ClusterRouterPoolSettings(
totalInstances = 50,
maxInstancesPerNode = 5,
allowLocalRoutees = true,
useRoles = Set("worker")
)
val poolRouter = system.actorOf(
ClusterRouterPool(
RoundRobinPool(nrOfInstances = 0), // Managed by cluster settings
poolSettings
).props(Props[WorkerActor]),
"workerPool"
)
poolRouter ! "process this work"Automatically balances load across routees:
import akka.routing.BalancingPool
val balancingSettings = ClusterRouterPoolSettings(
totalInstances = 100,
maxInstancesPerNode = 10,
allowLocalRoutees = true,
useRoles = Set("processor")
)
val balancingRouter = system.actorOf(
ClusterRouterPool(
BalancingPool(nrOfInstances = 0),
balancingSettings
).props(Props[ProcessorActor]),
"balancingPool"
)Routes to routee with smallest mailbox:
import akka.routing.SmallestMailboxPool
val smallestMailboxSettings = ClusterRouterPoolSettings(
totalInstances = 30,
maxInstancesPerNode = 3,
allowLocalRoutees = true,
useRoles = Set("handler")
)
val smallestMailboxRouter = system.actorOf(
ClusterRouterPool(
SmallestMailboxPool(nrOfInstances = 0),
smallestMailboxSettings
).props(Props[HandlerActor]),
"smallestMailboxPool"
)val backendSettings = ClusterRouterPoolSettings(
totalInstances = 20,
maxInstancesPerNode = 2,
allowLocalRoutees = false,
useRoles = Set("backend") // Only backend nodes
)
val backendRouter = system.actorOf(
ClusterRouterPool(
RoundRobinPool(0),
backendSettings
).props(Props[BackendActor]),
"backendRouter"
)val multiRoleSettings = ClusterRouterPoolSettings(
totalInstances = 40,
maxInstancesPerNode = 4,
allowLocalRoutees = true,
useRoles = Set("worker", "compute") // Either worker OR compute nodes
)
val multiRoleRouter = system.actorOf(
ClusterRouterPool(
RoundRobinPool(0),
multiRoleSettings
).props(Props[ComputeActor]),
"multiRoleRouter"
)// Implicitly routes only to same data center members
val localDcSettings = ClusterRouterPoolSettings(
totalInstances = 15,
maxInstancesPerNode = 3,
allowLocalRoutees = true,
useRoles = Set("local-service")
)
val localDcRouter = system.actorOf(
ClusterRouterPool(
RoundRobinPool(0),
localDcSettings
).props(Props[LocalServiceActor]),
"localDcRouter"
)For cross-DC routing, deploy separate routers per data center or use role-based targeting:
// Use roles to target specific data centers
val crossDcSettings = ClusterRouterPoolSettings(
totalInstances = 50,
maxInstancesPerNode = 5,
allowLocalRoutees = false,
useRoles = Set("dc-west") // Target west data center nodes
)import akka.actor.SupervisorStrategy._
class RouterSupervisor extends Actor {
override val supervisorStrategy = OneForOneStrategy() {
case _: Exception => Restart
}
val workerRouter = context.actorOf(
ClusterRouterPool(
RoundRobinPool(0),
ClusterRouterPoolSettings(20, 2, true, Set("worker"))
).props(Props[WorkerActor]),
"workerRouter"
)
def receive = {
case msg => workerRouter forward msg
}
}Routers automatically adjust to cluster membership changes:
// Router automatically adds/removes routees as cluster members join/leave
// No manual reconfiguration needed
// To get current router state:
router ! GetRoutees
def receive = {
case Routees(routees) =>
println(s"Current routees: ${routees.size}")
routees.foreach(routee => println(s"Routee: ${routee.path}"))
}akka.actor.deployment {
/workerRouter {
router = round-robin-group
routees.paths = ["/user/worker"]
cluster {
enabled = on
max-nr-of-instances-per-node = 3
allow-local-routees = on
use-roles = ["worker"]
}
}
/poolRouter {
router = round-robin-pool
nr-of-instances = 100
cluster {
enabled = on
max-nr-of-instances-per-node = 10
allow-local-routees = on
use-roles = ["compute"]
}
}
}// Group router from configuration
val configuredGroupRouter = system.actorOf(
FromConfig.props(),
"workerRouter" // Must match configuration path
)
// Pool router from configuration
val configuredPoolRouter = system.actorOf(
FromConfig.props(Props[WorkerActor]),
"poolRouter"
)import akka.routing._
// Get current routees
router ! GetRoutees
// Adjust pool size (if supported by router type)
router ! Resize(newSize = 50)
// Remove specific routee
router ! RemoveRoutee(actorSelection)
// Add new routee
router ! AddRoutee(actorRef)
def receive = {
case Routees(routees) =>
println(s"Active routees: ${routees.size}")
case RouterRoutingLogic(logic) =>
println(s"Routing logic: ${logic.getClass.getSimpleName}")
}// For high-throughput routing, consider:
// 1. Balancing pool for CPU-bound work
val balancingPool = ClusterRouterPool(
BalancingPool(0),
ClusterRouterPoolSettings(100, 10, true, Set("cpu-intensive"))
)
// 2. Consistent hashing for stateful routing
val consistentHashingPool = ClusterRouterPool(
ConsistentHashingPool(0),
ClusterRouterPoolSettings(50, 5, true, Set("stateful"))
)
// 3. Smallest mailbox for latency-sensitive work
val smallestMailboxPool = ClusterRouterPool(
SmallestMailboxPool(0),
ClusterRouterPoolSettings(30, 3, true, Set("latency-sensitive"))
)class ResilientRouter extends Actor {
import akka.actor.SupervisorStrategy._
override val supervisorStrategy = OneForOneStrategy(
maxNrOfRetries = 3,
withinTimeRange = 1.minute
) {
case _: IllegalArgumentException => Stop
case _: Exception => Restart
}
val router = context.actorOf(
ClusterRouterPool(
RoundRobinPool(0),
ClusterRouterPoolSettings(10, 2, true, Set("resilient"))
).props(Props[WorkerActor]),
"resilientRouter"
)
def receive = {
case msg => router forward msg
}
}// Monitor router health
context.watch(router)
def receive = {
case Terminated(routerRef) if routerRef == router =>
log.warning("Router terminated, recreating...")
// Recreate router or escalate failure
context.parent ! RestartRouter
case msg =>
if (router != null) router forward msg
else sender() ! Status.Failure(new IllegalStateException("Router unavailable"))
}