or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

analysis-framework.mdconfiguration.mdindex.mdprofiling.mdquery-services.mdremote-communication.mdsource-processing.mdstorage-layer.md

storage-layer.mddocs/

0

# Storage Layer

1

2

The SkyWalking storage layer provides pluggable storage abstractions that support multiple backend implementations including Elasticsearch, BanyanDB, MySQL, and other databases. It offers unified DAO interfaces, storage builders, and data models that enable seamless backend switching without application code changes.

3

4

## Core Storage Interfaces

5

6

### StorageDAO

7

8

Factory interface for creating storage-specific DAO implementations.

9

10

```java { .api }

11

public interface StorageDAO extends Service {

12

13

IMetricsDAO newMetricsDao(StorageBuilder storageBuilder);

14

15

IRecordDAO newRecordDao(StorageBuilder storageBuilder);

16

17

INoneStreamDAO newNoneStreamDao(StorageBuilder storageBuilder);

18

19

IManagementDAO newManagementDao(StorageBuilder storageBuilder);

20

}

21

```

22

23

### IMetricsDAO

24

25

Specialized DAO for metrics storage operations with caching and batch processing.

26

27

```java { .api }

28

public interface IMetricsDAO extends DAO {

29

30

/**

31

* Read data from the storage by given IDs.

32

* @param model target entity of this query.

33

* @param metrics metrics list.

34

* @return the data of all given IDs. Only include existing data. Don't require to keep the same order of ids list.

35

* @throws Exception when error occurs in data query.

36

*/

37

List<Metrics> multiGet(Model model, List<Metrics> metrics) throws Exception;

38

39

/**

40

* Transfer the given metrics to an executable insert statement.

41

* @return InsertRequest should follow the database client driver datatype, in order to make sure it could be

42

* executed ASAP.

43

*/

44

InsertRequest prepareBatchInsert(Model model, Metrics metrics, SessionCacheCallback callback) throws IOException;

45

46

/**

47

* Transfer the given metrics to an executable update statement.

48

* @return UpdateRequest should follow the database client driver datatype, in order to make sure it could be

49

* executed ASAP.

50

*/

51

UpdateRequest prepareBatchUpdate(Model model, Metrics metrics, SessionCacheCallback callback) throws IOException;

52

53

/**

54

* Calculate the expired status of the metric by given current timestamp, metric and TTL.

55

* @param model of the given cached value

56

* @param cachedValue is a metric instance

57

* @param currentTimeMillis current system time of OAP.

58

* @param ttl from core setting.

59

* @return true if the metric is expired.

60

*/

61

default boolean isExpiredCache(Model model, Metrics cachedValue, long currentTimeMillis, int ttl) {

62

final long metricTimestamp = TimeBucket.getTimestamp(

63

cachedValue.getTimeBucket(), model.getDownsampling());

64

// If the cached metric is older than the TTL indicated.

65

return currentTimeMillis - metricTimestamp > TimeUnit.DAYS.toMillis(ttl);

66

}

67

}

68

```

69

70

### IRecordDAO

71

72

DAO for record storage operations handling logs and events.

73

74

```java { .api }

75

public interface IRecordDAO extends DAO {

76

77

/**

78

* Prepares batch insert request for records

79

* @param model Storage model definition

80

* @param record Record data to insert

81

* @param callback Session cache callback

82

* @return Insert request for batch processing

83

* @throws IOException If preparation fails

84

*/

85

InsertRequest prepareBatchInsert(Model model, Record record,

86

SessionCacheCallback callback) throws IOException;

87

}

88

```

89

90

### INoneStreamDAO

91

92

DAO for non-stream data storage operations.

93

94

```java { .api }

95

public interface INoneStreamDAO extends DAO {

96

97

/**

98

* Prepares batch insert request for non-stream data

99

* @param model Storage model definition

100

* @param noneStream Non-stream data to insert

101

* @param callback Session cache callback

102

* @return Insert request for batch processing

103

* @throws IOException If preparation fails

104

*/

105

InsertRequest prepareBatchInsert(Model model, NoneStream noneStream,

106

SessionCacheCallback callback) throws IOException;

107

}

108

```

109

110

### IManagementDAO

111

112

DAO for management data storage operations.

113

114

