Core analysis engine and storage abstractions for Apache SkyWalking observability platform
—
The SkyWalking remote communication layer enables distributed processing across multiple OAP (Observability Analysis Platform) nodes through gRPC-based clustering. It provides load balancing, routing strategies, and serialization mechanisms for horizontal scaling and high availability deployments.
gRPC client service for inter-node communication with configurable routing strategies.
public class RemoteSenderService implements Service {
/**
* Sends stream data to remote OAP node with routing strategy
* @param nextWorkName Target worker name on remote node
* @param streamData Stream data to transmit
* @param selector Routing selector for node selection
* @throws RemoteException If transmission fails
*/
public void send(String nextWorkName, StreamData streamData, Selector selector)
throws RemoteException;
/**
* Sends data to specific remote address
* @param remoteAddress Target remote address
* @param nextWorkName Target worker name
* @param streamData Stream data to transmit
* @throws RemoteException If transmission fails
*/
public void send(RemoteAddress remoteAddress, String nextWorkName, StreamData streamData)
throws RemoteException;
/**
* Sends data with timeout configuration
* @param nextWorkName Target worker name
* @param streamData Stream data to transmit
* @param selector Routing selector
* @param timeoutSeconds Transmission timeout in seconds
* @throws RemoteException If transmission fails or times out
*/
public void sendWithTimeout(String nextWorkName, StreamData streamData,
Selector selector, int timeoutSeconds) throws RemoteException;
/**
* Gets available remote addresses
* @return List of configured remote addresses
*/
public List<RemoteAddress> getRemoteAddresses();
/**
* Checks if remote service is available
* @param remoteAddress Remote address to check
* @return True if remote service is reachable
*/
public boolean isAvailable(RemoteAddress remoteAddress);
}
Handles incoming remote service requests and routes them to appropriate workers.
public class RemoteServiceHandler {
/**
* Handles incoming remote stream data
* @param streamData Received stream data
* @param nextWorkerName Target worker name for processing
* @throws RemoteException If handling fails
*/
public void handle(StreamData streamData, String nextWorkerName) throws RemoteException;
/**
* Registers worker for remote request handling
* @param workerName Worker identifier
* @param worker Worker instance to handle requests
*/
public void registerWorker(String workerName, AbstractWorker<?> worker);
/**
* Unregisters worker from remote handling
* @param workerName Worker identifier to remove
*/
public void unregisterWorker(String workerName);
/**
* Gets registered worker by name
* @param workerName Worker identifier
* @return Worker instance or null if not found
*/
public AbstractWorker<?> getWorker(String workerName);
}
Base interface for routing strategy selection.
public interface Selector {
/**
* Selects remote address based on routing strategy
* @param remoteAddresses Available remote addresses
* @param data Data being routed (for hash-based selection)
* @return Selected remote address
*/
RemoteAddress select(List<RemoteAddress> remoteAddresses, StreamData data);
/**
* Gets selector type identifier
* @return Selector type name
*/
String getType();
}
Routes data based on hash code for consistent routing.
public class HashCodeSelector implements Selector {
/**
* Selects remote address using data hash code modulo
* @param remoteAddresses Available remote addresses
* @param data Data to route (hash code used for selection)
* @return Selected address based on hash distribution
*/
@Override
public RemoteAddress select(List<RemoteAddress> remoteAddresses, StreamData data);
/**
* Gets hash code from stream data for routing
* @param data Stream data
* @return Hash code for routing decision
*/
protected int getHashCode(StreamData data);
}
Always selects the first available remote address.
public class ForeverFirstSelector implements Selector {
/**
* Always selects first address from list
* @param remoteAddresses Available remote addresses
* @param data Data being routed (ignored)
* @return First remote address in list
*/
@Override
public RemoteAddress select(List<RemoteAddress> remoteAddresses, StreamData data);
}
Round-robin selection across available remote addresses.
public class RollingSelector implements Selector {
private AtomicInteger index;
/**
* Selects remote address using round-robin strategy
* @param remoteAddresses Available remote addresses
* @param data Data being routed (ignored)
* @return Next address in round-robin sequence
*/
@Override
public RemoteAddress select(List<RemoteAddress> remoteAddresses, StreamData data);
/**
* Resets rolling counter
*/
public void reset();
}
Interface for remote-serializable data.
public interface Serializable {
/**
* Serializes object to byte array for remote transmission
* @return Serialized byte array
* @throws SerializationException If serialization fails
*/
byte[] serialize() throws SerializationException;
/**
* Gets serialization version for compatibility
* @return Serialization version number
*/
int getSerializationVersion();
}
Interface for remote-deserializable data.
public interface Deserializable {
/**
* Deserializes object from byte array
* @param data Serialized byte array
* @throws DeserializationException If deserialization fails
*/
void deserialize(byte[] data) throws DeserializationException;
/**
* Gets deserialization version for compatibility
* @return Deserialization version number
*/
int getDeserializationVersion();
/**
* Checks if version is compatible for deserialization
* @param version Incoming serialization version
* @return True if version is compatible
*/
boolean isCompatible(int version);
}
Base class for data transmitted between remote nodes.
public abstract class StreamData implements Serializable, Deserializable {
protected long timestamp;
protected String remoteAddress;
/**
* Gets data timestamp
* @return Timestamp in milliseconds
*/
public long getTimestamp();
/**
* Sets data timestamp
* @param timestamp Timestamp in milliseconds
*/
public void setTimestamp(long timestamp);
/**
* Gets originating remote address
* @return Remote address string
*/
public String getRemoteAddress();
/**
* Sets originating remote address
* @param remoteAddress Remote address string
*/
public void setRemoteAddress(String remoteAddress);
/**
* Gets unique identifier for routing
* @return String identifier for consistent routing
*/
public abstract String id();
/**
* Gets hash code used to route this data to a remote node
* @return Hash code for routing decisions
*/
public abstract int remoteHashCode();
}
Represents remote OAP node address configuration.
public class RemoteAddress {
private String host;
private int port;
private boolean selfAddress;
/**
* Creates remote address configuration
* @param host Remote host address
* @param port Remote port number
*/
public RemoteAddress(String host, int port);
/**
* Creates remote address with self-identification
* @param host Remote host address
* @param port Remote port number
* @param selfAddress True if this represents current node
*/
public RemoteAddress(String host, int port, boolean selfAddress);
/**
* Gets host address
* @return Host string
*/
public String getHost();
/**
* Gets port number
* @return Port number
*/
public int getPort();
/**
* Checks if this is current node address
* @return True if self address
*/
public boolean isSelfAddress();
/**
* Gets full address string
* @return "host:port" format
*/
public String getAddress();
@Override
public boolean equals(Object obj);
@Override
public int hashCode();
@Override
public String toString();
}
Configuration for remote service connectivity.
public class RemoteConfiguration {
private List<RemoteAddress> remoteAddresses;
private int connectionTimeout;
private int requestTimeout;
private int maxRetries;
private boolean enableHeartbeat;
private int heartbeatInterval;
/**
* Gets configured remote addresses
* @return List of remote addresses
*/
public List<RemoteAddress> getRemoteAddresses();
/**
* Sets remote addresses
* @param remoteAddresses List of remote addresses
*/
public void setRemoteAddresses(List<RemoteAddress> remoteAddresses);
/**
* Gets connection timeout
* @return Connection timeout in milliseconds
*/
public int getConnectionTimeout();
/**
* Gets request timeout
* @return Request timeout in milliseconds
*/
public int getRequestTimeout();
/**
* Gets maximum retry attempts
* @return Maximum retries
*/
public int getMaxRetries();
/**
* Checks if heartbeat is enabled
* @return True if heartbeat enabled
*/
public boolean isEnableHeartbeat();
/**
* Gets heartbeat interval
* @return Heartbeat interval in seconds
*/
public int getHeartbeatInterval();
}
Base exception for remote communication errors.
public class RemoteException extends Exception {
/**
* Creates remote exception with message
* @param message Error message
*/
public RemoteException(String message);
/**
* Creates remote exception with message and cause
* @param message Error message
* @param cause Underlying cause
*/
public RemoteException(String message, Throwable cause);
}
Exception for data serialization errors.
public class SerializationException extends RemoteException {
/**
* Creates serialization exception
* @param message Error message
*/
public SerializationException(String message);
/**
* Creates serialization exception with cause
* @param message Error message
* @param cause Underlying cause
*/
public SerializationException(String message, Throwable cause);
}
Exception for data deserialization errors.
public class DeserializationException extends RemoteException {
/**
* Creates deserialization exception
* @param message Error message
*/
public DeserializationException(String message);
/**
* Creates deserialization exception with cause
* @param message Error message
* @param cause Underlying cause
*/
public DeserializationException(String message, Throwable cause);
}
gRPC service definitions for remote communication.
public class RemoteServiceGrpc {
/**
* gRPC stub for remote service calls
*/
public static class RemoteServiceStub {
/**
* Sends stream data to remote node
* @param request Stream data request
* @param responseObserver Response observer for async handling
*/
public void call(StreamDataRequest request,
StreamObserver<StreamDataResponse> responseObserver);
}
/**
* gRPC blocking stub for synchronous calls
*/
public static class RemoteServiceBlockingStub {
/**
* Sends stream data synchronously
* @param request Stream data request
* @return Stream data response
*/
public StreamDataResponse call(StreamDataRequest request);
}
/**
* Service implementation base class
*/
public static abstract class RemoteServiceImplBase implements BindableService {
/**
* Handles incoming remote calls
* @param request Stream data request
* @param responseObserver Response observer
*/
public abstract void call(StreamDataRequest request,
StreamObserver<StreamDataResponse> responseObserver);
}
}// Configure remote addresses
// Three peer OAP nodes, all reachable on port 11800.
List<RemoteAddress> remoteAddresses = Arrays.asList(
new RemoteAddress("oap-node-1", 11800),
new RemoteAddress("oap-node-2", 11800),
new RemoteAddress("oap-node-3", 11800)
);
RemoteConfiguration config = new RemoteConfiguration();
config.setRemoteAddresses(remoteAddresses);
// NOTE(review): the RemoteConfiguration listing documents getters only for the
// settings below; the matching setters are presumably available — confirm.
// Connection timeout is documented in milliseconds (5 s here).
config.setConnectionTimeout(5000);
// Request timeout is documented in milliseconds (10 s here).
config.setRequestTimeout(10000);
config.setMaxRetries(3);
config.setEnableHeartbeat(true);
// Heartbeat interval is documented in seconds, unlike the two timeouts above.
config.setHeartbeatInterval(30);
// Initialize remote sender service
RemoteSenderService remoteSender = new RemoteSenderService();
remoteSender.initialize(config);// Hash-based routing for consistent distribution
// NOTE(review): metricsData, recordData and managementData are not declared in this
// snippet — presumably StreamData instances built elsewhere; confirm.
// Every send(...) overload is declared to throw RemoteException, so real callers
// must catch or propagate it.
Selector hashSelector = new HashCodeSelector();
remoteSender.send("MetricsAggregateWorker", metricsData, hashSelector);
// Round-robin routing for load balancing
Selector rollingSelector = new RollingSelector();
remoteSender.send("RecordPersistentWorker", recordData, rollingSelector);
// Always send to first available node
Selector firstSelector = new ForeverFirstSelector();
remoteSender.send("ManagementWorker", managementData, firstSelector);
// Send to specific remote address
// This overload takes the target address directly, so no Selector is involved.
RemoteAddress specificNode = new RemoteAddress("oap-primary", 11800);
remoteSender.send(specificNode, "PriorityWorker", criticalData);public class CustomTelemetryData extends StreamData {
private String serviceName;
private String operationName;
private long duration;
private Map<String, String> tags;
/**
 * Builds the identifier used to keep routing consistent for this record.
 *
 * @return the service name and the operation name joined by a colon
 */
@Override
public String id() {
    final StringBuilder routingId = new StringBuilder();
    routingId.append(serviceName).append(":").append(operationName);
    return routingId.toString();
}
/**
 * Computes the hash consulted when this record is routed to a remote node.
 *
 * @return hash derived from the service name and the operation name
 */
@Override
public int remoteHashCode() {
    final int routingHash = Objects.hash(serviceName, operationName);
    return routingHash;
}
/**
 * Encodes this record into a byte array with {@link DataOutputStream} primitives.
 *
 * <p>Layout, in order: service name (UTF), operation name (UTF), duration (long),
 * tag count (int), then one key/value UTF pair per tag. This is the layout that
 * {@code deserialize} reads back.
 *
 * @return the encoded bytes
 * @throws SerializationException if the service or operation name is unset, or if
 *         writing to the stream fails
 */
@Override
public byte[] serialize() throws SerializationException {
    // writeUTF(null) would surface as a NullPointerException; report it as the declared type instead.
    if (serviceName == null || operationName == null) {
        throw new SerializationException(
            "Failed to serialize custom telemetry data: serviceName and operationName must be set");
    }
    final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(buffer)) {
        out.writeUTF(serviceName);
        out.writeUTF(operationName);
        out.writeLong(duration);
        // tags is only assigned by setTags/deserialize, so it can still be null: encode that as zero tags.
        final int tagCount = tags == null ? 0 : tags.size();
        out.writeInt(tagCount);
        if (tags != null) {
            for (Map.Entry<String, String> entry : tags.entrySet()) {
                out.writeUTF(entry.getKey());
                out.writeUTF(entry.getValue());
            }
        }
        out.flush();
        return buffer.toByteArray();
    } catch (IOException e) {
        throw new SerializationException("Failed to serialize custom telemetry data", e);
    }
}
/**
 * Restores this record from bytes in the layout written by {@code serialize}.
 *
 * <p>The payload is decoded into locals first and the fields are assigned only after
 * everything has been read, so a failed call leaves this instance unchanged.
 *
 * @param data encoded bytes: service name, operation name, duration, tag count,
 *        then one key/value pair per tag
 * @throws DeserializationException if {@code data} is null, is truncated or otherwise
 *         unreadable, or declares a negative tag count
 */
