or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

clustering.md distributed-copy.md graph-processing.md index.md misc-examples.md relational-processing.md word-count.md
tile.json

docs/distributed-copy.md

Distributed File Operations

Distributed file copying utility similar to Hadoop DistCp, featuring custom input formats, parallel file processing, and comprehensive file system operations.

Capabilities

DistCp Main Class

Distributed file copy utility that copies files from source to destination path in parallel using Flink's distributed processing capabilities.

/**
 * Distributed file copy utility similar to Hadoop DistCp.
 * Usage: DistCp --input <path> --output <path> [--parallelism <n>]
 * 
 * Note: In distributed environments, HDFS paths must be provided for both input and output.
 * Local file system paths can be used when running locally.
 */
public class DistCp {
    /** Name of the long-counter accumulator under which the number of copied bytes is reported. */
    public static final String BYTES_COPIED_CNT_NAME = "BYTES_COPIED";
    /** Name of the long-counter accumulator under which the number of copied files is reported. */
    public static final String FILES_COPIED_CNT_NAME = "FILES_COPIED";
    
    /**
     * Program entry point; takes the options shown in the usage line of the class comment.
     * @param args Command-line arguments: --input, --output and the optional --parallelism
     * @throws Exception declared by the signature; the failure conditions are not shown in this listing
     */
    public static void main(String[] args) throws Exception;
}

Usage Examples:

// Copy files with default parallelism
String[] args = {
    "--input", "/source/directory",
    "--output", "/destination/directory"
};
DistCp.main(args);

// Copy files with custom parallelism
String[] args = {
    "--input", "hdfs://source/path",
    "--output", "hdfs://destination/path", 
    "--parallelism", "20"
};
DistCp.main(args);

// Access copy statistics after execution
JobExecutionResult result = env.getLastJobExecutionResult();
Map<String, Object> accumulators = result.getAllAccumulatorResults();
Long bytesCopied = (Long) accumulators.get(DistCp.BYTES_COPIED_CNT_NAME);
Long filesCopied = (Long) accumulators.get(DistCp.FILES_COPIED_CNT_NAME);

File Copy Task

Data structure representing a single file copy operation with source path and relative destination path.

/**
 * Represents a single file copy task with source and destination information
 */
public class FileCopyTask {
    /**
     * Creates file copy task
     * @param path Source file path
     * @param relativePath Path of the file relative to the root of the copy source
     *        (for example "data/file1.txt"); it identifies where the file goes
     *        relative to the destination
     */
    public FileCopyTask(Path path, String relativePath);
    
    /**
     * Get source file path
     * @return Source Path object
     */
    public Path getPath();
    
    /**
     * Get relative destination path
     * @return Relative path string
     */
    public String getRelativePath();
    
    /**
     * Get a string form of this task
     * @return String representation of the task (exact format is not shown in this listing)
     */
    @Override
    public String toString();
}

Usage Examples:

import org.apache.flink.core.fs.Path;
import org.apache.flink.examples.java.distcp.FileCopyTask;

// Create file copy task
Path sourcePath = new Path("/source/data/file1.txt");
String relativePath = "data/file1.txt";
FileCopyTask task = new FileCopyTask(sourcePath, relativePath);

// Access task properties
Path source = task.getPath();
String destination = task.getRelativePath();
System.out.println("Copy: " + source + " -> " + destination);

// Use in file discovery
List<FileCopyTask> tasks = new ArrayList<>();
// Recursively discover files and create tasks
tasks.add(new FileCopyTask(filePath, "subdir/filename.ext"));

File Copy Input Split

Input split implementation for distributing file copy tasks across parallel workers.

/**
 * Input split containing a file copy task for parallel processing.
 * Each split carries exactly one FileCopyTask together with its split number.
 */
public class FileCopyTaskInputSplit implements InputSplit {
    /**
     * Creates input split with copy task
     * @param task File copy task to process
     * @param splitNumber Split number for identification
     */
    public FileCopyTaskInputSplit(FileCopyTask task, int splitNumber);
    
    /**
     * Get file copy task
     * @return FileCopyTask to be processed
     */
    public FileCopyTask getTask();
    
    /**
     * Get split number (implements the InputSplit method)
     * @return Split identification number
     */
    @Override
    public int getSplitNumber();
}

Usage Examples:

// Create input split for parallel processing
FileCopyTask task = new FileCopyTask(sourcePath, relativePath);
FileCopyTaskInputSplit split = new FileCopyTaskInputSplit(task, 0);

// Access split properties
FileCopyTask copyTask = split.getTask();
int splitId = split.getSplitNumber();

