Core analysis engine and storage abstractions for Apache SkyWalking observability platform
npx @tessl/cli install tessl/maven-org-apache-skywalking--server-core@10.1.0
Apache SkyWalking server-core provides the foundational analysis engine, storage abstractions, and processing framework for the SkyWalking observability platform. It enables distributed tracing, metrics collection, and observability data analysis for microservices and cloud-native architectures.
pom.xml
<dependency>
  <groupId>org.apache.skywalking</groupId>
  <artifactId>server-core</artifactId>
  <version>10.1.0</version>
</dependency>
// Core constants and utilities
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.RunningMode;
import org.apache.skywalking.oap.server.core.UnexpectedException;
// Analysis framework
import org.apache.skywalking.oap.server.core.analysis.Stream;
import org.apache.skywalking.oap.server.core.analysis.StreamProcessor;
import org.apache.skywalking.oap.server.core.analysis.SourceDispatcher;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.analysis.DownSampling;
// Storage abstractions
import org.apache.skywalking.oap.server.core.storage.StorageDAO;
import org.apache.skywalking.oap.server.core.storage.IMetricsDAO;
import org.apache.skywalking.oap.server.core.storage.StorageData;
// Query services
import org.apache.skywalking.oap.server.core.query.MetricsQueryService;
import org.apache.skywalking.oap.server.core.query.TraceQueryService;
import org.apache.skywalking.oap.server.core.query.MetadataQueryService;
// Source processing
import org.apache.skywalking.oap.server.core.source.DefaultScopeDefine;
import org.apache.skywalking.oap.server.core.source.ISource;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
// Metrics and records
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.analysis.record.Record;
// Example: Basic source processing
/**
 * Example dispatcher for {@code CustomSource}: prepares the source, stamps it with a
 * service entity ID and a time bucket, then passes it on to the stream processor.
 */
public class CustomSourceProcessor implements SourceDispatcher<CustomSource> {

    /**
     * Enriches the incoming source and forwards it for stream processing.
     *
     * @param source the telemetry source to enrich and forward; it is mutated in place
     */
    @Override
    public void dispatch(CustomSource source) {
        source.prepare();

        // The entity ID is the service ID derived from the source's service name;
        // `true` is passed for the isNormal flag.
        source.setEntityId(IDManager.ServiceID.buildId(source.getServiceName(), true));

        // The time bucket is taken from the wall clock at dispatch time.
        final long now = System.currentTimeMillis();
        source.setTimeBucket(TimeBucket.getMinuteTimeBucket(now));

        // NOTE(review): the StreamProcessor interface listed in this document only
        // declares in(...); confirm which concrete processor provides getInstance().
        StreamProcessor.getInstance().in(source);
    }
}
// Example: Custom metrics implementation
/**
 * Example metrics type that accumulates a single {@code long} value.
 *
 * <p>Registered under the name {@code custom_metrics} for the service scope.
 */
@Stream(name = "custom_metrics", scopeId = DefaultScopeDefine.SERVICE,
        builder = CustomMetrics.Builder.class, processor = MetricsStreamProcessor.class)
public class CustomMetrics extends Metrics {
    @Getter @Setter private long value;

    /**
     * Adds the other instance's value to this one.
     *
     * @param metrics the instance to merge in; must be a {@code CustomMetrics}
     * @return always {@code true}
     */
    @Override
    public boolean combine(Metrics metrics) {
        CustomMetrics custom = (CustomMetrics) metrics;
        this.value += custom.getValue();
        return true;
    }

    /** Hook for final calculations; this example has none. */
    @Override
    public void calculate() {
        // Perform final calculations
    }

    /**
     * Creates the hour-level counterpart of this instance.
     *
     * @return a new {@code CustomMetrics} carrying this value and the hour time bucket
     */
    @Override
    public Metrics toHour() {
        CustomMetrics hourMetrics = new CustomMetrics();
        // The copy must carry the downsampled bucket, not the minute bucket of `this`.
        hourMetrics.setTimeBucket(toTimeBucketInHour());
        hourMetrics.setValue(getValue());
        return hourMetrics;
    }

