# Artifact Management

A flexible system for fetching and managing job artifacts from local files, HTTP endpoints, and distributed file systems. Fetcher implementations are pluggable, and error handling is consistent across sources.

## Capabilities

### Artifact Fetch Manager

Central manager for artifact fetching that provides a unified interface over multiple fetcher implementations.

```java { .api }
/**
 * Manager for artifact fetching operations
 */
public class ArtifactFetchManager {
    /**
     * Create artifact fetch manager from configuration
     * @param configuration Flink configuration
     * @return Configured artifact fetch manager
     */
    public static ArtifactFetchManager fromConfiguration(Configuration configuration);

    /**
     * Fetch artifact from URI to target directory
     * @param uri URI of the artifact to fetch
     * @param targetDir Target directory for the artifact
     * @return Future with fetched file
     * @throws Exception if fetching fails
     */
    public CompletableFuture<File> fetchArtifact(URI uri, File targetDir) throws Exception;

    /**
     * Fetch artifact with custom filename
     * @param uri URI of the artifact to fetch
     * @param targetDir Target directory for the artifact
     * @param filename Custom filename for the artifact
     * @return Future with fetched file
     * @throws Exception if fetching fails
     */
    public CompletableFuture<File> fetchArtifact(
        URI uri,
        File targetDir,
        @Nullable String filename
    ) throws Exception;

    /**
     * Get registered artifact fetchers
     * @return Collection of registered fetchers
     */
    public Collection<ArtifactFetcher> getRegisteredFetchers();

    /**
     * Register custom artifact fetcher
     * @param fetcher Artifact fetcher to register
     */
    public void registerFetcher(ArtifactFetcher fetcher);
}
```

**Usage Examples:**

```java
import org.apache.flink.client.program.artifact.ArtifactFetchManager;
import org.apache.flink.configuration.Configuration;

import java.io.File;
import java.net.URI;
import java.util.concurrent.CompletableFuture;

// Create the artifact fetch manager (run inside a method declared to throw Exception)
Configuration config = new Configuration();
ArtifactFetchManager fetchManager = ArtifactFetchManager.fromConfiguration(config);

// Prepare the target directory
File targetDir = new File("/tmp/artifacts");
targetDir.mkdirs();

// Fetch an artifact over HTTP; get() blocks until the fetch completes
URI httpUri = new URI("https://example.com/path/to/job.jar");
CompletableFuture<File> fetchResult = fetchManager.fetchArtifact(httpUri, targetDir);
File fetchedFile = fetchResult.get();
System.out.println("Fetched artifact: " + fetchedFile.getAbsolutePath());

// Fetch the same artifact under a custom filename
CompletableFuture<File> customFetch = fetchManager.fetchArtifact(
    httpUri,
    targetDir,
    "my-job.jar"
);
File customFile = customFetch.get();
```

### Artifact Fetcher Interface

Interface for implementing artifact fetcher strategies supporting different protocols and storage systems.

```java { .api }
/**
 * Interface for artifact fetching implementations
 */
public interface ArtifactFetcher {
    /**
     * Fetch artifact from URI
     * @param uri URI of the artifact
     * @param flinkConf Flink configuration
     * @param targetDir Target directory
     * @param filename Optional custom filename
     * @return Future with fetched file
     * @throws Exception if fetching fails
     */
    CompletableFuture<File> fetch(
        URI uri,
        Configuration flinkConf,
        File targetDir,
        @Nullable String filename
    ) throws Exception;

    /**
     * Check if this fetcher supports the given URI scheme
     * @param uri URI to check
     * @return true if supported
     */
    boolean supportsScheme(URI uri);
}
```

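`registerFetcher` on the manager and `supportsScheme` on each fetcher suggest a simple dispatch rule: ask every registered fetcher in turn and use the first one that accepts the URI. The sketch below illustrates that rule with a pared-down, hypothetical `SchemeFetcher` interface and `FetcherRegistry` class (neither is part of the API above); they carry only a label instead of downloading anything, so the code runs without Flink. The first-match ordering is an assumption, not documented behavior.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Optional;

// Hypothetical, pared-down stand-in for ArtifactFetcher (no Flink types, no I/O).
interface SchemeFetcher {
    boolean supportsScheme(URI uri);

    String name();
}

// Hypothetical registry illustrating first-match dispatch on the URI scheme.
final class FetcherRegistry {
    private final List<SchemeFetcher> fetchers = new ArrayList<>();

    void registerFetcher(SchemeFetcher fetcher) {
        fetchers.add(fetcher);
    }

    List<SchemeFetcher> getRegisteredFetchers() {
        return Collections.unmodifiableList(fetchers);
    }

    // Returns the first registered fetcher that accepts the URI, in registration order.
    Optional<SchemeFetcher> select(URI uri) {
        for (SchemeFetcher fetcher : fetchers) {
            if (fetcher.supportsScheme(uri)) {
                return Optional.of(fetcher);
            }
        }
        return Optional.empty();
    }

    // Builds a fetcher that accepts a fixed set of lowercase scheme names.
    static SchemeFetcher forSchemes(String label, String... schemes) {
        List<String> accepted = List.of(schemes);
        return new SchemeFetcher() {
            @Override
            public boolean supportsScheme(URI uri) {
                // URI schemes are case-insensitive, so normalize before comparing.
                String scheme = uri.getScheme();
                return scheme != null && accepted.contains(scheme.toLowerCase(Locale.ROOT));
            }

            @Override
            public String name() {
                return label;
            }
        };
    }
}
```

Under this rule, registration order matters whenever two fetchers accept the same scheme; whether `ArtifactFetchManager` gives custom fetchers precedence over built-in ones is not stated in this document.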
### HTTP Artifact Fetcher

Implementation for fetching artifacts from HTTP and HTTPS endpoints with support for authentication and custom headers.

```java { .api }
/**
 * Fetcher for HTTP-based artifacts
 */
public class HttpArtifactFetcher implements ArtifactFetcher {
    /**
     * Create HTTP artifact fetcher with default configuration
     */
    public HttpArtifactFetcher();

    /**
     * Create HTTP artifact fetcher with connection and read timeouts
     * @param connectionTimeoutMs Connection timeout in milliseconds
     * @param readTimeoutMs Read timeout in milliseconds
     */
    public HttpArtifactFetcher(int connectionTimeoutMs, int readTimeoutMs);

    @Override
    public CompletableFuture<File> fetch(
        URI uri,
        Configuration flinkConf,
        File targetDir,
        @Nullable String filename
    ) throws Exception;

    @Override
    public boolean supportsScheme(URI uri);
}
```

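The two-argument constructor maps naturally onto the connect and read timeouts of `java.net.HttpURLConnection`. The following is a minimal sketch, not the fetcher's actual implementation, of an HTTP(S) download that applies both timeouts and streams the response body into the target directory. The class name `HttpDownloadSketch` is hypothetical, the fallback to the URI's last path segment is an assumption, and authentication and custom headers are left out.

```java
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical sketch of an HTTP(S) download that honors connect and read timeouts.
final class HttpDownloadSketch {
    private final int connectionTimeoutMs;
    private final int readTimeoutMs;

    HttpDownloadSketch(int connectionTimeoutMs, int readTimeoutMs) {
        this.connectionTimeoutMs = connectionTimeoutMs;
        this.readTimeoutMs = readTimeoutMs;
    }

    boolean supportsScheme(URI uri) {
        String scheme = uri.getScheme();
        return "http".equalsIgnoreCase(scheme) || "https".equalsIgnoreCase(scheme);
    }

    // filename may be null, in which case the last path segment of the URI is used.
    File fetch(URI uri, File targetDir, String filename) throws IOException {
        if (!supportsScheme(uri)) {
            throw new IllegalArgumentException("Not an HTTP(S) URI: " + uri);
        }
        String path = uri.getPath() == null ? "" : uri.getPath();
        String name = filename != null ? filename : path.substring(path.lastIndexOf('/') + 1);
        if (name.isEmpty()) {
            throw new IllegalArgumentException("Cannot derive a filename from " + uri);
        }

        HttpURLConnection conn = (HttpURLConnection) uri.toURL().openConnection();
        conn.setConnectTimeout(connectionTimeoutMs); // time allowed to establish the connection
        conn.setReadTimeout(readTimeoutMs);          // maximum wait for data while reading
        try {
            int status = conn.getResponseCode();
            if (status != HttpURLConnection.HTTP_OK) {
                throw new IOException("HTTP " + status + " while fetching " + uri);
            }
            Path dir = Files.createDirectories(targetDir.toPath());
            Path target = dir.resolve(name);
            try (InputStream in = conn.getInputStream()) {
                // Overwrite any earlier copy with the freshly downloaded bytes.
                Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);
            }
            return target.toFile();
        } finally {
            conn.disconnect();
        }
    }
}
```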
### File System Artifact Fetcher

Implementation for fetching artifacts from distributed file systems including HDFS, S3, and other Flink-supported file systems.

```java { .api }
/**
 * Fetcher for file system-based artifacts
 */
public class FsArtifactFetcher implements ArtifactFetcher {
    /**
     * Create file system artifact fetcher
     */
    public FsArtifactFetcher();

    @Override
    public CompletableFuture<File> fetch(
        URI uri,
        Configuration flinkConf,
        File targetDir,
        @Nullable String filename
    ) throws Exception;

    @Override
    public boolean supportsScheme(URI uri);
}
```

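Conceptually, a file-system fetcher resolves the URI's scheme to a file-system implementation, opens the remote path, and streams it into a local file. Flink does this through its own `FileSystem` abstraction; the sketch below swaps in Java NIO's provider lookup (`Path.of(URI)`) so it runs with only the JDK. With the stock JDK only `file:` URIs resolve this way, and an `hdfs://` or `s3://` URI fails with `FileSystemNotFoundException` unless a matching NIO provider is installed. The class name `NioFsFetchSketch` is hypothetical.

```java
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.file.FileSystemNotFoundException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical sketch: copy an artifact from any NIO-resolvable URI into targetDir.
final class NioFsFetchSketch {
    // filename may be null, in which case the source file's own name is used.
    static File fetch(URI uri, File targetDir, String filename) throws IOException {
        Path source;
        try {
            // Looks up an installed FileSystemProvider by the URI's scheme.
            source = Path.of(uri);
        } catch (FileSystemNotFoundException | IllegalArgumentException e) {
            throw new IOException("No file system available for " + uri, e);
        }

        Path sourceName = source.getFileName();
        String name = filename != null ? filename : (sourceName == null ? null : sourceName.toString());
        if (name == null || name.isEmpty()) {
            throw new IOException("Cannot derive a filename from " + uri);
        }

        Path target = Files.createDirectories(targetDir.toPath()).resolve(name);
        // Stream the source bytes into the local target file.
        try (InputStream in = Files.newInputStream(source)) {
            Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);
        }
        return target.toFile();
    }
}
```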
### Local Artifact Fetcher

Implementation for handling local file system artifacts with support for symbolic links and file validation.

```java { .api }
/**
 * Fetcher for local file system artifacts
 */
public class LocalArtifactFetcher implements ArtifactFetcher {
    /**
     * Create local artifact fetcher
     */
    public LocalArtifactFetcher();

    @Override
    public CompletableFuture<File> fetch(
        URI uri,
        Configuration flinkConf,
        File targetDir,
        @Nullable String filename
    ) throws Exception;

    @Override
    public boolean supportsScheme(URI uri);

    /**
     * Copy local file to target directory
     * @param sourceFile Source file to copy
     * @param targetDir Target directory
     * @param targetFileName Target filename
     * @return Copied file
     * @throws IOException if copy fails
     */
    protected File copyLocalFile(File sourceFile, File targetDir, String targetFileName)
        throws IOException;
}
```

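One way `copyLocalFile` could combine the symbolic-link handling and validation described above, sketched with `java.nio.file`: resolve the source to its real path (following symlinks), reject anything that is not a readable regular file, then copy it into the target directory. This illustrates those assumptions and is not the documented behavior of `LocalArtifactFetcher`; the class name `LocalCopySketch` is hypothetical.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical sketch of a validating local copy that follows symbolic links.
final class LocalCopySketch {
    static File copyLocalFile(File sourceFile, File targetDir, String targetFileName) throws IOException {
        // toRealPath() follows symbolic links and throws NoSuchFileException if the file is missing.
        Path source = sourceFile.toPath().toRealPath();
        if (!Files.isRegularFile(source) || !Files.isReadable(source)) {
            throw new IOException("Not a readable regular file: " + source);
        }
        Path dir = Files.createDirectories(targetDir.toPath());
        Path target = dir.resolve(targetFileName);
        // Copy the resolved file's contents; an existing target is overwritten.
        Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);
        return target.toFile();
    }
}
```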
### Artifact Utilities

Utility functions for common artifact operations including validation, metadata extraction, and path manipulation.

```java { .api }
/**
 * Utilities for artifact operations
 */
public class ArtifactUtils {
    /**
     * Extract filename from URI
     * @param uri URI to extract filename from
     * @return Extracted filename or null
     */
    @Nullable
    public static String extractFilenameFromUri(URI uri);

    /**
     * Validate artifact file
     * @param file File to validate
     * @return true if valid artifact
     */
    public static boolean isValidArtifact(File file);

    /**
     * Get file extension from filename
     * @param filename Filename to analyze
     * @return File extension or empty string
     */
    public static String getFileExtension(String filename);

    /**
     * Create unique filename in directory
     * @param targetDir Target directory
     * @param baseFilename Base filename
     * @return Unique filename
     */
    public static String createUniqueFilename(File targetDir, String baseFilename);

    /**
     * Calculate file checksum
     * @param file File to checksum
     * @param algorithm Hash algorithm (MD5, SHA-1, SHA-256)
     * @return Hex-encoded checksum
     * @throws IOException if calculation fails
     */
    public static String calculateChecksum(File file, String algorithm) throws IOException;

    /**
     * Verify file checksum
     * @param file File to verify
     * @param expectedChecksum Expected checksum
     * @param algorithm Hash algorithm
     * @return true if checksum matches
     * @throws IOException if verification fails
     */
    public static boolean verifyChecksum(File file, String expectedChecksum, String algorithm)
        throws IOException;
}
```

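The filename helpers are simple enough to sketch with the JDK alone. Their exact semantics are not specified above, so the rules here are assumptions: the filename is the last non-empty path segment, the extension is the text after the last dot (a leading dot, as in `.bashrc`, does not count), and uniqueness is achieved by inserting `-1`, `-2`, and so on before the extension. The class name `ArtifactNameSketch` is hypothetical.

```java
import java.io.File;
import java.net.URI;

// Hypothetical sketch of the filename helpers listed in ArtifactUtils.
final class ArtifactNameSketch {
    // Last non-empty path segment, or null when the URI has no usable path.
    static String extractFilenameFromUri(URI uri) {
        String path = uri.getPath();
        if (path == null) {
            return null;
        }
        // Ignore trailing slashes so "/dir/job.jar/" still yields "job.jar".
        int end = path.length();
        while (end > 0 && path.charAt(end - 1) == '/') {
            end--;
        }
        int start = path.lastIndexOf('/', end - 1) + 1;
        return start < end ? path.substring(start, end) : null;
    }

    // Text after the last dot, or "" when there is none (a leading dot does not count).
    static String getFileExtension(String filename) {
        int dot = filename.lastIndexOf('.');
        return dot > 0 && dot < filename.length() - 1 ? filename.substring(dot + 1) : "";
    }

    // Returns baseFilename if it is free, otherwise inserts -1, -2, ... before the extension.
    static String createUniqueFilename(File targetDir, String baseFilename) {
        if (!new File(targetDir, baseFilename).exists()) {
            return baseFilename;
        }
        String ext = getFileExtension(baseFilename);
        String stem = ext.isEmpty()
                ? baseFilename
                : baseFilename.substring(0, baseFilename.length() - ext.length() - 1);
        String suffix = ext.isEmpty() ? "" : "." + ext;
        for (int i = 1; ; i++) {
            String candidate = stem + "-" + i + suffix;
            if (!new File(targetDir, candidate).exists()) {
                return candidate;
            }
        }
    }
}
```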
### CLI Artifact Fetch Options

Command-line options for artifact fetching operations integrated with the CLI frontend.

```java { .api }
/**
 * Options for artifact fetching operations
 */
public class ArtifactFetchOptions extends CommandLineOptions {
    /**
     * Get artifact URI from options
     * @return Artifact URI
     */
    public String getArtifactUri();

    /**
     * Get target directory from options
     * @return Target directory path
     */
    public String getTargetDirectory();

    /**
     * Get custom filename from options
     * @return Custom filename or null
     */
    @Nullable
    public String getCustomFilename();

    /**
     * Get connection timeout from options
     * @return Connection timeout in milliseconds
     */
    public int getConnectionTimeout();

    /**
     * Get read timeout from options
     * @return Read timeout in milliseconds
     */
    public int getReadTimeout();

    /**
     * Check if checksum verification is enabled
     * @return true if verification enabled
     */
    public boolean isChecksumVerificationEnabled();

    /**
     * Get expected checksum from options
     * @return Expected checksum or null
     */
    @Nullable
    public String getExpectedChecksum();

    /**
     * Get checksum algorithm from options
     * @return Checksum algorithm (default: SHA-256)
     */
    public String getChecksumAlgorithm();
}
```

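The checksum options carry two rules that are easy to miss: the algorithm defaults to SHA-256, and (presumably) verification needs an expected checksum to compare against. A hypothetical value object that enforces both, and also rejects algorithm names the JVM's `MessageDigest` does not support, might look like this. `FetchOptionsSketch` is not part of `ArtifactFetchOptions`, and the validation rules are assumptions.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical value object mirroring the checksum-related fetch options.
final class FetchOptionsSketch {
    static final String DEFAULT_CHECKSUM_ALGORITHM = "SHA-256";

    final boolean checksumVerificationEnabled;
    final String expectedChecksum; // may be null when verification is disabled
    final String checksumAlgorithm;

    FetchOptionsSketch(boolean verify, String expectedChecksum, String algorithm) {
        this.checksumVerificationEnabled = verify;
        this.expectedChecksum = expectedChecksum;
        // Apply the documented default when no algorithm is given.
        this.checksumAlgorithm = algorithm == null ? DEFAULT_CHECKSUM_ALGORITHM : algorithm;
        validate();
    }

    private void validate() {
        if (checksumVerificationEnabled && (expectedChecksum == null || expectedChecksum.isBlank())) {
            throw new IllegalArgumentException(
                    "Checksum verification is enabled but no expected checksum was given");
        }
        try {
            // Fail fast if the JVM does not provide the requested algorithm.
            MessageDigest.getInstance(checksumAlgorithm);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalArgumentException("Unsupported checksum algorithm: " + checksumAlgorithm, e);
        }
    }
}
```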
## Usage Patterns

### Basic Artifact Fetching

```java
import org.apache.flink.client.program.artifact.ArtifactFetchManager;
import org.apache.flink.configuration.Configuration;

import java.io.File;
import java.net.URI;

// Configure the manager once and reuse it for every source
Configuration config = new Configuration();
ArtifactFetchManager manager = ArtifactFetchManager.fromConfiguration(config);
File targetDir = new File("/tmp/artifacts");
targetDir.mkdirs();

// Fetch from HTTP
URI httpArtifact = new URI("https://repo.example.com/artifacts/job-1.0.jar");
File httpResult = manager.fetchArtifact(httpArtifact, targetDir).get();

// Fetch from HDFS; a custom filename keeps it from clashing with the
// identically named HTTP artifact in the same target directory
URI hdfsArtifact = new URI("hdfs://namenode:9000/artifacts/job-1.0.jar");
File hdfsResult = manager.fetchArtifact(hdfsArtifact, targetDir, "job-1.0-hdfs.jar").get();

// Fetch from the local file system
URI localArtifact = new URI("file:///path/to/local/job.jar");
File localResult = manager.fetchArtifact(localArtifact, targetDir).get();
```

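Each call above blocks on `get()` before the next fetch starts. Because `fetchArtifact` returns a `CompletableFuture`, independent artifacts could instead be requested together and joined once. The sketch below shows that pattern with a hypothetical `fetch` function standing in for `uri -> manager.fetchArtifact(uri, targetDir)` (which would also need its checked exception handled), so it runs without Flink. Whether fetches actually overlap depends on how the real fetchers schedule their work; `ConcurrentFetchSketch` is a hypothetical name.

```java
import java.net.URI;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical helper: start every fetch first, then wait for all of them.
final class ConcurrentFetchSketch {
    // Results are returned in the same order as the input URIs.
    static <T> List<T> fetchAll(List<URI> uris, Function<URI, CompletableFuture<T>> fetch) {
        List<CompletableFuture<T>> futures = uris.stream().map(fetch).collect(Collectors.toList());
        // allOf completes once every future has completed; join() throws a
        // CompletionException if any fetch failed.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }
}
```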
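### Custom Fetcher Registration

```java
import org.apache.flink.client.program.artifact.ArtifactFetchManager;
import org.apache.flink.client.program.artifact.ArtifactFetcher;
import org.apache.flink.configuration.Configuration;

import javax.annotation.Nullable;
import java.io.File;
import java.net.URI;
import java.util.concurrent.CompletableFuture;

// Custom fetcher for the s3 and s3a schemes
public class S3ArtifactFetcher implements ArtifactFetcher {
    @Override
    public CompletableFuture<File> fetch(
        URI uri, Configuration flinkConf, File targetDir, @Nullable String filename
    ) throws Exception {
        // Run the download asynchronously; an exception completes the future exceptionally
        return CompletableFuture.supplyAsync(() -> downloadFromS3(uri, targetDir, filename));
    }

    @Override
    public boolean supportsScheme(URI uri) {
        return "s3".equals(uri.getScheme()) || "s3a".equals(uri.getScheme());
    }

    // Placeholder for your S3 client call. It runs inside supplyAsync, so it
    // must wrap checked exceptions (e.g. in UncheckedIOException)
    private File downloadFromS3(URI uri, File targetDir, @Nullable String filename) {
        throw new UnsupportedOperationException("S3 download not implemented");
    }
}

// Register the custom fetcher
Configuration config = new Configuration();
ArtifactFetchManager manager = ArtifactFetchManager.fromConfiguration(config);
manager.registerFetcher(new S3ArtifactFetcher());
```
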
### Artifact Validation and Checksums

```java
// Fetch the artifact, then verify it (manager and targetDir as in the examples above)
URI artifactUri = new URI("https://repo.example.com/job.jar");
File fetchedFile = manager.fetchArtifact(artifactUri, targetDir).get();

// Verify artifact integrity against the published checksum (placeholder value)
String expectedChecksum = "a1b2c3d4e5f6...";
boolean isValid = ArtifactUtils.verifyChecksum(
    fetchedFile,
    expectedChecksum,
    "SHA-256"
);

if (!isValid) {
    throw new RuntimeException("Artifact checksum verification failed");
}

// Validate artifact format
if (!ArtifactUtils.isValidArtifact(fetchedFile)) {
    throw new RuntimeException("Invalid artifact format");
}
```

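`calculateChecksum` and `verifyChecksum` correspond to standard JDK functionality. Below is a minimal sketch of both using `java.security.MessageDigest`, which accepts the algorithm names `MD5`, `SHA-1`, and `SHA-256`. Lowercase hex output, case-insensitive comparison, and wrapping `NoSuchAlgorithmException` in an `IOException` (to fit the documented `throws IOException`) are assumptions; the class name `ChecksumSketch` is hypothetical.

```java
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical sketch of checksum calculation and verification.
final class ChecksumSketch {
    // Streams the file through the digest so large artifacts are not loaded into memory.
    static String calculateChecksum(File file, String algorithm) throws IOException {
        MessageDigest digest;
        try {
            digest = MessageDigest.getInstance(algorithm);
        } catch (NoSuchAlgorithmException e) {
            throw new IOException("Unsupported checksum algorithm: " + algorithm, e);
        }
        byte[] buffer = new byte[8192];
        try (InputStream in = Files.newInputStream(file.toPath())) {
            int read;
            while ((read = in.read(buffer)) != -1) {
                digest.update(buffer, 0, read);
            }
        }
        // Encode each digest byte as two lowercase hex digits.
        StringBuilder hex = new StringBuilder();
        for (byte b : digest.digest()) {
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    }

    static boolean verifyChecksum(File file, String expectedChecksum, String algorithm) throws IOException {
        // Hex digests are compared case-insensitively, ignoring surrounding whitespace.
        return calculateChecksum(file, algorithm).equalsIgnoreCase(expectedChecksum.trim());
    }
}
```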
## Error Handling

Artifact management operations handle the following error conditions:

- **Network Errors**: Connection failures, timeouts, DNS resolution issues
- **Authentication Errors**: Invalid credentials, permission denied
- **File System Errors**: Insufficient disk space, permission issues, path not found
- **Validation Errors**: Checksum mismatches, corrupted files, invalid formats
- **Configuration Errors**: Invalid URIs, missing required parameters

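Failures from an asynchronous fetch usually arrive wrapped in a `CompletionException` (from `CompletableFuture`) or an `ExecutionException` (from `get()`), so the root cause has to be unwrapped before it can be matched to one of the categories above. The classifier below is a hypothetical sketch; the mapping from JDK exception types to categories is an assumption, since the fetchers' exact exception types are not specified here. Validation errors are left out because they are detected by the checksum and format checks after a successful fetch.

```java
import java.io.FileNotFoundException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.nio.file.AccessDeniedException;
import java.nio.file.NoSuchFileException;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

// Hypothetical helper mapping fetch failures onto the error categories listed above.
final class FetchErrorClassifier {
    enum Category { NETWORK, AUTHENTICATION, FILE_SYSTEM, CONFIGURATION, OTHER }

    // Strips the CompletionException/ExecutionException wrappers added by futures.
    static Throwable unwrap(Throwable t) {
        Throwable current = t;
        while ((current instanceof CompletionException || current instanceof ExecutionException)
                && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    static Category classify(Throwable failure) {
        Throwable cause = unwrap(failure);
        if (cause instanceof UnknownHostException
                || cause instanceof ConnectException
                || cause instanceof SocketTimeoutException) {
            return Category.NETWORK;
        }
        if (cause instanceof SecurityException) {
            return Category.AUTHENTICATION;
        }
        // Note: HttpURLConnection also reports HTTP 404 as FileNotFoundException.
        if (cause instanceof NoSuchFileException
                || cause instanceof FileNotFoundException
                || cause instanceof AccessDeniedException) {
            return Category.FILE_SYSTEM;
        }
        if (cause instanceof URISyntaxException || cause instanceof IllegalArgumentException) {
            return Category.CONFIGURATION;
        }
        return Category.OTHER;
    }
}
```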
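**Error Handling Patterns:**

```java
import java.io.File;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Assumes manager, uri, and targetDir are defined as in the examples above
try {
    CompletableFuture<File> fetchFuture = manager.fetchArtifact(uri, targetDir);

    File result = fetchFuture.handle((file, throwable) -> {
        if (throwable != null) {
            // The failure may or may not be wrapped in a CompletionException; inspect the real cause
            Throwable cause = throwable instanceof CompletionException && throwable.getCause() != null
                ? throwable.getCause()
                : throwable;
            if (cause instanceof IOException) {
                System.err.println("I/O error: " + cause.getMessage());
                // Handle I/O errors (network, file system)
            } else if (cause instanceof SecurityException) {
                System.err.println("Security error: " + cause.getMessage());
                // Handle authentication/authorization errors
            } else {
                System.err.println("Unexpected error: " + cause.getMessage());
                // Handle other errors
            }
            return null;
        }
        return file;
    }).get();

    if (result != null) {
        System.out.println("Artifact fetched successfully: " + result.getPath());
    }
} catch (InterruptedException e) {
    // Restore the interrupt flag so callers can still observe the interruption
    Thread.currentThread().interrupt();
    System.err.println("Interrupted while fetching artifact");
} catch (Exception e) {
    // fetchArtifact declares "throws Exception"; get() may also throw ExecutionException
    System.err.println("Failed to fetch artifact: " + e.getMessage());
}
```
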
The artifact management system provides a flexible and extensible framework for handling diverse artifact sources, enabling Flink applications to fetch dependencies and resources from various storage systems with consistent error handling and validation.