0
# Storage Layer
1
2
The SkyWalking storage layer provides pluggable storage abstractions that support multiple backend implementations including Elasticsearch, BanyanDB, MySQL, and other databases. It offers unified DAO interfaces, storage builders, and data models that enable seamless backend switching without application code changes.
3
4
## Core Storage Interfaces
5
6
### StorageDAO
7
8
Factory interface for creating storage-specific DAO implementations.
9
10
```java { .api }
11
public interface StorageDAO extends Service {
12
13
IMetricsDAO newMetricsDao(StorageBuilder storageBuilder);
14
15
IRecordDAO newRecordDao(StorageBuilder storageBuilder);
16
17
INoneStreamDAO newNoneStreamDao(StorageBuilder storageBuilder);
18
19
IManagementDAO newManagementDao(StorageBuilder storageBuilder);
20
}
21
```
22
23
### IMetricsDAO
24
25
Specialized DAO for metrics storage operations with caching and batch processing.
26
27
```java { .api }
28
public interface IMetricsDAO extends DAO {
29
30
/**
31
* Read data from the storage by given IDs.
32
* @param model target entity of this query.
33
* @param metrics metrics list.
34
* @return the data of all given IDs. Only include existing data. Don't require to keep the same order of ids list.
35
* @throws Exception when error occurs in data query.
36
*/
37
List<Metrics> multiGet(Model model, List<Metrics> metrics) throws Exception;
38
39
/**
40
* Transfer the given metrics to an executable insert statement.
41
* @return InsertRequest should follow the database client driver datatype, in order to make sure it could be
42
* executed ASAP.
43
*/
44
InsertRequest prepareBatchInsert(Model model, Metrics metrics, SessionCacheCallback callback) throws IOException;
45
46
/**
47
* Transfer the given metrics to an executable update statement.
48
* @return UpdateRequest should follow the database client driver datatype, in order to make sure it could be
49
* executed ASAP.
50
*/
51
UpdateRequest prepareBatchUpdate(Model model, Metrics metrics, SessionCacheCallback callback) throws IOException;
52
53
/**
54
* Calculate the expired status of the metric by given current timestamp, metric and TTL.
55
* @param model of the given cached value
56
* @param cachedValue is a metric instance
57
* @param currentTimeMillis current system time of OAP.
58
* @param ttl from core setting.
59
* @return true if the metric is expired.
60
*/
61
default boolean isExpiredCache(Model model, Metrics cachedValue, long currentTimeMillis, int ttl) {
62
final long metricTimestamp = TimeBucket.getTimestamp(
63
cachedValue.getTimeBucket(), model.getDownsampling());
64
// If the cached metric is older than the TTL indicated.
65
return currentTimeMillis - metricTimestamp > TimeUnit.DAYS.toMillis(ttl);
66
}
67
}
68
```
69
70
### IRecordDAO
71
72
DAO for record storage operations handling logs and events.
73
74
```java { .api }
75
public interface IRecordDAO extends DAO {
76
77
/**
78
* Prepares batch insert request for records
79
* @param model Storage model definition
80
* @param record Record data to insert
81
* @param callback Session cache callback
82
* @return Insert request for batch processing
83
* @throws IOException If preparation fails
84
*/
85
InsertRequest prepareBatchInsert(Model model, Record record,
86
SessionCacheCallback callback) throws IOException;
87
}
88
```
89
90
### INoneStreamDAO
91
92
DAO for non-stream data storage operations.
93
94
```java { .api }
95
public interface INoneStreamDAO extends DAO {
96
97
/**
98
* Prepares batch insert request for non-stream data
99
* @param model Storage model definition
100
* @param noneStream Non-stream data to insert
101
* @param callback Session cache callback
102
* @return Insert request for batch processing
103
* @throws IOException If preparation fails
104
*/
105
InsertRequest prepareBatchInsert(Model model, NoneStream noneStream,
106
SessionCacheCallback callback) throws IOException;
107
}
108
```
109
110
### IManagementDAO
111
112
DAO for management data storage operations.
113
114
```java { .api }
115
public interface IManagementDAO extends DAO {
116
117
/**
118
* Prepares batch insert request for management data
119
* @param model Storage model definition
120
* @param management Management data to insert
121
* @param callback Session cache callback
122
* @return Insert request for batch processing
123
* @throws IOException If preparation fails
124
*/
125
InsertRequest prepareBatchInsert(Model model, Management management,
126
SessionCacheCallback callback) throws IOException;
127
}
128
```
129
130
## Storage Data Models
131
132
### StorageData
133
134
Base interface for all storage entities.
135
136
```java { .api }
137
public interface StorageData {
138
139
/**
140
* Standard time bucket column name
141
*/
142
String TIME_BUCKET = "time_bucket";
143
144
/**
145
* Gets unique storage identifier for this entity
146
* @return Storage identifier
147
*/
148
StorageID id();
149
}
150
```
151
152
### StorageID
153
154
Represents unique identifier in storage systems.
155
156
```java { .api }
157
public class StorageID {
158
159
private String id;
160
161
/**
162
* Creates storage ID from string value
163
* @param id String identifier
164
*/
165
public StorageID(String id);
166
167
/**
168
* Gets the string representation of this ID
169
* @return String ID
170
*/
171
public String build();
172
173
/**
174
* Gets raw ID value
175
* @return Raw ID string
176
*/
177
public String getId();
178
179
@Override
180
public boolean equals(Object obj);
181
182
@Override
183
public int hashCode();
184
185
@Override
186
public String toString();
187
}
188
```
189
190
### ComparableStorageData
191
192
Storage data with comparison capabilities for sorting.
193
194
```java { .api }
195
public interface ComparableStorageData extends StorageData, Comparable<ComparableStorageData> {
196
197
/**
198
* Compares this storage data with another for ordering
199
* @param o Other storage data to compare with
200
* @return Negative, zero, or positive integer for less than, equal, or greater than
201
*/
202
@Override
203
int compareTo(ComparableStorageData o);
204
}
205
```
206
207
## Storage Builders
208
209
### StorageBuilder Interface
210
211
Converts between entity objects and storage representations.
212
213
```java { .api }
214
public interface StorageBuilder<T extends StorageData> {
215
216
/**
217
* Converts storage data to entity object
218
* @param converter Conversion helper with storage data
219
* @return Entity object populated from storage
220
*/
221
T storage2Entity(Convert2Entity converter);
222
223
/**
224
* Converts entity object to storage data format
225
* @param storageData Entity object to convert
226
* @param converter Conversion helper for storage format
227
*/
228
void entity2Storage(T storageData, Convert2Storage converter);
229
}
230
```
231
232
### Convert2Entity
233
234
Helper interface for converting from storage to entity format.
235
236
```java { .api }
237
public interface Convert2Entity {
238
239
/**
240
* Gets string value from storage column
241
* @param columnName Column name
242
* @return String value or null
243
*/
244
String get(String columnName);
245
246
/**
247
* Gets integer value from storage column
248
* @param columnName Column name
249
* @return Integer value or null
250
*/
251
Integer getInt(String columnName);
252
253
/**
254
* Gets long value from storage column
255
* @param columnName Column name
256
* @return Long value or null
257
*/
258
Long getLong(String columnName);
259
260
/**
261
* Gets double value from storage column
262
* @param columnName Column name
263
* @return Double value or null
264
*/
265
Double getDouble(String columnName);
266
267
/**
268
* Gets byte array from storage column
269
* @param columnName Column name
270
* @return Byte array or null
271
*/
272
byte[] getBytes(String columnName);
273
}
274
```
275
276
### Convert2Storage
277
278
Helper interface for converting from entity to storage format.
279
280
```java { .api }
281
public interface Convert2Storage {
282
283
/**
284
* Sets storage column value
285
* @param columnName Column name
286
* @param value Value to store
287
*/
288
void accept(String columnName, Object value);
289
290
/**
291
* Sets storage column with specific data type
292
* @param columnName Column name
293
* @param value Value to store
294
* @param columnType Storage column type
295
*/
296
void accept(String columnName, Object value, Column.ValueDataType columnType);
297
}
298
```
299
300
### StorageBuilderFactory
301
302
Factory for creating storage builders.
303
304
```java { .api }
305
public class StorageBuilderFactory {
306
307
/**
308
* Creates storage builder for specified entity class
309
* @param clazz Entity class
310
* @return Storage builder instance
311
* @throws StorageException If builder creation fails
312
*/
313
public static <T extends StorageData> StorageBuilder<T> getStorageBuilder(Class<T> clazz)
314
throws StorageException;
315
}
316
```
317
318
## Storage Annotations
319
320
### Column Annotation
321
322
Marks fields as storage columns with metadata.
323
324
```java { .api }
325
@Target(ElementType.FIELD)
326
@Retention(RetentionPolicy.RUNTIME)
327
public @interface Column {
328
329
/**
330
* Column name in storage (defaults to field name)
331
*/
332
String name() default "";
333
334
/**
335
* Data type for storage
336
*/
337
ValueDataType dataType() default ValueDataType.VARCHAR;
338
339
/**
340
* Maximum length for string columns
341
*/
342
int length() default 200;
343
344
/**
345
* Whether column stores JSON data
346
*/
347
boolean storageOnly() default false;
348
349
/**
350
* Column index configuration
351
*/
352
boolean indexOnly() default false;
353
354
/**
355
* Column data type enumeration
356
*/
357
enum ValueDataType {
358
VARCHAR, TEXT, INT, BIGINT, DOUBLE, SAMPLED_RECORD
359
}
360
}
361
```
362
363
### BanyanDB Annotation
364
365
BanyanDB-specific storage configurations.
366
367
```java { .api }
368
@Target(ElementType.TYPE)
369
@Retention(RetentionPolicy.RUNTIME)
370
public @interface BanyanDB {
371
372
/**
373
* Time-to-live configuration
374
*/
375
TTL ttl() default @TTL();
376
377
/**
378
* Sharding configuration
379
*/
380
Sharding sharding() default @Sharding();
381
382
/**
383
* Time-to-live settings
384
*/
385
@interface TTL {
386
String value() default "";
387
String unit() default "DAY";
388
}
389
390
/**
391
* Sharding settings
392
*/
393
@interface Sharding {
394
String[] shardingKeys() default {};
395
}
396
}
397
```
398
399
## Batch Processing
400
401
### InsertRequest
402
403
Request for batch insert operations.
404
405
```java { .api }
406
public class InsertRequest {
407
408
private String index;
409
private String type;
410
private String id;
411
private Map<String, Object> source;
412
413
/**
414
* Creates insert request
415
* @param index Storage index/table name
416
* @param type Storage type
417
* @param id Document/record ID
418
* @param source Data to insert
419
*/
420
public InsertRequest(String index, String type, String id, Map<String, Object> source);
421
422
public String getIndex();
423
public String getType();
424
public String getId();
425
public Map<String, Object> getSource();
426
}
427
```
428
429
### UpdateRequest
430
431
Request for batch update operations.
432
433
```java { .api }
434
public class UpdateRequest {
435
436
private String index;
437
private String type;
438
private String id;
439
private Map<String, Object> doc;
440
441
/**
442
* Creates update request
443
* @param index Storage index/table name
444
* @param type Storage type
445
* @param id Document/record ID to update
446
* @param doc Updated data
447
*/
448
public UpdateRequest(String index, String type, String id, Map<String, Object> doc);
449
450
public String getIndex();
451
public String getType();
452
public String getId();
453
public Map<String, Object> getDoc();
454
}
455
```
456
457
### SessionCacheCallback
458
459
Callback for session cache operations during batch processing.
460
461
```java { .api }
462
public interface SessionCacheCallback {
463
464
/**
465
* Callback method invoked during cache operations
466
* @param data Cache-related data
467
*/
468
void callback(Object data);
469
}
470
```
471
472
## Storage Utilities
473
474
### StorageException
475
476
Exception for storage layer operations.
477
478
```java { .api }
479
public class StorageException extends Exception {
480
481
/**
482
* Creates storage exception with message
483
* @param message Error message
484
*/
485
public StorageException(String message);
486
487
/**
488
* Creates storage exception with message and cause
489
* @param message Error message
490
* @param cause Underlying cause
491
*/
492
public StorageException(String message, Throwable cause);
493
}
494
```
495
496
### StorageModule
497
498
Storage module definition and configuration.
499
500
```java { .api }
501
public class StorageModule extends ModuleDefine {
502
503
public static final String NAME = "storage";
504
505
/**
506
* Gets module name
507
* @return Module name
508
*/
509
@Override
510
public String name();
511
512
/**
513
* Gets module services
514
* @return Array of service classes
515
*/
516
@Override
517
public Class[] services();
518
}
519
```
520
521
### Model
522
523
Storage model definition with metadata.
524
525
```java { .api }
526
public class Model {
527
528
private String name;
529
private List<ModelColumn> columns;
530
private boolean record;
531
private boolean superDataset;
532
533
/**
534
* Gets model name
535
* @return Model name
536
*/
537
public String getName();
538
539
/**
540
* Gets model columns
541
* @return List of columns
542
*/
543
public List<ModelColumn> getColumns();
544
545
/**
546
* Checks if model represents record data
547
* @return True if record model
548
*/
549
public boolean isRecord();
550
551
/**
552
* Checks if model is super dataset
553
* @return True if super dataset
554
*/
555
public boolean isSuperDataset();
556
}
557
```
558
559
## Usage Examples
560
561
### Implementing Custom Storage DAO
562
563
```java
564
@Component
565
public class CustomElasticsearchStorageDAO implements StorageDAO {
566
567
private ElasticsearchClient client;
568
569
@Override
570
public IMetricsDAO newMetricsDao(StorageBuilder<? extends Metrics> storageBuilder)
571
throws IOException {
572
return new ElasticsearchMetricsDAO(client, storageBuilder);
573
}
574
575
@Override
576
public IRecordDAO newRecordDao(StorageBuilder<? extends Record> storageBuilder)
577
throws IOException {
578
return new ElasticsearchRecordDAO(client, storageBuilder);
579
}
580
581
@Override
582
public INoneStreamDAO newNoneStreamDao(StorageBuilder<? extends NoneStream> storageBuilder)
583
throws IOException {
584
return new ElasticsearchNoneStreamDAO(client, storageBuilder);
585
}
586
587
@Override
588
public IManagementDAO newManagementDao(StorageBuilder<? extends Management> storageBuilder)
589
throws IOException {
590
return new ElasticsearchManagementDAO(client, storageBuilder);
591
}
592
}
593
```
594
595
### Creating Custom Storage Entity
596
597
```java
598
@BanyanDB(
599
ttl = @BanyanDB.TTL(value = "7", unit = "DAY"),
600
sharding = @BanyanDB.Sharding(shardingKeys = {"service_id"})
601
)
602
public class CustomMetrics extends Metrics {
603
604
@Column(name = "service_id", dataType = Column.ValueDataType.VARCHAR, length = 512)
605
@Getter @Setter
606
private String serviceId;
607
608
@Column(name = "request_count", dataType = Column.ValueDataType.BIGINT)
609
@Getter @Setter
610
private long requestCount;
611
612
@Column(name = "response_time", dataType = Column.ValueDataType.BIGINT)
613
@Getter @Setter
614
private long responseTime;
615
616
@Column(name = "error_rate", dataType = Column.ValueDataType.DOUBLE)
617
@Getter @Setter
618
private double errorRate;
619
620
// Implement required Metrics methods
621
@Override
622
public boolean combine(Metrics metrics) {
623
CustomMetrics other = (CustomMetrics) metrics;
624
this.requestCount += other.getRequestCount();
625
this.responseTime += other.getResponseTime();
626
// Recalculate error rate
627
return true;
628
}
629
630
@Override
631
public void calculate() {
632
if (requestCount > 0) {
633
// Perform calculations
634
}
635
}
636
637
@Override
638
public Metrics toHour() {
639
CustomMetrics hourMetrics = new CustomMetrics();
640
// Copy and transform data for hour aggregation
641
return hourMetrics;
642
}
643
644
@Override
645
public Metrics toDay() {
646
CustomMetrics dayMetrics = new CustomMetrics();
647
// Copy and transform data for day aggregation
648
return dayMetrics;
649
}
650
651
public static class Builder implements StorageBuilder<CustomMetrics> {
652
653
@Override
654
public CustomMetrics storage2Entity(Convert2Entity converter) {
655
CustomMetrics metrics = new CustomMetrics();
656
metrics.setServiceId(converter.get("service_id"));
657
metrics.setRequestCount(converter.getLong("request_count"));
658
metrics.setResponseTime(converter.getLong("response_time"));
659
metrics.setErrorRate(converter.getDouble("error_rate"));
660
metrics.setTimeBucket(converter.getLong("time_bucket"));
661
return metrics;
662
}
663
664
@Override
665
public void entity2Storage(CustomMetrics storageData, Convert2Storage converter) {
666
converter.accept("service_id", storageData.getServiceId());
667
converter.accept("request_count", storageData.getRequestCount());
668
converter.accept("response_time", storageData.getResponseTime());
669
converter.accept("error_rate", storageData.getErrorRate());
670
converter.accept("time_bucket", storageData.getTimeBucket());
671
}
672
}
673
}
674
```
675
676
### Implementing Metrics DAO
677
678
```java
679
public class CustomMetricsDAO implements IMetricsDAO {
680
681
private final DatabaseClient client;
682
private final StorageBuilder<? extends Metrics> storageBuilder;
683
684
public CustomMetricsDAO(DatabaseClient client,
685
StorageBuilder<? extends Metrics> storageBuilder) {
686
this.client = client;
687
this.storageBuilder = storageBuilder;
688
}
689
690
@Override
691
public List<Metrics> multiGet(Model model, List<Metrics> metrics) throws IOException {
692
List<String> ids = metrics.stream()
693
.map(m -> m.id().build())
694
.collect(Collectors.toList());
695
696
// Batch query from storage
697
List<Map<String, Object>> results = client.multiGet(model.getName(), ids);
698
699
return results.stream()
700
.map(this::convertToMetrics)
701
.collect(Collectors.toList());
702
}
703
704
@Override
705
public InsertRequest prepareBatchInsert(Model model, Metrics metrics,
706
SessionCacheCallback callback) throws IOException {
707
708
// Convert entity to storage format
709
Convert2Storage converter = new MapConvert2Storage();
710
storageBuilder.entity2Storage(metrics, converter);
711
712
return new InsertRequest(
713
model.getName(), // index/table
714
"metrics", // type
715
metrics.id().build(), // document ID
716
converter.getData() // source data
717
);
718
}
719
720
@Override
721
public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics,
722
SessionCacheCallback callback) throws IOException {
723
724
// Convert entity to storage format for update
725
Convert2Storage converter = new MapConvert2Storage();
726
storageBuilder.entity2Storage(metrics, converter);
727
728
return new UpdateRequest(
729
model.getName(), // index/table
730
"metrics", // type
731
metrics.id().build(), // document ID
732
converter.getData() // update data
733
);
734
}
735
736
@Override
737
public boolean isExpiredCache(Model model, Metrics cachedValue,
738
long currentTimeMillis, int ttl) throws IOException {
739
long cacheTime = cachedValue.getLastUpdateTimestamp();
740
long expireTime = cacheTime + (ttl * 1000L);
741
return currentTimeMillis > expireTime;
742
}
743
744
private Metrics convertToMetrics(Map<String, Object> data) {
745
Convert2Entity converter = new MapConvert2Entity(data);
746
return storageBuilder.storage2Entity(converter);
747
}
748
}
749
```
750
751
## Core Storage Types
752
753
```java { .api }
754
/**
755
* Model column definition
756
*/
757
public class ModelColumn {
758
private String columnName;
759
private Class<?> type;
760
private boolean indexOnly;
761
private boolean storageOnly;
762
private Column.ValueDataType dataType;
763
764
public String getColumnName();
765
public Class<?> getType();
766
public boolean isIndexOnly();
767
public boolean isStorageOnly();
768
public Column.ValueDataType getDataType();
769
}
770
771
/**
772
* Base DAO interface
773
*/
774
public interface DAO {
775
// Marker interface for DAO implementations
776
}
777
778
/**
779
* Storage request base class
780
*/
781
public abstract class StorageRequest {
782
protected String index;
783
protected String type;
784
protected String id;
785
786
public String getIndex();
787
public String getType();
788
public String getId();
789
}
790
```