CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/maven-org-apache-skywalking--server-core

Core analysis engine and storage abstractions for Apache SkyWalking observability platform

Pending
Overview
Eval results
Files

docs/storage-layer.md

Storage Layer

The SkyWalking storage layer provides pluggable storage abstractions that support multiple backend implementations including Elasticsearch, BanyanDB, MySQL, and other databases. It offers unified DAO interfaces, storage builders, and data models that enable seamless backend switching without application code changes.

Core Storage Interfaces

StorageDAO

Factory interface for creating storage-specific DAO implementations.

/**
 * Factory service for storage-specific DAO implementations. One factory method
 * exists per data category: metrics, records, none-stream data and management data.
 *
 * <p>Every method takes a raw {@code StorageBuilder} and declares no checked
 * exception. An implementation therefore has to declare the same raw parameter
 * type and must not add a {@code throws} clause with checked exceptions,
 * otherwise its methods do not override the ones declared here.
 */
public interface StorageDAO extends Service {
    
    /**
     * Creates the DAO used for metrics data.
     * @param storageBuilder storage builder the DAO is created for
     * @return a metrics DAO
     */
    IMetricsDAO newMetricsDao(StorageBuilder storageBuilder);
    
    /**
     * Creates the DAO used for record data.
     * @param storageBuilder storage builder the DAO is created for
     * @return a record DAO
     */
    IRecordDAO newRecordDao(StorageBuilder storageBuilder);
    
    /**
     * Creates the DAO used for none-stream data.
     * @param storageBuilder storage builder the DAO is created for
     * @return a none-stream DAO
     */
    INoneStreamDAO newNoneStreamDao(StorageBuilder storageBuilder);
    
    /**
     * Creates the DAO used for management data.
     * @param storageBuilder storage builder the DAO is created for
     * @return a management DAO
     */
    IManagementDAO newManagementDao(StorageBuilder storageBuilder);
}

IMetricsDAO

Specialized DAO for metrics storage operations with caching and batch processing.

/**
 * DAO contract for metrics: bulk lookup, preparation of batched insert/update
 * requests, and the expiry rule applied to cached metrics.
 */
public interface IMetricsDAO extends DAO {

    /**
     * Looks up the stored counterparts of the given metrics.
     *
     * @param model   target entity of this query.
     * @param metrics metrics whose stored data should be read.
     * @return the data found for the given metrics. Entries that do not exist in the
     *         storage are left out, and the result is not required to follow the
     *         order of the input list.
     * @throws Exception when the query fails.
     */
    List<Metrics> multiGet(Model model, List<Metrics> metrics) throws Exception;

    /**
     * Turns the given metrics into an executable insert statement.
     *
     * @param model    model of the given metrics.
     * @param metrics  metrics to insert.
     * @param callback session cache callback.
     * @return an InsertRequest that follows the database client driver datatype, so
     *         that it can be executed ASAP.
     * @throws IOException when the request cannot be prepared.
     */
    InsertRequest prepareBatchInsert(Model model, Metrics metrics, SessionCacheCallback callback) throws IOException;

    /**
     * Turns the given metrics into an executable update statement.
     *
     * @param model    model of the given metrics.
     * @param metrics  metrics to update.
     * @param callback session cache callback.
     * @return an UpdateRequest that follows the database client driver datatype, so
     *         that it can be executed ASAP.
     * @throws IOException when the request cannot be prepared.
     */
    UpdateRequest prepareBatchUpdate(Model model, Metrics metrics, SessionCacheCallback callback) throws IOException;

    /**
     * Decides whether a cached metric has outlived its TTL.
     *
     * <p>The age is measured from the timestamp of the metric's time bucket (resolved
     * with the model's downsampling) up to {@code currentTimeMillis}; the TTL is
     * interpreted as a number of days.
     *
     * @param model             model of the given cached value.
     * @param cachedValue       the cached metric instance.
     * @param currentTimeMillis current system time of OAP.
     * @param ttl               TTL from the core setting, in days.
     * @return true if the metric is strictly older than the TTL.
     */
    default boolean isExpiredCache(Model model, Metrics cachedValue, long currentTimeMillis, int ttl) {
        final long ageMillis = currentTimeMillis
            - TimeBucket.getTimestamp(cachedValue.getTimeBucket(), model.getDownsampling());
        final long ttlMillis = TimeUnit.DAYS.toMillis(ttl);
        return ageMillis > ttlMillis;
    }
}

