CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-flink--flink-runtime-2-10

Apache Flink runtime engine providing core distributed streaming dataflow execution, task scheduling, state management, and fault tolerance capabilities.

Pending
Overview
Eval results
Files

docs/rpc-framework.md

RPC Framework

The RPC (Remote Procedure Call) Framework provides the communication infrastructure for distributed components in the Flink cluster. It enables reliable, asynchronous communication between JobManagers, TaskManagers, and client applications across the cluster network.

Core RPC Services

RpcService

The main RPC service interface that manages remote communication endpoints and connections.

/**
 * Service that manages remote communication endpoints and connections.
 *
 * <p>Only signatures are listed here. The per-method notes restate what the signatures
 * show; anything marked "presumably" is an assumption to confirm against the implementation.
 */
public interface RpcService extends AutoCloseable {
    /** Returns a future gateway of type {@code clazz} for the endpoint at {@code address}. */
    <C extends RpcGateway> CompletableFuture<C> connect(String address, Class<C> clazz);
    /** Like {@link #connect(String, Class)}, but for a fenced gateway and with an explicit fencing token. */
    <F extends Serializable, C extends FencedRpcGateway<F>> CompletableFuture<C> connect(
        String address, F fencingToken, Class<C> clazz);
    
    /** Stops the server side of the given endpoint. */
    void stopServer(RpcEndpoint endpoint);
    
    /** Stops this service; the returned future presumably completes once shutdown has finished. */
    CompletableFuture<Void> stopService();
    
    /** Returns the executor exposed by this service. */
    Executor getExecutor();
    /** Returns the scheduled executor exposed by this service. */
    ScheduledExecutor getScheduledExecutor();
    
    // NOTE(review): executeRunnable and execute(Runnable) have the same shape; confirm
    // whether both are intended or one is a leftover alias.
    void executeRunnable(Runnable runnable);
    void execute(Runnable runnable);
    
    /** Runs {@code callable} and exposes its result as a future. */
    <T> CompletableFuture<T> execute(Callable<T> callable);
    /** Schedules {@code runnable} to run after {@code delay}, expressed in {@code unit}. */
    ScheduledFuture<?> scheduleRunnable(Runnable runnable, long delay, TimeUnit unit);
}

RpcEndpoint

Abstract base class for all RPC endpoints in the Flink cluster. Components extend this class to expose RPC interfaces.

/**
 * Abstract base class for RPC endpoints; components extend it to expose RPC interfaces.
 *
 * <p>Only signatures are listed here, so the notes below describe the declared shape of
 * each member rather than verified behavior.
 */
public abstract class RpcEndpoint implements AutoCloseable {
    /** Creates an endpoint bound to {@code rpcService}; no endpoint id is supplied. */
    protected RpcEndpoint(RpcService rpcService);
    /** Creates an endpoint bound to {@code rpcService} with an explicit {@code endpointId}. */
    protected RpcEndpoint(RpcService rpcService, String endpointId);
    
    /** Starts the endpoint. */
    public void start() throws Exception;
    /** Begins closing the endpoint and returns a future for the asynchronous close. */
    public CompletableFuture<Void> closeAsync();
    
    /** {@link AutoCloseable} entry point; presumably delegates to {@link #closeAsync()} (confirm). */
    @Override
    public final void close() throws Exception;
    
    // NOTE(review): getAddress() is listed as protected here, but the "Setting Up RPC Service"
    // example calls it on an endpoint instance from outside the class; confirm the visibility.
    protected final String getAddress();
    protected final String getEndpointId();
    protected final RpcService getRpcService();
    
    /** Connects to a remote endpoint; same parameters as {@code RpcService.connect}. */
    protected final <C extends RpcGateway> CompletableFuture<C> connectTo(String address, Class<C> clazz);
    /** Fenced variant of {@code connectTo} taking an explicit fencing token. */
    protected final <F extends Serializable, C extends FencedRpcGateway<F>> CompletableFuture<C> connectTo(
        String address, F fencingToken, Class<C> clazz);
    
    // The two scheduleRunAsync overloads differ in return type: only the Time-based one
    // hands back a ScheduledFuture.
    protected final void scheduleRunAsync(Runnable runnable, long delay, TimeUnit timeUnit);
    protected final ScheduledFuture<?> scheduleRunAsync(Runnable runnable, Time delay);
    
