or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

allocation-strategies.mdconfiguration.mdcore-extension.mdexternal-allocation.mdindex.mdmessage-routing.mdmonitoring.md
tile.json

tessl/maven-com-typesafe-akka--akka-cluster-sharding_2-13

Akka Cluster Sharding provides location transparency and automatic distribution of stateful actors across cluster nodes

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
mavenpkg:maven/com.typesafe.akka/akka-cluster-sharding_2.13@2.8.x

To install, run

npx @tessl/cli install tessl/maven-com-typesafe-akka--akka-cluster-sharding_2-13@2.8.0

index.mddocs/

Akka Cluster Sharding

Akka Cluster Sharding provides location transparency and automatic distribution of stateful actors across cluster nodes. It enables developers to create scalable distributed applications where entities are automatically partitioned across the cluster and can be accessed by logical identifiers without knowing their physical location.

Package Information

  • Package Name: com.typesafe.akka:akka-cluster-sharding_2.13
  • Package Type: Maven
  • Language: Scala
  • Installation: libraryDependencies += "com.typesafe.akka" %% "akka-cluster-sharding" % "2.8.8"

Core Imports

import akka.cluster.sharding.ClusterSharding
import akka.cluster.sharding.ClusterShardingSettings
import akka.cluster.sharding.ShardRegion
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy

Basic Usage

import akka.actor.{ActorSystem, Props}
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings, ShardRegion}
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy

implicit val system: ActorSystem = ActorSystem("MyClusterSystem")

// Define entity extraction functions
val extractEntityId: ShardRegion.ExtractEntityId = {
  case msg @ MyEntityMessage(entityId, _) => (entityId, msg)
}

val extractShardId: ShardRegion.ExtractShardId = {
  case MyEntityMessage(entityId, _) => (entityId.hashCode % 100).toString
}

// Start cluster sharding for an entity type
val shardRegion = ClusterSharding(system).start(
  typeName = "MyEntity",
  entityProps = Props[MyEntityActor](),
  settings = ClusterShardingSettings(system),
  extractEntityId = extractEntityId,
  extractShardId = extractShardId
)

// Send messages to entities
shardRegion ! MyEntityMessage("entity-1", "Hello World")

Architecture

Akka Cluster Sharding is built around several key components:

  • ClusterSharding Extension: Main entry point that manages sharded entities across the cluster
  • ShardRegion: Local proxy that routes messages to entities and handles shard lifecycle
  • ShardCoordinator: Centralized singleton that manages shard allocation and rebalancing
  • Shard: Container for a group of entities, distributed across cluster nodes
  • Entity: Individual stateful actors that process business logic
  • Allocation Strategies: Pluggable algorithms that determine shard placement and rebalancing

Capabilities

Core Sharding Extension

Main extension for registering entity types and managing distributed sharding across the cluster. Provides lifecycle management and access to shard regions.

object ClusterSharding extends ExtensionId[ClusterSharding] with ExtensionIdProvider {
  def get(system: ActorSystem): ClusterSharding
  def get(system: ClassicActorSystemProvider): ClusterSharding
}

class ClusterSharding(system: ExtendedActorSystem) extends Extension {
  // Full configuration start methods
  def start(
    typeName: String,
    entityProps: Props,
    settings: ClusterShardingSettings,
    extractEntityId: ShardRegion.ExtractEntityId,
    extractShardId: ShardRegion.ExtractShardId,
    allocationStrategy: ShardAllocationStrategy,
    handOffStopMessage: Any
  ): ActorRef
  
  def start(
    typeName: String,
    entityProps: Props,
    extractEntityId: ShardRegion.ExtractEntityId,
    extractShardId: ShardRegion.ExtractShardId,
    allocationStrategy: ShardAllocationStrategy,
    handOffStopMessage: Any
  ): ActorRef
  
  // Simplified start methods with defaults
  def start(
    typeName: String,
    entityProps: Props,
    settings: ClusterShardingSettings,
    extractEntityId: ShardRegion.ExtractEntityId,
    extractShardId: ShardRegion.ExtractShardId
  ): ActorRef
  
  def start(
    typeName: String,
    entityProps: Props,
    extractEntityId: ShardRegion.ExtractEntityId,
    extractShardId: ShardRegion.ExtractShardId
  ): ActorRef
  
  // Java API start methods
  def start(
    typeName: String,
    entityProps: Props,
    settings: ClusterShardingSettings,
    messageExtractor: ShardRegion.MessageExtractor,
    allocationStrategy: ShardAllocationStrategy,
    handOffStopMessage: Any
  ): ActorRef
  
  def start(
    typeName: String,
    entityProps: Props,
    settings: ClusterShardingSettings,
    messageExtractor: ShardRegion.MessageExtractor
  ): ActorRef
  
  def start(
    typeName: String,
    entityProps: Props,
    messageExtractor: ShardRegion.MessageExtractor
  ): ActorRef
  
