Spark Project Shuffle Streaming Service — provides network shuffle functionality for Apache Spark's distributed computing engine.
Temporary file management system for handling downloaded blocks during transfer operations.
Handle for a file used when fetching remote data to disk, with lifecycle management.
/**
 * A handle on a file that remote data is fetched into when it is streamed to disk.
 *
 * <p>Implementations own the lifecycle of that temporary file: opening it for writing,
 * exposing where it lives, and removing it once it is no longer needed.
 */
public interface DownloadFile {
  /**
   * Opens the backing file so that fetched data can be written into it.
   *
   * @return a {@link DownloadFileWritableChannel} that writes into this file
   * @throws IOException if the file cannot be opened for writing
   */
  DownloadFileWritableChannel openForWriting() throws IOException;

  /**
   * Returns the location of the backing file.
   *
   * @return the file's path as a string
   */
  String path();

  /**
   * Removes the backing file and cleans up any associated resources.
   *
   * @return {@code true} when the file was deleted, {@code false} when it was not
   */
  boolean delete();
}
Manager for creating and cleaning up temporary download files.
/**
 * Creates temporary {@link DownloadFile}s for fetched data and keeps track of the ones that
 * should be cleaned up later.
 */
public interface DownloadFileManager {
  /**
   * Create a temporary file that fetched data can be downloaded into.
   *
   * @param transportConf transport configuration used when setting up the file
   * @return a {@link DownloadFile} for the newly created temporary file
   */
  DownloadFile createTempFile(TransportConf transportConf);

  /**
   * Register a temporary file so that this manager cleans it up once it is no longer needed.
   *
   * <p>Only successfully registered files are cleaned up by the manager. When this method
   * returns {@code false} the manager has not taken responsibility for {@code file}, so the
   * caller should delete it itself (for example via {@link DownloadFile#delete()}) to avoid
   * leaking it on disk.
   *
   * @param file the download file to register for cleanup
   * @return {@code true} if the file was registered, {@code false} otherwise
   */
  boolean registerTempFileToClean(DownloadFile file);
}
Channel for writing fetched data that allows reading only after writer is closed.
/**
 * A {@link WritableByteChannel} that fetched data is written into. Its contents can be read
 * back only after writing has finished and the channel has been closed.
 */
public interface DownloadFileWritableChannel extends WritableByteChannel {
  /**
   * Writes the remaining bytes of {@code src} into this channel.
   *
   * @param src buffer holding the data to write
   * @return how many bytes were written
   * @throws IOException if the write fails
   */
  @Override
  int write(ByteBuffer src) throws IOException;

  /**
   * Reports whether this channel still accepts writes.
   *
   * @return {@code true} while the channel is open, {@code false} once it has been closed
   */
  @Override
  boolean isOpen();

  /**
   * Stops writing and closes this channel.
   *
   * @throws IOException if closing fails
   */
  @Override
  void close() throws IOException;

