CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-com-typesafe-akka--akka-cluster

Akka Cluster provides a fault-tolerant decentralized peer-to-peer based cluster membership service with no single point of failure or single point of bottleneck using gossip protocols and automatic failure detection.

Pending
Overview
Eval results
Files

docs/cluster-routing.md

Cluster Routing

Routing configuration for distributing work across cluster nodes with support for pools and groups, role-based routing, and automatic routee management based on cluster membership changes.

Capabilities

Cluster Router Pool

Router that creates and manages actor instances (pool) distributed across cluster nodes.

/**
 * Pool-based cluster router configuration.
 * A pool router creates new actor instances on cluster nodes and uses them as
 * its routees, rather than looking up actors that already exist.
 *
 * @param local    the local Pool configuration (e.g. `RoundRobinPool(0)` in the usage examples)
 * @param settings cluster-specific settings for the pool
 */
case class ClusterRouterPool(
  local: Pool,
  settings: ClusterRouterPoolSettings
) extends RouterConfig {
  /** Create the Router for the given actor system */
  def createRouter(system: ActorSystem): Router
  
  /** Routing logic used to distribute messages among the routees */
  def routingLogic: RoutingLogic
  
  /** Create a new routee from the given routee Props within the given actor context */
  def newRoutee(routeeProps: Props, context: ActorContext): Routee
}

Usage Examples:

import akka.routing._
import akka.cluster.routing._

// Basic cluster pool router: round-robin over up to 10 routees cluster-wide,
// at most 2 per node, placed on nodes with the "worker" role (local node included).
val router = {
  val workerSettings = ClusterRouterPoolSettings(
    totalInstances = 10,
    maxInstancesPerNode = 2,
    allowLocalRoutees = true,
    useRoles = Set("worker")
  )
  // RoundRobinPool(0): 0 local instances
  val clusterPool = ClusterRouterPool(RoundRobinPool(0), workerSettings)
  system.actorOf(clusterPool.props(Props[WorkerActor]), name = "workerRouter")
}

// Router with specific routing logic: consistent hashing over up to 20 routees,
// at most 3 per node, on "backend" nodes and never on the local node.
val consistentHashRouter = {
  val backendSettings = ClusterRouterPoolSettings(
    totalInstances = 20,
    maxInstancesPerNode = 3,
    allowLocalRoutees = false,
    useRoles = Set("backend")
  )
  val hashingPool = ClusterRouterPool(ConsistentHashingPool(0), backendSettings)
  system.actorOf(hashingPool.props(Props[HashWorker]), name = "hashRouter")
}

// Send work to the router like any other ActorRef; the router's routing logic
// distributes each message to a routee.
router ! "some work"
router ! WorkMessage("data", "key")

Cluster Router Group

Router that routes to existing actors (group) identified by actor paths across cluster nodes.

/**
 * Group-based cluster router configuration.
 * A group router routes to actors that already exist: it looks routees up by
 * actor path rather than creating them.
 *
 * @param local    the local Group configuration (e.g. `RoundRobinGroup(Nil)` in the usage examples)
 * @param settings cluster-specific settings for the group, including the routee paths
 */
case class ClusterRouterGroup(
  local: Group,
  settings: ClusterRouterGroupSettings
) extends RouterConfig {
  /** Create the Router for the given actor system */
  def createRouter(system: ActorSystem): Router
  
  /** Routing logic used to distribute messages among the routees */
  def routingLogic: RoutingLogic
  
  /** Create an actor-selection routee for the given routee path within the given actor context */
  def newRoutee(routeePath: String, context: ActorContext): Routee
}

Usage Examples:

