Core analysis engine and storage abstractions for Apache SkyWalking observability platform
—
The SkyWalking analysis framework provides the core stream processing engine that transforms raw telemetry data into structured metrics and records. Built around the Observability Analysis Language (OAL), it offers high-performance real-time data processing with configurable downsampling and entity management.
`StreamProcessor` is the foundation of the streaming analysis engine.
/**
 * Contract of a stream processor: a sink that accepts items of type {@code STREAM}.
 *
 * @param <STREAM> type of the items handed to {@code in}
 */
public interface StreamProcessor<STREAM> {

    /**
     * Accepts one piece of streaming data for processing.
     *
     * @param stream the streaming data to process
     */
    void in(STREAM stream);
}
Marks classes for OAL stream processing analysis.
/**
 * Type-level annotation, retained at runtime, that declares a stream by four attributes:
 * its name, its scope id, its storage builder type and its stream processor type.
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface Stream {

    /**
     * @return the name of this stream definition.
     */
    String name();

    /**
     * @return the scope id, see {@link ScopeDeclaration}.
     */
    int scopeId();

    /**
     * @return the converter type used between the entity and its persisted storage record. The
     * storage implementation may override the converter if necessary. In the general case, return an
     * implementation of {@link org.apache.skywalking.oap.server.core.storage.type.StorageBuilder}.
     */
    Class<? extends StorageBuilder> builder();

    /**
     * @return the stream processor type; see {@link MetricsStreamProcessor},
     * {@link RecordStreamProcessor}, {@link TopNStreamProcessor} and {@link NoneStreamProcessor}
     * for more details.
     */
    Class<? extends StreamProcessor> processor();
}
Defines configurations for stream processing.
/**
 * Configuration of a single stream: name, scope id, storage builder type and stream processor
 * type. The four properties correspond to the four attributes of the {@code @Stream} annotation.
 */
public class StreamDefinition {
    private String name;
    private int scopeId;
    // Bounded by StorageBuilder, the same bound Stream#builder() declares, so the annotation
    // value can be held here without a cast.
    private Class<? extends StorageBuilder> builderClass;
    private Class<? extends StreamProcessor> processorClass;

    /**
     * Gets the stream name.
     *
     * @return the stream name
     */
    public String getName();

    /**
     * Gets the scope identifier.
     *
     * @return the scope id
     */
    public int getScopeId();

    /**
     * Gets the storage builder class.
     *
     * @return the builder class, a {@code StorageBuilder} subtype
     */
    public Class<? extends StorageBuilder> getBuilderClass();

    /**
     * Gets the stream processor class.
     *
     * @return the processor class
     */
    public Class<? extends StreamProcessor> getProcessorClass();
}
Routes telemetry sources to appropriate stream processors.
/**
 * Contract of a dispatcher that hands a source of type {@code SOURCE} over to stream processing.
 *
 * @param <SOURCE> type of the telemetry source this dispatcher accepts
 */
public interface SourceDispatcher<SOURCE> {

    /**
     * Dispatches the given source into the streaming process.
     *
     * @param source the telemetry source to dispatch
     */
    void dispatch(SOURCE source);
}
Central manager for all source dispatchers.
/**
 * Manager of source dispatchers. Also a {@link DispatcherDetectorListener}.
 */
public class DispatcherManager implements DispatcherDetectorListener {

    /**
     * Routes a source to the appropriate dispatchers.
     *
     * @param source the source to forward
     * @throws IOException if routing fails
     */
    public void forward(ISource source) throws IOException;

    /**
     * Scans for dispatcher implementations and registers them.
     *
     * @throws IOException            declared by the signature; the failing conditions are not shown here
     * @throws IllegalAccessException declared by the signature; the failing conditions are not shown here
     * @throws InstantiationException declared by the signature; the failing conditions are not shown here
     */
    public void scan() throws IOException, IllegalAccessException, InstantiationException;

    /**
     * Adds the given class as a dispatcher if it is valid as one.
     *
     * @param aClass the class to check and, if valid, add as a dispatcher
     */
    public void addIfAsSourceDispatcher(Class<?> aClass);
}
Central service for encoding and decoding entity identifiers.
/**
 * Groups the ID helpers per entity kind. Each nested class offers a static {@code buildId} that
 * encodes an ID from its components and a static {@code analysisId} that parses an ID back into a
 * definition object.
 */
public class IDManager {

    /**
     * Service ID operations.
     */
    public static class ServiceID {