// Use in input format implementation
List<FileCopyTaskInputSplit> splits = new ArrayList<>();
for (int i = 0; i < tasks.size(); i++) {
    splits.add(new FileCopyTaskInputSplit(tasks.get(i), i));
}

File Copy Input Format

Custom input format for reading file copy tasks and distributing them across parallel workers.

/**
 * Input format for distributed file copy operations.
 * Distributes file copy tasks across parallel workers.
 */
public class FileCopyTaskInputFormat extends RichInputFormat<FileCopyTask, FileCopyTaskInputSplit> {
    /**
     * Creates input format with list of copy tasks
     * @param tasks List of file copy tasks to distribute
     */
    public FileCopyTaskInputFormat(List<FileCopyTask> tasks);
    
    /**
     * Configure input format
     * @param parameters Configuration parameters
     */
    @Override
    public void configure(Configuration parameters);
    
    /**
     * Create input splits for parallel processing
     * @param minNumSplits Minimum number of splits requested
     * @return Array of input splits
     * @throws IOException declared by the signature
     */
    @Override
    public FileCopyTaskInputSplit[] createInputSplits(int minNumSplits) throws IOException;
    
    /**
     * Get statistics about the input (not split type information)
     * @param cachedStatistics Statistics cached from an earlier call
     *        (NOTE(review): presumably may be null, as in Flink's InputFormat contract; confirm)
     * @return BaseStatistics for the input
     * @throws IOException declared by the signature
     */
    @Override
    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException;
    
    /**
     * Open input split for reading
     * @param split Input split to open
     * @throws IOException declared by the signature
     */
    @Override
    public void open(FileCopyTaskInputSplit split) throws IOException;
    
    /**
     * Check whether the end of the opened split has been reached
     * @return true if the end has been reached (no more records available), false otherwise
     * @throws IOException declared by the signature
     */
    @Override
    public boolean reachedEnd() throws IOException;
    
    /**
     * Read next record
     * @param reuse Reusable object for record
     * @return Next FileCopyTask or null if end reached
     * @throws IOException declared by the signature
     */
    @Override
    public FileCopyTask nextRecord(FileCopyTask reuse) throws IOException;
    
    /**
     * Close input format
     * @throws IOException declared by the signature
     */
    @Override
    public void close() throws IOException;
}

Usage Examples:

// Create input format with tasks
List<FileCopyTask> copyTasks = getCopyTasks(sourcePath);
FileCopyTaskInputFormat inputFormat = new FileCopyTaskInputFormat(copyTasks);

// Use with DataSource
DataSet<FileCopyTask> inputTasks = new DataSource<>(
    env,
    inputFormat,
    new GenericTypeInfo<>(FileCopyTask.class),
    "fileCopyTasks"
);

// Process tasks with custom function
DataSet<Object> results = inputTasks.flatMap(new FileCopyProcessor());

File Processing Pattern

Copy Operation Implementation

The DistCp utility implements file copying using the following pattern:

// Create copy tasks from source directory
List<FileCopyTask> tasks = getCopyTasks(sourcePath);

// Create DataSource with custom input format
DataSet<FileCopyTask> inputTasks = new DataSource<>(
    env,
    new FileCopyTaskInputFormat(tasks),
    new GenericTypeInfo<>(FileCopyTask.class),
    "fileCopyTasks"
);

// Process each task with rich flat map function
FlatMapOperator<FileCopyTask, Object> results = inputTasks.flatMap(
    new RichFlatMapFunction<FileCopyTask, Object>() {
        private LongCounter fileCounter;
        private LongCounter bytesCounter;
        
        @Override
        public void open(Configuration parameters) throws Exception {
            bytesCounter = getRuntimeContext().getLongCounter(BYTES_COPIED_CNT_NAME);
            fileCounter = getRuntimeContext().getLongCounter(FILES_COPIED_CNT_NAME);
        }
        
        @Override
        public void flatMap(FileCopyTask task, Collector<Object> out) throws Exception {
            // Perform actual file copy operation
            copyFile(task);
        }
    }
);

File Discovery Algorithm

Recursive file discovery builds the list of copy tasks:

/**
 * Collects the copy tasks for everything found beneath the given source path.
 * Delegates to the recursive overload, starting from an empty relative prefix.
 *
 * @param sourcePath root path whose contents are to be copied
 * @return list of copy tasks filled in by the recursive overload
 * @throws IOException propagated from the recursive overload
 */
