or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cluster-management.mdcluster-routing.mdconfiguration-and-management.mdevents-and-state.mdextensibility.mdindex.mdmembers-and-status.md

configuration-and-management.mddocs/

0

# Configuration and Management

1

2

Cluster configuration management, settings, and JMX integration for monitoring and management. This covers how to configure cluster behavior, monitor cluster health, and manage clusters programmatically and through JMX.

3

4

## Capabilities

5

6

### Cluster Settings

7

8

Core cluster configuration loaded from application.conf with comprehensive settings for all cluster aspects.

9

10

```scala { .api }

11

/**

12

* Cluster configuration settings loaded from config.

13

* Provides access to all cluster-related configuration values.

14

*/

15

class ClusterSettings(val config: Config, val systemName: String) {

16

/** Configured seed nodes for initial cluster joining */

17

val SeedNodes: immutable.IndexedSeq[Address]

18

19

/** Roles assigned to this cluster node */

20

val Roles: Set[String]

21

22

/** Data center this node belongs to */

23

val SelfDataCenter: DataCenter

24

25

/** Heartbeat interval between cluster nodes */

26

val HeartbeatInterval: FiniteDuration

27

28

/** Expected heartbeat response time */

29

val HeartbeatExpectedResponseAfter: FiniteDuration

30

31

/** Number of members that monitor each member for failure detection */

32

val MonitoredByNrOfMembers: Int

33

34

/** Enable info level cluster logging */

35

val LogInfo: Boolean

36

37

/** Enable JMX monitoring and management */

38

val JmxEnabled: Boolean

39

40

/** Auto-down unreachable members after this duration (if configured) */

41

val AutoDownUnreachableAfter: FiniteDuration

42

43

/** Minimum number of members before leader actions */

44

val MinNrOfMembers: Int

45

46

/** Minimum number of members per role before leader actions */

47

val MinNrOfMembersOfRole: Map[String, Int]

48

49

/** Gossip interval for cluster state dissemination */

50

val GossipInterval: FiniteDuration

51

52

/** Failure detector implementation class */

53

val FailureDetectorImplementationClass: String

54

55

/** Failure detector configuration */

56

val FailureDetectorConfig: Config

57

58

/** Downing provider class name */

59

val DowningProviderClassName: String

60

61

/** Dispatcher to use for cluster actors */

62

val UseDispatcher: String

63

}

64

```

65

66

**Configuration Example (application.conf):**

67

68

```hocon

69

akka {

70

actor {

71

provider = "cluster"

72

}

73

74

cluster {

75

# Seed nodes for joining the cluster

76

seed-nodes = [

77

"akka.tcp://ClusterSystem@127.0.0.1:2551",

78

"akka.tcp://ClusterSystem@127.0.0.1:2552"

79

]

80

81

# Node roles

82

roles = ["frontend", "backend"]

83

84

# Data center (for multi-DC clusters)

85

multi-data-center.self-data-center = "dc1"

86

87

# Minimum cluster size before leader actions

88

min-nr-of-members = 3

89

min-nr-of-members-of-role {

90

backend = 2

91

frontend = 1

92

}

93

94

# Failure detection

95

failure-detector {

96

implementation-class = "akka.remote.PhiAccrualFailureDetector"

97

heartbeat-interval = 1s

98

threshold = 8.0

99

max-sample-size = 1000

100

min-std-deviation = 100ms

101

acceptable-heartbeat-pause = 3s

102

monitored-by-nr-of-members = 5

103

}

104

105

# Auto-down unreachable (use with caution in production)

106

auto-down-unreachable-after = off

107

108

# Downing provider

109

downing-provider-class = "akka.cluster.NoDowning"

110

111

# Gossip settings

112

gossip-interval = 1s

113

gossip-time-to-live = 2s

114

115

# JMX monitoring

116

jmx.enabled = on

117

118

# Logging

119

log-info = on

120

}

121

}

122

```

123

124

**Usage Examples:**

125

126

```scala

127

val cluster = Cluster(system)

128

val settings = cluster.settings

129

130

println(s"Seed nodes: ${settings.SeedNodes}")

131

println(s"My roles: ${settings.Roles}")

132

println(s"Data center: ${settings.SelfDataCenter}")

133

println(s"Heartbeat interval: ${settings.HeartbeatInterval}")

134

println(s"Min cluster size: ${settings.MinNrOfMembers}")

135

println(s"JMX enabled: ${settings.JmxEnabled}")

136

137

// Check role-specific minimums

138

settings.MinNrOfMembersOfRole.foreach { case (role, minCount) =>

139

println(s"Minimum $role nodes: $minCount")

140

}

141

```

142

143

### JMX Management Interface

144

145

JMX interface for cluster node management and monitoring providing operational control.

146

147

