CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-skywalking--server-core

Core analysis engine and storage abstractions for Apache SkyWalking observability platform

Pending
Overview
Eval results
Files

analysis-framework.mddocs/

Analysis Framework

The SkyWalking analysis framework provides the core stream processing engine that transforms raw telemetry data into structured metrics and records. Built around the Observability Analysis Language (OAL), it offers high-performance real-time data processing with configurable downsampling and entity management.

Stream Processing

StreamProcessor Interface

The foundation of the streaming analysis engine.

public interface StreamProcessor<STREAM> {
    /**
     * Processes incoming stream data
     * @param stream The streaming data to be processed
     */
    void in(STREAM stream);
}

Stream Annotation

Marks classes for OAL stream processing analysis.

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface Stream {
    /**
     * @return name of this stream definition.
     */
    String name();
    
    /**
     * @return scope id, see {@link ScopeDeclaration}
     */
    int scopeId();
    
    /**
     * @return the converter type between entity and storage record persistence. The converter could be override by the
     * storage implementation if necessary. Default, return {@link org.apache.skywalking.oap.server.core.storage.type.StorageBuilder}
     * for general suitable.
     */
    Class<? extends StorageBuilder> builder();
    
    /**
     * @return the stream processor type, see {@link MetricsStreamProcessor}, {@link RecordStreamProcessor},  {@link
     * TopNStreamProcessor} and {@link NoneStreamProcessor} for more details.
     */
    Class<? extends StreamProcessor> processor();
}

StreamDefinition

Defines configurations for stream processing.

public class StreamDefinition {
    private String name;
    private int scopeId;
    private Class<? extends StreamBuilder> builderClass;
    private Class<? extends StreamProcessor> processorClass;
    
    /**
     * Gets the stream name
     * @return Stream name
     */
    public String getName();
    
    /**
     * Gets the scope identifier
     * @return Scope ID
     */
    public int getScopeId();
    
    /**
     * Gets the builder class
     * @return Builder class
     */
    public Class<? extends StreamBuilder> getBuilderClass();
    
    /**
     * Gets the processor class  
     * @return Processor class
     */
    public Class<? extends StreamProcessor> getProcessorClass();
}

Source Dispatching

SourceDispatcher Interface

Routes telemetry sources to appropriate stream processors.

public interface SourceDispatcher<SOURCE> {
    /**
     * Dispatches source data to streaming process
     * @param source The telemetry source to dispatch
     */
    void dispatch(SOURCE source);
}

DispatcherManager

Central manager for all source dispatchers.

public class DispatcherManager implements DispatcherDetectorListener {
    
    /**
     * Routes source to appropriate dispatchers
     * @param source The source to forward
     * @throws IOException If routing fails
     */
    public void forward(ISource source) throws IOException;
    
    /**
     * Scans for and registers dispatcher implementations
     */
    public void scan() throws IOException, IllegalAccessException, InstantiationException;
    
    /**
     * Adds dispatcher if class is valid
     * @param aClass Class to check and add as dispatcher
     */
    public void addIfAsSourceDispatcher(Class<?> aClass);
}

ID Management

IDManager

Central service for encoding and decoding entity identifiers.

public class IDManager {
    
    /**
     * Service ID operations
     */
    public static class ServiceID {
        
        /**
         * Creates service ID from name and normalization flag
         * @param name Service name
         * @param isNormal Whether service name is normalized
         * @return Encoded service ID
         */
        public static String buildId(String name, boolean isNormal);
        
        /**
         * Parses service ID into components
         * @param id Service ID to parse
         * @return Service ID definition with components
         */
        public static ServiceIDDefinition analysisId(String id);
        
        /**
         * Creates relation ID between services
         * @param define Service relation definition
         * @return Encoded relation ID
         */
        public static String buildRelationId(ServiceRelationDefine define);
    }
    
    /**
     * Service instance ID operations
     */
    public static class ServiceInstanceID {
        
        /**
         * Creates instance ID from service and instance name
         * @param serviceId Parent service ID
         * @param instanceName Instance name
         * @return Encoded instance ID
         */
        public static String buildId(String serviceId, String instanceName);
        
        /**
         * Parses instance ID into components
         * @param id Instance ID to parse
         * @return Instance ID definition with components
         */
        public static InstanceIDDefinition analysisId(String id);
    }
    
    /**
     * Endpoint ID operations
     */
    public static class EndpointID {
        
        /**
         * Creates endpoint ID from service and endpoint name
         * @param serviceId Parent service ID
         * @param endpointName Endpoint name
         * @return Encoded endpoint ID
         */
        public static String buildId(String serviceId, String endpointName);
        