@Override
public void deserialize(byte[] data) throws DeserializationException {
    if (data == null) {
        throw new DeserializationException(
            "Failed to deserialize custom telemetry data: input is null");
    }
    final String decodedServiceName;
    final String decodedOperationName;
    final long decodedDuration;
    final Map<String, String> decodedTags = new HashMap<>();
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
        decodedServiceName = in.readUTF();
        decodedOperationName = in.readUTF();
        decodedDuration = in.readLong();
        final int tagCount = in.readInt();
        // The count comes from the payload; a negative value means the bytes are corrupt.
        if (tagCount < 0) {
            throw new DeserializationException(
                "Failed to deserialize custom telemetry data: negative tag count " + tagCount);
        }
        for (int i = 0; i < tagCount; i++) {
            final String key = in.readUTF();
            final String value = in.readUTF();
            decodedTags.put(key, value);
        }
    } catch (IOException e) {
        throw new DeserializationException("Failed to deserialize custom telemetry data", e);
    }
    // Commit only after a complete, successful read.
    this.serviceName = decodedServiceName;
    this.operationName = decodedOperationName;
    this.duration = decodedDuration;
    this.tags = decodedTags;
}
/**
 * Reports the version of the byte layout this class writes.
 *
 * @return the serialization version, currently 1
 */
