or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cluster-client.mdcluster-singleton.mddistributed-pubsub.mdindex.md

distributed-pubsub.mddocs/

0

# Distributed Publish-Subscribe

1

2

The Distributed Pub-Sub pattern enables location-transparent messaging between actors across cluster nodes. Actors can publish messages to topics and subscribe to topics without knowing the physical location of other actors. The system handles message routing, replication, and delivery automatically.

3

4

## Capabilities

5

6

### DistributedPubSub Extension

7

8

The main entry point for accessing the distributed pub-sub mediator.

9

10

```scala { .api }

11

/**

12

* Extension that starts a DistributedPubSubMediator actor

13

* with settings defined in config section akka.cluster.pub-sub

14

*/

15

object DistributedPubSub extends ExtensionId[DistributedPubSub] {

16

/**

17

* Get the DistributedPubSub extension instance

18

*/

19

def get(system: ActorSystem): DistributedPubSub

20

def get(system: ClassicActorSystemProvider): DistributedPubSub

21

}

22

23

class DistributedPubSub(system: ExtendedActorSystem) extends Extension {

24

/**

25

* The DistributedPubSubMediator actor reference

26

*/

27

def mediator: ActorRef

28

29

/**

30

* Returns true if this member is not tagged with the role configured for the mediator

31

*/

32

def isTerminated: Boolean

33

}

34

```

35

36

**Usage Example:**

37

38

```scala

39

import akka.cluster.pubsub.DistributedPubSub

40

import akka.cluster.pubsub.DistributedPubSubMediator._

41

42

implicit val system: ActorSystem = ActorSystem("cluster-system")

43

44

// Get the mediator

45

val mediator = DistributedPubSub(system).mediator

46

47

// Subscribe to a topic

48

mediator ! Subscribe("news", self)

49

50

// Publish to a topic

51

mediator ! Publish("news", "Breaking news: Akka cluster online!")

52

```

53

54

### DistributedPubSubMediator

55

56

The core actor that manages the distributed registry and handles message routing.

57

58

```scala { .api }

59

/**

60

* Actor that manages a registry of actor references and replicates

61

* entries to peer actors among all cluster nodes or nodes with specific role.

62

*

63

* Provides three message delivery modes:

64

* 1. Send - to one recipient with matching path

65

* 2. SendToAll - to all recipients with matching path

66

* 3. Publish - to all subscribers of a topic

67

*/

68

class DistributedPubSubMediator(settings: DistributedPubSubSettings) extends Actor

69

70

object DistributedPubSubMediator {

71

/**

72

* Scala API: Factory method for DistributedPubSubMediator Props

73

*/

74

def props(settings: DistributedPubSubSettings): Props

75

}

76

```

77

78

### Subscription Management

79

80

Messages for subscribing and unsubscribing from topics.

81

82

```scala { .api }

83

/**

84

* Subscribe to a named topic. Actors can be registered to the same topic

85

* name, and all will receive published messages.

86

*

87

* @param topic Topic name to subscribe to

88

* @param group Optional group name for message distribution control

89

* @param ref Actor reference to register as subscriber

90

*/

91

case class Subscribe(topic: String, group: Option[String], ref: ActorRef) {

92

/**

93

* Convenience constructor with group None

94

*/

95

def this(topic: String, ref: ActorRef) = this(topic, None, ref)

96

97

/**

98

* Java API: constructor with group String

99

*/

100

def this(topic: String, group: String, ref: ActorRef) = this(topic, Some(group), ref)

101

}

102

103

object Subscribe {

104

def apply(topic: String, ref: ActorRef): Subscribe = new Subscribe(topic, ref)

105

}

106

107

/**

108

* Unsubscribe from a named topic

109

*

110

* @param topic Topic name to unsubscribe from

111

* @param group Optional group name that was used in Subscribe

112

* @param ref Actor reference to unregister

113

*/

114

case class Unsubscribe(topic: String, group: Option[String], ref: ActorRef) {

115

def this(topic: String, ref: ActorRef) = this(topic, None, ref)

116

def this(topic: String, group: String, ref: ActorRef) = this(topic, Some(group), ref)

117

}

118

119

object Unsubscribe {

120

def apply(topic: String, ref: ActorRef): Unsubscribe = new Unsubscribe(topic, ref)

121

}

122

123

/**

124

* Acknowledgment of successful subscription

125

*/

126

case class SubscribeAck(subscribe: Subscribe) extends DeadLetterSuppression

127

128

/**

129

* Acknowledgment of successful unsubscription

130

*/

131

case class UnsubscribeAck(unsubscribe: Unsubscribe)

132

```

