# Cluster Management

The core cluster management API provides operations for joining, leaving, and controlling cluster membership. The `Cluster` extension serves as the main entry point for all cluster operations.

## Core API

### Cluster Extension

```scala { .api }
class Cluster(val system: ExtendedActorSystem) extends Extension {
  def join(address: Address): Unit
  def joinSeedNodes(seedNodes: immutable.Seq[Address]): Unit
  def joinSeedNodes(seedNodes: java.util.List[Address]): Unit
  def leave(address: Address): Unit
  def down(address: Address): Unit
  def prepareForFullClusterShutdown(): Unit
  def state: CurrentClusterState
  def selfMember: Member
  def selfAddress: Address
  def selfUniqueAddress: UniqueAddress
  def selfDataCenter: DataCenter
  def selfRoles: Set[String]
  def getSelfRoles: java.util.Set[String]
  def remotePathOf(actorRef: ActorRef): ActorPath
  def isTerminated: Boolean
}
```

### Cluster Extension Factory

```scala { .api }
object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider {
  def apply(system: ActorSystem): Cluster
  def get(system: ActorSystem): Cluster
  def get(system: ClassicActorSystemProvider): Cluster
}
```
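
The factory methods above can be exercised with a minimal sketch like the following. It assumes the `akka-cluster` module is on the classpath and that Artery remoting is in use; the object name `ClusterBootstrapExample` and the choice of port `0` (let the system pick a free port) are illustrative, not part of the API.

```scala
import akka.actor.ActorSystem
import akka.cluster.Cluster
import com.typesafe.config.ConfigFactory

object ClusterBootstrapExample {
  // Illustrative settings: the Cluster extension requires the cluster provider.
  // Port 0 asks remoting to bind to any free port.
  val config = ConfigFactory.parseString(
    """
    akka.actor.provider = cluster
    akka.remote.artery.canonical.hostname = "127.0.0.1"
    akka.remote.artery.canonical.port = 0
    """).withFallback(ConfigFactory.load())

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("ClusterSystem", config)
    val cluster = Cluster(system)   // Scala-style lookup
    val same = Cluster.get(system)  // Java-style lookup of the same extension
    println(s"Extension ready at ${cluster.selfAddress}, same instance: ${cluster eq same}")
    system.terminate()
  }
}
```

Both lookups go through the extension registry of the actor system, so they should resolve to one shared `Cluster` instance per system.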

## Joining a Cluster

### Single Node Join

Join a cluster through a single known member by passing its address. The node contacts that address and asks to join.

```scala
import akka.actor.Address
import akka.cluster.Cluster
val cluster = Cluster(system)
cluster.join(Address("akka", "ClusterSystem", "127.0.0.1", 2551))
```

### Seed Node Join

Join a cluster using several seed nodes for better reliability. The node tries to contact all seed nodes and joins the first one that answers.

```scala
val seedNodes = immutable.Seq(
  Address("akka", "ClusterSystem", "127.0.0.1", 2551),
  Address("akka", "ClusterSystem", "127.0.0.1", 2552),
  Address("akka", "ClusterSystem", "127.0.0.1", 2553)
)
cluster.joinSeedNodes(seedNodes)
```

Java API:
```java
// Address is constructed directly; AddressFromURIString.parse("akka://...") is an alternative
List<Address> seedNodes = Arrays.asList(
    new Address("akka", "ClusterSystem", "127.0.0.1", 2551),
    new Address("akka", "ClusterSystem", "127.0.0.1", 2552)
);
cluster.joinSeedNodes(seedNodes);
```
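
Seed nodes are often available as URI strings, the same form used in configuration. A small sketch of converting them with `AddressFromURIString` follows; the object `SeedNodeParsing` and its helper `parseSeedNodes` are hypothetical names introduced only for this illustration.

```scala
import akka.actor.{ Address, AddressFromURIString }
import scala.collection.immutable

object SeedNodeParsing {
  // Hypothetical helper: turns URI strings into Address values.
  // AddressFromURIString throws if a string is not a valid address URI.
  def parseSeedNodes(uris: immutable.Seq[String]): immutable.Seq[Address] =
    uris.map(AddressFromURIString(_))

  def main(args: Array[String]): Unit = {
    val seeds = parseSeedNodes(List(
      "akka://ClusterSystem@127.0.0.1:2551",
      "akka://ClusterSystem@127.0.0.1:2552"))
    seeds.foreach(address => println(address))
    // The result can be passed to cluster.joinSeedNodes(seeds)
  }
}
```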

### Dynamic App Version

Set the application version after system startup but before joining. `setAppVersionLater` must be called before `join` or `joinSeedNodes`; the join itself may be requested right away and waits until the version is available.

```scala
import scala.concurrent.Future
import akka.util.Version

val appVersionFuture: Future[Version] = loadVersionFromK8s() // application-defined lookup
cluster.setAppVersionLater(appVersionFuture)
cluster.joinSeedNodes(seedNodes) // Can be called immediately; the join waits for the version
```

Java API:
```java
CompletionStage<Version> appVersion = loadVersionFromK8s();
cluster.setAppVersionLater(appVersion);
cluster.joinSeedNodes(seedNodes);
```
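
`loadVersionFromK8s()` is left to the application. As a rough stand-in, the sketch below reads the version from an environment variable; the object `AppVersionLoader`, its methods, and the variable name `APP_VERSION` are assumptions made for this example and are not provided by Akka.

```scala
import akka.util.Version
import scala.concurrent.{ ExecutionContext, Future }

object AppVersionLoader {
  // Falls back to Version.Zero when no usable value is present.
  def parseVersion(raw: Option[String]): Version =
    raw.map(_.trim).filter(_.nonEmpty).map(Version(_)).getOrElse(Version.Zero)

  // Hypothetical replacement for loadVersionFromK8s(): reads APP_VERSION asynchronously.
  def loadVersion()(implicit ec: ExecutionContext): Future[Version] =
    Future(parseVersion(sys.env.get("APP_VERSION")))
}
```

With an execution context in scope, the result can be passed on as `cluster.setAppVersionLater(AppVersionLoader.loadVersion())`.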

## Leaving a Cluster

### Graceful Leave

Request graceful removal of a member from the cluster. The member transitions through Leaving → Exiting → Removed states.

```scala
// Leave self
cluster.leave(cluster.selfAddress)

// Leave another member (can be called from any cluster member)
cluster.leave(Address("akka", "ClusterSystem", "192.168.1.100", 2551))
```
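
### Forced Down

Mark a member as down, typically because it is unreachable and not expected to return. While any member is unreachable the leader cannot perform its duties, such as moving joining members to Up; downing the member lets it be removed so that the leader can resume.

```scala
// Mark an unreachable member as down (unreachableMemberAddress is that member's Address)
cluster.down(unreachableMemberAddress)
```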

### Coordinated Shutdown

Prepare all members for a coordinated full cluster shutdown. Members move to the `PreparingForShutdown` and then the `ReadyForShutdown` status; the call does not stop any node by itself:

```scala
cluster.prepareForFullClusterShutdown()
```

## Cluster State Access

### Current State Snapshot

Get the current cluster state including all members, their statuses, and cluster leadership information:

```scala
val state: CurrentClusterState = cluster.state
println(s"Current leader: ${state.leader}")
println(s"Members: ${state.members.map(_.address).mkString(", ")}")
println(s"Unreachable: ${state.unreachable.map(_.address).mkString(", ")}")
```
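
A snapshot is an immutable value, so it can be queried with ordinary collection operations. The sketch below shows two such queries; the object `ClusterStateQueries` and its method names are hypothetical helpers, while `MemberStatus.Up`, `Member.hasRole`, and `CurrentClusterState.unreachable` come from the cluster API.

```scala
import akka.actor.Address
import akka.cluster.ClusterEvent.CurrentClusterState
import akka.cluster.MemberStatus

object ClusterStateQueries {
  // Addresses of members that are Up and carry the given role.
  def upMembersWithRole(state: CurrentClusterState, role: String): Set[Address] =
    state.members.iterator
      .filter(member => member.status == MemberStatus.Up && member.hasRole(role))
      .map(_.address)
      .toSet

  // True when the snapshot lists no unreachable members.
  def allReachable(state: CurrentClusterState): Boolean =
    state.unreachable.isEmpty
}
```

For example, `ClusterStateQueries.upMembersWithRole(cluster.state, "backend")` lists the backend nodes that are currently Up, as seen by this node at the time of the snapshot.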

### Self Member Information

Access information about the current node:

```scala
val self: Member = cluster.selfMember
val address: Address = cluster.selfAddress
val uniqueAddress: UniqueAddress = cluster.selfUniqueAddress
val dataCenter: String = cluster.selfDataCenter
val roles: Set[String] = cluster.selfRoles

println(s"Self: ${address}, Status: ${self.status}, Roles: ${roles.mkString(", ")}")
```

## Lifecycle Callbacks

### Member Up Callback

Execute code when the current member becomes Up:

```scala
cluster.registerOnMemberUp {
  println("This node is now Up - starting application services")
  // Start application-specific actors and services
}
```

Java API:
```java
cluster.registerOnMemberUp(() -> {
    System.out.println("This node is now Up");
    // Start application services
});
```

### Member Removed Callback

Execute code when the current member is removed from the cluster:

```scala
cluster.registerOnMemberRemoved {
  println("This node has been removed from cluster - shutting down")
  // Cleanup resources
}
```

Java API:
```java
cluster.registerOnMemberRemoved(() -> {
    System.out.println("Node removed from cluster");
    // Cleanup resources
});
```
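
When other code needs to wait for these lifecycle points, one option is to bridge the callbacks to futures. This is only a sketch of that idea: the object `LifecycleSignals` and its methods are hypothetical, and the only cluster calls used are `registerOnMemberUp` and `registerOnMemberRemoved`.

```scala
import akka.Done
import akka.cluster.Cluster
import scala.concurrent.{ Future, Promise }

object LifecycleSignals {
  // Completes once this node's status has changed to Up.
  def whenUp(cluster: Cluster): Future[Done] = {
    val promise = Promise[Done]()
    cluster.registerOnMemberUp {
      promise.trySuccess(Done)
    }
    promise.future
  }

  // Completes once this node has been removed from the cluster.
  def whenRemoved(cluster: Cluster): Future[Done] = {
    val promise = Promise[Done]()
    cluster.registerOnMemberRemoved {
      promise.trySuccess(Done)
    }
    promise.future
  }
}
```

Application startup can then be expressed as `LifecycleSignals.whenUp(cluster).foreach(_ => startServices())`, where `startServices` stands for whatever the application needs to run.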

## Utility Methods

### Remote Actor Path Generation

Generate the full path of a local actor, including this node's address, so that other cluster members can reach it:

```scala
val localActorRef: ActorRef = system.actorOf(Props[MyActor](), "myactor") // top-level actor
val remotePath: ActorPath = cluster.remotePathOf(localActorRef)
// e.g. akka://ClusterSystem@127.0.0.1:2551/user/myactor
```

### Cluster Status Check

Check if the cluster extension has been shut down:

```scala
if (!cluster.isTerminated) {
  // Cluster is still active
  cluster.join(seedNodeAddress)
}
```

## Error Handling

### Join Restrictions

- An actor system can only join a cluster once
- Additional join attempts are ignored
- The actor system must be restarted to join a different cluster, or to rejoin the same one
- All cluster members must have the same ActorSystem name

### Common Issues

```scala
// Will throw ConfigurationException if actor provider is not 'cluster'
val cluster = Cluster(system) // Requires akka.actor.provider = cluster

// Address validation - join checks the host characters of the address
cluster.join(Address("akka", "MySystem", "invalid-host!", 2551)) // May throw

// Joining with local address gets converted automatically
val localAddress = Address("akka", system.name) // No host/port
cluster.join(localAddress) // Converts to selfAddress
```

## Configuration

Key configuration settings for cluster management:

```hocon
akka.cluster {
  # Seed nodes for automatic cluster joining
  seed-nodes = [
    "akka://ClusterSystem@127.0.0.1:2551",
    "akka://ClusterSystem@127.0.0.1:2552"
  ]

  # Number of members required before the leader moves Joining members to Up
  min-nr-of-members = 1

  # Application version, used by rolling update features to tell old and new nodes apart
  app-version = "1.0.0"

  # Roles of this node, for targeted operations
  roles = ["backend", "compute"]

  # Data center designation for multi-DC clusters
  multi-data-center.self-data-center = "dc1"
}
```
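
Settings such as the port and roles usually differ per node. One way to handle that, sketched here under the assumption that Artery remoting is used, is to layer node-specific values over the shared configuration; the object `NodeConfig` and its `nodeConfig` method are hypothetical names for this example.

```scala
import com.typesafe.config.{ Config, ConfigFactory }

object NodeConfig {
  // Node-specific overrides take precedence; everything else comes from
  // application.conf and the reference configuration via ConfigFactory.load().
  def nodeConfig(port: Int, role: String): Config =
    ConfigFactory.parseString(
      s"""
      akka.actor.provider = cluster
      akka.remote.artery.canonical.port = $port
      akka.cluster.roles = ["$role"]
      """).withFallback(ConfigFactory.load())
}
```

An actor system for a backend node could then be started with `ActorSystem("ClusterSystem", NodeConfig.nodeConfig(2551, "backend"))`.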