    /** Runs {@code callable} asynchronously and returns its result as a future, subject to {@code timeout}. */
    protected final <V> CompletableFuture<V> callAsync(Callable<V> callable, Time timeout);
    
    /** Lifecycle hook; the examples on this page override it to run start-up logic. */
    protected void onStart() throws Exception;
    /** Lifecycle hook; the examples on this page override it and return a completed future. */
    protected CompletableFuture<Void> onStop();
    
    /** Presumably asserts that the caller is on the endpoint's main thread (confirm). */
    protected void validateRunsInMainThread();
    /** Submits {@code runnable} for asynchronous execution by the endpoint. */
    protected void runAsync(Runnable runnable);
}

Gateway Interfaces

RpcGateway

Base interface for all RPC gateway implementations. Gateways provide client-side access to remote RPC endpoints.

/** Base interface for RPC gateways, which give client-side access to remote RPC endpoints. */
public interface RpcGateway {
    /** Returns the address associated with this gateway. */
    String getAddress();
    /** Returns the hostname associated with this gateway. */
    String getHostname();
}

FencedRpcGateway

Extended RPC gateway interface that includes fencing tokens for leader election scenarios.

/**
 * RPC gateway that additionally carries a fencing token, for leader election scenarios.
 *
 * @param <F> serializable type of the fencing token
 */
public interface FencedRpcGateway<F extends Serializable> extends RpcGateway {
    /** Returns the fencing token associated with this gateway. */
    F getFencingToken();
}

RpcServer

Interface representing a server-side RPC endpoint that can be started and closed, and that exposes its address, its port, and a termination future.

/** Server-side handle of an RPC endpoint: start it, close it, and query where it is reachable. */
public interface RpcServer extends AutoCloseable {
    /** Starts the server. */
    void start() throws Exception;
    
    /** Closes the server. */
    @Override
    void close() throws Exception;
    
    /** Returns the server's address. */
    String getAddress();
    /** Returns the server's port. */
    int getPort();
    
    /** Returns a future that presumably completes once the server has terminated (confirm). */
    CompletableFuture<Void> getTerminationFuture();
}

RPC Method Annotations

RpcMethod

Annotation to mark methods as RPC-callable with timeout specifications.

/**
 * Marks a method as RPC-callable and optionally attaches a timeout to it.
 *
 * <p>Applicable to methods only and retained at runtime, so it can be read reflectively.
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcMethod {
    /**
     * Timeout for the RPC call in milliseconds.
     *
     * <p>Defaults to {@code -1}; presumably this means "no method-specific timeout" (confirm).
     */
    long timeout() default -1L;
}

RpcTimeout

Annotation to specify timeout for RPC calls at the parameter level.

/**
 * Marks the parameter of an RPC method that carries the call timeout.
 *
 * <p>Applicable to parameters only, retained at runtime, and has no members.
 */
@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME) 
public @interface RpcTimeout {
    // Marker annotation for timeout parameters
}

Exception Handling

RpcException

Base exception class for RPC-related failures.

/** Base (checked, via {@code FlinkException}) exception type for RPC-related failures. */
public class RpcException extends FlinkException {
    /** @param message description of the failure */
    public RpcException(String message);
    /**
     * @param message description of the failure
     * @param cause underlying cause
     */
    public RpcException(String message, Throwable cause);
}

RpcConnectionException

Exception thrown when RPC connection establishment fails.

/** Exception raised when establishing an RPC connection fails. */
public class RpcConnectionException extends RpcException {
    /** @param message description of the failure */
    public RpcConnectionException(String message);
    /**
     * @param message description of the failure
     * @param cause underlying cause
     */
    public RpcConnectionException(String message, Throwable cause);
    
    /**
     * @param targetAddress address the connection was attempted to
     * @param rpcGatewayClass gateway type that was requested
     * @param cause underlying cause
     */
    public RpcConnectionException(String targetAddress, Class<?> rpcGatewayClass, Throwable cause);
    
    // NOTE(review): what these getters return for instances built with the
    // message-only constructors is not shown here; confirm (possibly null).
    public String getTargetAddress();
    public Class<?> getRpcGatewayClass();
}

RpcRuntimeException

Runtime exception for RPC failures that don't require explicit handling.

