or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

authentication.md · block-fetching.md · file-management.md · index.md · mesos.md · protocol.md · shuffle-client.md · shuffle-server.md
tile.json

docs/file-management.md

File Management

Temporary file management system for handling downloaded blocks during transfer operations.

Capabilities

DownloadFile Interface

Handle for a file used when fetching remote data to disk. It covers the file's lifecycle: opening it for writing, reporting its path, and deleting it.

/**
 * Handle for a file used when fetching remote data to disk.
 *
 * <p>Exposes the three lifecycle operations on a temporary download file:
 * opening it for writing, reporting its path, and deleting it.
 */
public interface DownloadFile {
    /**
     * Deletes the download file and cleans up its resources.
     *
     * @return {@code true} if the file was successfully deleted, {@code false} otherwise
     */
    boolean delete();
    
    /**
     * Opens the file for writing.
     *
     * @return a {@link DownloadFileWritableChannel} that writes data to this file
     * @throws IOException if the file cannot be opened for writing
     */
    DownloadFileWritableChannel openForWriting() throws IOException;
    
    /**
     * Returns the path to the download file.
     *
     * @return the file path as a string
     */
    String path();
}

DownloadFileManager Interface

Manager for creating and cleaning up temporary download files.

/**
 * Manager that creates temporary download files and tracks them for cleanup.
 */
public interface DownloadFileManager {
    /**
     * Creates a temporary file for downloading data.
     *
     * @param transportConf transport configuration supplying file settings
     * @return a {@link DownloadFile} for the newly created temporary file
     */
    DownloadFile createTempFile(TransportConf transportConf);
    
    /**
     * Registers a temporary file to be cleaned up once it is no longer needed.
     *
     * @param file the download file to register for cleanup
     * @return {@code true} if the file was successfully registered, {@code false} otherwise
     */
    boolean registerTempFileToClean(DownloadFile file);
}

DownloadFileWritableChannel Interface

Channel for writing fetched data; the data can be read back only after the writer has been closed.

/**
 * Channel for writing fetched data; the written data becomes readable once the
 * channel has been closed via {@link #closeAndRead()}.
 *
 * <p>{@code write}, {@code isOpen} and {@code close} are inherited from
 * {@link WritableByteChannel} and are re-declared here only so they appear in this listing.
 */
public interface DownloadFileWritableChannel extends WritableByteChannel {
    /**
     * Closes the channel and returns the written data as a readable buffer.
     *
     * @return a {@link ManagedBuffer} containing all data written to this channel
     * @throws IOException if an error occurs during close or buffer creation
     */
    ManagedBuffer closeAndRead() throws IOException;
    
    /**
     * Writes data to the channel.
     *
     * @param src buffer containing the data to write
     * @return the number of bytes written
     * @throws IOException if the write operation fails
     */
    @Override
    int write(ByteBuffer src) throws IOException;
    
    /**
     * Reports whether the channel is still open for writing.
     *
     * @return {@code true} if the channel is open, {@code false} if it has been closed
     */
    @Override
    boolean isOpen();
    
    /**
     * Closes the channel for writing.
     *
     * @throws IOException if the close operation fails
     */
    @Override
    void close() throws IOException;
}

SimpleDownloadFile Implementation

Simple DownloadFile implementation without encryption.

/**
 * Simple {@link DownloadFile} implementation without encryption.
 *
 * <p>This is an API summary: only the signatures are listed and the method
 * bodies are omitted.
 */
public class SimpleDownloadFile implements DownloadFile {
    /**
     * Creates a simple download file wrapping the given file.
     *
     * @param file the file instance to wrap
     * @param transportConf the transport configuration
     */
    public SimpleDownloadFile(File file, TransportConf transportConf);
    
    /**
     * Deletes the download file.
     *
     * @return {@code true} if the file was successfully deleted
     */
    @Override
    public boolean delete();
    
