Shard allocation strategies control how shards are distributed across cluster nodes and when rebalancing occurs. Akka Cluster Sharding provides built-in strategies and allows custom implementations.
trait ShardAllocationStrategy extends NoSerializationVerificationNeeded {
def allocateShard(
requester: ActorRef,
shardId: ShardId,
currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]]
): Future[ActorRef]
def rebalance(
currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]],
rebalanceInProgress: Set[ShardId]
): Future[Set[ShardId]]
}

Key Methods:
- allocateShard: Decides where to place a new shard when it is first accessed
- rebalance: Periodically called to determine which shards should be moved

For strategies that need initialization:
trait StartableAllocationStrategy extends ShardAllocationStrategy {
def start(): Unit
}

abstract class AbstractShardAllocationStrategy extends ShardAllocationStrategy {
// Java-friendly base class for custom implementations
}

Modern implementation with absolute and relative limits:
object ShardAllocationStrategy {
/**
 * Creates the least-shard allocation strategy whose rebalancing is bounded by
 * an absolute and a relative limit.
 *
 * @param absoluteLimit maximum number of shards to rebalance per round
 * @param relativeLimit fraction of the total shards to rebalance per round (< 1.0)
 * @return the configured allocation strategy
 */
def leastShardAllocationStrategy(
absoluteLimit: Int,
relativeLimit: Double
): ShardAllocationStrategy
}
// Also available via ShardCoordinator
object ShardCoordinator {
def leastShardAllocationStrategy(
absoluteLimit: Int,
relativeLimit: Double
): ShardAllocationStrategy
}

Parameters:
- absoluteLimit: Maximum number of shards to rebalance per round
- relativeLimit: Fraction of total shards to rebalance per round (< 1.0)

Usage Example:
val strategy = ShardCoordinator.leastShardAllocationStrategy(
absoluteLimit = 20, // Max 20 shards per rebalance
relativeLimit = 0.1 // Max 10% of total shards
)
ClusterSharding(system).start(
typeName = "MyEntity",
entityProps = Props[MyEntityActor](),
settings = ClusterShardingSettings(system),
extractEntityId = myExtractEntityId,
extractShardId = myExtractShardId,
allocationStrategy = strategy,
handOffStopMessage = PoisonPill
)

Legacy implementation with threshold-based rebalancing:
class LeastShardAllocationStrategy(
rebalanceThreshold: Int,
maxSimultaneousRebalance: Int
) extends ShardAllocationStrategy

Parameters:
- rebalanceThreshold: Minimum difference in shard count to trigger rebalancing
- maxSimultaneousRebalance: Maximum number of concurrent rebalancing operations

Usage Example:
val strategy = new ShardCoordinator.LeastShardAllocationStrategy(
rebalanceThreshold = 10, // Start rebalancing when difference >= 10
maxSimultaneousRebalance = 3 // Max 3 concurrent rebalances
)

Alternative strategy using consistent hashing:
class ConsistentHashingShardAllocationStrategy(
virtualNodesFactor: Int,
absoluteLimit: Int,
relativeLimit: Double
) extends ShardAllocationStrategy

Usage Example:
val strategy = new ConsistentHashingShardAllocationStrategy(
virtualNodesFactor = 10, // Virtual nodes per physical node
absoluteLimit = 20, // Max shards per rebalance round
relativeLimit = 0.1 // Max 10% of shards per round
)

For external control of shard placement:
class ExternalShardAllocationStrategy(
system: ActorSystem,
typeName: String
) extends ShardAllocationStrategy

This strategy delegates allocation decisions to an external system via the ExternalShardAllocation extension.
Usage Example:
import akka.cluster.sharding.external.ExternalShardAllocationStrategy
val strategy = new ExternalShardAllocationStrategy(system, "MyEntity")
ClusterSharding(system).start(
typeName = "MyEntity",
entityProps = Props[MyEntityActor](),
settings = ClusterShardingSettings(system),
extractEntityId = myExtractEntityId,
extractShardId = myExtractShardId,
allocationStrategy = strategy,
handOffStopMessage = PoisonPill
)

