or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

tessl/maven-com-typesafe-akka--akka-cluster-sharding_2-13

Akka Cluster Sharding provides location transparency and automatic distribution of stateful actors across cluster nodes

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
mavenpkg:maven/com.typesafe.akka/akka-cluster-sharding_2.13@2.8.x

To install, run

npx @tessl/cli install tessl/maven-com-typesafe-akka--akka-cluster-sharding_2-13@2.8.0

0

# Akka Cluster Sharding

1

2

Akka Cluster Sharding provides location transparency and automatic distribution of stateful actors across cluster nodes. It enables developers to create scalable distributed applications where entities are automatically partitioned across the cluster and can be accessed by logical identifiers without knowing their physical location.

3

4

## Package Information

5

6

- **Package Name**: com.typesafe.akka:akka-cluster-sharding_2.13

7

- **Package Type**: Maven

8

- **Language**: Scala

9

- **Installation**: `libraryDependencies += "com.typesafe.akka" %% "akka-cluster-sharding" % "2.8.8"`

10

11

## Core Imports

12

13

```scala

14

import akka.cluster.sharding.ClusterSharding

15

import akka.cluster.sharding.ClusterShardingSettings

16

import akka.cluster.sharding.ShardRegion

17

import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy

18

```

19

20

## Basic Usage

21

22

```scala

23

import akka.actor.{ActorSystem, Props}

24

import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings, ShardRegion}

25

import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy

26

27

implicit val system: ActorSystem = ActorSystem("MyClusterSystem")

28

29

// Define entity extraction functions

30

val extractEntityId: ShardRegion.ExtractEntityId = {

31

case msg @ MyEntityMessage(entityId, _) => (entityId, msg)

32

}

33

34

val extractShardId: ShardRegion.ExtractShardId = {

35

case MyEntityMessage(entityId, _) => math.abs(entityId.hashCode % 100).toString

36

}

37

38

// Start cluster sharding for an entity type

39

val shardRegion = ClusterSharding(system).start(

40

typeName = "MyEntity",

41

entityProps = Props[MyEntityActor](),

42

settings = ClusterShardingSettings(system),

43

extractEntityId = extractEntityId,

44

extractShardId = extractShardId

45

)

46

47

// Send messages to entities

48

shardRegion ! MyEntityMessage("entity-1", "Hello World")

49

```

50

51

## Architecture

52

53

Akka Cluster Sharding is built around several key components:

54

55

- **ClusterSharding Extension**: Main entry point that manages sharded entities across the cluster

56

- **ShardRegion**: Local proxy that routes messages to entities and handles shard lifecycle

57

- **ShardCoordinator**: Centralized singleton that manages shard allocation and rebalancing

58

- **Shard**: Container for a group of entities, distributed across cluster nodes

59

- **Entity**: An individual stateful actor, addressed by its entity ID, that processes business logic

60

- **Allocation Strategies**: Pluggable algorithms that determine shard placement and rebalancing

61

62

## Capabilities

63

64

### Core Sharding Extension

65

66

Main extension for registering entity types and managing distributed sharding across the cluster. Provides lifecycle management and access to shard regions.

67

68

