or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

analysis-framework.mdconfiguration.mdindex.mdprofiling.mdquery-services.mdremote-communication.mdsource-processing.mdstorage-layer.md

analysis-framework.mddocs/

0

# Analysis Framework

1

2

The SkyWalking analysis framework provides the core stream processing engine that transforms raw telemetry data into structured metrics and records. Built around the Observability Analysis Language (OAL), it offers high-performance real-time data processing with configurable downsampling and entity management.

3

4

## Stream Processing

5

6

### StreamProcessor Interface

7

8

The foundation of the streaming analysis engine.

9

10

```java { .api }

11

public interface StreamProcessor<STREAM> {

12

/**

13

* Processes incoming stream data

14

* @param stream The streaming data to be processed

15

*/

16

void in(STREAM stream);

17

}

18

```

19

20

### Stream Annotation

21

22

Marks classes for OAL stream processing analysis.

23

24

```java { .api }

25

@Target(ElementType.TYPE)

26

@Retention(RetentionPolicy.RUNTIME)

27

public @interface Stream {

28

/**

29

* @return name of this stream definition.

30

*/

31

String name();

32

33

/**

34

* @return scope id, see {@link ScopeDeclaration}

35

*/

36

int scopeId();

37

38

/**

39

* @return the converter type between entity and storage record persistence. The converter could be override by the

40

* storage implementation if necessary. Default, return {@link org.apache.skywalking.oap.server.core.storage.type.StorageBuilder}

41

* for general suitable.

42

*/

43

Class<? extends StorageBuilder> builder();

44

45

/**

46

* @return the stream processor type, see {@link MetricsStreamProcessor}, {@link RecordStreamProcessor}, {@link

47

* TopNStreamProcessor} and {@link NoneStreamProcessor} for more details.

48

*/

49

Class<? extends StreamProcessor> processor();

50

}

51

```

52

53

### StreamDefinition

54

55

Defines configurations for stream processing.

56

57

```java { .api }

58

public class StreamDefinition {

59

private String name;

60

private int scopeId;

61

private Class<? extends StreamBuilder> builderClass;

62

private Class<? extends StreamProcessor> processorClass;

63

64

/**

65

* Gets the stream name

66

* @return Stream name

67

*/

68

public String getName();

69

70

/**

71

* Gets the scope identifier

72

* @return Scope ID

73

*/

74

public int getScopeId();

75

76

/**

77

* Gets the builder class

78

* @return Builder class

79

*/

80

public Class<? extends StreamBuilder> getBuilderClass();

81

82

/**

83

* Gets the processor class

84

* @return Processor class

85

*/

86

public Class<? extends StreamProcessor> getProcessorClass();

87

}

88

```

89

90

## Source Dispatching

91

92

### SourceDispatcher Interface

93

94

Routes telemetry sources to appropriate stream processors.

95

96

```java { .api }

97

public interface SourceDispatcher<SOURCE> {

98

/**

99

* Dispatches source data to streaming process

100

* @param source The telemetry source to dispatch

101

*/

102

void dispatch(SOURCE source);

103

}

104

```

105

106

### DispatcherManager

107

108

Central manager for all source dispatchers.

109

110

```java { .api }

111

public class DispatcherManager implements DispatcherDetectorListener {

112

113

/**

114

* Routes source to appropriate dispatchers

115

* @param source The source to forward

116

* @throws IOException If routing fails

117

*/

118

public void forward(ISource source) throws IOException;

119

120

/**

121

* Scans for and registers dispatcher implementations

122

*/

123

public void scan() throws IOException, IllegalAccessException, InstantiationException;

124

125

/**

126

* Adds dispatcher if class is valid

127

* @param aClass Class to check and add as dispatcher

128

*/

129

public void addIfAsSourceDispatcher(Class<?> aClass);

130

}

131

```

132

133

## ID Management

134

135

### IDManager

136

137

Central service for encoding and decoding entity identifiers.

138

139

