or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

cluster-client.mdcluster-singleton.mddistributed-pubsub.mdindex.md
tile.json

cluster-client.mddocs/

Cluster Client (Deprecated)

DEPRECATED since Akka 2.6.0 - Use Akka gRPC instead for external cluster communication.

The Cluster Client pattern provides a way for external systems (not part of the cluster) to communicate with actors within an Akka cluster. It acts as a gateway, handling connection management, service discovery, and message routing to cluster actors.

Migration Notice

// ⚠️ DEPRECATED: ClusterClient usage
@deprecated("Use Akka gRPC instead", since = "2.6.0")
val client = system.actorOf(ClusterClient.props(settings))

// ✅ RECOMMENDED: Use Akka gRPC for external communication
// See: https://doc.akka.io/docs/akka/current/cluster-client.html#migration-to-akka-grpc

Capabilities

ClusterClient

The main client actor for external systems to communicate with cluster actors.

/**
 * DEPRECATED: Use Akka gRPC instead
 * 
 * Actor intended for use on external nodes not members of the cluster.
 * Acts as gateway for sending messages to cluster actors via ClusterReceptionist.
 * 
 * @param settings Configuration for connection and behavior
 */
@deprecated("Use Akka gRPC instead", since = "2.6.0")
final class ClusterClient(settings: ClusterClientSettings) extends Actor

@deprecated("Use Akka gRPC instead", since = "2.6.0")
object ClusterClient {
  /**
   * Scala API: Factory method for ClusterClient Props
   */
  def props(settings: ClusterClientSettings): Props
}

ClusterClient Message Types

Messages for different communication patterns with cluster actors.

/**
 * Send message to one recipient with matching path in the cluster
 * 
 * @param path Actor path string within the cluster
 * @param msg Message to send
 * @param localAffinity Prefer actors on same node as receptionist if available
 */
@deprecated("Use Akka gRPC instead", since = "2.6.0")
case class Send(path: String, msg: Any, localAffinity: Boolean) {
  /**
   * Convenience constructor with localAffinity false
   */
  def this(path: String, msg: Any) = this(path, msg, localAffinity = false)
}

/**
 * Send message to all recipients with matching path in the cluster
 * 
 * @param path Actor path string within the cluster  
 * @param msg Message to send to all matching actors
 */
@deprecated("Use Akka gRPC instead", since = "2.6.0")
case class SendToAll(path: String, msg: Any)

/**
 * Publish message to all subscribers of a topic in the cluster
 * 
 * @param topic Topic name to publish to
 * @param msg Message to publish
 */
@deprecated("Use Akka gRPC instead", since = "2.6.0")
case class Publish(topic: String, msg: Any)

ClusterClientSettings

Configuration for cluster client behavior and connection management.

/**
 * DEPRECATED: Configuration settings for ClusterClient
 * 
 * @param initialContacts Actor paths of ClusterReceptionist actors on servers
 * @param establishingGetContactsInterval Retry interval for establishing contact
 * @param refreshContactsInterval How often to ask for new contact points
 * @param heartbeatInterval How often to send heartbeat messages
 * @param acceptableHeartbeatPause Acceptable heartbeat pause before failure detection
 * @param bufferSize Number of messages to buffer when connection unavailable
 * @param reconnectTimeout Timeout for connection re-establishment attempts
 */
@deprecated("Use Akka gRPC instead", since = "2.6.0")
final class ClusterClientSettings(
  val initialContacts: Set[ActorPath],
  val establishingGetContactsInterval: FiniteDuration,
  val refreshContactsInterval: FiniteDuration,
  val heartbeatInterval: FiniteDuration,
  val acceptableHeartbeatPause: FiniteDuration,
  val bufferSize: Int,
  val reconnectTimeout: Option[FiniteDuration]
) extends NoSerializationVerificationNeeded

@deprecated("Use Akka gRPC instead", since = "2.6.0")
object ClusterClientSettings {
  /**
   * Create settings from default configuration akka.cluster.client
   */
  def apply(system: ActorSystem): ClusterClientSettings
  
  /**
   * Create settings from configuration
   */
  def apply(config: Config): ClusterClientSettings
  
  /**
   * Java API: Create settings from default configuration
   */
  def create(system: ActorSystem): ClusterClientSettings
  
  /**
   * Java API: Create settings from configuration
   */
  def create(config: Config): ClusterClientSettings
}

Settings Methods:

// Configuration methods for ClusterClientSettings
def withInitialContacts(initialContacts: Set[ActorPath]): ClusterClientSettings
def withInitialContacts(initialContacts: java.util.Set[ActorPath]): ClusterClientSettings // Java API
def withEstablishingGetContactsInterval(interval: FiniteDuration): ClusterClientSettings
def withRefreshContactsInterval(interval: FiniteDuration): ClusterClientSettings
def withHeartbeat(heartbeatInterval: FiniteDuration, acceptableHeartbeatPause: FiniteDuration): ClusterClientSettings
def withBufferSize(bufferSize: Int): ClusterClientSettings
def withReconnectTimeout(reconnectTimeout: Option[FiniteDuration]): ClusterClientSettings