    /**
     * Creates the day-level counterpart of this instance.
     *
     * @return a new {@code CustomMetrics} carrying this value and the day time bucket
     */
    @Override
    public Metrics toDay() {
        CustomMetrics dayMetrics = new CustomMetrics();
        // The copy must carry the downsampled bucket, not the minute bucket of `this`.
        dayMetrics.setTimeBucket(toTimeBucketInDay());
        dayMetrics.setValue(getValue());
        return dayMetrics;
    }
}
SkyWalking server-core is built around several key architectural components:
The OAL (Observability Analysis Language) processing engine that transforms raw telemetry data into structured metrics and records.
High-performance streaming architecture for real-time data processing with configurable downsampling (minute, hour, day precision).
Pluggable storage system supporting multiple backends (Elasticsearch, BanyanDB, MySQL, etc.) through unified DAO interfaces.
Comprehensive query services providing APIs for metrics, traces, topology, logs, and metadata retrieval.
gRPC-based clustering and distributed processing capabilities for horizontal scaling.
Stream processing engine for telemetry data analysis with OAL language support.
// Core stream processing
/**
 * Contract for a processor that accepts stream elements.
 *
 * @param <STREAM> the element type this processor accepts
 */
public interface StreamProcessor<STREAM> {
/**
 * Feeds one element into this processor.
 *
 * @param stream the element to process
 */
void in(STREAM stream);
}
// Source dispatching
/**
 * Contract for a dispatcher that handles sources of one type.
 *
 * @param <SOURCE> the source type this dispatcher handles
 */
public interface SourceDispatcher<SOURCE> {
/**
 * Handles one incoming source.
 *
 * @param source the source to dispatch
 */
void dispatch(SOURCE source);
}
// ID management for entities
/**
 * Static helpers for entity IDs, grouped by entity kind. Method bodies are omitted
 * in this listing.
 *
 * <p>Each {@code build*} method returns an ID string. Each {@code analysis*} method takes
 * an ID string and returns the matching definition type — presumably the inverse of the
 * {@code build*} method beside it; confirm against the implementation.
 */
public class IDManager {
    /** Service IDs and service-relation IDs. */
    public static class ServiceID {
        /** Builds a service ID from a service name and its {@code isNormal} flag. */
        public static String buildId(String name, boolean isNormal);
        public static ServiceIDDefinition analysisId(String id);
        public static String buildRelationId(ServiceRelationDefine define);
        public static ServiceRelationDefine analysisRelationId(String entityId);
    }

    /** Service-instance IDs and instance-relation IDs. */
    public static class ServiceInstanceID {
        /** Builds an instance ID from a service ID and an instance name. */
        public static String buildId(String serviceId, String instanceName);
        public static InstanceIDDefinition analysisId(String id);
        public static String buildRelationId(ServiceInstanceRelationDefine define);
        public static ServiceInstanceRelationDefine analysisRelationId(String entityId);
    }

    /** Endpoint IDs and endpoint-relation IDs. */
    public static class EndpointID {
        /** Builds an endpoint ID from a service ID and an endpoint name. */
        public static String buildId(String serviceId, String endpointName);
        public static EndpointIDDefinition analysisId(String id);
        public static String buildRelationId(EndpointRelationDefine define);
        public static EndpointRelationDefine analysisRelationId(String entityId);
    }

    /** Process IDs and process-relation IDs; no {@code analysisId} is listed for this kind. */
    public static class ProcessID {
        /** Builds a process ID from a service-instance ID and a process name. */
        public static String buildId(String serviceInstanceId, String processName);
        public static String buildRelationId(ProcessRelationDefine define);
        public static ProcessRelationDefine analysisRelationId(String entityId);
    }
}
Pluggable storage abstractions supporting multiple backend implementations.
// Storage DAO factory
/**
 * Factory service that creates DAO instances for the three data kinds listed below,
 * each from a {@code StorageBuilder}.
 */
public interface StorageDAO extends Service {
/** Creates a DAO for metrics using the given builder. */
IMetricsDAO newMetricsDao(StorageBuilder storageBuilder);
/** Creates a DAO for records using the given builder. */
IRecordDAO newRecordDao(StorageBuilder storageBuilder);
/** Creates a DAO for none-stream data using the given builder. */
INoneStreamDAO newNoneStreamDao(StorageBuilder storageBuilder);
}
// Metrics storage operations
/** Storage operations for metrics: a bulk read and the preparation of a batched insert. */
public interface IMetricsDAO extends DAO {
/**
 * Reads several metrics of one model in a single call.
 * NOTE(review): presumably returns the stored counterparts of {@code metrics} — confirm.
 *
 * @param model   the model the metrics belong to
 * @param metrics the metrics to look up
 * @return the metrics read from storage
 * @throws IOException if the storage access fails
 */
List<Metrics> multiGet(Model model, List<Metrics> metrics) throws IOException;
/**
 * Prepares an insert request for one metrics instance; nothing in this signature shows
 * when the request is executed.
 *
 * @param model    the model the metrics belong to
 * @param metrics  the metrics to insert
 * @param callback session-cache callback; when it is invoked is not shown here
 * @return the prepared insert request
 * @throws IOException if preparing the request fails
 */
InsertRequest prepareBatchInsert(Model model, Metrics metrics,
SessionCacheCallback callback) throws IOException;
}
// Storage entity interface
/** Contract for an entity that can be persisted: it exposes its storage identifier. */
public interface StorageData {
    /** Returns the storage identifier of this entity. */
    StorageID id();

