
Apache Flink External Resources

Apache Flink external resources management framework that provides GPU resource discovery and allocation capabilities for distributed stream and batch processing applications. The framework enables GPU-aware task scheduling and resource allocation through configurable discovery scripts and extensible APIs.

Package Information

  • Package Name: flink-external-resources
  • Package Type: maven
  • Language: Java
  • Maven Coordinates: org.apache.flink:flink-external-resources:2.1.0 (parent) / org.apache.flink:flink-external-resource-gpu:2.1.0 (GPU module)
  • Installation: add the GPU module dependency to your Maven pom.xml:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-external-resource-gpu</artifactId>
    <version>2.1.0</version>
</dependency>

Module Structure:

  • Parent: flink-external-resources - Provides the external resource framework
  • GPU Module: flink-external-resource-gpu - Implements GPU-specific resource discovery

Core Imports

// GPU-specific classes
import org.apache.flink.externalresource.gpu.GPUDriverFactory;
import org.apache.flink.externalresource.gpu.GPUInfo;
import org.apache.flink.externalresource.gpu.GPUDriverOptions;

// Flink core external resource interfaces
import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
import org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory;
import org.apache.flink.api.common.externalresource.ExternalResourceInfo;

// Configuration and utilities
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

// Standard Java imports
import java.util.Set;
import java.util.Collection;
import java.util.Optional;
import java.io.FileNotFoundException;
import java.util.concurrent.TimeoutException;

Basic Usage

import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.externalresource.gpu.GPUDriverFactory;
import org.apache.flink.externalresource.gpu.GPUDriverOptions;

import java.util.Optional;
import java.util.Set;

public class GpuDiscoveryExample {
    public static void main(String[] args) throws Exception {
        // Configure GPU discovery with the default NVIDIA script
        Configuration config = new Configuration();
        // Optional: this is the default value. A relative path is resolved against FLINK_HOME.
        config.set(GPUDriverOptions.DISCOVERY_SCRIPT_PATH,
                "plugins/external-resource-gpu/nvidia-gpu-discovery.sh");
        // Enable coordination mode so that processes on the same host do not claim the same GPUs
        config.set(GPUDriverOptions.DISCOVERY_SCRIPT_ARG, "--enable-coordination-mode");

        // Create the GPU driver via the factory; this validates the configured script
        GPUDriverFactory factory = new GPUDriverFactory();
        ExternalResourceDriver driver = factory.createExternalResourceDriver(config);

        // Run the discovery script, requesting 2 GPUs
        Set<? extends ExternalResourceInfo> gpuResources = driver.retrieveResourceInfo(2);

        for (ExternalResourceInfo gpu : gpuResources) {
            // The GPU driver returns GPUInfo instances; the interface suffices for property access
            Optional<String> index = gpu.getProperty("index");
            index.ifPresent(i -> System.out.println("Allocated GPU index: " + i));

            // All available property keys
            System.out.println("Available properties: " + gpu.getKeys());

            // String representation, e.g. "GPU Device(0)", "GPU Device(1)"
            System.out.println("GPU: " + gpu);
        }

        // Custom script with arguments. These arguments are hypothetical and must be
        // understood by your own script; pass customConfig to the factory the same way.
        Configuration customConfig = new Configuration();
        customConfig.set(GPUDriverOptions.DISCOVERY_SCRIPT_PATH, "/opt/custom-gpu-discovery.sh");
        customConfig.set(GPUDriverOptions.DISCOVERY_SCRIPT_ARG, "--min-memory=8GB --cuda-version=11.0");
    }
}

Architecture

The framework is built around the Flink external resource system with these key components:

  • Service Loading: GPUDriverFactory is registered through a Java Service Loader file so that Flink's plugin system can load it; each resource selects its factory via external-resource.<resource_name>.driver-factory.class
  • Discovery Scripts: A configurable shell script is executed to identify the GPU resources available on each cluster node
  • Resource Information: GPUInfo objects encapsulate discovered GPU details with property-based access
  • Configuration: Script paths and arguments are set through Flink's Configuration system
  • Error Handling: Distinct exceptions for invalid configuration, missing or non-executable scripts, script timeouts, and non-zero exit codes

Capabilities

GPU Driver Factory

Factory for creating GPU resource drivers through Flink's external resource system.

/**
 * Factory for creating {@link GPUDriver} instances.
 * Loaded automatically by Flink via Java Service Loader mechanism.
 */
public class GPUDriverFactory implements ExternalResourceDriverFactory {
    /**
     * Creates a GPU driver with the specified configuration
     * @param config Configuration containing discovery script settings
     * @return ExternalResourceDriver for GPU resource discovery
     * @throws Exception if configuration is invalid or script setup fails
     */
    @Override
    public ExternalResourceDriver createExternalResourceDriver(Configuration config) 
        throws Exception;
}

GPU Driver Implementation

The core implementation that executes discovery scripts to find available GPU resources.

/**
 * Driver takes the responsibility to discover GPU resources and provide the GPU resource
 * information. It retrieves the GPU information by executing a user-defined discovery script.
 * 
 * Note: This class is package-private and should only be created via GPUDriverFactory
 */
class GPUDriver implements ExternalResourceDriver {
    /**
     * Constructs GPUDriver with configuration validation and script setup
     * @param config Configuration containing discovery script path and arguments
     * @throws IllegalConfigurationException if script path is not configured
     * @throws FileNotFoundException if discovery script file not found
     * @throws FlinkException if script file exists but is not executable
     */
    GPUDriver(Configuration config) throws Exception;
    
    /**
     * Retrieve GPU resource information by executing the configured discovery script
     * @param gpuAmount Number of required GPU resources (must be > 0)
     * @return Set of GPUInfo objects representing discovered GPU resources
     * @throws IllegalArgumentException if gpuAmount <= 0
     * @throws Exception if script execution fails
     * @throws TimeoutException if script execution exceeds 10 seconds
     * @throws FlinkException if script exits with non-zero code
     */
    @Override
    public Set<GPUInfo> retrieveResourceInfo(long gpuAmount) throws Exception;
}
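The constructor validation documented above can be mirrored with only the JDK. This is a sketch under stated assumptions: relative paths are resolved against a FLINK_HOME directory passed in as a parameter (the real driver reads it from the environment), and IllegalStateException and IOException stand in for Flink's IllegalConfigurationException and FlinkException. The class and method names are hypothetical.

```java
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper mirroring the validation order documented for GPUDriver's constructor.
final class DiscoveryScriptValidator {

    private DiscoveryScriptValidator() {}

    static Path resolveAndValidate(String configuredPath, String flinkHome) throws IOException {
        // 1. Not configured: null, empty, or whitespace-only
        //    (stand-in for IllegalConfigurationException)
        if (configuredPath == null || configuredPath.isBlank()) {
            throw new IllegalStateException("GPU discovery script is not configured.");
        }

        // Relative paths are resolved against FLINK_HOME (assumption: "." when unknown)
        Path path = Paths.get(configuredPath);
        if (!path.isAbsolute()) {
            path = Paths.get(flinkHome == null ? "." : flinkHome, configuredPath);
        }
        Path absolute = path.toAbsolutePath();

        // 2. The script file does not exist
        if (!Files.exists(absolute)) {
            throw new FileNotFoundException(
                    "The gpu discovery script does not exist in path " + absolute + ".");
        }
        // 3. The file exists but is not executable (stand-in for FlinkException)
        if (!Files.isExecutable(absolute)) {
            throw new IOException("The discovery script " + absolute + " is not executable.");
        }
        return absolute;
    }
}
```

Checking in this order means the most basic misconfiguration (nothing set at all) is reported before any file-system access happens.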

GPU Resource Information

Container for GPU resource information with property-based access.

/**
 * Information for GPU resource. Currently only including the GPU index.
 * Note: Constructor is package-private - instances created by GPUDriver
 */
public class GPUInfo implements ExternalResourceInfo {
    /**
     * Get the property indicated by the specified key
     * @param key of the required property ("index" is supported)
     * @return Optional containing the value, or empty if key not found
     */
    public Optional<String> getProperty(String key);
    
    /**
     * Get all property keys
     * @return Collection of all property keys
     */
    public Collection<String> getKeys();
    
    /**
     * Returns formatted string representation of GPU device
     * @return String in format "GPU Device(index)"
     */
    public String toString();
    
    /**
     * Hash code based on GPU index
     * @return int hash code
     */
    public int hashCode();
    
    /**
     * Equality comparison based on GPU index
     * @param obj Object to compare
     * @return boolean true if equal GPU indices
     */
    public boolean equals(Object obj);
}
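Because equality and hashing depend only on the index, duplicate indices in the script output collapse into a single entry of the returned Set; for example, an output of 0,0,1 yields two entries. The hypothetical SimpleGpuInfo below is a JDK-only stand-in that follows the contract documented above (the real GPUInfo constructor is package-private, so user code cannot create instances); it is not the Flink class.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;

// Hypothetical stand-in that follows the documented GPUInfo contract.
final class SimpleGpuInfo {
    static final String PROPERTY_KEY_INDEX = "index";

    private final String index;

    SimpleGpuInfo(String index) {
        this.index = Objects.requireNonNull(index);
    }

    // Only "index" is supported; any other key yields an empty Optional
    Optional<String> getProperty(String key) {
        return PROPERTY_KEY_INDEX.equals(key) ? Optional.of(index) : Optional.empty();
    }

    Collection<String> getKeys() {
        return Collections.singleton(PROPERTY_KEY_INDEX);
    }

    @Override
    public String toString() {
        return String.format("GPU Device(%s)", index);
    }

    // Equality and hashing use the index only
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof SimpleGpuInfo)) {
            return false;
        }
        return index.equals(((SimpleGpuInfo) obj).index);
    }

    @Override
    public int hashCode() {
        return index.hashCode();
    }
}
```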

GPU Driver Configuration

Configuration options for GPU resource discovery behavior.

/**
 * A collection of all configuration options for GPU driver.
 * @Documentation.SuffixOption marks these keys as suffixes of the per-resource prefix in the generated documentation.
 */
@PublicEvolving
public class GPUDriverOptions {
    /**
     * Configuration key: "discovery-script.path"
     * Full key pattern: external-resource.<resource_name>.param.discovery-script.path
     * 
     * The path of the discovery script. Can be absolute path or relative to FLINK_HOME.
     * Default: plugins/external-resource-gpu/nvidia-gpu-discovery.sh
     */
    @Documentation.SuffixOption("external-resource.<resource_name>.param")
    public static final ConfigOption<String> DISCOVERY_SCRIPT_PATH = 
        key("discovery-script.path")
            .stringType()
            .defaultValue("plugins/external-resource-gpu/nvidia-gpu-discovery.sh")
            .withDescription("Path to GPU discovery script");
    
    /**
     * Configuration key: "discovery-script.args"
     * Full key pattern: external-resource.<resource_name>.param.discovery-script.args
     * 
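 * The arguments passed to the discovery script, appended after the GPU amount.
 * The command line is split on whitespace, so each argument arrives as its own positional parameter.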
     * No default value - leave unset if script requires no arguments.
     */
    @Documentation.SuffixOption("external-resource.<resource_name>.param")
    public static final ConfigOption<String> DISCOVERY_SCRIPT_ARG = 
        key("discovery-script.args")
            .stringType()
            .noDefaultValue()
            .withDescription("Arguments for GPU discovery script");
}

Flink Core Interfaces

The external resource system is built on these core Flink interfaces:

/**
 * Driver which takes the responsibility to manage and provide the information of external resource.
 * Drivers are instantiated via an ExternalResourceDriverFactory.
 * TaskExecutor retrieves ExternalResourceInfo from the drivers.
 */
@PublicEvolving
public interface ExternalResourceDriver {
    /**
     * Retrieve the information of the external resources according to the amount.
     * @param amount of the required external resources
     * @return information set of the required external resources
     * @throws Exception if there is something wrong during retrieving
     */
    Set<? extends ExternalResourceInfo> retrieveResourceInfo(long amount) throws Exception;
}

/**
 * Factory for ExternalResourceDriver. Instantiate a driver with configuration.
 * Drivers with factories automatically qualify for plugin loading if the driver jar
 * is self-contained and contains a META-INF/services file.
 */
@PublicEvolving
public interface ExternalResourceDriverFactory {
    /**
     * Construct the ExternalResourceDriver from configuration.
     * @param config configuration for this external resource
     * @return the driver for this external resource
     * @throws Exception if there is something wrong during the creation
     */
    ExternalResourceDriver createExternalResourceDriver(Configuration config) throws Exception;
}

/**
 * Contains the information of an external resource.
 */
@PublicEvolving
public interface ExternalResourceInfo {
    /**
     * Get the property indicated by the specified key.
     * @param key of the required property
     * @return an Optional containing the value, or empty if no value stored under key
     */
    Optional<String> getProperty(String key);
    
    /**
     * Get all property keys.
     * @return collection of all property keys
     */
    Collection<String> getKeys();
}

Configuration Integration

Flink Configuration Keys

The GPU driver integrates with Flink's configuration system using these key patterns:

// Configuration key pattern for external resources
external-resource.<resource_name>.param.discovery-script.path
external-resource.<resource_name>.param.discovery-script.args

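// Example for GPU resources named "gpu" (cluster configuration)
external-resources: gpu
external-resource.gpu.amount: 2
external-resource.gpu.driver-factory.class: org.apache.flink.externalresource.gpu.GPUDriverFactory
external-resource.gpu.param.discovery-script.path: "/opt/flink/scripts/nvidia-gpu-discovery.sh"
external-resource.gpu.param.discovery-script.args: "--enable-coordination-mode"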
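The driver itself reads only the short keys (discovery-script.path, discovery-script.args). A minimal sketch of how the prefixed cluster keys map onto them, assuming Flink hands each driver a view of the configuration with the external-resource.<resource_name>.param. prefix stripped. The helper name is hypothetical, and the sample map also contains the resource list and amount keys, which do not carry the param. prefix and are therefore not forwarded as driver parameters.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: extract the parameters one external resource's driver would see.
final class ResourceParams {

    private ResourceParams() {}

    static Map<String, String> paramsFor(String resourceName, Map<String, String> clusterConfig) {
        String prefix = "external-resource." + resourceName + ".param.";
        Map<String, String> params = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : clusterConfig.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                // Strip the prefix so the driver sees e.g. "discovery-script.path"
                params.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return params;
    }

    public static void main(String[] args) {
        Map<String, String> cluster = new LinkedHashMap<>();
        cluster.put("external-resources", "gpu");
        cluster.put("external-resource.gpu.amount", "2");
        cluster.put("external-resource.gpu.param.discovery-script.path",
                "/opt/flink/scripts/nvidia-gpu-discovery.sh");
        cluster.put("external-resource.gpu.param.discovery-script.args",
                "--enable-coordination-mode");

        // Prints {discovery-script.path=/opt/flink/scripts/nvidia-gpu-discovery.sh, discovery-script.args=--enable-coordination-mode}
        System.out.println(paramsFor("gpu", cluster));
    }
}
```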

Default Discovery Script

The framework includes a default NVIDIA GPU discovery script:

Script Details:

  • Location: {FLINK_PLUGINS_DIRS}/external-resource-gpu/nvidia-gpu-discovery.sh
  • Dependencies: Requires nvidia-smi command available in PATH
  • Common script: Uses gpu-discovery-common.sh for shared allocation logic
  • GPU Detection: Executes nvidia-smi --query-gpu=index --format=csv,noheader
  • Output Format: Comma-separated GPU indices (e.g., "0,1,2")
  • Timeout: GPUDriver allows the script at most 10 seconds of execution time
  • Process cleanup: GPUDriver forcibly destroys the script process once execution finishes

Script Arguments:

# Basic usage
./nvidia-gpu-discovery.sh <gpu-amount>

# With coordination mode (prevents resource conflicts)
./nvidia-gpu-discovery.sh <gpu-amount> --enable-coordination-mode

# Custom coordination file location
./nvidia-gpu-discovery.sh <gpu-amount> --enable-coordination-mode --coordination-file /custom/path

Internal Process:

  1. Checks the requested GPU amount; if it is 0, exits with code 0 without printing any indices
  2. Calls nvidia-smi to get available GPU indices
  3. Executes allocation logic (coordination or non-coordination mode)
  4. Returns comma-separated indices of allocated GPUs
  5. Exits with code 1 if insufficient GPUs available
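In non-coordination mode, the process above reduces to taking the first N indices reported by nvidia-smi. A Java model of that logic, for illustration only: the real allocation happens in the shell scripts, exit code 1 is modeled as an exception, and the class name is hypothetical.

```java
import java.util.List;

// Hypothetical Java model of the default script's non-coordination allocation.
final class FirstNAllocator {

    private FirstNAllocator() {}

    /**
     * @param available GPU indices as reported by nvidia-smi, in order
     * @param amount    number of GPUs requested
     * @return the single line the script would print (comma-separated indices)
     */
    static String allocate(List<String> available, int amount) {
        if (amount < 0) {
            throw new IllegalArgumentException("amount must not be negative");
        }
        if (amount == 0) {
            return ""; // the script exits with code 0 without printing indices
        }
        if (available.size() < amount) {
            // the script exits with code 1 in this case
            throw new IllegalStateException("Not enough GPUs available");
        }
        return String.join(",", available.subList(0, amount));
    }
}
```

For example, with nvidia-smi reporting 0, 1 and 2 and an amount of 2, the model returns "0,1".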

Error Handling

Exception Types and Conditions

/**
 * Configuration-related exceptions thrown during GPUDriver construction
 */

// IllegalConfigurationException
// Thrown when: GPU discovery script path is null, empty, or whitespace-only
// Message: "GPU discovery script ('external-resource.<name>.param.discovery-script.path') is not configured."

// FileNotFoundException  
// Thrown when: Discovery script file does not exist at the specified path
// Message: "The gpu discovery script does not exist in path <absolute-path>."

// FlinkException
// Thrown when: Script file exists but is not executable
// Message: "The discovery script <absolute-path> is not executable."

/**
 * Runtime exceptions thrown during resource discovery (retrieveResourceInfo)
 */

// IllegalArgumentException
// Thrown when: gpuAmount parameter <= 0
// Message: "The gpuAmount should be positive when retrieving the GPU resource information."

// TimeoutException
// Thrown when: Script execution exceeds 10 seconds (DISCOVERY_SCRIPT_TIMEOUT_MS)
// Message: "The discovery script executed for over 10000 ms."

// FlinkException  
// Thrown when: Discovery script exits with non-zero return code
// Message: "Discovery script exit with non-zero return code: <exit-code>."
// Additional: Warning logged with stdout/stderr content

/**
 * Discovery script output validation
 */
// Warning logged when: Script produces multiple output lines
// Message: "The output of the discovery script should only contain one single line. Finding <count> lines..."
// Behavior: Only first line is used, others ignored

Discovery Script Requirements

Custom discovery scripts must follow these requirements:

Script Interface:

  • Be executable by the Flink process (chmod +x)
  • Accept GPU amount as first positional argument (required)
  • Accept any configured arguments after the amount; since the driver splits the command line on whitespace, they arrive as separate positional arguments ($2, $3, ...)
  • Return comma-separated GPU indices on stdout (single line only)
  • Exit with code 0 for success, non-zero for failure
  • Complete execution within 10 seconds

Output Validation:

  • Script output is read from stdout only
  • Multiple lines: Only first line used, others ignored with warning
  • Empty output: Returns empty Set (no GPUs allocated)
  • Whitespace around indices: trimmed; blank entries (e.g. in "0,,1") are skipped
  • Invalid format: Creates GPUInfo with the trimmed string as index
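The output rules above fit in a few lines. This sketch reproduces them with plain strings instead of GPUInfo objects; it is a model of the documented behavior, not Flink's code, and the class name is hypothetical. It also skips blank entries such as the empty field in "0,,1".

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of how the driver turns script stdout into GPU indices.
final class DiscoveryOutputParser {

    private DiscoveryOutputParser() {}

    static Set<String> parse(List<String> stdoutLines) {
        if (stdoutLines.isEmpty()) {
            return Collections.emptySet(); // empty output: no GPUs
        }
        if (stdoutLines.size() > 1) {
            System.err.printf("WARN: expected one line of output, found %d; using the first%n",
                    stdoutLines.size());
        }
        Set<String> indices = new LinkedHashSet<>();
        for (String entry : stdoutLines.get(0).split(",")) {
            String trimmed = entry.trim();
            if (!trimmed.isEmpty()) {
                // No numeric validation; a Set collapses duplicate indices
                indices.add(trimmed);
            }
        }
        return Collections.unmodifiableSet(indices);
    }
}
```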

Error Handling:

  • Non-zero exit: Logs stdout/stderr content and throws FlinkException
  • Timeout: Process destroyed forcibly, TimeoutException thrown
  • Script execution uses Runtime.getRuntime().exec() with process streams captured
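The execution and error-handling rules above can be approximated with ProcessBuilder. This JDK-only sketch follows the documented flow (wait up to a timeout, forcibly destroy the process, reject non-zero exit codes, return stdout lines); IOException stands in for FlinkException, and the class and method names are hypothetical.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

// Hypothetical approximation of how a discovery script is run and checked.
final class DiscoveryScriptRunner {

    private DiscoveryScriptRunner() {}

    static List<String> run(List<String> command, long timeoutMs)
            throws IOException, InterruptedException, TimeoutException {
        Process process = new ProcessBuilder(command).start();
        try {
            if (!process.waitFor(timeoutMs, TimeUnit.MILLISECONDS)) {
                throw new TimeoutException(
                        "The discovery script executed for over " + timeoutMs + " ms.");
            }
            List<String> stdout = readLines(process.getInputStream());
            int exitCode = process.exitValue();
            if (exitCode != 0) {
                List<String> stderr = readLines(process.getErrorStream());
                // Log both streams before failing, as the driver does
                System.err.println("Discovery script exit with " + exitCode
                        + ". STDOUT: " + stdout + " STDERR: " + stderr);
                throw new IOException(
                        "Discovery script exit with non-zero return code: " + exitCode + ".");
            }
            return stdout;
        } finally {
            // Always clean up; a no-op if the process has already exited
            process.destroyForcibly();
        }
    }

    private static List<String> readLines(InputStream in) throws IOException {
        try (BufferedReader reader =
                new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            return reader.lines().collect(Collectors.toList());
        }
    }
}
```

Reading the output only after the process exits is fine for the single line a discovery script should print, but a script writing more than the OS pipe buffer holds would block and hit the timeout.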

Example Custom Discovery Script:

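#!/bin/bash
# Custom GPU discovery script
GPU_AMOUNT=$1
shift
# Configured arguments arrive as separate positional parameters
SCRIPT_ARGS="$*"

# Your custom GPU detection logic here; it should allocate GPU_AMOUNT GPUs.
# Print exactly one line of comma-separated indices to stdout.
echo "0,1,3"  # Example: allocate GPUs 0, 1, and 3
exit 0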

Script Execution Context:

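// GPUDriver executes the script as follows. Runtime.exec(String) splits the
// command string on whitespace, so each configured argument reaches the script
// as its own positional parameter (quotes are not interpreted).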
String cmd = discoveryScript.getAbsolutePath() + " " + gpuAmount + " " + args;
Process process = Runtime.getRuntime().exec(cmd);
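Because the whole command is passed to Runtime.exec(String) as one string, it is split before the script starts. The JDK documents that this splitting uses a StringTokenizer on the command, so the sketch below reproduces it to show which positional parameters a script receives. The path and arguments are sample values and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

// Reproduces the whitespace splitting that Runtime.exec(String) applies to a command string.
final class CommandTokens {

    private CommandTokens() {}

    static List<String> tokenize(String command) {
        List<String> tokens = new ArrayList<>();
        StringTokenizer st = new StringTokenizer(command);
        while (st.hasMoreTokens()) {
            tokens.add(st.nextToken());
        }
        return tokens;
    }

    public static void main(String[] args) {
        String cmd = "/opt/flink/plugins/external-resource-gpu/nvidia-gpu-discovery.sh" + " " + 2
                + " " + "--enable-coordination-mode --coordination-file /tmp/gpu-coord";
        List<String> argv = tokenize(cmd);
        // argv.get(0) is the script itself; the rest become $1, $2, ...
        for (int i = 1; i < argv.size(); i++) {
            System.out.println("$" + i + " = " + argv.get(i));
        }
    }
}
```

Here the script sees $1 = 2, $2 = --enable-coordination-mode, $3 = --coordination-file and $4 = /tmp/gpu-coord. Quoted arguments containing spaces are split as well, so custom scripts should not rely on quoting.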

Service Loader Integration

The GPU driver is automatically discovered by Flink through Java Service Loader:

META-INF/services/org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory:

org.apache.flink.externalresource.gpu.GPUDriverFactory

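This lets Flink's plugin system load the factory from the plugin JAR without any code-level registration. The factory is still selected per resource in the cluster configuration through external-resource.<resource_name>.driver-factory.class.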

Discovery Script Coordination

The default NVIDIA GPU discovery script supports a coordination mode that prevents multiple processes on the same host from claiming the same GPUs. It is enabled with --enable-coordination-mode and can be combined with --coordination-file <path>, using the invocation patterns listed under Script Arguments above.

Coordination Features:

  • Non-coordination mode: Simple first-N allocation from available GPUs
  • Coordination mode: File-based locking prevents multiple processes from claiming same GPUs
  • Process cleanup: Automatically reclaims GPUs from dead processes
  • Default coordination file: /var/tmp/flink-gpu-coordination
// Configuration for coordination mode
config.set(GPUDriverOptions.DISCOVERY_SCRIPT_ARG, "--enable-coordination-mode --coordination-file /tmp/gpu-coord");
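The coordination features listed above can be modeled in a few lines of Java: a shared record maps each claimed GPU index to the owning process, entries of dead processes are reclaimed, and only free indices are handed out. This is an in-memory illustration only; the real script persists its records in the coordination file under a file lock, and neither its file format nor its exact algorithm is reproduced here. All names are hypothetical, and process liveness is injected so the model can be tested.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongPredicate;

// Hypothetical in-memory model of coordination-mode allocation.
final class CoordinatedAllocator {
    private final Map<String, Long> owners = new LinkedHashMap<>(); // GPU index -> owning pid
    private final LongPredicate isAlive;

    CoordinatedAllocator(LongPredicate isAlive) {
        this.isAlive = isAlive;
    }

    // synchronized stands in for the script's lock on the coordination file
    synchronized List<String> allocate(List<String> available, int amount, long pid) {
        // Reclaim GPUs held by processes that no longer exist
        owners.values().removeIf(owner -> !isAlive.test(owner));

        List<String> granted = new ArrayList<>();
        for (String index : available) {
            if (granted.size() == amount) {
                break;
            }
            if (!owners.containsKey(index)) {
                granted.add(index);
            }
        }
        if (granted.size() < amount) {
            // the script exits with code 1 when too few GPUs are free
            throw new IllegalStateException("Not enough free GPUs");
        }
        granted.forEach(index -> owners.put(index, pid));
        return granted;
    }
}
```

Outside of tests, a liveness check could be pid -> ProcessHandle.of(pid).map(ProcessHandle::isAlive).orElse(false), which only sees processes on the local host; that matches the per-host scope of the coordination file.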

Types

Core Flink Types

/**
 * Configuration system for Flink settings
 */
public class Configuration {
    public <T> T get(ConfigOption<T> option);
    public <T> Configuration set(ConfigOption<T> option, T value);
}

/**
 * Typed configuration option with key, type, and default value
 */
public class ConfigOption<T> {
    public String key();
    public T defaultValue();
}

/**
 * Exception for invalid configuration values
 */
@PublicEvolving
public class IllegalConfigurationException extends RuntimeException {
    public IllegalConfigurationException(String message);
    public IllegalConfigurationException(String message, Throwable cause);
}

/**
 * Base class of all Flink-specific checked exceptions
 */
@Public
public class FlinkException extends Exception {
    public FlinkException(String message);
    public FlinkException(String message, Throwable cause);
}

GPU-Specific Constants

/**
 * Property key for accessing the GPU index from GPUInfo.
 * Declared inside GPUInfo; user code passes the literal "index" to getProperty.
 */
private static final String PROPERTY_KEY_INDEX = "index";

/**
 * Script execution timeout in milliseconds (private constant in GPUDriver)
 */
private static final long DISCOVERY_SCRIPT_TIMEOUT_MS = 10000;

/**
 * Default plugins directory name (ConfigConstants.DEFAULT_FLINK_PLUGINS_DIRS)
 */
public static final String DEFAULT_FLINK_PLUGINS_DIRS = "plugins";
// Default script: {FLINK_PLUGINS_DIRS}/external-resource-gpu/nvidia-gpu-discovery.sh