/** Unchecked (via {@code FlinkRuntimeException}) exception type for RPC failures. */
public class RpcRuntimeException extends FlinkRuntimeException {
    /** @param message description of the failure */
    public RpcRuntimeException(String message);
    /**
     * @param message description of the failure
     * @param cause underlying cause
     */
    public RpcRuntimeException(String message, Throwable cause);
}

Factory and Utils

RpcServiceUtils

Utility class providing factory methods and helper functions for RPC services.

/**
 * Static factory and helper methods for RPC services.
 *
 * <p>Only signatures are listed here; notes marked "presumably" are assumptions to confirm.
 */
public class RpcServiceUtils {
    /** Creates an {@link RpcService} for the given host, port and configuration. */
    public static RpcService createRpcService(String hostname, int port, Configuration configuration) throws Exception;
    
    /** Variant of {@code createRpcService} that additionally receives the high-availability services. */
    public static RpcService createRpcService(String hostname, int port, Configuration configuration, 
                                            HighAvailabilityServices highAvailabilityServices) throws Exception;
    
    /** Returns a port number; presumably a currently free, randomly chosen one (confirm). */
    public static int getRandomPort();
    
    /** Returns a wildcard address string. */
    public static String createWildcardAddress();
    /** Returns the hostname of the given RPC service. */
    public static String getHostname(RpcService rpcService);
    
    // The terminate* helpers all return a future and take a timeout; whether the timeout
    // fails the future or only bounds waiting is not visible here (confirm).
    public static CompletableFuture<Void> terminateRpcEndpoint(RpcEndpoint rpcEndpoint, Time timeout);
    public static CompletableFuture<Void> terminateRpcService(RpcService rpcService, Time timeout);
    
    public static CompletableFuture<Void> terminateRpcServices(Time timeout, RpcService... rpcServices);
    
    /**
     * Returns a gateway of the same type as {@code rpcGateway}; presumably a wrapper whose
     * calls block for at most {@code timeout} (confirm).
     */
    public static <T extends RpcGateway> T getSynchronousRpcGateway(
        T rpcGateway, 
        Class<T> rpcGatewayClass, 
        Time timeout
    );
}

Configuration Options

RPC Configuration

Configuration options for customizing RPC behavior and networking.

/** Configuration options for customizing RPC behavior and networking. */
public class RpcOptions {
    /** Key {@code rpc.bind-address}; defaults to the empty string. */
    public static final ConfigOption<String> RPC_BIND_ADDRESS = 
        key("rpc.bind-address").defaultValue("");
        
    /** Key {@code rpc.port}; defaults to 0 (presumably "pick any free port"; confirm). */
    public static final ConfigOption<Integer> RPC_PORT = 
        key("rpc.port").defaultValue(0);
        
    /** Key {@code rpc.ask-timeout}; defaults to 10 seconds. */
    public static final ConfigOption<Duration> RPC_ASK_TIMEOUT = 
        key("rpc.ask-timeout").defaultValue(Duration.ofSeconds(10));
        
    /** Key {@code rpc.lookup-timeout}; defaults to 10 seconds. */
    public static final ConfigOption<Duration> RPC_LOOKUP_TIMEOUT = 
        key("rpc.lookup-timeout").defaultValue(Duration.ofSeconds(10));
        
    /** Key {@code rpc.connect.retries}; defaults to 5. */
    public static final ConfigOption<Integer> RPC_CONNECT_RETRIES = 
        key("rpc.connect.retries").defaultValue(5);
        
    /** Key {@code rpc.connect.retry-delay}; defaults to 1 second. */
    public static final ConfigOption<Duration> RPC_CONNECT_RETRY_DELAY = 
        key("rpc.connect.retry-delay").defaultValue(Duration.ofSeconds(1));
        
    /** Key {@code rpc.ssl.enabled}; defaults to {@code false}. */
    public static final ConfigOption<Boolean> RPC_SSL_ENABLED = 
        key("rpc.ssl.enabled").defaultValue(false);
}

Usage Examples

Creating an RPC Endpoint

import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcMethod;

// Define the RPC gateway interface
/**
 * Example gateway for a TaskManager endpoint.
 *
 * <p>Every method is asynchronous (returns a {@code CompletableFuture}) and takes the call
 * timeout as its last parameter, marked with {@code @RpcTimeout}.
 */
