or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

authentication.md, block-fetching.md, file-management.md, index.md, mesos.md, protocol.md, shuffle-client.md, shuffle-server.md

docs/file-management.md

0

# File Management

1

2

Temporary file management system for handling downloaded blocks during transfer operations.

3

4

## Capabilities

5

6

### DownloadFile Interface

7

8

Handle for files used when fetching remote data to disk with lifecycle management.

9

10

```java { .api }

11

/**

12

* Handle for files used when fetching remote data to disk

13

* Provides lifecycle management for temporary download files

14

*/

15

public interface DownloadFile {

16

/**

17

* Delete the download file and clean up resources

18

* @return true if file was successfully deleted, false otherwise

19

*/

20

boolean delete();

21

22

/**

23

* Open the file for writing data

24

* @return DownloadFileWritableChannel for writing data to the file

25

* @throws IOException if file cannot be opened for writing

26

*/

27

DownloadFileWritableChannel openForWriting() throws IOException;

28

29

/**

30

* Get the path to the download file

31

* @return String path to the file

32

*/

33

String path();

34

}

35

```

36

37

### DownloadFileManager Interface

38

39

Manager for creating and cleaning up temporary download files.

40

41

```java { .api }

42

/**

43

* Manager for creating and cleaning up temporary download files

44

*/

45

public interface DownloadFileManager {

46

/**

47

* Create a temporary file for downloading data

48

* @param transportConf - Transport configuration for file settings

49

* @return DownloadFile instance for the created temporary file

50

*/

51

DownloadFile createTempFile(TransportConf transportConf);

52

53

/**

54

* Register a temporary file for cleanup when no longer needed

55

* @param file - DownloadFile to register for cleanup

56

* @return true if file was successfully registered, false otherwise

57

*/

58

boolean registerTempFileToClean(DownloadFile file);

59

}

60

```

61

62

### DownloadFileWritableChannel Interface

63

64

Channel for writing fetched data that allows reading only after writer is closed.

65

66

```java { .api }

67

/**

68

* Channel for writing fetched data with read capability after closure

69

*/

70

public interface DownloadFileWritableChannel extends WritableByteChannel {

71

/**

72

* Close the channel and return the written data as a readable buffer

73

* @return ManagedBuffer containing all written data

74

* @throws IOException if error occurs during close or buffer creation

75

*/

76

ManagedBuffer closeAndRead() throws IOException;

77

78

/**

79

* Write data to the channel

80

* @param src - ByteBuffer containing data to write

81

* @return Number of bytes written

82

* @throws IOException if write operation fails

83

*/

84

@Override

85

int write(ByteBuffer src) throws IOException;

86

87

/**

88

* Check if the channel is open for writing

89

* @return true if channel is open, false if closed

90

*/

91

@Override

92

boolean isOpen();

93

94

/**

95

* Close the channel for writing

96

* @throws IOException if close operation fails

97

*/

98

@Override

99

void close() throws IOException;

100

}

101

```

102

103

### SimpleDownloadFile Implementation

104

105

Simple DownloadFile implementation without encryption.

106

107

```java { .api }

108

/**

109

* Simple DownloadFile implementation without encryption

110

*/

111

public class SimpleDownloadFile implements DownloadFile {

112

/**

113

* Create a simple download file

114

* @param file - File instance to wrap

115

* @param transportConf - Transport configuration

116

*/

117

public SimpleDownloadFile(File file, TransportConf transportConf);

118

119

/**

120

* Delete the download file

121

* @return true if file was successfully deleted

122

*/

123

@Override

124

public boolean delete();

125

126

/**

127

* Open the file for writing

128

* @return DownloadFileWritableChannel for writing to the file

129

* @throws IOException if file cannot be opened

130

*/

131

@Override

132

public DownloadFileWritableChannel openForWriting() throws IOException;

133

134

/**

135

* Get the file path

136

* @return String path to the file

137

*/

138

@Override

139

public String path();

140

}

141

```

142

143

**Usage Examples:**

144

145