```scala { .api }

69

object ClusterSharding extends ExtensionId[ClusterSharding] with ExtensionIdProvider {

70

def get(system: ActorSystem): ClusterSharding

71

def get(system: ClassicActorSystemProvider): ClusterSharding

72

}

73

74

class ClusterSharding(system: ExtendedActorSystem) extends Extension {

75

// Full configuration start methods

76

def start(

77

typeName: String,

78

entityProps: Props,

79

settings: ClusterShardingSettings,

80

extractEntityId: ShardRegion.ExtractEntityId,

81

extractShardId: ShardRegion.ExtractShardId,

82

allocationStrategy: ShardAllocationStrategy,

83

handOffStopMessage: Any

84

): ActorRef

85

86

def start(

87

typeName: String,

88

entityProps: Props,

89

extractEntityId: ShardRegion.ExtractEntityId,

90

extractShardId: ShardRegion.ExtractShardId,

91

allocationStrategy: ShardAllocationStrategy,

92

handOffStopMessage: Any

93

): ActorRef

94

95

// Simplified start methods with defaults

96

def start(

97

typeName: String,

98

entityProps: Props,

99

settings: ClusterShardingSettings,

100

extractEntityId: ShardRegion.ExtractEntityId,

101

extractShardId: ShardRegion.ExtractShardId

102

): ActorRef

103

104

def start(

105

typeName: String,

106

entityProps: Props,

107

extractEntityId: ShardRegion.ExtractEntityId,

108

extractShardId: ShardRegion.ExtractShardId

109

): ActorRef

110

111

// Java API start methods

112

def start(

113

typeName: String,

114

entityProps: Props,

115

settings: ClusterShardingSettings,

116

messageExtractor: ShardRegion.MessageExtractor,

117

allocationStrategy: ShardAllocationStrategy,

118

handOffStopMessage: Any

119

): ActorRef

120

121

def start(

122

typeName: String,

123

entityProps: Props,

124

settings: ClusterShardingSettings,

125

messageExtractor: ShardRegion.MessageExtractor

126

): ActorRef

127

128

def start(

129

typeName: String,

130

entityProps: Props,

131

messageExtractor: ShardRegion.MessageExtractor

132

): ActorRef

133

134

// Proxy start methods

135

def startProxy(

136

typeName: String,

137

role: Option[String],

138

extractEntityId: ShardRegion.ExtractEntityId,

139

extractShardId: ShardRegion.ExtractShardId

140

): ActorRef

141

142

def startProxy(

143

typeName: String,

144

role: Option[String],

145

dataCenter: Option[DataCenter],

146

extractEntityId: ShardRegion.ExtractEntityId,

147

extractShardId: ShardRegion.ExtractShardId

148

): ActorRef

149

150

def startProxy(

151

typeName: String,

152

role: Optional[String],

153

messageExtractor: ShardRegion.MessageExtractor

154

): ActorRef

155

156

def startProxy(

157

typeName: String,

158

role: Optional[String],

159

dataCenter: Optional[String],

160

messageExtractor: ShardRegion.MessageExtractor

161

): ActorRef

162

163

// Access methods

164

def shardRegion(typeName: String): ActorRef

165

def shardRegionProxy(typeName: String, dataCenter: DataCenter): ActorRef

166

def shardTypeNames: Set[String]

167

def getShardTypeNames: java.util.Set[String]

168

def defaultShardAllocationStrategy(settings: ClusterShardingSettings): ShardAllocationStrategy

169

}

170

```

171

172

[Core Extension](./core-extension.md)

173

174

### Message Routing and Entity Management

175

176

ShardRegion handles message routing, entity lifecycle, and provides monitoring capabilities for distributed entities.

177

178

```scala { .api }

179

object ShardRegion {

180

// Type aliases

181

type EntityId = String

182

type ShardId = String

183

type Msg = Any

184

type ExtractEntityId = PartialFunction[Msg, (EntityId, Msg)]

185

type ExtractShardId = Msg => ShardId

186

187

// Message extractor interface

188

trait MessageExtractor {

189

def entityId(message: Any): String

190

def entityMessage(message: Any): Any

191

def shardId(message: Any): String

192

}

193

194

// Hash-based message extractor

195

abstract class HashCodeMessageExtractor(maxNumberOfShards: Int) extends MessageExtractor {

196

def entityMessage(message: Any): Any = message

197

def shardId(message: Any): String

198

}

199

200

object HashCodeMessageExtractor {

201

def shardId(id: String, maxNumberOfShards: Int): String

202

}

203

204

// Command messages

205

sealed trait ShardRegionCommand

206

case class Passivate(stopMessage: Any) extends ShardRegionCommand

207

case object GracefulShutdown extends ShardRegionCommand

208

def gracefulShutdownInstance: GracefulShutdown.type

209

210

// Entity lifecycle

211

case class ShardInitialized(shardId: ShardId)

212

case class StartEntity(entityId: EntityId)

213

214

// Query messages

215

sealed trait ShardRegionQuery

216

case object GetCurrentRegions extends ShardRegionQuery

217

def getCurrentRegionsInstance: GetCurrentRegions.type

218

case class GetClusterShardingStats(timeout: FiniteDuration) extends ShardRegionQuery

219

case object GetShardRegionStats extends ShardRegionQuery

220

def getRegionStatsInstance: GetShardRegionStats.type

221

case object GetShardRegionState extends ShardRegionQuery

222

def getShardRegionStateInstance: GetShardRegionState.type

223

224

// Response messages

225

case class CurrentRegions(regions: Set[Address]) {

226

def getRegions: java.util.Set[Address]

227

}

228

229

case class ClusterShardingStats(regions: Map[Address, ShardRegionStats]) {

230

def getRegions(): java.util.Map[Address, ShardRegionStats]

231

}

232

233

class ShardRegionStats(val stats: Map[ShardId, Int], val failed: Set[ShardId]) {

234

def this(stats: Map[ShardId, Int])

235

def getStats(): java.util.Map[ShardId, Int]

236

def getFailed(): java.util.Set[ShardId]

237

}

238

239

object ShardRegionStats extends AbstractFunction1[Map[ShardId, Int], ShardRegionStats] {

240

def apply(stats: Map[ShardId, Int]): ShardRegionStats

241

def apply(stats: Map[ShardId, Int], failed: Set[ShardId]): ShardRegionStats

242

}

243

244

class CurrentShardRegionState(val shards: Set[ShardState], val failed: Set[ShardId]) {

245

def this(shards: Set[ShardState])

246

def getShards(): java.util.Set[ShardState]

247

def getFailed(): java.util.Set[ShardId]

248

}

249

250

object CurrentShardRegionState extends AbstractFunction1[Set[ShardState], CurrentShardRegionState] {

251

def apply(shards: Set[ShardState]): CurrentShardRegionState

252

def apply(shards: Set[ShardState], failed: Set[ShardId]): CurrentShardRegionState

253

}

254

255

case class ShardState(shardId: ShardId, entityIds: Set[EntityId]) {

256

def getEntityIds(): java.util.Set[EntityId]

257

}

258

}

259

```