public interface TaskManagerGateway extends RpcGateway {
    /** Submits the task described by {@code tdd}. */
    CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, @RpcTimeout Time timeout);
    /** Cancels the task execution identified by {@code executionAttemptID}. */
    CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, @RpcTimeout Time timeout);
    /** Requests the {@code TaskExecutorInfo} of the remote TaskManager. */
    CompletableFuture<TaskExecutorInfo> requestTaskExecutorInfo(@RpcTimeout Time timeout);
}

// Implement the RPC endpoint
/**
 * Example RPC endpoint that exposes a {@code TaskManager} through {@code TaskManagerGateway}.
 *
 * <p>Each RPC hands its work to the executor of the owning {@code RpcService} and reports
 * the outcome through the returned future.
 */
public class TaskManagerRpcEndpoint extends RpcEndpoint implements TaskManagerGateway {
    private final TaskManager taskManager;

    /**
     * @param rpcService service this endpoint is registered with
     * @param taskManager delegate that performs the actual task operations
     */
    public TaskManagerRpcEndpoint(RpcService rpcService, TaskManager taskManager) {
        super(rpcService, "TaskManager");
        this.taskManager = taskManager;
    }

    /** Logs the address this endpoint was started at. */
    @Override
    protected void onStart() throws Exception {
        final String startedAt = getAddress();
        System.out.println("TaskManager RPC endpoint started at: " + startedAt);
    }

    /** Submits {@code tdd} on the RPC service executor; failures surface as a failed future. */
    @Override
    @RpcMethod(timeout = 30000L) // 30 second timeout
    public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) {
        return CompletableFuture.supplyAsync(() -> submitBlocking(tdd), getRpcService().getExecutor());
    }

    /** Cancels the given execution attempt on the RPC service executor. */
    @Override
    @RpcMethod(timeout = 10000L) // 10 second timeout
    public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
        return CompletableFuture.supplyAsync(
            () -> cancelBlocking(executionAttemptID),
            getRpcService().getExecutor());
    }

    /** Fetches the task executor info on the RPC service executor. */
    @Override
    @RpcMethod(timeout = 5000L) // 5 second timeout
    public CompletableFuture<TaskExecutorInfo> requestTaskExecutorInfo(Time timeout) {
        return CompletableFuture.supplyAsync(
            () -> taskManager.getTaskExecutorInfo(),
            getRpcService().getExecutor());
    }

    /** Logs the shutdown and reports it as already complete. */
    @Override
    protected CompletableFuture<Void> onStop() {
        System.out.println("TaskManager RPC endpoint stopping");
        return CompletableFuture.completedFuture(null);
    }

    /** Runs the submission synchronously, wrapping any exception for the async pipeline. */
    private Acknowledge submitBlocking(TaskDeploymentDescriptor tdd) {
        try {
            taskManager.submitTask(tdd);
            return Acknowledge.get();
        } catch (Exception e) {
            throw new CompletionException(e);
        }
    }

    /** Runs the cancellation synchronously, wrapping any exception for the async pipeline. */
    private Acknowledge cancelBlocking(ExecutionAttemptID executionAttemptID) {
        try {
            taskManager.cancelTask(executionAttemptID);
            return Acknowledge.get();
        } catch (Exception e) {
            throw new CompletionException(e);
        }
    }
}

Setting Up RPC Service

import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcServiceUtils;
import org.apache.flink.configuration.Configuration;

// Create RPC service configuration
Configuration config = new Configuration();
config.setString("rpc.bind-address", "localhost");
config.setInteger("rpc.port", 6123);
config.setString("rpc.ask-timeout", "10 s");

// Create RPC service
RpcService rpcService = RpcServiceUtils.createRpcService("localhost", 6123, config);

try {
    // Start TaskManager RPC endpoint
    TaskManager taskManager = new TaskManager();
    TaskManagerRpcEndpoint taskManagerEndpoint = new TaskManagerRpcEndpoint(rpcService, taskManager);
    taskManagerEndpoint.start();
    
    System.out.println("TaskManager available at: " + taskManagerEndpoint.getAddress());
    
    // Keep service running
    Thread.sleep(60000);
    
} finally {
    // Clean shutdown
    rpcService.stopService().get();
}

Connecting to Remote RPC Endpoints

import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.concurrent.FutureUtils;

/**
 * Example client that connects to a remote TaskManager and submits one task per job vertex.
 */
public class JobManagerClient {
    private final RpcService rpcService;

