Apache Flink runtime engine providing core distributed streaming dataflow execution, task scheduling, state management, and fault tolerance capabilities.
—
The RPC (Remote Procedure Call) framework provides the communication infrastructure for the distributed components of a Flink cluster. It enables reliable, asynchronous communication between JobManagers, TaskManagers, and client applications across the cluster network.
The main RPC service interface that manages remote communication endpoints and connections.
public interface RpcService extends AutoCloseable {
<C extends RpcGateway> CompletableFuture<C> connect(String address, Class<C> clazz);
<F extends Serializable, C extends FencedRpcGateway<F>> CompletableFuture<C> connect(
String address, F fencingToken, Class<C> clazz);
void stopServer(RpcEndpoint endpoint);
CompletableFuture<Void> stopService();
Executor getExecutor();
ScheduledExecutor getScheduledExecutor();
void executeRunnable(Runnable runnable);
void execute(Runnable runnable);
<T> CompletableFuture<T> execute(Callable<T> callable);
ScheduledFuture<?> scheduleRunnable(Runnable runnable, long delay, TimeUnit unit);
}Abstract base class for all RPC endpoints in the Flink cluster. Components extend this class to expose RPC interfaces.
public abstract class RpcEndpoint implements AutoCloseable {
protected RpcEndpoint(RpcService rpcService);
protected RpcEndpoint(RpcService rpcService, String endpointId);
public void start() throws Exception;
public CompletableFuture<Void> closeAsync();
@Override
public final void close() throws Exception;
protected final String getAddress();
protected final String getEndpointId();
protected final RpcService getRpcService();
protected final <C extends RpcGateway> CompletableFuture<C> connectTo(String address, Class<C> clazz);
protected final <F extends Serializable, C extends FencedRpcGateway<F>> CompletableFuture<C> connectTo(
String address, F fencingToken, Class<C> clazz);
protected final void scheduleRunAsync(Runnable runnable, long delay, TimeUnit timeUnit);
protected final ScheduledFuture<?> scheduleRunAsync(Runnable runnable, Time delay);
protected final <V> CompletableFuture<V> callAsync(Callable<V> callable, Time timeout);
protected void onStart() throws Exception;
protected CompletableFuture<Void> onStop();
protected void validateRunsInMainThread();
protected void runAsync(Runnable runnable);
}Base interface for all RPC gateway implementations. Gateways provide client-side access to remote RPC endpoints.
/**
 * Client-side handle of a (possibly remote) RPC endpoint.
 */
public interface RpcGateway {

    /** @return the address of the endpoint behind this gateway */
    String getAddress();

    /** @return the hostname of the endpoint behind this gateway */
    String getHostname();
}

// Extended RPC gateway interface that includes fencing tokens for leader election scenarios.
public interface FencedRpcGateway<F extends Serializable> extends RpcGateway {
F getFencingToken();
}Interface representing the server-side RPC endpoint that can be stopped and provides address information.
import java.util.concurrent.CompletableFuture;

/**
 * Server side of an RPC endpoint: it can be started and closed and exposes its address, port
 * and a termination future.
 */
public interface RpcServer extends AutoCloseable {

    /** Starts the server. */
    void start() throws Exception;

    /** Closes the server. */
    @Override
    void close() throws Exception;

    /** @return the address of this server */
    String getAddress();

    /** @return the port of this server */
    int getPort();

    /** @return future that completes when the server has terminated */
    CompletableFuture<Void> getTerminationFuture();
}

// Annotation to mark methods as RPC-callable with timeout specifications.
/**
 * Marks a method as RPC-callable, optionally with a method-specific timeout.
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcMethod {
    /**
     * Timeout for the RPC call in milliseconds.
     *
     * <p>The default {@code -1} presumably means "no method-specific timeout"; confirm against the
     * component that reads this annotation.
     */
    long timeout() default -1L;
}

// Annotation to specify timeout for RPC calls at the parameter level.
/**
 * Marker for the parameter of an RPC gateway method that carries the call's timeout.
 */