260

261

[Message Routing](./message-routing.md)

262

263

### Configuration and Settings

264

265

Comprehensive configuration system for tuning sharding behavior, performance, and operational characteristics.

266

267

```scala { .api }

268

object ClusterShardingSettings {

269

def apply(system: ActorSystem): ClusterShardingSettings

270

def apply(config: Config): ClusterShardingSettings

271

272

val StateStoreModePersistence = "persistence"

273

val StateStoreModeDData = "ddata"

274

}

275

276

class ClusterShardingSettings(

277

role: Option[String],

278

rememberEntities: Boolean,

279

stateStoreMode: String,

280

tuningParameters: TuningParameters

281

)

282

```

283

284

[Configuration](./configuration.md)

285

286

### Shard Allocation Strategies

287

288

Pluggable strategies for controlling how shards are allocated and rebalanced across cluster nodes.

289

290

```scala { .api }

291

object ShardCoordinator {

292

type ShardId = String

293

294

trait ShardAllocationStrategy {

295

def allocateShard(

296

requester: ActorRef,

297

shardId: ShardId,

298

currentShardAllocations: Map[ActorRef, IndexedSeq[ShardId]]

299

): Future[ActorRef]

300

301

def rebalance(

302

currentShardAllocations: Map[ActorRef, IndexedSeq[ShardId]],

303

rebalanceInProgress: Set[ShardId]

304

): Future[Set[ShardId]]

305

}

306

307

// Factory methods for built-in strategies

308

def leastShardAllocationStrategy(

309

absoluteLimit: Int,

310

relativeLimit: Double

311

): ShardAllocationStrategy

312

313

class LeastShardAllocationStrategy(

314

rebalanceThreshold: Int,

315

maxSimultaneousRebalance: Int

316

) extends ShardAllocationStrategy

317

}

318

319

object ShardAllocationStrategy {

320

def leastShardAllocationStrategy(

321

absoluteLimit: Int,

322

relativeLimit: Double

323

): ShardAllocationStrategy

324

}

325

```

326

327

[Allocation Strategies](./allocation-strategies.md)

328

329

### External Shard Allocation

330

331

Advanced API for external control of shard placement, enabling custom orchestration and management systems.

332

333

