External resource driver for GPU management in Apache Flink streaming and batch processing jobs
npx @tessl/cli install tessl/maven-org-apache-flink--flink-external-resource-gpu@2.1.0
The Flink External Resource GPU Driver provides GPU resource management capabilities for Apache Flink streaming and batch processing jobs. It implements Flink's ExternalResourceDriver interface to enable discovery, allocation, and management of GPU resources across cluster nodes using configurable discovery scripts.
Maven coordinates: groupId org.apache.flink and artifactId flink-external-resource-gpu
import org.apache.flink.externalresource.gpu.GPUDriverFactory;
import org.apache.flink.externalresource.gpu.GPUDriverOptions;
import org.apache.flink.externalresource.gpu.GPUInfo;
import org.apache.flink.configuration.Configuration;

import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.externalresource.gpu.GPUDriverFactory;
import org.apache.flink.externalresource.gpu.GPUDriverOptions;
import java.util.Set;

// Configure GPU discovery
Configuration config = new Configuration();
config.set(GPUDriverOptions.DISCOVERY_SCRIPT_PATH, "/path/to/gpu-discovery-script.sh");
config.set(GPUDriverOptions.DISCOVERY_SCRIPT_ARG, "--device-type nvidia");

// Create GPU driver through factory
GPUDriverFactory factory = new GPUDriverFactory();
ExternalResourceDriver driver = factory.createExternalResourceDriver(config);

// Discover GPU resources. The interface returns Set<? extends ExternalResourceInfo>,
// so the result cannot be assigned to Set<GPUInfo> without a cast.
Set<? extends ExternalResourceInfo> gpuResources = driver.retrieveResourceInfo(2L); // Request 2 GPUs

// Use GPU information
for (ExternalResourceInfo gpu : gpuResources) {
    // Get GPU device index (GPUInfo always provides "index" property)
    String deviceIndex = gpu.getProperty("index").orElse("unknown");
    System.out.println("Available GPU: " + gpu + " (index " + deviceIndex + ")"); // gpu prints as e.g. "GPU Device(0)"
}
The GPU driver is built around several key components:
Factory for creating GPU driver instances with proper configuration validation.
/**
 * Factory for creating GPU driver instances
 */
public class GPUDriverFactory implements ExternalResourceDriverFactory {
    /**
     * Creates an external resource driver for GPU management
     * @param config Configuration containing GPU discovery settings
     * @return ExternalResourceDriver instance for GPU resources
     * @throws Exception if configuration is invalid or driver creation fails
     */
    public ExternalResourceDriver createExternalResourceDriver(Configuration config) throws Exception;
}
Represents individual GPU device information including device indices and properties.
/**
 * Information container for GPU resource, currently including the GPU index
 * Note: Constructor is package-private, instances created through GPUDriver.retrieveResourceInfo()
 */
public class GPUInfo implements ExternalResourceInfo {
    /**
     * Gets property value by key
     * @param key Property key to retrieve (supports "index")
     * @return Optional containing property value, or empty if key not found
     */
    public Optional<String> getProperty(String key);

    /**
     * Gets all available property keys
     * @return Collection of available property keys (currently only "index")
     */
    public Collection<String> getKeys();

    /**
     * String representation of GPU device
     * @return Formatted string like "GPU Device(0)"
     */
    public String toString();

    /**
     * Hash code based on GPU index
     * @return Hash code for this GPU info
     */
    public int hashCode();

    /**
     * Equality comparison based on GPU index
     * @param obj Object to compare
     * @return true if objects represent same GPU device
     */
    public boolean equals(Object obj);
}
Configuration options for GPU discovery script path and arguments.
/**
 * Configuration options for GPU driver
 */
@PublicEvolving
public class GPUDriverOptions {
    /**
     * Configuration option for discovery script path
     * Key: "discovery-script.path"
     * Default: "plugins/external-resource-gpu/nvidia-gpu-discovery.sh" (DEFAULT_FLINK_PLUGINS_DIRS + "/external-resource-gpu/nvidia-gpu-discovery.sh"),
     *          which resolves to /opt/flink/plugins/external-resource-gpu/nvidia-gpu-discovery.sh when FLINK_HOME is /opt/flink
     * Description: Path to GPU discovery script (absolute or relative to FLINK_HOME)
     */
    public static final ConfigOption<String> DISCOVERY_SCRIPT_PATH;

    /**
     * Configuration option for discovery script arguments
     * Key: "discovery-script.args"
     * Default: No default value
     * Description: Arguments passed to the discovery script
     */
    public static final ConfigOption<String> DISCOVERY_SCRIPT_ARG;
}
Core functionality for discovering and retrieving GPU resources through configurable scripts.
/**
 * Driver for GPU resource discovery and management
 * Implements ExternalResourceDriver interface for Flink integration
 * Note: Constructor is package-private, instances created through GPUDriverFactory
 */
class GPUDriver implements ExternalResourceDriver {
    /**
     * Discovers and retrieves GPU resources by executing discovery script
     * @param gpuAmount Number of GPUs to discover (must be > 0)
     * @return Unmodifiable set of GPUInfo objects representing discovered GPUs
     * @throws IllegalArgumentException if gpuAmount <= 0
     * @throws TimeoutException if discovery script times out (10 second limit)
     * @throws FlinkException if discovery script exits with non-zero code
     * @throws FileNotFoundException if discovery script file does not exist
     * @throws IllegalConfigurationException if discovery script path is not configured
     */
    public Set<GPUInfo> retrieveResourceInfo(long gpuAmount) throws Exception;
}
The GPU driver uses a 10-second timeout for discovery script execution (defined by private constant DISCOVERY_SCRIPT_TIMEOUT_MS = 10000L) and expects GPU device indices to be identified by the "index" property key. The discovery script execution includes comprehensive error handling and logging for debugging script execution issues.
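The timeout behavior described above can be illustrated with plain JDK process handling. This is a minimal sketch, not the driver's actual implementation: the class name ScriptTimeoutSketch and the method runWithTimeout are hypothetical, and unchecked exceptions stand in for the TimeoutException and FlinkException listed in the Javadoc purely to keep the example short.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

// Hypothetical helper; not part of flink-external-resource-gpu.
public class ScriptTimeoutSketch {

    /** Mirrors the 10-second limit described in the text. */
    static final long DISCOVERY_SCRIPT_TIMEOUT_MS = 10_000L;

    /**
     * Runs a command, waits at most timeoutMs for it to exit, and returns the
     * first line of its stdout, trimmed. Assumes the output is short enough to
     * fit in the pipe buffer, since stdout is only read after the process exits.
     */
    static String runWithTimeout(long timeoutMs, String... command) {
        Process process;
        try {
            process = new ProcessBuilder(command).start();
        } catch (IOException e) {
            throw new UncheckedIOException("Could not start script", e);
        }
        try {
            // Give up once the deadline passes instead of blocking forever.
            if (!process.waitFor(timeoutMs, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException(
                        "Script did not finish within " + timeoutMs + " ms");
            }
            // A non-zero exit code is treated as a failed discovery.
            if (process.exitValue() != 0) {
                throw new IllegalStateException(
                        "Script exited with code " + process.exitValue());
            }
            try (BufferedReader stdout = new BufferedReader(
                    new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
                String line = stdout.readLine();
                return line == null ? "" : line.trim();
            }
        } catch (IOException e) {
            throw new UncheckedIOException("Could not read script output", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for script", e);
        } finally {
            // No-op if the process already exited; kills it after a timeout.
            process.destroyForcibly();
        }
    }

    public static void main(String[] args) {
        // Stand-in for a discovery script: a shell one-liner that prints two indices.
        System.out.println(runWithTimeout(DISCOVERY_SCRIPT_TIMEOUT_MS, "sh", "-c", "echo 0,1"));
    }
}
```

The example in main assumes a POSIX shell is available. Destroying the process in a finally block keeps a hung script from outliving the call.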
Logging behavior:
// External dependencies from flink-core
interface ExternalResourceDriver {
    Set<? extends ExternalResourceInfo> retrieveResourceInfo(long amount) throws Exception;
}

interface ExternalResourceDriverFactory {
    ExternalResourceDriver createExternalResourceDriver(Configuration config) throws Exception;
}

interface ExternalResourceInfo {
    Optional<String> getProperty(String key);
    Collection<String> getKeys();
}

// Configuration types
class Configuration {
    <T> T get(ConfigOption<T> option);
    <T> Configuration set(ConfigOption<T> option, T value); // returns this for chaining
}

class ConfigOption<T> {
    String key();
}
The GPU driver throws specific exceptions for different error conditions:
Configuration and script validation during driver initialization:
Discovery script integration expects:
gpuAmount and optional args
The driver integrates with external discovery scripts to detect GPU hardware:
# Example script execution (command format: <script_path> <gpuAmount> <args>)
/path/to/discovery-script.sh 2 --device-type nvidia
# Expected output format (comma-separated indices on single line)
0,1
# If no GPUs found, script should output empty string or just whitespace
The discovery script should:
The driver executes the script using Runtime.exec() with command format: <script_absolute_path> <gpuAmount> <args>
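To make the script contract concrete, the sketch below builds a command line in the documented format and parses the comma-separated output into device indices. It is an illustration under stated assumptions, not the driver's code: DiscoveryScriptSketch, buildCommand, and parseIndices are hypothetical names, and leaving out the arguments when none are configured is an assumption about how unset args might be handled.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical helper; not part of flink-external-resource-gpu.
public class DiscoveryScriptSketch {

    /** Builds "<script_path> <gpuAmount> <args>", leaving out args when none are set (assumption). */
    static String buildCommand(String scriptPath, long gpuAmount, String args) {
        String command = scriptPath + " " + gpuAmount;
        return (args == null || args.trim().isEmpty()) ? command : command + " " + args.trim();
    }

    /** Splits one line of comma-separated indices, skipping blank entries. */
    static Set<String> parseIndices(String scriptOutput) {
        if (scriptOutput == null) {
            return Collections.emptySet();
        }
        // LinkedHashSet keeps the order in which the script reported the devices.
        Set<String> indices = new LinkedHashSet<>();
        for (String token : scriptOutput.split(",")) {
            String index = token.trim();
            if (!index.isEmpty()) {
                indices.add(index);
            }
        }
        return Collections.unmodifiableSet(indices);
    }

    public static void main(String[] args) {
        System.out.println(buildCommand("/path/to/discovery-script.sh", 2, "--device-type nvidia"));
        System.out.println(parseIndices("0,1"));  // two devices reported
        System.out.println(parseIndices("   "));  // whitespace only: no GPUs found
    }
}
```

Returning an unmodifiable set matches the documented return type of retrieveResourceInfo. Treating whitespace-only output as an empty result follows the "no GPUs found" convention from the example script output.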