or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

filesystem-factory.md, filesystem-operations.md, hadoop-utilities.md, index.md, io-streams.md, recoverable-writers.md

docs/filesystem-operations.md

# Core FileSystem Operations

The HadoopFileSystem class provides comprehensive file system operations by wrapping Hadoop's FileSystem implementations with Flink's FileSystem interface. It supports all standard file operations including reading, writing, directory management, and metadata access across all Hadoop-compatible file systems.

## Capabilities

### HadoopFileSystem Class

Main file system implementation that wraps Hadoop FileSystem with Flink's interface.

```java { .api }
/**
 * A FileSystem that wraps a Hadoop File System.
 * Provides Flink's file system interface over Hadoop's file system abstraction.
 */
public class HadoopFileSystem extends FileSystem {

    /**
     * Wraps the given Hadoop File System object as a Flink File System.
     * The Hadoop file system object is expected to be initialized already.
     * @param hadoopFileSystem The Hadoop FileSystem to wrap
     */
    public HadoopFileSystem(org.apache.hadoop.fs.FileSystem hadoopFileSystem);

    /**
     * Gets the underlying Hadoop FileSystem.
     * @return The underlying Hadoop FileSystem
     */
    public org.apache.hadoop.fs.FileSystem getHadoopFileSystem();

    /**
     * Gets the working directory.
     * @return Path to the working directory
     */
    public Path getWorkingDirectory();

    /**
     * Gets the home directory.
     * @return Path to the home directory
     */
    public Path getHomeDirectory();

    /**
     * Gets the URI of this file system.
     * @return URI of the file system
     */
    public URI getUri();

    /**
     * Returns true since Hadoop file systems are distributed.
     * @return always true
     */
    public boolean isDistributedFS();

    /**
     * Gets the default block size for this file system.
     * @return default block size in bytes
     */
    public long getDefaultBlockSize();
}
```

### File Status Operations

Methods for retrieving file and directory metadata information.

```java { .api }
/**
 * Gets the file status for the specified path.
 * @param f path to get status for
 * @return FileStatus containing metadata
 * @throws IOException if the path doesn't exist or operation fails
 */
public FileStatus getFileStatus(Path f) throws IOException;

/**
 * Checks if a path exists.
 * @param f path to check
 * @return true if path exists, false otherwise
 * @throws IOException if operation fails
 */
public boolean exists(Path f) throws IOException;

/**
 * Lists the status of files/directories in a directory.
 * @param f directory path to list
 * @return array of FileStatus objects for directory contents
 * @throws IOException if path is not a directory or operation fails
 */
public FileStatus[] listStatus(Path f) throws IOException;

/**
 * Gets file block locations for the specified file and range.
 * @param file file status object
 * @param start starting byte position
 * @param len number of bytes
 * @return array of BlockLocation objects
 * @throws IOException if operation fails
 */
public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException;
```

**Usage Examples:**

```java
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.BlockLocation;

// Get file status
Path filePath = new Path("hdfs://namenode:9000/data/input.txt");
FileStatus status = fs.getFileStatus(filePath);

System.out.println("File length: " + status.getLen());
System.out.println("Block size: " + status.getBlockSize());
System.out.println("Modification time: " + status.getModificationTime());
System.out.println("Is directory: " + status.isDir());

// Check if file exists
boolean exists = fs.exists(filePath);
if (exists) {
    System.out.println("File exists");
}

// List directory contents
Path dirPath = new Path("hdfs://namenode:9000/data/");
FileStatus[] files = fs.listStatus(dirPath);
for (FileStatus file : files) {
    System.out.println(file.getPath() + " - " + file.getLen() + " bytes");
}

// Get block locations for data locality
BlockLocation[] blocks = fs.getFileBlockLocations(status, 0, status.getLen());
for (BlockLocation block : blocks) {
    System.out.println("Block at offset " + block.getOffset() +
        " on hosts: " + String.join(",", block.getHosts()));
}
```

### File Reading Operations

Methods for opening and reading files with various options.

