Nacos API package providing interfaces and common classes for dynamic service discovery, configuration management, and service management in cloud native applications and microservices
—
Low-level remote communication infrastructure including gRPC support, request/response handling, connection management, and callback mechanisms for building robust client-server communication.
Foundation classes for all remote communication operations providing common functionality for headers, request IDs, and response handling.
/**
* Base request class for remote communication
*/
abstract class Request implements Payload {
/** Request headers */
private Map<String, String> headers = new HashMap<>();
/** Unique request identifier */
private String requestId;
/**
* Add header to request
* @param key Header key
* @param value Header value
*/
public void putHeader(String key, String value);
/**
* Add multiple headers to request
* @param headers Map of headers to add
*/
public void putAllHeader(Map<String, String> headers);
/**
* Get header value by key
* @param key Header key
* @return Header value or null if not found
*/
public String getHeader(String key);
/**
* Get all headers
* @return Map of all headers
*/
public Map<String, String> getHeaders();
/**
* Get request identifier
* @return Unique request ID
*/
public String getRequestId();
/**
* Set request identifier
* @param requestId Unique request ID
*/
public void setRequestId(String requestId);
/**
* Get request type for routing
* @return Request type string
*/
public abstract String getRequestType();
}
/**
* Base response class for remote communication
*/
abstract class Response implements Payload {
/** Response result code */
private int resultCode = ResponseCode.SUCCESS.getCode();
/** Error code for failures */
private int errorCode = 0;
/** Response message */
private String message;
/** Request ID this response corresponds to */
private String requestId;
/**
* Check if response indicates success
* @return true if operation was successful
*/
public boolean isSuccess();
/**
* Get result code
* @return Response result code
*/
public int getResultCode();
/**
* Set result code
* @param resultCode Response result code
*/
public void setResultCode(int resultCode);
/**
* Get error code
* @return Error code for failures
*/
public int getErrorCode();
/**
* Set error code
* @param errorCode Error code for failures
*/
public void setErrorCode(int errorCode);
/**
* Get response message
* @return Response message
*/
public String getMessage();
/**
* Set response message
* @param message Response message
*/
public void setMessage(String message);
/**
* Get request ID
* @return Request ID this response corresponds to
*/
public String getRequestId();
/**
* Set request ID
* @param requestId Request ID
*/
public void setRequestId(String requestId);
/**
* Get response type for routing
* @return Response type string
*/
public abstract String getResponseType();
}
/**
* Base payload interface for serialization
*/
interface Payload extends Serializable {
// Marker interface for serializable payloads
}Standard request types for connection management and server health monitoring.
/**
* Connection setup request for establishing client-server connection
*/
class ConnectionSetupRequest extends Request {
/** Client version information */
private String clientVersion;
/** Client abilities */
private ClientAbilities abilities;
/** Tenant information */
private String tenant;
/** Labels for client identification */
private Map<String, String> labels;
/**
* Default constructor
*/
public ConnectionSetupRequest();
/**
* Get client version
* @return Client version string
*/
public String getClientVersion();
/**
* Set client version
* @param clientVersion Client version string
*/
public void setClientVersion(String clientVersion);
/**
* Get client abilities
* @return Client abilities object
*/
public ClientAbilities getAbilities();
/**
* Set client abilities
* @param abilities Client abilities object
*/
public void setAbilities(ClientAbilities abilities);
/**
* Get tenant information
* @return Tenant string
*/
public String getTenant();
/**
* Set tenant information
* @param tenant Tenant string
*/
public void setTenant(String tenant);
/**
* Get client labels
* @return Map of client labels
*/
public Map<String, String> getLabels();
/**
* Set client labels
* @param labels Map of client labels
*/
public void setLabels(Map<String, String> labels);
@Override
public String getRequestType();
}
/**
* Health check request for monitoring server status
*/
class HealthCheckRequest extends Request {
/**
* Default constructor
*/
public HealthCheckRequest();
@Override
public String getRequestType();
}
/**
* Server check request for server capabilities inquiry
*/
class ServerCheckRequest extends Request {
/**
* Default constructor
*/
public ServerCheckRequest();
@Override
public String getRequestType();
}Standard response classes with error codes and status information.
/**
* Connection setup response
*/
class ConnectionSetupResponse extends Response {
/** Server abilities */
private ServerAbilities serverAbilities;
/**
* Default constructor
*/
public ConnectionSetupResponse();
/**
* Get server abilities
* @return Server abilities object
*/
public ServerAbilities getServerAbilities();
/**
* Set server abilities
* @param serverAbilities Server abilities object
*/
public void setServerAbilities(ServerAbilities serverAbilities);
@Override
public String getResponseType();
}
/**
* Health check response
*/
class HealthCheckResponse extends Response {
/**
* Default constructor
*/
public HealthCheckResponse();
@Override
public String getResponseType();
}
/**
* Error response for failed operations
*/
class ErrorResponse extends Response {
/**
* Constructor with error code and message
* @param errorCode Error code
* @param message Error message
*/
public ErrorResponse(int errorCode, String message);
@Override
public String getResponseType();
}
/**
* Response code enumeration
*/
enum ResponseCode {
/** Success response */
SUCCESS(200, "Success"),
/** Generic failure */
FAIL(500, "Fail"),
/** Invalid parameters */
PARAMETER_MISSING(400, "Parameter Missing"),
/** Resource not found */
RESOURCE_NOT_FOUND(404, "Resource Not Found"),
/** Server internal error */
INTERNAL_SERVER_ERROR(500, "Internal Server Error");
/** Response code */
private final int code;
/** Response message */
private final String message;
/**
* Constructor
* @param code Response code
* @param message Response message
*/
ResponseCode(int code, String message);
/**
* Get response code
* @return Response code
*/
public int getCode();
/**
* Get response message
* @return Response message
*/
public String getMessage();
}Asynchronous callback mechanisms for handling responses and server push notifications.
/**
* Request callback interface for handling async responses
*/
interface RequestCallBack<T extends Response> {
/**
* Called when response is received successfully
* @param response Response object
*/
void onResponse(T response);
/**
* Called when request fails with exception
* @param e Exception that occurred
*/
void onException(Throwable e);
/**
* Get executor for callback processing
* @return Executor for async processing, null for current thread
*/
default Executor getExecutor() {
return null;
}
/**
* Get request timeout in milliseconds
* @return Timeout value, 0 for no timeout
*/
default long getTimeout() {
return 3000L;
}
}
/**
* Abstract request callback implementation
*/
abstract class AbstractRequestCallBack<T extends Response> implements RequestCallBack<T> {
/** Executor for callback processing */
private Executor executor;
/** Request timeout */
private long timeout = 3000L;
/**
* Default constructor
*/
public AbstractRequestCallBack();
/**
* Constructor with executor
* @param executor Executor for callback processing
*/
public AbstractRequestCallBack(Executor executor);
/**
* Constructor with executor and timeout
* @param executor Executor for callback processing
* @param timeout Request timeout in milliseconds
*/
public AbstractRequestCallBack(Executor executor, long timeout);
@Override
public Executor getExecutor();
@Override
public long getTimeout();
/**
* Abstract response handler to be implemented
* @param response Response object
*/
@Override
public abstract void onResponse(T response);
/**
* Abstract exception handler to be implemented
* @param e Exception that occurred
*/
@Override
public abstract void onException(Throwable e);
}
/**
* Push callback interface for server-initiated communications
*/
interface PushCallBack {
/**
* Handle server push request
* @param request Push request from server
* @return Response to send back to server
*/
Response requestReply(Request request);
/**
* Get executor for push processing
* @return Executor for async processing
*/
default Executor getExecutor() {
return null;
}
}
/**
* Abstract push callback implementation
*/
abstract class AbstractPushCallBack implements PushCallBack {
/** Executor for push processing */
private Executor executor;
/**
* Default constructor
*/
public AbstractPushCallBack();
/**
* Constructor with executor
* @param executor Executor for push processing
*/
public AbstractPushCallBack(Executor executor);
@Override
public Executor getExecutor();
/**
* Abstract push request handler to be implemented
* @param request Push request from server
* @return Response to send back to server
*/
@Override
public abstract Response requestReply(Request request);
}Future-based operations for handling asynchronous requests and responses.
/**
* Future interface for async request operations
*/
interface RequestFuture<T> extends Future<T> {
/**
* Get request ID
* @return Request ID
*/
String getRequestId();
/**
* Check if request timed out
* @return true if request timed out
*/
boolean isTimeout();
/**
* Get the original request
* @return Original request object
*/
Request getRequest();
/**
* Set the response
* @param response Response object
*/
void setResponse(T response);
/**
* Set exception
* @param throwable Exception that occurred
*/
void setFailResult(Throwable throwable);
}
/**
* Default implementation of RequestFuture
*/
class DefaultRequestFuture<T> implements RequestFuture<T> {
/** Request ID */
private final String requestId;
/** Original request */
private final Request request;
/** Response object */
private volatile T response;
/** Exception if failed */
private volatile Throwable exception;
/** Completion flag */
private volatile boolean done = false;
/** Timeout flag */
private volatile boolean timeout = false;
/** Completion latch */
private final CountDownLatch latch = new CountDownLatch(1);
/**
* Constructor
* @param requestId Request ID
* @param request Original request
*/
public DefaultRequestFuture(String requestId, Request request);
@Override
public String getRequestId();
@Override
public boolean isTimeout();
@Override
public Request getRequest();
@Override
public void setResponse(T response);
@Override
public void setFailResult(Throwable throwable);
@Override
public boolean cancel(boolean mayInterruptIfRunning);
@Override
public boolean isCancelled();
@Override
public boolean isDone();
@Override
public T get() throws InterruptedException, ExecutionException;
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}Auto-generated gRPC classes for Protocol Buffers-based communication.
/**
* Nacos gRPC service definitions (auto-generated)
*/
class NacosGrpcService {
/**
* Get service descriptor
* @return Service descriptor for gRPC
*/
public static ServiceDescriptor getServiceDescriptor();
/**
* Create stub for gRPC calls
* @param channel gRPC channel
* @return Service stub
*/
public static NacosGrpcServiceStub newStub(Channel channel);
/**
* Create blocking stub for synchronous gRPC calls
* @param channel gRPC channel
* @return Blocking service stub
*/
public static NacosGrpcServiceBlockingStub newBlockingStub(Channel channel);
}
/**
* gRPC payload message (auto-generated)
*/
class Payload implements Serializable {
/** Metadata for the payload */
private Metadata metadata;
/** Body content */
private Any body;
/**
* Get metadata
* @return Payload metadata
*/
public Metadata getMetadata();
/**
* Set metadata
* @param metadata Payload metadata
*/
public void setMetadata(Metadata metadata);
/**
* Get body content
* @return Body as Any type
*/
public Any getBody();
/**
* Set body content
* @param body Body as Any type
*/
public void setBody(Any body);
}
/**
* gRPC metadata message (auto-generated)
*/
class Metadata implements Serializable {
/** Message type */
private String type;
/** Client IP */
private String clientIp;
/** Headers */
private Map<String, String> headers;
/**
* Get message type
* @return Message type
*/
public String getType();
/**
* Set message type
* @param type Message type
*/
public void setType(String type);
/**
* Get client IP
* @return Client IP address
*/
public String getClientIp();
/**
* Set client IP
* @param clientIp Client IP address
*/
public void setClientIp(String clientIp);
/**
* Get headers
* @return Map of headers
*/
public Map<String, String> getHeaders();
/**
* Set headers
* @param headers Map of headers
*/
public void setHeaders(Map<String, String> headers);
}
/**
* Bidirectional stream gRPC service (auto-generated)
*/
class BiRequestStreamGrpc {
/**
* Get method descriptor for bidirectional streaming
* @return Method descriptor
*/
public static MethodDescriptor<Payload, Payload> getRequestBiStreamMethod();
/**
* Create stub for bidirectional streaming
* @param channel gRPC channel
* @return Stub for bidirectional streaming
*/
public static BiRequestStreamStub newStub(Channel channel);
}
/**
* Request-response gRPC service (auto-generated)
*/
class RequestGrpc {
/**
* Get method descriptor for unary requests
* @return Method descriptor
*/
public static MethodDescriptor<Payload, Payload> getRequestMethod();
/**
* Create stub for unary requests
* @param channel gRPC channel
* @return Stub for unary requests
*/
public static RequestStub newStub(Channel channel);
/**
* Create blocking stub for synchronous unary requests
* @param channel gRPC channel
* @return Blocking stub for unary requests
*/
public static RequestBlockingStub newBlockingStub(Channel channel);
}Utility classes for remote communication operations and thread management.
/**
* Scheduled executor for RPC operations
*/
class RpcScheduledExecutor {
/** Default executor instance */
private static final ScheduledExecutorService EXECUTOR;
/**
* Get default scheduled executor
* @return Scheduled executor service
*/
public static ScheduledExecutorService getExecutor();
/**
* Schedule task with delay
* @param command Task to execute
* @param delay Delay before execution
* @param unit Time unit for delay
* @return ScheduledFuture for the task
*/
public static ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
/**
* Schedule task with fixed rate
* @param command Task to execute
* @param initialDelay Initial delay
* @param period Period between executions
* @param unit Time unit
* @return ScheduledFuture for the task
*/
public static ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
/**
* Schedule task with fixed delay
* @param command Task to execute
* @param initialDelay Initial delay
* @param delay Delay between executions
* @param unit Time unit
* @return ScheduledFuture for the task
*/
public static ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}
/**
* Remote communication constants
*/
class RemoteConstants {
/** Default request timeout */
public static final long DEFAULT_TIMEOUT_MILLS = 3000L;
/** Connection setup timeout */
public static final long CONNECT_TIMEOUT_MILLS = 3000L;
/** Keep alive interval */
public static final long KEEP_ALIVE_MILLS = 5000L;
/** Default retry times */
public static final int DEFAULT_RETRY_TIMES = 3;
/** Max retry times */
public static final int MAX_RETRY_TIMES = 5;
/** Connection pool size */
public static final int DEFAULT_CONNECTION_POOL_SIZE = 8;
}import com.alibaba.nacos.api.remote.request.Request;
import com.alibaba.nacos.api.remote.response.Response;
import com.alibaba.nacos.api.remote.RequestCallBack;
import com.alibaba.nacos.api.remote.DefaultRequestFuture;
// Custom request implementation
public class CustomRequest extends Request {
private String data;
public CustomRequest(String data) {
this.data = data;
setRequestId(UUID.randomUUID().toString());
putHeader("Content-Type", "application/json");
putHeader("User-Agent", "Nacos-Client");
}
public String getData() {
return data;
}
@Override
public String getRequestType() {
return "CustomRequest";
}
}
// Custom response implementation
public class CustomResponse extends Response {
private String result;
public CustomResponse() {}
public String getResult() {
return result;
}
public void setResult(String result) {
this.result = result;
}
@Override
public String getResponseType() {
return "CustomResponse";
}
}
// Synchronous request-response
public CustomResponse sendSynchronousRequest(String data) {
CustomRequest request = new CustomRequest(data);
try {
// Create future for the request
DefaultRequestFuture<CustomResponse> future = new DefaultRequestFuture<>(
request.getRequestId(), request);
// Send request (pseudo-code - actual implementation would use client)
// remoteClient.sendRequest(request, future);
// Wait for response with timeout
CustomResponse response = future.get(5000, TimeUnit.MILLISECONDS);
if (response.isSuccess()) {
System.out.println("Request successful: " + response.getResult());
return response;
} else {
System.err.println("Request failed: " + response.getMessage());
return null;
}
} catch (TimeoutException e) {
System.err.println("Request timed out");
return null;
} catch (Exception e) {
System.err.println("Request failed with exception: " + e.getMessage());
return null;
}
}import com.alibaba.nacos.api.remote.AbstractRequestCallBack;
import java.util.concurrent.Executors;
import java.util.concurrent.CompletableFuture;
// Asynchronous request with callback
public void sendAsynchronousRequest(String data, Consumer<String> onSuccess, Consumer<String> onError) {
CustomRequest request = new CustomRequest(data);
RequestCallBack<CustomResponse> callback = new AbstractRequestCallBack<CustomResponse>(
Executors.newSingleThreadExecutor(), 10000L) {
@Override
public void onResponse(CustomResponse response) {
if (response.isSuccess()) {
System.out.println("Async request successful: " + response.getResult());
onSuccess.accept(response.getResult());
} else {
System.err.println("Async request failed: " + response.getMessage());
onError.accept(response.getMessage());
}
}
@Override
public void onException(Throwable e) {
System.err.println("Async request failed with exception: " + e.getMessage());
onError.accept(e.getMessage());
}
};
// Send async request (pseudo-code)
// remoteClient.sendAsyncRequest(request, callback);
}
// Multiple parallel requests
public CompletableFuture<List<String>> sendParallelRequests(List<String> dataList) {
List<CompletableFuture<String>> futures = dataList.stream()
.map(data -> {
CompletableFuture<String> future = new CompletableFuture<>();
sendAsynchronousRequest(data,
result -> future.complete(result),
error -> future.completeExceptionally(new RuntimeException(error))
);
return future;
})
.collect(Collectors.toList());
// Combine all futures
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
}import com.alibaba.nacos.api.remote.request.ConnectionSetupRequest;
import com.alibaba.nacos.api.remote.response.ConnectionSetupResponse;
import com.alibaba.nacos.api.remote.request.HealthCheckRequest;
public class ConnectionManager {
private volatile boolean connected = false;
private String clientId;
private ScheduledExecutorService healthCheckExecutor;
public ConnectionManager(String clientId) {
this.clientId = clientId;
this.healthCheckExecutor = Executors.newSingleThreadScheduledExecutor();
}
// Establish connection to server
public boolean connect(String serverAddress) {
ConnectionSetupRequest request = new ConnectionSetupRequest();
request.setClientVersion("3.0.2");
request.setTenant("default");
// Set client labels
Map<String, String> labels = new HashMap<>();
labels.put("clientId", clientId);
labels.put("appName", "my-application");
labels.put("environment", "production");
request.setLabels(labels);
try {
// Send connection setup request
DefaultRequestFuture<ConnectionSetupResponse> future =
new DefaultRequestFuture<>(request.getRequestId(), request);
// Simulate sending request
ConnectionSetupResponse response = future.get(5000, TimeUnit.MILLISECONDS);
if (response.isSuccess()) {
connected = true;
System.out.println("Connected to server: " + serverAddress);
// Start health checking
startHealthCheck();
return true;
} else {
System.err.println("Connection failed: " + response.getMessage());
return false;
}
} catch (Exception e) {
System.err.println("Connection error: " + e.getMessage());
return false;
}
}
// Start periodic health checking
private void startHealthCheck() {
healthCheckExecutor.scheduleWithFixedDelay(() -> {
HealthCheckRequest healthRequest = new HealthCheckRequest();
try {
DefaultRequestFuture<Response> future =
new DefaultRequestFuture<>(healthRequest.getRequestId(), healthRequest);
Response response = future.get(3000, TimeUnit.MILLISECONDS);
if (!response.isSuccess()) {
System.err.println("Health check failed: " + response.getMessage());
connected = false;
// Trigger reconnection logic
scheduleReconnect();
}
} catch (Exception e) {
System.err.println("Health check error: " + e.getMessage());
connected = false;
scheduleReconnect();
}
}, 5000, 10000, TimeUnit.MILLISECONDS);
}
// Schedule reconnection attempt
private void scheduleReconnect() {
if (!connected) {
healthCheckExecutor.schedule(() -> {
System.out.println("Attempting to reconnect...");
// Retry connection logic here
}, 5000, TimeUnit.MILLISECONDS);
}
}
public boolean isConnected() {
return connected;
}
public void disconnect() {
connected = false;
if (healthCheckExecutor != null) {
healthCheckExecutor.shutdown();
}
}
}import com.alibaba.nacos.api.remote.PushCallBack;
import com.alibaba.nacos.api.remote.AbstractPushCallBack;
// Custom push request handler
public class ConfigPushHandler extends AbstractPushCallBack {
private final ConfigUpdateListener configListener;
public ConfigPushHandler(ConfigUpdateListener configListener) {
super(Executors.newFixedThreadPool(4));
this.configListener = configListener;
}
@Override
public Response requestReply(Request request) {
System.out.println("Received push request: " + request.getRequestType());
try {
if ("ConfigChangeNotifyRequest".equals(request.getRequestType())) {
return handleConfigChange(request);
} else if ("ServiceChangeNotifyRequest".equals(request.getRequestType())) {
return handleServiceChange(request);
} else {
return createErrorResponse("Unknown request type: " + request.getRequestType());
}
} catch (Exception e) {
System.err.println("Error handling push request: " + e.getMessage());
return createErrorResponse("Internal error: " + e.getMessage());
}
}
private Response handleConfigChange(Request request) {
// Extract configuration change information from request
String dataId = request.getHeader("dataId");
String group = request.getHeader("group");
String content = request.getHeader("content");
System.out.printf("Config changed: %s:%s%n", group, dataId);
// Notify listeners
if (configListener != null) {
configListener.onConfigChange(dataId, group, content);
}
// Return success response
Response response = new Response() {
@Override
public String getResponseType() {
return "ConfigChangeNotifyResponse";
}
};
response.setResultCode(ResponseCode.SUCCESS.getCode());
response.setMessage("Config change processed successfully");
return response;
}
private Response handleServiceChange(Request request) {
// Handle service change notification
String serviceName = request.getHeader("serviceName");
String groupName = request.getHeader("groupName");
System.out.printf("Service changed: %s in group %s%n", serviceName, groupName);
// Process service change
// ... implementation details
Response response = new Response() {
@Override
public String getResponseType() {
return "ServiceChangeNotifyResponse";
}
};
response.setResultCode(ResponseCode.SUCCESS.getCode());
return response;
}
private Response createErrorResponse(String message) {
return new ErrorResponse(ResponseCode.INTERNAL_SERVER_ERROR.getCode(), message);
}
}
// Configuration update listener interface
interface ConfigUpdateListener {
void onConfigChange(String dataId, String group, String content);
}import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
public class RequestManager {
private final Map<String, DefaultRequestFuture<?>> pendingRequests = new ConcurrentHashMap<>();
private final AtomicLong requestCounter = new AtomicLong(0);
private final ScheduledExecutorService timeoutChecker;
public RequestManager() {
this.timeoutChecker = Executors.newSingleThreadScheduledExecutor();
startTimeoutChecker();
}
// Send request with automatic timeout handling
public <T extends Response> CompletableFuture<T> sendRequest(Request request, long timeoutMs) {
CompletableFuture<T> future = new CompletableFuture<>();
// Generate unique request ID
String requestId = "req-" + requestCounter.incrementAndGet() + "-" + System.currentTimeMillis();
request.setRequestId(requestId);
// Create request future
DefaultRequestFuture<T> requestFuture = new DefaultRequestFuture<>(requestId, request);
pendingRequests.put(requestId, requestFuture);
// Set up timeout
timeoutChecker.schedule(() -> {
DefaultRequestFuture<?> pending = pendingRequests.remove(requestId);
if (pending != null && !pending.isDone()) {
pending.setFailResult(new TimeoutException("Request timeout after " + timeoutMs + "ms"));
future.completeExceptionally(new TimeoutException("Request timeout"));
}
}, timeoutMs, TimeUnit.MILLISECONDS);
// Convert request future to CompletableFuture
CompletableFuture.runAsync(() -> {
try {
T response = requestFuture.get();
future.complete(response);
} catch (Exception e) {
future.completeExceptionally(e);
}
});
// Send actual request (pseudo-code)
// sendToServer(request);
return future;
}
// Handle response from server
public void handleResponse(String requestId, Response response) {
DefaultRequestFuture<?> future = pendingRequests.remove(requestId);
if (future != null) {
@SuppressWarnings("unchecked")
DefaultRequestFuture<Response> typedFuture = (DefaultRequestFuture<Response>) future;
typedFuture.setResponse(response);
}
}
// Handle request failure
public void handleRequestFailure(String requestId, Throwable throwable) {
DefaultRequestFuture<?> future = pendingRequests.remove(requestId);
if (future != null) {
future.setFailResult(throwable);
}
}
// Start periodic timeout checker
private void startTimeoutChecker() {
timeoutChecker.scheduleWithFixedDelay(() -> {
long currentTime = System.currentTimeMillis();
pendingRequests.entrySet().removeIf(entry -> {
DefaultRequestFuture<?> future = entry.getValue();
// Check if request has timed out (simplified logic)
if (future.isTimeout()) {
future.setFailResult(new TimeoutException("Request expired"));
return true;
}
return false;
});
}, 1000, 1000, TimeUnit.MILLISECONDS);
}
// Get pending request count
public int getPendingRequestCount() {
return pendingRequests.size();
}
// Shutdown request manager
public void shutdown() {
timeoutChecker.shutdown();
// Cancel all pending requests
pendingRequests.values().forEach(future -> {
if (!future.isDone()) {
future.setFailResult(new RuntimeException("RequestManager shutdown"));
}
});
pendingRequests.clear();
}
}import java.util.concurrent.atomic.AtomicInteger;
import java.time.LocalDateTime;
import java.time.Duration;
public class ResilientRequestHandler {
private final RequestManager requestManager;
private final AtomicInteger failureCount = new AtomicInteger(0);
private volatile LocalDateTime lastFailureTime;
private volatile boolean circuitOpen = false;
// Circuit breaker thresholds
private final int failureThreshold = 5;
private final Duration circuitOpenDuration = Duration.ofMinutes(1);
public ResilientRequestHandler(RequestManager requestManager) {
this.requestManager = requestManager;
}
// Send request with retry and circuit breaker
public <T extends Response> CompletableFuture<T> sendResilientRequest(Request request, int maxRetries) {
// Check circuit breaker
if (isCircuitOpen()) {
return CompletableFuture.failedFuture(
new RuntimeException("Circuit breaker is open"));
}
return sendWithRetry(request, maxRetries, 0);
}
private <T extends Response> CompletableFuture<T> sendWithRetry(Request request, int maxRetries, int attempt) {
return requestManager.<T>sendRequest(request, 5000L)
.handle((response, throwable) -> {
if (throwable == null && response.isSuccess()) {
// Success - reset failure count
resetCircuitBreaker();
return CompletableFuture.completedFuture(response);
} else {
// Failure - increment failure count
recordFailure();
if (attempt < maxRetries) {
// Retry with exponential backoff
long delay = (long) Math.pow(2, attempt) * 1000; // 1s, 2s, 4s, 8s...
return CompletableFuture.<T>failedFuture(
throwable != null ? throwable :
new RuntimeException("Request failed: " + response.getMessage())
).handle((r, t) -> {
try {
Thread.sleep(delay);
return sendWithRetry(request, maxRetries, attempt + 1).join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return CompletableFuture.<T>failedFuture(e).join();
}
});
} else {
// Max retries exceeded
return CompletableFuture.<T>failedFuture(
throwable != null ? throwable :
new RuntimeException("Max retries exceeded. Last error: " + response.getMessage())
);
}
}
})
.thenCompose(Function.identity());
}
private boolean isCircuitOpen() {
if (circuitOpen && lastFailureTime != null) {
// Check if circuit should be closed
if (Duration.between(lastFailureTime, LocalDateTime.now()).compareTo(circuitOpenDuration) > 0) {
circuitOpen = false;
failureCount.set(0);
System.out.println("Circuit breaker closed - retrying requests");
}
}
return circuitOpen;
}
private void recordFailure() {
int failures = failureCount.incrementAndGet();
lastFailureTime = LocalDateTime.now();
if (failures >= failureThreshold) {
circuitOpen = true;
System.err.printf("Circuit breaker opened after %d failures%n", failures);
}
}
private void resetCircuitBreaker() {
failureCount.set(0);
circuitOpen = false;
lastFailureTime = null;
}
}Install with Tessl CLI
npx tessl i tessl/maven-com-alibaba-nacos--nacos-api