@Override
public int getSerializationVersion() {
    final int writtenVersion = 1;
    return writtenVersion;
}
/**
 * Reports the version of the byte layout this class reads.
 *
 * @return the deserialization version, currently 1
 */
@Override
public int getDeserializationVersion() {
    final int readableVersion = 1;
    return readableVersion;
}
/**
 * Decides whether a payload written with the given version can be read.
 *
 * @param version serialization version of the incoming payload
 * @return {@code true} when {@code version} does not exceed
 *         {@code getDeserializationVersion()}
 */
@Override
public boolean isCompatible(int version) {
    final int newestReadable = getDeserializationVersion();
    if (version > newestReadable) {
        return false;
    }
    return true;
}
// Plain accessors for the telemetry fields; no copying or validation is performed.

/** @return the service name */
public String getServiceName() {
    return this.serviceName;
}

/** @param serviceName the service name to store */
public void setServiceName(String serviceName) {
    this.serviceName = serviceName;
}

/** @return the operation name */
public String getOperationName() {
    return this.operationName;
}

/** @param operationName the operation name to store */
public void setOperationName(String operationName) {
    this.operationName = operationName;
}

/** @return the duration value */
public long getDuration() {
    return this.duration;
}

/** @param duration the duration value to store */
public void setDuration(long duration) {
    this.duration = duration;
}

