or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cluster-management.md · cluster-routing.md · configuration.md · event-system.md · index.md · member-management.md · split-brain-resolution.md

event-system.mddocs/

0

# Event System

1

2

The Akka Cluster event system provides comprehensive monitoring of cluster state changes through a publisher-subscriber model. Applications can subscribe to specific event types to build cluster-aware behavior.

3

4

## Event Subscription

5

6

### Basic Subscription

7

8

Subscribe to cluster events with automatic initial state delivery:

9

10

```scala { .api }

11

// Subscription methods on Cluster

12

def subscribe(subscriber: ActorRef, to: Class[_]*): Unit

13

def subscribe(subscriber: ActorRef, initialStateMode: SubscriptionInitialStateMode, to: Class[_]*): Unit

14

def unsubscribe(subscriber: ActorRef): Unit

15

def unsubscribe(subscriber: ActorRef, to: Class[_]): Unit

16

def sendCurrentClusterState(receiver: ActorRef): Unit

17

```

18

19

### Subscription Modes

20

21

```scala { .api }

22

sealed abstract class SubscriptionInitialStateMode

23

case object InitialStateAsSnapshot extends SubscriptionInitialStateMode

24

case object InitialStateAsEvents extends SubscriptionInitialStateMode

25

26

// Java API

27

def initialStateAsSnapshot: SubscriptionInitialStateMode

28

def initialStateAsEvents: SubscriptionInitialStateMode

29

```

30

31

### Usage Examples

32

33

```scala

34

import akka.cluster.ClusterEvent._

35

36

// Subscribe to all member events with snapshot

37

cluster.subscribe(self, InitialStateAsSnapshot, classOf[MemberEvent])

38

39

// Subscribe to specific events with event replay

40

cluster.subscribe(self, InitialStateAsEvents,

41

classOf[MemberUp], classOf[MemberRemoved], classOf[UnreachableMember])

42

43

// Subscribe to leadership changes

44

cluster.subscribe(self, classOf[LeaderChanged], classOf[RoleLeaderChanged])

45

46

// Unsubscribe from all events

47

cluster.unsubscribe(self)

48

49

// Unsubscribe from specific event type

50

cluster.unsubscribe(self, classOf[MemberEvent])

51

```

52

53

## Current Cluster State

54

55

### State Snapshot

56

57

```scala { .api }

58

case class CurrentClusterState(

59

members: immutable.SortedSet[Member],

60

unreachable: Set[Member],

61

seenBy: Set[Address],

62

leader: Option[Address],

63

roleLeaderMap: Map[String, Option[Address]],

64

unreachableDataCenters: Set[DataCenter],

65

memberTombstones: Set[UniqueAddress] // Internal API

66

) {

67

// Member queries

68

def roleMembers(role: String): immutable.SortedSet[Member]

69

def unreachableMembers(role: String): Set[Member]

70

def allDataCenters: Set[DataCenter]

71

72

// Utility methods

73

def copy(members: immutable.SortedSet[Member] = members,

74

unreachable: Set[Member] = unreachable,

75

seenBy: Set[Address] = seenBy,

76

leader: Option[Address] = leader,

77

roleLeaderMap: Map[String, Option[Address]] = roleLeaderMap,

78

unreachableDataCenters: Set[DataCenter] = unreachableDataCenters): CurrentClusterState

79

80

def copyUnreachable(unreachable: Set[Member]): CurrentClusterState

81

82

// Java API

83

def getMembers: java.lang.Iterable[Member]

84

def getUnreachable: java.util.Set[Member]

85

def getRoleLeaderMap: java.util.Map[String, Address]

86

def getAllDataCenters: java.util.Set[DataCenter]

87

def getUnreachableDataCenters: java.util.Set[DataCenter]

88

}

89

```

90

91

### State Access

92

93

```scala

94

// Receive current state as first message after subscription

95

def receive = {

96

case state: CurrentClusterState =>

97

println(s"Current members: ${state.members.size}")

98

println(s"Leader: ${state.leader}")

99

println(s"Unreachable: ${state.unreachable.size}")

100

101

// Check role leadership

102

state.roleLeaderMap.foreach { case (role, leaderOpt) =>

103

println(s"Leader for role '$role': ${leaderOpt.getOrElse("None")}")

104

}

105

106

case other => // Handle events

107

}

108

```

109

110

## Member Events

111

112

### Base Member Event

113

114

```scala { .api }

115

trait MemberEvent extends ClusterDomainEvent {

116

def member: Member

117

}

118

```

119

120

### Member Lifecycle Events

121

122

```scala { .api }

123

case class MemberJoined(member: Member) extends MemberEvent

124

case class MemberWeaklyUp(member: Member) extends MemberEvent

125

case class MemberUp(member: Member) extends MemberEvent

126

case class MemberLeft(member: Member) extends MemberEvent

127

case class MemberPreparingForShutdown(member: Member) extends MemberEvent

128

case class MemberReadyForShutdown(member: Member) extends MemberEvent

129

case class MemberExited(member: Member) extends MemberEvent

130

case class MemberDowned(member: Member) extends MemberEvent

131

case class MemberRemoved(member: Member, previousStatus: MemberStatus) extends MemberEvent

132

```

133

134

### Member Event Handling

135

136

```scala

137

def receive = {

138

case MemberJoined(member) =>

139

println(s"Member joined: ${member.address}")

140

141

case MemberUp(member) =>

142

println(s"Member is Up: ${member.address}")

143

if (member.hasRole("backend")) {

144

// Initialize backend-specific communication

145

}

146

147

case MemberLeft(member) =>

148

println(s"Member is leaving: ${member.address}")

149

// Cleanup resources for this member

150

151

case MemberRemoved(member, previousStatus) =>

152

println(s"Member removed: ${member.address}, was: $previousStatus")

153

// Final cleanup

154

155

case MemberDowned(member) =>

156

println(s"Member marked as down: ${member.address}")

157

// Handle failure scenario

158

}

159

```

160

161

## Leadership Events

162

163

### Leadership Change Events

164

165

```scala { .api }

166

case class LeaderChanged(leader: Option[Address]) extends ClusterDomainEvent

167

case class RoleLeaderChanged(role: String, leader: Option[Address]) extends ClusterDomainEvent

168

```

169

170

### Leadership Event Handling

171

172

```scala

173

def receive = {

174

case LeaderChanged(Some(leader)) =>

175

println(s"New cluster leader: $leader")

176

if (leader == cluster.selfAddress) {

177

println("This node is now the leader")

178

// Start leader-specific tasks

179

}

180

181

case LeaderChanged(None) =>

182

println("No cluster leader currently")

183

184

case RoleLeaderChanged(role, Some(leader)) =>

185

println(s"New leader for role '$role': $leader")

186

if (cluster.selfMember.hasRole(role) && leader == cluster.selfAddress) {

187

// This node is now leader for this role

188

}

189

190

case RoleLeaderChanged(role, None) =>

191

println(s"No leader for role '$role'")

192

}

193

```

194

195

## Reachability Events

196

197

### Reachability Event Types

198

199

```scala { .api }

200

case class UnreachableMember(member: Member) extends ClusterDomainEvent

201

case class ReachableMember(member: Member) extends ClusterDomainEvent

202

case class UnreachableDataCenter(dataCenter: DataCenter) extends ClusterDomainEvent

203

case class ReachableDataCenter(dataCenter: DataCenter) extends ClusterDomainEvent

204

```

205

206

### Reachability Event Handling

207

208

```scala

209

def receive = {

210

case UnreachableMember(member) =>

211

println(s"Member unreachable: ${member.address}")

212

// Stop sending work to this member

213

// Potentially trigger failure handling

214

215

case ReachableMember(member) =>

216

println(s"Member reachable again: ${member.address}")

217

// Resume sending work to this member

218

219

case UnreachableDataCenter(dc) =>

220

println(s"Data center unreachable: $dc")

221

// Handle cross-DC partition

222

223

case ReachableDataCenter(dc) =>

224

println(s"Data center reachable again: $dc")

225

// Resume cross-DC operations

226

}

227

```

228

229

## Shutdown Events

230

231

### Cluster Shutdown Event

232

233

```scala { .api }

234

case object ClusterShuttingDown extends ClusterDomainEvent

235

```

236

237

### Shutdown Event Handling

238

239

```scala

240

def receive = {

241

case ClusterShuttingDown =>

242

println("Cluster is shutting down")

243

// Prepare for shutdown, save state, etc.

244

// This is the last cluster event that will be delivered

245

}

246

```

247

248

## Complete Event Handler Example

249

250

```scala

251

import akka.actor.{Actor, ActorLogging}

252

import akka.cluster.{Cluster, ClusterEvent}

253

import akka.cluster.ClusterEvent._

254

255

class ClusterListener extends Actor with ActorLogging {

256

val cluster = Cluster(context.system)

257

258

override def preStart(): Unit = {

259

cluster.subscribe(self, InitialStateAsSnapshot,

260

classOf[MemberEvent], classOf[UnreachableMember])

261

}

262

263

override def postStop(): Unit = {

264

cluster.unsubscribe(self)

265

}

266

267

def receive = {

268

case state: CurrentClusterState =>

269

log.info("Current members: {}", state.members.mkString(", "))

270

271

case MemberUp(member) =>

272

log.info("Member is Up: {}", member.address)

273

registerMember(member)

274

275

case MemberRemoved(member, previousStatus) =>

276

log.info("Member is Removed: {} after {}", member.address, previousStatus)

277

deregisterMember(member)

278

279

case UnreachableMember(member) =>

280

log.info("Member detected as unreachable: {}", member)

281

handleUnreachableMember(member)

282

283

case _: MemberEvent => // Ignore other member events

284

}

285

286

def registerMember(member: Member): Unit = {

287

// Application-specific member registration

288

}

289

290

def deregisterMember(member: Member): Unit = {

291

// Application-specific member cleanup

292

}

293

294

def handleUnreachableMember(member: Member): Unit = {

295

// Handle member unreachability

296

}

297

}

298

```

299

300

## Event Filtering Patterns

301

302

### Role-Based Filtering

303

304

```scala

305

def receive = {

306

case MemberUp(member) if member.hasRole("worker") =>

307

// Only handle worker nodes coming up

308

addWorkerNode(member)

309

310

case MemberRemoved(member, _) if member.hasRole("coordinator") =>

311

// Special handling for coordinator removal

312

handleCoordinatorRemoval(member)

313

}

314

```

315

316

### Data Center Filtering

317

318

```scala

319

def receive = {

320

case MemberUp(member) if member.dataCenter == cluster.selfDataCenter =>

321

// Only handle members in same data center

322

handleLocalMemberUp(member)

323

324

case MemberUp(member) =>

325

// Handle remote data center members differently

326

handleRemoteMemberUp(member)

327

}

328

```

329

330

## Event Delivery Guarantees

331

332

- Events are delivered in cluster state order

333

- No events are lost once subscription is established

334

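- `CurrentClusterState` is delivered as the first message when subscribing with `InitialStateAsSnapshot` (the default mode); with `InitialStateAsEvents` the current state is instead replayed as individual events and no snapshot message is sent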

335

- Events are delivered as ordinary messages to the subscriber's mailbox, so the subscriber processes them one at a time like any other actor message

336

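- `unsubscribe(subscriber)` stops all future event delivery to that subscriber; `unsubscribe(subscriber, to)` stops delivery of only the given event type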

337

- Events are not delivered to terminated actors