# Cluster Management

Core cluster operations for joining, leaving, and managing cluster membership. These operations are the foundation for setting up and managing distributed actor systems with Akka Cluster.

## Capabilities

### Cluster Extension Access

Access the cluster extension instance from an ActorSystem.

```scala { .api }
/**
 * Cluster Extension Id and factory for creating Cluster extension
 */
object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider {
  /** Get cluster extension instance for given actor system */
  def get(system: ActorSystem): Cluster

  /** Get cluster extension instance for typed actor system */
  def get(system: ClassicActorSystemProvider): Cluster
}
```

**Usage Example:**

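```scala
import akka.actor.ActorSystem
import akka.cluster.Cluster

implicit val system: ActorSystem = ActorSystem("ClusterSystem")

// Scala-style access through the extension id
val cluster = Cluster(system)

// or, equivalently, through the explicit accessor (returns the same instance)
val sameCluster = Cluster.get(system)
```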

### Main Cluster Extension

The primary interface for cluster membership management and operations.

```scala { .api }
/**
 * Main cluster extension responsible for cluster membership information.
 * Changes to cluster information are retrieved through subscribe().
 * Commands to operate the cluster are available through methods like join(), down(), and leave().
 */
class Cluster(val system: ExtendedActorSystem) extends Extension {
  /** The address including a uid of this cluster member */
  val selfUniqueAddress: UniqueAddress

  /** The address of this cluster member */
  def selfAddress: Address

  /** Data center to which this node belongs */
  def selfDataCenter: DataCenter

  /** Roles that this member has */
  def selfRoles: Set[String]

  /** Current snapshot state of the cluster */
  def state: CurrentClusterState

  /** Current snapshot of the member itself */
  def selfMember: Member

  /** Returns true if this cluster instance has been shut down */
  def isTerminated: Boolean

  /** Java API: roles that this member has */
  def getSelfRoles: java.util.Set[String]
}
```
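
The accessors above can be combined to log this node's view of the cluster. The following is a minimal sketch, not part of the library: the `ClusterInfo` object and its `summary` helper are hypothetical names introduced for illustration, and the output format is arbitrary.

```scala
import akka.cluster.{Cluster, MemberStatus}

object ClusterInfo {
  /** Hypothetical helper: one-line summary of this node's view of the cluster. */
  def summary(cluster: Cluster): String = {
    // `state` is an immutable snapshot; it may already be outdated when read.
    val state = cluster.state
    val upMembers = state.members.count(_.status == MemberStatus.Up)
    val leader = state.leader.map(_.toString).getOrElse("<none>")

    s"self=${cluster.selfAddress} dc=${cluster.selfDataCenter} " +
      s"roles=${cluster.selfRoles.mkString(",")} status=${cluster.selfMember.status} " +
      s"up=$upMembers unreachable=${state.unreachable.size} leader=$leader"
  }
}
```

Because `state` and `selfMember` are snapshots, a helper like this is suited to logging and diagnostics; reacting to changes is better done through event subscription.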

### Cluster Join Operations

Join the cluster by connecting to existing members or seed nodes.

```scala { .api }
/**
 * Try to join this cluster node with the node specified by 'address'.
 * An actor system can only join a cluster once. Additional attempts will be ignored.
 * When it has successfully joined it must be restarted to be able to join another
 * cluster or to join the same cluster again.
 */
def join(address: Address): Unit

/**
 * Join the specified seed nodes without defining them in config.
 * Especially useful from tests when Addresses are unknown before startup time.
 */
def joinSeedNodes(seedNodes: immutable.Seq[Address]): Unit

/** Java API for joining seed nodes */
def joinSeedNodes(seedNodes: java.util.List[Address]): Unit
```

**Usage Examples:**

```scala
import akka.actor.Address

// The three options below are alternatives: an actor system can join a cluster
// only once, and additional attempts are ignored.
// Note: "akka.tcp" is the classic remoting protocol; with Artery remoting the
// protocol part of the address is "akka".

// Join a specific node
val seedAddress = Address("akka.tcp", "ClusterSystem", "127.0.0.1", 2551)
cluster.join(seedAddress)

// Join using multiple seed nodes
val seedNodes = List(
  Address("akka.tcp", "ClusterSystem", "127.0.0.1", 2551),
  Address("akka.tcp", "ClusterSystem", "127.0.0.1", 2552)
)
cluster.joinSeedNodes(seedNodes)

// Self-join (form single-node cluster)
cluster.join(cluster.selfAddress)
```
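
When seed addresses arrive as strings, for example from a test harness or an environment variable, they can be parsed before calling `joinSeedNodes`. This is a sketch under assumptions: the `SeedNodes` object and its methods are hypothetical helpers, and the URIs use the `akka` protocol, which assumes Artery remoting (classic remoting would use `akka.tcp`).

```scala
import akka.actor.{Address, AddressFromURIString}
import akka.cluster.Cluster
import scala.collection.immutable

object SeedNodes {
  /** Hypothetical helper: parse URIs such as "akka://ClusterSystem@127.0.0.1:2551". */
  def parse(uris: immutable.Seq[String]): immutable.Seq[Address] =
    uris.map(uri => AddressFromURIString(uri)) // throws on a malformed URI

  /** Hypothetical helper: join the seed nodes given as address URIs. */
  def join(cluster: Cluster, uris: immutable.Seq[String]): Unit =
    cluster.joinSeedNodes(parse(uris))
}
```

A call such as `SeedNodes.join(cluster, List("akka://ClusterSystem@127.0.0.1:2551"))` would then behave like the `joinSeedNodes` example above.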

### Cluster Leave Operations

Gracefully leave the cluster or forcefully remove nodes.

```scala { .api }
/**
 * Send command to issue state transition to LEAVING for the node specified by 'address'.
 * The member will go through the status changes MemberStatus Leaving (not published to
 * subscribers) followed by MemberStatus Exiting and finally MemberStatus Removed.
 */
def leave(address: Address): Unit

/**
 * Send command to DOWN the node specified by 'address'.
 * When a member is considered by the failure detector to be unreachable the leader is not
 * allowed to perform its duties. The status of the unreachable member must be changed to 'Down'.
 */
def down(address: Address): Unit
```

**Usage Examples:**

```scala
import akka.actor.Address

// Graceful leave
cluster.leave(cluster.selfAddress)

// Force down an unreachable node
val unreachableAddress = Address("akka.tcp", "ClusterSystem", "127.0.0.1", 2553)
cluster.down(unreachableAddress)
```
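
A graceful leave is typically paired with `registerOnMemberRemoved` so that the actor system is terminated only after the member has actually been removed. The sketch below illustrates that pairing; `GracefulLeave` and `leaveAndTerminate` are hypothetical names, and the helper assumes the node has already joined a cluster.

```scala
import akka.actor.ActorSystem
import akka.cluster.Cluster

object GracefulLeave {
  /** Hypothetical helper: leave the cluster, then terminate once this member is Removed. */
  def leaveAndTerminate(system: ActorSystem): Unit = {
    val cluster = Cluster(system)

    // Register the callback first so it is in place before the member is removed.
    cluster.registerOnMemberRemoved {
      system.terminate()
    }

    // Ask the cluster to move this node through Leaving, Exiting and Removed.
    cluster.leave(cluster.selfAddress)
  }
}
```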

### Event Subscription Management

Subscribe to cluster events to react to membership changes.

```scala { .api }
/**
 * Subscribe to one or more cluster domain events.
 * A snapshot of CurrentClusterState will be sent to the subscriber as the first message.
 */
def subscribe(subscriber: ActorRef, to: Class[_]*): Unit

/**
 * Subscribe with specific initial state mode.
 * If initialStateMode is InitialStateAsEvents the events corresponding
 * to the current state will be sent to mimic past events.
 * If InitialStateAsSnapshot a snapshot of CurrentClusterState will be sent.
 */
def subscribe(subscriber: ActorRef, initialStateMode: SubscriptionInitialStateMode, to: Class[_]*): Unit

/** Unsubscribe from all cluster domain events */
def unsubscribe(subscriber: ActorRef): Unit

/** Unsubscribe from specific type of cluster domain events */
def unsubscribe(subscriber: ActorRef, to: Class[_]): Unit

/** Send current cluster state to the specified receiver */
def sendCurrentClusterState(receiver: ActorRef): Unit
```

**Usage Examples:**

```scala
import akka.cluster.ClusterEvent._

// `listener` is assumed to be the ActorRef of an existing subscriber actor

// Subscribe to all member events
cluster.subscribe(listener, classOf[MemberEvent])

// Subscribe to specific events
cluster.subscribe(listener,
  classOf[MemberUp],
  classOf[MemberLeft],
  classOf[UnreachableMember])

// Subscribe with event replay
cluster.subscribe(listener, InitialStateAsEvents, classOf[MemberEvent])

// Unsubscribe
cluster.unsubscribe(listener)
```
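
The `listener` in the examples above can be any `ActorRef`. A common arrangement is for the subscriber actor to subscribe itself when it starts and unsubscribe when it stops. The following sketch shows one way to do that; the `ClusterListener` class name and the log messages are illustrative, not part of the API.

```scala
import akka.actor.{Actor, ActorLogging, Props}
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._

// Hypothetical listener actor that logs membership changes.
class ClusterListener extends Actor with ActorLogging {
  private val cluster = Cluster(context.system)

  // Subscribe on start; InitialStateAsEvents replays the current state as events,
  // so no CurrentClusterState snapshot has to be handled in `receive`.
  override def preStart(): Unit =
    cluster.subscribe(self, InitialStateAsEvents, classOf[MemberEvent], classOf[UnreachableMember])

  // Always unsubscribe when the actor stops.
  override def postStop(): Unit = cluster.unsubscribe(self)

  def receive: Receive = {
    case MemberUp(member) =>
      log.info("Member is Up: {}", member.address)
    case UnreachableMember(member) =>
      log.info("Member detected as unreachable: {}", member)
    case MemberRemoved(member, previousStatus) =>
      log.info("Member is Removed: {} after {}", member.address, previousStatus)
    case _: MemberEvent => // other member events are ignored in this sketch
  }
}

object ClusterListener {
  def props: Props = Props(new ClusterListener)
}
```

The actor would be started with `system.actorOf(ClusterListener.props, "clusterListener")`, where the actor name is again only an example.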

### Lifecycle Callbacks

Register callbacks that run once when the current cluster member reaches a specific state (Up or Removed).

```scala { .api }
/**
 * The supplied thunk will be run, once, when current cluster member is Up.
 * Typically used together with configuration option 'akka.cluster.min-nr-of-members'
 * to defer some action, such as starting actors, until the cluster has reached
 * a certain size.
 */
def registerOnMemberUp[T](code: => T): Unit

/**
 * Java API: The supplied callback will be run, once, when current cluster member is Up.
 * Typically used together with configuration option 'akka.cluster.min-nr-of-members'
 * to defer some action, such as starting actors, until the cluster has reached
 * a certain size.
 */
def registerOnMemberUp(callback: Runnable): Unit

/**
 * The supplied thunk will be run, once, when current cluster member is Removed.
 * If the cluster has already been shut down the thunk will run on the caller thread immediately.
 * Typically used together with cluster.leave(cluster.selfAddress) and then system.terminate().
 */
def registerOnMemberRemoved[T](code: => T): Unit

/**
 * Java API: The supplied callback will be run, once, when current cluster member is Removed.
 * If the cluster has already been shut down the callback will run immediately.
 * Typically used together with cluster.leave(cluster.selfAddress) and then system.terminate().
 */
def registerOnMemberRemoved(callback: Runnable): Unit
```

**Usage Examples:**

```scala
import akka.actor.Props

// MyApplicationActor is a placeholder for an application-specific actor class

// Start application actors when cluster is ready
cluster.registerOnMemberUp(new Runnable {
  def run(): Unit = {
    println("Cluster member is UP - starting application actors")
    system.actorOf(Props[MyApplicationActor], "app")
  }
})

// Cleanup when leaving cluster
cluster.registerOnMemberRemoved(new Runnable {
  def run(): Unit = {
    println("Member removed from cluster - shutting down")
    system.terminate()
  }
})

// Scala-friendly syntax
cluster.registerOnMemberUp {
  println("Member is UP!")
  // Start application logic
}
```
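
The API documentation notes that `registerOnMemberUp` is typically combined with the `akka.cluster.min-nr-of-members` setting. The sketch below shows how the two might fit together. The `DeferredStart` object, the system name, and the value `3` are assumptions chosen for illustration; remoting host and port settings are left at their defaults.

```scala
import akka.actor.ActorSystem
import akka.cluster.Cluster
import com.typesafe.config.{Config, ConfigFactory}

object DeferredStart {
  // Assumed settings for illustration: joining members are not moved to Up
  // until at least 3 members have joined the cluster.
  val config: Config = ConfigFactory.parseString(
    """
    akka.actor.provider = cluster
    akka.cluster.min-nr-of-members = 3
    """).withFallback(ConfigFactory.load())

  /** Hypothetical entry point: start the system and defer work until this member is Up. */
  def start(): ActorSystem = {
    val system = ActorSystem("ClusterSystem", config)
    Cluster(system).registerOnMemberUp {
      // Runs once, after this member has become Up.
      system.log.info("Cluster reached the required size; starting application actors")
    }
    system
  }
}
```

The node still has to join the cluster, through configured seed nodes or one of the join methods, before the callback can fire.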

### Utility Methods

Additional utility methods for cluster operations.

```scala { .api }
/**
 * Generate the remote actor path by replacing the Address in the RootActorPath
 * for the given ActorRef with the cluster's selfAddress, unless the address' host is already defined
 */
def remotePathOf(actorRef: ActorRef): ActorPath
```

**Usage Example:**

```scala
import akka.actor.Props

// MyActor is a placeholder for an application-specific actor class
val localActor = system.actorOf(Props[MyActor], "myActor")
val remotePath = cluster.remotePathOf(localActor)
// Use remotePath to reference this actor from other cluster nodes
```

## Types

```scala { .api }
// Data center type alias
type DataCenter = String

// Initial state subscription modes
sealed abstract class SubscriptionInitialStateMode
case object InitialStateAsSnapshot extends SubscriptionInitialStateMode
case object InitialStateAsEvents extends SubscriptionInitialStateMode
```