```java { .api }

115

public interface IManagementDAO extends DAO {

116

117

/**

118

* Prepares batch insert request for management data

119

* @param model Storage model definition

120

* @param management Management data to insert

121

* @param callback Session cache callback

122

* @return Insert request for batch processing

123

* @throws IOException If preparation fails

124

*/

125

InsertRequest prepareBatchInsert(Model model, Management management,

126

SessionCacheCallback callback) throws IOException;

127

}

128

```

129

130

## Storage Data Models

131

132

### StorageData

133

134

Base interface for all storage entities.

135

136

```java { .api }

137

public interface StorageData {

138

139

/**

140

* Standard time bucket column name

141

*/

142

String TIME_BUCKET = "time_bucket";

143

144

/**

145

* Gets unique storage identifier for this entity

146

* @return Storage identifier

147

*/

148

StorageID id();

149

}

150

```

151

152

### StorageID

153

154

Represents unique identifier in storage systems.

155

156

```java { .api }

157

public class StorageID {

158

159

private String id;

160

161

/**

162

* Creates storage ID from string value

163

* @param id String identifier

164

*/

165

public StorageID(String id);

166

167

/**

168

* Gets the string representation of this ID

169

* @return String ID

170

*/

171

public String build();

172

173

/**

174

* Gets raw ID value

175

* @return Raw ID string

176

*/

177

public String getId();

178

179

@Override

180

public boolean equals(Object obj);

181

182

@Override

183

public int hashCode();

184

185

@Override

186

public String toString();

187

}

188

```

189

190

### ComparableStorageData

191

192

Storage data with comparison capabilities for sorting.

193

194

```java { .api }

195

public interface ComparableStorageData extends StorageData, Comparable<ComparableStorageData> {

196

197

/**

198

* Compares this storage data with another for ordering

199

* @param o Other storage data to compare with

200

* @return Negative, zero, or positive integer for less than, equal, or greater than

201

*/

202

@Override

203

int compareTo(ComparableStorageData o);

204

}

205

```

206

207

## Storage Builders

208

209

### StorageBuilder Interface

210

211

Converts between entity objects and storage representations.

212

213

```java { .api }

214

public interface StorageBuilder<T extends StorageData> {

215

216

/**

217

* Converts storage data to entity object

218

* @param converter Conversion helper with storage data

219

* @return Entity object populated from storage

220

*/

221

T storage2Entity(Convert2Entity converter);

222

223

/**

224

* Converts entity object to storage data format

225

* @param storageData Entity object to convert

226

* @param converter Conversion helper for storage format

227

*/

228

void entity2Storage(T storageData, Convert2Storage converter);

229

}

230

```

231

232

### Convert2Entity

233

234

Helper interface for converting from storage to entity format.

235

236

```java { .api }

237

public interface Convert2Entity {

238

239

/**

240

* Gets string value from storage column

241

* @param columnName Column name

242

* @return String value or null

243

*/

244

String get(String columnName);

245

246

/**

247

* Gets integer value from storage column

248

* @param columnName Column name

249

* @return Integer value or null

250

*/

251

Integer getInt(String columnName);

252

253

/**

254

* Gets long value from storage column

255

* @param columnName Column name

256

* @return Long value or null

257

*/

258

Long getLong(String columnName);

259

260

/**

261

* Gets double value from storage column

262

* @param columnName Column name

263

* @return Double value or null

264

*/

265

Double getDouble(String columnName);

266

267

/**

268

* Gets byte array from storage column

269

* @param columnName Column name

270

* @return Byte array or null

271

*/

272

byte[] getBytes(String columnName);

273

}

274

```

275

276

### Convert2Storage

277

278

Helper interface for converting from entity to storage format.

279

280

```java { .api }

281

public interface Convert2Storage {

282

283

/**

284

* Sets storage column value

285

* @param columnName Column name

286

* @param value Value to store

287

*/

288

void accept(String columnName, Object value);

289

290

/**

291

* Sets storage column with specific data type

292

* @param columnName Column name

293

* @param value Value to store

294

* @param columnType Storage column type

295

*/

296

void accept(String columnName, Object value, Column.ValueDataType columnType);

297

}

298

```

299

300

### StorageBuilderFactory

301

302

Factory for creating storage builders.

