# Distributed File Operations

A distributed file copy utility similar to Hadoop DistCp. It uses a custom input format that wraps each file in its own input split, so Flink workers copy files in parallel. It reports the number of files and bytes copied through accumulators.

## Capabilities

### DistCp Main Class

Distributed file copy utility that copies every file under a source path to a destination path in parallel, using Flink's batch DataSet API.

```java { .api }
/**
 * Distributed file copy utility similar to Hadoop DistCp.
 * Usage: DistCp --input <path> --output <path> [--parallelism <n>]
 *
 * Note: In distributed environments, HDFS paths must be provided for both input and output.
 * Local file system paths can be used when running locally.
 */
public class DistCp {
    public static final String BYTES_COPIED_CNT_NAME = "BYTES_COPIED";
    public static final String FILES_COPIED_CNT_NAME = "FILES_COPIED";

    public static void main(String[] args) throws Exception;
}
```

**Usage Examples:**

```java
// Copy files with default parallelism (10)
String[] localArgs = {
    "--input", "/source/directory",
    "--output", "/destination/directory"
};
DistCp.main(localArgs);

// Copy files with custom parallelism
String[] hdfsArgs = {
    "--input", "hdfs://source/path",
    "--output", "hdfs://destination/path",
    "--parallelism", "20"
};
DistCp.main(hdfsArgs);

// Access copy statistics after execution. main() does not return its environment,
// so this code belongs inside DistCp.main(), after env.execute(...) has run.
JobExecutionResult result = env.getLastJobExecutionResult();
Map<String, Object> accumulators = result.getAllAccumulatorResults();
Long bytesCopied = (Long) accumulators.get(DistCp.BYTES_COPIED_CNT_NAME);
Long filesCopied = (Long) accumulators.get(DistCp.FILES_COPIED_CNT_NAME);
```
### File Copy Task

Data structure representing a single file copy operation: a source path plus the destination path relative to the output directory.

```java { .api }
/**
 * Represents a single file copy task with source and destination information
 */
public class FileCopyTask {
    /**
     * Creates file copy task
     * @param path Source file path
     * @param relativePath Relative path for destination
     */
    public FileCopyTask(Path path, String relativePath);

    /**
     * Get source file path
     * @return Source Path object
     */
    public Path getPath();

    /**
     * Get relative destination path
     * @return Relative path string
     */
    public String getRelativePath();

    @Override
    public String toString();
}
```

**Usage Examples:**

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.core.fs.Path;
import org.apache.flink.examples.java.distcp.FileCopyTask;

// Create file copy task
Path sourcePath = new Path("/source/data/file1.txt");
String relativePath = "data/file1.txt";
FileCopyTask task = new FileCopyTask(sourcePath, relativePath);

// Access task properties
Path source = task.getPath();
String destination = task.getRelativePath();
System.out.println("Copy: " + source + " -> " + destination);

// Use in file discovery: one task per file found under the source root
List<FileCopyTask> tasks = new ArrayList<>();
Path filePath = new Path("/source/subdir/filename.ext");
tasks.add(new FileCopyTask(filePath, "subdir/filename.ext"));
```
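The relative path is what lets the copy mirror the source tree: each task's destination is the output root joined with its relative path. The sketch below models that with `java.nio.file` only; `CopyTask` is a hypothetical stand-in for `FileCopyTask`, not the Flink class, and `Path.resolve` plays the role of joining a path under `--output`.

```java
import java.nio.file.Path;

/** Hypothetical stdlib stand-in for FileCopyTask: a source file plus its path relative to the copy root. */
final class CopyTask {
    private final Path source;
    private final String relativePath;

    CopyTask(Path source, String relativePath) {
        this.source = source;
        this.relativePath = relativePath;
    }

    Path source() {
        return source;
    }

    String relativePath() {
        return relativePath;
    }

    /** Destination = output root + relative path, so the source directory layout is reproduced. */
    Path destination(Path outputRoot) {
        return outputRoot.resolve(relativePath);
    }

    @Override
    public String toString() {
        return source + " -> " + relativePath;
    }
}
```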
### File Copy Input Split

Input split implementation that wraps exactly one file copy task, so that tasks can be distributed across parallel workers.

```java { .api }
/**
 * Input split containing a file copy task for parallel processing
 */
public class FileCopyTaskInputSplit implements InputSplit {
    /**
     * Creates input split with copy task
     * @param task File copy task to process
     * @param splitNumber Split number for identification
     */
    public FileCopyTaskInputSplit(FileCopyTask task, int splitNumber);

    /**
     * Get file copy task
     * @return FileCopyTask to be processed
     */
    public FileCopyTask getTask();

    /**
     * Get split number
     * @return Split identification number
     */
    @Override
    public int getSplitNumber();
}
```

**Usage Examples:**

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.core.fs.Path;
import org.apache.flink.examples.java.distcp.FileCopyTask;
import org.apache.flink.examples.java.distcp.FileCopyTaskInputSplit;

// Create input split for parallel processing
FileCopyTask task = new FileCopyTask(new Path("/source/data/file1.txt"), "data/file1.txt");
FileCopyTaskInputSplit split = new FileCopyTaskInputSplit(task, 0);

// Access split properties
FileCopyTask copyTask = split.getTask();
int splitId = split.getSplitNumber();

// Use in input format implementation: one split per task, numbered by position
// (tasks is a List<FileCopyTask>, e.g. the result of file discovery)
List<FileCopyTaskInputSplit> splits = new ArrayList<>();
for (int i = 0; i < tasks.size(); i++) {
    splits.add(new FileCopyTaskInputSplit(tasks.get(i), i));
}
```
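Because each split carries a single task, work can be balanced by handing splits to workers as they become free, so a worker that finishes a small file immediately takes the next one. The sketch below simulates that with a shared `ConcurrentLinkedQueue` and a thread pool. It assumes on-demand assignment of this kind; `TaskSplit` and `SplitAssignmentDemo` are hypothetical names, not Flink's `InputSplitAssigner`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for FileCopyTaskInputSplit: one task plus its split number. */
final class TaskSplit<T> {
    final T task;
    final int splitNumber;

    TaskSplit(T task, int splitNumber) {
        this.task = task;
        this.splitNumber = splitNumber;
    }
}

final class SplitAssignmentDemo {
    private SplitAssignmentDemo() {}

    /** One split per task, numbered 0..n-1 in list order. */
    static <T> List<TaskSplit<T>> createSplits(List<T> tasks) {
        List<TaskSplit<T>> splits = new ArrayList<>(tasks.size());
        for (int i = 0; i < tasks.size(); i++) {
            splits.add(new TaskSplit<>(tasks.get(i), i));
        }
        return splits;
    }

    /** Workers poll the shared queue until it is empty; returns processed split numbers in completion order. */
    static <T> List<Integer> drain(List<TaskSplit<T>> splits, int workers) throws InterruptedException {
        Queue<TaskSplit<T>> queue = new ConcurrentLinkedQueue<>(splits);
        List<Integer> processed = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int w = 0; w < workers; w++) {
            pool.execute(() -> {
                TaskSplit<T> split;
                while ((split = queue.poll()) != null) {
                    processed.add(split.splitNumber); // a real worker would copy split.task here
                }
            });
        }
        pool.shutdown();
        if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
            throw new IllegalStateException("workers did not finish");
        }
        return processed;
    }
}
```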
### File Copy Input Format

Custom input format that turns the list of copy tasks into input splits, one per task, and distributes them across parallel workers.

```java { .api }
/**
 * Input format for distributed file copy operations.
 * Distributes file copy tasks across parallel workers.
 */
public class FileCopyTaskInputFormat extends RichInputFormat<FileCopyTask, FileCopyTaskInputSplit> {
    /**
     * Creates input format with list of copy tasks
     * @param tasks List of file copy tasks to distribute
     */
    public FileCopyTaskInputFormat(List<FileCopyTask> tasks);

    /**
     * Configure input format
     * @param parameters Configuration parameters
     */
    @Override
    public void configure(Configuration parameters);

    /**
     * Create input splits for parallel processing
     * @param minNumSplits Minimum number of splits requested
     * @return Array of input splits, one per copy task
     */
    @Override
    public FileCopyTaskInputSplit[] createInputSplits(int minNumSplits) throws IOException;

    /**
     * Get statistics about the input
     * @param cachedStatistics Previously computed statistics, if any
     * @return BaseStatistics for the input
     */
    @Override
    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException;

    /**
     * Create the assigner that hands splits to parallel workers
     * (required by the InputSplitSource interface)
     * @param inputSplits Splits returned by createInputSplits
     * @return InputSplitAssigner for the given splits
     */
    @Override
    public InputSplitAssigner getInputSplitAssigner(FileCopyTaskInputSplit[] inputSplits);

    /**
     * Open input split for reading
     * @param split Input split to open
     */
    @Override
    public void open(FileCopyTaskInputSplit split) throws IOException;

    /**
     * Check if more records are available
     * @return true if more records are available, false otherwise
     */
    @Override
    public boolean reachedEnd() throws IOException;

    /**
     * Read next record
     * @param reuse Reusable object for record
     * @return The FileCopyTask held by the current split
     */
    @Override
    public FileCopyTask nextRecord(FileCopyTask reuse) throws IOException;

    /**
     * Close input format
     */
    @Override
    public void close() throws IOException;
}
```

**Usage Examples:**

```java
// Create input format with tasks (getCopyTasks is private in DistCp, so reproduce
// the logic shown under "File Discovery Algorithm" in your own code)
List<FileCopyTask> copyTasks = getCopyTasks(sourcePath);
FileCopyTaskInputFormat inputFormat = new FileCopyTaskInputFormat(copyTasks);

// Use with DataSource
DataSet<FileCopyTask> inputTasks = new DataSource<>(
    env,
    inputFormat,
    new GenericTypeInfo<>(FileCopyTask.class),
    "fileCopyTasks"
);

// Process tasks with a custom function (FileCopyProcessor stands for your own
// RichFlatMapFunction<FileCopyTask, Object>)
DataSet<Object> results = inputTasks.flatMap(new FileCopyProcessor());

// The job needs at least one sink before env.execute(), so discard the (empty) output
results.output(new DiscardingOutputFormat<>());
```
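Since a `FileCopyTaskInputSplit` holds exactly one task, the reader contract presumably reduces to: `open` stores the split's task, `nextRecord` returns it once, and `reachedEnd` reports true afterwards. The stdlib sketch below models that lifecycle; `SingleRecordReader` is hypothetical, and the loop in `readAll` only approximates how a runtime drives an input format.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of an input format whose splits each carry exactly one record. */
final class SingleRecordReader<T> {
    private T current; // task of the open split; null once it has been emitted

    void open(T splitTask) {
        current = splitTask;
    }

    boolean reachedEnd() {
        return current == null;
    }

    T nextRecord() {
        T record = current;
        current = null; // each split yields its single task exactly once
        return record;
    }

    /** Opens each split in turn and reads until reachedEnd(), as a runtime would. */
    static <E> List<E> readAll(List<E> splitTasks) {
        SingleRecordReader<E> reader = new SingleRecordReader<>();
        List<E> out = new ArrayList<>();
        for (E task : splitTasks) {
            reader.open(task);
            while (!reader.reachedEnd()) {
                out.add(reader.nextRecord());
            }
        }
        return out;
    }
}
```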
## File Processing Pattern

### Copy Operation Implementation

DistCp copies files with the following pattern. The copy step inside `flatMap` is filled in here as a condensed sketch; when the target is a local file system, DistCp also creates parent directories before writing.

```java
// sourcePath, targetPath and env are defined in main()
// Create copy tasks from source directory
List<FileCopyTask> tasks = getCopyTasks(sourcePath);

// Create DataSource with custom input format
DataSet<FileCopyTask> inputTasks = new DataSource<>(
    env,
    new FileCopyTaskInputFormat(tasks),
    new GenericTypeInfo<>(FileCopyTask.class),
    "fileCopyTasks"
);

// Process each task with a rich flat map function
FlatMapOperator<FileCopyTask, Object> results = inputTasks.flatMap(
    new RichFlatMapFunction<FileCopyTask, Object>() {
        private LongCounter fileCounter;
        private LongCounter bytesCounter;

        @Override
        public void open(Configuration parameters) throws Exception {
            bytesCounter = getRuntimeContext().getLongCounter(BYTES_COPIED_CNT_NAME);
            fileCounter = getRuntimeContext().getLongCounter(FILES_COPIED_CNT_NAME);
        }

        @Override
        public void flatMap(FileCopyTask task, Collector<Object> out) throws Exception {
            // Destination = output root + the task's relative path
            Path outPath = new Path(targetPath, task.getRelativePath());
            FileSystem targetFs = targetPath.getFileSystem();
            // Overwrite existing files; commons-io copyLarge returns the byte count as a long
            try (FSDataInputStream in = task.getPath().getFileSystem().open(task.getPath());
                 FSDataOutputStream os = targetFs.create(outPath, FileSystem.WriteMode.OVERWRITE)) {
                bytesCounter.add(IOUtils.copyLarge(in, os));
            }
            fileCounter.add(1L);
        }
    }
);

// Nothing is emitted, but the job still needs a sink
results.output(new DiscardingOutputFormat<>());
```
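The per-task work in `flatMap` can be sketched without Flink using `java.nio.file`: resolve the destination, create its parent directories, copy with overwrite, and update two counters. `CopyWorker` and its `AtomicLong` fields are hypothetical stand-ins for the function and its `LongCounter` accumulators. `Files.copy` returns a `long`, so the byte count stays correct for large files.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stdlib sketch of one copy task: destination, parent dirs, overwrite copy, counters. */
final class CopyWorker {
    final AtomicLong bytesCopied = new AtomicLong(); // plays the role of BYTES_COPIED
    final AtomicLong filesCopied = new AtomicLong(); // plays the role of FILES_COPIED

    void copy(Path source, String relativePath, Path targetRoot) throws IOException {
        Path out = targetRoot.resolve(relativePath);
        Path parent = out.getParent();
        if (parent != null) {
            Files.createDirectories(parent); // no-op when the directories already exist
        }
        try (InputStream in = Files.newInputStream(source)) {
            // REPLACE_EXISTING mirrors the overwrite behaviour; the return value is a long byte count
            long bytes = Files.copy(in, out, StandardCopyOption.REPLACE_EXISTING);
            bytesCopied.addAndGet(bytes);
        }
        filesCopied.incrementAndGet();
    }
}
```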
### File Discovery Algorithm

Recursive file discovery builds the list of copy tasks. Each file's relative path is the chain of directory names below the source root followed by the file name:

```java
private static List<FileCopyTask> getCopyTasks(Path sourcePath) throws IOException {
    List<FileCopyTask> tasks = new ArrayList<>();
    getCopyTasks(sourcePath, "", tasks);
    return tasks;
}

private static void getCopyTasks(Path p, String rel, List<FileCopyTask> tasks) throws IOException {
    FileStatus[] fileStatuses = p.getFileSystem().listStatus(p);
    if (fileStatuses == null) {
        return;
    }

    for (FileStatus fs : fileStatuses) {
        if (fs.isDir()) {
            // Recursively process directories, extending the relative prefix
            getCopyTasks(fs.getPath(), rel + fs.getPath().getName() + "/", tasks);
        } else {
            // Add file to copy tasks
            Path filePath = fs.getPath();
            tasks.add(new FileCopyTask(filePath, rel + filePath.getName()));
        }
    }
}
```
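The same traversal can be written against `java.nio.file` to see what the recursion produces. In this sketch (the class name `CopyTaskDiscovery` is hypothetical, only relative paths are collected, and the root must be a directory), directories only extend the prefix and never yield a task of their own, which is why empty directories are not recreated at the destination.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stdlib analogue of getCopyTasks that returns "dir/sub/file" relative paths. */
final class CopyTaskDiscovery {
    private CopyTaskDiscovery() {}

    /** Returns the relative path of every regular file under root (root must be a directory). */
    static List<String> discover(Path root) throws IOException {
        List<String> rels = new ArrayList<>();
        discover(root, "", rels);
        return rels;
    }

    private static void discover(Path dir, String rel, List<String> out) throws IOException {
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(dir)) {
            for (Path entry : entries) {
                String name = entry.getFileName().toString();
                if (Files.isDirectory(entry)) {
                    discover(entry, rel + name + "/", out); // recurse with the extended prefix
                } else {
                    out.add(rel + name); // one task per file
                }
            }
        }
    }
}
```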
### File System Compatibility

DistCp accepts local paths only when the job runs in a local environment; otherwise both paths must be on a distributed file system:

```java
// Check execution environment
private static boolean isLocal(final ExecutionEnvironment env) {
    return env instanceof LocalEnvironment;
}

// Check file system type
private static boolean isOnDistributedFS(final Path path) throws IOException {
    return path.getFileSystem().isDistributedFS();
}

// Validate paths for distributed execution (inside main())
if (!isLocal(env) && !(isOnDistributedFS(sourcePath) && isOnDistributedFS(targetPath))) {
    System.out.println("In a distributed mode only HDFS input/output paths are supported");
    return;
}
```
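The validation reduces to a small boolean rule: local runs accept any paths, and distributed runs need both paths on a distributed file system. The sketch below makes that rule testable. Because `FileSystem.isDistributedFS()` needs Flink, it approximates "distributed" from the URI scheme; that heuristic and the `PathCheck` class are assumptions for illustration only, not how Flink decides.

```java
import java.net.URI;

/** Hypothetical sketch of DistCp's path rule with a scheme-based stand-in for isDistributedFS(). */
final class PathCheck {
    private PathCheck() {}

    /** Assumption: schemeless and file: URIs are local; any other scheme counts as distributed. */
    static boolean looksDistributed(String path) {
        String scheme = URI.create(path).getScheme();
        return scheme != null && !scheme.equalsIgnoreCase("file");
    }

    /** Local runs accept any paths; distributed runs need both paths on a distributed FS. */
    static boolean pathsAllowed(boolean localEnv, String input, String output) {
        return localEnv || (looksDistributed(input) && looksDistributed(output));
    }
}
```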
### Accumulator-based Metrics

DistCp tracks copy progress with two `LongCounter` accumulators, one for bytes and one for files:

```java
// Declare counters as fields and initialize them in open()
private LongCounter fileCounter;
private LongCounter bytesCounter;

@Override
public void open(Configuration parameters) throws Exception {
    bytesCounter = getRuntimeContext().getLongCounter(BYTES_COPIED_CNT_NAME);
    fileCounter = getRuntimeContext().getLongCounter(FILES_COPIED_CNT_NAME);
}

// Update counters during copy operation. Assuming commons-io IOUtils: copy() returns an
// int and reports -1 for streams over 2 GB; copyLarge() returns the count as a long.
int bytes = IOUtils.copy(inputStream, outputStream);
bytesCounter.add(bytes);
fileCounter.add(1L);

// Access results after execution
Map<String, Object> accumulators = env.getLastJobExecutionResult().getAllAccumulatorResults();
System.out.println("Files copied: " + accumulators.get(FILES_COPIED_CNT_NAME));
System.out.println("Bytes copied: " + accumulators.get(BYTES_COPIED_CNT_NAME));
```
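Each parallel instance of the copy function updates its own counter, and the job-level values are the merge of those local values; for `LongCounter` the merge is a sum. The sketch below models that with plain maps. `CounterMerge` is hypothetical, and a subtask that copies a 3 GB file shows why the byte total has to be carried as a `long`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of per-subtask counters merged into job-level totals by summation. */
final class CounterMerge {
    private CounterMerge() {}

    /** Local counters of one parallel subtask, keyed by accumulator name. */
    static Map<String, Long> subtaskCounters(List<Long> fileSizes) {
        long bytes = 0;
        for (long size : fileSizes) {
            bytes += size;
        }
        Map<String, Long> local = new HashMap<>();
        local.put("BYTES_COPIED", bytes);
        local.put("FILES_COPIED", (long) fileSizes.size());
        return local;
    }

    /** Sums values that share an accumulator name across all subtasks. */
    static Map<String, Long> merge(List<Map<String, Long>> perSubtask) {
        Map<String, Long> total = new HashMap<>();
        for (Map<String, Long> local : perSubtask) {
            local.forEach((name, value) -> total.merge(name, value, Long::sum));
        }
        return total;
    }
}
```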
## Usage Considerations

### Environment Requirements

- **Local Execution**: Both local file system paths and HDFS paths are supported
- **Distributed Execution**: Source and destination must both be on a distributed file system such as HDFS (checked with `FileSystem.isDistributedFS()`)
- **Parallelism**: Configurable number of parallel workers (default: 10)
- **Directory Handling**: Parent directories are created automatically when the destination is a local file system
368
### Parameter Requirements
369
370
```java
371
// Required parameters
372
--input <path> // Source directory or file path
373
--output <path> // Destination directory path
374
375
// Optional parameters
376
--parallelism <n> // Number of parallel workers (default: 10)
377
```
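The parameter rules can be sketched as a small parser. Flink examples typically read such flags with `ParameterTool`; the hand-rolled `DistCpArgs` below is a hypothetical stdlib illustration. Throwing `IllegalArgumentException` on bad input, and rejecting a non-positive parallelism, are choices of this sketch.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical parser for --input, --output and optional --parallelism (default 10). */
final class DistCpArgs {
    final String input;
    final String output;
    final int parallelism;

    private DistCpArgs(String input, String output, int parallelism) {
        this.input = input;
        this.output = output;
        this.parallelism = parallelism;
    }

    static DistCpArgs parse(String[] args) {
        if (args.length % 2 != 0) {
            throw new IllegalArgumentException("Every --key needs a value");
        }
        Map<String, String> opts = new HashMap<>();
        for (int i = 0; i < args.length; i += 2) {
            if (!args[i].startsWith("--")) {
                throw new IllegalArgumentException("Expected --key, got: " + args[i]);
            }
            opts.put(args[i].substring(2), args[i + 1]);
        }
        if (!opts.containsKey("input") || !opts.containsKey("output")) {
            throw new IllegalArgumentException("Usage: --input <path> --output <path> [--parallelism <n>]");
        }
        int parallelism = Integer.parseInt(opts.getOrDefault("parallelism", "10"));
        if (parallelism <= 0) {
            throw new IllegalArgumentException("Parallelism must be greater than 0");
        }
        return new DistCpArgs(opts.get("input"), opts.get("output"), parallelism);
    }
}
```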
### Limitations

- Empty directories are not copied
- No retry mechanism for failed copies
- Overwrites existing files at destination
- Requires HDFS paths in distributed environments
386
## Types
387
388
### Core File System Types
389
390
```java { .api }
391
// Flink file system types
392
import org.apache.flink.core.fs.Path;
393
import org.apache.flink.core.fs.FileSystem;
394
import org.apache.flink.core.fs.FileStatus;
395
import org.apache.flink.core.fs.FSDataInputStream;
396
import org.apache.flink.core.fs.FSDataOutputStream;
397
398
// File copy task
399
FileCopyTask task = new FileCopyTask(sourcePath, relativePath);
400
401
// Input format and splits
402
FileCopyTaskInputFormat inputFormat = new FileCopyTaskInputFormat(tasks);
403
FileCopyTaskInputSplit split = new FileCopyTaskInputSplit(task, splitNumber);
404
405
// Accumulator types for metrics
406
LongCounter bytesCounter;
407
LongCounter filesCounter;
408
```