    /** Name constant for the time bucket, {@code "time_bucket"}. */
    String TIME_BUCKET = "time_bucket";
}
Comprehensive query APIs for metrics, traces, topology and metadata.
// Metrics querying
/** Query service for metrics values; method bodies are omitted in this listing. */
public class MetricsQueryService implements Service {
/**
 * Reads a single, possibly absent, value for the condition over the duration.
 *
 * @throws IOException if the underlying read fails
 */
public NullableValue readMetricsValue(MetricsCondition condition, Duration duration)
throws IOException;
/**
 * Reads the values for the condition over the duration.
 *
 * @throws IOException if the underlying read fails
 */
public MetricsValues readMetricsValues(MetricsCondition condition, Duration duration)
throws IOException;
/**
 * Reads labeled values for the condition over the duration.
 * NOTE(review): presumably one {@code MetricsValues} per matching label — confirm.
 *
 * @throws IOException if the underlying read fails
 */
public List<MetricsValues> readLabeledMetricsValues(MetricsCondition condition,
List<KeyValue> labels, Duration duration) throws IOException;
}
// Trace querying
/** Query service for traces; its members are omitted in this listing. */
public class TraceQueryService implements Service {
// Query trace data and segments
}
// Metadata querying
/** Query service for metadata; its members are omitted in this listing. */
public class MetadataQueryService implements Service {
    // Query services, instances, endpoints
}
gRPC-based inter-node communication for distributed processing.
// Remote data transmission
/** Service for sending stream data to a remote worker; the method body is omitted here. */
public class RemoteSenderService implements Service {
/**
 * Sends one piece of stream data.
 *
 * @param nextWorkName name of the worker that should receive the data
 * @param streamData   the data to send
 * @param selector     NOTE(review): presumably chooses the target node — confirm
 */
public void send(String nextWorkName, StreamData streamData, Selector selector);
}
// Serialization interfaces
/** Listed here without members. Distinct from {@code java.io.Serializable}. */
public interface Serializable {
// Marker for remote-serializable data
}
/** Listed here without members; counterpart of the {@code Serializable} interface listed with it. */
public interface Deserializable {
    // Marker for remote-deserializable data
}
Configuration services and component library management.
// Configuration management
/** Configuration service; its members are omitted in this listing. */
public class ConfigService implements Service {
// General configuration management
}
// Component library catalog
/** Service contract for the component library catalog; its members are omitted in this listing. */
public interface IComponentLibraryCatalogService extends Service {
// Component library catalog management
}
// Naming control
/** Naming control for entities; its members are omitted in this listing. */
public class NamingControl {
    // Entity naming rules and normalization
}
Trace profiling and performance analysis capabilities.
// Profile task querying
/** Query service for profiling tasks; its members are omitted in this listing. */
public class ProfileTaskQueryService implements Service {
// Query profiling task information
}
// Profile task management
/** Mutation service for profiling tasks; its members are omitted in this listing. */
public class ProfileTaskMutationService implements Service {
// Create and manage profiling tasks
}
// Profile records
/** {@code Record} subtype for profiling tasks; its fields are omitted in this listing. */
public class ProfileTaskRecord extends Record {
    // Storage record for profiling tasks
}
Telemetry source handling and processing pipeline.
// Base source interface
/** Common contract of telemetry sources: scope, time bucket, entity ID and a prepare step. */
public interface ISource {
/** Returns the scope of this source. NOTE(review): presumably a DefaultScopeDefine ID — confirm. */
int scope();
/** Returns the time bucket assigned to this source. */
long getTimeBucket();
/** Assigns the time bucket of this source. */
void setTimeBucket(long timeBucket);
/** Returns the entity ID of this source. */
String getEntityId();
/** Prepares this source; what preparation involves is up to the implementation. */
void prepare();
}
// Source receiver
/** Service contract for receiving telemetry sources; its members are omitted in this listing. */
public interface SourceReceiver extends Service {
// Receives and processes telemetry sources
}
// Source types
// ISource is an interface, so concrete source types implement it rather than extend it.
// Members are omitted in this listing.
// NOTE(review): this `Service` source class shares its simple name with the `Service`
// interface that the query services implement — presumably they live in different
// packages; confirm.
public class Service implements ISource { }
public class ServiceInstance implements ISource { }
public class Endpoint implements ISource { }
public class DatabaseAccess implements ISource { }
// Time bucket management
/**
 * Static helpers converting between timestamps and time buckets and testing a bucket's
 * precision. Method bodies are omitted in this listing.
 * NOTE(review): the time unit of the {@code long} timestamps is not shown here — the usage
 * example passes {@code System.currentTimeMillis()}, so presumably epoch milliseconds; confirm.
 */