    /**
     * Opens the file for writing.
     *
     * @return a {@link DownloadFileWritableChannel} for writing to the file
     * @throws IOException if the file cannot be opened
     */
    @Override
    public DownloadFileWritableChannel openForWriting() throws IOException;
    
    /**
     * Returns the file path.
     *
     * @return the path to the file as a string
     */
    @Override
    public String path();
}

Usage Examples:

import org.apache.spark.network.shuffle.*;
import org.apache.spark.network.util.TransportConf;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Example 1: Basic download file usage
/**
 * Walks through the full lifecycle of a {@link SimpleDownloadFile}: create,
 * write, read back, delete.
 */
public class BasicDownloadFileExample {
    /**
     * Writes a short text payload to a temporary download file, reads it back
     * through the {@link ManagedBuffer} returned by {@code closeAndRead()},
     * and deletes the file.
     *
     * @throws IOException if the temporary file cannot be created, written, or read
     */
    public void demonstrateBasicUsage() throws IOException {
        TransportConf conf = new TransportConf("shuffle");
        
        // Create a temporary file for downloading
        File tempFile = File.createTempFile("shuffle-download-", ".tmp");
        SimpleDownloadFile downloadFile = new SimpleDownloadFile(tempFile, conf);
        
        System.out.println("Created download file at: " + downloadFile.path());
        
        try {
            // closeAndRead() below closes the channel; the try-with-resources close
            // that follows relies on Channel.close() being a no-op when already closed.
            try (DownloadFileWritableChannel channel = downloadFile.openForWriting()) {
                String testData = "This is test shuffle block data";
                // Encode explicitly: the platform default charset differs between hosts.
                ByteBuffer dataBuffer = ByteBuffer.wrap(testData.getBytes(StandardCharsets.UTF_8));
                
                // A WritableByteChannel may accept only part of the buffer per call,
                // so keep writing until it is drained.
                int bytesWritten = 0;
                while (dataBuffer.hasRemaining()) {
                    bytesWritten += channel.write(dataBuffer);
                }
                System.out.println("Wrote " + bytesWritten + " bytes to download file");
                
                // Close and read the data back
                ManagedBuffer readBuffer = channel.closeAndRead();
                try {
                    System.out.println("Read back " + readBuffer.size() + " bytes");
                    
                    try (InputStream dataStream = readBuffer.createInputStream()) {
                        byte[] readData = ByteStreams.toByteArray(dataStream);
                        String readString = new String(readData, StandardCharsets.UTF_8);
                        System.out.println("Read data: " + readString);
                    }
                } finally {
                    // Release even if size() or the read fails.
                    readBuffer.release();
                }
            }
        } finally {
            // Delete in finally so a failed write or read does not leave the temp file behind.
            boolean deleted = downloadFile.delete();
            System.out.println("File deleted: " + deleted);
        }
    }
}

// Example 2: Download file manager implementation
/**
 * {@link DownloadFileManager} that creates temporary files under a single
 * directory and remembers them in a concurrent set so they can be deleted in bulk.
 */
public class SimpleDownloadFileManager implements DownloadFileManager {
    private final Set<DownloadFile> managedFiles = ConcurrentHashMap.newKeySet();
    private final String tempDirPath;
    
    /**
     * @param tempDirPath directory in which temporary files are created; it is
     *                    created on first use if it does not exist
     */
    public SimpleDownloadFileManager(String tempDirPath) {
        this.tempDirPath = tempDirPath;
    }
    
    /**
     * Creates a {@code shuffle-*.tmp} file in the configured directory and
     * registers it for cleanup.
     *
     * @param transportConf transport configuration passed to the new {@link SimpleDownloadFile}
     * @return the newly created, already registered download file
     * @throws RuntimeException wrapping the {@link IOException} if the directory
     *                          or the file cannot be created
     */
    @Override
    public DownloadFile createTempFile(TransportConf transportConf) {
        try {
            File tempDir = new File(tempDirPath);
            // mkdirs() also returns false when another thread created the directory
            // first, so re-check isDirectory() before reporting a failure.
            if (!tempDir.isDirectory() && !tempDir.mkdirs() && !tempDir.isDirectory()) {
                throw new IOException("Could not create temp directory: " + tempDirPath);
            }
            
            File tempFile = File.createTempFile("shuffle-", ".tmp", tempDir);
            SimpleDownloadFile downloadFile = new SimpleDownloadFile(tempFile, transportConf);
            
            // Register for cleanup
            registerTempFileToClean(downloadFile);
            
            return downloadFile;
        } catch (IOException e) {
            throw new RuntimeException("Failed to create temp file", e);
        }
    }
    
