or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

clustering.md, distributed-copy.md, graph-processing.md, index.md, misc-examples.md, relational-processing.md, word-count.md

docs/distributed-copy.md

0

# Distributed File Operations

1

2

Distributed file copying utility similar to Hadoop DistCp, featuring custom input formats, parallel file processing, and comprehensive file system operations.

3

4

## Capabilities

5

6

### DistCp Main Class

7

8

Distributed file copy utility that copies files from source to destination path in parallel using Flink's distributed processing capabilities.

9

10

```java { .api }

11

/**

12

* Distributed file copy utility similar to Hadoop DistCp.

13

* Usage: DistCp --input <path> --output <path> [--parallelism <n>]

14

*

15

* Note: In distributed environments, HDFS paths must be provided for both input and output.

16

* Local file system paths can be used when running locally.

17

*/

18

public class DistCp {

19

public static final String BYTES_COPIED_CNT_NAME = "BYTES_COPIED";

20

public static final String FILES_COPIED_CNT_NAME = "FILES_COPIED";

21

22

public static void main(String[] args) throws Exception;

23

}

24

```

25

26

**Usage Examples:**

27

28

```java

29

// Copy files with default parallelism

30

String[] args = {

31

"--input", "/source/directory",

32

"--output", "/destination/directory"

33

};

34

DistCp.main(args);

35

36

// Copy files with custom parallelism

37

String[] args = {

38

"--input", "hdfs://source/path",

39

"--output", "hdfs://destination/path",

40

"--parallelism", "20"

41

};

42

DistCp.main(args);

43

44

// Access copy statistics after execution

45

JobExecutionResult result = env.getLastJobExecutionResult();

46

Map<String, Object> accumulators = result.getAllAccumulatorResults();

47

Long bytesCopied = (Long) accumulators.get(DistCp.BYTES_COPIED_CNT_NAME);

48

Long filesCopied = (Long) accumulators.get(DistCp.FILES_COPIED_CNT_NAME);

49

```

50

51

### File Copy Task

52

53

Data structure representing a single file copy operation with source path and relative destination path.

54

55

```java { .api }

56

/**

57

* Represents a single file copy task with source and destination information

58

*/

59

public class FileCopyTask {

60

/**

61

* Creates file copy task

62

* @param path Source file path

63

* @param relativePath Relative path for destination

64

*/

65

public FileCopyTask(Path path, String relativePath);

66

67

/**

68

* Get source file path

69

* @return Source Path object

70

*/

71

public Path getPath();

72

73

/**

74

* Get relative destination path

75

* @return Relative path string

76

*/

77

public String getRelativePath();

78

79

@Override

80

public String toString();

81

}

82

```

83

84

**Usage Examples:**

85

86

```java

87

import org.apache.flink.core.fs.Path;

88

import org.apache.flink.examples.java.distcp.FileCopyTask;

89

90

// Create file copy task

91

Path sourcePath = new Path("/source/data/file1.txt");

92

String relativePath = "data/file1.txt";

93

FileCopyTask task = new FileCopyTask(sourcePath, relativePath);

94

95

// Access task properties

96

Path source = task.getPath();

97

String destination = task.getRelativePath();

98

System.out.println("Copy: " + source + " -> " + destination);

99

100

// Use in file discovery

101

List<FileCopyTask> tasks = new ArrayList<>();

102

// Recursively discover files and create tasks

103

tasks.add(new FileCopyTask(filePath, "subdir/filename.ext"));

104

```

105

106

### File Copy Input Split

107

108

Input split implementation for distributing file copy tasks across parallel workers.

109

110

```java { .api }

111

/**

112

* Input split containing a file copy task for parallel processing

113

*/

114

public class FileCopyTaskInputSplit implements InputSplit {

115

/**

116

* Creates input split with copy task

117

* @param task File copy task to process

118

* @param splitNumber Split number for identification

119

*/

120

public FileCopyTaskInputSplit(FileCopyTask task, int splitNumber);

121

122

/**

123

* Get file copy task

124

* @return FileCopyTask to be processed

125

*/

126

public FileCopyTask getTask();

127

128

/**

129

* Get split number

130

* @return Split identification number

131

*/

132

@Override

133

public int getSplitNumber();

134

}

135

```

136

137

**Usage Examples:**

138

139