@Target(ElementType.PARAMETER)
@Retention(RetentionPolicy.RUNTIME)
public @interface RpcTimeout {
    // Marker annotation for timeout parameters
}

// Base exception class for RPC-related failures.
public class RpcException extends FlinkException {
public RpcException(String message);
public RpcException(String message, Throwable cause);
}Exception thrown when RPC connection establishment fails.
public class RpcConnectionException extends RpcException {
public RpcConnectionException(String message);
public RpcConnectionException(String message, Throwable cause);
public RpcConnectionException(String targetAddress, Class<?> rpcGatewayClass, Throwable cause);
public String getTargetAddress();
public Class<?> getRpcGatewayClass();
}Runtime exception for RPC failures that don't require explicit handling.
public class RpcRuntimeException extends FlinkRuntimeException {
public RpcRuntimeException(String message);
public RpcRuntimeException(String message, Throwable cause);
}Utility class providing factory methods and helper functions for RPC services.
public class RpcServiceUtils {
public static RpcService createRpcService(String hostname, int port, Configuration configuration) throws Exception;
public static RpcService createRpcService(String hostname, int port, Configuration configuration,
HighAvailabilityServices highAvailabilityServices) throws Exception;
public static int getRandomPort();
public static String createWildcardAddress();
public static String getHostname(RpcService rpcService);
public static CompletableFuture<Void> terminateRpcEndpoint(RpcEndpoint rpcEndpoint, Time timeout);
public static CompletableFuture<Void> terminateRpcService(RpcService rpcService, Time timeout);
public static CompletableFuture<Void> terminateRpcServices(Time timeout, RpcService... rpcServices);
public static <T extends RpcGateway> T getSynchronousRpcGateway(
T rpcGateway,
Class<T> rpcGatewayClass,
Time timeout
);
}Configuration options for customizing RPC behavior and networking.
/**
 * Configuration options for RPC behavior and networking.
 *
 * <p>Options are declared with the typed builders ({@code stringType()}, {@code intType()}, ...);
 * the untyped {@code defaultValue(T)} builder is deprecated and produces the same
 * {@code ConfigOption<T>}.
 */
public class RpcOptions {

    /** Key {@code rpc.bind-address}; default: empty string. */
    public static final ConfigOption<String> RPC_BIND_ADDRESS =
            key("rpc.bind-address").stringType().defaultValue("");

    /** Key {@code rpc.port}; default {@code 0} (presumably "any free port" — confirm). */
    public static final ConfigOption<Integer> RPC_PORT =
            key("rpc.port").intType().defaultValue(0);

    /** Key {@code rpc.ask-timeout}; default 10 seconds. */
    public static final ConfigOption<Duration> RPC_ASK_TIMEOUT =
            key("rpc.ask-timeout").durationType().defaultValue(Duration.ofSeconds(10));

    /** Key {@code rpc.lookup-timeout}; default 10 seconds. */
    public static final ConfigOption<Duration> RPC_LOOKUP_TIMEOUT =
            key("rpc.lookup-timeout").durationType().defaultValue(Duration.ofSeconds(10));

    /** Key {@code rpc.connect.retries}; default 5. */
    public static final ConfigOption<Integer> RPC_CONNECT_RETRIES =
            key("rpc.connect.retries").intType().defaultValue(5);

    /** Key {@code rpc.connect.retry-delay}; default 1 second. */
    public static final ConfigOption<Duration> RPC_CONNECT_RETRY_DELAY =
            key("rpc.connect.retry-delay").durationType().defaultValue(Duration.ofSeconds(1));

    /** Key {@code rpc.ssl.enabled}; default {@code false}. */
    public static final ConfigOption<Boolean> RPC_SSL_ENABLED =
            key("rpc.ssl.enabled").booleanType().defaultValue(false);
}

import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcMethod;
// Define the RPC gateway interface
/**
 * Client-side RPC interface of a TaskManager.
 *
 * <p>Every method takes the call timeout as its last parameter, marked with {@link RpcTimeout}.
 */
