0
# Events and State
1
2
Comprehensive event system for monitoring cluster state changes, member lifecycle events, and reachability updates. The Akka Cluster event system provides fine-grained notifications about all aspects of cluster membership and health.
3
4
## Capabilities
5
6
### Current Cluster State
7
8
Snapshot of the current cluster state containing all members, their status, and leadership information.
9
10
```scala { .api }
11
/**
12
* Current snapshot state of the cluster. Sent to new subscriber.
13
* @param leader leader of the data center of this node
14
*/
15
class CurrentClusterState(
16
val members: immutable.SortedSet[Member],
17
val unreachable: Set[Member],
18
val seenBy: Set[Address],
19
val leader: Option[Address],
20
val roleLeaderMap: Map[String, Option[Address]],
21
val unreachableDataCenters: Set[DataCenter]
22
) {
23
/** Get current leader for specific role */
24
def roleLeader(role: String): Option[Address]
25
26
/** All node roles in the cluster */
27
def allRoles: Set[String]
28
29
/** All data centers in the cluster */
30
def allDataCenters: Set[String]
31
32
/** Java API: get current member list */
33
def getMembers: java.lang.Iterable[Member]
34
35
/** Java API: get current unreachable set */
36
def getUnreachable: java.util.Set[Member]
37
38
/** Java API: get current leader address, or null if none */
39
def getLeader: Address
40
41
/** Java API: All data centers in the cluster */
42
def getAllDataCenters: java.util.Set[String]
43
44
/** Java API: All unreachable data centers in the cluster */
45
def getUnreachableDataCenters: java.util.Set[String]
46
}
47
```
48
49
**Usage Example:**
50
51
```scala
52
val state = cluster.state
53
println(s"Total members: ${state.members.size}")
54
println(s"Unreachable members: ${state.unreachable.size}")
55
println(s"Current leader: ${state.leader}")
56
println(s"All roles: ${state.allRoles}")
57
58
// Check for specific member
59
val targetAddress = Address("akka.tcp", "ClusterSystem", "127.0.0.1", 2551)
60
val targetMember = state.members.find(_.address == targetAddress)
61
targetMember.foreach(m => println(s"Target member status: ${m.status}"))
62
```
63
64
### Base Event Types
65
66
Core event interfaces that all cluster events implement.
67
68
```scala { .api }
69
/**
70
* Marker interface for cluster domain events.
71
* Not intended for user extension.
72
*/
73
trait ClusterDomainEvent extends DeadLetterSuppression
74
75
/**
76
* Marker interface for membership events.
77
* Published when the state change is first seen on a node.
78
*/
79
sealed trait MemberEvent extends ClusterDomainEvent {
80
def member: Member
81
}
82
83
/**
84
* Marker interface to facilitate subscription of both UnreachableMember and ReachableMember.
85
*/
86
sealed trait ReachabilityEvent extends ClusterDomainEvent {
87
def member: Member
88
}
89
90
/**
91
* Marker interface for data center reachability events.
92
*/
93
sealed trait DataCenterReachabilityEvent extends ClusterDomainEvent
94
```
95
96
### Member Lifecycle Events
97
98
Events fired when members join, leave, or change status in the cluster.
99
100
```scala { .api }
101
/** Member status changed to Joining */
102
case class MemberJoined(member: Member) extends MemberEvent
103
104
/**
105
* Member status changed to WeaklyUp.
106
* A joining member can be moved to WeaklyUp if convergence
107
* cannot be reached, i.e. there are unreachable nodes.
108
* It will be moved to Up when convergence is reached.
109
*/
110
case class MemberWeaklyUp(member: Member) extends MemberEvent
111
112
/** Member status changed to Up */
113
case class MemberUp(member: Member) extends MemberEvent
114
115
/** Member status changed to Leaving */
116
case class MemberLeft(member: Member) extends MemberEvent
117
118
/**
119
* Member status changed to MemberStatus.Exiting and will be removed
120
* when all members have seen the Exiting status.
121
*/
122
case class MemberExited(member: Member) extends MemberEvent
123
124
/**
125
* Member status changed to MemberStatus.Down and will be removed
126
* when all members have seen the Down status.
127
*/
128
case class MemberDowned(member: Member) extends MemberEvent
129
130
/**
131
* Member completely removed from the cluster.
132
* When previousStatus is MemberStatus.Down the node was removed
133
* after being detected as unreachable and downed.
134
* When previousStatus is MemberStatus.Exiting the node was removed
135
* after graceful leaving and exiting.
136
*/
137
case class MemberRemoved(member: Member, previousStatus: MemberStatus) extends MemberEvent
138
```
139
140
**Usage Example:**
141
142
```scala
143
import akka.cluster.ClusterEvent._
144
145
class ClusterListener extends Actor with ActorLogging {
146
def receive = {
147
case MemberUp(member) =>
148
log.info("Member is Up: {}", member.address)
149
150
case MemberJoined(member) =>
151
log.info("Member joined: {}", member.address)
152
153
case MemberLeft(member) =>
154
log.info("Member left: {}", member.address)
155
156
case MemberExited(member) =>
157
log.info("Member exited: {}", member.address)
158
159
case MemberRemoved(member, previousStatus) =>
160
log.info("Member removed: {} (was: {})", member.address, previousStatus)
161
162
case _: MemberEvent => // ignore
163
}
164
}
165
```
166
167
### Leadership Events
168
169
Events related to cluster leadership changes at both cluster and role levels.
170
171
```scala { .api }
172
/**
173
* Leader of the cluster data center of this node changed.
174
* Published when the state change is first seen on a node.
175
*/
176
case class LeaderChanged(leader: Option[Address]) extends ClusterDomainEvent {
177
/** Java API: get address of current leader, or null if none */
178
def getLeader: Address = leader.orNull
179
}
180
181
/**
182
* First member (leader) of the members within a role set changed.
183
* Published when the state change is first seen on a node.
184
*/
185
case class RoleLeaderChanged(role: String, leader: Option[Address]) extends ClusterDomainEvent {
186
/** Java API: get address of current leader, or null if none */
187
def getLeader: Address = leader.orNull
188
}
189
```
190
191
**Usage Example:**
192
193
```scala
194
case LeaderChanged(newLeader) =>
195
newLeader match {
196
case Some(address) => log.info("New leader: {}", address)
197
case None => log.info("No leader currently")
198
}
199
200
case RoleLeaderChanged(role, newLeader) =>
201
log.info("Role '{}' leader changed to: {}", role, newLeader.getOrElse("none"))
202
```
203
204
### Reachability Events
205
206
Events indicating when cluster members become reachable or unreachable from the perspective of the failure detector.
207
208
```scala { .api }
209
/** A member is considered as unreachable by the failure detector */
210
case class UnreachableMember(member: Member) extends ReachabilityEvent
211
212
/**
213
* A member is considered as reachable by the failure detector
214
* after having been unreachable.
215
*/
216
case class ReachableMember(member: Member) extends ReachabilityEvent
217
```
218
219
**Usage Example:**
220
221
```scala
222
case UnreachableMember(member) =>
223
log.warning("Member became unreachable: {}", member.address)
224
// Maybe take action like redistributing work
225
226
case ReachableMember(member) =>
227
log.info("Member became reachable again: {}", member.address)
228
// Member is back, can route work to it again
229
```
230
231
### Data Center Events
232
233
Events for multi-data center clusters indicating when entire data centers become unreachable.
234
235
```scala { .api }
236
/** A data center is considered as unreachable when any members from the data center are unreachable */
237
case class UnreachableDataCenter(dataCenter: DataCenter) extends DataCenterReachabilityEvent
238
239
/** A data center is considered reachable when all members from the data center are reachable */
240
case class ReachableDataCenter(dataCenter: DataCenter) extends DataCenterReachabilityEvent
241
```
242
243
**Usage Example:**
244
245
```scala
246
case UnreachableDataCenter(dc) =>
247
log.warning("Data center became unreachable: {}", dc)
248
// Adjust routing to avoid unreachable DC
249
250
case ReachableDataCenter(dc) =>
251
log.info("Data center became reachable: {}", dc)
252
// Can route to this DC again
253
```
254
255
### Shutdown Events
256
257
Events indicating cluster shutdown.
258
259
```scala { .api }
260
/**
261
* This event is published when the cluster node is shutting down,
262
* before the final MemberRemoved events are published.
263
*/
264
case object ClusterShuttingDown extends ClusterDomainEvent
265
```
266
267
**Usage Example:**
268
269
```scala
270
case ClusterShuttingDown =>
271
log.info("Cluster is shutting down")
272
// Perform cleanup before final shutdown
273
```
274
275
### Event Subscription
276
277
Methods for subscribing to cluster events with different initial state modes.
278
279
```scala { .api }
280
/**
281
* When using this subscription mode a snapshot of
282
* CurrentClusterState will be sent to the subscriber as the first message.
283
*/
284
case object InitialStateAsSnapshot extends SubscriptionInitialStateMode
285
286
/**
287
* When using this subscription mode the events corresponding
288
* to the current state will be sent to the subscriber to mimic what you would
289
* have seen if you were listening to the events when they occurred in the past.
290
*/
291
case object InitialStateAsEvents extends SubscriptionInitialStateMode
292
```
293
294
**Usage Examples:**
295
296
```scala
297
// Subscribe with snapshot - get current state immediately
298
cluster.subscribe(listener, InitialStateAsSnapshot, classOf[MemberEvent])
299
300
// Subscribe with event replay - replay all past events to current state
301
cluster.subscribe(listener, InitialStateAsEvents, classOf[MemberEvent])
302
303
// Subscribe to multiple event types
304
cluster.subscribe(listener,
305
classOf[MemberEvent],
306
classOf[ReachabilityEvent],
307
classOf[LeaderChanged])
308
309
// Subscribe to all cluster events
310
cluster.subscribe(listener, classOf[ClusterDomainEvent])
311
```
312
313
### Complete Event Handling Example
314
315
```scala
316
import akka.cluster.ClusterEvent._
317
318
class ComprehensiveClusterListener extends Actor with ActorLogging {
319
override def preStart(): Unit = {
320
val cluster = Cluster(context.system)
321
cluster.subscribe(self, InitialStateAsSnapshot,
322
classOf[MemberEvent],
323
classOf[ReachabilityEvent],
324
classOf[LeaderChanged],
325
classOf[RoleLeaderChanged],
326
classOf[DataCenterReachabilityEvent])
327
}
328
329
def receive = {
330
// Initial state
331
case state: CurrentClusterState =>
332
log.info("Current cluster state: {} members", state.members.size)
333
state.members.foreach(m => log.info("Member: {} - {}", m.address, m.status))
334
335
// Member lifecycle
336
case MemberUp(member) =>
337
log.info("Member UP: {}", member.address)
338
case MemberJoined(member) =>
339
log.info("Member JOINED: {}", member.address)
340
case MemberLeft(member) =>
341
log.info("Member LEFT: {}", member.address)
342
case MemberExited(member) =>
343
log.info("Member EXITED: {}", member.address)
344
case MemberRemoved(member, previousStatus) =>
345
log.info("Member REMOVED: {} (was {})", member.address, previousStatus)
346
347
// Reachability
348
case UnreachableMember(member) =>
349
log.warning("Member UNREACHABLE: {}", member.address)
350
case ReachableMember(member) =>
351
log.info("Member REACHABLE: {}", member.address)
352
353
// Leadership
354
case LeaderChanged(leader) =>
355
log.info("Leader changed: {}", leader.getOrElse("none"))
356
case RoleLeaderChanged(role, leader) =>
357
log.info("Role '{}' leader changed: {}", role, leader.getOrElse("none"))
358
359
// Data center events
360
case UnreachableDataCenter(dc) =>
361
log.warning("Data center UNREACHABLE: {}", dc)
362
case ReachableDataCenter(dc) =>
363
log.info("Data center REACHABLE: {}", dc)
364
365
// Shutdown
366
case ClusterShuttingDown =>
367
log.info("Cluster shutting down")
368
}
369
370
override def postStop(): Unit = {
371
val cluster = Cluster(context.system)
372
cluster.unsubscribe(self)
373
}
374
}
375
```
376
377
## Types
378
379
```scala { .api }
380
// Subscription initial state modes
381
sealed abstract class SubscriptionInitialStateMode
382
case object InitialStateAsSnapshot extends SubscriptionInitialStateMode
383
case object InitialStateAsEvents extends SubscriptionInitialStateMode
384
385
// Data center type
386
type DataCenter = String
387
```