        /**
         * Parses endpoint ID into components
         * @param id Endpoint ID to parse
         * @return Endpoint ID definition with components
         */
        public static EndpointIDDefinition analysisId(String id);
    }
    
    /**
     * Process ID operations
     */
    public static class ProcessID {
        
        /**
         * Creates process ID from instance and process name
         * @param instanceId Parent instance ID
         * @param processName Process name
         * @return Encoded process ID
         */
        public static String buildId(String instanceId, String processName);
        
        /**
         * Parses process ID into components
         * @param id Process ID to parse
         * @return Process ID definition with components
         */
        public static ProcessIDDefinition analysisId(String id);
    }
    
    /**
     * Network address alias operations
     */
    public static class NetworkAddressAliasDefine {
        
        /**
         * Creates network address ID
         * @param networkAddress Network address
         * @return Encoded network address ID
         */
        public static String buildId(String networkAddress);
        
        /**
         * Parses network address ID
         * @param id Network address ID to parse
         * @return Network address definition
         */
        public static NetworkAddressIDDefinition analysisId(String id);
    }
}

Time Management

TimeBucket

Manages time bucket operations for metrics downsampling.

public class TimeBucket {
    
    /**
     * Converts time bucket to timestamp
     * @param timeBucket Time bucket value
     * @param downSampling Downsampling level
     * @return Timestamp in milliseconds
     */
    public static long getTimestamp(long timeBucket, DownSampling downSampling);
    
    /**
     * Gets current minute precision time bucket
     * @param timestamp Current timestamp
     * @return Minute precision time bucket
     */
    public static long getMinuteTimeBucket(long timestamp);
    
    /**
     * Gets current hour precision time bucket
     * @param timestamp Current timestamp
     * @return Hour precision time bucket
     */
    public static long getHourTimeBucket(long timestamp);
    
    /**
     * Gets current day precision time bucket
     * @param timestamp Current timestamp
     * @return Day precision time bucket
     */
    public static long getDayTimeBucket(long timestamp);
    
    /**
     * Checks if time bucket is minute precision
     * @param timeBucket Time bucket to check
     * @return True if minute precision
     */
    public static boolean isMinuteBucket(long timeBucket);
    
    /**
     * Checks if time bucket is hour precision
     * @param timeBucket Time bucket to check
     * @return True if hour precision
     */
    public static boolean isHourBucket(long timeBucket);
    
    /**
     * Checks if time bucket is day precision
     * @param timeBucket Time bucket to check
     * @return True if day precision
     */
    public static boolean isDayBucket(long timeBucket);
}

DownSampling

Defines time precision levels for metrics aggregation.

public enum DownSampling {
    /**
     * Minute precision (1-minute buckets)
     */
    Minute(Calendar.MINUTE, "Minute", 1),
    
    /**
     * Hour precision (1-hour buckets)
     */
    Hour(Calendar.HOUR_OF_DAY, "Hour", 24),
    
    /**
     * Day precision (1-day buckets)
     */
    Day(Calendar.DAY_OF_MONTH, "Day", 30);
    
    private final int calendarUnit;
    private final String name;
    private final int size;
    
    /**
     * Gets the calendar unit for this downsampling level
     * @return Calendar unit constant
     */
    public int getCalendarUnit();
    
    /**
     * Gets the name of this downsampling level
     * @return Downsampling name
     */
    public String getName();
    
    /**
     * Gets the typical size/count for this level
     * @return Size value
     */
    public int getSize();
}

Stream Processing Workers

MetricsStreamProcessor

Processes metrics data streams with aggregation and downsampling.

public class MetricsStreamProcessor extends StreamProcessor<Metrics> {
    
    /**
     * Processes incoming metrics stream
     * @param metrics Metrics data to process
     */
    @Override
    public void in(Metrics metrics);
    
    /**
     * Creates processor instance for metrics stream
     * @param moduleDefineHolder Module services holder
     * @param definition Stream definition
     * @param metricsClass Metrics class type
     * @return Configured processor
     */
    public static MetricsStreamProcessor create(ModuleDefineHolder moduleDefineHolder,
                                              StreamDefinition definition,
                                              Class<? extends Metrics> metricsClass);
}

RecordStreamProcessor

Processes record data streams for log and event storage.

public class RecordStreamProcessor extends StreamProcessor<Record> {
    
    /**
     * Processes incoming record stream
     * @param record Record data to process
     */
    @Override
    public void in(Record record);
    
    /**
     * Creates processor instance for record stream
     * @param moduleDefineHolder Module services holder
     * @param definition Stream definition
     * @param recordClass Record class type
     * @return Configured processor
     */
    public static RecordStreamProcessor create(ModuleDefineHolder moduleDefineHolder,
                                             StreamDefinition definition,
                                             Class<? extends Record> recordClass);
}

