CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-skywalking--server-core

Core analysis engine and storage abstractions for Apache SkyWalking observability platform

Pending
Overview
Eval results
Files

source-processing.mddocs/

Source Processing

The SkyWalking source processing system handles telemetry data ingestion, transformation, and routing. It provides the foundation for receiving various types of observability data including traces, metrics, logs, and infrastructure telemetry from different sources and protocols.

Core Source Interfaces

ISource

Base interface for all telemetry data sources in the SkyWalking system.

public interface ISource {
    
    /**
     * Gets the scope identifier for this source
     * @return Scope ID (service, instance, endpoint, etc.)
     */
    int scope();
    
    /**
     * Gets the time bucket for metrics aggregation
     * @return Time bucket value
     */
    long getTimeBucket();
    
    /**
     * Sets the time bucket for metrics aggregation
     * @param timeBucket Time bucket value
     */
    void setTimeBucket(long timeBucket);
    
    /**
     * Gets the entity identifier for this source
     * @return Entity ID (service ID, instance ID, endpoint ID, etc.)
     */
    String getEntityId();
    
    /**
     * Internal data field preparation before {@link org.apache.skywalking.oap.server.core.analysis.SourceDispatcher#dispatch(ISource)}
     */
    default void prepare() {
    }
}

SourceReceiver

Service interface for receiving and processing telemetry sources.

public interface SourceReceiver extends Service {
    
    /**
     * Receives and processes a telemetry source
     * @param source The source data to process
     * @throws IOException If source processing fails
     */
    void receive(ISource source) throws IOException;
    
    /**
     * Receives multiple sources in batch
     * @param sources List of sources to process
     * @throws IOException If batch processing fails
     */
    void receiveBatch(List<? extends ISource> sources) throws IOException;
}

SourceReceiverImpl

Default implementation of source receiver with dispatching logic.

public class SourceReceiverImpl implements SourceReceiver {
    
    private DispatcherManager dispatcherManager;
    
    @Override
    public void receive(ISource source) throws IOException;
    
    @Override
    public void receiveBatch(List<? extends ISource> sources) throws IOException;
    
    /**
     * Sets the dispatcher manager for routing sources
     * @param dispatcherManager Dispatcher manager instance
     */
    public void setDispatcherManager(DispatcherManager dispatcherManager);
    
    /**
     * Validates source before processing
     * @param source Source to validate
     * @return True if source is valid
     */
    protected boolean validateSource(ISource source);
    
    /**
     * Preprocesses source before dispatching
     * @param source Source to preprocess
     */
    protected void preprocessSource(ISource source);
}

Source Annotations

Source

Annotation to mark classes as telemetry sources for automatic discovery.

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface Source {
    
    /**
     * Source name for identification
     * @return Source name
     */
    String name() default "";
    
    /**
     * Source category (trace, metric, log, infrastructure)
     * @return Source category
     */
    String category() default "";
}

ScopeDeclaration

Annotation to declare the scope of source data.

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface ScopeDeclaration {
    
    /**
     * Scope identifier
     * @return Scope ID
     */
    int id();
    
    /**
     * Scope name
     * @return Scope name
     */
    String name() default "";
}

ScopeDefaultColumn

Annotation to define default columns for scope entities.

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface ScopeDefaultColumn {
    
    /**
     * Virtual column name
     * @return Column name
     */
    String virtualColumnName();
    
    /**
     * Field name in the source class
     * @return Field name
     */
    String fieldName();
    
    /**
     * Whether the column requires entity ID
     * @return True if entity ID required
     */
    boolean requireEntityId() default false;
}

Service Sources

Service

Source for service-level telemetry data and metadata.

@Source(name = "service", category = "service")
@ScopeDeclaration(id = DefaultScopeDefine.SERVICE, name = "Service")
public class Service extends ISource {
    
    @Getter @Setter
    private String name;
    
    @Getter @Setter
    private String shortName;
    
    @Getter @Setter
    private String group;
    
    @Getter @Setter
    private NodeType nodeType;
    
    @Getter @Setter
    private List<String> layers;
    
    @Override
    public int scope();
    
