Pekko-based RPC implementation for Apache Flink's distributed computing framework
—
Specialized exception classes for RPC-specific error conditions, state management, and message handling in the Pekko RPC system.
Exception indicating that an RPC endpoint or service is in an invalid state for the requested operation.
/**
* Exception indicating invalid RPC state.
* Thrown when RPC operations are attempted on endpoints or services
* that are not in the appropriate state (e.g., stopped, terminated, or not started).
*/
public class RpcInvalidStateException extends FlinkRuntimeException {
/**
* Constructor with descriptive message.
* @param message Description of the invalid state condition
*/
public RpcInvalidStateException(String message);
/**
* Constructor with underlying cause.
* @param cause The underlying exception that led to the invalid state
*/
public RpcInvalidStateException(Throwable cause);
/**
* Constructor with both message and cause.
* @param message Description of the invalid state condition
* @param cause The underlying exception that led to the invalid state
*/
public RpcInvalidStateException(String message, Throwable cause);
}

Usage Examples:
import org.apache.flink.runtime.rpc.pekko.exceptions.RpcInvalidStateException;
public class RpcEndpointImpl extends RpcEndpoint {
private volatile boolean isStarted = false;
private volatile boolean isTerminated = false;
/**
 * Runs the endpoint operation after verifying the lifecycle flags.
 *
 * @throws RpcInvalidStateException if the endpoint has not been started, or has
 *     already been terminated
 */
public void performOperation() throws RpcInvalidStateException {
    // Work out why the call must be rejected, if at all; the "started" flag is
    // consulted first and the "terminated" flag only when the endpoint is started.
    final String rejection;
    if (!isStarted) {
        rejection = "Cannot perform operation: RPC endpoint has not been started";
    } else if (isTerminated) {
        rejection = "Cannot perform operation: RPC endpoint has been terminated";
    } else {
        rejection = null;
    }

    if (rejection != null) {
        throw new RpcInvalidStateException(rejection);
    }

    doActualWork();
}
/**
 * Connects this endpoint to a remote address.
 *
 * @param address address passed on to {@code establishConnection}
 * @throws RpcInvalidStateException if the actor system has already terminated, or if
 *     any other exception is raised while connecting (attached as the cause)
 */
public void connectToRemote(String address) throws RpcInvalidStateException {
    try {
        if (getActorSystem().whenTerminated().isCompleted()) {
            throw new RpcInvalidStateException(
                "Cannot connect to remote: Actor system has been terminated"
            );
        }
        // Attempt connection
        establishConnection(address);
    } catch (RpcInvalidStateException e) {
        // Already the right type with a precise message: rethrow untouched instead of
        // letting the generic handler below bury it under "Failed to connect ...".
        throw e;
    } catch (Exception e) {
        throw new RpcInvalidStateException(
            "Failed to connect to remote endpoint due to invalid state",
            e
        );
    }
}
/**
 * Error handling in the service lifecycle: advances to the next state and reports an
 * {@link IllegalStateException} from the transition as an RPC-specific exception.
 *
 * @throws RpcInvalidStateException wrapping the {@code IllegalStateException} raised
 *     by {@code transitionToNextState()}
 */
public void handleStateTransition() {
    try {
        transitionToNextState();
    } catch (IllegalStateException illegalTransition) {
        // Translate into the RPC exception type, keeping the original as the cause.
        final String reason = "Invalid state transition in RPC service";
        throw new RpcInvalidStateException(reason, illegalTransition);
    }
}
}

Exception for handling unknown or unsupported message types in RPC communication.
/**
* Exception for unknown message types.
* Thrown when an RPC actor receives a message it cannot handle or recognize.
* Extends RpcRuntimeException to indicate RPC-specific runtime errors.
*/
public class UnknownMessageException extends RpcRuntimeException {
/**
* Constructor with descriptive message.
* @param message Description of the unknown message condition
*/
public UnknownMessageException(String message);
/**
* Constructor with message and underlying cause.
* @param message Description of the unknown message condition
* @param cause The underlying exception that occurred while processing
*/
public UnknownMessageException(String message, Throwable cause);
/**
* Constructor with underlying cause only.
* @param cause The underlying exception that occurred while processing
*/
public UnknownMessageException(Throwable cause);
}