```java { .api }

140

public class IDManager {

141

142

/**

143

* Service ID operations

144

*/

145

public static class ServiceID {

146

147

/**

148

* Creates service ID from name and normalization flag

149

* @param name Service name

150

* @param isNormal Whether service name is normalized

151

* @return Encoded service ID

152

*/

153

public static String buildId(String name, boolean isNormal);

154

155

/**

156

* Parses service ID into components

157

* @param id Service ID to parse

158

* @return Service ID definition with components

159

*/

160

public static ServiceIDDefinition analysisId(String id);

161

162

/**

163

* Creates relation ID between services

164

* @param define Service relation definition

165

* @return Encoded relation ID

166

*/

167

public static String buildRelationId(ServiceRelationDefine define);

168

}

169

170

/**

171

* Service instance ID operations

172

*/

173

public static class ServiceInstanceID {

174

175

/**

176

* Creates instance ID from service and instance name

177

* @param serviceId Parent service ID

178

* @param instanceName Instance name

179

* @return Encoded instance ID

180

*/

181

public static String buildId(String serviceId, String instanceName);

182

183

/**

184

* Parses instance ID into components

185

* @param id Instance ID to parse

186

* @return Instance ID definition with components

187

*/

188

public static InstanceIDDefinition analysisId(String id);

189

}

190

191

/**

192

* Endpoint ID operations

193

*/

194

public static class EndpointID {

195

196

/**

197

* Creates endpoint ID from service and endpoint name

198

* @param serviceId Parent service ID

199

* @param endpointName Endpoint name

200

* @return Encoded endpoint ID

201

*/

202

public static String buildId(String serviceId, String endpointName);

203

204

/**

205

* Parses endpoint ID into components

206

* @param id Endpoint ID to parse

207

* @return Endpoint ID definition with components

208

*/

209

public static EndpointIDDefinition analysisId(String id);

210

}

211

212

/**

213

* Process ID operations

214

*/

215

public static class ProcessID {

216

217

/**

218

* Creates process ID from instance and process name

219

* @param instanceId Parent instance ID

220

* @param processName Process name

221

* @return Encoded process ID

222

*/

223

public static String buildId(String instanceId, String processName);

224

225

/**

226

* Parses process ID into components

227

* @param id Process ID to parse

228

* @return Process ID definition with components

229

*/

230

public static ProcessIDDefinition analysisId(String id);

231

}

232

233

/**

234

* Network address alias operations

235

*/

236

public static class NetworkAddressAliasDefine {

237

238

/**

239

* Creates network address ID

240

* @param networkAddress Network address

241

* @return Encoded network address ID

242

*/

243

public static String buildId(String networkAddress);

244

245

/**

246

* Parses network address ID

247

* @param id Network address ID to parse

248

* @return Network address definition

249

*/

250

public static NetworkAddressIDDefinition analysisId(String id);

251

}

252

}

253

```

254

255

## Time Management

256

257

### TimeBucket

258

259

Manages time bucket operations for metrics downsampling.

260

261

```java { .api }

262

public class TimeBucket {

263

264

/**

265

* Converts time bucket to timestamp

266

* @param timeBucket Time bucket value

267

* @param downSampling Downsampling level

268

* @return Timestamp in milliseconds

269

*/

270

public static long getTimestamp(long timeBucket, DownSampling downSampling);

271

272

/**

273

* Gets current minute precision time bucket

274

* @param timestamp Current timestamp

275

* @return Minute precision time bucket

276

*/

277

public static long getMinuteTimeBucket(long timestamp);

278

279

/**

280

* Gets current hour precision time bucket

281

* @param timestamp Current timestamp

282

* @return Hour precision time bucket

283

*/

284

public static long getHourTimeBucket(long timestamp);

285

286

/**

287

* Gets current day precision time bucket

288

* @param timestamp Current timestamp

289

* @return Day precision time bucket

290

*/

291

public static long getDayTimeBucket(long timestamp);

292

293

/**

294

* Checks if time bucket is minute precision

295

* @param timeBucket Time bucket to check

296

* @return True if minute precision

297

*/

298

public static boolean isMinuteBucket(long timeBucket);

299

300

/**

301

* Checks if time bucket is hour precision

302

* @param timeBucket Time bucket to check

303

* @return True if hour precision

304

*/

305

public static boolean isHourBucket(long timeBucket);

306

307

/**

308

* Checks if time bucket is day precision

309

* @param timeBucket Time bucket to check

310

* @return True if day precision

311

*/

312

public static boolean isDayBucket(long timeBucket);

313

}

314

```

