The External Shard Allocation API provides advanced control over shard placement, enabling external systems to make centralized decisions about where shards should be located across the cluster.
object ExternalShardAllocation extends ExtensionId[ExternalShardAllocation] with ExtensionIdProvider {
def get(system: ActorSystem): ExternalShardAllocation
def get(system: ClassicActorSystemProvider): ExternalShardAllocation
}
class ExternalShardAllocation(system: ExtendedActorSystem) extends Extension
Usage Example:
val externalShardAllocation = ExternalShardAllocation(system)
class ExternalShardAllocationStrategy(
system: ActorSystem,
typeName: String
) extends ShardAllocationStrategy
This strategy delegates all allocation decisions to external control via the ExternalShardAllocationClient.
Usage Example:
import akka.cluster.sharding.external.ExternalShardAllocationStrategy
val strategy = new ExternalShardAllocationStrategy(system, "MyEntity")
ClusterSharding(system).start(
typeName = "MyEntity",
entityProps = Props[MyEntityActor](),
settings = ClusterShardingSettings(system),
extractEntityId = extractEntityId,
extractShardId = extractShardId,
allocationStrategy = strategy,
handOffStopMessage = PoisonPill
)
case class ShardLocation(region: ActorRef)
Represents the desired location for a shard.
trait ExternalShardAllocationClient {
def updateShardLocation(shard: ShardId, location: Address): Future[Done]
def updateShardLocations(locations: Map[ShardId, Address]): Future[Done]
def shardLocations(): Future[ShardLocations]
}
Accessing the Client:
import akka.cluster.sharding.external.scaladsl.ExternalShardAllocationClient
val client: ExternalShardAllocationClient =
ExternalShardAllocation(system).clientFor("MyEntity")
Java API:
trait ExternalShardAllocationClient {
def updateShardLocation(shard: String, location: Address): CompletionStage[Done]
def updateShardLocations(locations: java.util.Map[String, Address]): CompletionStage[Done]
def shardLocations(): CompletionStage[ShardLocations]
}
Accessing the Client:
import akka.cluster.sharding.external.javadsl.ExternalShardAllocationClient;
ExternalShardAllocationClient client =
ExternalShardAllocation.get(system).getClient("MyEntity");
Move a specific shard to a target node:
def updateShardLocation(shard: ShardId, location: Address): Future[Done]
Scala Example:
import akka.cluster.Cluster
val cluster = Cluster(system)
val targetNode = cluster.state.members.head.address
val future = client.updateShardLocation("shard-1", targetNode)
future.onComplete {
case Success(_) => println("Shard location updated successfully")
case Failure(exception) => println(s"Failed to update shard location: $exception")
}
Java Example:
Address targetNode = Cluster.get(system).state().getMembers().iterator().next().address();
CompletionStage<Done> future = client.updateShardLocation("shard-1", targetNode);
future.whenComplete((done, throwable) -> {
if (throwable == null) {
System.out.println("Shard location updated successfully");
} else {
System.out.println("Failed to update shard location: " + throwable);
}
});
Update locations for multiple shards in a single operation:
def updateShardLocations(locations: Map[ShardId, Address]): Future[Done]
Scala Example:
val cluster = Cluster(system)
val nodes = cluster.state.members.map(_.address).toList
val shardLocations = Map(
"shard-1" -> nodes(0),
"shard-2" -> nodes(1),
"shard-3" -> nodes(0)
)
val future = client.updateShardLocations(shardLocations)
future.foreach(_ => println("All shard locations updated"))
Java Example:
import java.util.HashMap;
import java.util.Map;
List<Address> nodes = new ArrayList<>(Cluster.get(system).state().getMembers())
.stream().map(Member::address).collect(Collectors.toList());
Map<String, Address> shardLocations = new HashMap<>();
shardLocations.put("shard-1", nodes.get(0));
shardLocations.put("shard-2", nodes.get(1));
shardLocations.put("shard-3", nodes.get(0));
CompletionStage<Done> future = client.updateShardLocations(shardLocations);
future.thenRun(() -> System.out.println("All shard locations updated"));
Retrieve current shard location mappings:
def shardLocations(): Future[ShardLocations]
Scala Example:
val future = client.shardLocations()
future.foreach { locations =>
println(s"Current shard locations: ${locations.locations}")
}
Java Example:
CompletionStage<ShardLocations> future = client.shardLocations();
future.thenAccept(locations -> {
System.out.println("Current shard locations: " + locations.getLocations());
});
case class ShardLocations(locations: Map[ShardId, Address]) {
def getLocations(): java.util.Map[String, Address] // Java API
}
Contains the mapping of shard IDs to their designated cluster node addresses.
class ClientTimeoutException(message: String, cause: Throwable) extends RuntimeException(message, cause)
Thrown when external allocation operations time out.
Handling Timeouts:
client.updateShardLocation("shard-1", targetNode).recover {
case _: ClientTimeoutException =>
println("Shard location update timed out, will retry")
Done
case other =>
println(s"Unexpected error: $other")
throw other
}
Use external allocation with a centralized orchestration service:
class ShardOrchestrator(client: ExternalShardAllocationClient) {
/**
 * Reads the current shard locations, derives a new distribution from them and
 * submits that distribution back through the client.
 *
 * @return a future that yields `Done` after the future returned by
 *         `updateShardLocations` has completed successfully
 */
def rebalanceShards(): Future[Done] =
  client
    .shardLocations()
    .map(snapshot => calculateOptimalDistribution(snapshot))
    .flatMap(plan => client.updateShardLocations(plan))
    .map(_ => Done)
/**
 * Derives the target shard distribution from the current one.
 *
 * As written this is a pass-through: the existing locations are returned unchanged.
 *
 * @param current the shard locations currently known to the client
 * @return the shard-to-address mapping to apply
 */
private def calculateOptimalDistribution(
    current: ShardLocations
): Map[ShardId, Address] =
  // Custom logic for optimal shard distribution
  current.locations
}
Integrate with monitoring systems for load-based shard placement:
class LoadBasedAllocator(
client: ExternalShardAllocationClient,
metricsCollector: MetricsCollector
) {
/**
 * Fetches node loads, then shard loads, computes placements from both and
 * submits them through the client.
 *
 * The two metric lookups run one after the other: shard loads are only
 * requested once the node loads are available.
 *
 * @return a future that yields `Done` after the future returned by
 *         `updateShardLocations` has completed successfully
 */
def allocateBasedOnLoad(): Future[Done] =
  metricsCollector.getNodeLoads().flatMap { loadsByNode =>
    metricsCollector.getShardLoads().flatMap { loadsByShard =>
      val placements = calculateOptimalPlacements(loadsByNode, loadsByShard)
      client.updateShardLocations(placements).map(_ => Done)
    }
  }
/**
 * Computes shard placements from the supplied load figures.
 *
 * Placeholder: currently returns an empty mapping regardless of input.
 *
 * @param nodeLoads load value per node address
 * @param shardLoads load value per shard
 * @return the shard-to-address mapping to apply
 */
private def calculateOptimalPlacements(
    nodeLoads: Map[Address, Double],
    shardLoads: Map[ShardId, Double]
): Map[ShardId, Address] =
  // Place high-load shards on low-load nodes
  Map.empty[ShardId, Address] // Placeholder
}
Use external allocation for planned maintenance:
class MaintenanceManager(client: ExternalShardAllocationClient) {
/**
 * Moves every shard currently located on `nodeToEvacuate` to the remaining
 * available nodes.
 *
 * @param nodeToEvacuate address of the node whose shards should be relocated
 * @return a future that yields `Done` after the future returned by
 *         `updateShardLocations` has completed successfully
 */
def evacuateNode(nodeToEvacuate: Address): Future[Done] =
  client.shardLocations().flatMap { snapshot =>
    // Shards whose designated node is the one being drained.
    val displaced = snapshot.locations.filter { case (_, node) => node == nodeToEvacuate }
    // Never pick the node being drained as a target.
    val targets = getAvailableNodes().filter(_ != nodeToEvacuate)
    val plan = redistributeShards(displaced.keys, targets)
    client.updateShardLocations(plan).map(_ => Done)
  }
/**
 * Assigns each shard to one of the available nodes in round-robin order.
 *
 * @param shards shards that need a new location
 * @param availableNodes candidate target nodes; must be non-empty whenever
 *                       `shards` is non-empty
 * @return mapping from each shard to its target node (empty when there are no shards)
 * @throws IllegalArgumentException if there are shards to place but no node to place them on
 */
private def redistributeShards(
    shards: Iterable[ShardId],
    availableNodes: List[Address]
): Map[ShardId, Address] =
  if (shards.isEmpty) Map.empty
  else {
    // Fail with a descriptive error instead of the "/ by zero" ArithmeticException
    // that the modulo below would raise for an empty node list.
    require(availableNodes.nonEmpty, "Cannot redistribute shards: no available nodes")
    // Index into a Vector: List#apply and List#size are linear in the list length.
    val targets = availableNodes.toVector
    // Evenly distribute shards across available nodes
    shards.zipWithIndex.map { case (shard, index) =>
      shard -> targets(index % targets.size)
    }.toMap
  }
/**
 * Returns the nodes that shards may be moved to.
 *
 * Placeholder: currently always returns an empty list.
 */
private def getAvailableNodes(): List[Address] =
  // Get list of healthy cluster nodes
  Nil // Placeholder
}
- Use Future[Done] to coordinate sequential operations
- Catch ClientTimeoutException and handle appropriately