or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

ai-mcp.md configuration.md core-api.md exceptions.md index.md naming.md remote.md

remote.mddocs/

0

# Remote Communication

1

2

Low-level remote communication infrastructure including gRPC support, request/response handling, connection management, and callback mechanisms for building robust client-server communication.

3

4

## Capabilities

5

6

### Request and Response Base Classes

7

8

Foundation classes for all remote communication operations providing common functionality for headers, request IDs, and response handling.

9

10

```java { .api }

11

/**

12

* Base request class for remote communication

13

*/

14

abstract class Request implements Payload {

15

/** Request headers */

16

private Map<String, String> headers = new HashMap<>();

17

18

/** Unique request identifier */

19

private String requestId;

20

21

/**

22

* Add header to request

23

* @param key Header key

24

* @param value Header value

25

*/

26

public void putHeader(String key, String value);

27

28

/**

29

* Add multiple headers to request

30

* @param headers Map of headers to add

31

*/

32

public void putAllHeader(Map<String, String> headers);

33

34

/**

35

* Get header value by key

36

* @param key Header key

37

* @return Header value or null if not found

38

*/

39

public String getHeader(String key);

40

41

/**

42

* Get all headers

43

* @return Map of all headers

44

*/

45

public Map<String, String> getHeaders();

46

47

/**

48

* Get request identifier

49

* @return Unique request ID

50

*/

51

public String getRequestId();

52

53

/**

54

* Set request identifier

55

* @param requestId Unique request ID

56

*/

57

public void setRequestId(String requestId);

58

59

/**

60

* Get request type for routing

61

* @return Request type string

62

*/

63

public abstract String getRequestType();

64

}

65

66

/**

67

* Base response class for remote communication

68

*/

69

abstract class Response implements Payload {

70

/** Response result code */

71

private int resultCode = ResponseCode.SUCCESS.getCode();

72

73

/** Error code for failures */

74

private int errorCode = 0;

75

76

/** Response message */

77

private String message;

78

79

/** Request ID this response corresponds to */

80

private String requestId;

81

82

/**

83

* Check if response indicates success

84

* @return true if operation was successful

85

*/

86

public boolean isSuccess();

87

88

/**

89

* Get result code

90

* @return Response result code

91

*/

92

public int getResultCode();

93

94

/**

95

* Set result code

96

* @param resultCode Response result code

97

*/

98

public void setResultCode(int resultCode);

99

100

/**

101

* Get error code

102

* @return Error code for failures

103

*/

104

public int getErrorCode();

105

106

/**

107

* Set error code

108

* @param errorCode Error code for failures

109

*/

110

public void setErrorCode(int errorCode);

111

112

/**

113

* Get response message

114

* @return Response message

115

*/

116

public String getMessage();

117

118

/**

119

* Set response message

120

* @param message Response message

121

*/

122

public void setMessage(String message);

123

124

/**

125

* Get request ID

126

* @return Request ID this response corresponds to

127

*/

128

public String getRequestId();

129

130

/**

131

* Set request ID

132

* @param requestId Request ID

133

*/

134

public void setRequestId(String requestId);

135

136

/**

137

* Get response type for routing

138

* @return Response type string

139

*/

140

public abstract String getResponseType();

141

}

142

143

/**

144

* Base payload interface for serialization

145

*/

146

interface Payload extends Serializable {

147

// Marker interface for serializable payloads

148

}

149

```

150

151

### Connection and Health Check Requests

152

153

Standard request types for connection management and server health monitoring.

154

155

```java { .api }

156

/**

157

* Connection setup request for establishing client-server connection

158

*/

159

class ConnectionSetupRequest extends Request {

160

/** Client version information */

161

private String clientVersion;

162

163

/** Client abilities */

164

private ClientAbilities abilities;

165

166

/** Tenant information */

167

private String tenant;

168

169

/** Labels for client identification */

170

private Map<String, String> labels;

171

172

/**

173

* Default constructor

174

*/

175

public ConnectionSetupRequest();

176

177

/**

178

* Get client version

179

* @return Client version string

180

*/

181

public String getClientVersion();

182

183

/**

184

* Set client version

185

* @param clientVersion Client version string

186

*/

187

public void setClientVersion(String clientVersion);

188

189

/**

190

* Get client abilities

191

* @return Client abilities object

192

*/

193

public ClientAbilities getAbilities();

194

195

/**

196

* Set client abilities

197

* @param abilities Client abilities object

198

*/

199

public void setAbilities(ClientAbilities abilities);

200

201

/**

202

* Get tenant information

203

* @return Tenant string

204

*/

205

public String getTenant();

206

207

/**

208

* Set tenant information

209

* @param tenant Tenant string

210

*/

211

public void setTenant(String tenant);

212

213

/**

214

* Get client labels

215

* @return Map of client labels

216

*/

217

public Map<String, String> getLabels();

218

219

/**

220

* Set client labels

221

* @param labels Map of client labels

222

*/

223

public void setLabels(Map<String, String> labels);

224

225

@Override

226

public String getRequestType();

227

}

228

229

/**

230

* Health check request for monitoring server status

231

*/

232

class HealthCheckRequest extends Request {

233

/**

234

* Default constructor

235

*/

236

public HealthCheckRequest();

237

238

@Override

239

public String getRequestType();

240

}

241

242

/**

243

* Server check request for server capabilities inquiry

244

*/

245

class ServerCheckRequest extends Request {

246

/**

247

* Default constructor

248

*/

249

public ServerCheckRequest();

250

251

@Override

252

public String getRequestType();

253

}

254

```