```scala { .api }

334

object ExternalShardAllocation extends ExtensionId[ExternalShardAllocation] with ExtensionIdProvider {

335

def get(system: ClassicActorSystemProvider): ExternalShardAllocation

336

}

337

338

class ExternalShardAllocation(system: ExtendedActorSystem) extends Extension {

339

def clientFor(typeName: String): scaladsl.ExternalShardAllocationClient

340

def getClient(typeName: String): javadsl.ExternalShardAllocationClient

341

}

342

343

// Scala API

344

package scaladsl {

345

trait ExternalShardAllocationClient {

346

def updateShardLocation(shard: ShardId, location: Address): Future[Done]

347

def updateShardLocations(locations: Map[ShardId, Address]): Future[Done]

348

def shardLocations(): Future[ShardLocations]

349

}

350

}

351

352

// Java API

353

package javadsl {

354

trait ExternalShardAllocationClient {

355

def updateShardLocation(shard: ShardId, location: Address): CompletionStage[Done]

356

def updateShardLocations(locations: java.util.Map[ShardId, Address]): CompletionStage[Done]

357

def shardLocations(): CompletionStage[ShardLocations]

358

}

359

}

360

361

// Response type

362

case class ShardLocations(locations: Map[ShardId, Address]) {

363

def getLocations(): java.util.Map[ShardId, Address]

364

}

365

```

366

367

[External Allocation](./external-allocation.md)

368

369

### Monitoring and Health Checks

370

371

Built-in monitoring, statistics collection, and health check capabilities for operational visibility.

372

373

```scala { .api }

374

// Health checking

375

class ClusterShardingHealthCheck extends HealthCheck {

376

def isHealthy(): Boolean

377

}

378

379

// Observability and monitoring

380

class ShardingFlightRecorder extends Extension {

381

def recordShardAllocation(typeName: String, shardId: ShardId, node: Address): Unit

382

def recordShardRebalance(typeName: String, shardId: ShardId, from: Address, to: Address): Unit

383

}

384

385

object ShardingFlightRecorder extends ExtensionId[ShardingFlightRecorder] with ExtensionIdProvider {

386

def get(system: ActorSystem): ShardingFlightRecorder

387

def get(system: ClassicActorSystemProvider): ShardingFlightRecorder

388

}

389

```

390

391

[Monitoring](./monitoring.md)

392

393

## Utility Components

394

395

### Serialization Marker

396

397

```scala { .api }

398

trait ClusterShardingSerializable

399

```

400

401

Marker trait for messages that require special serialization handling in cluster sharding.

402

403

### Data Cleanup Utility

404

405

```scala { .api }

406

object RemoveInternalClusterShardingData

407

```

408

409

Utility for removing internal cluster sharding data during maintenance operations.

410

411

## Core Types

412

413

```scala { .api }

414

// Entity and Shard identifiers

415

type EntityId = String

416

type ShardId = String

417

type Msg = Any

418

419

// Message extraction types

420

type ExtractEntityId = PartialFunction[Msg, (EntityId, Msg)]

421

type ExtractShardId = Msg => ShardId

422

423

// Core sharding components

424

trait Extension

425

trait ExtensionId[T <: Extension] {

426

def get(system: ActorSystem): T

427

def get(system: ClassicActorSystemProvider): T

428

}

429

trait ExtensionIdProvider

430

431

// Actor system integration

432

class ActorRef

433

class Props

434

class ActorSystem

435

class ExtendedActorSystem

436

trait ClassicActorSystemProvider

437

438

// Cluster integration

439

class Address

440

class Cluster

441

trait DataCenter

442

class Member

443

444

// Message types

445

trait ClusterShardingSerializable

446

trait ShardRegionCommand

447

trait ShardRegionQuery

448

trait NoSerializationVerificationNeeded

449

450

// Configuration types

451

class Config

452

class FiniteDuration

453

class ClusterSingletonManagerSettings

454

455

// Async types

456

class Future[T]

457

class Done

458

class CompletionStage[T]

459

460

// Health and monitoring

461

trait HealthCheck {

462

def isHealthy(): Boolean

463

}

464

465

// Collections

466

trait Set[T] {

467

def asJava: java.util.Set[T]

468

}

469

trait Map[K, V] {

470

def asJava: java.util.Map[K, V]

471

}

472

trait IndexedSeq[T]

473

474

// Java interop

475

class Optional[T] {

476

def orElse(other: T): T

477

}

478

479

// Utility functions

480

abstract class AbstractFunction1[T1, R] extends (T1 => R)

481

482

// Exception types

483

class IllegalStateException extends RuntimeException

484

class NoSuchElementException extends RuntimeException

485

```