Akka Cluster Sharding provides comprehensive monitoring, statistics collection, and health check capabilities for operational visibility and troubleshooting.
/**
 * Cluster Sharding readiness check for Akka Management.
 *
 * @param system   the actor system that hosts sharding
 * @param typeName entity type name of the shard region to check
 *                 (NOTE(review): presumably the same name passed to
 *                 `ClusterSharding(system).shardRegion` — confirm)
 * @param timeout  time budget for the check — TODO confirm how it is applied
 */
class ClusterShardingHealthCheck(
  system: ExtendedActorSystem,
  typeName: String,
  timeout: FiniteDuration
) extends HealthCheck {
  /**
   * Runs the check asynchronously. The future completes with a [[HealthCheckResult]];
   * the usage example matches `Healthy` and `Unhealthy(reason)`.
   */
  def check(): Future[HealthCheckResult]
}
Implements Akka Management health check interface for sharding readiness.
/**
 * Settings for the sharding health check.
 *
 * @param shardRegionNames names of the shard regions to include (presumably read from
 *                         `shard-region-names` in the configuration example — confirm)
 * @param timeout          time budget for the check (presumably the `timeout` setting — confirm)
 */
class ClusterShardingHealthCheckSettings(
  shardRegionNames: Set[String],
  timeout: FiniteDuration
)
Configuration Example:
akka.management.health-checks.readiness-checks {
cluster-sharding = "akka.cluster.sharding.ClusterShardingHealthCheck"
}
akka.cluster.sharding.health-check {
shard-region-names = ["Counter", "Account", "Order"]
timeout = 5s
}
Usage Example:
import akka.cluster.sharding.ClusterShardingHealthCheck
import scala.concurrent.duration._
import scala.util.{Failure, Success}
// Future callbacks need an ExecutionContext; the actor system's dispatcher is the usual choice.
import system.dispatcher

val healthCheck = new ClusterShardingHealthCheck(
  system.asInstanceOf[ExtendedActorSystem],
  typeName = "Counter",
  timeout = 5.seconds
)
val result = healthCheck.check()
// onComplete (not foreach) so that a check which itself fails, e.g. by timing out,
// is reported instead of being silently dropped.
result.onComplete {
  case Success(HealthCheckResult.Healthy)           => println("Sharding is healthy")
  case Success(HealthCheckResult.Unhealthy(reason)) => println(s"Sharding is unhealthy: $reason")
  case Failure(ex)                                  => println(s"Sharding health check failed: ${ex.getMessage}")
}
Get statistics for all shard regions across the cluster:
/**
 * Query for shard statistics of every shard region in the cluster;
 * answered with [[ClusterShardingStats]].
 *
 * @param timeout time budget for gathering the statistics (NOTE(review): presumably how long
 *                the region waits for other regions; keep it below the `ask` timeout — confirm)
 */
case class GetClusterShardingStats(timeout: FiniteDuration) extends ShardRegionQuery

/**
 * Reply to [[GetClusterShardingStats]]: one [[ShardRegionStats]] per region, keyed by the
 * address of the node hosting that region.
 */
case class ClusterShardingStats(regions: Map[Address, ShardRegionStats]) {
  /** Java API view of `regions`. */
  def getRegions(): java.util.Map[Address, ShardRegionStats] // Java API
}
Usage Example:
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._
// ExecutionContext for the Future callbacks below.
import system.dispatcher

// Timeout for the ask itself. It is deliberately longer than the 5s given to
// GetClusterShardingStats so the query's own time budget can expire first and still reply.
implicit val timeout: Timeout = Timeout(10.seconds)
val shardRegion = ClusterSharding(system).shardRegion("Counter")
val future = (shardRegion ? ShardRegion.GetClusterShardingStats(5.seconds))
  .mapTo[ShardRegion.ClusterShardingStats]
future.foreach { stats =>
  stats.regions.foreach { case (address, regionStats) =>
    println(s"Region at $address:")
    regionStats.stats.foreach { case (shardId, entityCount) =>
      println(s" Shard $shardId: $entityCount entities")
    }
    if (regionStats.failed.nonEmpty) {
      println(s" Failed shards: ${regionStats.failed}")
    }
  }
}
// foreach only runs on success; surface ask failures (e.g. timeouts) explicitly.
future.failed.foreach { ex =>
  println(s"Could not collect cluster sharding stats: ${ex.getMessage}")
}
Get statistics for a specific shard region:
/** Query for one shard region's per-shard entity counts; answered with [[ShardRegionStats]]. */
case object GetShardRegionStats extends ShardRegionQuery

