CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-rpc-akka

Pekko-based RPC implementation for Apache Flink's distributed computing framework

Pending
Overview
Eval results
Files

docs/exceptions.md

Exception Handling

Specialized exception classes for RPC-specific error conditions, state management, and message handling in the Pekko RPC system.

Capabilities

RpcInvalidStateException

Exception indicating that an RPC endpoint or service is in an invalid state for the requested operation.

/**
 * Exception indicating invalid RPC state.
 *
 * <p>Thrown when RPC operations are attempted on endpoints or services
 * that are not in the appropriate state (e.g., stopped, terminated, or not started).
 *
 * <p>This listing is an API summary: the constructors are shown as signatures only,
 * without bodies. The type extends {@code FlinkRuntimeException}; the usage examples
 * on this page throw it from methods that do not declare it.
 */
public class RpcInvalidStateException extends FlinkRuntimeException {
    
    /**
     * Creates the exception with a descriptive message and no cause.
     *
     * @param message Description of the invalid state condition
     */
    public RpcInvalidStateException(String message);
    
    /**
     * Creates the exception from an underlying cause, without a separate message.
     *
     * @param cause The underlying exception that led to the invalid state
     */
    public RpcInvalidStateException(Throwable cause);
    
    /**
     * Creates the exception with both a descriptive message and the underlying cause.
     * Use this form when translating a lower-level failure so the original
     * exception remains available to callers.
     *
     * @param message Description of the invalid state condition
     * @param cause The underlying exception that led to the invalid state
     */
    public RpcInvalidStateException(String message, Throwable cause);
}

Usage Examples:

import org.apache.flink.runtime.rpc.pekko.exceptions.RpcInvalidStateException;

/**
 * Example endpoint showing where {@link RpcInvalidStateException} is raised:
 * on operations attempted before start or after termination, on connection
 * attempts against a terminated actor system, and when translating an
 * {@link IllegalStateException} from a state transition.
 */
public class RpcEndpointImpl extends RpcEndpoint {
    // volatile: the flags are read without locking
    private volatile boolean isStarted = false;
    private volatile boolean isTerminated = false;

    /**
     * Runs the endpoint's operation after verifying the lifecycle flags.
     *
     * @throws RpcInvalidStateException if the endpoint has not been started
     *         or has already been terminated
     */
    public void performOperation() throws RpcInvalidStateException {
        if (!isStarted) {
            throw new RpcInvalidStateException(
                "Cannot perform operation: RPC endpoint has not been started"
            );
        }

        if (isTerminated) {
            throw new RpcInvalidStateException(
                "Cannot perform operation: RPC endpoint has been terminated"
            );
        }

        // Perform the operation
        doActualWork();
    }

    /**
     * Connects to a remote endpoint unless the actor system has terminated.
     *
     * @param address address of the remote endpoint
     * @throws RpcInvalidStateException if the actor system has been terminated,
     *         or wrapping any other failure raised while connecting
     */
    public void connectToRemote(String address) throws RpcInvalidStateException {
        try {
            if (getActorSystem().whenTerminated().isCompleted()) {
                throw new RpcInvalidStateException(
                    "Cannot connect to remote: Actor system has been terminated"
                );
            }

            // Attempt connection
            establishConnection(address);

        } catch (RpcInvalidStateException e) {
            // Already the RPC-specific type with a precise message: rethrow as-is.
            // Without this clause the generic handler below would catch it and
            // bury the "terminated" message under a second, vaguer exception.
            throw e;
        } catch (Exception e) {
            throw new RpcInvalidStateException(
                "Failed to connect to remote endpoint due to invalid state",
                e
            );
        }
    }

    /**
     * Performs a state transition, converting an {@link IllegalStateException}
     * into the RPC-specific exception while keeping the original as the cause.
     */
    public void handleStateTransition() {
        try {
            transitionToNextState();
        } catch (IllegalStateException e) {
            // Convert to RPC-specific exception
            throw new RpcInvalidStateException(
                "Invalid state transition in RPC service",
                e
            );
        }
    }
}

UnknownMessageException

Exception thrown when an RPC actor receives a message of an unknown or unsupported type.

/**
 * Exception for unknown message types.
 *
 * <p>Thrown when an RPC actor receives a message it cannot handle or recognize.
 * Extends RpcRuntimeException to indicate RPC-specific runtime errors.
 *
 * <p>This listing is an API summary: the constructors are shown as signatures only,
 * without bodies.
 */
public class UnknownMessageException extends RpcRuntimeException {
    
