# Apache Flink External Resources

Apache Flink external resources management framework that provides GPU resource discovery and allocation capabilities for distributed stream and batch processing applications. The framework enables GPU-aware task scheduling and resource allocation through configurable discovery scripts and extensible APIs.

## Package Information

- **Package Name**: flink-external-resources
- **Package Type**: maven
- **Language**: Java
- **Maven Coordinates**: `org.apache.flink:flink-external-resources:2.1.0` (parent) / `org.apache.flink:flink-external-resource-gpu:2.1.0` (GPU module)
- **Installation**: Add GPU module dependency to your Maven `pom.xml`:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-external-resource-gpu</artifactId>
    <version>2.1.0</version>
</dependency>
```

**Module Structure:**

- **Parent**: `flink-external-resources` - Provides the external resource framework
- **GPU Module**: `flink-external-resource-gpu` - Implements GPU-specific resource discovery

## Core Imports

```java
// GPU-specific classes
import org.apache.flink.externalresource.gpu.GPUDriverFactory;
import org.apache.flink.externalresource.gpu.GPUInfo;
import org.apache.flink.externalresource.gpu.GPUDriverOptions;

// Flink core external resource interfaces
import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
import org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory;
import org.apache.flink.api.common.externalresource.ExternalResourceInfo;

// Configuration and utilities
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

// Standard Java imports
import java.util.Set;
import java.util.Collection;
import java.util.Optional;
import java.io.FileNotFoundException;
import java.util.concurrent.TimeoutException;
```

## Basic Usage

```java
import org.apache.flink.api.common.externalresource.ExternalResourceDriver;
import org.apache.flink.api.common.externalresource.ExternalResourceInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.externalresource.gpu.GPUDriverFactory;
import org.apache.flink.externalresource.gpu.GPUDriverOptions;
import org.apache.flink.externalresource.gpu.GPUInfo;

import java.util.Optional;
import java.util.Set;

public class GpuDiscoveryExample {
    public static void main(String[] args) throws Exception {
        // Configure GPU discovery with the default NVIDIA script.
        // Setting the path is optional when the default is acceptable. A relative path is
        // resolved against FLINK_HOME, so the script must exist there (or use an absolute path).
        Configuration config = new Configuration();
        config.set(GPUDriverOptions.DISCOVERY_SCRIPT_PATH,
                "plugins/external-resource-gpu/nvidia-gpu-discovery.sh");
        // Enable coordination mode so concurrent processes do not claim the same GPUs
        config.set(GPUDriverOptions.DISCOVERY_SCRIPT_ARG, "--enable-coordination-mode");

        // Create the GPU driver via the factory (validates the script path and permissions)
        GPUDriverFactory factory = new GPUDriverFactory();
        ExternalResourceDriver driver = factory.createExternalResourceDriver(config);

        // Run the discovery script and request 2 GPUs
        Set<? extends ExternalResourceInfo> gpuResources = driver.retrieveResourceInfo(2);

        for (ExternalResourceInfo gpu : gpuResources) {
            // The GPU driver returns GPUInfo instances (its actual return type)
            GPUInfo gpuInfo = (GPUInfo) gpu;

            // Access the GPU index property
            Optional<String> index = gpuInfo.getProperty("index");
            index.ifPresent(i -> System.out.println("Allocated GPU index: " + i));

            // List all available property keys
            System.out.println("Available properties: " + gpuInfo.getKeys());

            // String representation, e.g. "GPU Device(0)", "GPU Device(1)"
            System.out.println("GPU: " + gpuInfo);
        }

        // Example with a custom script. These arguments are illustrative and only
        // meaningful if the custom script understands them.
        Configuration customConfig = new Configuration();
        customConfig.set(GPUDriverOptions.DISCOVERY_SCRIPT_PATH, "/opt/custom-gpu-discovery.sh");
        customConfig.set(GPUDriverOptions.DISCOVERY_SCRIPT_ARG, "--min-memory=8GB --cuda-version=11.0");
    }
}
```

## Architecture

The framework is built around the Flink external resource system with these key components:

- **Service Loading**: GPUDriverFactory is registered via the Java Service Loader so that Flink's plugin system can load it
- **Discovery Scripts**: Configurable shell scripts run on TaskManager hosts to identify available GPU resources
- **Resource Information**: GPUInfo objects encapsulate discovered GPU details with property-based access
- **Configuration**: Script paths and arguments are set through Flink's Configuration system
- **Error Handling**: Dedicated exceptions for invalid configuration, missing or non-executable scripts, script timeouts, and non-zero exit codes

## Capabilities

### GPU Driver Factory

Factory for creating GPU resource drivers through Flink's external resource system.

```java { .api }
/**
 * Factory for creating {@link GPUDriver} instances.
 * Loaded automatically by Flink via Java Service Loader mechanism.
 */
public class GPUDriverFactory implements ExternalResourceDriverFactory {
    /**
     * Creates a GPU driver with the specified configuration
     * @param config Configuration containing discovery script settings
     * @return ExternalResourceDriver for GPU resource discovery
     * @throws Exception if configuration is invalid or script setup fails
     */
    @Override
    public ExternalResourceDriver createExternalResourceDriver(Configuration config)
            throws Exception;
}
```

### GPU Driver Implementation

The core implementation that executes discovery scripts to find available GPU resources.

```java { .api }
/**
 * Driver takes the responsibility to discover GPU resources and provide the GPU resource
 * information. It retrieves the GPU information by executing a user-defined discovery script.
 *
 * Note: This class is package-private and should only be created via GPUDriverFactory
 */
class GPUDriver implements ExternalResourceDriver {
    /**
     * Constructs GPUDriver with configuration validation and script setup
     * @param config Configuration containing discovery script path and arguments
     * @throws IllegalConfigurationException if script path is not configured
     * @throws FileNotFoundException if discovery script file not found
     * @throws FlinkException if script file exists but is not executable
     */
    GPUDriver(Configuration config) throws Exception;

    /**
     * Retrieve GPU resource information by executing the configured discovery script
     * @param gpuAmount Number of required GPU resources (must be > 0)
     * @return Set of GPUInfo objects representing discovered GPU resources
     * @throws IllegalArgumentException if gpuAmount <= 0
     * @throws Exception if script execution fails
     * @throws TimeoutException if script execution exceeds 10 seconds
     * @throws FlinkException if script exits with non-zero code
     */
    @Override
    public Set<GPUInfo> retrieveResourceInfo(long gpuAmount) throws Exception;
}
```

### GPU Resource Information

Container for GPU resource information with property-based access.

```java { .api }
/**
 * Information for GPU resource. Currently only including the GPU index.
 * Note: Constructor is package-private - instances created by GPUDriver
 */
public class GPUInfo implements ExternalResourceInfo {
    /**
     * Get the property indicated by the specified key
     * @param key of the required property ("index" is supported)
     * @return Optional containing the value, or empty if key not found
     */
    public Optional<String> getProperty(String key);

    /**
     * Get all property keys
     * @return Collection of all property keys
     */
    public Collection<String> getKeys();

    /**
     * Returns formatted string representation of GPU device
     * @return String in format "GPU Device(index)"
     */
    public String toString();

    /**
     * Hash code based on GPU index
     * @return int hash code
     */
    public int hashCode();

    /**
     * Equality comparison based on GPU index
     * @param obj Object to compare
     * @return boolean true if equal GPU indices
     */
    public boolean equals(Object obj);
}
```
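
To make the `GPUInfo` contract above concrete, here is a JDK-only sketch of a value class with the same observable behavior: a single `index` property, a `GPU Device(index)` string form, and equality and hashing by index. `SimpleGpuInfo` is a hypothetical name. The class deliberately does not implement Flink's `ExternalResourceInfo`, so it compiles without Flink on the classpath. Treat it as an illustration, not Flink's code.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;

/** Hypothetical stand-in mirroring the documented GPUInfo behavior (not Flink's class). */
final class SimpleGpuInfo {
    // Same property key the documentation uses for the GPU index
    static final String PROPERTY_KEY_INDEX = "index";

    private final String index;

    SimpleGpuInfo(String index) {
        this.index = Objects.requireNonNull(index, "index");
    }

    /** Returns the index for the "index" key and an empty Optional for any other key. */
    Optional<String> getProperty(String key) {
        return PROPERTY_KEY_INDEX.equals(key) ? Optional.of(index) : Optional.empty();
    }

    /** Exactly one property key is exposed. */
    Collection<String> getKeys() {
        return Collections.singleton(PROPERTY_KEY_INDEX);
    }

    @Override
    public String toString() {
        return String.format("GPU Device(%s)", index);
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof SimpleGpuInfo)) {
            return false;
        }
        return index.equals(((SimpleGpuInfo) obj).index);
    }

    @Override
    public int hashCode() {
        return index.hashCode();
    }

    public static void main(String[] args) {
        SimpleGpuInfo gpu = new SimpleGpuInfo("0");
        System.out.println(gpu);                       // GPU Device(0)
        System.out.println(gpu.getProperty("index"));  // Optional[0]
        System.out.println(gpu.getProperty("memory")); // Optional.empty
    }
}
```

Equality is defined by index alone. As a result, collecting results into a `Set` silently collapses repeated indices, which is a reasonable outcome if a discovery script reports the same GPU twice.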

### GPU Driver Configuration

Configuration options for GPU resource discovery behavior.

```java { .api }
/**
 * A collection of all configuration options for GPU driver.
 * Uses @Documentation.SuffixOption for automatic key generation.
 */
@PublicEvolving
public class GPUDriverOptions {
    /**
     * Configuration key: "discovery-script.path"
     * Full key pattern: external-resource.<resource_name>.param.discovery-script.path
     *
     * The path of the discovery script. Can be absolute path or relative to FLINK_HOME.
     * Default: plugins/external-resource-gpu/nvidia-gpu-discovery.sh
     */
    @Documentation.SuffixOption("external-resource.<resource_name>.param")
    public static final ConfigOption<String> DISCOVERY_SCRIPT_PATH =
            key("discovery-script.path")
                    .stringType()
                    .defaultValue("plugins/external-resource-gpu/nvidia-gpu-discovery.sh")
                    .withDescription("Path to GPU discovery script");

    /**
     * Configuration key: "discovery-script.args"
     * Full key pattern: external-resource.<resource_name>.param.discovery-script.args
     *
     * The arguments passed to the discovery script after the GPU amount (the first parameter).
     * No default value - leave unset if script requires no arguments.
     */
    @Documentation.SuffixOption("external-resource.<resource_name>.param")
    public static final ConfigOption<String> DISCOVERY_SCRIPT_ARG =
            key("discovery-script.args")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("Arguments for GPU discovery script");
}
```
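
The `DISCOVERY_SCRIPT_PATH` description says the path may be absolute or relative to `FLINK_HOME`. The sketch below shows one way to apply that rule with `java.nio.file`. `ScriptPathResolver` is a hypothetical helper. Falling back to the current directory when `FLINK_HOME` is unset is an assumption of this sketch, not documented behavior. The sketch throws `IllegalArgumentException` where Flink throws `IllegalConfigurationException`.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical helper illustrating how a discovery-script path could be resolved. */
final class ScriptPathResolver {

    /**
     * Absolute paths are returned unchanged; relative paths are resolved against flinkHome.
     * flinkHome stands in for the FLINK_HOME environment variable ("." if null, an assumption).
     */
    static Path resolve(String scriptPath, String flinkHome) {
        if (scriptPath == null || scriptPath.trim().isEmpty()) {
            // Flink reports this case with IllegalConfigurationException
            throw new IllegalArgumentException("discovery-script.path is not configured");
        }
        Path path = Paths.get(scriptPath);
        if (path.isAbsolute()) {
            return path;
        }
        return Paths.get(flinkHome == null ? "." : flinkHome, scriptPath);
    }

    public static void main(String[] args) {
        String flinkHome = System.getenv("FLINK_HOME");
        System.out.println(resolve("plugins/external-resource-gpu/nvidia-gpu-discovery.sh", flinkHome));
        System.out.println(resolve("/opt/custom-gpu-discovery.sh", flinkHome));
    }
}
```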

### Flink Core Interfaces

The external resource system is built on these core Flink interfaces:

```java { .api }
/**
 * Driver which takes the responsibility to manage and provide the information of external resource.
 * Drivers are instantiated via an ExternalResourceDriverFactory.
 * TaskExecutor retrieves ExternalResourceInfo from the drivers.
 */
@PublicEvolving
public interface ExternalResourceDriver {
    /**
     * Retrieve the information of the external resources according to the amount.
     * @param amount of the required external resources
     * @return information set of the required external resources
     * @throws Exception if there is something wrong during retrieving
     */
    Set<? extends ExternalResourceInfo> retrieveResourceInfo(long amount) throws Exception;
}

/**
 * Factory for ExternalResourceDriver. Instantiate a driver with configuration.
 * Drivers with factories automatically qualify for plugin loading if the driver jar
 * is self-contained and contains a META-INF/services file.
 */
@PublicEvolving
public interface ExternalResourceDriverFactory {
    /**
     * Construct the ExternalResourceDriver from configuration.
     * @param config configuration for this external resource
     * @return the driver for this external resource
     * @throws Exception if there is something wrong during the creation
     */
    ExternalResourceDriver createExternalResourceDriver(Configuration config) throws Exception;
}

/**
 * Contains the information of an external resource.
 */
@PublicEvolving
public interface ExternalResourceInfo {
    /**
     * Get the property indicated by the specified key.
     * @param key of the required property
     * @return an Optional containing the value, or empty if no value stored under key
     */
    Optional<String> getProperty(String key);

    /**
     * Get all property keys.
     * @return collection of all property keys
     */
    Collection<String> getKeys();
}
```

## Configuration Integration

### Flink Configuration Keys

The GPU driver reads its options from the `param` namespace of a named external resource. The resource itself is enabled through `external-resources` and bound to the GPU factory through `driver-factory.class`:

```yaml
# Key patterns for external resource parameters:
#   external-resource.<resource_name>.param.discovery-script.path
#   external-resource.<resource_name>.param.discovery-script.args

# Example for GPU resources named "gpu"
external-resources: gpu
external-resource.gpu.driver-factory.class: org.apache.flink.externalresource.gpu.GPUDriverFactory
external-resource.gpu.amount: 1
external-resource.gpu.param.discovery-script.path: "/opt/flink/scripts/nvidia-gpu-discovery.sh"
external-resource.gpu.param.discovery-script.args: "--enable-coordination-mode"
```
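
When the driver runs inside Flink, it receives the `discovery-script.*` options without the `external-resource.<resource_name>.param.` prefix. That is why the Basic Usage example sets `GPUDriverOptions.DISCOVERY_SCRIPT_PATH` directly. The JDK-only sketch below illustrates the prefix stripping on a plain `Map`. `ResourceParams` and `paramsFor` are hypothetical names. As we understand it, Flink provides this view through a prefix-delegating `Configuration` rather than by copying entries.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: extract the driver parameters of one named external resource. */
final class ResourceParams {

    /** Builds the documented key prefix, e.g. "external-resource.gpu.param.". */
    static String paramPrefix(String resourceName) {
        return "external-resource." + resourceName + ".param.";
    }

    /** Returns every entry under the resource's param prefix, with the prefix removed. */
    static Map<String, String> paramsFor(Map<String, String> flinkConf, String resourceName) {
        String prefix = paramPrefix(resourceName);
        Map<String, String> params = new HashMap<>();
        for (Map.Entry<String, String> e : flinkConf.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                params.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return params;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("external-resource.gpu.param.discovery-script.path",
                "/opt/flink/scripts/nvidia-gpu-discovery.sh");
        conf.put("external-resource.gpu.param.discovery-script.args", "--enable-coordination-mode");
        conf.put("taskmanager.numberOfTaskSlots", "2"); // unrelated key, ignored
        // Keys printed here are "discovery-script.path" and "discovery-script.args"
        System.out.println(paramsFor(conf, "gpu"));
    }
}
```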

### Default Discovery Script

The framework includes a default NVIDIA GPU discovery script:

**Script Details:**
- **Location**: `{FLINK_PLUGINS_DIRS}/external-resource-gpu/nvidia-gpu-discovery.sh`
- **Dependencies**: Requires `nvidia-smi` command available in PATH
- **Common script**: Uses `gpu-discovery-common.sh` for shared allocation logic
- **GPU Detection**: Executes `nvidia-smi --query-gpu=index --format=csv,noheader`
- **Output Format**: Comma-separated GPU indices (e.g., "0,1,2")
- **Timeout**: GPUDriver allows the script at most 10 seconds of execution
- **Process cleanup**: GPUDriver forcibly destroys the script process after it completes

**Script Arguments:**
```bash
# Basic usage
./nvidia-gpu-discovery.sh <gpu-amount>

# With coordination mode (prevents resource conflicts)
./nvidia-gpu-discovery.sh <gpu-amount> --enable-coordination-mode

# Custom coordination file location
./nvidia-gpu-discovery.sh <gpu-amount> --enable-coordination-mode --coordination-file /custom/path
```

**Internal Process:**
1. Script checks the requested GPU amount and exits with code 0 if it is 0
2. Calls `nvidia-smi` to get available GPU indices
3. Executes allocation logic (coordination or non-coordination mode)
4. Returns comma-separated indices of allocated GPUs
5. Exits with code 1 if insufficient GPUs available
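
The non-coordination path of these steps can be sketched in Java as a pure function. It maps the indices reported by `nvidia-smi` to an exit code and a stdout line. This is a hypothetical illustration of the documented behavior, not a translation of the shipped shell scripts. `FirstNAllocation` is an illustrative name, and treating a negative amount as a failure is an assumption of the sketch.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical Java rendering of the documented non-coordination allocation steps. */
final class FirstNAllocation {

    /** Exit code and stdout line, as a discovery script would report them. */
    static final class Result {
        final int exitCode;
        final String stdout;

        Result(int exitCode, String stdout) {
            this.exitCode = exitCode;
            this.stdout = stdout;
        }
    }

    /**
     * @param available GPU indices as reported by
     *     `nvidia-smi --query-gpu=index --format=csv,noheader`, one entry per GPU
     * @param amount number of GPUs requested (the script's first argument)
     */
    static Result allocate(List<String> available, int amount) {
        if (amount == 0) {
            return new Result(0, "");     // Step 1: nothing requested, succeed
        }
        if (amount < 0 || available.size() < amount) {
            return new Result(1, "");     // Step 5: not enough GPUs (negative amount: assumption)
        }
        // Steps 3-4: take the first N reported indices and join them with commas
        return new Result(0, String.join(",", available.subList(0, amount)));
    }

    public static void main(String[] args) {
        Result r = allocate(Arrays.asList("0", "1", "2", "3"), 2);
        System.out.println(r.exitCode + " " + r.stdout); // 0 0,1
    }
}
```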

## Error Handling

### Exception Types and Conditions

```java { .api }
/**
 * Configuration-related exceptions thrown during GPUDriver construction
 */

// IllegalConfigurationException
// Thrown when: GPU discovery script path is null, empty, or whitespace-only
// Message: "GPU discovery script ('external-resource.<name>.param.discovery-script.path') is not configured."

// FileNotFoundException
// Thrown when: Discovery script file does not exist at the specified path
// Message: "The gpu discovery script does not exist in path <absolute-path>."

// FlinkException
// Thrown when: Script file exists but is not executable
// Message: "The discovery script <absolute-path> is not executable."

/**
 * Exceptions thrown during resource discovery (retrieveResourceInfo)
 */

// IllegalArgumentException
// Thrown when: gpuAmount parameter <= 0
// Message: "The gpuAmount should be positive when retrieving the GPU resource information."

// TimeoutException
// Thrown when: Script execution exceeds 10 seconds (DISCOVERY_SCRIPT_TIMEOUT_MS)
// Message: "The discovery script executed for over 10000 ms."

// FlinkException
// Thrown when: Discovery script exits with non-zero return code
// Message: "Discovery script exit with non-zero return code: <exit-code>."
// Additional: Warning logged with stdout/stderr content

/**
 * Discovery script output validation
 */
// Warning logged when: Script produces multiple output lines
// Message: "The output of the discovery script should only contain one single line. Finding <count> lines..."
// Behavior: Only first line is used, others ignored
```
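
The three construction-time checks run in a fixed order: blank path, then missing file, then missing execute permission. A JDK-only sketch of that order follows. `ScriptValidation` and `NotExecutableException` are hypothetical names. `IllegalArgumentException` stands in for Flink's `IllegalConfigurationException`, and `NotExecutableException` stands in for `FlinkException`. The file-related messages mirror the ones listed above.

```java
import java.io.File;
import java.io.FileNotFoundException;

/** Hypothetical JDK-only sketch of the construction-time checks, in the documented order. */
final class ScriptValidation {

    /** Stand-in for Flink's FlinkException in this sketch. */
    static final class NotExecutableException extends Exception {
        NotExecutableException(String message) {
            super(message);
        }
    }

    static File validate(String scriptPath) throws FileNotFoundException, NotExecutableException {
        // 1. Null, empty, or whitespace-only path (Flink: IllegalConfigurationException)
        if (scriptPath == null || scriptPath.trim().isEmpty()) {
            throw new IllegalArgumentException("GPU discovery script is not configured.");
        }
        File script = new File(scriptPath);
        // 2. The script file must exist
        if (!script.exists()) {
            throw new FileNotFoundException(String.format(
                    "The gpu discovery script does not exist in path %s.", script.getAbsolutePath()));
        }
        // 3. The script must be executable by the current process
        if (!script.canExecute()) {
            throw new NotExecutableException(String.format(
                    "The discovery script %s is not executable.", script.getAbsolutePath()));
        }
        return script;
    }

    public static void main(String[] args) throws Exception {
        String path = args.length > 0 ? args[0] : "plugins/external-resource-gpu/nvidia-gpu-discovery.sh";
        System.out.println("Valid script: " + validate(path).getAbsolutePath());
    }
}
```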

### Discovery Script Requirements

Custom discovery scripts must follow these requirements:

**Script Interface:**
- Be executable by the Flink process (`chmod +x`)
- Accept the GPU amount as the first positional argument (required)
- Accept the configured `discovery-script.args` after the GPU amount. The driver builds a single command string that is split on whitespace, so each space-separated token arrives as its own positional argument (`$2`, `$3`, ...)
- Print comma-separated GPU indices on stdout (single line only)
- Exit with code 0 for success, non-zero for failure
- Complete execution within 10 seconds

**Output Validation:**
- Script output is read from stdout only
- Multiple lines: only the first line is used; the others are ignored and a warning is logged
- Empty output: returns an empty Set (no GPUs allocated)
- Whitespace around indices: automatically trimmed
- Invalid format: the index is not validated; GPUInfo is created with the trimmed string as its index
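
The output rules above amount to a small parsing routine. The sketch below applies them to stdout lines that have already been captured. `DiscoveryOutputParser` is a hypothetical name, and it differs from Flink in a few labeled ways:

- It prints to stderr where Flink logs a warning.
- It keeps indices in a `LinkedHashSet` for predictable ordering.
- It skips blank entries (for example, from a trailing comma), which is an assumption not stated in the rules.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of the documented stdout rules for discovery script output. */
final class DiscoveryOutputParser {

    static Set<String> parse(List<String> stdoutLines) {
        if (stdoutLines.isEmpty()) {
            return Collections.emptySet();          // empty output: no GPUs
        }
        if (stdoutLines.size() > 1) {
            // Flink logs a warning here; this sketch prints to stderr instead
            System.err.println("Expected one line of output but found "
                    + stdoutLines.size() + "; only the first line is used.");
        }
        Set<String> indices = new LinkedHashSet<>();
        for (String token : stdoutLines.get(0).split(",")) {
            String index = token.trim();            // surrounding whitespace is trimmed
            if (!index.isEmpty()) {                 // assumption: blank entries are skipped
                indices.add(index);                 // no format check: any text is accepted
            }
        }
        return indices;
    }

    public static void main(String[] args) {
        System.out.println(parse(Arrays.asList(" 0, 1 ,3")));  // [0, 1, 3]
        System.out.println(parse(Arrays.asList("2", "5,6")));  // [2], plus a warning on stderr
    }
}
```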

**Error Handling:**
- Non-zero exit: Logs stdout/stderr content and throws `FlinkException`
- Timeout: Process destroyed forcibly, `TimeoutException` thrown
- Script execution uses `Runtime.getRuntime().exec()` with process streams captured
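
A timeout-bounded execution like the one described can be sketched with `ProcessBuilder` and `Process.waitFor(long, TimeUnit)`. `ScriptRunner.run` is a hypothetical helper with three labeled differences from the driver:

- It takes the command as a list, so no whitespace splitting happens.
- It sends the script's stderr to the parent's stderr instead of capturing it.
- It throws `IOException` where Flink throws `FlinkException`.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

/** Hypothetical sketch of running a discovery script with a timeout and forced cleanup. */
final class ScriptRunner {

    static List<String> run(List<String> command, long timeoutMs)
            throws IOException, InterruptedException, TimeoutException {
        Process process = new ProcessBuilder(command)
                .redirectError(ProcessBuilder.Redirect.INHERIT) // stderr goes to our stderr
                .start();
        try {
            // stdout is read only after the process exits: fine for a one-line result,
            // but a script writing more than the OS pipe buffer would stall until the timeout.
            if (!process.waitFor(timeoutMs, TimeUnit.MILLISECONDS)) {
                throw new TimeoutException(
                        "The discovery script executed for over " + timeoutMs + " ms.");
            }
            List<String> lines;
            try (BufferedReader stdout = new BufferedReader(
                    new InputStreamReader(process.getInputStream(), StandardCharsets.UTF_8))) {
                lines = stdout.lines().collect(Collectors.toList());
            }
            int exitCode = process.exitValue();
            if (exitCode != 0) {
                // Flink logs stdout/stderr and throws FlinkException; IOException stands in here
                throw new IOException("Discovery script exit with non-zero return code: " + exitCode + ".");
            }
            return lines;
        } finally {
            process.destroyForcibly(); // always clean up, whether the run succeeded or not
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run(Arrays.asList("sh", "-c", "echo 0,1"), 10_000)); // [0,1]
    }
}
```

Passing the command as a list avoids the whitespace splitting that `Runtime.exec(String)` performs. The trade-off is that arguments no longer reach the script the way they do under the stock driver.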

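**Example Custom Discovery Script:**
```bash
#!/bin/bash
# Custom GPU discovery script
GPU_AMOUNT=$1
shift
# The configured arguments arrive as separate positional parameters ($2, $3, ...),
# because the driver's command line is split on whitespace
SCRIPT_ARGS=("$@")

# Your custom GPU detection logic here; honor GPU_AMOUNT and SCRIPT_ARGS.
# Must print comma-separated indices on a single line and exit 0 on success
echo "0,1,3" # Example: allocate GPUs 0, 1, and 3
```

**Script Execution Context:**
```java
// GPUDriver builds a single command string and executes it:
String cmd = discoveryScript.getAbsolutePath() + " " + gpuAmount + " " + args;
// Runtime.exec(String) splits this string on whitespace (no shell, no quote handling),
// so each space-separated token of args becomes a separate script argument
Process process = Runtime.getRuntime().exec(cmd);
```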
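
`Runtime.exec(String)` tokenizes the command with a `java.util.StringTokenizer` on whitespace, so quotes in `discovery-script.args` are not interpreted. The sketch below reproduces that splitting with the same JDK class, so you can check how a given argument string will reach the script. `CommandTokens` is a hypothetical helper name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

/** Hypothetical helper reproducing how Runtime.exec(String) splits a command line. */
final class CommandTokens {

    /** Splits on whitespace with default StringTokenizer delimiters; quotes are not special. */
    static List<String> tokenize(String command) {
        List<String> tokens = new ArrayList<>();
        StringTokenizer st = new StringTokenizer(command);
        while (st.hasMoreTokens()) {
            tokens.add(st.nextToken());
        }
        return tokens;
    }

    public static void main(String[] args) {
        String cmd = "/opt/flink/plugins/external-resource-gpu/nvidia-gpu-discovery.sh" + " " + 2 + " "
                + "--enable-coordination-mode --coordination-file /tmp/gpu-coord";
        // Element 0 is the script; the script sees $1="2", $2="--enable-coordination-mode",
        // $3="--coordination-file", $4="/tmp/gpu-coord"
        System.out.println(tokenize(cmd));
    }
}
```

For the stock script, `--enable-coordination-mode --coordination-file /tmp/gpu-coord` therefore arrives as three separate arguments, which matches the usage patterns shown earlier. A path containing spaces cannot be passed this way.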

## Service Loader Integration

The GPU driver factory is made available to Flink through the Java Service Loader:

**META-INF/services/org.apache.flink.api.common.externalresource.ExternalResourceDriverFactory:**
```
org.apache.flink.externalresource.gpu.GPUDriverFactory
```

This lets Flink's plugin system load `GPUDriverFactory` without code changes. A resource must still be enabled in the Flink configuration (`external-resources`, `external-resource.<resource_name>.driver-factory.class`, `external-resource.<resource_name>.amount`) before a driver is created for it.
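
As we understand Flink's runtime, the TaskManager loads every `ExternalResourceDriverFactory` plugin it finds. It then selects the one whose class name matches `external-resource.<resource_name>.driver-factory.class`. The JDK-only sketch below mimics that selection step. `FactoryLookup` and its nested `DriverFactory` interface are hypothetical stand-ins. In practice the iterable would come from `ServiceLoader` or Flink's plugin manager.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;

/** Hypothetical sketch: pick a loaded factory by the class name named in configuration. */
final class FactoryLookup {

    /** Stand-in for ExternalResourceDriverFactory so the sketch compiles without Flink. */
    public interface DriverFactory {
        String describe();
    }

    /** Indexes factories by fully qualified class name (later duplicates win). */
    static <F> Map<String, F> byClassName(Iterable<F> factories) {
        Map<String, F> index = new HashMap<>();
        for (F factory : factories) {
            index.put(factory.getClass().getName(), factory);
        }
        return index;
    }

    /** Returns the factory whose class name matches the configured driver-factory.class value. */
    static <F> Optional<F> select(Iterable<F> factories, String configuredClassName) {
        return Optional.ofNullable(byClassName(factories).get(configuredClassName));
    }

    public static void main(String[] args) {
        // ServiceLoader is Iterable; without a matching META-INF/services entry it yields nothing
        ServiceLoader<DriverFactory> loaded = ServiceLoader.load(DriverFactory.class);
        String configured = args.length > 0 ? args[0] : "com.example.MyGpuDriverFactory";
        System.out.println("Factory found: " + select(loaded, configured).isPresent());
    }
}
```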

## Discovery Script Coordination

The default NVIDIA GPU discovery script supports coordination mode to prevent resource conflicts:


**Coordination Features:**
- **Non-coordination mode**: Simple first-N allocation from available GPUs
- **Coordination mode**: File-based locking prevents multiple processes from claiming same GPUs
- **Process cleanup**: Automatically reclaims GPUs from dead processes
- **Default coordination file**: `/var/tmp/flink-gpu-coordination`

```java { .api }
// Configuration for coordination mode
config.set(GPUDriverOptions.DISCOVERY_SCRIPT_ARG, "--enable-coordination-mode --coordination-file /tmp/gpu-coord");
```
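
The coordination behavior described above can be sketched in Java with `FileChannel.lock()` and `ProcessHandle` (Java 9+). It covers the exclusive file lock, per-process claims, and reclaiming GPUs from dead processes. The `<pid> <gpu-index>` file format and the `GpuCoordinator.claim` name are assumptions made for illustration. The shipped `gpu-discovery-common.sh` implements coordination in shell, and this sketch does not claim to reproduce its file layout.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of file-based GPU coordination (requires Java 9+ for ProcessHandle). */
final class GpuCoordinator {

    /**
     * Claims {@code amount} GPUs from {@code available} for process {@code pid}. The coordination
     * file holds one "<pid> <gpu-index>" line per claimed GPU (an assumed format).
     * Returns the claimed indices, or an empty list if too few GPUs are free.
     */
    static List<String> claim(Path coordinationFile, List<String> available, int amount, long pid)
            throws IOException {
        try (FileChannel channel = FileChannel.open(coordinationFile, StandardOpenOption.CREATE,
                     StandardOpenOption.READ, StandardOpenOption.WRITE);
             FileLock lock = channel.lock()) { // blocks until no other process holds the lock
            ByteBuffer buf = ByteBuffer.allocate((int) channel.size());
            while (buf.hasRemaining() && channel.read(buf) >= 0) {
                // keep reading until the buffer is full or EOF is reached
            }
            String content = new String(buf.array(), 0, buf.position(), StandardCharsets.UTF_8);

            // Keep entries whose owner is alive; GPUs of dead processes become free again
            List<String> kept = new ArrayList<>();
            Set<String> inUse = new HashSet<>();
            for (String line : content.split("\n")) {
                String[] parts = line.trim().split("\\s+");
                if (parts.length == 2 && isAlive(parts[0])) {
                    kept.add(parts[0] + " " + parts[1]);
                    inUse.add(parts[1]);
                }
            }

            // Take the first free indices, in the order the GPUs were reported
            List<String> claimed = new ArrayList<>();
            for (String index : available) {
                if (claimed.size() == amount) {
                    break;
                }
                if (!inUse.contains(index)) {
                    claimed.add(index);
                }
            }
            if (claimed.size() < amount) {
                return Collections.emptyList(); // not enough free GPUs; file left unchanged
            }
            for (String index : claimed) {
                kept.add(pid + " " + index);
            }

            // Rewrite the file with the surviving entries plus the new claims
            ByteBuffer out = ByteBuffer.wrap(
                    (String.join("\n", kept) + "\n").getBytes(StandardCharsets.UTF_8));
            channel.truncate(0);
            channel.position(0);
            while (out.hasRemaining()) {
                channel.write(out);
            }
            return claimed;
        }
    }

    /** True if {@code pid} parses as a number and names a live process. */
    private static boolean isAlive(String pid) {
        try {
            return ProcessHandle.of(Long.parseLong(pid)).map(ProcessHandle::isAlive).orElse(false);
        } catch (NumberFormatException e) {
            return false; // malformed line: treat as stale
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Paths.get(args.length > 0 ? args[0] : "gpu-coordination-demo");
        long me = ProcessHandle.current().pid();
        System.out.println(claim(file, Arrays.asList("0", "1", "2"), 2, me));
    }
}
```

On most Unix systems a `FileLock` is advisory, so it only coordinates processes that take the same lock. That fits the intent of coordination mode, where every participant runs the same discovery logic.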

## Types

### Core Flink Types

```java { .api }
/**
 * Configuration system for Flink settings
 */
public class Configuration {
    public <T> T get(ConfigOption<T> option);
    public <T> Configuration set(ConfigOption<T> option, T value);
}

/**
 * Typed configuration option with key, type, and default value
 */
public class ConfigOption<T> {
    public String key();
    public T defaultValue();
}

/**
 * Exception for invalid configuration values
 */
@PublicEvolving
public class IllegalConfigurationException extends RuntimeException {
    public IllegalConfigurationException(String message);
    public IllegalConfigurationException(String message, Throwable cause);
}

/**
 * Base class of all Flink-specific checked exceptions
 */
@Public
public class FlinkException extends Exception {
    public FlinkException(String message);
    public FlinkException(String message, Throwable cause);
}
```

### GPU-Specific Constants

```java { .api }
/**
 * Property key constant for accessing GPU index from GPUInfo
 */
public static final String PROPERTY_KEY_INDEX = "index";

/**
 * Script execution timeout in milliseconds (private in GPUDriver)
 */
private static final long DISCOVERY_SCRIPT_TIMEOUT_MS = 10000;

/**
 * Default plugins directory name (defined in ConfigConstants)
 */
public static final String DEFAULT_FLINK_PLUGINS_DIRS = "plugins";
// Default script: {FLINK_PLUGINS_DIRS}/external-resource-gpu/nvidia-gpu-discovery.sh
```
