Akka Cluster Sharding provides location transparency and automatic distribution of stateful actors across cluster nodes
Shard allocation strategies control how shards are distributed across cluster nodes and when rebalancing occurs. Akka Cluster Sharding provides built-in strategies and allows custom implementations.
trait ShardAllocationStrategy extends NoSerializationVerificationNeeded {
def allocateShard(
requester: ActorRef,
shardId: ShardId,
currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]]
): Future[ActorRef]
def rebalance(
currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]],
rebalanceInProgress: Set[ShardId]
): Future[Set[ShardId]]
}Key Methods:
allocateShard: Decides where to place a new shard when first accessedrebalance: Periodically called to determine which shards should be movedFor strategies that need initialization:
trait StartableAllocationStrategy extends ShardAllocationStrategy {
def start(): Unit
}abstract class AbstractShardAllocationStrategy extends ShardAllocationStrategy {
// Java-friendly base class for custom implementations
}Modern implementation with absolute and relative limits:
object ShardAllocationStrategy {
def leastShardAllocationStrategy(
absoluteLimit: Int,
relativeLimit: Double
): ShardAllocationStrategy
}
// Also available via ShardCoordinator
object ShardCoordinator {
def leastShardAllocationStrategy(
absoluteLimit: Int,
relativeLimit: Double
): ShardAllocationStrategy
}Parameters:
absoluteLimit: Maximum number of shards to rebalance per roundrelativeLimit: Fraction of total shards to rebalance per round (< 1.0)Usage Example:
val strategy = ShardCoordinator.leastShardAllocationStrategy(
absoluteLimit = 20, // Max 20 shards per rebalance
relativeLimit = 0.1 // Max 10% of total shards
)
ClusterSharding(system).start(
typeName = "MyEntity",
entityProps = Props[MyEntityActor](),
settings = ClusterShardingSettings(system),
extractEntityId = myExtractEntityId,
extractShardId = myExtractShardId,
allocationStrategy = strategy,
handOffStopMessage = PoisonPill
)Legacy implementation with threshold-based rebalancing:
class LeastShardAllocationStrategy(
rebalanceThreshold: Int,
maxSimultaneousRebalance: Int
) extends ShardAllocationStrategyParameters:
rebalanceThreshold: Minimum difference in shard count to trigger rebalancingmaxSimultaneousRebalance: Maximum concurrent rebalancing operationsUsage Example:
val strategy = new ShardCoordinator.LeastShardAllocationStrategy(
rebalanceThreshold = 10, // Start rebalancing when difference >= 10
maxSimultaneousRebalance = 3 // Max 3 concurrent rebalances
)Alternative strategy using consistent hashing:
class ConsistentHashingShardAllocationStrategy(
virtualNodesFactor: Int,
absoluteLimit: Int,
relativeLimit: Double
) extends ShardAllocationStrategyUsage Example:
val strategy = new ConsistentHashingShardAllocationStrategy(
virtualNodesFactor = 10, // Virtual nodes per physical node
absoluteLimit = 20, // Max shards per rebalance round
relativeLimit = 0.1 // Max 10% of shards per round
)For external control of shard placement:
class ExternalShardAllocationStrategy(
system: ActorSystem,
typeName: String
) extends ShardAllocationStrategyThis strategy delegates allocation decisions to an external system via the ExternalShardAllocation extension.
Usage Example:
import akka.cluster.sharding.external.ExternalShardAllocationStrategy
val strategy = new ExternalShardAllocationStrategy(system, "MyEntity")
ClusterSharding(system).start(
typeName = "MyEntity",
entityProps = Props[MyEntityActor](),
settings = ClusterShardingSettings(system),
extractEntityId = myExtractEntityId,
extractShardId = myExtractShardId,
allocationStrategy = strategy,
handOffStopMessage = PoisonPill
)class CustomAllocationStrategy extends ShardAllocationStrategy {
override def allocateShard(
requester: ActorRef,
shardId: ShardId,
currentShardAllocations: Map[ActorRef, IndexedSeq[ShardId]]
): Future[ActorRef] = {
// Find region with least shards
val selectedRegion = currentShardAllocations.minBy(_._2.size)._1
Future.successful(selectedRegion)
}
override def rebalance(
currentShardAllocations: Map[ActorRef, IndexedSeq[ShardId]],
rebalanceInProgress: Set[ShardId]
): Future[Set[ShardId]] = {
if (rebalanceInProgress.nonEmpty) {
// Don't rebalance if already in progress
Future.successful(Set.empty)
} else {
// Find shards to rebalance based on custom logic
val (mostLoaded, leastLoaded) = findMostAndLeastLoaded(currentShardAllocations)
val shardsToMove = selectShardsToMove(mostLoaded, leastLoaded)
Future.successful(shardsToMove.toSet)
}
}
private def findMostAndLeastLoaded(
allocations: Map[ActorRef, IndexedSeq[ShardId]]
): (ActorRef, ActorRef) = {
val mostLoaded = allocations.maxBy(_._2.size)._1
val leastLoaded = allocations.minBy(_._2.size)._1
(mostLoaded, leastLoaded)
}
private def selectShardsToMove(
from: ActorRef,
to: ActorRef
): List[ShardId] = {
// Custom logic to select which shards to move
List.empty // Placeholder
}
}public class JavaCustomAllocationStrategy extends AbstractShardAllocationStrategy {
@Override
public CompletionStage<ActorRef> allocateShard(
ActorRef requester,
String shardId,
Map<ActorRef, ImmutableList<String>> currentShardAllocations) {
// Find region with minimum shard count
ActorRef selectedRegion = currentShardAllocations.entrySet().stream()
.min(Map.Entry.comparingByValue((a, b) -> Integer.compare(a.size(), b.size())))
.map(Map.Entry::getKey)
.orElse(requester);
return CompletableFuture.completedFuture(selectedRegion);
}
@Override
public CompletionStage<Set<String>> rebalance(
Map<ActorRef, ImmutableList<String>> currentShardAllocations,
Set<String> rebalanceInProgress) {
if (!rebalanceInProgress.isEmpty()) {
return CompletableFuture.completedFuture(Collections.emptySet());
}
// Custom rebalancing logic
return CompletableFuture.completedFuture(Collections.emptySet());
}
}Strategies can be configured in application.conf:
akka.cluster.sharding {
least-shard-allocation-strategy {
# New algorithm parameters
rebalance-absolute-limit = 20
rebalance-relative-limit = 0.1
# Legacy algorithm parameters (deprecated)
rebalance-threshold = 10
max-simultaneous-rebalance = 3
}
}allocateShard should return quickly (< 100ms)rebalance is called periodically (default: 10s intervals)Install with Tessl CLI
npx tessl i tessl/maven-com-typesafe-akka--akka-cluster-sharding-2-13