The Distributed Pub-Sub pattern enables location-transparent messaging between actors across cluster nodes. Actors can publish messages to topics and subscribe to topics without knowing the physical location of other actors. The system handles message routing, replication, and delivery automatically.
The main entry point for accessing the distributed pub-sub mediator.
/**
 * Extension identifier for [[DistributedPubSub]], the extension that starts a
 * DistributedPubSubMediator actor with the settings defined in the config
 * section `akka.cluster.pub-sub`.
 */
object DistributedPubSub extends ExtensionId[DistributedPubSub] {

  /**
   * Returns the DistributedPubSub extension instance of the given actor system.
   */
  def get(system: ActorSystem): DistributedPubSub

  /**
   * Returns the DistributedPubSub extension instance of the given
   * classic actor system provider.
   */
  def get(system: ClassicActorSystemProvider): DistributedPubSub
}
/**
 * The extension itself: gives access to the mediator actor.
 */
class DistributedPubSub(system: ExtendedActorSystem) extends Extension {

  /**
   * Reference to the DistributedPubSubMediator actor.
   */
  def mediator: ActorRef

  /**
   * True when this member is not tagged with the role configured for the
   * mediator.
   */
  def isTerminated: Boolean
}

Usage Example:
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator._
implicit val system: ActorSystem = ActorSystem("cluster-system")

// Look up the mediator through the extension
val mediator = DistributedPubSub(system).mediator

// Register `self` as a subscriber of the "news" topic
// NOTE(review): `self` is not defined in this snippet; presumably it runs inside an Actor — confirm
mediator ! Subscribe("news", self)

// Publish a message to the "news" topic
mediator ! Publish("news", "Breaking news: Akka cluster online!")

The core actor that manages the distributed registry and handles message routing.
/**
 * Actor that manages a registry of actor references and replicates its
 * entries to peer actors among all cluster nodes, or among the nodes with a
 * specific role.
 *
 * Three message delivery modes are provided:
 *  1. Send - delivered to one recipient with a matching path
 *  2. SendToAll - delivered to all recipients with a matching path
 *  3. Publish - delivered to all subscribers of a topic
 *
 * @param settings configuration of this mediator
 */
class DistributedPubSubMediator(settings: DistributedPubSubSettings) extends Actor
object DistributedPubSubMediator {

  /**
   * Scala API: factory for the Props of a DistributedPubSubMediator that uses
   * the given settings.
   */
  def props(settings: DistributedPubSubSettings): Props
}

Messages for subscribing and unsubscribing from topics.
/**
 * Subscribes an actor to a named topic. Several actors can be registered under
 * the same topic name, and all of them receive the published messages.
 *
 * @param topic name of the topic to subscribe to
 * @param group optional group name for message distribution control
 * @param ref actor reference to register as subscriber
 */
case class Subscribe(topic: String, group: Option[String], ref: ActorRef) {

  /**
   * Convenience constructor for a subscription without a group (group None).
   */
  def this(topic: String, ref: ActorRef) = this(topic, None, ref)

  /**
   * Java API: constructor that takes the group as a plain String.
   */
  def this(topic: String, group: String, ref: ActorRef) = this(topic, Some(group), ref)
}

object Subscribe {

  /** Creates a subscription without a group. */
  def apply(topic: String, ref: ActorRef): Subscribe = Subscribe(topic, None, ref)
}
/**
 * Removes an actor's subscription to a named topic.
 *
 * @param topic name of the topic to unsubscribe from
 * @param group optional group name that was used in the Subscribe message
 * @param ref actor reference to unregister
 */
case class Unsubscribe(topic: String, group: Option[String], ref: ActorRef) {

  /** Convenience constructor for an unsubscription without a group. */
  def this(topic: String, ref: ActorRef) = this(topic, None, ref)

  /** Constructor that takes the group as a plain String. */
  def this(topic: String, group: String, ref: ActorRef) = this(topic, Some(group), ref)
}

object Unsubscribe {

  /** Creates an unsubscription without a group. */
  def apply(topic: String, ref: ActorRef): Unsubscribe = Unsubscribe(topic, None, ref)
}
/**
 * Acknowledgment of a successful subscription.
 *
 * @param subscribe the Subscribe message that is acknowledged
 */
case class SubscribeAck(subscribe: Subscribe) extends DeadLetterSuppression
/**
 * Acknowledgment of a successful unsubscription.
 *
 * @param unsubscribe the Unsubscribe message that is acknowledged
 */