IRecordDAO

DAO for record storage operations handling logs and events.

/**
 * DAO contract for record data. It only prepares insert requests; no update
 * operation is declared for records.
 */
public interface IRecordDAO extends DAO {
    
    /**
     * Prepares a batch insert request for a single record.
     * @param model Storage model definition
     * @param record Record data to insert
     * @param callback Session cache callback
     * @return Insert request for batch processing
     * @throws IOException If preparation fails
     */
    InsertRequest prepareBatchInsert(Model model, Record record, 
                                   SessionCacheCallback callback) throws IOException;
}

INoneStreamDAO

DAO for non-stream data storage operations.

/**
 * DAO contract for none-stream data. It only prepares insert requests; no update
 * operation is declared.
 */
public interface INoneStreamDAO extends DAO {
    
    /**
     * Prepares a batch insert request for a single none-stream entity.
     * @param model Storage model definition
     * @param noneStream Non-stream data to insert
     * @param callback Session cache callback
     * @return Insert request for batch processing
     * @throws IOException If preparation fails
     */
    InsertRequest prepareBatchInsert(Model model, NoneStream noneStream, 
                                   SessionCacheCallback callback) throws IOException;
}

IManagementDAO

DAO for management data storage operations.

/**
 * DAO contract for management data. It only prepares insert requests; no update
 * operation is declared.
 */
public interface IManagementDAO extends DAO {
    
    /**
     * Prepares a batch insert request for a single management entity.
     * @param model Storage model definition
     * @param management Management data to insert
     * @param callback Session cache callback
     * @return Insert request for batch processing
     * @throws IOException If preparation fails
     */
    InsertRequest prepareBatchInsert(Model model, Management management, 
                                   SessionCacheCallback callback) throws IOException;
}

Storage Data Models

StorageData

Base interface for all storage entities.

/**
 * Base contract of every entity that can be persisted: each entity exposes a
 * {@link StorageID}. It is also the upper bound of the type parameter of
 * {@code StorageBuilder}.
 */
public interface StorageData {
    
    /**
     * Standard time bucket column name, shared by all entities so that the
     * column is spelled the same way everywhere.
     */
    String TIME_BUCKET = "time_bucket";
    
    /**
     * Gets the unique storage identifier for this entity.
     * @return Storage identifier
     */
    StorageID id();
}

StorageID

Represents unique identifier in storage systems.

/**
 * Unique identifier of an entity in the storage, wrapping a string ID.
 *
 * <p>This listing is a signature summary: method bodies are omitted.
 */
public class StorageID {
    
    // The wrapped identifier.
    private String id;
    
    /**
     * Creates storage ID from string value
     * @param id String identifier
     */
    public StorageID(String id);
    
    /**
     * Gets the string representation of this ID.
     * NOTE(review): how this differs from {@link #getId()} is not visible in this
     * summary — confirm against the implementation.
     * @return String ID
     */
    public String build();
    
    /**
     * Gets raw ID value
     * @return Raw ID string
     */
    public String getId();
    
    /**
     * Compares this ID with another object.
     * NOTE(review): presumably equality is based on the wrapped ID — confirm.
     */
    @Override
    public boolean equals(Object obj);
    
    /**
     * Hash code of this ID; overridden together with {@link #equals(Object)}.
     */
    @Override
    public int hashCode();
    
    /**
     * Textual form of this ID. The exact format is not shown in this summary.
     */
    @Override
    public String toString();
}

ComparableStorageData

Storage data with comparison capabilities for sorting.

/**
 * A {@link StorageData} that is also {@link Comparable} with other
 * {@code ComparableStorageData} instances, so that entities can be sorted.
 */