    /** @param rpcService service used to open the connection to the TaskManager */
    public JobManagerClient(RpcService rpcService) {
        this.rpcService = rpcService;
    }

    /**
     * Connects to {@code taskManagerAddress} and submits every vertex of {@code jobGraph}.
     *
     * <p>Fire-and-forget: the outcome is only printed, not returned to the caller.
     */
    public void connectAndSubmitJob(String taskManagerAddress, JobGraph jobGraph) {
        rpcService.connect(taskManagerAddress, TaskManagerGateway.class)
            .thenCompose(gateway -> {
                System.out.println("Connected to TaskManager at: " + gateway.getAddress());

                // One submission per vertex; collect the acknowledgements to await them together
                List<CompletableFuture<Acknowledge>> pendingAcks = new ArrayList<>();
                for (JobVertex vertex : jobGraph.getVertices()) {
                    pendingAcks.add(
                        gateway.submitTask(createTaskDeploymentDescriptor(vertex), Time.seconds(30)));
                }

                return FutureUtils.waitForAll(pendingAcks);
            })
            .whenComplete((ignored, failure) -> {
                if (failure == null) {
                    System.out.println("All tasks submitted successfully");
                    return;
                }
                System.err.println("Failed to submit tasks: " + failure.getMessage());
            });
    }

    /** Builds the deployment descriptor for a single job vertex. */
    private TaskDeploymentDescriptor createTaskDeploymentDescriptor(JobVertex vertex) {
        // Create task deployment descriptor from job vertex
        return new TaskDeploymentDescriptor(/* ... */);
    }
}

Fenced RPC with Leader Election

import org.apache.flink.runtime.rpc.FencedRpcGateway;
import java.util.UUID;

// Define fenced RPC gateway for leader election scenarios
/** Example fenced gateway whose fencing token is the leader session {@code UUID}. */
public interface ResourceManagerGateway extends FencedRpcGateway<UUID> {
    /**
     * Registers a TaskManager with the resource manager.
     *
     * @param leaderSessionId session ID the caller believes belongs to the current leader
     * @param registration registration details of the TaskManager
     * @param timeout timeout for this RPC call
     * @return future holding the registration response
     */
    CompletableFuture<RegistrationResponse> registerTaskManager(
        UUID leaderSessionId,
        TaskManagerRegistration registration,
        @RpcTimeout Time timeout
    );
}

// Implement fenced RPC endpoint
/**
 * Example fenced RPC endpoint that only accepts registrations while it holds leadership.
 *
 * <p>Leadership is represented by a single volatile field: a non-null session ID means
 * "currently leader". Keeping one field (instead of a separate flag plus ID) means a
 * request handler can never observe a half-applied leadership change.
 */
public class ResourceManagerRpcEndpoint extends RpcEndpoint implements ResourceManagerGateway {
    /** Current leader session ID, or {@code null} while this endpoint is not the leader. */
    private volatile UUID leaderSessionId;

    /** @param rpcService service this endpoint is registered with */
    public ResourceManagerRpcEndpoint(RpcService rpcService) {
        super(rpcService, "ResourceManager");
    }

    /** Returns the current leader session ID, or {@code null} when not leader. */
    @Override
    public UUID getFencingToken() {
        return leaderSessionId;
    }

    /**
     * Processes a TaskManager registration if {@code leaderSessionId} matches the current leader session.
     *
     * @return future holding the response; fails with an {@code RpcException} cause when this
     *     endpoint is not leader or the supplied session ID does not match
     */
    @Override
    @RpcMethod(timeout = 15000L)
    public CompletableFuture<RegistrationResponse> registerTaskManager(
            UUID leaderSessionId,
            TaskManagerRegistration registration,
            Time timeout) {

        return CompletableFuture.supplyAsync(() -> {
            // Read the token exactly once so the check cannot straddle a leadership change.
            UUID currentToken = this.leaderSessionId;

            // A null current token means "not leader"; a null argument can therefore never match.
            if (currentToken == null || !currentToken.equals(leaderSessionId)) {
                throw new CompletionException(new RpcException("Invalid leader session ID"));
            }

            // Process TaskManager registration
            return processTaskManagerRegistration(registration);

        }, getRpcService().getExecutor());
    }