public interface TaskManagerGateway extends RpcGateway {

    /**
     * Submits the task described by {@code tdd}.
     *
     * @param tdd deployment descriptor of the task
     * @param timeout RPC timeout
     * @return future acknowledging the submission
     */
    CompletableFuture<Acknowledge> submitTask(
            TaskDeploymentDescriptor tdd,
            @RpcTimeout Time timeout);

    /**
     * Cancels the task execution identified by {@code executionAttemptID}.
     *
     * @param executionAttemptID execution attempt to cancel
     * @param timeout RPC timeout
     * @return future acknowledging the cancellation
     */
    CompletableFuture<Acknowledge> cancelTask(
            ExecutionAttemptID executionAttemptID,
            @RpcTimeout Time timeout);

    /**
     * Requests information about the task executor.
     *
     * @param timeout RPC timeout
     * @return future yielding the task executor information
     */
    CompletableFuture<TaskExecutorInfo> requestTaskExecutorInfo(
            @RpcTimeout Time timeout);
}
// Implement the RPC endpoint
/**
 * RPC endpoint that exposes a {@code TaskManager} through {@link TaskManagerGateway}.
 *
 * <p>Each gateway call is delegated to the wrapped {@code TaskManager} on the executor returned by
 * {@code getRpcService().getExecutor()}; the {@code timeout} arguments are not used on this side.
 */
public class TaskManagerRpcEndpoint extends RpcEndpoint implements TaskManagerGateway {

    private final TaskManager taskManager;

    /**
     * @param rpcService service hosting this endpoint
     * @param taskManager task manager to which every call is delegated
     */
    public TaskManagerRpcEndpoint(RpcService rpcService, TaskManager taskManager) {
        super(rpcService, "TaskManager");
        this.taskManager = taskManager;
    }

    @Override
    protected void onStart() throws Exception {
        System.out.println("TaskManager RPC endpoint started at: " + getAddress());
    }

    @Override
    @RpcMethod(timeout = 30000L) // 30 second timeout
    public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) {
        return acknowledgeAsync(() -> taskManager.submitTask(tdd));
    }

    @Override
    @RpcMethod(timeout = 10000L) // 10 second timeout
    public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
        return acknowledgeAsync(() -> taskManager.cancelTask(executionAttemptID));
    }

    @Override
    @RpcMethod(timeout = 5000L) // 5 second timeout
    public CompletableFuture<TaskExecutorInfo> requestTaskExecutorInfo(Time timeout) {
        return CompletableFuture.supplyAsync(
                () -> taskManager.getTaskExecutorInfo(), getRpcService().getExecutor());
    }

    @Override
    protected CompletableFuture<Void> onStop() {
        System.out.println("TaskManager RPC endpoint stopping");
        return CompletableFuture.completedFuture(null);
    }

    /** A task-manager operation that may throw a checked exception. */
    @FunctionalInterface
    private interface TaskManagerAction {
        void run() throws Exception;
    }

    /**
     * Runs {@code action} on the RPC service's executor.
     *
     * @return future completed with {@link Acknowledge#get()} on success, or exceptionally with a
     *     {@link CompletionException} wrapping whatever the action threw
     */
    private CompletableFuture<Acknowledge> acknowledgeAsync(TaskManagerAction action) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                action.run();
                return Acknowledge.get();
            } catch (Exception e) {
                throw new CompletionException(e);
            }
        }, getRpcService().getExecutor());
    }
}

import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcServiceUtils;
import org.apache.flink.configuration.Configuration;
// Create RPC service configuration
Configuration config = new Configuration();
config.setString("rpc.bind-address", "localhost");
config.setInteger("rpc.port", 6123);
config.setString("rpc.ask-timeout", "10 s");