    /**
     * Adds the file to the set of files deleted by {@link #cleanupAllFiles()}.
     *
     * @param file the download file to track
     * @return {@code true} if the file was not already registered
     */
    @Override
    public boolean registerTempFileToClean(DownloadFile file) {
        return managedFiles.add(file);
    }
    
    /**
     * Deletes every registered file seen during this pass and prints how many
     * deletions succeeded.
     *
     * <p>Each file is removed from the set individually instead of clearing the
     * set afterwards, so a file registered while cleanup is running is never
     * dropped without being deleted; it stays registered for the next call.
     */
    public void cleanupAllFiles() {
        int cleanedCount = 0;
        for (DownloadFile file : managedFiles) {
            // remove() returns true for exactly one caller, so concurrent cleanups
            // never delete the same file twice.
            if (managedFiles.remove(file) && file.delete()) {
                cleanedCount++;
            }
        }
        System.out.println("Cleaned up " + cleanedCount + " temporary files");
    }
}

// Example 3: Integration with block fetching
/**
 * Shows how a {@link DownloadFileManager} is passed to a
 * {@link OneForOneBlockFetcher} so fetched blocks are written to temporary files.
 */
public class DownloadFileBlockFetchingExample {
    /**
     * Starts a fetch of two shuffle blocks, waits until the listener has been
     * called for each of them (or a timeout expires), then deletes the temporary files.
     *
     * <p>Results arrive through the listener callbacks, so cleanup must not run
     * straight after {@code start()}: it could delete files still being written.
     */
    public void fetchBlocksToFiles() {
        TransportConf conf = new TransportConf("shuffle");
        SimpleDownloadFileManager fileManager = new SimpleDownloadFileManager("/tmp/shuffle-downloads");
        
        String[] blockIds = {"shuffle_1_0_0", "shuffle_1_0_1"};
        // One count per block; each success or failure callback releases one.
        final CountDownLatch pendingBlocks = new CountDownLatch(blockIds.length);
        
        BlockFetchingListener fileDownloadListener = new BlockFetchingListener() {
            @Override
            public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
                try {
                    System.out.println("Downloaded block " + blockId + " to file, size: " + data.size());
                    
                    // Data is already written to file, just release the buffer
                    data.release();
                } finally {
                    pendingBlocks.countDown();
                }
            }
            
            @Override
            public void onBlockFetchFailure(String blockId, Throwable exception) {
                System.err.println("Failed to download block " + blockId + ": " + exception.getMessage());
                pendingBlocks.countDown();
            }
        };
        
        // Create transport client
        // NOTE(review): createTransportClient is not defined in this example; who
        // closes the client depends on that helper — confirm.
        TransportClient client = createTransportClient("shuffle-server", 7337);
        
        try {
            // Fetch blocks with file download
            OneForOneBlockFetcher fetcher = new OneForOneBlockFetcher(
                client, "app-001", "executor-1", blockIds, 
                fileDownloadListener, conf, fileManager
            );
            
            // Start the fetch
            fetcher.start();
            
            // Bounded wait so a lost callback cannot hang the caller; 60 seconds
            // is an example value, not a recommended setting.
            if (!pendingBlocks.await(60, TimeUnit.SECONDS)) {
                System.err.println("Timed out waiting for block downloads to finish");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag for callers; cleanup still runs below.
            Thread.currentThread().interrupt();
        } finally {
            // Clean up temporary files once the fetch has finished or been abandoned
            fileManager.cleanupAllFiles();
        }
    }
}