    /**
     * Creates the exception with a descriptive message and no cause.
     *
     * @param message Description of the unknown message condition
     */
    public UnknownMessageException(String message);
    
    /**
     * Creates the exception with a descriptive message and the underlying cause.
     *
     * @param message Description of the unknown message condition
     * @param cause The underlying exception that occurred while processing
     */
    public UnknownMessageException(String message, Throwable cause);
    
    /**
     * Creates the exception from an underlying cause, without a separate message.
     *
     * @param cause The underlying exception that occurred while processing
     */
    public UnknownMessageException(Throwable cause);
}

Usage Examples:

import org.apache.flink.runtime.rpc.pekko.exceptions.UnknownMessageException;
import org.apache.pekko.actor.AbstractActor;

/**
 * Example actor that dispatches on message type and answers anything it does
 * not recognise with an {@link UnknownMessageException}.
 */
public class RpcActorImpl extends AbstractActor {

    /**
     * Builds the receive behaviour: one handler per known message class, in
     * declaration order, with a catch-all for everything else.
     */
    @Override
    public Receive createReceive() {
        return receiveBuilder()
            .match(String.class, text -> handleStringMessage(text))
            .match(Integer.class, number -> handleIntegerMessage(number))
            .match(StartMessage.class, start -> handleStartMessage(start))
            .match(StopMessage.class, stop -> handleStopMessage(stop))
            .matchAny(other -> handleUnknownMessage(other))
            .build();
    }

    /** Logs a received string payload at debug level. */
    private void handleStringMessage(String text) {
        logger.debug("Received string message: {}", text);
    }

    /** Logs a received integer payload at debug level. */
    private void handleIntegerMessage(Integer number) {
        logger.debug("Received integer message: {}", number);
    }

    /** Logs the start control message. */
    private void handleStartMessage(StartMessage start) {
        logger.info("Starting RPC actor");
    }

    /** Logs the stop control message. */
    private void handleStopMessage(StopMessage stop) {
        logger.info("Stopping RPC actor");
    }

    /**
     * Catch-all handler: logs a warning describing the message and replies to
     * the sender with an {@link UnknownMessageException} carrying the same text.
     *
     * @param message the unmatched message; a null reference is reported as "null"
     */
    private void handleUnknownMessage(Object message) {
        final String typeName;
        if (message == null) {
            typeName = "null";
        } else {
            typeName = message.getClass().getSimpleName();
        }

        final String description =
            String.format("Received unknown message type '%s': %s", typeName, message);
        logger.warn(description);

        // Report the problem back to whoever sent the message
        final UnknownMessageException failure = new UnknownMessageException(description);
        getSender().tell(failure, getSelf());
    }
}

// Message handler with exception propagation
/**
 * Example message handler that dispatches on message type and reports
 * unsupported messages with {@link UnknownMessageException}.
 */
public class MessageProcessor {

    /**
     * Dispatches the message to the handler for its type.
     *
     * @param message the message to process; may be null
     * @throws UnknownMessageException if the message is null or is not a
     *         command, query or event message
     */
    public void processMessage(Object message) throws UnknownMessageException {
        if (message instanceof CommandMessage) {
            processCommand((CommandMessage) message);
        } else if (message instanceof QueryMessage) {
            processQuery((QueryMessage) message);
        } else if (message instanceof EventMessage) {
            processEvent((EventMessage) message);
        } else {
            // A null message fails every instanceof test above and lands here;
            // calling getClass() on it would raise NullPointerException instead
            // of the documented UnknownMessageException.
            String typeName = message != null ? message.getClass().getName() : "null";
            throw new UnknownMessageException(
                "Cannot process message of type: " + typeName
            );
        }
    }

    /**
     * Processes the message and, if its type is unknown, logs the failure and
     * tries the fallback path.
     *
     * @param message the message to process; may be null
     * @throws UnknownMessageException if the fallback path fails as well; the
     *         fallback failure is attached as the cause
     */
    public void handleMessageWithRecovery(Object message) {
        try {
            processMessage(message);
        } catch (UnknownMessageException e) {
            logger.error("Failed to process unknown message", e);

            // Attempt fallback processing
            try {
                processFallback(message);
            } catch (Exception fallbackException) {
                throw new UnknownMessageException(
                    "Message processing failed completely",
                    fallbackException
                );
            }
        }
    }

    private void processCommand(CommandMessage cmd) { /* implementation */ }
    private void processQuery(QueryMessage query) { /* implementation */ }
    private void processEvent(EventMessage event) { /* implementation */ }
    private void processFallback(Object message) { /* fallback implementation */ }
}

Exception Handling Patterns

Common patterns for handling RPC exceptions in distributed environments.