// Create RPC service
RpcService rpcService = RpcServiceUtils.createRpcService("localhost", 6123, config);
// Declared outside the try so the finally block can close it; stays null if construction fails.
TaskManagerRpcEndpoint taskManagerEndpoint = null;
try {
    // Start TaskManager RPC endpoint
    TaskManager taskManager = new TaskManager();
    taskManagerEndpoint = new TaskManagerRpcEndpoint(rpcService, taskManager);
    taskManagerEndpoint.start();
    System.out.println("TaskManager available at: " + taskManagerEndpoint.getAddress());
    // Keep service running
    Thread.sleep(60000);
} finally {
    // Clean shutdown: close the endpoint first, then stop the service hosting it.
    // The nested finally stops the service even if closing the endpoint fails.
    try {
        if (taskManagerEndpoint != null) {
            taskManagerEndpoint.closeAsync().get();
        }
    } finally {
        rpcService.stopService().get();
    }
}

import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.concurrent.FutureUtils;
/**
 * Example client that connects to a TaskManager and submits one task per job vertex.
 */
public class JobManagerClient {

    /** RPC timeout applied to every {@code submitTask} call. */
    private static final Time SUBMIT_TIMEOUT = Time.seconds(30);

    private final RpcService rpcService;

    public JobManagerClient(RpcService rpcService) {
        this.rpcService = rpcService;
    }

    /**
     * Connects to the TaskManager at {@code taskManagerAddress} and submits a task for every vertex
     * of {@code jobGraph}.
     *
     * <p>Fire-and-forget: the method returns immediately, and the outcome is only reported on
     * stdout/stderr.
     *
     * @param taskManagerAddress RPC address of the TaskManager
     * @param jobGraph job whose vertices are turned into tasks
     */
    public void connectAndSubmitJob(String taskManagerAddress, JobGraph jobGraph) {
        // Connect to remote TaskManager
        CompletableFuture<TaskManagerGateway> connectionFuture =
                rpcService.connect(taskManagerAddress, TaskManagerGateway.class);

        connectionFuture
                .thenCompose(taskManagerGateway -> submitAllTasks(taskManagerGateway, jobGraph))
                .whenComplete((result, throwable) -> {
                    if (throwable != null) {
                        System.err.println("Failed to submit tasks: " + failureMessage(throwable));
                    } else {
                        System.out.println("All tasks submitted successfully");
                    }
                });
    }

    /** Submits one task per vertex and returns a future that completes when all submissions have. */
    private CompletableFuture<Void> submitAllTasks(TaskManagerGateway taskManagerGateway, JobGraph jobGraph) {
        System.out.println("Connected to TaskManager at: " + taskManagerGateway.getAddress());
        List<CompletableFuture<Acknowledge>> taskFutures = new ArrayList<>();
        for (JobVertex vertex : jobGraph.getVertices()) {
            TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(vertex);
            taskFutures.add(taskManagerGateway.submitTask(tdd, SUBMIT_TIMEOUT));
        }
        // Wait for all tasks to be submitted
        return FutureUtils.waitForAll(taskFutures);
    }

    /**
     * Describes the root failure of a future chain.
     *
     * <p>Dependent stages wrap failures in {@code CompletionException}, whose own message is just
     * the wrapped exception's {@code toString()}. The wrapper is stripped so the real cause is
     * reported, and {@code toString()} is used as a fallback when the cause has no message.
     */
    private static String failureMessage(Throwable throwable) {
        Throwable cause = throwable;
        while (cause instanceof java.util.concurrent.CompletionException && cause.getCause() != null) {
            cause = cause.getCause();
        }
        String message = cause.getMessage();
        return message != null ? message : cause.toString();
    }

    private TaskDeploymentDescriptor createTaskDeploymentDescriptor(JobVertex vertex) {
        // Create task deployment descriptor from job vertex
        return new TaskDeploymentDescriptor(/* ... */);
    }
}

import org.apache.flink.runtime.rpc.FencedRpcGateway;
import java.util.UUID;
// Define fenced RPC gateway for leader election scenarios
/**
 * Fenced RPC interface of the ResourceManager, using {@link UUID} leader session IDs as fencing
 * tokens.
 */