303

304

```java { .api }

305

public class StorageBuilderFactory {

306

307

/**

308

* Creates storage builder for specified entity class

309

* @param clazz Entity class

310

* @return Storage builder instance

311

* @throws StorageException If builder creation fails

312

*/

313

public static <T extends StorageData> StorageBuilder<T> getStorageBuilder(Class<T> clazz)

314

throws StorageException;

315

}

316

```

317

318

## Storage Annotations

319

320

### Column Annotation

321

322

Marks fields as storage columns with metadata.

323

324

```java { .api }

325

@Target(ElementType.FIELD)

326

@Retention(RetentionPolicy.RUNTIME)

327

public @interface Column {

328

329

/**

330

* Column name in storage (defaults to field name)

331

*/

332

String name() default "";

333

334

/**

335

* Data type for storage

336

*/

337

ValueDataType dataType() default ValueDataType.VARCHAR;

338

339

/**

340

* Maximum length for string columns

341

*/

342

int length() default 200;

343

344

/**

345

* Whether column stores JSON data

346

*/

347

boolean storageOnly() default false;

348

349

/**

350

* Column index configuration

351

*/

352

boolean indexOnly() default false;

353

354

/**

355

* Column data type enumeration

356

*/

357

enum ValueDataType {

358

VARCHAR, TEXT, INT, BIGINT, DOUBLE, SAMPLED_RECORD

359

}

360

}

361

```

362

363

### BanyanDB Annotation

364

365

BanyanDB-specific storage configurations.

366

367

```java { .api }

368

@Target(ElementType.TYPE)

369

@Retention(RetentionPolicy.RUNTIME)

370

public @interface BanyanDB {

371

372

/**

373

* Time-to-live configuration

374

*/

375

TTL ttl() default @TTL();

376

377

/**

378

* Sharding configuration

379

*/

380

Sharding sharding() default @Sharding();

381

382

/**

383

* Time-to-live settings

384

*/

385

@interface TTL {

386

String value() default "";

387

String unit() default "DAY";

388

}

389

390

/**

391

* Sharding settings

392

*/

393

@interface Sharding {

394

String[] shardingKeys() default {};

395

}

396

}

397

```

398

399

## Batch Processing

400

401

### InsertRequest

402

403

Request for batch insert operations.

404

405

```java { .api }

406

public class InsertRequest {

407

408

private String index;

409

private String type;

410

private String id;

411

private Map<String, Object> source;

412

413

/**

414

* Creates insert request

415

* @param index Storage index/table name

416

* @param type Storage type

417

* @param id Document/record ID

418

* @param source Data to insert

419

*/

420

public InsertRequest(String index, String type, String id, Map<String, Object> source);

421

422

public String getIndex();

423

public String getType();

424

public String getId();

425

public Map<String, Object> getSource();

426

}

427

```

428

429

### UpdateRequest

430

431

Request for batch update operations.

432

433

```java { .api }

434

public class UpdateRequest {

435

436

private String index;

437

private String type;

438

private String id;

439

private Map<String, Object> doc;

440

441

/**

442

* Creates update request

443

* @param index Storage index/table name

444

* @param type Storage type

445

* @param id Document/record ID to update

446

* @param doc Updated data

447

*/

448

public UpdateRequest(String index, String type, String id, Map<String, Object> doc);

449

450

public String getIndex();

451

public String getType();

452

public String getId();

453

public Map<String, Object> getDoc();

454

}

455

```

456

457

### SessionCacheCallback

458

459

Callback for session cache operations during batch processing.

460

461

```java { .api }

462

public interface SessionCacheCallback {

463

464

/**

465

* Callback method invoked during cache operations

466

* @param data Cache-related data

467

*/

468

void callback(Object data);

469

}

470

```

471

472

## Storage Utilities

473

474

### StorageException

475

476

Exception for storage layer operations.

477

478

```java { .api }

479

public class StorageException extends Exception {

480

481

/**

482

* Creates storage exception with message

483

* @param message Error message

484

*/

485

public StorageException(String message);

486

487

/**

488

* Creates storage exception with message and cause

489

* @param message Error message

490

* @param cause Underlying cause

491

*/

492

public StorageException(String message, Throwable cause);

493

}

494

```

495