255

256

### Response Types and Error Handling

257

258

Standard response classes with error codes and status information.

259

260

```java { .api }

261

/**

262

* Connection setup response

263

*/

264

class ConnectionSetupResponse extends Response {

265

/** Server abilities */

266

private ServerAbilities serverAbilities;

267

268

/**

269

* Default constructor

270

*/

271

public ConnectionSetupResponse();

272

273

/**

274

* Get server abilities

275

* @return Server abilities object

276

*/

277

public ServerAbilities getServerAbilities();

278

279

/**

280

* Set server abilities

281

* @param serverAbilities Server abilities object

282

*/

283

public void setServerAbilities(ServerAbilities serverAbilities);

284

285

@Override

286

public String getResponseType();

287

}

288

289

/**

290

* Health check response

291

*/

292

class HealthCheckResponse extends Response {

293

/**

294

* Default constructor

295

*/

296

public HealthCheckResponse();

297

298

@Override

299

public String getResponseType();

300

}

301

302

/**

303

* Error response for failed operations

304

*/

305

class ErrorResponse extends Response {

306

/**

307

* Constructor with error code and message

308

* @param errorCode Error code

309

* @param message Error message

310

*/

311

public ErrorResponse(int errorCode, String message);

312

313

@Override

314

public String getResponseType();

315

}

316

317

/**

318

* Response code enumeration

319

*/

320

enum ResponseCode {

321

/** Success response */

322

SUCCESS(200, "Success"),

323

324

/** Generic failure */

325

FAIL(500, "Fail"),

326

327

/** Required parameter is missing */

328

PARAMETER_MISSING(400, "Parameter Missing"),

329

330

/** Resource not found */

331

RESOURCE_NOT_FOUND(404, "Resource Not Found"),

332

333

/** Server internal error */

334

INTERNAL_SERVER_ERROR(500, "Internal Server Error");

335

336

/** Response code */

337

private final int code;

338

339

/** Response message */

340

private final String message;

341

342

/**

343

* Constructor

344

* @param code Response code

345

* @param message Response message

346

*/

347

ResponseCode(int code, String message);

348

349

/**

350

* Get response code

351

* @return Response code

352

*/

353

public int getCode();

354

355

/**

356

* Get response message

357

* @return Response message

358

*/

359

public String getMessage();

360

}

361

```

362

363

### Callback Interfaces

364

365

Asynchronous callback mechanisms for handling responses and server push notifications.

366

367

