Akka Cluster Sharding provides location transparency and automatic distribution of stateful actors across cluster nodes
npx @tessl/cli install tessl/maven-com-typesafe-akka--akka-cluster-sharding_2-13@2.8.0

Akka Cluster Sharding provides location transparency and automatic distribution of stateful actors across cluster nodes. It enables developers to create scalable distributed applications where entities are automatically partitioned across the cluster and can be accessed by logical identifiers without knowing their physical location.
libraryDependencies += "com.typesafe.akka" %% "akka-cluster-sharding" % "2.8.8"

import akka.cluster.sharding.ClusterSharding
import akka.cluster.sharding.ClusterShardingSettings
import akka.cluster.sharding.ShardRegion
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy

import akka.actor.{ActorSystem, Props}
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings, ShardRegion}
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
// Actor system named "MyClusterSystem"; declared implicit and passed explicitly to
// ClusterSharding(...) and ClusterShardingSettings(...) in the statements that follow.
implicit val system: ActorSystem = ActorSystem("MyClusterSystem")
// Define entity extraction functions
// Partial function that routes a MyEntityMessage: the first field of the message is the
// entity id, and the message itself is forwarded unchanged as the payload.
// Only MyEntityMessage is matched; the function is undefined for any other message.
val extractEntityId: ShardRegion.ExtractEntityId = {
  case envelope @ MyEntityMessage(id, _) => id -> envelope
}
// Derives the shard id from the entity id, spreading entities over 100 shards ("0".."99").
// String.hashCode can be negative, and in Scala the remainder keeps the sign of the
// dividend, so the raw `hashCode % 100` could produce ids such as "-42".
// math.abs is applied AFTER the modulo: the remainder is always within -99..99, whereas
// math.abs(hashCode) would still be negative for Int.MinValue.
val extractShardId: ShardRegion.ExtractShardId = {
  case MyEntityMessage(entityId, _) => math.abs(entityId.hashCode % 100).toString
}
// Start cluster sharding for an entity type
// Registers the "MyEntity" entity type with the sharding extension and keeps the ActorRef
// returned by start(...) so messages can be sent through it.
// Arguments are positional, in the order of the five-parameter start overload:
// typeName, entityProps, settings, extractEntityId, extractShardId.
val shardRegion =
  ClusterSharding(system).start(
    "MyEntity",
    Props[MyEntityActor](),
    ClusterShardingSettings(system),
    extractEntityId,
    extractShardId
  )
// Send messages to entities
shardRegion ! MyEntityMessage("entity-1", "Hello World")

Akka Cluster Sharding is built around several key components:
Main extension for registering entity types and managing distributed sharding across the cluster. Provides lifecycle management and access to shard regions.
// Extension id for ClusterSharding: looks up the ClusterSharding extension instance
// belonging to a given actor system.
object ClusterSharding extends ExtensionId[ClusterSharding] with ExtensionIdProvider {
// Lookup by ActorSystem.
def get(system: ActorSystem): ClusterSharding
// Lookup by anything that is a ClassicActorSystemProvider.
def get(system: ClassicActorSystemProvider): ClusterSharding
}
/**
 * Extension instance used to register entity types and to obtain shard region references.
 *
 * Every `start` and `startProxy` overload returns an `ActorRef`. The overloads differ only
 * in which pieces the caller supplies explicitly: settings, allocation strategy, hand-off
 * stop message, role / data center, and whether message extraction is expressed as the
 * `ExtractEntityId` / `ExtractShardId` function pair or as a single `MessageExtractor`.
 */
class ClusterSharding(system: ExtendedActorSystem) extends Extension {
  // Full configuration start methods

  // All options explicit, including settings.
  def start(
    typeName: String,
    entityProps: Props,
    settings: ClusterShardingSettings,
    extractEntityId: ShardRegion.ExtractEntityId,
    extractShardId: ShardRegion.ExtractShardId,
    allocationStrategy: ShardAllocationStrategy,
    handOffStopMessage: Any
  ): ActorRef

  // Same as above but without an explicit settings argument.
  def start(
    typeName: String,
    entityProps: Props,
    extractEntityId: ShardRegion.ExtractEntityId,
    extractShardId: ShardRegion.ExtractShardId,
    allocationStrategy: ShardAllocationStrategy,
    handOffStopMessage: Any
  ): ActorRef

  // Simplified start methods with defaults

  // No allocation strategy or hand-off stop message argument.
  def start(
    typeName: String,
    entityProps: Props,
    settings: ClusterShardingSettings,
    extractEntityId: ShardRegion.ExtractEntityId,
    extractShardId: ShardRegion.ExtractShardId
  ): ActorRef

  // Shortest function-pair form: only the type name, entity props and the two extractors.
  def start(
    typeName: String,
    entityProps: Props,
    extractEntityId: ShardRegion.ExtractEntityId,
    extractShardId: ShardRegion.ExtractShardId
  ): ActorRef