public interface ComparableStorageData extends StorageData, Comparable<ComparableStorageData> {
    
    /**
     * Compares this storage data with another for ordering.
     * The ordering criterion is defined by each implementation.
     * @param o Other storage data to compare with
     * @return Negative, zero, or positive integer for less than, equal, or greater than
     */
    @Override
    int compareTo(ComparableStorageData o);
}

Storage Builders

StorageBuilder Interface

Converts between entity objects and storage representations.

/**
 * Two-way converter between an entity object and its storage representation.
 *
 * <p>Because {@link #entity2Storage} takes a {@code T} argument, it cannot be
 * called through a {@code StorageBuilder<? extends X>} reference; callers that
 * need to write entities must hold a reference with a concrete type argument.
 *
 * @param <T> entity type handled by this builder
 */
public interface StorageBuilder<T extends StorageData> {
    
    /**
     * Converts storage data to entity object.
     * @param converter Conversion helper with storage data
     * @return Entity object populated from storage
     */
    T storage2Entity(Convert2Entity converter);
    
    /**
     * Converts entity object to storage data format. The result is written into
     * {@code converter}; nothing is returned.
     * @param storageData Entity object to convert
     * @param converter Conversion helper for storage format
     */
    void entity2Storage(T storageData, Convert2Storage converter);
}

Convert2Entity

Helper interface for converting from storage to entity format.

/**
 * Read-side conversion helper: gives typed access to the column values of one
 * stored row/document while an entity is rebuilt from storage.
 *
 * <p>All numeric accessors return boxed types and are documented to return
 * {@code null}; assigning the result directly to a primitive auto-unboxes and
 * throws {@link NullPointerException} when the value is {@code null}.
 */
public interface Convert2Entity {
    
    /**
     * Gets string value from storage column
     * @param columnName Column name
     * @return String value or null
     */
    String get(String columnName);
    
    /**
     * Gets integer value from storage column
     * @param columnName Column name
     * @return Integer value or null
     */
    Integer getInt(String columnName);
    
    /**
     * Gets long value from storage column
     * @param columnName Column name
     * @return Long value or null
     */
    Long getLong(String columnName);
    
    /**
     * Gets double value from storage column
     * @param columnName Column name
     * @return Double value or null
     */
    Double getDouble(String columnName);
    
    /**
     * Gets byte array from storage column
     * @param columnName Column name
     * @return Byte array or null
     */
    byte[] getBytes(String columnName);
}

Convert2Storage

Helper interface for converting from entity to storage format.

/**
 * Write-side conversion helper: receives the column values of an entity while it
 * is converted to the storage format.
 *
 * <p>The interface only accepts values; it declares no method for reading the
 * collected data back. Code that needs the result has to use the concrete
 * implementation type.
 */
public interface Convert2Storage {
    
    /**
     * Sets storage column value
     * @param columnName Column name
     * @param value Value to store
     */
    void accept(String columnName, Object value);
    
    /**
     * Sets storage column with specific data type
     * @param columnName Column name
     * @param value Value to store  
     * @param columnType Storage column type
     */
    void accept(String columnName, Object value, Column.ValueDataType columnType);
}

StorageBuilderFactory

Factory for creating storage builders.

/**
 * Static factory that resolves the {@link StorageBuilder} for an entity class.
 *
 * <p>This listing is a signature summary: the method body is omitted.
 */
public class StorageBuilderFactory {
    
    /**
     * Creates storage builder for specified entity class
     * @param clazz Entity class
     * @param <T> entity type, which determines the type of the returned builder
     * @return Storage builder instance
     * @throws StorageException If builder creation fails
     */
    public static <T extends StorageData> StorageBuilder<T> getStorageBuilder(Class<T> clazz) 
        throws StorageException;
}

Storage Annotations

Column Annotation

Marks fields as storage columns with metadata.