// Example 4: Custom download file with compression
/**
 * {@link DownloadFile} whose writable channel compresses data with a
 * {@link CompressionCodec} before it reaches the underlying file.
 */
public class CompressedDownloadFile implements DownloadFile {
    private final File file;
    private final TransportConf conf;
    private final CompressionCodec codec;
    
    /**
     * @param file  file that receives the compressed bytes
     * @param conf  transport configuration used when building the read-back buffer
     * @param codec codec that wraps the file output stream
     */
    public CompressedDownloadFile(File file, TransportConf conf, CompressionCodec codec) {
        this.file = file;
        this.conf = conf;
        this.codec = codec;
    }
    
    /**
     * Deletes the underlying file.
     *
     * @return {@code true} if the file was deleted
     */
    @Override
    public boolean delete() {
        return file.delete();
    }
    
    /**
     * Opens a new compressing channel on the underlying file.
     *
     * @return a channel that compresses everything written to it
     * @throws IOException if the file cannot be opened
     */
    @Override
    public DownloadFileWritableChannel openForWriting() throws IOException {
        return new CompressedWritableChannel(file, conf, codec);
    }
    
    /**
     * @return the absolute path of the underlying file
     */
    @Override
    public String path() {
        return file.getAbsolutePath();
    }
    
    /**
     * Channel that pushes written bytes through the codec's output stream.
     *
     * <p>The class is static, so it cannot see the outer instance's fields; the
     * file and configuration needed by {@link #closeAndRead()} are passed in.
     */
    private static class CompressedWritableChannel implements DownloadFileWritableChannel {
        private final File file;
        private final TransportConf conf;
        private final FileOutputStream fileOut;
        private final OutputStream compressedOut;
        private boolean closed = false;
        
        CompressedWritableChannel(File file, TransportConf conf, CompressionCodec codec)
                throws IOException {
            this.file = file;
            this.conf = conf;
            this.fileOut = new FileOutputStream(file);
            OutputStream wrapped;
            try {
                wrapped = codec.compressedOutputStream(fileOut);
            } catch (Throwable t) {
                // Do not leak the file handle if the codec cannot wrap the stream.
                try {
                    fileOut.close();
                } catch (IOException closeFailure) {
                    t.addSuppressed(closeFailure);
                }
                throw t;
            }
            this.compressedOut = wrapped;
        }
        
        /**
         * Writes all remaining bytes of {@code src} through the compressor.
         *
         * @param src buffer to drain
         * @return the number of uncompressed bytes consumed from {@code src}
         * @throws IOException if the channel is closed or the write fails
         */
        @Override
        public int write(ByteBuffer src) throws IOException {
            if (closed) throw new IOException("Channel is closed");
            
            int bytesToWrite = src.remaining();
            byte[] buffer = new byte[bytesToWrite];
            src.get(buffer);
            compressedOut.write(buffer);
            return bytesToWrite;
        }
        
        @Override
        public boolean isOpen() {
            return !closed;
        }
        
        /**
         * Closes the compressor and the file stream; later calls do nothing.
         *
         * @throws IOException if closing either stream fails
         */
        @Override
        public void close() throws IOException {
            if (closed) {
                return;
            }
            // Mark closed first so a failing close is not retried on a broken stream.
            closed = true;
            try {
                compressedOut.close();
            } finally {
                // Close the file stream even when the compressor's close throws.
                fileOut.close();
            }
        }
        
        /**
         * Closes the channel and exposes the file contents as a buffer.
         *
         * @return a buffer over the whole file; its bytes are the compressed form
         *         as written, not the original payload
         * @throws IOException if closing the channel fails
         */
        @Override
        public ManagedBuffer closeAndRead() throws IOException {
            close();
            // Return compressed file as managed buffer
            return new FileSegmentManagedBuffer(conf, file, 0, file.length());
        }
    }
}

// Example 5: Monitoring download file operations
/**
 * {@link DownloadFileManager} decorator that counts created and registered
 * files and tracks how many created files have not yet been deleted.
 */