    @Override
    public String getEntityId();
    
    @Override
    public void prepare();
    
    /**
     * Checks if service is normal (not virtual)
     * @return True if normal service
     */
    public boolean isNormal();
    
    /**
     * Gets service layer information
     * @return Primary service layer
     */
    public String getLayer();
}

ServiceMeta

Source for service metadata and registration information.

@Source(name = "service_meta", category = "service")
public class ServiceMeta extends ISource {
    
    @Getter @Setter
    private String name;
    
    @Getter @Setter
    private NodeType nodeType;
    
    @Getter @Setter
    private List<String> layers;
    
    @Getter @Setter
    private JsonObject properties;
    
    @Override
    public int scope();
    
    @Override
    public String getEntityId();
    
    @Override
    public void prepare();
    
    /**
     * Adds property to service metadata
     * @param key Property key
     * @param value Property value
     */
    public void addProperty(String key, String value);
    
    /**
     * Gets property from service metadata
     * @param key Property key
     * @return Property value or null
     */
    public String getProperty(String key);
}

ServiceRelation

Source for service relationship and dependency information.

@Source(name = "service_relation", category = "relation")
@ScopeDeclaration(id = DefaultScopeDefine.SERVICE_RELATION, name = "ServiceRelation")
public class ServiceRelation extends ISource {
    
    @Getter @Setter
    private String sourceServiceId;
    
    @Getter @Setter
    private String destServiceId;
    
    @Getter @Setter
    private String sourceServiceName;
    
    @Getter @Setter
    private String destServiceName;
    
    @Getter @Setter
    private DetectPoint detectPoint;
    
    @Getter @Setter
    private int componentId;
    
    @Override
    public int scope();
    
    @Override
    public String getEntityId();
    
    @Override
    public void prepare();
    
    /**
     * Gets relation ID for entity identification
     * @return Service relation ID
     */
    public String getRelationId();
}

Instance Sources

ServiceInstance

Source for service instance telemetry data and metadata.

@Source(name = "service_instance", category = "service")
@ScopeDeclaration(id = DefaultScopeDefine.SERVICE_INSTANCE, name = "ServiceInstance")
public class ServiceInstance extends ISource {
    
    @Getter @Setter
    private String name;
    
    @Getter @Setter
    private String serviceId;
    
    @Getter @Setter
    private String serviceName;
    
    @Getter @Setter
    private JsonObject properties;
    
    @Override
    public int scope();
    
    @Override
    public String getEntityId();
    
    @Override
    public void prepare();
    
    /**
     * Gets instance properties
     * @return Instance properties as JSON
     */
    public JsonObject getProperties();
    
    /**
     * Adds instance property
     * @param key Property key
     * @param value Property value
     */
    public void addProperty(String key, String value);
    
    /**
     * Gets specific instance property
     * @param key Property key
     * @return Property value or null
     */
    public String getProperty(String key);
}

ServiceInstanceRelation

Source for service instance relationship data.

@Source(name = "service_instance_relation", category = "relation")
@ScopeDeclaration(id = DefaultScopeDefine.SERVICE_INSTANCE_RELATION, name = "ServiceInstanceRelation")
public class ServiceInstanceRelation extends ISource {
    
    @Getter @Setter
    private String sourceServiceId;
    
    @Getter @Setter
    private String sourceServiceInstanceId;
    
    @Getter @Setter
    private String destServiceId;
    
    @Getter @Setter
    private String destServiceInstanceId;
    
    @Getter @Setter
    private String sourceServiceName;
    
    @Getter @Setter
    private String sourceServiceInstanceName;
    
    @Getter @Setter
    private String destServiceName;
    
    @Getter @Setter
    private String destServiceInstanceName;
    
    @Getter @Setter
    private DetectPoint detectPoint;
    
    @Getter @Setter
    private int componentId;
    
    @Override
    public int scope();
    
    @Override
    public String getEntityId();
    
    @Override
    public void prepare();
}

Endpoint Sources

Endpoint

Source for endpoint telemetry data and performance metrics.

@Source(name = "endpoint", category = "endpoint")
@ScopeDeclaration(id = DefaultScopeDefine.ENDPOINT, name = "Endpoint")
public class Endpoint extends ISource {
    