/**
 * Marks an entity field as a storage column and carries the column metadata.
 * Applicable to fields only and retained at runtime.
 */
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Column {
    
    /**
     * Column name in storage (defaults to field name)
     */
    String name() default "";
    
    /**
     * Data type for storage; {@code VARCHAR} unless specified.
     */
    ValueDataType dataType() default ValueDataType.VARCHAR;
    
    /**
     * Maximum length for string columns; 200 unless specified.
     */
    int length() default 200;
    
    /**
     * Whether the column is storage-only; {@code false} unless specified.
     * NOTE(review): an earlier comment described this as a "stores JSON data" flag,
     * which does not match the attribute name. Presumably the column is persisted
     * but not used in query conditions — confirm against the storage implementation.
     */
    boolean storageOnly() default false;
    
    /**
     * Whether the column is index-only; {@code false} unless specified.
     * NOTE(review): presumably the column is only indexed for querying and not
     * returned as stored data — confirm against the storage implementation.
     */
    boolean indexOnly() default false;
    
    /**
     * Column data type enumeration
     */
    enum ValueDataType {
        VARCHAR, TEXT, INT, BIGINT, DOUBLE, SAMPLED_RECORD
    }
}

BanyanDB Annotation

BanyanDB-specific storage configurations.

/**
 * BanyanDB-specific storage configuration of an entity class. Applicable to
 * types only and retained at runtime.
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface BanyanDB {
    
    /**
     * Time-to-live configuration; defaults to an {@code @TTL} with its own defaults.
     */
    TTL ttl() default @TTL();
    
    /**
     * Sharding configuration; defaults to an {@code @Sharding} with no keys.
     */
    Sharding sharding() default @Sharding();
    
    /**
     * Time-to-live settings
     */
    @interface TTL {
        // TTL amount as a string; empty by default.
        // NOTE(review): presumably an empty value means "no entity-specific TTL" — confirm.
        String value() default "";
        // Unit that the amount is expressed in; "DAY" by default.
        String unit() default "DAY";
    }
    
    /**
     * Sharding settings
     */
    @interface Sharding {
        // Names of the keys used for sharding; none by default.
        String[] shardingKeys() default {};
    }
}

Batch Processing

InsertRequest

Request for batch insert operations.

/**
 * Value holder describing one insert of a batch: target index/table, storage
 * type, document/record ID and the data to insert.
 *
 * <p>This listing is a signature summary: constructor and method bodies are omitted.
 */
public class InsertRequest {
    
    // Storage index/table name.
    private String index;
    // Storage type.
    private String type;
    // Document/record ID.
    private String id;
    // Data to insert, keyed by name.
    private Map<String, Object> source;
    
    /**
     * Creates insert request
     * @param index Storage index/table name
     * @param type Storage type
     * @param id Document/record ID
     * @param source Data to insert
     */
    public InsertRequest(String index, String type, String id, Map<String, Object> source);
    
    /** @return Storage index/table name */
    public String getIndex();
    /** @return Storage type */
    public String getType();
    /** @return Document/record ID */
    public String getId();
    /** @return Data to insert */
    public Map<String, Object> getSource();
}

UpdateRequest

Request for batch update operations.

/**
 * Value holder describing one update of a batch: target index/table, storage
 * type, ID of the document/record to update and the updated data.
 *
 * <p>This listing is a signature summary: constructor and method bodies are omitted.
 */
public class UpdateRequest {
    
    // Storage index/table name.
    private String index;
    // Storage type.
    private String type;
    // ID of the document/record to update.
    private String id;
    // Updated data, keyed by name.
    private Map<String, Object> doc;
    
    /**
     * Creates update request
     * @param index Storage index/table name
     * @param type Storage type
     * @param id Document/record ID to update
     * @param doc Updated data
     */
    public UpdateRequest(String index, String type, String id, Map<String, Object> doc);
    
    /** @return Storage index/table name */
    public String getIndex();
    /** @return Storage type */
    public String getType();
    /** @return ID of the document/record to update */
    public String getId();
    /** @return Updated data */
    public Map<String, Object> getDoc();
}

SessionCacheCallback

Callback for session cache operations during batch processing.