        /**
         * Builds a service ID from the service name and its normalization flag.
         *
         * @param name     the service name
         * @param isNormal whether the service name is normalized
         * @return the encoded service ID
         */
        public static String buildId(String name, boolean isNormal);

        /**
         * Parses a service ID into its components.
         *
         * @param id the service ID to parse
         * @return the service ID definition holding the components
         */
        public static ServiceIDDefinition analysisId(String id);

        /**
         * Builds the ID of a relation between services.
         *
         * @param define the service relation definition
         * @return the encoded relation ID
         */
        public static String buildRelationId(ServiceRelationDefine define);
    }

    /**
     * Service instance ID operations.
     */
    public static class ServiceInstanceID {

        /**
         * Builds an instance ID from the owning service ID and the instance name.
         *
         * @param serviceId    the parent service ID
         * @param instanceName the instance name
         * @return the encoded instance ID
         */
        public static String buildId(String serviceId, String instanceName);

        /**
         * Parses an instance ID into its components.
         *
         * @param id the instance ID to parse
         * @return the instance ID definition holding the components
         */
        public static InstanceIDDefinition analysisId(String id);
    }

    /**
     * Endpoint ID operations.
     */
    public static class EndpointID {

        /**
         * Builds an endpoint ID from the owning service ID and the endpoint name.
         *
         * @param serviceId    the parent service ID
         * @param endpointName the endpoint name
         * @return the encoded endpoint ID
         */
        public static String buildId(String serviceId, String endpointName);

        /**
         * Parses an endpoint ID into its components.
         *
         * @param id the endpoint ID to parse
         * @return the endpoint ID definition holding the components
         */
        public static EndpointIDDefinition analysisId(String id);
    }

    /**
     * Process ID operations.
     */
    public static class ProcessID {

        /**
         * Builds a process ID from the owning instance ID and the process name.
         *
         * @param instanceId  the parent instance ID
         * @param processName the process name
         * @return the encoded process ID
         */
        public static String buildId(String instanceId, String processName);

        /**
         * Parses a process ID into its components.
         *
         * @param id the process ID to parse
         * @return the process ID definition holding the components
         */
        public static ProcessIDDefinition analysisId(String id);
    }

    /**
     * Network address alias operations.
     */
    public static class NetworkAddressAliasDefine {

        /**
         * Builds a network address ID.
         *
         * @param networkAddress the network address
         * @return the encoded network address ID
         */
        public static String buildId(String networkAddress);

        /**
         * Parses a network address ID.
         *
         * @param id the network address ID to parse
         * @return the network address definition
         */
        public static NetworkAddressIDDefinition analysisId(String id);
    }
}
Manages time bucket operations for metrics downsampling.
/**
 * Static helpers that convert between millisecond timestamps and time bucket values, and that
 * test the precision (minute, hour or day) of a time bucket.
 */
public class TimeBucket {

    /**
     * Converts a time bucket back to a timestamp.
     *
     * @param timeBucket   the time bucket value
     * @param downSampling the downsampling level of {@code timeBucket}
     * @return the timestamp in milliseconds
     */
    public static long getTimestamp(long timeBucket, DownSampling downSampling);

    /**
     * Converts the given timestamp to a minute-precision time bucket.
     *
     * @param timestamp the timestamp to convert
     * @return the minute-precision time bucket
     */
    public static long getMinuteTimeBucket(long timestamp);

    /**
     * Converts the given timestamp to an hour-precision time bucket.
     *
     * @param timestamp the timestamp to convert
     * @return the hour-precision time bucket
     */
    public static long getHourTimeBucket(long timestamp);

    /**
     * Converts the given timestamp to a day-precision time bucket.
     *
     * @param timestamp the timestamp to convert
     * @return the day-precision time bucket
     */
    public static long getDayTimeBucket(long timestamp);

    /**
     * Tells whether a time bucket has minute precision.
     *
     * @param timeBucket the time bucket to check
     * @return {@code true} if the bucket has minute precision
     */
    public static boolean isMinuteBucket(long timeBucket);

    /**
     * Tells whether a time bucket has hour precision.
     *
     * @param timeBucket the time bucket to check
     * @return {@code true} if the bucket has hour precision
     */
    public static boolean isHourBucket(long timeBucket);

    /**
     * Tells whether a time bucket has day precision.
     *
     * @param timeBucket the time bucket to check
     * @return {@code true} if the bucket has day precision
     */
    public static boolean isDayBucket(long timeBucket);
}
Defines time precision levels for metrics aggregation.
/**
 * Time precision levels. Each constant carries a {@link Calendar} unit, a display name and a size.
 */
public enum DownSampling {