315

316

### DownSampling

317

318

Defines time precision levels for metrics aggregation.

319

320

```java { .api }

321

public enum DownSampling {

322

/**

323

* Minute precision (1-minute buckets)

324

*/

325

Minute(Calendar.MINUTE, "Minute", 1),

326

327

/**

328

* Hour precision (1-hour buckets)

329

*/

330

Hour(Calendar.HOUR_OF_DAY, "Hour", 24),

331

332

/**

333

* Day precision (1-day buckets)

334

*/

335

Day(Calendar.DAY_OF_MONTH, "Day", 30);

336

337

private final int calendarUnit;

338

private final String name;

339

private final int size;

340

341

/**

342

* Gets the calendar unit for this downsampling level

343

* @return Calendar unit constant

344

*/

345

public int getCalendarUnit();

346

347

/**

348

* Gets the name of this downsampling level

349

* @return Downsampling name

350

*/

351

public String getName();

352

353

/**

354

* Gets the typical size/count for this level

355

* @return Size value

356

*/

357

public int getSize();

358

}

359

```

360

361

## Stream Processing Workers

362

363

### MetricsStreamProcessor

364

365

Processes metrics data streams with aggregation and downsampling.

366

367

```java { .api }

368

public class MetricsStreamProcessor extends StreamProcessor<Metrics> {

369

370

/**

371

* Processes incoming metrics stream

372

* @param metrics Metrics data to process

373

*/

374

@Override

375

public void in(Metrics metrics);

376

377

/**

378

* Creates processor instance for metrics stream

379

* @param moduleDefineHolder Module services holder

380

* @param definition Stream definition

381

* @param metricsClass Metrics class type

382

* @return Configured processor

383

*/

384

public static MetricsStreamProcessor create(ModuleDefineHolder moduleDefineHolder,

385

StreamDefinition definition,

386

Class<? extends Metrics> metricsClass);

387

}

388

```

389

390

### RecordStreamProcessor

391

392

Processes record data streams for log and event storage.

393

394

```java { .api }

395

public class RecordStreamProcessor extends StreamProcessor<Record> {

396

397

/**

398

* Processes incoming record stream

399

* @param record Record data to process

400

*/

401

@Override

402

public void in(Record record);

403

404

/**

405

* Creates processor instance for record stream

406

* @param moduleDefineHolder Module services holder

407

* @param definition Stream definition

408

* @param recordClass Record class type

409

* @return Configured processor

410

*/

411

public static RecordStreamProcessor create(ModuleDefineHolder moduleDefineHolder,

412

StreamDefinition definition,

413

Class<? extends Record> recordClass);

414

}

415

```

416

417

### ManagementStreamProcessor

418

419

Processes management data streams for metadata and configuration.

420

421

```java { .api }

422

public class ManagementStreamProcessor extends StreamProcessor<Management> {

423

424

/**

425

* Processes incoming management stream

426

* @param management Management data to process

427

*/

428

@Override

429

public void in(Management management);

430

}

431

```

432

433

### TopNStreamProcessor

434

435

Processes TopN aggregation streams for ranking and sorting.

436

437

```java { .api }

438

public class TopNStreamProcessor extends StreamProcessor<TopN> {

439

440

/**

441

* Processes incoming TopN stream

442

* @param topN TopN data to process

443

*/

444

@Override

445

public void in(TopN topN);

446

}

447

```

448

449

## Usage Examples

450

451

### Implementing Custom Source Dispatcher

452

453

