or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cluster-management.mdcluster-routing.mdconfiguration-and-management.mdevents-and-state.mdextensibility.mdindex.mdmembers-and-status.md

cluster-routing.mddocs/

0

# Cluster Routing

1

2

Routing configuration for distributing work across cluster nodes with support for pools and groups, role-based routing, and automatic routee management based on cluster membership changes.

3

4

## Capabilities

5

6

### Cluster Router Pool

7

8

Router that creates and manages actor instances (pool) distributed across cluster nodes.

9

10

```scala { .api }

11

/**

12

* Pool-based cluster router that creates actors on cluster nodes.

13

* Pool routers create new actor instances as routees.

14

*/

15

case class ClusterRouterPool(

16

local: Pool,

17

settings: ClusterRouterPoolSettings

18

) extends RouterConfig {

19

/** Create router actor with given props */

20

def createRouter(system: ActorSystem): Router

21

22

/** Router logic for distributing messages */

23

def routingLogic: RoutingLogic

24

25

/** Create routee actor props */

26

def newRoutee(routeeProps: Props, context: ActorContext): Routee

27

}

28

```

29

30

**Usage Examples:**

31

32

```scala

33

import akka.routing._

34

import akka.cluster.routing._

35

36

// Basic cluster pool router

37

val router = system.actorOf(

38

ClusterRouterPool(

39

RoundRobinPool(0), // 0 local instances

40

ClusterRouterPoolSettings(

41

totalInstances = 10,

42

maxInstancesPerNode = 2,

43

allowLocalRoutees = true,

44

useRoles = Set("worker")

45

)

46

).props(Props[WorkerActor]),

47

name = "workerRouter"

48

)

49

50

// Router with specific routing logic

51

val consistentHashRouter = system.actorOf(

52

ClusterRouterPool(

53

ConsistentHashingPool(0),

54

ClusterRouterPoolSettings(20, 3, allowLocalRoutees = false, Set("backend"))

55

).props(Props[HashWorker]),

56

name = "hashRouter"

57

)

58

59

// Send work to the router

60

router ! "some work"

61

router ! WorkMessage("data", "key")

62

```

63

64

### Cluster Router Group

65

66

Router that routes to existing actors (group) identified by actor paths across cluster nodes.

67

68

```scala { .api }

69

/**

70

* Group-based cluster router that routes to existing actors.

71

* Group routers look up actors by path rather than creating them.

72

*/

73

case class ClusterRouterGroup(

74

local: Group,

75

settings: ClusterRouterGroupSettings

76

) extends RouterConfig {

77

/** Create router actor with given props */

78

def createRouter(system: ActorSystem): Router

79

80

/** Router logic for distributing messages */

81

def routingLogic: RoutingLogic

82

83

/** Create actor selection routee */

84

def newRoutee(routeePath: String, context: ActorContext): Routee

85

}

86

```

87

88

**Usage Examples:**

89

90

```scala

91

// Cluster group router

92

val groupRouter = system.actorOf(

93

ClusterRouterGroup(

94

RoundRobinGroup(Nil), // No local routees

95

ClusterRouterGroupSettings(

96

totalInstances = 100,

97

routeesPaths = List("/user/worker"),

98

allowLocalRoutees = true,

99

useRoles = Set("compute")

100

)

101

).props(),

102

name = "workerGroup"

103

)

104

105

// Multiple routee paths

106

val multiPathRouter = system.actorOf(

107

ClusterRouterGroup(

108

RandomGroup(Nil),

109

ClusterRouterGroupSettings(

110

totalInstances = 50,

111

routeesPaths = List("/user/processor", "/user/calculator", "/user/analyzer"),

112

allowLocalRoutees = false,

113

useRoles = Set("processing")

114

)

115

).props(),

116

name = "processingGroup"

117

)

118

119

// Send messages

120

groupRouter ! ProcessRequest("data")

121

multiPathRouter ! CalculationTask(123, 456)

122

```

123

124

### Pool Router Settings

125

126

Configuration settings for cluster router pools.

127

128

```scala { .api }

129

/**

130

* Configuration for cluster router pools.

131

* totalInstances must be > 0

132

*/

133

case class ClusterRouterPoolSettings(

134

totalInstances: Int,

135

maxInstancesPerNode: Int,

136

allowLocalRoutees: Boolean,

137

useRoles: Set[String]

138

) extends ClusterRouterSettingsBase

139

140

object ClusterRouterPoolSettings {

141

/** Create from configuration */

142

def fromConfig(config: Config): ClusterRouterPoolSettings

143

144

/** Java API constructor with roles */

145

def apply(

146

totalInstances: Int,

147

maxInstancesPerNode: Int,

148

allowLocalRoutees: Boolean,

149

useRoles: String*

150

): ClusterRouterPoolSettings

151

}

152

```

153

154

**Usage Examples:**

155

156

