Akka Cluster Sharding provides location transparency and automatic distribution of stateful actors across cluster nodes.
It also provides monitoring, statistics collection, and health-check capabilities for operational visibility and troubleshooting.
/** Readiness health check for the shard region of one entity type.
  *
  * Intended to be registered with Akka Management through the
  * `akka.management.health-checks.readiness-checks` configuration.
  *
  * NOTE(review): Akka's actual `ClusterShardingHealthCheck` may expose a different
  * constructor and result type (e.g. `() => Future[Boolean]`); verify this listing
  * against the Akka API docs for your version.
  *
  * @param system   extended actor system hosting the shard region
  * @param typeName entity type name of the shard region to check (e.g. "Counter")
  * @param timeout  time allowed for the check — presumably the bound on waiting for the region; TODO confirm
  */
class ClusterShardingHealthCheck(
system: ExtendedActorSystem,
typeName: String,
timeout: FiniteDuration
) extends HealthCheck {
/** Runs the check; the future completes with a [[HealthCheckResult]] such as
  * `HealthCheckResult.Healthy` or `HealthCheckResult.Unhealthy(reason)`.
  */
def check(): Future[HealthCheckResult]
}Implements Akka Management health check interface for sharding readiness.
/** Settings for the sharding health check.
  *
  * @param shardRegionNames entity type names whose shard regions are checked
  *                         (presumably populated from `shard-region-names`; TODO confirm)
  * @param timeout          time allowed for the check
  *                         (presumably populated from `timeout`; TODO confirm)
  */
class ClusterShardingHealthCheckSettings(
shardRegionNames: Set[String],
timeout: FiniteDuration
)Configuration Example:
# Register the sharding health check class as an Akka Management readiness check.
akka.management.health-checks.readiness-checks {
cluster-sharding = "akka.cluster.sharding.ClusterShardingHealthCheck"
}
# Shard regions inspected by the health check and the time allowed for it.
# NOTE(review): recent Akka versions read these settings from `akka.cluster.sharding.healthcheck`
# with keys `names` and `timeout`; verify the path and key names for your Akka version.
akka.cluster.sharding.health-check {
shard-region-names = ["Counter", "Account", "Order"]
timeout = 5s
}Usage Example:
import akka.cluster.sharding.ClusterShardingHealthCheck

// Build a readiness check for the "Counter" entity type and print its outcome.
// Like the callbacks below, this assumes an implicit ExecutionContext is in scope.
val healthCheck = new ClusterShardingHealthCheck(
  system.asInstanceOf[ExtendedActorSystem],
  typeName = "Counter",
  timeout = 5.seconds
)
val result = healthCheck.check()
result.foreach {
  case HealthCheckResult.Healthy => println("Sharding is healthy")
  case HealthCheckResult.Unhealthy(reason) => println(s"Sharding is unhealthy: $reason")
}
// `foreach` never sees a failed future (e.g. the check timing out);
// report it explicitly so a readiness problem is not silently dropped.
result.failed.foreach { ex =>
  println(s"Sharding health check failed: ${ex.getMessage}")
}
// Get statistics for all shard regions across the cluster:
/** Query asking for the shard statistics of every shard region in the cluster.
  * `timeout` presumably bounds how long the query waits for regions to answer; TODO confirm.
  */
case class GetClusterShardingStats(timeout: FiniteDuration) extends ShardRegionQuery
/** Reply to [[GetClusterShardingStats]]: per-region statistics keyed by the region's node address. */
case class ClusterShardingStats(regions: Map[Address, ShardRegionStats]) {
def getRegions(): java.util.Map[Address, ShardRegionStats] // Java API
}Usage Example:
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._

// Ask timeout. Keep it longer than the query timeout passed to
// GetClusterShardingStats below so the query's own timeout fires first.
implicit val timeout: Timeout = Timeout(10.seconds)
val shardRegion = ClusterSharding(system).shardRegion("Counter")
val future = (shardRegion ? ShardRegion.GetClusterShardingStats(5.seconds))
  .mapTo[ShardRegion.ClusterShardingStats]
