# Extensibility

Service Provider Interfaces (SPIs) for extending cluster behavior, including custom downing strategies and join validation. These SPIs allow customization of critical cluster behaviors to meet specific application requirements.
3
4
## Capabilities
5
6
### Downing Provider SPI

Interface for implementing custom downing strategies that determine when and how unreachable members are removed from the cluster.
9
10
```scala { .api }
11
/**
 * Plugin API for handling the downing of cluster nodes.
 *
 * Concrete plugins must subclass this class and expose a public
 * one-argument constructor that accepts an ActorSystem.
 */
abstract class DowningProvider {

  /**
   * Time margin after which shards or singletons that belonged to a
   * downed/removed partition are created in the surviving partition.
   * Useful when implementing downing strategies that handle network partitions.
   */
  def downRemovalMargin: FiniteDuration

  /**
   * Optional props of a downing actor. If a props is returned, the actor is
   * created as a child of the core cluster daemon on cluster startup and is
   * then expected to handle downing through the regular Cluster APIs.
   * The actor runs on the same dispatcher as the cluster actor unless a
   * dispatcher is configured.
   *
   * May throw an exception, which immediately leads to the Cluster stopping,
   * because the downing provider is vital to a working cluster.
   */
  def downingActorProps: Option[Props]
}
33
```
34
35
**Usage Examples:**
36
37
```scala
38
// Custom downing provider implementation
39
/**
 * Downing provider whose downing actor is a `QuorumBasedDowningActor`.
 *
 * @param system actor system the provider is created for; its config and name
 *               are used to build the ClusterSettings handed to the actor
 */
class QuorumBasedDowningProvider(system: ActorSystem) extends DowningProvider {

  val settings = new ClusterSettings(system.settings.config, system.name)

  // Margin that gives persistence time to catch up after a member is downed
  override def downRemovalMargin: FiniteDuration = 30.seconds

  // Props of the actor that carries out the quorum strategy
  override def downingActorProps: Option[Props] = {
    val actorProps = Props(new QuorumBasedDowningActor(settings))
    Some(actorProps)
  }
}
49
50
/**
 * Downing actor that downs unreachable members only while this node still
 * sees a majority (quorum) of the known cluster members as reachable.
 *
 * @param settings cluster settings supplied by the provider; the strategy
 *                 below does not read them
 */
class QuorumBasedDowningActor(settings: ClusterSettings) extends Actor with ActorLogging {
  val cluster = Cluster(context.system)

  // Actor-local bookkeeping of the members currently reported unreachable;
  // it is only updated from `receive`.
  var unreachableMembers = Set.empty[Member]

  override def preStart(): Unit = {
    // Ask for the current state to be replayed as events. With the default
    // snapshot mode the first message would be a CurrentClusterState, which
    // `receive` does not handle, so members that were already unreachable
    // when this actor started would never be tracked.
    cluster.subscribe(
      self,
      akka.cluster.ClusterEvent.InitialStateAsEvents,
      classOf[UnreachableMember],
      classOf[ReachableMember],
      classOf[MemberRemoved])
  }

  def receive = {
    case UnreachableMember(member) =>
      unreachableMembers += member
      evaluateDowning()

    case ReachableMember(member) =>
      unreachableMembers -= member

    case MemberRemoved(member, _) =>
      // A removed member no longer counts as unreachable
      unreachableMembers -= member
  }

  /**
   * Downs every tracked unreachable member if the reachable members still
   * form a quorum (more than half of all known members); otherwise only
   * logs a warning and downs nothing.
   */
  def evaluateDowning(): Unit = {
    val currentMembers = cluster.state.members.size
    val reachableMembers = currentMembers - unreachableMembers.size
    val quorumSize = (currentMembers / 2) + 1

    if (reachableMembers >= quorumSize) {
      // We have quorum, safe to down unreachable members
      unreachableMembers.foreach { member =>
        log.info("Downing unreachable member: {}", member.address)
        cluster.down(member.address)
      }
    } else {
      log.warning("Cannot down members - would lose quorum ({} reachable, need {})",
        reachableMembers, quorumSize)
    }
  }

  override def postStop(): Unit = {
    cluster.unsubscribe(self)
  }
}
94
95
// Configuration
96
// akka.cluster.downing-provider-class = "com.myapp.QuorumBasedDowningProvider"
97
```
98
99
### Default Downing Providers

