# Core FileSystem Operations

The HadoopFileSystem class provides comprehensive file system operations by wrapping Hadoop's FileSystem implementations with Flink's FileSystem interface. It supports all standard file operations including reading, writing, directory management, and metadata access across all Hadoop-compatible file systems.

## Capabilities

### HadoopFileSystem Class

Main file system implementation that wraps Hadoop FileSystem with Flink's interface.
```java { .api }
/**
 * A FileSystem that wraps a Hadoop File System.
 * Provides Flink's file system interface over Hadoop's file system abstraction.
 */
public class HadoopFileSystem extends FileSystem {
    /**
     * Wraps the given Hadoop File System object as a Flink File System.
     * The Hadoop file system object is expected to be initialized already.
     * @param hadoopFileSystem The Hadoop FileSystem to wrap
     */
    public HadoopFileSystem(org.apache.hadoop.fs.FileSystem hadoopFileSystem);

    /**
     * Gets the underlying Hadoop FileSystem.
     * @return The underlying Hadoop FileSystem
     */
    public org.apache.hadoop.fs.FileSystem getHadoopFileSystem();

    /**
     * Gets the working directory.
     * @return Path to the working directory
     */
    public Path getWorkingDirectory();

    /**
     * Gets the home directory.
     * @return Path to the home directory
     */
    public Path getHomeDirectory();

    /**
     * Gets the URI of this file system.
     * @return URI of the file system
     */
    public URI getUri();

    /**
     * Returns true since Hadoop file systems are distributed.
     * @return always true
     */
    public boolean isDistributedFS();

    /**
     * Gets the default block size for this file system.
     * @return default block size in bytes
     */
    public long getDefaultBlockSize();
}
```
### File Status Operations

Methods for retrieving file and directory metadata information.

```java { .api }
/**
 * Gets the file status for the specified path.
 * @param f path to get status for
 * @return FileStatus containing metadata
 * @throws IOException if the path doesn't exist or operation fails
 */
public FileStatus getFileStatus(Path f) throws IOException;

/**
 * Checks if a path exists.
 * @param f path to check
 * @return true if path exists, false otherwise
 * @throws IOException if operation fails
 */
public boolean exists(Path f) throws IOException;

/**
 * Lists the status of files/directories in a directory.
 * @param f directory path to list
 * @return array of FileStatus objects for directory contents
 * @throws IOException if path is not a directory or operation fails
 */
public FileStatus[] listStatus(Path f) throws IOException;

/**
 * Gets file block locations for the specified file and range.
 * @param file file status object
 * @param start starting byte position
 * @param len number of bytes
 * @return array of BlockLocation objects
 * @throws IOException if operation fails
 */
public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException;
```
**Usage Examples:**

```java
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.BlockLocation;

// Get file status
Path filePath = new Path("hdfs://namenode:9000/data/input.txt");
FileStatus status = fs.getFileStatus(filePath);

System.out.println("File length: " + status.getLen());
System.out.println("Block size: " + status.getBlockSize());
System.out.println("Modification time: " + status.getModificationTime());
System.out.println("Is directory: " + status.isDir());

// Check if file exists
boolean exists = fs.exists(filePath);
if (exists) {
    System.out.println("File exists");
}

// List directory contents
Path dirPath = new Path("hdfs://namenode:9000/data/");
FileStatus[] files = fs.listStatus(dirPath);
for (FileStatus file : files) {
    System.out.println(file.getPath() + " - " + file.getLen() + " bytes");
}

// Get block locations for data locality
BlockLocation[] blocks = fs.getFileBlockLocations(status, 0, status.getLen());
for (BlockLocation block : blocks) {
    System.out.println("Block at offset " + block.getOffset() +
            " on hosts: " + String.join(",", block.getHosts()));
}
```
### File Reading Operations

Methods for opening and reading files with various options.

