0
# Cluster Routing
1
2
Routing configuration for distributing work across cluster nodes with support for pools and groups, role-based routing, and automatic routee management based on cluster membership changes.
3
4
## Capabilities
5
6
### Cluster Router Pool
7
8
Router that creates and manages actor instances (pool) distributed across cluster nodes.
9
10
```scala { .api }
11
/**
12
* Pool-based cluster router that creates actors on cluster nodes.
13
* Pool routers create new actor instances as routees.
14
*/
15
case class ClusterRouterPool(
16
local: Pool,
17
settings: ClusterRouterPoolSettings
18
) extends RouterConfig {
19
/** Create router actor with given props */
20
def createRouter(system: ActorSystem): Router
21
22
/** Router logic for distributing messages */
23
def routingLogic: RoutingLogic
24
25
/** Create routee actor props */
26
def newRoutee(routeeProps: Props, context: ActorContext): Routee
27
}
28
```
29
30
**Usage Examples:**
31
32
```scala
33
import akka.routing._
34
import akka.cluster.routing._
35
36
// Basic cluster pool router
37
val router = system.actorOf(
38
ClusterRouterPool(
39
RoundRobinPool(0), // 0 local instances
40
ClusterRouterPoolSettings(
41
totalInstances = 10,
42
maxInstancesPerNode = 2,
43
allowLocalRoutees = true,
44
useRoles = Set("worker")
45
)
46
).props(Props[WorkerActor]),
47
name = "workerRouter"
48
)
49
50
// Router with specific routing logic
51
val consistentHashRouter = system.actorOf(
52
ClusterRouterPool(
53
ConsistentHashingPool(0),
54
ClusterRouterPoolSettings(20, 3, allowLocalRoutees = false, Set("backend"))
55
).props(Props[HashWorker]),
56
name = "hashRouter"
57
)
58
59
// Send work to the router
60
router ! "some work"
61
router ! WorkMessage("data", "key")
62
```
63
64
### Cluster Router Group
65
66
Router that routes to existing actors (group) identified by actor paths across cluster nodes.
67
68
```scala { .api }
69
/**
70
* Group-based cluster router that routes to existing actors.
71
* Group routers look up actors by path rather than creating them.
72
*/
73
case class ClusterRouterGroup(
74
local: Group,
75
settings: ClusterRouterGroupSettings
76
) extends RouterConfig {
77
/** Create router actor with given props */
78
def createRouter(system: ActorSystem): Router
79
80
/** Router logic for distributing messages */
81
def routingLogic: RoutingLogic
82
83
/** Create actor selection routee */
84
def newRoutee(routeePath: String, context: ActorContext): Routee
85
}
86
```
87
88
**Usage Examples:**
89
90
```scala
91
// Cluster group router
92
val groupRouter = system.actorOf(
93
ClusterRouterGroup(
94
RoundRobinGroup(Nil), // No local routees
95
ClusterRouterGroupSettings(
96
totalInstances = 100,
97
routeesPaths = List("/user/worker"),
98
allowLocalRoutees = true,
99
useRoles = Set("compute")
100
)
101
).props(),
102
name = "workerGroup"
103
)
104
105
// Multiple routee paths
106
val multiPathRouter = system.actorOf(
107
ClusterRouterGroup(
108
RandomGroup(Nil),
109
ClusterRouterGroupSettings(
110
totalInstances = 50,
111
routeesPaths = List("/user/processor", "/user/calculator", "/user/analyzer"),
112
allowLocalRoutees = false,
113
useRoles = Set("processing")
114
)
115
).props(),
116
name = "processingGroup"
117
)
118
119
// Send messages
120
groupRouter ! ProcessRequest("data")
121
multiPathRouter ! CalculationTask(123, 456)
122
```
123
124
### Pool Router Settings
125
126
Configuration settings for cluster router pools.
127
128
```scala { .api }
129
/**
130
* Configuration for cluster router pools.
131
* totalInstances must be > 0
132
*/
133
case class ClusterRouterPoolSettings(
134
totalInstances: Int,
135
maxInstancesPerNode: Int,
136
allowLocalRoutees: Boolean,
137
useRoles: Set[String]
138
) extends ClusterRouterSettingsBase
139
140
object ClusterRouterPoolSettings {
141
/** Create from configuration */
142
def fromConfig(config: Config): ClusterRouterPoolSettings
143
144
/** Java API constructor with roles */
145
def apply(
146
totalInstances: Int,
147
maxInstancesPerNode: Int,
148
allowLocalRoutees: Boolean,
149
useRoles: String*
150
): ClusterRouterPoolSettings
151
}
152
```
153
154
**Usage Examples:**
155
156
```scala
157
// Pool settings with role restrictions
158
val poolSettings = ClusterRouterPoolSettings(
159
totalInstances = 20, // Total actors across cluster
160
maxInstancesPerNode = 3, // Max per node
161
allowLocalRoutees = true, // Allow on current node
162
useRoles = Set("worker", "compute") // Only on nodes with these roles
163
)
164
165
// Pool settings from configuration
166
val configSettings = ClusterRouterPoolSettings.fromConfig(
167
system.settings.config.getConfig("akka.actor.deployment./myRouter")
168
)
169
170
// Java API
171
val javaSettings = ClusterRouterPoolSettings(10, 2, false, "backend", "processing")
172
```
173
174
### Group Router Settings
175
176
Configuration settings for cluster router groups.
177
178
```scala { .api }
179
/**
180
* Configuration for cluster router groups.
181
* totalInstances must be > 0
182
*/
183
case class ClusterRouterGroupSettings(
184
totalInstances: Int,
185
routeesPaths: immutable.Seq[String],
186
allowLocalRoutees: Boolean,
187
useRoles: Set[String]
188
) extends ClusterRouterSettingsBase
189
190
object ClusterRouterGroupSettings {
191
/** Create from configuration */
192
def fromConfig(config: Config): ClusterRouterGroupSettings
193
194
/** Java API constructor with roles */
195
def apply(
196
totalInstances: Int,
197
routeesPaths: immutable.Seq[String],
198
allowLocalRoutees: Boolean,
199
useRoles: String*
200
): ClusterRouterGroupSettings
201
}
202
```
203
204
**Usage Examples:**
205
206
```scala
207
// Group settings with multiple paths
208
val groupSettings = ClusterRouterGroupSettings(
209
totalInstances = 50,
210
routeesPaths = List("/user/service", "/user/handler"),
211
allowLocalRoutees = false, // No local routees
212
useRoles = Set("api") // Only API nodes
213
)
214
215
// Single routee path
216
val singlePathSettings = ClusterRouterGroupSettings(
217
totalInstances = 10,
218
routeesPaths = List("/user/processor"),
219
allowLocalRoutees = true,
220
useRoles = Set.empty // Any role
221
)
222
223
// Java API with multiple roles
224
val javaGroupSettings = ClusterRouterGroupSettings(
225
25,
226
List("/user/worker").asJava,
227
true,
228
"worker", "compute"
229
)
230
```
231
232
### Configuration-Based Routing
233
234
Router configuration through application.conf deployment configuration.
235
236
```scala { .api }
237
// Configuration keys for cluster routers
238
// akka.actor.deployment.<router-path>.router = cluster-pool | cluster-group
239
// akka.actor.deployment.<router-path>.cluster.enabled = on
240
// akka.actor.deployment.<router-path>.cluster.max-nr-of-instances-per-node = N
241
// akka.actor.deployment.<router-path>.cluster.max-total-nr-of-instances = N
242
// akka.actor.deployment.<router-path>.cluster.allow-local-routees = on|off
243
// akka.actor.deployment.<router-path>.cluster.use-roles = ["role1", "role2"]
244
```
245
246
**Configuration Examples:**
247
248
```hocon
249
# application.conf
250
akka.actor.deployment {
251
/poolRouter {
252
router = cluster-pool
253
pool-dispatcher = cluster-pool-dispatcher
254
nr-of-instances = 0 # local instances
255
cluster {
256
enabled = on
257
max-nr-of-instances-per-node = 3
258
max-total-nr-of-instances = 20
259
allow-local-routees = on
260
use-roles = ["worker"]
261
}
262
}
263
264
/groupRouter {
265
router = cluster-group
266
routees.paths = ["/user/service"]
267
cluster {
268
enabled = on
269
max-total-nr-of-instances = 100
270
allow-local-routees = off
271
use-roles = ["api", "service"]
272
}
273
}
274
}
275
```
276
277
**Usage with Configuration:**
278
279
```scala
280
// Create router using deployment configuration
281
val poolRouter = system.actorOf(Props[WorkerActor], name = "poolRouter")
282
val groupRouter = system.actorOf(Props.empty, name = "groupRouter")
283
284
// Routers are automatically configured from deployment settings
285
poolRouter ! WorkItem("task1")
286
groupRouter ! ServiceRequest("request1")
287
```
288
289
### Routing Logic Integration
290
291
Integration with Akka's routing logic for different distribution strategies.
292
293
```scala { .api }
294
// Supported routing logic types
295
import akka.routing._
296
297
// Round robin - distribute evenly
298
ClusterRouterPool(RoundRobinPool(0), settings)
299
ClusterRouterGroup(RoundRobinGroup(Nil), settings)
300
301
// Random - distribute randomly
302
ClusterRouterPool(RandomPool(0), settings)
303
ClusterRouterGroup(RandomGroup(Nil), settings)
304
305
// Consistent hashing - distribute by message hash
306
ClusterRouterPool(ConsistentHashingPool(0), settings)
307
ClusterRouterGroup(ConsistentHashingGroup(Nil), settings)
308
309
// Smallest mailbox - route to least busy
310
ClusterRouterPool(SmallestMailboxPool(0), settings)
311
312
// Broadcast - send to all routees
313
ClusterRouterPool(BroadcastPool(0), settings)
314
ClusterRouterGroup(BroadcastGroup(Nil), settings)
315
```
316
317
**Usage Examples:**
318
319
```scala
320
// Consistent hash routing for stateful workloads
321
case class HashedMessage(key: String, data: String) extends ConsistentHashable {
322
override def consistentHashKey: Any = key
323
}
324
325
val hashRouter = system.actorOf(
326
ClusterRouterPool(
327
ConsistentHashingPool(0),
328
ClusterRouterPoolSettings(15, 3, true, Set("stateful"))
329
).props(Props[StatefulWorker]),
330
"hashRouter"
331
)
332
333
hashRouter ! HashedMessage("user123", "user data")
334
hashRouter ! HashedMessage("user456", "more data")
335
336
// Broadcast for cache invalidation
337
val broadcastRouter = system.actorOf(
338
ClusterRouterGroup(
339
BroadcastGroup(Nil),
340
ClusterRouterGroupSettings(10, List("/user/cache"), true, Set("cache"))
341
).props(),
342
"cacheInvalidator"
343
)
344
345
broadcastRouter ! InvalidateCache("key123")
346
347
// Smallest mailbox for load balancing
348
val balancedRouter = system.actorOf(
349
ClusterRouterPool(
350
SmallestMailboxPool(0),
351
ClusterRouterPoolSettings(8, 2, true, Set("balanced"))
352
).props(Props[LoadBalancedWorker]),
353
"balancedRouter"
354
)
355
```
356
357
### Router Lifecycle Management
358
359
Managing router lifecycle and adaptation to cluster changes.
360
361
```scala { .api }
362
// Routers automatically adapt to cluster membership changes
363
// - Add routees when new nodes join with matching roles
364
// - Remove routees when nodes leave or become unreachable
365
// - Respect totalInstances and maxInstancesPerNode limits
366
// - Monitor cluster events and adjust routee populations
367
```
368
369
**Lifecycle Example:**
370
371
```scala
372
class RouterManager extends Actor with ActorLogging {
373
val cluster = Cluster(context.system)
374
var currentRouter: Option[ActorRef] = None
375
376
override def preStart(): Unit = {
377
// Subscribe to cluster events to monitor router health
378
cluster.subscribe(self, classOf[MemberEvent], classOf[ReachabilityEvent])
379
380
// Create initial router
381
createRouter()
382
}
383
384
def createRouter(): Unit = {
385
val router = context.actorOf(
386
ClusterRouterPool(
387
RoundRobinPool(0),
388
ClusterRouterPoolSettings(20, 4, true, Set("worker"))
389
).props(Props[WorkerActor]),
390
"dynamicRouter"
391
)
392
currentRouter = Some(router)
393
log.info("Created cluster router: {}", router.path)
394
}
395
396
def receive = {
397
case MemberUp(member) if member.hasRole("worker") =>
398
log.info("New worker node joined: {}", member.address)
399
// Router automatically adds routees
400
401
case UnreachableMember(member) if member.hasRole("worker") =>
402
log.warning("Worker node unreachable: {}", member.address)
403
// Router automatically removes routees
404
405
case MemberRemoved(member, _) if member.hasRole("worker") =>
406
log.info("Worker node removed: {}", member.address)
407
// Router automatically cleans up routees
408
409
case work: WorkMessage =>
410
currentRouter.foreach(_ ! work)
411
}
412
413
override def postStop(): Unit = {
414
cluster.unsubscribe(self)
415
}
416
}
417
```
418
419
## Types
420
421
```scala { .api }
422
// Router configuration types
423
case class ClusterRouterPool(local: Pool, settings: ClusterRouterPoolSettings)
424
case class ClusterRouterGroup(local: Group, settings: ClusterRouterGroupSettings)
425
426
// Settings types
427
case class ClusterRouterPoolSettings(
428
totalInstances: Int,
429
maxInstancesPerNode: Int,
430
allowLocalRoutees: Boolean,
431
useRoles: Set[String]
432
)
433
434
case class ClusterRouterGroupSettings(
435
totalInstances: Int,
436
routeesPaths: immutable.Seq[String],
437
allowLocalRoutees: Boolean,
438
useRoles: Set[String]
439
)
440
441
// Base settings trait
442
trait ClusterRouterSettingsBase {
443
def totalInstances: Int
444
def allowLocalRoutees: Boolean
445
def useRoles: Set[String]
446
}
447
```