// Cluster group router: round-robin over actors at /user/worker on "compute"
// nodes (local node included), with totalInstances set to 100.
val groupRouter = {
  val computeSettings = ClusterRouterGroupSettings(
    totalInstances = 100,
    routeesPaths = List("/user/worker"),
    allowLocalRoutees = true,
    useRoles = Set("compute")
  )
  // RoundRobinGroup(Nil): no local routees
  val clusterGroup = ClusterRouterGroup(RoundRobinGroup(Nil), computeSettings)
  system.actorOf(clusterGroup.props(), name = "workerGroup")
}

// Multiple routee paths: random routing over three actor paths on "processing"
// nodes, excluding the local node, with totalInstances set to 50.
val multiPathRouter = {
  val processingPaths = List("/user/processor", "/user/calculator", "/user/analyzer")
  val processingSettings = ClusterRouterGroupSettings(
    totalInstances = 50,
    routeesPaths = processingPaths,
    allowLocalRoutees = false,
    useRoles = Set("processing")
  )
  val clusterGroup = ClusterRouterGroup(RandomGroup(Nil), processingSettings)
  system.actorOf(clusterGroup.props(), name = "processingGroup")
}

// Send messages to the group routers; each router passes the message on to
// routees looked up by the configured paths.
groupRouter ! ProcessRequest("data")
multiPathRouter ! CalculationTask(123, 456)

Pool Router Settings

Configuration settings for cluster router pools.

/**
 * Configuration for cluster router pools.
 * totalInstances must be > 0
 *
 * @param totalInstances      total number of routees across the cluster
 * @param maxInstancesPerNode maximum number of routees on a single node
 * @param allowLocalRoutees   whether routees may be placed on the router's own node
 * @param useRoles            roles a node must have to host routees; an empty set places no role restriction
 */
case class ClusterRouterPoolSettings(
  totalInstances: Int,
  maxInstancesPerNode: Int, 
  allowLocalRoutees: Boolean,
  useRoles: Set[String]
) extends ClusterRouterSettingsBase

object ClusterRouterPoolSettings {
  /** Create settings from a configuration section */
  def fromConfig(config: Config): ClusterRouterPoolSettings
  
  /** Varargs factory (convenient from Java): roles are passed as individual strings instead of a Set */
  def apply(
    totalInstances: Int,
    maxInstancesPerNode: Int,
    allowLocalRoutees: Boolean,
    useRoles: String*
  ): ClusterRouterPoolSettings
}

Usage Examples:

// Pool settings restricted by role
val poolSettings: ClusterRouterPoolSettings =
  ClusterRouterPoolSettings(
    totalInstances = 20,                 // routees across the whole cluster
    maxInstancesPerNode = 3,             // upper bound per node
    allowLocalRoutees = true,            // the current node may host routees too
    useRoles = Set("worker", "compute")  // only on nodes with these roles
  )

// Pool settings read from the router's deployment configuration section
val configSettings: ClusterRouterPoolSettings = {
  val routerSection = system.settings.config.getConfig("akka.actor.deployment./myRouter")
  ClusterRouterPoolSettings.fromConfig(routerSection)
}

// Varargs factory (the Java-friendly overload): roles listed as individual strings
val javaSettings: ClusterRouterPoolSettings =
  ClusterRouterPoolSettings(10, 2, false, "backend", "processing")

Group Router Settings

Configuration settings for cluster router groups.

/**
 * Configuration for cluster router groups.
 * totalInstances must be > 0
 *
 * @param totalInstances    total number of routees across the cluster
 * @param routeesPaths      actor paths of the routees to look up (e.g. "/user/worker")
 * @param allowLocalRoutees whether routees on the router's own node may be used
 * @param useRoles          roles a node must have for its routees to be used; an empty set places no role restriction
 */
case class ClusterRouterGroupSettings(
  totalInstances: Int,
  routeesPaths: immutable.Seq[String],
  allowLocalRoutees: Boolean, 
  useRoles: Set[String]
) extends ClusterRouterSettingsBase

object ClusterRouterGroupSettings {
  /** Create settings from a configuration section */
  def fromConfig(config: Config): ClusterRouterGroupSettings
  