```scala

157

// Pool settings with role restrictions

158

val poolSettings = ClusterRouterPoolSettings(

159

totalInstances = 20, // Total actors across cluster

160

maxInstancesPerNode = 3, // Max per node

161

allowLocalRoutees = true, // Allow on current node

162

useRoles = Set("worker", "compute") // Only on nodes with these roles

163

)

164

165

// Pool settings from configuration

166

val configSettings = ClusterRouterPoolSettings.fromConfig(

167

system.settings.config.getConfig("akka.actor.deployment./myRouter")

168

)

169

170

// Java API

171

val javaSettings = ClusterRouterPoolSettings(10, 2, false, "backend", "processing")

172

```

173

174

### Group Router Settings

175

176

Configuration settings for cluster router groups.

177

178

```scala { .api }

179

/**

180

* Configuration for cluster router groups.

181

* totalInstances must be > 0

182

*/

183

case class ClusterRouterGroupSettings(

184

totalInstances: Int,

185

routeesPaths: immutable.Seq[String],

186

allowLocalRoutees: Boolean,

187

useRoles: Set[String]

188

) extends ClusterRouterSettingsBase

189

190

object ClusterRouterGroupSettings {

191

/** Create from configuration */

192

def fromConfig(config: Config): ClusterRouterGroupSettings

193

194

/** Java API constructor with roles */

195

def apply(

196

totalInstances: Int,

197

routeesPaths: immutable.Seq[String],

198

allowLocalRoutees: Boolean,

199

useRoles: String*

200

): ClusterRouterGroupSettings

201

}

202

```

203

204

**Usage Examples:**

205

206

```scala

207

// Group settings with multiple paths

208

val groupSettings = ClusterRouterGroupSettings(

209

totalInstances = 50,

210

routeesPaths = List("/user/service", "/user/handler"),

211

allowLocalRoutees = false, // No local routees

212

useRoles = Set("api") // Only API nodes

213

)

214

215

// Single routee path

216

val singlePathSettings = ClusterRouterGroupSettings(

217

totalInstances = 10,

218

routeesPaths = List("/user/processor"),

219

allowLocalRoutees = true,

220

useRoles = Set.empty // Any role

221

)

222

223

// Java API with multiple roles

224

val javaGroupSettings = ClusterRouterGroupSettings(

225

25,

226

List("/user/worker").asJava,

227

true,

228

"worker", "compute"

229

)

230

```

231

232

### Configuration-Based Routing

233

234

Router configuration through application.conf deployment configuration.

235

236

```scala { .api }

237

// Configuration keys for cluster routers

238

// akka.actor.deployment.<router-path>.router = cluster-pool | cluster-group

239

// akka.actor.deployment.<router-path>.cluster.enabled = on

240

// akka.actor.deployment.<router-path>.cluster.max-nr-of-instances-per-node = N

241

// akka.actor.deployment.<router-path>.cluster.max-total-nr-of-instances = N

242

// akka.actor.deployment.<router-path>.cluster.allow-local-routees = on|off

243

// akka.actor.deployment.<router-path>.cluster.use-roles = ["role1", "role2"]

244

```

245

246

**Configuration Examples:**

247

248

```hocon

249

# application.conf

250

akka.actor.deployment {

251

/poolRouter {

252

router = cluster-pool

253

pool-dispatcher = cluster-pool-dispatcher

254

nr-of-instances = 0 # local instances

255

cluster {

256

enabled = on

257

max-nr-of-instances-per-node = 3

258

max-total-nr-of-instances = 20

259

allow-local-routees = on

260

use-roles = ["worker"]

261

}

262

}

263

264

/groupRouter {

265

router = cluster-group

266

routees.paths = ["/user/service"]

267

cluster {

268

enabled = on

269

max-total-nr-of-instances = 100

270

allow-local-routees = off

271

use-roles = ["api", "service"]

272

}

273

}

274

}

275

```

276

277

**Usage with Configuration:**

278

279

```scala

280

// Create router using deployment configuration

281

val poolRouter = system.actorOf(Props[WorkerActor], name = "poolRouter")

282

val groupRouter = system.actorOf(Props.empty, name = "groupRouter")

283

284

// Routers are automatically configured from deployment settings

285

poolRouter ! WorkItem("task1")

286

groupRouter ! ServiceRequest("request1")

287

```

288

289

### Routing Logic Integration

290

291

Integration with Akka's routing logic for different distribution strategies.

292

293