public class MonitoredDownloadFileManager implements DownloadFileManager {
    private final DownloadFileManager delegate;
    private final Counter filesCreated = new Counter();
    private final Counter filesRegistered = new Counter();
    private final Gauge activeFiles;
    // Holds the MonitoredDownloadFile wrappers handed to callers, not the delegate's files.
    private final Set<DownloadFile> activeFileSet = ConcurrentHashMap.newKeySet();
    
    /**
     * @param delegate manager that actually creates and registers the files
     */
    public MonitoredDownloadFileManager(DownloadFileManager delegate) {
        this.delegate = delegate;
        this.activeFiles = () -> activeFileSet.size();
    }
    
    /**
     * Creates a file through the delegate and returns it wrapped so that its
     * deletion is observed.
     *
     * @param transportConf transport configuration forwarded to the delegate
     * @return a wrapper around the delegate's file
     */
    @Override
    public DownloadFile createTempFile(TransportConf transportConf) {
        DownloadFile monitored = new MonitoredDownloadFile(delegate.createTempFile(transportConf));
        filesCreated.inc();
        // Track the wrapper itself: delete() removes 'this', so storing the
        // unwrapped file would leave the entry in the set forever.
        activeFileSet.add(monitored);
        return monitored;
    }
    
    /**
     * Forwards the registration to the delegate and counts it if accepted.
     *
     * @param file the download file to register for cleanup
     * @return the delegate's result
     */
    @Override
    public boolean registerTempFileToClean(DownloadFile file) {
        boolean registered = delegate.registerTempFileToClean(file);
        if (registered) {
            filesRegistered.inc();
        }
        return registered;
    }
    
    /**
     * Wrapper that forwards every call and removes itself from the active set
     * once the underlying file has been deleted.
     */
    private class MonitoredDownloadFile implements DownloadFile {
        private final DownloadFile delegate;
        
        MonitoredDownloadFile(DownloadFile delegate) {
            this.delegate = delegate;
        }
        
        @Override
        public boolean delete() {
            boolean deleted = delegate.delete();
            if (deleted) {
                activeFileSet.remove(this);
            }
            return deleted;
        }
        
        @Override
        public DownloadFileWritableChannel openForWriting() throws IOException {
            return delegate.openForWriting();
        }
        
        @Override
        public String path() {
            return delegate.path();
        }
    }
    
    /**
     * Prints the created, registered and active file counts to standard output.
     */
    public void printMetrics() {
        System.out.println("Download File Metrics:");
        System.out.println("  Files Created: " + filesCreated.getCount());
        System.out.println("  Files Registered: " + filesRegistered.getCount());
        System.out.println("  Active Files: " + activeFiles.getValue());
    }
}

File Management Best Practices

  1. Resource Cleanup:

    • Always call delete() on DownloadFile instances when finished
    • Use try-with-resources for DownloadFileWritableChannel
    • Implement proper cleanup in DownloadFileManager
  2. Error Handling:

    • Handle IOException during file operations gracefully
    • Implement retry logic for transient file system errors
    • Monitor disk space and handle out-of-space conditions
  3. Performance Optimization:

    • Use appropriate buffer sizes for file I/O
    • Consider compression for large blocks
    • Implement file pooling for high-frequency operations
  4. Security Considerations:

    • Create temporary files in secure directories
    • Set appropriate file permissions
    • Clean up sensitive data from temporary files
  5. Monitoring:

    • Track temporary file creation and cleanup
    • Monitor disk usage in temporary directories
    • Alert on excessive temporary file accumulation

Configuration Parameters

Key configuration parameters for file management:

  • spark.shuffle.file.buffer - Size of the in-memory buffer for each shuffle file output stream
  • spark.local.dir - Local directories for temporary files
  • spark.shuffle.spill.compress - Enable compression for spilled data
  • spark.shuffle.compress - Enable compression for shuffle files
  • spark.io.compression.codec - Compression codec to use