
# Artifact Management

A pluggable system for fetching job artifacts from local files, HTTP endpoints, and distributed file systems, with error handling for network, file system, and validation failures. Each kind of source is handled by its own fetcher implementation, and custom fetchers can be registered for additional URI schemes.

## Capabilities

### Artifact Fetch Manager

Central manager for artifact fetching that provides a unified interface over multiple fetcher implementations.

```java { .api }
/**
 * Manager for artifact fetching operations
 */
public class ArtifactFetchManager {
    /**
     * Create artifact fetch manager from configuration
     * @param configuration Flink configuration
     * @return Configured artifact fetch manager
     */
    public static ArtifactFetchManager fromConfiguration(Configuration configuration);

    /**
     * Fetch artifact from URI to target directory
     * @param uri URI of the artifact to fetch
     * @param targetDir Target directory for the artifact
     * @return Future with fetched file
     * @throws Exception if fetching fails
     */
    public CompletableFuture<File> fetchArtifact(URI uri, File targetDir) throws Exception;

    /**
     * Fetch artifact with custom filename
     * @param uri URI of the artifact to fetch
     * @param targetDir Target directory for the artifact
     * @param filename Custom filename for the artifact
     * @return Future with fetched file
     * @throws Exception if fetching fails
     */
    public CompletableFuture<File> fetchArtifact(
            URI uri,
            File targetDir,
            @Nullable String filename
    ) throws Exception;

    /**
     * Get registered artifact fetchers
     * @return Collection of registered fetchers
     */
    public Collection<ArtifactFetcher> getRegisteredFetchers();

    /**
     * Register custom artifact fetcher
     * @param fetcher Artifact fetcher to register
     */
    public void registerFetcher(ArtifactFetcher fetcher);
}
```

**Usage Examples:**

```java
import org.apache.flink.client.program.artifact.ArtifactFetchManager;
import org.apache.flink.configuration.Configuration;

import java.io.File;
import java.net.URI;
import java.util.concurrent.CompletableFuture;

// Create artifact fetch manager
Configuration config = new Configuration();
ArtifactFetchManager fetchManager = ArtifactFetchManager.fromConfiguration(config);

// Fetch artifact from HTTP
URI httpUri = new URI("https://example.com/path/to/job.jar");
File targetDir = new File("/tmp/artifacts");
targetDir.mkdirs();

CompletableFuture<File> fetchResult = fetchManager.fetchArtifact(httpUri, targetDir);
File fetchedFile = fetchResult.get(); // blocks until the fetch completes
System.out.println("Fetched artifact: " + fetchedFile.getAbsolutePath());

// Fetch with custom filename
CompletableFuture<File> customFetch = fetchManager.fetchArtifact(
        httpUri,
        targetDir,
        "my-job.jar"
);
File customFile = customFetch.get();
```
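The two `fetchArtifact` overloads differ only in how the local file name is chosen. One plausible resolution rule is sketched below, assuming that a `null` filename falls back to the last segment of the URI path and that the result must sit directly inside `targetDir`. `TargetFileResolver` is a hypothetical class, not part of Flink.

```java
import java.io.File;
import java.net.URI;
import java.nio.file.Path;

// Hypothetical helper: decides where a fetched artifact is written.
final class TargetFileResolver {
    private TargetFileResolver() {}

    static File resolve(URI uri, File targetDir, String filename) {
        String name = filename;
        if (name == null || name.isEmpty()) {
            // Fall back to the last URI path segment, e.g. "/path/to/job.jar" -> "job.jar"
            String path = uri.getPath();
            if (path == null || path.isEmpty() || path.endsWith("/")) {
                throw new IllegalArgumentException("Cannot derive a filename from " + uri);
            }
            name = path.substring(path.lastIndexOf('/') + 1);
        }
        Path dir = targetDir.toPath().toAbsolutePath().normalize();
        Path target = dir.resolve(name).normalize();
        // Require the file to sit directly inside targetDir; this also rejects "../evil.jar"
        if (!dir.equals(target.getParent())) {
            throw new IllegalArgumentException("Filename escapes target directory: " + name);
        }
        return target.toFile();
    }
}
```

Rejecting names that normalize outside the target directory is a defensive choice for the case where the filename comes from user input.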

### Artifact Fetcher Interface

Interface for implementing artifact fetcher strategies supporting different protocols and storage systems.

```java { .api }
/**
 * Interface for artifact fetching implementations
 */
public interface ArtifactFetcher {
    /**
     * Fetch artifact from URI
     * @param uri URI of the artifact
     * @param flinkConf Flink configuration
     * @param targetDir Target directory
     * @param filename Optional custom filename
     * @return Future with fetched file
     * @throws Exception if fetching fails
     */
    CompletableFuture<File> fetch(
            URI uri,
            Configuration flinkConf,
            File targetDir,
            @Nullable String filename
    ) throws Exception;

    /**
     * Check if this fetcher supports the given URI scheme
     * @param uri URI to check
     * @return true if supported
     */
    boolean supportsScheme(URI uri);
}
```
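A manager built on this interface needs a rule for picking a fetcher per URI. The sketch below assumes the first registered fetcher whose `supportsScheme` returns `true` wins. `SchemeRouter` and the trimmed-down `SchemeFetcher` interface are hypothetical stand-ins; the Flink `Configuration` parameter and the futures are left out so the example runs on its own.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Optional;

// Hypothetical, trimmed-down stand-in for ArtifactFetcher: only the scheme check is modelled.
interface SchemeFetcher {
    String name();
    boolean supportsScheme(URI uri);
}

// Hypothetical router: returns the first registered fetcher that accepts the URI.
final class SchemeRouter {
    private final List<SchemeFetcher> fetchers = new ArrayList<>();

    void register(SchemeFetcher fetcher) {
        fetchers.add(fetcher);
    }

    Optional<SchemeFetcher> select(URI uri) {
        for (SchemeFetcher fetcher : fetchers) {
            if (fetcher.supportsScheme(uri)) {
                return Optional.of(fetcher);
            }
        }
        return Optional.empty();
    }

    // Builds a fetcher that matches a fixed set of lowercase scheme names.
    static SchemeFetcher forSchemes(String fetcherName, String... schemes) {
        List<String> accepted = List.of(schemes);
        return new SchemeFetcher() {
            @Override
            public String name() {
                return fetcherName;
            }

            @Override
            public boolean supportsScheme(URI uri) {
                // Relative URIs such as "job.jar" have no scheme and never match
                String scheme = uri.getScheme();
                return scheme != null && accepted.contains(scheme.toLowerCase(Locale.ROOT));
            }
        };
    }
}
```

Under a first-match rule, registration order decides overlaps: a fetcher registered later for a scheme that an earlier one already accepts is never selected. A manager that wants custom fetchers to override built-in ones would need to check them first.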

### HTTP Artifact Fetcher

Implementation for fetching artifacts from HTTP and HTTPS endpoints with support for authentication and custom headers.

```java { .api }
/**
 * Fetcher for HTTP-based artifacts
 */
public class HttpArtifactFetcher implements ArtifactFetcher {
    /**
     * Create HTTP artifact fetcher with default configuration
     */
    public HttpArtifactFetcher();

    /**
     * Create HTTP artifact fetcher with connection and read timeouts
     * @param connectionTimeoutMs Connection timeout in milliseconds
     * @param readTimeoutMs Read timeout in milliseconds
     */
    public HttpArtifactFetcher(int connectionTimeoutMs, int readTimeoutMs);

    @Override
    public CompletableFuture<File> fetch(
            URI uri,
            Configuration flinkConf,
            File targetDir,
            @Nullable String filename
    ) throws Exception;

    @Override
    public boolean supportsScheme(URI uri);
}
```
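The two-argument constructor implies separate connect and read timeouts. Below is a sketch of such a download using only the JDK's `HttpURLConnection`, assuming the response body is streamed straight to the target file. `SimpleHttpDownload` is a hypothetical name, and authentication and custom headers are deliberately left out.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Locale;

// Hypothetical sketch of an HTTP(S) artifact download with explicit timeouts.
final class SimpleHttpDownload {
    private SimpleHttpDownload() {}

    static boolean supportsScheme(URI uri) {
        String scheme = uri.getScheme();
        if (scheme == null) {
            return false;
        }
        String s = scheme.toLowerCase(Locale.ROOT);
        return s.equals("http") || s.equals("https");
    }

    static Path download(URI uri, Path target, int connectTimeoutMs, int readTimeoutMs)
            throws IOException {
        if (!supportsScheme(uri)) {
            throw new IllegalArgumentException("Not an HTTP(S) URI: " + uri);
        }
        HttpURLConnection conn = (HttpURLConnection) uri.toURL().openConnection();
        conn.setConnectTimeout(connectTimeoutMs); // limit for establishing the connection
        conn.setReadTimeout(readTimeoutMs);       // limit for each blocking read once connected
        try {
            int status = conn.getResponseCode();
            if (status != HttpURLConnection.HTTP_OK) {
                throw new IOException("HTTP " + status + " while fetching " + uri);
            }
            // Download into a temporary file first, then move it into place
            Path tmp = Files.createTempFile(target.toAbsolutePath().getParent(), "fetch-", ".part");
            try (InputStream in = conn.getInputStream()) {
                Files.copy(in, tmp, StandardCopyOption.REPLACE_EXISTING);
                return Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING);
            } catch (IOException e) {
                Files.deleteIfExists(tmp);
                throw e;
            }
        } finally {
            conn.disconnect();
        }
    }
}
```

Writing to a `.part` file and moving it into place is one design choice; it keeps a half-downloaded JAR from being picked up if the connection drops mid-transfer.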

### File System Artifact Fetcher

Implementation for fetching artifacts from distributed file systems including HDFS, S3, and other Flink-supported file systems.

```java { .api }
/**
 * Fetcher for file system-based artifacts
 */
public class FsArtifactFetcher implements ArtifactFetcher {
    /**
     * Create file system artifact fetcher
     */
    public FsArtifactFetcher();

    @Override
    public CompletableFuture<File> fetch(
            URI uri,
            Configuration flinkConf,
            File targetDir,
            @Nullable String filename
    ) throws Exception;

    @Override
    public boolean supportsScheme(URI uri);
}
```
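The file system fetcher relies on a pluggable layer that maps each URI scheme to a file system implementation. As a runnable analogy only, the sketch below uses the JDK's NIO `FileSystemProvider` mechanism instead of Flink's file system abstraction: `Paths.get(URI)` dispatches to whichever provider is installed for the scheme, and the bytes are streamed into a local file. `NioArtifactCopy` is hypothetical; with only the default JDK providers, `file:` URIs resolve, while `hdfs:` or `s3:` URIs fail with `FileSystemNotFoundException`.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.FileSystemNotFoundException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

// Hypothetical analogy for a file system fetcher, built on JDK NIO providers (not Flink's FileSystem).
final class NioArtifactCopy {
    private NioArtifactCopy() {}

    // True if some installed NIO provider can resolve this URI.
    static boolean supportsScheme(URI uri) {
        try {
            Paths.get(uri);
            return true;
        } catch (FileSystemNotFoundException | IllegalArgumentException e) {
            return false;
        }
    }

    static Path copyTo(URI source, Path targetDir, String filename) throws IOException {
        Path src = Paths.get(source); // provider is chosen by the URI scheme
        Path target = targetDir.resolve(filename);
        // Stream the bytes from the source provider into the local target file
        try (InputStream in = Files.newInputStream(src)) {
            Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);
        }
        return target;
    }
}
```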

### Local Artifact Fetcher

Implementation for handling local file system artifacts with support for symbolic links and file validation.

```java { .api }
/**
 * Fetcher for local file system artifacts
 */
public class LocalArtifactFetcher implements ArtifactFetcher {
    /**
     * Create local artifact fetcher
     */
    public LocalArtifactFetcher();

    @Override
    public CompletableFuture<File> fetch(
            URI uri,
            Configuration flinkConf,
            File targetDir,
            @Nullable String filename
    ) throws Exception;

    @Override
    public boolean supportsScheme(URI uri);

    /**
     * Copy local file to target directory
     * @param sourceFile Source file to copy
     * @param targetDir Target directory
     * @param targetFileName Target filename
     * @return Copied file
     * @throws IOException if copy fails
     */
    protected File copyLocalFile(File sourceFile, File targetDir, String targetFileName)
            throws IOException;
}
```
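The description mentions symbolic-link support and file validation. The sketch below shows one way a `copyLocalFile`-style method could behave with `java.nio.file`, assuming links are resolved to their real target before copying and that only existing, readable regular files are accepted. `LocalCopy` is a hypothetical name.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical sketch of copyLocalFile-style behaviour.
final class LocalCopy {
    private LocalCopy() {}

    static File copyLocalFile(File sourceFile, File targetDir, String targetFileName)
            throws IOException {
        // toRealPath() follows symbolic links and throws NoSuchFileException if the file is missing
        Path source = sourceFile.toPath().toRealPath();
        if (!Files.isRegularFile(source) || !Files.isReadable(source)) {
            throw new IOException("Not a readable regular file: " + source);
        }
        Files.createDirectories(targetDir.toPath());
        Path target = targetDir.toPath().resolve(targetFileName);
        // Copies the resolved file's contents, so the result is a plain file rather than a link
        Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);
        return target.toFile();
    }
}
```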

### Artifact Utilities

Utility functions for common artifact operations including validation, metadata extraction, and path manipulation.

```java { .api }
/**
 * Utilities for artifact operations
 */
public class ArtifactUtils {
    /**
     * Extract filename from URI
     * @param uri URI to extract filename from
     * @return Extracted filename or null
     */
    @Nullable
    public static String extractFilenameFromUri(URI uri);

    /**
     * Validate artifact file
     * @param file File to validate
     * @return true if valid artifact
     */
    public static boolean isValidArtifact(File file);

    /**
     * Get file extension from filename
     * @param filename Filename to analyze
     * @return File extension or empty string
     */
    public static String getFileExtension(String filename);

    /**
     * Create unique filename in directory
     * @param targetDir Target directory
     * @param baseFilename Base filename
     * @return Unique filename
     */
    public static String createUniqueFilename(File targetDir, String baseFilename);

    /**
     * Calculate file checksum
     * @param file File to checksum
     * @param algorithm Hash algorithm (MD5, SHA-1, SHA-256)
     * @return Hex-encoded checksum
     * @throws IOException if calculation fails
     */
    public static String calculateChecksum(File file, String algorithm) throws IOException;

    /**
     * Verify file checksum
     * @param file File to verify
     * @param expectedChecksum Expected checksum
     * @param algorithm Hash algorithm
     * @return true if checksum matches
     * @throws IOException if verification fails
     */
    public static boolean verifyChecksum(File file, String expectedChecksum, String algorithm)
            throws IOException;
}
```
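The contracts of `getFileExtension` and `createUniqueFilename` leave a few edge cases open. The sketch below takes one reasonable reading, assuming the extension is the text after the last dot (a name with no dot, or only a leading dot as in `.bashrc`, yields an empty string) and that uniqueness comes from appending `-1`, `-2`, and so on before the extension. `ArtifactNames` is a hypothetical class.

```java
import java.io.File;

// Hypothetical sketch of two ArtifactUtils-style filename helpers.
final class ArtifactNames {
    private ArtifactNames() {}

    // "job-1.0.jar" -> "jar"; "README" -> ""; ".bashrc" -> ""
    static String getFileExtension(String filename) {
        int dot = filename.lastIndexOf('.');
        return (dot <= 0 || dot == filename.length() - 1) ? "" : filename.substring(dot + 1);
    }

    // Returns baseFilename if unused, otherwise "name-1.ext", "name-2.ext", ...
    static String createUniqueFilename(File targetDir, String baseFilename) {
        if (!new File(targetDir, baseFilename).exists()) {
            return baseFilename;
        }
        String ext = getFileExtension(baseFilename);
        String stem = ext.isEmpty()
                ? baseFilename
                : baseFilename.substring(0, baseFilename.length() - ext.length() - 1);
        String suffix = ext.isEmpty() ? "" : "." + ext;
        for (int i = 1; ; i++) {
            String candidate = stem + "-" + i + suffix;
            if (!new File(targetDir, candidate).exists()) {
                return candidate;
            }
        }
    }
}
```

The existence check is not atomic: if several processes write to the same directory, creating the file with `Files.createFile` and retrying on `FileAlreadyExistsException` avoids the race.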

### CLI Artifact Fetch Options

Command-line options for artifact fetching operations integrated with the CLI frontend.

```java { .api }
/**
 * Options for artifact fetching operations
 */
public class ArtifactFetchOptions extends CommandLineOptions {
    /**
     * Get artifact URI from options
     * @return Artifact URI
     */
    public String getArtifactUri();

    /**
     * Get target directory from options
     * @return Target directory path
     */
    public String getTargetDirectory();

    /**
     * Get custom filename from options
     * @return Custom filename or null
     */
    @Nullable
    public String getCustomFilename();

    /**
     * Get connection timeout from options
     * @return Connection timeout in milliseconds
     */
    public int getConnectionTimeout();

    /**
     * Get read timeout from options
     * @return Read timeout in milliseconds
     */
    public int getReadTimeout();

    /**
     * Check if checksum verification is enabled
     * @return true if verification enabled
     */
    public boolean isChecksumVerificationEnabled();

    /**
     * Get expected checksum from options
     * @return Expected checksum or null
     */
    @Nullable
    public String getExpectedChecksum();

    /**
     * Get checksum algorithm from options
     * @return Checksum algorithm (default: SHA-256)
     */
    public String getChecksumAlgorithm();
}
```
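These accessors return independent values, so cross-field rules have to be enforced by whoever consumes them. The sketch below applies the documented SHA-256 default and a few checks before a fetch starts. The class `FetchSettings` and its rules (positive timeouts, an expected checksum whenever verification is on) are assumptions for illustration, not behaviour of Flink's CLI.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical, immutable holder for parsed artifact-fetch options (not a real Flink class).
final class FetchSettings {
    static final String DEFAULT_CHECKSUM_ALGORITHM = "SHA-256";

    final String artifactUri;
    final int connectionTimeoutMs;
    final int readTimeoutMs;
    final boolean verifyChecksum;
    final String expectedChecksum; // null when no checksum was supplied
    final String checksumAlgorithm;

    FetchSettings(String artifactUri, int connectionTimeoutMs, int readTimeoutMs,
                  boolean verifyChecksum, String expectedChecksum, String checksumAlgorithm) {
        if (artifactUri == null || artifactUri.isEmpty()) {
            throw new IllegalArgumentException("An artifact URI is required");
        }
        // Assumed rule: zero or negative timeouts are rejected
        if (connectionTimeoutMs <= 0 || readTimeoutMs <= 0) {
            throw new IllegalArgumentException("Timeouts must be positive");
        }
        // Assumed rule: enabling verification without an expected value is a user error
        if (verifyChecksum && (expectedChecksum == null || expectedChecksum.isEmpty())) {
            throw new IllegalArgumentException("Checksum verification needs an expected checksum");
        }
        String algorithm = checksumAlgorithm != null ? checksumAlgorithm : DEFAULT_CHECKSUM_ALGORITHM;
        try {
            MessageDigest.getInstance(algorithm); // fail fast on unknown algorithm names
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalArgumentException("Unsupported checksum algorithm: " + algorithm, e);
        }
        this.artifactUri = artifactUri;
        this.connectionTimeoutMs = connectionTimeoutMs;
        this.readTimeoutMs = readTimeoutMs;
        this.verifyChecksum = verifyChecksum;
        this.expectedChecksum = expectedChecksum;
        this.checksumAlgorithm = algorithm;
    }
}
```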

341

342

## Usage Patterns

343

344

### Basic Artifact Fetching

345

346

```java
import org.apache.flink.client.program.artifact.ArtifactFetchManager;
import org.apache.flink.configuration.Configuration;

import java.io.File;
import java.net.URI;

// Configure the manager and fetch from multiple sources
Configuration config = new Configuration();
ArtifactFetchManager manager = ArtifactFetchManager.fromConfiguration(config);
File targetDir = new File("/tmp/artifacts");
targetDir.mkdirs();

// Fetch from HTTP
URI httpArtifact = new URI("https://repo.example.com/artifacts/job-1.0.jar");
File httpResult = manager.fetchArtifact(httpArtifact, targetDir).get();

// Fetch from HDFS
URI hdfsArtifact = new URI("hdfs://namenode:9000/artifacts/job-1.0.jar");
File hdfsResult = manager.fetchArtifact(hdfsArtifact, targetDir).get();

// Fetch from local file system
URI localArtifact = new URI("file:///path/to/local/job.jar");
File localResult = manager.fetchArtifact(localArtifact, targetDir).get();
```
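Because each fetch returns a `CompletableFuture`, several artifacts can be fetched concurrently and joined once, instead of blocking on `get()` after each call. The sketch below shows that pattern in self-contained form: the hypothetical `fetchAll` takes the per-URI fetch as a function, so a stub can stand in for `manager.fetchArtifact(uri, targetDir)`. In real code, that call's checked `Exception` would need wrapping inside the lambda.

```java
import java.io.File;
import java.net.URI;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical helper: start all fetches first, then wait for every one of them.
final class ParallelFetch {
    private ParallelFetch() {}

    static List<File> fetchAll(List<URI> uris, Function<URI, CompletableFuture<File>> fetch) {
        List<CompletableFuture<File>> futures = uris.stream()
                .map(fetch)
                .collect(Collectors.toList());
        // allOf completes once every fetch has finished; join() throws a
        // CompletionException if any of them failed
        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
        // Results come back in the same order as the input URIs
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }
}
```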

### Custom Fetcher Registration

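```java
import org.apache.flink.client.program.artifact.ArtifactFetchManager;
import org.apache.flink.configuration.Configuration;

import javax.annotation.Nullable;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Create custom fetcher for specific protocol
public class S3ArtifactFetcher implements ArtifactFetcher {
    @Override
    public CompletableFuture<File> fetch(
            URI uri, Configuration flinkConf, File targetDir, @Nullable String filename
    ) throws Exception {
        return CompletableFuture.supplyAsync(() -> {
            try {
                return downloadFromS3(uri, targetDir, filename);
            } catch (IOException e) {
                // supplyAsync lambdas cannot throw checked exceptions; fail the future instead
                throw new CompletionException(e);
            }
        });
    }

    @Override
    public boolean supportsScheme(URI uri) {
        return "s3".equals(uri.getScheme()) || "s3a".equals(uri.getScheme());
    }

    // Placeholder for the custom S3 download logic (e.g. using an S3 client library)
    private File downloadFromS3(URI uri, File targetDir, @Nullable String filename) throws IOException {
        throw new UnsupportedOperationException("Implement S3 download here");
    }
}

// Register custom fetcher
ArtifactFetchManager manager = ArtifactFetchManager.fromConfiguration(config);
manager.registerFetcher(new S3ArtifactFetcher());
```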

### Artifact Validation and Checksums

```java
// Fetch the artifact, then verify its checksum
URI artifactUri = new URI("https://repo.example.com/job.jar");
File fetchedFile = manager.fetchArtifact(artifactUri, targetDir).get();

// Verify artifact integrity (placeholder checksum value)
String expectedChecksum = "a1b2c3d4e5f6...";
boolean isValid = ArtifactUtils.verifyChecksum(
        fetchedFile,
        expectedChecksum,
        "SHA-256"
);

if (!isValid) {
    throw new RuntimeException("Artifact checksum verification failed");
}

// Validate artifact format
if (!ArtifactUtils.isValidArtifact(fetchedFile)) {
    throw new RuntimeException("Invalid artifact format");
}
```
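`calculateChecksum` and `verifyChecksum` can be built on the JDK's `MessageDigest`. The sketch below assumes lowercase hex output and a case-insensitive comparison; `ChecksumSketch` is a hypothetical class. `MessageDigest` accepts standard names such as `SHA-256`, `SHA-1`, and `MD5`. MD5 and SHA-1 are no longer collision resistant, so they catch accidental corruption but not deliberate tampering.

```java
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical sketch of the ArtifactUtils checksum helpers, built on java.security.MessageDigest.
final class ChecksumSketch {
    private ChecksumSketch() {}

    static String calculateChecksum(File file, String algorithm) throws IOException {
        MessageDigest digest;
        try {
            digest = MessageDigest.getInstance(algorithm); // e.g. "SHA-256", "SHA-1", "MD5"
        } catch (NoSuchAlgorithmException e) {
            // Reported as IOException to match the documented signature
            throw new IOException("Unknown checksum algorithm: " + algorithm, e);
        }
        // Stream the file in chunks so large JARs are never held in memory at once
        try (InputStream in = Files.newInputStream(file.toPath())) {
            byte[] buffer = new byte[8192];
            int read;
            while ((read = in.read(buffer)) != -1) {
                digest.update(buffer, 0, read);
            }
        }
        // Lowercase hex encoding, two characters per byte
        StringBuilder hex = new StringBuilder();
        for (byte b : digest.digest()) {
            hex.append(String.format("%02x", b & 0xff));
        }
        return hex.toString();
    }

    static boolean verifyChecksum(File file, String expectedChecksum, String algorithm)
            throws IOException {
        // Case-insensitive, so upper- and lowercase hex strings both match
        return calculateChecksum(file, algorithm).equalsIgnoreCase(expectedChecksum.trim());
    }
}
```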

## Error Handling

Artifact management operations handle various error conditions:

- **Network Errors**: Connection failures, timeouts, DNS resolution issues
- **Authentication Errors**: Invalid credentials, permission denied
- **File System Errors**: Disk space, permission issues, path not found
- **Validation Errors**: Checksum mismatches, corrupted files, invalid formats
- **Configuration Errors**: Invalid URIs, missing required parameters
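Most of these categories can be told apart by exception type once the asynchronous wrapper is removed. The sketch below assumes one particular mapping, for example `UnknownHostException` for DNS failures, `AccessDeniedException` for file permission problems, and `SecurityException` for authorization failures. The enum and mapping are hypothetical, and a real fetcher may surface different types. Validation failures such as a checksum mismatch are usually found by an explicit check rather than by exception type, so they are not modelled.

```java
import java.io.FileNotFoundException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.nio.file.AccessDeniedException;
import java.nio.file.NoSuchFileException;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

// Hypothetical error categories, loosely following the list above.
enum FetchErrorKind { NETWORK, AUTHENTICATION, FILE_SYSTEM, CONFIGURATION, UNKNOWN }

final class FetchErrors {
    private FetchErrors() {}

    static FetchErrorKind classify(Throwable error) {
        Throwable t = error;
        // Strip the wrappers added by CompletableFuture.join() and Future.get()
        while ((t instanceof CompletionException || t instanceof ExecutionException)
                && t.getCause() != null) {
            t = t.getCause();
        }
        if (t instanceof UnknownHostException || t instanceof ConnectException
                || t instanceof SocketTimeoutException) {
            return FetchErrorKind.NETWORK;
        }
        if (t instanceof SecurityException) {
            return FetchErrorKind.AUTHENTICATION;
        }
        if (t instanceof AccessDeniedException || t instanceof NoSuchFileException
                || t instanceof FileNotFoundException) {
            return FetchErrorKind.FILE_SYSTEM;
        }
        if (t instanceof URISyntaxException || t instanceof IllegalArgumentException) {
            return FetchErrorKind.CONFIGURATION;
        }
        return FetchErrorKind.UNKNOWN;
    }
}
```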

**Error Handling Patterns:**

```java
import java.io.File;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

try {
    CompletableFuture<File> fetchFuture = manager.fetchArtifact(uri, targetDir);

    File result = fetchFuture.handle((file, throwable) -> {
        if (throwable != null) {
            // Failures may arrive wrapped in a CompletionException; unwrap to inspect the real cause
            Throwable cause = (throwable instanceof CompletionException && throwable.getCause() != null)
                    ? throwable.getCause()
                    : throwable;
            if (cause instanceof IOException) {
                // Handle I/O errors (network, file system)
                System.err.println("I/O error: " + cause.getMessage());
            } else if (cause instanceof SecurityException) {
                // Handle authentication/authorization errors
                System.err.println("Security error: " + cause.getMessage());
            } else {
                // Handle other errors
                System.err.println("Unexpected error: " + cause.getMessage());
            }
            return null;
        }
        return file;
    }).get();

    if (result != null) {
        System.out.println("Artifact fetched successfully: " + result.getPath());
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt(); // preserve the interrupt status
    System.err.println("Interrupted while fetching artifact");
} catch (Exception e) {
    // fetchArtifact itself declares `throws Exception`, e.g. for failures before the future is created
    System.err.println("Failed to fetch artifact: " + e.getMessage());
}
```

Together, these components give Flink clients a single interface for fetching job JARs and other resources from HTTP endpoints, Flink-supported file systems, and local paths, with shared validation and error-handling conventions and support for registering custom fetchers.