133

134

**Usage Example:**

135

136

```scala

137

// Subscribe with acknowledgment handling

138

class NewsSubscriber extends Actor {

139

val mediator = DistributedPubSub(context.system).mediator

140

141

override def preStart(): Unit = {

142

mediator ! Subscribe("news", self)

143

mediator ! Subscribe("weather", Some("local"), self) // with group

144

}

145

146

def receive = {

147

case SubscribeAck(Subscribe("news", _, _)) =>

148

log.info("Successfully subscribed to news")

149

case msg: String if sender() == mediator =>

150

log.info(s"Received news: $msg")

151

case "unsubscribe" =>

152

mediator ! Unsubscribe("news", self)

153

}

154

}

155

```

156

157

### Message Publishing

158

159

Messages for publishing content to topics and sending to registered actors.

160

161

```scala { .api }

162

/**

163

* Publish a message to all subscribers of a topic

164

*

165

* @param topic Topic name to publish to

166

* @param msg Message to publish to all subscribers

167

* @param sendOneMessageToEachGroup If true, send only one message per group

168

*/

169

case class Publish(topic: String, msg: Any, sendOneMessageToEachGroup: Boolean)

170

extends DistributedPubSubMessage with WrappedMessage {

171

172

/**

173

* Convenience constructor without group messaging

174

*/

175

def this(topic: String, msg: Any) = this(topic, msg, sendOneMessageToEachGroup = false)

176

177

override def message: Any = msg

178

}

179

180

object Publish {

181

def apply(topic: String, msg: Any): Publish = new Publish(topic, msg)

182

}

183

184

/**

185

* Send message to one recipient with matching path

186

*

187

* @param path Actor path string to send to

188

* @param msg Message to send

189

* @param localAffinity Prefer local actors if available

190

*/

191

case class Send(path: String, msg: Any, localAffinity: Boolean)

192

extends DistributedPubSubMessage with WrappedMessage {

193

194

/**

195

* Convenience constructor with localAffinity false

196

*/

197

def this(path: String, msg: Any) = this(path, msg, localAffinity = false)

198

199

override def message: Any = msg

200

}

201

202

/**

203

* Send message to all recipients with matching path

204

*

205

* @param path Actor path string to send to

206

* @param msg Message to send to all matching actors

207

* @param allButSelf Exclude the sending node from recipients

208

*/

209

case class SendToAll(path: String, msg: Any, allButSelf: Boolean = false)

210

extends DistributedPubSubMessage with WrappedMessage {

211

212

def this(path: String, msg: Any) = this(path, msg, allButSelf = false)

213

214

override def message: Any = msg

215

}

216

```

217

218

**Usage Example:**

219

220

```scala

221

// Publishing messages

222

class NewsPublisher extends Actor {

223

val mediator = DistributedPubSub(context.system).mediator

224

225

def receive = {

226

case "breaking-news" =>

227

mediator ! Publish("news", "BREAKING: Major event occurred!")

228

229

case "weather-update" =>

230

// Send one message to each group (load balancing)

231

mediator ! Publish("weather", WeatherUpdate("sunny", 25), sendOneMessageToEachGroup = true)

232

233

case SendToWorker(task) =>

234

// Send to one worker (load balancing)

235

mediator ! Send("/user/worker", task, localAffinity = true)

236

237

case BroadcastToWorkers(announcement) =>

238

// Send to all workers

239

mediator ! SendToAll("/user/worker", announcement)

240

}

241

}

242

```

243

244

### Actor Registration

245

246

Messages for registering and unregistering actors for Send/SendToAll operations.

247

248

