or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

analysis-framework.md · configuration.md · index.md · profiling.md · query-services.md · remote-communication.md · source-processing.md · storage-layer.md

docs/source-processing.md

0

# Source Processing

1

2

The SkyWalking source processing system handles telemetry data ingestion, transformation, and routing. It provides the foundation for receiving various types of observability data including traces, metrics, logs, and infrastructure telemetry from different sources and protocols.

3

4

## Core Source Interfaces

5

6

### ISource

7

8

Base interface for all telemetry data sources in the SkyWalking system.

9

10

```java { .api }

11

/**
 * Base interface for all telemetry data sources.
 */
public interface ISource {

    /**
     * Gets the scope identifier for this source.
     *
     * @return Scope ID (service, instance, endpoint, etc.)
     */
    int scope();

    /**
     * Gets the time bucket for metrics aggregation.
     *
     * @return Time bucket value
     */
    long getTimeBucket();

    /**
     * Sets the time bucket for metrics aggregation.
     *
     * @param timeBucket Time bucket value
     */
    void setTimeBucket(long timeBucket);

    /**
     * Gets the entity identifier for this source.
     *
     * @return Entity ID (service ID, instance ID, endpoint ID, etc.)
     */
    String getEntityId();

    /**
     * Internal data field preparation before
     * {@link org.apache.skywalking.oap.server.core.analysis.SourceDispatcher#dispatch(ISource)}.
     * The default implementation does nothing.
     */
    default void prepare() {
    }
}

43

```

44

45

### SourceReceiver

46

47

Service interface for receiving and processing telemetry sources.

48

49

```java { .api }

50

public interface SourceReceiver extends Service {

51

52

/**

53

* Receives and processes a telemetry source

54

* @param source The source data to process

55

* @throws IOException If source processing fails

56

*/

57

void receive(ISource source) throws IOException;

58

59

/**

60

* Receives multiple sources in batch

61

* @param sources List of sources to process

62

* @throws IOException If batch processing fails

63

*/

64

void receiveBatch(List<? extends ISource> sources) throws IOException;

65

}

66

```

67

68

### SourceReceiverImpl

69

70

Default implementation of source receiver with dispatching logic.

71

72

```java { .api }

73

public class SourceReceiverImpl implements SourceReceiver {

74

75

private DispatcherManager dispatcherManager;

76

77

@Override

78

public void receive(ISource source) throws IOException;

79

80

@Override

81

public void receiveBatch(List<? extends ISource> sources) throws IOException;

82

83

/**

84

* Sets the dispatcher manager for routing sources

85

* @param dispatcherManager Dispatcher manager instance

86

*/

87

public void setDispatcherManager(DispatcherManager dispatcherManager);

88

89

/**

90

* Validates source before processing

91

* @param source Source to validate

92

* @return True if source is valid

93

*/

94

protected boolean validateSource(ISource source);

95

96

/**

97

* Preprocesses source before dispatching

98

* @param source Source to preprocess

99

*/

100

protected void preprocessSource(ISource source);

101

}

102

```

103

104

## Source Annotations

105

106

### Source

107

108

Annotation to mark classes as telemetry sources for automatic discovery.

109

110

```java { .api }

111

/**
 * Marks a class as a telemetry source for automatic discovery.
 * Applicable to types only and retained at runtime.
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface Source {

    /**
     * Source name for identification.
     *
     * @return Source name; defaults to the empty string
     */
    String name() default "";

    /**
     * Source category (trace, metric, log, infrastructure).
     *
     * @return Source category; defaults to the empty string
     */
    String category() default "";
}

127

```

128

129

### ScopeDeclaration

130

131

Annotation to declare the scope of source data.

132

133

```java { .api }

134

/**
 * Declares the scope of source data.
 * Applicable to types only and retained at runtime.
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface ScopeDeclaration {

    /**
     * Scope identifier. Required: this element has no default.
     *
     * @return Scope ID
     */
    int id();

    /**
     * Scope name.
     *
     * @return Scope name; defaults to the empty string
     */
    String name() default "";
}

150

```

151

152

### ScopeDefaultColumn

153

154

Annotation to define default columns for scope entities.

155

156

```java { .api }

157

/**
 * Defines default columns for scope entities.
 * Applicable to types only and retained at runtime.
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface ScopeDefaultColumn {

    /**
     * Virtual column name. Required: this element has no default.
     *
     * @return Column name
     */
    String virtualColumnName();

    /**
     * Field name in the source class. Required: this element has no default.
     *
     * @return Field name
     */
    String fieldName();

    /**
     * Whether the column requires entity ID.
     *
     * @return True if entity ID required; defaults to {@code false}
     */
    boolean requireEntityId() default false;
}

179

```

