Akka Cluster Sharding provides location transparency and automatic distribution of stateful actors across cluster nodes.
The ShardRegion is responsible for routing messages to entities, managing entity lifecycle, and providing monitoring capabilities. It acts as a local proxy that knows how to forward messages to the correct entities across the cluster.
object ShardRegion {
type EntityId = String
type ShardId = String
type Msg = Any
type ExtractEntityId = PartialFunction[Msg, (EntityId, Msg)]
type ExtractShardId = Msg => ShardId
}
trait MessageExtractor {
def entityId(message: Any): String
def entityMessage(message: Any): Any
def shardId(message: Any): String
}
Extract entity ID, shard ID, and entity message from incoming messages.
Usage Example:
/**
 * Example MessageExtractor for CounterMessage envelopes.
 *
 * For any message that is not a CounterMessage, entityId and shardId return
 * null and entityMessage returns the message unchanged. (Akka documents a null
 * entity ID as "message unhandled".)
 */
public class CounterMessageExtractor implements MessageExtractor {
    /** Divisor used to map an entity ID's hash code onto a shard ID. */
    private final int maxShards = 100;

    /** Returns the entity ID carried by a CounterMessage, or null for other messages. */
    @Override
    public String entityId(Object message) {
        if (message instanceof CounterMessage) {
            return ((CounterMessage) message).getEntityId();
        }
        return null;
    }

    /** Unwraps the command from a CounterMessage; other messages pass through as-is. */
    @Override
    public Object entityMessage(Object message) {
        if (message instanceof CounterMessage) {
            return ((CounterMessage) message).getCommand();
        }
        return message;
    }

    /**
     * Derives the shard ID from the entity ID's hash code, or returns null for
     * messages that are not a CounterMessage.
     */
    @Override
    public String shardId(Object message) {
        if (message instanceof CounterMessage) {
            String entityId = ((CounterMessage) message).getEntityId();
            // Take the remainder BEFORE abs(): Math.abs(Integer.MIN_VALUE) is still
            // negative, so abs(hash) % maxShards could yield a negative shard ID.
            // For every other hash value the result is the same as before.
            return String.valueOf(Math.abs(entityId.hashCode() % maxShards));
        }
        return null;
    }
}
Convenience implementation using hash code for shard distribution:
abstract class HashCodeMessageExtractor(maxNumberOfShards: Int) extends MessageExtractor {
def entityMessage(message: Any): Any = message // Default implementation
def shardId(message: Any): String // Implemented using hash code
}
object HashCodeMessageExtractor {
def shardId(id: String, maxNumberOfShards: Int): String
}
Usage Example:
/**
 * Example extractor built on HashCodeMessageExtractor: only the entity ID
 * lookup is supplied here.
 */
public class CounterHashExtractor extends HashCodeMessageExtractor {
    public CounterHashExtractor() {
        super(100); // 100 shards maximum
    }

    /** Returns the entity ID of a CounterMessage, or null for any other message. */
    @Override
    public String entityId(Object message) {
        return (message instanceof CounterMessage)
            ? ((CounterMessage) message).getEntityId()
            : null;
    }
}
Request graceful entity passivation to reduce memory consumption:
case class Passivate(stopMessage: Any) extends ShardRegionCommand
Usage in Entity Actor:
/**
 * Example entity actor that asks its parent to passivate it after 30 seconds
 * without receiving a message.
 */
class CounterActor extends Actor {
  import scala.concurrent.duration._ // provides the `30.seconds` syntax

  // Deliver ReceiveTimeout to this actor after 30 seconds of inactivity.
  context.setReceiveTimeout(30.seconds)

  def receive: Receive = {
    case ReceiveTimeout =>
      // Request passivation when idle. The stop message (PoisonPill) is sent
      // back to this entity. Akka handles PoisonPill itself and stops the actor,
      // so it is never passed to `receive` and needs no `case` of its own.
      // Use a custom stop message instead if cleanup must run before stopping.
      context.parent ! ShardRegion.Passivate(PoisonPill)
    case msg =>
      // Handle business logic
      handleMessage(msg)
  }
}
Shutdown all shards in the region:
case object GracefulShutdown extends ShardRegionCommand
def gracefulShutdownInstance = GracefulShutdown // Java API
Usage Example:
val region = ClusterSharding(system).shardRegion("Counter")
region ! ShardRegion.GracefulShutdown
// Watch the region to know when shutdown is complete
context.watch(region)
Query for all active shard regions in the cluster:
case object GetCurrentRegions extends ShardRegionQuery
def getCurrentRegionsInstance: GetCurrentRegions.type = GetCurrentRegions // Java API
case class CurrentRegions(regions: Set[Address]) {
def getRegions(): java.util.Set[Address] // Java API
}
Usage Example:
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._