    /**
     * Minute precision (1-minute buckets).
     */
    Minute(Calendar.MINUTE, "Minute", 1),

    /**
     * Hour precision (1-hour buckets).
     */
    Hour(Calendar.HOUR_OF_DAY, "Hour", 24),

    /**
     * Day precision (1-day buckets).
     */
    Day(Calendar.DAY_OF_MONTH, "Day", 30);

    // NOTE(review): the three-argument constructor these constants call is not shown in this
    // listing; presumably it assigns the fields below in declaration order - confirm.
    private final int calendarUnit;
    private final String name;
    private final int size;

    /**
     * Gets the calendar unit of this downsampling level.
     *
     * @return the {@link Calendar} unit constant
     */
    public int getCalendarUnit();

    /**
     * Gets the name of this downsampling level.
     *
     * @return the downsampling name
     */
    public String getName();

    /**
     * Gets the typical size/count of this level.
     *
     * @return the size value
     */
    public int getSize();
}
Processes metrics data streams with aggregation and downsampling.
/**
 * Stream processor for {@code Metrics} data.
 *
 * <p>{@code StreamProcessor} is an interface, so it is implemented rather than extended.
 */
public class MetricsStreamProcessor implements StreamProcessor<Metrics> {

    /**
     * Processes an incoming metrics stream item.
     *
     * @param metrics the metrics data to process
     */
    @Override
    public void in(Metrics metrics);

    /**
     * Creates a processor instance for a metrics stream.
     *
     * @param moduleDefineHolder the module services holder
     * @param definition         the stream definition
     * @param metricsClass       the metrics class type
     * @return the configured processor
     */
    public static MetricsStreamProcessor create(ModuleDefineHolder moduleDefineHolder,
                                                StreamDefinition definition,
                                                Class<? extends Metrics> metricsClass);
}
Processes record data streams for log and event storage.
/**
 * Stream processor for {@code Record} data.
 *
 * <p>{@code StreamProcessor} is an interface, so it is implemented rather than extended.
 */
public class RecordStreamProcessor implements StreamProcessor<Record> {

    /**
     * Processes an incoming record stream item.
     *
     * @param record the record data to process
     */
    @Override
    public void in(Record record);

    /**
     * Creates a processor instance for a record stream.
     *
     * @param moduleDefineHolder the module services holder
     * @param definition         the stream definition
     * @param recordClass        the record class type
     * @return the configured processor
     */
    public static RecordStreamProcessor create(ModuleDefineHolder moduleDefineHolder,
                                               StreamDefinition definition,
                                               Class<? extends Record> recordClass);
}
Processes management data streams for metadata and configuration.
/**
 * Stream processor for {@code Management} data.
 *
 * <p>{@code StreamProcessor} is an interface, so it is implemented rather than extended.
 */
public class ManagementStreamProcessor implements StreamProcessor<Management> {

    /**
     * Processes an incoming management stream item.
     *
     * @param management the management data to process
     */
    @Override
    public void in(Management management);
}
Processes TopN aggregation streams for ranking and sorting.
/**
 * Stream processor for {@code TopN} data.
 *
 * <p>{@code StreamProcessor} is an interface, so it is implemented rather than extended.
 */
public class TopNStreamProcessor implements StreamProcessor<TopN> {

    /**
     * Processes an incoming TopN stream item.
     *
     * @param topN the TopN data to process
     */
    @Override
    public void in(TopN topN);
}
@Component
public class CustomTelemetryDispatcher implements SourceDispatcher<CustomTelemetrySource> {

    /**
     * Prepares the source, stamps it with an entity ID and a minute-precision time bucket, then
     * hands it to the metrics processor or the record processor depending on its kind. A source
     * that is neither a metric nor a record is not forwarded anywhere.
     *
     * <p>NOTE(review): {@code metricsProcessor} and {@code recordProcessor} are not declared in
     * this example; they are presumably fields supplied elsewhere - confirm.
     *
     * @param source the telemetry source to dispatch
     */
    @Override
    public void dispatch(CustomTelemetrySource source) {
        source.prepare();

        // Entity ID is the service ID built from the service name, flagged as normalized.
        source.setEntityId(IDManager.ServiceID.buildId(source.getServiceName(), true));

        // Aggregation key: the minute-precision bucket of the source's own timestamp.
        source.setTimeBucket(TimeBucket.getMinuteTimeBucket(source.getTimestamp()));

        if (source.isMetric()) {
            metricsProcessor.in(source.toMetrics());
            return;
        }
        if (source.isRecord()) {
            recordProcessor.in(source.toRecord());
        }
    }
}
@Stream(
name = "custom_service_metrics",
scopeId = DefaultScopeDefine.SERVICE,
builder = CustomServiceMetrics.Builder.class,
processor = MetricsStreamProcessor.class
)
public class CustomServiceMetrics extends Metrics {