  // Java API start methods (take a ShardRegion.MessageExtractor instead of two functions)

  def start(
    typeName: String,
    entityProps: Props,
    settings: ClusterShardingSettings,
    messageExtractor: ShardRegion.MessageExtractor,
    allocationStrategy: ShardAllocationStrategy,
    handOffStopMessage: Any
  ): ActorRef

  def start(
    typeName: String,
    entityProps: Props,
    settings: ClusterShardingSettings,
    messageExtractor: ShardRegion.MessageExtractor
  ): ActorRef

  def start(
    typeName: String,
    entityProps: Props,
    messageExtractor: ShardRegion.MessageExtractor
  ): ActorRef

  // Proxy start methods (no entityProps parameter)

  // Scala variants: role and data center are scala.Option values.
  def startProxy(
    typeName: String,
    role: Option[String],
    extractEntityId: ShardRegion.ExtractEntityId,
    extractShardId: ShardRegion.ExtractShardId
  ): ActorRef

  def startProxy(
    typeName: String,
    role: Option[String],
    dataCenter: Option[DataCenter],
    extractEntityId: ShardRegion.ExtractEntityId,
    extractShardId: ShardRegion.ExtractShardId
  ): ActorRef

  // Java variants: role and data center are Optional values; the data center is a String here.
  def startProxy(
    typeName: String,
    role: Optional[String],
    messageExtractor: ShardRegion.MessageExtractor
  ): ActorRef

  def startProxy(
    typeName: String,
    role: Optional[String],
    dataCenter: Optional[String],
    messageExtractor: ShardRegion.MessageExtractor
  ): ActorRef

  // Access methods

  // Reference for an entity type, looked up by type name.
  def shardRegion(typeName: String): ActorRef
  // Proxy reference for an entity type in the given data center.
  def shardRegionProxy(typeName: String, dataCenter: DataCenter): ActorRef
  // Entity type names as a Scala Set.
  def shardTypeNames: Set[String]
  // Entity type names as a java.util.Set, for Java callers.
  def getShardTypeNames: java.util.Set[String]
  // Allocation strategy derived from the given settings.
  def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy
}

ShardRegion handles message routing, entity lifecycle, and provides monitoring capabilities for distributed entities.
/**
 * Types and messages of the shard region protocol: identifier aliases, message extraction,
 * commands, queries, and the response types that carry query results.
 *
 * Response types expose their contents both as Scala collections (constructor fields) and
 * as `java.util` collections (the `get...` accessors).
 */
object ShardRegion {
  // Type aliases
  type EntityId = String
  type ShardId = String
  type Msg = Any
  // Partial: defined only for the messages it can map to an (entity id, payload) pair.
  type ExtractEntityId = PartialFunction[Msg, (EntityId, Msg)]
  type ExtractShardId = Msg => ShardId

  // Message extractor interface
  // Single-object alternative to the ExtractEntityId / ExtractShardId function pair.
  trait MessageExtractor {
    def entityId(message: Any): String
    def entityMessage(message: Any): Any
    def shardId(message: Any): String
  }

  // Hash-based message extractor
  // entityMessage returns the incoming message unchanged; entityId is left to subclasses.
  abstract class HashCodeMessageExtractor(maxNumberOfShards: Int) extends MessageExtractor {
    def entityMessage(message: Any): Any = message
    def shardId(message: Any): String
  }

  object HashCodeMessageExtractor {
    // Shard id computed from an entity id and the maximum number of shards.
    def shardId(id: String, maxNumberOfShards: Int): String
  }

  // Command messages
  sealed trait ShardRegionCommand
  case class Passivate(stopMessage: Any) extends ShardRegionCommand
  case object GracefulShutdown extends ShardRegionCommand
  // Accessor returning the GracefulShutdown singleton.
  def gracefulShutdownInstance: GracefulShutdown.type

  // Entity lifecycle
  case class ShardInitialized(shardId: ShardId)
  case class StartEntity(entityId: EntityId)

  // Query messages
  sealed trait ShardRegionQuery
  case object GetCurrentRegions extends ShardRegionQuery
  def getCurrentRegionsInstance: GetCurrentRegions.type
  case class GetClusterShardingStats(timeout: FiniteDuration) extends ShardRegionQuery
  case object GetShardRegionStats extends ShardRegionQuery
  // Note the name: this accessor returns GetShardRegionStats despite the shorter "RegionStats".
  def getRegionStatsInstance: GetShardRegionStats.type
  case object GetShardRegionState extends ShardRegionQuery
  def getShardRegionStateInstance: GetShardRegionState.type

  // Response messages
  case class CurrentRegions(regions: Set[Address]) {
    def getRegions: java.util.Set[Address]
  }

