Apache Flink external resources management framework that provides GPU resource discovery and allocation capabilities for distributed stream and batch processing applications
—
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Pending
The risk profile of this skill
Apache Flink external resources management framework that provides GPU resource discovery and allocation capabilities for distributed stream and batch processing applications. The framework enables GPU-aware task scheduling and resource allocation through configurable discovery scripts and extensible APIs.
org.apache.flink:flink-external-resources:2.1.0 (parent) / org.apache.flink:flink-external-resource-gpu:2.1.0 (GPU module)
pom.xml:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-external-resource-gpu</artifactId>
<version>2.1.0</version>
</dependency>
Module Structure:
flink-external-resources - Provides the external resource framework
flink-external-resource-gpu - Implements GPU-specific resource discovery
// GPU-specific classes
import org.apache.flink.externalresource.gpu.GPUDriverFactory;
import org.apache.flink.externalresource.gpu.GPUInfo;
import org.apache.flink.externalresource.gpu.GPUDriverOptions;
// Flink core external resource interfaces
import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
import org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory;
import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
// Configuration and utilities
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
// Standard Java imports
import java.util.Set;
import java.util.Collection;
import java.util.Optional;
import java.io.FileNotFoundException;
import java.util.concurrent.TimeoutException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.externalresource.gpu.GPUDriverFactory;
import org.apache.flink.externalresource.gpu.GPUDriverOptions;
import org.apache.flink.externalresource.gpu.GPUInfo;
import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
import java.util.Set;
import java.util.Optional;
// Configure GPU discovery with default NVIDIA script
Configuration config = new Configuration();
// Using default script (optional - can omit if default is acceptable)
config.set(GPUDriverOptions.DISCOVERY_SCRIPT_PATH,
"plugins/external-resource-gpu/nvidia-gpu-discovery.sh");
// Enable coordination mode to prevent conflicts
config.set(GPUDriverOptions.DISCOVERY_SCRIPT_ARG, "--enable-coordination-mode");
// Create GPU driver via factory
GPUDriverFactory factory = new GPUDriverFactory();
ExternalResourceDriver driver = factory.createExternalResourceDriver(config);
// Discover available GPU resources
Set<? extends ExternalResourceInfo> gpuResources = driver.retrieveResourceInfo(2); // Request 2 GPUs
// Access GPU information
for (ExternalResourceInfo gpu : gpuResources) {
// Downcast to GPUInfo, the concrete type returned by the GPU driver
GPUInfo gpuInfo = (GPUInfo) gpu;
// Access GPU index property
Optional<String> index = gpu.getProperty("index");
if (index.isPresent()) {
System.out.println("Allocated GPU index: " + index.get());
}
// Get all available properties
System.out.println("Available properties: " + gpu.getKeys());
// String representation
System.out.println("GPU: " + gpu.toString()); // Output: "GPU Device(0)", "GPU Device(1)", etc.
}
// Example with custom script and arguments
Configuration customConfig = new Configuration();
customConfig.set(GPUDriverOptions.DISCOVERY_SCRIPT_PATH, "/opt/custom-gpu-discovery.sh");
customConfig.set(GPUDriverOptions.DISCOVERY_SCRIPT_ARG, "--min-memory=8GB --cuda-version=11.0");
The framework is built around the Flink external resource system with these key components:
Factory for creating GPU resource drivers through Flink's external resource system.
/**
* Factory for creating {@link GPUDriver} instances.
* Loaded automatically by Flink via Java Service Loader mechanism.
*/
public class GPUDriverFactory implements ExternalResourceDriverFactory {
/**
* Creates a GPU driver with the specified configuration
* @param config Configuration containing discovery script settings
* @return ExternalResourceDriver for GPU resource discovery
* @throws Exception if configuration is invalid or script setup fails
*/
@Override
public ExternalResourceDriver createExternalResourceDriver(Configuration config)
throws Exception;
}
The core implementation that executes discovery scripts to find available GPU resources.
/**
* Driver takes the responsibility to discover GPU resources and provide the GPU resource
* information. It retrieves the GPU information by executing a user-defined discovery script.
*
* Note: This class is package-private and should only be created via GPUDriverFactory
*/
class GPUDriver implements ExternalResourceDriver {
/**
* Constructs GPUDriver with configuration validation and script setup
* @param config Configuration containing discovery script path and arguments
* @throws IllegalConfigurationException if script path is not configured
* @throws FileNotFoundException if discovery script file not found
* @throws FlinkException if script file exists but is not executable
*/
GPUDriver(Configuration config) throws Exception;
/**
* Retrieve GPU resource information by executing the configured discovery script
* @param gpuAmount Number of required GPU resources (must be > 0)
* @return Set of GPUInfo objects representing discovered GPU resources
* @throws IllegalArgumentException if gpuAmount <= 0
* @throws Exception if script execution fails
* @throws TimeoutException if script execution exceeds 10 seconds
* @throws FlinkException if script exits with non-zero code
*/
@Override
public Set<GPUInfo> retrieveResourceInfo(long gpuAmount) throws Exception;
}
Container for GPU resource information with property-based access.
/**
* Information for GPU resource. Currently only including the GPU index.
* Note: Constructor is package-private - instances created by GPUDriver
*/
public class GPUInfo implements ExternalResourceInfo {
/**
* Get the property indicated by the specified key
* @param key of the required property ("index" is supported)
* @return Optional containing the value, or empty if key not found
*/
public Optional<String> getProperty(String key);
/**
* Get all property keys
* @return Collection of all property keys
*/
public Collection<String> getKeys();
/**
* Returns formatted string representation of GPU device
* @return String in format "GPU Device(index)"
*/
public String toString();
/**
* Hash code based on GPU index
* @return int hash code
*/
public int hashCode();
/**
* Equality comparison based on GPU index
* @param obj Object to compare
* @return boolean true if equal GPU indices
*/
public boolean equals(Object obj);
}
Configuration options for GPU resource discovery behavior.
/**
* A collection of all configuration options for GPU driver.
* Uses @Documentation.SuffixOption for automatic key generation.
*/
@PublicEvolving
public class GPUDriverOptions {
/**
* Configuration key: "discovery-script.path"
* Full key pattern: external-resource.<resource_name>.param.discovery-script.path
*
* The path of the discovery script. Can be absolute path or relative to FLINK_HOME.
* Default: plugins/external-resource-gpu/nvidia-gpu-discovery.sh
*/
@Documentation.SuffixOption("external-resource.<resource_name>.param")
public static final ConfigOption<String> DISCOVERY_SCRIPT_PATH =
key("discovery-script.path")
.stringType()
.defaultValue("plugins/external-resource-gpu/nvidia-gpu-discovery.sh")
.withDescription("Path to GPU discovery script");
/**
* Configuration key: "discovery-script.args"
* Full key pattern: external-resource.<resource_name>.param.discovery-script.args
*
* The arguments passed to the discovery script as second parameter.
* No default value - leave unset if script requires no arguments.
*/
@Documentation.SuffixOption("external-resource.<resource_name>.param")
public static final ConfigOption<String> DISCOVERY_SCRIPT_ARG =
key("discovery-script.args")
.stringType()
.noDefaultValue()
.withDescription("Arguments for GPU discovery script");
}
The external resource system is built on these core Flink interfaces:
/**
* Driver which takes the responsibility to manage and provide the information of external resource.
* Drivers are instantiated via an ExternalResourceDriverFactory.
* TaskExecutor retrieves ExternalResourceInfo from the drivers.
*/
@PublicEvolving
public interface ExternalResourceDriver {
/**
* Retrieve the information of the external resources according to the amount.
* @param amount of the required external resources
* @return information set of the required external resources
* @throws Exception if there is something wrong during retrieving
*/
Set<? extends ExternalResourceInfo> retrieveResourceInfo(long amount) throws Exception;
}
/**
* Factory for ExternalResourceDriver. Instantiate a driver with configuration.
* Drivers with factories automatically qualify for plugin loading if the driver jar
* is self-contained and contains a META-INF/services file.
*/
@PublicEvolving
public interface ExternalResourceDriverFactory {
/**
* Construct the ExternalResourceDriver from configuration.
* @param config configuration for this external resource
* @return the driver for this external resource
* @throws Exception if there is something wrong during the creation
*/
ExternalResourceDriver createExternalResourceDriver(Configuration config) throws Exception;
}
/**
* Contains the information of an external resource.
*/
@PublicEvolving
public interface ExternalResourceInfo {
/**
* Get the property indicated by the specified key.
* @param key of the required property
* @return an Optional containing the value, or empty if no value stored under key
*/
Optional<String> getProperty(String key);
/**
* Get all property keys.
* @return collection of all property keys
*/
Collection<String> getKeys();
}
The GPU driver integrates with Flink's configuration system using these key patterns:
// Configuration key pattern for external resources
external-resource.<resource_name>.param.discovery-script.path
external-resource.<resource_name>.param.discovery-script.args
// Example for GPU resources named "gpu"
external-resource.gpu.param.discovery-script.path: "/opt/flink/scripts/nvidia-gpu-discovery.sh"
external-resource.gpu.param.discovery-script.args: "--cuda-version=11.0"
The framework includes a default NVIDIA GPU discovery script:
Script Details:
Location: {FLINK_PLUGINS_DIRS}/external-resource-gpu/nvidia-gpu-discovery.sh
Requirement: nvidia-smi command available in PATH
Dependency: gpu-discovery-common.sh for shared allocation logic
Discovery command: nvidia-smi --query-gpu=index --format=csv,noheader
Script Arguments:
# Basic usage
./nvidia-gpu-discovery.sh <gpu-amount>
# With coordination mode (prevents resource conflicts)
./nvidia-gpu-discovery.sh <gpu-amount> --enable-coordination-mode
# Custom coordination file location
./nvidia-gpu-discovery.sh <gpu-amount> --enable-coordination-mode --coordination-file /custom/path
Internal Process:
Runs nvidia-smi to get available GPU indices
/**
* Configuration-related exceptions thrown during GPUDriver construction
*/
// IllegalConfigurationException
// Thrown when: GPU discovery script path is null, empty, or whitespace-only
// Message: "GPU discovery script ('external-resource.<name>.param.discovery-script.path') is not configured."
// FileNotFoundException
// Thrown when: Discovery script file does not exist at the specified path
// Message: "The gpu discovery script does not exist in path <absolute-path>."
// FlinkException
// Thrown when: Script file exists but is not executable
// Message: "The discovery script <absolute-path> is not executable."
/**
* Runtime exceptions thrown during resource discovery (retrieveResourceInfo)
*/
// IllegalArgumentException
// Thrown when: gpuAmount parameter <= 0
// Message: "The gpuAmount should be positive when retrieving the GPU resource information."
// TimeoutException
// Thrown when: Script execution exceeds 10 seconds (DISCOVERY_SCRIPT_TIMEOUT_MS)
// Message: "The discovery script executed for over 10000 ms."
// FlinkException
// Thrown when: Discovery script exits with non-zero return code
// Message: "Discovery script exit with non-zero return code: <exit-code>."
// Additional: Warning logged with stdout/stderr content
/**
* Discovery script output validation
*/
// Warning logged when: Script produces multiple output lines
// Message: "The output of the discovery script should only contain one single line. Finding <count> lines..."
// Behavior: Only the first line is used; the others are ignored
Custom discovery scripts must follow these requirements:
Script Interface:
Output Validation:
Error Handling:
Example Custom Discovery Script:
#!/bin/bash
# Custom GPU discovery script
GPU_AMOUNT=$1
SCRIPT_ARGS="$2"
# Your custom GPU detection logic here
# Must output comma-separated indices
echo "0,1,3" # Example: allocate GPUs 0, 1, and 3
Script Execution Context:
// GPUDriver executes script as:
String cmd = discoveryScript.getAbsolutePath() + " " + gpuAmount + " " + args;
Process process = Runtime.getRuntime().exec(cmd);
The GPU driver is automatically discovered by Flink through Java Service Loader:
META-INF/services/org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory:
org.apache.flink.externalresource.gpu.GPUDriverFactory
This enables automatic registration with Flink's external resource management system without manual configuration.
The default NVIDIA GPU discovery script supports coordination mode to prevent resource conflicts:
# Usage patterns
./nvidia-gpu-discovery.sh <gpu-amount>
./nvidia-gpu-discovery.sh <gpu-amount> --enable-coordination-mode
./nvidia-gpu-discovery.sh <gpu-amount> --enable-coordination-mode --coordination-file /custom/path
Coordination Features:
Default coordination file: /var/tmp/flink-gpu-coordination
// Configuration for coordination mode
config.set(GPUDriverOptions.DISCOVERY_SCRIPT_ARG, "--enable-coordination-mode --coordination-file /tmp/gpu-coord");
/**
* Configuration system for Flink settings
*/
public class Configuration {
public <T> T get(ConfigOption<T> option);
public <T> Configuration set(ConfigOption<T> option, T value);
}
/**
* Typed configuration option with key, type, and default value
*/
public class ConfigOption<T> {
public String key();
public T defaultValue();
}
/**
* Exception for invalid configuration values
*/
@PublicEvolving
public class IllegalConfigurationException extends RuntimeException {
public IllegalConfigurationException(String message);
public IllegalConfigurationException(String message, Throwable cause);
}
/**
* Base class of all Flink-specific checked exceptions
*/
@Public
public class FlinkException extends Exception {
public FlinkException(String message);
public FlinkException(String message, Throwable cause);
}
/**
* Property key constant for accessing GPU index from GPUInfo
*/
public static final String PROPERTY_KEY_INDEX = "index";
/**
* Script execution timeout in milliseconds (private constant in GPUDriver)
*/
private static final long DISCOVERY_SCRIPT_TIMEOUT_MS = 10000;
/**
* Default discovery script locations and names
*/
public static final String DEFAULT_FLINK_PLUGINS_DIRS = "plugins";
// Default script: {FLINK_PLUGINS_DIRS}/external-resource-gpu/nvidia-gpu-discovery.sh
The GPU driver factory is registered via Service Loader in the JAR file:
File: META-INF/services/org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory
Content:
org.apache.flink.externalresource.gpu.GPUDriverFactory
This file enables automatic discovery and loading by Flink's plugin system without requiring manual registration or configuration changes.