# External Shard Allocation

The External Shard Allocation API lets an external system decide, centrally, which cluster node each shard should run on, instead of leaving placement to the default shard allocation strategy.

## External Allocation Extension

### Extension Access

```scala { .api }
object ExternalShardAllocation extends ExtensionId[ExternalShardAllocation] with ExtensionIdProvider {
  def get(system: ActorSystem): ExternalShardAllocation
  def get(system: ClassicActorSystemProvider): ExternalShardAllocation
}

class ExternalShardAllocation(system: ExtendedActorSystem) extends Extension {
  def clientFor(typeName: String): scaladsl.ExternalShardAllocationClient // Scala API
  def getClient(typeName: String): javadsl.ExternalShardAllocationClient  // Java API
}
```

**Usage Example:**

```scala
import akka.cluster.sharding.external.ExternalShardAllocation

// system: the running ActorSystem
val externalShardAllocation = ExternalShardAllocation(system)
```
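
## External Allocation Strategy

### Strategy Configuration

```scala { .api }
class ExternalShardAllocationStrategy(
    system: ActorSystem,
    typeName: String
) extends ShardAllocationStrategy
```

This strategy delegates allocation decisions to the shard locations set through the `ExternalShardAllocationClient`. The `typeName` passed to the strategy must be the same one passed to `clientFor` (Scala) or `getClient` (Java); otherwise the client updates locations that this strategy never reads.

**Usage Example:**

```scala
import akka.actor.{ PoisonPill, Props }
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings }
import akka.cluster.sharding.external.ExternalShardAllocationStrategy

// One constant, so the strategy and the client agree on the entity type name
val typeName = "MyEntity"
val strategy = new ExternalShardAllocationStrategy(system, typeName)

// MyEntityActor, extractEntityId and extractShardId are assumed to be defined elsewhere
val region = ClusterSharding(system).start(
  typeName = typeName,
  entityProps = Props[MyEntityActor](),
  settings = ClusterShardingSettings(system),
  extractEntityId = extractEntityId,
  extractShardId = extractShardId,
  allocationStrategy = strategy,
  handOffStopMessage = PoisonPill
)
```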

### Shard Location Specification

```scala { .api }
case class ShardLocation(region: ActorRef)
```

Represents the desired location for a shard.

## External Allocation Client APIs

### Scala API

```scala { .api }
// package akka.cluster.sharding.external.scaladsl
trait ExternalShardAllocationClient {
  def updateShardLocation(shard: ShardId, location: Address): Future[Done]
  def updateShardLocations(locations: Map[ShardId, Address]): Future[Done]
  def shardLocations(): Future[ShardLocations]
}
```

**Accessing the Client:**

```scala
import akka.cluster.sharding.external.ExternalShardAllocation
import akka.cluster.sharding.external.scaladsl.ExternalShardAllocationClient

val client: ExternalShardAllocationClient =
  ExternalShardAllocation(system).clientFor("MyEntity")
```

### Java API

```scala { .api }
// package akka.cluster.sharding.external.javadsl
trait ExternalShardAllocationClient {
  def updateShardLocation(shard: String, location: Address): CompletionStage[Done]
  def updateShardLocations(locations: java.util.Map[String, Address]): CompletionStage[Done]
  def shardLocations(): CompletionStage[ShardLocations]
}
```

**Accessing the Client:**

```java
import akka.cluster.sharding.external.ExternalShardAllocation;
import akka.cluster.sharding.external.javadsl.ExternalShardAllocationClient;

ExternalShardAllocationClient client =
    ExternalShardAllocation.get(system).getClient("MyEntity");
```

## Client Operations

### Update Single Shard Location

Set the target node for a single shard:

```scala { .api }
def updateShardLocation(shard: ShardId, location: Address): Future[Done]
```

**Scala Example:**

```scala
import akka.cluster.Cluster
import scala.util.{ Failure, Success }
import system.dispatcher // ExecutionContext needed by onComplete

val cluster = Cluster(system)
// Address of the first member in the cluster's (sorted) member set
val targetNode = cluster.state.members.head.address

val future = client.updateShardLocation("shard-1", targetNode)
future.onComplete {
  case Success(_)         => println("Shard location updated successfully")
  case Failure(exception) => println(s"Failed to update shard location: $exception")
}
```

**Java Example:**

```java
import akka.Done;
import akka.actor.Address;
import akka.cluster.Cluster;
import java.util.concurrent.CompletionStage;

// Address of the first member in the cluster's member set
Address targetNode = Cluster.get(system).state().getMembers().iterator().next().address();

CompletionStage<Done> future = client.updateShardLocation("shard-1", targetNode);
future.whenComplete((done, throwable) -> {
  if (throwable == null) {
    System.out.println("Shard location updated successfully");
  } else {
    System.out.println("Failed to update shard location: " + throwable);
  }
});
```

### Update Multiple Shard Locations

Update the locations of several shards in a single operation:

```scala { .api }
def updateShardLocations(locations: Map[ShardId, Address]): Future[Done]
```

**Scala Example:**

```scala
import akka.cluster.Cluster
import system.dispatcher // ExecutionContext needed by foreach

val cluster = Cluster(system)
val nodes = cluster.state.members.toList.map(_.address)
require(nodes.size >= 2, "this example assumes at least two cluster members")

val shardLocations = Map(
  "shard-1" -> nodes(0),
  "shard-2" -> nodes(1),
  "shard-3" -> nodes(0)
)

val future = client.updateShardLocations(shardLocations)
future.foreach(_ => println("All shard locations updated"))
```

**Java Example:**

```java
import akka.Done;
import akka.actor.Address;
import akka.cluster.Cluster;
import akka.cluster.Member;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;

// getMembers() returns an Iterable, so copy the addresses into a list
List<Address> nodes = new ArrayList<>();
for (Member member : Cluster.get(system).state().getMembers()) {
  nodes.add(member.address());
}
// Assumes at least two cluster members

Map<String, Address> shardLocations = new HashMap<>();
shardLocations.put("shard-1", nodes.get(0));
shardLocations.put("shard-2", nodes.get(1));
shardLocations.put("shard-3", nodes.get(0));

CompletionStage<Done> future = client.updateShardLocations(shardLocations);
future.thenRun(() -> System.out.println("All shard locations updated"));
```
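
When many shards must be placed at once, building the map entry by entry gets tedious. The helper below is a hypothetical sketch, not part of Akka: `RoundRobinPlacement.assign` spreads shard IDs over a list of nodes in round-robin order and returns a map suitable for a single `updateShardLocations` call. It is generic in the node type, so the same code works with `akka.actor.Address` or with plain strings.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not an Akka API): builds one batch of shard locations.
final class RoundRobinPlacement {
  private RoundRobinPlacement() {}

  // Assigns the i-th shard ID to node (i mod nodes.size()); N is the node type, e.g. akka.actor.Address.
  static <N> Map<String, N> assign(List<String> shardIds, List<N> nodes) {
    if (nodes.isEmpty()) {
      throw new IllegalArgumentException("at least one target node is required");
    }
    // LinkedHashMap preserves the input order of the shard IDs
    Map<String, N> placement = new LinkedHashMap<>();
    for (int i = 0; i < shardIds.size(); i++) {
      placement.put(shardIds.get(i), nodes.get(i % nodes.size()));
    }
    return placement;
  }
}
```

With the Java client this could be used as `client.updateShardLocations(RoundRobinPlacement.assign(shardIds, nodes))`, where `nodes` is a `List<Address>` of cluster members and `shardIds` is an assumed list of shard IDs.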

### Query Current Shard Locations

Retrieve the current shard location mappings:

```scala { .api }
def shardLocations(): Future[ShardLocations]
```

**Scala Example:**

```scala
import system.dispatcher // ExecutionContext needed by foreach

val future = client.shardLocations()
future.foreach { locations =>
  println(s"Current shard locations: ${locations.locations}")
}
```

**Java Example:**

```java
import akka.cluster.sharding.external.ShardLocations;
import java.util.concurrent.CompletionStage;

CompletionStage<ShardLocations> future = client.shardLocations();
future.thenAccept(locations ->
    System.out.println("Current shard locations: " + locations.getLocations()));
```

## Shard Locations Container

### ShardLocations Class

```scala { .api }
case class ShardLocations(locations: Map[ShardId, Address]) {
  def getLocations(): java.util.Map[String, Address] // Java API
}
```

Contains the mapping of shard IDs to their designated cluster node addresses.
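
Because the mapping is keyed by shard ID, answering "which shards are assigned to node X?" (for instance, to count shards per node) means inverting it. The Java sketch below is a hypothetical helper, not part of Akka; it works on any `Map<String, N>`, such as the Java view of `ShardLocations` with `N` being `akka.actor.Address`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not an Akka API): inverts a shard -> node mapping into node -> shards.
final class ShardsPerNode {
  private ShardsPerNode() {}

  static <N> Map<N, List<String>> group(Map<String, N> shardToNode) {
    Map<N, List<String>> byNode = new HashMap<>();
    for (Map.Entry<String, N> entry : shardToNode.entrySet()) {
      byNode.computeIfAbsent(entry.getValue(), node -> new ArrayList<>()).add(entry.getKey());
    }
    // Sort each node's shard IDs so the result does not depend on map iteration order
    for (List<String> shards : byNode.values()) {
      Collections.sort(shards);
    }
    return byNode;
  }
}
```

The size of each list is then the number of shards assigned to that node, which is a quick way to spot an uneven distribution.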
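
## Error Handling

### Client Timeout Exception

```scala { .api }
class ClientTimeoutException(message: String, cause: Throwable) extends RuntimeException(message, cause)
```

Signals that an external allocation operation timed out. It is not thrown directly: the returned `Future` (or `CompletionStage` in Java) fails with it.

**Handling Timeouts:**

```scala
import akka.Done
import akka.actor.Address
import akka.cluster.sharding.external.ClientTimeoutException
import scala.concurrent.Future
import system.dispatcher // ExecutionContext needed by recoverWith

// Retry only on timeouts; any other failure stays in the returned Future
def updateWithRetry(shard: String, node: Address, retriesLeft: Int): Future[Done] =
  client.updateShardLocation(shard, node).recoverWith {
    case _: ClientTimeoutException if retriesLeft > 0 =>
      println(s"Update of $shard timed out, retrying ($retriesLeft retries left)")
      updateWithRetry(shard, node, retriesLeft - 1)
  }

updateWithRetry("shard-1", targetNode, retriesLeft = 3) // targetNode as in the earlier examples
```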
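
Retrying immediately after a timeout can run into the same condition again, so spacing retries out is a common refinement. The Java sketch below is a hypothetical helper, not an Akka API: `Retry.withBackoff` retries only failures that are instances of `retryOn` (with the Java client, `ClientTimeoutException.class`), doubles the delay after each attempt, and passes every other failure through unchanged.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper (not an Akka API): retries an async operation with exponential backoff,
// but only when the failure is an instance of retryOn (e.g. ClientTimeoutException.class).
final class Retry {
  private Retry() {}

  static <T> CompletionStage<T> withBackoff(
      Supplier<CompletionStage<T>> operation,
      Class<? extends Throwable> retryOn,
      int maxRetries,
      Duration initialDelay) {
    CompletableFuture<T> result = new CompletableFuture<>();
    attempt(operation, retryOn, maxRetries, initialDelay, result);
    return result;
  }

  private static <T> void attempt(
      Supplier<CompletionStage<T>> operation,
      Class<? extends Throwable> retryOn,
      int retriesLeft,
      Duration delay,
      CompletableFuture<T> result) {
    CompletionStage<T> stage;
    try {
      stage = operation.get();
    } catch (RuntimeException e) {
      result.completeExceptionally(e); // the operation failed before returning a stage
      return;
    }
    stage.whenComplete((value, error) -> {
      if (error == null) {
        result.complete(value);
        return;
      }
      // Dependent stages may wrap the original failure in a CompletionException
      Throwable cause =
          (error instanceof CompletionException && error.getCause() != null) ? error.getCause() : error;
      if (retriesLeft > 0 && retryOn.isInstance(cause)) {
        // Wait for the current delay, then try again with twice the delay
        CompletableFuture.delayedExecutor(delay.toMillis(), TimeUnit.MILLISECONDS)
            .execute(() -> attempt(operation, retryOn, retriesLeft - 1, delay.multipliedBy(2), result));
      } else {
        result.completeExceptionally(cause);
      }
    });
  }
}
```

For example: `Retry.withBackoff(() -> client.updateShardLocation("shard-1", targetNode), ClientTimeoutException.class, 3, Duration.ofMillis(200))`. The retry count and initial delay here are arbitrary example values.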

## Integration Patterns

### Centralized Orchestration

Use external allocation with a centralized orchestration service:

```scala
import akka.Done
import akka.actor.Address
import akka.cluster.sharding.ShardRegion.ShardId
import akka.cluster.sharding.external.ShardLocations
import akka.cluster.sharding.external.scaladsl.ExternalShardAllocationClient
import scala.concurrent.{ ExecutionContext, Future }

class ShardOrchestrator(client: ExternalShardAllocationClient)(implicit ec: ExecutionContext) {

  def rebalanceShards(): Future[Done] =
    for {
      currentLocations <- client.shardLocations()
      newLocations = calculateOptimalDistribution(currentLocations)
      _ <- client.updateShardLocations(newLocations)
    } yield Done

  private def calculateOptimalDistribution(current: ShardLocations): Map[ShardId, Address] = {
    // Placeholder for custom distribution logic: returning the current mapping changes nothing
    current.locations
  }
}
```
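
`calculateOptimalDistribution` returns a complete mapping, so sending it as-is rewrites locations that did not change. A hypothetical Java helper (not an Akka API) can reduce the update to the entries whose target differs from the current mapping; when the result is empty, the update call can be skipped entirely.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical helper (not an Akka API): keeps only the shards whose desired node differs
// from the current one, so an update sends no redundant entries.
final class LocationDiff {
  private LocationDiff() {}

  static <N> Map<String, N> changed(Map<String, N> current, Map<String, N> desired) {
    Map<String, N> updates = new LinkedHashMap<>();
    for (Map.Entry<String, N> entry : desired.entrySet()) {
      // A shard with no current location also counts as changed
      if (!Objects.equals(current.get(entry.getKey()), entry.getValue())) {
        updates.put(entry.getKey(), entry.getValue());
      }
    }
    return updates;
  }
}
```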

### Load-Based Allocation

Integrate with monitoring systems for load-based shard placement:

```scala
import akka.Done
import akka.actor.Address
import akka.cluster.sharding.ShardRegion.ShardId
import akka.cluster.sharding.external.scaladsl.ExternalShardAllocationClient
import scala.concurrent.{ ExecutionContext, Future }

// Hypothetical metrics source, not part of Akka
trait MetricsCollector {
  def getNodeLoads(): Future[Map[Address, Double]]
  def getShardLoads(): Future[Map[ShardId, Double]]
}

class LoadBasedAllocator(
    client: ExternalShardAllocationClient,
    metricsCollector: MetricsCollector
)(implicit ec: ExecutionContext) {

  def allocateBasedOnLoad(): Future[Done] =
    for {
      nodeLoads <- metricsCollector.getNodeLoads()
      shardLoads <- metricsCollector.getShardLoads()
      optimalPlacements = calculateOptimalPlacements(nodeLoads, shardLoads)
      _ <- client.updateShardLocations(optimalPlacements)
    } yield Done

  private def calculateOptimalPlacements(
      nodeLoads: Map[Address, Double],
      shardLoads: Map[ShardId, Double]
  ): Map[ShardId, Address] = {
    // Placeholder: place high-load shards on low-load nodes
    Map.empty
  }
}
```
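
`calculateOptimalPlacements` is left as a placeholder. One standard heuristic that matches its comment is greedy longest-processing-time-first (LPT) scheduling: sort shards by load, heaviest first, and give each one to the node whose projected load is currently lowest. The Java sketch below illustrates that heuristic and is not an Akka API; it assumes `nodeLoads` measures each node's load excluding the shards being placed, and that node and shard loads use the same unit.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical helper (not an Akka API): greedy "heaviest shard first onto the least-loaded node".
// Assumes nodeLoads excludes the load of the shards being placed and that all loads share one unit.
final class GreedyLoadPlacement {
  private GreedyLoadPlacement() {}

  private static final class NodeLoad<N> {
    final N node;
    final double load;

    NodeLoad(N node, double load) {
      this.node = node;
      this.load = load;
    }
  }

  static <N> Map<String, N> place(Map<N, Double> nodeLoads, Map<String, Double> shardLoads) {
    if (nodeLoads.isEmpty()) {
      throw new IllegalArgumentException("no nodes to place shards on");
    }
    // Min-heap ordered by projected node load
    PriorityQueue<NodeLoad<N>> heap =
        new PriorityQueue<>(Comparator.comparingDouble((NodeLoad<N> n) -> n.load));
    for (Map.Entry<N, Double> entry : nodeLoads.entrySet()) {
      heap.add(new NodeLoad<>(entry.getKey(), entry.getValue()));
    }
    // Place the heaviest shards first
    List<Map.Entry<String, Double>> shards = new ArrayList<>(shardLoads.entrySet());
    shards.sort(Map.Entry.<String, Double>comparingByValue().reversed());

    Map<String, N> placement = new LinkedHashMap<>();
    for (Map.Entry<String, Double> shard : shards) {
      NodeLoad<N> lightest = heap.poll();
      placement.put(shard.getKey(), lightest.node);
      // Re-insert the node with its load increased by the shard just placed
      heap.add(new NodeLoad<>(lightest.node, lightest.load + shard.getValue()));
    }
    return placement;
  }
}
```

The heap keeps each pick at O(log n) for n nodes. LPT is not optimal in general, but it is a simple baseline; ties between equally loaded nodes are broken arbitrarily.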
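
### Maintenance and Migration

Use external allocation for planned maintenance, for example to move every shard off a node before taking it down:

```scala
import akka.Done
import akka.actor.Address
import akka.cluster.{ Cluster, MemberStatus }
import akka.cluster.sharding.ShardRegion.ShardId
import akka.cluster.sharding.external.scaladsl.ExternalShardAllocationClient
import scala.concurrent.{ ExecutionContext, Future }

class MaintenanceManager(client: ExternalShardAllocationClient, cluster: Cluster)(implicit ec: ExecutionContext) {

  def evacuateNode(nodeToEvacuate: Address): Future[Done] =
    client.shardLocations().flatMap { currentLocations =>
      val shardsToMove = currentLocations.locations.collect {
        case (shard, address) if address == nodeToEvacuate => shard
      }
      val availableNodes = getAvailableNodes().filterNot(_ == nodeToEvacuate)
      if (shardsToMove.isEmpty) Future.successful(Done)
      else if (availableNodes.isEmpty)
        Future.failed(new IllegalStateException(s"No other Up node can take the shards of $nodeToEvacuate"))
      else client.updateShardLocations(redistributeShards(shardsToMove, availableNodes))
    }

  private def redistributeShards(
      shards: Iterable[ShardId],
      availableNodes: IndexedSeq[Address]
  ): Map[ShardId, Address] =
    // Round-robin: the i-th shard goes to node (i mod number of nodes)
    shards.zipWithIndex.map { case (shard, index) =>
      shard -> availableNodes(index % availableNodes.size)
    }.toMap

  private def getAvailableNodes(): IndexedSeq[Address] =
    // Treats members with status Up as healthy; adapt to your own health criteria
    cluster.state.members.iterator.filter(_.status == MemberStatus.Up).map(_.address).toVector
}
```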
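
## Best Practices

### Timing Considerations

- Allow time for shard movement to complete before making new changes
- Use the returned `Future[Done]` (`CompletionStage<Done>` in Java) to sequence dependent operations
- Account for the rebalance interval (`akka.cluster.sharding.rebalance-interval`): shards that are already running are moved during a rebalance, so a new location may not take effect immediately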
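
Taken together, these points suggest rolling out a large change gradually instead of in one burst. The Java sketch below is hypothetical (not an Akka API): it splits the desired locations into fixed-size batches, sends them one at a time through a caller-supplied update function such as the client's `updateShardLocations`, waits for each returned stage, and pauses before the next batch. The fixed pause is an assumption standing in for whatever signal your deployment uses to judge that shard moves have finished.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical helper (not an Akka API): sends location changes in fixed-size batches, one at a time,
// waiting for each batch's stage to complete and pausing before sending the next batch.
final class BatchedUpdates {
  private BatchedUpdates() {}

  static <N> List<Map<String, N>> split(Map<String, N> locations, int batchSize) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    List<Map<String, N>> batches = new ArrayList<>();
    Map<String, N> batch = new LinkedHashMap<>();
    for (Map.Entry<String, N> entry : locations.entrySet()) {
      batch.put(entry.getKey(), entry.getValue());
      if (batch.size() == batchSize) {
        batches.add(batch);
        batch = new LinkedHashMap<>();
      }
    }
    if (!batch.isEmpty()) {
      batches.add(batch);
    }
    return batches;
  }

  static <N, R> CompletionStage<Void> applySequentially(
      Map<String, N> locations,
      int batchSize,
      Duration pause,
      Function<Map<String, N>, CompletionStage<R>> update) {
    CompletionStage<Void> chain = CompletableFuture.completedFuture(null);
    List<Map<String, N>> batches = split(locations, batchSize);
    for (int i = 0; i < batches.size(); i++) {
      Map<String, N> next = batches.get(i);
      long delayMillis = (i == 0) ? 0 : pause.toMillis(); // no pause before the first batch
      chain = chain
          .thenCompose(ignored -> CompletableFuture.runAsync(
              () -> {}, CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS)))
          .thenCompose(ignored -> update.apply(next))
          .thenAccept(ignored -> {});
    }
    return chain;
  }
}
```

With the Java client, `BatchedUpdates.applySequentially(desired, 10, Duration.ofSeconds(15), client::updateShardLocations)` would send ten shards at a time; the batch size and pause are arbitrary example values.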

### Error Recovery

- Implement retry logic for transient failures
- Handle `ClientTimeoutException` explicitly, for example by retrying with backoff
- Validate target node addresses against current cluster membership before sending updates
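
One way to validate addresses is to check every target against the set of members currently considered healthy and send only the entries that pass. A hypothetical Java helper (not an Akka API) for that split:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper (not an Akka API): splits proposed locations into those that target a
// currently healthy node and those that do not, so only the former are sent to the client.
final class LocationValidation {
  private LocationValidation() {}

  static final class Result<N> {
    final Map<String, N> valid = new LinkedHashMap<>();
    final Map<String, N> rejected = new LinkedHashMap<>();
  }

  static <N> Result<N> validate(Map<String, N> proposed, Set<N> healthyNodes) {
    Result<N> result = new Result<>();
    for (Map.Entry<String, N> entry : proposed.entrySet()) {
      if (healthyNodes.contains(entry.getValue())) {
        result.valid.put(entry.getKey(), entry.getValue());
      } else {
        result.rejected.put(entry.getKey(), entry.getValue());
      }
    }
    return result;
  }
}
```

The `healthyNodes` set would typically be built from `Cluster.get(system).state().getMembers()`, keeping members whose status is `Up`; rejected entries can be logged or retried once membership changes.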

### Performance

- Prefer one `updateShardLocations` call over many `updateShardLocation` calls when changing several shards
- Cache shard location information to reduce query frequency
- Avoid excessive updates: each changed location can trigger a shard hand-off, which interrupts that shard while it moves
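
Caching can be as simple as keeping the last query result for a fixed time-to-live. The Java sketch below is hypothetical (not an Akka API); it caches any asynchronous query, drops a failed result so the next call retries, and takes an injectable nanosecond clock so expiry can be tested without sleeping.

```java
import java.time.Duration;
import java.util.concurrent.CompletionStage;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical helper (not an Akka API): caches the result of an async query for a fixed TTL.
// The clock returns nanoseconds and is injectable so expiry can be tested without sleeping.
final class TtlCache<T> {
  private final Supplier<CompletionStage<T>> loader;
  private final long ttlNanos;
  private final LongSupplier clock;
  private CompletionStage<T> cached;
  private long loadedAt;

  TtlCache(Supplier<CompletionStage<T>> loader, Duration ttl, LongSupplier clock) {
    this.loader = loader;
    this.ttlNanos = ttl.toNanos();
    this.clock = clock;
  }

  synchronized CompletionStage<T> get() {
    long now = clock.getAsLong();
    if (cached != null && now - loadedAt < ttlNanos) {
      return cached; // still fresh
    }
    CompletionStage<T> stage = loader.get();
    cached = stage;
    loadedAt = now;
    // Forget a failed result so the next call queries again instead of serving the failure
    stage.whenComplete((value, error) -> {
      if (error != null) {
        invalidateIfCurrent(stage);
      }
    });
    return stage;
  }

  // Call after writing new locations so the next read is not stale
  synchronized void invalidate() {
    cached = null;
  }

  private synchronized void invalidateIfCurrent(CompletionStage<T> stage) {
    if (cached == stage) {
      cached = null;
    }
  }
}
```

With the Java client, `new TtlCache<>(client::shardLocations, Duration.ofSeconds(30), System::nanoTime)` would serve location queries from memory for up to 30 seconds (an arbitrary example value); calling `invalidate()` after each `updateShardLocations` keeps the cache from hiding your own writes.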

### Monitoring

- Track successful vs failed allocation updates
- Monitor shard movement completion times
- Alert on repeated allocation failures
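
The success/failure and alerting points can share one small recorder. The Java sketch below is hypothetical (not an Akka API); it wraps each update's `CompletionStage`, counts outcomes, and raises a flag after a caller-chosen number of consecutive failures. Measuring movement completion times is left out, since that depends on how your deployment observes shard moves.

```java
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper (not an Akka API): counts successful and failed allocation updates and
// reports when the number of consecutive failures reaches a threshold chosen by the caller.
final class AllocationUpdateMetrics {
  private final AtomicLong successes = new AtomicLong();
  private final AtomicLong failures = new AtomicLong();
  private final AtomicInteger consecutiveFailures = new AtomicInteger();
  private final int alertThreshold;

  AllocationUpdateMetrics(int alertThreshold) {
    this.alertThreshold = alertThreshold;
  }

  // Returns a stage that completes like `update`, after the outcome has been recorded
  <T> CompletionStage<T> track(CompletionStage<T> update) {
    return update.whenComplete((value, error) -> {
      if (error == null) {
        successes.incrementAndGet();
        consecutiveFailures.set(0);
      } else {
        failures.incrementAndGet();
        consecutiveFailures.incrementAndGet();
      }
    });
  }

  long successCount() {
    return successes.get();
  }

  long failureCount() {
    return failures.get();
  }

  boolean shouldAlert() {
    return consecutiveFailures.get() >= alertThreshold;
  }
}
```

Wrapping a call as `metrics.track(client.updateShardLocations(batch))` records the outcome while the caller keeps the same result; `shouldAlert()` can then be polled by an existing health check or exported to a metrics system.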