```scala { .api }

148

/**

149

* JMX management interface for cluster operations and monitoring.

150

* Accessible via JMX clients like JConsole, VisualVM.

151

*/

152

trait ClusterNodeMBean {

153

/** Get current member status as string */

154

def getMemberStatus: String

155

156

/** Get comma-separated member addresses */

157

def getMembers: String

158

159

/** Get comma-separated unreachable member addresses */

160

def getUnreachable: String

161

162

/** Get comprehensive cluster status as JSON string */

163

def getClusterStatus: String

164

165

/** Get current cluster leader address */

166

def getLeader: String

167

168

/** Check if cluster has only one member */

169

def isSingleton: Boolean

170

171

/** Check if this node is available (Up status) */

172

def isAvailable: Boolean

173

174

/** Join cluster at specified address */

175

def join(address: String): Unit

176

177

/** Leave cluster for specified address */

178

def leave(address: String): Unit

179

180

/** Mark specified address as down */

181

def down(address: String): Unit

182

}

183

```

184

185

**JMX Usage Examples:**

186

187

```scala

188

// JMX is automatically enabled if akka.cluster.jmx.enabled = on

189

// Access via JConsole at: akka:type=Cluster

190

191

// Programmatic JMX access

192

import javax.management.{MBeanServer, ObjectName}

193

import java.lang.management.ManagementFactory

194

195

val server: MBeanServer = ManagementFactory.getPlatformMBeanServer

196

val objectName = new ObjectName("akka:type=Cluster")

197

198

// Get cluster information

199

val memberStatus = server.getAttribute(objectName, "MemberStatus").asInstanceOf[String]

200

val members = server.getAttribute(objectName, "Members").asInstanceOf[String]

201

val leader = server.getAttribute(objectName, "Leader").asInstanceOf[String]

202

val isAvailable = server.getAttribute(objectName, "Available").asInstanceOf[Boolean]

203

204

println(s"Status: $memberStatus")

205

println(s"Members: $members")

206

println(s"Leader: $leader")

207

println(s"Available: $isAvailable")

208

209

// Perform cluster operations

210

server.invoke(objectName, "join", Array("akka.tcp://System@host:2551"), Array("java.lang.String"))

211

server.invoke(objectName, "leave", Array("akka.tcp://System@host:2552"), Array("java.lang.String"))

212

```

213

214

### Settings Constants and Types

215

216

Constants and type definitions used throughout cluster configuration.

217

218

```scala { .api }

219

object ClusterSettings {

220

/** Default data center name */

221

val DefaultDataCenter: String = "default"

222

223

/** Prefix for data center role names */

224

val DcRolePrefix: String = "dc-"

225

226

/** Type alias for data center identifiers */

227

type DataCenter = String

228

229

/** Multi-data center specific settings */

230

object MultiDataCenter {

231

/** Cross data center failure detector settings */

232

val CrossDcFailureDetectorSettings: FailureDetectorSettings

233

234

/** Cross data center connections configuration */

235

val CrossDcConnections: Int

236

}

237

238

/** Debug logging settings */

239

object Debug {

240

/** Enable verbose logging of cluster events */

241

val LogInfo: Boolean

242

243

/** Enable debug logging of gossip */

244

val LogGossip: Boolean

245

}

246

}

247

```

248

249

**Usage Examples:**

250

251

```scala

252

import akka.cluster.ClusterSettings._

253

254

// Check if using default data center

255

if (cluster.selfDataCenter == DefaultDataCenter) {

256

println("Using default data center")

257

}

258

259

// Work with data center roles

260

val dcRole = s"$DcRolePrefix${cluster.selfDataCenter}"

261

if (cluster.selfRoles.contains(dcRole)) {

262

println(s"Node has data center role: $dcRole")

263

}

264

265

// Multi-DC settings

266

val settings = cluster.settings

267

println(s"Cross-DC connections: ${settings.MultiDataCenter.CrossDcConnections}")

268

```

269

270

### Cluster Read View

271

272

Read-only view of cluster state for monitoring and inspection without modification capabilities.

273

274

```scala { .api }

275

/**

276

* Read-only view of the cluster state.

277

* Thread-safe access to current cluster information.

278

*/

279

class ClusterReadView(cluster: Cluster) {

280

/** Current cluster state snapshot */

281

def state: CurrentClusterState

282

283

/** This member's information */

284

def self: Member

285

286

/** Current cluster members */

287

def members: immutable.SortedSet[Member]

288

289

/** Unreachable members */

290

def unreachable: Set[Member]

291

292

/** Current leader address */

293

def leader: Option[Address]

294

295

/** Check if cluster has converged (all members seen latest gossip) */

296

def isConverged: Boolean

297

298

/** Check if this node is the leader */

299

def isLeader: Boolean

300

}

301

```

302

303

**Usage Examples:**

304

305

