0
# Cluster Management
1
2
Core cluster operations for joining, leaving, and managing cluster membership. This provides the fundamental functionality for setting up and managing distributed actor systems using Akka Cluster.
3
4
## Capabilities
5
6
### Cluster Extension Access
7
8
Access the cluster extension instance from an ActorSystem.
9
10
```scala { .api }
11
/**
12
* Cluster Extension Id and factory for creating Cluster extension
13
*/
14
object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider {
15
/** Get cluster extension instance for given actor system */
16
def get(system: ActorSystem): Cluster
17
18
/** Get cluster extension instance for typed actor system */
19
def get(system: ClassicActorSystemProvider): Cluster
20
}
21
```
22
23
**Usage Example:**
24
25
```scala
26
import akka.cluster.Cluster
27
28
implicit val system = ActorSystem("ClusterSystem")
29
val cluster = Cluster(system)
30
// or
31
val cluster = Cluster.get(system)
32
```
33
34
### Main Cluster Extension
35
36
The primary interface for cluster membership management and operations.
37
38
```scala { .api }
39
/**
40
* Main cluster extension responsible for cluster membership information.
41
* Changes to cluster information are retrieved through subscribe().
42
* Commands to operate the cluster are available through methods like join(), down(), and leave().
43
*/
44
class Cluster(val system: ExtendedActorSystem) extends Extension {
45
/** The address including a uid of this cluster member */
46
val selfUniqueAddress: UniqueAddress
47
48
/** The address of this cluster member */
49
def selfAddress: Address
50
51
/** Data center to which this node belongs to */
52
def selfDataCenter: DataCenter
53
54
/** Roles that this member has */
55
def selfRoles: Set[String]
56
57
/** Current snapshot state of the cluster */
58
def state: CurrentClusterState
59
60
/** Current snapshot of the member itself */
61
def selfMember: Member
62
63
/** Returns true if this cluster instance has been shutdown */
64
def isTerminated: Boolean
65
66
/** Java API: roles that this member has */
67
def getSelfRoles: java.util.Set[String]
68
}
69
```
70
71
### Cluster Join Operations
72
73
Join the cluster by connecting to existing members or seed nodes.
74
75
```scala { .api }
76
/**
77
* Try to join this cluster node with the node specified by 'address'.
78
* An actor system can only join a cluster once. Additional attempts will be ignored.
79
* When it has successfully joined it must be restarted to be able to join another
80
* cluster or to join the same cluster again.
81
*/
82
def join(address: Address): Unit
83
84
/**
85
* Join the specified seed nodes without defining them in config.
86
* Especially useful from tests when Addresses are unknown before startup time.
87
*/
88
def joinSeedNodes(seedNodes: immutable.Seq[Address]): Unit
89
90
/** Java API for joining seed nodes */
91
def joinSeedNodes(seedNodes: java.util.List[Address]): Unit
92
```
93
94
**Usage Examples:**
95
96
```scala
97
import akka.actor.Address
98
99
// Join a specific node
100
val seedAddress = Address("akka.tcp", "ClusterSystem", "127.0.0.1", 2551)
101
cluster.join(seedAddress)
102
103
// Join using multiple seed nodes
104
val seedNodes = List(
105
Address("akka.tcp", "ClusterSystem", "127.0.0.1", 2551),
106
Address("akka.tcp", "ClusterSystem", "127.0.0.1", 2552)
107
)
108
cluster.joinSeedNodes(seedNodes)
109
110
// Self-join (form single-node cluster)
111
cluster.join(cluster.selfAddress)
112
```
113
114
### Cluster Leave Operations
115
116
Gracefully leave the cluster or forcefully remove nodes.
117
118
```scala { .api }
119
/**
120
* Send command to issue state transition to LEAVING for the node specified by 'address'.
121
* The member will go through the status changes MemberStatus Leaving (not published to
122
* subscribers) followed by MemberStatus Exiting and finally MemberStatus Removed.
123
*/
124
def leave(address: Address): Unit
125
126
/**
127
* Send command to DOWN the node specified by 'address'.
128
* When a member is considered by the failure detector to be unreachable the leader is not
129
* allowed to perform its duties. The status of the unreachable member must be changed to 'Down'.
130
*/
131
def down(address: Address): Unit
132
```
133
134
**Usage Examples:**
135
136
```scala
137
// Graceful leave
138
cluster.leave(cluster.selfAddress)
139
140
// Force down an unreachable node
141
val unreachableAddress = Address("akka.tcp", "ClusterSystem", "127.0.0.1", 2553)
142
cluster.down(unreachableAddress)
143
```
144
145
### Event Subscription Management
146
147
Subscribe to cluster events to react to membership changes.
148
149
```scala { .api }
150
/**
151
* Subscribe to one or more cluster domain events.
152
* A snapshot of CurrentClusterState will be sent to the subscriber as the first message.
153
*/
154
def subscribe(subscriber: ActorRef, to: Class[_]*): Unit
155
156
/**
157
* Subscribe with specific initial state mode.
158
* If initialStateMode is InitialStateAsEvents the events corresponding
159
* to the current state will be sent to mimic past events.
160
* If InitialStateAsSnapshot a snapshot of CurrentClusterState will be sent.
161
*/
162
def subscribe(subscriber: ActorRef, initialStateMode: SubscriptionInitialStateMode, to: Class[_]*): Unit
163
164
/** Unsubscribe from all cluster domain events */
165
def unsubscribe(subscriber: ActorRef): Unit
166
167
/** Unsubscribe from specific type of cluster domain events */
168
def unsubscribe(subscriber: ActorRef, to: Class[_]): Unit
169
170
/** Send current cluster state to the specified receiver */
171
def sendCurrentClusterState(receiver: ActorRef): Unit
172
```
173
174
**Usage Examples:**
175
176
```scala
177
import akka.cluster.ClusterEvent._
178
179
// Subscribe to all member events
180
cluster.subscribe(listener, classOf[MemberEvent])
181
182
// Subscribe to specific events
183
cluster.subscribe(listener,
184
classOf[MemberUp],
185
classOf[MemberLeft],
186
classOf[UnreachableMember])
187
188
// Subscribe with event replay
189
cluster.subscribe(listener, InitialStateAsEvents, classOf[MemberEvent])
190
191
// Unsubscribe
192
cluster.unsubscribe(listener)
193
```
194
195
### Lifecycle Callbacks
196
197
Register callbacks to execute when cluster member reaches specific states.
198
199
```scala { .api }
200
/**
201
* The supplied thunk will be run, once, when current cluster member is Up.
202
* Typically used together with configuration option 'akka.cluster.min-nr-of-members'
203
* to defer some action, such as starting actors, until the cluster has reached
204
* a certain size.
205
*/
206
def registerOnMemberUp[T](code: => T): Unit
207
208
/**
209
* Java API: The supplied callback will be run, once, when current cluster member is Up.
210
* Typically used together with configuration option 'akka.cluster.min-nr-of-members'
211
* to defer some action, such as starting actors, until the cluster has reached
212
* a certain size.
213
*/
214
def registerOnMemberUp(callback: Runnable): Unit
215
216
/**
217
* The supplied thunk will be run, once, when current cluster member is Removed.
218
* If the cluster has already been shutdown the thunk will run on the caller thread immediately.
219
* Typically used together cluster.leave(cluster.selfAddress) and then system.terminate().
220
*/
221
def registerOnMemberRemoved[T](code: => T): Unit
222
223
/**
224
* Java API: The supplied callback will be run, once, when current cluster member is Removed.
225
* If the cluster has already been shutdown the callback will run immediately.
226
* Typically used together cluster.leave(cluster.selfAddress) and then system.terminate().
227
*/
228
def registerOnMemberRemoved(callback: Runnable): Unit
229
```
230
231
**Usage Examples:**
232
233
```scala
234
// Start application actors when cluster is ready
235
cluster.registerOnMemberUp(new Runnable {
236
def run(): Unit = {
237
println("Cluster member is UP - starting application actors")
238
system.actorOf(Props[MyApplicationActor], "app")
239
}
240
})
241
242
// Cleanup when leaving cluster
243
cluster.registerOnMemberRemoved(new Runnable {
244
def run(): Unit = {
245
println("Member removed from cluster - shutting down")
246
system.terminate()
247
}
248
})
249
250
// Scala-friendly syntax
251
cluster.registerOnMemberUp {
252
println("Member is UP!")
253
// Start application logic
254
}
255
```
256
257
### Utility Methods
258
259
Additional utility methods for cluster operations.
260
261
```scala { .api }
262
/**
263
* Generate the remote actor path by replacing the Address in the RootActor Path
264
* for the given ActorRef with the cluster's selfAddress, unless address' host is already defined
265
*/
266
def remotePathOf(actorRef: ActorRef): ActorPath
267
```
268
269
**Usage Example:**
270
271
```scala
272
val localActor = system.actorOf(Props[MyActor], "myActor")
273
val remotePath = cluster.remotePathOf(localActor)
274
// Use remotePath to reference this actor from other cluster nodes
275
```
276
277
## Types
278
279
```scala { .api }
280
// Data center type alias
281
type DataCenter = String
282
283
// Initial state subscription modes
284
sealed abstract class SubscriptionInitialStateMode
285
case object InitialStateAsSnapshot extends SubscriptionInitialStateMode
286
case object InitialStateAsEvents extends SubscriptionInitialStateMode
287
```