Core analysis engine and storage abstractions for Apache SkyWalking observability platform
—
The SkyWalking source processing system handles telemetry data ingestion, transformation, and routing. It provides the foundation for receiving various types of observability data including traces, metrics, logs, and infrastructure telemetry from different sources and protocols.
Base interface for all telemetry data sources in the SkyWalking system.
public interface ISource {
/**
* Gets the scope identifier for this source
* @return Scope ID (service, instance, endpoint, etc.)
*/
int scope();
/**
* Gets the time bucket for metrics aggregation
* @return Time bucket value
*/
long getTimeBucket();
/**
* Sets the time bucket for metrics aggregation
* @param timeBucket Time bucket value
*/
void setTimeBucket(long timeBucket);
/**
* Gets the entity identifier for this source
* @return Entity ID (service ID, instance ID, endpoint ID, etc.)
*/
String getEntityId();
/**
* Internal data field preparation before {@link org.apache.skywalking.oap.server.core.analysis.SourceDispatcher#dispatch(ISource)}
*/
default void prepare() {
}
}Service interface for receiving and processing telemetry sources.
public interface SourceReceiver extends Service {
/**
* Receives and processes a telemetry source
* @param source The source data to process
* @throws IOException If source processing fails
*/
void receive(ISource source) throws IOException;
/**
* Receives multiple sources in batch
* @param sources List of sources to process
* @throws IOException If batch processing fails
*/
void receiveBatch(List<? extends ISource> sources) throws IOException;
}Default implementation of source receiver with dispatching logic.
public class SourceReceiverImpl implements SourceReceiver {
private DispatcherManager dispatcherManager;
@Override
public void receive(ISource source) throws IOException;
@Override
public void receiveBatch(List<? extends ISource> sources) throws IOException;
/**
* Sets the dispatcher manager for routing sources
* @param dispatcherManager Dispatcher manager instance
*/
public void setDispatcherManager(DispatcherManager dispatcherManager);
/**
* Validates source before processing
* @param source Source to validate
* @return True if source is valid
*/
protected boolean validateSource(ISource source);
/**
* Preprocesses source before dispatching
* @param source Source to preprocess
*/
protected void preprocessSource(ISource source);
}Annotation to mark classes as telemetry sources for automatic discovery.
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface Source {
/**
* Source name for identification
* @return Source name
*/
String name() default "";
/**
* Source category (trace, metric, log, infrastructure)
* @return Source category
*/
String category() default "";
}Annotation to declare the scope of source data.
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface ScopeDeclaration {
/**
* Scope identifier
* @return Scope ID
*/
int id();
/**
* Scope name
* @return Scope name
*/
String name() default "";
}Annotation to define default columns for scope entities.
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface ScopeDefaultColumn {
/**
* Virtual column name
* @return Column name
*/
String virtualColumnName();
/**
* Field name in the source class
* @return Field name
*/
String fieldName();
/**
* Whether the column requires entity ID
* @return True if entity ID required
*/
boolean requireEntityId() default false;
}Source for service-level telemetry data and metadata.
@Source(name = "service", category = "service")
@ScopeDeclaration(id = DefaultScopeDefine.SERVICE, name = "Service")
public class Service extends ISource {
@Getter @Setter
private String name;
@Getter @Setter
private String shortName;
@Getter @Setter
private String group;
@Getter @Setter
private NodeType nodeType;
@Getter @Setter
private List<String> layers;
@Override
public int scope();
@Override
public String getEntityId();
@Override
public void prepare();
/**
* Checks if service is normal (not virtual)
* @return True if normal service
*/
public boolean isNormal();
/**
* Gets service layer information
* @return Primary service layer
*/
public String getLayer();
}Source for service metadata and registration information.
@Source(name = "service_meta", category = "service")
public class ServiceMeta extends ISource {
@Getter @Setter
private String name;
@Getter @Setter
private NodeType nodeType;
@Getter @Setter
private List<String> layers;
@Getter @Setter
private JsonObject properties;
@Override
public int scope();
@Override
public String getEntityId();
@Override
public void prepare();
/**
* Adds property to service metadata
* @param key Property key
* @param value Property value
*/
public void addProperty(String key, String value);
/**
* Gets property from service metadata
* @param key Property key
* @return Property value or null
*/
public String getProperty(String key);
}Source for service relationship and dependency information.
@Source(name = "service_relation", category = "relation")
@ScopeDeclaration(id = DefaultScopeDefine.SERVICE_RELATION, name = "ServiceRelation")
public class ServiceRelation extends ISource {
@Getter @Setter
private String sourceServiceId;
@Getter @Setter
private String destServiceId;
@Getter @Setter
private String sourceServiceName;
@Getter @Setter
private String destServiceName;
@Getter @Setter
private DetectPoint detectPoint;
@Getter @Setter
private int componentId;
@Override
public int scope();
@Override
public String getEntityId();
@Override
public void prepare();
/**
* Gets relation ID for entity identification
* @return Service relation ID
*/
public String getRelationId();
}Source for service instance telemetry data and metadata.
@Source(name = "service_instance", category = "service")
@ScopeDeclaration(id = DefaultScopeDefine.SERVICE_INSTANCE, name = "ServiceInstance")
public class ServiceInstance extends ISource {
@Getter @Setter
private String name;
@Getter @Setter
private String serviceId;
@Getter @Setter
private String serviceName;
@Getter @Setter
private JsonObject properties;
@Override
public int scope();
@Override
public String getEntityId();
@Override
public void prepare();
/**
* Gets instance properties
* @return Instance properties as JSON
*/
public JsonObject getProperties();
/**
* Adds instance property
* @param key Property key
* @param value Property value
*/
public void addProperty(String key, String value);
/**
* Gets specific instance property
* @param key Property key
* @return Property value or null
*/
public String getProperty(String key);
}Source for service instance relationship data.
@Source(name = "service_instance_relation", category = "relation")
@ScopeDeclaration(id = DefaultScopeDefine.SERVICE_INSTANCE_RELATION, name = "ServiceInstanceRelation")
public class ServiceInstanceRelation extends ISource {
@Getter @Setter
private String sourceServiceId;
@Getter @Setter
private String sourceServiceInstanceId;
@Getter @Setter
private String destServiceId;
@Getter @Setter
private String destServiceInstanceId;
@Getter @Setter
private String sourceServiceName;
@Getter @Setter
private String sourceServiceInstanceName;
@Getter @Setter
private String destServiceName;
@Getter @Setter
private String destServiceInstanceName;
@Getter @Setter
private DetectPoint detectPoint;
@Getter @Setter
private int componentId;
@Override
public int scope();
@Override
public String getEntityId();
@Override
public void prepare();
}Source for endpoint telemetry data and performance metrics.
@Source(name = "endpoint", category = "endpoint")
@ScopeDeclaration(id = DefaultScopeDefine.ENDPOINT, name = "Endpoint")
public class Endpoint extends ISource {
@Getter @Setter
private String name;
@Getter @Setter
private String serviceId;
@Getter @Setter
private String serviceName;
@Getter @Setter
private String serviceInstanceId;
@Getter @Setter
private String serviceInstanceName;
@Getter @Setter
private int latency;
@Getter @Setter
private boolean status;
@Getter @Setter
private int responseCode;
@Getter @Setter
private RequestType type;
@Override
public int scope();
@Override
public String getEntityId();
@Override
public void prepare();
/**
* Checks if endpoint call was successful
* @return True if successful (status == true)
*/
public boolean isSuccess();
/**
* Gets endpoint latency in milliseconds
* @return Latency value
*/
public int getLatency();
}Source for endpoint metadata and registration information.
@Source(name = "endpoint_meta", category = "endpoint")
public class EndpointMeta extends ISource {
@Getter @Setter
private String endpoint;
@Getter @Setter
private String serviceId;
@Getter @Setter
private String serviceName;
@Override
public int scope();
@Override
public String getEntityId();
@Override
public void prepare();
}Source for endpoint relationship and dependency information.
@Source(name = "endpoint_relation", category = "relation")
@ScopeDeclaration(id = DefaultScopeDefine.ENDPOINT_RELATION, name = "EndpointRelation")
public class EndpointRelation extends ISource {
@Getter @Setter
private String endpoint;
@Getter @Setter
private String childEndpoint;
@Getter @Setter
private int rpcLatency;
@Getter @Setter
private boolean status;
@Getter @Setter
private int responseCode;
@Getter @Setter
private RequestType type;
@Getter @Setter
private DetectPoint detectPoint;
@Getter @Setter
private int componentId;
@Override
public int scope();
@Override
public String getEntityId();
@Override
public void prepare();
}Source for database operation telemetry data.
@Source(name = "database_access", category = "database")
@ScopeDeclaration(id = DefaultScopeDefine.DATABASE_ACCESS, name = "DatabaseAccess")
public class DatabaseAccess extends ISource {
@Getter @Setter
private String databaseTypeId;
@Getter @Setter
private String name;
@Getter @Setter
private int latency;
@Getter @Setter
private boolean status;
@Getter @Setter
private String sqlStatement;
@Getter @Setter
private String operation;
@Override
public int scope();
@Override
public String getEntityId();
@Override
public void prepare();
/**
* Gets database type identifier
* @return Database type ID
*/
public String getDatabaseTypeId();
/**
* Gets SQL statement (may be truncated)
* @return SQL statement
*/
public String getSqlStatement();
}Source for cache operation telemetry data.
@Source(name = "cache_access", category = "cache")
public class CacheAccess extends ISource {
@Getter @Setter
private String name;
@Getter @Setter
private int latency;
@Getter @Setter
private boolean status;
@Getter @Setter
private String operation;
@Getter @Setter
private String key;
@Override
public int scope();
@Override
public String getEntityId();
@Override
public void prepare();
/**
* Gets cache operation type
* @return Operation (GET, SET, DELETE, etc.)
*/
public String getOperation();
}Source for message queue operation telemetry data.
@Source(name = "mq_access", category = "mq")
public class MQAccess extends ISource {
@Getter @Setter
private String broker;
@Getter @Setter
private String topic;
@Getter @Setter
private int latency;
@Getter @Setter
private boolean status;
@Getter @Setter
private String operation;
@Getter @Setter
private TransmissionLatency transmissionLatency;
@Override
public int scope();
@Override
public String getEntityId();
@Override
public void prepare();
/**
* Gets message broker information
* @return Broker identifier
*/
public String getBroker();
/**
* Gets message topic
* @return Topic name
*/
public String getTopic();
}Source for log data and events.
@Source(name = "log", category = "log")
public class Log extends ISource {
@Getter @Setter
private String serviceId;
@Getter @Setter
private String serviceInstanceId;
@Getter @Setter
private String endpointId;
@Getter @Setter
private String traceId;
@Getter @Setter
private String traceSegmentId;
@Getter @Setter
private int spanId;
@Getter @Setter
private ContentType contentType;
@Getter @Setter
private String content;
@Getter @Setter
private List<String> tags;
@Override
public int scope();
@Override
public String getEntityId();
@Override
public void prepare();
/**
* Gets log content
* @return Log content string
*/
public String getContent();
/**
* Gets associated trace ID
* @return Trace ID or null if not associated
*/
public String getTraceId();
/**
* Adds tag to log entry
* @param tag Tag to add
*/
public void addTag(String tag);
}Source for trace segment data.
@Source(name = "segment", category = "trace")
public class Segment extends ISource {
@Getter @Setter
private String traceId;
@Getter @Setter
private String segmentId;
@Getter @Setter
private String serviceId;
@Getter @Setter
private String serviceInstanceId;
@Getter @Setter
private String endpointName;
@Getter @Setter
private int latency;
@Getter @Setter
private boolean isError;
@Getter @Setter
private List<String> tags;
@Getter @Setter
private byte[] dataBinary;
@Override
public int scope();
@Override
public String getEntityId();
@Override
public void prepare();
/**
* Gets trace segment identifier
* @return Segment ID
*/
public String getSegmentId();
/**
* Checks if segment contains errors
* @return True if error occurred
*/
public boolean isError();
}Source for Kubernetes metrics and telemetry.
@Source(name = "k8s_metrics", category = "infrastructure")
public class K8SMetrics extends ISource {
@Getter @Setter
private String cluster;
@Getter @Setter
private String namespace;
@Getter @Setter
private String workload;
@Getter @Setter
private String pod;
@Getter @Setter
private String container;
@Getter @Setter
private long cpuUsage;
@Getter @Setter
private long memoryUsage;
@Getter @Setter
private Map<String, String> labels;
@Override
public int scope();
@Override
public String getEntityId();
@Override
public void prepare();
/**
* Gets Kubernetes cluster name
* @return Cluster name
*/
public String getCluster();
/**
* Gets Kubernetes namespace
* @return Namespace name
*/
public String getNamespace();
}Source for Envoy proxy metrics and telemetry.
@Source(name = "envoy_instance_metric", category = "infrastructure")
public class EnvoyInstanceMetric extends ISource {
@Getter @Setter
private String serviceId;
@Getter @Setter
private String serviceInstanceId;
@Getter @Setter
private long totalRequestsCount;
@Getter @Setter
private long totalConnectionsCount;
@Getter @Setter
private long activeConnectionsCount;
@Getter @Setter
private double cpuUsage;
@Getter @Setter
private double heapMemoryUsed;
@Override
public int scope();
@Override
public String getEntityId();
@Override
public void prepare();
}Enumeration for service node types.
public enum NodeType {
Normal(0), Browser(1), Unknown(2);
private int value;
NodeType(int value) {
this.value = value;
}
public int value() {
return value;
}
public static NodeType valueOf(int value) {
for (NodeType nodeType : NodeType.values()) {
if (nodeType.value == value) {
return nodeType;
}
}
throw new IllegalArgumentException("Unknown NodeType value: " + value);
}
}Enumeration for detection points in service relationships.
public enum DetectPoint {
CLIENT(0), SERVER(1), PROXY(2);
private int value;
DetectPoint(int value) {
this.value = value;
}
public int value() {
return value;
}
public static DetectPoint valueOf(int value) {
for (DetectPoint detectPoint : DetectPoint.values()) {
if (detectPoint.value == value) {
return detectPoint;
}
}
throw new IllegalArgumentException("Unknown DetectPoint value: " + value);
}
}Enumeration for request types.
public enum RequestType {
RPC(0), DATABASE(1), HTTP(2), CACHE(3), MQ(4);
private int value;
RequestType(int value) {
this.value = value;
}
public int value() {
return value;
}
public static RequestType valueOf(int value) {
for (RequestType requestType : RequestType.values()) {
if (requestType.value == value) {
return requestType;
}
}
throw new IllegalArgumentException("Unknown RequestType value: " + value);
}
}Enumeration for log content types.
public enum ContentType {
NONE(0), TEXT(1), JSON(2), YAML(3);
private int value;
ContentType(int value) {
this.value = value;
}
public int value() {
return value;
}
public static ContentType valueOf(int value) {
for (ContentType contentType : ContentType.values()) {
if (contentType.value == value) {
return contentType;
}
}
throw new IllegalArgumentException("Unknown ContentType value: " + value);
}
}@Source(name = "custom_business_metric", category = "business")
@ScopeDeclaration(id = 1000, name = "BusinessMetric") // Custom scope ID
public class CustomBusinessMetric implements ISource {
@Getter @Setter
private String businessUnit;
@Getter @Setter
private String operation;
@Getter @Setter
private double revenue;
@Getter @Setter
private int transactionCount;
@Getter @Setter
private String currency;
private long timeBucket;
private String entityId;
@Override
public int scope() {
return 1000; // Custom scope for business metrics
}
@Override
public long getTimeBucket() {
return timeBucket;
}
@Override
public void setTimeBucket(long timeBucket) {
this.timeBucket = timeBucket;
}
@Override
public String getEntityId() {
return entityId;
}
@Override
public void prepare() {
// Generate entity ID from business unit and operation
this.entityId = businessUnit + ":" + operation;
// Set time bucket if not already set
if (timeBucket == 0) {
timeBucket = TimeBucket.getMinuteTimeBucket(System.currentTimeMillis());
}
}
public double getRevenuePerTransaction() {
return transactionCount > 0 ? revenue / transactionCount : 0.0;
}
}@Component
public class CustomBusinessMetricDispatcher implements SourceDispatcher<CustomBusinessMetric> {
@Override
public void dispatch(CustomBusinessMetric source) {
// Validate source data
if (source.getBusinessUnit() == null || source.getOperation() == null) {
throw new IllegalArgumentException("Business unit and operation are required");
}
// Prepare source
source.prepare();
// Apply business rules
applyBusinessRules(source);
// Route to metrics processor
MetricsStreamProcessor.getInstance().in(source);
}
private void applyBusinessRules(CustomBusinessMetric source) {
// Example: Convert currency to USD if needed
if (!"USD".equals(source.getCurrency())) {
double exchangeRate = getExchangeRate(source.getCurrency(), "USD");
source.setRevenue(source.getRevenue() * exchangeRate);
source.setCurrency("USD");
}
// Example: Filter out test data
if (source.getBusinessUnit().startsWith("TEST_")) {
throw new IllegalArgumentException("Test data not allowed in production");
}
}
private double getExchangeRate(String fromCurrency, String toCurrency) {
// Implement currency conversion logic
return 1.0; // Placeholder
}
}public class ServiceSourceProcessor {
private SourceReceiver sourceReceiver;
public void processServiceRegistration(String serviceName, NodeType nodeType,
List<String> layers, Map<String, String> properties) {
// Create service source
Service serviceSource = new Service();
serviceSource.setName(serviceName);
serviceSource.setNodeType(nodeType);
serviceSource.setLayers(layers);
try {
sourceReceiver.receive(serviceSource);
System.out.println("Service registered: " + serviceName);
} catch (IOException e) {
System.err.println("Failed to register service: " + e.getMessage());
}
// Create service metadata source
ServiceMeta metaSource = new ServiceMeta();
metaSource.setName(serviceName);
metaSource.setNodeType(nodeType);
metaSource.setLayers(layers);
JsonObject props = new JsonObject();
for (Map.Entry<String, String> entry : properties.entrySet()) {
props.addProperty(entry.getKey(), entry.getValue());
}
metaSource.setProperties(props);
try {
sourceReceiver.receive(metaSource);
System.out.println("Service metadata registered: " + serviceName);
} catch (IOException e) {
System.err.println("Failed to register service metadata: " + e.getMessage());
}
}
public void processServiceRelation(String sourceService, String destService,
DetectPoint detectPoint, int componentId) {
ServiceRelation relationSource = new ServiceRelation();
relationSource.setSourceServiceName(sourceService);
relationSource.setDestServiceName(destService);
relationSource.setDetectPoint(detectPoint);
relationSource.setComponentId(componentId);
try {
sourceReceiver.receive(relationSource);
System.out.println("Service relation recorded: " + sourceService + " -> " + destService);
} catch (IOException e) {
System.err.println("Failed to record service relation: " + e.getMessage());
}
}
}public class EndpointSourceProcessor {
private SourceReceiver sourceReceiver;
public void processEndpointCall(String serviceName, String instanceName, String endpointName,
int latency, boolean success, int responseCode, RequestType type) {
// Create endpoint source
Endpoint endpointSource = new Endpoint();
endpointSource.setServiceName(serviceName);
endpointSource.setServiceInstanceName(instanceName);
endpointSource.setName(endpointName);
endpointSource.setLatency(latency);
endpointSource.setStatus(success);
endpointSource.setResponseCode(responseCode);
endpointSource.setType(type);
try {
sourceReceiver.receive(endpointSource);
if (!success) {
System.out.println("Error endpoint call recorded: " + endpointName +
" (" + responseCode + ")");
}
} catch (IOException e) {
System.err.println("Failed to process endpoint call: " + e.getMessage());
}
}
public void processEndpointRelation(String sourceEndpoint, String destEndpoint,
int latency, boolean success, DetectPoint detectPoint) {
EndpointRelation relationSource = new EndpointRelation();
relationSource.setEndpoint(sourceEndpoint);
relationSource.setChildEndpoint(destEndpoint);
relationSource.setRpcLatency(latency);
relationSource.setStatus(success);
relationSource.setDetectPoint(detectPoint);
try {
sourceReceiver.receive(relationSource);
} catch (IOException e) {
System.err.println("Failed to process endpoint relation: " + e.getMessage());
}
}
}public class DatabaseCacheSourceProcessor {
private SourceReceiver sourceReceiver;
public void processDatabaseAccess(String databaseType, String operation, String sqlStatement,
int latency, boolean success) {
DatabaseAccess dbSource = new DatabaseAccess();
dbSource.setDatabaseTypeId(databaseType);
dbSource.setOperation(operation);
dbSource.setSqlStatement(sqlStatement);
dbSource.setLatency(latency);
dbSource.setStatus(success);
try {
sourceReceiver.receive(dbSource);
if (latency > 1000) { // Log slow queries
System.out.println("Slow database query detected: " + latency + "ms - " +
sqlStatement.substring(0, Math.min(100, sqlStatement.length())));
}
} catch (IOException e) {
System.err.println("Failed to process database access: " + e.getMessage());
}
}
public void processCacheAccess(String cacheName, String operation, String key,
int latency, boolean success) {
CacheAccess cacheSource = new CacheAccess();
cacheSource.setName(cacheName);
cacheSource.setOperation(operation);
cacheSource.setKey(key);
cacheSource.setLatency(latency);
cacheSource.setStatus(success);
try {
sourceReceiver.receive(cacheSource);
if (!success) {
System.out.println("Cache operation failed: " + operation + " on " + key);
}
} catch (IOException e) {
System.err.println("Failed to process cache access: " + e.getMessage());
}
}
}public class BatchSourceProcessor {
private SourceReceiver sourceReceiver;
private List<ISource> sourceBatch = new ArrayList<>();
private final int BATCH_SIZE = 100;
public void addSource(ISource source) {
sourceBatch.add(source);
if (sourceBatch.size() >= BATCH_SIZE) {
flushBatch();
}
}
public void flushBatch() {
if (sourceBatch.isEmpty()) {
return;
}
try {
sourceReceiver.receiveBatch(new ArrayList<>(sourceBatch));
System.out.println("Processed batch of " + sourceBatch.size() + " sources");
sourceBatch.clear();
} catch (IOException e) {
System.err.println("Failed to process source batch: " + e.getMessage());
// Optionally retry individual sources
retryIndividualSources();
}
}
private void retryIndividualSources() {
for (ISource source : sourceBatch) {
try {
sourceReceiver.receive(source);
} catch (IOException e) {
System.err.println("Failed to process individual source: " +
source.getClass().getSimpleName() + " - " + e.getMessage());
}
}
sourceBatch.clear();
}
// Call this method periodically to ensure pending sources are processed
public void periodicFlush() {
if (!sourceBatch.isEmpty()) {
flushBatch();
}
}
}/**
* Transmission latency for message queue operations
*/
public class TransmissionLatency {
private long producerTime;
private long consumerTime;
public long getProducerTime();
public void setProducerTime(long producerTime);
public long getConsumerTime();
public void setConsumerTime(long consumerTime);
public long getLatency();
}
/**
* Default scope definitions for built-in source types
*/
public class DefaultScopeDefine {
public static final int SERVICE = 1;
public static final int SERVICE_INSTANCE = 2;
public static final int ENDPOINT = 3;
public static final int SERVICE_RELATION = 4;
public static final int SERVICE_INSTANCE_RELATION = 5;
public static final int ENDPOINT_RELATION = 6;
public static final int DATABASE_ACCESS = 7;
public static final int ALL = 99;
}
/**
* Source processing exception
*/
public class SourceProcessingException extends RuntimeException {
public SourceProcessingException(String message);
public SourceProcessingException(String message, Throwable cause);
}Install with Tessl CLI
npx tessl i tessl/maven-org-apache-skywalking--server-core