  /**
   * Finishes writing by closing this channel, then exposes everything written as a buffer.
   *
   * @return a {@link ManagedBuffer} holding all data written to this channel
   * @throws IOException if closing the channel or creating the buffer fails
   */
  ManagedBuffer closeAndRead() throws IOException;
}
Simple DownloadFile implementation without encryption.
/**
* Simple DownloadFile implementation without encryption, wrapping a local {@code File}.
*
* NOTE(review): this is an API signature listing only; the constructor and method bodies
* are omitted, so it does not compile as written.
*/
public class SimpleDownloadFile implements DownloadFile {
/**
* Create a simple download file backed by {@code file}.
* @param file - File instance to wrap
* @param transportConf - Transport configuration kept for use by this download file
*/
public SimpleDownloadFile(File file, TransportConf transportConf);
/**
* Delete the wrapped file from disk.
* @return true if the file was successfully deleted, false otherwise
*/
@Override
public boolean delete();
/**
* Open the wrapped file for writing fetched data.
* @return DownloadFileWritableChannel for writing to the file
* @throws IOException if the file cannot be opened
*/
@Override
public DownloadFileWritableChannel openForWriting() throws IOException;
/**
* Get the path of the wrapped file.
* @return String path to the file
*/
@Override
public String path();
}Usage Examples:
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.spark.network.shuffle.*;
import org.apache.spark.network.util.MapConfigProvider;
import org.apache.spark.network.util.TransportConf;
// Example 1: Basic download file usage
public class BasicDownloadFileExample {
  /**
   * Writes a small payload into a {@link SimpleDownloadFile}, reads it back through
   * {@link DownloadFileWritableChannel#closeAndRead()}, and deletes the file afterwards.
   *
   * @throws IOException if the temporary file cannot be created, written, or read
   */
  public void demonstrateBasicUsage() throws IOException {
    // TransportConf requires a ConfigProvider; an empty one yields the default settings.
    TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);
    // Create a temporary file for downloading
    File tempFile = File.createTempFile("shuffle-download-", ".tmp");
    SimpleDownloadFile downloadFile = new SimpleDownloadFile(tempFile, conf);
    System.out.println("Created download file at: " + downloadFile.path());
    try {
      // closeAndRead() closes the channel; the extra close() from try-with-resources is a
      // no-op per the java.nio.channels.Channel#close contract.
      try (DownloadFileWritableChannel channel = downloadFile.openForWriting()) {
        String testData = "This is test shuffle block data";
        // Explicit charset so the round trip does not depend on the platform default.
        ByteBuffer dataBuffer = ByteBuffer.wrap(testData.getBytes(StandardCharsets.UTF_8));
        // WritableByteChannel#write may write fewer bytes than requested; loop until done.
        int bytesWritten = 0;
        while (dataBuffer.hasRemaining()) {
          bytesWritten += channel.write(dataBuffer);
        }
        System.out.println("Wrote " + bytesWritten + " bytes to download file");
        // Close and read the data back
        ManagedBuffer readBuffer = channel.closeAndRead();
        try {
          System.out.println("Read back " + readBuffer.size() + " bytes");
          try (InputStream dataStream = readBuffer.createInputStream()) {
            byte[] readData = readFully(dataStream);
            String readString = new String(readData, StandardCharsets.UTF_8);
            System.out.println("Read data: " + readString);
          }
        } finally {
          readBuffer.release();
        }
      }
    } finally {
      // Always remove the temporary file, even when writing or reading failed.
      boolean deleted = downloadFile.delete();
      System.out.println("File deleted: " + deleted);
    }
  }

  /** Reads {@code in} to end-of-stream and returns every byte read. */
  private static byte[] readFully(InputStream in) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    byte[] chunk = new byte[8192];
    int n;
    while ((n = in.read(chunk)) != -1) {
      out.write(chunk, 0, n);
    }
    return out.toByteArray();
  }
}
// Example 2: Download file manager implementation
public class SimpleDownloadFileManager implements DownloadFileManager {
  private final Set<DownloadFile> managedFiles = ConcurrentHashMap.newKeySet();
  private final String tempDirPath;

  /**
   * @param tempDirPath directory in which temporary download files are created; it is created
   *     on demand if it does not exist
   * @throws NullPointerException if {@code tempDirPath} is null
   */
  public SimpleDownloadFileManager(String tempDirPath) {
    this.tempDirPath = Objects.requireNonNull(tempDirPath, "tempDirPath");
  }

  /**
   * Creates a new temporary file under {@code tempDirPath}.
   *
   * <p>The file is deliberately NOT registered here. Callers register it through
   * {@link #registerTempFileToClean(DownloadFile)}. Registering it here as well would make
   * that later call return {@code false}, because the set already holds the file, and callers
   * may treat that as a failed registration.
   *
   * @throws UncheckedIOException if the directory or the file cannot be created
   */
  @Override
  public DownloadFile createTempFile(TransportConf transportConf) {
    try {
      File tempDir = new File(tempDirPath);
      // mkdirs() also returns false when the directory already exists (or another thread just
      // created it), so only fail if it is still not a directory afterwards.
      if (!tempDir.mkdirs() && !tempDir.isDirectory()) {
        throw new IOException("Failed to create temp directory " + tempDir.getAbsolutePath());
      }
      File tempFile = File.createTempFile("shuffle-", ".tmp", tempDir);
      return new SimpleDownloadFile(tempFile, transportConf);
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to create temp file", e);
    }
  }

  /**
   * Tracks {@code file} for deletion by {@link #cleanupAllFiles()}.
   *
   * @return {@code true} if the file was newly registered, {@code false} if it already was
   */
  @Override
  public boolean registerTempFileToClean(DownloadFile file) {
    return managedFiles.add(file);
  }

  /**
   * Deletes every registered file and stops tracking it. Files whose deletion fails are also
   * dropped from tracking and are not counted.
   */
  public void cleanupAllFiles() {
    int cleanedCount = 0;
    for (DownloadFile file : managedFiles) {
      // Remove entries one by one instead of clear()-ing afterwards. A file registered while
      // this loop runs is then either deleted here or kept for the next cleanup, never silently
      // dropped. remove() returning true also prevents two concurrent cleanups from both
      // deleting the same file.
      if (managedFiles.remove(file) && file.delete()) {
        cleanedCount++;
      }
    }
    System.out.println("Cleaned up " + cleanedCount + " temporary files");
  }
}
// Example 3: Integration with block fetching
public class DownloadFileBlockFetchingExample {
  /** Upper bound on how long to wait for all fetches before cleaning up anyway. */
  private static final long FETCH_TIMEOUT_SECONDS = 120;

