
# Events and State

The Akka Cluster event system lets you monitor cluster state changes, member lifecycle events, and reachability updates. It provides fine-grained notifications about cluster membership and health.

## Capabilities

### Current Cluster State

Snapshot of the current cluster state containing all members, their status, and leadership information.

```scala { .api }
/**
 * Current snapshot state of the cluster. Sent to new subscriber.
 * @param leader leader of the data center of this node
 */
class CurrentClusterState(
  val members: immutable.SortedSet[Member],
  val unreachable: Set[Member],
  val seenBy: Set[Address],
  val leader: Option[Address],
  val roleLeaderMap: Map[String, Option[Address]],
  val unreachableDataCenters: Set[DataCenter]
) {
  /** Get current leader for specific role */
  def roleLeader(role: String): Option[Address]

  /** All node roles in the cluster */
  def allRoles: Set[String]

  /** All data centers in the cluster */
  def allDataCenters: Set[String]

  /** Java API: get current member list */
  def getMembers: java.lang.Iterable[Member]

  /** Java API: get current unreachable set */
  def getUnreachable: java.util.Set[Member]

  /** Java API: get current leader address, or null if none */
  def getLeader: Address

  /** Java API: All data centers in the cluster */
  def getAllDataCenters: java.util.Set[String]

  /** Java API: All unreachable data centers in the cluster */
  def getUnreachableDataCenters: java.util.Set[String]
}
```

**Usage Example:**

```scala
import akka.actor.Address
import akka.cluster.Cluster

// Assumes an ActorSystem named `system` is in scope
val cluster = Cluster(system)
val state = cluster.state
println(s"Total members: ${state.members.size}")
println(s"Unreachable members: ${state.unreachable.size}")
println(s"Current leader: ${state.leader}")
println(s"All roles: ${state.allRoles}")

// Check for specific member.
// The protocol is "akka.tcp" with classic remoting and "akka" with Artery.
val targetAddress = Address("akka.tcp", "ClusterSystem", "127.0.0.1", 2551)
val targetMember = state.members.find(_.address == targetAddress)
targetMember.foreach(m => println(s"Target member status: ${m.status}"))
```
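A snapshot is often only the starting point for a decision such as "which nodes can take work right now?". The following is a minimal sketch of a few such queries. It assumes the Akka Cluster library is on the classpath; `SnapshotQueries` and its method names are hypothetical helpers, not part of Akka.

```scala
import scala.collection.immutable

import akka.actor.Address
import akka.cluster.{Member, MemberStatus}
import akka.cluster.ClusterEvent.CurrentClusterState

// Hypothetical helper object; these method names are illustrative only.
object SnapshotQueries {

  /** Members that are Up and not currently flagged unreachable. */
  def healthyMembers(state: CurrentClusterState): immutable.SortedSet[Member] =
    state.members.filter(m => m.status == MemberStatus.Up && !state.unreachable.contains(m))

  /** Up members that carry the given role. */
  def upWithRole(state: CurrentClusterState, role: String): immutable.SortedSet[Member] =
    state.members.filter(m => m.status == MemberStatus.Up && m.hasRole(role))

  /** True when `self` is the leader recorded in the snapshot. */
  def isLeader(state: CurrentClusterState, self: Address): Boolean =
    state.leader.contains(self)
}
```

With a `Cluster` extension in scope, `SnapshotQueries.isLeader(cluster.state, cluster.selfAddress)` would tell you whether the local node was the leader at the time the snapshot was taken. The answer can be stale by the time you act on it, so treat it as a hint rather than a guarantee.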

### Base Event Types

Core event interfaces that all cluster events implement.

```scala { .api }
/**
 * Marker interface for cluster domain events.
 * Not intended for user extension.
 */
trait ClusterDomainEvent extends DeadLetterSuppression

/**
 * Marker interface for membership events.
 * Published when the state change is first seen on a node.
 */
sealed trait MemberEvent extends ClusterDomainEvent {
  def member: Member
}

/**
 * Marker interface to facilitate subscription of both UnreachableMember and ReachableMember.
 */
sealed trait ReachabilityEvent extends ClusterDomainEvent {
  def member: Member
}

/**
 * Marker interface for data center reachability events.
 */
sealed trait DataCenterReachabilityEvent extends ClusterDomainEvent
```
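Because the concrete events extend these marker traits, a subscriber can subscribe to a trait and match on it rather than listing every case class. The sketch below illustrates this under the assumption that the actor runs inside a cluster-enabled `ActorSystem`; the class name `MarkerTraitListener` is hypothetical.

```scala
import akka.actor.{Actor, ActorLogging}
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.{InitialStateAsEvents, MemberEvent, ReachabilityEvent}

// Hypothetical listener that handles events by marker trait only.
class MarkerTraitListener extends Actor with ActorLogging {
  private val cluster = Cluster(context.system)

  // Subscribing to a marker trait delivers every event type that extends it.
  override def preStart(): Unit =
    cluster.subscribe(self, InitialStateAsEvents, classOf[MemberEvent], classOf[ReachabilityEvent])

  override def postStop(): Unit = cluster.unsubscribe(self)

  def receive: Receive = {
    // Both traits expose `member`, so no per-event pattern is needed.
    case event: MemberEvent =>
      log.info("Membership event {} for {}", event.getClass.getSimpleName, event.member.address)
    case event: ReachabilityEvent =>
      log.info("Reachability event {} for {}", event.getClass.getSimpleName, event.member.address)
  }
}
```

Matching on the trait keeps the handler short, at the cost of treating all subtypes alike; match on the concrete case classes when each transition needs its own reaction.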

### Member Lifecycle Events

Events fired when members join, leave, or change status in the cluster.

```scala { .api }
/** Member status changed to Joining */
case class MemberJoined(member: Member) extends MemberEvent

/**
 * Member status changed to WeaklyUp.
 * A joining member can be moved to WeaklyUp if convergence
 * cannot be reached, i.e. there are unreachable nodes.
 * It will be moved to Up when convergence is reached.
 */
case class MemberWeaklyUp(member: Member) extends MemberEvent

/** Member status changed to Up */
case class MemberUp(member: Member) extends MemberEvent

/** Member status changed to Leaving */
case class MemberLeft(member: Member) extends MemberEvent

/**
 * Member status changed to MemberStatus.Exiting and will be removed
 * when all members have seen the Exiting status.
 */
case class MemberExited(member: Member) extends MemberEvent

/**
 * Member status changed to MemberStatus.Down and will be removed
 * when all members have seen the Down status.
 */
case class MemberDowned(member: Member) extends MemberEvent

/**
 * Member completely removed from the cluster.
 * When previousStatus is MemberStatus.Down the node was removed
 * after being detected as unreachable and downed.
 * When previousStatus is MemberStatus.Exiting the node was removed
 * after graceful leaving and exiting.
 */
case class MemberRemoved(member: Member, previousStatus: MemberStatus) extends MemberEvent
```

**Usage Example:**

```scala
import akka.actor.{Actor, ActorLogging}
import akka.cluster.ClusterEvent._

class ClusterListener extends Actor with ActorLogging {
  def receive = {
    case MemberUp(member) =>
      log.info("Member is Up: {}", member.address)

    case MemberJoined(member) =>
      log.info("Member joined: {}", member.address)

    case MemberLeft(member) =>
      log.info("Member left: {}", member.address)

    case MemberExited(member) =>
      log.info("Member exited: {}", member.address)

    case MemberRemoved(member, previousStatus) =>
      log.info("Member removed: {} (was: {})", member.address, previousStatus)

    case _: MemberEvent => // ignore
  }
}
```

### Leadership Events

Events related to cluster leadership changes at both cluster and role levels.

```scala { .api }
/**
 * Leader of the cluster data center of this node changed.
 * Published when the state change is first seen on a node.
 */
case class LeaderChanged(leader: Option[Address]) extends ClusterDomainEvent {
  /** Java API: get address of current leader, or null if none */
  def getLeader: Address = leader.orNull
}

/**
 * First member (leader) of the members within a role set changed.
 * Published when the state change is first seen on a node.
 */
case class RoleLeaderChanged(role: String, leader: Option[Address]) extends ClusterDomainEvent {
  /** Java API: get address of current leader, or null if none */
  def getLeader: Address = leader.orNull
}
```

**Usage Example:**

```scala
// Cases for an actor's `receive` block (the actor mixes in ActorLogging)
case LeaderChanged(newLeader) =>
  newLeader match {
    case Some(address) => log.info("New leader: {}", address)
    case None => log.info("No leader currently")
  }

case RoleLeaderChanged(role, newLeader) =>
  log.info("Role '{}' leader changed to: {}", role, newLeader.getOrElse("none"))
```

### Reachability Events

Events indicating when cluster members become reachable or unreachable from the perspective of the failure detector.

```scala { .api }
/** A member is considered as unreachable by the failure detector */
case class UnreachableMember(member: Member) extends ReachabilityEvent

/**
 * A member is considered as reachable by the failure detector
 * after having been unreachable.
 */
case class ReachableMember(member: Member) extends ReachabilityEvent
```

**Usage Example:**

```scala
// Cases for an actor's `receive` block (the actor mixes in ActorLogging)
case UnreachableMember(member) =>
  log.warning("Member became unreachable: {}", member.address)
  // Maybe take action like redistributing work

case ReachableMember(member) =>
  log.info("Member became reachable again: {}", member.address)
  // Member is back, can route work to it again
```

### Data Center Events

Events for multi-data center clusters indicating when entire data centers become unreachable.

```scala { .api }
/** A data center is considered as unreachable when any members from the data center are unreachable */
case class UnreachableDataCenter(dataCenter: DataCenter) extends DataCenterReachabilityEvent

/** A data center is considered reachable when all members from the data center are reachable */
case class ReachableDataCenter(dataCenter: DataCenter) extends DataCenterReachabilityEvent
```

**Usage Example:**

```scala
// Cases for an actor's `receive` block (the actor mixes in ActorLogging)
case UnreachableDataCenter(dc) =>
  log.warning("Data center became unreachable: {}", dc)
  // Adjust routing to avoid unreachable DC

case ReachableDataCenter(dc) =>
  log.info("Data center became reachable: {}", dc)
  // Can route to this DC again
```

### Shutdown Events

Events indicating cluster shutdown.

```scala { .api }
/**
 * This event is published when the cluster node is shutting down,
 * before the final MemberRemoved events are published.
 */
case object ClusterShuttingDown extends ClusterDomainEvent
```

**Usage Example:**

```scala
// Case for an actor's `receive` block (the actor mixes in ActorLogging)
case ClusterShuttingDown =>
  log.info("Cluster is shutting down")
  // Perform cleanup before final shutdown
```

### Event Subscription

Methods for subscribing to cluster events with different initial state modes.

```scala { .api }
/**
 * When using this subscription mode a snapshot of
 * CurrentClusterState will be sent to the subscriber as the first message.
 */
case object InitialStateAsSnapshot extends SubscriptionInitialStateMode

/**
 * When using this subscription mode the events corresponding
 * to the current state will be sent to the subscriber to mimic what you would
 * have seen if you were listening to the events when they occurred in the past.
 */
case object InitialStateAsEvents extends SubscriptionInitialStateMode
```

**Usage Examples:**

```scala
// Assumes `cluster` is Cluster(system) and `listener` is an ActorRef

// Subscribe with snapshot - receive CurrentClusterState as the first message
cluster.subscribe(listener, InitialStateAsSnapshot, classOf[MemberEvent])

// Subscribe with events - receive the current state as a series of events
cluster.subscribe(listener, InitialStateAsEvents, classOf[MemberEvent])

// Subscribe to multiple event types
cluster.subscribe(listener,
  classOf[MemberEvent],
  classOf[ReachabilityEvent],
  classOf[LeaderChanged])

// Subscribe to all cluster events
cluster.subscribe(listener, classOf[ClusterDomainEvent])
```
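One practical consequence of `InitialStateAsEvents` is that a subscriber can build its view of the cluster from events alone, without a separate `CurrentClusterState` case. The sketch below tracks the addresses of members that have reached Up. It assumes a cluster-enabled `ActorSystem`; `UpMembersTracker` is a hypothetical name, and the bookkeeping is deliberately simplified (a member stays in the set until it is removed, even while Leaving, Exiting, or Down).

```scala
import akka.actor.{Actor, ActorLogging, Address}
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.{InitialStateAsEvents, MemberEvent, MemberRemoved, MemberUp}

// Hypothetical actor that keeps a local set of Up member addresses.
class UpMembersTracker extends Actor with ActorLogging {
  private val cluster = Cluster(context.system)
  private var upMembers = Set.empty[Address]

  // Existing members arrive as events, so the set is populated on start.
  override def preStart(): Unit =
    cluster.subscribe(self, InitialStateAsEvents, classOf[MemberEvent])

  override def postStop(): Unit = cluster.unsubscribe(self)

  def receive: Receive = {
    case MemberUp(member) =>
      upMembers += member.address
      log.info("Tracking {} Up members", upMembers.size)

    case MemberRemoved(member, _) =>
      upMembers -= member.address
      log.info("Tracking {} Up members", upMembers.size)

    case _: MemberEvent => // other transitions leave the set unchanged
  }
}
```

With `InitialStateAsSnapshot` the same actor would also need a `case state: CurrentClusterState` branch to seed the set, since no events are sent for members that were already present at subscription time.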

### Complete Event Handling Example

```scala
import akka.actor.{Actor, ActorLogging}
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._

class ComprehensiveClusterListener extends Actor with ActorLogging {
  override def preStart(): Unit = {
    val cluster = Cluster(context.system)
    cluster.subscribe(self, InitialStateAsSnapshot,
      classOf[MemberEvent],
      classOf[ReachabilityEvent],
      classOf[LeaderChanged],
      classOf[RoleLeaderChanged],
      classOf[DataCenterReachabilityEvent],
      // ClusterShuttingDown is only delivered if its class is subscribed to
      ClusterShuttingDown.getClass)
  }

  def receive = {
    // Initial state
    case state: CurrentClusterState =>
      log.info("Current cluster state: {} members", state.members.size)
      state.members.foreach(m => log.info("Member: {} - {}", m.address, m.status))

    // Member lifecycle
    case MemberUp(member) =>
      log.info("Member UP: {}", member.address)
    case MemberJoined(member) =>
      log.info("Member JOINED: {}", member.address)
    case MemberLeft(member) =>
      log.info("Member LEFT: {}", member.address)
    case MemberExited(member) =>
      log.info("Member EXITED: {}", member.address)
    case MemberRemoved(member, previousStatus) =>
      log.info("Member REMOVED: {} (was {})", member.address, previousStatus)
    case _: MemberEvent => // MemberWeaklyUp and MemberDowned are ignored here

    // Reachability
    case UnreachableMember(member) =>
      log.warning("Member UNREACHABLE: {}", member.address)
    case ReachableMember(member) =>
      log.info("Member REACHABLE: {}", member.address)

    // Leadership
    case LeaderChanged(leader) =>
      log.info("Leader changed: {}", leader.getOrElse("none"))
    case RoleLeaderChanged(role, leader) =>
      log.info("Role '{}' leader changed: {}", role, leader.getOrElse("none"))

    // Data center events
    case UnreachableDataCenter(dc) =>
      log.warning("Data center UNREACHABLE: {}", dc)
    case ReachableDataCenter(dc) =>
      log.info("Data center REACHABLE: {}", dc)

    // Shutdown
    case ClusterShuttingDown =>
      log.info("Cluster shutting down")
  }

  override def postStop(): Unit = {
    val cluster = Cluster(context.system)
    cluster.unsubscribe(self)
  }
}
```

## Types

```scala { .api }
// Subscription initial state modes
sealed abstract class SubscriptionInitialStateMode
case object InitialStateAsSnapshot extends SubscriptionInitialStateMode
case object InitialStateAsEvents extends SubscriptionInitialStateMode

// Data center type
type DataCenter = String
```
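Since `DataCenter` is an alias for `String`, data center names can be used directly as map keys or compared with plain strings. The following sketch groups a snapshot's members by data center and lists the data centers not currently flagged unreachable. It assumes the Akka Cluster library is on the classpath; `DataCenterQueries` and its method names are hypothetical.

```scala
import scala.collection.immutable

import akka.cluster.Member
import akka.cluster.ClusterEvent.CurrentClusterState

// Hypothetical helper object; DataCenter is written as String, which it aliases.
object DataCenterQueries {

  /** Members of the snapshot keyed by the data center they belong to. */
  def membersByDataCenter(state: CurrentClusterState): Map[String, immutable.SortedSet[Member]] =
    state.members.groupBy(_.dataCenter)

  /** Data centers known to the snapshot that are not flagged unreachable. */
  def reachableDataCenters(state: CurrentClusterState): Set[String] =
    state.allDataCenters -- state.unreachableDataCenters
}
```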