or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cluster-management.mdcluster-routing.mdconfiguration-and-management.mdevents-and-state.mdextensibility.mdindex.mdmembers-and-status.md

extensibility.mddocs/

0

# Extensibility

1

2

Service Provider Interfaces for extending cluster behavior including custom downing strategies and join validation. These SPIs allow customization of critical cluster behaviors to meet specific application requirements.

3

4

## Capabilities

5

6

### Downing Provider SPI

7

8

Interface for implementing custom downing strategies that determine when and how to remove unreachable members from the cluster.

9

10

```scala { .api }

11

/**

12

* API for plugins that will handle downing of cluster nodes.

13

* Concrete plugins must subclass and have a public one argument constructor accepting an ActorSystem.

14

*/

15

abstract class DowningProvider {

16

/**

17

* Time margin after which shards or singletons that belonged to a downed/removed

18

* partition are created in surviving partition. This is useful if you implement

19

* downing strategies that handle network partitions.

20

*/

21

def downRemovalMargin: FiniteDuration

22

23

/**

24

* If a props is returned it is created as a child of the core cluster daemon on cluster startup.

25

* It should then handle downing using the regular Cluster APIs.

26

* The actor will run on the same dispatcher as the cluster actor if dispatcher not configured.

27

*

28

* May throw an exception which will then immediately lead to Cluster stopping,

29

* as the downing provider is vital to a working cluster.

30

*/

31

def downingActorProps: Option[Props]

32

}

33

```

34

35

**Usage Examples:**

36

37

```scala

38

// Custom downing provider implementation

39

/**
 * Example downing provider whose strategy is carried out by a
 * `QuorumBasedDowningActor`.
 *
 * @param system actor system the provider is instantiated for
 */
class QuorumBasedDowningProvider(system: ActorSystem) extends DowningProvider {

  /** Cluster settings built from the system's configuration and name. */
  val settings = new ClusterSettings(system.settings.config, system.name)

  /** Fixed 30 second margin: allow time for persistence to catch up after downing. */
  override def downRemovalMargin: FiniteDuration = 30.seconds

  /** Always returns props for the actor that implements the quorum strategy. */
  override def downingActorProps: Option[Props] = {
    val quorumActorProps = Props(new QuorumBasedDowningActor(settings))
    Some(quorumActorProps)
  }
}

49

50

/**
 * Tracks unreachable members and downs them only while the number of members
 * still reachable from this node forms a majority of the known cluster.
 *
 * @param settings cluster settings passed in by the provider (not read by this example)
 */
class QuorumBasedDowningActor(settings: ClusterSettings) extends Actor with ActorLogging {
  val cluster = Cluster(context.system)
  var unreachableMembers = Set.empty[Member]

  override def preStart(): Unit = {
    // Ask for the current state to be replayed as individual events. With the
    // default snapshot mode the actor would receive a CurrentClusterState
    // message that `receive` does not handle, and members that were already
    // unreachable before this actor started would never be tracked.
    cluster.subscribe(
      self,
      InitialStateAsEvents,
      classOf[UnreachableMember],
      classOf[ReachableMember],
      classOf[MemberRemoved])
  }

  def receive = {
    case UnreachableMember(member) =>
      unreachableMembers += member
      evaluateDowning()

    case ReachableMember(member) =>
      unreachableMembers -= member

    case MemberRemoved(member, _) =>
      unreachableMembers -= member
  }

  /**
   * Downs every tracked unreachable member if the reachable members form a
   * strict majority of the current membership; otherwise only logs a warning.
   */
  def evaluateDowning(): Unit = {
    val currentMembers = cluster.state.members.size
    val reachableMembers = currentMembers - unreachableMembers.size
    // Strict majority of the members currently known to this node
    val quorumSize = (currentMembers / 2) + 1

    if (reachableMembers >= quorumSize) {
      // We have quorum, safe to down unreachable members
      unreachableMembers.foreach { member =>
        log.info("Downing unreachable member: {}", member.address)
        cluster.down(member.address)
      }
    } else {
      log.warning("Cannot down members - would lose quorum ({} reachable, need {})",
        reachableMembers, quorumSize)
    }
  }

  override def postStop(): Unit = {
    cluster.unsubscribe(self)
  }
}

94

95

// Configuration

96

// akka.cluster.downing-provider-class = "com.myapp.QuorumBasedDowningProvider"

97

```