180

181

## Service Sources

182

183

### Service

184

185

Source for service-level telemetry data and metadata.

186

187

```java { .api }

188

@Source(name = "service", category = "service")

189

@ScopeDeclaration(id = DefaultScopeDefine.SERVICE, name = "Service")

190

public class Service extends ISource {

191

192

@Getter @Setter

193

private String name;

194

195

@Getter @Setter

196

private String shortName;

197

198

@Getter @Setter

199

private String group;

200

201

@Getter @Setter

202

private NodeType nodeType;

203

204

@Getter @Setter

205

private List<String> layers;

206

207

@Override

208

public int scope();

209

210

@Override

211

public String getEntityId();

212

213

@Override

214

public void prepare();

215

216

/**

217

* Checks if service is normal (not virtual)

218

* @return True if normal service

219

*/

220

public boolean isNormal();

221

222

/**

223

* Gets service layer information

224

* @return Primary service layer

225

*/

226

public String getLayer();

227

}

228

```

229

230

### ServiceMeta

231

232

Source for service metadata and registration information.

233

234

```java { .api }

235

@Source(name = "service_meta", category = "service")

236

public class ServiceMeta extends ISource {

237

238

@Getter @Setter

239

private String name;

240

241

@Getter @Setter

242

private NodeType nodeType;

243

244

@Getter @Setter

245

private List<String> layers;

246

247

@Getter @Setter

248

private JsonObject properties;

249

250

@Override

251

public int scope();

252

253

@Override

254

public String getEntityId();

255

256

@Override

257

public void prepare();

258

259

/**

260

* Adds property to service metadata

261

* @param key Property key

262

* @param value Property value

263

*/

264

public void addProperty(String key, String value);

265

266

/**

267

* Gets property from service metadata

268

* @param key Property key

269

* @return Property value or null

270

*/

271

public String getProperty(String key);

272

}

273

```

274

275

### ServiceRelation

276

277

Source for service relationship and dependency information.

278

279

```java { .api }

280

@Source(name = "service_relation", category = "relation")

281

@ScopeDeclaration(id = DefaultScopeDefine.SERVICE_RELATION, name = "ServiceRelation")

282

public class ServiceRelation extends ISource {

283

284

@Getter @Setter

285

private String sourceServiceId;

286

287

@Getter @Setter

288

private String destServiceId;

289

290

@Getter @Setter

291

private String sourceServiceName;

292

293

@Getter @Setter

294

private String destServiceName;

295

296

@Getter @Setter

297

private DetectPoint detectPoint;

298

299

@Getter @Setter

300

private int componentId;

301

302

@Override

303

public int scope();

304

305

@Override

306

public String getEntityId();

307

308

@Override

309

public void prepare();

310

311

/**

312

* Gets relation ID for entity identification

313

* @return Service relation ID

314

*/

315

public String getRelationId();

316

}

317

```

318

319

## Instance Sources

320

321

### ServiceInstance

322

323

Source for service instance telemetry data and metadata.

324

325

```java { .api }

326

@Source(name = "service_instance", category = "service")

327

@ScopeDeclaration(id = DefaultScopeDefine.SERVICE_INSTANCE, name = "ServiceInstance")

328

public class ServiceInstance extends ISource {

329

330

@Getter @Setter

331

private String name;

332

333

@Getter @Setter

334

private String serviceId;

335

336

@Getter @Setter

337

private String serviceName;

338

339

@Getter @Setter

340

private JsonObject properties;

341

342

@Override

343

public int scope();

344

345

@Override

346

public String getEntityId();

347

348

@Override

349

public void prepare();

350

351

/**

352

* Gets instance properties

353

* @return Instance properties as JSON

354

*/

355

public JsonObject getProperties();

356

357

/**

358

* Adds instance property

359

* @param key Property key

360

* @param value Property value

361

*/

362

public void addProperty(String key, String value);

363

364

/**

365

* Gets specific instance property

366

* @param key Property key

367

* @return Property value or null

368

*/

369

public String getProperty(String key);

370

}

371

```

372

373

### ServiceInstanceRelation

374

375

Source for service instance relationship data.

376

377