```scala

306

val readView = cluster.readView

307

308

// Monitor cluster health

309

def printClusterHealth(): Unit = {

310

val state = readView.state

311

println(s"=== Cluster Health ===")

312

println(s"Total members: ${state.members.size}")

313

println(s"Unreachable: ${state.unreachable.size}")

314

println(s"Leader: ${state.leader.getOrElse("None")}")

315

println(s"Converged: ${readView.isConverged}")

316

println(s"Am I leader: ${readView.isLeader}")

317

318

// Members by status

319

val membersByStatus = state.members.groupBy(_.status)

320

membersByStatus.foreach { case (status, members) =>

321

println(s"$status: ${members.size}")

322

}

323

}

324

325

// Scheduled health check

326

import scala.concurrent.duration._

327

system.scheduler.scheduleWithFixedDelay(30.seconds, 30.seconds) { () =>

328

printClusterHealth()

329

}

330

```

331

332

### Configuration Validation

333

334

Configuration compatibility checking when nodes join the cluster.

335

336

```scala { .api }

337

/**

338

* Configuration validation result

339

*/

340

sealed trait ConfigValidation

341

case object Valid extends ConfigValidation

342

case class Invalid(errorMessages: immutable.Seq[String]) extends ConfigValidation

343

344

/**

345

* Service provider interface for join configuration validation

346

*/

347

abstract class JoinConfigCompatChecker {

348

/** Configuration keys that must be validated */

349

def requiredKeys: immutable.Seq[String]

350

351

/** Check configuration compatibility */

352

def check(toCheck: Config, actualConfig: Config): ConfigValidation

353

}

354

355

object JoinConfigCompatChecker {

356

/** Check if required keys exist */

357

def exists(requiredKeys: immutable.Seq[String], toCheck: Config): ConfigValidation

358

359

/** Check if configurations match exactly */

360

def fullMatch(requiredKeys: immutable.Seq[String], toCheck: Config, actualConfig: Config): ConfigValidation

361

}

362

```

363

364

**Usage Examples:**

365

366

```scala

367

// Custom configuration validator

368

class MyConfigChecker(system: ActorSystem, settings: ClusterSettings) extends JoinConfigCompatChecker {

369

val requiredKeys = List(

370

"akka.actor.provider",

371

"akka.cluster.roles",

372

"my-app.version"

373

)

374

375

def check(toCheck: Config, actualConfig: Config): ConfigValidation = {

376

// Check that joining node has compatible configuration

377

val existsCheck = JoinConfigCompatChecker.exists(requiredKeys, toCheck)

378

379

existsCheck match {

380

case Valid =>

381

// Additional custom validation

382

val joiningVersion = toCheck.getString("my-app.version")

383

val myVersion = actualConfig.getString("my-app.version")

384

385

if (joiningVersion == myVersion) Valid

386

else Invalid(List(s"Version mismatch: $joiningVersion != $myVersion"))

387

388

case invalid => invalid

389

}

390

}

391

}

392

393

// Configuration in application.conf

394

// akka.cluster.configuration-compatibility-check.checker-class = "com.myapp.MyConfigChecker"

395

```

396

397

### Failure Detection Configuration

398

399

Configurable failure detection for monitoring cluster member health.

400

401

```scala { .api }

402

// Failure detector configuration options

403

// akka.cluster.failure-detector.implementation-class

404

// akka.cluster.failure-detector.heartbeat-interval

405

// akka.cluster.failure-detector.threshold

406

// akka.cluster.failure-detector.max-sample-size

407

// akka.cluster.failure-detector.min-std-deviation

408

// akka.cluster.failure-detector.acceptable-heartbeat-pause

409

// akka.cluster.failure-detector.monitored-by-nr-of-members

410

```

411

412

**Failure Detection Examples:**

413

414

```hocon

415

# Phi Accrual Failure Detector (default)

416

akka.cluster.failure-detector {

417

implementation-class = "akka.remote.PhiAccrualFailureDetector"

418

heartbeat-interval = 1s

419

threshold = 8.0 # Higher = more tolerant to network issues

420

max-sample-size = 1000

421

min-std-deviation = 100ms

422

acceptable-heartbeat-pause = 3s # Allow GC pauses

423

monitored-by-nr-of-members = 5 # Members monitoring each member

424

}

425

426

# Deadline Failure Detector (simpler, less adaptive)

427

akka.cluster.failure-detector {

428

implementation-class = "akka.remote.DeadlineFailureDetector"

429

heartbeat-interval = 1s

430

acceptable-heartbeat-pause = 10s # Fixed timeout

431

}

432

```

433

434

## Types

435

436

```scala { .api }

437

// Configuration types

438

class ClusterSettings(config: Config, systemName: String)

439

type DataCenter = String

440

441

// JMX management interface

442

trait ClusterNodeMBean

443

444

// Configuration validation

445

sealed trait ConfigValidation

446

case object Valid extends ConfigValidation

447

case class Invalid(errorMessages: immutable.Seq[String]) extends ConfigValidation

448

449

// Read-only cluster view

450

class ClusterReadView(cluster: Cluster)

451

452

// Settings constants

453

object ClusterSettings {

454

val DefaultDataCenter: String

455

val DcRolePrefix: String

456

type DataCenter = String

457

}

458

```