  // Proxy start methods
  def startProxy(
    typeName: String,
    role: Option[String],
    extractEntityId: ShardRegion.ExtractEntityId,
    extractShardId: ShardRegion.ExtractShardId
  ): ActorRef
  
  def startProxy(
    typeName: String,
    role: Option[String],
    dataCenter: Option[DataCenter],
    extractEntityId: ShardRegion.ExtractEntityId,
    extractShardId: ShardRegion.ExtractShardId
  ): ActorRef
  
  def startProxy(
    typeName: String,
    role: Optional[String],
    messageExtractor: ShardRegion.MessageExtractor
  ): ActorRef
  
  def startProxy(
    typeName: String,
    role: Optional[String],
    dataCenter: Optional[String],
    messageExtractor: ShardRegion.MessageExtractor
  ): ActorRef
  
  // Access methods
  def shardRegion(typeName: String): ActorRef
  def shardRegionProxy(typeName: String, dataCenter: DataCenter): ActorRef
  def shardTypeNames: Set[String]
  def getShardTypeNames: java.util.Set[String]
  def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy
}

Core Extension

Message Routing and Entity Management

ShardRegion handles message routing, entity lifecycle, and provides monitoring capabilities for distributed entities.

object ShardRegion {
  // Type aliases
  type EntityId = String
  type ShardId = String
  type Msg = Any
  type ExtractEntityId = PartialFunction[Msg, (EntityId, Msg)]
  type ExtractShardId = Msg => ShardId
  
  // Message extractor interface
  trait MessageExtractor {
    def entityId(message: Any): String
    def entityMessage(message: Any): Any
    def shardId(message: Any): String
  }
  
  // Hash-based message extractor
  abstract class HashCodeMessageExtractor(maxNumberOfShards: Int) extends MessageExtractor {
    def entityMessage(message: Any): Any = message
    def shardId(message: Any): String
  }
  
  object HashCodeMessageExtractor {
    def shardId(id: String, maxNumberOfShards: Int): String
  }
  
  // Command messages
  sealed trait ShardRegionCommand
  case class Passivate(stopMessage: Any) extends ShardRegionCommand
  case object GracefulShutdown extends ShardRegionCommand
  def gracefulShutdownInstance: GracefulShutdown.type
  
  // Entity lifecycle
  case class ShardInitialized(shardId: ShardId)
  case class StartEntity(entityId: EntityId)
  
  // Query messages
  sealed trait ShardRegionQuery
  case object GetCurrentRegions extends ShardRegionQuery
  def getCurrentRegionsInstance: GetCurrentRegions.type
  case class GetClusterShardingStats(timeout: FiniteDuration) extends ShardRegionQuery
  case object GetShardRegionStats extends ShardRegionQuery
  def getRegionStatsInstance: GetShardRegionStats.type
  case object GetShardRegionState extends ShardRegionQuery
  def getShardRegionStateInstance: GetShardRegionState.type
  
  // Response messages
  case class CurrentRegions(regions: Set[Address]) {
    def getRegions: java.util.Set[Address]
  }
  
  case class ClusterShardingStats(regions: Map[Address, ShardRegionStats]) {
    def getRegions(): java.util.Map[Address, ShardRegionStats]
  }
  
  class ShardRegionStats(val stats: Map[ShardId, Int], val failed: Set[ShardId]) {
    def this(stats: Map[ShardId, Int])
    def getStats(): java.util.Map[ShardId, Int]
    def getFailed(): java.util.Set[ShardId]
  }
  
  object ShardRegionStats extends AbstractFunction1[Map[ShardId, Int], ShardRegionStats] {
    def apply(stats: Map[ShardId, Int]): ShardRegionStats
    def apply(stats: Map[ShardId, Int], failed: Set[ShardId]): ShardRegionStats
  }
  
  class CurrentShardRegionState(val shards: Set[ShardState], val failed: Set[ShardId]) {
    def this(shards: Set[ShardState])
    def getShards(): java.util.Set[ShardState]
    def getFailed(): java.util.Set[ShardId]
  }
  
  object CurrentShardRegionState extends AbstractFunction1[Set[ShardState], CurrentShardRegionState] {
    def apply(shards: Set[ShardState]): CurrentShardRegionState
    def apply(shards: Set[ShardState], failed: Set[ShardId]): CurrentShardRegionState
  }
  
  case class ShardState(shardId: ShardId, entityIds: Set[EntityId]) {
    def getEntityIds(): java.util.Set[EntityId]
  }
}

Message Routing

Configuration and Settings

Comprehensive configuration system for tuning sharding behavior, performance, and operational characteristics.

object ClusterShardingSettings {
  def apply(system: ActorSystem): ClusterShardingSettings
  def apply(config: Config): ClusterShardingSettings
  
  val StateStoreModePersistence = "persistence"
  val StateStoreModeDData = "ddata"
}

class ClusterShardingSettings(
  role: Option[String],
  rememberEntities: Boolean,
  stateStoreMode: String,
  tuningParameters: TuningParameters
)

Configuration

Shard Allocation Strategies

