# Apache Flink External Resources

The Apache Flink external resource framework provides GPU resource discovery and allocation for distributed stream and batch processing applications. It enables GPU-aware task scheduling and resource allocation through configurable discovery scripts and extensible APIs.

## Package Information

- **Package Name**: flink-external-resources
- **Package Type**: maven
- **Language**: Java
- **Maven Coordinates**: `org.apache.flink:flink-external-resources:2.1.0` (parent) / `org.apache.flink:flink-external-resource-gpu:2.1.0` (GPU module)
- **Installation**: Add the GPU module dependency to your Maven `pom.xml`:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-external-resource-gpu</artifactId>
    <version>2.1.0</version>
</dependency>
```

**Module Structure:**

- **Parent**: `flink-external-resources` - Provides the external resource framework
- **GPU Module**: `flink-external-resource-gpu` - Implements GPU-specific resource discovery

## Core Imports

```java
// GPU-specific classes
import org.apache.flink.externalresource.gpu.GPUDriverFactory;
import org.apache.flink.externalresource.gpu.GPUInfo;
import org.apache.flink.externalresource.gpu.GPUDriverOptions;

// Flink core external resource interfaces
import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
import org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory;
import org.apache.flink.api.common.externalresource.ExternalResourceInfo;

// Configuration and utilities
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

// Standard Java imports
import java.util.Set;
import java.util.Collection;
import java.util.Optional;
import java.io.FileNotFoundException;
import java.util.concurrent.TimeoutException;
```

## Basic Usage

```java
import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.externalresource.gpu.GPUDriverFactory;
import org.apache.flink.externalresource.gpu.GPUDriverOptions;
import org.apache.flink.externalresource.gpu.GPUInfo;

import java.util.Optional;
import java.util.Set;

public class GpuDiscoveryExample {
    public static void main(String[] args) throws Exception {
        // Configure GPU discovery with the default NVIDIA script.
        // Setting the path is optional when the default location is acceptable.
        Configuration config = new Configuration();
        config.set(GPUDriverOptions.DISCOVERY_SCRIPT_PATH,
                "plugins/external-resource-gpu/nvidia-gpu-discovery.sh");
        // Enable coordination mode so processes on the same host do not claim the same GPUs
        config.set(GPUDriverOptions.DISCOVERY_SCRIPT_ARG, "--enable-coordination-mode");

        // Create the GPU driver via the factory; this fails if the script is missing or not executable.
        // Calling the factory directly is mainly useful for testing; in a cluster, Flink creates the driver.
        GPUDriverFactory factory = new GPUDriverFactory();
        ExternalResourceDriver driver = factory.createExternalResourceDriver(config);

        // Run the discovery script and request 2 GPUs
        Set<? extends ExternalResourceInfo> gpuResources = driver.retrieveResourceInfo(2);

        for (ExternalResourceInfo gpu : gpuResources) {
            // The GPU driver returns GPUInfo instances, so this cast is safe
            GPUInfo gpuInfo = (GPUInfo) gpu;

            // "index" is the only property GPUInfo exposes
            Optional<String> index = gpuInfo.getProperty("index");
            index.ifPresent(i -> System.out.println("Allocated GPU index: " + i));

            // All available property keys
            System.out.println("Available properties: " + gpuInfo.getKeys());

            // String representation, e.g. "GPU Device(0)"
            System.out.println("GPU: " + gpuInfo);
        }

        // Custom script with script-specific arguments (only that script interprets them)
        Configuration customConfig = new Configuration();
        customConfig.set(GPUDriverOptions.DISCOVERY_SCRIPT_PATH, "/opt/custom-gpu-discovery.sh");
        customConfig.set(GPUDriverOptions.DISCOVERY_SCRIPT_ARG, "--min-memory=8GB --cuda-version=11.0");
    }
}
```

## Architecture

The framework is built around the Flink external resource system with these key components:

- **Service Loading**: `GPUDriverFactory` is registered through the Java Service Loader so that Flink can discover it
- **Discovery Scripts**: Configurable shell scripts run on each cluster node to identify the available GPUs
- **Resource Information**: `GPUInfo` objects encapsulate discovered GPU details with property-based access
- **Configuration**: Script paths and arguments are read through Flink's `Configuration` system
- **Error Handling**: Distinct exceptions for unconfigured, missing, or non-executable scripts, script timeouts, and non-zero exit codes

## Capabilities

### GPU Driver Factory

Factory for creating GPU resource drivers through Flink's external resource system.

```java { .api }
/**
 * Factory for creating {@link GPUDriver} instances.
 * Loaded automatically by Flink via Java Service Loader mechanism.
 */
public class GPUDriverFactory implements ExternalResourceDriverFactory {
    /**
     * Creates a GPU driver with the specified configuration
     * @param config Configuration containing discovery script settings
     * @return ExternalResourceDriver for GPU resource discovery
     * @throws Exception if configuration is invalid or script setup fails
     */
    @Override
    public ExternalResourceDriver createExternalResourceDriver(Configuration config)
            throws Exception;
}
```

### GPU Driver Implementation

The core implementation that executes discovery scripts to find available GPU resources.

```java { .api }
/**
 * Driver takes the responsibility to discover GPU resources and provide the GPU resource
 * information. It retrieves the GPU information by executing a user-defined discovery script.
 *
 * Note: This class is package-private and should only be created via GPUDriverFactory
 */
class GPUDriver implements ExternalResourceDriver {
    /**
     * Constructs GPUDriver with configuration validation and script setup
     * @param config Configuration containing discovery script path and arguments
     * @throws IllegalConfigurationException if script path is not configured
     * @throws FileNotFoundException if discovery script file not found
     * @throws FlinkException if script file exists but is not executable
     */
    GPUDriver(Configuration config) throws Exception;

    /**
     * Retrieve GPU resource information by executing the configured discovery script
     * @param gpuAmount Number of required GPU resources (must be > 0)
     * @return Set of GPUInfo objects representing discovered GPU resources
     * @throws IllegalArgumentException if gpuAmount <= 0
     * @throws Exception if script execution fails
     * @throws TimeoutException if script execution exceeds 10 seconds
     * @throws FlinkException if script exits with non-zero code
     */
    @Override
    public Set<GPUInfo> retrieveResourceInfo(long gpuAmount) throws Exception;
}
```

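The documented `retrieveResourceInfo` flow can be sketched with the JDK alone. The script is run, the 10-second timeout is enforced, non-zero exit codes are rejected, only the first stdout line is kept, and the process is always destroyed. This is a minimal illustration, not Flink's `GPUDriver`. The class name `DiscoveryScriptRunner` is hypothetical, the sketch takes an already tokenized command, and it throws `IOException` where Flink throws `FlinkException`.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

/** Hypothetical helper mirroring the documented execution flow; not Flink's GPUDriver. */
final class DiscoveryScriptRunner {
    /** Same value as the documented DISCOVERY_SCRIPT_TIMEOUT_MS. */
    static final long DEFAULT_TIMEOUT_MS = 10_000;

    /** Runs an already tokenized command and returns its first stdout line, or "" if none. */
    static String run(List<String> command, long timeoutMs)
            throws IOException, InterruptedException, TimeoutException {
        Process process = new ProcessBuilder(command).start();
        try {
            // Wait first, then read the (expected single-line) output.
            if (!process.waitFor(timeoutMs, TimeUnit.MILLISECONDS)) {
                throw new TimeoutException(
                        "The discovery script executed for over " + timeoutMs + " ms.");
            }
            List<String> stdout = readLines(process.getInputStream());
            int exitCode = process.exitValue();
            if (exitCode != 0) {
                List<String> stderr = readLines(process.getErrorStream());
                // Flink logs stdout/stderr and throws FlinkException; IOException stands in here.
                throw new IOException("Discovery script exit with non-zero return code: "
                        + exitCode + ". STDOUT: " + stdout + ", STDERR: " + stderr);
            }
            // Only the first line is used; Flink logs a warning when there are more.
            return stdout.isEmpty() ? "" : stdout.get(0);
        } finally {
            // Always destroy the process, as the documented cleanup does.
            process.destroyForcibly();
        }
    }

    private static List<String> readLines(InputStream in) throws IOException {
        try (BufferedReader reader =
                new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            return reader.lines().collect(Collectors.toList());
        }
    }
}
```

A call such as `DiscoveryScriptRunner.run(tokens, DiscoveryScriptRunner.DEFAULT_TIMEOUT_MS)` matches the documented limit. Waiting before reading is only safe because the contract limits output to one short line. A script that writes heavily to stdout or stderr could fill the pipe buffer and stall until the timeout fires.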
### GPU Resource Information

Container for GPU resource information with property-based access.

```java { .api }
/**
 * Information for GPU resource. Currently only including the GPU index.
 * Note: Constructor is package-private - instances created by GPUDriver
 */
public class GPUInfo implements ExternalResourceInfo {
    /**
     * Get the property indicated by the specified key
     * @param key of the required property ("index" is supported)
     * @return Optional containing the value, or empty if key not found
     */
    public Optional<String> getProperty(String key);

    /**
     * Get all property keys
     * @return Collection of all property keys
     */
    public Collection<String> getKeys();

    /**
     * Returns formatted string representation of GPU device
     * @return String in format "GPU Device(index)"
     */
    public String toString();

    /**
     * Hash code based on GPU index
     * @return int hash code
     */
    public int hashCode();

    /**
     * Equality comparison based on GPU index
     * @param obj Object to compare
     * @return boolean true if equal GPU indices
     */
    public boolean equals(Object obj);
}
```

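The `GPUInfo` contract above can be mirrored in a plain JDK class for tests, or for reasoning about set semantics. That contract is a single `index` property, `GPU Device(<index>)` as the string form, and equality by index. `GpuInfoSketch` is a hypothetical name. It does not implement Flink's `ExternalResourceInfo`, so it compiles without Flink on the classpath.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;

/** Hypothetical JDK-only mirror of the documented GPUInfo contract (not Flink's class). */
final class GpuInfoSketch {
    static final String PROPERTY_KEY_INDEX = "index";

    private final String index;

    GpuInfoSketch(String index) {
        this.index = Objects.requireNonNull(index, "index");
    }

    /** Only "index" is supported; any other key yields an empty Optional. */
    Optional<String> getProperty(String key) {
        return PROPERTY_KEY_INDEX.equals(key) ? Optional.of(index) : Optional.empty();
    }

    Collection<String> getKeys() {
        return Collections.singleton(PROPERTY_KEY_INDEX);
    }

    @Override
    public String toString() {
        return String.format("GPU Device(%s)", index);
    }

    // Equality and hashing use only the index, so duplicates collapse in a Set.
    @Override
    public boolean equals(Object obj) {
        return obj instanceof GpuInfoSketch && index.equals(((GpuInfoSketch) obj).index);
    }

    @Override
    public int hashCode() {
        return index.hashCode();
    }
}
```

Because equality depends only on the index, adding two objects for index `0` to a `HashSet` leaves a single element. This fits the `Set` return type of `retrieveResourceInfo`.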
### GPU Driver Configuration

Configuration options for GPU resource discovery behavior.

```java { .api }
/**
 * A collection of all configuration options for GPU driver.
 * @Documentation.SuffixOption marks each key as a suffix; the
 * external-resource.<resource_name>.param prefix is supplied at runtime.
 */
@PublicEvolving
public class GPUDriverOptions {
    /**
     * Configuration key: "discovery-script.path"
     * Full key pattern: external-resource.<resource_name>.param.discovery-script.path
     *
     * The path of the discovery script. Can be an absolute path or a path relative to FLINK_HOME.
     * Default: plugins/external-resource-gpu/nvidia-gpu-discovery.sh
     */
    @Documentation.SuffixOption("external-resource.<resource_name>.param")
    public static final ConfigOption<String> DISCOVERY_SCRIPT_PATH =
            key("discovery-script.path")
                    .stringType()
                    .defaultValue("plugins/external-resource-gpu/nvidia-gpu-discovery.sh")
                    .withDescription("Path to GPU discovery script");

    /**
     * Configuration key: "discovery-script.args"
     * Full key pattern: external-resource.<resource_name>.param.discovery-script.args
     *
     * The arguments passed to the discovery script after the GPU amount. The string is
     * split on whitespace, so each token becomes a separate positional argument.
     * No default value - leave unset if the script requires no arguments.
     */
    @Documentation.SuffixOption("external-resource.<resource_name>.param")
    public static final ConfigOption<String> DISCOVERY_SCRIPT_ARG =
            key("discovery-script.args")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("Arguments for GPU discovery script");
}
```

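The `discovery-script.path` description says a path may be absolute or relative to `FLINK_HOME`. Below is a minimal sketch of that resolution rule. The helper name is hypothetical. Falling back to the current directory when `FLINK_HOME` is unset is an assumption of the sketch, not confirmed Flink behavior.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical helper: resolve discovery-script.path as described above. */
final class ScriptPathResolver {
    /**
     * Absolute paths are returned unchanged; relative paths are resolved against
     * flinkHome. Falling back to "." when flinkHome is null is an assumption.
     */
    static Path resolve(String configuredPath, String flinkHome) {
        Path path = Paths.get(configuredPath);
        if (path.isAbsolute()) {
            return path;
        }
        return Paths.get(flinkHome == null ? "." : flinkHome).resolve(path);
    }
}
```

Calling `ScriptPathResolver.resolve(path, System.getenv("FLINK_HOME"))` gives the file whose existence and executability the driver is documented to check.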
### Flink Core Interfaces

The external resource system is built on these core Flink interfaces:

```java { .api }
/**
 * Driver which takes the responsibility to manage and provide the information of external resource.
 * Drivers are instantiated via an ExternalResourceDriverFactory.
 * TaskExecutor retrieves ExternalResourceInfo from the drivers.
 */
@PublicEvolving
public interface ExternalResourceDriver {
    /**
     * Retrieve the information of the external resources according to the amount.
     * @param amount of the required external resources
     * @return information set of the required external resources
     * @throws Exception if there is something wrong during retrieving
     */
    Set<? extends ExternalResourceInfo> retrieveResourceInfo(long amount) throws Exception;
}

/**
 * Factory for ExternalResourceDriver. Instantiate a driver with configuration.
 * Drivers with factories automatically qualify for plugin loading if the driver jar
 * is self-contained and contains a META-INF/services file.
 */
@PublicEvolving
public interface ExternalResourceDriverFactory {
    /**
     * Construct the ExternalResourceDriver from configuration.
     * @param config configuration for this external resource
     * @return the driver for this external resource
     * @throws Exception if there is something wrong during the creation
     */
    ExternalResourceDriver createExternalResourceDriver(Configuration config) throws Exception;
}

/**
 * Contains the information of an external resource.
 */
@PublicEvolving
public interface ExternalResourceInfo {
    /**
     * Get the property indicated by the specified key.
     * @param key of the required property
     * @return an Optional containing the value, or empty if no value stored under key
     */
    Optional<String> getProperty(String key);

    /**
     * Get all property keys.
     * @return collection of all property keys
     */
    Collection<String> getKeys();
}
```

## Configuration Integration

### Flink Configuration Keys

The GPU driver integrates with Flink's configuration system using these key patterns:

```yaml
# Configuration key pattern for external resources
# external-resource.<resource_name>.param.discovery-script.path
# external-resource.<resource_name>.param.discovery-script.args

# Example for GPU resources named "gpu"
external-resource.gpu.param.discovery-script.path: "/opt/flink/scripts/nvidia-gpu-discovery.sh"
external-resource.gpu.param.discovery-script.args: "--enable-coordination-mode"
```

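To use the GPU driver, the parameters above sit next to framework-level keys that declare the resource and choose its factory. The sketch below assembles the entries for one resource as a `Map`. The key names `external-resources`, `external-resource.<resource_name>.amount` and `external-resource.<resource_name>.driver-factory.class` come from Flink's external resource framework. The class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper assembling Flink configuration entries for one GPU resource. */
final class GpuResourceConfig {
    static final String GPU_FACTORY = "org.apache.flink.externalresource.gpu.GPUDriverFactory";

    /** Expands a driver option suffix such as "discovery-script.path" into its full key. */
    static String paramKey(String resourceName, String suffix) {
        return "external-resource." + resourceName + ".param." + suffix;
    }

    /** scriptArgs may be null, in which case discovery-script.args is left unset. */
    static Map<String, String> forGpu(String resourceName, long amount, String scriptArgs) {
        Map<String, String> conf = new LinkedHashMap<>();
        // Framework-level keys: declare the resource, its amount, and its driver factory.
        conf.put("external-resources", resourceName);
        conf.put("external-resource." + resourceName + ".amount", Long.toString(amount));
        conf.put("external-resource." + resourceName + ".driver-factory.class", GPU_FACTORY);
        // Driver parameter under the ".param." prefix; the script path keeps its default.
        if (scriptArgs != null) {
            conf.put(paramKey(resourceName, "discovery-script.args"), scriptArgs);
        }
        return conf;
    }
}
```

Printing each entry as `key: value` produces lines for the Flink configuration file. One example is `external-resource.gpu.param.discovery-script.args: --enable-coordination-mode`.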
### Default Discovery Script

The framework includes a default NVIDIA GPU discovery script:

**Script Details:**
- **Location**: `{FLINK_PLUGINS_DIRS}/external-resource-gpu/nvidia-gpu-discovery.sh`
- **Dependencies**: Requires `nvidia-smi` command available in PATH
- **Common script**: Uses `gpu-discovery-common.sh` for shared allocation logic
- **GPU Detection**: Executes `nvidia-smi --query-gpu=index --format=csv,noheader`
- **Output Format**: Comma-separated GPU indices (e.g., "0,1,2")
- **Timeout**: 10 seconds maximum execution time
- **Process cleanup**: Discovery script process is destroyed forcibly after completion

**Script Arguments:**
```bash
# Basic usage
./nvidia-gpu-discovery.sh <gpu-amount>

# With coordination mode (prevents resource conflicts)
./nvidia-gpu-discovery.sh <gpu-amount> --enable-coordination-mode

# Custom coordination file location
./nvidia-gpu-discovery.sh <gpu-amount> --enable-coordination-mode --coordination-file /custom/path
```

**Internal Process:**
1. Script validates GPU amount > 0, exits with code 0 if amount = 0
2. Calls `nvidia-smi` to get available GPU indices
3. Executes allocation logic (coordination or non-coordination mode)
4. Returns comma-separated indices of allocated GPUs
5. Exits with code 1 if insufficient GPUs available

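In non-coordination mode, the steps listed above amount to a first-N selection over the `nvidia-smi` output. This JDK-only sketch reproduces that logic for illustration. It is not the shell code in `gpu-discovery-common.sh`, the class name is hypothetical, and rejecting negative amounts is an assumption of the sketch.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical Java rendering of the non-coordination allocation steps. */
final class FirstNAllocator {
    /**
     * @param nvidiaSmiOutput one GPU index per line, as printed by
     *     nvidia-smi --query-gpu=index --format=csv,noheader
     * @param amount number of GPUs requested
     * @return comma-separated indices of the first {@code amount} GPUs, or "" for amount 0
     */
    static String allocate(String nvidiaSmiOutput, int amount) {
        if (amount < 0) {
            throw new IllegalArgumentException("amount must not be negative"); // sketch assumption
        }
        if (amount == 0) {
            return ""; // the script exits with code 0 without allocating anything
        }
        List<String> available = Arrays.stream(nvidiaSmiOutput.split("\\R"))
                .map(String::trim)
                .filter(line -> !line.isEmpty())
                .collect(Collectors.toList());
        if (available.size() < amount) {
            // The script exits with code 1 in this case.
            throw new IllegalStateException("Requested " + amount + " GPUs, but only "
                    + available.size() + " are available");
        }
        return String.join(",", available.subList(0, amount));
    }
}
```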
## Error Handling

### Exception Types and Conditions

```java { .api }
/**
 * Configuration-related exceptions thrown during GPUDriver construction
 */

// IllegalConfigurationException
// Thrown when: GPU discovery script path is null, empty, or whitespace-only
// Message: "GPU discovery script ('external-resource.<name>.param.discovery-script.path') is not configured."

// FileNotFoundException
// Thrown when: Discovery script file does not exist at the specified path
// Message: "The gpu discovery script does not exist in path <absolute-path>."

// FlinkException
// Thrown when: Script file exists but is not executable
// Message: "The discovery script <absolute-path> is not executable."

/**
 * Exceptions thrown during resource discovery (retrieveResourceInfo)
 */

// IllegalArgumentException
// Thrown when: gpuAmount parameter <= 0
// Message: "The gpuAmount should be positive when retrieving the GPU resource information."

// TimeoutException
// Thrown when: Script execution exceeds 10 seconds (DISCOVERY_SCRIPT_TIMEOUT_MS)
// Message: "The discovery script executed for over 10000 ms."

// FlinkException
// Thrown when: Discovery script exits with non-zero return code
// Message: "Discovery script exit with non-zero return code: <exit-code>."
// Additional: Warning logged with stdout/stderr content

/**
 * Discovery script output validation
 */
// Warning logged when: Script produces multiple output lines
// Message: "The output of the discovery script should only contain one single line. Finding <count> lines..."
// Behavior: Only first line is used, others ignored
```

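The constructor-time checks above run in a fixed order: configured path, then existence, then executability. Below is a JDK-only sketch of that order. `ScriptValidator` is hypothetical. It checks the path as given, without the `FLINK_HOME` resolution step. It also substitutes `IllegalArgumentException` and `IllegalStateException` for Flink's `IllegalConfigurationException` and `FlinkException` so that it compiles without Flink.

```java
import java.io.File;
import java.io.FileNotFoundException;

/** Hypothetical sketch of the GPUDriver constructor checks, in the documented order. */
final class ScriptValidator {
    static File validate(String scriptPath) throws FileNotFoundException {
        // 1. Null, empty, or whitespace-only path (Flink: IllegalConfigurationException).
        if (scriptPath == null || scriptPath.trim().isEmpty()) {
            throw new IllegalArgumentException("GPU discovery script is not configured.");
        }
        File script = new File(scriptPath);
        // 2. Missing file (Flink also throws FileNotFoundException).
        if (!script.exists()) {
            throw new FileNotFoundException(
                    "The gpu discovery script does not exist in path " + script.getAbsolutePath() + ".");
        }
        // 3. Present but not executable (Flink: FlinkException; a stdlib stand-in is used here).
        if (!script.canExecute()) {
            throw new IllegalStateException(
                    "The discovery script " + script.getAbsolutePath() + " is not executable.");
        }
        return script;
    }
}
```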
### Discovery Script Requirements

Custom discovery scripts must meet the following requirements:

**Script Interface:**
- Be executable by the Flink process (`chmod +x`)
- Accept the GPU amount as the first positional argument (required)
- Accept the configured `discovery-script.args` value as the remaining positional arguments; the string is split on whitespace, so each token arrives separately (`$2`, `$3`, ...)
- Print comma-separated GPU indices on stdout (single line only)
- Exit with code 0 for success, non-zero for failure
- Complete execution within 10 seconds

**Output Validation:**
- Script output is read from stdout only
- Multiple lines: Only the first line is used; the rest are ignored and a warning is logged
- Empty output: Returns an empty Set (no GPUs allocated)
- Whitespace around indices: Automatically trimmed
- Invalid format: Not validated; a GPUInfo is created with the trimmed string as its index

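The output rules above can be expressed as a small parser. This sketch is hypothetical, and its class and method names are invented. It takes the captured stdout lines and returns index strings instead of `GPUInfo` objects. Skipping blank tokens, such as the middle of `0,,1`, is an assumption that the documentation does not settle.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical parser following the documented output rules. */
final class DiscoveryOutputParser {
    /** Returns the GPU indices named on the first output line, in order, without duplicates. */
    static Set<String> parse(List<String> stdoutLines) {
        Set<String> indices = new LinkedHashSet<>();
        if (stdoutLines.isEmpty()) {
            return indices; // empty output: no GPUs allocated
        }
        // Only the first line counts; Flink logs a warning when there are more.
        for (String token : stdoutLines.get(0).split(",")) {
            String index = token.trim(); // whitespace around indices is trimmed
            // Sketch assumption: blank tokens are skipped rather than turned into indices.
            if (!index.isEmpty()) {
                indices.add(index);
            }
        }
        return indices;
    }
}
```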
**Error Handling:**
- Non-zero exit: Logs stdout/stderr content and throws FlinkException
- Timeout: Process destroyed forcibly, TimeoutException thrown
- Script execution uses `Runtime.getRuntime().exec()` with the process streams captured

**Example Custom Discovery Script:**
```bash
#!/bin/bash
# Custom GPU discovery script
# $1 is the requested GPU amount; configured arguments follow as $2, $3, ...
GPU_AMOUNT=$1
shift
SCRIPT_ARGS=("$@")

# Your custom GPU detection logic here
# Must print the allocated indices as one comma-separated line,
# or exit non-zero if fewer than GPU_AMOUNT GPUs are available
echo "0,1,3" # Example output for a request of 3 GPUs
```

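**Script Execution Context:**
```java
// GPUDriver executes the script as a single command string:
String cmd = discoveryScript.getAbsolutePath() + " " + gpuAmount + " " + args;
// Runtime.exec(String) splits cmd on whitespace without quoting support,
// so each token of args becomes its own argument and paths must not contain spaces
Process process = Runtime.getRuntime().exec(cmd);
```
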
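`Runtime.exec(String)` breaks its argument into tokens with `new StringTokenizer(command)`, so the configured argument string does not reach the script as one value. The sketch below reproduces that tokenization for the documented command layout. The class name is hypothetical, and the sketch assumes `args` is set.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

/** Hypothetical helper reproducing how Runtime.exec(String) splits the documented command. */
final class CommandTokens {
    static List<String> tokenize(String scriptPath, long gpuAmount, String args) {
        String cmd = scriptPath + " " + gpuAmount + " " + args;
        // Runtime.exec(String) uses new StringTokenizer(cmd): split on whitespace, no quoting.
        StringTokenizer tokenizer = new StringTokenizer(cmd);
        List<String> tokens = new ArrayList<>();
        while (tokenizer.hasMoreTokens()) {
            tokens.add(tokenizer.nextToken());
        }
        return tokens;
    }
}
```

For `--enable-coordination-mode --coordination-file /tmp/gpu-coord`, the command line has five tokens: the script path, `2`, and three separate arguments that the script sees as `$2`, `$3` and `$4`. Quotes are not honored either, so `"--label=a b"` still splits in two. For this reason, a custom script should read `"$@"` after shifting off the amount rather than relying on `$2` alone.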
## Service Loader Integration

The GPU driver factory is discovered by Flink through the Java Service Loader:

**META-INF/services/org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory:**
```
org.apache.flink.externalresource.gpu.GPUDriverFactory
```

This registration makes the factory available to Flink's external resource management without code changes. Each resource still selects its factory by class name through `external-resource.<resource_name>.driver-factory.class`.

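Discovery and selection are two separate steps. First, every factory listed in a `META-INF/services` file is found. Then each resource picks one by fully qualified class name. In Flink the factories come from its plugin loading. In this JDK-only sketch, `ServiceLoader.load` stands in for that mechanism, and `FactorySelector` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;

/** Hypothetical sketch: discover factories, index them by class name, select one. */
final class FactorySelector {
    /** Finds providers listed in META-INF/services/<service binary name> on the classpath. */
    static <T> Map<String, T> discover(Class<T> service) {
        return indexByClassName(ServiceLoader.load(service));
    }

    static <T> Map<String, T> indexByClassName(Iterable<? extends T> factories) {
        Map<String, T> byName = new HashMap<>();
        for (T factory : factories) {
            byName.put(factory.getClass().getName(), factory);
        }
        return byName;
    }

    /** configuredClassName would come from external-resource.<resource_name>.driver-factory.class. */
    static <T> Optional<T> select(Map<String, T> byName, String configuredClassName) {
        return Optional.ofNullable(byName.get(configuredClassName));
    }
}
```

Keeping the two steps apart lets a single service file offer several factories while the configuration decides which factory each resource uses.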
## Discovery Script Coordination

The default NVIDIA GPU discovery script supports a coordination mode that prevents multiple processes on the same host from being assigned the same GPUs:

```bash
# Usage patterns
./nvidia-gpu-discovery.sh <gpu-amount>
./nvidia-gpu-discovery.sh <gpu-amount> --enable-coordination-mode
./nvidia-gpu-discovery.sh <gpu-amount> --enable-coordination-mode --coordination-file /custom/path
```

**Coordination Features:**
- **Non-coordination mode**: Simple first-N allocation from the available GPUs
- **Coordination mode**: File-based locking prevents multiple processes from claiming the same GPUs
- **Process cleanup**: Automatically reclaims GPUs held by processes that are no longer running
- **Default coordination file**: `/var/tmp/flink-gpu-coordination`

```java
// Configuration for coordination mode with a custom coordination file
config.set(GPUDriverOptions.DISCOVERY_SCRIPT_ARG, "--enable-coordination-mode --coordination-file /tmp/gpu-coord");
```

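Coordination mode can be pictured as a read-modify-write of a shared record file under an exclusive lock, where records from dead processes are dropped before free GPUs are chosen. The Java sketch below illustrates only that idea. The `<index> <pid>` record format, the class name, and the use of `FileChannel.lock` are assumptions of the sketch. They are not the mechanism or file format of the shipped shell scripts.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical Java analogue of coordination mode. Each line of the coordination file is
 * "<gpu-index> <owner-pid>"; this format is an assumption, not the shell scripts' format.
 */
final class CoordinatedAllocator {
    static List<String> allocate(Path coordinationFile, List<String> available, int amount,
            long ownerPid) throws IOException {
        try (FileChannel channel = FileChannel.open(coordinationFile, StandardOpenOption.CREATE,
                        StandardOpenOption.READ, StandardOpenOption.WRITE);
                FileLock lock = channel.lock()) { // exclusive lock, held until the block ends
            // Read all existing records through the locked channel.
            ByteBuffer in = ByteBuffer.allocate((int) channel.size());
            while (in.hasRemaining()) {
                if (channel.read(in) == -1) {
                    break;
                }
            }
            List<String> liveRecords = new ArrayList<>();
            Set<String> claimed = new HashSet<>();
            for (String line : new String(in.array(), StandardCharsets.UTF_8).split("\n")) {
                String[] parts = line.trim().split("\\s+");
                if (parts.length != 2) {
                    continue; // skip blank lines and lines without exactly two fields
                }
                long pid = Long.parseLong(parts[1]);
                // Keep only records whose owner is still alive; the rest are reclaimed.
                if (ProcessHandle.of(pid).map(ProcessHandle::isAlive).orElse(false)) {
                    liveRecords.add(parts[0] + " " + pid);
                    claimed.add(parts[0]);
                }
            }
            List<String> granted = new ArrayList<>();
            for (String index : available) {
                if (granted.size() == amount) {
                    break;
                }
                if (!claimed.contains(index)) {
                    granted.add(index);
                }
            }
            if (granted.size() < amount) {
                // The default script exits with code 1 when not enough GPUs are free.
                throw new IllegalStateException("Requested " + amount + " GPUs, only "
                        + granted.size() + " are free");
            }
            for (String index : granted) {
                liveRecords.add(index + " " + ownerPid);
            }
            // Rewrite the file in place while still holding the lock.
            ByteBuffer out = ByteBuffer.wrap(
                    (String.join("\n", liveRecords) + "\n").getBytes(StandardCharsets.UTF_8));
            channel.truncate(0);
            channel.position(0);
            while (out.hasRemaining()) {
                channel.write(out);
            }
            return granted;
        }
    }
}
```

The sketch reads and writes through the locked channel instead of reopening the file. The `FileLock` documentation warns that on some platforms, closing any channel to the same file releases the JVM's locks on it.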
## Types

### Core Flink Types

```java { .api }
/**
 * Configuration system for Flink settings
 */
public class Configuration {
    public <T> T get(ConfigOption<T> option);
    public <T> Configuration set(ConfigOption<T> option, T value);
}

/**
 * Typed configuration option with key, type, and default value
 */
public class ConfigOption<T> {
    public String key();
    public T defaultValue();
}

/**
 * Exception for invalid configuration values
 */
@PublicEvolving
public class IllegalConfigurationException extends RuntimeException {
    public IllegalConfigurationException(String message);
    public IllegalConfigurationException(String message, Throwable cause);
}

/**
 * Base class of all Flink-specific checked exceptions
 */
@Public
public class FlinkException extends Exception {
    public FlinkException(String message);
    public FlinkException(String message, Throwable cause);
}
```

### GPU-Specific Constants

```java { .api }
/**
 * Property key constant for accessing GPU index from GPUInfo
 */
public static final String PROPERTY_KEY_INDEX = "index";

/**
 * Script execution timeout in milliseconds (private constant in GPUDriver)
 */
private static final long DISCOVERY_SCRIPT_TIMEOUT_MS = 10000;

/**
 * Default plugins directory name (ConfigConstants.DEFAULT_FLINK_PLUGINS_DIRS)
 */
public static final String DEFAULT_FLINK_PLUGINS_DIRS = "plugins";
// Default script: {FLINK_PLUGINS_DIRS}/external-resource-gpu/nvidia-gpu-discovery.sh
```

### Service Loader Configuration

The GPU driver factory is registered via Service Loader in the JAR file:

**File:** `META-INF/services/org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory`
**Content:**
```
org.apache.flink.externalresource.gpu.GPUDriverFactory
```

This file lets Flink's plugin system discover and load the factory without manual registration in code. Each resource then selects the factory through `external-resource.<resource_name>.driver-factory.class`.