  /** Varargs factory (convenient from Java): roles are passed as individual strings instead of a Set */
  def apply(
    totalInstances: Int,
    routeesPaths: immutable.Seq[String],
    allowLocalRoutees: Boolean,
    useRoles: String*
  ): ClusterRouterGroupSettings
}

Usage Examples:

// Group settings covering several routee paths
val groupSettings: ClusterRouterGroupSettings = {
  val servicePaths = List("/user/service", "/user/handler")
  ClusterRouterGroupSettings(
    totalInstances = 50,
    routeesPaths = servicePaths,
    allowLocalRoutees = false, // no routees on the local node
    useRoles = Set("api")      // API nodes only
  )
}

// Group settings with one routee path and no role restriction
val singlePathSettings: ClusterRouterGroupSettings =
  ClusterRouterGroupSettings(
    totalInstances = 10,
    routeesPaths = List("/user/processor"),
    allowLocalRoutees = true,
    useRoles = Set.empty // any role
  )

// Varargs factory with multiple roles (the Java-friendly overload).
// routeesPaths is declared as immutable.Seq[String], so a Scala List is passed
// directly; converting it with .asJava would not type-check against that signature.
val javaGroupSettings = ClusterRouterGroupSettings(
  25,
  List("/user/worker"),
  true,
  "worker", "compute"
)

Configuration-Based Routing

Cluster routers can also be configured declaratively, in the akka.actor.deployment section of application.conf.

// Configuration keys for cluster routers
// akka.actor.deployment.<router-path>.router = <any pool or group router type, e.g. round-robin-pool | round-robin-group>
// akka.actor.deployment.<router-path>.cluster.enabled = on
// akka.actor.deployment.<router-path>.cluster.max-nr-of-instances-per-node = N
// akka.actor.deployment.<router-path>.cluster.max-total-nr-of-instances = N  
// akka.actor.deployment.<router-path>.cluster.allow-local-routees = on|off
// akka.actor.deployment.<router-path>.cluster.use-roles = ["role1", "role2"]

Configuration Examples:

# application.conf
akka.actor.deployment {
  /poolRouter {
    # Use one of Akka's regular pool router types (round-robin-pool, random-pool,
    # consistent-hashing-pool, ...). The cluster section below is what makes the
    # router cluster-aware; there is no "cluster-pool" router type.
    router = round-robin-pool
    # Optional dedicated dispatcher for the routees. pool-dispatcher is a
    # configuration section, not a dispatcher name.
    pool-dispatcher {
      fork-join-executor.parallelism-min = 2
      fork-join-executor.parallelism-max = 8
    }
    nr-of-instances = 0 # left at 0 so cluster.max-total-nr-of-instances applies
    cluster {
      enabled = on
      max-nr-of-instances-per-node = 3
      max-total-nr-of-instances = 20
      allow-local-routees = on
      use-roles = ["worker"]
    }
  }

  /groupRouter {
    # Use one of Akka's regular group router types (round-robin-group,
    # random-group, consistent-hashing-group, ...).
    router = round-robin-group
    routees.paths = ["/user/service"]
    cluster {
      enabled = on
      max-total-nr-of-instances = 100
      allow-local-routees = off
      use-roles = ["api", "service"]
    }
  }
}

Usage with Configuration:

import akka.routing.FromConfig

// Create routers using deployment configuration.
// FromConfig marks the Props as "router defined in configuration": the router
// type and cluster settings are then read from akka.actor.deployment for the
// actor's path. With plain Props the configured router is not applied and an
// ordinary actor is created instead.
val poolRouter = system.actorOf(FromConfig.props(Props[WorkerActor]), name = "poolRouter")
// A group router creates no routees, so no routee Props are needed.
val groupRouter = system.actorOf(FromConfig.props(), name = "groupRouter")

// Routers are configured from the deployment settings
poolRouter ! WorkItem("task1")
groupRouter ! ServiceRequest("request1")

