Apache Flink external resources management framework that provides GPU resource discovery and allocation capabilities for distributed stream and batch processing applications
npx @tessl/cli install tessl/maven-org-apache-flink--flink-external-resources@2.1.0

Apache Flink external resources management framework that provides GPU resource discovery and allocation capabilities for distributed stream and batch processing applications. The framework enables GPU-aware task scheduling and resource allocation through configurable discovery scripts and extensible APIs.
org.apache.flink:flink-external-resources:2.1.0 (parent) / org.apache.flink:flink-external-resource-gpu:2.1.0 (GPU module)

pom.xml:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-external-resource-gpu</artifactId>
<version>2.1.0</version>
</dependency>

Module Structure:
- flink-external-resources - Provides the external resource framework
- flink-external-resource-gpu - Implements GPU-specific resource discovery

// GPU-specific classes
import org.apache.flink.externalresource.gpu.GPUDriverFactory;
import org.apache.flink.externalresource.gpu.GPUInfo;
import org.apache.flink.externalresource.gpu.GPUDriverOptions;
// Flink core external resource interfaces
import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
import org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory;
import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
// Configuration and utilities
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
// Standard Java imports
import java.util.Set;
import java.util.Collection;
import java.util.Optional;
import java.io.FileNotFoundException;
import java.util.concurrent.TimeoutException;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.externalresource.gpu.GPUDriverFactory;
import org.apache.flink.externalresource.gpu.GPUDriverOptions;
import org.apache.flink.externalresource.gpu.GPUInfo;
import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
import java.util.Set;
import java.util.Optional;
// Configure GPU discovery with the default NVIDIA script.
Configuration config = new Configuration();
// Setting the path is optional: this value equals the option's default
// (a relative path, resolved against FLINK_HOME).
config.set(GPUDriverOptions.DISCOVERY_SCRIPT_PATH,
        "plugins/external-resource-gpu/nvidia-gpu-discovery.sh");
// Enable coordination mode to prevent GPU allocation conflicts.
config.set(GPUDriverOptions.DISCOVERY_SCRIPT_ARG, "--enable-coordination-mode");

// Create the GPU driver via its factory. Creation throws if the configuration is invalid or the
// discovery script cannot be set up (e.g. path unset, file missing, or not executable).
GPUDriverFactory factory = new GPUDriverFactory();
ExternalResourceDriver driver = factory.createExternalResourceDriver(config);

// Discover GPU resources: runs the discovery script with 2 as the requested amount.
Set<? extends ExternalResourceInfo> gpuResources = driver.retrieveResourceInfo(2);

// Access GPU information through the ExternalResourceInfo interface. The GPU driver's concrete
// element type is GPUInfo, but no cast is needed: getProperty/getKeys/toString cover everything
// GPUInfo exposes, and avoiding the cast keeps this loop valid for any driver.
for (ExternalResourceInfo gpu : gpuResources) {
    // "index" is the only property GPUInfo currently exposes.
    gpu.getProperty("index")
            .ifPresent(index -> System.out.println("Allocated GPU index: " + index));
    // All available property keys.
    System.out.println("Available properties: " + gpu.getKeys());
    // String representation, e.g. "GPU Device(0)", "GPU Device(1)".
    System.out.println("GPU: " + gpu);
}

// Example with a custom script and arguments. The driver splits the argument string on
// whitespace before launching the script, so the script receives these as two separate
// positional parameters after the GPU amount ($2 and $3), not as a single string.
Configuration customConfig = new Configuration();
customConfig.set(GPUDriverOptions.DISCOVERY_SCRIPT_PATH, "/opt/custom-gpu-discovery.sh");
customConfig.set(GPUDriverOptions.DISCOVERY_SCRIPT_ARG, "--min-memory=8GB --cuda-version=11.0");

The framework is built around the Flink external resource system with these key components:
Factory for creating GPU resource drivers through Flink's external resource system.
/**
 * Factory for creating {@link GPUDriver} instances.
 * Loaded automatically by Flink via the Java ServiceLoader mechanism: the class is listed in
 * META-INF/services/org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory.
 */
public class GPUDriverFactory implements ExternalResourceDriverFactory {
    /**
     * Creates a GPU driver with the specified configuration.
     *
     * <p>The options read from {@code config} use the bare key suffixes declared in
     * GPUDriverOptions (e.g. "discovery-script.path").
     * NOTE(review): presumably Flink passes a configuration already scoped to
     * {@code external-resource.<resource_name>.param.} — confirm.
     *
     * @param config Configuration containing discovery script settings
     * @return ExternalResourceDriver for GPU resource discovery
     * @throws Exception if configuration is invalid or script setup fails (presumably propagated
     *     from the GPUDriver constructor: path unset, file missing, or file not executable)
     */
    @Override
    public ExternalResourceDriver createExternalResourceDriver(Configuration config)
            throws Exception;
}

The core implementation that executes discovery scripts to find available GPU resources.
/**
 * Driver takes the responsibility to discover GPU resources and provide the GPU resource
 * information. It retrieves the GPU information by executing a user-defined discovery script.
 *
 * <p>The script is launched as {@code <absolute-script-path> <gpuAmount> <args>}, where args is
 * the discovery-script.args option. It must finish within 10 seconds and exit with code 0. Its
 * first stdout line is parsed as comma-separated GPU indices; any additional lines are ignored
 * and a warning is logged.
 *
 * Note: This class is package-private and should only be created via GPUDriverFactory
 */
class GPUDriver implements ExternalResourceDriver {
    /**
     * Constructs GPUDriver with configuration validation and script setup.
     * A relative script path is resolved against FLINK_HOME.
     * @param config Configuration containing discovery script path and arguments
     * @throws IllegalConfigurationException if the script path is null, empty, or whitespace-only
     * @throws FileNotFoundException if the discovery script file does not exist
     * @throws FlinkException if the script file exists but is not executable
     */
    GPUDriver(Configuration config) throws Exception;
    /**
     * Retrieve GPU resource information by executing the configured discovery script.
     * @param gpuAmount Number of required GPU resources (must be > 0); passed to the script as
     *     its first argument
     * @return Set of GPUInfo objects representing discovered GPU resources (presumably one per
     *     distinct index reported by the script)
     * @throws IllegalArgumentException if gpuAmount <= 0
     * @throws TimeoutException if script execution exceeds 10 seconds
     * @throws FlinkException if the script exits with a non-zero code (its stdout/stderr are
     *     logged as a warning)
     * @throws Exception if script execution fails for any other reason
     */
    @Override
    public Set<GPUInfo> retrieveResourceInfo(long gpuAmount) throws Exception;
}

Container for GPU resource information with property-based access.
/**
 * Information for a GPU resource. Currently it only includes the GPU index, exposed under the
 * property key "index".
 * Note: Constructor is package-private - instances created by GPUDriver
 */
public class GPUInfo implements ExternalResourceInfo {
    /**
     * Get the property indicated by the specified key.
     * @param key of the required property ("index" is the only supported key)
     * @return Optional containing the value, or empty if the key is not supported
     */
    public Optional<String> getProperty(String key);
    /**
     * Get all property keys.
     * @return Collection of all property keys (currently only "index")
     */
    public Collection<String> getKeys();
    /**
     * Returns a formatted string representation of the GPU device.
     * @return String in the format "GPU Device(index)", e.g. "GPU Device(0)"
     */
    public String toString();
    /**
     * Hash code based on the GPU index, matching the index-based {@link #equals(Object)}.
     * @return int hash code
     */
    public int hashCode();
    /**
     * Equality comparison based on the GPU index.
     * @param obj Object to compare
     * @return true if both objects describe the same GPU index
     */
    public boolean equals(Object obj);
}

Configuration options for GPU resource discovery behavior.
/**
 * A collection of all configuration options for GPU driver.
 * Uses @Documentation.SuffixOption for automatic key generation: each key below is a suffix
 * appended to "external-resource.<resource_name>.param.".
 */
@PublicEvolving
public class GPUDriverOptions {
    /**
     * Configuration key: "discovery-script.path"
     * Full key pattern: external-resource.<resource_name>.param.discovery-script.path
     *
     * The path of the discovery script. It can be an absolute path or a path relative to
     * FLINK_HOME.
     * Default: plugins/external-resource-gpu/nvidia-gpu-discovery.sh
     */
    @Documentation.SuffixOption("external-resource.<resource_name>.param")
    public static final ConfigOption<String> DISCOVERY_SCRIPT_PATH =
            key("discovery-script.path")
                    .stringType()
                    .defaultValue("plugins/external-resource-gpu/nvidia-gpu-discovery.sh")
                    .withDescription("Path to GPU discovery script");
    /**
     * Configuration key: "discovery-script.args"
     * Full key pattern: external-resource.<resource_name>.param.discovery-script.args
     *
     * Extra arguments passed to the discovery script after the GPU amount, which is always the
     * first argument. The driver appends this string to a single command line that is split on
     * whitespace (quotes are not honored). For example, "--a --b" reaches the script as two
     * separate positional parameters ($2 and $3), not as one string in $2.
     * No default value - leave unset if the script requires no arguments.
     * NOTE(review): if the driver concatenates this value into the command unconditionally, an
     * unset value would reach the script as the literal text "null" — confirm against GPUDriver.
     */
    @Documentation.SuffixOption("external-resource.<resource_name>.param")
    public static final ConfigOption<String> DISCOVERY_SCRIPT_ARG =
            key("discovery-script.args")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("Arguments for GPU discovery script");
}

The external resource system is built on these core Flink interfaces:
/**
 * Driver which takes the responsibility to manage and provide the information of external resource.
 * Drivers are instantiated via an ExternalResourceDriverFactory.
 * TaskExecutor retrieves ExternalResourceInfo from the drivers.
 */
@PublicEvolving
public interface ExternalResourceDriver {
    /**
     * Retrieve the information of the external resources according to the amount.
     *
     * <p>Implementations may validate {@code amount}. For example, the GPU driver rejects
     * non-positive amounts with an IllegalArgumentException.
     *
     * @param amount of the required external resources
     * @return information set of the required external resources
     * @throws Exception if there is something wrong during retrieving
     */
    Set<? extends ExternalResourceInfo> retrieveResourceInfo(long amount) throws Exception;
}
/**
 * Factory for ExternalResourceDriver. Instantiate a driver with configuration.
 * Drivers with factories automatically qualify for plugin loading if the driver jar
 * is self-contained and contains a META-INF/services file.
 * That services file is named
 * META-INF/services/org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory
 * and lists the fully qualified class name of the factory implementation.
 */
@PublicEvolving
public interface ExternalResourceDriverFactory {
    /**
     * Construct the ExternalResourceDriver from configuration.
     * @param config configuration for this external resource
     * @return the driver for this external resource
     * @throws Exception if there is something wrong during the creation
     */
    ExternalResourceDriver createExternalResourceDriver(Configuration config) throws Exception;
}
/**
 * Contains the information of an external resource as string-valued properties addressed by key.
 */
@PublicEvolving
public interface ExternalResourceInfo {
    /**
     * Get the property indicated by the specified key.
     * @param key of the required property
     * @return an Optional containing the value, or empty if no value stored under key
     */
    Optional<String> getProperty(String key);
    /**
     * Get all property keys.
     * @return collection of all property keys (presumably exactly the keys for which
     *     {@link #getProperty(String)} returns a value)
     */
    Collection<String> getKeys();
}

The GPU driver integrates with Flink's configuration system using these key patterns:
# Configuration key pattern for external resources
external-resource.<resource_name>.param.discovery-script.path
external-resource.<resource_name>.param.discovery-script.args
# Example for GPU resources named "gpu"
external-resource.gpu.param.discovery-script.path: "/opt/flink/scripts/nvidia-gpu-discovery.sh"
external-resource.gpu.param.discovery-script.args: "--cuda-version=11.0"

The framework includes a default NVIDIA GPU discovery script:
Script Details:
- Location: {FLINK_PLUGINS_DIRS}/external-resource-gpu/nvidia-gpu-discovery.sh
- Requires the nvidia-smi command to be available in PATH
- Uses gpu-discovery-common.sh for shared allocation logic
- Queries GPU indices with: nvidia-smi --query-gpu=index --format=csv,noheader

Script Arguments:
# <gpu-amount> is always the first argument. Any further arguments come from the
# discovery-script.args option, split on whitespace into separate parameters.
# Basic usage
./nvidia-gpu-discovery.sh <gpu-amount>
# With coordination mode (prevents resource conflicts)
./nvidia-gpu-discovery.sh <gpu-amount> --enable-coordination-mode
# Custom coordination file location
./nvidia-gpu-discovery.sh <gpu-amount> --enable-coordination-mode --coordination-file /custom/path

Internal Process:
- Uses nvidia-smi to get available GPU indices

/**
* Configuration-related exceptions thrown during GPUDriver construction
*/
// IllegalConfigurationException
// Thrown when: GPU discovery script path is null, empty, or whitespace-only
// Message: "GPU discovery script ('external-resource.<name>.param.discovery-script.path') is not configured."
// FileNotFoundException
// Thrown when: Discovery script file does not exist at the specified path
// Message: "The gpu discovery script does not exist in path <absolute-path>."
// FlinkException
// Thrown when: Script file exists but is not executable
// Message: "The discovery script <absolute-path> is not executable."
/**
* Runtime exceptions thrown during resource discovery (retrieveResourceInfo)
*/
// IllegalArgumentException
// Thrown when: gpuAmount parameter <= 0
// Message: "The gpuAmount should be positive when retrieving the GPU resource information."
// TimeoutException
// Thrown when: Script execution exceeds 10 seconds (DISCOVERY_SCRIPT_TIMEOUT_MS)
// Message: "The discovery script executed for over 10000 ms."
// FlinkException
// Thrown when: Discovery script exits with non-zero return code
// Message: "Discovery script exit with non-zero return code: <exit-code>."
// Additional: Warning logged with stdout/stderr content
/**
* Discovery script output validation
*/
// Warning logged when: Script produces multiple output lines
// Message: "The output of the discovery script should only contain one single line. Finding <count> lines..."
// Behavior: Only first line is used, others ignored

Custom discovery scripts must follow these requirements:
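Script Interface:
- The script is invoked as `<script> <gpu-amount> [args...]`. The configured discovery-script.args string is split on whitespace, so each token arrives as a separate positional parameter after the GPU amount.
Output Validation:
- Print one line of comma-separated GPU indices to stdout. If more lines are printed, only the first is used and a warning is logged.
Error Handling:
- Exit with a non-zero code to signal failure; this surfaces as a FlinkException. The script must finish within 10 seconds, otherwise a TimeoutException is thrown.
Example Custom Discovery Script: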
#!/bin/bash
# Custom GPU discovery script.
# Invoked as: <script> <gpu-amount> [args...]
# The discovery-script.args value is split on whitespace before the script is launched,
# so each token arrives as its own positional parameter ($2, $3, ...), not as one string in $2.
GPU_AMOUNT=$1
shift
SCRIPT_ARGS=("$@")
# Your custom GPU detection logic here (use "$GPU_AMOUNT" and "${SCRIPT_ARGS[@]}").
# Print a single line of comma-separated GPU indices to stdout; only the first line is used.
# Exit with a non-zero code on failure.
echo "0,1,3" # Example: allocate GPUs 0, 1, and 3

Script Execution Context:
// GPUDriver executes script as:
// The command is a single string. Runtime.exec(String) splits it on whitespace without honoring
// quotes, so the script receives gpuAmount as $1 and each whitespace-separated token of args as a
// further positional parameter.
// NOTE(review): a script path containing spaces would also be split into several tokens, and an
// unset (null) args value would be appended as the literal text "null" — confirm in GPUDriver.
String cmd = discoveryScript.getAbsolutePath() + " " + gpuAmount + " " + args;
Process process = Runtime.getRuntime().exec(cmd);

The GPU driver is automatically discovered by Flink through Java Service Loader:
META-INF/services/org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory:
org.apache.flink.externalresource.gpu.GPUDriverFactory

This enables automatic registration with Flink's external resource management system without manual configuration.
The default NVIDIA GPU discovery script supports coordination mode to prevent resource conflicts:
# Usage patterns
# <gpu-amount> is always the first argument; the coordination flags come from the
# discovery-script.args option, split on whitespace.
./nvidia-gpu-discovery.sh <gpu-amount>
./nvidia-gpu-discovery.sh <gpu-amount> --enable-coordination-mode
./nvidia-gpu-discovery.sh <gpu-amount> --enable-coordination-mode --coordination-file /custom/path

Coordination Features:
- Default coordination file: /var/tmp/flink-gpu-coordination (override with --coordination-file)

// Configuration for coordination mode
// The argument string is split on whitespace, so the script receives three parameters after
// the GPU amount: --enable-coordination-mode, --coordination-file, /tmp/gpu-coord.
config.set(GPUDriverOptions.DISCOVERY_SCRIPT_ARG, "--enable-coordination-mode --coordination-file /tmp/gpu-coord");

/**
 * Configuration system for Flink settings: a store of values addressed by typed ConfigOptions.
 */
public class Configuration {
    /**
     * Returns the value stored for the given option.
     * NOTE(review): presumably falls back to the option's default value when unset — confirm.
     */
    public <T> T get(ConfigOption<T> option);
    /**
     * Stores a value for the given option.
     * @return a Configuration, presumably this instance to allow call chaining — confirm
     */
    public <T> Configuration set(ConfigOption<T> option, T value);
}
/**
 * Typed configuration option with key, type, and default value
 */
public class ConfigOption<T> {
    /** Returns the option's key, e.g. "discovery-script.path". */
    public String key();
    /**
     * Returns the option's default value.
     * NOTE(review): presumably null for options declared with noDefaultValue() — confirm.
     */
    public T defaultValue();
}
/**
 * Exception for invalid configuration values.
 * Unchecked (extends RuntimeException). GPUDriver throws it when the discovery script path is
 * null, empty, or whitespace-only.
 */
@PublicEvolving
public class IllegalConfigurationException extends RuntimeException {
    /** @param message description of the configuration problem */
    public IllegalConfigurationException(String message);
    /**
     * @param message description of the configuration problem
     * @param cause underlying cause
     */
    public IllegalConfigurationException(String message, Throwable cause);
}
/**
 * Base class of all Flink-specific checked exceptions.
 * GPUDriver throws it when the discovery script is not executable or exits with a non-zero code.
 */
@Public
public class FlinkException extends Exception {
    /** @param message description of the failure */
    public FlinkException(String message);
    /**
     * @param message description of the failure
     * @param cause underlying cause
     */
    public FlinkException(String message, Throwable cause);
}

/**
* Property key constant for accessing the GPU index from GPUInfo
* (currently the only key GPUInfo exposes).
*/
public static final String PROPERTY_KEY_INDEX = "index";
/**
 * Script execution timeout in milliseconds (a private constant in GPUDriver).
 * A discovery script that runs longer fails with a TimeoutException.
 */
private static final long DISCOVERY_SCRIPT_TIMEOUT_MS = 10000;
/**
 * Default plugins directory name; the default discovery script lives beneath it.
 */
public static final String DEFAULT_FLINK_PLUGINS_DIRS = "plugins";
// Default script: {FLINK_PLUGINS_DIRS}/external-resource-gpu/nvidia-gpu-discovery.sh

The GPU driver factory is registered via Service Loader in the JAR file:
File: META-INF/services/org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory
Content:
org.apache.flink.externalresource.gpu.GPUDriverFactory

This file enables automatic discovery and loading by Flink's plugin system without requiring manual registration or configuration changes.