Temporary file management system for handling downloaded blocks during transfer operations.
A handle, with lifecycle management, for files used when fetching remote data to disk.
/**
 * A handle to a file that is used when remote data is fetched to disk.
 * Exposes the lifecycle operations of a temporary download file.
 */
public interface DownloadFile {

  /**
   * Deletes the download file and cleans up its resources.
   *
   * @return {@code true} if the file was successfully deleted, {@code false} otherwise
   */
  boolean delete();

  /**
   * Opens the file so that data can be written to it.
   *
   * @return a {@link DownloadFileWritableChannel} for writing data to the file
   * @throws IOException if the file cannot be opened for writing
   */
  DownloadFileWritableChannel openForWriting() throws IOException;

  /**
   * Returns the path to the download file.
   *
   * @return the path to the file, as a string
   */
  String path();
}

Manager for creating and cleaning up temporary download files.
/**
 * Creates temporary download files and tracks them for later cleanup.
 */
public interface DownloadFileManager {

  /**
   * Creates a temporary file that downloaded data can be written to.
   *
   * @param transportConf transport configuration supplying the file settings
   * @return a {@link DownloadFile} for the newly created temporary file
   */
  DownloadFile createTempFile(TransportConf transportConf);

  /**
   * Registers a temporary file so it is cleaned up once it is no longer needed.
   *
   * @param file the {@link DownloadFile} to register for cleanup
   * @return {@code true} if the file was successfully registered, {@code false} otherwise
   */
  boolean registerTempFileToClean(DownloadFile file);
}

Channel for writing fetched data that allows reading only after writer is closed.
/**
 * A writable channel for fetched data whose contents become readable once the
 * channel has been closed.
 */
public interface DownloadFileWritableChannel extends WritableByteChannel {

  /**
   * Closes the channel and hands back everything written as a readable buffer.
   *
   * @return a {@code ManagedBuffer} containing all written data
   * @throws IOException if closing the channel or creating the buffer fails
   */
  ManagedBuffer closeAndRead() throws IOException;

  /**
   * Writes data to the channel.
   *
   * @param src the {@link ByteBuffer} holding the data to write
   * @return the number of bytes written
   * @throws IOException if the write operation fails
   */
  @Override
  int write(ByteBuffer src) throws IOException;

  /**
   * Reports whether the channel is still open for writing.
   *
   * @return {@code true} if the channel is open, {@code false} if it is closed
   */
  @Override
  boolean isOpen();

  /**
   * Closes the channel for writing.
   *
   * @throws IOException if the close operation fails
   */
  @Override
  void close() throws IOException;
}

Simple DownloadFile implementation without encryption.
/**
 * A simple {@link DownloadFile} implementation without encryption.
 * This listing is an API summary: member bodies are omitted.
 */
public class SimpleDownloadFile implements DownloadFile {

  /**
   * Creates a simple download file.
   *
   * @param file the {@link File} instance to wrap
   * @param transportConf the transport configuration
   */
  public SimpleDownloadFile(File file, TransportConf transportConf);

  /**
   * Deletes the download file.
   *
   * @return {@code true} if the file was successfully deleted
   */
  @Override
  public boolean delete();

  /**
   * Opens the file for writing.
   *
   * @return a {@link DownloadFileWritableChannel} for writing to the file
   * @throws IOException if the file cannot be opened
   */
  @Override
  public DownloadFileWritableChannel openForWriting() throws IOException;

  /**
   * Returns the file path.
   *
   * @return the path to the file, as a string
   */
  @Override
  public String path();
}

Usage Examples:
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.spark.network.shuffle.*;
import org.apache.spark.network.util.TransportConf;
// Example 1: Basic download file usage
/**
 * Example 1: writes a short string to a temporary download file, reads it back
 * through the buffer returned by {@code closeAndRead()}, and deletes the file.
 */
public class BasicDownloadFileExample {