```java { .api }

368

/**

369

* Request callback interface for handling async responses

370

*/

371

interface RequestCallBack<T extends Response> {

372

/**

373

* Called when response is received successfully

374

* @param response Response object

375

*/

376

void onResponse(T response);

377

378

/**

379

* Called when request fails with exception

380

* @param e Exception that occurred

381

*/

382

void onException(Throwable e);

383

384

/**

385

* Get executor for callback processing

386

* @return Executor for async processing, null for current thread

387

*/

388

default Executor getExecutor() {

389

return null;

390

}

391

392

/**

393

* Get request timeout in milliseconds

394

* @return Timeout value in milliseconds, 0 for no timeout (the default implementation returns 3000)

395

*/

396

default long getTimeout() {

397

return 3000L;

398

}

399

}

400

401

/**

402

* Abstract request callback implementation

403

*/

404

abstract class AbstractRequestCallBack<T extends Response> implements RequestCallBack<T> {

405

/** Executor for callback processing */

406

private Executor executor;

407

408

/** Request timeout */

409

private long timeout = 3000L;

410

411

/**

412

* Default constructor

413

*/

414

public AbstractRequestCallBack();

415

416

/**

417

* Constructor with executor

418

* @param executor Executor for callback processing

419

*/

420

public AbstractRequestCallBack(Executor executor);

421

422

/**

423

* Constructor with executor and timeout

424

* @param executor Executor for callback processing

425

* @param timeout Request timeout in milliseconds

426

*/

427

public AbstractRequestCallBack(Executor executor, long timeout);

428

429

@Override

430

public Executor getExecutor();

431

432

@Override

433

public long getTimeout();

434

435

/**

436

* Abstract response handler to be implemented

437

* @param response Response object

438

*/

439

@Override

440

public abstract void onResponse(T response);

441

442

/**

443

* Abstract exception handler to be implemented

444

* @param e Exception that occurred

445

*/

446

@Override

447

public abstract void onException(Throwable e);

448

}

449

450

/**

451

* Push callback interface for server-initiated communications

452

*/

453

interface PushCallBack {

454

/**

455

* Handle server push request

456

* @param request Push request from server

457

* @return Response to send back to server

458

*/

459

Response requestReply(Request request);

460

461

/**

462

* Get executor for push processing

463

* @return Executor for async processing; may be null (the default implementation returns null)

464

*/

465

default Executor getExecutor() {

466

return null;

467

}

468

}

469

470

/**

471

* Abstract push callback implementation

472

*/

473

abstract class AbstractPushCallBack implements PushCallBack {

474

/** Executor for push processing */

475

private Executor executor;

476

477

/**

478

* Default constructor

479

*/

480

public AbstractPushCallBack();

481

482

/**

483

* Constructor with executor

484

* @param executor Executor for push processing

485

*/

486

public AbstractPushCallBack(Executor executor);

487

488

@Override

489

public Executor getExecutor();

490

491

/**

492

* Abstract push request handler to be implemented

493

* @param request Push request from server

494

* @return Response to send back to server

495

*/

496

@Override

497

public abstract Response requestReply(Request request);

498

}

499

```

500

501

### Future and Async Operations

502

503

Future-based operations for handling asynchronous requests and responses.

504

505

```java { .api }

506

/**

507

* Future interface for async request operations

508

*/

509

interface RequestFuture<T> extends Future<T> {

510

/**

511

* Get request ID

512

* @return Request ID

513

*/

514

String getRequestId();

515

516

/**

517

* Check if request timed out

518

* @return true if request timed out

519

*/

520

boolean isTimeout();

521

522

/**

523

* Get the original request

524

* @return Original request object

525

*/

526

Request getRequest();

527

528

/**

529

* Set the response

530

* @param response Response object

531

*/

532

void setResponse(T response);

533

534

/**

535

* Set exception

536

* @param throwable Exception that occurred

537

*/

538

void setFailResult(Throwable throwable);

539

}

540

541

/**

542

* Default implementation of RequestFuture

543

*/

544

class DefaultRequestFuture<T> implements RequestFuture<T> {

545

/** Request ID */

546

private final String requestId;

547

548

/** Original request */

549

private final Request request;

550

551

/** Response object */

552

private volatile T response;

553

554

/** Exception if failed */

555

private volatile Throwable exception;

556

557

/** Completion flag */

558

private volatile boolean done = false;

559

560

/** Timeout flag */

561

private volatile boolean timeout = false;

562

563

/** Completion latch */

564

private final CountDownLatch latch = new CountDownLatch(1);

565

566

/**

567

* Constructor

568

* @param requestId Request ID

569

* @param request Original request

570

*/

571

public DefaultRequestFuture(String requestId, Request request);

572

573

@Override

574

public String getRequestId();

575

576

@Override

577

public boolean isTimeout();

578

579

@Override

580

public Request getRequest();

581

582

@Override

583

public void setResponse(T response);

584

585

@Override

586

public void setFailResult(Throwable throwable);

587

588

@Override

589

public boolean cancel(boolean mayInterruptIfRunning);

590

591

@Override

592

public boolean isCancelled();

593

594

@Override

595

public boolean isDone();

596

597

@Override

598

public T get() throws InterruptedException, ExecutionException;

599

600

@Override

601

public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

602

}

603

```

604

605

### gRPC Integration

606

607

Auto-generated gRPC classes for Protocol Buffers-based communication.

608

609