private static List<FileCopyTask> getCopyTasks(Path sourcePath) throws IOException {
    final List<FileCopyTask> discovered = new ArrayList<>();
    final String rootPrefix = "";
    getCopyTasks(sourcePath, rootPrefix, discovered);
    return discovered;
}

/**
 * Walks the directory tree rooted at {@code p} and appends one copy task per file.
 *
 * @param p     path whose entries are listed
 * @param rel   relative prefix accumulated so far; empty at the root, otherwise
 *              the enclosing directory names each followed by "/"
 * @param tasks output list that receives the discovered tasks
 * @throws IOException declared by the signature
 */
private static void getCopyTasks(Path p, String rel, List<FileCopyTask> tasks) throws IOException {
    final FileStatus[] entries = p.getFileSystem().listStatus(p);
    if (entries == null) {
        // Nothing to list under this path.
        return;
    }

    for (final FileStatus entry : entries) {
        final Path entryPath = entry.getPath();
        final String entryRel = rel + entryPath.getName();

        if (!entry.isDir()) {
            // Plain file: record it with its path relative to the source root.
            tasks.add(new FileCopyTask(entryPath, entryRel));
            continue;
        }

        // Directory: descend, extending the relative prefix with its name.
        getCopyTasks(entryPath, entryRel + "/", tasks);
    }
}

File System Compatibility

DistCp handles both local and distributed file systems:

/**
 * Reports whether the given execution environment is a LocalEnvironment.
 *
 * @param env execution environment to inspect
 * @return true when {@code env} is an instance of LocalEnvironment, false otherwise
 */
private static boolean isLocal(final ExecutionEnvironment env) {
    return LocalEnvironment.class.isInstance(env);
}

// Check file system type: resolves the file system that owns the path and
// returns what that file system reports from isDistributedFS().
// Throws IOException as declared by the signature.
private static boolean isOnDistributedFS(final Path path) throws IOException {
    return path.getFileSystem().isDistributedFS();
}

// Validate paths for distributed execution
if (!isLocal(env) && !(isOnDistributedFS(sourcePath) && isOnDistributedFS(targetPath))) {
    System.out.println("In a distributed mode only HDFS input/output paths are supported");
    return;
}

Accumulator-based Metrics

DistCp uses accumulators to track copy progress:

// Initialize counters in open() method
private LongCounter fileCounter;
private LongCounter bytesCounter;

@Override
public void open(Configuration parameters) throws Exception {
    bytesCounter = getRuntimeContext().getLongCounter(BYTES_COPIED_CNT_NAME);
    fileCounter = getRuntimeContext().getLongCounter(FILES_COPIED_CNT_NAME);
}

// Update counters during copy operation: add the byte count returned by the
// copy to the bytes counter, then count the file itself.
// NOTE(review): if IOUtils here is Apache Commons IO, copy() returns an int and
// yields -1 for streams larger than 2 GB, so BYTES_COPIED would be wrong for
// such files. Confirm, and consider copyLarge(), which returns a long.
int bytes = IOUtils.copy(inputStream, outputStream);
bytesCounter.add(bytes);
fileCounter.add(1L);

// Access results after execution
Map<String, Object> accumulators = env.getLastJobExecutionResult().getAllAccumulatorResults();
System.out.println("Files copied: " + accumulators.get(FILES_COPIED_CNT_NAME));
System.out.println("Bytes copied: " + accumulators.get(BYTES_COPIED_CNT_NAME));

Usage Considerations

Environment Requirements

  • Local Execution: Both local file system paths and HDFS paths supported
  • Distributed Execution: Source and destination must both be on a distributed file system (e.g. HDFS); otherwise the job prints a message and exits without copying
  • Parallelism: Configurable parallel worker count (default: 10)
  • Directory Handling: Creates parent directories automatically for local file systems

Parameter Requirements

// Required parameters
--input <path>      // Source directory or file path
--output <path>     // Destination directory path

// Optional parameters  
--parallelism <n>   // Number of parallel workers (default: 10)

Limitations

  • Empty directories are not copied
  • No retry mechanism for failed copies
  • Overwrites existing files at destination
  • Requires distributed file system paths (e.g. HDFS) for both input and output in distributed environments

Types

Core File System Types

// Flink file system types
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;

// File copy task
FileCopyTask task = new FileCopyTask(sourcePath, relativePath);

// Input format and splits
FileCopyTaskInputFormat inputFormat = new FileCopyTaskInputFormat(tasks);
FileCopyTaskInputSplit split = new FileCopyTaskInputSplit(task, splitNumber);

// Accumulator types for metrics
LongCounter bytesCounter;
LongCounter filesCounter;