public interface ResourceManagerGateway extends FencedRpcGateway<UUID> {

    /**
     * Registers a TaskManager with the ResourceManager.
     *
     * @param leaderSessionId leader session the caller believes to be current
     * @param registration registration details of the TaskManager
     * @param timeout RPC timeout
     * @return future yielding the registration response
     */
    CompletableFuture<RegistrationResponse> registerTaskManager(
            UUID leaderSessionId, TaskManagerRegistration registration, @RpcTimeout Time timeout);
}
// Implement fenced RPC endpoint
/**
 * Endpoint implementing {@link ResourceManagerGateway}; registrations are fenced by the current
 * leader session ID.
 *
 * <p>Leadership state is only mutated through {@code runAsync}. Registrations are validated and
 * processed through {@code callAsync} rather than on the RPC service's shared executor, so a
 * leadership change cannot slip in between the fencing check and the processing.
 * NOTE(review): this relies on {@code runAsync} and {@code callAsync} executing in the same
 * (main) thread of the endpoint; confirm for this API.
 */
public class ResourceManagerRpcEndpoint extends RpcEndpoint implements ResourceManagerGateway {

    /** Current leader session ID; {@code null} before election and after revocation. */
    private volatile UUID leaderSessionId;

    /** Whether this endpoint currently holds leadership. */
    private volatile boolean isLeader = false;

    public ResourceManagerRpcEndpoint(RpcService rpcService) {
        super(rpcService, "ResourceManager");
    }

    /** @return the current leader session ID, or {@code null} when not leader */
    @Override
    public UUID getFencingToken() {
        return leaderSessionId;
    }

    /**
     * Registers a TaskManager if {@code leaderSessionId} matches the current leadership.
     *
     * @return future yielding the registration response, or failing with {@link RpcException}
     *     when this endpoint is not leader or the session ID does not match
     */
    @Override
    @RpcMethod(timeout = 15000L)
    public CompletableFuture<RegistrationResponse> registerTaskManager(
            UUID leaderSessionId,
            TaskManagerRegistration registration,
            Time timeout) {
        return callAsync(() -> {
            // Verify fencing token
            if (!isLeader || !Objects.equals(this.leaderSessionId, leaderSessionId)) {
                throw new RpcException("Invalid leader session ID");
            }
            // Process TaskManager registration
            return processTaskManagerRegistration(registration);
        }, timeout);
    }

    /** Asynchronously installs {@code newLeaderSessionId} as the current leadership. */
    public void becomeLeader(UUID newLeaderSessionId) {
        runAsync(() -> {
            this.leaderSessionId = newLeaderSessionId;
            this.isLeader = true;
            System.out.println("Became leader with session ID: " + newLeaderSessionId);
        });
    }

    /** Asynchronously drops leadership and clears the session ID. */
    public void revokeLeadership() {
        runAsync(() -> {
            this.isLeader = false;
            this.leaderSessionId = null;
            System.out.println("Leadership revoked");
        });
    }

    private RegistrationResponse processTaskManagerRegistration(TaskManagerRegistration registration) {
        // Implementation for processing registration
        return new RegistrationResponse.Success();
    }
}

/**
 * Retries a failed RPC call with exponential back-off.
 *
 * <p>The client owns a single daemon thread used to schedule delayed retries; call
 * {@link #close()} to release it.
 */
public class RobustRpcClient implements AutoCloseable {

    /** Delay before the first retry; doubled for every further retry. */
    private static final long INITIAL_BACKOFF_MILLIS = 1000L;

    private final RpcService rpcService;
    private final ScheduledExecutorService retryExecutor;

    public RobustRpcClient(RpcService rpcService) {
        this.rpcService = rpcService;
        // Daemon thread so that a client that is never closed cannot keep the JVM alive.
        this.retryExecutor = Executors.newScheduledThreadPool(1, runnable -> {
            Thread thread = new Thread(runnable, "rpc-retry");
            thread.setDaemon(true);
            return thread;
        });
    }

