Distributed file copy utility similar to Hadoop DistCp: it copies files from a source path to a destination path in parallel, using Flink's distributed processing together with a custom input format and Flink's file system abstractions.
/**
* Distributed file copy utility similar to Hadoop DistCp.
* Usage: DistCp --input <path> --output <path> [--parallelism <n>]
*
* Note: In distributed environments, HDFS paths must be provided for both input and output.
* Local file system paths can be used when running locally.
*/
public class DistCp {
public static final String BYTES_COPIED_CNT_NAME = "BYTES_COPIED";
public static final String FILES_COPIED_CNT_NAME = "FILES_COPIED";
public static void main(String[] args) throws Exception;
}

Usage Examples:
// Copy files with default parallelism
String[] args = {
"--input", "/source/directory",
"--output", "/destination/directory"
};
DistCp.main(args);
// Copy files with custom parallelism
String[] args = {
"--input", "hdfs://source/path",
"--output", "hdfs://destination/path",
"--parallelism", "20"
};
DistCp.main(args);
// Access copy statistics after execution
JobExecutionResult result = env.getLastJobExecutionResult();
Map<String, Object> accumulators = result.getAllAccumulatorResults();
Long bytesCopied = (Long) accumulators.get(DistCp.BYTES_COPIED_CNT_NAME);
Long filesCopied = (Long) accumulators.get(DistCp.FILES_COPIED_CNT_NAME);
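The listing above does not show how the command-line flags are parsed. A minimal sketch, assuming Flink's ParameterTool handles the arguments (the flag names match the usage string; everything else is an assumption):

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.core.fs.Path;

// Hypothetical sketch: read the DistCp command-line flags.
ParameterTool params = ParameterTool.fromArgs(args);
Path sourcePath = new Path(params.getRequired("input"));   // --input is mandatory
Path targetPath = new Path(params.getRequired("output"));  // --output is mandatory
int parallelism = params.getInt("parallelism", 10);        // --parallelism, default 10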
Data structure representing a single file copy operation with source path and relative destination path.

/**
* Represents a single file copy task with source and destination information
*/
public class FileCopyTask {
/**
* Creates file copy task
* @param path Source file path
* @param relativePath Relative path for destination
*/
public FileCopyTask(Path path, String relativePath);
/**
* Get source file path
* @return Source Path object
*/
public Path getPath();
/**
* Get relative destination path
* @return Relative path string
*/
public String getRelativePath();
@Override
public String toString();
}

Usage Examples:
import org.apache.flink.core.fs.Path;
import org.apache.flink.examples.java.distcp.FileCopyTask;
// Create file copy task
Path sourcePath = new Path("/source/data/file1.txt");
String relativePath = "data/file1.txt";
FileCopyTask task = new FileCopyTask(sourcePath, relativePath);
// Access task properties
Path source = task.getPath();
String destination = task.getRelativePath();
System.out.println("Copy: " + source + " -> " + destination);
// Use in file discovery
List<FileCopyTask> tasks = new ArrayList<>();
// Recursively discover files and create tasks
tasks.add(new FileCopyTask(filePath, "subdir/filename.ext"));
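Because copy tasks travel from the client to parallel workers inside input splits, the task type has to be serializable (Flink's Path is itself Serializable). A minimal sketch of a conforming implementation; the field names and serialVersionUID are assumptions:

import java.io.Serializable;
import org.apache.flink.core.fs.Path;

// Sketch: an immutable, serializable task holding the source path
// and the path relative to the destination root.
public class FileCopyTask implements Serializable {
    private static final long serialVersionUID = 1L;

    private final Path path;
    private final String relativePath;

    public FileCopyTask(Path path, String relativePath) {
        this.path = path;
        this.relativePath = relativePath;
    }

    public Path getPath() { return path; }
    public String getRelativePath() { return relativePath; }

    @Override
    public String toString() { return "FileCopyTask{" + path + " -> " + relativePath + "}"; }
}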
Input split implementation for distributing file copy tasks across parallel workers.

/**
* Input split containing a file copy task for parallel processing
*/
public class FileCopyTaskInputSplit implements InputSplit {
/**
* Creates input split with copy task
* @param task File copy task to process
* @param splitNumber Split number for identification
*/
public FileCopyTaskInputSplit(FileCopyTask task, int splitNumber);
/**
* Get file copy task
* @return FileCopyTask to be processed
*/
public FileCopyTask getTask();
/**
* Get split number
* @return Split identification number
*/
@Override
public int getSplitNumber();
}

Usage Examples:
// Create input split for parallel processing
FileCopyTask task = new FileCopyTask(sourcePath, relativePath);
FileCopyTaskInputSplit split = new FileCopyTaskInputSplit(task, 0);
// Access split properties
FileCopyTask copyTask = split.getTask();
int splitId = split.getSplitNumber();
// Use in input format implementation
List<FileCopyTaskInputSplit> splits = new ArrayList<>();
for (int i = 0; i < tasks.size(); i++) {
splits.add(new FileCopyTaskInputSplit(tasks.get(i), i));
}
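How these splits reach the workers is decided by the input format's split assigner. A minimal sketch, assuming the stock DefaultInputSplitAssigner that ships with Flink, which hands remaining splits to whichever worker asks next:

import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.core.io.InputSplitAssigner;

// Hypothetical sketch: serve the splits created above to parallel workers.
InputSplitAssigner assigner =
        new DefaultInputSplitAssigner(splits.toArray(new FileCopyTaskInputSplit[0]));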
Custom input format for reading file copy tasks and distributing them across parallel workers.

/**
* Input format for distributed file copy operations.
* Distributes file copy tasks across parallel workers.
*/
public class FileCopyTaskInputFormat extends RichInputFormat<FileCopyTask, FileCopyTaskInputSplit> {
/**
* Creates input format with list of copy tasks
* @param tasks List of file copy tasks to distribute
*/
public FileCopyTaskInputFormat(List<FileCopyTask> tasks);
/**
* Configure input format
* @param parameters Configuration parameters
*/
@Override
public void configure(Configuration parameters);
/**
* Create input splits for parallel processing
* @param minNumSplits Minimum number of splits requested
* @return Array of input splits
*/
@Override
public FileCopyTaskInputSplit[] createInputSplits(int minNumSplits) throws IOException;
/**
* Get basic statistics about the input
* @param cachedStatistics statistics cached from a previous call, or null
* @return BaseStatistics for the input, or null if none are available
*/
@Override
public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException;
/**
* Open input split for reading
* @param split Input split to open
*/
@Override
public void open(FileCopyTaskInputSplit split) throws IOException;
/**
* Check whether the end of the input split has been reached
* @return true if no more records are available, false otherwise
*/
@Override
public boolean reachedEnd() throws IOException;
/**
* Read next record
* @param reuse Reusable object for record
* @return Next FileCopyTask or null if end reached
*/
@Override
public FileCopyTask nextRecord(FileCopyTask reuse) throws IOException;
/**
* Close input format
*/
@Override
public void close() throws IOException;
}

Usage Examples:
// Create input format with tasks
List<FileCopyTask> copyTasks = getCopyTasks(sourcePath);
FileCopyTaskInputFormat inputFormat = new FileCopyTaskInputFormat(copyTasks);
// Use with DataSource
DataSet<FileCopyTask> inputTasks = new DataSource<>(
env,
inputFormat,
new GenericTypeInfo<>(FileCopyTask.class),
"fileCopyTasks"
);
// Process tasks with custom function
DataSet<Object> results = inputTasks.flatMap(new FileCopyProcessor());
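For orientation, this is the generic lifecycle Flink drives on each worker for every assigned split; the loop below is a sketch of the InputFormat contract, not code from this example:

// Sketch: how a worker consumes one assigned split.
format.open(split);                               // position the format on the split's task
while (!format.reachedEnd()) {
    FileCopyTask task = format.nextRecord(null);  // emit the split's task
    // hand the task downstream...
}
format.close();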
The DistCp utility implements file copying using the following pattern:

// Create copy tasks from source directory
List<FileCopyTask> tasks = getCopyTasks(sourcePath);
// Create DataSource with custom input format
DataSet<FileCopyTask> inputTasks = new DataSource<>(
env,
new FileCopyTaskInputFormat(tasks),
new GenericTypeInfo<>(FileCopyTask.class),
"fileCopyTasks"
);
// Process each task with rich flat map function
FlatMapOperator<FileCopyTask, Object> results = inputTasks.flatMap(
new RichFlatMapFunction<FileCopyTask, Object>() {
private LongCounter fileCounter;
private LongCounter bytesCounter;
@Override
public void open(Configuration parameters) throws Exception {
bytesCounter = getRuntimeContext().getLongCounter(BYTES_COPIED_CNT_NAME);
fileCounter = getRuntimeContext().getLongCounter(FILES_COPIED_CNT_NAME);
}
@Override
public void flatMap(FileCopyTask task, Collector<Object> out) throws Exception {
// Perform actual file copy operation
copyFile(task);
}
}
);
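The copyFile helper called above is not shown in the listing. A minimal sketch of what it could look like, assuming Flink's FileSystem streams and Apache Commons IO for the byte transfer; targetDirectory is an assumed field holding the --output path, and the counters are the fields initialized in open():

import java.io.IOException;
import org.apache.commons.io.IOUtils;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

// Hypothetical sketch: copy one file and record the copied bytes/files.
private void copyFile(FileCopyTask task) throws IOException {
    Path sourcePath = task.getPath();
    Path targetPath = new Path(targetDirectory, task.getRelativePath()); // targetDirectory: assumed field

    try (FSDataInputStream in = sourcePath.getFileSystem().open(sourcePath);
         FSDataOutputStream out =
                 targetPath.getFileSystem().create(targetPath, FileSystem.WriteMode.OVERWRITE)) {
        int bytes = IOUtils.copy(in, out); // returns the number of bytes copied
        bytesCounter.add(bytes);
        fileCounter.add(1L);
    }
}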
Recursive file discovery builds the list of copy tasks:

private static List<FileCopyTask> getCopyTasks(Path sourcePath) throws IOException {
List<FileCopyTask> tasks = new ArrayList<>();
getCopyTasks(sourcePath, "", tasks);
return tasks;
}
private static void getCopyTasks(Path p, String rel, List<FileCopyTask> tasks) throws IOException {
FileStatus[] fileStatuses = p.getFileSystem().listStatus(p);
if (fileStatuses == null) {
return;
}
for (FileStatus fs : fileStatuses) {
if (fs.isDir()) {
// Recursively process directories
getCopyTasks(fs.getPath(), rel + fs.getPath().getName() + "/", tasks);
} else {
// Add file to copy tasks
Path filePath = fs.getPath();
tasks.add(new FileCopyTask(filePath, rel + filePath.getName()));
}
}
}
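As a concrete example of the relative paths this recursion produces (the tree below is illustrative):

// Source tree              Resulting task (source -> relativePath)
//   /src/a.txt             /src/a.txt          -> "a.txt"
//   /src/sub/b.txt         /src/sub/b.txt      -> "sub/b.txt"
//   /src/sub/deep/c.txt    /src/sub/deep/c.txt -> "sub/deep/c.txt"
List<FileCopyTask> tasks = getCopyTasks(new Path("/src"));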
DistCp handles both local and distributed file systems:

// Check execution environment
private static boolean isLocal(final ExecutionEnvironment env) {
return env instanceof LocalEnvironment;
}
// Check file system type
private static boolean isOnDistributedFS(final Path path) throws IOException {
return path.getFileSystem().isDistributedFS();
}
// Validate paths for distributed execution
if (!isLocal(env) && !(isOnDistributedFS(sourcePath) && isOnDistributedFS(targetPath))) {
System.out.println("In a distributed mode only HDFS input/output paths are supported");
return;
}
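To make the distinction concrete, the backing FileSystem is resolved from the path's scheme; a small sketch (host, port, and the presence of an HDFS implementation on the classpath are assumptions):

import org.apache.flink.core.fs.Path;

// The scheme of a path selects the FileSystem implementation behind it.
Path hdfsPath = new Path("hdfs://namenode:8020/data");  // distributed file system
Path localPath = new Path("file:///tmp/data");          // local file system

boolean onHdfs = hdfsPath.getFileSystem().isDistributedFS();   // true, given an HDFS connector
boolean onLocal = localPath.getFileSystem().isDistributedFS(); // false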
DistCp uses accumulators to track copy progress:

// Initialize counters in open() method
private LongCounter fileCounter;
private LongCounter bytesCounter;
@Override
public void open(Configuration parameters) throws Exception {
bytesCounter = getRuntimeContext().getLongCounter(BYTES_COPIED_CNT_NAME);
fileCounter = getRuntimeContext().getLongCounter(FILES_COPIED_CNT_NAME);
}
// Update counters during copy operation
int bytes = IOUtils.copy(inputStream, outputStream);
bytesCounter.add(bytes);
fileCounter.add(1L);
// Access results after execution
Map<String, Object> accumulators = env.getLastJobExecutionResult().getAllAccumulatorResults();
System.out.println("Files copied: " + accumulators.get(FILES_COPIED_CNT_NAME));
System.out.println("Bytes copied: " + accumulators.get(BYTES_COPIED_CNT_NAME));// Required parameters
// Required parameters
--input <path> // Source directory or file path
--output <path> // Destination directory path
// Optional parameters
--parallelism <n> // Number of parallel workers (default: 10)

// Flink file system types
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
// File copy task
FileCopyTask task = new FileCopyTask(sourcePath, relativePath);
// Input format and splits
FileCopyTaskInputFormat inputFormat = new FileCopyTaskInputFormat(tasks);
FileCopyTaskInputSplit split = new FileCopyTaskInputSplit(task, splitNumber);
// Accumulator types for metrics
LongCounter bytesCounter;
LongCounter filesCounter;