0
# Message Routing and Entity Management
1
2
The ShardRegion is responsible for routing messages to entities, managing entity lifecycle, and providing monitoring capabilities. It acts as a local proxy that knows how to forward messages to the correct entities across the cluster.
3
4
## Core Message Routing Types
5
6
### Type Aliases
7
8
```scala { .api }
9
object ShardRegion {
10
type EntityId = String
11
type ShardId = String
12
type Msg = Any
13
type ExtractEntityId = PartialFunction[Msg, (EntityId, Msg)]
14
type ExtractShardId = Msg => ShardId
15
}
16
```
17
18
### Message Extractor Interface (Java API)
19
20
```scala { .api }
21
trait MessageExtractor {
22
def entityId(message: Any): String
23
def entityMessage(message: Any): Any
24
def shardId(message: Any): String
25
}
26
```
27
28
Extract entity ID, shard ID, and entity message from incoming messages.
29
30
**Usage Example:**
31
```java
32
public class CounterMessageExtractor implements MessageExtractor {
33
private final int maxShards = 100;
34
35
@Override
36
public String entityId(Object message) {
37
if (message instanceof CounterMessage) {
38
return ((CounterMessage) message).getEntityId();
39
}
40
return null;
41
}
42
43
@Override
44
public Object entityMessage(Object message) {
45
if (message instanceof CounterMessage) {
46
return ((CounterMessage) message).getCommand();
47
}
48
return message;
49
}
50
51
@Override
52
public String shardId(Object message) {
53
if (message instanceof CounterMessage) {
54
String entityId = ((CounterMessage) message).getEntityId();
55
return String.valueOf(Math.abs(entityId.hashCode()) % maxShards);
56
}
57
return null;
58
}
59
}
60
```
61
62
### Hash Code Message Extractor
63
64
Convenience implementation using hash code for shard distribution:
65
66
```scala { .api }
67
abstract class HashCodeMessageExtractor(maxNumberOfShards: Int) extends MessageExtractor {
68
def entityMessage(message: Any): Any = message // Default implementation
69
def shardId(message: Any): String // Implemented using hash code
70
}
71
72
object HashCodeMessageExtractor {
73
def shardId(id: String, maxNumberOfShards: Int): String
74
}
75
```
76
77
**Usage Example:**
78
```java
79
public class CounterHashExtractor extends HashCodeMessageExtractor {
80
public CounterHashExtractor() {
81
super(100); // 100 shards maximum
82
}
83
84
@Override
85
public String entityId(Object message) {
86
if (message instanceof CounterMessage) {
87
return ((CounterMessage) message).getEntityId();
88
}
89
return null;
90
}
91
}
92
```
93
94
## Entity Lifecycle Management
95
96
### Passivation
97
98
Request graceful entity passivation to reduce memory consumption:
99
100
```scala { .api }
101
case class Passivate(stopMessage: Any) extends ShardRegionCommand
102
```
103
104
**Usage in Entity Actor:**
105
```scala
106
class CounterActor extends Actor {
107
context.setReceiveTimeout(30.seconds)
108
109
def receive = {
110
case ReceiveTimeout =>
111
// Request passivation when idle
112
context.parent ! ShardRegion.Passivate(PoisonPill)
113
114
case PoisonPill =>
115
// Graceful shutdown
116
context.stop(self)
117
118
case msg =>
119
// Handle business logic
120
handleMessage(msg)
121
}
122
}
123
```
124
125
### Graceful Shutdown
126
127
Shutdown all shards in the region:
128
129
```scala { .api }
130
case object GracefulShutdown extends ShardRegionCommand
131
def gracefulShutdownInstance = GracefulShutdown // Java API
132
```
133
134
**Usage Example:**
135
```scala
136
val region = ClusterSharding(system).shardRegion("Counter")
137
region ! ShardRegion.GracefulShutdown
138
// Watch the region to know when shutdown is complete
139
context.watch(region)
140
```
141
142
## Query Messages and Monitoring
143
144
### Get Current Regions
145
146
Query for all active shard regions in the cluster:
147
148
```scala { .api }
149
case object GetCurrentRegions extends ShardRegionQuery
150
def getCurrentRegionsInstance: GetCurrentRegions.type = GetCurrentRegions // Java API
151
152
case class CurrentRegions(regions: Set[Address]) {
153
def getRegions(): java.util.Set[Address] // Java API
154
}
155
```
156
157
**Usage Example:**
158
```scala
159
import akka.pattern.ask
160
import scala.concurrent.duration._
161
162
implicit val timeout = Timeout(5.seconds)
163
val future = (region ? ShardRegion.GetCurrentRegions).mapTo[ShardRegion.CurrentRegions]
164
future.foreach { response =>
165
println(s"Active regions: ${response.regions}")
166
}
167
```
168
169
### Get Cluster Sharding Statistics
170
171
Query cluster-wide sharding statistics:
172
173
```scala { .api }
174
case class GetClusterShardingStats(timeout: FiniteDuration) extends ShardRegionQuery
175
176
case class ClusterShardingStats(regions: Map[Address, ShardRegionStats]) {
177
def getRegions(): java.util.Map[Address, ShardRegionStats] // Java API
178
}
179
```
180
181
### Get Region Statistics
182
183
Query statistics for a specific region:
184
185
```scala { .api }
186
case object GetShardRegionStats extends ShardRegionQuery
187
def getRegionStatsInstance = GetShardRegionStats // Java API
188
189
class ShardRegionStats(val stats: Map[ShardId, Int], val failed: Set[ShardId]) {
190
def this(stats: Map[ShardId, Int])
191
def getStats(): java.util.Map[ShardId, Int] // Java API
192
def getFailed(): java.util.Set[ShardId] // Java API
193
}
194
195
object ShardRegionStats extends AbstractFunction1[Map[ShardId, Int], ShardRegionStats] {
196
def apply(stats: Map[ShardId, Int]): ShardRegionStats
197
def apply(stats: Map[ShardId, Int], failed: Set[ShardId]): ShardRegionStats
198
}
199
```
200
201
**Usage Example:**
202
```scala
203
val future = (region ? ShardRegion.GetShardRegionStats).mapTo[ShardRegion.ShardRegionStats]
204
future.foreach { stats =>
205
println(s"Shard statistics: ${stats.stats}")
206
if (stats.failed.nonEmpty) {
207
println(s"Failed shards: ${stats.failed}")
208
}
209
}
210
```
211
212
### Get Region State
213
214
Query detailed state of a region including entities:
215
216
```scala { .api }
217
case object GetShardRegionState extends ShardRegionQuery
218
def getShardRegionStateInstance = GetShardRegionState // Java API
219
220
class CurrentShardRegionState(val shards: Set[ShardState], val failed: Set[ShardId]) {
221
def this(shards: Set[ShardState])
222
def getShards(): java.util.Set[ShardState] // Java API
223
def getFailed(): java.util.Set[ShardId] // Java API
224
}
225
226
object CurrentShardRegionState extends AbstractFunction1[Set[ShardState], CurrentShardRegionState] {
227
def apply(shards: Set[ShardState]): CurrentShardRegionState
228
def apply(shards: Set[ShardState], failed: Set[ShardId]): CurrentShardRegionState
229
}
230
231
case class ShardState(shardId: ShardId, entityIds: Set[EntityId]) {
232
def getEntityIds(): java.util.Set[EntityId] // Java API
233
}
234
```
235
236
**Usage Example:**
237
```scala
238
val future = (region ? ShardRegion.GetShardRegionState).mapTo[ShardRegion.CurrentShardRegionState]
239
future.foreach { state =>
240
state.shards.foreach { shardState =>
241
println(s"Shard ${shardState.shardId} has ${shardState.entityIds.size} entities")
242
}
243
}
244
```
245
246
## Internal Messages and Notifications
247
248
### Entity Lifecycle Messages
249
250
Messages for controlling individual entity lifecycle:
251
252
```scala { .api }
253
case class StartEntity(entityId: EntityId) extends ClusterShardingSerializable
254
case class StartEntityAck(entityId: EntityId, shardId: ShardId) extends ClusterShardingSerializable
255
```
256
257
- **`StartEntity`**: Explicitly starts an entity (used with remember-entities)
258
- **`StartEntityAck`**: Acknowledgment that entity was started
259
260
### Shard Initialization
261
262
Notification when a shard is ready to accept messages:
263
264
```scala { .api }
265
case class ShardInitialized(shardId: ShardId)
266
```
267
268
This message is sent internally and typically doesn't need to be handled by user code.
269
270
## Message Routing Flow
271
272
1. **Message Arrival**: Message arrives at ShardRegion
273
2. **Entity Extraction**: `ExtractEntityId` function extracts entity ID and message
274
3. **Shard Determination**: `ExtractShardId` function determines target shard
275
4. **Shard Resolution**: ShardRegion resolves shard location via ShardCoordinator
276
5. **Message Forwarding**: Message is forwarded to appropriate shard/entity
277
6. **Entity Creation**: If entity doesn't exist, it's created on-demand
278
7. **Message Delivery**: Message is delivered to target entity
279
280
## Error Handling
281
282
### Unhandled Messages
283
284
If `ExtractEntityId` doesn't match a message:
285
- Message is posted as `Unhandled` on the event stream
286
- No processing occurs
287
288
### Timeout Scenarios
289
290
Statistics and state queries may timeout:
291
- Failed shards are reported in the `failed` set
292
- Partial results are returned for successful shards
293
294
### Shard Failure
295
296
If a shard fails during operation:
297
- Entities in that shard become temporarily unavailable
298
- Messages are buffered until shard is restored
299
- ShardCoordinator handles shard reallocation