// Print each region's per-shard entity counts, plus any failed shards.
future.foreach { stats =>
  stats.regions.foreach { case (address, regionStats) =>
    println(s"Region at $address:")
    regionStats.stats.foreach { case (shardId, entityCount) =>
      println(s" Shard $shardId: $entityCount entities")
    }
    if (regionStats.failed.nonEmpty) {
      println(s" Failed shards: ${regionStats.failed}")
    }
  }
}
// Surface query failures (e.g. ask timeouts) instead of dropping them silently.
future.failed.foreach { ex =>
  println(s"Failed to retrieve cluster sharding stats: ${ex.getMessage}")
}
// Get statistics for a specific shard region:
/** Query for the statistics of the shard region the message is sent to. */
case object GetShardRegionStats extends ShardRegionQuery
def getRegionStatsInstance = GetShardRegionStats // Java API
/** Reply to [[GetShardRegionStats]]; also the per-region value inside `ClusterShardingStats`.
  *
  * @param stats  entity count for each shard id
  * @param failed ids of shards reported as failed — presumably those that did not answer; TODO confirm
  */
class ShardRegionStats(val stats: Map[ShardId, Int], val failed: Set[ShardId]) {
def getStats(): java.util.Map[ShardId, Int] // Java API
def getFailed(): java.util.Set[ShardId] // Java API
}Usage Example:
// Fetch this region's shard statistics and summarise them.
val future = (shardRegion ? ShardRegion.GetShardRegionStats)
  .mapTo[ShardRegion.ShardRegionStats]
future.foreach { regionStats =>
  val totalEntities = regionStats.stats.values.sum
  println(s"Region has $totalEntities entities across ${regionStats.stats.size} shards")
  // Find most loaded shard. `maxByOption` rather than `maxBy`: a region that
  // currently hosts no shards would make `maxBy` throw inside the callback.
  regionStats.stats.maxByOption(_._2) match {
    case Some((mostLoadedShard, entityCount)) =>
      println(s"Most loaded shard: $mostLoadedShard with $entityCount entities")
    case None =>
      println("Region hosts no shards")
  }
}
// Surface query failures (e.g. ask timeouts) instead of dropping them silently.
future.failed.foreach { ex =>
  println(s"Failed to retrieve shard region stats: ${ex.getMessage}")
}
// Get detailed state including entity IDs:
/** Query for the detailed state (shards and their entity ids) of the shard region it is sent to. */
case object GetShardRegionState extends ShardRegionQuery
def getShardRegionStateInstance = GetShardRegionState // Java API
/** Reply to [[GetShardRegionState]].
  *
  * @param shards state of each shard hosted by the region
  * @param failed ids of shards reported as failed — presumably those that did not answer; TODO confirm
  */
class CurrentShardRegionState(val shards: Set[ShardState], val failed: Set[ShardId]) {
def getShards(): java.util.Set[ShardState] // Java API
def getFailed(): java.util.Set[ShardId] // Java API
}
/** State of a single shard: its id and the ids of the entities it reports. */
case class ShardState(shardId: ShardId, entityIds: Set[EntityId]) {
def getEntityIds(): java.util.Set[EntityId] // Java API
}Usage Example:
// Fetch the region's detailed state and list every shard with its entities.
val future = (shardRegion ? ShardRegion.GetShardRegionState)
  .mapTo[ShardRegion.CurrentShardRegionState]
future.foreach { state =>
  state.shards.foreach { shardState =>
    println(s"Shard ${shardState.shardId}:")
    shardState.entityIds.foreach { entityId =>
      println(s" Entity: $entityId")
    }
  }
  // Shards reported in `failed` have no entry in `shards`; list them so they are not missed.
  if (state.failed.nonEmpty) {
    println(s"Failed shards: ${state.failed}")
  }
}
// Surface query failures (e.g. ask timeouts) instead of dropping them silently.
future.failed.foreach { ex =>
  println(s"Failed to retrieve shard region state: ${ex.getMessage}")
}
// Get addresses of all active shard regions:
/** Query for the addresses of all currently active shard regions. */
case object GetCurrentRegions extends ShardRegionQuery
def getCurrentRegionsInstance: GetCurrentRegions.type = GetCurrentRegions // Java API
/** Reply to [[GetCurrentRegions]]: the node addresses hosting an active shard region. */
case class CurrentRegions(regions: Set[Address]) {
def getRegions(): java.util.Set[Address] // Java API
}Usage Example:
// Fetch and print the addresses of all active shard regions.
val future = (shardRegion ? ShardRegion.GetCurrentRegions)
  .mapTo[ShardRegion.CurrentRegions]
