0
# Event System
1
2
The Akka Cluster event system provides comprehensive monitoring of cluster state changes through a publisher-subscriber model. Applications can subscribe to specific event types to build cluster-aware behavior.
3
4
## Event Subscription
5
6
### Basic Subscription
7
8
Subscribe to cluster events with automatic initial state delivery (the overload without an `initialStateMode` defaults to `InitialStateAsSnapshot`):
9
10
```scala { .api }
11
// Subscription methods on Cluster
12
def subscribe(subscriber: ActorRef, to: Class[_]*): Unit
13
def subscribe(subscriber: ActorRef, initialStateMode: SubscriptionInitialStateMode, to: Class[_]*): Unit
14
def unsubscribe(subscriber: ActorRef): Unit
15
def unsubscribe(subscriber: ActorRef, to: Class[_]): Unit
16
def sendCurrentClusterState(receiver: ActorRef): Unit
17
```
18
19
### Subscription Modes
20
21
```scala { .api }
22
sealed abstract class SubscriptionInitialStateMode
23
case object InitialStateAsSnapshot extends SubscriptionInitialStateMode
24
case object InitialStateAsEvents extends SubscriptionInitialStateMode
25
26
// Java API
27
def initialStateAsSnapshot: SubscriptionInitialStateMode
28
def initialStateAsEvents: SubscriptionInitialStateMode
29
```
30
31
### Usage Examples
32
33
```scala
34
import akka.cluster.ClusterEvent._
35
36
// Subscribe to all member events with snapshot
37
cluster.subscribe(self, InitialStateAsSnapshot, classOf[MemberEvent])
38
39
// Subscribe to specific events; the current state is delivered as individual events instead of a snapshot
40
cluster.subscribe(self, InitialStateAsEvents,
41
classOf[MemberUp], classOf[MemberRemoved], classOf[UnreachableMember])
42
43
// Subscribe to leadership changes
44
cluster.subscribe(self, classOf[LeaderChanged], classOf[RoleLeaderChanged])
45
46
// Unsubscribe from all events
47
cluster.unsubscribe(self)
48
49
// Unsubscribe from specific event type
50
cluster.unsubscribe(self, classOf[MemberEvent])
51
```
52
53
## Current Cluster State
54
55
### State Snapshot
56
57
```scala { .api }
58
case class CurrentClusterState(
59
members: immutable.SortedSet[Member],
60
unreachable: Set[Member],
61
seenBy: Set[Address],
62
leader: Option[Address],
63
roleLeaderMap: Map[String, Option[Address]],
64
unreachableDataCenters: Set[DataCenter],
65
memberTombstones: Set[UniqueAddress] // Internal API
66
) {
67
// Member queries
68
def roleMembers(role: String): immutable.SortedSet[Member]
69
def unreachableMembers(role: String): Set[Member]
70
def allDataCenters: Set[DataCenter]
71
72
// Utility methods
73
def copy(members: immutable.SortedSet[Member] = members,
74
unreachable: Set[Member] = unreachable,
75
seenBy: Set[Address] = seenBy,
76
leader: Option[Address] = leader,
77
roleLeaderMap: Map[String, Option[Address]] = roleLeaderMap,
78
unreachableDataCenters: Set[DataCenter] = unreachableDataCenters): CurrentClusterState
79
80
def copyUnreachable(unreachable: Set[Member]): CurrentClusterState
81
82
// Java API
83
def getMembers: java.lang.Iterable[Member]
84
def getUnreachable: java.util.Set[Member]
85
def getRoleLeaderMap: java.util.Map[String, Address]
86
def getAllDataCenters: java.util.Set[DataCenter]
87
def getUnreachableDataCenters: java.util.Set[DataCenter]
88
}
89
```
90
91
### State Access
92
93
```scala
94
// With InitialStateAsSnapshot, CurrentClusterState is the first message received after subscription
95
def receive = {
96
case state: CurrentClusterState =>
97
println(s"Current members: ${state.members.size}")
98
println(s"Leader: ${state.leader}")
99
println(s"Unreachable: ${state.unreachable.size}")
100
101
// Check role leadership
102
state.roleLeaderMap.foreach { case (role, leaderOpt) =>
103
println(s"Leader for role '$role': ${leaderOpt.getOrElse("None")}")
104
}
105
106
case other => // Handle events
107
}
108
```
109
110
## Member Events
111
112
### Base Member Event
113
114
```scala { .api }
115
trait MemberEvent extends ClusterDomainEvent {
116
def member: Member
117
}
118
```
119
120
### Member Lifecycle Events
121
122
```scala { .api }
123
case class MemberJoined(member: Member) extends MemberEvent
124
case class MemberWeaklyUp(member: Member) extends MemberEvent
125
case class MemberUp(member: Member) extends MemberEvent
126
case class MemberLeft(member: Member) extends MemberEvent
127
case class MemberPreparingForShutdown(member: Member) extends MemberEvent
128
case class MemberReadyForShutdown(member: Member) extends MemberEvent
129
case class MemberExited(member: Member) extends MemberEvent
130
case class MemberDowned(member: Member) extends MemberEvent
131
case class MemberRemoved(member: Member, previousStatus: MemberStatus) extends MemberEvent
132
```
133
134
### Member Event Handling
135
136
```scala
137
def receive = {
138
case MemberJoined(member) =>
139
println(s"Member joined: ${member.address}")
140
141
case MemberUp(member) =>
142
println(s"Member is Up: ${member.address}")
143
if (member.hasRole("backend")) {
144
// Initialize backend-specific communication
145
}
146
147
case MemberLeft(member) =>
148
println(s"Member is leaving: ${member.address}")
149
// Cleanup resources for this member
150
151
case MemberRemoved(member, previousStatus) =>
152
println(s"Member removed: ${member.address}, was: $previousStatus")
153
// Final cleanup
154
155
case MemberDowned(member) =>
156
println(s"Member marked as down: ${member.address}")
157
// Handle failure scenario
158
}
159
```
160
161
## Leadership Events
162
163
### Leadership Change Events
164
165
```scala { .api }
166
case class LeaderChanged(leader: Option[Address]) extends ClusterDomainEvent
167
case class RoleLeaderChanged(role: String, leader: Option[Address]) extends ClusterDomainEvent
168
```
169
170
### Leadership Event Handling
171
172
```scala
173
def receive = {
174
case LeaderChanged(Some(leader)) =>
175
println(s"New cluster leader: $leader")
176
if (leader == cluster.selfAddress) {
177
println("This node is now the leader")
178
// Start leader-specific tasks
179
}
180
181
case LeaderChanged(None) =>
182
println("No cluster leader currently")
183
184
case RoleLeaderChanged(role, Some(leader)) =>
185
println(s"New leader for role '$role': $leader")
186
if (cluster.selfMember.hasRole(role) && leader == cluster.selfAddress) {
187
// This node is now leader for this role
188
}
189
190
case RoleLeaderChanged(role, None) =>
191
println(s"No leader for role '$role'")
192
}
193
```
194
195
## Reachability Events
196
197
### Reachability Event Types
198
199
```scala { .api }
200
case class UnreachableMember(member: Member) extends ClusterDomainEvent
201
case class ReachableMember(member: Member) extends ClusterDomainEvent
202
case class UnreachableDataCenter(dataCenter: DataCenter) extends ClusterDomainEvent
203
case class ReachableDataCenter(dataCenter: DataCenter) extends ClusterDomainEvent
204
```
205
206
### Reachability Event Handling
207
208
```scala
209
def receive = {
210
case UnreachableMember(member) =>
211
println(s"Member unreachable: ${member.address}")
212
// Stop sending work to this member
213
// Potentially trigger failure handling
214
215
case ReachableMember(member) =>
216
println(s"Member reachable again: ${member.address}")
217
// Resume sending work to this member
218
219
case UnreachableDataCenter(dc) =>
220
println(s"Data center unreachable: $dc")
221
// Handle cross-DC partition
222
223
case ReachableDataCenter(dc) =>
224
println(s"Data center reachable again: $dc")
225
// Resume cross-DC operations
226
}
227
```
228
229
## Shutdown Events
230
231
### Cluster Shutdown Event
232
233
```scala { .api }
234
case object ClusterShuttingDown extends ClusterDomainEvent
235
```
236
237
### Shutdown Event Handling
238
239
```scala
240
def receive = {
241
case ClusterShuttingDown =>
242
println("Cluster is shutting down")
243
// Prepare for shutdown, save state, etc.
244
// Published when this node shuts down, before its final MemberRemoved events; do not assume it is the last event delivered
245
}
246
```
247
248
## Complete Event Handler Example
249
250
```scala
251
import akka.actor.{Actor, ActorLogging}
252
import akka.cluster.{Cluster, ClusterEvent}
253
import akka.cluster.ClusterEvent._
254
255
class ClusterListener extends Actor with ActorLogging {
256
val cluster = Cluster(context.system)
257
258
override def preStart(): Unit = {
259
cluster.subscribe(self, InitialStateAsSnapshot,
260
classOf[MemberEvent], classOf[UnreachableMember])
261
}
262
263
override def postStop(): Unit = {
264
cluster.unsubscribe(self)
265
}
266
267
def receive = {
268
case state: CurrentClusterState =>
269
log.info("Current members: {}", state.members.mkString(", "))
270
271
case MemberUp(member) =>
272
log.info("Member is Up: {}", member.address)
273
registerMember(member)
274
275
case MemberRemoved(member, previousStatus) =>
276
log.info("Member is Removed: {} after {}", member.address, previousStatus)
277
deregisterMember(member)
278
279
case UnreachableMember(member) =>
280
log.info("Member detected as unreachable: {}", member)
281
handleUnreachableMember(member)
282
283
case _: MemberEvent => // Ignore other member events
284
}
285
286
def registerMember(member: Member): Unit = {
287
// Application-specific member registration
288
}
289
290
def deregisterMember(member: Member): Unit = {
291
// Application-specific member cleanup
292
}
293
294
def handleUnreachableMember(member: Member): Unit = {
295
// Handle member unreachability
296
}
297
}
298
```
299
300
## Event Filtering Patterns
301
302
### Role-Based Filtering
303
304
```scala
305
def receive = {
306
case MemberUp(member) if member.hasRole("worker") =>
307
// Only handle worker nodes coming up
308
addWorkerNode(member)
309
310
case MemberRemoved(member, _) if member.hasRole("coordinator") =>
311
// Special handling for coordinator removal
312
handleCoordinatorRemoval(member)
313
}
314
```
315
316
### Data Center Filtering
317
318
```scala
319
def receive = {
320
case MemberUp(member) if member.dataCenter == cluster.selfDataCenter =>
321
// Only handle members in same data center
322
handleLocalMemberUp(member)
323
324
case MemberUp(member) =>
325
// Handle remote data center members differently
326
handleRemoteMemberUp(member)
327
}
328
```
329
330
## Event Delivery Guarantees
331
332
- Events are delivered in the order in which the local node observes cluster state changes
333
- No events are lost once subscription is established
334
- `CurrentClusterState` is always delivered first (with `InitialStateAsSnapshot`)
335
- Events are delivered as ordinary messages to the subscriber's mailbox and are processed one at a time by the actor
336
- `unsubscribe(subscriber)` stops all future event delivery; `unsubscribe(subscriber, to)` stops delivery of that event type only
337
- Events are not delivered to terminated actors