/** Java API: the [[GetShardRegionStats]] singleton. Result type stated explicitly for Java callers. */
def getRegionStatsInstance: GetShardRegionStats.type = GetShardRegionStats // Java API

/**
 * Reply to [[GetShardRegionStats]].
 *
 * @param stats  entity count per shard id
 * @param failed shard ids reported as failed (NOTE(review): presumably shards that did not
 *               answer in time — confirm)
 */
class ShardRegionStats(val stats: Map[ShardId, Int], val failed: Set[ShardId]) {
  /** Java API view of `stats`. */
  def getStats(): java.util.Map[ShardId, Int] // Java API
  /** Java API view of `failed`. */
  def getFailed(): java.util.Set[ShardId] // Java API
}
Usage Example:
val future = (shardRegion ? ShardRegion.GetShardRegionStats)
  .mapTo[ShardRegion.ShardRegionStats]
future.foreach { regionStats =>
  val totalEntities = regionStats.stats.values.sum
  println(s"Region has $totalEntities entities across ${regionStats.stats.size} shards")
  // Find the most loaded shard. maxBy throws on an empty map, and an exception thrown
  // inside a Future callback is only passed to the ExecutionContext's reporter, so a
  // region with no shards must be guarded explicitly.
  if (regionStats.stats.nonEmpty) {
    val (mostLoadedShard, entityCount) = regionStats.stats.maxBy(_._2)
    println(s"Most loaded shard: $mostLoadedShard with $entityCount entities")
  }
}
Get detailed state including entity IDs:
/**
 * Query for a shard region's shards and the entity ids they hold;
 * answered with [[CurrentShardRegionState]].
 */
case object GetShardRegionState extends ShardRegionQuery

/** Java API: the [[GetShardRegionState]] singleton. Result type stated explicitly for Java callers. */
def getShardRegionStateInstance: GetShardRegionState.type = GetShardRegionState // Java API

/**
 * Reply to [[GetShardRegionState]].
 *
 * @param shards one [[ShardState]] per shard
 * @param failed shard ids reported as failed (NOTE(review): confirm exact meaning)
 */
class CurrentShardRegionState(val shards: Set[ShardState], val failed: Set[ShardId]) {
  /** Java API view of `shards`. */
  def getShards(): java.util.Set[ShardState] // Java API
  /** Java API view of `failed`. */
  def getFailed(): java.util.Set[ShardId] // Java API
}

/** A shard id together with the ids of the entities reported for it. */
case class ShardState(shardId: ShardId, entityIds: Set[EntityId]) {
  /** Java API view of `entityIds`. */
  def getEntityIds(): java.util.Set[EntityId] // Java API
}
Usage Example:
val future = (shardRegion ? ShardRegion.GetShardRegionState)
  .mapTo[ShardRegion.CurrentShardRegionState]
future.foreach { state =>
  for (shardState <- state.shards) {
    println(s"Shard ${shardState.shardId}:")
    shardState.entityIds.foreach(entityId => println(s" Entity: $entityId"))
  }
  // Like the cluster-wide statistics example, also report the shards listed in `failed`;
  // otherwise they are silently missing from the output.
  if (state.failed.nonEmpty) {
    println(s"Failed shards: ${state.failed}")
  }
}
Get addresses of all active shard regions:
/** Query for the addresses of all active shard regions; answered with [[CurrentRegions]]. */
case object GetCurrentRegions extends ShardRegionQuery
/** Java API: the [[GetCurrentRegions]] singleton. */
def getCurrentRegionsInstance: GetCurrentRegions.type = GetCurrentRegions // Java API
/** Reply to [[GetCurrentRegions]]: addresses of the nodes hosting shard regions. */
case class CurrentRegions(regions: Set[Address]) {
  /** Java API view of `regions`. */
  def getRegions(): java.util.Set[Address] // Java API
}
Usage Example:
// Ask for the addresses of all active shard regions and print them comma-separated.
val future = (shardRegion ? ShardRegion.GetCurrentRegions)
  .mapTo[ShardRegion.CurrentRegions]
