Core analysis engine and storage abstractions for Apache SkyWalking observability platform
—
The SkyWalking storage layer provides pluggable storage abstractions that support multiple backend implementations including Elasticsearch, BanyanDB, MySQL, and other databases. It offers unified DAO interfaces, storage builders, and data models that enable seamless backend switching without application code changes.
Factory interface for creating storage-specific DAO implementations.
public interface StorageDAO extends Service {
IMetricsDAO newMetricsDao(StorageBuilder storageBuilder);
IRecordDAO newRecordDao(StorageBuilder storageBuilder);
INoneStreamDAO newNoneStreamDao(StorageBuilder storageBuilder);
IManagementDAO newManagementDao(StorageBuilder storageBuilder);
}Specialized DAO for metrics storage operations with caching and batch processing.
public interface IMetricsDAO extends DAO {
/**
* Read data from the storage by given IDs.
* @param model target entity of this query.
* @param metrics metrics list.
* @return the data of all given IDs. Only include existing data. Don't require to keep the same order of ids list.
* @throws Exception when error occurs in data query.
*/
List<Metrics> multiGet(Model model, List<Metrics> metrics) throws Exception;
/**
* Transfer the given metrics to an executable insert statement.
* @return InsertRequest should follow the database client driver datatype, in order to make sure it could be
* executed ASAP.
*/
InsertRequest prepareBatchInsert(Model model, Metrics metrics, SessionCacheCallback callback) throws IOException;
/**
* Transfer the given metrics to an executable update statement.
* @return UpdateRequest should follow the database client driver datatype, in order to make sure it could be
* executed ASAP.
*/
UpdateRequest prepareBatchUpdate(Model model, Metrics metrics, SessionCacheCallback callback) throws IOException;
/**
* Calculate the expired status of the metric by given current timestamp, metric and TTL.
* @param model of the given cached value
* @param cachedValue is a metric instance
* @param currentTimeMillis current system time of OAP.
* @param ttl from core setting.
* @return true if the metric is expired.
*/
default boolean isExpiredCache(Model model, Metrics cachedValue, long currentTimeMillis, int ttl) {
final long metricTimestamp = TimeBucket.getTimestamp(
cachedValue.getTimeBucket(), model.getDownsampling());
// If the cached metric is older than the TTL indicated.
return currentTimeMillis - metricTimestamp > TimeUnit.DAYS.toMillis(ttl);
}
}DAO for record storage operations handling logs and events.
public interface IRecordDAO extends DAO {
/**
* Prepares batch insert request for records
* @param model Storage model definition
* @param record Record data to insert
* @param callback Session cache callback
* @return Insert request for batch processing
* @throws IOException If preparation fails
*/
InsertRequest prepareBatchInsert(Model model, Record record,
SessionCacheCallback callback) throws IOException;
}DAO for non-stream data storage operations.
public interface INoneStreamDAO extends DAO {
/**
* Prepares batch insert request for non-stream data
* @param model Storage model definition
* @param noneStream Non-stream data to insert
* @param callback Session cache callback
* @return Insert request for batch processing
* @throws IOException If preparation fails
*/
InsertRequest prepareBatchInsert(Model model, NoneStream noneStream,
SessionCacheCallback callback) throws IOException;
}DAO for management data storage operations.
public interface IManagementDAO extends DAO {
/**
* Prepares batch insert request for management data
* @param model Storage model definition
* @param management Management data to insert
* @param callback Session cache callback
* @return Insert request for batch processing
* @throws IOException If preparation fails
*/
InsertRequest prepareBatchInsert(Model model, Management management,
SessionCacheCallback callback) throws IOException;
}Base interface for all storage entities.
public interface StorageData {
/**
* Standard time bucket column name
*/
String TIME_BUCKET = "time_bucket";
/**
* Gets unique storage identifier for this entity
* @return Storage identifier
*/
StorageID id();
}Represents unique identifier in storage systems.
public class StorageID {
private String id;
/**
* Creates storage ID from string value
* @param id String identifier
*/
public StorageID(String id);
/**
* Gets the string representation of this ID
* @return String ID
*/
public String build();
/**
* Gets raw ID value
* @return Raw ID string
*/
public String getId();
@Override
public boolean equals(Object obj);
@Override
public int hashCode();
@Override
public String toString();
}Storage data with comparison capabilities for sorting.
public interface ComparableStorageData extends StorageData, Comparable<ComparableStorageData> {
/**
* Compares this storage data with another for ordering
* @param o Other storage data to compare with
* @return Negative, zero, or positive integer for less than, equal, or greater than
*/
@Override
int compareTo(ComparableStorageData o);
}Converts between entity objects and storage representations.
public interface StorageBuilder<T extends StorageData> {
/**
* Converts storage data to entity object
* @param converter Conversion helper with storage data
* @return Entity object populated from storage
*/
T storage2Entity(Convert2Entity converter);
/**
* Converts entity object to storage data format
* @param storageData Entity object to convert
* @param converter Conversion helper for storage format
*/
void entity2Storage(T storageData, Convert2Storage converter);
}Helper interface for converting from storage to entity format.
public interface Convert2Entity {
/**
* Gets string value from storage column
* @param columnName Column name
* @return String value or null
*/
String get(String columnName);
/**
* Gets integer value from storage column
* @param columnName Column name
* @return Integer value or null
*/
Integer getInt(String columnName);
/**
* Gets long value from storage column
* @param columnName Column name
* @return Long value or null
*/
Long getLong(String columnName);
/**
* Gets double value from storage column
* @param columnName Column name
* @return Double value or null
*/
Double getDouble(String columnName);
/**
* Gets byte array from storage column
* @param columnName Column name
* @return Byte array or null
*/
byte[] getBytes(String columnName);
}Helper interface for converting from entity to storage format.
public interface Convert2Storage {
/**
* Sets storage column value
* @param columnName Column name
* @param value Value to store
*/
void accept(String columnName, Object value);
/**
* Sets storage column with specific data type
* @param columnName Column name
* @param value Value to store
* @param columnType Storage column type
*/
void accept(String columnName, Object value, Column.ValueDataType columnType);
}Factory for creating storage builders.
public class StorageBuilderFactory {
/**
* Creates storage builder for specified entity class
* @param clazz Entity class
* @return Storage builder instance
* @throws StorageException If builder creation fails
*/
public static <T extends StorageData> StorageBuilder<T> getStorageBuilder(Class<T> clazz)
throws StorageException;
}Marks fields as storage columns with metadata.
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Column {
/**
* Column name in storage (defaults to field name)
*/
String name() default "";
/**
* Data type for storage
*/
ValueDataType dataType() default ValueDataType.VARCHAR;
/**
* Maximum length for string columns
*/
int length() default 200;
/**
* Whether column stores JSON data
*/
boolean storageOnly() default false;
/**
* Column index configuration
*/
boolean indexOnly() default false;
/**
* Column data type enumeration
*/
enum ValueDataType {
VARCHAR, TEXT, INT, BIGINT, DOUBLE, SAMPLED_RECORD
}
}BanyanDB-specific storage configurations.
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface BanyanDB {
/**
* Time-to-live configuration
*/
TTL ttl() default @TTL();
/**
* Sharding configuration
*/
Sharding sharding() default @Sharding();
/**
* Time-to-live settings
*/
@interface TTL {
String value() default "";
String unit() default "DAY";
}
/**
* Sharding settings
*/
@interface Sharding {
String[] shardingKeys() default {};
}
}Request for batch insert operations.
public class InsertRequest {
private String index;
private String type;
private String id;
private Map<String, Object> source;
/**
* Creates insert request
* @param index Storage index/table name
* @param type Storage type
* @param id Document/record ID
* @param source Data to insert
*/
public InsertRequest(String index, String type, String id, Map<String, Object> source);
public String getIndex();
public String getType();
public String getId();
public Map<String, Object> getSource();
}Request for batch update operations.
public class UpdateRequest {
private String index;
private String type;
private String id;
private Map<String, Object> doc;
/**
* Creates update request
* @param index Storage index/table name
* @param type Storage type
* @param id Document/record ID to update
* @param doc Updated data
*/
public UpdateRequest(String index, String type, String id, Map<String, Object> doc);
public String getIndex();
public String getType();
public String getId();
public Map<String, Object> getDoc();
}Callback for session cache operations during batch processing.
public interface SessionCacheCallback {
/**
* Callback method invoked during cache operations
* @param data Cache-related data
*/
void callback(Object data);
}Exception for storage layer operations.
public class StorageException extends Exception {
/**
* Creates storage exception with message
* @param message Error message
*/
public StorageException(String message);
/**
* Creates storage exception with message and cause
* @param message Error message
* @param cause Underlying cause
*/
public StorageException(String message, Throwable cause);
}Storage module definition and configuration.
public class StorageModule extends ModuleDefine {
public static final String NAME = "storage";
/**
* Gets module name
* @return Module name
*/
@Override
public String name();
/**
* Gets module services
* @return Array of service classes
*/
@Override
public Class[] services();
}Storage model definition with metadata.
public class Model {
private String name;
private List<ModelColumn> columns;
private boolean record;
private boolean superDataset;
/**
* Gets model name
* @return Model name
*/
public String getName();
/**
* Gets model columns
* @return List of columns
*/
public List<ModelColumn> getColumns();
/**
* Checks if model represents record data
* @return True if record model
*/
public boolean isRecord();
/**
* Checks if model is super dataset
* @return True if super dataset
*/
public boolean isSuperDataset();
}@Component
public class CustomElasticsearchStorageDAO implements StorageDAO {
private ElasticsearchClient client;
@Override
public IMetricsDAO newMetricsDao(StorageBuilder<? extends Metrics> storageBuilder)
throws IOException {
return new ElasticsearchMetricsDAO(client, storageBuilder);
}
@Override
public IRecordDAO newRecordDao(StorageBuilder<? extends Record> storageBuilder)
throws IOException {
return new ElasticsearchRecordDAO(client, storageBuilder);
}
@Override
public INoneStreamDAO newNoneStreamDao(StorageBuilder<? extends NoneStream> storageBuilder)
throws IOException {
return new ElasticsearchNoneStreamDAO(client, storageBuilder);
}
@Override
public IManagementDAO newManagementDao(StorageBuilder<? extends Management> storageBuilder)
throws IOException {
return new ElasticsearchManagementDAO(client, storageBuilder);
}
}@BanyanDB(
ttl = @BanyanDB.TTL(value = "7", unit = "DAY"),
sharding = @BanyanDB.Sharding(shardingKeys = {"service_id"})
)
public class CustomMetrics extends Metrics {
@Column(name = "service_id", dataType = Column.ValueDataType.VARCHAR, length = 512)
@Getter @Setter
private String serviceId;
@Column(name = "request_count", dataType = Column.ValueDataType.BIGINT)
@Getter @Setter
private long requestCount;
@Column(name = "response_time", dataType = Column.ValueDataType.BIGINT)
@Getter @Setter
private long responseTime;
@Column(name = "error_rate", dataType = Column.ValueDataType.DOUBLE)
@Getter @Setter
private double errorRate;
// Implement required Metrics methods
@Override
public boolean combine(Metrics metrics) {
CustomMetrics other = (CustomMetrics) metrics;
this.requestCount += other.getRequestCount();
this.responseTime += other.getResponseTime();
// Recalculate error rate
return true;
}
@Override
public void calculate() {
if (requestCount > 0) {
// Perform calculations
}
}
@Override
public Metrics toHour() {
CustomMetrics hourMetrics = new CustomMetrics();
// Copy and transform data for hour aggregation
return hourMetrics;
}
@Override
public Metrics toDay() {
CustomMetrics dayMetrics = new CustomMetrics();
// Copy and transform data for day aggregation
return dayMetrics;
}
public static class Builder implements StorageBuilder<CustomMetrics> {
@Override
public CustomMetrics storage2Entity(Convert2Entity converter) {
CustomMetrics metrics = new CustomMetrics();
metrics.setServiceId(converter.get("service_id"));
metrics.setRequestCount(converter.getLong("request_count"));
metrics.setResponseTime(converter.getLong("response_time"));
metrics.setErrorRate(converter.getDouble("error_rate"));
metrics.setTimeBucket(converter.getLong("time_bucket"));
return metrics;
}
@Override
public void entity2Storage(CustomMetrics storageData, Convert2Storage converter) {
converter.accept("service_id", storageData.getServiceId());
converter.accept("request_count", storageData.getRequestCount());
converter.accept("response_time", storageData.getResponseTime());
converter.accept("error_rate", storageData.getErrorRate());
converter.accept("time_bucket", storageData.getTimeBucket());
}
}
}public class CustomMetricsDAO implements IMetricsDAO {
private final DatabaseClient client;
private final StorageBuilder<? extends Metrics> storageBuilder;
public CustomMetricsDAO(DatabaseClient client,
StorageBuilder<? extends Metrics> storageBuilder) {
this.client = client;
this.storageBuilder = storageBuilder;
}
@Override
public List<Metrics> multiGet(Model model, List<Metrics> metrics) throws IOException {
List<String> ids = metrics.stream()
.map(m -> m.id().build())
.collect(Collectors.toList());
// Batch query from storage
List<Map<String, Object>> results = client.multiGet(model.getName(), ids);
return results.stream()
.map(this::convertToMetrics)
.collect(Collectors.toList());
}
@Override
public InsertRequest prepareBatchInsert(Model model, Metrics metrics,
SessionCacheCallback callback) throws IOException {
// Convert entity to storage format
Convert2Storage converter = new MapConvert2Storage();
storageBuilder.entity2Storage(metrics, converter);
return new InsertRequest(
model.getName(), // index/table
"metrics", // type
metrics.id().build(), // document ID
converter.getData() // source data
);
}
@Override
public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics,
SessionCacheCallback callback) throws IOException {
// Convert entity to storage format for update
Convert2Storage converter = new MapConvert2Storage();
storageBuilder.entity2Storage(metrics, converter);
return new UpdateRequest(
model.getName(), // index/table
"metrics", // type
metrics.id().build(), // document ID
converter.getData() // update data
);
}
@Override
public boolean isExpiredCache(Model model, Metrics cachedValue,
long currentTimeMillis, int ttl) throws IOException {
long cacheTime = cachedValue.getLastUpdateTimestamp();
long expireTime = cacheTime + (ttl * 1000L);
return currentTimeMillis > expireTime;
}
private Metrics convertToMetrics(Map<String, Object> data) {
Convert2Entity converter = new MapConvert2Entity(data);
return storageBuilder.storage2Entity(converter);
}
}/**
* Model column definition
*/
public class ModelColumn {
private String columnName;
private Class<?> type;
private boolean indexOnly;
private boolean storageOnly;
private Column.ValueDataType dataType;
public String getColumnName();
public Class<?> getType();
public boolean isIndexOnly();
public boolean isStorageOnly();
public Column.ValueDataType getDataType();
}
/**
* Base DAO interface
*/
public interface DAO {
// Marker interface for DAO implementations
}
/**
* Storage request base class
*/
public abstract class StorageRequest {
protected String index;
protected String type;
protected String id;
public String getIndex();
public String getType();
public String getId();
}Install with Tessl CLI
npx tessl i tessl/maven-org-apache-skywalking--server-core