```java { .api }

378

@Source(name = "service_instance_relation", category = "relation")

379

@ScopeDeclaration(id = DefaultScopeDefine.SERVICE_INSTANCE_RELATION, name = "ServiceInstanceRelation")

380

public class ServiceInstanceRelation extends ISource {

381

382

@Getter @Setter

383

private String sourceServiceId;

384

385

@Getter @Setter

386

private String sourceServiceInstanceId;

387

388

@Getter @Setter

389

private String destServiceId;

390

391

@Getter @Setter

392

private String destServiceInstanceId;

393

394

@Getter @Setter

395

private String sourceServiceName;

396

397

@Getter @Setter

398

private String sourceServiceInstanceName;

399

400

@Getter @Setter

401

private String destServiceName;

402

403

@Getter @Setter

404

private String destServiceInstanceName;

405

406

@Getter @Setter

407

private DetectPoint detectPoint;

408

409

@Getter @Setter

410

private int componentId;

411

412

@Override

413

public int scope();

414

415

@Override

416

public String getEntityId();

417

418

@Override

419

public void prepare();

420

}

421

```

422

423

## Endpoint Sources

424

425

### Endpoint

426

427

Source for endpoint telemetry data and performance metrics.

428

429

```java { .api }

430

@Source(name = "endpoint", category = "endpoint")

431

@ScopeDeclaration(id = DefaultScopeDefine.ENDPOINT, name = "Endpoint")

432

public class Endpoint extends ISource {

433

434

@Getter @Setter

435

private String name;

436

437

@Getter @Setter

438

private String serviceId;

439

440

@Getter @Setter

441

private String serviceName;

442

443

@Getter @Setter

444

private String serviceInstanceId;

445

446

@Getter @Setter

447

private String serviceInstanceName;

448

449

@Getter @Setter

450

private int latency;

451

452

@Getter @Setter

453

private boolean status;

454

455

@Getter @Setter

456

private int responseCode;

457

458

@Getter @Setter

459

private RequestType type;

460

461

@Override

462

public int scope();

463

464

@Override

465

public String getEntityId();

466

467

@Override

468

public void prepare();

469

470

/**

471

* Checks if endpoint call was successful

472

* @return True if successful (status == true)

473

*/

474

public boolean isSuccess();

475

476

/**

477

* Gets endpoint latency in milliseconds

478

* @return Latency value

479

*/

480

public int getLatency();

481

}

482

```

483

484

### EndpointMeta

485

486

Source for endpoint metadata and registration information.

487

488

```java { .api }

489

@Source(name = "endpoint_meta", category = "endpoint")

490

public class EndpointMeta extends ISource {

491

492

@Getter @Setter

493

private String endpoint;

494

495

@Getter @Setter

496

private String serviceId;

497

498

@Getter @Setter

499

private String serviceName;

500

501

@Override

502

public int scope();

503

504

@Override

505

public String getEntityId();

506

507

@Override

508

public void prepare();

509

}

510

```

511

512

### EndpointRelation

513

514

Source for endpoint relationship and dependency information.

515

516

```java { .api }

517

@Source(name = "endpoint_relation", category = "relation")

518

@ScopeDeclaration(id = DefaultScopeDefine.ENDPOINT_RELATION, name = "EndpointRelation")

519

public class EndpointRelation extends ISource {

520

521

@Getter @Setter

522

private String endpoint;

523

524

@Getter @Setter

525

private String childEndpoint;

526

527

@Getter @Setter

528

private int rpcLatency;

529

530

@Getter @Setter

531

private boolean status;

532

533

@Getter @Setter

534

private int responseCode;

535

536

@Getter @Setter

537

private RequestType type;

538

539

@Getter @Setter

540

private DetectPoint detectPoint;

541

542

@Getter @Setter

543

private int componentId;

544

545

@Override

546

public int scope();

547

548

@Override

549

public String getEntityId();

550

551

@Override

552

public void prepare();

553

}

554

```

555

556

## Specialized Sources

557

558

### DatabaseAccess

559

560

Source for database operation telemetry data.

561

562

```java { .api }

563

@Source(name = "database_access", category = "database")

564

@ScopeDeclaration(id = DefaultScopeDefine.DATABASE_ACCESS, name = "DatabaseAccess")

565

public class DatabaseAccess extends ISource {

566

567

@Getter @Setter

568

private String databaseTypeId;

569

570

@Getter @Setter

571

private String name;

572

573

@Getter @Setter

574

private int latency;

575

576

@Getter @Setter

577

private boolean status;

578

579

@Getter @Setter

580

private String sqlStatement;

581

582

@Getter @Setter

583

private String operation;

584

585

@Override

586

public int scope();

587

588

@Override

589

public String getEntityId();

590

591

@Override

592

public void prepare();

593

594

/**

595

* Gets database type identifier

596

* @return Database type ID

597

*/

598

public String getDatabaseTypeId();

599

600

/**

601

* Gets SQL statement (may be truncated)

602

* @return SQL statement

603

*/

604

public String getSqlStatement();

605

}

606

```