    @Getter @Setter
    private String name;
    
    @Getter @Setter
    private String serviceId;
    
    @Getter @Setter
    private String serviceName;
    
    @Getter @Setter
    private String serviceInstanceId;
    
    @Getter @Setter
    private String serviceInstanceName;
    
    @Getter @Setter
    private int latency;
    
    @Getter @Setter
    private boolean status;
    
    @Getter @Setter
    private int responseCode;
    
    @Getter @Setter
    private RequestType type;
    
    @Override
    public int scope();
    
    @Override
    public String getEntityId();
    
    @Override
    public void prepare();
    
    /**
     * Checks if endpoint call was successful
     * @return True if successful (status == true)
     */
    public boolean isSuccess();
    
    /**
     * Gets endpoint latency in milliseconds
     * @return Latency value
     */
    public int getLatency();
}

EndpointMeta

Source for endpoint metadata and registration information.

@Source(name = "endpoint_meta", category = "endpoint")
public class EndpointMeta extends ISource {
    
    @Getter @Setter
    private String endpoint;
    
    @Getter @Setter
    private String serviceId;
    
    @Getter @Setter
    private String serviceName;
    
    @Override
    public int scope();
    
    @Override
    public String getEntityId();
    
    @Override
    public void prepare();
}

EndpointRelation

Source for endpoint relationship and dependency information.

@Source(name = "endpoint_relation", category = "relation")
@ScopeDeclaration(id = DefaultScopeDefine.ENDPOINT_RELATION, name = "EndpointRelation")
public class EndpointRelation extends ISource {
    
    @Getter @Setter
    private String endpoint;
    
    @Getter @Setter
    private String childEndpoint;
    
    @Getter @Setter
    private int rpcLatency;
    
    @Getter @Setter
    private boolean status;
    
    @Getter @Setter
    private int responseCode;
    
    @Getter @Setter
    private RequestType type;
    
    @Getter @Setter
    private DetectPoint detectPoint;
    
    @Getter @Setter
    private int componentId;
    
    @Override
    public int scope();
    
    @Override
    public String getEntityId();
    
    @Override
    public void prepare();
}

Specialized Sources

DatabaseAccess

Source for database operation telemetry data.

@Source(name = "database_access", category = "database")
@ScopeDeclaration(id = DefaultScopeDefine.DATABASE_ACCESS, name = "DatabaseAccess")
public class DatabaseAccess extends ISource {
    
    @Getter @Setter
    private String databaseTypeId;
    
    @Getter @Setter
    private String name;
    
    @Getter @Setter
    private int latency;
    
    @Getter @Setter
    private boolean status;
    
    @Getter @Setter
    private String sqlStatement;
    
    @Getter @Setter
    private String operation;
    
    @Override
    public int scope();
    
    @Override
    public String getEntityId();
    
    @Override
    public void prepare();
    
    /**
     * Gets database type identifier
     * @return Database type ID
     */
    public String getDatabaseTypeId();
    
    /**
     * Gets SQL statement (may be truncated)
     * @return SQL statement
     */
    public String getSqlStatement();
}

CacheAccess

Source for cache operation telemetry data.

@Source(name = "cache_access", category = "cache")
public class CacheAccess extends ISource {
    
    @Getter @Setter
    private String name;
    
    @Getter @Setter
    private int latency;
    
    @Getter @Setter
    private boolean status;
    
    @Getter @Setter
    private String operation;
    
    @Getter @Setter
    private String key;
    
    @Override
    public int scope();
    
    @Override
    public String getEntityId();
    
    @Override
    public void prepare();
    
    /**
     * Gets cache operation type
     * @return Operation (GET, SET, DELETE, etc.)
     */
    public String getOperation();
}

MQAccess

Source for message queue operation telemetry data.

@Source(name = "mq_access", category = "mq")
public class MQAccess extends ISource {
    
    @Getter @Setter
    private String broker;
    
    @Getter @Setter
    private String topic;
    
    @Getter @Setter
    private int latency;
    
    @Getter @Setter
    private boolean status;
    