98

99

### Default Downing Providers

100

101

Built-in downing provider implementations.

102

103

```scala { .api }

104

/**

105

* Default downing provider used when no provider is configured and

106

* 'auto-down-unreachable-after' is not enabled.

107

*/

108

final class NoDowning(system: ActorSystem) extends DowningProvider {

109

override def downRemovalMargin: FiniteDuration = Cluster(system).settings.DownRemovalMargin

110

override val downingActorProps: Option[Props] = None

111

}

112

113

/**

114

* Downing provider used when auto-down is enabled.

115

* Automatically downs unreachable members after configured timeout.

116

*/

117

class AutoDowning(system: ActorSystem) extends DowningProvider {

118

override def downRemovalMargin: FiniteDuration

119

override def downingActorProps: Option[Props]

120

}

121

```

122

123

**Auto-Down Configuration:**

124

125

```hocon

126

# Enable automatic downing (use with caution in production)

127

akka.cluster.auto-down-unreachable-after = 10s

128

129

# Or use custom downing provider

130

akka.cluster {

131

downing-provider-class = "com.myapp.CustomDowningProvider"

132

down-removal-margin = 30s

133

}

134

```

135

136

### Advanced Downing Strategies

137

138

Examples of more sophisticated downing strategies.

139

140

```scala

141

// Split-brain resolver based on oldest member

142

/**
 * Example provider for a strategy in which the oldest reachable member makes
 * the downing decision; the work is done by `OldestMemberDowningActor`.
 *
 * @param system actor system the provider is instantiated for
 */
class OldestMemberDowningProvider(system: ActorSystem) extends DowningProvider {

  /** Fixed 20 second removal margin. */
  override def downRemovalMargin: FiniteDuration = 20.seconds

  /** Always returns props for a new `OldestMemberDowningActor`. */
  override def downingActorProps: Option[Props] = {
    val oldestMemberProps = Props(new OldestMemberDowningActor())
    Some(oldestMemberProps)
  }
}

148

149

/**
 * Downs an unreachable member only when this node is the oldest member that
 * is still reachable; every other node leaves the decision to that member.
 */
class OldestMemberDowningActor extends Actor with ActorLogging {
  val cluster = Cluster(context.system)

  override def preStart(): Unit = {
    // Replay current state as events so members that are already unreachable
    // are handled and no unhandled CurrentClusterState snapshot is delivered.
    cluster.subscribe(self, InitialStateAsEvents, classOf[UnreachableMember])
  }

  def receive = {
    case UnreachableMember(member) =>
      val state = cluster.state
      // Remove the reported member explicitly as well, so the result does not
      // depend on whether the state view already lists it as unreachable.
      val reachableMembers = (state.members -- state.unreachable) - member

      // Use the public age comparison (`upNumber` is not accessible outside
      // akka.cluster) and tolerate an empty set instead of throwing.
      // NOTE(review): isOlderThan is documented for members of the same data
      // center — confirm behaviour before using this in a multi-DC cluster.
      val oldestReachable = reachableMembers.reduceOption { (a, b) =>
        if (a.isOlderThan(b)) a else b
      }

      oldestReachable match {
        case Some(oldest) if cluster.selfMember == oldest =>
          // I'm the oldest reachable member, I decide who to down
          log.info("As oldest member, downing unreachable: {}", member.address)
          cluster.down(member.address)

        case Some(oldest) =>
          log.info("Not oldest member, waiting for {} to make downing decision",
            oldest.address)

        case None =>
          log.warning("No reachable members known, not downing {}", member.address)
      }
  }

  override def postStop(): Unit = {
    cluster.unsubscribe(self)
  }
}

175

176

// Role-based downing - only down members not in critical roles

177

/**
 * Example provider for a strategy that refuses to down members carrying a
 * critical role; the decision logic lives in `RoleBasedDowningActor`.
 *
 * @param system actor system the provider is instantiated for
 */
class RoleBasedDowningProvider(system: ActorSystem) extends DowningProvider {

  /** Roles whose members are never downed by the actor. */
  val criticalRoles = Set("database", "master")

  /** Fixed 15 second removal margin. */
  override def downRemovalMargin: FiniteDuration = 15.seconds

  /** Always returns props for a `RoleBasedDowningActor` configured with `criticalRoles`. */
  override def downingActorProps: Option[Props] = {
    val roleActorProps = Props(new RoleBasedDowningActor(criticalRoles))
    Some(roleActorProps)
  }
}

185

186

/**
 * Downs unreachable members unless they carry at least one critical role.
 *
 * @param criticalRoles roles whose members must not be downed by this actor
 */
class RoleBasedDowningActor(criticalRoles: Set[String]) extends Actor with ActorLogging {
  val cluster = Cluster(context.system)

  override def preStart(): Unit = {
    // Replay current state as events so members that are already unreachable
    // are handled and no unhandled CurrentClusterState snapshot is delivered.
    cluster.subscribe(self, InitialStateAsEvents, classOf[UnreachableMember])
  }

  def receive = {
    case UnreachableMember(member) =>
      // Computed once; used both for the decision and for the log message
      val matchedCriticalRoles = member.roles.intersect(criticalRoles)

      if (matchedCriticalRoles.nonEmpty) {
        log.warning("NOT downing member {} with critical roles: {}",
          member.address, matchedCriticalRoles)
      } else {
        log.info("Downing non-critical member: {} (roles: {})",
          member.address, member.roles)
        cluster.down(member.address)
      }
  }

  override def postStop(): Unit = {
    cluster.unsubscribe(self)
  }
}

211

```