607

608

### CacheAccess

609

610

Source for cache operation telemetry data.

611

612

```java { .api }

613

@Source(name = "cache_access", category = "cache")

614

public class CacheAccess extends ISource {

615

616

@Getter @Setter

617

private String name;

618

619

@Getter @Setter

620

private int latency;

621

622

@Getter @Setter

623

private boolean status;

624

625

@Getter @Setter

626

private String operation;

627

628

@Getter @Setter

629

private String key;

630

631

@Override

632

public int scope();

633

634

@Override

635

public String getEntityId();

636

637

@Override

638

public void prepare();

639

640

/**

641

* Gets cache operation type

642

* @return Operation (GET, SET, DELETE, etc.)

643

*/

644

public String getOperation();

645

}

646

```

647

648

### MQAccess

649

650

Source for message queue operation telemetry data.

651

652

```java { .api }

653

@Source(name = "mq_access", category = "mq")

654

public class MQAccess extends ISource {

655

656

@Getter @Setter

657

private String broker;

658

659

@Getter @Setter

660

private String topic;

661

662

@Getter @Setter

663

private int latency;

664

665

@Getter @Setter

666

private boolean status;

667

668

@Getter @Setter

669

private String operation;

670

671

@Getter @Setter

672

private TransmissionLatency transmissionLatency;

673

674

@Override

675

public int scope();

676

677

@Override

678

public String getEntityId();

679

680

@Override

681

public void prepare();

682

683

/**

684

* Gets message broker information

685

* @return Broker identifier

686

*/

687

public String getBroker();

688

689

/**

690

* Gets message topic

691

* @return Topic name

692

*/

693

public String getTopic();

694

}

695

```

696

697

### Log

698

699

Source for log data and events.

700

701

```java { .api }

702

@Source(name = "log", category = "log")

703

public class Log extends ISource {

704

705

@Getter @Setter

706

private String serviceId;

707

708

@Getter @Setter

709

private String serviceInstanceId;

710

711

@Getter @Setter

712

private String endpointId;

713

714

@Getter @Setter

715

private String traceId;

716

717

@Getter @Setter

718

private String traceSegmentId;

719

720

@Getter @Setter

721

private int spanId;

722

723

@Getter @Setter

724

private ContentType contentType;

725

726

@Getter @Setter

727

private String content;

728

729

@Getter @Setter

730

private List<String> tags;

731

732

@Override

733

public int scope();

734

735

@Override

736

public String getEntityId();

737

738

@Override

739

public void prepare();

740

741

/**

742

* Gets log content

743

* @return Log content string

744

*/

745

public String getContent();

746

747

/**

748

* Gets associated trace ID

749

* @return Trace ID or null if not associated

750

*/

751

public String getTraceId();

752

753

/**

754

* Adds tag to log entry

755

* @param tag Tag to add

756

*/

757

public void addTag(String tag);

758

}

759

```

760

761

### Segment

762

763

Source for trace segment data.

764

765

```java { .api }

766

@Source(name = "segment", category = "trace")

767

public class Segment extends ISource {

768

769

@Getter @Setter

770

private String traceId;

771

772

@Getter @Setter

773

private String segmentId;

774

775

@Getter @Setter

776

private String serviceId;

777

778

@Getter @Setter

779

private String serviceInstanceId;

780

781

@Getter @Setter

782

private String endpointName;

783

784

@Getter @Setter

785

private int latency;

786

787

@Getter @Setter

788

private boolean isError;

789

790

@Getter @Setter

791

private List<String> tags;

792

793

@Getter @Setter

794

private byte[] dataBinary;

795

796

@Override

797

public int scope();

798

799

@Override

800

public String getEntityId();

801

802

@Override

803

public void prepare();

804

805

/**

806

* Gets trace segment identifier

807

* @return Segment ID

808

*/

809

public String getSegmentId();

810

811

/**

812

* Checks if segment contains errors

813

* @return True if error occurred

814

*/

815

public boolean isError();

816

}

817

```

818

819

## Infrastructure Sources

820

821

### K8SMetrics

822