/**
 * Callback handed to the DAO {@code prepareBatchInsert}/{@code prepareBatchUpdate}
 * methods for session cache operations during batch processing.
 * NOTE(review): when and by whom the callback is invoked is not shown here —
 * confirm against the DAO implementation in use.
 */
public interface SessionCacheCallback {
    
    /**
     * Callback method invoked during cache operations
     * @param data Cache-related data
     */
    void callback(Object data);
}

Storage Utilities

StorageException

Exception for storage layer operations.

/**
 * Checked exception for failures in the storage layer. Because it extends
 * {@link Exception} directly, callers must catch or declare it.
 *
 * <p>This listing is a signature summary: constructor bodies are omitted.
 */
public class StorageException extends Exception {
    
    /**
     * Creates storage exception with message
     * @param message Error message
     */
    public StorageException(String message);
    
    /**
     * Creates storage exception with message and cause
     * @param message Error message
     * @param cause Underlying cause
     */
    public StorageException(String message, Throwable cause);
}

StorageModule

Storage module definition and configuration.

/**
 * Module definition of the storage layer.
 *
 * <p>This listing is a signature summary: method bodies are omitted.
 */
public class StorageModule extends ModuleDefine {
    
    // Name constant of the storage module.
    public static final String NAME = "storage";
    
    /**
     * Gets module name
     * NOTE(review): presumably returns {@link #NAME} — the body is not shown.
     * @return Module name
     */
    @Override
    public String name();
    
    /**
     * Gets module services
     * @return Array of service classes
     */
    @Override
    public Class[] services();
}

Model

Storage model definition with metadata.

/**
 * Storage model definition: the model name, its columns and two flags that
 * classify the model (record, super dataset).
 *
 * <p>This listing is a signature summary: method bodies are omitted.
 */
public class Model {
    
    // Model name.
    private String name;
    // Column definitions of the model.
    private List<ModelColumn> columns;
    // Whether the model represents record data.
    private boolean record;
    // Whether the model is a super dataset.
    private boolean superDataset;
    
    /**
     * Gets model name
     * @return Model name
     */
    public String getName();
    
    /**
     * Gets model columns
     * @return List of columns
     */
    public List<ModelColumn> getColumns();
    
    /**
     * Checks if model represents record data
     * @return True if record model
     */
    public boolean isRecord();
    
    /**
     * Checks if model is super dataset
     * @return True if super dataset
     */
    public boolean isSuperDataset();
}

Usage Examples

Implementing Custom Storage DAO

/**
 * Example {@link StorageDAO} that creates Elasticsearch-backed DAOs.
 *
 * <p>Each factory method mirrors the {@code StorageDAO} declaration exactly: the
 * parameter is the raw {@code StorageBuilder} and no checked exception is
 * declared. A parameterized parameter type (for example
 * {@code StorageBuilder<? extends Metrics>}) would not override the interface
 * method but clash with it, and an added {@code throws IOException} is not
 * permitted on an override of a method that declares no checked exception.
 *
 * <p>NOTE(review): {@code client} is never assigned in this snippet; it has to be
 * injected or otherwise initialized before any factory method is called.
 */
@Component
public class CustomElasticsearchStorageDAO implements StorageDAO {

    private ElasticsearchClient client;

    /**
     * Creates the Elasticsearch DAO for metrics.
     * @param storageBuilder storage builder handed on to the created DAO
     * @return a metrics DAO bound to this factory's client
     */
    @Override
    public IMetricsDAO newMetricsDao(StorageBuilder storageBuilder) {
        return new ElasticsearchMetricsDAO(client, storageBuilder);
    }

    /**
     * Creates the Elasticsearch DAO for records.
     * @param storageBuilder storage builder handed on to the created DAO
     * @return a record DAO bound to this factory's client
     */
    @Override
    public IRecordDAO newRecordDao(StorageBuilder storageBuilder) {
        return new ElasticsearchRecordDAO(client, storageBuilder);
    }

