CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-skywalking--server-core

Core analysis engine and storage abstractions for Apache SkyWalking observability platform

Pending
Overview
Eval results
Files

remote-communication.mddocs/

Remote Communication

The SkyWalking remote communication layer enables distributed processing across multiple OAP (Observability Analysis Platform) nodes through gRPC-based clustering. It provides load balancing, routing strategies, and serialization mechanisms for horizontal scaling and high availability deployments.

Remote Service Infrastructure

RemoteSenderService

gRPC client service for inter-node communication with configurable routing strategies.

public class RemoteSenderService implements Service {
    
    /**
     * Sends stream data to remote OAP node with routing strategy
     * @param nextWorkName Target worker name on remote node
     * @param streamData Stream data to transmit
     * @param selector Routing selector for node selection
     * @throws RemoteException If transmission fails
     */
    public void send(String nextWorkName, StreamData streamData, Selector selector) 
        throws RemoteException;
    
    /**
     * Sends data to specific remote address
     * @param remoteAddress Target remote address
     * @param nextWorkName Target worker name
     * @param streamData Stream data to transmit
     * @throws RemoteException If transmission fails
     */
    public void send(RemoteAddress remoteAddress, String nextWorkName, StreamData streamData) 
        throws RemoteException;
    
    /**
     * Sends data with timeout configuration
     * @param nextWorkName Target worker name
     * @param streamData Stream data to transmit  
     * @param selector Routing selector
     * @param timeoutSeconds Transmission timeout in seconds
     * @throws RemoteException If transmission fails or times out
     */
    public void sendWithTimeout(String nextWorkName, StreamData streamData, 
                              Selector selector, int timeoutSeconds) throws RemoteException;
    
    /**
     * Gets available remote addresses
     * @return List of configured remote addresses
     */
    public List<RemoteAddress> getRemoteAddresses();
    
    /**
     * Checks if remote service is available
     * @param remoteAddress Remote address to check
     * @return True if remote service is reachable
     */
    public boolean isAvailable(RemoteAddress remoteAddress);
}

RemoteServiceHandler

Handles incoming remote service requests and routes them to appropriate workers.

public class RemoteServiceHandler {
    
    /**
     * Handles incoming remote stream data
     * @param streamData Received stream data
     * @param nextWorkerName Target worker name for processing
     * @throws RemoteException If handling fails
     */
    public void handle(StreamData streamData, String nextWorkerName) throws RemoteException;
    
    /**
     * Registers worker for remote request handling
     * @param workerName Worker identifier
     * @param worker Worker instance to handle requests
     */
    public void registerWorker(String workerName, AbstractWorker<?> worker);
    
    /**
     * Unregisters worker from remote handling
     * @param workerName Worker identifier to remove
     */
    public void unregisterWorker(String workerName);
    
    /**
     * Gets registered worker by name
     * @param workerName Worker identifier
     * @return Worker instance or null if not found
     */
    public AbstractWorker<?> getWorker(String workerName);
}

Routing and Selection

Selector Interface

Base interface for routing strategy selection.

public interface Selector {
    
    /**
     * Selects remote address based on routing strategy
     * @param remoteAddresses Available remote addresses
     * @param data Data being routed (for hash-based selection)
     * @return Selected remote address
     */
    RemoteAddress select(List<RemoteAddress> remoteAddresses, StreamData data);
    
    /**
     * Gets selector type identifier
     * @return Selector type name
     */
    String getType();
}

HashCodeSelector

Routes data based on hash code for consistent routing.

public class HashCodeSelector implements Selector {
    
    /**
     * Selects remote address using data hash code modulo
     * @param remoteAddresses Available remote addresses
     * @param data Data to route (hash code used for selection)
     * @return Selected address based on hash distribution
     */
    @Override
    public RemoteAddress select(List<RemoteAddress> remoteAddresses, StreamData data);
    
    /**
     * Gets hash code from stream data for routing
     * @param data Stream data
     * @return Hash code for routing decision
     */
    protected int getHashCode(StreamData data);
}

ForeverFirstSelector

Always selects the first available remote address.

public class ForeverFirstSelector implements Selector {
    
    /**
     * Always selects first address from list
     * @param remoteAddresses Available remote addresses
     * @param data Data being routed (ignored)
     * @return First remote address in list
     */
    @Override
    public RemoteAddress select(List<RemoteAddress> remoteAddresses, StreamData data);
}

RollingSelector

Round-robin selection across available remote addresses.

public class RollingSelector implements Selector {
    
    private AtomicInteger index;
    
    /**
     * Selects remote address using round-robin strategy
     * @param remoteAddresses Available remote addresses
     * @param data Data being routed (ignored)
     * @return Next address in round-robin sequence
     */
    @Override
    public RemoteAddress select(List<RemoteAddress> remoteAddresses, StreamData data);
    