  /**
   * Runs the write / read-back / delete round trip, printing each step to stdout.
   *
   * @throws IOException if the temporary file cannot be created, written or read
   */
  public void demonstrateBasicUsage() throws IOException {
    // NOTE(review): confirm this single-argument TransportConf constructor exists in
    // the Spark version in use; kept exactly as in the original example.
    TransportConf conf = new TransportConf("shuffle");

    // Create a temporary file for downloading
    File tempFile = File.createTempFile("shuffle-download-", ".tmp");
    SimpleDownloadFile downloadFile = new SimpleDownloadFile(tempFile, conf);
    System.out.println("Created download file at: " + downloadFile.path());

    try {
      try (DownloadFileWritableChannel channel = downloadFile.openForWriting()) {
        // Encode with an explicit charset so the round trip does not depend on the
        // platform default.
        String testData = "This is test shuffle block data";
        ByteBuffer dataBuffer = ByteBuffer.wrap(testData.getBytes(StandardCharsets.UTF_8));

        // A WritableByteChannel may accept fewer bytes than offered, so keep writing
        // until the buffer is drained.
        int bytesWritten = 0;
        while (dataBuffer.hasRemaining()) {
          bytesWritten += channel.write(dataBuffer);
        }
        System.out.println("Wrote " + bytesWritten + " bytes to download file");

        // Close and read the data back; the buffer is released on every path.
        ManagedBuffer readBuffer = channel.closeAndRead();
        try {
          System.out.println("Read back " + readBuffer.size() + " bytes");
          try (InputStream dataStream = readBuffer.createInputStream()) {
            byte[] readData = ByteStreams.toByteArray(dataStream);
            String readString = new String(readData, StandardCharsets.UTF_8);
            System.out.println("Read data: " + readString);
          }
        } finally {
          readBuffer.release();
        }
      }
    } finally {
      // Clean up even when writing or reading failed, so the temp file never leaks.
      boolean deleted = downloadFile.delete();
      System.out.println("File deleted: " + deleted);
    }
  }
}
// Example 2: Download file manager implementation
/**
 * Example 2: a {@link DownloadFileManager} that creates temp files in one directory
 * and tracks every file it creates or is asked to register, so they can be deleted
 * in bulk with {@link #cleanupAllFiles()}.
 */
public class SimpleDownloadFileManager implements DownloadFileManager {
  private final Set<DownloadFile> managedFiles = ConcurrentHashMap.newKeySet();
  private final String tempDirPath;

  /**
   * @param tempDirPath directory in which temporary files are created; it is created
   *     on first use if it does not exist
   */
  public SimpleDownloadFileManager(String tempDirPath) {
    this.tempDirPath = tempDirPath;
  }

  /**
   * Creates a new temp file in the configured directory and registers it for cleanup.
   *
   * @param transportConf transport configuration passed to the created file
   * @return the newly created, already registered download file
   * @throws RuntimeException wrapping the {@link IOException} if the directory or the
   *     file cannot be created
   */
  @Override
  public DownloadFile createTempFile(TransportConf transportConf) {
    try {
      File tempDir = new File(tempDirPath);
      // mkdirs() returns false both on real failure and when another thread created
      // the directory first, so re-check before reporting an error.
      if (!tempDir.isDirectory() && !tempDir.mkdirs() && !tempDir.isDirectory()) {
        throw new IOException("Could not create temp directory " + tempDir);
      }
      File tempFile = File.createTempFile("shuffle-", ".tmp", tempDir);
      SimpleDownloadFile downloadFile = new SimpleDownloadFile(tempFile, transportConf);
      // Register for cleanup
      registerTempFileToClean(downloadFile);
      return downloadFile;
    } catch (IOException e) {
      throw new RuntimeException("Failed to create temp file", e);
    }
  }

  /**
   * Adds the file to the set of files deleted by {@link #cleanupAllFiles()}.
   *
   * @param file the file to track
   * @return {@code true} if the file was not already tracked
   */
  @Override
  public boolean registerTempFileToClean(DownloadFile file) {
    return managedFiles.add(file);
  }

  /**
   * Deletes every tracked file, stops tracking it, and prints how many deletions
   * succeeded.
   */
  public void cleanupAllFiles() {
    int cleanedCount = 0;
    for (DownloadFile file : managedFiles) {
      // Remove each visited entry individually instead of clear()-ing afterwards:
      // a file registered while this loop runs is then kept for the next cleanup
      // rather than being forgotten without ever being deleted. The remove() guard
      // also stops two concurrent cleanups from deleting the same file twice.
      if (managedFiles.remove(file) && file.delete()) {
        cleanedCount++;
      }
    }
    System.out.println("Cleaned up " + cleanedCount + " temporary files");
  }
}
// Example 3: Integration with block fetching
/**
 * Example 3: fetches two shuffle blocks to temporary files through a
 * {@code OneForOneBlockFetcher} and removes the files once every block has been
 * reported as fetched or failed.
 */
