# File Management

Temporary file management for blocks that are downloaded to disk during transfer operations.

## Capabilities

### DownloadFile Interface

A handle for a file that receives remote data fetched to disk. Its methods cover the file's lifecycle: open it for writing, report its path, and delete it.

```java { .api }
/**
 * Handle for files used when fetching remote data to disk
 * Provides lifecycle management for temporary download files
 */
public interface DownloadFile {
    /**
     * Delete the download file and clean up resources
     * @return true if file was successfully deleted, false otherwise
     */
    boolean delete();

    /**
     * Open the file for writing data
     * @return DownloadFileWritableChannel for writing data to the file
     * @throws IOException if file cannot be opened for writing
     */
    DownloadFileWritableChannel openForWriting() throws IOException;

    /**
     * Get the path to the download file
     * @return String path to the file
     */
    String path();
}
```
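
Because `delete()` reports failure through its return value rather than an exception, callers may want to check it. The following is a minimal sketch using only JDK classes, with `java.io.File` standing in for a `DownloadFile`. The helper name `deleteOrDefer` and the fallback to `deleteOnExit()` are assumptions for illustration, not part of the documented API.

```java
import java.io.File;
import java.io.IOException;

class DeleteHandling {
    /**
     * Hypothetical helper: tries to delete the file now. If that fails and the
     * file still exists, asks the JVM to delete it at exit as a last resort.
     */
    static boolean deleteOrDefer(File file) {
        if (file.delete()) {
            return true;
        }
        if (file.exists()) {
            file.deleteOnExit();
        }
        return false;
    }

    public static void main(String[] args) throws IOException {
        File temp = File.createTempFile("download-", ".tmp");
        System.out.println("Deleted now: " + deleteOrDefer(temp));
    }
}
```

Deferring to JVM exit only suits short-lived processes; a long-running service would more likely log the failure and retry.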

### DownloadFileManager Interface

Creates temporary download files and registers them for later cleanup.

```java { .api }
/**
 * Manager for creating and cleaning up temporary download files
 */
public interface DownloadFileManager {
    /**
     * Create a temporary file for downloading data
     * @param transportConf - Transport configuration for file settings
     * @return DownloadFile instance for the created temporary file
     */
    DownloadFile createTempFile(TransportConf transportConf);

    /**
     * Register a temporary file for cleanup when no longer needed
     * @param file - DownloadFile to register for cleanup
     * @return true if file was successfully registered, false otherwise
     */
    boolean registerTempFileToClean(DownloadFile file);
}
```
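
`registerTempFileToClean` can return `false`, and the interface does not say what the caller should do in that case. One reasonable policy, sketched below with JDK classes only, is to delete the file immediately when registration is declined so that it cannot leak. `TempFileRegistry` and `registerOrDelete` are hypothetical stand-ins, not Spark classes, and the "declines after close" behavior is an assumption chosen for illustration.

```java
import java.io.File;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class RegistrationFallback {
    /** Stand-in for a manager that declines registration once it has been closed. */
    static class TempFileRegistry {
        private final Set<File> files = ConcurrentHashMap.newKeySet();
        private volatile boolean closed = false;

        boolean register(File file) {
            return !closed && files.add(file);
        }

        /** Deletes every registered file and stops accepting new ones. */
        void close() {
            closed = true;
            for (File f : files) {
                f.delete();
            }
            files.clear();
        }
    }

    /** Registers the file; if the registry declines, deletes the file right away. */
    static boolean registerOrDelete(TempFileRegistry registry, File file) {
        if (registry.register(file)) {
            return true;
        }
        file.delete();
        return false;
    }

    public static void main(String[] args) throws IOException {
        TempFileRegistry registry = new TempFileRegistry();
        File first = File.createTempFile("shuffle-", ".tmp");
        System.out.println("Registered: " + registerOrDelete(registry, first));

        registry.close();
        File late = File.createTempFile("shuffle-", ".tmp");
        System.out.println("Registered after close: " + registerOrDelete(registry, late));
    }
}
```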

### DownloadFileWritableChannel Interface

A writable channel for fetched data. The written data becomes readable only after the writer is closed, through `closeAndRead()`.

```java { .api }
/**
 * Channel for writing fetched data with read capability after closure
 */
public interface DownloadFileWritableChannel extends WritableByteChannel {
    /**
     * Close the channel and return the written data as a readable buffer
     * @return ManagedBuffer containing all written data
     * @throws IOException if error occurs during close or buffer creation
     */
    ManagedBuffer closeAndRead() throws IOException;

    /**
     * Write data to the channel
     * @param src - ByteBuffer containing data to write
     * @return Number of bytes written
     * @throws IOException if write operation fails
     */
    @Override
    int write(ByteBuffer src) throws IOException;

    /**
     * Check if the channel is open for writing
     * @return true if channel is open, false if closed
     */
    @Override
    boolean isOpen();

    /**
     * Close the channel for writing
     * @throws IOException if close operation fails
     */
    @Override
    void close() throws IOException;
}
```
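
Since `write` is inherited from `WritableByteChannel`, a single call is allowed to consume only part of the buffer, which is why it returns a byte count. The sketch below shows a loop that drains the buffer completely. It uses a JDK in-memory channel in place of a `DownloadFileWritableChannel`, and `writeFully` is a hypothetical helper name.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;

class ChannelWrites {
    /** Hypothetical helper: keeps calling write() until the buffer is drained. */
    static int writeFully(WritableByteChannel channel, ByteBuffer src) throws IOException {
        int total = 0;
        while (src.hasRemaining()) {
            total += channel.write(src);
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        // Channels.newChannel wraps the stream; any WritableByteChannel would work here.
        try (WritableByteChannel channel = Channels.newChannel(sink)) {
            ByteBuffer block = ByteBuffer.wrap("block data".getBytes(StandardCharsets.UTF_8));
            int written = writeFully(channel, block);
            System.out.println("Wrote " + written + " bytes");
        }
    }
}
```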

### SimpleDownloadFile Implementation

A basic `DownloadFile` implementation that writes data without encryption.

```java { .api }
/**
 * Simple DownloadFile implementation without encryption
 */
public class SimpleDownloadFile implements DownloadFile {
    /**
     * Create a simple download file
     * @param file - File instance to wrap
     * @param transportConf - Transport configuration
     */
    public SimpleDownloadFile(File file, TransportConf transportConf);

    /**
     * Delete the download file
     * @return true if file was successfully deleted
     */
    @Override
    public boolean delete();

    /**
     * Open the file for writing
     * @return DownloadFileWritableChannel for writing to the file
     * @throws IOException if file cannot be opened
     */
    @Override
    public DownloadFileWritableChannel openForWriting() throws IOException;

    /**
     * Get the file path
     * @return String path to the file
     */
    @Override
    public String path();
}
```
142
143
**Usage Examples:**
144
145
```java
146
import org.apache.spark.network.shuffle.*;
147
import org.apache.spark.network.util.TransportConf;
148
import java.io.File;
149
import java.io.IOException;
150
import java.nio.ByteBuffer;
151
152
// Example 1: Basic download file usage
153
public class BasicDownloadFileExample {
154
public void demonstrateBasicUsage() throws IOException {
155
TransportConf conf = new TransportConf("shuffle");
156
157
// Create a temporary file for downloading
158
File tempFile = File.createTempFile("shuffle-download-", ".tmp");
159
SimpleDownloadFile downloadFile = new SimpleDownloadFile(tempFile, conf);
160
161
System.out.println("Created download file at: " + downloadFile.path());
162
163
// Open for writing
164
try (DownloadFileWritableChannel channel = downloadFile.openForWriting()) {
165
// Write some data
166
String testData = "This is test shuffle block data";
167
ByteBuffer dataBuffer = ByteBuffer.wrap(testData.getBytes());
168
169
int bytesWritten = channel.write(dataBuffer);
170
System.out.println("Wrote " + bytesWritten + " bytes to download file");
171
172
// Close and read the data back
173
ManagedBuffer readBuffer = channel.closeAndRead();
174
System.out.println("Read back " + readBuffer.size() + " bytes");
175
176
// Process the data
177
try (InputStream dataStream = readBuffer.createInputStream()) {
178
byte[] readData = ByteStreams.toByteArray(dataStream);
179
String readString = new String(readData);
180
System.out.println("Read data: " + readString);
181
} finally {
182
readBuffer.release();
183
}
184
}
185
186
// Clean up
187
boolean deleted = downloadFile.delete();
188
System.out.println("File deleted: " + deleted);
189
}
190
}
191
192
// Example 2: Download file manager implementation
193
public class SimpleDownloadFileManager implements DownloadFileManager {
194
private final Set<DownloadFile> managedFiles = ConcurrentHashMap.newKeySet();
195
private final String tempDirPath;
196
197
public SimpleDownloadFileManager(String tempDirPath) {
198
this.tempDirPath = tempDirPath;
199
}
200
201
@Override
202
public DownloadFile createTempFile(TransportConf transportConf) {
203
try {
204
File tempDir = new File(tempDirPath);
205
if (!tempDir.exists()) {
206
tempDir.mkdirs();
207
}
208
209
File tempFile = File.createTempFile("shuffle-", ".tmp", tempDir);
210
SimpleDownloadFile downloadFile = new SimpleDownloadFile(tempFile, transportConf);
211
212
// Register for cleanup
213
registerTempFileToClean(downloadFile);
214
215
return downloadFile;
216
} catch (IOException e) {
217
throw new RuntimeException("Failed to create temp file", e);
218
}
219
}
220
221
@Override
222
public boolean registerTempFileToClean(DownloadFile file) {
223
return managedFiles.add(file);
224
}
225
226
public void cleanupAllFiles() {
227
int cleanedCount = 0;
228
for (DownloadFile file : managedFiles) {
229
if (file.delete()) {
230
cleanedCount++;
231
}
232
}
233
managedFiles.clear();
234
System.out.println("Cleaned up " + cleanedCount + " temporary files");
235
}
236
}
237
238
// Example 3: Integration with block fetching
239
public class DownloadFileBlockFetchingExample {
240
public void fetchBlocksToFiles() {
241
TransportConf conf = new TransportConf("shuffle");
242
SimpleDownloadFileManager fileManager = new SimpleDownloadFileManager("/tmp/shuffle-downloads");
243
244
BlockFetchingListener fileDownloadListener = new BlockFetchingListener() {
245
@Override
246
public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
247
System.out.println("Downloaded block " + blockId + " to file, size: " + data.size());
248
249
// Data is already written to file, just release the buffer
250
data.release();
251
}
252
253
@Override
254
public void onBlockFetchFailure(String blockId, Throwable exception) {
255
System.err.println("Failed to download block " + blockId + ": " + exception.getMessage());
256
}
257
};
258
259
// Create transport client
260
TransportClient client = createTransportClient("shuffle-server", 7337);
261
262
// Fetch blocks with file download
263
String[] blockIds = {"shuffle_1_0_0", "shuffle_1_0_1"};
264
OneForOneBlockFetcher fetcher = new OneForOneBlockFetcher(
265
client, "app-001", "executor-1", blockIds,
266
fileDownloadListener, conf, fileManager
267
);
268
269
// Start the fetch
270
fetcher.start();
271
272
// Later, clean up temporary files
273
fileManager.cleanupAllFiles();
274
}
275
}
276
277
// Example 4: Custom download file with compression
278
public class CompressedDownloadFile implements DownloadFile {
279
private final File file;
280
private final TransportConf conf;
281
private final CompressionCodec codec;
282
283
public CompressedDownloadFile(File file, TransportConf conf, CompressionCodec codec) {
284
this.file = file;
285
this.conf = conf;
286
this.codec = codec;
287
}
288
289
@Override
290
public boolean delete() {
291
return file.delete();
292
}
293
294
@Override
295
public DownloadFileWritableChannel openForWriting() throws IOException {
296
return new CompressedWritableChannel(file, codec);
297
}
298
299
@Override
300
public String path() {
301
return file.getAbsolutePath();
302
}
303
304
private static class CompressedWritableChannel implements DownloadFileWritableChannel {
305
private final FileOutputStream fileOut;
306
private final OutputStream compressedOut;
307
private boolean closed = false;
308
309
CompressedWritableChannel(File file, CompressionCodec codec) throws IOException {
310
this.fileOut = new FileOutputStream(file);
311
this.compressedOut = codec.compressedOutputStream(fileOut);
312
}
313
314
@Override
315
public int write(ByteBuffer src) throws IOException {
316
if (closed) throw new IOException("Channel is closed");
317
318
int bytesToWrite = src.remaining();
319
byte[] buffer = new byte[bytesToWrite];
320
src.get(buffer);
321
compressedOut.write(buffer);
322
return bytesToWrite;
323
}
324
325
@Override
326
public boolean isOpen() {
327
return !closed;
328
}
329
330
@Override
331
public void close() throws IOException {
332
if (!closed) {
333
compressedOut.close();
334
fileOut.close();
335
closed = true;
336
}
337
}
338
339
@Override
340
public ManagedBuffer closeAndRead() throws IOException {
341
close();
342
// Return compressed file as managed buffer
343
return new FileSegmentManagedBuffer(conf, file, 0, file.length());
344
}
345
}
346
}
347
348
// Example 5: Monitoring download file operations
349
public class MonitoredDownloadFileManager implements DownloadFileManager {
350
private final DownloadFileManager delegate;
351
private final Counter filesCreated = new Counter();
352
private final Counter filesRegistered = new Counter();
353
private final Gauge activeFiles;
354
private final Set<DownloadFile> activeFileSet = ConcurrentHashMap.newKeySet();
355
356
public MonitoredDownloadFileManager(DownloadFileManager delegate) {
357
this.delegate = delegate;
358
this.activeFiles = () -> activeFileSet.size();
359
}
360
361
@Override
362
public DownloadFile createTempFile(TransportConf transportConf) {
363
DownloadFile file = delegate.createTempFile(transportConf);
364
filesCreated.inc();
365
activeFileSet.add(file);
366
367
// Wrap the file to monitor deletion
368
return new MonitoredDownloadFile(file);
369
}
370
371
@Override
372
public boolean registerTempFileToClean(DownloadFile file) {
373
boolean registered = delegate.registerTempFileToClean(file);
374
if (registered) {
375
filesRegistered.inc();
376
}
377
return registered;
378
}
379
380
private class MonitoredDownloadFile implements DownloadFile {
381
private final DownloadFile delegate;
382
383
MonitoredDownloadFile(DownloadFile delegate) {
384
this.delegate = delegate;
385
}
386
387
@Override
388
public boolean delete() {
389
boolean deleted = delegate.delete();
390
if (deleted) {
391
activeFileSet.remove(this);
392
}
393
return deleted;
394
}
395
396
@Override
397
public DownloadFileWritableChannel openForWriting() throws IOException {
398
return delegate.openForWriting();
399
}
400
401
@Override
402
public String path() {
403
return delegate.path();
404
}
405
}
406
407
public void printMetrics() {
408
System.out.println("Download File Metrics:");
409
System.out.println(" Files Created: " + filesCreated.getCount());
410
System.out.println(" Files Registered: " + filesRegistered.getCount());
411
System.out.println(" Active Files: " + activeFiles.getValue());
412
}
413
}
414
```
415
416
### File Management Best Practices
417
418
1. **Resource Cleanup**:
419
- Always call `delete()` on DownloadFile instances when finished
420
- Use try-with-resources for DownloadFileWritableChannel
421
- Implement proper cleanup in DownloadFileManager
422
423
2. **Error Handling**:
424
- Handle IOException during file operations gracefully
425
- Implement retry logic for transient file system errors
426
- Monitor disk space and handle out-of-space conditions
427
428
3. **Performance Optimization**:
429
- Use appropriate buffer sizes for file I/O
430
- Consider compression for large blocks
431
- Implement file pooling for high-frequency operations
432
433
4. **Security Considerations**:
434
- Create temporary files in secure directories
435
- Set appropriate file permissions
436
- Clean up sensitive data from temporary files
437
438
5. **Monitoring**:
439
- Track temporary file creation and cleanup
440
- Monitor disk usage in temporary directories
441
- Alert on excessive temporary file accumulation
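
The file-permission advice under Security Considerations can be sketched with the JDK's `java.nio.file` API. This is a minimal example under stated assumptions: `createPrivateTempFile` is a hypothetical helper, and the owner-only permission set applies only on file systems that support POSIX attributes. Elsewhere the sketch falls back to the platform default.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Set;

class SecureTempFiles {
    /** Hypothetical helper: creates a temp file readable and writable by the owner only. */
    static Path createPrivateTempFile(Path dir, String prefix) throws IOException {
        Files.createDirectories(dir);
        boolean posix = dir.getFileSystem().supportedFileAttributeViews().contains("posix");
        if (posix) {
            FileAttribute<Set<PosixFilePermission>> ownerOnly =
                PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rw-------"));
            return Files.createTempFile(dir, prefix, ".tmp", ownerOnly);
        }
        // No POSIX support (for example on Windows): rely on the platform default
        return Files.createTempFile(dir, prefix, ".tmp");
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("shuffle-downloads-");
        Path file = createPrivateTempFile(dir, "shuffle-");
        System.out.println("Created " + file);
        // Clean up the file and its directory
        Files.deleteIfExists(file);
        Files.deleteIfExists(dir);
    }
}
```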
442
443
### Configuration Parameters
444
445
Key configuration parameters for file management:
446
447
- `spark.shuffle.file.buffer.size` - Buffer size for file I/O operations
448
- `spark.local.dir` - Local directories for temporary files
449
- `spark.shuffle.spill.compress` - Enable compression for spilled data
450
- `spark.shuffle.compress` - Enable compression for shuffle files
451
- `spark.io.compression.codec` - Compression codec to use