```java { .api }

610

/**

611

* Nacos gRPC service definitions (auto-generated)

612

*/

613

class NacosGrpcService {

614

/**

615

* Get service descriptor

616

* @return Service descriptor for gRPC

617

*/

618

public static ServiceDescriptor getServiceDescriptor();

619

620

/**

621

* Create stub for gRPC calls

622

* @param channel gRPC channel

623

* @return Service stub

624

*/

625

public static NacosGrpcServiceStub newStub(Channel channel);

626

627

/**

628

* Create blocking stub for synchronous gRPC calls

629

* @param channel gRPC channel

630

* @return Blocking service stub

631

*/

632

public static NacosGrpcServiceBlockingStub newBlockingStub(Channel channel);

633

}

634

635

/**

636

* gRPC payload message (auto-generated); a separate type from the Payload marker interface implemented by Request and Response

637

*/

638

class Payload implements Serializable {

639

/** Metadata for the payload */

640

private Metadata metadata;

641

642

/** Body content */

643

private Any body;

644

645

/**

646

* Get metadata

647

* @return Payload metadata

648

*/

649

public Metadata getMetadata();

650

651

/**

652

* Set metadata

653

* @param metadata Payload metadata

654

*/

655

public void setMetadata(Metadata metadata);

656

657

/**

658

* Get body content

659

* @return Body as Any type

660

*/

661

public Any getBody();

662

663

/**

664

* Set body content

665

* @param body Body as Any type

666

*/

667

public void setBody(Any body);

668

}

669

670

/**

671

* gRPC metadata message (auto-generated)

672

*/

673

class Metadata implements Serializable {

674

/** Message type */

675

private String type;

676

677

/** Client IP */

678

private String clientIp;

679

680

/** Headers */

681

private Map<String, String> headers;

682

683

/**

684

* Get message type

685

* @return Message type

686

*/

687

public String getType();

688

689

/**

690

* Set message type

691

* @param type Message type

692

*/

693

public void setType(String type);

694

695

/**

696

* Get client IP

697

* @return Client IP address

698

*/

699

public String getClientIp();

700

701

/**

702

* Set client IP

703

* @param clientIp Client IP address

704

*/

705

public void setClientIp(String clientIp);

706

707

/**

708

* Get headers

709

* @return Map of headers

710

*/

711

public Map<String, String> getHeaders();

712

713

/**

714

* Set headers

715

* @param headers Map of headers

716

*/

717

public void setHeaders(Map<String, String> headers);

718

}

719

720

/**

721

* Bidirectional stream gRPC service (auto-generated)

722

*/

723

class BiRequestStreamGrpc {

724

/**

725

* Get method descriptor for bidirectional streaming

726

* @return Method descriptor

727

*/

728

public static MethodDescriptor<Payload, Payload> getRequestBiStreamMethod();

729

730

/**

731

* Create stub for bidirectional streaming

732

* @param channel gRPC channel

733

* @return Stub for bidirectional streaming

734

*/

735

public static BiRequestStreamStub newStub(Channel channel);

736

}

737

738

/**

739

* Request-response gRPC service (auto-generated)

740

*/

741

class RequestGrpc {

742

/**

743

* Get method descriptor for unary requests

744

* @return Method descriptor

745

*/

746

public static MethodDescriptor<Payload, Payload> getRequestMethod();

747

748

/**

749

* Create stub for unary requests

750

* @param channel gRPC channel

751

* @return Stub for unary requests

752

*/

753

public static RequestStub newStub(Channel channel);

754

755

/**

756

* Create blocking stub for synchronous unary requests

757

* @param channel gRPC channel

758

* @return Blocking stub for unary requests

759

*/

760

public static RequestBlockingStub newBlockingStub(Channel channel);

761

}

762

```

763

764

### Utilities and Executors

765

766

Utility classes for remote communication operations and thread management.

767

768

```java { .api }

769

/**

770

* Scheduled executor for RPC operations

771

*/

772

class RpcScheduledExecutor {

773

/** Default executor instance */

774

private static final ScheduledExecutorService EXECUTOR;

775

776

/**

777

* Get default scheduled executor

778

* @return Scheduled executor service

779

*/

780

public static ScheduledExecutorService getExecutor();

781

782

/**

783

* Schedule task with delay

784

* @param command Task to execute

785

* @param delay Delay before execution

786

* @param unit Time unit for delay

787

* @return ScheduledFuture for the task

788

*/

789

public static ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

790

791

/**

792

* Schedule task with fixed rate

793

* @param command Task to execute

794

* @param initialDelay Initial delay

795

* @param period Period between executions

796

* @param unit Time unit

797

* @return ScheduledFuture for the task

798

*/

799

public static ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);

800

801

/**

802

* Schedule task with fixed delay

803

* @param command Task to execute

804

* @param initialDelay Initial delay

805

* @param delay Delay between executions

806

* @param unit Time unit

807

* @return ScheduledFuture for the task

808

*/

809

public static ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);

810

}

811

812

/**

813

* Remote communication constants

814

*/

815

class RemoteConstants {

816

/** Default request timeout */

817

public static final long DEFAULT_TIMEOUT_MILLS = 3000L;

818

819

/** Connection setup timeout */

820

public static final long CONNECT_TIMEOUT_MILLS = 3000L;

821

822

/** Keep alive interval */

823

public static final long KEEP_ALIVE_MILLS = 5000L;

824

825

/** Default retry times */

826

public static final int DEFAULT_RETRY_TIMES = 3;

827

828

/** Max retry times */

829

public static final int MAX_RETRY_TIMES = 5;

830

831

/** Connection pool size */

832

public static final int DEFAULT_CONNECTION_POOL_SIZE = 8;

833

}

834

```