public class DownloadFileBlockFetchingExample {
  /** Upper bound on how long to wait for all block callbacks before cleaning up. */
  private static final long FETCH_TIMEOUT_SECONDS = 120;

  /**
   * Starts the fetch, waits for one listener callback per block (bounded by
   * {@link #FETCH_TIMEOUT_SECONDS}), then deletes the temporary files.
   */
  public void fetchBlocksToFiles() {
    TransportConf conf = new TransportConf("shuffle");
    SimpleDownloadFileManager fileManager = new SimpleDownloadFileManager("/tmp/shuffle-downloads");
    String[] blockIds = {"shuffle_1_0_0", "shuffle_1_0_1"};

    // One count per requested block; each listener callback counts down exactly once.
    final CountDownLatch pendingBlocks = new CountDownLatch(blockIds.length);

    BlockFetchingListener fileDownloadListener = new BlockFetchingListener() {
      @Override
      public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
        try {
          System.out.println("Downloaded block " + blockId + " to file, size: " + data.size());
          // Data is already written to file, just release the buffer
          // NOTE(review): confirm the listener is expected to release the buffer itself
          // rather than the fetcher doing so after this callback returns.
          data.release();
        } finally {
          pendingBlocks.countDown();
        }
      }

      @Override
      public void onBlockFetchFailure(String blockId, Throwable exception) {
        try {
          System.err.println("Failed to download block " + blockId + ": " + exception.getMessage());
        } finally {
          pendingBlocks.countDown();
        }
      }
    };

    // Create transport client
    TransportClient client = createTransportClient("shuffle-server", 7337);
    try {
      // Fetch blocks with file download
      OneForOneBlockFetcher fetcher = new OneForOneBlockFetcher(
          client, "app-001", "executor-1", blockIds,
          fileDownloadListener, conf, fileManager
      );
      // Start the fetch; results arrive later through the listener.
      fetcher.start();

      // Wait for the callbacks before cleaning up, otherwise the temp files would be
      // deleted while blocks are still being written to them.
      if (!pendingBlocks.await(FETCH_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
        System.err.println("Timed out waiting for " + pendingBlocks.getCount() + " block(s)");
      }
    } catch (InterruptedException e) {
      // Preserve the interrupt for the caller; cleanup still runs below.
      Thread.currentThread().interrupt();
    } finally {
      fileManager.cleanupAllFiles();
    }
  }
}
// Example 4: Custom download file with compression
/**
 * Example 4: a {@link DownloadFile} whose writable channel compresses the data with
 * the supplied codec before it reaches the file.
 */
public class CompressedDownloadFile implements DownloadFile {
  private final File file;
  private final TransportConf conf;
  private final CompressionCodec codec;

  /**
   * @param file the file that receives the compressed data
   * @param conf transport configuration used when building the read-back buffer
   * @param codec codec used to wrap the file's output stream
   */
  public CompressedDownloadFile(File file, TransportConf conf, CompressionCodec codec) {
    this.file = file;
    this.conf = conf;
    this.codec = codec;
  }

  /** @return {@code true} if the underlying file was deleted */
  @Override
  public boolean delete() {
    return file.delete();
  }

  /**
   * @return a channel that compresses everything written to it into the file
   * @throws IOException if the file cannot be opened for writing
   */
  @Override
  public DownloadFileWritableChannel openForWriting() throws IOException {
    return new CompressedWritableChannel(file, conf, codec);
  }

  /** @return the absolute path of the underlying file */
  @Override
  public String path() {
    return file.getAbsolutePath();
  }

  /**
   * Channel that pushes written bytes through the codec's output stream.
   * The class is static, so it keeps its own references to the file and the
   * configuration instead of reading the enclosing instance's fields.
   */
  private static class CompressedWritableChannel implements DownloadFileWritableChannel {
    private final File file;
    private final TransportConf conf;
    private final FileOutputStream fileOut;
    private final OutputStream compressedOut;
    private boolean closed = false;

    CompressedWritableChannel(File file, TransportConf conf, CompressionCodec codec)
        throws IOException {
      this.file = file;
      this.conf = conf;
      this.fileOut = new FileOutputStream(file);
      OutputStream wrapped = null;
      try {
        wrapped = codec.compressedOutputStream(fileOut);
      } finally {
        // If the codec failed to wrap the stream, do not leak the open file handle.
        if (wrapped == null) {
          fileOut.close();
        }
      }
      this.compressedOut = wrapped;
    }

