0
# Remote Communication
1
2
The SkyWalking remote communication layer enables distributed processing across multiple OAP (Observability Analysis Platform) nodes through gRPC-based clustering. It provides load balancing, routing strategies, and serialization mechanisms for horizontal scaling and high availability deployments.
3
4
## Remote Service Infrastructure
5
6
### RemoteSenderService
7
8
gRPC client service for inter-node communication with configurable routing strategies.
9
10
```java { .api }
11
public class RemoteSenderService implements Service {
12
13
/**
14
* Sends stream data to remote OAP node with routing strategy
15
* @param nextWorkName Target worker name on remote node
16
* @param streamData Stream data to transmit
17
* @param selector Routing selector for node selection
18
* @throws RemoteException If transmission fails
19
*/
20
public void send(String nextWorkName, StreamData streamData, Selector selector)
21
throws RemoteException;
22
23
/**
24
* Sends data to specific remote address
25
* @param remoteAddress Target remote address
26
* @param nextWorkName Target worker name
27
* @param streamData Stream data to transmit
28
* @throws RemoteException If transmission fails
29
*/
30
public void send(RemoteAddress remoteAddress, String nextWorkName, StreamData streamData)
31
throws RemoteException;
32
33
/**
34
* Sends data with timeout configuration
35
* @param nextWorkName Target worker name
36
* @param streamData Stream data to transmit
37
* @param selector Routing selector
38
* @param timeoutSeconds Transmission timeout in seconds
39
* @throws RemoteException If transmission fails or times out
40
*/
41
public void sendWithTimeout(String nextWorkName, StreamData streamData,
42
Selector selector, int timeoutSeconds) throws RemoteException;
43
44
/**
45
* Gets available remote addresses
46
* @return List of configured remote addresses
47
*/
48
public List<RemoteAddress> getRemoteAddresses();
49
50
/**
51
* Checks if remote service is available
52
* @param remoteAddress Remote address to check
53
* @return True if remote service is reachable
54
*/
55
public boolean isAvailable(RemoteAddress remoteAddress);
56
}
57
```
58
59
### RemoteServiceHandler
60
61
Handles incoming remote service requests and routes them to appropriate workers.
62
63
```java { .api }
64
public class RemoteServiceHandler {
65
66
/**
67
* Handles incoming remote stream data
68
* @param streamData Received stream data
69
* @param nextWorkerName Target worker name for processing
70
* @throws RemoteException If handling fails
71
*/
72
public void handle(StreamData streamData, String nextWorkerName) throws RemoteException;
73
74
/**
75
* Registers worker for remote request handling
76
* @param workerName Worker identifier
77
* @param worker Worker instance to handle requests
78
*/
79
public void registerWorker(String workerName, AbstractWorker<?> worker);
80
81
/**
82
* Unregisters worker from remote handling
83
* @param workerName Worker identifier to remove
84
*/
85
public void unregisterWorker(String workerName);
86
87
/**
88
* Gets registered worker by name
89
* @param workerName Worker identifier
90
* @return Worker instance or null if not found
91
*/
92
public AbstractWorker<?> getWorker(String workerName);
93
}
94
```
95
96
## Routing and Selection
97
98
### Selector Interface
99
100
Base interface for routing strategy selection.
101
102
```java { .api }
103
public interface Selector {
104
105
/**
106
* Selects remote address based on routing strategy
107
* @param remoteAddresses Available remote addresses
108
* @param data Data being routed (for hash-based selection)
109
* @return Selected remote address
110
*/
111
RemoteAddress select(List<RemoteAddress> remoteAddresses, StreamData data);
112
113
/**
114
* Gets selector type identifier
115
* @return Selector type name
116
*/
117
String getType();
118
}
119
```
120
121
### HashCodeSelector
122
123
Routes data based on hash code for consistent routing.
124
125
```java { .api }
126
public class HashCodeSelector implements Selector {
127
128
/**
129
* Selects remote address using data hash code modulo
130
* @param remoteAddresses Available remote addresses
131
* @param data Data to route (hash code used for selection)
132
* @return Selected address based on hash distribution
133
*/
134
@Override
135
public RemoteAddress select(List<RemoteAddress> remoteAddresses, StreamData data);
136
137
/**
138
* Gets hash code from stream data for routing
139
* @param data Stream data
140
* @return Hash code for routing decision
141
*/
142
protected int getHashCode(StreamData data);
143
}
144
```
145
146
### ForeverFirstSelector
147
148
Always selects the first available remote address.
149
150
```java { .api }
151
public class ForeverFirstSelector implements Selector {
152
153
/**
154
* Always selects first address from list
155
* @param remoteAddresses Available remote addresses
156
* @param data Data being routed (ignored)
157
* @return First remote address in list
158
*/
159
@Override
160
public RemoteAddress select(List<RemoteAddress> remoteAddresses, StreamData data);
161
}
162
```
163
164
### RollingSelector
165
166
Round-robin selection across available remote addresses.
167
168
```java { .api }
169
public class RollingSelector implements Selector {
170
171
private AtomicInteger index;
172
173
/**
174
* Selects remote address using round-robin strategy
175
* @param remoteAddresses Available remote addresses
176
* @param data Data being routed (ignored)
177
* @return Next address in round-robin sequence
178
*/
179
@Override
180
public RemoteAddress select(List<RemoteAddress> remoteAddresses, StreamData data);
181
182
/**
183
* Resets rolling counter
184
*/
185
public void reset();
186
}
187
```
188
189
## Remote Data Interfaces
190
191
### Serializable Interface
192
193
Interface for data that can be serialized to a byte array for remote transmission.
194
195
```java { .api }
196
public interface Serializable {
197
198
/**
199
* Serializes object to byte array for remote transmission
200
* @return Serialized byte array
201
* @throws SerializationException If serialization fails
202
*/
203
byte[] serialize() throws SerializationException;
204
205
/**
206
* Gets serialization version for compatibility
207
* @return Serialization version number
208
*/
209
int getSerializationVersion();
210
}
211
```
212
213
### Deserializable Interface
214
215
Interface for data that can be reconstructed from its serialized byte array.
216
217
```java { .api }
218
public interface Deserializable {
219
220
/**
221
* Deserializes object from byte array
222
* @param data Serialized byte array
223
* @throws DeserializationException If deserialization fails
224
*/
225
void deserialize(byte[] data) throws DeserializationException;
226
227
/**
228
* Gets deserialization version for compatibility
229
* @return Deserialization version number
230
*/
231
int getDeserializationVersion();
232
233
/**
234
* Checks if version is compatible for deserialization
235
* @param version Incoming serialization version
236
* @return True if version is compatible
237
*/
238
boolean isCompatible(int version);
239
}
240
```
241
242
### StreamData
243
244
Base class for data transmitted between remote nodes.
245
246
```java { .api }
247
public abstract class StreamData implements Serializable, Deserializable {
248
249
protected long timestamp;
250
protected String remoteAddress;
251
252
/**
253
* Gets data timestamp
254
* @return Timestamp in milliseconds
255
*/
256
public long getTimestamp();
257
258
/**
259
* Sets data timestamp
260
* @param timestamp Timestamp in milliseconds
261
*/
262
public void setTimestamp(long timestamp);
263
264
/**
265
* Gets originating remote address
266
* @return Remote address string
267
*/
268
public String getRemoteAddress();
269
270
/**
271
* Sets originating remote address
272
* @param remoteAddress Remote address string
273
*/
274
public void setRemoteAddress(String remoteAddress);
275
276
/**
277
* Gets unique identifier for routing
278
* @return String identifier for consistent routing
279
*/
280
public abstract String id();
281
282
/**
283
* Gets the hash code used for routing this data
284
* @return Hash code for routing decisions
285
*/
286
public abstract int remoteHashCode();
287
}
288
```
289
290
## Remote Configuration
291
292
### RemoteAddress
293
294
Represents remote OAP node address configuration.
295
296
```java { .api }
297
public class RemoteAddress {
298
299
private String host;
300
private int port;
301
private boolean selfAddress;
302
303
/**
304
* Creates remote address configuration
305
* @param host Remote host address
306
* @param port Remote port number
307
*/
308
public RemoteAddress(String host, int port);
309
310
/**
311
* Creates remote address with self-identification
312
* @param host Remote host address
313
* @param port Remote port number
314
* @param selfAddress True if this represents current node
315
*/
316
public RemoteAddress(String host, int port, boolean selfAddress);
317
318
/**
319
* Gets host address
320
* @return Host string
321
*/
322
public String getHost();
323
324
/**
325
* Gets port number
326
* @return Port number
327
*/
328
public int getPort();
329
330
/**
331
* Checks if this is current node address
332
* @return True if self address
333
*/
334
public boolean isSelfAddress();
335
336
/**
337
* Gets full address string
338
* @return "host:port" format
339
*/
340
public String getAddress();
341
342
@Override
343
public boolean equals(Object obj);
344
345
@Override
346
public int hashCode();
347
348
@Override
349
public String toString();
350
}
351
```
352
353
### RemoteConfiguration
354
355
Configuration for remote service connectivity.
356
357
```java { .api }
358
public class RemoteConfiguration {
359
360
private List<RemoteAddress> remoteAddresses;
361
private int connectionTimeout;
362
private int requestTimeout;
363
private int maxRetries;
364
private boolean enableHeartbeat;
365
private int heartbeatInterval;
366
367
/**
368
* Gets configured remote addresses
369
* @return List of remote addresses
370
*/
371
public List<RemoteAddress> getRemoteAddresses();
372
373
/**
374
* Sets remote addresses
375
* @param remoteAddresses List of remote addresses
376
*/
377
public void setRemoteAddresses(List<RemoteAddress> remoteAddresses);
378
379
/**
380
* Gets connection timeout
381
* @return Connection timeout in milliseconds
382
*/
383
public int getConnectionTimeout();
384
385
/**
386
* Gets request timeout
387
* @return Request timeout in milliseconds
388
*/
389
public int getRequestTimeout();
390
391
/**
392
* Gets maximum retry attempts
393
* @return Maximum retries
394
*/
395
public int getMaxRetries();
396
397
/**
398
* Checks if heartbeat is enabled
399
* @return True if heartbeat enabled
400
*/
401
public boolean isEnableHeartbeat();
402
403
/**
404
* Gets heartbeat interval
405
* @return Heartbeat interval in seconds
406
*/
407
public int getHeartbeatInterval();
408
}
409
```
410
411
## Remote Exceptions
412
413
### RemoteException
414
415
Base exception for remote communication errors.
416
417
```java { .api }
418
public class RemoteException extends Exception {
419
420
/**
421
* Creates remote exception with message
422
* @param message Error message
423
*/
424
public RemoteException(String message);
425
426
/**
427
* Creates remote exception with message and cause
428
* @param message Error message
429
* @param cause Underlying cause
430
*/
431
public RemoteException(String message, Throwable cause);
432
}
433
```
434
435
### SerializationException
436
437
Exception for data serialization errors.
438
439
```java { .api }
440
public class SerializationException extends RemoteException {
441
442
/**
443
* Creates serialization exception
444
* @param message Error message
445
*/
446
public SerializationException(String message);
447
448
/**
449
* Creates serialization exception with cause
450
* @param message Error message
451
* @param cause Underlying cause
452
*/
453
public SerializationException(String message, Throwable cause);
454
}
455
```
456
457
### DeserializationException
458
459
Exception for data deserialization errors.
460
461
```java { .api }
462
public class DeserializationException extends RemoteException {
463
464
/**
465
* Creates deserialization exception
466
* @param message Error message
467
*/
468
public DeserializationException(String message);
469
470
/**
471
* Creates deserialization exception with cause
472
* @param message Error message
473
* @param cause Underlying cause
474
*/
475
public DeserializationException(String message, Throwable cause);
476
}
477
```
478
479
## gRPC Integration
480
481
### RemoteServiceGrpc
482
483
gRPC service definitions for remote communication.
484
485
```java { .api }
486
public class RemoteServiceGrpc {
487
488
/**
489
* gRPC stub for remote service calls
490
*/
491
public static class RemoteServiceStub {
492
493
/**
494
* Sends stream data to remote node
495
* @param request Stream data request
496
* @param responseObserver Response observer for async handling
497
*/
498
public void call(StreamDataRequest request,
499
StreamObserver<StreamDataResponse> responseObserver);
500
}
501
502
/**
503
* gRPC blocking stub for synchronous calls
504
*/
505
public static class RemoteServiceBlockingStub {
506
507
/**
508
* Sends stream data synchronously
509
* @param request Stream data request
510
* @return Stream data response
511
*/
512
public StreamDataResponse call(StreamDataRequest request);
513
}
514
515
/**
516
* Service implementation base class
517
*/
518
public static abstract class RemoteServiceImplBase implements BindableService {
519
520
/**
521
* Handles incoming remote calls
522
* @param request Stream data request
523
* @param responseObserver Response observer
524
*/
525
public abstract void call(StreamDataRequest request,
526
StreamObserver<StreamDataResponse> responseObserver);
527
}
528
}
529
```
530
531
## Usage Examples
532
533
### Setting up Remote Communication
534
535
```java
536
// Configure remote addresses
537
List<RemoteAddress> remoteAddresses = Arrays.asList(
538
new RemoteAddress("oap-node-1", 11800),
539
new RemoteAddress("oap-node-2", 11800),
540
new RemoteAddress("oap-node-3", 11800)
541
);
542
543
RemoteConfiguration config = new RemoteConfiguration();
544
config.setRemoteAddresses(remoteAddresses);
545
config.setConnectionTimeout(5000);
546
config.setRequestTimeout(10000);
547
config.setMaxRetries(3);
548
config.setEnableHeartbeat(true);
549
config.setHeartbeatInterval(30);
550
551
// Initialize remote sender service
552
RemoteSenderService remoteSender = new RemoteSenderService();
553
remoteSender.initialize(config);
554
```
555
556
### Sending Data with Different Routing Strategies
557
558
```java
559
// Hash-based routing for consistent distribution
560
Selector hashSelector = new HashCodeSelector();
561
remoteSender.send("MetricsAggregateWorker", metricsData, hashSelector);
562
563
// Round-robin routing for load balancing
564
Selector rollingSelector = new RollingSelector();
565
remoteSender.send("RecordPersistentWorker", recordData, rollingSelector);
566
567
// Always send to first available node
568
Selector firstSelector = new ForeverFirstSelector();
569
remoteSender.send("ManagementWorker", managementData, firstSelector);
570
571
// Send to specific remote address
572
RemoteAddress specificNode = new RemoteAddress("oap-primary", 11800);
573
remoteSender.send(specificNode, "PriorityWorker", criticalData);
574
```
575
576
### Implementing Custom Stream Data
577
578
```java
579
public class CustomTelemetryData extends StreamData {
580
581
private String serviceName;
582
private String operationName;
583
private long duration;
584
private Map<String, String> tags;
585
586
@Override
587
public String id() {
588
// Create unique ID for routing consistency
589
return serviceName + ":" + operationName;
590
}
591
592
@Override
593
public int remoteHashCode() {
594
// Hash code for routing decisions
595
return Objects.hash(serviceName, operationName);
596
}
597
598
@Override
599
public byte[] serialize() throws SerializationException {
600
try {
601
// Serialize to protobuf or other format
602
ByteArrayOutputStream baos = new ByteArrayOutputStream();
603
DataOutputStream dos = new DataOutputStream(baos);
604
605
dos.writeUTF(serviceName);
606
dos.writeUTF(operationName);
607
dos.writeLong(duration);
608
dos.writeInt(tags.size());
609
610
for (Map.Entry<String, String> entry : tags.entrySet()) {
611
dos.writeUTF(entry.getKey());
612
dos.writeUTF(entry.getValue());
613
}
614
615
return baos.toByteArray();
616
} catch (IOException e) {
617
throw new SerializationException("Failed to serialize custom telemetry data", e);
618
}
619
}
620
621
@Override
622
public void deserialize(byte[] data) throws DeserializationException {
623
try {
624
ByteArrayInputStream bais = new ByteArrayInputStream(data);
625
DataInputStream dis = new DataInputStream(bais);
626
627
this.serviceName = dis.readUTF();
628
this.operationName = dis.readUTF();
629
this.duration = dis.readLong();
630
631
int tagCount = dis.readInt();
632
this.tags = new HashMap<>();
633
634
for (int i = 0; i < tagCount; i++) {
635
String key = dis.readUTF();
636
String value = dis.readUTF();
637
tags.put(key, value);
638
}
639
} catch (IOException e) {
640
throw new DeserializationException("Failed to deserialize custom telemetry data", e);
641
}
642
}
643
644
@Override
645
public int getSerializationVersion() {
646
return 1;
647
}
648
649
@Override
650
public int getDeserializationVersion() {
651
return 1;
652
}
653
654
@Override
655
public boolean isCompatible(int version) {
656
return version <= getDeserializationVersion();
657
}
658
659
// Getters and setters
660
public String getServiceName() { return serviceName; }
661
public void setServiceName(String serviceName) { this.serviceName = serviceName; }
662
663
public String getOperationName() { return operationName; }
664
public void setOperationName(String operationName) { this.operationName = operationName; }
665
666
public long getDuration() { return duration; }
667
public void setDuration(long duration) { this.duration = duration; }
668
669
public Map<String, String> getTags() { return tags; }
670
public void setTags(Map<String, String> tags) { this.tags = tags; }
671
}
672
```
673
674
### Implementing Custom Selector
675
676
```java
677
public class GeographicSelector implements Selector {
678
679
private final String preferredRegion;
680
private final Map<RemoteAddress, String> addressRegions;
681
682
public GeographicSelector(String preferredRegion,
683
Map<RemoteAddress, String> addressRegions) {
684
this.preferredRegion = preferredRegion;
685
this.addressRegions = addressRegions;
686
}
687
688
@Override
689
public RemoteAddress select(List<RemoteAddress> remoteAddresses, StreamData data) {
690
// First, try to find addresses in preferred region
691
List<RemoteAddress> preferredAddresses = remoteAddresses.stream()
692
.filter(addr -> preferredRegion.equals(addressRegions.get(addr)))
693
.collect(Collectors.toList());
694
695
if (!preferredAddresses.isEmpty()) {
696
// Use hash-based selection within preferred region
697
int hash = Math.abs(data.remoteHashCode());
698
int index = hash % preferredAddresses.size();
699
return preferredAddresses.get(index);
700
}
701
702
// Fall back to any available address
703
if (!remoteAddresses.isEmpty()) {
704
int hash = Math.abs(data.remoteHashCode());
705
int index = hash % remoteAddresses.size();
706
return remoteAddresses.get(index);
707
}
708
709
return null;
710
}
711
712
@Override
713
public String getType() {
714
return "geographic";
715
}
716
}
717
```
718
719
### Handling Remote Service Requests
720
721
```java
722
@Component
723
public class CustomRemoteServiceHandler extends RemoteServiceHandler {
724
725
@Override
726
public void handle(StreamData streamData, String nextWorkerName) throws RemoteException {
727
try {
728
// Validate stream data
729
if (streamData == null) {
730
throw new RemoteException("Received null stream data");
731
}
732
733
// Check if worker exists
734
AbstractWorker<?> worker = getWorker(nextWorkerName);
735
if (worker == null) {
736
throw new RemoteException("Unknown worker: " + nextWorkerName);
737
}
738
739
// Log incoming request
740
logger.info("Handling remote request for worker: {} from address: {}",
741
nextWorkerName, streamData.getRemoteAddress());
742
743
// Route to appropriate worker
744
super.handle(streamData, nextWorkerName);
745
746
} catch (Exception e) {
747
logger.error("Failed to handle remote request", e);
748
throw new RemoteException("Request handling failed", e);
749
}
750
}
751
752
public void registerCustomWorkers() {
753
// Register custom workers for remote handling
754
registerWorker("CustomMetricsWorker", new CustomMetricsWorker());
755
registerWorker("CustomRecordWorker", new CustomRecordWorker());
756
registerWorker("CustomAnalysisWorker", new CustomAnalysisWorker());
757
}
758
}
759
```
760
761
## Core Remote Types
762
763
```java { .api }
764
/**
765
* gRPC request for stream data transmission
766
*/
767
public class StreamDataRequest {
768
private String workerName;
769
private byte[] streamData;
770
private String dataType;
771
772
public String getWorkerName();
773
public byte[] getStreamData();
774
public String getDataType();
775
}
776
777
/**
778
* gRPC response for stream data transmission
779
*/
780
public class StreamDataResponse {
781
private boolean success;
782
private String errorMessage;
783
784
public boolean isSuccess();
785
public String getErrorMessage();
786
}
787
788
/**
789
* Remote module definition
790
*/
791
public class RemoteModule extends ModuleDefine {
792
public static final String NAME = "remote";
793
794
@Override
795
public String name();
796
797
@Override
798
public Class[] services();
799
}
800
801
/**
802
* Load balancing strategy enumeration
803
*/
804
public enum LoadBalanceStrategy {
805
HASH_CODE, ROLLING, FIRST_AVAILABLE, CUSTOM
806
}
807
```