```java

454

@Component

455

public class CustomTelemetryDispatcher implements SourceDispatcher<CustomTelemetrySource> {

456

457

@Override

458

public void dispatch(CustomTelemetrySource source) {

459

// Prepare source data

460

source.prepare();

461

462

// Set entity ID using IDManager

463

String serviceId = IDManager.ServiceID.buildId(

464

source.getServiceName(),

465

true // normalized

466

);

467

source.setEntityId(serviceId);

468

469

// Set time bucket for aggregation

470

long timeBucket = TimeBucket.getMinuteTimeBucket(

471

source.getTimestamp()

472

);

473

source.setTimeBucket(timeBucket);

474

475

// Forward to appropriate stream processor

476

if (source.isMetric()) {

477

metricsProcessor.in(source.toMetrics());

478

} else if (source.isRecord()) {

479

recordProcessor.in(source.toRecord());

480

}

481

}

482

}

483

```

484

485

### Creating Custom Stream Definition

486

487

```java

488

@Stream(

489

name = "custom_service_metrics",

490

scopeId = DefaultScopeDefine.SERVICE,

491

builder = CustomServiceMetrics.Builder.class,

492

processor = MetricsStreamProcessor.class

493

)

494

public class CustomServiceMetrics extends Metrics {

495

496

@Getter @Setter

497

private String serviceName;

498

499

@Getter @Setter

500

private long requestCount;

501

502

@Getter @Setter

503

private long responseTime;

504

505

@Override

506

public boolean combine(Metrics metrics) {

507

CustomServiceMetrics other = (CustomServiceMetrics) metrics;

508

this.requestCount += other.getRequestCount();

509

this.responseTime += other.getResponseTime();

510

return true;

511

}

512

513

@Override

514

public void calculate() {

515

// Calculate average response time or other derived metrics

516

if (requestCount > 0) {

517

// Perform calculations

518

}

519

}

520

521

@Override

522

public Metrics toHour() {

523

CustomServiceMetrics hourMetrics = new CustomServiceMetrics();

524

hourMetrics.copyFrom(this);

525

return hourMetrics;

526

}

527

528

@Override

529

public Metrics toDay() {

530

CustomServiceMetrics dayMetrics = new CustomServiceMetrics();

531

dayMetrics.copyFrom(this);

532

return dayMetrics;

533

}

534

535

public static class Builder implements StorageBuilder<CustomServiceMetrics> {

536

@Override

537

public CustomServiceMetrics storage2Entity(Convert2Entity converter) {

538

// Build entity from storage data

539

CustomServiceMetrics metrics = new CustomServiceMetrics();

540

// Set fields from converter

541

return metrics;

542

}

543

544

@Override

545

public void entity2Storage(CustomServiceMetrics storageData,

546

Convert2Storage converter) {

547

// Convert entity to storage format

548

converter.accept("service_name", storageData.getServiceName());

549

converter.accept("request_count", storageData.getRequestCount());

550

converter.accept("response_time", storageData.getResponseTime());

551

}

552

}

553

}

554

```

555

556

## Core Types

557

558

```java { .api }

559

/**

560

* Service ID definition with parsed components

561

*/

562

public class ServiceIDDefinition {

563

private String name;

564

private boolean isNormal;

565

566

public String getName();

567

public boolean isNormal();

568

}

569

570

/**

571

* Instance ID definition with parsed components

572

*/

573

public class InstanceIDDefinition {

574

private String serviceId;

575

private String instanceName;

576

577

public String getServiceId();

578

public String getInstanceName();

579

}

580

581

/**

582

* Endpoint ID definition with parsed components

583

*/

584

public class EndpointIDDefinition {

585

private String serviceId;

586

private String endpointName;

587

588

public String getServiceId();

589

public String getEndpointName();

590

}

591

592

/**

593

* Service relation definition

594

*/

595

public class ServiceRelationDefine {

596

private String sourceServiceId;

597

private String destServiceId;

598

private DetectPoint detectPoint;

599

600

public String getSourceServiceId();

601

public String getDestServiceId();

602

public DetectPoint getDetectPoint();

603

}

604

605

/**

606

* Detection point for service relations

607

*/

608

public enum DetectPoint {

609

CLIENT, SERVER, PROXY

610

}

611

```