# Cluster Routing

Akka Cluster routing distributes work across cluster members using cluster-aware routers. These routers add and remove routees automatically as cluster membership changes, and can be configured to target specific roles or data centers.

## Router Types

### ClusterRouterGroup

Routes messages to actor groups deployed across the cluster.

```scala { .api }
case class ClusterRouterGroup(
  local: Group,
  settings: ClusterRouterGroupSettings
) extends Group with ClusterRouterConfigBase
```

### ClusterRouterPool

Creates and manages actor pools distributed across cluster members.

```scala { .api }
case class ClusterRouterPool(
  local: Pool,
  settings: ClusterRouterPoolSettings
) extends Pool with ClusterRouterConfigBase
```

## Router Settings

### ClusterRouterGroupSettings

```scala { .api }
class ClusterRouterGroupSettings(
  val totalInstances: Int,
  val routeesPaths: immutable.Seq[String],
  val allowLocalRoutees: Boolean,
  val useRoles: Set[String]
) {
  def withTotalInstances(totalInstances: Int): ClusterRouterGroupSettings
  def withRouteesPaths(routeesPaths: String*): ClusterRouterGroupSettings
  def withAllowLocalRoutees(allowLocalRoutees: Boolean): ClusterRouterGroupSettings
  def withUseRoles(useRoles: Set[String]): ClusterRouterGroupSettings
}
```

### ClusterRouterPoolSettings

```scala { .api }
class ClusterRouterPoolSettings(
  val totalInstances: Int,
  val maxInstancesPerNode: Int,
  val allowLocalRoutees: Boolean,
  val useRoles: Set[String]
) {
  def withTotalInstances(totalInstances: Int): ClusterRouterPoolSettings
  def withMaxInstancesPerNode(maxInstancesPerNode: Int): ClusterRouterPoolSettings
  def withAllowLocalRoutees(allowLocalRoutees: Boolean): ClusterRouterPoolSettings
  def withUseRoles(useRoles: Set[String]): ClusterRouterPoolSettings
}
```

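How `totalInstances` and `maxInstancesPerNode` interact is easiest to see with numbers. The following is a minimal sketch in plain Scala, not Akka code: `PoolSizing` and `maxRoutees` are hypothetical names, and the model assumes a pool deploys at most `maxInstancesPerNode` routees on each eligible node and never more than `totalInstances` overall.

```scala
// Simplified model of the pool size limits; not part of the Akka API.
object PoolSizing {

  /** Upper bound on the number of routees a cluster pool can deploy.
    *
    * @param eligibleNodes members that match `useRoles` (and, when
    *                      `allowLocalRoutees` is false, are not the local node)
    */
  def maxRoutees(totalInstances: Int, maxInstancesPerNode: Int, eligibleNodes: Int): Int =
    // Use Long for the product to avoid Int overflow on large settings
    math.min(totalInstances.toLong, maxInstancesPerNode.toLong * eligibleNodes).toInt
}
```

Under this model, `totalInstances = 50` with `maxInstancesPerNode = 5` gives 20 routees on 4 eligible nodes, and the cap of 50 is reached once 10 or more nodes are eligible.
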
## Group Router Usage

### Basic Group Router

Routes to existing actors that are already running at the configured paths on cluster members:

```scala
import akka.routing._
import akka.cluster.routing._

// Example message type; any message can be routed
final case class WorkMessage(payload: String)

// Create group router settings
val groupSettings = ClusterRouterGroupSettings(
  totalInstances = 100,
  routeesPaths = List("/user/worker"),
  allowLocalRoutees = true,
  useRoles = Set("worker")
)

// Create cluster-aware round-robin group router
// (`system` is an existing ActorSystem that has joined the cluster)
val groupRouter = system.actorOf(
  ClusterRouterGroup(
    RoundRobinGroup(Nil), // Local routing logic (paths come from settings)
    groupSettings
  ).props(),
  "workerRouter"
)

// Send messages to the router
groupRouter ! WorkMessage("process this")
```

### Group Router with Multiple Paths

Route to multiple actor types on each node:

```scala
val multiPathSettings = ClusterRouterGroupSettings(
  totalInstances = 50,
  routeesPaths = List("/user/worker", "/user/processor", "/user/analyzer"),
  allowLocalRoutees = true,
  useRoles = Set("compute")
)

val multiPathRouter = system.actorOf(
  ClusterRouterGroup(
    RoundRobinGroup(Nil),
    multiPathSettings
  ).props(),
  "multiWorkerRouter"
)
```

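With several `routeesPaths`, every eligible node can contribute one routee per path. The sketch below models that in plain Scala under stated assumptions: `GroupRouteePaths`, `candidates`, and the node addresses are hypothetical, and simply taking the first `totalInstances` entries is a simplification rather than the order in which Akka actually selects routees.

```scala
// Simplified model of group routee candidates; not part of the Akka API.
object GroupRouteePaths {

  /** One candidate per (node, path) pair, capped at totalInstances. */
  def candidates(
      nodeAddresses: Seq[String],
      routeesPaths: Seq[String],
      totalInstances: Int
  ): Seq[String] = {
    val all = for {
      node <- nodeAddresses
      path <- routeesPaths
    } yield node + path // e.g. "akka://ClusterSystem@host1:2552/user/worker"
    all.take(totalInstances)
  }
}
```

Two nodes and three paths yield six candidates, so a `totalInstances` below six would leave some node and path combinations unused in this model.
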
### Consistent Hashing Group Router

Route based on message content so that messages with the same key reach the same routee, which suits stateful processing:

```scala
import akka.routing.ConsistentHashingGroup
import akka.routing.ConsistentHashingRouter.ConsistentHashable

val hashingSettings = ClusterRouterGroupSettings(
  totalInstances = 20,
  routeesPaths = List("/user/statefulWorker"),
  allowLocalRoutees = false, // Only remote routees
  useRoles = Set("stateful")
)

val hashingRouter = system.actorOf(
  ClusterRouterGroup(
    ConsistentHashingGroup(Nil),
    hashingSettings
  ).props(),
  "hashingRouter"
)

// Messages must supply a consistent hashing key
final case class HashedMessage(id: String, data: String) extends ConsistentHashable {
  override def consistentHashKey: Any = id
}

hashingRouter ! HashedMessage("user123", "process user data")
```

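The idea behind consistent hashing can be illustrated without Akka. The following is a minimal sketch of a hash ring in plain Scala; `HashRingSketch`, `Ring`, `ring`, and the `virtualNodes` parameter are hypothetical names for illustration, and Akka's own implementation differs in its details.

```scala
import scala.collection.immutable.TreeMap
import scala.util.hashing.MurmurHash3

// Illustrative hash ring; not the implementation Akka uses.
object HashRingSketch {

  final case class Ring(points: TreeMap[Int, String]) {
    /** First routee at or after the key's hash, wrapping around the ring. */
    def routeeFor(key: String): Option[String] =
      if (points.isEmpty) None
      else {
        val it = points.iteratorFrom(MurmurHash3.stringHash(key))
        Some(if (it.hasNext) it.next()._2 else points.head._2)
      }
  }

  /** Places each routee on the ring at several pseudo-random points. */
  def ring(routees: Seq[String], virtualNodes: Int = 10): Ring = {
    val points = for {
      routee <- routees
      i      <- 0 until virtualNodes
    } yield MurmurHash3.stringHash(s"$routee#$i") -> routee
    Ring(TreeMap(points: _*))
  }
}
```

The same key always maps to the same routee, and removing a routee only remaps the keys that routee owned. That property is what makes the approach suitable for stateful workers.
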
## Pool Router Usage

### Basic Pool Router

Creates routee actors on cluster members and manages their lifecycle:

```scala
import akka.actor.Props
import akka.cluster.routing.{ClusterRouterPool, ClusterRouterPoolSettings}
import akka.routing.RoundRobinPool

val poolSettings = ClusterRouterPoolSettings(
  totalInstances = 50,
  maxInstancesPerNode = 5,
  allowLocalRoutees = true,
  useRoles = Set("worker")
)

val poolRouter = system.actorOf(
  ClusterRouterPool(
    RoundRobinPool(nrOfInstances = 0), // Managed by cluster settings
    poolSettings
  ).props(Props[WorkerActor]),
  "workerPool"
)

poolRouter ! "process this work"
```

### Balancing Pool Router

`BalancingPool` redistributes work from busy to idle routees by having all routees share one mailbox. A mailbox cannot be shared across nodes, so verify that this router behaves as intended with cluster-deployed routees before relying on it:

```scala
import akka.routing.BalancingPool

val balancingSettings = ClusterRouterPoolSettings(
  totalInstances = 100,
  maxInstancesPerNode = 10,
  allowLocalRoutees = true,
  useRoles = Set("processor")
)

val balancingRouter = system.actorOf(
  ClusterRouterPool(
    BalancingPool(nrOfInstances = 0),
    balancingSettings
  ).props(Props[ProcessorActor]),
  "balancingPool"
)
```

### Smallest Mailbox Pool Router

Routes to the routee with the fewest messages in its mailbox. The router cannot see the mailbox size of remote routees, so they are chosen with the lowest priority:

```scala
import akka.routing.SmallestMailboxPool

val smallestMailboxSettings = ClusterRouterPoolSettings(
  totalInstances = 30,
  maxInstancesPerNode = 3,
  allowLocalRoutees = true,
  useRoles = Set("handler")
)

val smallestMailboxRouter = system.actorOf(
  ClusterRouterPool(
    SmallestMailboxPool(nrOfInstances = 0),
    smallestMailboxSettings
  ).props(Props[HandlerActor]),
  "smallestMailboxPool"
)
```

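The effect of unknown remote mailbox sizes on smallest-mailbox routing can be sketched in plain Scala. This is a deliberately simplified model: `SmallestMailboxSketch`, `Candidate`, and `pick` are hypothetical names, and Akka's real selection also takes into account whether a routee is idle or suspended.

```scala
// Simplified model of smallest-mailbox selection; not part of the Akka API.
object SmallestMailboxSketch {

  /** mailboxSize is None for remote routees, whose size is not visible. */
  final case class Candidate(name: String, mailboxSize: Option[Int])

  /** Picks the smallest known mailbox; unknown sizes rank last. */
  def pick(candidates: Seq[Candidate]): Option[Candidate] =
    if (candidates.isEmpty) None
    else Some(candidates.minBy(_.mailboxSize.getOrElse(Int.MaxValue)))
}
```

In this model a router whose routees are all remote has no size information to compare, so the strategy cannot distinguish between them.
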
## Role-Based Routing

### Single Role Targeting

```scala
val backendSettings = ClusterRouterPoolSettings(
  totalInstances = 20,
  maxInstancesPerNode = 2,
  allowLocalRoutees = false,
  useRoles = Set("backend") // Only backend nodes
)

val backendRouter = system.actorOf(
  ClusterRouterPool(
    RoundRobinPool(0),
    backendSettings
  ).props(Props[BackendActor]),
  "backendRouter"
)
```

### Multiple Role Targeting

```scala
val multiRoleSettings = ClusterRouterPoolSettings(
  totalInstances = 40,
  maxInstancesPerNode = 4,
  allowLocalRoutees = true,
  useRoles = Set("worker", "compute") // Only nodes that have BOTH the worker AND compute roles
)

val multiRoleRouter = system.actorOf(
  ClusterRouterPool(
    RoundRobinPool(0),
    multiRoleSettings
  ).props(Props[ComputeActor]),
  "multiRoleRouter"
)
```

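Role matching with several roles is easy to misread, so a small model may help. The sketch below assumes the semantics described for `use-roles` in Akka's reference configuration (members with all specified roles, or all members when the set is empty); `RoleMatch` and `eligible` are hypothetical names, not Akka API.

```scala
// Simplified model of role filtering; not part of the Akka API.
object RoleMatch {

  /** A member is eligible when it carries every role in useRoles. */
  def eligible(useRoles: Set[String], memberRoles: Set[String]): Boolean =
    useRoles.subsetOf(memberRoles)
}
```

Under this assumption, a node with only the `worker` role is not selected by `useRoles = Set("worker", "compute")`. Targeting nodes with either role would need two routers, or a shared role assigned to both groups of nodes.
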
## Data Center Aware Routing

### Local Data Center Only

```scala
// Intended to route only within the local data center. The settings below
// filter by role alone, so this assumes "local-service" is assigned only
// to nodes in that data center.
val localDcSettings = ClusterRouterPoolSettings(
  totalInstances = 15,
  maxInstancesPerNode = 3,
  allowLocalRoutees = true,
  useRoles = Set("local-service")
)

val localDcRouter = system.actorOf(
  ClusterRouterPool(
    RoundRobinPool(0),
    localDcSettings
  ).props(Props[LocalServiceActor]),
  "localDcRouter"
)
```

### Cross Data Center Routing

For cross-DC routing, deploy a separate router per data center or use role-based targeting:

```scala
// Use roles to target specific data centers. Akka tags each member with a
// "dc-<name>" role, so "dc-west" matches members of the data center named "west".
val crossDcSettings = ClusterRouterPoolSettings(
  totalInstances = 50,
  maxInstancesPerNode = 5,
  allowLocalRoutees = false,
  useRoles = Set("dc-west") // Target west data center nodes
)
```

## Router Lifecycle Management

### Router Creation with Supervision

```scala
import akka.actor.{Actor, OneForOneStrategy, Props}
import akka.actor.SupervisorStrategy._
import akka.cluster.routing.{ClusterRouterPool, ClusterRouterPoolSettings}
import akka.routing.RoundRobinPool

class RouterSupervisor extends Actor {
  // Applies to this actor's children, which here is the router actor
  override val supervisorStrategy = OneForOneStrategy() {
    case _: Exception => Restart
  }

  val workerRouter = context.actorOf(
    ClusterRouterPool(
      RoundRobinPool(0),
      ClusterRouterPoolSettings(20, 2, true, Set("worker"))
    ).props(Props[WorkerActor]),
    "workerRouter"
  )

  def receive = {
    case msg => workerRouter forward msg
  }
}
```

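### Dynamic Router Reconfiguration

Routers automatically adjust to cluster membership changes:

```scala
import akka.routing.{ActorRefRoutee, ActorSelectionRoutee, GetRoutees, Routees}

// The router adds and removes routees as cluster members join and leave.
// No manual reconfiguration is needed.

// To get the current router state, send GetRoutees from inside an actor:
router ! GetRoutees

def receive = {
  case Routees(routees) =>
    println(s"Current routees: ${routees.size}")
    // Routee itself has no path; match on the concrete routee type
    routees.foreach {
      case ActorRefRoutee(ref)             => println(s"Routee: ${ref.path}")
      case ActorSelectionRoutee(selection) => println(s"Routee: ${selection.pathString}")
      case other                           => println(s"Routee: $other")
    }
}
```
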
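The way routees follow cluster membership can be modeled as a fold over membership events. This is a minimal sketch in plain Scala: `MembershipSketch` and its `MemberUp` and `MemberRemoved` types are hypothetical stand-ins that only borrow the names of Akka's cluster events, and the role check assumes a member must carry every role in `useRoles`.

```scala
// Simplified model of membership-driven routee tracking; not Akka code.
object MembershipSketch {

  sealed trait Event
  final case class MemberUp(address: String, roles: Set[String]) extends Event
  final case class MemberRemoved(address: String) extends Event

  /** Returns the set of nodes hosting routees after applying one event. */
  def update(routeeNodes: Set[String], useRoles: Set[String], event: Event): Set[String] =
    event match {
      case MemberUp(address, roles) if useRoles.subsetOf(roles) => routeeNodes + address
      case MemberUp(_, _)                                       => routeeNodes // role mismatch
      case MemberRemoved(address)                               => routeeNodes - address
    }
}
```

Folding a stream of events with `update` yields the current set of routee nodes, which is the bookkeeping a cluster-aware router performs on the application's behalf.
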
## Configuration-Based Routing

### Configuration Setup

```hocon
akka.actor.deployment {
  /workerRouter {
    router = round-robin-group
    routees.paths = ["/user/worker"]
    cluster {
      enabled = on
      # max-nr-of-instances-per-node applies to pool routers only
      allow-local-routees = on
      use-roles = ["worker"]
    }
  }

  /poolRouter {
    router = round-robin-pool
    nr-of-instances = 100
    cluster {
      enabled = on
      max-nr-of-instances-per-node = 10
      allow-local-routees = on
      use-roles = ["compute"]
    }
  }
}
```

### Creating Configured Routers

```scala
import akka.actor.Props
import akka.routing.FromConfig

// Group router from configuration
val configuredGroupRouter = system.actorOf(
  FromConfig.props(),
  "workerRouter" // Must match configuration path
)

// Pool router from configuration
val configuredPoolRouter = system.actorOf(
  FromConfig.props(Props[WorkerActor]),
  "poolRouter"
)
```

## Router Monitoring and Metrics

### Router State Inspection

```scala
import akka.routing._

// Get current routees; the router replies with Routees
router ! GetRoutees

// Grow or shrink a pool by a relative amount (pool routers only)
router ! AdjustPoolSize(change = 5)

// Remove a specific routee (the selection must be wrapped in a Routee)
router ! RemoveRoutee(ActorSelectionRoutee(actorSelection))

// Add a new routee (the ActorRef must be wrapped in a Routee)
router ! AddRoutee(ActorRefRoutee(actorRef))

def receive = {
  case Routees(routees) =>
    println(s"Active routees: ${routees.size}")
}
```

### Performance Considerations

```scala
// For high-throughput routing, consider:

// 1. Balancing pool for CPU-bound work
//    (routees share one mailbox; verify behavior with remote routees)
val balancingPool = ClusterRouterPool(
  BalancingPool(0),
  ClusterRouterPoolSettings(100, 10, true, Set("cpu-intensive"))
)

// 2. Consistent hashing for stateful routing
val consistentHashingPool = ClusterRouterPool(
  ConsistentHashingPool(0),
  ClusterRouterPoolSettings(50, 5, true, Set("stateful"))
)

// 3. Smallest mailbox for latency-sensitive work
//    (mailbox sizes of remote routees are not visible to the router)
val smallestMailboxPool = ClusterRouterPool(
  SmallestMailboxPool(0),
  ClusterRouterPoolSettings(30, 3, true, Set("latency-sensitive"))
)
```

## Error Handling and Resilience

### Router Supervision Strategy

```scala
import akka.actor.{Actor, OneForOneStrategy, Props}
import akka.cluster.routing.{ClusterRouterPool, ClusterRouterPoolSettings}
import akka.routing.RoundRobinPool
import scala.concurrent.duration._ // Enables 1.minute

class ResilientRouter extends Actor {
  import akka.actor.SupervisorStrategy._

  override val supervisorStrategy = OneForOneStrategy(
    maxNrOfRetries = 3,
    withinTimeRange = 1.minute
  ) {
    case _: IllegalArgumentException => Stop
    case _: Exception => Restart
  }

  val router = context.actorOf(
    ClusterRouterPool(
      RoundRobinPool(0),
      ClusterRouterPoolSettings(10, 2, true, Set("resilient"))
    ).props(Props[WorkerActor]),
    "resilientRouter"
  )

  def receive = {
    case msg => router forward msg
  }
}
```

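### Handling Router Failures

```scala
import akka.actor.{Actor, ActorLogging, ActorRef, Status, Terminated}

// Application-defined message asking the parent to recreate the router
case object RestartRouter

class RouterWatcher(router: ActorRef) extends Actor with ActorLogging {
  // Monitor router health
  context.watch(router)

  def receive: Receive = available

  private def available: Receive = {
    case Terminated(routerRef) if routerRef == router =>
      log.warning("Router terminated, asking parent to recreate it")
      // Recreate router or escalate failure
      context.parent ! RestartRouter
      context.become(unavailable)

    case msg => router forward msg
  }

  private def unavailable: Receive = {
    case _ => sender() ! Status.Failure(new IllegalStateException("Router unavailable"))
  }
}
```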