0
# Analysis Framework
1
2
The SkyWalking analysis framework provides the core stream processing engine that transforms raw telemetry data into structured metrics and records. Built around the Observability Analysis Language (OAL), it offers high-performance real-time data processing with configurable downsampling and entity management.
3
4
## Stream Processing
5
6
### StreamProcessor Interface
7
8
The foundation of the streaming analysis engine.
9
10
```java { .api }
11
public interface StreamProcessor<STREAM> {
12
/**
13
* Processes incoming stream data
14
* @param stream The streaming data to be processed
15
*/
16
void in(STREAM stream);
17
}
18
```
19
20
### Stream Annotation
21
22
Marks classes for OAL stream processing analysis.
23
24
```java { .api }
25
@Target(ElementType.TYPE)
26
@Retention(RetentionPolicy.RUNTIME)
27
public @interface Stream {
28
/**
29
* @return name of this stream definition.
30
*/
31
String name();
32
33
/**
34
* @return scope id, see {@link ScopeDeclaration}
35
*/
36
int scopeId();
37
38
/**
39
* @return the converter type between entity and storage record persistence. The converter could be override by the
40
* storage implementation if necessary. Default, return {@link org.apache.skywalking.oap.server.core.storage.type.StorageBuilder}
41
* for general suitable.
42
*/
43
Class<? extends StorageBuilder> builder();
44
45
/**
46
* @return the stream processor type, see {@link MetricsStreamProcessor}, {@link RecordStreamProcessor}, {@link
47
* TopNStreamProcessor} and {@link NoneStreamProcessor} for more details.
48
*/
49
Class<? extends StreamProcessor> processor();
50
}
51
```
52
53
### StreamDefinition
54
55
Defines configurations for stream processing.
56
57
```java { .api }
58
public class StreamDefinition {
59
private String name;
60
private int scopeId;
61
private Class<? extends StreamBuilder> builderClass;
62
private Class<? extends StreamProcessor> processorClass;
63
64
/**
65
* Gets the stream name
66
* @return Stream name
67
*/
68
public String getName();
69
70
/**
71
* Gets the scope identifier
72
* @return Scope ID
73
*/
74
public int getScopeId();
75
76
/**
77
* Gets the builder class
78
* @return Builder class
79
*/
80
public Class<? extends StreamBuilder> getBuilderClass();
81
82
/**
83
* Gets the processor class
84
* @return Processor class
85
*/
86
public Class<? extends StreamProcessor> getProcessorClass();
87
}
88
```
89
90
## Source Dispatching
91
92
### SourceDispatcher Interface
93
94
Routes telemetry sources to appropriate stream processors.
95
96
```java { .api }
97
public interface SourceDispatcher<SOURCE> {
98
/**
99
* Dispatches source data to streaming process
100
* @param source The telemetry source to dispatch
101
*/
102
void dispatch(SOURCE source);
103
}
104
```
105
106
### DispatcherManager
107
108
Central manager for all source dispatchers.
109
110
```java { .api }
111
public class DispatcherManager implements DispatcherDetectorListener {
112
113
/**
114
* Routes source to appropriate dispatchers
115
* @param source The source to forward
116
* @throws IOException If routing fails
117
*/
118
public void forward(ISource source) throws IOException;
119
120
/**
121
* Scans for and registers dispatcher implementations
122
*/
123
public void scan() throws IOException, IllegalAccessException, InstantiationException;
124
125
/**
126
* Adds dispatcher if class is valid
127
* @param aClass Class to check and add as dispatcher
128
*/
129
public void addIfAsSourceDispatcher(Class<?> aClass);
130
}
131
```
132
133
## ID Management
134
135
### IDManager
136
137
Central service for encoding and decoding entity identifiers.
138
139
```java { .api }
140
public class IDManager {
141
142
/**
143
* Service ID operations
144
*/
145
public static class ServiceID {
146
147
/**
148
* Creates service ID from name and normalization flag
149
* @param name Service name
150
* @param isNormal Whether service name is normalized
151
* @return Encoded service ID
152
*/
153
public static String buildId(String name, boolean isNormal);
154
155
/**
156
* Parses service ID into components
157
* @param id Service ID to parse
158
* @return Service ID definition with components
159
*/
160
public static ServiceIDDefinition analysisId(String id);
161
162
/**
163
* Creates relation ID between services
164
* @param define Service relation definition
165
* @return Encoded relation ID
166
*/
167
public static String buildRelationId(ServiceRelationDefine define);
168
}
169
170
/**
171
* Service instance ID operations
172
*/
173
public static class ServiceInstanceID {
174
175
/**
176
* Creates instance ID from service and instance name
177
* @param serviceId Parent service ID
178
* @param instanceName Instance name
179
* @return Encoded instance ID
180
*/
181
public static String buildId(String serviceId, String instanceName);
182
183
/**
184
* Parses instance ID into components
185
* @param id Instance ID to parse
186
* @return Instance ID definition with components
187
*/
188
public static InstanceIDDefinition analysisId(String id);
189
}
190
191
/**
192
* Endpoint ID operations
193
*/
194
public static class EndpointID {
195
196
/**
197
* Creates endpoint ID from service and endpoint name
198
* @param serviceId Parent service ID
199
* @param endpointName Endpoint name
200
* @return Encoded endpoint ID
201
*/
202
public static String buildId(String serviceId, String endpointName);
203
204
/**
205
* Parses endpoint ID into components
206
* @param id Endpoint ID to parse
207
* @return Endpoint ID definition with components
208
*/
209
public static EndpointIDDefinition analysisId(String id);
210
}
211
212
/**
213
* Process ID operations
214
*/
215
public static class ProcessID {
216
217
/**
218
* Creates process ID from instance and process name
219
* @param instanceId Parent instance ID
220
* @param processName Process name
221
* @return Encoded process ID
222
*/
223
public static String buildId(String instanceId, String processName);
224
225
/**
226
* Parses process ID into components
227
* @param id Process ID to parse
228
* @return Process ID definition with components
229
*/
230
public static ProcessIDDefinition analysisId(String id);
231
}
232
233
/**
234
* Network address alias operations
235
*/
236
public static class NetworkAddressAliasDefine {
237
238
/**
239
* Creates network address ID
240
* @param networkAddress Network address
241
* @return Encoded network address ID
242
*/
243
public static String buildId(String networkAddress);
244
245
/**
246
* Parses network address ID
247
* @param id Network address ID to parse
248
* @return Network address definition
249
*/
250
public static NetworkAddressIDDefinition analysisId(String id);
251
}
252
}
253
```
254
255
## Time Management
256
257
### TimeBucket
258
259
Manages time bucket operations for metrics downsampling.
260
261
```java { .api }
262
public class TimeBucket {
263
264
/**
265
* Converts time bucket to timestamp
266
* @param timeBucket Time bucket value
267
* @param downSampling Downsampling level
268
* @return Timestamp in milliseconds
269
*/
270
public static long getTimestamp(long timeBucket, DownSampling downSampling);
271
272
/**
273
* Gets current minute precision time bucket
274
* @param timestamp Current timestamp
275
* @return Minute precision time bucket
276
*/
277
public static long getMinuteTimeBucket(long timestamp);
278
279
/**
280
* Gets current hour precision time bucket
281
* @param timestamp Current timestamp
282
* @return Hour precision time bucket
283
*/
284
public static long getHourTimeBucket(long timestamp);
285
286
/**
287
* Gets current day precision time bucket
288
* @param timestamp Current timestamp
289
* @return Day precision time bucket
290
*/
291
public static long getDayTimeBucket(long timestamp);
292
293
/**
294
* Checks if time bucket is minute precision
295
* @param timeBucket Time bucket to check
296
* @return True if minute precision
297
*/
298
public static boolean isMinuteBucket(long timeBucket);
299
300
/**
301
* Checks if time bucket is hour precision
302
* @param timeBucket Time bucket to check
303
* @return True if hour precision
304
*/
305
public static boolean isHourBucket(long timeBucket);
306
307
/**
308
* Checks if time bucket is day precision
309
* @param timeBucket Time bucket to check
310
* @return True if day precision
311
*/
312
public static boolean isDayBucket(long timeBucket);
313
}
314
```
315
316
### DownSampling
317
318
Defines time precision levels for metrics aggregation.
319
320
```java { .api }
321
public enum DownSampling {
322
/**
323
* Minute precision (1-minute buckets)
324
*/
325
Minute(Calendar.MINUTE, "Minute", 1),
326
327
/**
328
* Hour precision (1-hour buckets)
329
*/
330
Hour(Calendar.HOUR_OF_DAY, "Hour", 24),
331
332
/**
333
* Day precision (1-day buckets)
334
*/
335
Day(Calendar.DAY_OF_MONTH, "Day", 30);
336
337
private final int calendarUnit;
338
private final String name;
339
private final int size;
340
341
/**
342
* Gets the calendar unit for this downsampling level
343
* @return Calendar unit constant
344
*/
345
public int getCalendarUnit();
346
347
/**
348
* Gets the name of this downsampling level
349
* @return Downsampling name
350
*/
351
public String getName();
352
353
/**
354
* Gets the typical size/count for this level
355
* @return Size value
356
*/
357
public int getSize();
358
}
359
```
360
361
## Stream Processing Workers
362
363
### MetricsStreamProcessor
364
365
Processes metrics data streams with aggregation and downsampling.
366
367
```java { .api }
368
public class MetricsStreamProcessor extends StreamProcessor<Metrics> {
369
370
/**
371
* Processes incoming metrics stream
372
* @param metrics Metrics data to process
373
*/
374
@Override
375
public void in(Metrics metrics);
376
377
/**
378
* Creates processor instance for metrics stream
379
* @param moduleDefineHolder Module services holder
380
* @param definition Stream definition
381
* @param metricsClass Metrics class type
382
* @return Configured processor
383
*/
384
public static MetricsStreamProcessor create(ModuleDefineHolder moduleDefineHolder,
385
StreamDefinition definition,
386
Class<? extends Metrics> metricsClass);
387
}
388
```
389
390
### RecordStreamProcessor
391
392
Processes record data streams for log and event storage.
393
394
```java { .api }
395
public class RecordStreamProcessor extends StreamProcessor<Record> {
396
397
/**
398
* Processes incoming record stream
399
* @param record Record data to process
400
*/
401
@Override
402
public void in(Record record);
403
404
/**
405
* Creates processor instance for record stream
406
* @param moduleDefineHolder Module services holder
407
* @param definition Stream definition
408
* @param recordClass Record class type
409
* @return Configured processor
410
*/
411
public static RecordStreamProcessor create(ModuleDefineHolder moduleDefineHolder,
412
StreamDefinition definition,
413
Class<? extends Record> recordClass);
414
}
415
```
416
417
### ManagementStreamProcessor
418
419
Processes management data streams for metadata and configuration.
420
421
```java { .api }
422
public class ManagementStreamProcessor extends StreamProcessor<Management> {
423
424
/**
425
* Processes incoming management stream
426
* @param management Management data to process
427
*/
428
@Override
429
public void in(Management management);
430
}
431
```
432
433
### TopNStreamProcessor
434
435
Processes TopN aggregation streams for ranking and sorting.
436
437
```java { .api }
438
public class TopNStreamProcessor extends StreamProcessor<TopN> {
439
440
/**
441
* Processes incoming TopN stream
442
* @param topN TopN data to process
443
*/
444
@Override
445
public void in(TopN topN);
446
}
447
```
448
449
## Usage Examples
450
451
### Implementing Custom Source Dispatcher
452
453
```java
454
@Component
455
public class CustomTelemetryDispatcher implements SourceDispatcher<CustomTelemetrySource> {
456
457
@Override
458
public void dispatch(CustomTelemetrySource source) {
459
// Prepare source data
460
source.prepare();
461
462
// Set entity ID using IDManager
463
String serviceId = IDManager.ServiceID.buildId(
464
source.getServiceName(),
465
true // normalized
466
);
467
source.setEntityId(serviceId);
468
469
// Set time bucket for aggregation
470
long timeBucket = TimeBucket.getMinuteTimeBucket(
471
source.getTimestamp()
472
);
473
source.setTimeBucket(timeBucket);
474
475
// Forward to appropriate stream processor
476
if (source.isMetric()) {
477
metricsProcessor.in(source.toMetrics());
478
} else if (source.isRecord()) {
479
recordProcessor.in(source.toRecord());
480
}
481
}
482
}
483
```
484
485
### Creating Custom Stream Definition
486
487
```java
488
@Stream(
489
name = "custom_service_metrics",
490
scopeId = DefaultScopeDefine.SERVICE,
491
builder = CustomServiceMetrics.Builder.class,
492
processor = MetricsStreamProcessor.class
493
)
494
public class CustomServiceMetrics extends Metrics {
495
496
@Getter @Setter
497
private String serviceName;
498
499
@Getter @Setter
500
private long requestCount;
501
502
@Getter @Setter
503
private long responseTime;
504
505
@Override
506
public boolean combine(Metrics metrics) {
507
CustomServiceMetrics other = (CustomServiceMetrics) metrics;
508
this.requestCount += other.getRequestCount();
509
this.responseTime += other.getResponseTime();
510
return true;
511
}
512
513
@Override
514
public void calculate() {
515
// Calculate average response time or other derived metrics
516
if (requestCount > 0) {
517
// Perform calculations
518
}
519
}
520
521
@Override
522
public Metrics toHour() {
523
CustomServiceMetrics hourMetrics = new CustomServiceMetrics();
524
hourMetrics.copyFrom(this);
525
return hourMetrics;
526
}
527
528
@Override
529
public Metrics toDay() {
530
CustomServiceMetrics dayMetrics = new CustomServiceMetrics();
531
dayMetrics.copyFrom(this);
532
return dayMetrics;
533
}
534
535
public static class Builder implements StorageBuilder<CustomServiceMetrics> {
536
@Override
537
public CustomServiceMetrics storage2Entity(Convert2Entity converter) {
538
// Build entity from storage data
539
CustomServiceMetrics metrics = new CustomServiceMetrics();
540
// Set fields from converter
541
return metrics;
542
}
543
544
@Override
545
public void entity2Storage(CustomServiceMetrics storageData,
546
Convert2Storage converter) {
547
// Convert entity to storage format
548
converter.accept("service_name", storageData.getServiceName());
549
converter.accept("request_count", storageData.getRequestCount());
550
converter.accept("response_time", storageData.getResponseTime());
551
}
552
}
553
}
554
```
555
556
## Core Types
557
558
```java { .api }
559
/**
560
* Service ID definition with parsed components
561
*/
562
public class ServiceIDDefinition {
563
private String name;
564
private boolean isNormal;
565
566
public String getName();
567
public boolean isNormal();
568
}
569
570
/**
571
* Instance ID definition with parsed components
572
*/
573
public class InstanceIDDefinition {
574
private String serviceId;
575
private String instanceName;
576
577
public String getServiceId();
578
public String getInstanceName();
579
}
580
581
/**
582
* Endpoint ID definition with parsed components
583
*/
584
public class EndpointIDDefinition {
585
private String serviceId;
586
private String endpointName;
587
588
public String getServiceId();
589
public String getEndpointName();
590
}
591
592
/**
593
* Service relation definition
594
*/
595
public class ServiceRelationDefine {
596
private String sourceServiceId;
597
private String destServiceId;
598
private DetectPoint detectPoint;
599
600
public String getSourceServiceId();
601
public String getDestServiceId();
602
public DetectPoint getDetectPoint();
603
}
604
605
/**
606
* Detection point for service relations
607
*/
608
public enum DetectPoint {
609
CLIENT, SERVER, PROXY
610
}
611
```