    /**
     * Creates the Elasticsearch DAO for none-stream data.
     * @param storageBuilder storage builder handed on to the created DAO
     * @return a none-stream DAO bound to this factory's client
     */
    @Override
    public INoneStreamDAO newNoneStreamDao(StorageBuilder storageBuilder) {
        return new ElasticsearchNoneStreamDAO(client, storageBuilder);
    }

    /**
     * Creates the Elasticsearch DAO for management data.
     * @param storageBuilder storage builder handed on to the created DAO
     * @return a management DAO bound to this factory's client
     */
    @Override
    public IManagementDAO newManagementDao(StorageBuilder storageBuilder) {
        return new ElasticsearchManagementDAO(client, storageBuilder);
    }
}

Creating Custom Storage Entity

/**
 * Example metrics entity holding per-service request statistics.
 *
 * <p>The BanyanDB annotation configures a 7 day TTL and shards by
 * {@code service_id}. The nested {@link Builder} converts between this entity and
 * its storage representation.
 */
@BanyanDB(
    ttl = @BanyanDB.TTL(value = "7", unit = "DAY"),
    sharding = @BanyanDB.Sharding(shardingKeys = {"service_id"})
)
public class CustomMetrics extends Metrics {
    
    @Column(name = "service_id", dataType = Column.ValueDataType.VARCHAR, length = 512)
    @Getter @Setter
    private String serviceId;
    
    @Column(name = "request_count", dataType = Column.ValueDataType.BIGINT)
    @Getter @Setter
    private long requestCount;
    
    @Column(name = "response_time", dataType = Column.ValueDataType.BIGINT)
    @Getter @Setter
    private long responseTime;
    
    @Column(name = "error_rate", dataType = Column.ValueDataType.DOUBLE)
    @Getter @Setter
    private double errorRate;
    
    /**
     * Merges another {@code CustomMetrics} into this one by summing the request
     * count and the response time.
     *
     * <p>NOTE(review): {@code errorRate} is not merged here. A rate cannot be
     * recombined from two rates alone; track an error count and derive the rate
     * in {@link #calculate()} if the merged rate matters.
     *
     * @param metrics the metrics to merge; must be a {@code CustomMetrics}
     * @return always {@code true}
     */
    @Override
    public boolean combine(Metrics metrics) {
        CustomMetrics other = (CustomMetrics) metrics;
        this.requestCount += other.getRequestCount();
        this.responseTime += other.getResponseTime();
        // Recalculate error rate
        return true;
    }
    
    /**
     * Hook for deriving values from the accumulated counters. Left as a
     * placeholder in this example; nothing is computed yet.
     */
    @Override
    public void calculate() {
        if (requestCount > 0) {
            // Perform calculations
        }
    }
    
    /**
     * Creates the hour-level counterpart of this metric. Placeholder: the
     * returned instance is empty until the copy logic is filled in.
     */
    @Override
    public Metrics toHour() {
        CustomMetrics hourMetrics = new CustomMetrics();
        // Copy and transform data for hour aggregation
        return hourMetrics;
    }
    
    /**
     * Creates the day-level counterpart of this metric. Placeholder: the
     * returned instance is empty until the copy logic is filled in.
     */
    @Override
    public Metrics toDay() {
        CustomMetrics dayMetrics = new CustomMetrics();
        // Copy and transform data for day aggregation
        return dayMetrics;
    }
    
    /**
     * Converts {@link CustomMetrics} to and from its storage columns.
     */
    public static class Builder implements StorageBuilder<CustomMetrics> {
        
        /**
         * Rebuilds a {@code CustomMetrics} from stored column values.
         *
         * <p>{@code Convert2Entity} returns boxed values that may be {@code null}
         * when a column is absent. Numeric columns are therefore read through
         * null-safe helpers and fall back to zero instead of failing with a
         * {@link NullPointerException} during auto-unboxing.
         *
         * @param converter access to the stored column values
         * @return the populated entity
         */
        @Override
        public CustomMetrics storage2Entity(Convert2Entity converter) {
            CustomMetrics metrics = new CustomMetrics();
            metrics.setServiceId(converter.get("service_id"));
            metrics.setRequestCount(longOrZero(converter.getLong("request_count")));
            metrics.setResponseTime(longOrZero(converter.getLong("response_time")));
            metrics.setErrorRate(doubleOrZero(converter.getDouble("error_rate")));
            metrics.setTimeBucket(longOrZero(converter.getLong("time_bucket")));
            return metrics;
        }
        