```java

146

import org.apache.spark.network.shuffle.*;
import org.apache.spark.network.util.TransportConf;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

151

152

// Example 1: Basic download file usage

153

public class BasicDownloadFileExample {

154

public void demonstrateBasicUsage() throws IOException {

155

TransportConf conf = new TransportConf("shuffle");

156

157

// Create a temporary file for downloading

158

File tempFile = File.createTempFile("shuffle-download-", ".tmp");

159

SimpleDownloadFile downloadFile = new SimpleDownloadFile(tempFile, conf);

160

161

System.out.println("Created download file at: " + downloadFile.path());

162

163

// Open for writing

164

try (DownloadFileWritableChannel channel = downloadFile.openForWriting()) {

165

// Write some data

166

String testData = "This is test shuffle block data";

167

ByteBuffer dataBuffer = ByteBuffer.wrap(testData.getBytes());

168

169

int bytesWritten = channel.write(dataBuffer);

170

System.out.println("Wrote " + bytesWritten + " bytes to download file");

171

172

// Close and read the data back

173

ManagedBuffer readBuffer = channel.closeAndRead();

174

System.out.println("Read back " + readBuffer.size() + " bytes");

175

176

// Process the data

177

try (InputStream dataStream = readBuffer.createInputStream()) {

178

byte[] readData = ByteStreams.toByteArray(dataStream);

179

String readString = new String(readData);

180

System.out.println("Read data: " + readString);

181

} finally {

182

readBuffer.release();

183

}

184

}

185

186

// Clean up

187

boolean deleted = downloadFile.delete();

188

System.out.println("File deleted: " + deleted);

189

}

190

}

191

192

// Example 2: Download file manager implementation

193

public class SimpleDownloadFileManager implements DownloadFileManager {

194

private final Set<DownloadFile> managedFiles = ConcurrentHashMap.newKeySet();

195

private final String tempDirPath;

196

197

public SimpleDownloadFileManager(String tempDirPath) {

198

this.tempDirPath = tempDirPath;

199

}

200

201

@Override

202

public DownloadFile createTempFile(TransportConf transportConf) {

203

try {

204

File tempDir = new File(tempDirPath);

205

if (!tempDir.exists()) {

206

tempDir.mkdirs();

207

}

208

209

File tempFile = File.createTempFile("shuffle-", ".tmp", tempDir);

210

SimpleDownloadFile downloadFile = new SimpleDownloadFile(tempFile, transportConf);

211

212

// Register for cleanup

213

registerTempFileToClean(downloadFile);

214

215

return downloadFile;

216

} catch (IOException e) {

217

throw new RuntimeException("Failed to create temp file", e);

218

}

219

}

220

221

@Override

222

public boolean registerTempFileToClean(DownloadFile file) {

223

return managedFiles.add(file);

224

}

225

226

public void cleanupAllFiles() {

227

int cleanedCount = 0;

228

for (DownloadFile file : managedFiles) {

229

if (file.delete()) {

230

cleanedCount++;

231

}

232

}

233

managedFiles.clear();

234

System.out.println("Cleaned up " + cleanedCount + " temporary files");

235

}

236

}

237

238

// Example 3: Integration with block fetching

239

public class DownloadFileBlockFetchingExample {

240

public void fetchBlocksToFiles() {

241

TransportConf conf = new TransportConf("shuffle");

242

SimpleDownloadFileManager fileManager = new SimpleDownloadFileManager("/tmp/shuffle-downloads");

243

244

BlockFetchingListener fileDownloadListener = new BlockFetchingListener() {

245

@Override

246

public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {

247

System.out.println("Downloaded block " + blockId + " to file, size: " + data.size());

248

249

// Data is already written to file, just release the buffer

250

data.release();

251

}

252

253

@Override

254

public void onBlockFetchFailure(String blockId, Throwable exception) {

255

System.err.println("Failed to download block " + blockId + ": " + exception.getMessage());

256

}

257

};

258

259

// Create transport client

260

TransportClient client = createTransportClient("shuffle-server", 7337);

261

262

// Fetch blocks with file download

263

String[] blockIds = {"shuffle_1_0_0", "shuffle_1_0_1"};

264

OneForOneBlockFetcher fetcher = new OneForOneBlockFetcher(

265

client, "app-001", "executor-1", blockIds,

266

fileDownloadListener, conf, fileManager

267

);

268

269

// Start the fetch

270

fetcher.start();

271

272

// Later, clean up temporary files

273

fileManager.cleanupAllFiles();

274

}

275

}

276

277

// Example 4: Custom download file with compression

278

public class CompressedDownloadFile implements DownloadFile {

279

private final File file;

280

private final TransportConf conf;

281

private final CompressionCodec codec;

282

283

public CompressedDownloadFile(File file, TransportConf conf, CompressionCodec codec) {

284

this.file = file;

285

this.conf = conf;

286

this.codec = codec;

287

}

288

289

@Override

290

public boolean delete() {

291

return file.delete();

292

}

293

294

@Override

295

public DownloadFileWritableChannel openForWriting() throws IOException {

296

return new CompressedWritableChannel(file, codec);

297

}

298

299

@Override

300

public String path() {

301

return file.getAbsolutePath();

302

}

303

304

private static class CompressedWritableChannel implements DownloadFileWritableChannel {

305

private final FileOutputStream fileOut;

306

private final OutputStream compressedOut;

307

private boolean closed = false;

308

309

CompressedWritableChannel(File file, CompressionCodec codec) throws IOException {

310

this.fileOut = new FileOutputStream(file);

311

this.compressedOut = codec.compressedOutputStream(fileOut);

312

}

313

314

@Override

315

public int write(ByteBuffer src) throws IOException {

316

if (closed) throw new IOException("Channel is closed");

317

318

int bytesToWrite = src.remaining();

319

byte[] buffer = new byte[bytesToWrite];

320

src.get(buffer);

321

compressedOut.write(buffer);

322

return bytesToWrite;

323

}

324

325

@Override

326

public boolean isOpen() {

327

return !closed;

328

}

329

330

@Override

331

public void close() throws IOException {

332

if (!closed) {

333

compressedOut.close();

334

fileOut.close();

335

closed = true;

336

}

337

}

338

339

@Override

340

public ManagedBuffer closeAndRead() throws IOException {

341

close();

342

// Return compressed file as managed buffer

343

return new FileSegmentManagedBuffer(conf, file, 0, file.length());

344

}

345

}

346

}

347

348

// Example 5: Monitoring download file operations

349

public class MonitoredDownloadFileManager implements DownloadFileManager {

350

private final DownloadFileManager delegate;

351

private final Counter filesCreated = new Counter();

352

private final Counter filesRegistered = new Counter();

353

private final Gauge activeFiles;

354

private final Set<DownloadFile> activeFileSet = ConcurrentHashMap.newKeySet();

355

356

public MonitoredDownloadFileManager(DownloadFileManager delegate) {

357

this.delegate = delegate;

358

this.activeFiles = () -> activeFileSet.size();

359

}

360

361

@Override

362

public DownloadFile createTempFile(TransportConf transportConf) {

363

DownloadFile file = delegate.createTempFile(transportConf);

364

filesCreated.inc();

365

activeFileSet.add(file);

366

367

// Wrap the file to monitor deletion

368

return new MonitoredDownloadFile(file);

369

}

370

371

@Override

372

public boolean registerTempFileToClean(DownloadFile file) {

373

boolean registered = delegate.registerTempFileToClean(file);

374

if (registered) {

375

filesRegistered.inc();

376

}

377

return registered;

378

}

379

380

private class MonitoredDownloadFile implements DownloadFile {

381

private final DownloadFile delegate;

382

383

MonitoredDownloadFile(DownloadFile delegate) {

384

this.delegate = delegate;

385

}

386

387

@Override

388

public boolean delete() {

389

boolean deleted = delegate.delete();

390

if (deleted) {

391

activeFileSet.remove(this);

392

}

393

return deleted;

394

}

395

396

@Override

397

public DownloadFileWritableChannel openForWriting() throws IOException {

398

return delegate.openForWriting();

399

}

400

401

@Override

402

public String path() {

403

return delegate.path();

404

}

405

}

406

407

public void printMetrics() {

408

System.out.println("Download File Metrics:");

409

System.out.println(" Files Created: " + filesCreated.getCount());

410

System.out.println(" Files Registered: " + filesRegistered.getCount());

411

System.out.println(" Active Files: " + activeFiles.getValue());

412

}

413

}

414

```