823

Source for Kubernetes metrics and telemetry.

824

825

```java { .api }

826

@Source(name = "k8s_metrics", category = "infrastructure")

827

public class K8SMetrics extends ISource {

828

829

@Getter @Setter

830

private String cluster;

831

832

@Getter @Setter

833

private String namespace;

834

835

@Getter @Setter

836

private String workload;

837

838

@Getter @Setter

839

private String pod;

840

841

@Getter @Setter

842

private String container;

843

844

@Getter @Setter

845

private long cpuUsage;

846

847

@Getter @Setter

848

private long memoryUsage;

849

850

@Getter @Setter

851

private Map<String, String> labels;

852

853

@Override

854

public int scope();

855

856

@Override

857

public String getEntityId();

858

859

@Override

860

public void prepare();

861

862

/**

863

* Gets Kubernetes cluster name

864

* @return Cluster name

865

*/

866

public String getCluster();

867

868

/**

869

* Gets Kubernetes namespace

870

* @return Namespace name

871

*/

872

public String getNamespace();

873

}

874

```

875

876

### EnvoyInstanceMetric

877

878

Source for Envoy proxy metrics and telemetry.

879

880

```java { .api }

881

@Source(name = "envoy_instance_metric", category = "infrastructure")

882

public class EnvoyInstanceMetric extends ISource {

883

884

@Getter @Setter

885

private String serviceId;

886

887

@Getter @Setter

888

private String serviceInstanceId;

889

890

@Getter @Setter

891

private long totalRequestsCount;

892

893

@Getter @Setter

894

private long totalConnectionsCount;

895

896

@Getter @Setter

897

private long activeConnectionsCount;

898

899

@Getter @Setter

900

private double cpuUsage;

901

902

@Getter @Setter

903

private double heapMemoryUsed;

904

905

@Override

906

public int scope();

907

908

@Override

909

public String getEntityId();

910

911

@Override

912

public void prepare();

913

}

914

```

915

916

## Source Enums and Constants

917

918

### NodeType

919

920

Enumeration for service node types.

921

922

```java { .api }

923

/**
 * Enumeration for service node types, each carrying a fixed integer code.
 */
public enum NodeType {
    Normal(0), Browser(1), Unknown(2);

    // Immutable: an enum constant's code must never change after construction.
    private final int value;

    NodeType(int value) {
        this.value = value;
    }

    /**
     * @return the integer code of this node type
     */
    public int value() {
        return value;
    }

    /**
     * Looks up the node type with the given integer code.
     *
     * @param value integer code to resolve
     * @return the matching constant
     * @throws IllegalArgumentException if no constant carries {@code value}
     */
    public static NodeType valueOf(int value) {
        for (NodeType nodeType : NodeType.values()) {
            if (nodeType.value == value) {
                return nodeType;
            }
        }
        throw new IllegalArgumentException("Unknown NodeType value: " + value);
    }
}

945

```

946

947

### DetectPoint

948

949

Enumeration for detection points in service relationships.

950

951

```java { .api }

952

/**
 * Enumeration for detection points in service relationships, each carrying a fixed
 * integer code.
 */
public enum DetectPoint {
    CLIENT(0), SERVER(1), PROXY(2);

    // Immutable: an enum constant's code must never change after construction.
    private final int value;

    DetectPoint(int value) {
        this.value = value;
    }

    /**
     * @return the integer code of this detection point
     */
    public int value() {
        return value;
    }

    /**
     * Looks up the detection point with the given integer code.
     *
     * @param value integer code to resolve
     * @return the matching constant
     * @throws IllegalArgumentException if no constant carries {@code value}
     */
    public static DetectPoint valueOf(int value) {
        for (DetectPoint detectPoint : DetectPoint.values()) {
            if (detectPoint.value == value) {
                return detectPoint;
            }
        }
        throw new IllegalArgumentException("Unknown DetectPoint value: " + value);
    }
}

974

```

975

976

### RequestType

977

978

Enumeration for request types.

979

980

```java { .api }

981

/**
 * Enumeration for request types, each carrying a fixed integer code.
 */
public enum RequestType {
    RPC(0), DATABASE(1), HTTP(2), CACHE(3), MQ(4);

    // Immutable: an enum constant's code must never change after construction.
    private final int value;

    RequestType(int value) {
        this.value = value;
    }

    /**
     * @return the integer code of this request type
     */
    public int value() {
        return value;
    }

    /**
     * Looks up the request type with the given integer code.
     *
     * @param value integer code to resolve
     * @return the matching constant
     * @throws IllegalArgumentException if no constant carries {@code value}
     */
    public static RequestType valueOf(int value) {
        for (RequestType requestType : RequestType.values()) {
            if (requestType.value == value) {
                return requestType;
            }
        }
        throw new IllegalArgumentException("Unknown RequestType value: " + value);
    }
}

1003

```

