0
# Distributed Publish-Subscribe
1
2
The Distributed Pub-Sub pattern enables location-transparent messaging between actors across cluster nodes. Actors can publish messages to topics and subscribe to topics without knowing the physical location of other actors. The system handles message routing, replication, and delivery automatically.
3
4
## Capabilities
5
6
### DistributedPubSub Extension
7
8
The main entry point for accessing the distributed pub-sub mediator.
9
10
```scala { .api }
11
/**
12
* Extension that starts a DistributedPubSubMediator actor
13
* with settings defined in config section akka.cluster.pub-sub
14
*/
15
object DistributedPubSub extends ExtensionId[DistributedPubSub] {
16
/**
17
* Get the DistributedPubSub extension instance
18
*/
19
def get(system: ActorSystem): DistributedPubSub
20
def get(system: ClassicActorSystemProvider): DistributedPubSub
21
}
22
23
class DistributedPubSub(system: ExtendedActorSystem) extends Extension {
24
/**
25
* The DistributedPubSubMediator actor reference
26
*/
27
def mediator: ActorRef
28
29
/**
30
* Returns true if this member is not tagged with the role configured for the mediator
31
*/
32
def isTerminated: Boolean
33
}
34
```
35
36
**Usage Example:**
37
38
```scala
39
import akka.cluster.pubsub.DistributedPubSub
40
import akka.cluster.pubsub.DistributedPubSubMediator._
41
42
implicit val system: ActorSystem = ActorSystem("cluster-system")
43
44
// Get the mediator
45
val mediator = DistributedPubSub(system).mediator
46
47
// Subscribe to a topic
48
mediator ! Subscribe("news", self)
49
50
// Publish to a topic
51
mediator ! Publish("news", "Breaking news: Akka cluster online!")
52
```
53
54
### DistributedPubSubMediator
55
56
The core actor that manages the distributed registry and handles message routing.
57
58
```scala { .api }
59
/**
60
* Actor that manages a registry of actor references and replicates
61
* entries to peer actors among all cluster nodes or nodes with specific role.
62
*
63
* Provides three message delivery modes:
64
* 1. Send - to one recipient with matching path
65
* 2. SendToAll - to all recipients with matching path
66
* 3. Publish - to all subscribers of a topic
67
*/
68
class DistributedPubSubMediator(settings: DistributedPubSubSettings) extends Actor
69
70
object DistributedPubSubMediator {
71
/**
72
* Scala API: Factory method for DistributedPubSubMediator Props
73
*/
74
def props(settings: DistributedPubSubSettings): Props
75
}
76
```
77
78
### Subscription Management
79
80
Messages for subscribing and unsubscribing from topics.
81
82
```scala { .api }
83
/**
84
* Subscribe to a named topic. Actors can be registered to the same topic
85
* name, and all will receive published messages.
86
*
87
* @param topic Topic name to subscribe to
88
* @param group Optional group name for message distribution control
89
* @param ref Actor reference to register as subscriber
90
*/
91
case class Subscribe(topic: String, group: Option[String], ref: ActorRef) {
92
/**
93
* Convenience constructor with group None
94
*/
95
def this(topic: String, ref: ActorRef) = this(topic, None, ref)
96
97
/**
98
* Java API: constructor with group String
99
*/
100
def this(topic: String, group: String, ref: ActorRef) = this(topic, Some(group), ref)
101
}
102
103
object Subscribe {
104
def apply(topic: String, ref: ActorRef): Subscribe = new Subscribe(topic, ref)
105
}
106
107
/**
108
* Unsubscribe from a named topic
109
*
110
* @param topic Topic name to unsubscribe from
111
* @param group Optional group name that was used in Subscribe
112
* @param ref Actor reference to unregister
113
*/
114
case class Unsubscribe(topic: String, group: Option[String], ref: ActorRef) {
115
def this(topic: String, ref: ActorRef) = this(topic, None, ref)
116
def this(topic: String, group: String, ref: ActorRef) = this(topic, Some(group), ref)
117
}
118
119
object Unsubscribe {
120
def apply(topic: String, ref: ActorRef): Unsubscribe = new Unsubscribe(topic, ref)
121
}
122
123
/**
124
* Acknowledgment of successful subscription
125
*/
126
case class SubscribeAck(subscribe: Subscribe) extends DeadLetterSuppression
127
128
/**
129
* Acknowledgment of successful unsubscription
130
*/
131
case class UnsubscribeAck(unsubscribe: Unsubscribe)
132
```
133
134
**Usage Example:**
135
136
```scala
137
// Subscribe with acknowledgment handling
138
class NewsSubscriber extends Actor {
139
val mediator = DistributedPubSub(context.system).mediator
140
141
override def preStart(): Unit = {
142
mediator ! Subscribe("news", self)
143
mediator ! Subscribe("weather", Some("local"), self) // with group
144
}
145
146
def receive = {
147
case SubscribeAck(Subscribe("news", _, _)) =>
148
log.info("Successfully subscribed to news")
149
case msg: String if sender() == mediator =>
150
log.info(s"Received news: $msg")
151
case "unsubscribe" =>
152
mediator ! Unsubscribe("news", self)
153
}
154
}
155
```
156
157
### Message Publishing
158
159
Messages for publishing content to topics and sending to registered actors.
160
161
```scala { .api }
162
/**
163
* Publish a message to all subscribers of a topic
164
*
165
* @param topic Topic name to publish to
166
* @param msg Message to publish to all subscribers
167
* @param sendOneMessageToEachGroup If true, send only one message per group
168
*/
169
case class Publish(topic: String, msg: Any, sendOneMessageToEachGroup: Boolean)
170
extends DistributedPubSubMessage with WrappedMessage {
171
172
/**
173
* Convenience constructor without group messaging
174
*/
175
def this(topic: String, msg: Any) = this(topic, msg, sendOneMessageToEachGroup = false)
176
177
override def message: Any = msg
178
}
179
180
object Publish {
181
def apply(topic: String, msg: Any): Publish = new Publish(topic, msg)
182
}
183
184
/**
185
* Send message to one recipient with matching path
186
*
187
* @param path Actor path string to send to
188
* @param msg Message to send
189
* @param localAffinity Prefer local actors if available
190
*/
191
case class Send(path: String, msg: Any, localAffinity: Boolean)
192
extends DistributedPubSubMessage with WrappedMessage {
193
194
/**
195
* Convenience constructor with localAffinity false
196
*/
197
def this(path: String, msg: Any) = this(path, msg, localAffinity = false)
198
199
override def message: Any = msg
200
}
201
202
/**
203
* Send message to all recipients with matching path
204
*
205
* @param path Actor path string to send to
206
* @param msg Message to send to all matching actors
207
* @param allButSelf Exclude the sending node from recipients
208
*/
209
case class SendToAll(path: String, msg: Any, allButSelf: Boolean = false)
210
extends DistributedPubSubMessage with WrappedMessage {
211
212
def this(path: String, msg: Any) = this(path, msg, allButSelf = false)
213
214
override def message: Any = msg
215
}
216
```
217
218
**Usage Example:**
219
220
```scala
221
// Publishing messages
222
class NewsPublisher extends Actor {
223
val mediator = DistributedPubSub(context.system).mediator
224
225
def receive = {
226
case "breaking-news" =>
227
mediator ! Publish("news", "BREAKING: Major event occurred!")
228
229
case "weather-update" =>
230
// Send one message to each group (load balancing)
231
mediator ! Publish("weather", WeatherUpdate("sunny", 25), sendOneMessageToEachGroup = true)
232
233
case SendToWorker(task) =>
234
// Send to one worker (load balancing)
235
mediator ! Send("/user/worker", task, localAffinity = true)
236
237
case BroadcastToWorkers(announcement) =>
238
// Send to all workers
239
mediator ! SendToAll("/user/worker", announcement)
240
}
241
}
242
```
243
244
### Actor Registration
245
246
Messages for registering and unregistering actors for Send/SendToAll operations.
247
248
```scala { .api }
249
/**
250
* Register an actor for Send and SendToAll messages.
251
* The actor will be reachable by its path string.
252
*/
253
case class Put(ref: ActorRef)
254
255
/**
256
* Remove a registered actor by its path string
257
*
258
* @param path The path string of the actor to remove
259
*/
260
case class Remove(path: String)
261
```
262
263
**Usage Example:**
264
265
```scala
266
// Register worker for Send/SendToAll
267
class WorkerManager extends Actor {
268
val mediator = DistributedPubSub(context.system).mediator
269
270
override def preStart(): Unit = {
271
val worker = context.actorOf(Props[Worker](), "worker")
272
mediator ! Put(worker) // Register for Send/SendToAll
273
}
274
275
def receive = {
276
case "shutdown-worker" =>
277
mediator ! Remove("/user/worker-manager/worker")
278
}
279
}
280
```
281
282
### Topic Introspection
283
284
Messages for inspecting current topics and subscriber counts.
285
286
```scala { .api }
287
/**
288
* Send this message to the mediator to get current topics.
289
* Replies with CurrentTopics containing topic names.
290
*/
291
case object GetTopics extends GetTopics
292
293
/**
294
* Java API: Get singleton instance for GetTopics
295
*/
296
def getTopicsInstance: GetTopics = GetTopics
297
298
/**
299
* Reply to GetTopics request
300
*
301
* @param topics Set of currently known topic names
302
*/
303
case class CurrentTopics(topics: Set[String]) {
304
/**
305
* Java API: Get topics as Java Set
306
*/
307
def getTopics(): java.util.Set[String] = topics.asJava
308
}
309
310
/**
311
* Send this message to get count of subscribers (testing only)
312
*/
313
case object Count extends Count
314
315
/**
316
* Java API: Get singleton instance for Count
317
*/
318
def getCountInstance: Count = Count
319
320
/**
321
* Count subscribers for a specific topic
322
*
323
* @param topic Topic name to count subscribers for
324
*/
325
case class CountSubscribers(topic: String)
326
```
327
328
### DistributedPubSubSettings
329
330
Configuration settings for the mediator behavior.
331
332
```scala { .api }
333
/**
334
* Configuration settings for DistributedPubSubMediator
335
*
336
* @param role Start mediator on members tagged with this role. All members if undefined
337
* @param routingLogic The routing logic to use for Send messages
338
* @param gossipInterval How often the mediator sends out gossip information
339
* @param removedTimeToLive Removed entries are pruned after this duration
340
* @param maxDeltaElements Maximum elements to transfer in one gossip message
341
* @param sendToDeadLettersWhenNoSubscribers Send to dead letters when no subscribers
342
*/
343
final class DistributedPubSubSettings(
344
val role: Option[String],
345
val routingLogic: RoutingLogic,
346
val gossipInterval: FiniteDuration,
347
val removedTimeToLive: FiniteDuration,
348
val maxDeltaElements: Int,
349
val sendToDeadLettersWhenNoSubscribers: Boolean
350
) extends NoSerializationVerificationNeeded
351
352
object DistributedPubSubSettings {
353
/**
354
* Create settings from the default configuration akka.cluster.pub-sub
355
*/
356
def apply(system: ActorSystem): DistributedPubSubSettings
357
358
/**
359
* Create settings from configuration with same layout as default
360
*/
361
def apply(config: Config): DistributedPubSubSettings
362
363
/**
364
* Java API: Create settings from default configuration
365
*/
366
def create(system: ActorSystem): DistributedPubSubSettings
367
368
/**
369
* Java API: Create settings from configuration
370
*/
371
def create(config: Config): DistributedPubSubSettings
372
}
373
```
374
375
**Settings Methods:**
376
377
```scala { .api }
378
// Configuration methods for DistributedPubSubSettings
379
def withRole(role: String): DistributedPubSubSettings
380
def withRole(role: Option[String]): DistributedPubSubSettings
381
def withRoutingLogic(routingLogic: RoutingLogic): DistributedPubSubSettings
382
def withGossipInterval(gossipInterval: FiniteDuration): DistributedPubSubSettings
383
def withRemovedTimeToLive(removedTimeToLive: FiniteDuration): DistributedPubSubSettings
384
def withMaxDeltaElements(maxDeltaElements: Int): DistributedPubSubSettings
385
def withSendToDeadLettersWhenNoSubscribers(sendToDeadLetters: Boolean): DistributedPubSubSettings
386
```
387
388
## Common Usage Patterns
389
390
### Event-Driven Architecture
391
392
```scala
393
// Publisher service
394
class EventPublisher extends Actor {
395
val mediator = DistributedPubSub(context.system).mediator
396
397
def receive = {
398
case UserRegistered(userId, email) =>
399
mediator ! Publish("user-events", UserRegisteredEvent(userId, email))
400
case OrderPlaced(orderId, userId, amount) =>
401
mediator ! Publish("order-events", OrderPlacedEvent(orderId, userId, amount))
402
}
403
}
404
405
// Subscriber services
406
class EmailService extends Actor {
407
val mediator = DistributedPubSub(context.system).mediator
408
409
override def preStart(): Unit = {
410
mediator ! Subscribe("user-events", self)
411
}
412
413
def receive = {
414
case SubscribeAK(Subscribe("user-events", _, _)) =>
415
log.info("Email service subscribed to user events")
416
case UserRegisteredEvent(userId, email) =>
417
sendWelcomeEmail(email)
418
}
419
}
420
421
class AnalyticsService extends Actor {
422
val mediator = DistributedPubSub(context.system).mediator
423
424
override def preStart(): Unit = {
425
mediator ! Subscribe("user-events", self)
426
mediator ! Subscribe("order-events", self)
427
}
428
429
def receive = {
430
case UserRegisteredEvent(userId, _) =>
431
recordUserMetric(userId)
432
case OrderPlacedEvent(orderId, userId, amount) =>
433
recordRevenueMetric(amount)
434
}
435
}
436
```
437
438
### Load-Balanced Workers
439
440
```scala
441
// Worker pool with load balancing
442
class WorkerPool extends Actor {
443
val mediator = DistributedPubSub(context.system).mediator
444
445
override def preStart(): Unit = {
446
// Register multiple workers
447
(1 to 5).foreach { i =>
448
val worker = context.actorOf(Props[DataProcessor](), s"worker-$i")
449
mediator ! Put(worker)
450
}
451
}
452
453
def receive = {
454
case task: ProcessingTask =>
455
// Send to one available worker (load balanced)
456
mediator ! Send("/user/worker-pool/worker", task, localAffinity = true)
457
}
458
}
459
460
class TaskDispatcher extends Actor {
461
val mediator = DistributedPubSub(context.system).mediator
462
463
def receive = {
464
case batch: TaskBatch =>
465
batch.tasks.foreach { task =>
466
mediator ! Send("/user/worker-pool/worker", task)
467
}
468
}
469
}
470
```
471
472
### Group-Based Messaging
473
474
```scala
475
// Regional message distribution
476
class RegionalNewsService extends Actor {
477
val mediator = DistributedPubSub(context.system).mediator
478
479
override def preStart(): Unit = {
480
val region = context.system.settings.config.getString("app.region")
481
mediator ! Subscribe("regional-news", Some(region), self)
482
}
483
484
def receive = {
485
case RegionalNewsUpdate(region, news) =>
486
// Only one service per region gets this message
487
processRegionalNews(region, news)
488
}
489
}
490
491
class NewsDistributor extends Actor {
492
val mediator = DistributedPubSub(context.system).mediator
493
494
def receive = {
495
case RegionalUpdate(news) =>
496
// Send one message to each regional group
497
mediator ! Publish("regional-news", RegionalNewsUpdate("all", news), sendOneMessageToEachGroup = true)
498
}
499
}
500
```
501
502
## Types
503
504
```scala { .api }
505
// Required imports for pub-sub functionality
506
import akka.actor.{Actor, ActorRef, ActorSystem, ExtendedActorSystem, ExtensionId}
507
import akka.cluster.pubsub.DistributedPubSubMessage
508
import akka.routing.RoutingLogic
509
import com.typesafe.config.Config
510
import scala.concurrent.duration.FiniteDuration
511
512
// Marker traits
513
trait DistributedPubSubMessage extends Serializable
514
trait WrappedMessage {
515
def message: Any
516
}
517
trait DeadLetterSuppression
518
```