future.foreach { currentRegions =>
  println(s"Active shard regions at: ${currentRegions.regions.mkString(", ")}")
}

/** Extension id for [[ShardingFlightRecorder]]; `get` returns the system's instance. */
object ShardingFlightRecorder extends ExtensionId[ShardingFlightRecorder] with ExtensionIdProvider {
  def get(system: ActorSystem): ShardingFlightRecorder
  def get(system: ClassicActorSystemProvider): ShardingFlightRecorder
}

/** Actor-system extension that records sharding events. */
trait ShardingFlightRecorder extends Extension {
  // Flight recorder methods for performance monitoring
}
Records sharding events for performance analysis and debugging.
Usage Example:
// Look up the flight recorder extension for this actor system.
val flightRecorder = ShardingFlightRecorder(system)
// Flight recorder automatically captures sharding events.
// The flight recorder captures various sharding events:
/**
 * Log markers attached to sharding log events so structured appenders (such as the JSON
 * appender in the logback example) can filter on them.
 * NOTE(review): presumably enabled by `akka.cluster.sharding.log-markers` — confirm the key.
 */
object ShardingLogMarker {
  // Structured logging markers for sharding events
  val shardAllocation: LogMarker
  val shardRebalancing: LogMarker
  val entityStartup: LogMarker
  val entityPassivation: LogMarker
}
Provides structured logging markers for better log analysis.
Configuration Example:
akka.cluster.sharding.log-markers = on
logback.xml configuration:
<logger name="akka.cluster.sharding" level="INFO">
<appender-ref ref="JSON_APPENDER" />
</logger>
Integrate with your metrics system:
/**
 * Polls registered shard regions and turns their statistics into [[ShardingMetrics]].
 *
 * Regions must be added with `register` before `collectMetrics` reports anything.
 * The registry is a plain mutable map; register regions before polling starts, or add
 * synchronization if registration happens concurrently with `collectMetrics`.
 *
 * @param system actor system whose dispatcher runs the Future transformations
 */
class ShardingMetricsCollector(system: ActorSystem) {
  // map/flatMap/sequence below need an ExecutionContext; use the system's dispatcher.
  private implicit val ec: scala.concurrent.ExecutionContext = system.dispatcher

  // Entity type name -> shard region to query.
  private val shardRegions = mutable.Map[String, ActorRef]()

  /** Registers (or replaces) the shard region polled for `typeName`. */
  def register(typeName: String, region: ActorRef): Unit =
    shardRegions.update(typeName, region)

  /**
   * Queries every registered region and returns its metrics keyed by type name.
   * The future fails if any single region query fails or times out.
   */
  def collectMetrics(): Future[Map[String, ShardingMetrics]] = {
    // Snapshot the registry into an immutable list before firing the queries.
    val futures = shardRegions.toList.map { case (typeName, region) =>
      collectRegionMetrics(typeName, region)
    }
    Future.sequence(futures).map(_.toMap)
  }

  /** Asks one region for its stats and derives the metrics from the reply. */
  private def collectRegionMetrics(
    typeName: String,
    region: ActorRef
  ): Future[(String, ShardingMetrics)] = {
    implicit val timeout: Timeout = Timeout(5.seconds)
    // Only the stats reply feeds these metrics. Asking for the full region state (every
    // entity id) as well would double the round trips and add an extra failure mode.
    (region ? ShardRegion.GetShardRegionStats)
      .mapTo[ShardRegion.ShardRegionStats]
      .map { stats =>
        val totalEntities = stats.stats.values.sum
        val totalShards = stats.stats.size
        typeName -> ShardingMetrics(
          totalEntities = totalEntities,
          totalShards = totalShards,
          failedShards = stats.failed.size,
          avgEntitiesPerShard =
            if (totalShards > 0) totalEntities.toDouble / totalShards else 0.0
        )
      }
  }
}

/**
 * Metrics for one shard region at the time of collection.
 *
 * @param totalEntities       sum of the entity counts of all reported shards
 * @param totalShards         number of reported shards
 * @param failedShards        number of shards reported as failed
 * @param avgEntitiesPerShard totalEntities / totalShards, or 0.0 when there are no shards
 */