1004

1005

### ContentType

1006

1007

Enumeration for log content types.

1008

1009

```java { .api }

1010

/**
 * Enumeration for log content types, each carrying a fixed integer code.
 */
public enum ContentType {
    NONE(0), TEXT(1), JSON(2), YAML(3);

    // Immutable: an enum constant's code must never change after construction.
    private final int value;

    ContentType(int value) {
        this.value = value;
    }

    /**
     * @return the integer code of this content type
     */
    public int value() {
        return value;
    }

    /**
     * Looks up the content type with the given integer code.
     *
     * @param value integer code to resolve
     * @return the matching constant
     * @throws IllegalArgumentException if no constant carries {@code value}
     */
    public static ContentType valueOf(int value) {
        for (ContentType contentType : ContentType.values()) {
            if (contentType.value == value) {
                return contentType;
            }
        }
        throw new IllegalArgumentException("Unknown ContentType value: " + value);
    }
}

1032

```

1033

1034

## Usage Examples

1035

1036

### Implementing Custom Source

1037

1038

```java

1039

@Source(name = "custom_business_metric", category = "business")

1040

@ScopeDeclaration(id = 1000, name = "BusinessMetric") // Custom scope ID

1041

public class CustomBusinessMetric implements ISource {

1042

1043

@Getter @Setter

1044

private String businessUnit;

1045

1046

@Getter @Setter

1047

private String operation;

1048

1049

@Getter @Setter

1050

private double revenue;

1051

1052

@Getter @Setter

1053

private int transactionCount;

1054

1055

@Getter @Setter

1056

private String currency;

1057

1058

private long timeBucket;

1059

private String entityId;

1060

1061

@Override

1062

public int scope() {

1063

return 1000; // Custom scope for business metrics

1064

}

1065

1066

@Override

1067

public long getTimeBucket() {

1068

return timeBucket;

1069

}

1070

1071

@Override

1072

public void setTimeBucket(long timeBucket) {

1073

this.timeBucket = timeBucket;

1074

}

1075

1076

@Override

1077

public String getEntityId() {

1078

return entityId;

1079

}

1080

1081

@Override

1082

public void prepare() {

1083

// Generate entity ID from business unit and operation

1084

this.entityId = businessUnit + ":" + operation;

1085

1086

// Set time bucket if not already set

1087

if (timeBucket == 0) {

1088

timeBucket = TimeBucket.getMinuteTimeBucket(System.currentTimeMillis());

1089

}

1090

}

1091

1092

public double getRevenuePerTransaction() {

1093

return transactionCount > 0 ? revenue / transactionCount : 0.0;

1094

}

1095

}

1096

```

1097

1098

### Creating Custom Source Dispatcher

1099

1100

```java

1101

@Component

1102

public class CustomBusinessMetricDispatcher implements SourceDispatcher<CustomBusinessMetric> {

1103

1104

@Override

1105

public void dispatch(CustomBusinessMetric source) {

1106

// Validate source data

1107

if (source.getBusinessUnit() == null || source.getOperation() == null) {

1108

throw new IllegalArgumentException("Business unit and operation are required");

1109

}

1110

1111

// Prepare source

1112

source.prepare();

1113

1114

// Apply business rules

1115

applyBusinessRules(source);

1116

1117

// Route to metrics processor

1118

MetricsStreamProcessor.getInstance().in(source);

1119

}

1120

1121

private void applyBusinessRules(CustomBusinessMetric source) {

1122

// Example: Convert currency to USD if needed

1123

if (!"USD".equals(source.getCurrency())) {

1124

double exchangeRate = getExchangeRate(source.getCurrency(), "USD");

1125

source.setRevenue(source.getRevenue() * exchangeRate);

1126

source.setCurrency("USD");

1127

}

1128

1129

// Example: Filter out test data

1130

if (source.getBusinessUnit().startsWith("TEST_")) {

1131

throw new IllegalArgumentException("Test data not allowed in production");

1132

}

1133

}

1134

1135

private double getExchangeRate(String fromCurrency, String toCurrency) {

1136

// Implement currency conversion logic

1137

return 1.0; // Placeholder

1138

}

1139

}

1140

```