835

836

## Usage Examples

837

838

### Basic Request-Response Communication

839

840

```java

841

import com.alibaba.nacos.api.remote.request.Request;

842

import com.alibaba.nacos.api.remote.response.Response;

843

import com.alibaba.nacos.api.remote.RequestCallBack;

844

import com.alibaba.nacos.api.remote.DefaultRequestFuture;

845

846

// Custom request implementation

847

public class CustomRequest extends Request {

848

private String data;

849

850

public CustomRequest(String data) {

851

this.data = data;

852

setRequestId(UUID.randomUUID().toString());

853

putHeader("Content-Type", "application/json");

854

putHeader("User-Agent", "Nacos-Client");

855

}

856

857

public String getData() {

858

return data;

859

}

860

861

@Override

862

public String getRequestType() {

863

return "CustomRequest";

864

}

865

}

866

867

// Custom response implementation

868

public class CustomResponse extends Response {

869

private String result;

870

871

public CustomResponse() {}

872

873

public String getResult() {

874

return result;

875

}

876

877

public void setResult(String result) {

878

this.result = result;

879

}

880

881

@Override

882

public String getResponseType() {

883

return "CustomResponse";

884

}

885

}

886

887

// Synchronous request-response

888

public CustomResponse sendSynchronousRequest(String data) {

889

CustomRequest request = new CustomRequest(data);

890

891

try {

892

// Create future for the request

893

DefaultRequestFuture<CustomResponse> future = new DefaultRequestFuture<>(

894

request.getRequestId(), request);

895

896

// Send request (pseudo-code - actual implementation would use client)

897

// remoteClient.sendRequest(request, future);

898

899

// Wait for response with timeout

900

CustomResponse response = future.get(5000, TimeUnit.MILLISECONDS);

901

902

if (response.isSuccess()) {

903

System.out.println("Request successful: " + response.getResult());

904

return response;

905

} else {

906

System.err.println("Request failed: " + response.getMessage());

907

return null;

908

}

909

910

} catch (TimeoutException e) {

911

System.err.println("Request timed out");

912

return null;

913

} catch (Exception e) {

914

System.err.println("Request failed with exception: " + e.getMessage());

915

return null;

916

}

917

}

918

```

919

920

### Asynchronous Communication with Callbacks

921

922

```java

923

import com.alibaba.nacos.api.remote.AbstractRequestCallBack;

924

import java.util.concurrent.Executors;

925

import java.util.concurrent.CompletableFuture;

926

927

// Asynchronous request with callback

928

public void sendAsynchronousRequest(String data, Consumer<String> onSuccess, Consumer<String> onError) {

929

CustomRequest request = new CustomRequest(data);

930

931

RequestCallBack<CustomResponse> callback = new AbstractRequestCallBack<CustomResponse>(

932

Executors.newSingleThreadExecutor(), 10000L) {

933

934

@Override

935

public void onResponse(CustomResponse response) {

936

if (response.isSuccess()) {

937

System.out.println("Async request successful: " + response.getResult());

938

onSuccess.accept(response.getResult());

939

} else {

940

System.err.println("Async request failed: " + response.getMessage());

941

onError.accept(response.getMessage());

942

}

943

}

944

945

@Override

946

public void onException(Throwable e) {

947

System.err.println("Async request failed with exception: " + e.getMessage());

948

onError.accept(e.getMessage());

949

}

950

};

951

952

// Send async request (pseudo-code)

953

// remoteClient.sendAsyncRequest(request, callback);

954

}

955

956

// Multiple parallel requests

957

public CompletableFuture<List<String>> sendParallelRequests(List<String> dataList) {

958

List<CompletableFuture<String>> futures = dataList.stream()

959

.map(data -> {

960

CompletableFuture<String> future = new CompletableFuture<>();

961

962

sendAsynchronousRequest(data,

963

result -> future.complete(result),

964

error -> future.completeExceptionally(new RuntimeException(error))

965

);

966

967

return future;

968

})

969

.collect(Collectors.toList());

970

971

// Combine all futures

972

return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))

973

.thenApply(v -> futures.stream()

974

.map(CompletableFuture::join)

975

.collect(Collectors.toList()));

976

}

977

```

978

979

### Connection Management

980

981