final case class ShardingMetrics(
  totalEntities: Int,
  totalShards: Int,
  failedShards: Int,
  avgEntitiesPerShard: Double
)
Set up alerting based on sharding health:
/**
 * Derives [[Alert]]s from the metrics reported by a [[ShardingMetricsCollector]].
 *
 * @param metricsCollector       source of per-type sharding metrics
 * @param maxAvgEntitiesPerShard average entities per shard above which a warning is raised
 */
class ShardingAlerting(
  metricsCollector: ShardingMetricsCollector,
  maxAvgEntitiesPerShard: Double = 1000
) {

  /** Collects current metrics and returns every alert they trigger; the list may be empty. */
  def checkHealth(): Future[List[Alert]] =
    metricsCollector.collectMetrics().map { metricsMap =>
      metricsMap.toList.flatMap { case (typeName, metrics) =>
        generateAlerts(typeName, metrics)
      }
    }

  /** Alerts for one entity type, in the order: failed shards, density, no entities. */
  private def generateAlerts(typeName: String, metrics: ShardingMetrics): List[Alert] = {
    // Any failed shard is an error.
    val failedShardAlerts =
      if (metrics.failedShards > 0)
        List(Alert.error(s"$typeName has ${metrics.failedShards} failed shards"))
      else Nil
    // High average density. An average cannot reveal skew between individual shards;
    // that would need per-shard counts.
    val densityAlerts =
      if (metrics.avgEntitiesPerShard > maxAvgEntitiesPerShard)
        List(Alert.warning(s"$typeName has high entity density: ${metrics.avgEntitiesPerShard} entities per shard"))
      else Nil
    // No entities running for this type.
    val emptyAlerts =
      if (metrics.totalEntities == 0) List(Alert.info(s"$typeName has no active entities"))
      else Nil
    failedShardAlerts ++ densityAlerts ++ emptyAlerts
  }
}
/** An operational alert derived from sharding metrics; the subtype encodes its severity. */
sealed trait Alert {
  /** Human-readable description of the condition that raised the alert. */
  def message: String
}

/** Alert severities, plus factories that return the widened [[Alert]] type. */
object Alert {
  /** Builds an error-level alert. */
  def error(msg: String): Alert = new Error(msg)
  /** Builds a warning-level alert. */
  def warning(msg: String): Alert = new Warning(msg)
  /** Builds an informational alert. */
  def info(msg: String): Alert = new Info(msg)

  case class Error(message: String) extends Alert
  case class Warning(message: String) extends Alert
  case class Info(message: String) extends Alert
}
// Prometheus metrics integration
/**
 * Exports [[ShardingMetrics]] as Prometheus gauges, labelled by entity type name.
 *
 * The gauges are registered in `registry` when the instance is constructed. The Prometheus
 * client rejects a second registration of the same metric name, so create at most one
 * instance per registry.
 *
 * @param registry Prometheus registry that receives the gauges
 */
class PrometheusShardingMetrics(registry: CollectorRegistry) {
  private val entityGauge = Gauge.build()
    .name("akka_sharding_entities_total")
    .help("Total number of entities per shard type")
    .labelNames("shard_type")
    .register(registry)

  private val shardGauge = Gauge.build()
    .name("akka_sharding_shards_total")
    .help("Total number of shards per shard type")
    .labelNames("shard_type")
    .register(registry)

  // Failed shards are what ShardingAlerting treats as an error, so dashboards and
  // Prometheus alert rules need them too.
  private val failedShardGauge = Gauge.build()
    .name("akka_sharding_failed_shards")
    .help("Number of failed shards per shard type")
    .labelNames("shard_type")
    .register(registry)

  /** Sets every gauge for `typeName` to the corresponding value in `metrics`. */
  def updateMetrics(typeName: String, metrics: ShardingMetrics): Unit = {
    entityGauge.labels(typeName).set(metrics.totalEntities.toDouble)
    shardGauge.labels(typeName).set(metrics.totalShards.toDouble)
    failedShardGauge.labels(typeName).set(metrics.failedShards.toDouble)
  }
}