External Shard Allocation

The External Shard Allocation API provides advanced control over shard placement, enabling external systems to make centralized decisions about where shards should be located across the cluster.

External Allocation Extension

Extension Access

object ExternalShardAllocation extends ExtensionId[ExternalShardAllocation] with ExtensionIdProvider {
  def get(system: ActorSystem): ExternalShardAllocation
  def get(system: ClassicActorSystemProvider): ExternalShardAllocation
}

class ExternalShardAllocation(system: ExtendedActorSystem) extends Extension

Usage Example:

import akka.cluster.sharding.external.ExternalShardAllocation

val externalShardAllocation = ExternalShardAllocation(system)

External Allocation Strategy

Strategy Configuration

class ExternalShardAllocationStrategy(
  system: ActorSystem,
  typeName: String
) extends ShardAllocationStrategy

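This strategy allocates each shard to the location that has been set for it through the ExternalShardAllocationClient. A shard with no external location, or whose location matches no current shard region, is allocated to the region that requested it. Create the strategy and obtain the client with the same typeName.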

Usage Example:

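import akka.actor.{ PoisonPill, Props }
import akka.cluster.sharding.{ ClusterSharding, ClusterShardingSettings }
import akka.cluster.sharding.external.ExternalShardAllocationStrategy

// The typeName must match the one later passed to clientFor / getClient
val strategy = new ExternalShardAllocationStrategy(system, "MyEntity")

// MyEntityActor, extractEntityId and extractShardId are defined elsewhere
ClusterSharding(system).start(
  typeName = "MyEntity",
  entityProps = Props[MyEntityActor](),
  settings = ClusterShardingSettings(system),
  extractEntityId = extractEntityId,
  extractShardId = extractShardId,
  allocationStrategy = strategy,
  handOffStopMessage = PoisonPill
)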

Shard Location Specification

case class ShardLocation(region: ActorRef)

Represents the desired location for a shard.

External Allocation Client APIs

Scala API

trait ExternalShardAllocationClient {
  def updateShardLocation(shard: ShardId, location: Address): Future[Done]
  def updateShardLocations(locations: Map[ShardId, Address]): Future[Done]
  def shardLocations(): Future[ShardLocations]
}

Accessing the Client:

import akka.cluster.sharding.external.ExternalShardAllocation
import akka.cluster.sharding.external.scaladsl.ExternalShardAllocationClient

val client: ExternalShardAllocationClient = 
  ExternalShardAllocation(system).clientFor("MyEntity")

Java API

trait ExternalShardAllocationClient {
  def updateShardLocation(shard: String, location: Address): CompletionStage[Done]
  def updateShardLocations(locations: java.util.Map[String, Address]): CompletionStage[Done]
  def shardLocations(): CompletionStage[ShardLocations]
}

Accessing the Client:

import akka.cluster.sharding.external.ExternalShardAllocation;
import akka.cluster.sharding.external.javadsl.ExternalShardAllocationClient;

ExternalShardAllocationClient client = 
    ExternalShardAllocation.get(system).getClient("MyEntity");

Client Operations

Update Single Shard Location

Set the desired node for a specific shard. A shard that is already allocated is moved there by a later rebalance, not immediately:

def updateShardLocation(shard: ShardId, location: Address): Future[Done]

Scala Example:

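import akka.cluster.Cluster
import scala.util.{ Failure, Success }
import system.dispatcher // ExecutionContext required by onComplete

val cluster = Cluster(system)
// Any current member; choose the real target by your own criteria
val targetNode = cluster.state.members.head.address

val future = client.updateShardLocation("shard-1", targetNode)
future.onComplete {
  case Success(_) => println("Shard location updated successfully")
  case Failure(exception) => println(s"Failed to update shard location: $exception")
}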

Java Example:

import akka.Done;
import akka.actor.Address;
import akka.cluster.Cluster;
import java.util.concurrent.CompletionStage;

// Any current member; choose the real target by your own criteria
Address targetNode = Cluster.get(system).state().getMembers().iterator().next().address();

CompletionStage<Done> future = client.updateShardLocation("shard-1", targetNode);
future.whenComplete((done, throwable) -> {
    if (throwable == null) {
        System.out.println("Shard location updated successfully");
    } else {
        System.out.println("Failed to update shard location: " + throwable);
    }
});

Update Multiple Shard Locations

Update locations for multiple shards in a single operation:

def updateShardLocations(locations: Map[ShardId, Address]): Future[Done]

Scala Example:

import akka.cluster.Cluster
import system.dispatcher // ExecutionContext required by foreach

val cluster = Cluster(system)
// Assumes the cluster has at least two members
val nodes = cluster.state.members.toList.map(_.address)

val shardLocations = Map(
  "shard-1" -> nodes(0),
  "shard-2" -> nodes(1),
  "shard-3" -> nodes(0)
)

val future = client.updateShardLocations(shardLocations)
future.foreach(_ => println("All shard locations updated"))

Java Example:

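import akka.Done;
import akka.actor.Address;
import akka.cluster.Cluster;
import akka.cluster.Member;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

// getMembers() returns an Iterable, not a Collection, so stream it via its spliterator
// (assumes the cluster has at least two members)
List<Address> nodes =
    StreamSupport.stream(Cluster.get(system).state().getMembers().spliterator(), false)
        .map(Member::address)
        .collect(Collectors.toList());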

Map<String, Address> shardLocations = new HashMap<>();
shardLocations.put("shard-1", nodes.get(0));
shardLocations.put("shard-2", nodes.get(1));
shardLocations.put("shard-3", nodes.get(0));

CompletionStage<Done> future = client.updateShardLocations(shardLocations);
future.thenRun(() -> System.out.println("All shard locations updated"));

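Each update is written to the cluster, so it can pay to send only the entries that actually change. The helper below is a minimal sketch and not part of Akka. `LocationDiff.changedLocations` is a hypothetical name, and it is generic, so it works with `ShardId`/`Address` maps as well as with the plain strings used for illustration.

```scala
object LocationDiff {
  // Keep only the entries of `desired` whose target differs from `current`,
  // including shards that have no current entry at all.
  def changedLocations[S, A](current: Map[S, A], desired: Map[S, A]): Map[S, A] =
    desired.filter { case (shard, target) => !current.get(shard).contains(target) }
}
```

For example, `client.updateShardLocations(LocationDiff.changedLocations(current.locations, desired))`, where the call can be skipped entirely when the result is empty.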
Query Current Shard Locations

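Retrieve the shard locations that have been set through the client. Shards that were never given an external location do not appear in the result: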

def shardLocations(): Future[ShardLocations]

Scala Example:

import system.dispatcher // ExecutionContext required by foreach

val future = client.shardLocations()
future.foreach { locations =>
  println(s"Current shard locations: ${locations.locations}")
}

Java Example:

CompletionStage<ShardLocations> future = client.shardLocations();
future.thenAccept(locations -> {
    System.out.println("Current shard locations: " + locations.getLocations());
});

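The returned map is keyed by shard. To see which shards each node has been assigned, it can be inverted. Here is a small sketch with a hypothetical `ShardView.shardsPerNode` helper, generic over the shard and address types:

```scala
object ShardView {
  // Invert shard -> node into node -> sorted shard ids, e.g. to count shards per node.
  def shardsPerNode[S: Ordering, A](locations: Map[S, A]): Map[A, List[S]] =
    locations.toList
      .groupBy { case (_, node) => node }
      .map { case (node, entries) => node -> entries.map(_._1).sorted }
}
```

With an ExecutionContext in scope, this could be used as `client.shardLocations().map(l => ShardView.shardsPerNode(l.locations))`.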
Shard Locations Container

ShardLocations Class

case class ShardLocations(locations: Map[ShardId, Address]) {
  def getLocations(): java.util.Map[String, Address]  // Java API
}

Contains the mapping of shard IDs to their designated cluster node addresses.

Error Handling

Client Timeout Exception

class ClientTimeoutException(message: String, cause: Throwable) extends RuntimeException(message, cause)

The Future (or CompletionStage) returned by a client operation fails with this exception when the operation times out.

Handling Timeouts:

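import akka.Done
import akka.cluster.sharding.external.ClientTimeoutException
import system.dispatcher // ExecutionContext required by recover

client.updateShardLocation("shard-1", targetNode).recover {
  case _: ClientTimeoutException =>
    // Complete with Done; retrying is left to the caller
    println("Shard location update timed out")
    Done
  case other =>
    println(s"Unexpected error: $other")
    throw other // rethrowing inside recover keeps the Future failed
}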

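The handler above logs the timeout but does not retry. One way to retry is to re-run the operation a bounded number of times while it fails with an error you consider transient. This is a minimal sketch with no delay between attempts. `Retry.retryOn` and its predicate parameter are hypothetical names. Akka also ships `akka.pattern.retry`, which can wait between attempts.

```scala
import scala.concurrent.{ ExecutionContext, Future }

object Retry {
  // Run `op`, re-running it up to `attempts - 1` more times while the failure
  // satisfies `retryable`. Non-retryable failures, and the last failure, propagate.
  // `op` is expected to report errors as a failed Future rather than by throwing.
  def retryOn[T](attempts: Int, retryable: Throwable => Boolean)(op: () => Future[T])(
      implicit ec: ExecutionContext): Future[T] = {
    require(attempts >= 1, "attempts must be at least 1")
    op().recoverWith {
      case e if attempts > 1 && retryable(e) => retryOn(attempts - 1, retryable)(op)
    }
  }
}
```

For example: `Retry.retryOn(3, _.isInstanceOf[ClientTimeoutException])(() => client.updateShardLocation("shard-1", targetNode))`.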
Integration Patterns

Centralized Orchestration

Use external allocation with a centralized orchestration service:

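import scala.concurrent.{ ExecutionContext, Future }
import akka.Done
import akka.actor.Address
import akka.cluster.sharding.ShardRegion.ShardId
import akka.cluster.sharding.external.ShardLocations
import akka.cluster.sharding.external.scaladsl.ExternalShardAllocationClient

// The implicit ExecutionContext is needed to compose the Futures below
class ShardOrchestrator(client: ExternalShardAllocationClient)(implicit ec: ExecutionContext) {
  
  def rebalanceShards(): Future[Done] = {
    for {
      currentLocations <- client.shardLocations()
      newLocations = calculateOptimalDistribution(currentLocations)
      _ <- client.updateShardLocations(newLocations)
    } yield Done
  }
  
  private def calculateOptimalDistribution(
    current: ShardLocations
  ): Map[ShardId, Address] = {
    // Custom logic for optimal shard distribution; this placeholder keeps the current mapping
    current.locations
  }
}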

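`calculateOptimalDistribution` above is a placeholder that returns the current mapping. The sketch below shows one possible policy, assuming every shard costs the same. It balances shard counts so that nodes differ by at most one, and it moves as few shards as possible: a shard stays on its node unless that node exceeds its quota, and the excess shards fill the free slots. `Balancer.balance` is a hypothetical helper, and `nodes` is assumed to hold distinct addresses. Because `shardLocations()` only reports shards that already have an external location, the full `shards` list has to come from your own knowledge of the shard ids.

```scala
object Balancer {
  // Assign every shard to one of `nodes` so node shard counts differ by at most one,
  // keeping shards on their current node wherever the quota allows.
  def balance[S: Ordering, A](shards: Seq[S], current: Map[S, A], nodes: Seq[A]): Map[S, A] = {
    require(nodes.nonEmpty, "need at least one node")
    val sortedShards = shards.distinct.sorted
    val base = sortedShards.size / nodes.size
    val extra = sortedShards.size % nodes.size

    // Shards currently placed on each usable node
    val onNode: Map[A, Seq[S]] =
      nodes.map(n => n -> sortedShards.filter(s => current.get(s).contains(n))).toMap

    // The nodes already holding the most shards get the `extra` slots, so fewer shards move
    val byLoad = nodes.zipWithIndex.sortBy { case (n, i) => (-onNode(n).size, i) }.map(_._1)
    val quota: Map[A, Int] =
      byLoad.zipWithIndex.map { case (n, i) => n -> (if (i < extra) base + 1 else base) }.toMap

    // Keep shards in place up to each node's quota
    val kept: Map[S, A] = nodes.flatMap(n => onNode(n).take(quota(n)).map(_ -> n)).toMap
    // Everything else (overflow, or on a node not in `nodes`) must move
    val toMove = sortedShards.filterNot(kept.contains)
    // Remaining free slots, node by node; their count equals toMove.size
    val freeSlots = nodes.flatMap(n => Seq.fill(quota(n) - onNode(n).take(quota(n)).size)(n))
    kept ++ toMove.zip(freeSlots)
  }
}
```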
Load-Based Allocation

Integrate with monitoring systems for load-based shard placement:

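import scala.concurrent.{ ExecutionContext, Future }
import akka.Done
import akka.actor.Address
import akka.cluster.sharding.ShardRegion.ShardId
import akka.cluster.sharding.external.scaladsl.ExternalShardAllocationClient

// Hypothetical interface to your own monitoring system
trait MetricsCollector {
  def getNodeLoads(): Future[Map[Address, Double]]
  def getShardLoads(): Future[Map[ShardId, Double]]
}

class LoadBasedAllocator(
  client: ExternalShardAllocationClient,
  metricsCollector: MetricsCollector
)(implicit ec: ExecutionContext) {
  
  def allocateBasedOnLoad(): Future[Done] = {
    for {
      nodeLoads <- metricsCollector.getNodeLoads()
      shardLoads <- metricsCollector.getShardLoads()
      optimalPlacements = calculateOptimalPlacements(nodeLoads, shardLoads)
      _ <- client.updateShardLocations(optimalPlacements)
    } yield Done
  }
  
  private def calculateOptimalPlacements(
    nodeLoads: Map[Address, Double],
    shardLoads: Map[ShardId, Double]
  ): Map[ShardId, Address] = {
    // Place high-load shards on low-load nodes
    Map.empty // Placeholder: no shard is moved
  }
}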

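`calculateOptimalPlacements` above returns `Map.empty` as a placeholder. One way to fill it in is a greedy rule: take shards from heaviest to lightest and put each on the node with the lowest projected load. This is a common heuristic, not Akka behaviour. The sketch assumes `nodeLoads` is each node's baseline load excluding the shards being placed, on the same scale as `shardLoads`. `GreedyPlacement.place` is a hypothetical name.

```scala
object GreedyPlacement {
  // Place shards heaviest-first, each onto the node with the lowest projected load.
  // `nodeLoads` is treated as a baseline that excludes the shards being placed.
  def place[S, A](nodeLoads: Map[A, Double], shardLoads: Map[S, Double]): Map[S, A] = {
    require(nodeLoads.nonEmpty || shardLoads.isEmpty, "no nodes to place shards on")
    val heaviestFirst = shardLoads.toList.sortBy { case (_, load) => -load }
    val (placements, _) = heaviestFirst.foldLeft((Map.empty[S, A], nodeLoads)) {
      case ((acc, loads), (shard, load)) =>
        val (node, nodeLoad) = loads.minBy(_._2)
        // Record the placement and add the shard's load to its new node
        (acc + (shard -> node), loads.updated(node, nodeLoad + load))
    }
    placements
  }
}
```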
Maintenance and Migration

Use external allocation for planned maintenance:

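import scala.concurrent.{ ExecutionContext, Future }
import akka.Done
import akka.actor.Address
import akka.cluster.{ Cluster, MemberStatus }
import akka.cluster.sharding.ShardRegion.ShardId
import akka.cluster.sharding.external.scaladsl.ExternalShardAllocationClient

class MaintenanceManager(client: ExternalShardAllocationClient, cluster: Cluster)(
  implicit ec: ExecutionContext
) {
  
  def evacuateNode(nodeToEvacuate: Address): Future[Done] = {
    for {
      currentLocations <- client.shardLocations()
      shardsToMove = currentLocations.locations.filter(_._2 == nodeToEvacuate)
      availableNodes = getAvailableNodes().filterNot(_ == nodeToEvacuate)
      newLocations = redistributeShards(shardsToMove.keys, availableNodes)
      _ <- client.updateShardLocations(newLocations)
    } yield Done
  }
  
  private def redistributeShards(
    shards: Iterable[ShardId],
    availableNodes: List[Address]
  ): Map[ShardId, Address] = {
    // Fail the Future instead of dividing by zero when no target node is left
    require(shards.isEmpty || availableNodes.nonEmpty, "No other Up node to move shards to")
    // Evenly distribute shards across available nodes (round-robin)
    shards.zipWithIndex.map { case (shard, index) =>
      shard -> availableNodes(index % availableNodes.size)
    }.toMap
  }
  
  private def getAvailableNodes(): List[Address] =
    // Members with status Up are treated as healthy targets
    cluster.state.members.toList.filter(_.status == MemberStatus.Up).map(_.address)
}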

Best Practices

Timing Considerations

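  • Allow time for shard movement to complete before making new changes: an allocated shard moves only when the coordinator next rebalances (see akka.cluster.sharding.rebalance-interval)
  • The returned Future[Done] completes once the new location has been recorded, not when the shard has moved; use it to order successive updates
  • Expect a delay of up to one rebalance interval before an update takes effect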

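To order several updates, each step can be started only after the previous Future has completed, for example evacuating one node at a time. This is a minimal sketch with a hypothetical `Sequential.runInOrder` helper. Each step is passed as a function so that it does not start until its turn comes.

```scala
import scala.concurrent.{ ExecutionContext, Future }

object Sequential {
  // Start each step only after the previous step's Future has completed successfully;
  // the first failure stops the chain and fails the result.
  def runInOrder[T](steps: List[() => Future[T]])(implicit ec: ExecutionContext): Future[List[T]] =
    steps.foldLeft(Future.successful(List.empty[T])) { (acc, step) =>
      acc.flatMap(done => step().map(result => done :+ result))
    }
}
```

For example: `Sequential.runInOrder(List(() => manager.evacuateNode(nodeA), () => manager.evacuateNode(nodeB)))`.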
Error Recovery

  • Implement retry logic for transient failures
  • Monitor for ClientTimeoutException and handle appropriately
  • Validate node addresses before attempting updates

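Addresses can be checked against the current members before an update is sent. This sketch splits a requested batch into targets that are known members and targets that are not. Whether to reject or defer the second group is a policy choice. `AddressValidation.partitionByMembership` is a hypothetical helper. With Akka, the member set could come from something like `cluster.state.members.toList.filter(_.status == MemberStatus.Up).map(_.address).toSet`.

```scala
object AddressValidation {
  // Split requested placements into those targeting a known member and those that don't.
  def partitionByMembership[S, A](requested: Map[S, A], members: Set[A]): (Map[S, A], Map[S, A]) =
    requested.partition { case (_, address) => members.contains(address) }
}
```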
Performance

  • Update multiple shards in batch operations when possible
  • Cache shard location information to reduce query frequency
  • Avoid excessive updates that could cause cluster instability

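Caching can be as simple as reusing the last result of a Future-returning query such as `client.shardLocations()` for a fixed time. `CachedQuery` is a hypothetical sketch, and its clock is injectable so that expiry can be tested. It also caches a failed Future until that result expires. It cannot see updates made since the last fetch, so call `invalidate()` after your own writes.

```scala
import scala.concurrent.Future

// Reuses the result of `fetch` for `ttlMillis`; `now` is injectable for testing.
final class CachedQuery[T](
    ttlMillis: Long,
    fetch: () => Future[T],
    now: () => Long = () => System.currentTimeMillis()) {

  private var cached: Option[(Long, Future[T])] = None

  def get(): Future[T] = synchronized {
    cached match {
      case Some((fetchedAt, result)) if now() - fetchedAt < ttlMillis => result
      case _ =>
        val result = fetch()
        cached = Some((now(), result))
        result
    }
  }

  // Drop the cached value, e.g. right after updating shard locations
  def invalidate(): Unit = synchronized { cached = None }
}
```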
Monitoring

  • Track successful vs failed allocation updates
  • Monitor shard movement completion times
  • Alert on repeated allocation failures
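Counting outcomes of allocation updates can be done by wrapping each returned Future. In this sketch, `UpdateMetrics` is hypothetical and stands in for whatever metrics library you export to. It uses `transform`, so the counters are updated before the wrapped Future completes. The consecutive-failure counter supports an alert on repeated failures.

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.{ ExecutionContext, Future }

final class UpdateMetrics {
  val succeeded = new AtomicLong
  val failed = new AtomicLong
  val consecutiveFailures = new AtomicLong

  // Record the outcome of `update` and pass its result through unchanged
  def track[T](update: Future[T])(implicit ec: ExecutionContext): Future[T] =
    update.transform { result =>
      if (result.isSuccess) {
        succeeded.incrementAndGet()
        consecutiveFailures.set(0)
      } else {
        failed.incrementAndGet()
        consecutiveFailures.incrementAndGet()
      }
      result
    }

  // True once `threshold` updates in a row have failed
  def shouldAlert(threshold: Long): Boolean = consecutiveFailures.get >= threshold
}
```

For example: `metrics.track(client.updateShardLocations(newLocations))`.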