    /**
     * Grants leadership under {@code newLeaderSessionId}.
     *
     * @throws IllegalArgumentException if {@code newLeaderSessionId} is {@code null}, since
     *     {@code null} is reserved for "not leader"
     */
    public void becomeLeader(UUID newLeaderSessionId) {
        if (newLeaderSessionId == null) {
            throw new IllegalArgumentException("newLeaderSessionId must not be null");
        }
        runAsync(() -> {
            this.leaderSessionId = newLeaderSessionId;
            System.out.println("Became leader with session ID: " + newLeaderSessionId);
        });
    }

    /** Revokes leadership; subsequent registrations are rejected until leadership is granted again. */
    public void revokeLeadership() {
        runAsync(() -> {
            this.leaderSessionId = null;
            System.out.println("Leadership revoked");
        });
    }

    private RegistrationResponse processTaskManagerRegistration(TaskManagerRegistration registration) {
        // Implementation for processing registration
        return new RegistrationResponse.Success();
    }
}

Error Handling and Retries

/**
 * Example client that retries a failed RPC call with exponential backoff.
 *
 * <p>The client owns a single-threaded scheduler for the delayed retries. Call
 * {@link #close()} when the client is no longer needed, otherwise the scheduler thread
 * stays alive.
 */
public class RobustRpcClient implements AutoCloseable {
    private final RpcService rpcService;
    private final ScheduledExecutorService retryExecutor;

    /** @param rpcService service used to connect to the remote endpoint on every attempt */
    public RobustRpcClient(RpcService rpcService) {
        this.rpcService = rpcService;
        this.retryExecutor = Executors.newScheduledThreadPool(1);
    }

    /**
     * Connects to {@code address} and invokes {@code rpcCall}, retrying on failure.
     *
     * @param address address of the remote endpoint
     * @param gatewayClass gateway type to connect with
     * @param rpcCall call to perform on the connected gateway
     * @param maxRetries number of retries after the first attempt
     * @return future holding the result of the first successful attempt, or the failure of
     *     the last attempt once the retries are exhausted
     */
    public <T> CompletableFuture<T> callWithRetry(
            String address,
            Class<? extends RpcGateway> gatewayClass,
            Function<RpcGateway, CompletableFuture<T>> rpcCall,
            int maxRetries) {

        return callWithRetryInternal(address, gatewayClass, rpcCall, maxRetries, 0);
    }

    /**
     * Stops the retry scheduler.
     *
     * <p>Uses an orderly shutdown so that retries which are already scheduled still run.
     * A call that needs to schedule a further retry afterwards fails instead of hanging.
     */
    @Override
    public void close() {
        retryExecutor.shutdown();
    }

    /** Performs attempt number {@code currentAttempt} (0-based) and schedules the next one on failure. */
    private <T> CompletableFuture<T> callWithRetryInternal(
            String address,
            Class<? extends RpcGateway> gatewayClass,
            Function<RpcGateway, CompletableFuture<T>> rpcCall,
            int maxRetries,
            int currentAttempt) {

        return rpcService.connect(address, gatewayClass)
            .thenCompose(gateway -> {
                return rpcCall.apply(gateway);
            })
            .handle((result, throwable) -> {
                if (throwable != null && currentAttempt < maxRetries) {
                    System.out.println("RPC call failed (attempt " + (currentAttempt + 1) +
                                     "/" + maxRetries + "), retrying...");

                    // Exponential backoff
                    long delay = (long) Math.pow(2, currentAttempt) * 1000;

                    CompletableFuture<T> retryFuture = new CompletableFuture<>();
                    retryExecutor.schedule(() -> {
                        try {
                            callWithRetryInternal(address, gatewayClass, rpcCall, maxRetries, currentAttempt + 1)
                                .whenComplete((retryResult, retryThrowable) -> {
                                    if (retryThrowable != null) {
                                        retryFuture.completeExceptionally(retryThrowable);
                                    } else {
                                        retryFuture.complete(retryResult);
                                    }
                                });
                        } catch (Throwable startFailure) {
                            // A synchronous failure while starting the retry would otherwise be
                            // swallowed by the scheduler and leave retryFuture incomplete forever.
                            retryFuture.completeExceptionally(startFailure);
                        }
                    }, delay, TimeUnit.MILLISECONDS);

                    return retryFuture;

                } else if (throwable != null) {
                    // Retries exhausted: surface the last failure
                    CompletableFuture<T> failedFuture = new CompletableFuture<>();
                    failedFuture.completeExceptionally(throwable);
                    return failedFuture;
                } else {
                    return CompletableFuture.completedFuture(result);
                }
            })
            // handle() yields a future of a future; flatten it
            .thenCompose(Function.identity());
    }
}

