# Configuration and Management

Cluster configuration, settings, and JMX integration. This section covers how to configure cluster behavior, monitor cluster health, and manage a cluster programmatically or through JMX.

## Capabilities

### Cluster Settings

Core cluster configuration, loaded from `application.conf`, with settings for seed nodes, roles, failure detection, gossip, and downing.

```scala { .api }
/**
 * Cluster configuration settings loaded from config.
 * Provides access to all cluster-related configuration values.
 */
class ClusterSettings(val config: Config, val systemName: String) {
  /** Configured seed nodes for initial cluster joining */
  val SeedNodes: immutable.IndexedSeq[Address]

  /** Roles assigned to this cluster node */
  val Roles: Set[String]

  /** Data center this node belongs to */
  val SelfDataCenter: DataCenter

  /** Heartbeat interval between cluster nodes */
  val HeartbeatInterval: FiniteDuration

  /** Time after which a heartbeat response is expected */
  val HeartbeatExpectedResponseAfter: FiniteDuration

  /** Number of members that monitor each member for failure detection */
  val MonitoredByNrOfMembers: Int

  /** Whether info-level cluster logging is enabled */
  val LogInfo: Boolean

  /** Whether JMX monitoring and management is enabled */
  val JmxEnabled: Boolean

  /** Auto-down unreachable members after this duration; `Duration.Undefined` when set to `off` */
  val AutoDownUnreachableAfter: Duration

  /** Minimum number of members before leader actions */
  val MinNrOfMembers: Int

  /** Minimum number of members per role before leader actions */
  val MinNrOfMembersOfRole: Map[String, Int]

  /** Gossip interval for cluster state dissemination */
  val GossipInterval: FiniteDuration

  /** Failure detector implementation class */
  val FailureDetectorImplementationClass: String

  /** Failure detector configuration */
  val FailureDetectorConfig: Config

  /** Downing provider class name */
  val DowningProviderClassName: String

  /** Dispatcher to use for cluster actors */
  val UseDispatcher: String
}
```

**Configuration Example (application.conf):**

```hocon
akka {
  actor {
    provider = "cluster"
  }

  cluster {
    # Seed nodes for joining the cluster
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"
    ]

    # Node roles
    roles = ["frontend", "backend"]

    # Data center (for multi-DC clusters)
    multi-data-center.self-data-center = "dc1"

    # Minimum cluster size before leader actions
    min-nr-of-members = 3

    # Minimum number of members per role before leader actions
    role {
      backend.min-nr-of-members = 2
      frontend.min-nr-of-members = 1
    }

    # Failure detection
    failure-detector {
      implementation-class = "akka.remote.PhiAccrualFailureDetector"
      heartbeat-interval = 1s
      threshold = 8.0
      max-sample-size = 1000
      min-std-deviation = 100ms
      acceptable-heartbeat-pause = 3s
      monitored-by-nr-of-members = 5
    }

    # Auto-down unreachable members (use with caution in production)
    auto-down-unreachable-after = off

    # Downing provider
    downing-provider-class = "akka.cluster.NoDowning"

    # Gossip settings
    gossip-interval = 1s
    gossip-time-to-live = 2s

    # JMX monitoring
    jmx.enabled = on

    # Logging
    log-info = on
  }
}
```

**Usage Examples:**

```scala
import akka.actor.ActorSystem
import akka.cluster.Cluster

// Assumes application.conf contains the cluster configuration shown above
val system = ActorSystem("ClusterSystem")
val cluster = Cluster(system)
val settings = cluster.settings

println(s"Seed nodes: ${settings.SeedNodes}")
println(s"My roles: ${settings.Roles}")
println(s"Data center: ${settings.SelfDataCenter}")
println(s"Heartbeat interval: ${settings.HeartbeatInterval}")
println(s"Min cluster size: ${settings.MinNrOfMembers}")
println(s"JMX enabled: ${settings.JmxEnabled}")

// Check role-specific minimums
settings.MinNrOfMembersOfRole.foreach { case (role, minCount) =>
  println(s"Minimum $role nodes: $minCount")
}
```
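
Settings can also be inspected without starting an actor system, which can be convenient in tests. The following is a minimal sketch under stated assumptions: it layers hypothetical overrides on top of the loaded configuration and passes the result to the `ClusterSettings` constructor from the API listing. The object name `SettingsOverrideSketch` and the override values are illustrative only, and the Akka cluster and Typesafe Config libraries are assumed to be on the classpath.

```scala
import akka.cluster.ClusterSettings
import com.typesafe.config.{Config, ConfigFactory}

object SettingsOverrideSketch {
  // Hypothetical overrides; a real application would normally keep these in application.conf.
  val overrides: Config = ConfigFactory.parseString(
    """
      |akka.actor.provider = "cluster"
      |akka.cluster.roles = ["backend"]
      |akka.cluster.min-nr-of-members = 3
      |""".stripMargin)

  // Overrides take precedence; all other keys fall back to the loaded configuration.
  val config: Config = overrides.withFallback(ConfigFactory.load())

  // Same constructor as in the API listing: (config, systemName).
  val settings = new ClusterSettings(config, "ClusterSystem")
}
```

Reading `SettingsOverrideSketch.settings.MinNrOfMembers` or `SettingsOverrideSketch.settings.Roles` then reflects the overridden values rather than whatever `application.conf` specifies.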

### JMX Management Interface

JMX interface for managing and monitoring a cluster node. It exposes read-only status attributes and the `join`, `leave`, and `down` operations.

```scala { .api }
/**
 * JMX management interface for cluster operations and monitoring.
 * Accessible via JMX clients such as JConsole or VisualVM.
 */
trait ClusterNodeMBean {
  /** Get current member status as string */
  def getMemberStatus: String

  /** Get comma-separated member addresses */
  def getMembers: String

  /** Get comma-separated unreachable member addresses */
  def getUnreachable: String

  /** Get comprehensive cluster status as JSON string */
  def getClusterStatus: String

  /** Get current cluster leader address */
  def getLeader: String

  /** Check if cluster has only one member */
  def isSingleton: Boolean

  /** Check if this node is available (Up status) */
  def isAvailable: Boolean

  /** Join the cluster through the node at the specified address */
  def join(address: String): Unit

  /** Tell the member at the specified address to leave the cluster */
  def leave(address: String): Unit

  /** Mark the member at the specified address as down */
  def down(address: String): Unit
}
```

**JMX Usage Examples:**

```scala
// The MBean is registered when akka.cluster.jmx.enabled = on
// In JConsole it appears under the object name akka:type=Cluster

// Programmatic JMX access
import javax.management.{MBeanServer, ObjectName}
import java.lang.management.ManagementFactory

val server: MBeanServer = ManagementFactory.getPlatformMBeanServer
val objectName = new ObjectName("akka:type=Cluster")

// Get cluster information (attribute names follow the MBean getters)
val memberStatus = server.getAttribute(objectName, "MemberStatus").asInstanceOf[String]
val members = server.getAttribute(objectName, "Members").asInstanceOf[String]
val leader = server.getAttribute(objectName, "Leader").asInstanceOf[String]
val isAvailable = server.getAttribute(objectName, "Available").asInstanceOf[Boolean]

println(s"Status: $memberStatus")
println(s"Members: $members")
println(s"Leader: $leader")
println(s"Available: $isAvailable")

// Perform cluster operations: operation name, arguments, and argument type signature
server.invoke(objectName, "join", Array[AnyRef]("akka.tcp://System@host:2551"), Array("java.lang.String"))
server.invoke(objectName, "leave", Array[AnyRef]("akka.tcp://System@host:2552"), Array("java.lang.String"))
```
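
Because `getMembers` and `getUnreachable` return comma-separated strings, monitoring code usually has to split them before use. A minimal sketch in plain Scala follows; `JmxStrings`, `parseAddresses`, and `reachableCount` are hypothetical helper names and not part of the Akka API.

```scala
object JmxStrings {
  /** Splits a comma-separated address list into trimmed, non-empty entries. */
  def parseAddresses(csv: String): List[String] =
    csv.split(',').iterator.map(_.trim).filter(_.nonEmpty).toList

  /** Number of listed members that do not appear in the unreachable list. */
  def reachableCount(membersCsv: String, unreachableCsv: String): Int = {
    val unreachable = parseAddresses(unreachableCsv).toSet
    parseAddresses(membersCsv).count(address => !unreachable.contains(address))
  }
}
```

The strings read from the `Members` and `Unreachable` attributes can be passed straight to `reachableCount` to derive a simple health metric.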

### Settings Constants and Types

Constants and type definitions used throughout cluster configuration, plus the nested settings groups for multi-data-center and debug options.

```scala { .api }
object ClusterSettings {
  /** Default data center name */
  val DefaultDataCenter: String = "default"

  /** Prefix for data center role names */
  val DcRolePrefix: String = "dc-"

  /** Type alias for data center identifiers */
  type DataCenter = String
}

// Nested settings groups, accessed through a ClusterSettings instance
// (for example cluster.settings.MultiDataCenter)
class ClusterSettings(val config: Config, val systemName: String) {
  /** Multi-data center specific settings */
  object MultiDataCenter {
    /** Cross data center failure detector settings */
    val CrossDcFailureDetectorSettings: FailureDetectorSettings

    /** Number of connections maintained across data centers */
    val CrossDcConnections: Int
  }

  /** Debug logging settings */
  object Debug {
    /** Enable verbose logging of cluster events */
    val LogInfo: Boolean

    /** Enable debug logging of gossip */
    val LogGossip: Boolean
  }
}
```

**Usage Examples:**

```scala
import akka.cluster.ClusterSettings._

// `cluster` is the Cluster extension instance, e.g. Cluster(system).
// DefaultDataCenter and DcRolePrefix are marked internal in some Akka versions;
// where they are not accessible, use the literal values "default" and "dc-".

// Check if using default data center
if (cluster.selfDataCenter == DefaultDataCenter) {
  println("Using default data center")
}

// Work with data center roles
val dcRole = s"$DcRolePrefix${cluster.selfDataCenter}"
if (cluster.selfRoles.contains(dcRole)) {
  println(s"Node has data center role: $dcRole")
}

// Multi-DC settings
val settings = cluster.settings
println(s"Cross-DC connections: ${settings.MultiDataCenter.CrossDcConnections}")
```
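
Since a node's data center shows up as a role carrying the `dc-` prefix, the data center name can be recovered from a role set. The sketch below is plain Scala with no Akka dependency; `DataCenterRoles`, `dataCenterOf`, and `applicationRoles` are hypothetical names, and the prefix is passed as a parameter that defaults to the `DcRolePrefix` value listed above.

```scala
object DataCenterRoles {
  /** Returns the data center encoded in the first role that carries the prefix, if any. */
  def dataCenterOf(roles: Set[String], dcRolePrefix: String = "dc-"): Option[String] =
    roles.collectFirst {
      case role if role.startsWith(dcRolePrefix) => role.stripPrefix(dcRolePrefix)
    }

  /** Returns the roles that are not data center markers. */
  def applicationRoles(roles: Set[String], dcRolePrefix: String = "dc-"): Set[String] =
    roles.filterNot(_.startsWith(dcRolePrefix))
}
```

For instance, `DataCenterRoles.dataCenterOf(cluster.selfRoles)` would be expected to match `cluster.selfDataCenter` on a running node.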

### Cluster Read View

Read-only view of the cluster state, intended for monitoring and inspection. It offers no operations that modify the cluster.

```scala { .api }
/**
 * Read-only view of the cluster state.
 * Thread-safe access to current cluster information.
 */
class ClusterReadView(cluster: Cluster) {
  /** Current cluster state snapshot */
  def state: CurrentClusterState

  /** This member's information */
  def self: Member

  /** Current cluster members */
  def members: immutable.SortedSet[Member]

  /** Unreachable members */
  def unreachable: Set[Member]

  /** Current leader address */
  def leader: Option[Address]

  /** Check if cluster has converged (all members have seen the latest gossip) */
  def isConverged: Boolean

  /** Check if this node is the leader */
  def isLeader: Boolean
}
```

**Usage Examples:**

```scala
// If `readView` is not accessible in your Akka version (it is marked internal
// in several releases), `cluster.state` returns the same CurrentClusterState snapshot.
val readView = cluster.readView

// Monitor cluster health
def printClusterHealth(): Unit = {
  val state = readView.state
  println("=== Cluster Health ===")
  println(s"Total members: ${state.members.size}")
  println(s"Unreachable: ${state.unreachable.size}")
  println(s"Leader: ${state.leader.getOrElse("None")}")
  println(s"Converged: ${readView.isConverged}")
  println(s"Am I leader: ${readView.isLeader}")

  // Members by status
  val membersByStatus = state.members.groupBy(_.status)
  membersByStatus.foreach { case (status, members) =>
    println(s"$status: ${members.size}")
  }
}

// Scheduled health check; the scheduler needs an implicit ExecutionContext
import scala.concurrent.duration._
import system.dispatcher

system.scheduler.scheduleWithFixedDelay(30.seconds, 30.seconds) { () =>
  printClusterHealth()
}
```
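
The printed values can also be condensed into a single health flag. The following is a minimal sketch, assuming a hypothetical rule that a cluster is healthy when a leader is known, no member is unreachable, and at least `minMembers` members are present. `ClusterHealth` and `isHealthy` are illustrative names, and Akka cluster is assumed to be on the classpath.

```scala
import akka.cluster.ClusterEvent.CurrentClusterState

object ClusterHealth {
  /** Hypothetical rule: leader known, nobody unreachable, enough members present. */
  def isHealthy(state: CurrentClusterState, minMembers: Int): Boolean =
    state.leader.isDefined &&
      state.unreachable.isEmpty &&
      state.members.size >= minMembers
}
```

Keeping the rule a pure function of `CurrentClusterState` means it can be checked against hand-built snapshots without a running cluster. An empty snapshot, for example, is reported as unhealthy because it has no leader.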

### Configuration Validation

Checks that a joining node's configuration is compatible with the configuration of the cluster it joins.

```scala { .api }
/**
 * Configuration validation result
 */
sealed trait ConfigValidation
case object Valid extends ConfigValidation
case class Invalid(errorMessages: immutable.Seq[String]) extends ConfigValidation

/**
 * Service provider interface for join configuration validation
 */
abstract class JoinConfigCompatChecker {
  /** Configuration keys that must be validated */
  def requiredKeys: immutable.Seq[String]

  /** Check configuration compatibility */
  def check(toCheck: Config, actualConfig: Config): ConfigValidation
}

object JoinConfigCompatChecker {
  /** Check if required keys exist */
  def exists(requiredKeys: immutable.Seq[String], toCheck: Config): ConfigValidation

  /** Check if configurations match exactly */
  def fullMatch(requiredKeys: immutable.Seq[String], toCheck: Config, actualConfig: Config): ConfigValidation
}
```

**Usage Examples:**

```scala
import akka.actor.ActorSystem
import akka.cluster.{ClusterSettings, ConfigValidation, Invalid, JoinConfigCompatChecker, Valid}
import com.typesafe.config.Config

// Custom configuration validator
class MyConfigChecker(system: ActorSystem, settings: ClusterSettings) extends JoinConfigCompatChecker {
  val requiredKeys = List(
    "akka.actor.provider",
    "akka.cluster.roles",
    "my-app.version"
  )

  def check(toCheck: Config, actualConfig: Config): ConfigValidation = {
    // Check that the joining node's configuration contains every required key
    val existsCheck = JoinConfigCompatChecker.exists(requiredKeys, toCheck)

    existsCheck match {
      case Valid =>
        // Additional custom validation
        val joiningVersion = toCheck.getString("my-app.version")
        val myVersion = actualConfig.getString("my-app.version")

        if (joiningVersion == myVersion) Valid
        else Invalid(List(s"Version mismatch: $joiningVersion != $myVersion"))

      case invalid => invalid
    }
  }
}

// Register the checker in application.conf:
// akka.cluster.configuration-compatibility-check.checkers {
//   my-app = "com.myapp.MyConfigChecker"
// }
```
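
When the required keys only need to be identical on both sides, the `fullMatch` helper listed above may be sufficient on its own. The sketch below exercises `fullMatch` and `exists` against small hand-built configurations. `FullMatchSketch` and the `my-app.version` values are hypothetical, and Akka cluster plus Typesafe Config are assumed to be on the classpath.

```scala
import akka.cluster.{ConfigValidation, JoinConfigCompatChecker}
import com.typesafe.config.{Config, ConfigFactory}

object FullMatchSketch {
  private val keys = List("my-app.version")

  // Builds a one-key configuration for the given (hypothetical) application version.
  private def versionConfig(version: String): Config =
    ConfigFactory.parseString("my-app.version = \"" + version + "\"")

  private val actual: Config = versionConfig("1.0")

  // Same value on both sides.
  val matching: ConfigValidation =
    JoinConfigCompatChecker.fullMatch(keys, versionConfig("1.0"), actual)

  // Key present on both sides but with different values.
  val mismatched: ConfigValidation =
    JoinConfigCompatChecker.fullMatch(keys, versionConfig("2.0"), actual)

  // Required key absent from the configuration being checked.
  val missing: ConfigValidation =
    JoinConfigCompatChecker.exists(keys, ConfigFactory.empty())
}
```

A custom `check` implementation could delegate to `fullMatch` for the keys that must be identical and apply hand-written rules only to the keys that need looser comparison.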

### Failure Detection Configuration

The failure detector that monitors cluster member health is configured under `akka.cluster.failure-detector`.

```scala { .api }
// Failure detector configuration options
// akka.cluster.failure-detector.implementation-class
// akka.cluster.failure-detector.heartbeat-interval
// akka.cluster.failure-detector.threshold
// akka.cluster.failure-detector.max-sample-size
// akka.cluster.failure-detector.min-std-deviation
// akka.cluster.failure-detector.acceptable-heartbeat-pause
// akka.cluster.failure-detector.monitored-by-nr-of-members
```

**Failure Detection Examples:**

```hocon
# Phi Accrual Failure Detector (default)
akka.cluster.failure-detector {
  implementation-class = "akka.remote.PhiAccrualFailureDetector"
  heartbeat-interval = 1s
  threshold = 8.0                    # Higher = more tolerant of network issues, slower to detect failures
  max-sample-size = 1000
  min-std-deviation = 100ms
  acceptable-heartbeat-pause = 3s    # Allow for GC pauses
  monitored-by-nr-of-members = 5     # Members monitoring each member
}

# Alternative: Deadline Failure Detector (simpler, less adaptive).
# Use this block instead of the one above, not in addition to it.
akka.cluster.failure-detector {
  implementation-class = "akka.remote.DeadlineFailureDetector"
  heartbeat-interval = 1s
  acceptable-heartbeat-pause = 10s   # Fixed timeout
}
```
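
With the deadline detector the timeout is fixed, so the detection deadline can be worked out by hand. The sketch below is a simplified model and not Akka's implementation: it assumes a member counts as unavailable once no heartbeat has arrived for `heartbeat-interval` plus `acceptable-heartbeat-pause`. `DeadlineModel`, `deadline`, and `isAvailable` are hypothetical names.

```scala
import scala.concurrent.duration._

object DeadlineModel {
  /** Assumed deadline: heartbeat interval plus the acceptable pause. */
  def deadline(heartbeatInterval: FiniteDuration, acceptablePause: FiniteDuration): FiniteDuration =
    heartbeatInterval + acceptablePause

  /** A member is treated as available while the silence is shorter than the deadline. */
  def isAvailable(
      sinceLastHeartbeat: FiniteDuration,
      heartbeatInterval: FiniteDuration,
      acceptablePause: FiniteDuration): Boolean =
    sinceLastHeartbeat < deadline(heartbeatInterval, acceptablePause)
}
```

Under this model, the deadline configuration above (`heartbeat-interval = 1s`, `acceptable-heartbeat-pause = 10s`) gives a deadline of 11 seconds: a member that has been silent for 5 seconds is still available, while one silent for 12 seconds is not.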

## Types

```scala { .api }
// Configuration types
class ClusterSettings(config: Config, systemName: String)
type DataCenter = String

// JMX management interface
trait ClusterNodeMBean

// Configuration validation
sealed trait ConfigValidation
case object Valid extends ConfigValidation
case class Invalid(errorMessages: immutable.Seq[String]) extends ConfigValidation

// Read-only cluster view
class ClusterReadView(cluster: Cluster)

// Settings constants
object ClusterSettings {
  val DefaultDataCenter: String
  val DcRolePrefix: String
  type DataCenter = String
}
```