Usage Examples:
import org.apache.flink.runtime.rpc.pekko.exceptions.UnknownMessageException;
import org.apache.pekko.actor.AbstractActor;
public class RpcActorImpl extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.match(String.class, this::handleStringMessage)
.match(Integer.class, this::handleIntegerMessage)
.match(StartMessage.class, this::handleStartMessage)
.match(StopMessage.class, this::handleStopMessage)
.matchAny(this::handleUnknownMessage)
.build();
}
private void handleStringMessage(String message) {
// Handle string messages
logger.debug("Received string message: {}", message);
}
private void handleIntegerMessage(Integer message) {
// Handle integer messages
logger.debug("Received integer message: {}", message);
}
private void handleStartMessage(StartMessage message) {
// Handle start control message
logger.info("Starting RPC actor");
}
private void handleStopMessage(StopMessage message) {
// Handle stop control message
logger.info("Stopping RPC actor");
}
private void handleUnknownMessage(Object message) {
String messageType = message != null ? message.getClass().getSimpleName() : "null";
String errorMsg = String.format(
"Received unknown message type '%s': %s",
messageType,
message
);
logger.warn(errorMsg);
// Send error response back to sender
getSender().tell(
new UnknownMessageException(errorMsg),
getSelf()
);
}
}
// Message handler with exception propagation
public class MessageProcessor {
/**
 * Dispatches a message to the handler matching its type.
 *
 * @param message message to process; {@code null} is rejected like any other
 *     unsupported message
 * @throws UnknownMessageException if the message is {@code null} or is not a
 *     {@code CommandMessage}, {@code QueryMessage} or {@code EventMessage}
 */
public void processMessage(Object message) throws UnknownMessageException {
    if (message instanceof CommandMessage) {
        processCommand((CommandMessage) message);
    } else if (message instanceof QueryMessage) {
        processQuery((QueryMessage) message);
    } else if (message instanceof EventMessage) {
        processEvent((EventMessage) message);
    } else {
        // null fails every instanceof test above and lands here, so it must not be
        // dereferenced when building the error text.
        String typeName = message == null ? "null" : message.getClass().getName();
        throw new UnknownMessageException(
            "Cannot process message of type: " + typeName
        );
    }
}
/**
 * Processes a message and, when its type is unknown, tries the fallback path.
 *
 * @param message message to process
 * @throws UnknownMessageException if the fallback also fails; the fallback failure is
 *     the cause and the original unknown-message failure is attached as suppressed
 */
public void handleMessageWithRecovery(Object message) {
    try {
        processMessage(message);
    } catch (UnknownMessageException e) {
        logger.error("Failed to process unknown message", e);
        // Attempt fallback processing
        try {
            processFallback(message);
        } catch (Exception fallbackException) {
            UnknownMessageException failure = new UnknownMessageException(
                "Message processing failed completely",
                fallbackException
            );
            // Keep the first failure reachable from the thrown exception instead of
            // leaving it only in the log.
            failure.addSuppressed(e);
            throw failure;
        }
    }
}
// Placeholder handlers: each body is intentionally left empty in this example and
// stands in for project-specific logic.

// Called by processMessage for CommandMessage instances.
private void processCommand(CommandMessage cmd) { /* implementation */ }
// Called by processMessage for QueryMessage instances.
private void processQuery(QueryMessage query) { /* implementation */ }
// Called by processMessage for EventMessage instances.
private void processEvent(EventMessage event) { /* implementation */ }
// Called by handleMessageWithRecovery after processMessage rejected the message.
private void processFallback(Object message) { /* fallback implementation */ }
}

Common patterns for handling RPC exceptions in distributed environments.
/**
* Utility methods for common RPC exception handling patterns.
*/
public class RpcExceptionHandling {
/**
* Determines if an exception indicates a recoverable RPC error.
* @param exception Exception to analyze
* @return true if the error might be recoverable with retry
*/
public static boolean isRecoverableException(Throwable exception);
/**
* Determines if an exception indicates RPC endpoint termination.
* @param exception Exception to analyze
* @return true if the exception indicates endpoint termination
*/
public static boolean isEndpointTerminatedException(Throwable exception);
/**
* Extracts the root cause from a chain of RPC exceptions.
* @param exception Exception to analyze
* @return Root cause exception
*/
public static Throwable getRootCause(Throwable exception);
}