Contact Point Management

Messages and events for managing cluster contact points.

/**
 * Subscribe to contact point changes. Sender receives initial state
 * and subsequent change events.
 */
case object SubscribeContactPoints extends SubscribeContactPoints {
  /**
   * Java API: get the singleton instance
   */
  def getInstance = this
}

/**
 * Explicitly unsubscribe from contact point change events
 */
case object UnsubscribeContactPoints extends UnsubscribeContactPoints {
  /**
   * Java API: get the singleton instance  
   */
  def getInstance = this
}

/**
 * Get the contact points known to this client. Replies with ContactPoints.
 */
case object GetContactPoints extends GetContactPoints {
  /**
   * Java API: get the singleton instance
   */
  def getInstance = this
}

/**
 * Reply to GetContactPoints containing current known contact points
 * 
 * @param contactPoints Set of currently known contact point paths
 */
case class ContactPoints(contactPoints: Set[ActorPath]) {
  /**
   * Java API: Get contact points as Java Set
   */
  def getContactPoints: java.util.Set[ActorPath] = contactPoints.asJava
}

/**
 * Event emitted when a new contact point is discovered
 */
case class ContactPointAdded(override val contactPoint: ActorPath) extends ContactPointChange

/**
 * Event emitted when a contact point is removed
 */
case class ContactPointRemoved(override val contactPoint: ActorPath) extends ContactPointChange

/**
 * Base trait for contact point change events
 */
sealed trait ContactPointChange {
  val contactPoint: ActorPath
}

ClusterClientReceptionist Extension

Extension for managing the server-side receptionist that handles client connections.

/**
 * DEPRECATED: Extension that starts ClusterReceptionist and DistributedPubSubMediator
 * with settings from akka.cluster.client.receptionist config section
 */
@deprecated("Use Akka gRPC instead", since = "2.6.0")
object ClusterClientReceptionist extends ExtensionId[ClusterClientReceptionist] {
  def get(system: ActorSystem): ClusterClientReceptionist
  def get(system: ClassicActorSystemProvider): ClusterClientReceptionist
}

@deprecated("Use Akka gRPC instead", since = "2.6.0")
final class ClusterClientReceptionist(system: ExtendedActorSystem) extends Extension {
  /**
   * Register actor that should be reachable for clients.
   * Clients can send messages using Send or SendToAll with actor's path.
   */
  def registerService(actor: ActorRef): Unit
  
  /**
   * Unregister a previously registered service actor
   */
  def unregisterService(actor: ActorRef): Unit
  
  /**
   * Register actor as subscriber to a named topic.
   * Multiple actors can subscribe to same topic.
   */
  def registerSubscriber(topic: String, actor: ActorRef): Unit
  
  /**
   * Unregister topic subscriber
   */
  def unregisterSubscriber(topic: String, actor: ActorRef): Unit
  
  /**
   * Get the underlying ClusterReceptionist actor reference for events
   */
  def underlying: ActorRef
  
  /**
   * Returns true if receptionist is terminated (wrong role, etc.)
   */
  def isTerminated: Boolean
}

ClusterReceptionist

The server-side actor that handles client connections and message routing.

/**
 * DEPRECATED: Server-side component that ClusterClient connects to.
 * Forwards messages to DistributedPubSubMediator and handles client lifecycle.
 * 
 * @param pubSubMediator Reference to DistributedPubSubMediator for message routing
 * @param settings Configuration for receptionist behavior
 */
@deprecated("Use Akka gRPC instead", since = "2.6.0")
final class ClusterReceptionist(
  pubSubMediator: ActorRef,
  settings: ClusterReceptionistSettings
) extends Actor

@deprecated("Use Akka gRPC instead", since = "2.6.0")
object ClusterReceptionist {
  /**
   * Scala API: Factory method for ClusterReceptionist Props
   */
  def props(pubSubMediator: ActorRef, settings: ClusterReceptionistSettings): Props
}

ClusterReceptionistSettings

Configuration for the server-side receptionist behavior.

/**
 * DEPRECATED: Configuration settings for ClusterReceptionist
 * 
 * @param role Start receptionist on members tagged with this role
 * @param numberOfContacts Number of contact points to send to clients
 * @param responseTunnelReceiveTimeout Timeout for response tunnel actors
 */
@deprecated("Use Akka gRPC instead", since = "2.6.0")
final class ClusterReceptionistSettings(
  val role: Option[String],
  val numberOfContacts: Int,
  val responseTunnelReceiveTimeout: FiniteDuration
) extends NoSerializationVerificationNeeded

@deprecated("Use Akka gRPC instead", since = "2.6.0")
object ClusterReceptionistSettings {
  /**
   * Create settings from default configuration akka.cluster.client.receptionist
   */
  def apply(system: ActorSystem): ClusterReceptionistSettings
  
  /**
   * Create settings from configuration
   */
  def apply(config: Config): ClusterReceptionistSettings
  