212

213

### Join Configuration Compatibility Checker

214

215

SPI for validating configuration compatibility when nodes attempt to join the cluster.

216

217

```scala { .api }

218

/**

219

* Service provider interface for validating configuration compatibility when nodes join.

220

* Implementations must have a constructor accepting ActorSystem and ClusterSettings.

221

*/

222

abstract class JoinConfigCompatChecker {

223

/** Configuration keys that must be present and validated */

224

def requiredKeys: immutable.Seq[String]

225

226

/**

227

* Check if the joining node's configuration is compatible.

228

* @param toCheck Configuration from joining node

229

* @param actualConfig This node's configuration

230

* @return Valid if compatible, Invalid with error messages if not

231

*/

232

def check(toCheck: Config, actualConfig: Config): ConfigValidation

233

}

234

235

/**

236

* Configuration validation result

237

*/

238

sealed trait ConfigValidation

239

case object Valid extends ConfigValidation

240

case class Invalid(errorMessages: immutable.Seq[String]) extends ConfigValidation

241

242

object JoinConfigCompatChecker {

243

/** Factory method to load configured checker */

244

def load(system: ActorSystem, settings: ClusterSettings): JoinConfigCompatChecker

245

246

/** Utility to check key existence */

247

def exists(requiredKeys: immutable.Seq[String], toCheck: Config): ConfigValidation

248

249

/** Utility to check exact value match */

250

def fullMatch(requiredKeys: immutable.Seq[String], toCheck: Config, actualConfig: Config): ConfigValidation

251

}

252

```

253

254

**Usage Examples:**

255

256