415

416

### File Management Best Practices

417

418

1. **Resource Cleanup**:

419

- Always call `delete()` on DownloadFile instances when finished

420

- Use try-with-resources for DownloadFileWritableChannel

421

- Implement proper cleanup in DownloadFileManager

422

423

2. **Error Handling**:

424

- Handle IOException during file operations gracefully

425

- Implement retry logic for transient file system errors

426

- Monitor disk space and handle out-of-space conditions

427

428

3. **Performance Optimization**:

429

- Use appropriate buffer sizes for file I/O

430

- Consider compression for large blocks

431

- Implement file pooling for high-frequency operations

432

433

4. **Security Considerations**:

434

- Create temporary files in secure directories

435

- Set appropriate file permissions

436

- Clean up sensitive data from temporary files

437

438

5. **Monitoring**:

439

- Track temporary file creation and cleanup

440

- Monitor disk usage in temporary directories

441

- Alert on excessive temporary file accumulation

442

443

### Configuration Parameters

444

445

Key configuration parameters for file management:

446

447

- `spark.shuffle.file.buffer` - Size of the in-memory buffer for each shuffle file output stream

448

- `spark.local.dir` - Local directories for temporary files

449

- `spark.shuffle.spill.compress` - Enable compression for spilled data

450

- `spark.shuffle.compress` - Enable compression for shuffle files

451

- `spark.io.compression.codec` - Compression codec to use