  /**
   * Fetches two shuffle blocks into temporary download files. The files are deleted once every
   * block has reported success or failure, or once the timeout expires.
   */
  public void fetchBlocksToFiles() {
    // TransportConf requires a ConfigProvider; an empty one yields the default settings.
    TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);
    SimpleDownloadFileManager fileManager = new SimpleDownloadFileManager("/tmp/shuffle-downloads");
    String[] blockIds = {"shuffle_1_0_0", "shuffle_1_0_1"};
    // One count per block; each listener callback counts down exactly once.
    CountDownLatch pendingBlocks = new CountDownLatch(blockIds.length);
    BlockFetchingListener fileDownloadListener = new BlockFetchingListener() {
      @Override
      public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
        try {
          System.out.println("Downloaded block " + blockId + " to file, size: " + data.size());
          // Data is already written to file, just release the buffer
          data.release();
        } finally {
          pendingBlocks.countDown();
        }
      }

      @Override
      public void onBlockFetchFailure(String blockId, Throwable exception) {
        try {
          System.err.println("Failed to download block " + blockId + ": " + exception.getMessage());
        } finally {
          pendingBlocks.countDown();
        }
      }
    };
    // NOTE(review): createTransportClient is not defined in this snippet; supply a client,
    // e.g. one obtained from a TransportClientFactory.
    TransportClient client = createTransportClient("shuffle-server", 7337);
    // Fetch blocks with file download
    OneForOneBlockFetcher fetcher = new OneForOneBlockFetcher(
        client, "app-001", "executor-1", blockIds,
        fileDownloadListener, conf, fileManager);
    try {
      fetcher.start();
      // Completion is reported through the listener callbacks. Wait for all of them before
      // deleting files the fetcher may still be writing.
      if (!pendingBlocks.await(FETCH_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
        System.err.println("Timed out waiting for block fetches; cleaning up anyway");
      }
    } catch (InterruptedException e) {
      // Restore the interrupt flag for callers; cleanup still runs below.
      Thread.currentThread().interrupt();
    } finally {
      // NOTE(review): if the fetcher registers a file only after its success callback returns,
      // a late registration could survive this cleanup. Confirm the ordering in the fetcher.
      fileManager.cleanupAllFiles();
    }
  }
}
// Example 4: Custom download file with compression
/**
 * A {@link DownloadFile} that compresses fetched data with a {@link CompressionCodec} as it is
 * written to {@code file}.
 *
 * <p>NOTE: {@code closeAndRead()} on the channel returned by {@link #openForWriting()} exposes
 * the compressed bytes as stored on disk, not the bytes passed to {@code write}. Consumers
 * must decompress them with the same codec.
 */
public class CompressedDownloadFile implements DownloadFile {
  private final File file;
  private final TransportConf conf;
  private final CompressionCodec codec;

  /**
   * @param file local file that compressed bytes are written to
   * @param conf transport configuration passed to the buffer returned by closeAndRead()
   * @param codec codec used to compress data as it is written
   */
  public CompressedDownloadFile(File file, TransportConf conf, CompressionCodec codec) {
    this.file = file;
    this.conf = conf;
    this.codec = codec;
  }

  @Override
  public boolean delete() {
    return file.delete();
  }

  @Override
  public DownloadFileWritableChannel openForWriting() throws IOException {
    return new CompressedWritableChannel(file, conf, codec);
  }

  @Override
  public String path() {
    return file.getAbsolutePath();
  }

  /**
   * Channel that compresses everything written to it into {@code file}. It is a static nested
   * class, so the file and configuration are passed in explicitly rather than read from the
   * enclosing instance.
   */
  private static class CompressedWritableChannel implements DownloadFileWritableChannel {
    private final File file;
    private final TransportConf conf;
    private final FileOutputStream fileOut;
    private final OutputStream compressedOut;
    private boolean closed = false;

    CompressedWritableChannel(File file, TransportConf conf, CompressionCodec codec)
        throws IOException {
      this.file = file;
      this.conf = conf;
      this.fileOut = new FileOutputStream(file);
      OutputStream wrapped;
      try {
        wrapped = codec.compressedOutputStream(fileOut);
      } catch (Throwable t) {
        // Do not leak the file handle if the codec cannot wrap the stream.
        try {
          fileOut.close();
        } catch (IOException suppressed) {
          t.addSuppressed(suppressed);
        }
        throw t;
      }
      this.compressedOut = wrapped;
    }

