or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

allocation-strategies.mdconfiguration.mdcore-extension.mdexternal-allocation.mdindex.mdmessage-routing.mdmonitoring.md
tile.json

message-routing.mddocs/

Message Routing and Entity Management

The ShardRegion is responsible for routing messages to entities, managing entity lifecycle, and providing monitoring capabilities. It acts as a local proxy that knows how to forward messages to the correct entities across the cluster.

Core Message Routing Types

Type Aliases

object ShardRegion {
  type EntityId = String
  type ShardId = String
  type Msg = Any
  type ExtractEntityId = PartialFunction[Msg, (EntityId, Msg)]
  type ExtractShardId = Msg => ShardId
}

Message Extractor Interface (Java API)

trait MessageExtractor {
  def entityId(message: Any): String
  def entityMessage(message: Any): Any
  def shardId(message: Any): String
}

Extract entity ID, shard ID, and entity message from incoming messages.

Usage Example:

public class CounterMessageExtractor implements MessageExtractor {
  private final int maxShards = 100;
  
  @Override
  public String entityId(Object message) {
    if (message instanceof CounterMessage) {
      return ((CounterMessage) message).getEntityId();
    }
    return null;
  }
  
  @Override
  public Object entityMessage(Object message) {
    if (message instanceof CounterMessage) {
      return ((CounterMessage) message).getCommand();
    }
    return message;
  }
  
  @Override
  public String shardId(Object message) {
    if (message instanceof CounterMessage) {
      String entityId = ((CounterMessage) message).getEntityId();
      return String.valueOf(Math.abs(entityId.hashCode()) % maxShards);
    }
    return null;
  }
}

Hash Code Message Extractor

Convenience implementation using hash code for shard distribution:

abstract class HashCodeMessageExtractor(maxNumberOfShards: Int) extends MessageExtractor {
  def entityMessage(message: Any): Any = message  // Default implementation
  def shardId(message: Any): String  // Implemented using hash code
}

object HashCodeMessageExtractor {
  def shardId(id: String, maxNumberOfShards: Int): String
}

Usage Example:

public class CounterHashExtractor extends HashCodeMessageExtractor {
  public CounterHashExtractor() {
    super(100); // 100 shards maximum
  }
  
  @Override
  public String entityId(Object message) {
    if (message instanceof CounterMessage) {
      return ((CounterMessage) message).getEntityId();
    }
    return null;
  }
}

Entity Lifecycle Management

Passivation

Request graceful entity passivation to reduce memory consumption:

case class Passivate(stopMessage: Any) extends ShardRegionCommand

Usage in Entity Actor:

class CounterActor extends Actor {
  context.setReceiveTimeout(30.seconds)
  
  def receive = {
    case ReceiveTimeout =>
      // Request passivation when idle
      context.parent ! ShardRegion.Passivate(PoisonPill)
    
    case PoisonPill =>
      // Graceful shutdown
      context.stop(self)
      
    case msg =>
      // Handle business logic
      handleMessage(msg)
  }
}

Graceful Shutdown

Shutdown all shards in the region:

case object GracefulShutdown extends ShardRegionCommand
def gracefulShutdownInstance = GracefulShutdown  // Java API

Usage Example:

val region = ClusterSharding(system).shardRegion("Counter")
region ! ShardRegion.GracefulShutdown
// Watch the region to know when shutdown is complete
context.watch(region)

Query Messages and Monitoring

Get Current Regions

Query for all active shard regions in the cluster:

case object GetCurrentRegions extends ShardRegionQuery
def getCurrentRegionsInstance: GetCurrentRegions.type = GetCurrentRegions  // Java API

case class CurrentRegions(regions: Set[Address]) {
  def getRegions(): java.util.Set[Address]  // Java API
}

Usage Example:

import akka.pattern.ask
import scala.concurrent.duration._

implicit val timeout = Timeout(5.seconds)
val future = (region ? ShardRegion.GetCurrentRegions).mapTo[ShardRegion.CurrentRegions]
future.foreach { response =>
  println(s"Active regions: ${response.regions}")
}

Get Cluster Sharding Statistics