  /**
   * Java API factory methods
   */
  def create(system: ActorSystem): ClusterReceptionistSettings
  def create(config: Config): ClusterReceptionistSettings
}

Settings Methods:

// Configuration methods for ClusterReceptionistSettings
def withRole(role: String): ClusterReceptionistSettings
def withRole(role: Option[String]): ClusterReceptionistSettings  
def withNumberOfContacts(numberOfContacts: Int): ClusterReceptionistSettings
def withResponseTunnelReceiveTimeout(timeout: FiniteDuration): ClusterReceptionistSettings
def withHeartbeat(heartbeatInterval: FiniteDuration, acceptableHeartbeatPause: FiniteDuration, failureDetectionInterval: FiniteDuration): ClusterReceptionistSettings

Client Interaction Events

Events for monitoring cluster client connections.

/**
 * Base trait for cluster client interaction events  
 */
sealed trait ClusterClientInteraction {
  val clusterClient: ActorRef
}

/**
 * Event emitted when a cluster client connects to receptionist
 */
case class ClusterClientUp(override val clusterClient: ActorRef) extends ClusterClientInteraction

/**
 * Event emitted when cluster client becomes unreachable
 */
case class ClusterClientUnreachable(override val clusterClient: ActorRef) extends ClusterClientInteraction

/**
 * Subscribe to cluster client interaction events
 */
case object SubscribeClusterClients extends SubscribeClusterClients {
  /**
   * Java API: get the singleton instance
   */
  def getInstance = this
}

/**
 * Unsubscribe from cluster client interaction events
 */
case object UnsubscribeClusterClients extends UnsubscribeClusterClients {
  /**
   * Java API: get the singleton instance
   */
  def getInstance = this
}

/**
 * Get currently connected cluster clients. Replies with ClusterClients.
 */
case object GetClusterClients extends GetClusterClients {
  /**
   * Java API: get the singleton instance
   */
  def getInstance = this
}

/**
 * Reply to GetClusterClients containing current client connections
 * 
 * @param clusterClients Set of currently connected client actor references
 */
case class ClusterClients(clusterClients: Set[ActorRef]) {
  /**
   * Java API: Get cluster clients as Java Set
   */
  def getClusterClients: java.util.Set[ActorRef] = clusterClients.asJava
}

Usage Examples (Deprecated)

Basic Client Setup

@deprecated("Use Akka gRPC instead", since = "2.6.0")
object ExternalClientApp extends App {
  implicit val system = ActorSystem("external-client")
  
  val initialContacts = Set(
    ActorPath.fromString("akka://cluster-system@127.0.0.1:2551/system/receptionist"),
    ActorPath.fromString("akka://cluster-system@127.0.0.1:2552/system/receptionist")
  )
  
  val settings = ClusterClientSettings(system)
    .withInitialContacts(initialContacts)
    .withBufferSize(1000)
    .withReconnectTimeout(Some(30.seconds))
  
  val client = system.actorOf(ClusterClient.props(settings), "cluster-client")
  
  // Send to specific actor
  client ! ClusterClient.Send("/user/worker", ProcessJob("data"))
  
  // Publish to topic
  client ! ClusterClient.Publish("notifications", Alert("System maintenance scheduled"))
  
  // Broadcast to all matching actors
  client ! ClusterClient.SendToAll("/user/cache", ClearCache)
}

Server-Side Service Registration

@deprecated("Use Akka gRPC instead", since = "2.6.0")
class ClusterWorker extends Actor {
  val receptionist = ClusterClientReceptionist(context.system)
  
  override def preStart(): Unit = {
    // Register this actor for client access
    receptionist.registerService(self)
    
    // Subscribe to notifications topic
    receptionist.registerSubscriber("notifications", self)
  }
  
  def receive = {
    case ProcessJob(data) =>
      val result = processData(data)
      sender() ! JobResult(result)
    
    case Alert(message) =>
      log.warning(s"Received alert: $message")
  }
}

Types

// Required imports for cluster client functionality (deprecated)
import akka.actor.{Actor, ActorPath, ActorRef, ActorSystem, ExtendedActorSystem}
import akka.cluster.client.ClusterClientMessage
import com.typesafe.config.Config
import scala.concurrent.duration.FiniteDuration

/**
 * Marker trait for cluster client messages with special serializer
 */
sealed trait ClusterClientMessage extends Serializable

Migration to Akka gRPC

For new projects, use Akka gRPC instead of ClusterClient:

// Instead of ClusterClient, use Akka gRPC service definitions
syntax = "proto3";

service WorkerService {
  rpc ProcessJob(JobRequest) returns (JobResponse);
  rpc GetStatus(StatusRequest) returns (StatusResponse);
}

// Generate Scala classes and implement gRPC services
class WorkerServiceImpl extends WorkerService {
  override def processJob(request: JobRequest): Future[JobResponse] = {
    // Process job and return response
    Future.successful(JobResponse(result = "processed"))
  }
}

See the Akka gRPC documentation for migration guidance.