Monitoring and Health Checks

Akka Cluster Sharding exposes a readiness health check, statistics and state queries on the shard region, flight recorder events, and log markers. Together these give operational visibility into sharding and help with troubleshooting.

Health Check System

Cluster Sharding Health Check

final class ClusterShardingHealthCheck(system: ActorSystem)
    extends (() => Future[Boolean]) {
  def apply(): Future[Boolean]
}

A readiness check for Akka Management. It is a function returning Future[Boolean] that completes with true once the configured shard regions have registered with the shard coordinator. Its settings are read from the akka.cluster.sharding.healthcheck config section.

Health Check Settings

final class ClusterShardingHealthCheckSettings(
  val names: Set[String],
  val timeout: FiniteDuration
)

names lists the entity type names whose shard regions must be registered. timeout bounds how long the check waits for the local shard region to respond, and should be lower than the timeout of the system that polls the health endpoint.

Configuration Example:

akka.management.health-checks.readiness-checks {
  sharding = "akka.cluster.sharding.ClusterShardingHealthCheck"
}

akka.cluster.sharding.healthcheck {
  names = ["Counter", "Account", "Order"]
  timeout = 5s
}

Usage Example:

import akka.cluster.sharding.ClusterShardingHealthCheck
import scala.concurrent.Future

// Akka Management normally instantiates the check from configuration;
// calling it directly is mainly useful in tests.
val healthCheck = new ClusterShardingHealthCheck(system)

import system.dispatcher // ExecutionContext for the callback
val result: Future[Boolean] = healthCheck()
result.foreach {
  case true  => println("Sharding is ready")
  case false => println("Sharding is not ready yet")
}

Statistics and Monitoring Queries

Cluster-wide Statistics

Get statistics for all shard regions across the cluster:

case class GetClusterShardingStats(timeout: FiniteDuration) extends ShardRegionQuery

case class ClusterShardingStats(regions: Map[Address, ShardRegionStats]) {
  def getRegions(): java.util.Map[Address, ShardRegionStats]  // Java API
}

Usage Example:

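import akka.cluster.sharding.{ClusterSharding, ShardRegion}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._
import system.dispatcher // ExecutionContext for the Future callbacks

// The ask timeout should be longer than the timeout carried in the query
implicit val timeout: Timeout = Timeout(10.seconds)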

val shardRegion = ClusterSharding(system).shardRegion("Counter")
val future = (shardRegion ? ShardRegion.GetClusterShardingStats(5.seconds))
  .mapTo[ShardRegion.ClusterShardingStats]

future.foreach { stats =>
  stats.regions.foreach { case (address, regionStats) =>
    println(s"Region at $address:")
    regionStats.stats.foreach { case (shardId, entityCount) =>
      println(s"  Shard $shardId: $entityCount entities")
    }
    if (regionStats.failed.nonEmpty) {
      println(s"  Failed shards: ${regionStats.failed}")
    }
  }
}
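
The cluster-wide reply is easier to compare across nodes once the per-shard counts are summed per region. The following is a minimal sketch under stated assumptions: it works on plain `Map[String, Map[String, Int]]` values (node address to per-shard entity counts) so that it runs without a cluster, and `ClusterTotals`, `entitiesPerNode`, and `busiestNode` are hypothetical names, not part of Akka. In real code the input would be derived from `stats.regions`.

```scala
object ClusterTotals {
  /** Sums the entity counts of all shards hosted on each node. */
  def entitiesPerNode(regions: Map[String, Map[String, Int]]): Map[String, Int] =
    regions.map { case (node, shards) => node -> shards.values.sum }

  /** Node with the highest entity count, or None when there are no regions. */
  def busiestNode(regions: Map[String, Map[String, Int]]): Option[(String, Int)] = {
    val totals = entitiesPerNode(regions)
    if (totals.isEmpty) None else Some(totals.maxBy(_._2))
  }
}
```

Returning `Option` avoids the exception that `maxBy` throws on an empty collection, which can happen while the cluster is still forming.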

Regional Statistics

Get statistics for the shards hosted by the shard region that receives the query:

case object GetShardRegionStats extends ShardRegionQuery
def getRegionStatsInstance = GetShardRegionStats  // Java API

class ShardRegionStats(val stats: Map[ShardId, Int], val failed: Set[ShardId]) {
  def getStats(): java.util.Map[ShardId, Int]    // Java API  
  def getFailed(): java.util.Set[ShardId]        // Java API
}

Usage Example:

val future = (shardRegion ? ShardRegion.GetShardRegionStats)
  .mapTo[ShardRegion.ShardRegionStats]

future.foreach { regionStats =>
  val totalEntities = regionStats.stats.values.sum
  println(s"Region has $totalEntities entities across ${regionStats.stats.size} shards")
  
  // Find the most loaded shard (maxBy throws on an empty map, so guard it)
  if (regionStats.stats.nonEmpty) {
    val (mostLoadedShard, entityCount) = regionStats.stats.maxBy(_._2)
    println(s"Most loaded shard: $mostLoadedShard with $entityCount entities")
  }
}
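
Knowing the most loaded shard is more useful when it is put in relation to the average. One possible measure, sketched here as an assumption rather than anything Akka provides, is the ratio of the largest shard to the mean shard size. `ShardSkew` and `skew` are hypothetical names, and the input is the same shape as `regionStats.stats` (shard ID to entity count).

```scala
object ShardSkew {
  /**
   * Ratio of the largest shard to the mean shard size.
   * 1.0 means perfectly even; larger values mean more skew.
   * Returns None when there are no shards or no entities.
   */
  def skew(stats: Map[String, Int]): Option[Double] = {
    val total = stats.values.sum
    if (stats.isEmpty || total == 0) None
    else {
      val mean = total.toDouble / stats.size
      Some(stats.values.max / mean)
    }
  }
}
```

For example, two shards holding 10 and 30 entities have a mean of 20, so the skew is 1.5.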

Region State Details

Get detailed state including entity IDs:

case object GetShardRegionState extends ShardRegionQuery
def getShardRegionStateInstance = GetShardRegionState  // Java API

class CurrentShardRegionState(val shards: Set[ShardState], val failed: Set[ShardId]) {
  def getShards(): java.util.Set[ShardState]     // Java API
  def getFailed(): java.util.Set[ShardId]        // Java API
}

case class ShardState(shardId: ShardId, entityIds: Set[EntityId]) {
  def getEntityIds(): java.util.Set[EntityId]    // Java API
}

Usage Example:

val future = (shardRegion ? ShardRegion.GetShardRegionState)
  .mapTo[ShardRegion.CurrentShardRegionState]

future.foreach { state =>
  state.shards.foreach { shardState =>
    println(s"Shard ${shardState.shardId}:")
    shardState.entityIds.foreach { entityId =>
      println(s"  Entity: $entityId")
    }
  }
}
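
A common use of the region state is to find which shard currently hosts a given entity. The sketch below assumes the state has first been converted to a plain `Map[String, Set[String]]` (shard ID to entity IDs), for example with `state.shards.map(s => s.shardId -> s.entityIds).toMap`. `EntityLocator` and `shardOf` are hypothetical names.

```scala
object EntityLocator {
  /** Returns the ID of the shard whose entity set contains entityId, if any. */
  def shardOf(entityId: String, shards: Map[String, Set[String]]): Option[String] =
    shards.collectFirst {
      case (shardId, entities) if entities.contains(entityId) => shardId
    }
}
```

A `None` result only means the entity is not active in this region; it may be passivated or hosted by a region on another node.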

Active Regions Query

Get addresses of all active shard regions:

case object GetCurrentRegions extends ShardRegionQuery
def getCurrentRegionsInstance: GetCurrentRegions.type = GetCurrentRegions  // Java API

case class CurrentRegions(regions: Set[Address]) {
  def getRegions(): java.util.Set[Address]  // Java API
}

Usage Example:

val future = (shardRegion ? ShardRegion.GetCurrentRegions)
  .mapTo[ShardRegion.CurrentRegions]

future.foreach { currentRegions =>
  println(s"Active shard regions at: ${currentRegions.regions.mkString(", ")}")
}

Flight Recorder Integration

Sharding Flight Recorder Extension

object ShardingFlightRecorder extends ExtensionId[ShardingFlightRecorder] with ExtensionIdProvider {
  def get(system: ActorSystem): ShardingFlightRecorder
  def get(system: ClassicActorSystemProvider): ShardingFlightRecorder
}

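trait ShardingFlightRecorder extends Extension {
  // Internal API: event hooks called by the sharding implementation
}

Records sharding events through Java Flight Recorder (JFR) for performance analysis and debugging. In the Akka sources this extension is marked as internal API, so it is not meant to be called from application code. Sharding emits the events itself, and they are read from a JFR recording.

Usage Example:

// No application code is required: when JFR is available, sharding
// emits its events automatically and they show up in the JFR recording.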

Event Recording

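The flight recorder emits its events as Java Flight Recorder (JFR) events. In the Akka 2.6 sources they cover remember-entities and passivation activity:

  • Remember-entities store operations and their duration
  • Entity IDs added to the remember-entities store
  • Entity IDs removed from the remember-entities store
  • Entity passivation
  • Restart of an entity that was being passivated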

Structured Logging

Sharding Log Markers

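object ShardingLogMarker {
  // Marker "akkaShardAllocated": the coordinator allocated a shard to a region
  def shardAllocated(typeName: String, shardId: String, node: Address): LogMarker
  // Marker "akkaShardStarted": a shard region started a shard
  def shardStarted(typeName: String, shardId: String): LogMarker
}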

Provides structured logging markers for better log analysis.

Configuration Example:

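# Markers are passed through to SLF4J when the SLF4J logger is used
akka.loggers = ["akka.event.slf4j.Slf4jLogger"]
akka.logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"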

logback.xml configuration:
<logger name="akka.cluster.sharding" level="INFO">
  <appender-ref ref="JSON_APPENDER" />
</logger>

Custom Monitoring Integration

Metrics Collection

Integrate with your metrics system:

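import akka.actor.{ActorRef, ActorSystem}
import akka.cluster.sharding.ShardRegion
import akka.pattern.ask
import akka.util.Timeout
import scala.collection.concurrent.TrieMap
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

class ShardingMetricsCollector(system: ActorSystem) {
  private implicit val ec: ExecutionContext = system.dispatcher
  private implicit val timeout: Timeout = Timeout(5.seconds)

  // Thread-safe registry of entity type name -> shard region
  private val shardRegions = TrieMap.empty[String, ActorRef]

  // Register each region once, e.g. with the ActorRef returned by
  // ClusterSharding(system).start(...)
  def register(typeName: String, region: ActorRef): Unit =
    shardRegions.update(typeName, region)

  def collectMetrics(): Future[Map[String, ShardingMetrics]] =
    Future.traverse(shardRegions.toSeq) { case (typeName, region) =>
      collectRegionMetrics(typeName, region)
    }.map(_.toMap)

  private def collectRegionMetrics(
    typeName: String,
    region: ActorRef
  ): Future[(String, ShardingMetrics)] =
    // GetShardRegionStats already carries every field used below,
    // so the heavier GetShardRegionState query is not needed here.
    (region ? ShardRegion.GetShardRegionStats)
      .mapTo[ShardRegion.ShardRegionStats]
      .map { stats =>
        val totalEntities = stats.stats.values.sum
        val totalShards = stats.stats.size
        typeName -> ShardingMetrics(
          totalEntities = totalEntities,
          totalShards = totalShards,
          failedShards = stats.failed.size,
          avgEntitiesPerShard =
            if (totalShards > 0) totalEntities.toDouble / totalShards else 0.0
        )
      }
}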

case class ShardingMetrics(
  totalEntities: Int,
  totalShards: Int,
  failedShards: Int,
  avgEntitiesPerShard: Double
)

Alerting Integration

Set up alerting based on sharding health:

import scala.concurrent.{ExecutionContext, Future}

class ShardingAlerting(metricsCollector: ShardingMetricsCollector)(implicit ec: ExecutionContext) {

  def checkHealth(): Future[List[Alert]] =
    metricsCollector.collectMetrics().map { metricsMap =>
      metricsMap.toList.flatMap { case (typeName, metrics) =>
        generateAlerts(typeName, metrics)
      }
    }

  private def generateAlerts(typeName: String, metrics: ShardingMetrics): List[Alert] = {
    // Alert on failed shards
    val failed =
      if (metrics.failedShards > 0)
        Some(Alert.error(s"$typeName has ${metrics.failedShards} failed shards"))
      else None

    // Alert on high average density. An average cannot reveal an uneven
    // distribution; that requires the per-shard counts.
    val dense =
      if (metrics.avgEntitiesPerShard > 1000)
        Some(Alert.warning(
          f"$typeName has high entity density: ${metrics.avgEntitiesPerShard}%.1f entities per shard"))
      else None

    // Informational alert when no entities are active
    val idle =
      if (metrics.totalEntities == 0)
        Some(Alert.info(s"$typeName has no active entities"))
      else None

    List(failed, dense, idle).flatten
  }
}

sealed trait Alert {
  def message: String
}
object Alert {
  case class Error(message: String) extends Alert
  case class Warning(message: String) extends Alert
  case class Info(message: String) extends Alert
  
  def error(msg: String): Alert = Error(msg)
  def warning(msg: String): Alert = Warning(msg)
  def info(msg: String): Alert = Info(msg)
}

Monitoring Best Practices

Regular Health Checks

  • Set up periodic health checks for all entity types
  • Monitor both local region health and cluster-wide statistics
  • Include sharding health in your overall service health endpoints
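
Including sharding health in an overall service endpoint usually means combining several asynchronous checks into one result. The sketch below assumes each check can be represented as a `() => Future[Boolean]` function. `CombinedReadiness`, `Check`, and `allReady` are hypothetical names, not part of Akka or Akka Management.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal

object CombinedReadiness {
  type Check = () => Future[Boolean]

  /**
   * Completes with true only if every check completes with true.
   * A check whose future fails is counted as "not ready" instead of
   * failing the combined result.
   */
  def allReady(checks: Seq[Check])(implicit ec: ExecutionContext): Future[Boolean] =
    Future
      .traverse(checks) { check =>
        check().recover { case NonFatal(_) => false }
      }
      .map(_.forall(identity))
}
```

Mapping failures to `false` keeps a timed-out region from turning the health endpoint into an error response, at the cost of hiding the failure reason, so logging inside `recover` may be worth adding.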

Performance Monitoring

  • Track entity creation/passivation rates
  • Monitor message routing latencies
  • Watch for shard rebalancing frequency and duration
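
The statistics queries return entity counts at a point in time, not rates. One way to approximate a rate, shown here as a rough sketch, is to compare two successive snapshots. This yields only the net change (creations minus passivations), not the two rates separately. `EntityRate`, `Snapshot`, and `netRatePerSecond` are hypothetical names.

```scala
object EntityRate {
  /** Total active entities observed at a given wall-clock time. */
  final case class Snapshot(timestampMillis: Long, totalEntities: Int)

  /**
   * Net change in active entities per second between two snapshots.
   * Positive means more entities were started than stopped.
   * Returns None if no time has elapsed between the snapshots.
   */
  def netRatePerSecond(prev: Snapshot, curr: Snapshot): Option[Double] = {
    val elapsedMs = curr.timestampMillis - prev.timestampMillis
    if (elapsedMs <= 0) None
    else Some((curr.totalEntities - prev.totalEntities) * 1000.0 / elapsedMs)
  }
}
```

For example, going from 100 to 150 entities over 10 seconds gives a net rate of 5.0 entities per second.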

Capacity Planning

  • Monitor entity distribution across shards
  • Track memory usage per shard region
  • Alert on approaching shard limits
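
A limit check can be expressed directly against the per-shard counts. In the sketch below, `limit` is an assumed operational budget chosen by the operator, not an Akka setting, and `CapacityCheck` and `nearLimit` are hypothetical names. The input has the same shape as `ShardRegionStats.stats`.

```scala
object CapacityCheck {
  /**
   * IDs of shards whose entity count has reached at least `fraction`
   * of the per-shard budget `limit`.
   */
  def nearLimit(stats: Map[String, Int], limit: Int, fraction: Double = 0.8): Set[String] =
    stats.collect {
      case (shardId, count) if count >= limit * fraction => shardId
    }.toSet
}
```

Alerting at a fraction of the budget instead of at the budget itself leaves time to react, for example by increasing the number of shards in a later deployment.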

Troubleshooting

  • Use region state queries to debug entity placement issues
  • Monitor failed shard counts for cluster problems
  • Correlate sharding events with application-level metrics

Configuration Monitoring

  • Track actual vs configured rebalancing intervals
  • Monitor passivation effectiveness
  • Validate allocation strategy performance

Integration Examples

// Prometheus metrics integration (Prometheus Java simpleclient API)
import io.prometheus.client.{CollectorRegistry, Gauge}

class PrometheusShardingMetrics(registry: CollectorRegistry) {
  private val entityGauge = Gauge.build()
    .name("akka_sharding_entities_total")
    .help("Total number of entities per shard type")
    .labelNames("shard_type")
    .register(registry)
  
  private val shardGauge = Gauge.build()
    .name("akka_sharding_shards_total")
    .help("Total number of shards per shard type") 
    .labelNames("shard_type")
    .register(registry)
  
  def updateMetrics(typeName: String, metrics: ShardingMetrics): Unit = {
    entityGauge.labels(typeName).set(metrics.totalEntities)
    shardGauge.labels(typeName).set(metrics.totalShards)
  }
}