future.foreach { currentRegions =>
  println(s"Active shard regions at: ${currentRegions.regions.mkString(", ")}")
}
// Surface query failures (e.g. ask timeouts) instead of dropping them silently.
future.failed.foreach { ex =>
  println(s"Failed to retrieve current regions: ${ex.getMessage}")
}

/** Extension id for the sharding flight recorder; `get` looks up the recorder for an actor system.
  *
  * NOTE(review): Akka may mark the flight recorder as internal API; confirm it is public
  * in your version before depending on it.
  */
object ShardingFlightRecorder extends ExtensionId[ShardingFlightRecorder] with ExtensionIdProvider {
  def get(system: ActorSystem): ShardingFlightRecorder
  def get(system: ClassicActorSystemProvider): ShardingFlightRecorder
}
/** Sharding flight recorder extension, obtained via `ShardingFlightRecorder(system)`
  * or `ShardingFlightRecorder.get(system)`. Its recording methods are not shown in this listing.
  */
trait ShardingFlightRecorder extends Extension {
// Flight recorder methods for performance monitoring
}Records sharding events for performance analysis and debugging.
Usage Example:
// Obtain the sharding flight recorder extension for this actor system.
val flightRecorder = ShardingFlightRecorder(system)
// Flight recorder automatically captures sharding events
// The flight recorder captures various sharding events:
/** Structured-logging markers used to tag sharding log events for filtering and analysis.
  *
  * NOTE(review): the marker members listed here may not match the Akka API for your
  * version (Akka may expose marker factory methods instead); verify before relying on these names.
  */
object ShardingLogMarker {
// Structured logging markers for sharding events
val shardAllocation: LogMarker
val shardRebalancing: LogMarker
val entityStartup: LogMarker
val entityPassivation: LogMarker
}Provides structured logging markers for better log analysis.
Configuration Example:
# Enable structured log markers for sharding events.
# NOTE(review): confirm this key exists in your Akka version's reference.conf.
akka.cluster.sharding.log-markers = on
logback.xml configuration:
<!-- Send akka.cluster.sharding log events at INFO and above to JSON_APPENDER;
     that appender must be declared elsewhere in logback.xml. -->
<logger name="akka.cluster.sharding" level="INFO">
<appender-ref ref="JSON_APPENDER" />
</logger>Integrate with your metrics system:
/** Collects [[ShardingMetrics]] from the shard regions registered with it.
  *
  * Regions must be added with [[register]]; with no registered regions,
  * [[collectMetrics]] completes with an empty map.
  *
  * As in the original listing, an implicit `ExecutionContext` is expected to be in
  * scope where this class is defined (the future combinators below need one).
  *
  * @param system actor system hosting the shard regions
  */
class ShardingMetricsCollector(system: ActorSystem) {
  private val shardRegions = mutable.Map[String, ActorRef]()

  /** Registers (or replaces) the shard region whose metrics are reported under `typeName`.
    *
    * NOTE: the backing map is not synchronized; register regions before collection
    * starts, or from a single thread.
    */
  def register(typeName: String, region: ActorRef): Unit =
    shardRegions.update(typeName, region)

  /** Queries every registered region and returns its metrics keyed by type name.
    *
    * @return a future of type name -> metrics; it fails if any single region query
    *         fails or times out
    */
  def collectMetrics(): Future[Map[String, ShardingMetrics]] = {
    // Snapshot the registrations so this round works on a stable view.
    val regions = shardRegions.toList
    Future
      .traverse(regions) { case (typeName, region) =>
        collectRegionMetrics(typeName, region)
      }
      .map(_.toMap)
  }

  // Asks one region for its shard stats and converts them into ShardingMetrics.
  // Only GetShardRegionStats is needed. The former extra GetShardRegionState query
  // (which returns every entity id) was never used.
  private def collectRegionMetrics(
      typeName: String,
      region: ActorRef
  ): Future[(String, ShardingMetrics)] = {
    implicit val timeout: Timeout = Timeout(5.seconds)
    (region ? ShardRegion.GetShardRegionStats)
      .mapTo[ShardRegion.ShardRegionStats]
      .map { stats =>
        val totalEntities = stats.stats.values.sum
        val totalShards = stats.stats.size
        val metrics = ShardingMetrics(
          totalEntities = totalEntities,
          totalShards = totalShards,
          failedShards = stats.failed.size,
          avgEntitiesPerShard =
            if (totalShards > 0) totalEntities.toDouble / totalShards else 0.0
        )
        typeName -> metrics
      }
  }
}
/** Point-in-time metrics for one sharded entity type.
  *
  * @param totalEntities       number of entities summed over all shards of the region
  * @param totalShards         number of shards that reported statistics
  * @param failedShards        number of shards reported as failed
  * @param avgEntitiesPerShard `totalEntities / totalShards`, or `0.0` when there are no shards
  */