    @Getter @Setter
    private String operation;
    
    @Getter @Setter
    private TransmissionLatency transmissionLatency;
    
    @Override
    public int scope();
    
    @Override
    public String getEntityId();
    
    @Override
    public void prepare();
    
    /**
     * Gets message broker information
     * @return Broker identifier
     */
    public String getBroker();
    
    /**
     * Gets message topic
     * @return Topic name
     */
    public String getTopic();
}

Log

Source for log data and events.

@Source(name = "log", category = "log")
public class Log extends ISource {
    
    @Getter @Setter
    private String serviceId;
    
    @Getter @Setter
    private String serviceInstanceId;
    
    @Getter @Setter
    private String endpointId;
    
    @Getter @Setter
    private String traceId;
    
    @Getter @Setter
    private String traceSegmentId;
    
    @Getter @Setter
    private int spanId;
    
    @Getter @Setter
    private ContentType contentType;
    
    @Getter @Setter
    private String content;
    
    @Getter @Setter
    private List<String> tags;
    
    @Override
    public int scope();
    
    @Override
    public String getEntityId();
    
    @Override
    public void prepare();
    
    /**
     * Gets log content
     * @return Log content string
     */
    public String getContent();
    
    /**
     * Gets associated trace ID
     * @return Trace ID or null if not associated
     */
    public String getTraceId();
    
    /**
     * Adds tag to log entry
     * @param tag Tag to add
     */
    public void addTag(String tag);
}

Segment

Source for trace segment data.

@Source(name = "segment", category = "trace")
public class Segment extends ISource {
    
    @Getter @Setter
    private String traceId;
    
    @Getter @Setter
    private String segmentId;
    
    @Getter @Setter
    private String serviceId;
    
    @Getter @Setter
    private String serviceInstanceId;
    
    @Getter @Setter
    private String endpointName;
    
    @Getter @Setter
    private int latency;
    
    @Getter @Setter
    private boolean isError;
    
    @Getter @Setter
    private List<String> tags;
    
    @Getter @Setter
    private byte[] dataBinary;
    
    @Override
    public int scope();
    
    @Override
    public String getEntityId();
    
    @Override
    public void prepare();
    
    /**
     * Gets trace segment identifier
     * @return Segment ID
     */
    public String getSegmentId();
    
    /**
     * Checks if segment contains errors
     * @return True if error occurred
     */
    public boolean isError();
}

Infrastructure Sources

K8SMetrics

Source for Kubernetes metrics and telemetry.

@Source(name = "k8s_metrics", category = "infrastructure")
public class K8SMetrics extends ISource {
    
    @Getter @Setter
    private String cluster;
    
    @Getter @Setter
    private String namespace;
    
    @Getter @Setter
    private String workload;
    
    @Getter @Setter
    private String pod;
    
    @Getter @Setter
    private String container;
    
    @Getter @Setter
    private long cpuUsage;
    
    @Getter @Setter
    private long memoryUsage;
    
    @Getter @Setter
    private Map<String, String> labels;
    
    @Override
    public int scope();
    
    @Override
    public String getEntityId();
    
    @Override
    public void prepare();
    
    /**
     * Gets Kubernetes cluster name
     * @return Cluster name
     */
    public String getCluster();
    
    /**
     * Gets Kubernetes namespace
     * @return Namespace name
     */
    public String getNamespace();
}

EnvoyInstanceMetric

Source for Envoy proxy metrics and telemetry.

@Source(name = "envoy_instance_metric", category = "infrastructure")
public class EnvoyInstanceMetric extends ISource {
    
    @Getter @Setter
    private String serviceId;
    
    @Getter @Setter
    private String serviceInstanceId;
    
    @Getter @Setter
    private long totalRequestsCount;
    
    @Getter @Setter
    private long totalConnectionsCount;
    
    @Getter @Setter
    private long activeConnectionsCount;
    
    @Getter @Setter
    private double cpuUsage;
    
    @Getter @Setter
    private double heapMemoryUsed;
    
    @Override
    public int scope();
    
    @Override
    public String getEntityId();
    
    @Override
    public void prepare();
}