```java { .api }
/**
 * Opens a file for reading.
 * @param f path to the file
 * @return HadoopDataInputStream for reading
 * @throws IOException if file cannot be opened
 */
public HadoopDataInputStream open(Path f) throws IOException;

/**
 * Opens a file for reading with specified buffer size.
 * @param f path to the file
 * @param bufferSize buffer size for reading
 * @return HadoopDataInputStream for reading
 * @throws IOException if file cannot be opened
 */
public HadoopDataInputStream open(Path f, int bufferSize) throws IOException;
```

**Usage Examples:**

```java
import org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream;

// Open file for reading
Path inputPath = new Path("hdfs://namenode:9000/data/input.txt");
HadoopDataInputStream inputStream = fs.open(inputPath);

// Read data
byte[] buffer = new byte[1024];
int bytesRead = inputStream.read(buffer);
while (bytesRead != -1) {
    // Process buffer
    System.out.write(buffer, 0, bytesRead);
    bytesRead = inputStream.read(buffer);
}
inputStream.close();

// Open with custom buffer size
HadoopDataInputStream bufferedStream = fs.open(inputPath, 64 * 1024); // 64KB buffer
// ... read operations
bufferedStream.close();

// Random access reading
inputStream = fs.open(inputPath);
inputStream.seek(1000); // Seek to position 1000
int byteAtPosition = inputStream.read();
inputStream.close();
```

### File Writing Operations

Methods for creating and writing files with various configuration options.

```java { .api }
/**
 * Creates a file with write mode specification.
 * @param f path to create
 * @param overwrite write mode (OVERWRITE or NO_OVERWRITE)
 * @return HadoopDataOutputStream for writing
 * @throws IOException if file cannot be created
 */
public HadoopDataOutputStream create(Path f, WriteMode overwrite) throws IOException;

/**
 * Creates a file with detailed HDFS parameters.
 * @param f path to create
 * @param overwrite whether to overwrite existing file
 * @param bufferSize buffer size for writing
 * @param replication replication factor
 * @param blockSize block size in bytes
 * @return HadoopDataOutputStream for writing
 * @throws IOException if file cannot be created
 */
public HadoopDataOutputStream create(Path f, boolean overwrite, int bufferSize,
        short replication, long blockSize) throws IOException;
```

**Usage Examples:**

```java
import org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream;
import org.apache.flink.core.fs.FileSystem.WriteMode;

// Create file with simple write mode
Path outputPath = new Path("hdfs://namenode:9000/data/output.txt");
HadoopDataOutputStream outputStream = fs.create(outputPath, WriteMode.OVERWRITE);

// Write data
String data = "Hello, Hadoop FileSystem!";
outputStream.write(data.getBytes());
outputStream.flush();
outputStream.close();

// Create file with detailed HDFS parameters
HadoopDataOutputStream hdfsStream = fs.create(
    outputPath,
    true,               // overwrite
    32 * 1024,          // 32KB buffer
    (short) 3,          // replication factor
    128 * 1024 * 1024   // 128MB block size
);

// Write with positioning
hdfsStream.write("First part".getBytes());
long position = hdfsStream.getPos();
hdfsStream.write("Second part".getBytes());
hdfsStream.sync(); // Force sync to storage
hdfsStream.close();
```

### Directory Operations

Methods for directory management and manipulation.

```java { .api }
/**
 * Creates directories for the specified path.
 * @param f path to create directories for
 * @return true if directories were created or already exist
 * @throws IOException if operation fails
 */
public boolean mkdirs(Path f) throws IOException;

/**
 * Deletes a file or directory.
 * @param f path to delete
 * @param recursive if true, delete directory recursively
 * @return true if deletion was successful
 * @throws IOException if operation fails
 */
public boolean delete(Path f, boolean recursive) throws IOException;

/**
 * Renames a file or directory.
 * @param src source path
 * @param dst destination path
 * @return true if rename was successful
 * @throws IOException if operation fails
 */
public boolean rename(Path src, Path dst) throws IOException;
```

**Usage Examples:**