```java

140

// Create input split for parallel processing

141

FileCopyTask task = new FileCopyTask(sourcePath, relativePath);

142

FileCopyTaskInputSplit split = new FileCopyTaskInputSplit(task, 0);

143

144

// Access split properties

145

FileCopyTask copyTask = split.getTask();

146

int splitId = split.getSplitNumber();

147

148

// Use in input format implementation

149

List<FileCopyTaskInputSplit> splits = new ArrayList<>();

150

for (int i = 0; i < tasks.size(); i++) {

151

splits.add(new FileCopyTaskInputSplit(tasks.get(i), i));

152

}

153

```

154

155

### File Copy Input Format

156

157

Custom input format for reading file copy tasks and distributing them across parallel workers.

158

159

```java { .api }

160

/**

161

* Input format for distributed file copy operations.

162

* Distributes file copy tasks across parallel workers.

163

*/

164

public class FileCopyTaskInputFormat extends RichInputFormat<FileCopyTask, FileCopyTaskInputSplit> {

165

/**

166

* Creates input format with list of copy tasks

167

* @param tasks List of file copy tasks to distribute

168

*/

169

public FileCopyTaskInputFormat(List<FileCopyTask> tasks);

170

171

/**

172

* Configure input format

173

* @param parameters Configuration parameters

174

*/

175

@Override

176

public void configure(Configuration parameters);

177

178

/**

179

* Create input splits for parallel processing

180

* @param minNumSplits Minimum number of splits requested

181

* @return Array of input splits

182

*/

183

@Override

184

public FileCopyTaskInputSplit[] createInputSplits(int minNumSplits) throws IOException;

185

186

/**

187

* Get input split type information

188

* @return BaseStatistics for the input

189

*/

190

@Override

191

public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException;

192

193

/**

194

* Open input split for reading

195

* @param split Input split to open

196

*/

197

@Override

198

public void open(FileCopyTaskInputSplit split) throws IOException;

199

200

/**

201

* Check if more records available

202

* @return true if more records available, false otherwise

203

*/

204

@Override

205

public boolean reachedEnd() throws IOException;

206

207

/**

208

* Read next record

209

* @param reuse Reusable object for record

210

* @return Next FileCopyTask or null if end reached

211

*/

212

@Override

213

public FileCopyTask nextRecord(FileCopyTask reuse) throws IOException;

214

215

/**

216

* Close input format

217

*/

218

@Override

219

public void close() throws IOException;

220

}

221

```

222

223

**Usage Examples:**

224

225

```java

226

// Create input format with tasks

227

List<FileCopyTask> copyTasks = getCopyTasks(sourcePath);

228

FileCopyTaskInputFormat inputFormat = new FileCopyTaskInputFormat(copyTasks);

229

230

// Use with DataSource

231

DataSet<FileCopyTask> inputTasks = new DataSource<>(

232

env,

233

inputFormat,

234

new GenericTypeInfo<>(FileCopyTask.class),

235

"fileCopyTasks"

236

);

237

238

// Process tasks with custom function

239

DataSet<Object> results = inputTasks.flatMap(new FileCopyProcessor());

240

```

241

242

## File Processing Pattern

243

244

### Copy Operation Implementation

245

246

The DistCp utility implements file copying using the following pattern:

247

248

```java

249

// Create copy tasks from source directory

250

List<FileCopyTask> tasks = getCopyTasks(sourcePath);

251

252

// Create DataSource with custom input format

253

DataSet<FileCopyTask> inputTasks = new DataSource<>(

254

env,

255

new FileCopyTaskInputFormat(tasks),

256

new GenericTypeInfo<>(FileCopyTask.class),

257

"fileCopyTasks"

258

);

259

260

// Process each task with rich flat map function

261

FlatMapOperator<FileCopyTask, Object> results = inputTasks.flatMap(

262

new RichFlatMapFunction<FileCopyTask, Object>() {

263

private LongCounter fileCounter;

264

private LongCounter bytesCounter;

265

266

@Override

267

public void open(Configuration parameters) throws Exception {

268

bytesCounter = getRuntimeContext().getLongCounter(BYTES_COPIED_CNT_NAME);

269

fileCounter = getRuntimeContext().getLongCounter(FILES_COPIED_CNT_NAME);

270

}

271

272

@Override

273

public void flatMap(FileCopyTask task, Collector<Object> out) throws Exception {

274

// Perform actual file copy operation

275

copyFile(task);

276

}

277

}

278

);

279

```

280

281

### File Discovery Algorithm

282

283