```java

982

import com.alibaba.nacos.api.remote.request.ConnectionSetupRequest;

983

import com.alibaba.nacos.api.remote.response.ConnectionSetupResponse;

984

import com.alibaba.nacos.api.remote.request.HealthCheckRequest;

985

986

public class ConnectionManager {

987

988

private volatile boolean connected = false;

989

private String clientId;

990

private ScheduledExecutorService healthCheckExecutor;

991

992

public ConnectionManager(String clientId) {

993

this.clientId = clientId;

994

this.healthCheckExecutor = Executors.newSingleThreadScheduledExecutor();

995

}

996

997

// Establish connection to server

998

public boolean connect(String serverAddress) {

999

ConnectionSetupRequest request = new ConnectionSetupRequest();

1000

request.setClientVersion("3.0.2");

1001

request.setTenant("default");

1002

1003

// Set client labels

1004

Map<String, String> labels = new HashMap<>();

1005

labels.put("clientId", clientId);

1006

labels.put("appName", "my-application");

1007

labels.put("environment", "production");

1008

request.setLabels(labels);

1009

1010

try {

1011

// Send connection setup request

1012

DefaultRequestFuture<ConnectionSetupResponse> future =

1013

new DefaultRequestFuture<>(request.getRequestId(), request);

1014

1015

// Simulate sending request

1016

ConnectionSetupResponse response = future.get(5000, TimeUnit.MILLISECONDS);

1017

1018

if (response.isSuccess()) {

1019

connected = true;

1020

System.out.println("Connected to server: " + serverAddress);

1021

1022

// Start health checking

1023

startHealthCheck();

1024

return true;

1025

} else {

1026

System.err.println("Connection failed: " + response.getMessage());

1027

return false;

1028

}

1029

1030

} catch (Exception e) {

1031

System.err.println("Connection error: " + e.getMessage());

1032

return false;

1033

}

1034

}

1035

1036

// Start periodic health checking

1037

private void startHealthCheck() {

1038

healthCheckExecutor.scheduleWithFixedDelay(() -> {

1039

HealthCheckRequest healthRequest = new HealthCheckRequest();

1040

1041

try {

1042

DefaultRequestFuture<Response> future =

1043

new DefaultRequestFuture<>(healthRequest.getRequestId(), healthRequest);

1044

1045

Response response = future.get(3000, TimeUnit.MILLISECONDS);

1046

1047

if (!response.isSuccess()) {

1048

System.err.println("Health check failed: " + response.getMessage());

1049

connected = false;

1050

// Trigger reconnection logic

1051

scheduleReconnect();

1052

}

1053

1054

} catch (Exception e) {

1055

System.err.println("Health check error: " + e.getMessage());

1056

connected = false;

1057

scheduleReconnect();

1058

}

1059

}, 5000, 10000, TimeUnit.MILLISECONDS);

1060

}

1061

1062

// Schedule reconnection attempt

1063

private void scheduleReconnect() {

1064

if (!connected) {

1065

healthCheckExecutor.schedule(() -> {

1066

System.out.println("Attempting to reconnect...");

1067

// Retry connection logic here

1068

}, 5000, TimeUnit.MILLISECONDS);

1069

}

1070

}

1071

1072

public boolean isConnected() {

1073

return connected;

1074

}

1075

1076

public void disconnect() {

1077

connected = false;

1078

if (healthCheckExecutor != null) {

1079

healthCheckExecutor.shutdown();

1080

}

1081

}

1082

}

1083

```

1084

1085

### Server Push Handling

1086

1087

```java

1088

import com.alibaba.nacos.api.remote.PushCallBack;

1089

import com.alibaba.nacos.api.remote.AbstractPushCallBack;

1090

1091

// Custom push request handler

1092

public class ConfigPushHandler extends AbstractPushCallBack {

1093

1094

private final ConfigUpdateListener configListener;

1095

1096

public ConfigPushHandler(ConfigUpdateListener configListener) {

1097

super(Executors.newFixedThreadPool(4));

1098

this.configListener = configListener;

1099

}

1100

1101

@Override

1102

public Response requestReply(Request request) {

1103

System.out.println("Received push request: " + request.getRequestType());

1104

1105

try {

1106

if ("ConfigChangeNotifyRequest".equals(request.getRequestType())) {

1107

return handleConfigChange(request);

1108

} else if ("ServiceChangeNotifyRequest".equals(request.getRequestType())) {

1109

return handleServiceChange(request);

1110

} else {

1111

return createErrorResponse("Unknown request type: " + request.getRequestType());

1112

}

1113

1114

} catch (Exception e) {

1115

System.err.println("Error handling push request: " + e.getMessage());

1116

return createErrorResponse("Internal error: " + e.getMessage());

1117

}

1118

}

1119

1120

private Response handleConfigChange(Request request) {

1121

// Extract configuration change information from request

1122

String dataId = request.getHeader("dataId");

1123

String group = request.getHeader("group");

1124

String content = request.getHeader("content");

1125

1126

System.out.printf("Config changed: %s:%s%n", group, dataId);

1127

1128

// Notify listeners

1129

if (configListener != null) {

1130

configListener.onConfigChange(dataId, group, content);

1131

}

1132

1133

// Return success response

1134

Response response = new Response() {

1135

@Override

1136

public String getResponseType() {

1137

return "ConfigChangeNotifyResponse";

1138

}

1139

};

1140

response.setResultCode(ResponseCode.SUCCESS.getCode());

1141

response.setMessage("Config change processed successfully");

1142

1143

return response;

1144

}

1145

1146

private Response handleServiceChange(Request request) {

1147

// Handle service change notification

1148

String serviceName = request.getHeader("serviceName");

1149

String groupName = request.getHeader("groupName");

1150

1151

System.out.printf("Service changed: %s in group %s%n", serviceName, groupName);

1152

1153

// Process service change

1154

// ... implementation details

1155

1156

Response response = new Response() {

1157

@Override

1158

public String getResponseType() {

1159

return "ServiceChangeNotifyResponse";

1160

}

1161

};

1162

response.setResultCode(ResponseCode.SUCCESS.getCode());

1163

1164

return response;

1165

}

1166

1167

private Response createErrorResponse(String message) {

1168

return new ErrorResponse(ResponseCode.INTERNAL_SERVER_ERROR.getCode(), message);

1169

}

1170

}

1171

1172

/**
 * Callback invoked when a configuration change notification is received.
 *
 * <p>The interface has exactly one abstract method, so it can be implemented
 * with a lambda or method reference; {@code @FunctionalInterface} makes the
 * compiler enforce that this stays true.
 */
@FunctionalInterface
interface ConfigUpdateListener {

    /**
     * Called once per configuration change notification.
     *
     * @param dataId  value of the notification's "dataId" header
     * @param group   value of the notification's "group" header
     * @param content value of the notification's "content" header
     */
    void onConfigChange(String dataId, String group, String content);
}

1176

```