Comprehensive Error Handling Examples:
import org.apache.flink.runtime.rpc.pekko.exceptions.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
public class RobustRpcClient {
private static final int MAX_RETRIES = 3;
private static final long RETRY_DELAY_MS = 1000;
/**
 * Invokes an RPC method, starting the retry sequence at attempt zero.
 *
 * @param endpoint endpoint to call
 * @param methodName name of the method to invoke
 * @param args arguments forwarded to the call
 * @param <T> result type of the call
 * @return the future produced by {@code callWithRetryInternal}
 */
public <T> CompletableFuture<T> callWithRetry(
        String endpoint,
        String methodName,
        Object... args) {
    final int firstAttempt = 0;
    return this.<T>callWithRetryInternal(endpoint, methodName, firstAttempt, args);
}
/**
 * Performs one attempt of the RPC call and, on a retryable failure, schedules the next
 * attempt after {@code RETRY_DELAY_MS}.
 *
 * @param endpoint endpoint to call
 * @param methodName name of the method to invoke
 * @param attempt zero-based index of the current attempt
 * @param args arguments forwarded to the call
 * @param <T> result type of the call
 * @return a future completing with the call result, or failing with the last error once
 *     {@code shouldRetry} declines another attempt
 */
private <T> CompletableFuture<T> callWithRetryInternal(
        String endpoint,
        String methodName,
        int attempt,
        Object... args) {
    // NOTE(review): makeRpcCall is not shown in this snippet; it is assumed to return a
    // CompletableFuture<T> - confirm against its declaration.
    CompletableFuture<T> call = makeRpcCall(endpoint, methodName, args);

    return call
        .<CompletableFuture<T>>handle((result, throwable) -> {
            if (throwable == null) {
                return CompletableFuture.completedFuture(result);
            }

            // Unwrap CompletionException, but only when it actually carries a cause.
            Throwable actualException =
                throwable instanceof CompletionException && throwable.getCause() != null
                    ? throwable.getCause()
                    : throwable;

            if (shouldRetry(actualException, attempt)) {
                logger.warn("RPC call failed (attempt {}), retrying: {}",
                    attempt + 1, actualException.getMessage());
                // Executor.execute() returns void, so it cannot supply the retry's
                // future. supplyAsync on the delayed executor starts the next attempt
                // after the delay and hands its future back for flattening.
                return CompletableFuture
                    .<CompletableFuture<T>>supplyAsync(
                        () -> callWithRetryInternal(endpoint, methodName, attempt + 1, args),
                        CompletableFuture.delayedExecutor(
                            RETRY_DELAY_MS, java.util.concurrent.TimeUnit.MILLISECONDS))
                    .thenCompose(next -> next);
            } else {
                logger.error("RPC call failed permanently after {} attempts", attempt + 1, actualException);
                return CompletableFuture.<T>failedFuture(actualException);
            }
        })
        .thenCompose(future -> future);
}
/**
 * Decides whether a failed call deserves another attempt.
 *
 * @param exception failure of the current attempt
 * @param attempt zero-based index of the attempt that failed
 * @return {@code true} only if attempts remain, the failure is neither an invalid-state
 *     nor an unknown-message error, and {@code isRetryableException} accepts it
 */
private boolean shouldRetry(Throwable exception, int attempt) {
    boolean attemptsLeft = attempt < MAX_RETRIES;

    // Invalid state is usually permanent; an unknown message is usually a programming
    // error. Neither is worth retrying.
    boolean permanentFailure = exception instanceof RpcInvalidStateException
        || exception instanceof UnknownMessageException;

    // Short-circuit keeps the retryable check (network issues, timeouts, etc.) from
    // running once an earlier condition has already ruled a retry out.
    return attemptsLeft && !permanentFailure && isRetryableException(exception);
}
/**
 * Classifies a failure as transient.
 *
 * @param exception failure to classify; {@code null} is treated as not retryable
 * @return {@code true} for connect failures, timeouts, missing actors, or any exception
 *     whose message mentions "connection refused" in any letter case
 */
private boolean isRetryableException(Throwable exception) {
    if (exception == null) {
        return false;
    }
    if (exception instanceof java.net.ConnectException
            || exception instanceof java.util.concurrent.TimeoutException
            || exception instanceof org.apache.pekko.actor.ActorNotFound) {
        return true;
    }
    // Compare case-insensitively: the message usually reads "Connection refused" with a
    // capital C, which a case-sensitive search for the lowercase form never matches.
    String message = exception.getMessage();
    return message != null
        && message.toLowerCase(java.util.Locale.ROOT).contains("connection refused");
}
/**
 * Circuit breaker pattern for RPC calls.
 *
 * <p>The breaker opens after {@code FAILURE_THRESHOLD} consecutive failures, rejects
 * calls while open, and lets calls through again (half-open) once {@code TIMEOUT_MS}
 * has passed since the last failure. A success closes it and resets the counter.
 *
 * <p>All state is read and written under this object's monitor. Completion callbacks
 * may run on different threads, and a bare {@code failureCount++} on a volatile field
 * is not atomic, so concurrent failures could be lost.
 */
public class RpcCircuitBreaker {
    private static final int FAILURE_THRESHOLD = 5;
    private static final long TIMEOUT_MS = 60000; // 1 minute

    enum State { CLOSED, OPEN, HALF_OPEN }

    // Guarded by "this".
    private State state = State.CLOSED;
    private int failureCount = 0;
    private long lastFailureTime = 0;

    /**
     * Tracks the outcome of an operation, or rejects it while the breaker is open.
     *
     * @param operation future representing the RPC call
     * @param <T> result type of the operation
     * @return a future mirroring {@code operation}, or a future failed with
     *     {@link RpcInvalidStateException} when the breaker is open
     */
    public <T> CompletableFuture<T> execute(CompletableFuture<T> operation) {
        if (!tryAcquirePermission()) {
            return CompletableFuture.failedFuture(
                new RpcInvalidStateException("Circuit breaker is OPEN")
            );
        }

        return operation
            .whenComplete((result, throwable) -> {
                if (throwable == null) {
                    onSuccess();
                } else {
                    onFailure();
                }
            });
    }