Routing Logic Integration

Integration with Akka's routing logic for different distribution strategies.

// Supported routing logic types.
// poolSettings is a ClusterRouterPoolSettings and groupSettings is a
// ClusterRouterGroupSettings; pool and group routers take different settings
// types, so a single shared value cannot serve both.
import akka.routing._

// Round robin - distribute evenly
ClusterRouterPool(RoundRobinPool(0), poolSettings)
ClusterRouterGroup(RoundRobinGroup(Nil), groupSettings)

// Random - distribute randomly
ClusterRouterPool(RandomPool(0), poolSettings)
ClusterRouterGroup(RandomGroup(Nil), groupSettings)

// Consistent hashing - distribute by message hash
ClusterRouterPool(ConsistentHashingPool(0), poolSettings)
ClusterRouterGroup(ConsistentHashingGroup(Nil), groupSettings)

// Smallest mailbox - route to least busy (available as a pool only)
ClusterRouterPool(SmallestMailboxPool(0), poolSettings)

// Broadcast - send to all routees
ClusterRouterPool(BroadcastPool(0), poolSettings)
ClusterRouterGroup(BroadcastGroup(Nil), groupSettings)

Usage Examples:

// Consistent hash routing for stateful workloads.
// ConsistentHashable is nested inside the ConsistentHashingRouter object, so
// `import akka.routing._` alone does not bring it into scope.
import akka.routing.ConsistentHashingRouter.ConsistentHashable

/** Message whose `key` is used as the consistent-hash key for routing. */
case class HashedMessage(key: String, data: String) extends ConsistentHashable {
  override def consistentHashKey: Any = key
}

// Consistent-hashing pool on "stateful" nodes: 15 routees in total, at most 3
// per node, local node included.
val hashRouter = {
  val statefulSettings = ClusterRouterPoolSettings(
    totalInstances = 15,
    maxInstancesPerNode = 3,
    allowLocalRoutees = true,
    useRoles = Set("stateful")
  )
  val hashingPool = ClusterRouterPool(ConsistentHashingPool(0), statefulSettings)
  system.actorOf(hashingPool.props(Props[StatefulWorker]), "hashRouter")
}

// Each message is routed by its consistentHashKey (here, the user id)
hashRouter ! HashedMessage("user123", "user data")
hashRouter ! HashedMessage("user456", "more data")

// Broadcast for cache invalidation: every message goes to all routees, i.e. the
// actors at /user/cache on "cache" nodes (local node included).
val broadcastRouter = {
  val cacheSettings = ClusterRouterGroupSettings(
    totalInstances = 10,
    routeesPaths = List("/user/cache"),
    allowLocalRoutees = true,
    useRoles = Set("cache")
  )
  val broadcastGroup = ClusterRouterGroup(BroadcastGroup(Nil), cacheSettings)
  system.actorOf(broadcastGroup.props(), "cacheInvalidator")
}

broadcastRouter ! InvalidateCache("key123")

// Smallest mailbox for load balancing: 8 routees in total, at most 2 per node,
// on "balanced" nodes (local node included).
val balancedRouter = {
  val balancedSettings = ClusterRouterPoolSettings(
    totalInstances = 8,
    maxInstancesPerNode = 2,
    allowLocalRoutees = true,
    useRoles = Set("balanced")
  )
  val mailboxPool = ClusterRouterPool(SmallestMailboxPool(0), balancedSettings)
  system.actorOf(mailboxPool.props(Props[LoadBalancedWorker]), "balancedRouter")
}

Router Lifecycle Management

Managing router lifecycle and adaptation to cluster changes.

// Routers automatically adapt to cluster membership changes
// - Add routees when new nodes join with matching roles
// - Remove routees when nodes leave or become unreachable  
// - Respect totalInstances and maxInstancesPerNode limits
// - Monitor cluster events and adjust routee populations