1141

1142

### Processing Service Sources

1143

1144

```java

1145

public class ServiceSourceProcessor {

1146

1147

private SourceReceiver sourceReceiver;

1148

1149

public void processServiceRegistration(String serviceName, NodeType nodeType,

1150

List<String> layers, Map<String, String> properties) {

1151

1152

// Create service source

1153

Service serviceSource = new Service();

1154

serviceSource.setName(serviceName);

1155

serviceSource.setNodeType(nodeType);

1156

serviceSource.setLayers(layers);

1157

1158

try {

1159

sourceReceiver.receive(serviceSource);

1160

System.out.println("Service registered: " + serviceName);

1161

} catch (IOException e) {

1162

System.err.println("Failed to register service: " + e.getMessage());

1163

}

1164

1165

// Create service metadata source

1166

ServiceMeta metaSource = new ServiceMeta();

1167

metaSource.setName(serviceName);

1168

metaSource.setNodeType(nodeType);

1169

metaSource.setLayers(layers);

1170

1171

JsonObject props = new JsonObject();

1172

for (Map.Entry<String, String> entry : properties.entrySet()) {

1173

props.addProperty(entry.getKey(), entry.getValue());

1174

}

1175

metaSource.setProperties(props);

1176

1177

try {

1178

sourceReceiver.receive(metaSource);

1179

System.out.println("Service metadata registered: " + serviceName);

1180

} catch (IOException e) {

1181

System.err.println("Failed to register service metadata: " + e.getMessage());

1182

}

1183

}

1184

1185

public void processServiceRelation(String sourceService, String destService,

1186

DetectPoint detectPoint, int componentId) {

1187

1188

ServiceRelation relationSource = new ServiceRelation();

1189

relationSource.setSourceServiceName(sourceService);

1190

relationSource.setDestServiceName(destService);

1191

relationSource.setDetectPoint(detectPoint);

1192

relationSource.setComponentId(componentId);

1193

1194

try {

1195

sourceReceiver.receive(relationSource);

1196

System.out.println("Service relation recorded: " + sourceService + " -> " + destService);

1197

} catch (IOException e) {

1198

System.err.println("Failed to record service relation: " + e.getMessage());

1199

}

1200

}

1201

}

1202

```

1203

1204

### Processing Endpoint Sources

1205

1206

```java

1207

public class EndpointSourceProcessor {

1208

1209

private SourceReceiver sourceReceiver;

1210

1211

public void processEndpointCall(String serviceName, String instanceName, String endpointName,

1212

int latency, boolean success, int responseCode, RequestType type) {

1213

1214

// Create endpoint source

1215

Endpoint endpointSource = new Endpoint();

1216

endpointSource.setServiceName(serviceName);

1217

endpointSource.setServiceInstanceName(instanceName);

1218

endpointSource.setName(endpointName);

1219

endpointSource.setLatency(latency);

1220

endpointSource.setStatus(success);

1221

endpointSource.setResponseCode(responseCode);

1222

endpointSource.setType(type);

1223

1224

try {

1225

sourceReceiver.receive(endpointSource);

1226

1227

if (!success) {

1228

System.out.println("Error endpoint call recorded: " + endpointName +

1229

" (" + responseCode + ")");

1230

}

1231

} catch (IOException e) {

1232

System.err.println("Failed to process endpoint call: " + e.getMessage());

1233

}

1234

}

1235

1236

public void processEndpointRelation(String sourceEndpoint, String destEndpoint,

1237

int latency, boolean success, DetectPoint detectPoint) {

1238

1239

EndpointRelation relationSource = new EndpointRelation();

1240

relationSource.setEndpoint(sourceEndpoint);

1241

relationSource.setChildEndpoint(destEndpoint);

1242

relationSource.setRpcLatency(latency);

1243

relationSource.setStatus(success);

1244

relationSource.setDetectPoint(detectPoint);

1245

1246

try {

1247

sourceReceiver.receive(relationSource);

1248

} catch (IOException e) {

1249

System.err.println("Failed to process endpoint relation: " + e.getMessage());

1250

}

1251

}

1252

}

1253

```

1254

1255

### Processing Database and Cache Sources

1256

1257

