tessl/maven-org-apache-flink--flink-external-resources

Workspace: tessl
Visibility: Public
Describes: mavenpkg:maven/org.apache.flink/flink-external-resources@2.1.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-external-resources@2.1.0


Apache Flink External Resources

Apache Flink external resources management framework that provides GPU resource discovery and allocation capabilities for distributed stream and batch processing applications. The framework enables GPU-aware task scheduling and resource allocation through configurable discovery scripts and extensible APIs.

Package Information

  • Package Name: flink-external-resources
  • Package Type: maven
  • Language: Java
  • Maven Coordinates: org.apache.flink:flink-external-resources:2.1.0 (parent) / org.apache.flink:flink-external-resource-gpu:2.1.0 (GPU module)
  • Installation: Add the GPU module dependency to your Maven pom.xml:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-external-resource-gpu</artifactId>
    <version>2.1.0</version>
</dependency>

Module Structure:

  • Parent: flink-external-resources - Provides the external resource framework
  • GPU Module: flink-external-resource-gpu - Implements GPU-specific resource discovery

Core Imports

// GPU-specific classes
import org.apache.flink.externalresource.gpu.GPUDriverFactory;
import org.apache.flink.externalresource.gpu.GPUInfo;
import org.apache.flink.externalresource.gpu.GPUDriverOptions;

// Flink core external resource interfaces
import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
import org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory;
import org.apache.flink.api.common.externalresource.ExternalResourceInfo;

// Configuration and utilities
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

// Standard Java imports
import java.util.Set;
import java.util.Collection;
import java.util.Optional;
import java.io.FileNotFoundException;
import java.util.concurrent.TimeoutException;

Basic Usage

import org.apache.flink.configuration.Configuration;
import org.apache.flink.externalresource.gpu.GPUDriverFactory;
import org.apache.flink.externalresource.gpu.GPUDriverOptions;
import org.apache.flink.externalresource.gpu.GPUInfo;
import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
import java.util.Set;
import java.util.Optional;

// Configure GPU discovery with default NVIDIA script
Configuration config = new Configuration();
// Using default script (optional - can omit if default is acceptable)
config.set(GPUDriverOptions.DISCOVERY_SCRIPT_PATH, 
    "plugins/external-resource-gpu/nvidia-gpu-discovery.sh");
// Enable coordination mode so that processes sharing a host do not claim the same GPUs
config.set(GPUDriverOptions.DISCOVERY_SCRIPT_ARG, "--enable-coordination-mode");

// Create GPU driver via factory
GPUDriverFactory factory = new GPUDriverFactory();
ExternalResourceDriver driver = factory.createExternalResourceDriver(config);

// Discover available GPU resources
Set<? extends ExternalResourceInfo> gpuResources = driver.retrieveResourceInfo(2); // Request 2 GPUs

// Access GPU information
for (ExternalResourceInfo gpu : gpuResources) {
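    // GPUDriver returns GPUInfo instances, so this cast succeeds; it is optional here,
    // because getProperty() and getKeys() are already declared on ExternalResourceInfo
    GPUInfo gpuInfo = (GPUInfo) gpu;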
    
    // Access GPU index property
    Optional<String> index = gpu.getProperty("index");
    if (index.isPresent()) {
        System.out.println("Allocated GPU index: " + index.get());
    }
    
    // Get all available properties
    System.out.println("Available properties: " + gpu.getKeys());
    
    // String representation
    System.out.println("GPU: " + gpu.toString()); // Output: "GPU Device(0)", "GPU Device(1)", etc.
}

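// Example with a custom script; these flags are illustrative, only that script interprets them,
// and they arrive as separate arguments ($2 and $3)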
Configuration customConfig = new Configuration();
customConfig.set(GPUDriverOptions.DISCOVERY_SCRIPT_PATH, "/opt/custom-gpu-discovery.sh");
customConfig.set(GPUDriverOptions.DISCOVERY_SCRIPT_ARG, "--min-memory=8GB --cuda-version=11.0");

Architecture

The framework is built around the Flink external resource system with these key components:

  • Service Loading: GPUDriverFactory is registered via Java Service Loader for automatic discovery by Flink
  • Discovery Scripts: A configurable script is executed to identify the GPU resources available on a node
  • Resource Information: GPUInfo objects encapsulate discovered GPU details with property-based access
  • Configuration: Flink Configuration system integration for script paths and arguments
  • Error Handling: Distinct exceptions for missing configuration, missing or non-executable scripts, script timeouts, and non-zero exit codes

Capabilities

GPU Driver Factory

Factory for creating GPU resource drivers through Flink's external resource system.

/**
 * Factory for creating {@link GPUDriver} instances.
 * Loaded automatically by Flink via Java Service Loader mechanism.
 */
public class GPUDriverFactory implements ExternalResourceDriverFactory {
    /**
     * Creates a GPU driver with the specified configuration
     * @param config Configuration containing discovery script settings
     * @return ExternalResourceDriver for GPU resource discovery
     * @throws Exception if configuration is invalid or script setup fails
     */
    @Override
    public ExternalResourceDriver createExternalResourceDriver(Configuration config) 
        throws Exception;
}

GPU Driver Implementation

The core implementation that executes discovery scripts to find available GPU resources.

/**
 * Driver takes the responsibility to discover GPU resources and provide the GPU resource
 * information. It retrieves the GPU information by executing a user-defined discovery script.
 * 
 * Note: This class is package-private and should only be created via GPUDriverFactory
 */
class GPUDriver implements ExternalResourceDriver {
    /**
     * Constructs GPUDriver with configuration validation and script setup
     * @param config Configuration containing discovery script path and arguments
     * @throws IllegalConfigurationException if script path is not configured
     * @throws FileNotFoundException if discovery script file not found
     * @throws FlinkException if script file exists but is not executable
     */
    GPUDriver(Configuration config) throws Exception;
    
    /**
     * Retrieve GPU resource information by executing the configured discovery script
     * @param gpuAmount Number of required GPU resources (must be > 0)
     * @return Set of GPUInfo objects representing discovered GPU resources
     * @throws IllegalArgumentException if gpuAmount <= 0
     * @throws Exception if script execution fails
     * @throws TimeoutException if script execution exceeds 10 seconds
     * @throws FlinkException if script exits with non-zero code
     */
    @Override
    public Set<GPUInfo> retrieveResourceInfo(long gpuAmount) throws Exception;
}
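The constructor checks listed above can be approximated with the JDK alone. This is a sketch, not GPUDriver's code: the class name DiscoveryScriptResolver is hypothetical, the flinkHome parameter stands in for the FLINK_HOME directory mentioned under GPUDriverOptions, and IllegalArgumentException / IllegalStateException replace Flink's IllegalConfigurationException and FlinkException so it runs without Flink on the classpath.

```java
import java.io.File;
import java.io.FileNotFoundException;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical, JDK-only approximation of GPUDriver's constructor checks.
// IllegalArgumentException stands in for IllegalConfigurationException and
// IllegalStateException stands in for FlinkException.
final class DiscoveryScriptResolver {

    private DiscoveryScriptResolver() {}

    /**
     * @param scriptPath configured value of discovery-script.path
     * @param flinkHome  base directory for relative paths (assumed to be FLINK_HOME)
     */
    static File resolve(String scriptPath, String flinkHome) throws FileNotFoundException {
        // Null, empty, or blank paths count as "not configured"
        if (scriptPath == null || scriptPath.trim().isEmpty()) {
            throw new IllegalArgumentException("GPU discovery script path is not configured.");
        }
        Path path = Paths.get(scriptPath);
        if (!path.isAbsolute()) {
            // Relative paths are interpreted against the Flink home directory
            path = Paths.get(flinkHome, scriptPath);
        }
        File script = path.toFile();
        if (!script.exists()) {
            throw new FileNotFoundException(
                    "The gpu discovery script does not exist in path " + script.getAbsolutePath() + ".");
        }
        if (!script.canExecute()) {
            throw new IllegalStateException(
                    "The discovery script " + script.getAbsolutePath() + " is not executable.");
        }
        return script;
    }
}
```

Passing the base directory in explicitly, rather than reading an environment variable inside the method, keeps the check easy to test.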

GPU Resource Information

Container for GPU resource information with property-based access.

/**
 * Information for GPU resource. Currently only including the GPU index.
 * Note: Constructor is package-private - instances created by GPUDriver
 */
public class GPUInfo implements ExternalResourceInfo {
    /**
     * Get the property indicated by the specified key
     * @param key of the required property ("index" is supported)
     * @return Optional containing the value, or empty if key not found
     */
    public Optional<String> getProperty(String key);
    
    /**
     * Get all property keys
     * @return Collection of all property keys
     */
    public Collection<String> getKeys();
    
    /**
     * Returns formatted string representation of GPU device
     * @return String in format "GPU Device(index)"
     */
    public String toString();
    
    /**
     * Hash code based on GPU index
     * @return int hash code
     */
    public int hashCode();
    
    /**
     * Equality comparison based on GPU index
     * @param obj Object to compare
     * @return boolean true if equal GPU indices
     */
    public boolean equals(Object obj);
}
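Because GPUInfo's constructor is package-private, application code cannot build one directly. The hypothetical SimpleGpuInfo below is a JDK-only stand-in that follows the documented contract (a single "index" property, equality and hash code by index, and the "GPU Device(index)" string form), which can be handy for unit-testing code that consumes GPU information.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;

// Hypothetical stand-in mirroring the documented GPUInfo contract.
final class SimpleGpuInfo {
    static final String PROPERTY_KEY_INDEX = "index";

    private final String index;

    SimpleGpuInfo(String index) {
        this.index = Objects.requireNonNull(index, "index");
    }

    Optional<String> getProperty(String key) {
        // Only "index" is supported; any other key yields Optional.empty()
        return PROPERTY_KEY_INDEX.equals(key) ? Optional.of(index) : Optional.empty();
    }

    Collection<String> getKeys() {
        return Collections.singleton(PROPERTY_KEY_INDEX);
    }

    @Override
    public String toString() {
        return String.format("GPU Device(%s)", index);
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof SimpleGpuInfo)) {
            return false;
        }
        // Two objects are equal when they describe the same GPU index
        return index.equals(((SimpleGpuInfo) obj).index);
    }

    @Override
    public int hashCode() {
        return index.hashCode();
    }
}
```

One consequence of index-based equality: if a script prints the same index twice (for example "0,0,1"), a Set of these objects holds only two entries.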

GPU Driver Configuration

Configuration options for GPU resource discovery behavior.

/**
 * A collection of all configuration options for GPU driver.
 * Options carry @Documentation.SuffixOption so the generated documentation shows them
 * under the external-resource.<resource_name>.param prefix.
 */
@PublicEvolving
public class GPUDriverOptions {
    /**
     * Configuration key: "discovery-script.path"
     * Full key pattern: external-resource.<resource_name>.param.discovery-script.path
     * 
     * The path of the discovery script. Can be absolute path or relative to FLINK_HOME.
     * Default: plugins/external-resource-gpu/nvidia-gpu-discovery.sh
     */
    @Documentation.SuffixOption("external-resource.<resource_name>.param")
    public static final ConfigOption<String> DISCOVERY_SCRIPT_PATH = 
        key("discovery-script.path")
            .stringType()
            .defaultValue("plugins/external-resource-gpu/nvidia-gpu-discovery.sh")
            .withDescription("Path to GPU discovery script");
    
    /**
     * Configuration key: "discovery-script.args"
     * Full key pattern: external-resource.<resource_name>.param.discovery-script.args
     * 
     * Arguments appended to the discovery script's command line after the GPU amount.
     * The value is split on whitespace, so each token becomes a separate argument.
     * No default value - leave unset if the script requires no arguments.
     */
    @Documentation.SuffixOption("external-resource.<resource_name>.param")
    public static final ConfigOption<String> DISCOVERY_SCRIPT_ARG = 
        key("discovery-script.args")
            .stringType()
            .noDefaultValue()
            .withDescription("Arguments for GPU discovery script");
}

Flink Core Interfaces

The external resource system is built on these core Flink interfaces:

/**
 * Driver which takes the responsibility to manage and provide the information of external resource.
 * Drivers are instantiated via an ExternalResourceDriverFactory.
 * TaskExecutor retrieves ExternalResourceInfo from the drivers.
 */
@PublicEvolving
public interface ExternalResourceDriver {
    /**
     * Retrieve the information of the external resources according to the amount.
     * @param amount of the required external resources
     * @return information set of the required external resources
     * @throws Exception if there is something wrong during retrieving
     */
    Set<? extends ExternalResourceInfo> retrieveResourceInfo(long amount) throws Exception;
}

/**
 * Factory for ExternalResourceDriver. Instantiate a driver with configuration.
 * Drivers with factories automatically qualify for plugin loading if the driver jar
 * is self-contained and contains a META-INF/services file.
 */
@PublicEvolving
public interface ExternalResourceDriverFactory {
    /**
     * Construct the ExternalResourceDriver from configuration.
     * @param config configuration for this external resource
     * @return the driver for this external resource
     * @throws Exception if there is something wrong during the creation
     */
    ExternalResourceDriver createExternalResourceDriver(Configuration config) throws Exception;
}

/**
 * Contains the information of an external resource.
 */
@PublicEvolving
public interface ExternalResourceInfo {
    /**
     * Get the property indicated by the specified key.
     * @param key of the required property
     * @return an Optional containing the value, or empty if no value stored under key
     */
    Optional<String> getProperty(String key);
    
    /**
     * Get all property keys.
     * @return collection of all property keys
     */
    Collection<String> getKeys();
}
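To show how the three interfaces fit together, the sketch below implements a hypothetical driver and factory. It declares local stand-ins (ResourceInfo, ResourceDriver, ResourceDriverFactory) with the same method shapes so that it compiles with only the JDK, and it uses a Map<String, String> where Flink passes a Configuration. The "devices" key and the StaticListDriver class are invented for illustration; a real driver would implement the org.apache.flink.api.common.externalresource interfaces and be registered through a META-INF/services file.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Local stand-ins with the same shape as Flink's interfaces (assumption: used only
// so this sketch compiles without Flink on the classpath).
interface ResourceInfo {
    Optional<String> getProperty(String key);
    Collection<String> getKeys();
}

interface ResourceDriver {
    Set<? extends ResourceInfo> retrieveResourceInfo(long amount) throws Exception;
}

interface ResourceDriverFactory {
    // Flink passes a Configuration; a plain Map keeps the sketch self-contained
    ResourceDriver createExternalResourceDriver(Map<String, String> config) throws Exception;
}

// Hypothetical driver that hands out the first N indices from a fixed list.
final class StaticListDriver implements ResourceDriver {
    private final String[] indices;

    StaticListDriver(String commaSeparated) {
        this.indices = commaSeparated.split(",");
    }

    @Override
    public Set<ResourceInfo> retrieveResourceInfo(long amount) throws Exception {
        if (amount <= 0) {
            throw new IllegalArgumentException("amount must be positive");
        }
        if (amount > indices.length) {
            throw new Exception("Requested " + amount + " resources but only " + indices.length + " are listed.");
        }
        Set<ResourceInfo> result = new LinkedHashSet<>();
        for (int i = 0; i < amount; i++) {
            final String index = indices[i].trim();
            result.add(new ResourceInfo() {
                @Override
                public Optional<String> getProperty(String key) {
                    return "index".equals(key) ? Optional.of(index) : Optional.empty();
                }

                @Override
                public Collection<String> getKeys() {
                    return Collections.singleton("index");
                }
            });
        }
        return result;
    }
}

// Hypothetical factory; reads the invented "devices" key.
final class StaticListDriverFactory implements ResourceDriverFactory {
    @Override
    public ResourceDriver createExternalResourceDriver(Map<String, String> config) {
        String devices = config.get("devices");
        if (devices == null || devices.trim().isEmpty()) {
            throw new IllegalArgumentException("'devices' is not configured.");
        }
        return new StaticListDriver(devices);
    }
}
```

Like GPUDriver, the implementation narrows the return type to a concrete Set type, which Java allows because Set<ResourceInfo> is a subtype of Set<? extends ResourceInfo>.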

Configuration Integration

Flink Configuration Keys

The GPU driver integrates with Flink's configuration system using these key patterns:

# Configuration key pattern for external resources
external-resource.<resource_name>.param.discovery-script.path
external-resource.<resource_name>.param.discovery-script.args

# Example for GPU resources named "gpu"
external-resource.gpu.param.discovery-script.path: "/opt/flink/scripts/nvidia-gpu-discovery.sh"
external-resource.gpu.param.discovery-script.args: "--enable-coordination-mode"
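The full keys are plain string concatenation of the per-resource prefix and the option suffix. The hypothetical helper below builds them and, in reverse, recovers the short suffix that the GPU driver reads. The Basic Usage example sets the short discovery-script.path key directly, which suggests the driver only sees the param sub-keys; treat that hand-off as an assumption. Rejecting resource names that contain dots is a choice of this sketch, not a documented Flink rule.

```java
import java.util.Optional;

// Hypothetical helper for the external-resource.<resource_name>.param.<suffix> pattern.
final class ExternalResourceKeys {
    static final String PREFIX = "external-resource.";
    static final String PARAM_INFIX = ".param.";

    private ExternalResourceKeys() {}

    // Builds e.g. "external-resource.gpu.param.discovery-script.path"
    static String paramKey(String resourceName, String optionSuffix) {
        if (resourceName == null || resourceName.isEmpty() || resourceName.contains(".")) {
            // A dot would make the key ambiguous, so this sketch rejects it
            throw new IllegalArgumentException("Invalid resource name: " + resourceName);
        }
        return PREFIX + resourceName + PARAM_INFIX + optionSuffix;
    }

    // Recovers the short suffix (e.g. "discovery-script.args") from a full key
    static Optional<String> optionSuffix(String resourceName, String fullKey) {
        String prefix = PREFIX + resourceName + PARAM_INFIX;
        return fullKey.startsWith(prefix)
                ? Optional.of(fullKey.substring(prefix.length()))
                : Optional.empty();
    }
}
```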

Default Discovery Script

The framework includes a default NVIDIA GPU discovery script:

Script Details:

  • Location: {FLINK_PLUGINS_DIRS}/external-resource-gpu/nvidia-gpu-discovery.sh
  • Dependencies: Requires nvidia-smi command available in PATH
  • Common script: Uses gpu-discovery-common.sh for shared allocation logic
  • GPU Detection: Executes nvidia-smi --query-gpu=index --format=csv,noheader
  • Output Format: Comma-separated GPU indices (e.g., "0,1,2")
  • Timeout: 10 seconds maximum execution time, enforced by GPUDriver
  • Process cleanup: GPUDriver forcibly destroys the script process once its output has been read or the timeout has expired

Script Arguments:

# Basic usage
./nvidia-gpu-discovery.sh <gpu-amount>

# With coordination mode (prevents resource conflicts)
./nvidia-gpu-discovery.sh <gpu-amount> --enable-coordination-mode

# Custom coordination file location
./nvidia-gpu-discovery.sh <gpu-amount> --enable-coordination-mode --coordination-file /custom/path

Internal Process:

  1. Script checks the requested GPU amount; if it is 0, it exits immediately with code 0
  2. Calls nvidia-smi to get available GPU indices
  3. Executes allocation logic (coordination or non-coordination mode)
  4. Returns comma-separated indices of allocated GPUs
  5. Exits with code 1 if insufficient GPUs available
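The non-coordination path of these steps reduces to a small decision, sketched here in Java for readability. The real logic lives in the bash scripts; FirstNAllocator is a hypothetical model that returns the line the script would print, or an empty Optional where the script would exit with code 1. Treating a negative amount as an error is an assumption; GPUDriver never passes one, because it rejects non-positive amounts before running the script.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical Java model of non-coordination allocation (first N available GPUs).
final class FirstNAllocator {
    private FirstNAllocator() {}

    /**
     * @param available indices reported by the detection step, in order
     * @param amount    requested GPU amount (the script's first argument)
     * @return the comma-separated line the script would print, or empty when
     *         the script would exit with code 1
     */
    static Optional<String> allocate(List<String> available, int amount) {
        if (amount < 0) {
            throw new IllegalArgumentException("amount must not be negative");
        }
        if (amount == 0) {
            // Step 1: nothing requested, exit 0
            return Optional.of("");
        }
        if (available.size() < amount) {
            // Step 5: not enough GPUs on this node
            return Optional.empty();
        }
        // Steps 3-4: take the first N indices and join them with commas
        return Optional.of(String.join(",", available.subList(0, amount)));
    }
}
```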

Error Handling

Exception Types and Conditions

/**
 * Configuration-related exceptions thrown during GPUDriver construction
 */

// IllegalConfigurationException
// Thrown when: GPU discovery script path is null, empty, or whitespace-only
// Message: "GPU discovery script ('external-resource.<name>.param.discovery-script.path') is not configured."

// FileNotFoundException  
// Thrown when: Discovery script file does not exist at the specified path
// Message: "The gpu discovery script does not exist in path <absolute-path>."

// FlinkException
// Thrown when: Script file exists but is not executable
// Message: "The discovery script <absolute-path> is not executable."

/**
 * Runtime exceptions thrown during resource discovery (retrieveResourceInfo)
 */

// IllegalArgumentException
// Thrown when: gpuAmount parameter <= 0
// Message: "The gpuAmount should be positive when retrieving the GPU resource information."

// TimeoutException
// Thrown when: Script execution exceeds 10 seconds (DISCOVERY_SCRIPT_TIMEOUT_MS)
// Message: "The discovery script executed for over 10000 ms."

// FlinkException  
// Thrown when: Discovery script exits with non-zero return code
// Message: "Discovery script exit with non-zero return code: <exit-code>."
// Additional: Warning logged with stdout/stderr content

/**
 * Discovery script output validation
 */
// Warning logged when: Script produces multiple output lines
// Message: "The output of the discovery script should only contain one single line. Finding <count> lines..."
// Behavior: Only first line is used, others ignored

Discovery Script Requirements

Custom discovery scripts must follow these requirements:

Script Interface:

  • Be executable by the Flink process (chmod +x)
  • Accept GPU amount as first positional argument (required)
  • Accept any configured arguments as the following positional arguments ($2, $3, ...); the args string is split on whitespace and quotes are not interpreted
  • Return comma-separated GPU indices on stdout (single line only)
  • Exit with code 0 for success, non-zero for failure
  • Complete execution within 10 seconds

Output Validation:

  • Script output is read from stdout only
  • Multiple lines: Only first line used, others ignored with warning
  • Empty output: Returns empty Set (no GPUs allocated)
  • Whitespace in indices: Automatically trimmed
  • Invalid format: Creates GPUInfo with the trimmed string as index
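The output rules above can be captured in a few lines of Java. DiscoveryOutputParser is a hypothetical, JDK-only sketch: it keeps only the first line, splits on commas, trims each entry, and performs no other validation, so a non-numeric token such as "gpu-a" is accepted as an index. Dropping blank entries (as in "0,,1") is an assumption of the sketch; the rules above do not cover that case.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of how discovery script output turns into GPU indices.
final class DiscoveryOutputParser {
    private DiscoveryOutputParser() {}

    static Set<String> parse(List<String> stdoutLines) {
        if (stdoutLines.isEmpty()) {
            return Collections.emptySet(); // empty output: no GPUs
        }
        // GPUDriver logs a warning when more than one line is printed; this sketch just ignores the extras
        String firstLine = stdoutLines.get(0);
        Set<String> indices = new LinkedHashSet<>();
        for (String part : firstLine.split(",")) {
            String trimmed = part.trim();
            // Skipping blank entries is an assumption of this sketch
            if (!trimmed.isEmpty()) {
                indices.add(trimmed);
            }
        }
        return Collections.unmodifiableSet(indices);
    }
}
```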

Error Handling:

  • Non-zero exit: Logs stdout/stderr content and throws FlinkException
  • Timeout: Process destroyed forcibly, TimeoutException thrown
  • Script execution uses Runtime.getRuntime().exec() with process streams captured
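The timeout and exit-code handling can be sketched with ProcessBuilder. This is not GPUDriver's code: ScriptRunner is hypothetical, it takes the command as an explicit argument list instead of a single string, and it throws IOException where GPUDriver throws FlinkException for a non-zero exit.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

// Hypothetical sketch of running a discovery script with a timeout.
final class ScriptRunner {
    private ScriptRunner() {}

    static List<String> run(List<String> command, long timeoutMs)
            throws IOException, InterruptedException, TimeoutException {
        Process process = new ProcessBuilder(command).start();
        try {
            if (!process.waitFor(timeoutMs, TimeUnit.MILLISECONDS)) {
                throw new TimeoutException(
                        "The discovery script executed for over " + timeoutMs + " ms.");
            }
            List<String> stdout = readLines(process.getInputStream());
            int exitCode = process.exitValue();
            if (exitCode != 0) {
                // Include both streams in the error, similar to GPUDriver's warning log
                List<String> stderr = readLines(process.getErrorStream());
                throw new IOException("Discovery script exit with non-zero return code: " + exitCode
                        + ". stdout=" + stdout + ", stderr=" + stderr);
            }
            return stdout;
        } finally {
            // Always kill the process, mirroring GPUDriver's forcible cleanup
            process.destroyForcibly();
        }
    }

    private static List<String> readLines(InputStream in) throws IOException {
        try (BufferedReader reader =
                new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            return reader.lines().collect(Collectors.toList());
        }
    }
}
```

Waiting for the process before reading stdout, as the sketch does, is fine for the single short line a discovery script prints; a script that wrote more than the OS pipe buffer would block and hit the timeout instead.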

Example Custom Discovery Script:

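#!/bin/bash
# Custom GPU discovery script
# $1 is the requested GPU amount; configured args follow as $2, $3, ...
GPU_AMOUNT=$1
shift
SCRIPT_ARGS=("$@")

# Your custom GPU detection logic here
# Must print a single line of comma-separated indices and exit 0 on success
echo "0,1,3"  # Example: allocate GPUs 0, 1, and 3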

Script Execution Context:

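// GPUDriver executes the script as a single command string; Runtime.exec(String)
// splits it on whitespace, so each token of args becomes a separate argument:
String cmd = discoveryScript.getAbsolutePath() + " " + gpuAmount + " " + args;
Process process = Runtime.getRuntime().exec(cmd);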
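Runtime.exec(String) breaks its command into tokens with new StringTokenizer(command), which splits on whitespace and ignores quotes. The hypothetical ExecTokens helper reproduces that split, which makes it easy to check which positional arguments a configured discovery-script.args value will turn into.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

// Hypothetical helper reproducing the tokenization used by Runtime.exec(String).
final class ExecTokens {
    private ExecTokens() {}

    static List<String> tokens(String command) {
        List<String> result = new ArrayList<>();
        // Default delimiters: space, tab, newline, carriage return, form feed
        StringTokenizer tokenizer = new StringTokenizer(command);
        while (tokenizer.hasMoreTokens()) {
            result.add(tokenizer.nextToken());
        }
        return result;
    }
}
```

For example, script.sh 1 "a b" yields the tokens script.sh, 1, "a and b": quotes are passed through literally, and a script path containing spaces would be split the same way.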

Service Loader Integration

The GPU driver is automatically discovered by Flink through Java Service Loader:

META-INF/services/org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory:

org.apache.flink.externalresource.gpu.GPUDriverFactory

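This lets Flink's plugin system discover GPUDriverFactory without code changes. The resource itself must still be enabled in the Flink configuration, by listing it under external-resources and setting external-resource.<resource_name>.driver-factory.class to org.apache.flink.externalresource.gpu.GPUDriverFactory.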
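Discovered factories still have to be matched to a configured resource. The sketch below models, as an assumption rather than Flink's actual code, a lookup that indexes discovered factories by fully qualified class name and picks the one named in configuration (for GPUs, org.apache.flink.externalresource.gpu.GPUDriverFactory). FactoryLookup is hypothetical and accepts any Iterable, including a java.util.ServiceLoader.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;

// Hypothetical name-based lookup over discovered service implementations.
final class FactoryLookup {
    private FactoryLookup() {}

    static <T> Optional<T> byClassName(Iterable<? extends T> discovered, String configuredClassName) {
        // Index every discovered implementation by its fully qualified class name
        Map<String, T> byName = new HashMap<>();
        for (T factory : discovered) {
            byName.put(factory.getClass().getName(), factory);
        }
        return Optional.ofNullable(byName.get(configuredClassName));
    }

    // Same lookup over implementations registered in META-INF/services for the given SPI
    static <T> Optional<T> fromServiceLoader(Class<T> spi, String configuredClassName) {
        return byClassName(ServiceLoader.load(spi), configuredClassName);
    }
}
```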

Discovery Script Coordination

The default NVIDIA GPU discovery script supports coordination mode to prevent resource conflicts:

# Usage patterns
./nvidia-gpu-discovery.sh <gpu-amount>
./nvidia-gpu-discovery.sh <gpu-amount> --enable-coordination-mode
./nvidia-gpu-discovery.sh <gpu-amount> --enable-coordination-mode --coordination-file /custom/path

Coordination Features:

  • Non-coordination mode: Simple first-N allocation from available GPUs
  • Coordination mode: File-based locking prevents multiple processes from claiming same GPUs
  • Process cleanup: Automatically reclaims GPUs from dead processes
  • Default coordination file: /var/tmp/flink-gpu-coordination

// Configuration for coordination mode
config.set(GPUDriverOptions.DISCOVERY_SCRIPT_ARG, "--enable-coordination-mode --coordination-file /tmp/gpu-coord");
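The coordination features above can be modeled in memory to show the allocation rule. This is not the script's implementation: the coordination file's format is not described here, so the hypothetical GpuCoordinator assumes ownership is tracked per GPU index together with the owning process id, replaces file locking with a synchronized method, and takes a liveness check as a parameter so that dead processes can be simulated.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.LongPredicate;

// Hypothetical in-memory model of coordination mode (not the script's file format).
final class GpuCoordinator {
    private final Map<String, Long> owners = new LinkedHashMap<>(); // GPU index -> owning pid
    private final LongPredicate isAlive;

    GpuCoordinator(LongPredicate isAlive) {
        this.isAlive = isAlive;
    }

    // Liveness check for processes on the local host
    static GpuCoordinator withLiveProcessCheck() {
        return new GpuCoordinator(pid -> ProcessHandle.of(pid).map(ProcessHandle::isAlive).orElse(false));
    }

    // synchronized plays the role of the lock on the coordination file
    synchronized Optional<List<String>> allocate(List<String> available, int amount, long pid) {
        // Reclaim GPUs whose owning process is gone
        owners.values().removeIf(owner -> !isAlive.test(owner));
        List<String> picked = new ArrayList<>();
        for (String index : available) {
            if (picked.size() == amount) {
                break;
            }
            if (!owners.containsKey(index)) {
                picked.add(index);
            }
        }
        if (picked.size() < amount) {
            return Optional.empty(); // not enough free GPUs; the script would exit with code 1
        }
        for (String index : picked) {
            owners.put(index, pid);
        }
        return Optional.of(picked);
    }
}
```

withLiveProcessCheck() uses ProcessHandle.of(pid) for a real liveness check, which only sees processes on the same host, matching the per-node scope of the coordination file.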

Types

Core Flink Types

/**
 * Configuration system for Flink settings
 */
public class Configuration {
    public <T> T get(ConfigOption<T> option);
    public <T> Configuration set(ConfigOption<T> option, T value);
}

/**
 * Typed configuration option with key, type, and default value
 */
public class ConfigOption<T> {
    public String key();
    public T defaultValue();
}

/**
 * Exception for invalid configuration values
 */
@PublicEvolving
public class IllegalConfigurationException extends RuntimeException {
    public IllegalConfigurationException(String message);
    public IllegalConfigurationException(String message, Throwable cause);
}

/**
 * Base class of all Flink-specific checked exceptions
 */
@Public
public class FlinkException extends Exception {
    public FlinkException(String message);
    public FlinkException(String message, Throwable cause);
}

GPU-Specific Constants

/**
 * Property key constant for accessing GPU index from GPUInfo
 */
public static final String PROPERTY_KEY_INDEX = "index";

/**
 * Script execution timeout in milliseconds (private constant in GPUDriver)
 */
private static final long DISCOVERY_SCRIPT_TIMEOUT_MS = 10000;

/**
 * Default plugins directory name (ConfigConstants.DEFAULT_FLINK_PLUGINS_DIRS); the default script lives under it
 */
public static final String DEFAULT_FLINK_PLUGINS_DIRS = "plugins";
// Default script: {FLINK_PLUGINS_DIRS}/external-resource-gpu/nvidia-gpu-discovery.sh