Lifecycle Example:

import akka.actor.{ Actor, ActorLogging, ActorRef, Props }
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.cluster.routing.{ ClusterRouterPool, ClusterRouterPoolSettings }
import akka.routing.RoundRobinPool

/**
 * Owns a cluster-aware pool router and logs cluster membership changes that
 * affect "worker" nodes.
 *
 * The actor only observes cluster events; routee management is left to the
 * router itself. Incoming WorkMessage instances are passed on to the router.
 */
class RouterManager extends Actor with ActorLogging {
  val cluster = Cluster(context.system)
  var currentRouter: Option[ActorRef] = None

  override def preStart(): Unit = {
    // Subscribe to cluster events to monitor router health
    cluster.subscribe(self, classOf[MemberEvent], classOf[ReachabilityEvent])

    // Create initial router
    createRouter()
  }

  /** Creates the "dynamicRouter" child and remembers it in `currentRouter`. */
  def createRouter(): Unit = {
    val router = context.actorOf(
      ClusterRouterPool(
        RoundRobinPool(0),
        ClusterRouterPoolSettings(20, 4, true, Set("worker"))
      ).props(Props[WorkerActor]),
      "dynamicRouter"
    )
    currentRouter = Some(router)
    log.info("Created cluster router: {}", router.path)
  }

  def receive: Receive = {
    case MemberUp(member) if member.hasRole("worker") =>
      log.info("New worker node joined: {}", member.address)
      // Router automatically adds routees

    case UnreachableMember(member) if member.hasRole("worker") =>
      log.warning("Worker node unreachable: {}", member.address)
      // Router automatically removes routees

    case MemberRemoved(member, _) if member.hasRole("worker") =>
      log.info("Worker node removed: {}", member.address)
      // Router automatically cleans up routees

    case _: CurrentClusterState =>
      // Initial snapshot delivered on subscribe; nothing to do because the
      // router tracks membership itself.

    case _: MemberEvent | _: ReachabilityEvent =>
      // Events for non-worker nodes and other transitions (joining, leaving,
      // reachable again, ...) are deliberately ignored rather than left to
      // fall through to unhandled.

    case work: WorkMessage =>
      currentRouter.foreach(_ ! work)
  }

  override def postStop(): Unit = {
    cluster.unsubscribe(self)
  }
}

Types

// Router configuration types (both are RouterConfig implementations)
case class ClusterRouterPool(local: Pool, settings: ClusterRouterPoolSettings) extends RouterConfig
case class ClusterRouterGroup(local: Group, settings: ClusterRouterGroupSettings) extends RouterConfig

// Settings types (both implement ClusterRouterSettingsBase)

/** Settings for cluster router pools; totalInstances must be > 0. */
case class ClusterRouterPoolSettings(
  totalInstances: Int,
  maxInstancesPerNode: Int,
  allowLocalRoutees: Boolean,
  useRoles: Set[String]
) extends ClusterRouterSettingsBase

/** Settings for cluster router groups; totalInstances must be > 0. */
case class ClusterRouterGroupSettings(
  totalInstances: Int,
  routeesPaths: immutable.Seq[String],
  allowLocalRoutees: Boolean,
  useRoles: Set[String]
) extends ClusterRouterSettingsBase

// Base settings trait: the members shared by pool and group settings
trait ClusterRouterSettingsBase {
  /** Total number of routees across the cluster; must be > 0 */
  def totalInstances: Int
  /** Whether routees on the router's own node are allowed */
  def allowLocalRoutees: Boolean  
  /** Roles a node must have to be used for routees; an empty set places no role restriction */
  def useRoles: Set[String]
}

Install with Tessl CLI

npx tessl i tessl/maven-com-typesafe-akka--akka-cluster

docs

cluster-management.md

cluster-routing.md

configuration-and-management.md

events-and-state.md

extensibility.md

index.md

members-and-status.md

tile.json