Recursive file discovery builds the list of copy tasks:

284

285

```java

286

private static List<FileCopyTask> getCopyTasks(Path sourcePath) throws IOException {

287

List<FileCopyTask> tasks = new ArrayList<>();

288

getCopyTasks(sourcePath, "", tasks);

289

return tasks;

290

}

291

292

private static void getCopyTasks(Path p, String rel, List<FileCopyTask> tasks) throws IOException {

293

FileStatus[] fileStatuses = p.getFileSystem().listStatus(p);

294

if (fileStatuses == null) {

295

return;

296

}

297

298

for (FileStatus fs : fileStatuses) {

299

if (fs.isDir()) {

300

// Recursively process directories

301

getCopyTasks(fs.getPath(), rel + fs.getPath().getName() + "/", tasks);

302

} else {

303

// Add file to copy tasks

304

Path filePath = fs.getPath();

305

tasks.add(new FileCopyTask(filePath, rel + filePath.getName()));

306

}

307

}

308

}

309

```

310

311

### File System Compatibility

312

313

DistCp handles both local and distributed file systems:

314

315

```java

316

// Check execution environment

317

private static boolean isLocal(final ExecutionEnvironment env) {

318

return env instanceof LocalEnvironment;

319

}

320

321

// Check file system type

322

private static boolean isOnDistributedFS(final Path path) throws IOException {

323

return path.getFileSystem().isDistributedFS();

324

}

325

326

// Validate paths for distributed execution

327

if (!isLocal(env) && !(isOnDistributedFS(sourcePath) && isOnDistributedFS(targetPath))) {

328

System.out.println("In a distributed mode only HDFS input/output paths are supported");

329

return;

330

}

331

```

332

333

### Accumulator-based Metrics

334

335

DistCp uses accumulators to track copy progress:

336

337

```java

338

// Initialize counters in open() method

339

private LongCounter fileCounter;

340

private LongCounter bytesCounter;

341

342

@Override

343

public void open(Configuration parameters) throws Exception {

344

bytesCounter = getRuntimeContext().getLongCounter(BYTES_COPIED_CNT_NAME);

345

fileCounter = getRuntimeContext().getLongCounter(FILES_COPIED_CNT_NAME);

346

}

347

348

// Update counters during copy operation

349

int bytes = IOUtils.copy(inputStream, outputStream);

350

bytesCounter.add(bytes);

351

fileCounter.add(1L);

352

353

// Access results after execution

354

Map<String, Object> accumulators = env.getLastJobExecutionResult().getAllAccumulatorResults();

355

System.out.println("Files copied: " + accumulators.get(FILES_COPIED_CNT_NAME));

356

System.out.println("Bytes copied: " + accumulators.get(BYTES_COPIED_CNT_NAME));

357

```

358

359

## Usage Considerations

360

361

### Environment Requirements

362

363

- **Local Execution**: Both local file system paths and HDFS paths supported

364

- **Distributed Execution**: Only HDFS paths supported for source and destination

365

- **Parallelism**: Configurable parallel worker count (default: 10)

366

- **Directory Handling**: Creates parent directories automatically for local file systems

367

368

### Parameter Requirements

369

370

```java

371

// Required parameters

372

--input <path> // Source directory or file path

373

--output <path> // Destination directory path

374

375

// Optional parameters

376

--parallelism <n> // Number of parallel workers (default: 10)

377

```

378

379

### Limitations

380

381

- Empty directories are not copied

382

- No retry mechanism for failed copies

383

- Overwrites existing files at destination

384

- Requires HDFS paths in distributed environments

385

386

## Types

387

388

### Core File System Types

389

390

```java { .api }

391

// Flink file system types

392

import org.apache.flink.core.fs.Path;

393

import org.apache.flink.core.fs.FileSystem;

394

import org.apache.flink.core.fs.FileStatus;

395

import org.apache.flink.core.fs.FSDataInputStream;

396

import org.apache.flink.core.fs.FSDataOutputStream;

397

398

// File copy task

399

FileCopyTask task = new FileCopyTask(sourcePath, relativePath);

400

401

// Input format and splits

402

FileCopyTaskInputFormat inputFormat = new FileCopyTaskInputFormat(tasks);

403

FileCopyTaskInputSplit split = new FileCopyTaskInputSplit(task, splitNumber);

404

405

// Accumulator types for metrics

406

LongCounter bytesCounter;

407

LongCounter filesCounter;

408

```