/** @return the tag map held by this record (the same instance, not a copy) */
public Map<String, String> getTags() {
    return this.tags;
}

/** @param tags the tag map to hold (stored by reference, not copied) */
public void setTags(Map<String, String> tags) {
    this.tags = tags;
}
}public class GeographicSelector implements Selector {
private final String preferredRegion;
private final Map<RemoteAddress, String> addressRegions;
/**
 * Creates a selector that favours nodes mapped to one region.
 *
 * @param preferredRegion region whose nodes are tried first
 * @param addressRegions region assigned to each remote address (stored by reference)
 */
public GeographicSelector(String preferredRegion, Map<RemoteAddress, String> addressRegions) {
    this.addressRegions = addressRegions;
    this.preferredRegion = preferredRegion;
}
/**
 * Picks a target node, preferring nodes mapped to the preferred region.
 *
 * <p>Candidates are the addresses whose entry in {@code addressRegions} equals
 * {@code preferredRegion}; when none match, every address in {@code remoteAddresses}
 * is a candidate. Within the candidates the choice is {@code data.remoteHashCode()}
 * reduced to an index by the candidate count.
 *
 * @param remoteAddresses available remote addresses
 * @param data data being routed; its {@code remoteHashCode()} drives the choice
 * @return the selected address, or {@code null} when {@code remoteAddresses} is empty
 */
@Override
public RemoteAddress select(List<RemoteAddress> remoteAddresses, StreamData data) {
    // First, try to find addresses in preferred region
    List<RemoteAddress> preferredAddresses = remoteAddresses.stream()
        .filter(addr -> preferredRegion.equals(addressRegions.get(addr)))
        .collect(Collectors.toList());
    // Fall back to any available address when the preferred region has none
    List<RemoteAddress> candidates = preferredAddresses.isEmpty() ? remoteAddresses : preferredAddresses;
    if (candidates.isEmpty()) {
        return null;
    }
    return pickByHash(candidates, data.remoteHashCode());
}

/** Maps a hash onto a valid index of a non-empty candidate list and returns that element. */
private static RemoteAddress pickByHash(List<RemoteAddress> candidates, int hash) {
    // Take the remainder BEFORE the absolute value: Math.abs(Integer.MIN_VALUE) is still
    // negative, so abs-then-modulo could yield a negative index. For every other hash
    // this gives the same index as Math.abs(hash) % size, so existing routing is unchanged.
    int index = Math.abs(hash % candidates.size());
    return candidates.get(index);
}
/**
 * Identifies this selector implementation.
 *
 * @return the fixed type name of this selector
 */
@Override
public String getType() {
    final String selectorType = "geographic";
    return selectorType;
}
}@Component
public class CustomRemoteServiceHandler extends RemoteServiceHandler {
/**
 * Validates an incoming remote request, logs it, and delegates to the base handler.
 *
 * <p>A {@link RemoteException} raised by validation or by the base handler is logged and
 * rethrown unchanged, so callers see the specific message. Only unexpected runtime
 * failures are wrapped in a generic "Request handling failed" exception.
 *
 * @param streamData received stream data; must not be null
 * @param nextWorkerName name under which the target worker was registered
 * @throws RemoteException if {@code streamData} is null, no worker is registered under
 *         {@code nextWorkerName}, or handling fails
 */
@Override
public void handle(StreamData streamData, String nextWorkerName) throws RemoteException {
    try {
        // Validate stream data
        if (streamData == null) {
            throw new RemoteException("Received null stream data");
        }
        // Check if worker exists
        AbstractWorker<?> worker = getWorker(nextWorkerName);
        if (worker == null) {
            throw new RemoteException("Unknown worker: " + nextWorkerName);
        }
        // Log incoming request
        logger.info("Handling remote request for worker: {} from address: {}",
            nextWorkerName, streamData.getRemoteAddress());
        // Route to appropriate worker
        super.handle(streamData, nextWorkerName);
    } catch (RemoteException e) {
        // Already the declared exception type: do not bury its message under a generic wrapper.
        logger.error("Failed to handle remote request", e);
        throw e;
    } catch (RuntimeException e) {
        logger.error("Failed to handle remote request", e);
        throw new RemoteException("Request handling failed", e);
    }
}
/**
 * Registers three custom workers on this handler through {@code registerWorker}.
 * The string passed with each worker is the name that {@code handle} later resolves
 * with {@code getWorker}.
 */
public void registerCustomWorkers() {
// Register custom workers for remote handling
// Each worker is constructed and registered in turn, in the order listed.
registerWorker("CustomMetricsWorker", new CustomMetricsWorker());
registerWorker("CustomRecordWorker", new CustomRecordWorker());
registerWorker("CustomAnalysisWorker", new CustomAnalysisWorker());
}
}/**
* gRPC request for stream data transmission
*/
public class StreamDataRequest {
private String workerName;
private byte[] streamData;
private String dataType;
public String getWorkerName();
public byte[] getStreamData();
public String getDataType();
}
/**
* gRPC response for stream data transmission
*/
public class StreamDataResponse {
private boolean success;
private String errorMessage;
public boolean isSuccess();
public String getErrorMessage();
}
/**
* Remote module definition
*/
public class RemoteModule extends ModuleDefine {
public static final String NAME = "remote";
@Override
public String name();
@Override
public Class[] services();
}
/**
* Load balancing strategy enumeration
*/
public enum LoadBalanceStrategy {
HASH_CODE, ROLLING, FIRST_AVAILABLE, CUSTOM
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-skywalking--server-core