case class UnsubscribeAck(unsubscribe: Unsubscribe)

Usage Example:
// Subscribe with acknowledgment handling
class NewsSubscriber extends Actor with ActorLogging {
  val mediator = DistributedPubSub(context.system).mediator

  // Register for both topics as soon as the actor starts
  override def preStart(): Unit = {
    mediator ! Subscribe("news", self)
    mediator ! Subscribe("weather", Some("local"), self) // with group
  }

  def receive = {
    case SubscribeAck(Subscribe("news", _, _)) =>
      log.info("Successfully subscribed to news")
    // Control command; it has to come before the generic String case below
    case "unsubscribe" =>
      mediator ! Unsubscribe("news", self)
    // No sender() check here: the mediator forwards published messages, so the
    // sender seen by a subscriber is the original publisher, not the mediator
    case msg: String =>
      log.info(s"Received news: $msg")
  }
}

Messages for publishing content to topics and sending to registered actors.
/**
 * Publishes a message to all subscribers of a topic.
 *
 * @param topic name of the topic to publish to
 * @param msg message to publish to all subscribers
 * @param sendOneMessageToEachGroup if true, only one message is sent per group
 */
case class Publish(topic: String, msg: Any, sendOneMessageToEachGroup: Boolean)
    extends DistributedPubSubMessage
    with WrappedMessage {

  /**
   * Convenience constructor without group messaging
   * (sendOneMessageToEachGroup is false).
   */
  def this(topic: String, msg: Any) = this(topic, msg, false)

  /** The wrapped message, i.e. `msg`. */
  override def message: Any = msg
}

object Publish {

  /** Creates a Publish without group messaging. */
  def apply(topic: String, msg: Any): Publish =
    Publish(topic, msg, sendOneMessageToEachGroup = false)
}
/**
 * Sends a message to one recipient with a matching path.
 *
 * @param path actor path string to send to
 * @param msg message to send
 * @param localAffinity prefer local actors if available
 */
case class Send(path: String, msg: Any, localAffinity: Boolean)
    extends DistributedPubSubMessage
    with WrappedMessage {

  /**
   * Convenience constructor with localAffinity false.
   */
  def this(path: String, msg: Any) = this(path, msg, false)

  /** The wrapped message, i.e. `msg`. */
  override def message: Any = msg
}
/**
 * Sends a message to all recipients with a matching path.
 *
 * @param path actor path string to send to
 * @param msg message to send to all matching actors
 * @param allButSelf exclude the sending node from the recipients; defaults to false
 */
case class SendToAll(path: String, msg: Any, allButSelf: Boolean = false)
    extends DistributedPubSubMessage
    with WrappedMessage {

  /** Constructor with allButSelf false. */
  def this(path: String, msg: Any) = this(path, msg, false)

  /** The wrapped message, i.e. `msg`. */
  override def message: Any = msg
}

Usage Example:
// Publishing messages
class NewsPublisher extends Actor {
  val mediator = DistributedPubSub(context.system).mediator

  def receive: Receive = {
    case "breaking-news" =>
      val headline = Publish("news", "BREAKING: Major event occurred!")
      mediator ! headline

    case "weather-update" =>
      // One message per group (load balancing)
      val update = WeatherUpdate("sunny", 25)
      mediator ! Publish("weather", update, sendOneMessageToEachGroup = true)

    case SendToWorker(task) =>
      // Delivered to a single worker (load balancing)
      mediator ! Send("/user/worker", task, localAffinity = true)

    case BroadcastToWorkers(announcement) =>
      // Delivered to every worker
      mediator ! SendToAll("/user/worker", announcement)
  }
}

Messages for registering and unregistering actors for Send/SendToAll operations.
/**
 * Registers an actor as a recipient of Send and SendToAll messages.
 * The actor becomes reachable by its path string.
 *
 * @param ref actor reference to register
 */
case class Put(ref: ActorRef)
/**
 * Removes a registered actor, identified by its path string.
 *
 * @param path path string of the actor to remove
 */
case class Remove(path: String)

Usage Example:
// Register worker for Send/SendToAll
class WorkerManager extends Actor {
  val mediator = DistributedPubSub(context.system).mediator

  // Name of the child worker; the path passed to Remove is derived from it
  private val workerName = "worker"

  override def preStart(): Unit = {
    val worker = context.actorOf(Props[Worker](), workerName)
    mediator ! Put(worker) // Register for Send/SendToAll
  }

