or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

allocation-strategies.mdconfiguration.mdcore-extension.mdexternal-allocation.mdindex.mdmessage-routing.mdmonitoring.md

external-allocation.mddocs/

0

# External Shard Allocation

1

2

The External Shard Allocation API provides advanced control over shard placement, enabling external systems to make centralized decisions about where shards should be located across the cluster.

3

4

## External Allocation Extension

5

6

### Extension Access

7

8

```scala { .api }

9

object ExternalShardAllocation extends ExtensionId[ExternalShardAllocation] with ExtensionIdProvider {

10

def get(system: ActorSystem): ExternalShardAllocation

11

def get(system: ClassicActorSystemProvider): ExternalShardAllocation

12

}

13

14

class ExternalShardAllocation(system: ExtendedActorSystem) extends Extension

15

```

16

17

**Usage Example:**

18

```scala

19

val externalShardAllocation = ExternalShardAllocation(system)

20

```

21

22

## External Allocation Strategy

23

24

### Strategy Configuration

25

26

```scala { .api }

27

class ExternalShardAllocationStrategy(

28

system: ActorSystem,

29

typeName: String

30

) extends ShardAllocationStrategy

31

```

32

33

This strategy delegates all allocation decisions to external control via the ExternalShardAllocationClient.

34

35

**Usage Example:**

36

```scala

37

import akka.cluster.sharding.external.ExternalShardAllocationStrategy

38

39

val strategy = new ExternalShardAllocationStrategy(system, "MyEntity")

40

41

ClusterSharding(system).start(

42

typeName = "MyEntity",

43

entityProps = Props[MyEntityActor](),

44

settings = ClusterShardingSettings(system),

45

extractEntityId = extractEntityId,

46

extractShardId = extractShardId,

47

allocationStrategy = strategy,

48

handOffStopMessage = PoisonPill

49

)

50

```

51

52

### Shard Location Specification

53

54

```scala { .api }

55

case class ShardLocation(region: ActorRef)

56

```

57

58

Represents the desired location for a shard.

59

60

## External Allocation Client APIs

61

62

### Scala API

63

64

```scala { .api }

65

trait ExternalShardAllocationClient {

66

def updateShardLocation(shard: ShardId, location: Address): Future[Done]

67

def updateShardLocations(locations: Map[ShardId, Address]): Future[Done]

68

def shardLocations(): Future[ShardLocations]

69

}

70

```

71

72

**Accessing the Client:**

73

```scala

74

import akka.cluster.sharding.external.scaladsl.ExternalShardAllocationClient

75

76

val client: ExternalShardAllocationClient =

77

ExternalShardAllocation(system).clientFor("MyEntity")

78

```

79

80

### Java API

81

82

```scala { .api }

83

trait ExternalShardAllocationClient {

84

def updateShardLocation(shard: String, location: Address): CompletionStage[Done]

85

def updateShardLocations(locations: java.util.Map[String, Address]): CompletionStage[Done]

86

def shardLocations(): CompletionStage[ShardLocations]

87

}

88

```

89

90

**Accessing the Client:**

91

```java

92

import akka.cluster.sharding.external.javadsl.ExternalShardAllocationClient;

93

94

ExternalShardAllocationClient client =

95

ExternalShardAllocation.get(system).getClient("MyEntity");

96

```

97

98

## Client Operations

99

100

### Update Single Shard Location

101

102

Move a specific shard to a target node:

103

104

```scala { .api }

105

def updateShardLocation(shard: ShardId, location: Address): Future[Done]

106

```

107

108

**Scala Example:**

109

```scala

110

import akka.cluster.Cluster

111

112

val cluster = Cluster(system)

113

val targetNode = cluster.state.members.head.address

114

115

val future = client.updateShardLocation("shard-1", targetNode)

116

future.onComplete {

117

case Success(_) => println("Shard location updated successfully")

118

case Failure(exception) => println(s"Failed to update shard location: $exception")

119

}

120

```

121

122

**Java Example:**

123

```java

124

Address targetNode = Cluster.get(system).state().getMembers().iterator().next().address();

125

126

CompletionStage<Done> future = client.updateShardLocation("shard-1", targetNode);

127

future.whenComplete((done, throwable) -> {

128

if (throwable == null) {

129

System.out.println("Shard location updated successfully");

130

} else {

131

System.out.println("Failed to update shard location: " + throwable);

132

}

133

});

134

```

135

136

### Update Multiple Shard Locations

137

138

Update locations for multiple shards in a single operation:

139

140

```scala { .api }

141

def updateShardLocations(locations: Map[ShardId, Address]): Future[Done]

142

```

143

144

**Scala Example:**

145

```scala

146

val cluster = Cluster(system)

147

val nodes = cluster.state.members.map(_.address).toList

148

149

val shardLocations = Map(

150

"shard-1" -> nodes(0),

151

"shard-2" -> nodes(1),

152

"shard-3" -> nodes(0)

153

)

154

155

val future = client.updateShardLocations(shardLocations)

156

future.foreach(_ => println("All shard locations updated"))

157

```

158

159

**Java Example:**

160