```java { .api }
/**
 * Opens a file for reading.
 * @param f path to the file
 * @return HadoopDataInputStream for reading
 * @throws IOException if file cannot be opened
 */
public HadoopDataInputStream open(Path f) throws IOException;

/**
 * Opens a file for reading with specified buffer size.
 * @param f path to the file
 * @param bufferSize buffer size for reading
 * @return HadoopDataInputStream for reading
 * @throws IOException if file cannot be opened
 */
public HadoopDataInputStream open(Path f, int bufferSize) throws IOException;
```
**Usage Examples:**

```java
import org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream;

// Open file for reading
Path inputPath = new Path("hdfs://namenode:9000/data/input.txt");
HadoopDataInputStream inputStream = fs.open(inputPath);

// Read data
byte[] buffer = new byte[1024];
int bytesRead = inputStream.read(buffer);
while (bytesRead != -1) {
    // Process buffer
    System.out.write(buffer, 0, bytesRead);
    bytesRead = inputStream.read(buffer);
}
inputStream.close();

// Open with custom buffer size
HadoopDataInputStream bufferedStream = fs.open(inputPath, 64 * 1024); // 64KB buffer
// ... read operations
bufferedStream.close();

// Random access reading
inputStream = fs.open(inputPath);
inputStream.seek(1000); // Seek to position 1000
int byteAtPosition = inputStream.read();
inputStream.close();
```
### File Writing Operations

Methods for creating and writing files with various configuration options.

```java { .api }
/**
 * Creates a file with write mode specification.
 * @param f path to create
 * @param overwrite write mode (OVERWRITE or NO_OVERWRITE)
 * @return HadoopDataOutputStream for writing
 * @throws IOException if file cannot be created
 */
public HadoopDataOutputStream create(Path f, WriteMode overwrite) throws IOException;

/**
 * Creates a file with detailed HDFS parameters.
 * @param f path to create
 * @param overwrite whether to overwrite existing file
 * @param bufferSize buffer size for writing
 * @param replication replication factor
 * @param blockSize block size in bytes
 * @return HadoopDataOutputStream for writing
 * @throws IOException if file cannot be created
 */
public HadoopDataOutputStream create(Path f, boolean overwrite, int bufferSize,
        short replication, long blockSize) throws IOException;
```
**Usage Examples:**

```java
import org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream;
import org.apache.flink.core.fs.FileSystem.WriteMode;

// Create file with simple write mode
Path outputPath = new Path("hdfs://namenode:9000/data/output.txt");
HadoopDataOutputStream outputStream = fs.create(outputPath, WriteMode.OVERWRITE);

// Write data
String data = "Hello, Hadoop FileSystem!";
outputStream.write(data.getBytes());
outputStream.flush();
outputStream.close();

// Create file with detailed HDFS parameters
HadoopDataOutputStream hdfsStream = fs.create(
        outputPath,
        true,              // overwrite
        32 * 1024,         // 32KB buffer
        (short) 3,         // replication factor
        128 * 1024 * 1024  // 128MB block size
);

// Write with positioning
hdfsStream.write("First part".getBytes());
long position = hdfsStream.getPos();
hdfsStream.write("Second part".getBytes());
hdfsStream.sync(); // Force sync to storage
hdfsStream.close();
```
### Directory Operations

Methods for directory management and manipulation.

```java { .api }
/**
 * Creates directories for the specified path.
 * @param f path to create directories for
 * @return true if directories were created or already exist
 * @throws IOException if operation fails
 */
public boolean mkdirs(Path f) throws IOException;

/**
 * Deletes a file or directory.
 * @param f path to delete
 * @param recursive if true, delete directory recursively
 * @return true if deletion was successful
 * @throws IOException if operation fails
 */
public boolean delete(Path f, boolean recursive) throws IOException;

/**
 * Renames a file or directory.
 * @param src source path
 * @param dst destination path
 * @return true if rename was successful
 * @throws IOException if operation fails
 */
public boolean rename(Path src, Path dst) throws IOException;
```
**Usage Examples:**