  def receive = {
    case "shutdown-worker" =>
      // Derive the worker's path from this actor's own path instead of assuming
      // that this actor is deployed as /user/worker-manager
      mediator ! Remove(s"${self.path.toStringWithoutAddress}/$workerName")
  }
}

Messages for inspecting current topics and subscriber counts.
/**
 * Type of the GetTopics message. Declared so that the case object below has a
 * base type to extend; sealed, so the case object is its only instance.
 */
sealed abstract class GetTopics

/**
 * Send this message to the mediator to get the current topics.
 * The mediator replies with CurrentTopics containing the topic names.
 */
case object GetTopics extends GetTopics

/**
 * Java API: returns the singleton GetTopics instance.
 */
def getTopicsInstance: GetTopics = GetTopics
/**
 * Reply to a GetTopics request.
 *
 * @param topics set of the currently known topic names
 */
case class CurrentTopics(topics: Set[String]) {

  /**
   * Java API: returns the topics as a Java Set.
   */
  def getTopics(): java.util.Set[String] = {
    // Local import so that `.asJava` resolves without relying on file-level imports
    import scala.jdk.CollectionConverters._
    topics.asJava
  }
}
/**
 * Type of the Count message. Declared so that the case object below has a
 * base type to extend; sealed, so the case object is its only instance.
 */
sealed abstract class Count

/**
 * Send this message to get the count of subscribers (testing only).
 */
case object Count extends Count

/**
 * Java API: returns the singleton Count instance.
 */
def getCountInstance: Count = Count
/**
 * Counts the subscribers of a specific topic.
 *
 * @param topic name of the topic whose subscribers are counted
 */
case class CountSubscribers(topic: String)

Configuration settings for the mediator behavior.
/**
 * Configuration settings of the DistributedPubSubMediator.
 *
 * @param role start the mediator on members tagged with this role; all members if undefined
 * @param routingLogic routing logic to use for Send messages
 * @param gossipInterval how often the mediator sends out gossip information
 * @param removedTimeToLive removed entries are pruned after this duration
 * @param maxDeltaElements maximum number of elements to transfer in one gossip message
 * @param sendToDeadLettersWhenNoSubscribers send to dead letters when there are no subscribers
 */
final class DistributedPubSubSettings(
    val role: Option[String],
    val routingLogic: RoutingLogic,
    val gossipInterval: FiniteDuration,
    val removedTimeToLive: FiniteDuration,
    val maxDeltaElements: Int,
    val sendToDeadLettersWhenNoSubscribers: Boolean)
    extends NoSerializationVerificationNeeded
object DistributedPubSubSettings {

  /**
   * Creates settings from the default configuration `akka.cluster.pub-sub`.
   */
  def apply(system: ActorSystem): DistributedPubSubSettings

  /**
   * Creates settings from a configuration with the same layout as the default.
   */
  def apply(config: Config): DistributedPubSubSettings

  /**
   * Java API: creates settings from the default configuration.
   */
  def create(system: ActorSystem): DistributedPubSubSettings

  /**
   * Java API: creates settings from a configuration.
   */
  def create(config: Config): DistributedPubSubSettings
}

Settings Methods:
// Configuration methods for DistributedPubSubSettings.
// Each method returns a DistributedPubSubSettings.

// Role, given either directly or as an Option
def withRole(role: String): DistributedPubSubSettings
def withRole(role: Option[String]): DistributedPubSubSettings

// Routing and gossip
def withRoutingLogic(routingLogic: RoutingLogic): DistributedPubSubSettings
def withGossipInterval(gossipInterval: FiniteDuration): DistributedPubSubSettings
def withRemovedTimeToLive(removedTimeToLive: FiniteDuration): DistributedPubSubSettings
def withMaxDeltaElements(maxDeltaElements: Int): DistributedPubSubSettings

// Dead letters
def withSendToDeadLettersWhenNoSubscribers(sendToDeadLetters: Boolean): DistributedPubSubSettings

// Publisher service
// Turns incoming domain messages into events and publishes them to a topic.
class EventPublisher extends Actor {
  val mediator = DistributedPubSub(context.system).mediator

  def receive: Receive = {
    case UserRegistered(userId, email) =>
      val event = UserRegisteredEvent(userId, email)
      mediator ! Publish("user-events", event)

    case OrderPlaced(orderId, userId, amount) =>
      val event = OrderPlacedEvent(orderId, userId, amount)
      mediator ! Publish("order-events", event)
  }
}
// Subscriber services
// Sends a welcome email for every UserRegisteredEvent published to "user-events".
// ActorLogging is mixed in because `log` is used below.
class EmailService extends Actor with ActorLogging {
  val mediator = DistributedPubSub(context.system).mediator