```java

161

import java.util.HashMap;

162

import java.util.Map;

163

164

List<Address> nodes = new ArrayList<>(Cluster.get(system).state().getMembers())

165

.stream().map(Member::address).collect(Collectors.toList());

166

167

Map<String, Address> shardLocations = new HashMap<>();

168

shardLocations.put("shard-1", nodes.get(0));

169

shardLocations.put("shard-2", nodes.get(1));

170

shardLocations.put("shard-3", nodes.get(0));

171

172

CompletionStage<Done> future = client.updateShardLocations(shardLocations);

173

future.thenRun(() -> System.out.println("All shard locations updated"));

174

```

175

176

### Query Current Shard Locations

177

178

Retrieve current shard location mappings:

179

180

```scala { .api }

181

def shardLocations(): Future[ShardLocations]

182

```

183

184

**Scala Example:**

185

```scala

186

val future = client.shardLocations()

187

future.foreach { locations =>

188

println(s"Current shard locations: ${locations.locations}")

189

}

190

```

191

192

**Java Example:**

193

```java

194

CompletionStage<ShardLocations> future = client.shardLocations();

195

future.thenAccept(locations -> {

196

System.out.println("Current shard locations: " + locations.getLocations());

197

});

198

```

199

200

## Shard Locations Container

201

202

### ShardLocations Class

203

204

```scala { .api }

205

case class ShardLocations(locations: Map[ShardId, Address]) {

206

def getLocations(): java.util.Map[String, Address] // Java API

207

}

208

```

209

210

Contains the mapping of shard IDs to their designated cluster node addresses.

211

212

## Error Handling

213

214

### Client Timeout Exception

215

216

```scala { .api }

217

class ClientTimeoutException(message: String, cause: Throwable) extends RuntimeException(message, cause)

218

```

219

220

Thrown when external allocation operations timeout.

221

222

**Handling Timeouts:**

223

```scala

224

client.updateShardLocation("shard-1", targetNode).recover {

225

case _: ClientTimeoutException =>

226

println("Shard location update timed out, will retry")

227

Done

228

case other =>

229

println(s"Unexpected error: $other")

230

throw other

231

}

232

```

233

234

## Integration Patterns

235

236

### Centralized Orchestration

237

238

Use external allocation with a centralized orchestration service:

239

240

```scala

241

class ShardOrchestrator(client: ExternalShardAllocationClient) {

242

243

def rebalanceShards(): Future[Done] = {

244

for {

245

currentLocations <- client.shardLocations()

246

newLocations = calculateOptimalDistribution(currentLocations)

247

_ <- client.updateShardLocations(newLocations)

248

} yield Done

249

}

250

251

private def calculateOptimalDistribution(

252

current: ShardLocations

253

): Map[ShardId, Address] = {

254

// Custom logic for optimal shard distribution

255

current.locations

256

}

257

}

258

```

259

260

### Load-Based Allocation

261

262

Integrate with monitoring systems for load-based shard placement:

263

264

```scala

265

class LoadBasedAllocator(

266

client: ExternalShardAllocationClient,

267

metricsCollector: MetricsCollector

268

) {

269

270

def allocateBasedOnLoad(): Future[Done] = {

271

for {

272

nodeLoads <- metricsCollector.getNodeLoads()

273

shardLoads <- metricsCollector.getShardLoads()

274

optimalPlacements = calculateOptimalPlacements(nodeLoads, shardLoads)

275

_ <- client.updateShardLocations(optimalPlacements)

276

} yield Done

277

}

278

279

private def calculateOptimalPlacements(

280

nodeLoads: Map[Address, Double],

281

shardLoads: Map[ShardId, Double]

282

): Map[ShardId, Address] = {

283

// Place high-load shards on low-load nodes

284

Map.empty // Placeholder

285

}

286

}

287

```

288

289

### Maintenance and Migration

290

291

Use external allocation for planned maintenance:

292

293

```scala

294

class MaintenanceManager(client: ExternalShardAllocationClient) {

295

296

def evacuateNode(nodeToEvacuate: Address): Future[Done] = {

297

for {

298

currentLocations <- client.shardLocations()

299

shardsToMove = currentLocations.locations.filter(_._2 == nodeToEvacuate)

300

availableNodes = getAvailableNodes().filterNot(_ == nodeToEvacuate)

301

newLocations = redistributeShards(shardsToMove.keys, availableNodes)

302

_ <- client.updateShardLocations(newLocations)

303

} yield Done

304

}

305

306

private def redistributeShards(

307

shards: Iterable[ShardId],

308

availableNodes: List[Address]

309

): Map[ShardId, Address] = {

310

// Evenly distribute shards across available nodes

311

shards.zipWithIndex.map { case (shard, index) =>

312

shard -> availableNodes(index % availableNodes.size)

313

}.toMap

314

}

315

316

private def getAvailableNodes(): List[Address] = {

317

// Get list of healthy cluster nodes

318

List.empty // Placeholder

319

}

320

}

321

```

322

323

## Best Practices

324

325

### Timing Considerations

326

- Allow time for shard movement to complete before making new changes

327

- Use the returned `Future[Done]` to coordinate sequential operations

328

- Consider cluster rebalancing intervals when making updates

329

330

### Error Recovery

331

- Implement retry logic for transient failures

332

- Monitor for `ClientTimeoutException` and handle appropriately

333

- Validate node addresses before attempting updates

334

335

### Performance

336

- Update multiple shards in batch operations when possible

337

- Cache shard location information to reduce query frequency

338

- Avoid excessive updates that could cause cluster instability

339

340

### Monitoring

341

- Track successful vs failed allocation updates

342

- Monitor shard movement completion times

343

- Alert on repeated allocation failures