```java
// Create directories
Path dirPath = new Path("hdfs://namenode:9000/data/processed/");
boolean created = fs.mkdirs(dirPath);
if (created) {
    System.out.println("Directories created successfully");
}

// Delete file
Path fileToDelete = new Path("hdfs://namenode:9000/data/temp.txt");
boolean deleted = fs.delete(fileToDelete, false);

// Delete directory recursively
Path dirToDelete = new Path("hdfs://namenode:9000/data/temp/");
boolean deletedRecursive = fs.delete(dirToDelete, true);

// Rename file
Path oldPath = new Path("hdfs://namenode:9000/data/old_name.txt");
Path newPath = new Path("hdfs://namenode:9000/data/new_name.txt");
boolean renamed = fs.rename(oldPath, newPath);
```
### Recoverable Writer Creation

Methods for creating fault-tolerant writers that support exactly-once processing guarantees.

```java { .api }
/**
 * Creates a recoverable writer for fault-tolerant writing.
 * @return RecoverableWriter instance
 * @throws IOException if writer creation fails due to unsupported file system
 */
public RecoverableWriter createRecoverableWriter() throws IOException;

/**
 * Creates a recoverable writer with configuration options.
 * @param conf configuration map with writer options
 * @return RecoverableWriter instance
 * @throws IOException if writer creation fails
 */
public RecoverableWriter createRecoverableWriter(Map<String, String> conf) throws IOException;
```
**Usage Examples:**

```java
import org.apache.flink.core.fs.RecoverableWriter;
import java.util.HashMap;
import java.util.Map;

// Create recoverable writer with default settings
RecoverableWriter writer = fs.createRecoverableWriter();

// Create recoverable writer with configuration
Map<String, String> config = new HashMap<>();
config.put("fs.hdfs.no-local-write", "true");
RecoverableWriter configuredWriter = fs.createRecoverableWriter(config);

// Use the writer for fault-tolerant streaming
Path outputPath = new Path("hdfs://namenode:9000/output/data.txt");
RecoverableFsDataOutputStream stream = writer.open(outputPath);
// ... write data with recovery capabilities
```
### Utility Methods

Additional utility methods for path conversion and file system identification.

```java { .api }
/**
 * Converts a Flink Path to a Hadoop Path.
 * @param path Flink Path object
 * @return Hadoop Path object
 */
public static org.apache.hadoop.fs.Path toHadoopPath(Path path);

/**
 * Gets the file system kind based on scheme.
 * @param scheme URI scheme
 * @return FileSystemKind indicating the type of file system
 */
static FileSystemKind getKindForScheme(String scheme);
```
## File Status Types

```java { .api }
/**
 * FileStatus implementation for Hadoop file systems.
 */
public class HadoopFileStatus implements FileStatus {
    public HadoopFileStatus(org.apache.hadoop.fs.FileStatus fileStatus);

    public long getLen();
    public long getBlockSize();
    public long getAccessTime();
    public long getModificationTime();
    public short getReplication();
    public Path getPath();
    public boolean isDir();
    public org.apache.hadoop.fs.FileStatus getInternalFileStatus();

    public static HadoopFileStatus fromHadoopStatus(org.apache.hadoop.fs.FileStatus fileStatus);
}

/**
 * FileStatus with block location information.
 */
public class LocatedHadoopFileStatus extends HadoopFileStatus implements LocatedFileStatus {
    public LocatedHadoopFileStatus(org.apache.hadoop.fs.LocatedFileStatus fileStatus);
    public BlockLocation[] getBlockLocations();
}

/**
 * Block location implementation for Hadoop file systems.
 */
public class HadoopBlockLocation implements BlockLocation {
    public HadoopBlockLocation(org.apache.hadoop.fs.BlockLocation blockLocation);

    public String[] getHosts() throws IOException;
    public long getLength();
    public long getOffset();
    public int compareTo(BlockLocation o);
}
```
## Error Handling

Common exceptions and error scenarios:

```java
try {
    FileStatus status = fs.getFileStatus(nonExistentPath);
} catch (FileNotFoundException e) {
    System.err.println("File not found: " + e.getMessage());
} catch (IOException e) {
    System.err.println("I/O error: " + e.getMessage());
}

try {
    fs.delete(readOnlyFile, false);
} catch (AccessControlException e) {
    System.err.println("Permission denied: " + e.getMessage());
}
```