```scala { .api }

249

/**

250

* Register an actor for Send and SendToAll messages.

251

* The actor will be reachable by its path string.

252

*/

253

case class Put(ref: ActorRef)

254

255

/**

256

* Remove a registered actor by its path string

257

*

258

* @param path The path string of the actor to remove

259

*/

260

case class Remove(path: String)

261

```

262

263

**Usage Example:**

264

265

```scala

266

// Register worker for Send/SendToAll

267

class WorkerManager extends Actor {

268

val mediator = DistributedPubSub(context.system).mediator

269

270

override def preStart(): Unit = {

271

val worker = context.actorOf(Props[Worker](), "worker")

272

mediator ! Put(worker) // Register for Send/SendToAll

273

}

274

275

def receive = {

276

case "shutdown-worker" =>

277

mediator ! Remove("/user/worker-manager/worker")

278

}

279

}

280

```

281

282

### Topic Introspection

283

284

Messages for inspecting current topics and subscriber counts.

285

286

```scala { .api }

287

/**

288

* Send this message to the mediator to get current topics.

289

* Replies with CurrentTopics containing topic names.

290

*/

291

case object GetTopics extends GetTopics

292

293

/**

294

* Java API: Get singleton instance for GetTopics

295

*/

296

def getTopicsInstance: GetTopics = GetTopics

297

298

/**

299

* Reply to GetTopics request

300

*

301

* @param topics Set of currently known topic names

302

*/

303

case class CurrentTopics(topics: Set[String]) {

304

/**

305

* Java API: Get topics as Java Set

306

*/

307

def getTopics(): java.util.Set[String] = topics.asJava

308

}

309

310

/**

311

* Send this message to get count of subscribers (testing only)

312

*/

313

case object Count extends Count

314

315

/**

316

* Java API: Get singleton instance for Count

317

*/

318

def getCountInstance: Count = Count

319

320

/**

321

* Count subscribers for a specific topic

322

*

323

* @param topic Topic name to count subscribers for

324

*/

325

case class CountSubscribers(topic: String)

326

```

327

328

### DistributedPubSubSettings

329

330

Configuration settings for the mediator behavior.

331

332

```scala { .api }

333

/**

334

* Configuration settings for DistributedPubSubMediator

335

*

336

* @param role Start mediator on members tagged with this role. All members if undefined

337

* @param routingLogic The routing logic to use for Send messages

338

* @param gossipInterval How often the mediator sends out gossip information

339

* @param removedTimeToLive Removed entries are pruned after this duration

340

* @param maxDeltaElements Maximum elements to transfer in one gossip message

341

* @param sendToDeadLettersWhenNoSubscribers Send to dead letters when no subscribers

342

*/

343

final class DistributedPubSubSettings(

344

val role: Option[String],

345

val routingLogic: RoutingLogic,

346

val gossipInterval: FiniteDuration,

347

val removedTimeToLive: FiniteDuration,

348

val maxDeltaElements: Int,

349

val sendToDeadLettersWhenNoSubscribers: Boolean

350

) extends NoSerializationVerificationNeeded

351

352

object DistributedPubSubSettings {

353

/**

354

* Create settings from the default configuration akka.cluster.pub-sub

355

*/

356

def apply(system: ActorSystem): DistributedPubSubSettings

357

358

/**

359

* Create settings from configuration with same layout as default

360

*/

361

def apply(config: Config): DistributedPubSubSettings

362

363

/**

364

* Java API: Create settings from default configuration

365

*/

366

def create(system: ActorSystem): DistributedPubSubSettings

367

368

/**

369

* Java API: Create settings from configuration

370

*/

371

def create(config: Config): DistributedPubSubSettings

372

}

373

```

374

375

**Settings Methods:**

376

377

```scala { .api }

378

// Configuration methods for DistributedPubSubSettings

379

def withRole(role: String): DistributedPubSubSettings

380

def withRole(role: Option[String]): DistributedPubSubSettings

381

def withRoutingLogic(routingLogic: RoutingLogic): DistributedPubSubSettings

382

def withGossipInterval(gossipInterval: FiniteDuration): DistributedPubSubSettings

383

def withRemovedTimeToLive(removedTimeToLive: FiniteDuration): DistributedPubSubSettings

384

def withMaxDeltaElements(maxDeltaElements: Int): DistributedPubSubSettings

385

def withSendToDeadLettersWhenNoSubscribers(sendToDeadLetters: Boolean): DistributedPubSubSettings

386

```