  // Per-address region statistics.
  case class ClusterShardingStats(regions: Map[Address, ShardRegionStats]) {
    def getRegions(): java.util.Map[Address, ShardRegionStats]
  }

  // An Int per shard id plus a set of failed shard ids; the auxiliary constructor omits `failed`.
  class ShardRegionStats(val stats: Map[ShardId, Int], val failed: Set[ShardId]) {
    def this(stats: Map[ShardId, Int])
    def getStats(): java.util.Map[ShardId, Int]
    def getFailed(): java.util.Set[ShardId]
  }

  object ShardRegionStats extends AbstractFunction1[Map[ShardId, Int], ShardRegionStats] {
    def apply(stats: Map[ShardId, Int]): ShardRegionStats
    def apply(stats: Map[ShardId, Int], failed: Set[ShardId]): ShardRegionStats
  }

  // Shard states plus a set of failed shard ids; the auxiliary constructor omits `failed`.
  class CurrentShardRegionState(val shards: Set[ShardState], val failed: Set[ShardId]) {
    def this(shards: Set[ShardState])
    def getShards(): java.util.Set[ShardState]
    def getFailed(): java.util.Set[ShardId]
  }

  object CurrentShardRegionState extends AbstractFunction1[Set[ShardState], CurrentShardRegionState] {
    def apply(shards: Set[ShardState]): CurrentShardRegionState
    def apply(shards: Set[ShardState], failed: Set[ShardId]): CurrentShardRegionState
  }

  // A shard id together with a set of entity ids.
  case class ShardState(shardId: ShardId, entityIds: Set[EntityId]) {
    def getEntityIds(): java.util.Set[EntityId]
  }
}

Comprehensive configuration system for tuning sharding behavior, performance, and operational characteristics.
// Factories and constants for ClusterShardingSettings.
object ClusterShardingSettings {
// Settings built from an actor system.
def apply(system: ActorSystem): ClusterShardingSettings
// Settings built from a Config object.
def apply(config: Config): ClusterShardingSettings
// String constants naming the two state store modes
// (presumably the values expected for `stateStoreMode` -- confirm against the library docs).
val StateStoreModePersistence = "persistence"
val StateStoreModeDData = "ddata"
}
/**
 * Settings for cluster sharding.
 *
 * @param role             optional role name
 * @param rememberEntities flag for the remember-entities feature
 * @param stateStoreMode   state store mode, given as a string
 * @param tuningParameters remaining tuning values, grouped in a TuningParameters object
 */
class ClusterShardingSettings(
  role: Option[String],
  rememberEntities: Boolean,
  stateStoreMode: String,
  tuningParameters: TuningParameters
)

Pluggable strategies for controlling how shards are allocated and rebalanced across cluster nodes.
// Shard allocation API: the strategy interface plus the built-in least-shard strategy.
object ShardCoordinator {
type ShardId = String
// Strategy interface; both operations are asynchronous (they return Futures).
trait ShardAllocationStrategy {
// Chooses the ActorRef that should host `shardId`, given the requester and the
// current allocation (shard ids per ActorRef).
def allocateShard(
requester: ActorRef,
shardId: ShardId,
currentShardAllocations: Map[ActorRef, IndexedSeq[ShardId]]
): Future[ActorRef]
// Returns a set of shard ids, given the current allocation and the shards whose
// rebalance is already in progress.
def rebalance(
currentShardAllocations: Map[ActorRef, IndexedSeq[ShardId]],
rebalanceInProgress: Set[ShardId]
): Future[Set[ShardId]]
}
// Factory methods for built-in strategies
def leastShardAllocationStrategy(
absoluteLimit: Int,
relativeLimit: Double
): ShardAllocationStrategy
// Strategy class configured by a rebalance threshold and a cap on simultaneous rebalances.
class LeastShardAllocationStrategy(
rebalanceThreshold: Int,
maxSimultaneousRebalance: Int
) extends ShardAllocationStrategy
}
// Factory for the least-shard allocation strategy, with the same parameter list as
// ShardCoordinator.leastShardAllocationStrategy.
object ShardAllocationStrategy {
  def leastShardAllocationStrategy(
    absoluteLimit: Int,
    relativeLimit: Double
  ): ShardAllocationStrategy
}

