# File System Utilities

Comprehensive utilities for Hadoop FileSystem operations, path conversions, and integration between Hudi and Hadoop abstractions. Provides wrapper functionality, consistency guarantees, and retry mechanisms for reliable distributed file system operations.

## Capabilities

### HadoopFSUtils

Core utility class providing conversion functions and FileSystem management for Hadoop integration.

```java { .api }
/**
 * Hadoop FileSystem utility functions.
 * Provides conversion utilities and FileSystem management.
 */
public class HadoopFSUtils {

  /** Prepare Hadoop configuration with Hudi-specific settings */
  public static Configuration prepareHadoopConf(Configuration conf);

  /** Get storage configuration from Hadoop configuration */
  public static StorageConfiguration<Configuration> getStorageConf(Configuration conf);

  /** Get default storage configuration */
  public static StorageConfiguration<Configuration> getStorageConf();

  /** Get storage configuration with copy of Hadoop configuration */
  public static StorageConfiguration<Configuration> getStorageConfWithCopy(Configuration conf);

  /** Get FileSystem for path string with storage configuration */
  public static <T> FileSystem getFs(String pathStr, StorageConfiguration<T> storageConf);

  /** Get FileSystem for Hadoop Path with storage configuration */
  public static <T> FileSystem getFs(Path path, StorageConfiguration<T> storageConf);

  /** Get FileSystem for path string with Hadoop configuration */
  public static FileSystem getFs(String pathStr, Configuration conf);

  /** Get FileSystem for Hadoop Path with configuration */
  public static FileSystem getFs(Path path, Configuration conf);

  /** Get FileSystem for StoragePath with configuration */
  public static FileSystem getFs(StoragePath path, Configuration conf);

  /** Get FileSystem with newCopy option */
  public static <T> FileSystem getFs(String pathStr, StorageConfiguration<T> storageConf, boolean newCopy);

  /** Get FileSystem for Path with newCopy option */
  public static <T> FileSystem getFs(Path path, StorageConfiguration<T> storageConf, boolean newCopy);

  /** Get FileSystem with local default option */
  public static FileSystem getFs(String pathStr, Configuration conf, boolean localByDefault);

  /** Add scheme to local path if missing */
  public static Path addSchemeIfLocalPath(String path);

  /** Convert StoragePath to Hadoop Path */
  public static Path convertToHadoopPath(StoragePath path);

  /** Convert Hadoop Path to StoragePath */
  public static StoragePath convertToStoragePath(Path path);

  /** Convert FileStatus to StoragePathInfo */
  public static StoragePathInfo convertToStoragePathInfo(FileStatus fileStatus);

  /** Convert FileStatus to StoragePathInfo with locations */
  public static StoragePathInfo convertToStoragePathInfo(FileStatus fileStatus, String[] locations);

  /** Convert StoragePathInfo to Hadoop FileStatus */
  public static FileStatus convertToHadoopFileStatus(StoragePathInfo pathInfo);

  /** Get FSDataInputStream with buffer settings */
  public static FSDataInputStream getFSDataInputStream(FileSystem fs, StoragePath path, int bufferSize, boolean wrapStream);

  /** Check if FileSystem is Google Cloud Storage */
  public static boolean isGCSFileSystem(FileSystem fs);

  /** Check if FileSystem is Cloudera CDH */
  public static boolean isCHDFileSystem(FileSystem fs);

  /** Register file system for path */
  public static Configuration registerFileSystem(StoragePath file, Configuration conf);

  /** Get file size from FileSystem */
  public static long getFileSize(FileSystem fs, Path path);

  /** Get relative partition path */
  public static String getRelativePartitionPath(Path basePath, Path fullPartitionPath);

  /** Get file ID from log path */
  public static String getFileIdFromLogPath(Path path);

  /** Get delta commit time from log path */
  public static String getDeltaCommitTimeFromLogPath(Path path);

  /** Get file ID from file path */
  public static String getFileIdFromFilePath(Path filePath);

  /** Check if path is base file */
  public static boolean isBaseFile(Path path);

  /** Check if path is log file */
  public static boolean isLogFile(Path logPath);

  /** Check if path is data file */
  public static boolean isDataFile(Path path);

  /** Get all data files in partition */
  public static FileStatus[] getAllDataFilesInPartition(FileSystem fs, Path partitionPath);

  /** Construct absolute path in Hadoop */
  public static Path constructAbsolutePathInHadoopPath(String basePath, String relativePartitionPath);

  /** Get DFS full partition path */
  public static String getDFSFullPartitionPath(FileSystem fs, Path fullPartitionPath);

  /** Parallelize files processing */
  public static <T> Map<String, T> parallelizeFilesProcess(Configuration configuration, List<String> filePathList,
                                                          SerializableFunction<String, T> func, int parallelism);

  /** Get file status at level */
  public static List<FileStatus> getFileStatusAtLevel(Configuration configuration, Path path, int level,
                                                      int parallelism, String[] subPathFilters, List<Path> subPaths);

  /** Delete files in parallel */
  public static Map<String, Boolean> deleteFilesParallelize(Configuration configuration, List<String> filePathList,
                                                            int parallelism);
}
```