    /**
     * Resets rolling counter
     */
    public void reset();
}

Remote Data Interfaces

Serializable Interface

Marker interface for remote-serializable data.

public interface Serializable {
    
    /**
     * Serializes object to byte array for remote transmission
     * @return Serialized byte array
     * @throws SerializationException If serialization fails
     */
    byte[] serialize() throws SerializationException;
    
    /**
     * Gets serialization version for compatibility
     * @return Serialization version number
     */
    int getSerializationVersion();
}

Deserializable Interface

Marker interface for remote-deserializable data.

public interface Deserializable {
    
    /**
     * Deserializes object from byte array
     * @param data Serialized byte array
     * @throws DeserializationException If deserialization fails
     */
    void deserialize(byte[] data) throws DeserializationException;
    
    /**
     * Gets deserialization version for compatibility
     * @return Deserialization version number
     */
    int getDeserializationVersion();
    
    /**
     * Checks if version is compatible for deserialization
     * @param version Incoming serialization version
     * @return True if version is compatible
     */
    boolean isCompatible(int version);
}

StreamData

Base class for data transmitted between remote nodes.

public abstract class StreamData implements Serializable, Deserializable {
    
    protected long timestamp;
    protected String remoteAddress;
    
    /**
     * Gets data timestamp
     * @return Timestamp in milliseconds
     */
    public long getTimestamp();
    
    /**
     * Sets data timestamp
     * @param timestamp Timestamp in milliseconds
     */
    public void setTimestamp(long timestamp);
    
    /**
     * Gets originating remote address
     * @return Remote address string
     */
    public String getRemoteAddress();
    
    /**
     * Sets originating remote address
     * @param remoteAddress Remote address string
     */
    public void setRemoteAddress(String remoteAddress);
    
    /**
     * Gets unique identifier for routing
     * @return String identifier for consistent routing
     */
    public abstract String id();
    
    /**
     * Gets stream data type identifier
     * @return Data type for worker routing
     */
    public abstract int remoteHashCode();
}

Remote Configuration

RemoteAddress

Represents remote OAP node address configuration.

public class RemoteAddress {
    
    private String host;
    private int port;
    private boolean selfAddress;
    
    /**
     * Creates remote address configuration
     * @param host Remote host address
     * @param port Remote port number
     */
    public RemoteAddress(String host, int port);
    
    /**
     * Creates remote address with self-identification
     * @param host Remote host address
     * @param port Remote port number
     * @param selfAddress True if this represents current node
     */
    public RemoteAddress(String host, int port, boolean selfAddress);
    
    /**
     * Gets host address
     * @return Host string
     */
    public String getHost();
    
    /**
     * Gets port number
     * @return Port number
     */
    public int getPort();
    
    /**
     * Checks if this is current node address
     * @return True if self address
     */
    public boolean isSelfAddress();
    
    /**
     * Gets full address string
     * @return "host:port" format
     */
    public String getAddress();
    
    @Override
    public boolean equals(Object obj);
    
    @Override
    public int hashCode();
    
    @Override
    public String toString();
}

RemoteConfiguration

Configuration for remote service connectivity.

public class RemoteConfiguration {
    
    private List<RemoteAddress> remoteAddresses;
    private int connectionTimeout;
    private int requestTimeout;
    private int maxRetries;
    private boolean enableHeartbeat;
    private int heartbeatInterval;
    
    /**
     * Gets configured remote addresses
     * @return List of remote addresses
     */
    public List<RemoteAddress> getRemoteAddresses();
    
    /**
     * Sets remote addresses
     * @param remoteAddresses List of remote addresses
     */
    public void setRemoteAddresses(List<RemoteAddress> remoteAddresses);
    
    /**
     * Gets connection timeout
     * @return Connection timeout in milliseconds
     */
    public int getConnectionTimeout();
    
    /**
     * Gets request timeout
     * @return Request timeout in milliseconds
     */
    public int getRequestTimeout();
    
    /**
     * Gets maximum retry attempts
     * @return Maximum retries
     */
    public int getMaxRetries();
    
    /**
     * Checks if heartbeat is enabled
     * @return True if heartbeat enabled
     */
    public boolean isEnableHeartbeat();
    
    /**
     * Gets heartbeat interval
     * @return Heartbeat interval in seconds
     */
    public int getHeartbeatInterval();
}

Remote Exceptions

RemoteException

Base exception for remote communication errors.

public class RemoteException extends Exception {
    
    /**
     * Creates remote exception with message
     * @param message Error message
     */
    public RemoteException(String message);
    