```java

1258

public class DatabaseCacheSourceProcessor {

1259

1260

private SourceReceiver sourceReceiver;

1261

1262

public void processDatabaseAccess(String databaseType, String operation, String sqlStatement,

1263

int latency, boolean success) {

1264

1265

DatabaseAccess dbSource = new DatabaseAccess();

1266

dbSource.setDatabaseTypeId(databaseType);

1267

dbSource.setOperation(operation);

1268

dbSource.setSqlStatement(sqlStatement);

1269

dbSource.setLatency(latency);

1270

dbSource.setStatus(success);

1271

1272

try {

1273

sourceReceiver.receive(dbSource);

1274

1275

if (latency > 1000) { // Log slow queries

1276

System.out.println("Slow database query detected: " + latency + "ms - " +

1277

sqlStatement.substring(0, Math.min(100, sqlStatement.length())));

1278

}

1279

} catch (IOException e) {

1280

System.err.println("Failed to process database access: " + e.getMessage());

1281

}

1282

}

1283

1284

public void processCacheAccess(String cacheName, String operation, String key,

1285

int latency, boolean success) {

1286

1287

CacheAccess cacheSource = new CacheAccess();

1288

cacheSource.setName(cacheName);

1289

cacheSource.setOperation(operation);

1290

cacheSource.setKey(key);

1291

cacheSource.setLatency(latency);

1292

cacheSource.setStatus(success);

1293

1294

try {

1295

sourceReceiver.receive(cacheSource);

1296

1297

if (!success) {

1298

System.out.println("Cache operation failed: " + operation + " on " + key);

1299

}

1300

} catch (IOException e) {

1301

System.err.println("Failed to process cache access: " + e.getMessage());

1302

}

1303

}

1304

}

1305

```

1306

1307

### Batch Source Processing

1308

1309

```java

1310

public class BatchSourceProcessor {

1311

1312

private SourceReceiver sourceReceiver;

1313

private List<ISource> sourceBatch = new ArrayList<>();

1314

private final int BATCH_SIZE = 100;

1315

1316

public void addSource(ISource source) {

1317

sourceBatch.add(source);

1318

1319

if (sourceBatch.size() >= BATCH_SIZE) {

1320

flushBatch();

1321

}

1322

}

1323

1324

public void flushBatch() {

1325

if (sourceBatch.isEmpty()) {

1326

return;

1327

}

1328

1329

try {

1330

sourceReceiver.receiveBatch(new ArrayList<>(sourceBatch));

1331

System.out.println("Processed batch of " + sourceBatch.size() + " sources");

1332

sourceBatch.clear();

1333

} catch (IOException e) {

1334

System.err.println("Failed to process source batch: " + e.getMessage());

1335

// Optionally retry individual sources

1336

retryIndividualSources();

1337

}

1338

}

1339

1340

private void retryIndividualSources() {

1341

for (ISource source : sourceBatch) {

1342

try {

1343

sourceReceiver.receive(source);

1344

} catch (IOException e) {

1345

System.err.println("Failed to process individual source: " +

1346

source.getClass().getSimpleName() + " - " + e.getMessage());

1347

}

1348

}

1349

sourceBatch.clear();

1350

}

1351

1352

// Call this method periodically to ensure pending sources are processed

1353

public void periodicFlush() {

1354

if (!sourceBatch.isEmpty()) {

1355

flushBatch();

1356

}

1357

}

1358

}

1359

```

1360

1361

## Core Source Types

1362

1363

```java { .api }

1364

/**

1365

* Transmission latency for message queue operations

1366

*/

1367

public class TransmissionLatency {

1368

private long producerTime;

1369

private long consumerTime;

1370

1371

public long getProducerTime();

1372

public void setProducerTime(long producerTime);

1373

public long getConsumerTime();

1374

public void setConsumerTime(long consumerTime);

1375

public long getLatency();

1376

}

1377

1378

/**
 * Default scope definitions for built-in source types.
 *
 * NOTE(review): verify these ids against the upstream DefaultScopeDefine before relying
 * on them; they are reproduced here exactly as documented.
 */
public class DefaultScopeDefine {
    public static final int SERVICE = 1;
    public static final int SERVICE_INSTANCE = 2;
    public static final int ENDPOINT = 3;
    public static final int SERVICE_RELATION = 4;
    public static final int SERVICE_INSTANCE_RELATION = 5;
    public static final int ENDPOINT_RELATION = 6;
    public static final int DATABASE_ACCESS = 7;
    public static final int ALL = 99;
}

1391

1392

/**

1393

* Source processing exception

1394

*/

1395

public class SourceProcessingException extends RuntimeException {

1396

public SourceProcessingException(String message);

1397

public SourceProcessingException(String message, Throwable cause);

1398

}

1399

```