Pluggable strategies for controlling how shards are allocated and rebalanced across cluster nodes.

object ShardCoordinator {
  type ShardId = String
  
  trait ShardAllocationStrategy {
    def allocateShard(
      requester: ActorRef,
      shardId: ShardId,
      currentShardAllocations: Map[ActorRef, IndexedSeq[ShardId]]
    ): Future[ActorRef]
    
    def rebalance(
      currentShardAllocations: Map[ActorRef, IndexedSeq[ShardId]],
      rebalanceInProgress: Set[ShardId]
    ): Future[Set[ShardId]]
  }
  
  // Factory methods for built-in strategies
  def leastShardAllocationStrategy(
    absoluteLimit: Int,
    relativeLimit: Double
  ): ShardAllocationStrategy
  
  class LeastShardAllocationStrategy(
    rebalanceThreshold: Int,
    maxSimultaneousRebalance: Int
  ) extends ShardAllocationStrategy
}

object ShardAllocationStrategy {
  def leastShardAllocationStrategy(
    absoluteLimit: Int,
    relativeLimit: Double
  ): ShardAllocationStrategy
}

Allocation Strategies

External Shard Allocation

Advanced API for external control of shard placement, enabling custom orchestration and management systems.

object ExternalShardAllocation extends ExtensionId[ExternalShardAllocation] with ExtensionIdProvider {
  def get(system: ClassicActorSystemProvider): ExternalShardAllocation
}

class ExternalShardAllocation(system: ExtendedActorSystem) extends Extension {
  def clientFor(typeName: String): scaladsl.ExternalShardAllocationClient
  def getClient(typeName: String): javadsl.ExternalShardAllocationClient
}

// Scala API
package scaladsl {
  trait ExternalShardAllocationClient {
    def updateShardLocation(shard: ShardId, location: Address): Future[Done]
    def updateShardLocations(locations: Map[ShardId, Address]): Future[Done] 
    def shardLocations(): Future[ShardLocations]
  }
}

// Java API  
package javadsl {
  trait ExternalShardAllocationClient {
    def updateShardLocation(shard: ShardId, location: Address): CompletionStage[Done]
    def updateShardLocations(locations: java.util.Map[ShardId, Address]): CompletionStage[Done]
    def shardLocations(): CompletionStage[ShardLocations]
  }
}

// Response type
case class ShardLocations(locations: Map[ShardId, Address]) {
  def getLocations(): java.util.Map[ShardId, Address]
}

External Allocation

Monitoring and Health Checks

Built-in monitoring, statistics collection, and health check capabilities for operational visibility.

// Health checking
class ClusterShardingHealthCheck extends HealthCheck {
  def isHealthy(): Boolean
}

// Observability and monitoring
class ShardingFlightRecorder extends Extension {
  def recordShardAllocation(typeName: String, shardId: ShardId, node: Address): Unit
  def recordShardRebalance(typeName: String, shardId: ShardId, from: Address, to: Address): Unit
}

object ShardingFlightRecorder extends ExtensionId[ShardingFlightRecorder] with ExtensionIdProvider {
  def get(system: ActorSystem): ShardingFlightRecorder
  def get(system: ClassicActorSystemProvider): ShardingFlightRecorder
}

Monitoring

Utility Components

Serialization Marker

trait ClusterShardingSerializable

Marker trait for messages that require special serialization handling in cluster sharding.

Data Cleanup Utility

object RemoveInternalClusterShardingData

Utility for removing internal cluster sharding data during maintenance operations.

Core Types

// Entity and Shard identifiers
type EntityId = String
type ShardId = String
type Msg = Any

// Message extraction types
type ExtractEntityId = PartialFunction[Msg, (EntityId, Msg)]
type ExtractShardId = Msg => ShardId

// Core sharding components
trait Extension
trait ExtensionId[T <: Extension] {
  def get(system: ActorSystem): T
  def get(system: ClassicActorSystemProvider): T
}
trait ExtensionIdProvider

// Actor system integration
class ActorRef
class Props
class ActorSystem
class ExtendedActorSystem
trait ClassicActorSystemProvider

// Cluster integration
class Address
class Cluster
trait DataCenter
class Member

// Message types
trait ClusterShardingSerializable
trait ShardRegionCommand
trait ShardRegionQuery
trait NoSerializationVerificationNeeded

// Configuration types
class Config
class FiniteDuration
class ClusterSingletonManagerSettings

// Async types
class Future[T]
class Done
class CompletionStage[T]

// Health and monitoring
trait HealthCheck {
  def isHealthy(): Boolean
}

// Collections
trait Set[T] {
  def asJava: java.util.Set[T]
}
trait Map[K, V] {
  def asJava: java.util.Map[K, V]
}
trait IndexedSeq[T]

// Java interop
class Optional[T] {
  def orElse(other: T): T
}

// Utility functions
abstract class AbstractFunction1[T1, R] extends (T1 => R)

// Exception types
class IllegalStateException extends RuntimeException
class NoSuchElementException extends RuntimeException