The ShardRegion is responsible for routing messages to entities, managing entity lifecycle, and providing monitoring capabilities. It acts as a local proxy that knows how to forward messages to the correct entities across the cluster.
object ShardRegion {
type EntityId = String
type ShardId = String
type Msg = Any
type ExtractEntityId = PartialFunction[Msg, (EntityId, Msg)]
type ExtractShardId = Msg => ShardId
}trait MessageExtractor {
def entityId(message: Any): String
def entityMessage(message: Any): Any
def shardId(message: Any): String
}Extract entity ID, shard ID, and entity message from incoming messages.
Usage Example:
public class CounterMessageExtractor implements MessageExtractor {
private final int maxShards = 100;
@Override
public String entityId(Object message) {
if (message instanceof CounterMessage) {
return ((CounterMessage) message).getEntityId();
}
return null;
}
@Override
public Object entityMessage(Object message) {
if (message instanceof CounterMessage) {
return ((CounterMessage) message).getCommand();
}
return message;
}
@Override
public String shardId(Object message) {
if (message instanceof CounterMessage) {
String entityId = ((CounterMessage) message).getEntityId();
return String.valueOf(Math.abs(entityId.hashCode()) % maxShards);
}
return null;
}
}Convenience implementation using hash code for shard distribution:
abstract class HashCodeMessageExtractor(maxNumberOfShards: Int) extends MessageExtractor {
def entityMessage(message: Any): Any = message // Default implementation
def shardId(message: Any): String // Implemented using hash code
}
object HashCodeMessageExtractor {
def shardId(id: String, maxNumberOfShards: Int): String
}Usage Example:
public class CounterHashExtractor extends HashCodeMessageExtractor {
public CounterHashExtractor() {
super(100); // 100 shards maximum
}
@Override
public String entityId(Object message) {
if (message instanceof CounterMessage) {
return ((CounterMessage) message).getEntityId();
}
return null;
}
}Request graceful entity passivation to reduce memory consumption:
case class Passivate(stopMessage: Any) extends ShardRegionCommandUsage in Entity Actor:
class CounterActor extends Actor {
context.setReceiveTimeout(30.seconds)
def receive = {
case ReceiveTimeout =>
// Request passivation when idle
context.parent ! ShardRegion.Passivate(PoisonPill)
case PoisonPill =>
// Graceful shutdown
context.stop(self)
case msg =>
// Handle business logic
handleMessage(msg)
}
}Shutdown all shards in the region:
case object GracefulShutdown extends ShardRegionCommand
def gracefulShutdownInstance = GracefulShutdown // Java APIUsage Example:
val region = ClusterSharding(system).shardRegion("Counter")
region ! ShardRegion.GracefulShutdown
// Watch the region to know when shutdown is complete
context.watch(region)Query for all active shard regions in the cluster:
case object GetCurrentRegions extends ShardRegionQuery
def getCurrentRegionsInstance: GetCurrentRegions.type = GetCurrentRegions // Java API
case class CurrentRegions(regions: Set[Address]) {
def getRegions(): java.util.Set[Address] // Java API
}Usage Example:
import akka.pattern.ask
import scala.concurrent.duration._
implicit val timeout = Timeout(5.seconds)
val future = (region ? ShardRegion.GetCurrentRegions).mapTo[ShardRegion.CurrentRegions]
future.foreach { response =>
println(s"Active regions: ${response.regions}")
}Query cluster-wide sharding statistics:
case class GetClusterShardingStats(timeout: FiniteDuration) extends ShardRegionQuery
case class ClusterShardingStats(regions: Map[Address, ShardRegionStats]) {
def getRegions(): java.util.Map[Address, ShardRegionStats] // Java API
}Query statistics for a specific region:
case object GetShardRegionStats extends ShardRegionQuery
def getRegionStatsInstance = GetShardRegionStats // Java API
class ShardRegionStats(val stats: Map[ShardId, Int], val failed: Set[ShardId]) {
def this(stats: Map[ShardId, Int])
def getStats(): java.util.Map[ShardId, Int] // Java API
def getFailed(): java.util.Set[ShardId] // Java API
}
object ShardRegionStats extends AbstractFunction1[Map[ShardId, Int], ShardRegionStats] {
def apply(stats: Map[ShardId, Int]): ShardRegionStats
def apply(stats: Map[ShardId, Int], failed: Set[ShardId]): ShardRegionStats
}Usage Example:
val future = (region ? ShardRegion.GetShardRegionStats).mapTo[ShardRegion.ShardRegionStats]
future.foreach { stats =>
println(s"Shard statistics: ${stats.stats}")
if (stats.failed.nonEmpty) {
println(s"Failed shards: ${stats.failed}")
}
}Query detailed state of a region including entities:
case object GetShardRegionState extends ShardRegionQuery
def getShardRegionStateInstance = GetShardRegionState // Java API
class CurrentShardRegionState(val shards: Set[ShardState], val failed: Set[ShardId]) {
def this(shards: Set[ShardState])
def getShards(): java.util.Set[ShardState] // Java API
def getFailed(): java.util.Set[ShardId] // Java API
}
object CurrentShardRegionState extends AbstractFunction1[Set[ShardState], CurrentShardRegionState] {
def apply(shards: Set[ShardState]): CurrentShardRegionState
def apply(shards: Set[ShardState], failed: Set[ShardId]): CurrentShardRegionState
}
case class ShardState(shardId: ShardId, entityIds: Set[EntityId]) {
def getEntityIds(): java.util.Set[EntityId] // Java API
}Usage Example:
val future = (region ? ShardRegion.GetShardRegionState).mapTo[ShardRegion.CurrentShardRegionState]
future.foreach { state =>
state.shards.foreach { shardState =>
println(s"Shard ${shardState.shardId} has ${shardState.entityIds.size} entities")
}
}Messages for controlling individual entity lifecycle:
case class StartEntity(entityId: EntityId) extends ClusterShardingSerializable
case class StartEntityAck(entityId: EntityId, shardId: ShardId) extends ClusterShardingSerializableStartEntity: Explicitly starts an entity (used with remember-entities)StartEntityAck: Acknowledgment that entity was startedNotification when a shard is ready to accept messages:
case class ShardInitialized(shardId: ShardId)This message is sent internally and typically doesn't need to be handled by user code.
ExtractEntityId function extracts entity ID and messageExtractShardId function determines target shardIf ExtractEntityId doesn't match a message:
Unhandled on the event streamStatistics and state queries may timeout:
failed setIf a shard fails during operation: