or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

analysis-framework.mdconfiguration.mdindex.mdprofiling.mdquery-services.mdremote-communication.mdsource-processing.mdstorage-layer.md

remote-communication.mddocs/

0

# Remote Communication

1

2

The SkyWalking remote communication layer enables distributed processing across multiple OAP (Observability Analysis Platform) nodes through gRPC-based clustering. It provides load balancing, routing strategies, and serialization mechanisms for horizontal scaling and high availability deployments.

3

4

## Remote Service Infrastructure

5

6

### RemoteSenderService

7

8

gRPC client service for inter-node communication with configurable routing strategies.

9

10

```java { .api }

11

public class RemoteSenderService implements Service {

12

13

/**

14

* Sends stream data to remote OAP node with routing strategy

15

* @param nextWorkName Target worker name on remote node

16

* @param streamData Stream data to transmit

17

* @param selector Routing selector for node selection

18

* @throws RemoteException If transmission fails

19

*/

20

public void send(String nextWorkName, StreamData streamData, Selector selector)

21

throws RemoteException;

22

23

/**

24

* Sends data to specific remote address

25

* @param remoteAddress Target remote address

26

* @param nextWorkName Target worker name

27

* @param streamData Stream data to transmit

28

* @throws RemoteException If transmission fails

29

*/

30

public void send(RemoteAddress remoteAddress, String nextWorkName, StreamData streamData)

31

throws RemoteException;

32

33

/**

34

* Sends data with timeout configuration

35

* @param nextWorkName Target worker name

36

* @param streamData Stream data to transmit

37

* @param selector Routing selector

38

* @param timeoutSeconds Transmission timeout in seconds

39

* @throws RemoteException If transmission fails or times out

40

*/

41

public void sendWithTimeout(String nextWorkName, StreamData streamData,

42

Selector selector, int timeoutSeconds) throws RemoteException;

43

44

/**

45

* Gets available remote addresses

46

* @return List of configured remote addresses

47

*/

48

public List<RemoteAddress> getRemoteAddresses();

49

50

/**

51

* Checks if remote service is available

52

* @param remoteAddress Remote address to check

53

* @return True if remote service is reachable

54

*/

55

public boolean isAvailable(RemoteAddress remoteAddress);

56

}

57

```

58

59

### RemoteServiceHandler

60

61

Handles incoming remote service requests and routes them to appropriate workers.

62

63

```java { .api }

64

public class RemoteServiceHandler {

65

66

/**

67

* Handles incoming remote stream data

68

* @param streamData Received stream data

69

* @param nextWorkerName Target worker name for processing

70

* @throws RemoteException If handling fails

71

*/

72

public void handle(StreamData streamData, String nextWorkerName) throws RemoteException;

73

74

/**

75

* Registers worker for remote request handling

76

* @param workerName Worker identifier

77

* @param worker Worker instance to handle requests

78

*/

79

public void registerWorker(String workerName, AbstractWorker<?> worker);

80

81

/**

82

* Unregisters worker from remote handling

83

* @param workerName Worker identifier to remove

84

*/

85

public void unregisterWorker(String workerName);

86

87

/**

88

* Gets registered worker by name

89

* @param workerName Worker identifier

90

* @return Worker instance or null if not found

91

*/

92

public AbstractWorker<?> getWorker(String workerName);

93

}

94

```

95

96

## Routing and Selection

97

98

### Selector Interface

99

100

Base interface for routing strategy selection.

101

102

```java { .api }

103

public interface Selector {

104

105

/**

106

* Selects remote address based on routing strategy

107

* @param remoteAddresses Available remote addresses

108

* @param data Data being routed (for hash-based selection)

109

* @return Selected remote address

110

*/

111

RemoteAddress select(List<RemoteAddress> remoteAddresses, StreamData data);

112

113

/**

114

* Gets selector type identifier

115

* @return Selector type name

116

*/

117

String getType();

118

}

119

```

120

121

### HashCodeSelector

122

123

Routes data based on hash code for consistent routing.

124

125

```java { .api }

126

public class HashCodeSelector implements Selector {

127

128

/**

129

* Selects remote address using data hash code modulo

130

* @param remoteAddresses Available remote addresses

131

* @param data Data to route (hash code used for selection)

132

* @return Selected address based on hash distribution

133

*/

134

@Override

135

public RemoteAddress select(List<RemoteAddress> remoteAddresses, StreamData data);

136

137

/**

138

* Gets hash code from stream data for routing

139

* @param data Stream data

140

* @return Hash code for routing decision

141

*/

142

protected int getHashCode(StreamData data);

143

}

144

```

145

146

### ForeverFirstSelector

147

148

Always selects the first available remote address.

149

150

```java { .api }

151

public class ForeverFirstSelector implements Selector {

152

153

/**

154

* Always selects first address from list

155

* @param remoteAddresses Available remote addresses

156

* @param data Data being routed (ignored)

157

* @return First remote address in list

158

*/

159

@Override

160

public RemoteAddress select(List<RemoteAddress> remoteAddresses, StreamData data);

161

}

162

```

163

164

### RollingSelector

165

166

Round-robin selection across available remote addresses.

167

168

```java { .api }

169

public class RollingSelector implements Selector {

170

171

private AtomicInteger index;

172

173

/**

174

* Selects remote address using round-robin strategy

175

* @param remoteAddresses Available remote addresses

176

* @param data Data being routed (ignored)

177

* @return Next address in round-robin sequence

178

*/

179

@Override

180

public RemoteAddress select(List<RemoteAddress> remoteAddresses, StreamData data);

181

182

/**

183

* Resets rolling counter

184

*/

185

public void reset();

186

}

187

```

188

189

## Remote Data Interfaces

190

191

### Serializable Interface

192

193

Marker interface for remote-serializable data.

194

195

```java { .api }

196

public interface Serializable {

197

198

/**

199

* Serializes object to byte array for remote transmission

200

* @return Serialized byte array

201

* @throws SerializationException If serialization fails

202

*/

203

byte[] serialize() throws SerializationException;

204

205

/**

206

* Gets serialization version for compatibility

207

* @return Serialization version number

208

*/

209

int getSerializationVersion();

210

}

211

```

212

213

### Deserializable Interface

214

215

Marker interface for remote-deserializable data.

216

217

```java { .api }

218

public interface Deserializable {

219

220

/**

221

* Deserializes object from byte array

222

* @param data Serialized byte array

223

* @throws DeserializationException If deserialization fails

224

*/

225

void deserialize(byte[] data) throws DeserializationException;

226

227

/**

228

* Gets deserialization version for compatibility

229

* @return Deserialization version number

230

*/

231

int getDeserializationVersion();

232

233

/**

234

* Checks if version is compatible for deserialization

235

* @param version Incoming serialization version

236

* @return True if version is compatible

237

*/

238

boolean isCompatible(int version);

239

}

240

```

241

242

### StreamData

243

244

Base class for data transmitted between remote nodes.

245

246

```java { .api }

247

public abstract class StreamData implements Serializable, Deserializable {

248

249

protected long timestamp;

250

protected String remoteAddress;

251

252

/**

253

* Gets data timestamp

254

* @return Timestamp in milliseconds

255

*/

256

public long getTimestamp();

257

258

/**

259

* Sets data timestamp

260

* @param timestamp Timestamp in milliseconds

261

*/

262

public void setTimestamp(long timestamp);

263

264

/**

265

* Gets originating remote address

266

* @return Remote address string

267

*/

268

public String getRemoteAddress();

269

270

/**

271

* Sets originating remote address

272

* @param remoteAddress Remote address string

273

*/

274

public void setRemoteAddress(String remoteAddress);

275

276

/**

277

* Gets unique identifier for routing

278

* @return String identifier for consistent routing

279

*/

280

public abstract String id();

281

282

/**

283

* Gets stream data type identifier

284

* @return Data type for worker routing

285

*/

286

public abstract int remoteHashCode();

287

}

288

```

289

290

## Remote Configuration

291

292

### RemoteAddress

293

294

Represents remote OAP node address configuration.

295

296

```java { .api }

297

public class RemoteAddress {

298

299

private String host;

300

private int port;

301

private boolean selfAddress;

302

303

/**

304

* Creates remote address configuration

305

* @param host Remote host address

306

* @param port Remote port number

307

*/

308

public RemoteAddress(String host, int port);

309

310

/**

311

* Creates remote address with self-identification

312

* @param host Remote host address

313

* @param port Remote port number

314

* @param selfAddress True if this represents current node

315

*/

316

public RemoteAddress(String host, int port, boolean selfAddress);

317

318

/**

319

* Gets host address

320

* @return Host string

321

*/

322

public String getHost();

323

324

/**

325

* Gets port number

326

* @return Port number

327

*/

328

public int getPort();

329

330

/**

331

* Checks if this is current node address

332

* @return True if self address

333

*/

334

public boolean isSelfAddress();

335

336

/**

337

* Gets full address string

338

* @return "host:port" format

339

*/

340

public String getAddress();

341

342

@Override

343

public boolean equals(Object obj);

344

345

@Override

346

public int hashCode();

347

348

@Override

349

public String toString();

350

}

351

```

352

353

### RemoteConfiguration

354

355

Configuration for remote service connectivity.

356

357

```java { .api }

358

public class RemoteConfiguration {

359

360

private List<RemoteAddress> remoteAddresses;

361

private int connectionTimeout;

362

private int requestTimeout;

363

private int maxRetries;

364

private boolean enableHeartbeat;

365

private int heartbeatInterval;

366

367

/**

368

* Gets configured remote addresses

369

* @return List of remote addresses

370

*/

371

public List<RemoteAddress> getRemoteAddresses();

372

373

/**

374

* Sets remote addresses

375

* @param remoteAddresses List of remote addresses

376

*/

377

public void setRemoteAddresses(List<RemoteAddress> remoteAddresses);

378

379

/**

380

* Gets connection timeout

381

* @return Connection timeout in milliseconds

382

*/

383

public int getConnectionTimeout();

384

385

/**

386

* Gets request timeout

387

* @return Request timeout in milliseconds

388

*/

389

public int getRequestTimeout();

390

391

/**

392

* Gets maximum retry attempts

393

* @return Maximum retries

394

*/

395

public int getMaxRetries();

396

397

/**

398

* Checks if heartbeat is enabled

399

* @return True if heartbeat enabled

400

*/

401

public boolean isEnableHeartbeat();

402

403

/**

404

* Gets heartbeat interval

405

* @return Heartbeat interval in seconds

406

*/

407

public int getHeartbeatInterval();

408

}

409

```

410

411

## Remote Exceptions

412

413

### RemoteException

414

415

Base exception for remote communication errors.

416

417

```java { .api }

418

public class RemoteException extends Exception {

419

420

/**

421

* Creates remote exception with message

422

* @param message Error message

423

*/

424

public RemoteException(String message);

425

426

/**

427

* Creates remote exception with message and cause

428

* @param message Error message

429

* @param cause Underlying cause

430

*/

431

public RemoteException(String message, Throwable cause);

432

}

433

```

434

435

### SerializationException

436

437

Exception for data serialization errors.

438

439

```java { .api }

440

public class SerializationException extends RemoteException {

441

442

/**

443

* Creates serialization exception

444

* @param message Error message

445

*/

446

public SerializationException(String message);

447

448

/**

449

* Creates serialization exception with cause

450

* @param message Error message

451

* @param cause Underlying cause

452

*/

453

public SerializationException(String message, Throwable cause);

454

}

455

```

456

457

### DeserializationException

458

459

Exception for data deserialization errors.

460

461

```java { .api }

462

public class DeserializationException extends RemoteException {

463

464

/**

465

* Creates deserialization exception

466

* @param message Error message

467

*/

468

public DeserializationException(String message);

469

470

/**

471

* Creates deserialization exception with cause

472

* @param message Error message

473

* @param cause Underlying cause

474

*/

475

public DeserializationException(String message, Throwable cause);

476

}

477

```

478

479

## gRPC Integration

480

481

### RemoteServiceGrpc

482

483

gRPC service definitions for remote communication.

484

485

```java { .api }

486

public class RemoteServiceGrpc {

487

488

/**

489

* gRPC stub for remote service calls

490

*/

491

public static class RemoteServiceStub {

492

493

/**

494

* Sends stream data to remote node

495

* @param request Stream data request

496

* @param responseObserver Response observer for async handling

497

*/

498

public void call(StreamDataRequest request,

499

StreamObserver<StreamDataResponse> responseObserver);

500

}

501

502

/**

503

* gRPC blocking stub for synchronous calls

504

*/

505

public static class RemoteServiceBlockingStub {

506

507

/**

508

* Sends stream data synchronously

509

* @param request Stream data request

510

* @return Stream data response

511

*/

512

public StreamDataResponse call(StreamDataRequest request);

513

}

514

515

/**

516

* Service implementation base class

517

*/

518

public static abstract class RemoteServiceImplBase implements BindableService {

519

520

/**

521

* Handles incoming remote calls

522

* @param request Stream data request

523

* @param responseObserver Response observer

524

*/

525

public abstract void call(StreamDataRequest request,

526

StreamObserver<StreamDataResponse> responseObserver);

527

}

528

}

529

```

530

531

## Usage Examples

532

533

### Setting up Remote Communication

534

535

```java

536

// Configure remote addresses

537

List<RemoteAddress> remoteAddresses = Arrays.asList(

538

new RemoteAddress("oap-node-1", 11800),

539

new RemoteAddress("oap-node-2", 11800),

540

new RemoteAddress("oap-node-3", 11800)

541

);

542

543

RemoteConfiguration config = new RemoteConfiguration();

544

config.setRemoteAddresses(remoteAddresses);

545

config.setConnectionTimeout(5000);

546

config.setRequestTimeout(10000);

547

config.setMaxRetries(3);

548

config.setEnableHeartbeat(true);

549

config.setHeartbeatInterval(30);

550

551

// Initialize remote sender service

552

RemoteSenderService remoteSender = new RemoteSenderService();

553

remoteSender.initialize(config);

554

```

555

556

### Sending Data with Different Routing Strategies

557

558

```java

559

// Hash-based routing for consistent distribution

560

Selector hashSelector = new HashCodeSelector();

561

remoteSender.send("MetricsAggregateWorker", metricsData, hashSelector);

562

563

// Round-robin routing for load balancing

564

Selector rollingSelector = new RollingSelector();

565

remoteSender.send("RecordPersistentWorker", recordData, rollingSelector);

566

567

// Always send to first available node

568

Selector firstSelector = new ForeverFirstSelector();

569

remoteSender.send("ManagementWorker", managementData, firstSelector);

570

571

// Send to specific remote address

572

RemoteAddress specificNode = new RemoteAddress("oap-primary", 11800);

573

remoteSender.send(specificNode, "PriorityWorker", criticalData);

574

```

575

576

### Implementing Custom Stream Data

577

578

```java

579

public class CustomTelemetryData extends StreamData {

580

581

private String serviceName;

582

private String operationName;

583

private long duration;

584

private Map<String, String> tags;

585

586

@Override

587

public String id() {

588

// Create unique ID for routing consistency

589

return serviceName + ":" + operationName;

590

}

591

592

@Override

593

public int remoteHashCode() {

594

// Hash code for routing decisions

595

return Objects.hash(serviceName, operationName);

596

}

597

598

@Override

599

public byte[] serialize() throws SerializationException {

600

try {

601

// Serialize to protobuf or other format

602

ByteArrayOutputStream baos = new ByteArrayOutputStream();

603

DataOutputStream dos = new DataOutputStream(baos);

604

605

dos.writeUTF(serviceName);

606

dos.writeUTF(operationName);

607

dos.writeLong(duration);

608

dos.writeInt(tags.size());

609

610

for (Map.Entry<String, String> entry : tags.entrySet()) {

611

dos.writeUTF(entry.getKey());

612

dos.writeUTF(entry.getValue());

613

}

614

615

return baos.toByteArray();

616

} catch (IOException e) {

617

throw new SerializationException("Failed to serialize custom telemetry data", e);

618

}

619

}

620

621

@Override

622

public void deserialize(byte[] data) throws DeserializationException {

623

try {

624

ByteArrayInputStream bais = new ByteArrayInputStream(data);

625

DataInputStream dis = new DataInputStream(bais);

626

627

this.serviceName = dis.readUTF();

628

this.operationName = dis.readUTF();

629

this.duration = dis.readLong();

630

631

int tagCount = dis.readInt();

632

this.tags = new HashMap<>();

633

634

for (int i = 0; i < tagCount; i++) {

635

String key = dis.readUTF();

636

String value = dis.readUTF();

637

tags.put(key, value);

638

}

639

} catch (IOException e) {

640

throw new DeserializationException("Failed to deserialize custom telemetry data", e);

641

}

642

}

643

644

@Override

645

public int getSerializationVersion() {

646

return 1;

647

}

648

649

@Override

650

public int getDeserializationVersion() {

651

return 1;

652

}

653

654

@Override

655

public boolean isCompatible(int version) {

656

return version <= getDeserializationVersion();

657

}

658

659

// Getters and setters

660

public String getServiceName() { return serviceName; }

661

public void setServiceName(String serviceName) { this.serviceName = serviceName; }

662

663

public String getOperationName() { return operationName; }

664

public void setOperationName(String operationName) { this.operationName = operationName; }

665

666

public long getDuration() { return duration; }

667

public void setDuration(long duration) { this.duration = duration; }

668

669

public Map<String, String> getTags() { return tags; }

670

public void setTags(Map<String, String> tags) { this.tags = tags; }

671

}

672

```