```scala

257

// Application version compatibility checker

258

/**
 * Join-time checker that validates application version, API version and the
 * role combination of a joining node.
 *
 * @param system   actor system (not read by this example)
 * @param settings cluster settings (not read by this example)
 */
class AppVersionCompatChecker(system: ActorSystem, settings: ClusterSettings)
    extends JoinConfigCompatChecker {

  val requiredKeys = List(
    "akka.actor.provider",
    "akka.cluster.roles",
    "app.version",
    "app.api-version"
  )

  /**
   * Verifies that all required keys exist in the joining node's config and,
   * if so, runs the version and role validation.
   *
   * @param toCheck      configuration from the joining node
   * @param actualConfig this node's configuration
   * @return `Valid`, or `Invalid` carrying every problem that was found
   */
  def check(toCheck: Config, actualConfig: Config): ConfigValidation =
    JoinConfigCompatChecker.exists(requiredKeys, toCheck) match {
      case Valid   => validateVersions(toCheck, actualConfig)
      case invalid => invalid
    }

  /** Collects all version and role problems instead of stopping at the first one. */
  private def validateVersions(joining: Config, actual: Config): ConfigValidation = {
    val joiningAppVersion = joining.getString("app.version")
    val joiningApiVersion = joining.getString("app.api-version")
    val actualAppVersion = actual.getString("app.version")
    val actualApiVersion = actual.getString("app.api-version")

    // Allow same major version
    val appVersionError =
      if (isCompatibleVersion(joiningAppVersion, actualAppVersion)) None
      else Some(s"Incompatible app version: $joiningAppVersion vs $actualAppVersion")

    // API version must match exactly
    val apiVersionError =
      if (joiningApiVersion == actualApiVersion) None
      else Some(s"API version mismatch: $joiningApiVersion vs $actualApiVersion")

    // Check cluster roles compatibility: the joining node's roles must fit
    // entirely inside one of the allowed combinations
    val joiningRoles = joining.getStringList("akka.cluster.roles").asScala.toSet
    val allowedRoleCombinations = Set(
      Set("frontend", "api"),
      Set("backend", "worker"),
      Set("database")
    )
    val roleError =
      if (allowedRoleCombinations.exists(allowed => joiningRoles.subsetOf(allowed))) None
      else Some(s"Invalid role combination: ${joiningRoles.mkString(", ")}")

    val errors = List(appVersionError, apiVersionError, roleError).flatten
    if (errors.isEmpty) Valid else Invalid(errors)
  }

  /**
   * Two versions are compatible when their major components are equal.
   * Works for any number of dot-separated segments ("2", "2.1", "2.1.0-RC1"),
   * rather than failing on versions that are not exactly `major.minor.patch`.
   */
  private def isCompatibleVersion(v1: String, v2: String): Boolean =
    majorOf(v1) == majorOf(v2)

  /** Everything before the first dot, or the whole string if there is none. */
  private def majorOf(version: String): String = version.takeWhile(_ != '.')
}

319

320

// Configuration

321

// akka.cluster.configuration-compatibility-check.checkers.app-version = "com.myapp.AppVersionCompatChecker"

322

```

323

324

### Environment-Specific Compatibility Checkers

325

326