Source Enums and Constants

NodeType

Enumeration for service node types.

public enum NodeType {
    Normal(0), Browser(1), Unknown(2);
    
    private int value;
    
    NodeType(int value) {
        this.value = value;
    }
    
    public int value() {
        return value;
    }
    
    public static NodeType valueOf(int value) {
        for (NodeType nodeType : NodeType.values()) {
            if (nodeType.value == value) {
                return nodeType;
            }
        }
        throw new IllegalArgumentException("Unknown NodeType value: " + value);
    }
}

DetectPoint

Enumeration for detection points in service relationships.

public enum DetectPoint {
    CLIENT(0), SERVER(1), PROXY(2);
    
    private int value;
    
    DetectPoint(int value) {
        this.value = value;
    }
    
    public int value() {
        return value;
    }
    
    public static DetectPoint valueOf(int value) {
        for (DetectPoint detectPoint : DetectPoint.values()) {
            if (detectPoint.value == value) {
                return detectPoint;
            }
        }
        throw new IllegalArgumentException("Unknown DetectPoint value: " + value);
    }
}

RequestType

Enumeration for request types.

public enum RequestType {
    RPC(0), DATABASE(1), HTTP(2), CACHE(3), MQ(4);
    
    private int value;
    
    RequestType(int value) {
        this.value = value;
    }
    
    public int value() {
        return value;
    }
    
    public static RequestType valueOf(int value) {
        for (RequestType requestType : RequestType.values()) {
            if (requestType.value == value) {
                return requestType;
            }
        }
        throw new IllegalArgumentException("Unknown RequestType value: " + value);
    }
}

ContentType

Enumeration for log content types.

public enum ContentType {
    NONE(0), TEXT(1), JSON(2), YAML(3);
    
    private int value;
    
    ContentType(int value) {
        this.value = value;
    }
    
    public int value() {
        return value;
    }
    
    public static ContentType valueOf(int value) {
        for (ContentType contentType : ContentType.values()) {
            if (contentType.value == value) {
                return contentType;
            }
        }
        throw new IllegalArgumentException("Unknown ContentType value: " + value);
    }
}

Usage Examples

Implementing Custom Source

@Source(name = "custom_business_metric", category = "business")
@ScopeDeclaration(id = 1000, name = "BusinessMetric") // Custom scope ID
public class CustomBusinessMetric implements ISource {
    
    @Getter @Setter
    private String businessUnit;
    
    @Getter @Setter
    private String operation;
    
    @Getter @Setter
    private double revenue;
    
    @Getter @Setter
    private int transactionCount;
    
    @Getter @Setter
    private String currency;
    
    private long timeBucket;
    private String entityId;
    
    @Override
    public int scope() {
        return 1000; // Custom scope for business metrics
    }
    
    @Override
    public long getTimeBucket() {
        return timeBucket;
    }
    
    @Override
    public void setTimeBucket(long timeBucket) {
        this.timeBucket = timeBucket;
    }
    
    @Override
    public String getEntityId() {
        return entityId;
    }
    
    @Override
    public void prepare() {
        // Generate entity ID from business unit and operation
        this.entityId = businessUnit + ":" + operation;
        
        // Set time bucket if not already set
        if (timeBucket == 0) {
            timeBucket = TimeBucket.getMinuteTimeBucket(System.currentTimeMillis());
        }
    }
    
    public double getRevenuePerTransaction() {
        return transactionCount > 0 ? revenue / transactionCount : 0.0;
    }
}

Creating Custom Source Dispatcher

@Component
public class CustomBusinessMetricDispatcher implements SourceDispatcher<CustomBusinessMetric> {
    
    @Override
    public void dispatch(CustomBusinessMetric source) {
        // Validate source data
        if (source.getBusinessUnit() == null || source.getOperation() == null) {
            throw new IllegalArgumentException("Business unit and operation are required");
        }
        
        // Prepare source
        source.prepare();
        
        // Apply business rules
        applyBusinessRules(source);
        
        // Route to metrics processor
        MetricsStreamProcessor.getInstance().in(source);
    }
    
