
# Cluster Client (Deprecated)

**DEPRECATED since Akka 2.6.0** - Use Akka gRPC instead for external cluster communication.

The Cluster Client pattern provides a way for external systems (not part of the cluster) to communicate with actors within an Akka cluster. It acts as a gateway, handling connection management, service discovery, and message routing to cluster actors.

## Migration Notice

```scala
// ⚠️ DEPRECATED: ClusterClient usage
@deprecated("Use Akka gRPC instead", since = "2.6.0")
val client = system.actorOf(ClusterClient.props(settings))

// ✅ RECOMMENDED: Use Akka gRPC for external communication
// See: https://doc.akka.io/docs/akka/current/cluster-client.html#migration-to-akka-grpc
```

## Capabilities

### ClusterClient

The main client actor for external systems to communicate with cluster actors.

```scala { .api }
/**
 * DEPRECATED: Use Akka gRPC instead
 *
 * Actor intended for use on external nodes not members of the cluster.
 * Acts as gateway for sending messages to cluster actors via ClusterReceptionist.
 *
 * @param settings Configuration for connection and behavior
 */
@deprecated("Use Akka gRPC instead", since = "2.6.0")
final class ClusterClient(settings: ClusterClientSettings) extends Actor

@deprecated("Use Akka gRPC instead", since = "2.6.0")
object ClusterClient {
  /**
   * Scala API: Factory method for ClusterClient Props
   */
  def props(settings: ClusterClientSettings): Props
}
```

### ClusterClient Message Types

Messages for different communication patterns with cluster actors.

```scala { .api }
// These message classes are defined inside `object ClusterClient`;
// reference them as ClusterClient.Send, ClusterClient.SendToAll, and ClusterClient.Publish.

/**
 * Send message to one recipient with matching path in the cluster
 *
 * @param path Actor path string within the cluster
 * @param msg Message to send
 * @param localAffinity Prefer actors on same node as receptionist if available
 */
@deprecated("Use Akka gRPC instead", since = "2.6.0")
case class Send(path: String, msg: Any, localAffinity: Boolean) {
  /**
   * Convenience constructor with localAffinity false
   */
  def this(path: String, msg: Any) = this(path, msg, localAffinity = false)
}

/**
 * Send message to all recipients with matching path in the cluster
 *
 * @param path Actor path string within the cluster
 * @param msg Message to send to all matching actors
 */
@deprecated("Use Akka gRPC instead", since = "2.6.0")
case class SendToAll(path: String, msg: Any)

/**
 * Publish message to all subscribers of a topic in the cluster
 *
 * @param topic Topic name to publish to
 * @param msg Message to publish
 */
@deprecated("Use Akka gRPC instead", since = "2.6.0")
case class Publish(topic: String, msg: Any)
```
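As a minimal sketch of request/reply through the client, the snippet below wraps a job in `ClusterClient.Send` with `localAffinity = true` and uses the ask pattern on the client actor. The `ProcessJob` message, the `/user/worker` path, and the `ClientRequests` object are hypothetical names chosen for illustration; replies are expected to be tunneled back through the receptionist.

```scala
import akka.actor.ActorRef
import akka.cluster.client.ClusterClient
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Future

object ClientRequests {
  // Hypothetical application message, not part of Akka.
  final case class ProcessJob(data: String)

  // Wrap a job so the receptionist prefers a matching actor on its own node.
  def localJob(data: String): ClusterClient.Send =
    ClusterClient.Send("/user/worker", ProcessJob(data), localAffinity = true)

  // Ask through the client actor; the Future completes with whatever the
  // destination actor replies, or fails when the timeout expires.
  def submit(client: ActorRef, data: String)(implicit timeout: Timeout): Future[Any] =
    client ? localJob(data)
}
```

A caller would supply an implicit `Timeout` (for example `Timeout(5.seconds)`) and the `ActorRef` returned by `system.actorOf(ClusterClient.props(settings))`.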

### ClusterClientSettings

Configuration for cluster client behavior and connection management.

```scala { .api }
/**
 * DEPRECATED: Configuration settings for ClusterClient
 *
 * @param initialContacts Actor paths of ClusterReceptionist actors on servers
 * @param establishingGetContactsInterval Retry interval for establishing contact
 * @param refreshContactsInterval How often to ask for new contact points
 * @param heartbeatInterval How often to send heartbeat messages
 * @param acceptableHeartbeatPause Acceptable heartbeat pause before failure detection
 * @param bufferSize Number of messages to buffer when connection unavailable
 * @param reconnectTimeout Timeout for connection re-establishment attempts
 */
@deprecated("Use Akka gRPC instead", since = "2.6.0")
final class ClusterClientSettings(
  val initialContacts: Set[ActorPath],
  val establishingGetContactsInterval: FiniteDuration,
  val refreshContactsInterval: FiniteDuration,
  val heartbeatInterval: FiniteDuration,
  val acceptableHeartbeatPause: FiniteDuration,
  val bufferSize: Int,
  val reconnectTimeout: Option[FiniteDuration]
) extends NoSerializationVerificationNeeded

@deprecated("Use Akka gRPC instead", since = "2.6.0")
object ClusterClientSettings {
  /**
   * Create settings from default configuration akka.cluster.client
   */
  def apply(system: ActorSystem): ClusterClientSettings

  /**
   * Create settings from configuration
   */
  def apply(config: Config): ClusterClientSettings

  /**
   * Java API: Create settings from default configuration
   */
  def create(system: ActorSystem): ClusterClientSettings

  /**
   * Java API: Create settings from configuration
   */
  def create(config: Config): ClusterClientSettings
}
```

**Settings Methods:**

```scala { .api }
// Configuration methods for ClusterClientSettings
def withInitialContacts(initialContacts: Set[ActorPath]): ClusterClientSettings
def withInitialContacts(initialContacts: java.util.Set[ActorPath]): ClusterClientSettings // Java API
def withEstablishingGetContactsInterval(interval: FiniteDuration): ClusterClientSettings
def withRefreshContactsInterval(interval: FiniteDuration): ClusterClientSettings
def withHeartbeat(heartbeatInterval: FiniteDuration, acceptableHeartbeatPause: FiniteDuration): ClusterClientSettings
def withBufferSize(bufferSize: Int): ClusterClientSettings
def withReconnectTimeout(reconnectTimeout: Option[FiniteDuration]): ClusterClientSettings
```
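The settings can also be driven from configuration instead of the `with*` methods. The sketch below assumes the `akka.cluster.client` keys shipped in the module's `reference.conf` (`initial-contacts`, `buffer-size`, `reconnect-timeout`) and layers a few overrides on top of the defaults; the `ClientSettingsFromConfig` object and the concrete values are illustrative only.

```scala
import akka.cluster.client.ClusterClientSettings
import com.typesafe.config.ConfigFactory

object ClientSettingsFromConfig {
  // Overrides for the akka.cluster.client section; all other keys fall back
  // to the defaults loaded from reference.conf.
  private val overrides = ConfigFactory.parseString(
    """
    akka.cluster.client {
      initial-contacts = ["akka://cluster-system@127.0.0.1:2551/system/receptionist"]
      buffer-size = 500
      reconnect-timeout = 30s
    }
    """)

  // apply(config) expects the akka.cluster.client section itself, not the root config.
  val settings: ClusterClientSettings =
    ClusterClientSettings(
      overrides.withFallback(ConfigFactory.load()).getConfig("akka.cluster.client"))
}
```

Keeping the values in configuration lets each deployment adjust contact points without a code change; the `with*` methods remain useful for programmatic overrides on top.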

### Contact Point Management

Messages and events for managing cluster contact points.

```scala { .api }
/**
 * Subscribe to contact point changes. Sender receives initial state
 * and subsequent change events.
 */
case object SubscribeContactPoints extends SubscribeContactPoints {
  /**
   * Java API: get the singleton instance
   */
  def getInstance = this
}

/**
 * Explicitly unsubscribe from contact point change events
 */
case object UnsubscribeContactPoints extends UnsubscribeContactPoints {
  /**
   * Java API: get the singleton instance
   */
  def getInstance = this
}

/**
 * Get the contact points known to this client. Replies with ContactPoints.
 */
case object GetContactPoints extends GetContactPoints {
  /**
   * Java API: get the singleton instance
   */
  def getInstance = this
}

/**
 * Reply to GetContactPoints containing current known contact points
 *
 * @param contactPoints Set of currently known contact point paths
 */
case class ContactPoints(contactPoints: Set[ActorPath]) {
  /**
   * Java API: Get contact points as Java Set
   */
  def getContactPoints: java.util.Set[ActorPath] = contactPoints.asJava
}

/**
 * Event emitted when a new contact point is discovered
 */
case class ContactPointAdded(override val contactPoint: ActorPath) extends ContactPointChange

/**
 * Event emitted when a contact point is removed
 */
case class ContactPointRemoved(override val contactPoint: ActorPath) extends ContactPointChange

/**
 * Base trait for contact point change events
 */
sealed trait ContactPointChange {
  val contactPoint: ActorPath
}
```
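One way to use these messages is a small listener actor that subscribes to a `ClusterClient` and keeps the current set of contact points. The following is a sketch under that assumption; `ContactPointListener` and `applyChange` are hypothetical names, and the state transition is kept as a pure function so it can be tested without starting actors.

```scala
import akka.actor.{Actor, ActorLogging, ActorPath, ActorRef, Props}
import akka.cluster.client.{ContactPointAdded, ContactPointChange, ContactPointRemoved, ContactPoints, SubscribeContactPoints}

object ContactPointListener {
  def props(client: ActorRef): Props = Props(new ContactPointListener(client))

  // Pure state transition: apply one change event to the known contact points.
  def applyChange(known: Set[ActorPath], change: ContactPointChange): Set[ActorPath] =
    change match {
      case ContactPointAdded(path)   => known + path
      case ContactPointRemoved(path) => known - path
    }
}

class ContactPointListener(client: ActorRef) extends Actor with ActorLogging {
  import ContactPointListener.applyChange

  // Subscribing makes the client send the initial ContactPoints and later changes.
  override def preStart(): Unit = client ! SubscribeContactPoints

  def receive: Receive = tracking(Set.empty)

  private def tracking(known: Set[ActorPath]): Receive = {
    case ContactPoints(initial) =>
      context.become(tracking(initial))
    case change: ContactPointChange =>
      val updated = applyChange(known, change)
      log.info("Contact points now: {}", updated.mkString(", "))
      context.become(tracking(updated))
  }
}
```

The listener would be started next to the client, for example with `system.actorOf(ContactPointListener.props(client))`.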

### ClusterClientReceptionist Extension

Extension for managing the server-side receptionist that handles client connections.

```scala { .api }
/**
 * DEPRECATED: Extension that starts ClusterReceptionist and DistributedPubSubMediator
 * with settings from akka.cluster.client.receptionist config section
 */
@deprecated("Use Akka gRPC instead", since = "2.6.0")
object ClusterClientReceptionist extends ExtensionId[ClusterClientReceptionist] {
  def get(system: ActorSystem): ClusterClientReceptionist
  def get(system: ClassicActorSystemProvider): ClusterClientReceptionist
}

@deprecated("Use Akka gRPC instead", since = "2.6.0")
final class ClusterClientReceptionist(system: ExtendedActorSystem) extends Extension {
  /**
   * Register actor that should be reachable for clients.
   * Clients can send messages using Send or SendToAll with actor's path.
   */
  def registerService(actor: ActorRef): Unit

  /**
   * Unregister a previously registered service actor
   */
  def unregisterService(actor: ActorRef): Unit

  /**
   * Register actor as subscriber to a named topic.
   * Multiple actors can subscribe to same topic.
   */
  def registerSubscriber(topic: String, actor: ActorRef): Unit

  /**
   * Unregister topic subscriber
   */
  def unregisterSubscriber(topic: String, actor: ActorRef): Unit

  /**
   * Get the underlying ClusterReceptionist actor reference for events
   */
  def underlying: ActorRef

  /**
   * Returns true if receptionist is terminated (wrong role, etc.)
   */
  def isTerminated: Boolean
}
```

### ClusterReceptionist

The server-side actor that handles client connections and message routing.

```scala { .api }
/**
 * DEPRECATED: Server-side component that ClusterClient connects to.
 * Forwards messages to DistributedPubSubMediator and handles client lifecycle.
 *
 * @param pubSubMediator Reference to DistributedPubSubMediator for message routing
 * @param settings Configuration for receptionist behavior
 */
@deprecated("Use Akka gRPC instead", since = "2.6.0")
final class ClusterReceptionist(
  pubSubMediator: ActorRef,
  settings: ClusterReceptionistSettings
) extends Actor

@deprecated("Use Akka gRPC instead", since = "2.6.0")
object ClusterReceptionist {
  /**
   * Scala API: Factory method for ClusterReceptionist Props
   */
  def props(pubSubMediator: ActorRef, settings: ClusterReceptionistSettings): Props
}
```

### ClusterReceptionistSettings

Configuration for the server-side receptionist behavior.

```scala { .api }
/**
 * DEPRECATED: Configuration settings for ClusterReceptionist
 *
 * @param role Start receptionist on members tagged with this role
 * @param numberOfContacts Number of contact points to send to clients
 * @param responseTunnelReceiveTimeout Timeout for response tunnel actors
 */
@deprecated("Use Akka gRPC instead", since = "2.6.0")
final class ClusterReceptionistSettings(
  val role: Option[String],
  val numberOfContacts: Int,
  val responseTunnelReceiveTimeout: FiniteDuration
) extends NoSerializationVerificationNeeded

@deprecated("Use Akka gRPC instead", since = "2.6.0")
object ClusterReceptionistSettings {
  /**
   * Create settings from default configuration akka.cluster.client.receptionist
   */
  def apply(system: ActorSystem): ClusterReceptionistSettings

  /**
   * Create settings from configuration
   */
  def apply(config: Config): ClusterReceptionistSettings

  /**
   * Java API factory methods
   */
  def create(system: ActorSystem): ClusterReceptionistSettings
  def create(config: Config): ClusterReceptionistSettings
}
```

**Settings Methods:**

```scala { .api }
// Configuration methods for ClusterReceptionistSettings
def withRole(role: String): ClusterReceptionistSettings
def withRole(role: Option[String]): ClusterReceptionistSettings
def withNumberOfContacts(numberOfContacts: Int): ClusterReceptionistSettings
def withResponseTunnelReceiveTimeout(timeout: FiniteDuration): ClusterReceptionistSettings
def withHeartbeat(heartbeatInterval: FiniteDuration, acceptableHeartbeatPause: FiniteDuration, failureDetectionInterval: FiniteDuration): ClusterReceptionistSettings
```
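As a rough illustration of how the methods above compose, the sketch below starts from the configured defaults and overrides three values. The `ReceptionistTuning` object, the `"frontend"` role, and the numbers are assumptions made for the example, not recommended values.

```scala
import akka.actor.ActorSystem
import akka.cluster.client.ClusterReceptionistSettings
import scala.concurrent.duration._

object ReceptionistTuning {
  // Start from akka.cluster.client.receptionist and override selected values.
  // Each with* call returns a new immutable settings instance.
  def tuned(system: ActorSystem): ClusterReceptionistSettings =
    ClusterReceptionistSettings(system)
      .withRole("frontend")                          // hypothetical role name
      .withNumberOfContacts(5)
      .withResponseTunnelReceiveTimeout(45.seconds)
}
```

The `ClusterClientReceptionist` extension reads its settings from configuration, so settings built this way are mainly relevant when the receptionist actor is created directly through `ClusterReceptionist.props`.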

### Client Interaction Events

Events for monitoring cluster client connections.

```scala { .api }
/**
 * Base trait for cluster client interaction events
 */
sealed trait ClusterClientInteraction {
  val clusterClient: ActorRef
}

/**
 * Event emitted when a cluster client connects to receptionist
 */
case class ClusterClientUp(override val clusterClient: ActorRef) extends ClusterClientInteraction

/**
 * Event emitted when cluster client becomes unreachable
 */
case class ClusterClientUnreachable(override val clusterClient: ActorRef) extends ClusterClientInteraction

/**
 * Subscribe to cluster client interaction events
 */
case object SubscribeClusterClients extends SubscribeClusterClients {
  /**
   * Java API: get the singleton instance
   */
  def getInstance = this
}

/**
 * Unsubscribe from cluster client interaction events
 */
case object UnsubscribeClusterClients extends UnsubscribeClusterClients {
  /**
   * Java API: get the singleton instance
   */
  def getInstance = this
}

/**
 * Get currently connected cluster clients. Replies with ClusterClients.
 */
case object GetClusterClients extends GetClusterClients {
  /**
   * Java API: get the singleton instance
   */
  def getInstance = this
}

/**
 * Reply to GetClusterClients containing current client connections
 *
 * @param clusterClients Set of currently connected client actor references
 */
case class ClusterClients(clusterClients: Set[ActorRef]) {
  /**
   * Java API: Get cluster clients as Java Set
   */
  def getClusterClients: java.util.Set[ActorRef] = clusterClients.asJava
}
```
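On the server side, these events can be consumed by an actor that subscribes to the receptionist. The sketch below assumes the receptionist reference comes from `ClusterClientReceptionist(system).underlying`; `ClientMonitor` and `applyEvent` are hypothetical names introduced for the example.

```scala
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.cluster.client.{ClusterClientInteraction, ClusterClientUnreachable, ClusterClientUp, ClusterClients, SubscribeClusterClients, UnsubscribeClusterClients}

object ClientMonitor {
  def props(receptionist: ActorRef): Props = Props(new ClientMonitor(receptionist))

  // Pure state transition: apply one interaction event to the connected set.
  def applyEvent(connected: Set[ActorRef], event: ClusterClientInteraction): Set[ActorRef] =
    event match {
      case ClusterClientUp(client)          => connected + client
      case ClusterClientUnreachable(client) => connected - client
    }
}

class ClientMonitor(receptionist: ActorRef) extends Actor with ActorLogging {
  import ClientMonitor.applyEvent

  override def preStart(): Unit = receptionist ! SubscribeClusterClients
  override def postStop(): Unit = receptionist ! UnsubscribeClusterClients

  def receive: Receive = monitoring(Set.empty)

  private def monitoring(connected: Set[ActorRef]): Receive = {
    case ClusterClients(initial) =>
      context.become(monitoring(initial))
    case event: ClusterClientInteraction =>
      val updated = applyEvent(connected, event)
      log.info("Connected cluster clients: {}", updated.size)
      context.become(monitoring(updated))
  }
}
```

A cluster node could start it with `system.actorOf(ClientMonitor.props(ClusterClientReceptionist(system).underlying))`.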

## Usage Examples (Deprecated)

### Basic Client Setup

```scala
import akka.actor.{ActorPath, ActorSystem}
import akka.cluster.client.{ClusterClient, ClusterClientSettings}
import scala.concurrent.duration._

// Hypothetical application messages used by the examples in this section
final case class ProcessJob(data: String)
final case class Alert(message: String)
case object ClearCache

@deprecated("Use Akka gRPC instead", since = "2.6.0")
object ExternalClientApp extends App {
  implicit val system: ActorSystem = ActorSystem("external-client")

  val initialContacts = Set(
    ActorPath.fromString("akka://cluster-system@127.0.0.1:2551/system/receptionist"),
    ActorPath.fromString("akka://cluster-system@127.0.0.1:2552/system/receptionist")
  )

  val settings = ClusterClientSettings(system)
    .withInitialContacts(initialContacts)
    .withBufferSize(1000)
    .withReconnectTimeout(Some(30.seconds))

  val client = system.actorOf(ClusterClient.props(settings), "cluster-client")

  // Send to specific actor (the two-argument form is a secondary constructor,
  // so localAffinity is passed explicitly here)
  client ! ClusterClient.Send("/user/worker", ProcessJob("data"), localAffinity = false)

  // Publish to topic
  client ! ClusterClient.Publish("notifications", Alert("System maintenance scheduled"))

  // Broadcast to all matching actors
  client ! ClusterClient.SendToAll("/user/cache", ClearCache)
}
```

### Server-Side Service Registration

```scala
import akka.actor.{Actor, ActorLogging}
import akka.cluster.client.ClusterClientReceptionist

// Hypothetical reply message; ProcessJob and Alert are the application
// messages used in the client example
final case class JobResult(result: String)

@deprecated("Use Akka gRPC instead", since = "2.6.0")
class ClusterWorker extends Actor with ActorLogging {
  val receptionist = ClusterClientReceptionist(context.system)

  override def preStart(): Unit = {
    // Register this actor for client access
    receptionist.registerService(self)

    // Subscribe to notifications topic
    receptionist.registerSubscriber("notifications", self)
  }

  def receive: Receive = {
    case ProcessJob(data) =>
      val result = processData(data)
      sender() ! JobResult(result)

    case Alert(message) =>
      log.warning("Received alert: {}", message)
  }

  // Placeholder for the real job logic
  private def processData(data: String): String = data.toUpperCase
}
```

## Types

```scala { .api }
// Required imports for cluster client functionality (deprecated)
import akka.actor.{Actor, ActorPath, ActorRef, ActorSystem, ClassicActorSystemProvider, ExtendedActorSystem, Extension, ExtensionId, NoSerializationVerificationNeeded, Props}
import akka.cluster.client.ClusterClientMessage
import com.typesafe.config.Config
import scala.concurrent.duration.FiniteDuration

/**
 * Marker trait for cluster client messages with special serializer
 */
sealed trait ClusterClientMessage extends Serializable
```

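## Migration to Akka gRPC

For new projects, use Akka gRPC instead of ClusterClient:

```scala
// Instead of ClusterClient, define the service in a .proto file.
// The protobuf definition is shown as a comment because it is not Scala:
//
//   syntax = "proto3";
//
//   service WorkerService {
//     rpc ProcessJob(JobRequest) returns (JobResponse);
//     rpc GetStatus(StatusRequest) returns (StatusResponse);
//   }

import scala.concurrent.Future

// Generate Scala classes from the .proto file and implement the generated service trait.
// The message types and their fields (such as `result`) are assumed to be defined
// in the same .proto file.
class WorkerServiceImpl extends WorkerService {
  override def processJob(request: JobRequest): Future[JobResponse] = {
    // Process job and return response
    Future.successful(JobResponse(result = "processed"))
  }

  // Every rpc declared in the service must be implemented
  override def getStatus(request: StatusRequest): Future[StatusResponse] =
    Future.successful(StatusResponse())
}
```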


See the [Akka gRPC documentation](https://doc.akka.io/docs/akka-grpc/current/) for migration guidance.