673

674

### Implementing Custom Selector

675

676

```java

677

public class GeographicSelector implements Selector {

678

679

private final String preferredRegion;

680

private final Map<RemoteAddress, String> addressRegions;

681

682

public GeographicSelector(String preferredRegion,

683

Map<RemoteAddress, String> addressRegions) {

684

this.preferredRegion = preferredRegion;

685

this.addressRegions = addressRegions;

686

}

687

688

@Override

689

public RemoteAddress select(List<RemoteAddress> remoteAddresses, StreamData data) {

690

// First, try to find addresses in preferred region

691

List<RemoteAddress> preferredAddresses = remoteAddresses.stream()

692

.filter(addr -> preferredRegion.equals(addressRegions.get(addr)))

693

.collect(Collectors.toList());

694

695

if (!preferredAddresses.isEmpty()) {

696

// Use hash-based selection within preferred region

697

int hash = Math.abs(data.remoteHashCode());

698

int index = hash % preferredAddresses.size();

699

return preferredAddresses.get(index);

700

}

701

702

// Fall back to any available address

703

if (!remoteAddresses.isEmpty()) {

704

int hash = Math.abs(data.remoteHashCode());

705

int index = hash % remoteAddresses.size();

706

return remoteAddresses.get(index);

707

}

708

709

return null;

710

}

711

712

@Override

713

public String getType() {

714

return "geographic";

715

}

716

}

717

```

718

719

### Handling Remote Service Requests

720

721

```java

722

@Component

723

public class CustomRemoteServiceHandler extends RemoteServiceHandler {

724

725

@Override

726

public void handle(StreamData streamData, String nextWorkerName) throws RemoteException {

727

try {

728

// Validate stream data

729

if (streamData == null) {

730

throw new RemoteException("Received null stream data");

731

}

732

733

// Check if worker exists

734

AbstractWorker<?> worker = getWorker(nextWorkerName);

735

if (worker == null) {

736

throw new RemoteException("Unknown worker: " + nextWorkerName);

737

}

738

739

// Log incoming request

740

logger.info("Handling remote request for worker: {} from address: {}",

741

nextWorkerName, streamData.getRemoteAddress());

742

743

// Route to appropriate worker

744

super.handle(streamData, nextWorkerName);

745

746

} catch (Exception e) {

747

logger.error("Failed to handle remote request", e);

748

throw new RemoteException("Request handling failed", e);

749

}

750

}

751

752

public void registerCustomWorkers() {

753

// Register custom workers for remote handling

754

registerWorker("CustomMetricsWorker", new CustomMetricsWorker());

755

registerWorker("CustomRecordWorker", new CustomRecordWorker());

756

registerWorker("CustomAnalysisWorker", new CustomAnalysisWorker());

757

}

758

}

759

```

760

761

## Core Remote Types

762

763

```java { .api }

764

/**

765

* gRPC request for stream data transmission

766

*/

767

public class StreamDataRequest {

768

private String workerName;

769

private byte[] streamData;

770

private String dataType;

771

772

public String getWorkerName();

773

public byte[] getStreamData();

774

public String getDataType();

775

}

776

777

/**

778

* gRPC response for stream data transmission

779

*/

780

public class StreamDataResponse {

781

private boolean success;

782

private String errorMessage;

783

784

public boolean isSuccess();

785

public String getErrorMessage();

786

}

787

788

/**

789

* Remote module definition

790

*/

791

public class RemoteModule extends ModuleDefine {

792

public static final String NAME = "remote";

793

794

@Override

795

public String name();

796

797

@Override

798

public Class[] services();

799

}

800

801

/**

802

* Load balancing strategy enumeration

803

*/

804

public enum LoadBalanceStrategy {

805

HASH_CODE, ROLLING, FIRST_AVAILABLE, CUSTOM

806

}

807

```