# Akka Cluster Sharding

Akka Cluster Sharding provides location transparency and automatic distribution of stateful actors across cluster nodes. It enables developers to create scalable distributed applications where entities are automatically partitioned across the cluster and can be accessed by logical identifiers without knowing their physical location.

## Package Information

- **Package Name**: com.typesafe.akka:akka-cluster-sharding_2.13
- **Package Type**: Maven
- **Language**: Scala
- **Installation**: `libraryDependencies += "com.typesafe.akka" %% "akka-cluster-sharding" % "2.8.8"`

## Core Imports

```scala
import akka.cluster.sharding.ClusterSharding
import akka.cluster.sharding.ClusterShardingSettings
import akka.cluster.sharding.ShardRegion
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
```

## Basic Usage

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings, ShardRegion}

// Example message and entity actor; replace these with your own types
final case class MyEntityMessage(entityId: String, payload: String)

class MyEntityActor extends Actor {
  def receive: Receive = {
    case MyEntityMessage(_, payload) => println(s"${self.path.name} received: $payload")
  }
}

// The system must be configured with akka.actor.provider = cluster
implicit val system: ActorSystem = ActorSystem("MyClusterSystem")

// Define entity extraction functions
val numberOfShards = 100

val extractEntityId: ShardRegion.ExtractEntityId = {
  case msg @ MyEntityMessage(entityId, _) => (entityId, msg)
}

val extractShardId: ShardRegion.ExtractShardId = {
  // math.abs keeps the shard id non-negative, because hashCode can be negative
  case MyEntityMessage(entityId, _) => math.abs(entityId.hashCode % numberOfShards).toString
  // Sent by the shard itself when remember-entities is enabled
  case ShardRegion.StartEntity(entityId) => math.abs(entityId.hashCode % numberOfShards).toString
}

// Start cluster sharding for an entity type
val shardRegion: ActorRef = ClusterSharding(system).start(
  typeName = "MyEntity",
  entityProps = Props[MyEntityActor](),
  settings = ClusterShardingSettings(system),
  extractEntityId = extractEntityId,
  extractShardId = extractShardId
)

// Send messages to entities
shardRegion ! MyEntityMessage("entity-1", "Hello World")
```
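
Entities that sit idle can ask their parent shard to stop them, which is what the `Passivate` command listed under Message Routing is for. The sketch below shows one way to do this with a receive timeout. `IdleAwareEntity`, `Greet`, and the two-minute timeout are illustrative assumptions, not part of the library.

```scala
import akka.actor.{Actor, PoisonPill, Props, ReceiveTimeout}
import akka.cluster.sharding.ShardRegion
import scala.concurrent.duration._

// Hypothetical message type for this sketch
final case class Greet(entityId: String, text: String)

class IdleAwareEntity extends Actor {
  private var greetings = 0

  // Deliver ReceiveTimeout to this actor after two minutes without messages
  context.setReceiveTimeout(2.minutes)

  def receive: Receive = {
    case Greet(_, _) =>
      greetings += 1
    case ReceiveTimeout =>
      // The parent of an entity is its Shard; it sends the stop message back
      // and buffers messages that arrive while the entity is stopping
      context.parent ! ShardRegion.Passivate(stopMessage = PoisonPill)
  }
}

object IdleAwareEntity {
  def props: Props = Props[IdleAwareEntity]()
}
```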

## Architecture

Akka Cluster Sharding is built around several key components:

- **ClusterSharding Extension**: Main entry point that manages sharded entities across the cluster
- **ShardRegion**: Local proxy that routes messages to entities and handles shard lifecycle
- **ShardCoordinator**: Centralized singleton that manages shard allocation and rebalancing
- **Shard**: Container for a group of entities, distributed across cluster nodes
- **Entity**: Individual stateful actors that process business logic
- **Allocation Strategies**: Pluggable algorithms that determine shard placement and rebalancing

## Capabilities

### Core Sharding Extension

Main extension for registering entity types and managing distributed sharding across the cluster. Provides lifecycle management and access to shard regions.

```scala { .api }
object ClusterSharding extends ExtensionId[ClusterSharding] with ExtensionIdProvider {
  def get(system: ActorSystem): ClusterSharding
  def get(system: ClassicActorSystemProvider): ClusterSharding
}

class ClusterSharding(system: ExtendedActorSystem) extends Extension {
  // Full configuration start methods
  def start(
    typeName: String,
    entityProps: Props,
    settings: ClusterShardingSettings,
    extractEntityId: ShardRegion.ExtractEntityId,
    extractShardId: ShardRegion.ExtractShardId,
    allocationStrategy: ShardAllocationStrategy,
    handOffStopMessage: Any
  ): ActorRef

  def start(
    typeName: String,
    entityProps: Props,
    extractEntityId: ShardRegion.ExtractEntityId,
    extractShardId: ShardRegion.ExtractShardId,
    allocationStrategy: ShardAllocationStrategy,
    handOffStopMessage: Any
  ): ActorRef

  // Simplified start methods with defaults
  def start(
    typeName: String,
    entityProps: Props,
    settings: ClusterShardingSettings,
    extractEntityId: ShardRegion.ExtractEntityId,
    extractShardId: ShardRegion.ExtractShardId
  ): ActorRef

  def start(
    typeName: String,
    entityProps: Props,
    extractEntityId: ShardRegion.ExtractEntityId,
    extractShardId: ShardRegion.ExtractShardId
  ): ActorRef

  // Java API start methods
  def start(
    typeName: String,
    entityProps: Props,
    settings: ClusterShardingSettings,
    messageExtractor: ShardRegion.MessageExtractor,
    allocationStrategy: ShardAllocationStrategy,
    handOffStopMessage: Any
  ): ActorRef

  def start(
    typeName: String,
    entityProps: Props,
    settings: ClusterShardingSettings,
    messageExtractor: ShardRegion.MessageExtractor
  ): ActorRef

  def start(
    typeName: String,
    entityProps: Props,
    messageExtractor: ShardRegion.MessageExtractor
  ): ActorRef

  // Proxy start methods
  def startProxy(
    typeName: String,
    role: Option[String],
    extractEntityId: ShardRegion.ExtractEntityId,
    extractShardId: ShardRegion.ExtractShardId
  ): ActorRef

  def startProxy(
    typeName: String,
    role: Option[String],
    dataCenter: Option[DataCenter],
    extractEntityId: ShardRegion.ExtractEntityId,
    extractShardId: ShardRegion.ExtractShardId
  ): ActorRef

  def startProxy(
    typeName: String,
    role: Optional[String],
    messageExtractor: ShardRegion.MessageExtractor
  ): ActorRef

  def startProxy(
    typeName: String,
    role: Optional[String],
    dataCenter: Optional[String],
    messageExtractor: ShardRegion.MessageExtractor
  ): ActorRef

  // Access methods
  def shardRegion(typeName: String): ActorRef
  def shardRegionProxy(typeName: String, dataCenter: DataCenter): ActorRef
  def shardTypeNames: Set[String]
  def getShardTypeNames: java.util.Set[String]
  def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy
}
```
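
A node that only sends messages to entities, and never hosts them, can use `startProxy` instead of `start`. The following is a minimal sketch under assumptions: `Ping`, `EntityProxy`, the `"MyEntity"` type name, and the shard count of 100 are hypothetical, and the extractors are expected to match the ones used on the nodes that host the entities.

```scala
import akka.actor.{ActorRef, ActorSystem}
import akka.cluster.sharding.{ClusterSharding, ShardRegion}

// Hypothetical message type for this sketch
final case class Ping(entityId: String)

object EntityProxy {
  val extractEntityId: ShardRegion.ExtractEntityId = {
    case msg @ Ping(id) => (id, msg)
  }

  val extractShardId: ShardRegion.ExtractShardId = {
    case Ping(id) => math.abs(id.hashCode % 100).toString
  }

  // Requires an ActorSystem configured with the cluster provider
  def start(system: ActorSystem): ActorRef =
    ClusterSharding(system).startProxy(
      typeName = "MyEntity",
      role = None, // entities may live on nodes of any role
      extractEntityId = extractEntityId,
      extractShardId = extractShardId
    )
}
```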
171
172
[Core Extension](./core-extension.md)
173
174
### Message Routing and Entity Management
175
176
ShardRegion handles message routing, entity lifecycle, and provides monitoring capabilities for distributed entities.
177
178
```scala { .api }
179
object ShardRegion {
180
// Type aliases
181
type EntityId = String
182
type ShardId = String
183
type Msg = Any
184
type ExtractEntityId = PartialFunction[Msg, (EntityId, Msg)]
185
type ExtractShardId = Msg => ShardId
186
187
// Message extractor interface
188
trait MessageExtractor {
189
def entityId(message: Any): String
190
def entityMessage(message: Any): Any
191
def shardId(message: Any): String
192
}
193
194
// Hash-based message extractor
195
abstract class HashCodeMessageExtractor(maxNumberOfShards: Int) extends MessageExtractor {
196
def entityMessage(message: Any): Any = message
197
def shardId(message: Any): String
198
}
199
200
object HashCodeMessageExtractor {
201
def shardId(id: String, maxNumberOfShards: Int): String
202
}
203
204
// Command messages
205
sealed trait ShardRegionCommand
206
case class Passivate(stopMessage: Any) extends ShardRegionCommand
207
case object GracefulShutdown extends ShardRegionCommand
208
def gracefulShutdownInstance: GracefulShutdown.type
209
210
// Entity lifecycle
211
case class ShardInitialized(shardId: ShardId)
212
case class StartEntity(entityId: EntityId)
213
214
// Query messages
215
sealed trait ShardRegionQuery
216
case object GetCurrentRegions extends ShardRegionQuery
217
def getCurrentRegionsInstance: GetCurrentRegions.type
218
case class GetClusterShardingStats(timeout: FiniteDuration) extends ShardRegionQuery
219
case object GetShardRegionStats extends ShardRegionQuery
220
def getRegionStatsInstance: GetShardRegionStats.type
221
case object GetShardRegionState extends ShardRegionQuery
222
def getShardRegionStateInstance: GetShardRegionState.type
223
224
// Response messages
225
case class CurrentRegions(regions: Set[Address]) {
226
def getRegions: java.util.Set[Address]
227
}
228
229
case class ClusterShardingStats(regions: Map[Address, ShardRegionStats]) {
230
def getRegions(): java.util.Map[Address, ShardRegionStats]
231
}
232
233
class ShardRegionStats(val stats: Map[ShardId, Int], val failed: Set[ShardId]) {
234
def this(stats: Map[ShardId, Int])
235
def getStats(): java.util.Map[ShardId, Int]
236
def getFailed(): java.util.Set[ShardId]
237
}
238
239
object ShardRegionStats extends AbstractFunction1[Map[ShardId, Int], ShardRegionStats] {
240
def apply(stats: Map[ShardId, Int]): ShardRegionStats
241
def apply(stats: Map[ShardId, Int], failed: Set[ShardId]): ShardRegionStats
242
}
243
244
class CurrentShardRegionState(val shards: Set[ShardState], val failed: Set[ShardId]) {
245
def this(shards: Set[ShardState])
246
def getShards(): java.util.Set[ShardState]
247
def getFailed(): java.util.Set[ShardId]
248
}
249
250
object CurrentShardRegionState extends AbstractFunction1[Set[ShardState], CurrentShardRegionState] {
251
def apply(shards: Set[ShardState]): CurrentShardRegionState
252
def apply(shards: Set[ShardState], failed: Set[ShardId]): CurrentShardRegionState
253
}
254
255
case class ShardState(shardId: ShardId, entityIds: Set[EntityId]) {
256
def getEntityIds(): java.util.Set[EntityId]
257
}
258
}
259
```
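
`HashCodeMessageExtractor` derives the shard id from the entity id, so a subclass only has to supply `entityId`. The sketch below assumes a hypothetical `OrderCommand` message and 100 shards; the resulting `OrderExtractor` could then be passed to one of the `start` overloads that accept a `MessageExtractor`.

```scala
import akka.cluster.sharding.ShardRegion

// Hypothetical message type for this sketch
final case class OrderCommand(orderId: String, action: String)

object OrderExtractor extends ShardRegion.HashCodeMessageExtractor(100) {
  // Returning null marks the message as unhandled by the shard region
  override def entityId(message: Any): String = message match {
    case OrderCommand(orderId, _) => orderId
    case _                        => null
  }
}
```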

[Message Routing](./message-routing.md)

### Configuration and Settings

Configuration system for tuning sharding behavior, performance, and operational characteristics.

```scala { .api }
object ClusterShardingSettings {
  def apply(system: ActorSystem): ClusterShardingSettings
  def apply(config: Config): ClusterShardingSettings

  val StateStoreModePersistence = "persistence"
  val StateStoreModeDData = "ddata"
}

// Abbreviated: the actual constructor takes further parameters
// (for example journal and snapshot plugin ids and coordinator singleton settings)
class ClusterShardingSettings(
  role: Option[String],
  rememberEntities: Boolean,
  stateStoreMode: String,
  tuningParameters: TuningParameters
)
```
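
Settings are usually loaded from configuration and then adjusted in code. The sketch below assumes the `akka-cluster-sharding` reference configuration is on the classpath and uses a hypothetical `backend` role; `ShardingConfig` is an illustrative name. Each `with...` method returns a modified copy of the settings.

```scala
import akka.cluster.sharding.ClusterShardingSettings
import com.typesafe.config.ConfigFactory

object ShardingConfig {
  // Reads the akka.cluster.sharding section, including library defaults
  private val shardingConfig = ConfigFactory.load().getConfig("akka.cluster.sharding")

  val settings: ClusterShardingSettings =
    ClusterShardingSettings(shardingConfig)
      .withRole("backend") // hypothetical role name
      .withRememberEntities(true)
      .withStateStoreMode(ClusterShardingSettings.StateStoreModeDData)
}
```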

[Configuration](./configuration.md)

### Shard Allocation Strategies

Pluggable strategies for controlling how shards are allocated and rebalanced across cluster nodes.

```scala { .api }
object ShardCoordinator {
  type ShardId = String

  trait ShardAllocationStrategy {
    def allocateShard(
      requester: ActorRef,
      shardId: ShardId,
      currentShardAllocations: Map[ActorRef, IndexedSeq[ShardId]]
    ): Future[ActorRef]

    def rebalance(
      currentShardAllocations: Map[ActorRef, IndexedSeq[ShardId]],
      rebalanceInProgress: Set[ShardId]
    ): Future[Set[ShardId]]
  }

  // Scala API factory; the companion is nested inside ShardCoordinator
  object ShardAllocationStrategy {
    def leastShardAllocationStrategy(
      absoluteLimit: Int,
      relativeLimit: Double
    ): ShardAllocationStrategy
  }

  // Java API factory for the same built-in strategy
  def leastShardAllocationStrategy(
    absoluteLimit: Int,
    relativeLimit: Double
  ): ShardAllocationStrategy

  class LeastShardAllocationStrategy(
    rebalanceThreshold: Int,
    maxSimultaneousRebalance: Int
  ) extends ShardAllocationStrategy
}
```
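
A custom strategy only has to implement the two methods of `ShardAllocationStrategy`. The sketch below places each new shard on the region that currently holds the fewest shards and never rebalances. `FewestShardsStrategy` and its `leastLoaded` helper are hypothetical names, and the built-in `leastShardAllocationStrategy` is likely the better choice in practice.

```scala
import akka.actor.ActorRef
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import akka.cluster.sharding.ShardRegion.ShardId
import scala.collection.immutable
import scala.concurrent.Future

object FewestShardsStrategy {
  // Pure helper: the key whose shard list is shortest (allocations must be non-empty)
  def leastLoaded[K](allocations: Map[K, immutable.IndexedSeq[ShardId]]): K =
    allocations.minBy { case (_, shards) => shards.size }._1
}

class FewestShardsStrategy extends ShardAllocationStrategy {
  override def allocateShard(
      requester: ActorRef,
      shardId: ShardId,
      currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]]
  ): Future[ActorRef] =
    Future.successful(FewestShardsStrategy.leastLoaded(currentShardAllocations))

  // This sketch never moves shards once they are allocated
  override def rebalance(
      currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]],
      rebalanceInProgress: Set[ShardId]
  ): Future[Set[ShardId]] =
    Future.successful(Set.empty[ShardId])
}
```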

[Allocation Strategies](./allocation-strategies.md)

### External Shard Allocation

Advanced API for external control of shard placement, enabling custom orchestration and management systems.

```scala { .api }
object ExternalShardAllocation extends ExtensionId[ExternalShardAllocation] with ExtensionIdProvider {
  def get(system: ClassicActorSystemProvider): ExternalShardAllocation
}

class ExternalShardAllocation(system: ExtendedActorSystem) extends Extension {
  def clientFor(typeName: String): scaladsl.ExternalShardAllocationClient
  def getClient(typeName: String): javadsl.ExternalShardAllocationClient
}

// Scala API
package scaladsl {
  trait ExternalShardAllocationClient {
    def updateShardLocation(shard: ShardId, location: Address): Future[Done]
    def updateShardLocations(locations: Map[ShardId, Address]): Future[Done]
    def shardLocations(): Future[ShardLocations]
  }
}

// Java API
package javadsl {
  trait ExternalShardAllocationClient {
    def updateShardLocation(shard: ShardId, location: Address): CompletionStage[Done]
    def updateShardLocations(locations: java.util.Map[ShardId, Address]): CompletionStage[Done]
    def shardLocations(): CompletionStage[ShardLocations]
  }
}

// Response type
case class ShardLocations(locations: Map[ShardId, Address]) {
  def getLocations(): java.util.Map[ShardId, Address]
}
```
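
An external orchestrator typically computes a shard-to-node mapping and pushes it through the client. The sketch below assumes the entity type was started with an external allocation strategy. `ShardPlacement`, `roundRobin`, `publish`, and the `"MyEntity"` type name are hypothetical, and the import path `akka.cluster.sharding.external` is where this API is assumed to live.

```scala
import akka.Done
import akka.actor.{ActorSystem, Address}
import akka.cluster.sharding.external.ExternalShardAllocation
import scala.concurrent.Future

object ShardPlacement {
  // Pure helper: spread shard ids over the given nodes in round-robin order
  def roundRobin(shards: Seq[String], nodes: IndexedSeq[Address]): Map[String, Address] =
    shards.zipWithIndex.map { case (shard, i) => shard -> nodes(i % nodes.size) }.toMap

  // Publish the desired placement for the hypothetical "MyEntity" type
  def publish(system: ActorSystem, placement: Map[String, Address]): Future[Done] =
    ExternalShardAllocation(system).clientFor("MyEntity").updateShardLocations(placement)
}
```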

[External Allocation](./external-allocation.md)

### Monitoring and Health Checks

Built-in monitoring, statistics collection, and health check capabilities for operational visibility.

```scala { .api }
// Health checking
class ClusterShardingHealthCheck extends HealthCheck {
  def isHealthy(): Boolean
}

// Observability and monitoring
class ShardingFlightRecorder extends Extension {
  def recordShardAllocation(typeName: String, shardId: ShardId, node: Address): Unit
  def recordShardRebalance(typeName: String, shardId: ShardId, from: Address, to: Address): Unit
}

object ShardingFlightRecorder extends ExtensionId[ShardingFlightRecorder] with ExtensionIdProvider {
  def get(system: ActorSystem): ShardingFlightRecorder
  def get(system: ClassicActorSystemProvider): ShardingFlightRecorder
}
```
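
Per-region statistics can also be pulled on demand with the query messages defined on `ShardRegion`. The sketch below asks a region for its current state and counts the entities in each shard. `RegionInspector`, its method names, and the three-second timeout are assumptions made for illustration.

```scala
import akka.actor.ActorRef
import akka.cluster.sharding.ShardRegion
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

object RegionInspector {
  // Pure helper: number of entities reported for each shard in the reply
  def entityCounts(state: ShardRegion.CurrentShardRegionState): Map[String, Int] =
    state.shards.map(s => s.shardId -> s.entityIds.size).toMap

  // Ask the local shard region which shards and entities it currently hosts
  def query(region: ActorRef)(implicit ec: ExecutionContext): Future[Map[String, Int]] = {
    implicit val timeout: Timeout = 3.seconds
    (region ? ShardRegion.GetShardRegionState)
      .mapTo[ShardRegion.CurrentShardRegionState]
      .map(entityCounts)
  }
}
```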

[Monitoring](./monitoring.md)

## Utility Components

### Serialization Marker

```scala { .api }
trait ClusterShardingSerializable
```

Marker trait for messages that require special serialization handling in cluster sharding.

### Data Cleanup Utility

```scala { .api }
object RemoveInternalClusterShardingData
```

Utility for removing internal cluster sharding data during maintenance operations.

## Core Types

```scala { .api }
// Entity and Shard identifiers
type EntityId = String
type ShardId = String
type Msg = Any

// Message extraction types
type ExtractEntityId = PartialFunction[Msg, (EntityId, Msg)]
type ExtractShardId = Msg => ShardId

// Core sharding components
trait Extension
trait ExtensionId[T <: Extension] {
  def get(system: ActorSystem): T
  def get(system: ClassicActorSystemProvider): T
}
trait ExtensionIdProvider

// Actor system integration
class ActorRef
class Props
class ActorSystem
class ExtendedActorSystem
trait ClassicActorSystemProvider

// Cluster integration
class Address
class Cluster
trait DataCenter
class Member

// Message types
trait ClusterShardingSerializable
trait ShardRegionCommand
trait ShardRegionQuery
trait NoSerializationVerificationNeeded

// Configuration types
class Config
class FiniteDuration
class ClusterSingletonManagerSettings

// Async types
class Future[T]
class Done
class CompletionStage[T]

// Health and monitoring
trait HealthCheck {
  def isHealthy(): Boolean
}

// Collections
trait Set[T] {
  def asJava: java.util.Set[T]
}
trait Map[K, V] {
  def asJava: java.util.Map[K, V]
}
trait IndexedSeq[T]

// Java interop
class Optional[T] {
  def orElse(other: T): T
}

// Utility functions
abstract class AbstractFunction1[T1, R] extends (T1 => R)

// Exception types
class IllegalStateException extends RuntimeException
class NoSuchElementException extends RuntimeException
```