    /**
     * Connects to {@code address} and applies {@code rpcCall}, retrying up to {@code maxRetries}
     * times (so at most {@code maxRetries + 1} attempts) with delays of 1s, 2s, 4s, ...
     *
     * @return future with the first successful result, or the last failure once retries are
     *     exhausted (or the client has been closed)
     */
    public <T> CompletableFuture<T> callWithRetry(
            String address,
            Class<? extends RpcGateway> gatewayClass,
            Function<RpcGateway, CompletableFuture<T>> rpcCall,
            int maxRetries) {
        return callWithRetryInternal(address, gatewayClass, rpcCall, maxRetries, 0);
    }

    /**
     * Shuts down the retry scheduler. Retries that are already scheduled still run (the default
     * policy of {@code ScheduledThreadPoolExecutor}); a retry that would need scheduling afterwards
     * fails the call with its last RPC failure instead.
     */
    @Override
    public void close() {
        retryExecutor.shutdown();
    }

    /** Performs attempt number {@code currentAttempt} (0-based) and decides whether to retry. */
    private <T> CompletableFuture<T> callWithRetryInternal(
            String address,
            Class<? extends RpcGateway> gatewayClass,
            Function<RpcGateway, CompletableFuture<T>> rpcCall,
            int maxRetries,
            int currentAttempt) {
        return rpcService.connect(address, gatewayClass)
                .thenCompose(gateway -> rpcCall.apply(gateway))
                .<CompletableFuture<T>>handle((result, throwable) -> {
                    if (throwable == null) {
                        return CompletableFuture.completedFuture(result);
                    }
                    if (currentAttempt >= maxRetries) {
                        CompletableFuture<T> failedFuture = new CompletableFuture<>();
                        failedFuture.completeExceptionally(throwable);
                        return failedFuture;
                    }
                    // maxRetries + 1L: total number of attempts, computed in long to avoid overflow.
                    System.out.println("RPC call failed (attempt " + (currentAttempt + 1) +
                            "/" + (maxRetries + 1L) + "), retrying...");
                    return scheduleRetry(address, gatewayClass, rpcCall, maxRetries, currentAttempt, throwable);
                })
                .thenCompose(Function.identity());
    }

    /**
     * Schedules the attempt following {@code failedAttempt} after its back-off delay and relays its
     * outcome. The returned future always completes, even if scheduling is rejected or the retry
     * throws synchronously.
     */
    private <T> CompletableFuture<T> scheduleRetry(
            String address,
            Class<? extends RpcGateway> gatewayClass,
            Function<RpcGateway, CompletableFuture<T>> rpcCall,
            int maxRetries,
            int failedAttempt,
            Throwable lastFailure) {
        CompletableFuture<T> retryFuture = new CompletableFuture<>();
        try {
            retryExecutor.schedule(() -> {
                try {
                    callWithRetryInternal(address, gatewayClass, rpcCall, maxRetries, failedAttempt + 1)
                            .whenComplete((retryResult, retryThrowable) -> {
                                if (retryThrowable != null) {
                                    retryFuture.completeExceptionally(retryThrowable);
                                } else {
                                    retryFuture.complete(retryResult);
                                }
                            });
                } catch (RuntimeException e) {
                    // E.g. connect() throwing synchronously; without this the future would never complete.
                    retryFuture.completeExceptionally(e);
                }
            }, backoffMillis(failedAttempt), TimeUnit.MILLISECONDS);
        } catch (java.util.concurrent.RejectedExecutionException rejected) {
            // The client has been closed: stop retrying and report the last RPC failure.
            retryFuture.completeExceptionally(lastFailure);
        }
        return retryFuture;
    }

    /**
     * Returns {@code INITIAL_BACKOFF_MILLIS * 2^attempt}, saturating at {@code Long.MAX_VALUE}
     * instead of overflowing into a negative (immediate) delay.
     */
    private static long backoffMillis(int attempt) {
        if (attempt >= Long.SIZE - 1) {
            return Long.MAX_VALUE;
        }
        try {
            return Math.multiplyExact(INITIAL_BACKOFF_MILLIS, 1L << attempt);
        } catch (ArithmeticException overflow) {
            return Long.MAX_VALUE;
        }
    }
}