496

### StorageModule

497

498

Storage module definition and configuration.

499

500

```java { .api }

501

public class StorageModule extends ModuleDefine {

502

503

public static final String NAME = "storage";

504

505

/**

506

* Gets module name

507

* @return Module name

508

*/

509

@Override

510

public String name();

511

512

/**

513

* Gets module services

514

* @return Array of service classes

515

*/

516

@Override

517

public Class[] services();

518

}

519

```

520

521

### Model

522

523

Storage model definition with metadata.

524

525

```java { .api }

526

public class Model {

527

528

private String name;

529

private List<ModelColumn> columns;

530

private boolean record;

531

private boolean superDataset;

532

533

/**

534

* Gets model name

535

* @return Model name

536

*/

537

public String getName();

538

539

/**

540

* Gets model columns

541

* @return List of columns

542

*/

543

public List<ModelColumn> getColumns();

544

545

/**

546

* Checks if model represents record data

547

* @return True if record model

548

*/

549

public boolean isRecord();

550

551

/**

552

* Checks if model is super dataset

553

* @return True if super dataset

554

*/

555

public boolean isSuperDataset();

556

}

557

```

558

559

## Usage Examples

560

561

### Implementing Custom Storage DAO

562

563

```java

564

@Component

565

public class CustomElasticsearchStorageDAO implements StorageDAO {

566

567

private ElasticsearchClient client;

568

569

@Override

570

public IMetricsDAO newMetricsDao(StorageBuilder<? extends Metrics> storageBuilder)

571

throws IOException {

572

return new ElasticsearchMetricsDAO(client, storageBuilder);

573

}

574

575

@Override

576

public IRecordDAO newRecordDao(StorageBuilder<? extends Record> storageBuilder)

577

throws IOException {

578

return new ElasticsearchRecordDAO(client, storageBuilder);

579

}

580

581

@Override

582

public INoneStreamDAO newNoneStreamDao(StorageBuilder<? extends NoneStream> storageBuilder)

583

throws IOException {

584

return new ElasticsearchNoneStreamDAO(client, storageBuilder);

585

}

586

587

@Override

588

public IManagementDAO newManagementDao(StorageBuilder<? extends Management> storageBuilder)

589

throws IOException {

590

return new ElasticsearchManagementDAO(client, storageBuilder);

591

}

592

}

593

```

594

595

### Creating Custom Storage Entity

596

597

```java

598

@BanyanDB(

599

ttl = @BanyanDB.TTL(value = "7", unit = "DAY"),

600

sharding = @BanyanDB.Sharding(shardingKeys = {"service_id"})

601

)

602

public class CustomMetrics extends Metrics {

603

604

@Column(name = "service_id", dataType = Column.ValueDataType.VARCHAR, length = 512)

605

@Getter @Setter

606

private String serviceId;

607

608

@Column(name = "request_count", dataType = Column.ValueDataType.BIGINT)

609

@Getter @Setter

610

private long requestCount;

611

612

@Column(name = "response_time", dataType = Column.ValueDataType.BIGINT)

613

@Getter @Setter

614

private long responseTime;

615

616

@Column(name = "error_rate", dataType = Column.ValueDataType.DOUBLE)

617

@Getter @Setter

618

private double errorRate;

619

620

// Implement required Metrics methods

621

@Override

622

public boolean combine(Metrics metrics) {

623

CustomMetrics other = (CustomMetrics) metrics;

624

this.requestCount += other.getRequestCount();

625

this.responseTime += other.getResponseTime();

626

// Recalculate error rate

627

return true;

628

}

629

630

@Override

631

public void calculate() {

632

if (requestCount > 0) {

633

// Perform calculations

634

}

635

}

636

637

@Override

638

public Metrics toHour() {

639

CustomMetrics hourMetrics = new CustomMetrics();

640

// Copy and transform data for hour aggregation

641

return hourMetrics;

642

}

643

644

@Override

645

public Metrics toDay() {

646

CustomMetrics dayMetrics = new CustomMetrics();

647

// Copy and transform data for day aggregation

648

return dayMetrics;

649

}

650

651

public static class Builder implements StorageBuilder<CustomMetrics> {

652

653

@Override

654

public CustomMetrics storage2Entity(Convert2Entity converter) {

655

CustomMetrics metrics = new CustomMetrics();

656

metrics.setServiceId(converter.get("service_id"));

657

metrics.setRequestCount(converter.getLong("request_count"));

658

metrics.setResponseTime(converter.getLong("response_time"));

659

metrics.setErrorRate(converter.getDouble("error_rate"));

660

metrics.setTimeBucket(converter.getLong("time_bucket"));

661

return metrics;

662

}

663

664

@Override

665

public void entity2Storage(CustomMetrics storageData, Convert2Storage converter) {

666

converter.accept("service_id", storageData.getServiceId());

667

converter.accept("request_count", storageData.getRequestCount());

668

converter.accept("response_time", storageData.getResponseTime());

669

converter.accept("error_rate", storageData.getErrorRate());

670

converter.accept("time_bucket", storageData.getTimeBucket());

671

}

672

}

673

}

674

```

