0
# Source Processing
1
2
The SkyWalking source processing system handles telemetry data ingestion, transformation, and routing. It provides the foundation for receiving various types of observability data including traces, metrics, logs, and infrastructure telemetry from different sources and protocols.
3
4
## Core Source Interfaces
5
6
### ISource
7
8
Base interface for all telemetry data sources in the SkyWalking system.
9
10
```java { .api }
11
public interface ISource {
12
13
/**
14
* Gets the scope identifier for this source
15
* @return Scope ID (service, instance, endpoint, etc.)
16
*/
17
int scope();
18
19
/**
20
* Gets the time bucket for metrics aggregation
21
* @return Time bucket value
22
*/
23
long getTimeBucket();
24
25
/**
26
* Sets the time bucket for metrics aggregation
27
* @param timeBucket Time bucket value
28
*/
29
void setTimeBucket(long timeBucket);
30
31
/**
32
* Gets the entity identifier for this source
33
* @return Entity ID (service ID, instance ID, endpoint ID, etc.)
34
*/
35
String getEntityId();
36
37
/**
38
* Internal data field preparation before {@link org.apache.skywalking.oap.server.core.analysis.SourceDispatcher#dispatch(ISource)}
39
*/
40
default void prepare() {
41
}
42
}
43
```
44
45
### SourceReceiver
46
47
Service interface for receiving and processing telemetry sources.
48
49
```java { .api }
50
public interface SourceReceiver extends Service {
51
52
/**
53
* Receives and processes a telemetry source
54
* @param source The source data to process
55
* @throws IOException If source processing fails
56
*/
57
void receive(ISource source) throws IOException;
58
59
/**
60
* Receives multiple sources in batch
61
* @param sources List of sources to process
62
* @throws IOException If batch processing fails
63
*/
64
void receiveBatch(List<? extends ISource> sources) throws IOException;
65
}
66
```
67
68
### SourceReceiverImpl
69
70
Default implementation of source receiver with dispatching logic.
71
72
```java { .api }
73
public class SourceReceiverImpl implements SourceReceiver {
74
75
private DispatcherManager dispatcherManager;
76
77
@Override
78
public void receive(ISource source) throws IOException;
79
80
@Override
81
public void receiveBatch(List<? extends ISource> sources) throws IOException;
82
83
/**
84
* Sets the dispatcher manager for routing sources
85
* @param dispatcherManager Dispatcher manager instance
86
*/
87
public void setDispatcherManager(DispatcherManager dispatcherManager);
88
89
/**
90
* Validates source before processing
91
* @param source Source to validate
92
* @return True if source is valid
93
*/
94
protected boolean validateSource(ISource source);
95
96
/**
97
* Preprocesses source before dispatching
98
* @param source Source to preprocess
99
*/
100
protected void preprocessSource(ISource source);
101
}
102
```
103
104
## Source Annotations
105
106
### Source
107
108
Annotation to mark classes as telemetry sources for automatic discovery.
109
110
```java { .api }
111
@Target(ElementType.TYPE)
112
@Retention(RetentionPolicy.RUNTIME)
113
public @interface Source {
114
115
/**
116
* Source name for identification
117
* @return Source name
118
*/
119
String name() default "";
120
121
/**
122
* Source category (trace, metric, log, infrastructure)
123
* @return Source category
124
*/
125
String category() default "";
126
}
127
```
128
129
### ScopeDeclaration
130
131
Annotation to declare the scope of source data.
132
133
```java { .api }
134
@Target(ElementType.TYPE)
135
@Retention(RetentionPolicy.RUNTIME)
136
public @interface ScopeDeclaration {
137
138
/**
139
* Scope identifier
140
* @return Scope ID
141
*/
142
int id();
143
144
/**
145
* Scope name
146
* @return Scope name
147
*/
148
String name() default "";
149
}
150
```
151
152
### ScopeDefaultColumn
153
154
Annotation to define default columns for scope entities.
155
156
```java { .api }
157
@Target(ElementType.TYPE)
158
@Retention(RetentionPolicy.RUNTIME)
159
public @interface ScopeDefaultColumn {
160
161
/**
162
* Virtual column name
163
* @return Column name
164
*/
165
String virtualColumnName();
166
167
/**
168
* Field name in the source class
169
* @return Field name
170
*/
171
String fieldName();
172
173
/**
174
* Whether the column requires entity ID
175
* @return True if entity ID required
176
*/
177
boolean requireEntityId() default false;
178
}
179
```
180
181
## Service Sources
182
183
### Service
184
185
Source for service-level telemetry data and metadata.
186
187
```java { .api }
188
@Source(name = "service", category = "service")
189
@ScopeDeclaration(id = DefaultScopeDefine.SERVICE, name = "Service")
190
public class Service extends ISource {
191
192
@Getter @Setter
193
private String name;
194
195
@Getter @Setter
196
private String shortName;
197
198
@Getter @Setter
199
private String group;
200
201
@Getter @Setter
202
private NodeType nodeType;
203
204
@Getter @Setter
205
private List<String> layers;
206
207
@Override
208
public int scope();
209
210
@Override
211
public String getEntityId();
212
213
@Override
214
public void prepare();
215
216
/**
217
* Checks if service is normal (not virtual)
218
* @return True if normal service
219
*/
220
public boolean isNormal();
221
222
/**
223
* Gets service layer information
224
* @return Primary service layer
225
*/
226
public String getLayer();
227
}
228
```
229
230
### ServiceMeta
231
232
Source for service metadata and registration information.
233
234
```java { .api }
235
@Source(name = "service_meta", category = "service")
236
public class ServiceMeta extends ISource {
237
238
@Getter @Setter
239
private String name;
240
241
@Getter @Setter
242
private NodeType nodeType;
243
244
@Getter @Setter
245
private List<String> layers;
246
247
@Getter @Setter
248
private JsonObject properties;
249
250
@Override
251
public int scope();
252
253
@Override
254
public String getEntityId();
255
256
@Override
257
public void prepare();
258
259
/**
260
* Adds property to service metadata
261
* @param key Property key
262
* @param value Property value
263
*/
264
public void addProperty(String key, String value);
265
266
/**
267
* Gets property from service metadata
268
* @param key Property key
269
* @return Property value or null
270
*/
271
public String getProperty(String key);
272
}
273
```
274
275
### ServiceRelation
276
277
Source for service relationship and dependency information.
278
279
```java { .api }
280
@Source(name = "service_relation", category = "relation")
281
@ScopeDeclaration(id = DefaultScopeDefine.SERVICE_RELATION, name = "ServiceRelation")
282
public class ServiceRelation extends ISource {
283
284
@Getter @Setter
285
private String sourceServiceId;
286
287
@Getter @Setter
288
private String destServiceId;
289
290
@Getter @Setter
291
private String sourceServiceName;
292
293
@Getter @Setter
294
private String destServiceName;
295
296
@Getter @Setter
297
private DetectPoint detectPoint;
298
299
@Getter @Setter
300
private int componentId;
301
302
@Override
303
public int scope();
304
305
@Override
306
public String getEntityId();
307
308
@Override
309
public void prepare();
310
311
/**
312
* Gets relation ID for entity identification
313
* @return Service relation ID
314
*/
315
public String getRelationId();
316
}
317
```
318
319
## Instance Sources
320
321
### ServiceInstance
322
323
Source for service instance telemetry data and metadata.
324
325
```java { .api }
326
@Source(name = "service_instance", category = "service")
327
@ScopeDeclaration(id = DefaultScopeDefine.SERVICE_INSTANCE, name = "ServiceInstance")
328
public class ServiceInstance extends ISource {
329
330
@Getter @Setter
331
private String name;
332
333
@Getter @Setter
334
private String serviceId;
335
336
@Getter @Setter
337
private String serviceName;
338
339
@Getter @Setter
340
private JsonObject properties;
341
342
@Override
343
public int scope();
344
345
@Override
346
public String getEntityId();
347
348
@Override
349
public void prepare();
350
351
/**
352
* Gets instance properties
353
* @return Instance properties as JSON
354
*/
355
public JsonObject getProperties();
356
357
/**
358
* Adds instance property
359
* @param key Property key
360
* @param value Property value
361
*/
362
public void addProperty(String key, String value);
363
364
/**
365
* Gets specific instance property
366
* @param key Property key
367
* @return Property value or null
368
*/
369
public String getProperty(String key);
370
}
371
```
372
373
### ServiceInstanceRelation
374
375
Source for service instance relationship data.
376
377
```java { .api }
378
@Source(name = "service_instance_relation", category = "relation")
379
@ScopeDeclaration(id = DefaultScopeDefine.SERVICE_INSTANCE_RELATION, name = "ServiceInstanceRelation")
380
public class ServiceInstanceRelation extends ISource {
381
382
@Getter @Setter
383
private String sourceServiceId;
384
385
@Getter @Setter
386
private String sourceServiceInstanceId;
387
388
@Getter @Setter
389
private String destServiceId;
390
391
@Getter @Setter
392
private String destServiceInstanceId;
393
394
@Getter @Setter
395
private String sourceServiceName;
396
397
@Getter @Setter
398
private String sourceServiceInstanceName;
399
400
@Getter @Setter
401
private String destServiceName;
402
403
@Getter @Setter
404
private String destServiceInstanceName;
405
406
@Getter @Setter
407
private DetectPoint detectPoint;
408
409
@Getter @Setter
410
private int componentId;
411
412
@Override
413
public int scope();
414
415
@Override
416
public String getEntityId();
417
418
@Override
419
public void prepare();
420
}
421
```
422
423
## Endpoint Sources
424
425
### Endpoint
426
427
Source for endpoint telemetry data and performance metrics.
428
429
```java { .api }
430
@Source(name = "endpoint", category = "endpoint")
431
@ScopeDeclaration(id = DefaultScopeDefine.ENDPOINT, name = "Endpoint")
432
public class Endpoint extends ISource {
433
434
@Getter @Setter
435
private String name;
436
437
@Getter @Setter
438
private String serviceId;
439
440
@Getter @Setter
441
private String serviceName;
442
443
@Getter @Setter
444
private String serviceInstanceId;
445
446
@Getter @Setter
447
private String serviceInstanceName;
448
449
@Getter @Setter
450
private int latency;
451
452
@Getter @Setter
453
private boolean status;
454
455
@Getter @Setter
456
private int responseCode;
457
458
@Getter @Setter
459
private RequestType type;
460
461
@Override
462
public int scope();
463
464
@Override
465
public String getEntityId();
466
467
@Override
468
public void prepare();
469
470
/**
471
* Checks if endpoint call was successful
472
* @return True if successful (status == true)
473
*/
474
public boolean isSuccess();
475
476
/**
477
* Gets endpoint latency in milliseconds
478
* @return Latency value
479
*/
480
public int getLatency();
481
}
482
```
483
484
### EndpointMeta
485
486
Source for endpoint metadata and registration information.
487
488
```java { .api }
489
@Source(name = "endpoint_meta", category = "endpoint")
490
public class EndpointMeta extends ISource {
491
492
@Getter @Setter
493
private String endpoint;
494
495
@Getter @Setter
496
private String serviceId;
497
498
@Getter @Setter
499
private String serviceName;
500
501
@Override
502
public int scope();
503
504
@Override
505
public String getEntityId();
506
507
@Override
508
public void prepare();
509
}
510
```
511
512
### EndpointRelation
513
514
Source for endpoint relationship and dependency information.
515
516
```java { .api }
517
@Source(name = "endpoint_relation", category = "relation")
518
@ScopeDeclaration(id = DefaultScopeDefine.ENDPOINT_RELATION, name = "EndpointRelation")
519
public class EndpointRelation extends ISource {
520
521
@Getter @Setter
522
private String endpoint;
523
524
@Getter @Setter
525
private String childEndpoint;
526
527
@Getter @Setter
528
private int rpcLatency;
529
530
@Getter @Setter
531
private boolean status;
532
533
@Getter @Setter
534
private int responseCode;
535
536
@Getter @Setter
537
private RequestType type;
538
539
@Getter @Setter
540
private DetectPoint detectPoint;
541
542
@Getter @Setter
543
private int componentId;
544
545
@Override
546
public int scope();
547
548
@Override
549
public String getEntityId();
550
551
@Override
552
public void prepare();
553
}
554
```
555
556
## Specialized Sources
557
558
### DatabaseAccess
559
560
Source for database operation telemetry data.
561
562
```java { .api }
563
@Source(name = "database_access", category = "database")
564
@ScopeDeclaration(id = DefaultScopeDefine.DATABASE_ACCESS, name = "DatabaseAccess")
565
public class DatabaseAccess extends ISource {
566
567
@Getter @Setter
568
private String databaseTypeId;
569
570
@Getter @Setter
571
private String name;
572
573
@Getter @Setter
574
private int latency;
575
576
@Getter @Setter
577
private boolean status;
578
579
@Getter @Setter
580
private String sqlStatement;
581
582
@Getter @Setter
583
private String operation;
584
585
@Override
586
public int scope();
587
588
@Override
589
public String getEntityId();
590
591
@Override
592
public void prepare();
593
594
/**
595
* Gets database type identifier
596
* @return Database type ID
597
*/
598
public String getDatabaseTypeId();
599
600
/**
601
* Gets SQL statement (may be truncated)
602
* @return SQL statement
603
*/
604
public String getSqlStatement();
605
}
606
```
607
608
### CacheAccess
609
610
Source for cache operation telemetry data.
611
612
```java { .api }
613
@Source(name = "cache_access", category = "cache")
614
public class CacheAccess extends ISource {
615
616
@Getter @Setter
617
private String name;
618
619
@Getter @Setter
620
private int latency;
621
622
@Getter @Setter
623
private boolean status;
624
625
@Getter @Setter
626
private String operation;
627
628
@Getter @Setter
629
private String key;
630
631
@Override
632
public int scope();
633
634
@Override
635
public String getEntityId();
636
637
@Override
638
public void prepare();
639
640
/**
641
* Gets cache operation type
642
* @return Operation (GET, SET, DELETE, etc.)
643
*/
644
public String getOperation();
645
}
646
```
647
648
### MQAccess
649
650
Source for message queue operation telemetry data.
651
652
```java { .api }
653
@Source(name = "mq_access", category = "mq")
654
public class MQAccess extends ISource {
655
656
@Getter @Setter
657
private String broker;
658
659
@Getter @Setter
660
private String topic;
661
662
@Getter @Setter
663
private int latency;
664
665
@Getter @Setter
666
private boolean status;
667
668
@Getter @Setter
669
private String operation;
670
671
@Getter @Setter
672
private TransmissionLatency transmissionLatency;
673
674
@Override
675
public int scope();
676
677
@Override
678
public String getEntityId();
679
680
@Override
681
public void prepare();
682
683
/**
684
* Gets message broker information
685
* @return Broker identifier
686
*/
687
public String getBroker();
688
689
/**
690
* Gets message topic
691
* @return Topic name
692
*/
693
public String getTopic();
694
}
695
```
696
697
### Log
698
699
Source for log data and events.
700
701
```java { .api }
702
@Source(name = "log", category = "log")
703
public class Log extends ISource {
704
705
@Getter @Setter
706
private String serviceId;
707
708
@Getter @Setter
709
private String serviceInstanceId;
710
711
@Getter @Setter
712
private String endpointId;
713
714
@Getter @Setter
715
private String traceId;
716
717
@Getter @Setter
718
private String traceSegmentId;
719
720
@Getter @Setter
721
private int spanId;
722
723
@Getter @Setter
724
private ContentType contentType;
725
726
@Getter @Setter
727
private String content;
728
729
@Getter @Setter
730
private List<String> tags;
731
732
@Override
733
public int scope();
734
735
@Override
736
public String getEntityId();
737
738
@Override
739
public void prepare();
740
741
/**
742
* Gets log content
743
* @return Log content string
744
*/
745
public String getContent();
746
747
/**
748
* Gets associated trace ID
749
* @return Trace ID or null if not associated
750
*/
751
public String getTraceId();
752
753
/**
754
* Adds tag to log entry
755
* @param tag Tag to add
756
*/
757
public void addTag(String tag);
758
}
759
```
760
761
### Segment
762
763
Source for trace segment data.
764
765
```java { .api }
766
@Source(name = "segment", category = "trace")
767
public class Segment extends ISource {
768
769
@Getter @Setter
770
private String traceId;
771
772
@Getter @Setter
773
private String segmentId;
774
775
@Getter @Setter
776
private String serviceId;
777
778
@Getter @Setter
779
private String serviceInstanceId;
780
781
@Getter @Setter
782
private String endpointName;
783
784
@Getter @Setter
785
private int latency;
786
787
@Getter @Setter
788
private boolean isError;
789
790
@Getter @Setter
791
private List<String> tags;
792
793
@Getter @Setter
794
private byte[] dataBinary;
795
796
@Override
797
public int scope();
798
799
@Override
800
public String getEntityId();
801
802
@Override
803
public void prepare();
804
805
/**
806
* Gets trace segment identifier
807
* @return Segment ID
808
*/
809
public String getSegmentId();
810
811
/**
812
* Checks if segment contains errors
813
* @return True if error occurred
814
*/
815
public boolean isError();
816
}
817
```
818
819
## Infrastructure Sources
820
821
### K8SMetrics
822
823
Source for Kubernetes metrics and telemetry.
824
825
```java { .api }
826
@Source(name = "k8s_metrics", category = "infrastructure")
827
public class K8SMetrics extends ISource {
828
829
@Getter @Setter
830
private String cluster;
831
832
@Getter @Setter
833
private String namespace;
834
835
@Getter @Setter
836
private String workload;
837
838
@Getter @Setter
839
private String pod;
840
841
@Getter @Setter
842
private String container;
843
844
@Getter @Setter
845
private long cpuUsage;
846
847
@Getter @Setter
848
private long memoryUsage;
849
850
@Getter @Setter
851
private Map<String, String> labels;
852
853
@Override
854
public int scope();
855
856
@Override
857
public String getEntityId();
858
859
@Override
860
public void prepare();
861
862
/**
863
* Gets Kubernetes cluster name
864
* @return Cluster name
865
*/
866
public String getCluster();
867
868
/**
869
* Gets Kubernetes namespace
870
* @return Namespace name
871
*/
872
public String getNamespace();
873
}
874
```
875
876
### EnvoyInstanceMetric
877
878
Source for Envoy proxy metrics and telemetry.
879
880
```java { .api }
881
@Source(name = "envoy_instance_metric", category = "infrastructure")
882
public class EnvoyInstanceMetric extends ISource {
883
884
@Getter @Setter
885
private String serviceId;
886
887
@Getter @Setter
888
private String serviceInstanceId;
889
890
@Getter @Setter
891
private long totalRequestsCount;
892
893
@Getter @Setter
894
private long totalConnectionsCount;
895
896
@Getter @Setter
897
private long activeConnectionsCount;
898
899
@Getter @Setter
900
private double cpuUsage;
901
902
@Getter @Setter
903
private double heapMemoryUsed;
904
905
@Override
906
public int scope();
907
908
@Override
909
public String getEntityId();
910
911
@Override
912
public void prepare();
913
}
914
```
915
916
## Source Enums and Constants
917
918
### NodeType
919
920
Enumeration for service node types.
921
922
```java { .api }
923
public enum NodeType {
924
Normal(0), Browser(1), Unknown(2);
925
926
private int value;
927
928
NodeType(int value) {
929
this.value = value;
930
}
931
932
public int value() {
933
return value;
934
}
935
936
public static NodeType valueOf(int value) {
937
for (NodeType nodeType : NodeType.values()) {
938
if (nodeType.value == value) {
939
return nodeType;
940
}
941
}
942
throw new IllegalArgumentException("Unknown NodeType value: " + value);
943
}
944
}
945
```
946
947
### DetectPoint
948
949
Enumeration for detection points in service relationships.
950
951
```java { .api }
952
public enum DetectPoint {
953
CLIENT(0), SERVER(1), PROXY(2);
954
955
private int value;
956
957
DetectPoint(int value) {
958
this.value = value;
959
}
960
961
public int value() {
962
return value;
963
}
964
965
public static DetectPoint valueOf(int value) {
966
for (DetectPoint detectPoint : DetectPoint.values()) {
967
if (detectPoint.value == value) {
968
return detectPoint;
969
}
970
}
971
throw new IllegalArgumentException("Unknown DetectPoint value: " + value);
972
}
973
}
974
```
975
976
### RequestType
977
978
Enumeration for request types.
979
980
```java { .api }
981
public enum RequestType {
982
RPC(0), DATABASE(1), HTTP(2), CACHE(3), MQ(4);
983
984
private int value;
985
986
RequestType(int value) {
987
this.value = value;
988
}
989
990
public int value() {
991
return value;
992
}
993
994
public static RequestType valueOf(int value) {
995
for (RequestType requestType : RequestType.values()) {
996
if (requestType.value == value) {
997
return requestType;
998
}
999
}
1000
throw new IllegalArgumentException("Unknown RequestType value: " + value);
1001
}
1002
}
1003
```
1004
1005
### ContentType
1006
1007
Enumeration for log content types.
1008
1009
```java { .api }
1010
public enum ContentType {
1011
NONE(0), TEXT(1), JSON(2), YAML(3);
1012
1013
private int value;
1014
1015
ContentType(int value) {
1016
this.value = value;
1017
}
1018
1019
public int value() {
1020
return value;
1021
}
1022
1023
public static ContentType valueOf(int value) {
1024
for (ContentType contentType : ContentType.values()) {
1025
if (contentType.value == value) {
1026
return contentType;
1027
}
1028
}
1029
throw new IllegalArgumentException("Unknown ContentType value: " + value);
1030
}
1031
}
1032
```
1033
1034
## Usage Examples
1035
1036
### Implementing Custom Source
1037
1038
```java
1039
@Source(name = "custom_business_metric", category = "business")
1040
@ScopeDeclaration(id = 1000, name = "BusinessMetric") // Custom scope ID
1041
public class CustomBusinessMetric implements ISource {
1042
1043
@Getter @Setter
1044
private String businessUnit;
1045
1046
@Getter @Setter
1047
private String operation;
1048
1049
@Getter @Setter
1050
private double revenue;
1051
1052
@Getter @Setter
1053
private int transactionCount;
1054
1055
@Getter @Setter
1056
private String currency;
1057
1058
private long timeBucket;
1059
private String entityId;
1060
1061
@Override
1062
public int scope() {
1063
return 1000; // Custom scope for business metrics
1064
}
1065
1066
@Override
1067
public long getTimeBucket() {
1068
return timeBucket;
1069
}
1070
1071
@Override
1072
public void setTimeBucket(long timeBucket) {
1073
this.timeBucket = timeBucket;
1074
}
1075
1076
@Override
1077
public String getEntityId() {
1078
return entityId;
1079
}
1080
1081
@Override
1082
public void prepare() {
1083
// Generate entity ID from business unit and operation
1084
this.entityId = businessUnit + ":" + operation;
1085
1086
// Set time bucket if not already set
1087
if (timeBucket == 0) {
1088
timeBucket = TimeBucket.getMinuteTimeBucket(System.currentTimeMillis());
1089
}
1090
}
1091
1092
public double getRevenuePerTransaction() {
1093
return transactionCount > 0 ? revenue / transactionCount : 0.0;
1094
}
1095
}
1096
```
1097
1098
### Creating Custom Source Dispatcher
1099
1100
```java
1101
@Component
1102
public class CustomBusinessMetricDispatcher implements SourceDispatcher<CustomBusinessMetric> {
1103
1104
@Override
1105
public void dispatch(CustomBusinessMetric source) {
1106
// Validate source data
1107
if (source.getBusinessUnit() == null || source.getOperation() == null) {
1108
throw new IllegalArgumentException("Business unit and operation are required");
1109
}
1110
1111
// Prepare source
1112
source.prepare();
1113
1114
// Apply business rules
1115
applyBusinessRules(source);
1116
1117
// Route to metrics processor
1118
MetricsStreamProcessor.getInstance().in(source);
1119
}
1120
1121
private void applyBusinessRules(CustomBusinessMetric source) {
1122
// Example: Convert currency to USD if needed
1123
if (!"USD".equals(source.getCurrency())) {
1124
double exchangeRate = getExchangeRate(source.getCurrency(), "USD");
1125
source.setRevenue(source.getRevenue() * exchangeRate);
1126
source.setCurrency("USD");
1127
}
1128
1129
// Example: Filter out test data
1130
if (source.getBusinessUnit().startsWith("TEST_")) {
1131
throw new IllegalArgumentException("Test data not allowed in production");
1132
}
1133
}
1134
1135
private double getExchangeRate(String fromCurrency, String toCurrency) {
1136
// Implement currency conversion logic
1137
return 1.0; // Placeholder
1138
}
1139
}
1140
```
1141
1142
### Processing Service Sources
1143
1144
```java
1145
public class ServiceSourceProcessor {
1146
1147
private SourceReceiver sourceReceiver;
1148
1149
public void processServiceRegistration(String serviceName, NodeType nodeType,
1150
List<String> layers, Map<String, String> properties) {
1151
1152
// Create service source
1153
Service serviceSource = new Service();
1154
serviceSource.setName(serviceName);
1155
serviceSource.setNodeType(nodeType);
1156
serviceSource.setLayers(layers);
1157
1158
try {
1159
sourceReceiver.receive(serviceSource);
1160
System.out.println("Service registered: " + serviceName);
1161
} catch (IOException e) {
1162
System.err.println("Failed to register service: " + e.getMessage());
1163
}
1164
1165
// Create service metadata source
1166
ServiceMeta metaSource = new ServiceMeta();
1167
metaSource.setName(serviceName);
1168
metaSource.setNodeType(nodeType);
1169
metaSource.setLayers(layers);
1170
1171
JsonObject props = new JsonObject();
1172
for (Map.Entry<String, String> entry : properties.entrySet()) {
1173
props.addProperty(entry.getKey(), entry.getValue());
1174
}
1175
metaSource.setProperties(props);
1176
1177
try {
1178
sourceReceiver.receive(metaSource);
1179
System.out.println("Service metadata registered: " + serviceName);
1180
} catch (IOException e) {
1181
System.err.println("Failed to register service metadata: " + e.getMessage());
1182
}
1183
}
1184
1185
public void processServiceRelation(String sourceService, String destService,
1186
DetectPoint detectPoint, int componentId) {
1187
1188
ServiceRelation relationSource = new ServiceRelation();
1189
relationSource.setSourceServiceName(sourceService);
1190
relationSource.setDestServiceName(destService);
1191
relationSource.setDetectPoint(detectPoint);
1192
relationSource.setComponentId(componentId);
1193
1194
try {
1195
sourceReceiver.receive(relationSource);
1196
System.out.println("Service relation recorded: " + sourceService + " -> " + destService);
1197
} catch (IOException e) {
1198
System.err.println("Failed to record service relation: " + e.getMessage());
1199
}
1200
}
1201
}
1202
```
1203
1204
### Processing Endpoint Sources
1205
1206
```java
1207
public class EndpointSourceProcessor {
1208
1209
private SourceReceiver sourceReceiver;
1210
1211
public void processEndpointCall(String serviceName, String instanceName, String endpointName,
1212
int latency, boolean success, int responseCode, RequestType type) {
1213
1214
// Create endpoint source
1215
Endpoint endpointSource = new Endpoint();
1216
endpointSource.setServiceName(serviceName);
1217
endpointSource.setServiceInstanceName(instanceName);
1218
endpointSource.setName(endpointName);
1219
endpointSource.setLatency(latency);
1220
endpointSource.setStatus(success);
1221
endpointSource.setResponseCode(responseCode);
1222
endpointSource.setType(type);
1223
1224
try {
1225
sourceReceiver.receive(endpointSource);
1226
1227
if (!success) {
1228
System.out.println("Error endpoint call recorded: " + endpointName +
1229
" (" + responseCode + ")");
1230
}
1231
} catch (IOException e) {
1232
System.err.println("Failed to process endpoint call: " + e.getMessage());
1233
}
1234
}
1235
1236
public void processEndpointRelation(String sourceEndpoint, String destEndpoint,
1237
int latency, boolean success, DetectPoint detectPoint) {
1238
1239
EndpointRelation relationSource = new EndpointRelation();
1240
relationSource.setEndpoint(sourceEndpoint);
1241
relationSource.setChildEndpoint(destEndpoint);
1242
relationSource.setRpcLatency(latency);
1243
relationSource.setStatus(success);
1244
relationSource.setDetectPoint(detectPoint);
1245
1246
try {
1247
sourceReceiver.receive(relationSource);
1248
} catch (IOException e) {
1249
System.err.println("Failed to process endpoint relation: " + e.getMessage());
1250
}
1251
}
1252
}
1253
```
1254
1255
### Processing Database and Cache Sources
1256
1257
```java
1258
public class DatabaseCacheSourceProcessor {
1259
1260
private SourceReceiver sourceReceiver;
1261
1262
public void processDatabaseAccess(String databaseType, String operation, String sqlStatement,
1263
int latency, boolean success) {
1264
1265
DatabaseAccess dbSource = new DatabaseAccess();
1266
dbSource.setDatabaseTypeId(databaseType);
1267
dbSource.setOperation(operation);
1268
dbSource.setSqlStatement(sqlStatement);
1269
dbSource.setLatency(latency);
1270
dbSource.setStatus(success);
1271
1272
try {
1273
sourceReceiver.receive(dbSource);
1274
1275
if (latency > 1000) { // Log slow queries
1276
System.out.println("Slow database query detected: " + latency + "ms - " +
1277
sqlStatement.substring(0, Math.min(100, sqlStatement.length())));
1278
}
1279
} catch (IOException e) {
1280
System.err.println("Failed to process database access: " + e.getMessage());
1281
}
1282
}
1283
1284
public void processCacheAccess(String cacheName, String operation, String key,
1285
int latency, boolean success) {
1286
1287
CacheAccess cacheSource = new CacheAccess();
1288
cacheSource.setName(cacheName);
1289
cacheSource.setOperation(operation);
1290
cacheSource.setKey(key);
1291
cacheSource.setLatency(latency);
1292
cacheSource.setStatus(success);
1293
1294
try {
1295
sourceReceiver.receive(cacheSource);
1296
1297
if (!success) {
1298
System.out.println("Cache operation failed: " + operation + " on " + key);
1299
}
1300
} catch (IOException e) {
1301
System.err.println("Failed to process cache access: " + e.getMessage());
1302
}
1303
}
1304
}
1305
```
1306
1307
### Batch Source Processing
1308
1309
```java
1310
public class BatchSourceProcessor {
1311
1312
private SourceReceiver sourceReceiver;
1313
private List<ISource> sourceBatch = new ArrayList<>();
1314
private final int BATCH_SIZE = 100;
1315
1316
public void addSource(ISource source) {
1317
sourceBatch.add(source);
1318
1319
if (sourceBatch.size() >= BATCH_SIZE) {
1320
flushBatch();
1321
}
1322
}
1323
1324
public void flushBatch() {
1325
if (sourceBatch.isEmpty()) {
1326
return;
1327
}
1328
1329
try {
1330
sourceReceiver.receiveBatch(new ArrayList<>(sourceBatch));
1331
System.out.println("Processed batch of " + sourceBatch.size() + " sources");
1332
sourceBatch.clear();
1333
} catch (IOException e) {
1334
System.err.println("Failed to process source batch: " + e.getMessage());
1335
// Optionally retry individual sources
1336
retryIndividualSources();
1337
}
1338
}
1339
1340
private void retryIndividualSources() {
1341
for (ISource source : sourceBatch) {
1342
try {
1343
sourceReceiver.receive(source);
1344
} catch (IOException e) {
1345
System.err.println("Failed to process individual source: " +
1346
source.getClass().getSimpleName() + " - " + e.getMessage());
1347
}
1348
}
1349
sourceBatch.clear();
1350
}
1351
1352
// Call this method periodically to ensure pending sources are processed
1353
public void periodicFlush() {
1354
if (!sourceBatch.isEmpty()) {
1355
flushBatch();
1356
}
1357
}
1358
}
1359
```
1360
1361
## Core Source Types
1362
1363
```java { .api }
1364
/**
1365
* Transmission latency for message queue operations
1366
*/
1367
public class TransmissionLatency {
1368
private long producerTime;
1369
private long consumerTime;
1370
1371
public long getProducerTime();
1372
public void setProducerTime(long producerTime);
1373
public long getConsumerTime();
1374
public void setConsumerTime(long consumerTime);
1375
public long getLatency();
1376
}
1377
1378
/**
1379
* Default scope definitions for built-in source types
1380
*/
1381
public class DefaultScopeDefine {
1382
public static final int SERVICE = 1;
1383
public static final int SERVICE_INSTANCE = 2;
1384
public static final int ENDPOINT = 3;
1385
public static final int SERVICE_RELATION = 4;
1386
public static final int SERVICE_INSTANCE_RELATION = 5;
1387
public static final int ENDPOINT_RELATION = 6;
1388
public static final int DATABASE_ACCESS = 7;
1389
public static final int ALL = 99;
1390
}
1391
1392
/**
1393
* Source processing exception
1394
*/
1395
public class SourceProcessingException extends RuntimeException {
1396
public SourceProcessingException(String message);
1397
public SourceProcessingException(String message, Throwable cause);
1398
}
1399
```