    @Getter
    @Setter
    private String serviceName;

    @Getter
    @Setter
    private long requestCount;

    @Getter
    @Setter
    private long responseTime;

    /**
     * Merges another {@code CustomServiceMetrics} into this one by adding its request count and
     * response time to the values held here.
     *
     * @param metrics the metrics to merge; cast to {@code CustomServiceMetrics} without a type check
     * @return always {@code true}
     */
    @Override
    public boolean combine(Metrics metrics) {
        final CustomServiceMetrics incoming = (CustomServiceMetrics) metrics;
        requestCount = requestCount + incoming.getRequestCount();
        responseTime = responseTime + incoming.getResponseTime();
        return true;
    }

    /**
     * Placeholder for derived values such as the average response time. Nothing is computed yet.
     */
    @Override
    public void calculate() {
        // Calculate average response time or other derived metrics
        if (requestCount <= 0) {
            return;
        }
        // Perform calculations
    }

    /**
     * @return a new instance populated from this one via {@code copyFrom}
     */
    @Override
    public Metrics toHour() {
        return duplicate();
    }

    /**
     * @return a new instance populated from this one via {@code copyFrom}
     */
    @Override
    public Metrics toDay() {
        return duplicate();
    }

    /** Creates a fresh instance and copies this one into it. */
    private CustomServiceMetrics duplicate() {
        final CustomServiceMetrics copy = new CustomServiceMetrics();
        copy.copyFrom(this);
        return copy;
    }

    /**
     * Converter between {@code CustomServiceMetrics} and its storage representation.
     */
    public static class Builder implements StorageBuilder<CustomServiceMetrics> {

        /**
         * Builds an entity from storage data. The example returns a new, unpopulated instance.
         *
         * @param converter the source of the stored field values
         * @return a new {@code CustomServiceMetrics}
         */
        @Override
        public CustomServiceMetrics storage2Entity(Convert2Entity converter) {
            // Build entity from storage data
            final CustomServiceMetrics entity = new CustomServiceMetrics();
            // Set fields from converter
            return entity;
        }

        /**
         * Writes the three fields of the entity to the converter under the keys
         * {@code service_name}, {@code request_count} and {@code response_time}.
         *
         * @param storageData the entity to convert
         * @param converter   the sink receiving the storage fields
         */
        @Override
        public void entity2Storage(CustomServiceMetrics storageData, Convert2Storage converter) {
            // Convert entity to storage format
            converter.accept("service_name", storageData.getServiceName());
            converter.accept("request_count", storageData.getRequestCount());
            converter.accept("response_time", storageData.getResponseTime());
        }
    }
}
/**
* Service ID definition with parsed components
*/
public class ServiceIDDefinition {
    // Service name component of the ID.
    private String name;
    // Normalization flag component of the ID.
    private boolean isNormal;

    /** @return the service name */
    public String getName();

    /** @return the normalization flag */
    public boolean isNormal();
}
/**
 * Components parsed out of an instance ID: the owning service ID and the instance name.
 */
public class InstanceIDDefinition {
    private String serviceId;
    private String instanceName;

    /** @return the ID of the service the instance belongs to */
    public String getServiceId();

    /** @return the instance name */
    public String getInstanceName();
}
/**
 * Components parsed out of an endpoint ID: the owning service ID and the endpoint name.
 */
public class EndpointIDDefinition {
    private String serviceId;
    private String endpointName;

    /** @return the ID of the service the endpoint belongs to */
    public String getServiceId();

    /** @return the endpoint name */
    public String getEndpointName();
}
/**
 * Describes a relation between two services: the source service ID, the destination service ID
 * and the detect point.
 */
public class ServiceRelationDefine {
    private String sourceServiceId;
    private String destServiceId;
    private DetectPoint detectPoint;

    /** @return the ID of the source service */
    public String getSourceServiceId();

    /** @return the ID of the destination service */
    public String getDestServiceId();

    /** @return the detect point of the relation */
    public DetectPoint getDetectPoint();
}
/**
 * Detect point of a service relation. One of {@code CLIENT}, {@code SERVER} or {@code PROXY}.
 */
public enum DetectPoint {
    CLIENT,
    SERVER,
    PROXY
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-skywalking--server-core