or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

tessl/maven-org-apache-flink--flink-external-resources

Apache Flink external resources management framework that provides GPU resource discovery and allocation capabilities for distributed stream and batch processing applications

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
mavenpkg:maven/org.apache.flink/flink-external-resources@2.1.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-external-resources@2.1.0

0

# Apache Flink External Resources

1

2

Apache Flink external resources management framework that provides GPU resource discovery and allocation capabilities for distributed stream and batch processing applications. It lets Flink TaskManagers discover GPUs through configurable discovery scripts. It then exposes the allocated devices (for example, their indices) to user code through Flink's extensible external resource driver APIs.

3

4

## Package Information

5

6

- **Package Name**: flink-external-resources

7

- **Package Type**: maven

8

- **Language**: Java

9

- **Maven Coordinates**: `org.apache.flink:flink-external-resources:2.1.0` (parent) / `org.apache.flink:flink-external-resource-gpu:2.1.0` (GPU module)

10

- **Installation**: Add the GPU module dependency to your Maven `pom.xml`:

11

12

```xml

13

<dependency>

14

<groupId>org.apache.flink</groupId>

15

<artifactId>flink-external-resource-gpu</artifactId>

16

<version>2.1.0</version>

17

</dependency>

18

```

19

20

**Module Structure:**

21

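- **Parent**: `flink-external-resources` - Aggregator (POM) module for external resource driver implementations. The core interfaces (`ExternalResourceDriver`, `ExternalResourceDriverFactory`, `ExternalResourceInfo`) live in `flink-core`.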

22

- **GPU Module**: `flink-external-resource-gpu` - Implements GPU-specific resource discovery

23

24

## Core Imports

25

26

```java

27

// GPU-specific classes

28

import org.apache.flink.externalresource.gpu.GPUDriverFactory;

29

import org.apache.flink.externalresource.gpu.GPUInfo;

30

import org.apache.flink.externalresource.gpu.GPUDriverOptions;

31

32

// Flink core external resource interfaces

33

import org.apache.flink.api.common.externalresource.ExternalResourceDriver;

34

import org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory;

35

import org.apache.flink.api.common.externalresource.ExternalResourceInfo;

36

37

// Configuration and utilities

38

import org.apache.flink.configuration.Configuration;

39

import org.apache.flink.configuration.ConfigOption;

40

import org.apache.flink.configuration.IllegalConfigurationException;

41

import org.apache.flink.util.FlinkException;

42

import org.apache.flink.util.Preconditions;

43

44

// Standard Java imports

45

import java.util.Set;

46

import java.util.Collection;

47

import java.util.Optional;

48

import java.io.FileNotFoundException;

49

import java.util.concurrent.TimeoutException;

50

```

51

52

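## Basic Usage

In a deployed Flink job you normally do not create the driver yourself. Flink instantiates it from the cluster configuration, and user functions read the result via `getRuntimeContext().getExternalResourceInfos("<resource_name>")`. The snippet below drives the factory directly, which is mainly useful for testing a discovery setup.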

53

54

```java

55

import org.apache.flink.configuration.Configuration;

56

import org.apache.flink.externalresource.gpu.GPUDriverFactory;

57

import org.apache.flink.externalresource.gpu.GPUDriverOptions;

58

import org.apache.flink.externalresource.gpu.GPUInfo;

59

import org.apache.flink.api.common.externalresource.ExternalResourceDriver;

60

import org.apache.flink.api.common.externalresource.ExternalResourceInfo;

61

import java.util.Set;

62

import java.util.Optional;

63

64

// Configure GPU discovery with default NVIDIA script

65

Configuration config = new Configuration();

66

// Using default script (optional - can omit if default is acceptable)

67

config.set(GPUDriverOptions.DISCOVERY_SCRIPT_PATH,

68

"plugins/external-resource-gpu/nvidia-gpu-discovery.sh");

69

// Enable coordination mode to prevent conflicts

70

config.set(GPUDriverOptions.DISCOVERY_SCRIPT_ARG, "--enable-coordination-mode");

71

72

// Create GPU driver via factory

73

GPUDriverFactory factory = new GPUDriverFactory();

74

ExternalResourceDriver driver = factory.createExternalResourceDriver(config);

75

76

// Discover available GPU resources

77

Set<? extends ExternalResourceInfo> gpuResources = driver.retrieveResourceInfo(2); // Request 2 GPUs

78

79

// Access GPU information

80

for (ExternalResourceInfo gpu : gpuResources) {

81

// Cast to GPUInfo for type safety (actual return type)

82

GPUInfo gpuInfo = (GPUInfo) gpu;

83

84

// Access GPU index property

85

Optional<String> index = gpu.getProperty("index");

86

if (index.isPresent()) {

87

System.out.println("Allocated GPU index: " + index.get());

88

}

89

90

// Get all available properties

91

System.out.println("Available properties: " + gpu.getKeys());

92

93

// String representation

94

System.out.println("GPU: " + gpu.toString()); // Output: "GPU Device(0)", "GPU Device(1)", etc.

95

}

96

97

// Example with custom script and arguments

98

Configuration customConfig = new Configuration();

99

customConfig.set(GPUDriverOptions.DISCOVERY_SCRIPT_PATH, "/opt/custom-gpu-discovery.sh");

100

customConfig.set(GPUDriverOptions.DISCOVERY_SCRIPT_ARG, "--min-memory=8GB --cuda-version=11.0");

101

```

102

103

## Architecture

104

105

The framework is built around the Flink external resource system with these key components:

106

107

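- **Service Loading**: GPUDriverFactory is registered via the Java Service Loader so that Flink's plugin mechanism can load it. It is used for every resource whose `external-resource.<resource_name>.driver-factory.class` names it.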

108

- **Discovery Scripts**: Configurable shell scripts execute to identify available GPU resources on cluster nodes

109

- **Resource Information**: GPUInfo objects encapsulate discovered GPU details with property-based access

110

- **Configuration**: Flink Configuration system integration for script paths and arguments

111

- **Error Handling**: Distinct exceptions cover the following cases:
  - missing script configuration
  - missing or non-executable discovery scripts
  - script timeouts
  - non-zero script exit codes

112

113

## Capabilities

114

115

### GPU Driver Factory

116

117

Factory for creating GPU resource drivers through Flink's external resource system.

118

119

```java { .api }

120

/**

121

* Factory for creating {@link GPUDriver} instances.

122

* Loaded automatically by Flink via Java Service Loader mechanism.

123

*/

124

public class GPUDriverFactory implements ExternalResourceDriverFactory {

125

/**

126

* Creates a GPU driver with the specified configuration

127

* @param config Configuration containing discovery script settings

128

* @return ExternalResourceDriver for GPU resource discovery

129

* @throws Exception if configuration is invalid or script setup fails

130

*/

131

@Override

132

public ExternalResourceDriver createExternalResourceDriver(Configuration config)

133

throws Exception;

134

}

135

```

136

137

### GPU Driver Implementation

138

139

The core implementation that executes discovery scripts to find available GPU resources.

140

141

```java { .api }

142

/**

143

* Driver takes the responsibility to discover GPU resources and provide the GPU resource

144

* information. It retrieves the GPU information by executing a user-defined discovery script.

145

*

146

* Note: This class is package-private and should only be created via GPUDriverFactory

147

*/

148

class GPUDriver implements ExternalResourceDriver {

149

/**

150

* Constructs GPUDriver with configuration validation and script setup

151

* @param config Configuration containing discovery script path and arguments

152

* @throws IllegalConfigurationException if script path is not configured

153

* @throws FileNotFoundException if discovery script file not found

154

* @throws FlinkException if script file exists but is not executable

155

*/

156

GPUDriver(Configuration config) throws Exception;

157

158

/**

159

* Retrieve GPU resource information by executing the configured discovery script

160

* @param gpuAmount Number of required GPU resources (must be > 0)

161

* @return Set of GPUInfo objects representing discovered GPU resources

162

* @throws IllegalArgumentException if gpuAmount <= 0

163

* @throws Exception if script execution fails

164

* @throws TimeoutException if script execution exceeds 10 seconds

165

* @throws FlinkException if script exits with non-zero code

166

*/

167

@Override

168

public Set<GPUInfo> retrieveResourceInfo(long gpuAmount) throws Exception;

169

}

170

```

171

172

### GPU Resource Information

173

174

Container for GPU resource information with property-based access.

175

176

```java { .api }

177

/**

178

* Information for GPU resource. Currently only including the GPU index.

179

* Note: Constructor is package-private - instances created by GPUDriver

180

*/

181

public class GPUInfo implements ExternalResourceInfo {

182

/**

183

* Get the property indicated by the specified key

184

* @param key of the required property ("index" is supported)

185

* @return Optional containing the value, or empty if key not found

186

*/

187

public Optional<String> getProperty(String key);

188

189

/**

190

* Get all property keys

191

* @return Collection of all property keys

192

*/

193

public Collection<String> getKeys();

194

195

/**

196

* Returns formatted string representation of GPU device

197

* @return String in format "GPU Device(index)"

198

*/

199

public String toString();

200

201

/**

202

* Hash code based on GPU index

203

* @return int hash code

204

*/

205

public int hashCode();

206

207

/**

208

* Equality comparison based on GPU index

209

* @param obj Object to compare

210

* @return boolean true if equal GPU indices

211

*/

212

public boolean equals(Object obj);

213

}

214

```

215

216

### GPU Driver Configuration

217

218

Configuration options for GPU resource discovery behavior.

219

220

```java { .api }

221

/**

222

* A collection of all configuration options for GPU driver.

223

* Uses @Documentation.SuffixOption for automatic key generation.

224

*/

225

@PublicEvolving

226

public class GPUDriverOptions {

227

/**

228

* Configuration key: "discovery-script.path"

229

* Full key pattern: external-resource.<resource_name>.param.discovery-script.path

230

*

231

* The path of the discovery script. Can be absolute path or relative to FLINK_HOME.

232

* Default: plugins/external-resource-gpu/nvidia-gpu-discovery.sh

233

*/

234

@Documentation.SuffixOption("external-resource.<resource_name>.param")

235

public static final ConfigOption<String> DISCOVERY_SCRIPT_PATH =

236

key("discovery-script.path")

237

.stringType()

238

.defaultValue("plugins/external-resource-gpu/nvidia-gpu-discovery.sh")

239

.withDescription("Path to GPU discovery script");

240

241

/**

242

* Configuration key: "discovery-script.args"

243

* Full key pattern: external-resource.<resource_name>.param.discovery-script.args

244

*

245

* The arguments passed to the discovery script as second parameter.

246

* No default value - leave unset if script requires no arguments.

247

*/

248

@Documentation.SuffixOption("external-resource.<resource_name>.param")

249

public static final ConfigOption<String> DISCOVERY_SCRIPT_ARG =

250

key("discovery-script.args")

251

.stringType()

252

.noDefaultValue()

253

.withDescription("Arguments for GPU discovery script");

254

}

255

```

256

257

### Flink Core Interfaces

258

259

The external resource system is built on these core Flink interfaces:

260

261

```java { .api }

262

/**

263

* Driver which takes the responsibility to manage and provide the information of external resource.

264

* Drivers are instantiated via an ExternalResourceDriverFactory.

265

* TaskExecutor retrieves ExternalResourceInfo from the drivers.

266

*/

267

@PublicEvolving

268

public interface ExternalResourceDriver {

269

/**

270

* Retrieve the information of the external resources according to the amount.

271

* @param amount of the required external resources

272

* @return information set of the required external resources

273

* @throws Exception if there is something wrong during retrieving

274

*/

275

Set<? extends ExternalResourceInfo> retrieveResourceInfo(long amount) throws Exception;

276

}

277

278

/**

279

* Factory for ExternalResourceDriver. Instantiate a driver with configuration.

280

* Drivers with factories automatically qualify for plugin loading if the driver jar

281

* is self-contained and contains a META-INF/services file.

282

*/

283

@PublicEvolving

284

public interface ExternalResourceDriverFactory {

285

/**

286

* Construct the ExternalResourceDriver from configuration.

287

* @param config configuration for this external resource

288

* @return the driver for this external resource

289

* @throws Exception if there is something wrong during the creation

290

*/

291

ExternalResourceDriver createExternalResourceDriver(Configuration config) throws Exception;

292

}

293

294

/**

295

* Contains the information of an external resource.

296

*/

297

@PublicEvolving

298

public interface ExternalResourceInfo {

299

/**

300

* Get the property indicated by the specified key.

301

* @param key of the required property

302

* @return an Optional containing the value, or empty if no value stored under key

303

*/

304

Optional<String> getProperty(String key);

305

306

/**

307

* Get all property keys.

308

* @return collection of all property keys

309

*/

310

Collection<String> getKeys();

311

}

312

```

313

314

## Configuration Integration

315

316

### Flink Configuration Keys

317

318

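The GPU driver integrates with Flink's configuration system using the key patterns below. Flink hands each driver a configuration view with the `external-resource.<resource_name>.param.` prefix already stripped. That is why the options themselves are declared simply as `discovery-script.path` and `discovery-script.args`.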

319

320

```text

321

// Configuration key pattern for external resources

322

external-resource.<resource_name>.param.discovery-script.path

323

external-resource.<resource_name>.param.discovery-script.args

324

325

// Example for GPU resources named "gpu"

326

external-resource.gpu.param.discovery-script.path: "/opt/flink/scripts/nvidia-gpu-discovery.sh"

327

external-resource.gpu.param.discovery-script.args: "--cuda-version=11.0"

328

```

329

330

### Default Discovery Script

331

332

The framework includes a default NVIDIA GPU discovery script:

333

334

**Script Details:**

335

- **Location**: `plugins/external-resource-gpu/nvidia-gpu-discovery.sh` by default. A relative script path is resolved against the `FLINK_HOME` environment variable.

336

- **Dependencies**: Requires `nvidia-smi` command available in PATH

337

- **Common script**: Uses `gpu-discovery-common.sh` for shared allocation logic

338

- **GPU Detection**: Executes `nvidia-smi --query-gpu=index --format=csv,noheader`

339

- **Output Format**: Comma-separated GPU indices (e.g., "0,1,2")

340

- **Timeout**: GPUDriver aborts the script if it runs for more than 10 seconds. This limit is a fixed constant, not a configuration option.

341

- **Process cleanup**: Discovery script process is destroyed forcibly after completion

342

343

**Script Arguments:**

344

```bash

345

# Basic usage

346

./nvidia-gpu-discovery.sh <gpu-amount>

347

348

# With coordination mode (prevents resource conflicts)

349

./nvidia-gpu-discovery.sh <gpu-amount> --enable-coordination-mode

350

351

# Custom coordination file location

352

./nvidia-gpu-discovery.sh <gpu-amount> --enable-coordination-mode --coordination-file /custom/path

353

```

354

355

**Internal Process:**

356

1. If the requested GPU amount is 0, the script exits immediately with code 0 and prints nothing (GPUDriver itself never requests fewer than 1 GPU)

357

2. Calls `nvidia-smi` to get available GPU indices

358

3. Executes allocation logic (coordination or non-coordination mode)

359

4. Returns comma-separated indices of allocated GPUs

360

5. Exits with code 1 if insufficient GPUs available

361

362

## Error Handling

363

364

### Exception Types and Conditions

365

366

```java { .api }

367

/**

368

* Configuration-related exceptions thrown during GPUDriver construction

369

*/

370

371

// IllegalConfigurationException

372

// Thrown when: GPU discovery script path is null, empty, or whitespace-only

373

// Message: "GPU discovery script ('external-resource.<name>.param.discovery-script.path') is not configured."

374

375

// FileNotFoundException

376

// Thrown when: Discovery script file does not exist at the specified path

377

// Message: "The gpu discovery script does not exist in path <absolute-path>."

378

379

// FlinkException

380

// Thrown when: Script file exists but is not executable

381

// Message: "The discovery script <absolute-path> is not executable."

382

383

/**

384

* Runtime exceptions thrown during resource discovery (retrieveResourceInfo)

385

*/

386

387

// IllegalArgumentException

388

// Thrown when: gpuAmount parameter <= 0

389

// Message: "The gpuAmount should be positive when retrieving the GPU resource information."

390

391

// TimeoutException

392

// Thrown when: Script execution exceeds 10 seconds (DISCOVERY_SCRIPT_TIMEOUT_MS)

393

// Message: "The discovery script executed for over 10000 ms."

394

395

// FlinkException

396

// Thrown when: Discovery script exits with non-zero return code

397

// Message: "Discovery script exit with non-zero return code: <exit-code>."

398

// Additional: Warning logged with stdout/stderr content

399

400

/**

401

* Discovery script output validation

402

*/

403

// Warning logged when: Script produces multiple output lines

404

// Message: "The output of the discovery script should only contain one single line. Finding <count> lines..."

405

// Behavior: Only first line is used, others ignored

406

```

407

408

### Discovery Script Requirements

409

410

Custom discovery scripts must follow these requirements:

411

412

**Script Interface:**

413

- Be executable by the Flink process (chmod +x)

414

- Accept GPU amount as first positional argument (required)

415

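- Accept the configured arguments (if any) as further positional arguments after the amount. The argument string is appended to the command line and split on whitespace, so each token arrives separately (`$2`, `$3`, …). Shell-style quoting is not honored.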

416

- Return comma-separated GPU indices on stdout (single line only)

417

- Exit with code 0 for success, non-zero for failure

418

- Complete execution within 10 seconds

419

420

**Output Validation:**

421

- Script output is read from stdout only

422

- Multiple lines: Only first line used, others ignored with warning

423

- Empty output: Returns empty Set (no GPUs allocated)

424

- Whitespace in indices: Automatically trimmed

425

- Invalid format: No numeric validation is performed. Each non-blank comma-separated token becomes a GPUInfo whose index is the trimmed token.

426

427

**Error Handling:**

428

- Non-zero exit: Logs stdout/stderr content and throws FlinkException

429

- Timeout: Process destroyed forcibly, TimeoutException thrown

430

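- Script execution uses `Runtime.getRuntime().exec(String)` with the process streams captured. Because that call splits the command line on whitespace, script paths or arguments containing spaces are not supported.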

431

432

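**Example Custom Discovery Script** (note: `$2` holds only the first argument token; use `"${@:2}"` to receive all configured arguments):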

433

```bash

434

#!/bin/bash

435

# Custom GPU discovery script

436

GPU_AMOUNT=$1

437

SCRIPT_ARGS="$2"

438

439

# Your custom GPU detection logic here

440

# Must output comma-separated indices

441

echo "0,1,3" # Example: allocate GPUs 0, 1, and 3

442

```

443

444

**Script Execution Context:**

445

```java

446

// GPUDriver executes script as:

447

String cmd = discoveryScript.getAbsolutePath() + " " + gpuAmount + " " + args;

448

Process process = Runtime.getRuntime().exec(cmd);

449

```

450

451

## Service Loader Integration

452

453

The GPU driver factory is made loadable by Flink's plugin mechanism through a Java Service Loader registration:

454

455

**META-INF/services/org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory:**

456

```

457

org.apache.flink.externalresource.gpu.GPUDriverFactory

458

```

459

460

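This registration only makes the factory loadable; it does not enable the resource. To use it, do both of the following:
- Place the GPU module jar in a subdirectory of Flink's `plugins/` directory.
- Configure the resource, for example:
  - `external-resources: gpu`
  - `external-resource.gpu.driver-factory.class: org.apache.flink.externalresource.gpu.GPUDriverFactory`
  - `external-resource.gpu.amount`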

461

462

### Discovery Script Coordination

463

464

The default NVIDIA GPU discovery script supports a coordination mode. It prevents multiple Flink processes on the same host (for example, several TaskManagers) from being assigned the same GPU:

465

466

```bash

467

# Usage patterns

468

./nvidia-gpu-discovery.sh <gpu-amount>

469

./nvidia-gpu-discovery.sh <gpu-amount> --enable-coordination-mode

470

./nvidia-gpu-discovery.sh <gpu-amount> --enable-coordination-mode --coordination-file /custom/path

471

```

472

473

**Coordination Features:**

474

- **Non-coordination mode**: Simple first-N allocation from available GPUs

475

- **Coordination mode**: File-based locking prevents multiple processes from claiming same GPUs

476

- **Process cleanup**: Automatically reclaims GPUs from dead processes

477

- **Default coordination file**: `/var/tmp/flink-gpu-coordination`

478

479

```java { .api }

480

// Configuration for coordination mode

481

config.set(GPUDriverOptions.DISCOVERY_SCRIPT_ARG, "--enable-coordination-mode --coordination-file /tmp/gpu-coord");

482

```

483

484

## Types

485

486

### Core Flink Types

487

488

```java { .api }

489

/**

490

* Configuration system for Flink settings

491

*/

492

public class Configuration {

493

public <T> T get(ConfigOption<T> option);

494

public <T> Configuration set(ConfigOption<T> option, T value);

495

}

496

497

/**

498

* Typed configuration option with key, type, and default value

499

*/

500

public class ConfigOption<T> {

501

public String key();

502

public T defaultValue();

503

}

504

505

/**

506

* Exception for invalid configuration values

507

*/

508

@PublicEvolving

509

public class IllegalConfigurationException extends RuntimeException {

510

public IllegalConfigurationException(String message);

511

public IllegalConfigurationException(String message, Throwable cause);

512

}

513

514

/**

515

* Base class of all Flink-specific checked exceptions

516

*/

517

@Public

518

public class FlinkException extends Exception {

519

public FlinkException(String message);

520

public FlinkException(String message, Throwable cause);

521

}

522

```

523

524

### GPU-Specific Constants

525

526

```java { .api }

527

/**

528

* Property key constant for accessing GPU index from GPUInfo

529

*/

530

public static final String PROPERTY_KEY_INDEX = "index";

531

532

/**

533

* Script execution timeout in milliseconds (package-private in GPUDriver)

534

*/

535

private static final long DISCOVERY_SCRIPT_TIMEOUT_MS = 10000;

536

537

/**

538

* Default discovery script locations and names

539

*/

540

public static final String DEFAULT_FLINK_PLUGINS_DIRS = "plugins";

541

// Default script: {FLINK_PLUGINS_DIRS}/external-resource-gpu/nvidia-gpu-discovery.sh

542

```

543

544

### Service Loader Configuration

545

546

The GPU driver factory is registered via Service Loader in the JAR file:

547

548

**File:** `META-INF/services/org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory`

549

**Content:**

550

```

551

org.apache.flink.externalresource.gpu.GPUDriverFactory

552

```

553

554

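This file lets Flink's plugin system find and load the factory once the jar is placed under `plugins/`. The resource must still be enabled in the configuration: list it in `external-resources` and set `external-resource.<resource_name>.driver-factory.class` to `org.apache.flink.externalresource.gpu.GPUDriverFactory`.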