    /**
     * Creates remote exception with message and cause
     * @param message Error message
     * @param cause Underlying cause
     */
    public RemoteException(String message, Throwable cause);
}

SerializationException

Exception for data serialization errors.

public class SerializationException extends RemoteException {
    
    /**
     * Creates serialization exception
     * @param message Error message
     */
    public SerializationException(String message);
    
    /**
     * Creates serialization exception with cause
     * @param message Error message
     * @param cause Underlying cause
     */
    public SerializationException(String message, Throwable cause);
}

DeserializationException

Exception for data deserialization errors.

public class DeserializationException extends RemoteException {
    
    /**
     * Creates deserialization exception
     * @param message Error message
     */
    public DeserializationException(String message);
    
    /**
     * Creates deserialization exception with cause
     * @param message Error message
     * @param cause Underlying cause
     */
    public DeserializationException(String message, Throwable cause);
}

gRPC Integration

RemoteServiceGrpc

gRPC service definitions for remote communication.

public class RemoteServiceGrpc {
    
    /**
     * gRPC stub for remote service calls
     */
    public static class RemoteServiceStub {
        
        /**
         * Sends stream data to remote node
         * @param request Stream data request
         * @param responseObserver Response observer for async handling
         */
        public void call(StreamDataRequest request, 
                        StreamObserver<StreamDataResponse> responseObserver);
    }
    
    /**
     * gRPC blocking stub for synchronous calls
     */
    public static class RemoteServiceBlockingStub {
        
        /**
         * Sends stream data synchronously
         * @param request Stream data request
         * @return Stream data response
         */
        public StreamDataResponse call(StreamDataRequest request);
    }
    
    /**
     * Service implementation base class
     */
    public static abstract class RemoteServiceImplBase implements BindableService {
        
        /**
         * Handles incoming remote calls
         * @param request Stream data request
         * @param responseObserver Response observer
         */
        public abstract void call(StreamDataRequest request,
                                StreamObserver<StreamDataResponse> responseObserver);
    }
}

Usage Examples

Setting up Remote Communication

// Configure remote addresses
List<RemoteAddress> remoteAddresses = Arrays.asList(
    new RemoteAddress("oap-node-1", 11800),
    new RemoteAddress("oap-node-2", 11800),
    new RemoteAddress("oap-node-3", 11800)
);

RemoteConfiguration config = new RemoteConfiguration();
config.setRemoteAddresses(remoteAddresses);
config.setConnectionTimeout(5000);
config.setRequestTimeout(10000);
config.setMaxRetries(3);
config.setEnableHeartbeat(true);
config.setHeartbeatInterval(30);

// Initialize remote sender service
RemoteSenderService remoteSender = new RemoteSenderService();
remoteSender.initialize(config);

Sending Data with Different Routing Strategies

// Hash-based routing for consistent distribution
Selector hashSelector = new HashCodeSelector();
remoteSender.send("MetricsAggregateWorker", metricsData, hashSelector);

// Round-robin routing for load balancing  
Selector rollingSelector = new RollingSelector();
remoteSender.send("RecordPersistentWorker", recordData, rollingSelector);

// Always send to first available node
Selector firstSelector = new ForeverFirstSelector();
remoteSender.send("ManagementWorker", managementData, firstSelector);

// Send to specific remote address
RemoteAddress specificNode = new RemoteAddress("oap-primary", 11800);
remoteSender.send(specificNode, "PriorityWorker", criticalData);

Implementing Custom Stream Data

public class CustomTelemetryData extends StreamData {
    
    private String serviceName;
    private String operationName; 
    private long duration;
    private Map<String, String> tags;
    
    @Override
    public String id() {
        // Create unique ID for routing consistency
        return serviceName + ":" + operationName;
    }
    
    @Override
    public int remoteHashCode() {
        // Hash code for routing decisions
        return Objects.hash(serviceName, operationName);
    }
    
    @Override
    public byte[] serialize() throws SerializationException {
        try {
            // Serialize to protobuf or other format
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream dos = new DataOutputStream(baos);
            
            dos.writeUTF(serviceName);
            dos.writeUTF(operationName);
            dos.writeLong(duration);
            dos.writeInt(tags.size());
            
            for (Map.Entry<String, String> entry : tags.entrySet()) {
                dos.writeUTF(entry.getKey());
                dos.writeUTF(entry.getValue());
            }
            
            return baos.toByteArray();
        } catch (IOException e) {
            throw new SerializationException("Failed to serialize custom telemetry data", e);
        }
    }
    