    private void applyBusinessRules(CustomBusinessMetric source) {
        // Example: Convert currency to USD if needed
        if (!"USD".equals(source.getCurrency())) {
            double exchangeRate = getExchangeRate(source.getCurrency(), "USD");
            source.setRevenue(source.getRevenue() * exchangeRate);
            source.setCurrency("USD");
        }
        
        // Example: Filter out test data
        if (source.getBusinessUnit().startsWith("TEST_")) {
            throw new IllegalArgumentException("Test data not allowed in production");
        }
    }
    
    private double getExchangeRate(String fromCurrency, String toCurrency) {
        // Implement currency conversion logic
        return 1.0; // Placeholder
    }
}

Processing Service Sources

public class ServiceSourceProcessor {
    
    private SourceReceiver sourceReceiver;
    
    public void processServiceRegistration(String serviceName, NodeType nodeType, 
                                         List<String> layers, Map<String, String> properties) {
        
        // Create service source
        Service serviceSource = new Service();
        serviceSource.setName(serviceName);
        serviceSource.setNodeType(nodeType);
        serviceSource.setLayers(layers);
        
        try {
            sourceReceiver.receive(serviceSource);
            System.out.println("Service registered: " + serviceName);
        } catch (IOException e) {
            System.err.println("Failed to register service: " + e.getMessage());
        }
        
        // Create service metadata source
        ServiceMeta metaSource = new ServiceMeta();
        metaSource.setName(serviceName);
        metaSource.setNodeType(nodeType);
        metaSource.setLayers(layers);
        
        JsonObject props = new JsonObject();
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            props.addProperty(entry.getKey(), entry.getValue());
        }
        metaSource.setProperties(props);
        
        try {
            sourceReceiver.receive(metaSource);
            System.out.println("Service metadata registered: " + serviceName);
        } catch (IOException e) {
            System.err.println("Failed to register service metadata: " + e.getMessage());
        }
    }
    
    public void processServiceRelation(String sourceService, String destService, 
                                     DetectPoint detectPoint, int componentId) {
        
        ServiceRelation relationSource = new ServiceRelation();
        relationSource.setSourceServiceName(sourceService);
        relationSource.setDestServiceName(destService);
        relationSource.setDetectPoint(detectPoint);
        relationSource.setComponentId(componentId);
        
        try {
            sourceReceiver.receive(relationSource);
            System.out.println("Service relation recorded: " + sourceService + " -> " + destService);
        } catch (IOException e) {
            System.err.println("Failed to record service relation: " + e.getMessage());
        }
    }
}

Processing Endpoint Sources

public class EndpointSourceProcessor {
    
    private SourceReceiver sourceReceiver;
    
    public void processEndpointCall(String serviceName, String instanceName, String endpointName,
                                   int latency, boolean success, int responseCode, RequestType type) {
        
        // Create endpoint source
        Endpoint endpointSource = new Endpoint();
        endpointSource.setServiceName(serviceName);
        endpointSource.setServiceInstanceName(instanceName);
        endpointSource.setName(endpointName);
        endpointSource.setLatency(latency);
        endpointSource.setStatus(success);
        endpointSource.setResponseCode(responseCode);
        endpointSource.setType(type);
        
        try {
            sourceReceiver.receive(endpointSource);
            
            if (!success) {
                System.out.println("Error endpoint call recorded: " + endpointName + 
                                 " (" + responseCode + ")");
            }
        } catch (IOException e) {
            System.err.println("Failed to process endpoint call: " + e.getMessage());
        }
    }
    
    public void processEndpointRelation(String sourceEndpoint, String destEndpoint,
                                       int latency, boolean success, DetectPoint detectPoint) {
        
        EndpointRelation relationSource = new EndpointRelation();
        relationSource.setEndpoint(sourceEndpoint);
        relationSource.setChildEndpoint(destEndpoint);
        relationSource.setRpcLatency(latency);
        relationSource.setStatus(success);
        relationSource.setDetectPoint(detectPoint);
        
        try {
            sourceReceiver.receive(relationSource);
        } catch (IOException e) {
            System.err.println("Failed to process endpoint relation: " + e.getMessage());
        }
    }
}