387

388

## Common Usage Patterns

389

390

### Event-Driven Architecture

391

392

```scala

393

// Publisher service

394

class EventPublisher extends Actor {

395

val mediator = DistributedPubSub(context.system).mediator

396

397

def receive = {

398

case UserRegistered(userId, email) =>

399

mediator ! Publish("user-events", UserRegisteredEvent(userId, email))

400

case OrderPlaced(orderId, userId, amount) =>

401

mediator ! Publish("order-events", OrderPlacedEvent(orderId, userId, amount))

402

}

403

}

404

405

// Subscriber services

406

class EmailService extends Actor {

407

val mediator = DistributedPubSub(context.system).mediator

408

409

override def preStart(): Unit = {

410

mediator ! Subscribe("user-events", self)

411

}

412

413

def receive = {

414

case SubscribeAK(Subscribe("user-events", _, _)) =>

415

log.info("Email service subscribed to user events")

416

case UserRegisteredEvent(userId, email) =>

417

sendWelcomeEmail(email)

418

}

419

}

420

421

class AnalyticsService extends Actor {

422

val mediator = DistributedPubSub(context.system).mediator

423

424

override def preStart(): Unit = {

425

mediator ! Subscribe("user-events", self)

426

mediator ! Subscribe("order-events", self)

427

}

428

429

def receive = {

430

case UserRegisteredEvent(userId, _) =>

431

recordUserMetric(userId)

432

case OrderPlacedEvent(orderId, userId, amount) =>

433

recordRevenueMetric(amount)

434

}

435

}

436

```

437

438

### Load-Balanced Workers

439

440

```scala

441

// Worker pool with load balancing

442

class WorkerPool extends Actor {

443

val mediator = DistributedPubSub(context.system).mediator

444

445

override def preStart(): Unit = {

446

// Register multiple workers

447

(1 to 5).foreach { i =>

448

val worker = context.actorOf(Props[DataProcessor](), s"worker-$i")

449

mediator ! Put(worker)

450

}

451

}

452

453

def receive = {

454

case task: ProcessingTask =>

455

// Send to one available worker (load balanced)

456

mediator ! Send("/user/worker-pool/worker", task, localAffinity = true)

457

}

458

}

459

460

class TaskDispatcher extends Actor {

461

val mediator = DistributedPubSub(context.system).mediator

462

463

def receive = {

464

case batch: TaskBatch =>

465

batch.tasks.foreach { task =>

466

mediator ! Send("/user/worker-pool/worker", task)

467

}

468

}

469

}

470

```

471

472

### Group-Based Messaging

473

474

```scala

475

// Regional message distribution

476

class RegionalNewsService extends Actor {

477

val mediator = DistributedPubSub(context.system).mediator

478

479

override def preStart(): Unit = {

480

val region = context.system.settings.config.getString("app.region")

481

mediator ! Subscribe("regional-news", Some(region), self)

482

}

483

484

def receive = {

485

case RegionalNewsUpdate(region, news) =>

486

// Only one service per region gets this message

487

processRegionalNews(region, news)

488

}

489

}

490

491

class NewsDistributor extends Actor {

492

val mediator = DistributedPubSub(context.system).mediator

493

494

def receive = {

495

case RegionalUpdate(news) =>

496

// Send one message to each regional group

497

mediator ! Publish("regional-news", RegionalNewsUpdate("all", news), sendOneMessageToEachGroup = true)

498

}

499

}

500

```

501

502

## Types

503

504

```scala { .api }

505

// Required imports for pub-sub functionality

506

import akka.actor.{Actor, ActorRef, ActorSystem, ExtendedActorSystem, ExtensionId}

507

import akka.cluster.pubsub.DistributedPubSubMessage

508

import akka.routing.RoutingLogic

509

import com.typesafe.config.Config

510

import scala.concurrent.duration.FiniteDuration

511

512

// Marker traits

513

trait DistributedPubSubMessage extends Serializable

514

trait WrappedMessage {

515

def message: Any

516

}

517

trait DeadLetterSuppression

518

```