```java
// Create directories
Path dirPath = new Path("hdfs://namenode:9000/data/processed/");
boolean created = fs.mkdirs(dirPath);
if (created) {
    System.out.println("Directories created successfully");
}

// Delete file
Path fileToDelete = new Path("hdfs://namenode:9000/data/temp.txt");
boolean deleted = fs.delete(fileToDelete, false);

// Delete directory recursively
Path dirToDelete = new Path("hdfs://namenode:9000/data/temp/");
boolean deletedRecursive = fs.delete(dirToDelete, true);

// Rename file
Path oldPath = new Path("hdfs://namenode:9000/data/old_name.txt");
Path newPath = new Path("hdfs://namenode:9000/data/new_name.txt");
boolean renamed = fs.rename(oldPath, newPath);
```

### Recoverable Writer Creation

Methods for creating fault-tolerant writers that support exactly-once processing guarantees.

```java { .api }
/**
 * Creates a recoverable writer for fault-tolerant writing.
 * @return RecoverableWriter instance
 * @throws IOException if writer creation fails due to unsupported file system
 */
public RecoverableWriter createRecoverableWriter() throws IOException;

/**
 * Creates a recoverable writer with configuration options.
 * @param conf configuration map with writer options
 * @return RecoverableWriter instance
 * @throws IOException if writer creation fails
 */
public RecoverableWriter createRecoverableWriter(Map<String, String> conf) throws IOException;
```

**Usage Examples:**

```java
import org.apache.flink.core.fs.RecoverableWriter;
import java.util.HashMap;
import java.util.Map;

// Create recoverable writer with default settings
RecoverableWriter writer = fs.createRecoverableWriter();

// Create recoverable writer with configuration
Map<String, String> config = new HashMap<>();
config.put("fs.hdfs.no-local-write", "true");
RecoverableWriter configuredWriter = fs.createRecoverableWriter(config);

// Use the writer for fault-tolerant streaming
Path outputPath = new Path("hdfs://namenode:9000/output/data.txt");
RecoverableFsDataOutputStream stream = writer.open(outputPath);
// ... write data with recovery capabilities
```

### Utility Methods

Additional utility methods for path conversion and file system identification.

```java { .api }
/**
 * Converts Flink Path to Hadoop Path.
 * @param path Flink Path object
 * @return Hadoop Path object
 */
public static org.apache.hadoop.fs.Path toHadoopPath(Path path);

/**
 * Gets the file system kind based on scheme.
 * @param scheme URI scheme
 * @return FileSystemKind indicating the type of file system
 */
static FileSystemKind getKindForScheme(String scheme);
```

## File Status Types

```java { .api }
/**
 * FileStatus implementation for Hadoop file systems.
 */
public class HadoopFileStatus implements FileStatus {
    public HadoopFileStatus(org.apache.hadoop.fs.FileStatus fileStatus);

    public long getLen();
    public long getBlockSize();
    public long getAccessTime();
    public long getModificationTime();
    public short getReplication();
    public Path getPath();
    public boolean isDir();
    public org.apache.hadoop.fs.FileStatus getInternalFileStatus();

    public static HadoopFileStatus fromHadoopStatus(org.apache.hadoop.fs.FileStatus fileStatus);
}

/**
 * FileStatus with block location information.
 */
public class LocatedHadoopFileStatus extends HadoopFileStatus implements LocatedFileStatus {
    public LocatedHadoopFileStatus(org.apache.hadoop.fs.LocatedFileStatus fileStatus);
    public BlockLocation[] getBlockLocations();
}

/**
 * Block location implementation for Hadoop file systems.
 */
public class HadoopBlockLocation implements BlockLocation {
    public HadoopBlockLocation(org.apache.hadoop.fs.BlockLocation blockLocation);

    public String[] getHosts() throws IOException;
    public long getLength();
    public long getOffset();
    public int compareTo(BlockLocation o);
}
```

## Error Handling

Common exceptions and error scenarios:

```java
try {
    FileStatus status = fs.getFileStatus(nonExistentPath);
} catch (FileNotFoundException e) {
    System.err.println("File not found: " + e.getMessage());
} catch (IOException e) {
    System.err.println("I/O error: " + e.getMessage());
}

try {
    fs.delete(readOnlyFile, false);
} catch (AccessControlException e) {
    System.err.println("Permission denied: " + e.getMessage());
}
```