0
# Remote Communication
1
2
Low-level remote communication infrastructure including gRPC support, request/response handling, connection management, and callback mechanisms for building robust client-server communication.
3
4
## Capabilities
5
6
### Request and Response Base Classes
7
8
Foundation classes for all remote communication operations providing common functionality for headers, request IDs, and response handling.
9
10
```java { .api }
11
/**
12
* Base request class for remote communication
13
*/
14
abstract class Request implements Payload {
15
/** Request headers */
16
private Map<String, String> headers = new HashMap<>();
17
18
/** Unique request identifier */
19
private String requestId;
20
21
/**
22
* Add header to request
23
* @param key Header key
24
* @param value Header value
25
*/
26
public void putHeader(String key, String value);
27
28
/**
29
* Add multiple headers to request
30
* @param headers Map of headers to add
31
*/
32
public void putAllHeader(Map<String, String> headers);
33
34
/**
35
* Get header value by key
36
* @param key Header key
37
* @return Header value or null if not found
38
*/
39
public String getHeader(String key);
40
41
/**
42
* Get all headers
43
* @return Map of all headers
44
*/
45
public Map<String, String> getHeaders();
46
47
/**
48
* Get request identifier
49
* @return Unique request ID
50
*/
51
public String getRequestId();
52
53
/**
54
* Set request identifier
55
* @param requestId Unique request ID
56
*/
57
public void setRequestId(String requestId);
58
59
/**
60
* Get request type for routing
61
* @return Request type string
62
*/
63
public abstract String getRequestType();
64
}
65
66
/**
67
* Base response class for remote communication
68
*/
69
abstract class Response implements Payload {
70
/** Response result code */
71
private int resultCode = ResponseCode.SUCCESS.getCode();
72
73
/** Error code for failures */
74
private int errorCode = 0;
75
76
/** Response message */
77
private String message;
78
79
/** Request ID this response corresponds to */
80
private String requestId;
81
82
/**
83
* Check if response indicates success
84
* @return true if operation was successful
85
*/
86
public boolean isSuccess();
87
88
/**
89
* Get result code
90
* @return Response result code
91
*/
92
public int getResultCode();
93
94
/**
95
* Set result code
96
* @param resultCode Response result code
97
*/
98
public void setResultCode(int resultCode);
99
100
/**
101
* Get error code
102
* @return Error code for failures
103
*/
104
public int getErrorCode();
105
106
/**
107
* Set error code
108
* @param errorCode Error code for failures
109
*/
110
public void setErrorCode(int errorCode);
111
112
/**
113
* Get response message
114
* @return Response message
115
*/
116
public String getMessage();
117
118
/**
119
* Set response message
120
* @param message Response message
121
*/
122
public void setMessage(String message);
123
124
/**
125
* Get request ID
126
* @return Request ID this response corresponds to
127
*/
128
public String getRequestId();
129
130
/**
131
* Set request ID
132
* @param requestId Request ID
133
*/
134
public void setRequestId(String requestId);
135
136
/**
137
* Get response type for routing
138
* @return Response type string
139
*/
140
public abstract String getResponseType();
141
}
142
143
/**
144
* Base payload interface for serialization
145
*/
146
interface Payload extends Serializable {
147
// Marker interface for serializable payloads
148
}
149
```
150
151
### Connection and Health Check Requests
152
153
Standard request types for connection management and server health monitoring.
154
155
```java { .api }
156
/**
157
* Connection setup request for establishing client-server connection
158
*/
159
class ConnectionSetupRequest extends Request {
160
/** Client version information */
161
private String clientVersion;
162
163
/** Client abilities */
164
private ClientAbilities abilities;
165
166
/** Tenant information */
167
private String tenant;
168
169
/** Labels for client identification */
170
private Map<String, String> labels;
171
172
/**
173
* Default constructor
174
*/
175
public ConnectionSetupRequest();
176
177
/**
178
* Get client version
179
* @return Client version string
180
*/
181
public String getClientVersion();
182
183
/**
184
* Set client version
185
* @param clientVersion Client version string
186
*/
187
public void setClientVersion(String clientVersion);
188
189
/**
190
* Get client abilities
191
* @return Client abilities object
192
*/
193
public ClientAbilities getAbilities();
194
195
/**
196
* Set client abilities
197
* @param abilities Client abilities object
198
*/
199
public void setAbilities(ClientAbilities abilities);
200
201
/**
202
* Get tenant information
203
* @return Tenant string
204
*/
205
public String getTenant();
206
207
/**
208
* Set tenant information
209
* @param tenant Tenant string
210
*/
211
public void setTenant(String tenant);
212
213
/**
214
* Get client labels
215
* @return Map of client labels
216
*/
217
public Map<String, String> getLabels();
218
219
/**
220
* Set client labels
221
* @param labels Map of client labels
222
*/
223
public void setLabels(Map<String, String> labels);
224
225
@Override
226
public String getRequestType();
227
}
228
229
/**
230
* Health check request for monitoring server status
231
*/
232
class HealthCheckRequest extends Request {
233
/**
234
* Default constructor
235
*/
236
public HealthCheckRequest();
237
238
@Override
239
public String getRequestType();
240
}
241
242
/**
243
* Server check request for server capabilities inquiry
244
*/
245
class ServerCheckRequest extends Request {
246
/**
247
* Default constructor
248
*/
249
public ServerCheckRequest();
250
251
@Override
252
public String getRequestType();
253
}
254
```
255
256
### Response Types and Error Handling
257
258
Standard response classes with error codes and status information.
259
260
```java { .api }
261
/**
262
* Connection setup response
263
*/
264
class ConnectionSetupResponse extends Response {
265
/** Server abilities */
266
private ServerAbilities serverAbilities;
267
268
/**
269
* Default constructor
270
*/
271
public ConnectionSetupResponse();
272
273
/**
274
* Get server abilities
275
* @return Server abilities object
276
*/
277
public ServerAbilities getServerAbilities();
278
279
/**
280
* Set server abilities
281
* @param serverAbilities Server abilities object
282
*/
283
public void setServerAbilities(ServerAbilities serverAbilities);
284
285
@Override
286
public String getResponseType();
287
}
288
289
/**
290
* Health check response
291
*/
292
class HealthCheckResponse extends Response {
293
/**
294
* Default constructor
295
*/
296
public HealthCheckResponse();
297
298
@Override
299
public String getResponseType();
300
}
301
302
/**
303
* Error response for failed operations
304
*/
305
class ErrorResponse extends Response {
306
/**
307
* Constructor with error code and message
308
* @param errorCode Error code
309
* @param message Error message
310
*/
311
public ErrorResponse(int errorCode, String message);
312
313
@Override
314
public String getResponseType();
315
}
316
317
/**
318
* Response code enumeration
319
*/
320
enum ResponseCode {
321
/** Success response */
322
SUCCESS(200, "Success"),
323
324
/** Generic failure */
325
FAIL(500, "Fail"),
326
327
/** Required parameter missing */
328
PARAMETER_MISSING(400, "Parameter Missing"),
329
330
/** Resource not found */
331
RESOURCE_NOT_FOUND(404, "Resource Not Found"),
332
333
/** Server internal error */
334
INTERNAL_SERVER_ERROR(500, "Internal Server Error");
335
336
/** Response code */
337
private final int code;
338
339
/** Response message */
340
private final String message;
341
342
/**
343
* Constructor
344
* @param code Response code
345
* @param message Response message
346
*/
347
ResponseCode(int code, String message);
348
349
/**
350
* Get response code
351
* @return Response code
352
*/
353
public int getCode();
354
355
/**
356
* Get response message
357
* @return Response message
358
*/
359
public String getMessage();
360
}
361
```
362
363
### Callback Interfaces
364
365
Asynchronous callback mechanisms for handling responses and server push notifications.
366
367
```java { .api }
368
/**
369
* Request callback interface for handling async responses
370
*/
371
interface RequestCallBack<T extends Response> {
372
/**
373
* Called when response is received successfully
374
* @param response Response object
375
*/
376
void onResponse(T response);
377
378
/**
379
* Called when request fails with exception
380
* @param e Exception that occurred
381
*/
382
void onException(Throwable e);
383
384
/**
385
* Get executor for callback processing
386
* @return Executor for async processing, null for current thread
387
*/
388
default Executor getExecutor() {
389
return null;
390
}
391
392
/**
393
* Get request timeout in milliseconds
394
* @return Timeout in milliseconds (3000 by default), 0 for no timeout
395
*/
396
default long getTimeout() {
397
return 3000L;
398
}
399
}
400
401
/**
402
* Abstract request callback implementation
403
*/
404
abstract class AbstractRequestCallBack<T extends Response> implements RequestCallBack<T> {
405
/** Executor for callback processing */
406
private Executor executor;
407
408
/** Request timeout */
409
private long timeout = 3000L;
410
411
/**
412
* Default constructor
413
*/
414
public AbstractRequestCallBack();
415
416
/**
417
* Constructor with executor
418
* @param executor Executor for callback processing
419
*/
420
public AbstractRequestCallBack(Executor executor);
421
422
/**
423
* Constructor with executor and timeout
424
* @param executor Executor for callback processing
425
* @param timeout Request timeout in milliseconds
426
*/
427
public AbstractRequestCallBack(Executor executor, long timeout);
428
429
@Override
430
public Executor getExecutor();
431
432
@Override
433
public long getTimeout();
434
435
/**
436
* Abstract response handler to be implemented
437
* @param response Response object
438
*/
439
@Override
440
public abstract void onResponse(T response);
441
442
/**
443
* Abstract exception handler to be implemented
444
* @param e Exception that occurred
445
*/
446
@Override
447
public abstract void onException(Throwable e);
448
}
449
450
/**
451
* Push callback interface for server-initiated communications
452
*/
453
interface PushCallBack {
454
/**
455
* Handle server push request
456
* @param request Push request from server
457
* @return Response to send back to server
458
*/
459
Response requestReply(Request request);
460
461
/**
462
* Get executor for push processing
463
* @return Executor for async processing, or null (the default) when none is configured
464
*/
465
default Executor getExecutor() {
466
return null;
467
}
468
}
469
470
/**
471
* Abstract push callback implementation
472
*/
473
abstract class AbstractPushCallBack implements PushCallBack {
474
/** Executor for push processing */
475
private Executor executor;
476
477
/**
478
* Default constructor
479
*/
480
public AbstractPushCallBack();
481
482
/**
483
* Constructor with executor
484
* @param executor Executor for push processing
485
*/
486
public AbstractPushCallBack(Executor executor);
487
488
@Override
489
public Executor getExecutor();
490
491
/**
492
* Abstract push request handler to be implemented
493
* @param request Push request from server
494
* @return Response to send back to server
495
*/
496
@Override
497
public abstract Response requestReply(Request request);
498
}
499
```
500
501
### Future and Async Operations
502
503
Future-based operations for handling asynchronous requests and responses.
504
505
```java { .api }
506
/**
507
* Future interface for async request operations
508
*/
509
interface RequestFuture<T> extends Future<T> {
510
/**
511
* Get request ID
512
* @return Request ID
513
*/
514
String getRequestId();
515
516
/**
517
* Check if request timed out
518
* @return true if request timed out
519
*/
520
boolean isTimeout();
521
522
/**
523
* Get the original request
524
* @return Original request object
525
*/
526
Request getRequest();
527
528
/**
529
* Set the response
530
* @param response Response object
531
*/
532
void setResponse(T response);
533
534
/**
535
* Set exception
536
* @param throwable Exception that occurred
537
*/
538
void setFailResult(Throwable throwable);
539
}
540
541
/**
542
* Default implementation of RequestFuture
543
*/
544
class DefaultRequestFuture<T> implements RequestFuture<T> {
545
/** Request ID */
546
private final String requestId;
547
548
/** Original request */
549
private final Request request;
550
551
/** Response object */
552
private volatile T response;
553
554
/** Exception if failed */
555
private volatile Throwable exception;
556
557
/** Completion flag */
558
private volatile boolean done = false;
559
560
/** Timeout flag */
561
private volatile boolean timeout = false;
562
563
/** Completion latch */
564
private final CountDownLatch latch = new CountDownLatch(1);
565
566
/**
567
* Constructor
568
* @param requestId Request ID
569
* @param request Original request
570
*/
571
public DefaultRequestFuture(String requestId, Request request);
572
573
@Override
574
public String getRequestId();
575
576
@Override
577
public boolean isTimeout();
578
579
@Override
580
public Request getRequest();
581
582
@Override
583
public void setResponse(T response);
584
585
@Override
586
public void setFailResult(Throwable throwable);
587
588
@Override
589
public boolean cancel(boolean mayInterruptIfRunning);
590
591
@Override
592
public boolean isCancelled();
593
594
@Override
595
public boolean isDone();
596
597
@Override
598
public T get() throws InterruptedException, ExecutionException;
599
600
@Override
601
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
602
}
603
```
604
605
### gRPC Integration
606
607
Auto-generated gRPC classes for Protocol Buffers-based communication.
608
609
```java { .api }
610
/**
611
* Nacos gRPC service definitions (auto-generated)
612
*/
613
class NacosGrpcService {
614
/**
615
* Get service descriptor
616
* @return Service descriptor for gRPC
617
*/
618
public static ServiceDescriptor getServiceDescriptor();
619
620
/**
621
* Create stub for gRPC calls
622
* @param channel gRPC channel
623
* @return Service stub
624
*/
625
public static NacosGrpcServiceStub newStub(Channel channel);
626
627
/**
628
* Create blocking stub for synchronous gRPC calls
629
* @param channel gRPC channel
630
* @return Blocking service stub
631
*/
632
public static NacosGrpcServiceBlockingStub newBlockingStub(Channel channel);
633
}
634
635
/**
636
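* gRPC payload message (auto-generated); a separate type from the Payload marker interface listed under "Request and Response Base Classes"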
637
*/
638
class Payload implements Serializable {
639
/** Metadata for the payload */
640
private Metadata metadata;
641
642
/** Body content */
643
private Any body;
644
645
/**
646
* Get metadata
647
* @return Payload metadata
648
*/
649
public Metadata getMetadata();
650
651
/**
652
* Set metadata
653
* @param metadata Payload metadata
654
*/
655
public void setMetadata(Metadata metadata);
656
657
/**
658
* Get body content
659
* @return Body as Any type
660
*/
661
public Any getBody();
662
663
/**
664
* Set body content
665
* @param body Body as Any type
666
*/
667
public void setBody(Any body);
668
}
669
670
/**
671
* gRPC metadata message (auto-generated)
672
*/
673
class Metadata implements Serializable {
674
/** Message type */
675
private String type;
676
677
/** Client IP */
678
private String clientIp;
679
680
/** Headers */
681
private Map<String, String> headers;
682
683
/**
684
* Get message type
685
* @return Message type
686
*/
687
public String getType();
688
689
/**
690
* Set message type
691
* @param type Message type
692
*/
693
public void setType(String type);
694
695
/**
696
* Get client IP
697
* @return Client IP address
698
*/
699
public String getClientIp();
700
701
/**
702
* Set client IP
703
* @param clientIp Client IP address
704
*/
705
public void setClientIp(String clientIp);
706
707
/**
708
* Get headers
709
* @return Map of headers
710
*/
711
public Map<String, String> getHeaders();
712
713
/**
714
* Set headers
715
* @param headers Map of headers
716
*/
717
public void setHeaders(Map<String, String> headers);
718
}
719
720
/**
721
* Bidirectional stream gRPC service (auto-generated)
722
*/
723
class BiRequestStreamGrpc {
724
/**
725
* Get method descriptor for bidirectional streaming
726
* @return Method descriptor
727
*/
728
public static MethodDescriptor<Payload, Payload> getRequestBiStreamMethod();
729
730
/**
731
* Create stub for bidirectional streaming
732
* @param channel gRPC channel
733
* @return Stub for bidirectional streaming
734
*/
735
public static BiRequestStreamStub newStub(Channel channel);
736
}
737
738
/**
739
* Request-response gRPC service (auto-generated)
740
*/
741
class RequestGrpc {
742
/**
743
* Get method descriptor for unary requests
744
* @return Method descriptor
745
*/
746
public static MethodDescriptor<Payload, Payload> getRequestMethod();
747
748
/**
749
* Create stub for unary requests
750
* @param channel gRPC channel
751
* @return Stub for unary requests
752
*/
753
public static RequestStub newStub(Channel channel);
754
755
/**
756
* Create blocking stub for synchronous unary requests
757
* @param channel gRPC channel
758
* @return Blocking stub for unary requests
759
*/
760
public static RequestBlockingStub newBlockingStub(Channel channel);
761
}
762
```
763
764
### Utilities and Executors
765
766
Utility classes for remote communication operations and thread management.
767
768
```java { .api }
769
/**
770
* Scheduled executor for RPC operations
771
*/
772
class RpcScheduledExecutor {
773
/** Default executor instance */
774
private static final ScheduledExecutorService EXECUTOR;
775
776
/**
777
* Get default scheduled executor
778
* @return Scheduled executor service
779
*/
780
public static ScheduledExecutorService getExecutor();
781
782
/**
783
* Schedule task with delay
784
* @param command Task to execute
785
* @param delay Delay before execution
786
* @param unit Time unit for delay
787
* @return ScheduledFuture for the task
788
*/
789
public static ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
790
791
/**
792
* Schedule task with fixed rate
793
* @param command Task to execute
794
* @param initialDelay Initial delay
795
* @param period Period between executions
796
* @param unit Time unit
797
* @return ScheduledFuture for the task
798
*/
799
public static ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
800
801
/**
802
* Schedule task with fixed delay
803
* @param command Task to execute
804
* @param initialDelay Initial delay
805
* @param delay Delay between executions
806
* @param unit Time unit
807
* @return ScheduledFuture for the task
808
*/
809
public static ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
810
}
811
812
/**
813
* Remote communication constants
814
*/
815
class RemoteConstants {
816
/** Default request timeout */
817
public static final long DEFAULT_TIMEOUT_MILLS = 3000L;
818
819
/** Connection setup timeout */
820
public static final long CONNECT_TIMEOUT_MILLS = 3000L;
821
822
/** Keep alive interval */
823
public static final long KEEP_ALIVE_MILLS = 5000L;
824
825
/** Default retry times */
826
public static final int DEFAULT_RETRY_TIMES = 3;
827
828
/** Max retry times */
829
public static final int MAX_RETRY_TIMES = 5;
830
831
/** Connection pool size */
832
public static final int DEFAULT_CONNECTION_POOL_SIZE = 8;
833
}
834
```
835
836
## Usage Examples
837
838
### Basic Request-Response Communication
839
840
```java
841
import com.alibaba.nacos.api.remote.request.Request;
842
import com.alibaba.nacos.api.remote.response.Response;
843
import com.alibaba.nacos.api.remote.RequestCallBack;
844
import com.alibaba.nacos.api.remote.DefaultRequestFuture;
845
846
// Custom request implementation
847
public class CustomRequest extends Request {
848
private String data;
849
850
public CustomRequest(String data) {
851
this.data = data;
852
setRequestId(UUID.randomUUID().toString());
853
putHeader("Content-Type", "application/json");
854
putHeader("User-Agent", "Nacos-Client");
855
}
856
857
public String getData() {
858
return data;
859
}
860
861
@Override
862
public String getRequestType() {
863
return "CustomRequest";
864
}
865
}
866
867
// Custom response implementation
868
public class CustomResponse extends Response {
869
private String result;
870
871
public CustomResponse() {}
872
873
public String getResult() {
874
return result;
875
}
876
877
public void setResult(String result) {
878
this.result = result;
879
}
880
881
@Override
882
public String getResponseType() {
883
return "CustomResponse";
884
}
885
}
886
887
// Synchronous request-response
888
public CustomResponse sendSynchronousRequest(String data) {
889
CustomRequest request = new CustomRequest(data);
890
891
try {
892
// Create future for the request
893
DefaultRequestFuture<CustomResponse> future = new DefaultRequestFuture<>(
894
request.getRequestId(), request);
895
896
// Send request (pseudo-code - actual implementation would use client)
897
// remoteClient.sendRequest(request, future);
898
899
// Wait for response with timeout
900
CustomResponse response = future.get(5000, TimeUnit.MILLISECONDS);
901
902
if (response.isSuccess()) {
903
System.out.println("Request successful: " + response.getResult());
904
return response;
905
} else {
906
System.err.println("Request failed: " + response.getMessage());
907
return null;
908
}
909
910
} catch (TimeoutException e) {
911
System.err.println("Request timed out");
912
return null;
913
} catch (Exception e) {
914
System.err.println("Request failed with exception: " + e.getMessage());
915
return null;
916
}
917
}
918
```
919
920
### Asynchronous Communication with Callbacks
921
922
```java
923
import com.alibaba.nacos.api.remote.AbstractRequestCallBack;
924
import java.util.concurrent.Executors;
925
import java.util.concurrent.CompletableFuture;
926
927
// Asynchronous request with callback
928
public void sendAsynchronousRequest(String data, Consumer<String> onSuccess, Consumer<String> onError) {
929
CustomRequest request = new CustomRequest(data);
930
931
RequestCallBack<CustomResponse> callback = new AbstractRequestCallBack<CustomResponse>(
932
Executors.newSingleThreadExecutor(), 10000L) {
933
934
@Override
935
public void onResponse(CustomResponse response) {
936
if (response.isSuccess()) {
937
System.out.println("Async request successful: " + response.getResult());
938
onSuccess.accept(response.getResult());
939
} else {
940
System.err.println("Async request failed: " + response.getMessage());
941
onError.accept(response.getMessage());
942
}
943
}
944
945
@Override
946
public void onException(Throwable e) {
947
System.err.println("Async request failed with exception: " + e.getMessage());
948
onError.accept(e.getMessage());
949
}
950
};
951
952
// Send async request (pseudo-code)
953
// remoteClient.sendAsyncRequest(request, callback);
954
}
955
956
// Multiple parallel requests
957
public CompletableFuture<List<String>> sendParallelRequests(List<String> dataList) {
958
List<CompletableFuture<String>> futures = dataList.stream()
959
.map(data -> {
960
CompletableFuture<String> future = new CompletableFuture<>();
961
962
sendAsynchronousRequest(data,
963
result -> future.complete(result),
964
error -> future.completeExceptionally(new RuntimeException(error))
965
);
966
967
return future;
968
})
969
.collect(Collectors.toList());
970
971
// Combine all futures
972
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
973
.thenApply(v -> futures.stream()
974
.map(CompletableFuture::join)
975
.collect(Collectors.toList()));
976
}
977
```
978
979
### Connection Management
980
981
```java
982
import com.alibaba.nacos.api.remote.request.ConnectionSetupRequest;
983
import com.alibaba.nacos.api.remote.response.ConnectionSetupResponse;
984
import com.alibaba.nacos.api.remote.request.HealthCheckRequest;
985
986
public class ConnectionManager {
987
988
private volatile boolean connected = false;
989
private String clientId;
990
private ScheduledExecutorService healthCheckExecutor;
991
992
public ConnectionManager(String clientId) {
993
this.clientId = clientId;
994
this.healthCheckExecutor = Executors.newSingleThreadScheduledExecutor();
995
}
996
997
// Establish connection to server
998
public boolean connect(String serverAddress) {
999
ConnectionSetupRequest request = new ConnectionSetupRequest();
1000
request.setClientVersion("3.0.2");
1001
request.setTenant("default");
1002
1003
// Set client labels
1004
Map<String, String> labels = new HashMap<>();
1005
labels.put("clientId", clientId);
1006
labels.put("appName", "my-application");
1007
labels.put("environment", "production");
1008
request.setLabels(labels);
1009
1010
try {
1011
// Send connection setup request
1012
DefaultRequestFuture<ConnectionSetupResponse> future =
1013
new DefaultRequestFuture<>(request.getRequestId(), request);
1014
1015
// Simulate sending request
1016
ConnectionSetupResponse response = future.get(5000, TimeUnit.MILLISECONDS);
1017
1018
if (response.isSuccess()) {
1019
connected = true;
1020
System.out.println("Connected to server: " + serverAddress);
1021
1022
// Start health checking
1023
startHealthCheck();
1024
return true;
1025
} else {
1026
System.err.println("Connection failed: " + response.getMessage());
1027
return false;
1028
}
1029
1030
} catch (Exception e) {
1031
System.err.println("Connection error: " + e.getMessage());
1032
return false;
1033
}
1034
}
1035
1036
// Start periodic health checking
1037
private void startHealthCheck() {
1038
healthCheckExecutor.scheduleWithFixedDelay(() -> {
1039
HealthCheckRequest healthRequest = new HealthCheckRequest();
1040
1041
try {
1042
DefaultRequestFuture<Response> future =
1043
new DefaultRequestFuture<>(healthRequest.getRequestId(), healthRequest);
1044
1045
Response response = future.get(3000, TimeUnit.MILLISECONDS);
1046
1047
if (!response.isSuccess()) {
1048
System.err.println("Health check failed: " + response.getMessage());
1049
connected = false;
1050
// Trigger reconnection logic
1051
scheduleReconnect();
1052
}
1053
1054
} catch (Exception e) {
1055
System.err.println("Health check error: " + e.getMessage());
1056
connected = false;
1057
scheduleReconnect();
1058
}
1059
}, 5000, 10000, TimeUnit.MILLISECONDS);
1060
}
1061
1062
// Schedule reconnection attempt
1063
private void scheduleReconnect() {
1064
if (!connected) {
1065
healthCheckExecutor.schedule(() -> {
1066
System.out.println("Attempting to reconnect...");
1067
// Retry connection logic here
1068
}, 5000, TimeUnit.MILLISECONDS);
1069
}
1070
}
1071
1072
public boolean isConnected() {
1073
return connected;
1074
}
1075
1076
public void disconnect() {
1077
connected = false;
1078
if (healthCheckExecutor != null) {
1079
healthCheckExecutor.shutdown();
1080
}
1081
}
1082
}
1083
```
1084
1085
### Server Push Handling
1086
1087
```java
1088
import com.alibaba.nacos.api.remote.PushCallBack;
1089
import com.alibaba.nacos.api.remote.AbstractPushCallBack;
1090
1091
// Custom push request handler
1092
public class ConfigPushHandler extends AbstractPushCallBack {
1093
1094
private final ConfigUpdateListener configListener;
1095
1096
public ConfigPushHandler(ConfigUpdateListener configListener) {
1097
super(Executors.newFixedThreadPool(4));
1098
this.configListener = configListener;
1099
}
1100
1101
@Override
1102
public Response requestReply(Request request) {
1103
System.out.println("Received push request: " + request.getRequestType());
1104
1105
try {
1106
if ("ConfigChangeNotifyRequest".equals(request.getRequestType())) {
1107
return handleConfigChange(request);
1108
} else if ("ServiceChangeNotifyRequest".equals(request.getRequestType())) {
1109
return handleServiceChange(request);
1110
} else {
1111
return createErrorResponse("Unknown request type: " + request.getRequestType());
1112
}
1113
1114
} catch (Exception e) {
1115
System.err.println("Error handling push request: " + e.getMessage());
1116
return createErrorResponse("Internal error: " + e.getMessage());
1117
}
1118
}
1119
1120
private Response handleConfigChange(Request request) {
1121
// Extract configuration change information from request
1122
String dataId = request.getHeader("dataId");
1123
String group = request.getHeader("group");
1124
String content = request.getHeader("content");
1125
1126
System.out.printf("Config changed: %s:%s%n", group, dataId);
1127
1128
// Notify listeners
1129
if (configListener != null) {
1130
configListener.onConfigChange(dataId, group, content);
1131
}
1132
1133
// Return success response
1134
Response response = new Response() {
1135
@Override
1136
public String getResponseType() {
1137
return "ConfigChangeNotifyResponse";
1138
}
1139
};
1140
response.setResultCode(ResponseCode.SUCCESS.getCode());
1141
response.setMessage("Config change processed successfully");
1142
1143
return response;
1144
}
1145
1146
private Response handleServiceChange(Request request) {
1147
// Handle service change notification
1148
String serviceName = request.getHeader("serviceName");
1149
String groupName = request.getHeader("groupName");
1150
1151
System.out.printf("Service changed: %s in group %s%n", serviceName, groupName);
1152
1153
// Process service change
1154
// ... implementation details
1155
1156
Response response = new Response() {
1157
@Override
1158
public String getResponseType() {
1159
return "ServiceChangeNotifyResponse";
1160
}
1161
};
1162
response.setResultCode(ResponseCode.SUCCESS.getCode());
1163
1164
return response;
1165
}
1166
1167
private Response createErrorResponse(String message) {
1168
return new ErrorResponse(ResponseCode.INTERNAL_SERVER_ERROR.getCode(), message);
1169
}
1170
}
1171
1172
// Configuration update listener interface
1173
/**
 * Receives configuration changes extracted from server push requests.
 */
@FunctionalInterface
interface ConfigUpdateListener {

    /**
     * Invoked for each configuration change.
     *
     * @param dataId  identifier of the changed configuration
     * @param group   group the configuration belongs to
     * @param content content delivered with the change notification
     */
    void onConfigChange(String dataId, String group, String content);
}
1176
```
1177
1178
### Advanced Request Management
1179
1180
```java
1181
import java.util.concurrent.ConcurrentHashMap;
1182
import java.util.concurrent.atomic.AtomicLong;
1183
1184
public class RequestManager {
1185
1186
private final Map<String, DefaultRequestFuture<?>> pendingRequests = new ConcurrentHashMap<>();
1187
private final AtomicLong requestCounter = new AtomicLong(0);
1188
private final ScheduledExecutorService timeoutChecker;
1189
1190
public RequestManager() {
1191
this.timeoutChecker = Executors.newSingleThreadScheduledExecutor();
1192
startTimeoutChecker();
1193
}
1194
1195
// Send request with automatic timeout handling
1196
public <T extends Response> CompletableFuture<T> sendRequest(Request request, long timeoutMs) {
1197
CompletableFuture<T> future = new CompletableFuture<>();
1198
1199
// Generate unique request ID
1200
String requestId = "req-" + requestCounter.incrementAndGet() + "-" + System.currentTimeMillis();
1201
request.setRequestId(requestId);
1202
1203
// Create request future
1204
DefaultRequestFuture<T> requestFuture = new DefaultRequestFuture<>(requestId, request);
1205
pendingRequests.put(requestId, requestFuture);
1206
1207
// Set up timeout
1208
timeoutChecker.schedule(() -> {
1209
DefaultRequestFuture<?> pending = pendingRequests.remove(requestId);
1210
if (pending != null && !pending.isDone()) {
1211
pending.setFailResult(new TimeoutException("Request timeout after " + timeoutMs + "ms"));
1212
future.completeExceptionally(new TimeoutException("Request timeout"));
1213
}
1214
}, timeoutMs, TimeUnit.MILLISECONDS);
1215
1216
// Convert request future to CompletableFuture
1217
CompletableFuture.runAsync(() -> {
1218
try {
1219
T response = requestFuture.get();
1220
future.complete(response);
1221
} catch (Exception e) {
1222
future.completeExceptionally(e);
1223
}
1224
});
1225
1226
// Send actual request (pseudo-code)
1227
// sendToServer(request);
1228
1229
return future;
1230
}
1231
1232
// Handle response from server
1233
public void handleResponse(String requestId, Response response) {
1234
DefaultRequestFuture<?> future = pendingRequests.remove(requestId);
1235
if (future != null) {
1236
@SuppressWarnings("unchecked")
1237
DefaultRequestFuture<Response> typedFuture = (DefaultRequestFuture<Response>) future;
1238
typedFuture.setResponse(response);
1239
}
1240
}
1241
1242
// Handle request failure
1243
public void handleRequestFailure(String requestId, Throwable throwable) {
1244
DefaultRequestFuture<?> future = pendingRequests.remove(requestId);
1245
if (future != null) {
1246
future.setFailResult(throwable);
1247
}
1248
}
1249
1250
// Start periodic timeout checker
1251
private void startTimeoutChecker() {
1252
timeoutChecker.scheduleWithFixedDelay(() -> {
1253
long currentTime = System.currentTimeMillis();
1254
1255
pendingRequests.entrySet().removeIf(entry -> {
1256
DefaultRequestFuture<?> future = entry.getValue();
1257
1258
// Check if request has timed out (simplified logic)
1259
if (future.isTimeout()) {
1260
future.setFailResult(new TimeoutException("Request expired"));
1261
return true;
1262
}
1263
1264
return false;
1265
});
1266
1267
}, 1000, 1000, TimeUnit.MILLISECONDS);
1268
}
1269
1270
// Get pending request count
1271
public int getPendingRequestCount() {
1272
return pendingRequests.size();
1273
}
1274
1275
// Shutdown request manager
1276
public void shutdown() {
1277
timeoutChecker.shutdown();
1278
1279
// Cancel all pending requests
1280
pendingRequests.values().forEach(future -> {
1281
if (!future.isDone()) {
1282
future.setFailResult(new RuntimeException("RequestManager shutdown"));
1283
}
1284
});
1285
1286
pendingRequests.clear();
1287
}
1288
}
1289
```
1290
1291
### Retry and Circuit Breaker Patterns
1292
1293
```java
1294
import java.util.concurrent.atomic.AtomicInteger;
1295
import java.time.LocalDateTime;
1296
import java.time.Duration;
1297
1298
public class ResilientRequestHandler {
1299
1300
private final RequestManager requestManager;
1301
private final AtomicInteger failureCount = new AtomicInteger(0);
1302
private volatile LocalDateTime lastFailureTime;
1303
private volatile boolean circuitOpen = false;
1304
1305
// Circuit breaker thresholds
1306
private final int failureThreshold = 5;
1307
private final Duration circuitOpenDuration = Duration.ofMinutes(1);
1308
1309
public ResilientRequestHandler(RequestManager requestManager) {
1310
this.requestManager = requestManager;
1311
}
1312
1313
// Send request with retry and circuit breaker
1314
public <T extends Response> CompletableFuture<T> sendResilientRequest(Request request, int maxRetries) {
1315
1316
// Check circuit breaker
1317
if (isCircuitOpen()) {
1318
return CompletableFuture.failedFuture(
1319
new RuntimeException("Circuit breaker is open"));
1320
}
1321
1322
return sendWithRetry(request, maxRetries, 0);
1323
}
1324
1325
private <T extends Response> CompletableFuture<T> sendWithRetry(Request request, int maxRetries, int attempt) {
1326
1327
return requestManager.<T>sendRequest(request, 5000L)
1328
.handle((response, throwable) -> {
1329
if (throwable == null && response.isSuccess()) {
1330
// Success - reset failure count
1331
resetCircuitBreaker();
1332
return CompletableFuture.completedFuture(response);
1333
} else {
1334
// Failure - increment failure count
1335
recordFailure();
1336
1337
if (attempt < maxRetries) {
1338
// Retry with exponential backoff
1339
long delay = (long) Math.pow(2, attempt) * 1000; // 1s, 2s, 4s, 8s...
1340
1341
return CompletableFuture.<T>failedFuture(
1342
throwable != null ? throwable :
1343
new RuntimeException("Request failed: " + response.getMessage())
1344
).handle((r, t) -> {
1345
try {
1346
Thread.sleep(delay);
1347
return sendWithRetry(request, maxRetries, attempt + 1).join();
1348
} catch (InterruptedException e) {
1349
Thread.currentThread().interrupt();
1350
return CompletableFuture.<T>failedFuture(e).join();
1351
}
1352
});
1353
} else {
1354
// Max retries exceeded
1355
return CompletableFuture.<T>failedFuture(
1356
throwable != null ? throwable :
1357
new RuntimeException("Max retries exceeded. Last error: " + response.getMessage())
1358
);
1359
}
1360
}
1361
})
1362
.thenCompose(Function.identity());
1363
}
1364
1365
private boolean isCircuitOpen() {
1366
if (circuitOpen && lastFailureTime != null) {
1367
// Check if circuit should be closed
1368
if (Duration.between(lastFailureTime, LocalDateTime.now()).compareTo(circuitOpenDuration) > 0) {
1369
circuitOpen = false;
1370
failureCount.set(0);
1371
System.out.println("Circuit breaker closed - retrying requests");
1372
}
1373
}
1374
1375
return circuitOpen;
1376
}
1377
1378
private void recordFailure() {
1379
int failures = failureCount.incrementAndGet();
1380
lastFailureTime = LocalDateTime.now();
1381
1382
if (failures >= failureThreshold) {
1383
circuitOpen = true;
1384
System.err.printf("Circuit breaker opened after %d failures%n", failures);
1385
}
1386
}
1387
1388
private void resetCircuitBreaker() {
1389
failureCount.set(0);
1390
circuitOpen = false;
1391
lastFailureTime = null;
1392
}
1393
}