class CustomAllocationStrategy extends ShardAllocationStrategy {
/**
 * Chooses the region that should host a newly requested shard.
 *
 * The region currently holding the fewest shards is selected. When no
 * allocations are known yet (empty map) the requesting region is used instead
 * of failing, which mirrors the `orElse(requester)` fallback of the Java
 * example.
 *
 * The allocation map is typed with the fully qualified immutable `IndexedSeq`
 * so the signature matches the `ShardAllocationStrategy` trait on every Scala
 * version (plain `IndexedSeq` is not the immutable one before Scala 2.13).
 *
 * @param requester the region that asked for the shard location
 * @param shardId the shard to allocate
 * @param currentShardAllocations shards currently hosted by each region
 * @return the selected region, already completed
 */
override def allocateShard(
    requester: ActorRef,
    shardId: ShardId,
    currentShardAllocations: Map[ActorRef, scala.collection.immutable.IndexedSeq[ShardId]]
): Future[ActorRef] = {
  val selectedRegion =
    if (currentShardAllocations.isEmpty) {
      // minBy throws on an empty collection, so fall back to the requester.
      requester
    } else {
      // Find region with least shards
      currentShardAllocations.minBy { case (_, shards) => shards.size }._1
    }
  Future.successful(selectedRegion)
}
/**
 * Decides which shards should be moved in this rebalance round.
 *
 * Nothing is moved while an earlier rebalance is still in progress, or when
 * there are no allocations at all: finding the most and least loaded regions
 * relies on `maxBy`/`minBy`, which throw on an empty map.
 *
 * The allocation map is typed with the fully qualified immutable `IndexedSeq`
 * so the signature matches the `ShardAllocationStrategy` trait on every Scala
 * version.
 *
 * @param currentShardAllocations shards currently hosted by each region
 * @param rebalanceInProgress shards whose rebalance has not finished yet
 * @return the shards to move, already completed
 */
override def rebalance(
    currentShardAllocations: Map[ActorRef, scala.collection.immutable.IndexedSeq[ShardId]],
    rebalanceInProgress: Set[ShardId]
): Future[Set[ShardId]] = {
  if (rebalanceInProgress.nonEmpty || currentShardAllocations.isEmpty) {
    // Don't rebalance if one is already in progress or there is nothing to balance
    Future.successful(Set.empty[ShardId])
  } else {
    // Find shards to rebalance based on custom logic
    val (mostLoaded, leastLoaded) = findMostAndLeastLoaded(currentShardAllocations)
    val shardsToMove = selectShardsToMove(mostLoaded, leastLoaded)
    Future.successful(shardsToMove.toSet)
  }
}
/**
 * Returns the region hosting the most shards paired with the region hosting
 * the fewest, in that order.
 *
 * Uses `maxBy`/`minBy`, so it must not be called with an empty map.
 *
 * @param allocations shards currently hosted by each region
 * @return `(mostLoaded, leastLoaded)`
 */
private def findMostAndLeastLoaded(
    allocations: Map[ActorRef, IndexedSeq[ShardId]]
): (ActorRef, ActorRef) = {
  val (busiestRegion, _) = allocations.maxBy { case (_, shards) => shards.size }
  val (idlestRegion, _) = allocations.minBy { case (_, shards) => shards.size }
  busiestRegion -> idlestRegion
}
/**
 * Hook for the custom logic that selects which shards to move between two
 * regions. This placeholder selects nothing.
 *
 * @param from region the shards would be moved away from
 * @param to region the shards would be moved to
 * @return the shards to move; always empty in this placeholder
 */
private def selectShardsToMove(
    from: ActorRef,
    to: ActorRef
): List[ShardId] =
  Nil // Placeholder: replace with custom selection logic
}

public class JavaCustomAllocationStrategy extends AbstractShardAllocationStrategy {
/**
 * Chooses the region that currently hosts the fewest shards, falling back to
 * the requester when no region can be chosen (for example an empty map).
 *
 * NOTE(review): confirm against the Akka API that AbstractShardAllocationStrategy
 * declares these CompletionStage/ImmutableList based signatures.
 *
 * @param requester region that asked for the shard location
 * @param shardId shard to allocate
 * @param currentShardAllocations shards currently hosted by each region
 * @return an already completed stage holding the selected region
 */
@Override
public CompletionStage<ActorRef> allocateShard(
    ActorRef requester,
    String shardId,
    Map<ActorRef, ImmutableList<String>> currentShardAllocations) {
  Map.Entry<ActorRef, ImmutableList<String>> leastLoaded = null;
  for (Map.Entry<ActorRef, ImmutableList<String>> candidate : currentShardAllocations.entrySet()) {
    // Strict comparison keeps the first entry encountered when counts tie.
    if (leastLoaded == null || candidate.getValue().size() < leastLoaded.getValue().size()) {
      leastLoaded = candidate;
    }
  }
  ActorRef leastLoadedRegion = (leastLoaded == null) ? null : leastLoaded.getKey();
  ActorRef selectedRegion = (leastLoadedRegion != null) ? leastLoadedRegion : requester;
  return CompletableFuture.completedFuture(selectedRegion);
}
/**
 * Decides which shards to move in this rebalance round. This template never
 * moves anything: it returns an empty set both while a rebalance is in
 * progress and otherwise, leaving room for custom logic.
 *
 * @param currentShardAllocations shards currently hosted by each region
 * @param rebalanceInProgress shards whose rebalance has not finished yet
 * @return an already completed stage holding the shards to move
 */
@Override
public CompletionStage<Set<String>> rebalance(
    Map<ActorRef, ImmutableList<String>> currentShardAllocations,
    Set<String> rebalanceInProgress) {
  boolean rebalanceRunning = !rebalanceInProgress.isEmpty();
  if (rebalanceRunning) {
    // Do not start another round while one is still in progress.
    return CompletableFuture.completedFuture(Collections.emptySet());
  }
  // Custom rebalancing logic
  return CompletableFuture.completedFuture(Collections.emptySet());
}
}

Strategies can be configured in application.conf:
akka.cluster.sharding {
least-shard-allocation-strategy {
# New algorithm parameters
rebalance-absolute-limit = 20
rebalance-relative-limit = 0.1
# Legacy algorithm parameters (deprecated)
rebalance-threshold = 10
max-simultaneous-rebalance = 3
}
}

- allocateShard should return quickly (< 100ms)
- rebalance is called periodically (default: 10s intervals)