Built-in downing provider implementations.
102
103
```scala { .api }
104
/**
 * Default downing provider, used when no provider is configured and
 * 'auto-down-unreachable-after' is not enabled.
 */
final class NoDowning(system: ActorSystem) extends DowningProvider {

  // This provider never starts a downing actor
  override val downingActorProps: Option[Props] = None

  // Taken from the DownRemovalMargin value of the cluster settings
  override def downRemovalMargin: FiniteDuration = {
    val clusterSettings = Cluster(system).settings
    clusterSettings.DownRemovalMargin
  }
}
112
113
/**
 * Downing provider used when auto-down is enabled.
 * Unreachable members are downed automatically after the configured timeout.
 */
class AutoDowning(system: ActorSystem) extends DowningProvider {
  // Signatures only: this listing documents the API surface
  override def downRemovalMargin: FiniteDuration

  override def downingActorProps: Option[Props]
}
121
```
122
123
**Auto-Down Configuration:**

```hocon
# Enable automatic downing (use with caution in production)
akka.cluster.auto-down-unreachable-after = 10s

# Or use a custom downing provider
akka.cluster {
  downing-provider-class = "com.myapp.CustomDowningProvider"
  down-removal-margin = 30s
}
```
135
136
### Advanced Downing Strategies

Examples of more sophisticated downing strategies.
139
140
```scala
141
// Split-brain resolver based on oldest member
142
/**
 * Downing provider that delegates the downing decision to an
 * `OldestMemberDowningActor`.
 *
 * @param system actor system the provider is created for
 */
class OldestMemberDowningProvider(system: ActorSystem) extends DowningProvider {

  // Fixed 20 second removal margin
  override def downRemovalMargin: FiniteDuration = 20.seconds

  // Props of the actor that implements the oldest-member strategy
  override def downingActorProps: Option[Props] = {
    val actorProps = Props(new OldestMemberDowningActor())
    Some(actorProps)
  }
}
148
149
/**
 * Downing actor in which only the oldest reachable member downs members
 * that become unreachable; every other node just logs who is expected to
 * make the decision.
 */
class OldestMemberDowningActor extends Actor with ActorLogging {
  val cluster = Cluster(context.system)

  override def preStart(): Unit = {
    // Replay the current state as events so members that are already
    // unreachable when this actor starts are evaluated too; the default
    // snapshot mode would deliver a CurrentClusterState that `receive`
    // does not handle.
    cluster.subscribe(
      self,
      akka.cluster.ClusterEvent.InitialStateAsEvents,
      classOf[UnreachableMember])
  }

  def receive = {
    case UnreachableMember(member) =>
      // Read the state once so both sets come from the same snapshot
      val state = cluster.state
      val reachableMembers = state.members -- state.unreachable

      // Age is compared with the public `isOlderThan`; `upNumber` is not
      // accessible from application code. `reduceOption` avoids the
      // exception `minBy` throws when no reachable member is known.
      // NOTE(review): confirm how `isOlderThan` behaves for members of
      // different data centers before using this in a multi-DC cluster.
      val oldestReachable = reachableMembers.reduceOption { (oldestSoFar, candidate) =>
        if (candidate.isOlderThan(oldestSoFar)) candidate else oldestSoFar
      }

      oldestReachable match {
        case Some(oldest) if cluster.selfMember == oldest =>
          // I'm the oldest reachable member, I decide who to down
          log.info("As oldest member, downing unreachable: {}", member.address)
          cluster.down(member.address)

        case Some(oldest) =>
          log.info("Not oldest member, waiting for {} to make downing decision",
            oldest.address)

        case None =>
          log.warning("No reachable member known, not downing unreachable: {}", member.address)
      }
  }

  override def postStop(): Unit = {
    cluster.unsubscribe(self)
  }
}
175
176
// Role-based downing - only down members not in critical roles
177
/**
 * Downing provider that starts a `RoleBasedDowningActor` configured with a
 * fixed set of critical roles.
 *
 * @param system actor system the provider is created for
 */
class RoleBasedDowningProvider(system: ActorSystem) extends DowningProvider {

  // Members holding any of these roles are never downed by the actor
  val criticalRoles = Set("database", "master")

  // Fixed 15 second removal margin
  override def downRemovalMargin: FiniteDuration = 15.seconds

  // Props of the actor that implements the role-based strategy
  override def downingActorProps: Option[Props] = {
    val actorProps = Props(new RoleBasedDowningActor(criticalRoles))
    Some(actorProps)
  }
}
185
186
/**
 * Downing actor that downs an unreachable member only when it holds none of
 * the critical roles.
 *
 * @param criticalRoles roles whose holders must never be downed by this actor
 */
class RoleBasedDowningActor(criticalRoles: Set[String]) extends Actor with ActorLogging {
  val cluster = Cluster(context.system)

  override def preStart(): Unit = {
    // Replay the current state as events so members that are already
    // unreachable when this actor starts are evaluated too; the default
    // snapshot mode would deliver a CurrentClusterState that `receive`
    // does not handle.
    cluster.subscribe(
      self,
      akka.cluster.ClusterEvent.InitialStateAsEvents,
      classOf[UnreachableMember])
  }

  def receive = {
    case UnreachableMember(member) =>
      // Computed once and reused for both the decision and the log message
      val heldCriticalRoles = member.roles.intersect(criticalRoles)

      if (heldCriticalRoles.nonEmpty) {
        log.warning("NOT downing member {} with critical roles: {}",
          member.address, heldCriticalRoles)
      } else {
        log.info("Downing non-critical member: {} (roles: {})",
          member.address, member.roles)
        cluster.down(member.address)
      }
  }

  override def postStop(): Unit = {
    cluster.unsubscribe(self)
  }
}
211
```
212
213
### Join Configuration Compatibility Checker