/**
 * Utility methods for common RPC exception handling patterns.
 *
 * <p>This listing is an API summary: the methods are shown as signatures only,
 * without bodies, so the exact classification rules are not visible here.
 *
 * <p>NOTE(review): confirm that this class is actually shipped in the artifact
 * before relying on it; this page shows no import or package for it.
 */
public class RpcExceptionHandling {
    
    /**
     * Determines if an exception indicates a recoverable RPC error.
     *
     * @param exception Exception to analyze
     * @return true if the error might be recoverable with retry
     */
    public static boolean isRecoverableException(Throwable exception);
    
    /**
     * Determines if an exception indicates RPC endpoint termination.
     *
     * @param exception Exception to analyze  
     * @return true if the exception indicates endpoint termination
     */
    public static boolean isEndpointTerminatedException(Throwable exception);
    
    /**
     * Extracts the root cause from a chain of RPC exceptions.
     *
     * @param exception Exception to analyze
     * @return Root cause exception
     */
    public static Throwable getRootCause(Throwable exception);
}

Comprehensive Error Handling Examples:

import org.apache.flink.runtime.rpc.pekko.exceptions.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
 * Example RPC client that retries transient failures with a fixed delay and
 * offers a simple circuit breaker for guarding repeated failures.
 */
public class RobustRpcClient {
    private static final int MAX_RETRIES = 3;
    private static final long RETRY_DELAY_MS = 1000;

    /**
     * Invokes an RPC method, retrying retryable failures up to
     * {@code MAX_RETRIES} times with {@code RETRY_DELAY_MS} between attempts.
     *
     * @param endpoint   target endpoint
     * @param methodName name of the RPC method to call
     * @param args       arguments passed to the RPC method
     * @return a future completed with the call result, or completed
     *         exceptionally with the last (unwrapped) failure
     */
    public <T> CompletableFuture<T> callWithRetry(
            String endpoint,
            String methodName,
            Object... args) {

        return callWithRetryInternal(endpoint, methodName, 0, args);
    }

    /**
     * Performs one attempt and, on a retryable failure, schedules the next one.
     *
     * @param attempt zero-based index of the current attempt
     */
    private <T> CompletableFuture<T> callWithRetryInternal(
            String endpoint,
            String methodName,
            int attempt,
            Object... args) {

        // Assigning to a typed local pins T for the chain below.
        CompletableFuture<T> call = makeRpcCall(endpoint, methodName, args);

        return call
            .<CompletableFuture<T>>handle((result, throwable) -> {
                if (throwable == null) {
                    return CompletableFuture.completedFuture(result);
                }

                Throwable actualException = unwrapCompletionException(throwable);

                if (shouldRetry(actualException, attempt)) {
                    logger.warn("RPC call failed (attempt {}), retrying: {}",
                               attempt + 1, actualException.getMessage());

                    // Executor.execute(...) returns void, so it cannot yield the
                    // retried future. supplyAsync on the delayed executor starts
                    // the next attempt after the delay and produces a future of a
                    // future, which thenCompose flattens.
                    return CompletableFuture
                        .<CompletableFuture<T>>supplyAsync(
                            () -> this.<T>callWithRetryInternal(
                                endpoint, methodName, attempt + 1, args),
                            CompletableFuture.delayedExecutor(
                                RETRY_DELAY_MS, TimeUnit.MILLISECONDS))
                        .thenCompose(retry -> retry);
                }

                logger.error("RPC call failed permanently after {} attempts", attempt + 1, actualException);
                return CompletableFuture.<T>failedFuture(actualException);
            })
            .thenCompose(future -> future);
    }

    /**
     * Strips a {@link CompletionException} wrapper when it carries a cause;
     * otherwise returns the throwable unchanged so callers never see null.
     */
    private static Throwable unwrapCompletionException(Throwable throwable) {
        if (throwable instanceof CompletionException && throwable.getCause() != null) {
            return throwable.getCause();
        }
        return throwable;
    }

    /**
     * Decides whether another attempt should be made.
     *
     * @param exception the unwrapped failure of the current attempt
     * @param attempt   zero-based index of the attempt that failed
     */
    private boolean shouldRetry(Throwable exception, int attempt) {
        if (attempt >= MAX_RETRIES) {
            return false;
        }

        // Don't retry on invalid state - usually permanent
        if (exception instanceof RpcInvalidStateException) {
            return false;
        }

        // Don't retry on unknown message - usually a programming error
        if (exception instanceof UnknownMessageException) {
            return false;
        }

        // Retry on network issues, timeouts, etc.
        return isRetryableException(exception);
    }