```scala { .api }

294

// Supported routing logic types

295

import akka.routing._

296

297

// Round robin - distribute evenly

298

ClusterRouterPool(RoundRobinPool(0), settings)

299

ClusterRouterGroup(RoundRobinGroup(Nil), settings)

300

301

// Random - distribute randomly

302

ClusterRouterPool(RandomPool(0), settings)

303

ClusterRouterGroup(RandomGroup(Nil), settings)

304

305

// Consistent hashing - distribute by message hash

306

ClusterRouterPool(ConsistentHashingPool(0), settings)

307

ClusterRouterGroup(ConsistentHashingGroup(Nil), settings)

308

309

// Smallest mailbox - route to least busy

310

ClusterRouterPool(SmallestMailboxPool(0), settings)

311

312

// Broadcast - send to all routees

313

ClusterRouterPool(BroadcastPool(0), settings)

314

ClusterRouterGroup(BroadcastGroup(Nil), settings)

315

```

316

317

**Usage Examples:**

318

319

```scala

320

// Consistent hash routing for stateful workloads

321

case class HashedMessage(key: String, data: String) extends ConsistentHashable {

322

override def consistentHashKey: Any = key

323

}

324

325

val hashRouter = system.actorOf(

326

ClusterRouterPool(

327

ConsistentHashingPool(0),

328

ClusterRouterPoolSettings(15, 3, true, Set("stateful"))

329

).props(Props[StatefulWorker]),

330

"hashRouter"

331

)

332

333

hashRouter ! HashedMessage("user123", "user data")

334

hashRouter ! HashedMessage("user456", "more data")

335

336

// Broadcast for cache invalidation

337

val broadcastRouter = system.actorOf(

338

ClusterRouterGroup(

339

BroadcastGroup(Nil),

340

ClusterRouterGroupSettings(10, List("/user/cache"), true, Set("cache"))

341

).props(),

342

"cacheInvalidator"

343

)

344

345

broadcastRouter ! InvalidateCache("key123")

346

347

// Smallest mailbox for load balancing

348

val balancedRouter = system.actorOf(

349

ClusterRouterPool(

350

SmallestMailboxPool(0),

351

ClusterRouterPoolSettings(8, 2, true, Set("balanced"))

352

).props(Props[LoadBalancedWorker]),

353

"balancedRouter"

354

)

355

```

356

357

### Router Lifecycle Management

358

359

Managing router lifecycle and adaptation to cluster changes.

360

361

```scala { .api }

362

// Routers automatically adapt to cluster membership changes

363

// - Add routees when new nodes join with matching roles

364

// - Remove routees when nodes leave or become unreachable

365

// - Respect totalInstances and maxInstancesPerNode limits

366

// - Monitor cluster events and adjust routee populations

367

```

368

369

**Lifecycle Example:**

370

371

```scala

372

class RouterManager extends Actor with ActorLogging {

373

val cluster = Cluster(context.system)

374

var currentRouter: Option[ActorRef] = None

375

376

override def preStart(): Unit = {

377

// Subscribe to cluster events to monitor router health

378

cluster.subscribe(self, classOf[MemberEvent], classOf[ReachabilityEvent])

379

380

// Create initial router

381

createRouter()

382

}

383

384

def createRouter(): Unit = {

385

val router = context.actorOf(

386

ClusterRouterPool(

387

RoundRobinPool(0),

388

ClusterRouterPoolSettings(20, 4, true, Set("worker"))

389

).props(Props[WorkerActor]),

390

"dynamicRouter"

391

)

392

currentRouter = Some(router)

393

log.info("Created cluster router: {}", router.path)

394

}

395

396

def receive = {

397

case MemberUp(member) if member.hasRole("worker") =>

398

log.info("New worker node joined: {}", member.address)

399

// Router automatically adds routees

400

401

case UnreachableMember(member) if member.hasRole("worker") =>

402

log.warning("Worker node unreachable: {}", member.address)

403

// Router automatically removes routees

404

405

case MemberRemoved(member, _) if member.hasRole("worker") =>

406

log.info("Worker node removed: {}", member.address)

407

// Router automatically cleans up routees

408

409

case work: WorkMessage =>

410

currentRouter.foreach(_ ! work)

411

}

412

413

override def postStop(): Unit = {

414

cluster.unsubscribe(self)

415

}

416

}

417

```

418

419

## Types

420

421

```scala { .api }

422

// Router configuration types

423

case class ClusterRouterPool(local: Pool, settings: ClusterRouterPoolSettings)

424

case class ClusterRouterGroup(local: Group, settings: ClusterRouterGroupSettings)

425

426

// Settings types

427

case class ClusterRouterPoolSettings(

428

totalInstances: Int,

429

maxInstancesPerNode: Int,

430

allowLocalRoutees: Boolean,

431

useRoles: Set[String]

432

)

433

434

case class ClusterRouterGroupSettings(

435

totalInstances: Int,

436

routeesPaths: immutable.Seq[String],

437

allowLocalRoutees: Boolean,

438

useRoles: Set[String]

439

)

440

441

// Base settings trait

442

trait ClusterRouterSettingsBase {

443

def totalInstances: Int

444

def allowLocalRoutees: Boolean

445

def useRoles: Set[String]

446

}

447

```