DEPRECATED since Akka 2.6.0 - Use Akka gRPC instead for external cluster communication.
The Cluster Client pattern provides a way for external systems (not part of the cluster) to communicate with actors within an Akka cluster. It acts as a gateway, handling connection management, service discovery, and message routing to cluster actors.
// ⚠️ DEPRECATED: ClusterClient usage
@deprecated("Use Akka gRPC instead", since = "2.6.0")
val client = system.actorOf(ClusterClient.props(settings))
// ✅ RECOMMENDED: Use Akka gRPC for external communication
// See: https://doc.akka.io/docs/akka/current/cluster-client.html#migration-to-akka-grpcThe main client actor for external systems to communicate with cluster actors.
/**
* DEPRECATED: Use Akka gRPC instead
*
* Actor intended for use on external nodes not members of the cluster.
* Acts as gateway for sending messages to cluster actors via ClusterReceptionist.
*
* @param settings Configuration for connection and behavior
*/
@deprecated("Use Akka gRPC instead", since = "2.6.0")
final class ClusterClient(settings: ClusterClientSettings) extends Actor
@deprecated("Use Akka gRPC instead", since = "2.6.0")
object ClusterClient {
/**
* Scala API: Factory method for ClusterClient Props
*/
def props(settings: ClusterClientSettings): Props
}Messages for different communication patterns with cluster actors.
/**
* Send message to one recipient with matching path in the cluster
*
* @param path Actor path string within the cluster
* @param msg Message to send
* @param localAffinity Prefer actors on same node as receptionist if available
*/
@deprecated("Use Akka gRPC instead", since = "2.6.0")
case class Send(path: String, msg: Any, localAffinity: Boolean) {
/**
* Convenience constructor with localAffinity false
*/
def this(path: String, msg: Any) = this(path, msg, localAffinity = false)
}
/**
* Send message to all recipients with matching path in the cluster
*
* @param path Actor path string within the cluster
* @param msg Message to send to all matching actors
*/
@deprecated("Use Akka gRPC instead", since = "2.6.0")
case class SendToAll(path: String, msg: Any)
/**
* Publish message to all subscribers of a topic in the cluster
*
* @param topic Topic name to publish to
* @param msg Message to publish
*/
@deprecated("Use Akka gRPC instead", since = "2.6.0")
case class Publish(topic: String, msg: Any)Configuration for cluster client behavior and connection management.
/**
* DEPRECATED: Configuration settings for ClusterClient
*
* @param initialContacts Actor paths of ClusterReceptionist actors on servers
* @param establishingGetContactsInterval Retry interval for establishing contact
* @param refreshContactsInterval How often to ask for new contact points
* @param heartbeatInterval How often to send heartbeat messages
* @param acceptableHeartbeatPause Acceptable heartbeat pause before failure detection
* @param bufferSize Number of messages to buffer when connection unavailable
* @param reconnectTimeout Timeout for connection re-establishment attempts
*/
@deprecated("Use Akka gRPC instead", since = "2.6.0")
final class ClusterClientSettings(
val initialContacts: Set[ActorPath],
val establishingGetContactsInterval: FiniteDuration,
val refreshContactsInterval: FiniteDuration,
val heartbeatInterval: FiniteDuration,
val acceptableHeartbeatPause: FiniteDuration,
val bufferSize: Int,
val reconnectTimeout: Option[FiniteDuration]
) extends NoSerializationVerificationNeeded
@deprecated("Use Akka gRPC instead", since = "2.6.0")
object ClusterClientSettings {
/**
* Create settings from default configuration akka.cluster.client
*/
def apply(system: ActorSystem): ClusterClientSettings
/**
* Create settings from configuration
*/
def apply(config: Config): ClusterClientSettings
/**
* Java API: Create settings from default configuration
*/
def create(system: ActorSystem): ClusterClientSettings
/**
* Java API: Create settings from configuration
*/
def create(config: Config): ClusterClientSettings
}Settings Methods:
// Configuration methods for ClusterClientSettings
def withInitialContacts(initialContacts: Set[ActorPath]): ClusterClientSettings
def withInitialContacts(initialContacts: java.util.Set[ActorPath]): ClusterClientSettings // Java API
def withEstablishingGetContactsInterval(interval: FiniteDuration): ClusterClientSettings
def withRefreshContactsInterval(interval: FiniteDuration): ClusterClientSettings
def withHeartbeat(heartbeatInterval: FiniteDuration, acceptableHeartbeatPause: FiniteDuration): ClusterClientSettings
def withBufferSize(bufferSize: Int): ClusterClientSettings
def withReconnectTimeout(reconnectTimeout: Option[FiniteDuration]): ClusterClientSettingsMessages and events for managing cluster contact points.
/**
* Subscribe to contact point changes. Sender receives initial state
* and subsequent change events.
*/
case object SubscribeContactPoints extends SubscribeContactPoints {
/**
* Java API: get the singleton instance
*/
def getInstance = this
}
/**
* Explicitly unsubscribe from contact point change events
*/
case object UnsubscribeContactPoints extends UnsubscribeContactPoints {
/**
* Java API: get the singleton instance
*/
def getInstance = this
}
/**
* Get the contact points known to this client. Replies with ContactPoints.
*/
case object GetContactPoints extends GetContactPoints {
/**
* Java API: get the singleton instance
*/
def getInstance = this
}
/**
* Reply to GetContactPoints containing current known contact points
*
* @param contactPoints Set of currently known contact point paths
*/
case class ContactPoints(contactPoints: Set[ActorPath]) {
/**
* Java API: Get contact points as Java Set
*/
def getContactPoints: java.util.Set[ActorPath] = contactPoints.asJava
}
/**
* Event emitted when a new contact point is discovered
*/
case class ContactPointAdded(override val contactPoint: ActorPath) extends ContactPointChange
/**
* Event emitted when a contact point is removed
*/
case class ContactPointRemoved(override val contactPoint: ActorPath) extends ContactPointChange
/**
* Base trait for contact point change events
*/
sealed trait ContactPointChange {
val contactPoint: ActorPath
}Extension for managing the server-side receptionist that handles client connections.
/**
* DEPRECATED: Extension that starts ClusterReceptionist and DistributedPubSubMediator
* with settings from akka.cluster.client.receptionist config section
*/
@deprecated("Use Akka gRPC instead", since = "2.6.0")
object ClusterClientReceptionist extends ExtensionId[ClusterClientReceptionist] {
def get(system: ActorSystem): ClusterClientReceptionist
def get(system: ClassicActorSystemProvider): ClusterClientReceptionist
}
@deprecated("Use Akka gRPC instead", since = "2.6.0")
final class ClusterClientReceptionist(system: ExtendedActorSystem) extends Extension {
/**
* Register actor that should be reachable for clients.
* Clients can send messages using Send or SendToAll with actor's path.
*/
def registerService(actor: ActorRef): Unit
/**
* Unregister a previously registered service actor
*/
def unregisterService(actor: ActorRef): Unit
/**
* Register actor as subscriber to a named topic.
* Multiple actors can subscribe to same topic.
*/
def registerSubscriber(topic: String, actor: ActorRef): Unit
/**
* Unregister topic subscriber
*/
def unregisterSubscriber(topic: String, actor: ActorRef): Unit
/**
* Get the underlying ClusterReceptionist actor reference for events
*/
def underlying: ActorRef
/**
* Returns true if receptionist is terminated (wrong role, etc.)
*/
def isTerminated: Boolean
}The server-side actor that handles client connections and message routing.
/**
* DEPRECATED: Server-side component that ClusterClient connects to.
* Forwards messages to DistributedPubSubMediator and handles client lifecycle.
*
* @param pubSubMediator Reference to DistributedPubSubMediator for message routing
* @param settings Configuration for receptionist behavior
*/
@deprecated("Use Akka gRPC instead", since = "2.6.0")
final class ClusterReceptionist(
pubSubMediator: ActorRef,
settings: ClusterReceptionistSettings
) extends Actor
@deprecated("Use Akka gRPC instead", since = "2.6.0")
object ClusterReceptionist {
/**
* Scala API: Factory method for ClusterReceptionist Props
*/
def props(pubSubMediator: ActorRef, settings: ClusterReceptionistSettings): Props
}Configuration for the server-side receptionist behavior.
/**
* DEPRECATED: Configuration settings for ClusterReceptionist
*
* @param role Start receptionist on members tagged with this role
* @param numberOfContacts Number of contact points to send to clients
* @param responseTunnelReceiveTimeout Timeout for response tunnel actors
*/
@deprecated("Use Akka gRPC instead", since = "2.6.0")
final class ClusterReceptionistSettings(
val role: Option[String],
val numberOfContacts: Int,
val responseTunnelReceiveTimeout: FiniteDuration
) extends NoSerializationVerificationNeeded
@deprecated("Use Akka gRPC instead", since = "2.6.0")
object ClusterReceptionistSettings {
/**
* Create settings from default configuration akka.cluster.client.receptionist
*/
def apply(system: ActorSystem): ClusterReceptionistSettings
/**
* Create settings from configuration
*/
def apply(config: Config): ClusterReceptionistSettings
/**
* Java API factory methods
*/
def create(system: ActorSystem): ClusterReceptionistSettings
def create(config: Config): ClusterReceptionistSettings
}Settings Methods:
// Configuration methods for ClusterReceptionistSettings
def withRole(role: String): ClusterReceptionistSettings
def withRole(role: Option[String]): ClusterReceptionistSettings
def withNumberOfContacts(numberOfContacts: Int): ClusterReceptionistSettings
def withResponseTunnelReceiveTimeout(timeout: FiniteDuration): ClusterReceptionistSettings
def withHeartbeat(heartbeatInterval: FiniteDuration, acceptableHeartbeatPause: FiniteDuration, failureDetectionInterval: FiniteDuration): ClusterReceptionistSettingsEvents for monitoring cluster client connections.
/**
* Base trait for cluster client interaction events
*/
sealed trait ClusterClientInteraction {
val clusterClient: ActorRef
}
/**
* Event emitted when a cluster client connects to receptionist
*/
case class ClusterClientUp(override val clusterClient: ActorRef) extends ClusterClientInteraction
/**
* Event emitted when cluster client becomes unreachable
*/
case class ClusterClientUnreachable(override val clusterClient: ActorRef) extends ClusterClientInteraction
/**
* Subscribe to cluster client interaction events
*/
case object SubscribeClusterClients extends SubscribeClusterClients {
/**
* Java API: get the singleton instance
*/
def getInstance = this
}
/**
* Unsubscribe from cluster client interaction events
*/
case object UnsubscribeClusterClients extends UnsubscribeClusterClients {
/**
* Java API: get the singleton instance
*/
def getInstance = this
}
/**
* Get currently connected cluster clients. Replies with ClusterClients.
*/
case object GetClusterClients extends GetClusterClients {
/**
* Java API: get the singleton instance
*/
def getInstance = this
}
/**
* Reply to GetClusterClients containing current client connections
*
* @param clusterClients Set of currently connected client actor references
*/
case class ClusterClients(clusterClients: Set[ActorRef]) {
/**
* Java API: Get cluster clients as Java Set
*/
def getClusterClients: java.util.Set[ActorRef] = clusterClients.asJava
}@deprecated("Use Akka gRPC instead", since = "2.6.0")
object ExternalClientApp extends App {
implicit val system = ActorSystem("external-client")
val initialContacts = Set(
ActorPath.fromString("akka://cluster-system@127.0.0.1:2551/system/receptionist"),
ActorPath.fromString("akka://cluster-system@127.0.0.1:2552/system/receptionist")
)
val settings = ClusterClientSettings(system)
.withInitialContacts(initialContacts)
.withBufferSize(1000)
.withReconnectTimeout(Some(30.seconds))
val client = system.actorOf(ClusterClient.props(settings), "cluster-client")
// Send to specific actor
client ! ClusterClient.Send("/user/worker", ProcessJob("data"))
// Publish to topic
client ! ClusterClient.Publish("notifications", Alert("System maintenance scheduled"))
// Broadcast to all matching actors
client ! ClusterClient.SendToAll("/user/cache", ClearCache)
}@deprecated("Use Akka gRPC instead", since = "2.6.0")
class ClusterWorker extends Actor {
val receptionist = ClusterClientReceptionist(context.system)
override def preStart(): Unit = {
// Register this actor for client access
receptionist.registerService(self)
// Subscribe to notifications topic
receptionist.registerSubscriber("notifications", self)
}
def receive = {
case ProcessJob(data) =>
val result = processData(data)
sender() ! JobResult(result)
case Alert(message) =>
log.warning(s"Received alert: $message")
}
}// Required imports for cluster client functionality (deprecated)
import akka.actor.{Actor, ActorPath, ActorRef, ActorSystem, ExtendedActorSystem}
import akka.cluster.client.ClusterClientMessage
import com.typesafe.config.Config
import scala.concurrent.duration.FiniteDuration
/**
* Marker trait for cluster client messages with special serializer
*/
sealed trait ClusterClientMessage extends SerializableFor new projects, use Akka gRPC instead of ClusterClient:
// Instead of ClusterClient, use Akka gRPC service definitions
syntax = "proto3";
service WorkerService {
rpc ProcessJob(JobRequest) returns (JobResponse);
rpc GetStatus(StatusRequest) returns (StatusResponse);
}
// Generate Scala classes and implement gRPC services
class WorkerServiceImpl extends WorkerService {
override def processJob(request: JobRequest): Future[JobResponse] = {
// Process job and return response
Future.successful(JobResponse(result = "processed"))
}
}See the Akka gRPC documentation for migration guidance.