    /** Returns true for connection failures, timeouts and missing actors. */
    private boolean isRetryableException(Throwable exception) {
        if (exception instanceof java.net.ConnectException
                || exception instanceof java.util.concurrent.TimeoutException
                || exception instanceof org.apache.pekko.actor.ActorNotFound) {
            return true;
        }
        // Compare case-insensitively: the usual message text is capitalised
        // ("Connection refused"), which a case-sensitive match would miss.
        String message = exception.getMessage();
        return message != null
            && message.toLowerCase(java.util.Locale.ROOT).contains("connection refused");
    }

    /**
     * Circuit breaker pattern for RPC calls.
     *
     * <p>The breaker opens after {@code FAILURE_THRESHOLD} consecutive failures and
     * moves to half-open once {@code TIMEOUT_MS} has elapsed since the last failure;
     * the next success closes it again.
     */
    public class RpcCircuitBreaker {
        private volatile State state = State.CLOSED;
        private volatile int failureCount = 0;
        private volatile long lastFailureTime = 0;

        private static final int FAILURE_THRESHOLD = 5;
        private static final long TIMEOUT_MS = 60000; // 1 minute

        enum State { CLOSED, OPEN, HALF_OPEN }

        /**
         * Tracks the outcome of {@code operation} and rejects it while the breaker is open.
         *
         * <p>The operation is passed in as an already-created future, so the breaker
         * only decides whether its outcome is surfaced to the caller.
         *
         * @param operation the RPC call to observe
         * @return the operation's future with outcome tracking attached, or a future
         *         failed with {@link RpcInvalidStateException} while the breaker is open
         */
        public <T> CompletableFuture<T> execute(CompletableFuture<T> operation) {
            if (state == State.OPEN) {
                if (System.currentTimeMillis() - lastFailureTime > TIMEOUT_MS) {
                    state = State.HALF_OPEN;
                } else {
                    return CompletableFuture.failedFuture(
                        new RpcInvalidStateException("Circuit breaker is OPEN")
                    );
                }
            }

            return operation
                .whenComplete((result, throwable) -> {
                    if (throwable == null) {
                        onSuccess();
                    } else {
                        onFailure();
                    }
                });
        }

        // synchronized: completion callbacks may run on different threads, and
        // volatile alone does not make the failureCount++ read-modify-write atomic.
        private synchronized void onSuccess() {
            failureCount = 0;
            state = State.CLOSED;
        }

        private synchronized void onFailure() {
            failureCount++;
            lastFailureTime = System.currentTimeMillis();

            if (failureCount >= FAILURE_THRESHOLD) {
                state = State.OPEN;
            }
        }
    }
}

Error Recovery Strategies:

public class RpcErrorRecovery {
    
    // Graceful degradation on RPC failures
    public String getJobStatus(String jobId) {
        try {
            return jobManagerGateway.getJobStatus(jobId).get();
        } catch (RpcInvalidStateException e) {
            logger.warn("JobManager not available, returning cached status", e);
            return getCachedJobStatus(jobId);
        } catch (UnknownMessageException e) {
            logger.error("JobManager doesn't support getJobStatus operation", e);
            return "UNKNOWN";
        } catch (Exception e) {
            logger.error("Failed to get job status", e);
            throw new RuntimeException("Job status unavailable", e);
        }
    }
    
    // Failover to backup endpoints
    public CompletableFuture<String> callWithFailover(List<String> endpoints, String method) {
        if (endpoints.isEmpty()) {
            return CompletableFuture.failedFuture(
                new RpcInvalidStateException("No endpoints available")
            );
        }
        
        return callEndpoint(endpoints.get(0), method)
            .handle((result, throwable) -> {
                if (throwable == null) {
                    return CompletableFuture.completedFuture(result);
                }
                
                if (endpoints.size() > 1) {
                    logger.warn("Primary endpoint failed, trying backup: {}", throwable.getMessage());
                    return callWithFailover(endpoints.subList(1, endpoints.size()), method);
                } else {
                    return CompletableFuture.<String>failedFuture(throwable);
                }
            })
            .thenCompose(future -> future);
    }
    
    private CompletableFuture<String> callEndpoint(String endpoint, String method) {
        // Implementation for calling specific endpoint
        return CompletableFuture.completedFuture("result");
    }
    
    private String getCachedJobStatus(String jobId) {
        // Implementation for getting cached status
        return "CACHED_STATUS";
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-rpc-akka

docs

actor-system.md

concurrent-utilities.md

exceptions.md

index.md

rpc-configuration.md

rpc-system.md

tile.json