// Upper bound on how long the ask below waits for a reply.
// The explicit type avoids the untyped-implicit lint warning in Scala 2.13
// and is required for implicit definitions in Scala 3.
implicit val timeout: Timeout = Timeout(5.seconds)

// Ask the region for the current regions and narrow the reply type.
val future = (region ? ShardRegion.GetCurrentRegions).mapTo[ShardRegion.CurrentRegions]
// NOTE: Future.foreach needs an implicit ExecutionContext in scope.
future.foreach { response =>
  println(s"Active regions: ${response.regions}")
}
Query cluster-wide sharding statistics:
case class GetClusterShardingStats(timeout: FiniteDuration) extends ShardRegionQuery
case class ClusterShardingStats(regions: Map[Address, ShardRegionStats]) {
def getRegions(): java.util.Map[Address, ShardRegionStats] // Java API
}
Query statistics for a specific region:
case object GetShardRegionStats extends ShardRegionQuery
def getRegionStatsInstance = GetShardRegionStats // Java API
class ShardRegionStats(val stats: Map[ShardId, Int], val failed: Set[ShardId]) {
def this(stats: Map[ShardId, Int])
def getStats(): java.util.Map[ShardId, Int] // Java API
def getFailed(): java.util.Set[ShardId] // Java API
}
object ShardRegionStats extends AbstractFunction1[Map[ShardId, Int], ShardRegionStats] {
def apply(stats: Map[ShardId, Int]): ShardRegionStats
def apply(stats: Map[ShardId, Int], failed: Set[ShardId]): ShardRegionStats
}
Usage Example:
val future = (region ? ShardRegion.GetShardRegionStats).mapTo[ShardRegion.ShardRegionStats]
future.foreach { stats =>
println(s"Shard statistics: ${stats.stats}")
if (stats.failed.nonEmpty) {
println(s"Failed shards: ${stats.failed}")
}
}
Query detailed state of a region including entities:
case object GetShardRegionState extends ShardRegionQuery
def getShardRegionStateInstance = GetShardRegionState // Java API
class CurrentShardRegionState(val shards: Set[ShardState], val failed: Set[ShardId]) {
def this(shards: Set[ShardState])
def getShards(): java.util.Set[ShardState] // Java API
def getFailed(): java.util.Set[ShardId] // Java API
}
object CurrentShardRegionState extends AbstractFunction1[Set[ShardState], CurrentShardRegionState] {
def apply(shards: Set[ShardState]): CurrentShardRegionState
def apply(shards: Set[ShardState], failed: Set[ShardId]): CurrentShardRegionState
}
case class ShardState(shardId: ShardId, entityIds: Set[EntityId]) {
def getEntityIds(): java.util.Set[EntityId] // Java API
}
Usage Example:
val future = (region ? ShardRegion.GetShardRegionState).mapTo[ShardRegion.CurrentShardRegionState]
future.foreach { state =>
state.shards.foreach { shardState =>
println(s"Shard ${shardState.shardId} has ${shardState.entityIds.size} entities")
}
}
Messages for controlling individual entity lifecycle:
case class StartEntity(entityId: EntityId) extends ClusterShardingSerializable
case class StartEntityAck(entityId: EntityId, shardId: ShardId) extends ClusterShardingSerializable
StartEntity: Explicitly starts an entity (used with remember-entities).
StartEntityAck: Acknowledgment that the entity was started.
Notification when a shard is ready to accept messages:
case class ShardInitialized(shardId: ShardId)
This message is sent internally and typically doesn't need to be handled by user code.
The ExtractEntityId function extracts the entity ID and the message to deliver.
The ExtractShardId function determines the target shard.
If ExtractEntityId doesn't match a message:
The message is published as Unhandled on the event stream.
Statistics and state queries may time out:
Shards that did not respond in time are reported in the failed set.
If a shard fails during operation:
Install with Tessl CLI
npx tessl i tessl/maven-com-typesafe-akka--akka-cluster-sharding-2-13