675

676

### Implementing Metrics DAO

677

678

```java

679

public class CustomMetricsDAO implements IMetricsDAO {

680

681

private final DatabaseClient client;

682

private final StorageBuilder<? extends Metrics> storageBuilder;

683

684

public CustomMetricsDAO(DatabaseClient client,

685

StorageBuilder<? extends Metrics> storageBuilder) {

686

this.client = client;

687

this.storageBuilder = storageBuilder;

688

}

689

690

@Override

691

public List<Metrics> multiGet(Model model, List<Metrics> metrics) throws IOException {

692

List<String> ids = metrics.stream()

693

.map(m -> m.id().build())

694

.collect(Collectors.toList());

695

696

// Batch query from storage

697

List<Map<String, Object>> results = client.multiGet(model.getName(), ids);

698

699

return results.stream()

700

.map(this::convertToMetrics)

701

.collect(Collectors.toList());

702

}

703

704

@Override

705

public InsertRequest prepareBatchInsert(Model model, Metrics metrics,

706

SessionCacheCallback callback) throws IOException {

707

708

// Convert entity to storage format

709

Convert2Storage converter = new MapConvert2Storage();

710

storageBuilder.entity2Storage(metrics, converter);

711

712

return new InsertRequest(

713

model.getName(), // index/table

714

"metrics", // type

715

metrics.id().build(), // document ID

716

converter.getData() // source data

717

);

718

}

719

720

@Override

721

public UpdateRequest prepareBatchUpdate(Model model, Metrics metrics,

722

SessionCacheCallback callback) throws IOException {

723

724

// Convert entity to storage format for update

725

Convert2Storage converter = new MapConvert2Storage();

726

storageBuilder.entity2Storage(metrics, converter);

727

728

return new UpdateRequest(

729

model.getName(), // index/table

730

"metrics", // type

731

metrics.id().build(), // document ID

732

converter.getData() // update data

733

);

734

}

735

736

@Override

737

public boolean isExpiredCache(Model model, Metrics cachedValue,

738

long currentTimeMillis, int ttl) throws IOException {

739

long cacheTime = cachedValue.getLastUpdateTimestamp();

740

long expireTime = cacheTime + (ttl * 1000L);

741

return currentTimeMillis > expireTime;

742

}

743

744

private Metrics convertToMetrics(Map<String, Object> data) {

745

Convert2Entity converter = new MapConvert2Entity(data);

746

return storageBuilder.storage2Entity(converter);

747

}

748

}

749

```

750

751

## Core Storage Types

752

753

```java { .api }

754

/**

755

* Model column definition

756

*/

757

public class ModelColumn {

758

private String columnName;

759

private Class<?> type;

760

private boolean indexOnly;

761

private boolean storageOnly;

762

private Column.ValueDataType dataType;

763

764

public String getColumnName();

765

public Class<?> getType();

766

public boolean isIndexOnly();

767

public boolean isStorageOnly();

768

public Column.ValueDataType getDataType();

769

}

770

771

/**

772

* Base DAO interface

773

*/

774

public interface DAO {

775

// Marker interface for DAO implementations

776

}

777

778

/**

779

* Storage request base class

780

*/

781

public abstract class StorageRequest {

782

protected String index;

783

protected String type;

784

protected String id;

785

786

public String getIndex();

787

public String getType();

788

public String getId();

789

}

790

```