1177

1178

### Advanced Request Management

1179

1180

```java

1181

import java.util.concurrent.ConcurrentHashMap;

1182

import java.util.concurrent.atomic.AtomicLong;

1183

1184

public class RequestManager {

1185

1186

private final Map<String, DefaultRequestFuture<?>> pendingRequests = new ConcurrentHashMap<>();

1187

private final AtomicLong requestCounter = new AtomicLong(0);

1188

private final ScheduledExecutorService timeoutChecker;

1189

1190

public RequestManager() {

1191

this.timeoutChecker = Executors.newSingleThreadScheduledExecutor();

1192

startTimeoutChecker();

1193

}

1194

1195

// Send request with automatic timeout handling

1196

public <T extends Response> CompletableFuture<T> sendRequest(Request request, long timeoutMs) {

1197

CompletableFuture<T> future = new CompletableFuture<>();

1198

1199

// Generate unique request ID

1200

String requestId = "req-" + requestCounter.incrementAndGet() + "-" + System.currentTimeMillis();

1201

request.setRequestId(requestId);

1202

1203

// Create request future

1204

DefaultRequestFuture<T> requestFuture = new DefaultRequestFuture<>(requestId, request);

1205

pendingRequests.put(requestId, requestFuture);

1206

1207

// Set up timeout

1208

timeoutChecker.schedule(() -> {

1209

DefaultRequestFuture<?> pending = pendingRequests.remove(requestId);

1210

if (pending != null && !pending.isDone()) {

1211

pending.setFailResult(new TimeoutException("Request timeout after " + timeoutMs + "ms"));

1212

future.completeExceptionally(new TimeoutException("Request timeout"));

1213

}

1214

}, timeoutMs, TimeUnit.MILLISECONDS);

1215

1216

// Convert request future to CompletableFuture

1217

CompletableFuture.runAsync(() -> {

1218

try {

1219

T response = requestFuture.get();

1220

future.complete(response);

1221

} catch (Exception e) {

1222

future.completeExceptionally(e);

1223

}

1224

});

1225

1226

// Send actual request (pseudo-code)

1227

// sendToServer(request);

1228

1229

return future;

1230

}

1231

1232

// Handle response from server

1233

public void handleResponse(String requestId, Response response) {

1234

DefaultRequestFuture<?> future = pendingRequests.remove(requestId);

1235

if (future != null) {

1236

@SuppressWarnings("unchecked")

1237

DefaultRequestFuture<Response> typedFuture = (DefaultRequestFuture<Response>) future;

1238

typedFuture.setResponse(response);

1239

}

1240

}

1241

1242

// Handle request failure

1243

public void handleRequestFailure(String requestId, Throwable throwable) {

1244

DefaultRequestFuture<?> future = pendingRequests.remove(requestId);

1245

if (future != null) {

1246

future.setFailResult(throwable);

1247

}

1248

}

1249

1250

// Start periodic timeout checker

1251

private void startTimeoutChecker() {

1252

timeoutChecker.scheduleWithFixedDelay(() -> {

1253

long currentTime = System.currentTimeMillis();

1254

1255

pendingRequests.entrySet().removeIf(entry -> {

1256

DefaultRequestFuture<?> future = entry.getValue();

1257

1258

// Check if request has timed out (simplified logic)

1259

if (future.isTimeout()) {

1260

future.setFailResult(new TimeoutException("Request expired"));

1261

return true;

1262

}

1263

1264

return false;

1265

});

1266

1267

}, 1000, 1000, TimeUnit.MILLISECONDS);

1268

}

1269

1270

// Get pending request count

1271

public int getPendingRequestCount() {

1272

return pendingRequests.size();

1273

}

1274

1275

// Shutdown request manager

1276

public void shutdown() {

1277

timeoutChecker.shutdown();

1278

1279

// Cancel all pending requests

1280

pendingRequests.values().forEach(future -> {

1281

if (!future.isDone()) {

1282

future.setFailResult(new RuntimeException("RequestManager shutdown"));

1283

}

1284

});

1285

1286

pendingRequests.clear();

1287

}

1288

}

1289

```