ManagementStreamProcessor

Processes management data streams for metadata and configuration.

public class ManagementStreamProcessor extends StreamProcessor<Management> {
    
    /**
     * Processes incoming management stream
     * @param management Management data to process
     */
    @Override
    public void in(Management management);
}

TopNStreamProcessor

Processes TopN aggregation streams for ranking and sorting.

public class TopNStreamProcessor extends StreamProcessor<TopN> {
    
    /**
     * Processes incoming TopN stream
     * @param topN TopN data to process
     */
    @Override
    public void in(TopN topN);
}

Usage Examples

Implementing Custom Source Dispatcher

@Component
public class CustomTelemetryDispatcher implements SourceDispatcher<CustomTelemetrySource> {
    
    @Override
    public void dispatch(CustomTelemetrySource source) {
        // Prepare source data
        source.prepare();
        
        // Set entity ID using IDManager
        String serviceId = IDManager.ServiceID.buildId(
            source.getServiceName(), 
            true // normalized
        );
        source.setEntityId(serviceId);
        
        // Set time bucket for aggregation
        long timeBucket = TimeBucket.getMinuteTimeBucket(
            source.getTimestamp()
        );
        source.setTimeBucket(timeBucket);
        
        // Forward to appropriate stream processor
        if (source.isMetric()) {
            metricsProcessor.in(source.toMetrics());
        } else if (source.isRecord()) {
            recordProcessor.in(source.toRecord());
        }
    }
}

Creating Custom Stream Definition

@Stream(
    name = "custom_service_metrics", 
    scopeId = DefaultScopeDefine.SERVICE,
    builder = CustomServiceMetrics.Builder.class, 
    processor = MetricsStreamProcessor.class
)
public class CustomServiceMetrics extends Metrics {
    
    @Getter @Setter 
    private String serviceName;
    
    @Getter @Setter 
    private long requestCount;
    
    @Getter @Setter 
    private long responseTime;
    
    @Override
    public boolean combine(Metrics metrics) {
        CustomServiceMetrics other = (CustomServiceMetrics) metrics;
        this.requestCount += other.getRequestCount();
        this.responseTime += other.getResponseTime();
        return true;
    }
    
    @Override
    public void calculate() {
        // Calculate average response time or other derived metrics
        if (requestCount > 0) {
            // Perform calculations
        }
    }
    
    @Override
    public Metrics toHour() {
        CustomServiceMetrics hourMetrics = new CustomServiceMetrics();
        hourMetrics.copyFrom(this);
        return hourMetrics;
    }
    
    @Override  
    public Metrics toDay() {
        CustomServiceMetrics dayMetrics = new CustomServiceMetrics();
        dayMetrics.copyFrom(this);
        return dayMetrics;
    }
    
    public static class Builder implements StorageBuilder<CustomServiceMetrics> {
        @Override
        public CustomServiceMetrics storage2Entity(Convert2Entity converter) {
            // Build entity from storage data
            CustomServiceMetrics metrics = new CustomServiceMetrics();
            // Set fields from converter
            return metrics;
        }
        
        @Override
        public void entity2Storage(CustomServiceMetrics storageData, 
                                 Convert2Storage converter) {
            // Convert entity to storage format
            converter.accept("service_name", storageData.getServiceName());
            converter.accept("request_count", storageData.getRequestCount());
            converter.accept("response_time", storageData.getResponseTime());
        }
    }
}

Core Types

/**
 * Service ID definition with parsed components
 */
public class ServiceIDDefinition {
    private String name;
    private boolean isNormal;
    
    public String getName();
    public boolean isNormal();
}

/**
 * Instance ID definition with parsed components
 */
public class InstanceIDDefinition {
    private String serviceId;
    private String instanceName;
    
    public String getServiceId();
    public String getInstanceName();
}

/**
 * Endpoint ID definition with parsed components
 */
public class EndpointIDDefinition {
    private String serviceId;
    private String endpointName;
    
    public String getServiceId();
    public String getEndpointName();
}

/**
 * Service relation definition
 */
public class ServiceRelationDefine {
    private String sourceServiceId;
    private String destServiceId;
    private DetectPoint detectPoint;
    
    public String getSourceServiceId();
    public String getDestServiceId();
    public DetectPoint getDetectPoint();
}

/**
 * Detection point for service relations
 */
public enum DetectPoint {
    CLIENT, SERVER, PROXY
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-skywalking--server-core

docs

analysis-framework.md

configuration.md

index.md

profiling.md

query-services.md

remote-communication.md

source-processing.md

storage-layer.md

tile.json