        /**
         * Writes every column of the entity into the given converter.
         *
         * @param storageData the entity to convert
         * @param converter receiver of the column values
         */
        @Override
        public void entity2Storage(CustomMetrics storageData, Convert2Storage converter) {
            converter.accept("service_id", storageData.getServiceId());
            converter.accept("request_count", storageData.getRequestCount());
            converter.accept("response_time", storageData.getResponseTime());
            converter.accept("error_rate", storageData.getErrorRate());
            converter.accept("time_bucket", storageData.getTimeBucket());
        }
        
        /** Unboxes a possibly-null {@code Long}, treating {@code null} as zero. */
        private static long longOrZero(Long value) {
            return value == null ? 0L : value;
        }
        
        /** Unboxes a possibly-null {@code Double}, treating {@code null} as zero. */
        private static double doubleOrZero(Double value) {
            return value == null ? 0.0d : value;
        }
    }
}

Implementing Metrics DAO

/**
 * Example {@link IMetricsDAO} implementation on top of a generic
 * {@code DatabaseClient}.
 *
 * <p>NOTE(review): {@code MapConvert2Storage} and {@code MapConvert2Entity} are not
 * defined in this document. They are assumed to be map-backed implementations of
 * {@code Convert2Storage} / {@code Convert2Entity}, with
 * {@code MapConvert2Storage.getData()} returning the collected
 * {@code Map<String, Object>} — confirm against your own helper classes.
 */
public class CustomMetricsDAO implements IMetricsDAO {
    
    private final DatabaseClient client;
    private final StorageBuilder<Metrics> storageBuilder;
    
    /**
     * @param client client used to talk to the storage
     * @param storageBuilder builder for the metrics type this DAO handles
     */
    @SuppressWarnings("unchecked")
    public CustomMetricsDAO(DatabaseClient client, 
                          StorageBuilder<? extends Metrics> storageBuilder) {
        this.client = client;
        // entity2Storage() takes a T argument, so it cannot be invoked through a
        // "? extends Metrics" wildcard. The builder is only ever given metrics of
        // the type it was created for, which makes this narrowing cast safe in use.
        this.storageBuilder = (StorageBuilder<Metrics>) storageBuilder;
    }
    
    /**
     * Reads the stored data of the given metrics in one batch query.
     *
     * @param model model whose name identifies the index/table to query
     * @param metrics metrics whose IDs are looked up
     * @return the metrics rebuilt from whatever the client returned
     * @throws IOException when the query fails
     */
    @Override
    public List<Metrics> multiGet(Model model, List<Metrics> metrics) throws IOException {
        List<String> ids = metrics.stream()
            .map(m -> m.id().build())
            .collect(Collectors.toList());
            
        // Batch query from storage
        List<Map<String, Object>> results = client.multiGet(model.getName(), ids);
        
        return results.stream()
            .map(this::convertToMetrics)
            .collect(Collectors.toList());
    }
    
    /**
     * Converts the metric to its storage form and wraps it in an insert request.
     *
     * @param model model whose name is used as index/table
     * @param metrics metric to insert
     * @param callback session cache callback (not used by this example)
     * @return the insert request for batch execution
     * @throws IOException when the request cannot be prepared
     */
    @Override
    public InsertRequest prepareBatchInsert(Model model, Metrics metrics, 
                                          SessionCacheCallback callback) throws IOException {
        
        // Convert entity to storage format. The local is typed as the concrete
        // class because Convert2Storage itself declares no accessor for the data.
        MapConvert2Storage converter = new MapConvert2Storage();
        storageBuilder.entity2Storage(metrics, converter);
        
        return new InsertRequest(
            model.getName(),           // index/table
            "metrics",                 // type
            metrics.id().build(),      // document ID
            converter.getData()        // source data
        );
    }
    