SPI for validating configuration compatibility when nodes attempt to join the cluster.
216
217
```scala { .api }
218
/**
 * Service provider interface for validating configuration compatibility
 * when nodes join the cluster.
 *
 * Implementations must have a constructor accepting ActorSystem and ClusterSettings.
 */
abstract class JoinConfigCompatChecker {

  /** Configuration keys that must be present and validated. */
  def requiredKeys: immutable.Seq[String]

  /**
   * Decides whether the configuration of a joining node is compatible.
   *
   * @param toCheck      configuration from the joining node
   * @param actualConfig this node's configuration
   * @return Valid if compatible, Invalid with error messages if not
   */
  def check(toCheck: Config, actualConfig: Config): ConfigValidation
}
234
235
/**
 * Result of a configuration validation: either `Valid`, or `Invalid`
 * carrying the error messages.
 */
sealed trait ConfigValidation

/** The checked configuration is compatible. */
case object Valid extends ConfigValidation

/** The checked configuration is not compatible; `errorMessages` says why. */
case class Invalid(errorMessages: immutable.Seq[String]) extends ConfigValidation
241
242
object JoinConfigCompatChecker {

  /** Factory method that loads the configured checker. */
  def load(system: ActorSystem, settings: ClusterSettings): JoinConfigCompatChecker

  /** Utility that checks the required keys exist in `toCheck`. */
  def exists(requiredKeys: immutable.Seq[String], toCheck: Config): ConfigValidation

  /** Utility that checks the required keys have exactly matching values in both configs. */
  def fullMatch(
      requiredKeys: immutable.Seq[String],
      toCheck: Config,
      actualConfig: Config): ConfigValidation
}
252
```
253
254
**Usage Examples:**
255
256
```scala
257
// Application version compatibility checker
258
/**
 * Join compatibility checker that validates the application version, the
 * API version and the cluster roles of a joining node.
 *
 * NOTE(review): confirm that the checker loader of the Akka version in use
 * instantiates checkers through an (ActorSystem, ClusterSettings) constructor.
 *
 * @param system   actor system the checker is created for (not read here)
 * @param settings cluster settings (not read here)
 */
class AppVersionCompatChecker(system: ActorSystem, settings: ClusterSettings)
    extends JoinConfigCompatChecker {

  val requiredKeys: List[String] = List(
    "akka.actor.provider",
    "akka.cluster.roles",
    "app.version",
    "app.api-version"
  )

  /**
   * Verifies that the joining node's config contains every required key and,
   * if so, applies the version and role rules.
   *
   * @param toCheck      configuration from the joining node
   * @param actualConfig this node's configuration
   * @return Valid when all rules pass, otherwise Invalid with every violation found
   */
  def check(toCheck: Config, actualConfig: Config): ConfigValidation =
    JoinConfigCompatChecker.exists(requiredKeys, toCheck) match {
      case Valid   => validateVersions(toCheck, actualConfig)
      case invalid => invalid
    }

  /** Applies all rules and collects every violation instead of stopping at the first. */
  private def validateVersions(joining: Config, actual: Config): ConfigValidation = {
    val joiningAppVersion = joining.getString("app.version")
    val joiningApiVersion = joining.getString("app.api-version")
    val actualAppVersion = actual.getString("app.version")
    val actualApiVersion = actual.getString("app.api-version")

    // App versions are compatible when they share the same major version
    val appVersionError =
      if (isCompatibleVersion(joiningAppVersion, actualAppVersion)) None
      else Some(s"Incompatible app version: $joiningAppVersion vs $actualAppVersion")

    // API version must match exactly
    val apiVersionError =
      if (joiningApiVersion == actualApiVersion) None
      else Some(s"API version mismatch: $joiningApiVersion vs $actualApiVersion")

    // The joining node's roles must fit inside one of the allowed combinations
    val joiningRoles = joining.getStringList("akka.cluster.roles").asScala.toSet
    val allowedRoleCombinations = Set(
      Set("frontend", "api"),
      Set("backend", "worker"),
      Set("database")
    )
    val roleError =
      if (allowedRoleCombinations.exists(allowed => joiningRoles.subsetOf(allowed))) None
      else Some(s"Invalid role combination: ${joiningRoles.mkString(", ")}")

    val errors = List(appVersionError, apiVersionError, roleError).flatten
    if (errors.isEmpty) Valid else Invalid(errors)
  }

  /**
   * Two versions are compatible when the text before their first '.' (the
   * major version) is equal. Versions with any number of segments are
   * accepted, so "2", "2.1" and "2.1.0-RC1" are all handled without throwing.
   */
  private def isCompatibleVersion(v1: String, v2: String): Boolean = {
    def major(version: String): String = version.takeWhile(_ != '.')
    major(v1) == major(v2)
  }
}
319
320
// Configuration
321
// akka.cluster.configuration-compatibility-check.checker-class = "com.myapp.AppVersionCompatChecker"
322
```
323
324
### Environment-Specific Compatibility Checkers
325
326
```scala
327
// Environment-aware compatibility checker
328
/**
 * Join compatibility checker that validates environment, data center and
 * instance type of a joining node.
 *
 * @param system   actor system the checker is created for (not read here)
 * @param settings cluster settings (not read here)
 */
class EnvironmentCompatChecker(system: ActorSystem, settings: ClusterSettings)
    extends JoinConfigCompatChecker {

  val requiredKeys: List[String] = List(
    "app.environment",
    "app.datacenter",
    "app.instance-type"
  )

  /**
   * Verifies that the joining node's config contains every required key and,
   * if so, applies the environment rules.
   *
   * @param toCheck      configuration from the joining node
   * @param actualConfig this node's configuration
   * @return Valid when compatible, otherwise Invalid with the first violation found
   */
  def check(toCheck: Config, actualConfig: Config): ConfigValidation =
    JoinConfigCompatChecker.exists(requiredKeys, toCheck) match {
      case Valid   => validateEnvironment(toCheck, actualConfig)
      case invalid => invalid
    }

  /** Only nodes of the same environment may join; the remaining rules run afterwards. */
  private def validateEnvironment(joining: Config, actual: Config): ConfigValidation = {
    val joiningEnv = joining.getString("app.environment")
    val actualEnv = actual.getString("app.environment")

    if (joiningEnv != actualEnv)
      Invalid(List(s"Environment mismatch: $joiningEnv cannot join $actualEnv cluster"))
    else
      validateDatacenter(joining, actual)
  }

  /**
   * Checks the joining data center against the table of compatible data
   * centers, then validates the instance type. A local data center without
   * an entry in the table is not restricted.
   */
  private def validateDatacenter(joining: Config, actual: Config): ConfigValidation = {
    val joiningDc = joining.getString("app.datacenter")
    val actualDc = actual.getString("app.datacenter")
    val joiningInstanceType = joining.getString("app.instance-type")

    val compatibleDcs = Map(
      "us-east-1" -> Set("us-east-1", "us-west-2"),
      "eu-west-1" -> Set("eu-west-1", "eu-central-1")
    )

    compatibleDcs.get(actualDc) match {
      case Some(allowed) if !allowed.contains(joiningDc) =>
        Invalid(List(s"Data center $joiningDc not compatible with $actualDc"))
      case _ =>
        // Additional instance type validation
        validateInstanceType(joiningInstanceType, joiningDc)
    }
  }

  /**
   * Checks the instance type against the types allowed in the given data
   * center. A data center without an entry in the table is not restricted.
   */
  private def validateInstanceType(instanceType: String, dc: String): ConfigValidation = {
    val allowedTypes = Map(
      "us-east-1" -> Set("m5.large", "m5.xlarge", "c5.large"),
      "eu-west-1" -> Set("m5.large", "c5.large")
    )

    allowedTypes.get(dc) match {
      case Some(allowed) if !allowed.contains(instanceType) =>
        Invalid(List(s"Instance type $instanceType not allowed in $dc"))
      case _ => Valid
    }
  }
}
385
386
// Development vs Production checker
387
/**
 * Join compatibility checker that keeps development and production nodes
 * in separate clusters.
 *
 * @param system   actor system the checker is created for (not read here)
 * @param settings cluster settings (not read here)
 */
class DevProdCompatChecker(system: ActorSystem, settings: ClusterSettings)
    extends JoinConfigCompatChecker {

  val requiredKeys: List[String] = List("app.mode", "app.debug-enabled")

  /**
   * Verifies that the joining node's config contains every required key
   * before any value is read, so that a missing key is reported as Invalid
   * instead of surfacing as an exception from `getString`.
   *
   * @param toCheck      configuration from the joining node
   * @param actualConfig this node's configuration
   * @return Valid when compatible, otherwise Invalid with the reason
   */
  def check(toCheck: Config, actualConfig: Config): ConfigValidation =
    JoinConfigCompatChecker.exists(requiredKeys, toCheck) match {
      case Valid   => validateMode(toCheck, actualConfig)
      case invalid => invalid
    }

  /** Applies the mode rules once the required keys are known to be present. */
  private def validateMode(toCheck: Config, actualConfig: Config): ConfigValidation = {
    val joiningMode = toCheck.getString("app.mode")
    val actualMode = actualConfig.getString("app.mode")

    if (joiningMode != actualMode) {
      // Strict separation of dev and prod
      Invalid(List(s"Cannot mix $joiningMode and $actualMode nodes in same cluster"))
    } else if (actualMode == "production") {
      // In production, require an exact match of all required keys
      JoinConfigCompatChecker.fullMatch(requiredKeys, toCheck, actualConfig)
    } else {
      // Dev mode is more permissive: a debug mismatch is allowed
      Valid
    }
  }
}
410
```
411
412
### Downing Provider Factory Pattern