final case class ShardingMetrics(
    totalEntities: Int,
    totalShards: Int,
    failedShards: Int,
    avgEntitiesPerShard: Double
)
// Set up alerting based on sharding health:
/** Turns [[ShardingMetrics]] obtained from a [[ShardingMetricsCollector]] into [[Alert]]s.
  *
  * @param metricsCollector     source of per-type metrics
  * @param highDensityThreshold average entities per shard above which a warning is raised
  *                             (defaults to the previously hard-coded 1000)
  */
class ShardingAlerting(
    metricsCollector: ShardingMetricsCollector,
    highDensityThreshold: Double = 1000.0
) {

  /** Collects metrics for every registered type and returns all resulting alerts.
    *
    * As in the original listing, requires an implicit `ExecutionContext` in scope for `map`.
    */
  def checkHealth(): Future[List[Alert]] =
    metricsCollector.collectMetrics().map { metricsMap =>
      metricsMap.toList.flatMap { case (typeName, metrics) =>
        generateAlerts(typeName, metrics)
      }
    }

  // Evaluates each rule independently; one type may trigger several alerts.
  // Result order is error, warning, info.
  private def generateAlerts(typeName: String, metrics: ShardingMetrics): List[Alert] =
    List(
      // Any failed shard is an error.
      Option.when(metrics.failedShards > 0)(
        Alert.error(s"$typeName has ${metrics.failedShards} failed shards")
      ),
      // High average density. This is not a distribution check: an average
      // cannot reveal uneven load between individual shards.
      Option.when(metrics.avgEntitiesPerShard > highDensityThreshold)(
        Alert.warning(s"$typeName has high entity density: ${metrics.avgEntitiesPerShard} entities per shard")
      ),
      // No active entities is informational only.
      Option.when(metrics.totalEntities == 0)(
        Alert.info(s"$typeName has no active entities")
      )
    ).flatten
}
/** Alert raised by sharding health checks; the concrete subtype carries the severity. */
sealed trait Alert {

  /** Human-readable description of the condition that triggered the alert. */
  def message: String
}

/** Alert severities, plus factory methods that return the common [[Alert]] type. */
object Alert {
  final case class Error(message: String) extends Alert
  final case class Warning(message: String) extends Alert
  final case class Info(message: String) extends Alert

  def error(msg: String): Alert = Error(msg)
  def warning(msg: String): Alert = Warning(msg)
  def info(msg: String): Alert = Info(msg)
}
// Prometheus metrics integration
/** Exports [[ShardingMetrics]] as Prometheus gauges labelled by shard type.
  *
  * NOTE(review): Prometheus naming conventions reserve the `_total` suffix for counters.
  * The existing gauge names are kept unchanged so existing dashboards keep working.
  *
  * @param registry registry the gauges are registered with at construction time
  */
class PrometheusShardingMetrics(registry: CollectorRegistry) {
  private val entityGauge = Gauge.build()
    .name("akka_sharding_entities_total")
    .help("Total number of entities per shard type")
    .labelNames("shard_type")
    .register(registry)

  private val shardGauge = Gauge.build()
    .name("akka_sharding_shards_total")
    .help("Total number of shards per shard type")
    .labelNames("shard_type")
    .register(registry)

  // Failed shards are the main error signal in ShardingMetrics but were never exported.
  private val failedShardGauge = Gauge.build()
    .name("akka_sharding_failed_shards")
    .help("Number of failed shards per shard type")
    .labelNames("shard_type")
    .register(registry)

  /** Sets every gauge's `typeName` series to the corresponding value in `metrics`. */
  def updateMetrics(typeName: String, metrics: ShardingMetrics): Unit = {
    entityGauge.labels(typeName).set(metrics.totalEntities.toDouble)
    shardGauge.labels(typeName).set(metrics.totalShards.toDouble)
    failedShardGauge.labels(typeName).set(metrics.failedShards.toDouble)
  }
}
// Install with Tessl CLI
npx tessl i tessl/maven-com-typesafe-akka--akka-cluster-sharding-2-13