    /**
     * Converts the metric to its storage form and wraps it in an update request.
     *
     * @param model model whose name is used as index/table
     * @param metrics metric to update
     * @param callback session cache callback (not used by this example)
     * @return the update request for batch execution
     * @throws IOException when the request cannot be prepared
     */
    @Override
    public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics, 
                                          SessionCacheCallback callback) throws IOException {
        
        // Convert entity to storage format for update (see prepareBatchInsert for
        // why the concrete converter type is used).
        MapConvert2Storage converter = new MapConvert2Storage();
        storageBuilder.entity2Storage(metrics, converter);
        
        return new UpdateRequest(
            model.getName(),           // index/table
            "metrics",                 // type
            metrics.id().build(),      // document ID
            converter.getData()        // update data
        );
    }
    
    /**
     * Custom expiry rule: a cached metric expires once more than {@code ttl} days
     * have passed since its last update timestamp.
     *
     * <p>The TTL unit is days, matching the contract of the default
     * implementation in {@code IMetricsDAO}. No checked exception is declared,
     * because the overridden method declares none.
     *
     * @param model model of the cached value (not used by this rule)
     * @param cachedValue the cached metric
     * @param currentTimeMillis current system time in milliseconds
     * @param ttl time-to-live in days
     * @return true if the cached metric is expired
     */
    @Override
    public boolean isExpiredCache(Model model, Metrics cachedValue, 
                                long currentTimeMillis, int ttl) {
        long cacheTime = cachedValue.getLastUpdateTimestamp();
        long expireTime = cacheTime + TimeUnit.DAYS.toMillis(ttl);
        return currentTimeMillis > expireTime;
    }
    
    /** Rebuilds a metric from one stored row/document. */
    private Metrics convertToMetrics(Map<String, Object> data) {
        Convert2Entity converter = new MapConvert2Entity(data);
        return storageBuilder.storage2Entity(converter);
    }
}

Core Storage Types

/**
 * Model column definition: one column of a {@code Model}, described by its
 * name, Java type, storage data type and the index-only / storage-only flags.
 *
 * <p>This listing is a signature summary: method bodies are omitted.
 */
public class ModelColumn {
    // Column name.
    private String columnName;
    // Java type of the column value.
    private Class<?> type;
    // Index-only flag of the column.
    private boolean indexOnly;
    // Storage-only flag of the column.
    private boolean storageOnly;
    // Storage data type of the column.
    private Column.ValueDataType dataType;
    
    /** @return Column name */
    public String getColumnName();
    /** @return Java type of the column value */
    public Class<?> getType();
    /** @return Index-only flag */
    public boolean isIndexOnly();
    /** @return Storage-only flag */
    public boolean isStorageOnly();
    /** @return Storage data type */
    public Column.ValueDataType getDataType();
}

/**
 * Base DAO interface. It declares no members; the specific DAO contracts
 * (metrics, record, none-stream, management) extend it.
 */
public interface DAO {
    // Marker interface for DAO implementations
}

/**
 * Storage request base class carrying the index, type and ID of a request.
 * Fields are {@code protected} so that subclasses can access them directly.
 *
 * <p>NOTE(review): {@code InsertRequest} and {@code UpdateRequest} as listed in
 * this document declare the same three fields themselves and do not extend this
 * class — confirm which form matches the actual library.
 *
 * <p>This listing is a signature summary: method bodies are omitted.
 */
public abstract class StorageRequest {
    // Storage index/table name.
    protected String index;
    // Storage type.
    protected String type;
    // Document/record ID.
    protected String id;
    
    /** @return Storage index/table name */
    public String getIndex();
    /** @return Storage type */
    public String getType();
    /** @return Document/record ID */
    public String getId();
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-skywalking--server-core

docs

analysis-framework.md

configuration.md

index.md

profiling.md

query-services.md

remote-communication.md

source-processing.md

storage-layer.md

tile.json