Pattern for creating configurable downing providers.
415
416
```scala
417
// Factory for creating different downing strategies
418
/** Factory that maps a strategy name to a downing provider instance. */
object DowningProviderFactory {

  /**
   * Creates the provider for the given strategy.
   *
   * @param strategy one of the built-in names below; any other value is
   *                 treated as the class name of a custom provider
   * @param system   actor system handed to the provider's constructor
   * @return the provider instance; a failure to instantiate a custom
   *         provider is rethrown by the final `.get`
   */
  def create(strategy: String, system: ActorSystem): DowningProvider =
    strategy match {
      case "none"       => new NoDowning(system)
      case "auto"       => new AutoDowning(system)
      case "quorum"     => new QuorumBasedDowningProvider(system)
      case "oldest"     => new OldestMemberDowningProvider(system)
      case "role-based" => new RoleBasedDowningProvider(system)
      case className    => instantiate(className, system)
    }

  // Loads a custom provider by class name through the system's dynamic access,
  // calling its one-argument ActorSystem constructor.
  private def instantiate(className: String, system: ActorSystem): DowningProvider = {
    val dynamicAccess = system.asInstanceOf[ExtendedActorSystem].dynamicAccess
    val constructorArgs = List(classOf[ActorSystem] -> system)
    dynamicAccess.createInstanceFor[DowningProvider](className, constructorArgs).get
  }
}
432
433
// Configuration-driven provider
434
/**
 * Downing provider that reads the strategy name from the
 * "app.cluster.downing" config section and forwards every call to the
 * provider created by `DowningProviderFactory` for that strategy.
 *
 * @param system actor system whose configuration selects the strategy
 */
class ConfigurableDowningProvider(system: ActorSystem) extends DowningProvider {

  val config = {
    val rootConfig = system.settings.config
    rootConfig.getConfig("app.cluster.downing")
  }

  val strategy = config.getString("strategy")

  // Provider that actually implements the selected strategy
  val delegate = DowningProviderFactory.create(strategy, system)

  override def downingActorProps: Option[Props] = delegate.downingActorProps

  override def downRemovalMargin: FiniteDuration = delegate.downRemovalMargin
}
442
443
// Configuration
444
/*
445
app.cluster.downing {
446
strategy = "quorum" # or "auto", "oldest", "role-based", etc.
447
}
448
akka.cluster.downing-provider-class = "com.myapp.ConfigurableDowningProvider"
449
*/
450
```
451
452
## Types
453
454
```scala { .api }
455
// Downing provider SPI
abstract class DowningProvider {
  def downRemovalMargin: FiniteDuration
  def downingActorProps: Option[Props]
}

// Built-in providers
final class NoDowning(system: ActorSystem) extends DowningProvider
class AutoDowning(system: ActorSystem) extends DowningProvider

// Configuration validation SPI
abstract class JoinConfigCompatChecker {
  def requiredKeys: immutable.Seq[String]
  def check(toCheck: Config, actualConfig: Config): ConfigValidation
}

// Validation result types
sealed trait ConfigValidation
case object Valid extends ConfigValidation
case class Invalid(errorMessages: immutable.Seq[String]) extends ConfigValidation

// Factory and utility methods
object JoinConfigCompatChecker {
  def load(system: ActorSystem, settings: ClusterSettings): JoinConfigCompatChecker
  def exists(requiredKeys: immutable.Seq[String], toCheck: Config): ConfigValidation
  def fullMatch(
      requiredKeys: immutable.Seq[String],
      toCheck: Config,
      actualConfig: Config): ConfigValidation
}
482
```