public class TimeBucket {
/** Returns the time bucket used for records for the given time. */
public static long getRecordTimeBucket(long time);
/** Returns the minute time bucket for the given time. */
public static long getMinuteTimeBucket(long time);
/** Converts a time bucket to a timestamp. */
public static long getTimestamp(long timeBucket);
/** Converts a time bucket of the given downsampling level to a timestamp. */
public static long getTimestamp(long timeBucket, DownSampling downsampling);
/** Converts a timestamp to a time bucket of the given downsampling level. */
public static long getTimeBucket(long timestamp, DownSampling downsampling);
/** Tells whether the bucket is a second bucket. */
public static boolean isSecondBucket(long timeBucket);
/** Tells whether the bucket is a minute bucket. */
public static boolean isMinuteBucket(long timeBucket);
/** Tells whether the bucket is an hour bucket. */
public static boolean isHourBucket(long timeBucket);
/** Tells whether the bucket is a day bucket. */
public static boolean isDayBucket(long timeBucket);
}
// Downsampling precision levels
/**
 * Precision levels for downsampling. Each level carries a numeric value and a
 * lower-case name; {@link #None} has the value 0 and an empty name.
 */
public enum DownSampling {
    None(0, ""),
    Second(1, "second"),
    Minute(2, "minute"),
    Hour(3, "hour"),
    Day(4, "day");

    private final int value;
    private final String name;

    DownSampling(int value, String name) {
        this.value = value;
        this.name = name;
    }

    /**
     * @return the numeric value of this level
     */
    public int getValue() {
        return value;
    }

    /**
     * @return the lower-case name of this level; empty for {@link #None}
     */
    public String getName() {
        return name;
    }
}
// Storage identifier
/** Identifier type returned by {@code StorageData.id()}; its members are omitted in this listing. */
public class StorageID {
// Unique identifier in storage
}
// Stream definition
/** Holder for a stream's processing configuration; its members are omitted in this listing. */
public class StreamDefinition {
// Defines stream processing configurations
}
// Base metrics class
/**
 * Base class for metrics: stream data that is also storable. Subclasses supply merging,
 * final calculation, and conversion to the hour and day levels.
 */
public abstract class Metrics extends StreamData implements StorageData {
// Time bucket this instance belongs to.
protected long timeBucket;
// NOTE(review): presumably the time of the last update — unit and writer not shown here.
protected long lastUpdateTimestamp;
/**
 * Merges another instance into this one.
 *
 * @param metrics the instance to merge in
 * @return a flag whose meaning is defined by the implementation; not shown in this listing
 */
public abstract boolean combine(Metrics metrics);
/** Performs the calculation step of this metrics instance. */
public abstract void calculate();
/** Returns the hour-level counterpart of this instance. */
public abstract Metrics toHour();
/** Returns the day-level counterpart of this instance. */
public abstract Metrics toDay();
}
// Base record class
/** Base class for storable records; only the time bucket field is shown in this listing. */
public abstract class Record implements StorageData {
// Time bucket this record belongs to.
protected long timeBucket;
}
// Default scope definitions for stream annotations
/**
 * Integer scope IDs, usable as the {@code scopeId} of a {@code @Stream} annotation.
 * NOTE(review): verify these values against the DefaultScopeDefine class shipped with
 * server-core 10.1.0 before relying on them — in particular DATABASE_ACCESS and ALL,
 * which may not match upstream.
 */
public class DefaultScopeDefine {
public static final int SERVICE = 1;
public static final int SERVICE_INSTANCE = 2;
public static final int ENDPOINT = 3;
public static final int SERVICE_RELATION = 4;
public static final int SERVICE_INSTANCE_RELATION = 5;
public static final int ENDPOINT_RELATION = 6;
public static final int DATABASE_ACCESS = 7;
public static final int ALL = 99;
}