    /** Returns whether a call may proceed, moving OPEN to HALF_OPEN after the timeout. */
    private synchronized boolean tryAcquirePermission() {
        if (state != State.OPEN) {
            return true;
        }
        if (System.currentTimeMillis() - lastFailureTime > TIMEOUT_MS) {
            state = State.HALF_OPEN;
            return true;
        }
        return false;
    }

    /** Resets the failure counter and closes the breaker. */
    private synchronized void onSuccess() {
        failureCount = 0;
        state = State.CLOSED;
    }

    /** Records a failure and opens the breaker once the threshold is reached. */
    private synchronized void onFailure() {
        failureCount++;
        lastFailureTime = System.currentTimeMillis();
        if (failureCount >= FAILURE_THRESHOLD) {
            state = State.OPEN;
        }
    }
}
}

Error Recovery Strategies:
public class RpcErrorRecovery {
/**
 * Graceful degradation on RPC failures: returns the job status, falling back to a
 * cached value or {@code "UNKNOWN"} for the RPC-specific failures.
 *
 * @param jobId identifier of the job
 * @return the reported status; the cached status when the failure is an
 *     {@link RpcInvalidStateException}; {@code "UNKNOWN"} when it is an
 *     {@link UnknownMessageException}
 * @throws RuntimeException for any other failure, with the caught exception as cause
 */
public String getJobStatus(String jobId) {
    try {
        return jobManagerGateway.getJobStatus(jobId).get();
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
        }

        // NOTE(review): get() is presumably Future.get(), which reports asynchronous
        // failures wrapped in ExecutionException - so the RPC exception has to be dug
        // out of the cause chain before it can be recognised.
        Throwable failure = e;
        while ((failure instanceof java.util.concurrent.ExecutionException
                || failure instanceof java.util.concurrent.CompletionException)
                && failure.getCause() != null) {
            failure = failure.getCause();
        }

        if (failure instanceof RpcInvalidStateException) {
            logger.warn("JobManager not available, returning cached status", failure);
            return getCachedJobStatus(jobId);
        }
        if (failure instanceof UnknownMessageException) {
            logger.error("JobManager doesn't support getJobStatus operation", failure);
            return "UNKNOWN";
        }

        logger.error("Failed to get job status", e);
        throw new RuntimeException("Job status unavailable", e);
    }
}
/**
 * Failover to backup endpoints: calls the first endpoint and, if that fails, recurses
 * on the remaining ones in list order.
 *
 * @param endpoints candidate endpoints, tried from first to last
 * @param method method to invoke on the endpoint
 * @return a future with the first successful result; failed with
 *     {@link RpcInvalidStateException} when the list is empty, or with the last
 *     endpoint's failure when every endpoint fails
 */
public CompletableFuture<String> callWithFailover(List<String> endpoints, String method) {
    if (endpoints.isEmpty()) {
        RpcInvalidStateException noEndpoints =
            new RpcInvalidStateException("No endpoints available");
        return CompletableFuture.failedFuture(noEndpoints);
    }

    String primary = endpoints.get(0);
    return callEndpoint(primary, method)
        .handle((result, throwable) -> {
            if (throwable != null) {
                boolean hasBackup = endpoints.size() > 1;
                if (!hasBackup) {
                    // Nothing left to try: surface the failure unchanged.
                    return CompletableFuture.<String>failedFuture(throwable);
                }
                logger.warn("Primary endpoint failed, trying backup: {}", throwable.getMessage());
                List<String> remaining = endpoints.subList(1, endpoints.size());
                return callWithFailover(remaining, method);
            }
            return CompletableFuture.completedFuture(result);
        })
        .thenCompose(next -> next);
}
/**
 * Placeholder for calling a specific endpoint. In this example both parameters are
 * ignored and an already-completed future holding {@code "result"} is returned.
 *
 * @param endpoint endpoint to call (unused by the placeholder)
 * @param method method to invoke (unused by the placeholder)
 * @return a completed future containing the literal {@code "result"}
 */
private CompletableFuture<String> callEndpoint(String endpoint, String method) {
// Implementation for calling specific endpoint
return CompletableFuture.completedFuture("result");
}
/**
 * Placeholder for the cached-status lookup. In this example the job id is ignored and
 * a fixed value is returned.
 *
 * @param jobId identifier of the job (unused by the placeholder)
 * @return the literal {@code "CACHED_STATUS"}
 */
private String getCachedJobStatus(String jobId) {
// Implementation for getting cached status
return "CACHED_STATUS";
}
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-rpc-akka