### Path Conversion Utilities

Utilities for converting between Hudi and Hadoop path representations.

```java { .api }
/**
 * Add scheme to local path if missing
 * @param path - Path string that may need scheme
 * @return Hadoop Path with proper scheme
 */
public static Path addSchemeIfLocalPath(String path);

/**
 * Convert StoragePath to Hadoop Path
 * @param path - Hudi StoragePath
 * @return Equivalent Hadoop Path
 */
public static Path convertToHadoopPath(StoragePath path);

/**
 * Convert Hadoop Path to StoragePath
 * @param path - Hadoop Path
 * @return Equivalent Hudi StoragePath
 */
public static StoragePath convertToStoragePath(Path path);

/**
 * Convert Hadoop FileStatus to StoragePathInfo
 * @param fileStatus - Hadoop FileStatus
 * @return Equivalent StoragePathInfo
 */
public static StoragePathInfo convertToStoragePathInfo(FileStatus fileStatus);

/**
 * Convert StoragePathInfo to Hadoop FileStatus
 * @param pathInfo - Hudi StoragePathInfo
 * @return Equivalent Hadoop FileStatus
 */
public static FileStatus convertToHadoopFileStatus(StoragePathInfo pathInfo);

/**
 * Convert HoodiePath to Hadoop Path (legacy)
 * @param path - Legacy HoodiePath
 * @return Hadoop Path
 */
public static Path toPath(HoodiePath path);

/**
 * Convert Hadoop Path to HoodiePath (legacy)
 * @param path - Hadoop Path
 * @return Legacy HoodiePath
 */
public static HoodiePath fromPath(Path path);
```
184
185
### Permission and Status Conversions
186
187
Utilities for converting file permissions and status objects between Hudi and Hadoop representations.
188
189
```java { .api }
190
/**
191
* Convert HoodieFSPermission to Hadoop FsPermission
192
* @param fsPermission - Hudi file system permission
193
* @return Hadoop FsPermission
194
*/
195
public static FsPermission toFSPermission(HoodieFSPermission fsPermission);
196
197
/**
198
* Convert Hadoop FsPermission to HoodieFSPermission
199
* @param fsPermission - Hadoop file system permission
200
* @return Hudi HoodieFSPermission
201
*/
202
public static HoodieFSPermission fromFSPermission(FsPermission fsPermission);
203
204
/**
205
* Convert Hadoop FileStatus to HoodieFileStatus
206
* @param fileStatus - Hadoop FileStatus
207
* @return Hudi HoodieFileStatus
208
*/
209
public static HoodieFileStatus fromFileStatus(FileStatus fileStatus);
210
```
211
212
### File System Operations
213
214
Advanced file system operations and metadata utilities.
215
216
```java { .api }
217
/**
218
* Get FSDataInputStream with buffering options
219
* @param fs - Hadoop FileSystem
220
* @param filePath - Storage path to read
221
* @param bufferSize - Buffer size for reading
222
* @param wrapStream - Whether to wrap the stream
223
* @return FSDataInputStream for reading
224
*/
225
public static FSDataInputStream getFSDataInputStream(FileSystem fs, StoragePath filePath,
226
int bufferSize, boolean wrapStream);
227
228
/**
229
* Get file size from FileSystem
230
* @param fs - Hadoop FileSystem
231
* @param path - Hadoop Path to check
232
* @return File size in bytes
233
*/
234
public static long getFileSize(FileSystem fs, Path path);
235
236
/**
237
* Get relative partition path
238
* @param basePath - Base path of the table
239
* @param fullPartitionPath - Full path to the partition
240
* @return Relative partition path string
241
*/
242
public static String getRelativePartitionPath(Path basePath, Path fullPartitionPath);
243
244
/**
245
* Register FileSystem for the given file
246
* @param file - Storage path for file
247
* @param conf - Hadoop configuration
248
* @return Updated configuration with registered FileSystem
249
*/
250
public static Configuration registerFileSystem(StoragePath file, Configuration conf);
251
```
252
253
### File System Type Detection
254
255
Methods for detecting specific FileSystem implementations.
256
257
```java { .api }
258
/**
259
* Check if FileSystem is Google Cloud Storage
260
* @param fs - FileSystem to check
261
* @return true if GCS FileSystem
262
*/
263
public static boolean isGCSFileSystem(FileSystem fs);
264
265
/**
266
* Check if FileSystem is Cloudera CHD FileSystem
267
* @param fs - FileSystem to check
268
* @return true if CHD FileSystem
269
*/
270
public static boolean isCHDFileSystem(FileSystem fs);
271
```
272
273
### File Identification Utilities
274
275
Utilities for extracting information from file paths and identifying file types.
276
277
```java { .api }
278
/**
279
* Extract file ID from log file path
280
* @param path - Hadoop Path to log file
281
* @return File ID string
282
*/
283
public static String getFileIdFromLogPath(Path path);
284
285
/**
286
* Extract delta commit time from log file path
287
* @param path - Hadoop Path to log file
288
* @return Delta commit time string
289
*/
290
public static String getDeltaCommitTimeFromLogPath(Path path);
291
292
/**
293
* Extract file ID from any file path
294
* @param filePath - Hadoop Path to file
295
* @return File ID string
296
*/
297
public static String getFileIdFromFilePath(Path filePath);
298
299
/**
300
* Check if path points to a base file
301
* @param path - Hadoop Path to check
302
* @return true if base file
303
*/
304
public static boolean isBaseFile(Path path);
305
306
/**
307
* Check if path points to a log file
308
* @param logPath - Hadoop Path to check
309
* @return true if log file
310
*/
311
public static boolean isLogFile(Path logPath);
312
313
/**
314
* Check if path points to a data file (base or log)
315
* @param path - Hadoop Path to check
316
* @return true if data file
317
*/
318
public static boolean isDataFile(Path path);
319
320
/**
321
* Get all data files in a partition
322
* @param fs - Hadoop FileSystem
323
* @param partitionPath - Path to partition directory
324
* @return Array of FileStatus for data files
325
*/
326
public static FileStatus[] getAllDataFilesInPartition(FileSystem fs, Path partitionPath);
327
```
328
329
### Distributed FileSystem Operations
330
331
Specialized operations for distributed file systems like HDFS.
332
333
```java { .api }
334
/**
335
* Recover DFS file lease for write operations
336
* @param dfs - DistributedFileSystem instance
337
* @param p - Path to file with lease issues
338
* @return true if lease recovery succeeded
339
*/
340
public static boolean recoverDFSFileLease(DistributedFileSystem dfs, Path p);
341
```
342
343
### HoodieWrapperFileSystem
344
345
FileSystem wrapper providing consistency guarantees through ConsistencyGuard integration.
346
347
```java { .api }
348
/**
349
* Wrapper FileSystem with consistency guarantees
350
* Ensures file operations respect consistency requirements
351
*/
352
public class HoodieWrapperFileSystem extends FileSystem {
353
354
/** Create wrapper with FileSystem and consistency guard */
355
public HoodieWrapperFileSystem(FileSystem fs, ConsistencyGuard consistencyGuard);
356
357
// Extends all FileSystem interface methods with consistency checks
358
// All standard FileSystem operations are available with consistency guarantees
359
}
360
```
361
362
### HoodieRetryWrapperFileSystem
363
364
FileSystem wrapper providing retry capabilities for unreliable file system operations.
365
366
```java { .api }
367
/**
368
* FileSystem wrapper with retry capabilities
369
* Automatically retries failed operations with configurable parameters
370
*/
371
public class HoodieRetryWrapperFileSystem extends FileSystem {
372
373
/** Create retry wrapper with configuration */
374
public HoodieRetryWrapperFileSystem(FileSystem fs, long maxRetryIntervalMs,
375
int maxRetryNumbers, long initialRetryIntervalMs,
376
String retryExceptions);
377
378
// Extends all FileSystem interface methods with retry logic
379
// Operations are automatically retried on transient failures
380
}
381
```
382
383
### HadoopSeekableDataInputStream
384
385
Seekable data input stream implementation for Hadoop FSDataInputStream.
386
387
```java { .api }
388
/**
389
* Seekable data input stream for Hadoop
390
* Provides random access capabilities for file reading
391
*/
392
public class HadoopSeekableDataInputStream implements SeekableDataInputStream {
393
394
/** Create seekable stream from FSDataInputStream */
395
public HadoopSeekableDataInputStream(FSDataInputStream fsDataInputStream);
396
397
/** Read single byte */
398
public int read();
399
400
/** Read bytes into buffer */
401
public int read(byte[] b, int off, int len);
402
403
/** Seek to specific position */
404
public void seek(long pos);
405
406
/** Get current position */
407
public long getPos();
408
409
/** Close the stream */
410
public void close();
411
}
412
```
413
414
### Additional Utility Classes
415
416
Specialized utility classes for file system operations.
417
418
```java { .api }
419
/**
420
* Serializable wrapper for FileStatus
421
* Allows FileStatus to be serialized for distributed operations
422
*/
423
public class HoodieSerializableFileStatus implements Serializable {
424
// Standard FileStatus interface methods with Serializable support
425
public Path getPath();
426
public long getLen();
427
public boolean isDirectory();
428
public long getModificationTime();
429
public FsPermission getPermission();
430
}
431
432
/**
433
* Path implementation with caching capabilities
434
* Optimizes path operations through intelligent caching
435
*/
436
public class CachingPath extends Path {
437
// Extended Path functionality with caching optimizations
438
public CachingPath(String pathString);
439
public CachingPath(Path parent, String child);
440
}
441
```