```scala

327

// Environment-aware compatibility checker

328

/**
 * Join-time checker that only admits nodes from the same environment, from a
 * compatible data center, and with an instance type allowed in that data center.
 *
 * @param system   actor system (not read by this example)
 * @param settings cluster settings (not read by this example)
 */
class EnvironmentCompatChecker(system: ActorSystem, settings: ClusterSettings)
    extends JoinConfigCompatChecker {

  val requiredKeys = List(
    "app.environment",
    "app.datacenter",
    "app.instance-type"
  )

  /**
   * Requires all keys to be present in the joining node's config, then
   * delegates to the environment validation.
   */
  def check(toCheck: Config, actualConfig: Config): ConfigValidation = {
    val presence = JoinConfigCompatChecker.exists(requiredKeys, toCheck)
    presence match {
      case Valid => validateEnvironment(toCheck, actualConfig)
      case _     => presence
    }
  }

  /** Rejects a differing environment outright; otherwise continues with the data center check. */
  private def validateEnvironment(joining: Config, actual: Config): ConfigValidation = {
    val joiningEnv = joining.getString("app.environment")
    val actualEnv = actual.getString("app.environment")

    // Only allow same environment
    if (joiningEnv == actualEnv) validateDatacenter(joining, actual)
    else Invalid(List(s"Environment mismatch: $joiningEnv cannot join $actualEnv cluster"))
  }

  /**
   * Checks the joining data center against the ones compatible with this
   * node's data center. Data centers without an entry in the table are not
   * restricted here and proceed to the instance type check.
   */
  private def validateDatacenter(joining: Config, actual: Config): ConfigValidation = {
    val joiningDc = joining.getString("app.datacenter")
    val actualDc = actual.getString("app.datacenter")
    val joiningInstanceType = joining.getString("app.instance-type")

    // Validate data center compatibility
    val compatibleDcs = Map(
      "us-east-1" -> Set("us-east-1", "us-west-2"),
      "eu-west-1" -> Set("eu-west-1", "eu-central-1")
    )

    val dcRejected = compatibleDcs.get(actualDc).exists(allowed => !allowed.contains(joiningDc))
    if (dcRejected)
      Invalid(List(s"Data center $joiningDc not compatible with $actualDc"))
    else
      // Additional instance type validation
      validateInstanceType(joiningInstanceType, joiningDc)
  }

  /** Data centers without an entry in the table accept any instance type. */
  private def validateInstanceType(instanceType: String, dc: String): ConfigValidation = {
    val allowedTypes = Map(
      "us-east-1" -> Set("m5.large", "m5.xlarge", "c5.large"),
      "eu-west-1" -> Set("m5.large", "c5.large")
    )

    val typeRejected = allowedTypes.get(dc).exists(allowed => !allowed.contains(instanceType))
    if (typeRejected) Invalid(List(s"Instance type $instanceType not allowed in $dc"))
    else Valid
  }
}

385

386

// Development vs Production checker

387

/**
 * Join-time checker that keeps development and production nodes in separate
 * clusters and, in production, requires all required keys to match exactly.
 *
 * @param system   actor system (not read by this example)
 * @param settings cluster settings (not read by this example)
 */
class DevProdCompatChecker(system: ActorSystem, settings: ClusterSettings)
    extends JoinConfigCompatChecker {

  val requiredKeys = List("app.mode", "app.debug-enabled")

  /**
   * @param toCheck      configuration from the joining node
   * @param actualConfig this node's configuration
   * @return `Invalid` if the joining node does not provide `app.mode`, if the
   *         modes differ, or (production only) if any required key differs;
   *         `Valid` otherwise
   */
  def check(toCheck: Config, actualConfig: Config): ConfigValidation =
    // A joining node that omits `app.mode` must be rejected with a validation
    // error rather than failing with an exception when the key is read.
    JoinConfigCompatChecker.exists(List("app.mode"), toCheck) match {
      case Valid   => validateMode(toCheck, actualConfig)
      case invalid => invalid
    }

  /** Mode comparison; only called once `app.mode` is known to exist in `toCheck`. */
  private def validateMode(toCheck: Config, actualConfig: Config): ConfigValidation = {
    val joiningMode = toCheck.getString("app.mode")
    val actualMode = actualConfig.getString("app.mode")

    // Strict separation of dev and prod
    if (joiningMode != actualMode) {
      Invalid(List(s"Cannot mix $joiningMode and $actualMode nodes in same cluster"))
    } else if (actualMode == "production") {
      // In production, require exact match of every required key
      JoinConfigCompatChecker.fullMatch(requiredKeys, toCheck, actualConfig)
    } else {
      Valid // Dev mode is more permissive: a debug mismatch is allowed
    }
  }
}

410

```

411

412

### Downing Provider Factory Pattern

413

414