    /**
     * Drains {@code src} into the compressed stream.
     *
     * @return the number of bytes taken from {@code src}
     * @throws IOException if the channel is closed or the write fails
     */
    @Override
    public int write(ByteBuffer src) throws IOException {
      if (closed) {
        throw new IOException("Channel is closed");
      }
      int bytesToWrite = src.remaining();
      byte[] buffer = new byte[bytesToWrite];
      src.get(buffer);
      compressedOut.write(buffer);
      return bytesToWrite;
    }

    @Override
    public boolean isOpen() {
      return !closed;
    }

    /** Closes both streams; calling it again has no effect. */
    @Override
    public void close() throws IOException {
      if (closed) {
        return;
      }
      // Mark closed first so a failed close cannot be followed by further writes.
      closed = true;
      try {
        compressedOut.close();
      } finally {
        // Always release the file handle, even if closing the codec stream failed.
        fileOut.close();
      }
    }

    /**
     * Closes the channel and exposes the file contents as a buffer.
     *
     * @return a buffer over the whole file; the bytes are the compressed form
     * @throws IOException if closing fails
     */
    @Override
    public ManagedBuffer closeAndRead() throws IOException {
      close();
      // Return compressed file as managed buffer
      return new FileSegmentManagedBuffer(conf, file, 0, file.length());
    }
  }
}
// Example 5: Monitoring download file operations
/**
 * Example 5: a {@link DownloadFileManager} decorator that counts created and
 * registered files and tracks which created files have not been deleted yet.
 */
public class MonitoredDownloadFileManager implements DownloadFileManager {
  private final DownloadFileManager delegate;
  private final Counter filesCreated = new Counter();
  private final Counter filesRegistered = new Counter();
  private final Gauge activeFiles;
  private final Set<DownloadFile> activeFileSet = ConcurrentHashMap.newKeySet();

  /**
   * @param delegate the manager that actually creates and registers files
   */
  public MonitoredDownloadFileManager(DownloadFileManager delegate) {
    this.delegate = delegate;
    this.activeFiles = () -> activeFileSet.size();
  }

  /**
   * Creates a file through the delegate and returns it wrapped so that its deletion
   * is observed.
   *
   * @param transportConf transport configuration forwarded to the delegate
   * @return the monitoring wrapper around the delegate's file
   */
  @Override
  public DownloadFile createTempFile(TransportConf transportConf) {
    DownloadFile monitored = new MonitoredDownloadFile(delegate.createTempFile(transportConf));
    filesCreated.inc();
    // Track the wrapper, not the wrapped file: delete() removes `this` (the wrapper),
    // so tracking the inner file would leave the entry in the set forever.
    activeFileSet.add(monitored);
    return monitored;
  }

  /**
   * Forwards the registration to the delegate and counts it when it succeeds.
   *
   * @param file the file to register for cleanup
   * @return the delegate's result
   */
  @Override
  public boolean registerTempFileToClean(DownloadFile file) {
    boolean registered = delegate.registerTempFileToClean(file);
    if (registered) {
      filesRegistered.inc();
    }
    return registered;
  }

  /** Wrapper that removes itself from the active set once it is successfully deleted. */
  private class MonitoredDownloadFile implements DownloadFile {
    private final DownloadFile delegate;

    MonitoredDownloadFile(DownloadFile delegate) {
      this.delegate = delegate;
    }

    @Override
    public boolean delete() {
      boolean deleted = delegate.delete();
      if (deleted) {
        activeFileSet.remove(this);
      }
      return deleted;
    }

    @Override
    public DownloadFileWritableChannel openForWriting() throws IOException {
      return delegate.openForWriting();
    }

    @Override
    public String path() {
      return delegate.path();
    }
  }

  /** Prints the created, registered and active file counts to stdout. */
  public void printMetrics() {
    System.out.println("Download File Metrics:");
    System.out.println(" Files Created: " + filesCreated.getCount());
    System.out.println(" Files Registered: " + filesRegistered.getCount());
    System.out.println(" Active Files: " + activeFiles.getValue());
  }
}

Resource Cleanup:
- Call `delete()` on `DownloadFile` instances when finished

Error Handling:
Performance Optimization:
Security Considerations:
Monitoring:
Key configuration parameters for file management:
- `spark.shuffle.file.buffer.size` - Buffer size for file I/O operations
- `spark.local.dir` - Local directories for temporary files
- `spark.shuffle.spill.compress` - Enable compression for spilled data
- `spark.shuffle.compress` - Enable compression for shuffle files
- `spark.io.compression.codec` - Compression codec to use