**Usage Examples:**

```java
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.Configuration;

// Basic FileSystem operations
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://namenode:8020");

// Get FileSystem instance
FileSystem fs = HadoopFSUtils.getFs("hdfs://namenode:8020/data", conf);

// Path conversions
StoragePath storagePath = new StoragePath("hdfs://namenode:8020/data/table");
Path hadoopPath = HadoopFSUtils.convertToHadoopPath(storagePath);
StoragePath backToStorage = HadoopFSUtils.convertToStoragePath(hadoopPath);

// File identification
Path logFile = new Path("/data/table/.hoodie/20231201120000.deltacommit.log");
boolean isLog = HadoopFSUtils.isLogFile(logFile);
String fileId = HadoopFSUtils.getFileIdFromLogPath(logFile);
String commitTime = HadoopFSUtils.getDeltaCommitTimeFromLogPath(logFile);

// Get all data files in partition
Path partitionPath = new Path("/data/table/year=2023/month=12");
FileStatus[] dataFiles = HadoopFSUtils.getAllDataFilesInPartition(fs, partitionPath);

// Using wrapper FileSystem with consistency
ConsistencyGuard guard = new OptimisticConsistencyGuard(fs, conf);
HoodieWrapperFileSystem wrapperFs = new HoodieWrapperFileSystem(fs, guard);

// Using retry wrapper
HoodieRetryWrapperFileSystem retryFs = new HoodieRetryWrapperFileSystem(
    fs,
    5000L,                // maxRetryIntervalMs
    3,                    // maxRetryNumbers
    1000L,                // initialRetryIntervalMs
    "java.io.IOException" // retryExceptions
);

// Seekable stream operations
Path dataFile = new Path("/data/table/file.parquet");
FSDataInputStream fsInput = fs.open(dataFile);
HadoopSeekableDataInputStream seekableInput = new HadoopSeekableDataInputStream(fsInput);

// Random access reading
seekableInput.seek(1024); // Seek to position 1024
int data = seekableInput.read();
long currentPos = seekableInput.getPos();
seekableInput.close();
```