Common Patterns

RPC Service Lifecycle Management

/**
 * Example lifecycle manager that tracks the RPC services and endpoints it created or started
 * and shuts them down in order: endpoints first, then services.
 */
public class ClusterRpcManager {
    private final List<RpcService> rpcServices = new ArrayList<>();
    private final List<RpcEndpoint> rpcEndpoints = new ArrayList<>();

    /** Creates an RPC service and registers it for shutdown. */
    public RpcService createRpcService(String hostname, int port, Configuration config) throws Exception {
        RpcService rpcService = RpcServiceUtils.createRpcService(hostname, port, config);
        rpcServices.add(rpcService);
        return rpcService;
    }

    /** Starts {@code endpoint} and, if the start succeeds, registers it for shutdown. */
    public <T extends RpcEndpoint> T startRpcEndpoint(T endpoint) throws Exception {
        endpoint.start();
        rpcEndpoints.add(endpoint);
        return endpoint;
    }

    /**
     * Closes all tracked endpoints, then stops all tracked services, waiting for each phase.
     *
     * <p>The services are stopped even when closing an endpoint fails, so a single failing
     * endpoint cannot leak the RPC services.
     *
     * @throws Exception the first failure encountered; a later failure is attached to it as
     *     a suppressed exception
     */
    public void shutdown() throws Exception {
        Exception failure = null;

        // Stop all endpoints first
        try {
            awaitAll(rpcEndpoints.stream()
                .map(RpcEndpoint::closeAsync)
                .collect(Collectors.toList()));
        } catch (Exception e) {
            failure = e;
        }

        // Then stop all services, regardless of how the endpoint phase ended
        try {
            awaitAll(rpcServices.stream()
                .map(RpcService::stopService)
                .collect(Collectors.toList()));
        } catch (Exception e) {
            if (failure == null) {
                failure = e;
            } else {
                failure.addSuppressed(e);
            }
        }

        if (failure != null) {
            throw failure;
        }
    }

    /** Blocks until every future in {@code futures} has completed. */
    private static void awaitAll(List<CompletableFuture<Void>> futures) throws Exception {
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
    }
}

Timeout Handling

/** Example helper that bounds how long a caller waits for an RPC future. */
public class TimeoutAwareRpcClient {

    /**
     * Wraps {@code rpcCall} in a future that fails with a {@link TimeoutException} if the
     * call has not completed within {@code timeout}.
     *
     * <p>The timer task is cancelled as soon as the returned future completes for any reason,
     * including the caller cancelling or completing it, so no task outlives the call. The
     * underlying {@code rpcCall} is not cancelled on timeout.
     *
     * @param rpcCall future of the RPC call to guard
     * @param timeout maximum time to wait for {@code rpcCall}
     * @param timeoutExecutor scheduler on which the timeout is armed
     * @return future mirroring {@code rpcCall}'s outcome, or failing with a timeout
     */
    public <T> CompletableFuture<T> callWithTimeout(
            CompletableFuture<T> rpcCall,
            Duration timeout,
            ScheduledExecutorService timeoutExecutor) {

        CompletableFuture<T> timeoutFuture = new CompletableFuture<>();

        // Set up timeout
        ScheduledFuture<?> timeoutTask = timeoutExecutor.schedule(() -> {
            timeoutFuture.completeExceptionally(
                new TimeoutException("RPC call timed out after " + timeout)
            );
        }, timeout.toMillis(), TimeUnit.MILLISECONDS);

        // Release the timer whenever the result settles, whichever side settled it.
        timeoutFuture.whenComplete((result, throwable) -> timeoutTask.cancel(false));

        // Race between RPC call completion and timeout; the first completion wins.
        rpcCall.whenComplete((result, throwable) -> {
            if (throwable != null) {
                timeoutFuture.completeExceptionally(throwable);
            } else {
                timeoutFuture.complete(result);
            }
        });

        return timeoutFuture;
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-runtime-2-10

docs

data-exchange.md

execution-graph.md

high-availability.md

index.md

job-management.md

message-passing.md

metrics.md

mini-cluster.md

rpc-framework.md

state-management.md

task-execution.md

tile.json