Processing Database and Cache Sources

public class DatabaseCacheSourceProcessor {
    
    private SourceReceiver sourceReceiver;
    
    public void processDatabaseAccess(String databaseType, String operation, String sqlStatement,
                                    int latency, boolean success) {
        
        DatabaseAccess dbSource = new DatabaseAccess();
        dbSource.setDatabaseTypeId(databaseType);
        dbSource.setOperation(operation);
        dbSource.setSqlStatement(sqlStatement);
        dbSource.setLatency(latency);
        dbSource.setStatus(success);
        
        try {
            sourceReceiver.receive(dbSource);
            
            if (latency > 1000) { // Log slow queries
                System.out.println("Slow database query detected: " + latency + "ms - " + 
                                 sqlStatement.substring(0, Math.min(100, sqlStatement.length())));
            }
        } catch (IOException e) {
            System.err.println("Failed to process database access: " + e.getMessage());
        }
    }
    
    public void processCacheAccess(String cacheName, String operation, String key,
                                 int latency, boolean success) {
        
        CacheAccess cacheSource = new CacheAccess();
        cacheSource.setName(cacheName);
        cacheSource.setOperation(operation);
        cacheSource.setKey(key);
        cacheSource.setLatency(latency);
        cacheSource.setStatus(success);
        
        try {
            sourceReceiver.receive(cacheSource);
            
            if (!success) {
                System.out.println("Cache operation failed: " + operation + " on " + key);
            }
        } catch (IOException e) {
            System.err.println("Failed to process cache access: " + e.getMessage());
        }
    }
}

Batch Source Processing

public class BatchSourceProcessor {
    
    private SourceReceiver sourceReceiver;
    private List<ISource> sourceBatch = new ArrayList<>();
    private final int BATCH_SIZE = 100;
    
    public void addSource(ISource source) {
        sourceBatch.add(source);
        
        if (sourceBatch.size() >= BATCH_SIZE) {
            flushBatch();
        }
    }
    
    public void flushBatch() {
        if (sourceBatch.isEmpty()) {
            return;
        }
        
        try {
            sourceReceiver.receiveBatch(new ArrayList<>(sourceBatch));
            System.out.println("Processed batch of " + sourceBatch.size() + " sources");
            sourceBatch.clear();
        } catch (IOException e) {
            System.err.println("Failed to process source batch: " + e.getMessage());
            // Optionally retry individual sources
            retryIndividualSources();
        }
    }
    
    private void retryIndividualSources() {
        for (ISource source : sourceBatch) {
            try {
                sourceReceiver.receive(source);
            } catch (IOException e) {
                System.err.println("Failed to process individual source: " + 
                                 source.getClass().getSimpleName() + " - " + e.getMessage());
            }
        }
        sourceBatch.clear();
    }
    
    // Call this method periodically to ensure pending sources are processed
    public void periodicFlush() {
        if (!sourceBatch.isEmpty()) {
            flushBatch();
        }
    }
}

Core Source Types

/**
 * Transmission latency for message queue operations
 */
public class TransmissionLatency {
    private long producerTime;
    private long consumerTime;
    
    public long getProducerTime();
    public void setProducerTime(long producerTime);
    public long getConsumerTime();  
    public void setConsumerTime(long consumerTime);
    public long getLatency();
}

/**
 * Default scope definitions for built-in source types
 */
public class DefaultScopeDefine {
    public static final int SERVICE = 1;
    public static final int SERVICE_INSTANCE = 2;
    public static final int ENDPOINT = 3;
    public static final int SERVICE_RELATION = 4;
    public static final int SERVICE_INSTANCE_RELATION = 5;
    public static final int ENDPOINT_RELATION = 6;
    public static final int DATABASE_ACCESS = 7;
    public static final int ALL = 99;
}

/**
 * Source processing exception
 */
public class SourceProcessingException extends RuntimeException {
    public SourceProcessingException(String message);
    public SourceProcessingException(String message, Throwable cause);
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-skywalking--server-core

docs

analysis-framework.md

configuration.md

index.md

profiling.md

query-services.md

remote-communication.md

source-processing.md

storage-layer.md

tile.json