  override def preStart(): Unit = {
    mediator ! Subscribe("user-events", self)
  }

  def receive = {
    // The acknowledgment message is SubscribeAck (it was misspelled as SubscribeAK)
    case SubscribeAck(Subscribe("user-events", _, _)) =>
      log.info("Email service subscribed to user events")
    case UserRegisteredEvent(_, email) =>
      sendWelcomeEmail(email)
  }
}
// Records metrics for the events published to "user-events" and "order-events".
class AnalyticsService extends Actor {
  val mediator = DistributedPubSub(context.system).mediator

  override def preStart(): Unit =
    Seq("user-events", "order-events").foreach { topic =>
      mediator ! Subscribe(topic, self)
    }

  def receive: Receive = {
    case UserRegisteredEvent(userId, _) => recordUserMetric(userId)
    case OrderPlacedEvent(_, _, amount) => recordRevenueMetric(amount)
  }
}

// Worker pool with load balancing
// Creates five workers, registers them with the mediator and hands each
// incoming task to one of them.
class WorkerPool extends Actor {
  val mediator = DistributedPubSub(context.system).mediator

  // Child names; each worker is created as a child of this actor under one of these names
  private val workerNames = (1 to 5).map(i => s"worker-$i")

  // Endless round-robin over the workers' paths. Send matches a registered path,
  // so the target has to be one of the paths below (a bare ".../worker" is never registered).
  private val workerPaths: Iterator[String] =
    Iterator
      .continually(workerNames)
      .flatten
      .map(name => s"${self.path.toStringWithoutAddress}/$name")

  override def preStart(): Unit = {
    // Register multiple workers
    workerNames.foreach { name =>
      val worker = context.actorOf(Props[DataProcessor](), name)
      mediator ! Put(worker)
    }
  }

  def receive = {
    case task: ProcessingTask =>
      // Send to one registered worker, rotating over the workers
      mediator ! Send(workerPaths.next(), task, localAffinity = true)
  }
}
// Sends every task of a batch through the mediator.
class TaskDispatcher extends Actor {
  val mediator = DistributedPubSub(context.system).mediator

  def receive = {
    case batch: TaskBatch =>
      batch.tasks.foreach { task =>
        // Send offers the two-argument form only as a constructor (`new Send(path, msg)`),
        // not as a companion apply, so localAffinity is passed explicitly; false is the
        // value that constructor uses.
        // NOTE(review): the path has to equal a path registered through Put — confirm
        // that "/user/worker-pool/worker" is actually registered
        mediator ! Send("/user/worker-pool/worker", task, localAffinity = false)
      }
  }
}

// Regional message distribution
// Subscribes to "regional-news" using the configured region as the group name.
class RegionalNewsService extends Actor {
  val mediator = DistributedPubSub(context.system).mediator

  override def preStart(): Unit = {
    val config = context.system.settings.config
    val group = Some(config.getString("app.region"))
    mediator ! Subscribe("regional-news", group, self)
  }

  def receive: Receive = {
    // Only one service per region gets this message
    case RegionalNewsUpdate(region, news) => processRegionalNews(region, news)
  }
}
// Publishes regional updates so that each group receives one message.
class NewsDistributor extends Actor {
  val mediator = DistributedPubSub(context.system).mediator

  def receive: Receive = {
    case RegionalUpdate(news) =>
      // One message to each regional group
      val update = RegionalNewsUpdate("all", news)
      mediator ! Publish("regional-news", update, sendOneMessageToEachGroup = true)
  }
}

// Required imports for pub-sub functionality
import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, ExtendedActorSystem, ExtensionId}
import akka.cluster.pubsub.DistributedPubSubMessage
import akka.routing.RoutingLogic
import com.typesafe.config.Config
import scala.concurrent.duration.FiniteDuration
// Marker traits
// Marker for pub-sub messages; Publish, Send and SendToAll extend it.
trait DistributedPubSubMessage extends Serializable
// Implemented by messages that carry another message as payload.
trait WrappedMessage {
// The wrapped payload; Publish, Send and SendToAll return their `msg` here.
def message: Any
}
// Marker trait extended by SubscribeAck.
// NOTE(review): presumably marks messages that should not be logged as dead letters — confirm
trait DeadLetterSuppression