/**
 * Tracks RPC services and endpoints created through it and shuts them down together.
 */
public class ClusterRpcManager {
    private final List<RpcService> rpcServices = new ArrayList<>();
    private final List<RpcEndpoint> rpcEndpoints = new ArrayList<>();

    /** Creates an RPC service and registers it for {@link #shutdown()}. */
    public RpcService createRpcService(String hostname, int port, Configuration config) throws Exception {
        RpcService rpcService = RpcServiceUtils.createRpcService(hostname, port, config);
        rpcServices.add(rpcService);
        return rpcService;
    }

    /** Starts {@code endpoint} and, if starting succeeds, registers it for {@link #shutdown()}. */
    public <T extends RpcEndpoint> T startRpcEndpoint(T endpoint) throws Exception {
        endpoint.start();
        rpcEndpoints.add(endpoint);
        return endpoint;
    }

    /**
     * Closes all registered endpoints, then stops all registered services.
     *
     * <p>Services are stopped even if closing an endpoint fails; the first failure is rethrown,
     * with a later service failure attached as suppressed. Both registries are cleared, so a second
     * call does not close the same objects again.
     *
     * @throws Exception the first failure encountered while closing endpoints or stopping services
     */
    public void shutdown() throws Exception {
        Exception failure = null;
        // Stop all endpoints first
        try {
            awaitAll(rpcEndpoints.stream()
                    .map(RpcEndpoint::closeAsync)
                    .collect(Collectors.toList()));
        } catch (Exception e) {
            failure = e;
        }
        // Then stop all services
        try {
            awaitAll(rpcServices.stream()
                    .map(RpcService::stopService)
                    .collect(Collectors.toList()));
        } catch (Exception e) {
            if (failure == null) {
                failure = e;
            } else {
                failure.addSuppressed(e);
            }
        }
        rpcEndpoints.clear();
        rpcServices.clear();
        if (failure != null) {
            throw failure;
        }
    }

    /** Blocks until every future in {@code futures} has completed; rethrows the combined failure. */
    private static void awaitAll(List<CompletableFuture<Void>> futures) throws Exception {
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
    }
}

/**
 * Adds a client-side timeout to an already-issued RPC call.
 */
public class TimeoutAwareRpcClient {

    /**
     * Returns a future that mirrors {@code rpcCall}, or fails with {@link TimeoutException} if
     * {@code rpcCall} has not completed within {@code timeout}.
     *
     * <p>The timeout does not cancel {@code rpcCall}; it only stops waiting for it. The pending
     * timeout task is cancelled as soon as {@code rpcCall} completes.
     *
     * @param rpcCall the in-flight RPC call
     * @param timeout maximum time to wait
     * @param timeoutExecutor executor used to schedule the timeout
     */
    public <T> CompletableFuture<T> callWithTimeout(
            CompletableFuture<T> rpcCall,
            Duration timeout,
            ScheduledExecutorService timeoutExecutor) {
        CompletableFuture<T> timeoutFuture = new CompletableFuture<>();
        // Set up timeout
        ScheduledFuture<?> timeoutTask = timeoutExecutor.schedule(() -> {
            timeoutFuture.completeExceptionally(
                    new TimeoutException("RPC call timed out after " + timeout)
            );
        }, timeout.toMillis(), TimeUnit.MILLISECONDS);
        // Race between RPC call completion and timeout; whichever completes timeoutFuture first wins.
        rpcCall.whenComplete((result, throwable) -> {
            timeoutTask.cancel(false);
            if (throwable != null) {
                timeoutFuture.completeExceptionally(throwable);
            } else {
                timeoutFuture.complete(result);
            }
        });
        return timeoutFuture;
    }
}

Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-runtime-2-10