    @Override
    public int write(ByteBuffer src) throws IOException {
      if (closed) {
        // WritableByteChannel#write specifies ClosedChannelException for a closed channel.
        throw new ClosedChannelException();
      }
      int bytesToWrite = src.remaining();
      byte[] buffer = new byte[bytesToWrite];
      src.get(buffer);
      compressedOut.write(buffer);
      return bytesToWrite;
    }

    @Override
    public boolean isOpen() {
      return !closed;
    }

    @Override
    public void close() throws IOException {
      if (closed) {
        return;
      }
      closed = true;
      try {
        // Closing the codec stream is expected to flush its buffered compressed output.
        compressedOut.close();
      } finally {
        // Close the file stream directly as well, so it is released even if the codec stream's
        // close() failed or did not propagate.
        fileOut.close();
      }
    }

    @Override
    public ManagedBuffer closeAndRead() throws IOException {
      close();
      // Return compressed file as managed buffer (raw compressed bytes; see class docs).
      return new FileSegmentManagedBuffer(conf, file, 0, file.length());
    }
  }
}
// Example 5: Monitoring download file operations
/**
 * A {@link DownloadFileManager} decorator that counts created and registered files. It also
 * exposes a gauge of files that have been created through it and not yet deleted.
 */
public class MonitoredDownloadFileManager implements DownloadFileManager {
  private final DownloadFileManager delegate;
  private final Counter filesCreated = new Counter();
  private final Counter filesRegistered = new Counter();
  private final Gauge<Integer> activeFiles;
  // Holds the wrappers returned by createTempFile(). Those are the objects callers delete, and
  // MonitoredDownloadFile.delete() removes itself from this set.
  private final Set<DownloadFile> activeFileSet = ConcurrentHashMap.newKeySet();

  /**
   * @param delegate manager that actually creates and registers the files
   * @throws NullPointerException if {@code delegate} is null
   */
  public MonitoredDownloadFileManager(DownloadFileManager delegate) {
    this.delegate = Objects.requireNonNull(delegate, "delegate");
    this.activeFiles = activeFileSet::size;
  }

  @Override
  public DownloadFile createTempFile(TransportConf transportConf) {
    // Wrap the file to monitor deletion, and track the wrapper itself. Tracking the delegate's
    // file would never match the `this` removed in MonitoredDownloadFile.delete().
    DownloadFile file = new MonitoredDownloadFile(delegate.createTempFile(transportConf));
    filesCreated.inc();
    activeFileSet.add(file);
    return file;
  }

  @Override
  public boolean registerTempFileToClean(DownloadFile file) {
    boolean registered = delegate.registerTempFileToClean(file);
    if (registered) {
      filesRegistered.inc();
    }
    return registered;
  }

  /** Forwards to the wrapped file and drops itself from the active set once deleted. */
  private class MonitoredDownloadFile implements DownloadFile {
    private final DownloadFile wrapped;

    MonitoredDownloadFile(DownloadFile wrapped) {
      this.wrapped = wrapped;
    }

    @Override
    public boolean delete() {
      boolean deleted = wrapped.delete();
      if (deleted) {
        activeFileSet.remove(this);
      }
      return deleted;
    }

    @Override
    public DownloadFileWritableChannel openForWriting() throws IOException {
      return wrapped.openForWriting();
    }

    @Override
    public String path() {
      return wrapped.path();
    }
  }

  /** Prints the current counter and gauge values to standard output. */
  public void printMetrics() {
    System.out.println("Download File Metrics:");
    System.out.println(" Files Created: " + filesCreated.getCount());
    System.out.println(" Files Registered: " + filesRegistered.getCount());
    System.out.println(" Active Files: " + activeFiles.getValue());
  }
}
Resource Cleanup:
Always call delete() on DownloadFile instances when finished.
Error Handling:
Performance Optimization:
Security Considerations:
Monitoring:
Key configuration parameters for file management:
spark.shuffle.file.buffer - Size of the in-memory buffer for each shuffle file output stream
spark.local.dir - Local directories for temporary files
spark.shuffle.spill.compress - Enable compression for spilled data
spark.shuffle.compress - Enable compression for shuffle files
spark.io.compression.codec - Compression codec to use
spark.maxRemoteBlockSizeFetchToMem - Remote blocks larger than this threshold (in bytes) are fetched to disk instead of memory
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-spark--spark-network-shuffle-2-11