Advanced API for external control of shard placement, enabling custom orchestration and management systems.
// Extension id for ExternalShardAllocation: looks up the extension instance for an actor system.
object ExternalShardAllocation extends ExtensionId[ExternalShardAllocation] with ExtensionIdProvider {
def get(system: ClassicActorSystemProvider): ExternalShardAllocation
}
// Extension instance that hands out per-entity-type clients for external shard allocation.
class ExternalShardAllocation(system: ExtendedActorSystem) extends Extension {
// Scala client for the given entity type name.
def clientFor(typeName: String): scaladsl.ExternalShardAllocationClient
// Java client for the given entity type name.
def getClient(typeName: String): javadsl.ExternalShardAllocationClient
}
// Scala API
package scaladsl {
// Scala client: every operation is asynchronous and returns a scala Future.
trait ExternalShardAllocationClient {
// Sets the location (Address) of a single shard.
def updateShardLocation(shard: ShardId, location: Address): Future[Done]
// Sets the locations of several shards at once.
def updateShardLocations(locations: Map[ShardId, Address]): Future[Done]
// Reads the shard locations as a ShardLocations value.
def shardLocations(): Future[ShardLocations]
}
}
// Java API
package javadsl {
// Java client: same three operations as the scaladsl client, but returning
// CompletionStage and taking a java.util.Map for the bulk update.
trait ExternalShardAllocationClient {
def updateShardLocation(shard: ShardId, location: Address): CompletionStage[Done]
def updateShardLocations(locations: java.util.Map[ShardId, Address]): CompletionStage[Done]
def shardLocations(): CompletionStage[ShardLocations]
}
}
// Response type
// Result of a shardLocations() query: the Address recorded for each shard id.
case class ShardLocations(locations: Map[ShardId, Address]) {
  // The same mapping as a java.util.Map, for Java callers.
  def getLocations(): java.util.Map[ShardId, Address]
}

Built-in monitoring, statistics collection, and health check capabilities for operational visibility.
// Health checking
// Health check for cluster sharding, exposing a single Boolean result.
// NOTE(review): this listing shows a synchronous `isHealthy(): Boolean`; confirm against the
// Akka API docs, where this health check may instead be a `() => Future[Boolean]` function.
class ClusterShardingHealthCheck extends HealthCheck {
def isHealthy(): Boolean
}
// Observability and monitoring
// Extension with two recording hooks; both return Unit.
class ShardingFlightRecorder extends Extension {
// Records the allocation of a shard of the given entity type to a node address.
def recordShardAllocation(typeName: String, shardId: ShardId, node: Address): Unit
// Records a rebalance of a shard from one address to another.
def recordShardRebalance(typeName: String, shardId: ShardId, from: Address, to: Address): Unit
}
// Extension id for ShardingFlightRecorder: looks up the extension instance for an actor system.
object ShardingFlightRecorder extends ExtensionId[ShardingFlightRecorder] with ExtensionIdProvider {
  def get(system: ActorSystem): ShardingFlightRecorder
  def get(system: ClassicActorSystemProvider): ShardingFlightRecorder
}

trait ClusterShardingSerializable

Marker trait for messages that require special serialization handling in cluster sharding.
object RemoveInternalClusterShardingData

Utility for removing internal cluster sharding data during maintenance operations.
// Supporting type declarations referenced by the signatures in this document.
// Most are declared without members; they appear to be placeholders for the
// corresponding Akka, Scala and Java types rather than full definitions
// (NOTE(review): confirm each against the real library before relying on it).

// Entity and Shard identifiers
type EntityId = String
type ShardId = String
type Msg = Any
// Message extraction types
// ExtractEntityId is partial: defined only for messages it can map to (entity id, payload).
type ExtractEntityId = PartialFunction[Msg, (EntityId, Msg)]
type ExtractShardId = Msg => ShardId
// Core sharding components
trait Extension
// Lookup of an extension instance, by ActorSystem or by ClassicActorSystemProvider.
trait ExtensionId[T <: Extension] {
def get(system: ActorSystem): T
def get(system: ClassicActorSystemProvider): T
}
trait ExtensionIdProvider
// Actor system integration
class ActorRef
class Props
class ActorSystem
class ExtendedActorSystem
trait ClassicActorSystemProvider
// Cluster integration
class Address
class Cluster
trait DataCenter
class Member
// Message types
trait ClusterShardingSerializable
trait ShardRegionCommand
trait ShardRegionQuery
trait NoSerializationVerificationNeeded
// Configuration types
class Config
class FiniteDuration
class ClusterSingletonManagerSettings
// Async types
class Future[T]
class Done
class CompletionStage[T]
// Health and monitoring
trait HealthCheck {
def isHealthy(): Boolean
}
// Collections
// Only the conversion to the java.util counterpart is listed for Set and Map.
trait Set[T] {
def asJava: java.util.Set[T]
}
trait Map[K, V] {
def asJava: java.util.Map[K, V]
}
trait IndexedSeq[T]
// Java interop
class Optional[T] {
def orElse(other: T): T
}
// Utility functions
// Base class for the companion objects that extend AbstractFunction1 above.
abstract class AbstractFunction1[T1, R] extends (T1 => R)
// Exception types
class IllegalStateException extends RuntimeException
class NoSuchElementException extends RuntimeException