    @Override
    public void deserialize(byte[] data) throws DeserializationException {
        try {
            ByteArrayInputStream bais = new ByteArrayInputStream(data);
            DataInputStream dis = new DataInputStream(bais);
            
            this.serviceName = dis.readUTF();
            this.operationName = dis.readUTF();
            this.duration = dis.readLong();
            
            int tagCount = dis.readInt();
            this.tags = new HashMap<>();
            
            for (int i = 0; i < tagCount; i++) {
                String key = dis.readUTF();
                String value = dis.readUTF();
                tags.put(key, value);
            }
        } catch (IOException e) {
            throw new DeserializationException("Failed to deserialize custom telemetry data", e);
        }
    }
    
    @Override
    public int getSerializationVersion() {
        return 1;
    }
    
    @Override
    public int getDeserializationVersion() {
        return 1;
    }
    
    @Override
    public boolean isCompatible(int version) {
        return version <= getDeserializationVersion();
    }
    
    // Getters and setters
    public String getServiceName() { return serviceName; }
    public void setServiceName(String serviceName) { this.serviceName = serviceName; }
    
    public String getOperationName() { return operationName; }
    public void setOperationName(String operationName) { this.operationName = operationName; }
    
    public long getDuration() { return duration; }
    public void setDuration(long duration) { this.duration = duration; }
    
    public Map<String, String> getTags() { return tags; }
    public void setTags(Map<String, String> tags) { this.tags = tags; }
}

Implementing Custom Selector

public class GeographicSelector implements Selector {
    
    private final String preferredRegion;
    private final Map<RemoteAddress, String> addressRegions;
    
    public GeographicSelector(String preferredRegion, 
                            Map<RemoteAddress, String> addressRegions) {
        this.preferredRegion = preferredRegion;
        this.addressRegions = addressRegions;
    }
    
    @Override
    public RemoteAddress select(List<RemoteAddress> remoteAddresses, StreamData data) {
        // First, try to find addresses in preferred region
        List<RemoteAddress> preferredAddresses = remoteAddresses.stream()
            .filter(addr -> preferredRegion.equals(addressRegions.get(addr)))
            .collect(Collectors.toList());
        
        if (!preferredAddresses.isEmpty()) {
            // Use hash-based selection within preferred region
            int hash = Math.abs(data.remoteHashCode());
            int index = hash % preferredAddresses.size();
            return preferredAddresses.get(index);
        }
        
        // Fall back to any available address
        if (!remoteAddresses.isEmpty()) {
            int hash = Math.abs(data.remoteHashCode());
            int index = hash % remoteAddresses.size();
            return remoteAddresses.get(index);
        }
        
        return null;
    }
    
    @Override
    public String getType() {
        return "geographic";
    }
}

Handling Remote Service Requests

@Component
public class CustomRemoteServiceHandler extends RemoteServiceHandler {
    
    @Override
    public void handle(StreamData streamData, String nextWorkerName) throws RemoteException {
        try {
            // Validate stream data
            if (streamData == null) {
                throw new RemoteException("Received null stream data");
            }
            
            // Check if worker exists
            AbstractWorker<?> worker = getWorker(nextWorkerName);
            if (worker == null) {
                throw new RemoteException("Unknown worker: " + nextWorkerName);
            }
            
            // Log incoming request
            logger.info("Handling remote request for worker: {} from address: {}", 
                       nextWorkerName, streamData.getRemoteAddress());
            
            // Route to appropriate worker
            super.handle(streamData, nextWorkerName);
            
        } catch (Exception e) {
            logger.error("Failed to handle remote request", e);
            throw new RemoteException("Request handling failed", e);
        }
    }
    
    public void registerCustomWorkers() {
        // Register custom workers for remote handling
        registerWorker("CustomMetricsWorker", new CustomMetricsWorker());
        registerWorker("CustomRecordWorker", new CustomRecordWorker());
        registerWorker("CustomAnalysisWorker", new CustomAnalysisWorker());
    }
}

Core Remote Types

/**
 * gRPC request for stream data transmission
 */
public class StreamDataRequest {
    private String workerName;
    private byte[] streamData;
    private String dataType;
    
    public String getWorkerName();
    public byte[] getStreamData();
    public String getDataType();
}

/**
 * gRPC response for stream data transmission
 */
public class StreamDataResponse {
    private boolean success;
    private String errorMessage;
    
    public boolean isSuccess();
    public String getErrorMessage();
}

/**
 * Remote module definition
 */
public class RemoteModule extends ModuleDefine {
    public static final String NAME = "remote";
    
    @Override
    public String name();
    
    @Override
    public Class[] services();
}

/**
 * Load balancing strategy enumeration
 */
public enum LoadBalanceStrategy {
    HASH_CODE, ROLLING, FIRST_AVAILABLE, CUSTOM
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-skywalking--server-core

docs

analysis-framework.md

configuration.md

index.md

profiling.md

query-services.md

remote-communication.md

source-processing.md

storage-layer.md

tile.json