Query cluster-wide sharding statistics:

case class GetClusterShardingStats(timeout: FiniteDuration) extends ShardRegionQuery

case class ClusterShardingStats(regions: Map[Address, ShardRegionStats]) {
  def getRegions(): java.util.Map[Address, ShardRegionStats]  // Java API
}

Get Region Statistics

Query statistics for a specific region:

case object GetShardRegionStats extends ShardRegionQuery
def getRegionStatsInstance = GetShardRegionStats  // Java API

class ShardRegionStats(val stats: Map[ShardId, Int], val failed: Set[ShardId]) {
  def this(stats: Map[ShardId, Int])
  def getStats(): java.util.Map[ShardId, Int]    // Java API
  def getFailed(): java.util.Set[ShardId]        // Java API
}

object ShardRegionStats extends AbstractFunction1[Map[ShardId, Int], ShardRegionStats] {
  def apply(stats: Map[ShardId, Int]): ShardRegionStats
  def apply(stats: Map[ShardId, Int], failed: Set[ShardId]): ShardRegionStats
}

Usage Example:

val future = (region ? ShardRegion.GetShardRegionStats).mapTo[ShardRegion.ShardRegionStats]
future.foreach { stats =>
  println(s"Shard statistics: ${stats.stats}")
  if (stats.failed.nonEmpty) {
    println(s"Failed shards: ${stats.failed}")
  }
}

Get Region State

Query detailed state of a region including entities:

case object GetShardRegionState extends ShardRegionQuery
def getShardRegionStateInstance = GetShardRegionState  // Java API

class CurrentShardRegionState(val shards: Set[ShardState], val failed: Set[ShardId]) {
  def this(shards: Set[ShardState])
  def getShards(): java.util.Set[ShardState]     // Java API
  def getFailed(): java.util.Set[ShardId]        // Java API
}

object CurrentShardRegionState extends AbstractFunction1[Set[ShardState], CurrentShardRegionState] {
  def apply(shards: Set[ShardState]): CurrentShardRegionState
  def apply(shards: Set[ShardState], failed: Set[ShardId]): CurrentShardRegionState
}

case class ShardState(shardId: ShardId, entityIds: Set[EntityId]) {
  def getEntityIds(): java.util.Set[EntityId]    // Java API
}

Usage Example:

val future = (region ? ShardRegion.GetShardRegionState).mapTo[ShardRegion.CurrentShardRegionState]
future.foreach { state =>
  state.shards.foreach { shardState =>
    println(s"Shard ${shardState.shardId} has ${shardState.entityIds.size} entities")
  }
}

Internal Messages and Notifications

Entity Lifecycle Messages

Messages for controlling individual entity lifecycle:

case class StartEntity(entityId: EntityId) extends ClusterShardingSerializable
case class StartEntityAck(entityId: EntityId, shardId: ShardId) extends ClusterShardingSerializable
  • StartEntity: Explicitly starts an entity (used with remember-entities)
  • StartEntityAck: Acknowledgment that entity was started

Shard Initialization

Notification when a shard is ready to accept messages:

case class ShardInitialized(shardId: ShardId)

This message is sent internally and typically doesn't need to be handled by user code.

Message Routing Flow

  1. Message Arrival: Message arrives at ShardRegion
  2. Entity Extraction: ExtractEntityId function extracts entity ID and message
  3. Shard Determination: ExtractShardId function determines target shard
  4. Shard Resolution: ShardRegion resolves shard location via ShardCoordinator
  5. Message Forwarding: Message is forwarded to appropriate shard/entity
  6. Entity Creation: If entity doesn't exist, it's created on-demand
  7. Message Delivery: Message is delivered to target entity

Error Handling

Unhandled Messages

If ExtractEntityId doesn't match a message:

  • Message is posted as Unhandled on the event stream
  • No processing occurs

Timeout Scenarios

Statistics and state queries may timeout:

  • Failed shards are reported in the failed set
  • Partial results are returned for successful shards

Shard Failure

If a shard fails during operation:

  • Entities in that shard become temporarily unavailable
  • Messages are buffered until shard is restored
  • ShardCoordinator handles shard reallocation