
# Message Routing and Entity Management


The ShardRegion is responsible for routing messages to entities, managing entity lifecycle, and providing monitoring capabilities. It acts as a local proxy that knows how to forward messages to the correct entities across the cluster.


## Core Message Routing Types


### Type Aliases

```scala { .api }
object ShardRegion {
  type EntityId = String
  type ShardId = String
  type Msg = Any
  type ExtractEntityId = PartialFunction[Msg, (EntityId, Msg)]
  type ExtractShardId = Msg => ShardId
}
```


### Message Extractor Interface (Java API)

```scala { .api }
trait MessageExtractor {
  def entityId(message: Any): String
  def entityMessage(message: Any): Any
  def shardId(message: Any): String
}
```

A `MessageExtractor` extracts the entity ID, the shard ID, and the entity message from each incoming message.

**Usage Example:**

```java
import akka.cluster.sharding.ShardRegion.MessageExtractor;

// CounterMessage is an application-defined envelope carrying an entity ID and a command.
public class CounterMessageExtractor implements MessageExtractor {
    private final int maxShards = 100;

    @Override
    public String entityId(Object message) {
        if (message instanceof CounterMessage) {
            return ((CounterMessage) message).getEntityId();
        }
        // Returning null marks the message as unhandled
        return null;
    }

    @Override
    public Object entityMessage(Object message) {
        if (message instanceof CounterMessage) {
            // Unwrap the envelope so the entity receives only the command
            return ((CounterMessage) message).getCommand();
        }
        return message;
    }

    @Override
    public String shardId(Object message) {
        if (message instanceof CounterMessage) {
            String entityId = ((CounterMessage) message).getEntityId();
            return String.valueOf(Math.abs(entityId.hashCode()) % maxShards);
        }
        return null;
    }
}
```


### Hash Code Message Extractor


Convenience implementation using hash code for shard distribution:

```scala { .api }
abstract class HashCodeMessageExtractor(maxNumberOfShards: Int) extends MessageExtractor {
  def entityMessage(message: Any): Any = message // Default implementation
  def shardId(message: Any): String // Implemented using hash code
}

object HashCodeMessageExtractor {
  def shardId(id: String, maxNumberOfShards: Int): String
}
```


**Usage Example:**

```java
public class CounterHashExtractor extends HashCodeMessageExtractor {
    public CounterHashExtractor() {
        super(100); // 100 shards maximum
    }

    @Override
    public String entityId(Object message) {
        if (message instanceof CounterMessage) {
            return ((CounterMessage) message).getEntityId();
        }
        return null;
    }
}
```
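
To make the hash-based distribution concrete, here is a minimal plain-Java sketch of the arithmetic. It assumes the shard ID is the absolute hash code modulo the shard count, as in `CounterMessageExtractor` above. The class `ShardIdSketch` and its methods are hypothetical helpers for illustration and are not part of Akka. The sketch also shows a `Math.floorMod` variant, because `Math.abs` overflows for `Integer.MIN_VALUE` and can then yield a negative shard ID.

```java
// Hypothetical helper, not part of Akka: illustrates hash-based shard IDs.
final class ShardIdSketch {
    private ShardIdSketch() {}

    // Same arithmetic as CounterMessageExtractor: abs(hash) % maxShards.
    // Math.abs(Integer.MIN_VALUE) is still negative, so the result can be negative.
    static String absThenMod(int hash, int maxShards) {
        return String.valueOf(Math.abs(hash) % maxShards);
    }

    // Alternative: floorMod always yields a value in [0, maxShards).
    static String floorMod(int hash, int maxShards) {
        return String.valueOf(Math.floorMod(hash, maxShards));
    }

    // Shard ID for an entity ID, using the floorMod variant.
    static String shardId(String entityId, int maxShards) {
        return floorMod(entityId.hashCode(), maxShards);
    }

    public static void main(String[] args) {
        System.out.println(shardId("counter-42", 100));
        System.out.println(absThenMod(Integer.MIN_VALUE, 100));
        System.out.println(floorMod(Integer.MIN_VALUE, 100));
    }
}
```

The two formulas disagree for negative hash codes, so switching between them changes which shard an existing entity maps to. Whichever formula is chosen should stay fixed for the lifetime of the cluster.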


## Entity Lifecycle Management


### Passivation


Request graceful entity passivation to reduce memory consumption:

```scala { .api }
case class Passivate(stopMessage: Any) extends ShardRegionCommand
```


**Usage in Entity Actor:**

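```scala
import akka.actor.{Actor, PoisonPill, ReceiveTimeout}
import akka.cluster.sharding.ShardRegion
import scala.concurrent.duration._

class CounterActor extends Actor {
  // Deliver ReceiveTimeout after 30 seconds without messages
  context.setReceiveTimeout(30.seconds)

  def receive = {
    case ReceiveTimeout =>
      // Request passivation when idle; the parent sends the stop message back
      context.parent ! ShardRegion.Passivate(stopMessage = PoisonPill)

    // PoisonPill is handled by Akka itself and stops the actor,
    // so it never reaches receive and needs no case here.

    case msg =>
      // Handle business logic
      handleMessage(msg)
  }

  // Placeholder for application-specific logic
  private def handleMessage(msg: Any): Unit = ()
}
```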
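
The handshake behind passivation can be pictured with a small plain-Java model. This is only a sketch under stated assumptions: the entity asks its parent to passivate it, the parent echoes the stop message back, messages that arrive while the entity is stopping are buffered, and they are replayed to a fresh incarnation once the entity has terminated. `PassivationSketch` and all of its members are hypothetical names; none of this is Akka code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the passivation handshake; not Akka code.
final class PassivationSketch {
    private final Set<String> active = new HashSet<>();
    // Entities that are stopping, with the messages buffered for each.
    private final Map<String, List<Object>> passivating = new HashMap<>();
    final List<String> log = new ArrayList<>();

    // A message for an entity: buffer it while the entity is stopping,
    // otherwise start the entity on demand and deliver.
    void deliver(String entityId, Object msg) {
        List<Object> buffer = passivating.get(entityId);
        if (buffer != null) {
            buffer.add(msg);
            return;
        }
        if (active.add(entityId)) {
            log.add("start " + entityId);
        }
        log.add(entityId + " <- " + msg);
    }

    // The entity asked to be passivated: echo the stop message back to it.
    void passivate(String entityId, Object stopMessage) {
        if (active.contains(entityId) && !passivating.containsKey(entityId)) {
            passivating.put(entityId, new ArrayList<>());
            log.add(entityId + " <- " + stopMessage);
        }
    }

    // The entity has stopped: replay anything that arrived in the meantime.
    void terminated(String entityId) {
        active.remove(entityId);
        log.add("stop " + entityId);
        List<Object> buffered = passivating.remove(entityId);
        if (buffered != null) {
            for (Object msg : buffered) {
                deliver(entityId, msg);
            }
        }
    }
}
```

Routing the stop request through the parent, rather than having the entity stop itself directly, is what allows messages that are in flight during shutdown to be kept and redelivered.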


### Graceful Shutdown

Shut down all shards in the region:

```scala { .api }
case object GracefulShutdown extends ShardRegionCommand
def gracefulShutdownInstance = GracefulShutdown // Java API
```


**Usage Example:**

```scala
val region = ClusterSharding(system).shardRegion("Counter")
region ! ShardRegion.GracefulShutdown
// Watch the region to know when shutdown is complete
context.watch(region)
```


## Query Messages and Monitoring


### Get Current Regions


Query for all active shard regions in the cluster:

```scala { .api }
case object GetCurrentRegions extends ShardRegionQuery
def getCurrentRegionsInstance: GetCurrentRegions.type = GetCurrentRegions // Java API

case class CurrentRegions(regions: Set[Address]) {
  def getRegions(): java.util.Set[Address] // Java API
}
```


**Usage Example:**

```scala
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._

// Future callbacks also need an implicit ExecutionContext in scope,
// for example context.dispatcher inside an actor.
implicit val timeout: Timeout = Timeout(5.seconds)
val future = (region ? ShardRegion.GetCurrentRegions).mapTo[ShardRegion.CurrentRegions]
future.foreach { response =>
  println(s"Active regions: ${response.regions}")
}
```


### Get Cluster Sharding Statistics


Query cluster-wide sharding statistics:

```scala { .api }
case class GetClusterShardingStats(timeout: FiniteDuration) extends ShardRegionQuery

case class ClusterShardingStats(regions: Map[Address, ShardRegionStats]) {
  def getRegions(): java.util.Map[Address, ShardRegionStats] // Java API
}
```


### Get Region Statistics


Query statistics for a specific region:

```scala { .api }
case object GetShardRegionStats extends ShardRegionQuery
def getRegionStatsInstance = GetShardRegionStats // Java API

class ShardRegionStats(val stats: Map[ShardId, Int], val failed: Set[ShardId]) {
  def this(stats: Map[ShardId, Int])
  def getStats(): java.util.Map[ShardId, Int] // Java API
  def getFailed(): java.util.Set[ShardId] // Java API
}

object ShardRegionStats extends AbstractFunction1[Map[ShardId, Int], ShardRegionStats] {
  def apply(stats: Map[ShardId, Int]): ShardRegionStats
  def apply(stats: Map[ShardId, Int], failed: Set[ShardId]): ShardRegionStats
}
```


**Usage Example:**

```scala
val future = (region ? ShardRegion.GetShardRegionStats).mapTo[ShardRegion.ShardRegionStats]
future.foreach { stats =>
  println(s"Shard statistics: ${stats.stats}")
  if (stats.failed.nonEmpty) {
    println(s"Failed shards: ${stats.failed}")
  }
}
```


### Get Region State


Query detailed state of a region including entities:

```scala { .api }
case object GetShardRegionState extends ShardRegionQuery
def getShardRegionStateInstance = GetShardRegionState // Java API

class CurrentShardRegionState(val shards: Set[ShardState], val failed: Set[ShardId]) {
  def this(shards: Set[ShardState])
  def getShards(): java.util.Set[ShardState] // Java API
  def getFailed(): java.util.Set[ShardId] // Java API
}

object CurrentShardRegionState extends AbstractFunction1[Set[ShardState], CurrentShardRegionState] {
  def apply(shards: Set[ShardState]): CurrentShardRegionState
  def apply(shards: Set[ShardState], failed: Set[ShardId]): CurrentShardRegionState
}

case class ShardState(shardId: ShardId, entityIds: Set[EntityId]) {
  def getEntityIds(): java.util.Set[EntityId] // Java API
}
```


**Usage Example:**

```scala
val future = (region ? ShardRegion.GetShardRegionState).mapTo[ShardRegion.CurrentShardRegionState]
future.foreach { state =>
  state.shards.foreach { shardState =>
    println(s"Shard ${shardState.shardId} has ${shardState.entityIds.size} entities")
  }
}
```


## Internal Messages and Notifications


### Entity Lifecycle Messages


Messages for controlling individual entity lifecycle:

```scala { .api }
case class StartEntity(entityId: EntityId) extends ClusterShardingSerializable
case class StartEntityAck(entityId: EntityId, shardId: ShardId) extends ClusterShardingSerializable
```

- **`StartEntity`**: Explicitly starts an entity (used with remember-entities)
- **`StartEntityAck`**: Acknowledgment that the entity was started


### Shard Initialization


Notification when a shard is ready to accept messages:

```scala { .api }
case class ShardInitialized(shardId: ShardId)
```


This message is sent internally and typically doesn't need to be handled by user code.


## Message Routing Flow

1. **Message Arrival**: Message arrives at ShardRegion
2. **Entity Extraction**: `ExtractEntityId` function extracts entity ID and message
3. **Shard Determination**: `ExtractShardId` function determines target shard
4. **Shard Resolution**: ShardRegion resolves shard location via ShardCoordinator
5. **Message Forwarding**: Message is forwarded to appropriate shard/entity
6. **Entity Creation**: If entity doesn't exist, it's created on-demand
7. **Message Delivery**: Message is delivered to target entity
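
The steps above can be traced in a small in-memory model. This is a sketch under stated assumptions, not how Akka implements routing: a plain map stands in for the ShardCoordinator, node names such as `node-0` are invented, and `RoutingFlowSketch` and `Envelope` are hypothetical types that exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the routing steps; none of these types are Akka classes.
final class RoutingFlowSketch {
    // Envelope carrying the target entity ID and the payload for the entity.
    static final class Envelope {
        final String entityId;
        final Object payload;

        Envelope(String entityId, Object payload) {
            this.entityId = entityId;
            this.payload = payload;
        }
    }

    private final int maxShards;
    // Stand-in for the ShardCoordinator: shard ID -> invented node name.
    private final Map<String, String> shardHomes = new HashMap<>();
    // Entities created so far: entity ID -> messages delivered to it.
    private final Map<String, List<Object>> entities = new HashMap<>();
    private final List<Object> unhandled = new ArrayList<>();

    RoutingFlowSketch(int maxShards) {
        this.maxShards = maxShards;
    }

    // Returns the shard ID the message was routed to, or null if it was unhandled.
    String route(Object message) {
        // Steps 1-2: the message arrives; extract the entity ID and the entity message.
        if (!(message instanceof Envelope)) {
            unhandled.add(message);
            return null;
        }
        Envelope env = (Envelope) message;
        // Step 3: determine the shard from the entity ID.
        String shardId = String.valueOf(Math.floorMod(env.entityId.hashCode(), maxShards));
        // Step 4: resolve the shard location, allocating a home on first use.
        shardHomes.putIfAbsent(shardId, "node-" + (shardHomes.size() % 3));
        // Steps 5-7: forward, create the entity on demand, deliver the payload.
        entities.computeIfAbsent(env.entityId, id -> new ArrayList<>()).add(env.payload);
        return shardId;
    }

    List<Object> delivered(String entityId) {
        return entities.getOrDefault(entityId, new ArrayList<>());
    }

    String homeOf(String shardId) {
        return shardHomes.get(shardId);
    }

    List<Object> unhandled() {
        return unhandled;
    }
}
```

Because the shard ID is a pure function of the entity ID, every message for the same entity resolves to the same shard, and the entity is created only once.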


## Error Handling


### Unhandled Messages


If `ExtractEntityId` doesn't match a message:

- Message is posted as `Unhandled` on the event stream
- No processing occurs


### Timeout Scenarios

Statistics and state queries may time out:
- Failed shards are reported in the `failed` set
- Partial results are returned for successful shards
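
A caller can use the `failed` set to tell a complete answer from a partial one. The following plain-Java sketch assumes the reply has already been copied into ordinary `java.util` collections shaped like the `stats` map and the `failed` set. `RegionStatsSketch` and its methods are hypothetical helpers, not part of the Akka API.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical helper for interpreting a possibly partial statistics reply; not Akka code.
final class RegionStatsSketch {
    private RegionStatsSketch() {}

    // Sums the entity counts of the shards that replied in time.
    static int totalEntities(Map<String, Integer> stats) {
        int total = 0;
        for (int count : stats.values()) {
            total += count;
        }
        return total;
    }

    // The reply is complete only when no shard failed to answer.
    static boolean isComplete(Set<String> failed) {
        return failed.isEmpty();
    }

    // Human-readable summary that flags partial results explicitly.
    static String summarize(Map<String, Integer> stats, Set<String> failed) {
        String base = totalEntities(stats) + " entities in " + stats.size() + " shards";
        if (isComplete(failed)) {
            return base;
        }
        return base + " (partial, " + failed.size() + " shards failed to reply)";
    }
}
```

Treating the total as a lower bound whenever `failed` is non-empty avoids reporting an undercount as if it were exact.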


### Shard Failure


If a shard fails during operation:

- Entities in that shard become temporarily unavailable
- Messages are buffered until shard is restored
- ShardCoordinator handles shard reallocation
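
The buffering behaviour described above can be modelled in a few lines of plain Java. This sketch is an illustration under assumptions rather than Akka's implementation: `ShardBufferSketch` and its methods are hypothetical, node names are invented, and the buffer is unbounded for brevity.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of buffering while a shard has no known home; not Akka code.
final class ShardBufferSketch {
    // Shard ID -> node currently hosting the shard.
    private final Map<String, String> homes = new HashMap<>();
    // Shard ID -> messages waiting for the shard to be reallocated.
    private final Map<String, Deque<Object>> buffers = new HashMap<>();
    // Record of "node:message" deliveries, in delivery order.
    final List<String> deliveries = new ArrayList<>();

    // Deliver immediately if the shard has a home, otherwise buffer.
    void send(String shardId, Object msg) {
        String home = homes.get(shardId);
        if (home == null) {
            buffers.computeIfAbsent(shardId, id -> new ArrayDeque<>()).add(msg);
        } else {
            deliveries.add(home + ":" + msg);
        }
    }

    // The shard failed: its location is no longer valid.
    void shardFailed(String shardId) {
        homes.remove(shardId);
    }

    // The shard was allocated (or reallocated): flush the buffer in arrival order.
    void shardAllocated(String shardId, String node) {
        homes.put(shardId, node);
        Deque<Object> pending = buffers.remove(shardId);
        if (pending != null) {
            for (Object msg : pending) {
                deliveries.add(node + ":" + msg);
            }
        }
    }

    int buffered(String shardId) {
        Deque<Object> pending = buffers.get(shardId);
        return pending == null ? 0 : pending.size();
    }
}
```

In this model, messages sent between `shardFailed` and the next `shardAllocated` are held back and then delivered to the new home in their original order.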