1290

1291

### Retry and Circuit Breaker Patterns

1292

1293

```java

1294

import java.util.concurrent.atomic.AtomicInteger;

1295

import java.time.LocalDateTime;

1296

import java.time.Duration;

1297

1298

public class ResilientRequestHandler {

1299

1300

private final RequestManager requestManager;

1301

private final AtomicInteger failureCount = new AtomicInteger(0);

1302

private volatile LocalDateTime lastFailureTime;

1303

private volatile boolean circuitOpen = false;

1304

1305

// Circuit breaker thresholds

1306

private final int failureThreshold = 5;

1307

private final Duration circuitOpenDuration = Duration.ofMinutes(1);

1308

1309

public ResilientRequestHandler(RequestManager requestManager) {

1310

this.requestManager = requestManager;

1311

}

1312

1313

// Send request with retry and circuit breaker

1314

public <T extends Response> CompletableFuture<T> sendResilientRequest(Request request, int maxRetries) {

1315

1316

// Check circuit breaker

1317

if (isCircuitOpen()) {

1318

return CompletableFuture.failedFuture(

1319

new RuntimeException("Circuit breaker is open"));

1320

}

1321

1322

return sendWithRetry(request, maxRetries, 0);

1323

}

1324

1325

private <T extends Response> CompletableFuture<T> sendWithRetry(Request request, int maxRetries, int attempt) {

1326

1327

return requestManager.<T>sendRequest(request, 5000L)

1328

.handle((response, throwable) -> {

1329

if (throwable == null && response.isSuccess()) {

1330

// Success - reset failure count

1331

resetCircuitBreaker();

1332

return CompletableFuture.completedFuture(response);

1333

} else {

1334

// Failure - increment failure count

1335

recordFailure();

1336

1337

if (attempt < maxRetries) {

1338

// Retry with exponential backoff

1339

long delay = (long) Math.pow(2, attempt) * 1000; // 1s, 2s, 4s, 8s...

1340

1341

return CompletableFuture.<T>failedFuture(

1342

throwable != null ? throwable :

1343

new RuntimeException("Request failed: " + response.getMessage())

1344

).handle((r, t) -> {

1345

try {

1346

Thread.sleep(delay);

1347

return sendWithRetry(request, maxRetries, attempt + 1).join();

1348

} catch (InterruptedException e) {

1349

Thread.currentThread().interrupt();

1350

return CompletableFuture.<T>failedFuture(e).join();

1351

}

1352

});

1353

} else {

1354

// Max retries exceeded

1355

return CompletableFuture.<T>failedFuture(

1356

throwable != null ? throwable :

1357

new RuntimeException("Max retries exceeded. Last error: " + response.getMessage())

1358

);

1359

}

1360

}

1361

})

1362

.thenCompose(Function.identity());

1363

}

1364

1365

private boolean isCircuitOpen() {

1366

if (circuitOpen && lastFailureTime != null) {

1367

// Check if circuit should be closed

1368

if (Duration.between(lastFailureTime, LocalDateTime.now()).compareTo(circuitOpenDuration) > 0) {

1369

circuitOpen = false;

1370

failureCount.set(0);

1371

System.out.println("Circuit breaker closed - retrying requests");

1372

}

1373

}

1374

1375

return circuitOpen;

1376

}

1377

1378

private void recordFailure() {

1379

int failures = failureCount.incrementAndGet();

1380

lastFailureTime = LocalDateTime.now();

1381

1382

if (failures >= failureThreshold) {

1383

circuitOpen = true;

1384

System.err.printf("Circuit breaker opened after %d failures%n", failures);

1385

}

1386

}

1387

1388

private void resetCircuitBreaker() {

1389

failureCount.set(0);

1390

circuitOpen = false;

1391

lastFailureTime = null;

1392

}

1393

}