Pattern for creating configurable downing providers.

415

416

```scala

417

// Factory for creating different downing strategies

418

/** Maps a strategy name to a downing provider instance. */
object DowningProviderFactory {

  /**
   * Creates the provider for a well-known strategy name; any other value is
   * treated as a class name and instantiated reflectively.
   *
   * @param strategy "none", "auto", "quorum", "oldest", "role-based", or a provider class name
   * @param system   actor system handed to the provider's constructor
   * @return the provider; a failure to instantiate a custom class is rethrown
   */
  def create(strategy: String, system: ActorSystem): DowningProvider =
    strategy match {
      case "none"       => new NoDowning(system)
      case "auto"       => new AutoDowning(system)
      case "quorum"     => new QuorumBasedDowningProvider(system)
      case "oldest"     => new OldestMemberDowningProvider(system)
      case "role-based" => new RoleBasedDowningProvider(system)
      case className    => loadByClassName(className, system)
    }

  /** Loads a custom provider through the system's dynamic access, passing `system` as the sole constructor argument. */
  private def loadByClassName(className: String, system: ActorSystem): DowningProvider = {
    val dynamicAccess = system.asInstanceOf[ExtendedActorSystem].dynamicAccess
    val loaded =
      dynamicAccess.createInstanceFor[DowningProvider](className, List(classOf[ActorSystem] -> system))
    loaded.get
  }
}

432

433

// Configuration-driven provider

434

/**
 * Downing provider that reads the strategy name from `app.cluster.downing`
 * and forwards both SPI methods to the provider created for that strategy.
 *
 * @param system actor system the provider is instantiated for
 */
class ConfigurableDowningProvider(system: ActorSystem) extends DowningProvider {

  /** The `app.cluster.downing` section of the system configuration. */
  val config = system.settings.config.getConfig("app.cluster.downing")

  /** Strategy name taken from the `strategy` key of that section. */
  val strategy: String = config.getString("strategy")

  /** Provider selected by `DowningProviderFactory` for the configured strategy. */
  val delegate: DowningProvider = DowningProviderFactory.create(strategy, system)

  /** Removal margin of the selected provider. */
  override def downRemovalMargin: FiniteDuration = {
    val margin = delegate.downRemovalMargin
    margin
  }

  /** Downing actor props of the selected provider, if it supplies any. */
  override def downingActorProps: Option[Props] = {
    val props = delegate.downingActorProps
    props
  }
}

442

443

// Configuration

444

/*

445

app.cluster.downing {

446

strategy = "quorum" # or "auto", "oldest", "role-based", etc.

447

}

448

akka.cluster.downing-provider-class = "com.myapp.ConfigurableDowningProvider"

449

*/

450

```

451

452

## Types

453

454

```scala { .api }

455

// Downing provider SPI

456

abstract class DowningProvider {

457

def downRemovalMargin: FiniteDuration

458

def downingActorProps: Option[Props]

459

}

460

461

// Built-in providers

462

final class NoDowning(system: ActorSystem) extends DowningProvider

463

class AutoDowning(system: ActorSystem) extends DowningProvider

464

465

// Configuration validation SPI

466

abstract class JoinConfigCompatChecker {

467

def requiredKeys: immutable.Seq[String]

468

def check(toCheck: Config, actualConfig: Config): ConfigValidation

469

}

470

471

// Validation result types

472

sealed trait ConfigValidation

473

case object Valid extends ConfigValidation

474

case class Invalid(errorMessages: immutable.Seq[String]) extends ConfigValidation

475

476

// Factory methods

477

object JoinConfigCompatChecker {

478

def load(system: ActorSystem, settings: ClusterSettings): JoinConfigCompatChecker

479

def exists(requiredKeys: immutable.Seq[String], toCheck: Config): ConfigValidation

480

def fullMatch(requiredKeys: immutable.Seq[String], toCheck: Config, actualConfig: Config): ConfigValidation

481

}

482

```