tessl/maven-org-apache-flink--flink-connector-files

Apache Flink file connector library for unified file processing in batch and streaming modes, with support for various formats, compression, and distributed processing.

docs/file-compaction.md

File Compaction

File compaction provides a system for merging small files to improve performance, reduce metadata overhead, and optimize storage efficiency in distributed file systems.

Capabilities

FileCompactor Interface

Core interface for implementing file compaction logic.

/**
 * Interface for implementing file compaction operations
 */
public interface FileCompactor {
    /**
     * Compacts multiple input files into a single output file
     * @param inputFiles List of input file paths to compact
     * @param outputFile Target path for the compacted output file
     * @throws Exception If compaction fails
     */
    void compact(List<Path> inputFiles, Path outputFile) throws Exception;
}
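The contract above can be sketched with a minimal concatenating implementation. This is an illustration only, not the connector's `ConcatFileCompactor`: it uses `java.nio.file.Path` instead of Flink's `Path` so the sketch is self-contained, and the class name is hypothetical.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

// Hypothetical stand-in for the FileCompactor contract, using java.nio paths
// rather than Flink's Path so the sketch compiles without the connector.
public class SimpleConcatCompactor {

    // Concatenates the bytes of each input file into the output file, in order.
    public static void compact(List<Path> inputFiles, Path outputFile) throws IOException {
        try (OutputStream out = Files.newOutputStream(
                outputFile, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)) {
            for (Path input : inputFiles) {
                Files.copy(input, out); // stream each input's content to the output
            }
        }
    }
}
```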

FileCompactStrategy Interface

Interface defining when and how file compaction should be triggered.

/**
 * Strategy interface for controlling when file compaction occurs
 */
public interface FileCompactStrategy {
    /**
     * Size threshold for triggering compaction
     * @return Size threshold in bytes
     */
    long getSizeThreshold();
    
    /**
     * Number of checkpoints before compaction is triggered
     * @return Checkpoint count threshold
     */
    int getNumCheckpointsBeforeCompaction();
    
    /**
     * Number of threads to use for compaction operations
     * @return Thread count for parallel compaction
     */
    int getNumCompactThreads();
}
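The first two values act as triggers: compaction fires when either the accumulated size of pending files or the number of checkpoints since the last compaction crosses its threshold, whichever comes first. A minimal sketch of that decision, with hypothetical names (in the real sink this check happens internally, not in user code):

```java
// Illustrative only: how the strategy's thresholds might combine into a
// trigger decision. The class and method names are hypothetical.
public class CompactionTrigger {
    private final long sizeThreshold;                 // bytes of pending small files
    private final int numCheckpointsBeforeCompaction; // checkpoints between compactions

    public CompactionTrigger(long sizeThreshold, int numCheckpointsBeforeCompaction) {
        this.sizeThreshold = sizeThreshold;
        this.numCheckpointsBeforeCompaction = numCheckpointsBeforeCompaction;
    }

    // Fires when either threshold is reached, whichever comes first.
    public boolean shouldCompact(long pendingBytes, int checkpointsSinceLastCompaction) {
        return pendingBytes >= sizeThreshold
                || checkpointsSinceLastCompaction >= numCheckpointsBeforeCompaction;
    }
}
```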

FileCompactStrategy.Builder

Builder for creating FileCompactStrategy instances with various configuration options; obtain an instance via FileCompactStrategy.Builder.newBuilder().

/**
 * Builder for configuring file compaction strategies
 */
public static class Builder {
    /**
     * Enables compaction based on checkpoint intervals
     * @param numCheckpoints Number of checkpoints between compaction runs
     * @return Builder instance for chaining
     */
    public Builder enableCompactionOnCheckpoint(int numCheckpoints);
    
    /**
     * Sets size threshold for triggering compaction
     * @param sizeThreshold Size threshold in bytes
     * @return Builder instance for chaining
     */
    public Builder setSizeThreshold(long sizeThreshold);
    
    /**
     * Sets number of threads for compaction operations
     * @param numThreads Number of compaction threads
     * @return Builder instance for chaining
     */
    public Builder setNumCompactThreads(int numThreads);
    
    /**
     * Builds the final FileCompactStrategy instance
     * @return Configured FileCompactStrategy
     */
    public FileCompactStrategy build();
}

Built-in Compactor Implementations

Ready-to-use compactor implementations for common scenarios.

/**
 * Simple concatenation-based file compactor
 * Merges files by concatenating their contents
 */
public class ConcatFileCompactor extends OutputStreamBasedFileCompactor {
    /**
     * Creates concatenation compactor with default configuration
     */
    public ConcatFileCompactor();
    
    /**
     * Creates output stream for writing compacted data
     * @param outputFile Target output file path
     * @return OutputStream for writing compacted content
     * @throws IOException If stream creation fails
     */
    @Override
    protected OutputStream createOutputStream(Path outputFile) throws IOException;
}

/**
 * Pass-through compactor that leaves file contents unchanged
 * Useful for effectively disabling compaction while keeping the compaction pipeline in place
 */
public class IdenticalFileCompactor implements FileCompactor {
    /**
     * No-op compaction that simply copies first input file to output
     * @param inputFiles Input files (only first file is copied)
     * @param outputFile Target output file
     * @throws IOException If file copy fails
     */
    @Override
    public void compact(List<Path> inputFiles, Path outputFile) throws IOException;
}

Abstract Base Classes

Base classes for implementing custom compactors with specific patterns.

/**
 * Base class for output stream-based compactors
 */
public abstract class OutputStreamBasedFileCompactor implements FileCompactor {
    /**
     * Creates output stream for writing compacted data
     * @param outputFile Target output file path
     * @return OutputStream for writing
     * @throws IOException If stream creation fails
     */
    protected abstract OutputStream createOutputStream(Path outputFile) throws IOException;
    
    /**
     * Compacts files by reading all inputs and writing to output stream
     * @param inputFiles Input files to compact
     * @param outputFile Target output file
     * @throws Exception If compaction fails
     */
    @Override
    public final void compact(List<Path> inputFiles, Path outputFile) throws Exception;
}

/**
 * Base class for record-wise file compactors
 * @param <T> Type of records being compacted
 */
public abstract class RecordWiseFileCompactor<T> implements FileCompactor {
    /**
     * Creates reader for reading records from input file
     * @param inputFile Input file path
     * @return Reader for processing records
     * @throws IOException If reader creation fails
     */
    protected abstract FileCompactReader<T> createReader(Path inputFile) throws IOException;
    
    /**
     * Creates writer for writing records to output file  
     * @param outputFile Output file path
     * @return Writer for output records
     * @throws IOException If writer creation fails
     */
    protected abstract FileCompactWriter<T> createWriter(Path outputFile) throws IOException;
    
    /**
     * Compacts files by reading records and writing them to output
     * @param inputFiles Input files to compact
     * @param outputFile Target output file
     * @throws Exception If compaction fails
     */
    @Override
    public final void compact(List<Path> inputFiles, Path outputFile) throws Exception;
}
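The template-method shape of `OutputStreamBasedFileCompactor` can be sketched with standard-library types: a simplified base class owns the final copy loop, and a subclass only decides what kind of output stream to write through. Class names and the gzip variant are hypothetical, and `java.nio.file.Path` stands in for Flink's `Path`.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.zip.GZIPOutputStream;

// Simplified base class mirroring the output-stream-based pattern: the final
// compact loop streams every input's bytes through the subclass's stream.
abstract class StreamBasedCompactor {
    protected abstract OutputStream createOutputStream(Path outputFile) throws IOException;

    public final void compact(List<Path> inputFiles, Path outputFile) throws Exception {
        try (OutputStream out = createOutputStream(outputFile)) {
            for (Path input : inputFiles) {
                Files.copy(input, out);
            }
        }
    }
}

// Hypothetical subclass: concatenate inputs into a gzip-compressed output file.
class GzipConcatCompactor extends StreamBasedCompactor {
    @Override
    protected OutputStream createOutputStream(Path outputFile) throws IOException {
        return new GZIPOutputStream(Files.newOutputStream(outputFile));
    }
}
```

The same division of labor applies to the record-wise base class, except the subclass supplies a reader and writer instead of a stream.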

Usage Examples:

import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.sink.compactor.*;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.fs.Path;

// Basic file sink with concatenation compaction
FileSink<String> compactingSink = FileSink
    .forRowFormat(new Path("/output"), new SimpleStringEncoder<String>("UTF-8"))
    .enableCompact(
        FileCompactStrategy.Builder.newBuilder()
            .setSizeThreshold(MemorySize.ofMebiBytes(64).getBytes())
            .enableCompactionOnCheckpoint(3)
            .setNumCompactThreads(2)
            .build(),
        new ConcatFileCompactor())
    .build();

// Sink with size-based compaction only
FileSink<String> sizeBasedSink = FileSink
    .forRowFormat(new Path("/output"), new SimpleStringEncoder<String>("UTF-8"))
    .enableCompact(
        FileCompactStrategy.Builder.newBuilder()
            .setSizeThreshold(MemorySize.ofMebiBytes(128).getBytes())
            .build(),
        new ConcatFileCompactor())
    .build();

// Sink with checkpoint-based compaction
FileSink<String> checkpointBasedSink = FileSink
    .forRowFormat(new Path("/output"), new SimpleStringEncoder<String>("UTF-8"))
    .enableCompact(
        FileCompactStrategy.Builder.newBuilder()
            .enableCompactionOnCheckpoint(5)
            .setNumCompactThreads(4)
            .build(),
        new ConcatFileCompactor())
    .build();

Custom Compactor Implementation

Example of implementing a custom compactor for specific file formats.

import org.apache.flink.connector.file.sink.compactor.*;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.*;

/**
 * Example custom compactor for JSON files with validation
 */
public class JsonFileCompactor extends RecordWiseFileCompactor<JsonNode> {
    private final ObjectMapper objectMapper;
    
    public JsonFileCompactor() {
        this.objectMapper = new ObjectMapper();
    }
    
    @Override
    protected FileCompactReader<JsonNode> createReader(Path inputFile) throws IOException {
        return new JsonFileReader(inputFile, objectMapper);
    }
    
    @Override
    protected FileCompactWriter<JsonNode> createWriter(Path outputFile) throws IOException {
        return new JsonFileWriter(outputFile, objectMapper);
    }
    
    private static class JsonFileReader implements FileCompactReader<JsonNode> {
        private final BufferedReader reader;
        private final ObjectMapper mapper;
        
        public JsonFileReader(Path inputFile, ObjectMapper mapper) throws IOException {
            FileSystem fs = inputFile.getFileSystem();
            this.reader = new BufferedReader(new InputStreamReader(fs.open(inputFile)));
            this.mapper = mapper;
        }
        
        @Override
        public JsonNode read() throws IOException {
            String line = reader.readLine();
            if (line == null) {
                return null;
            }
            return mapper.readTree(line);
        }
        
        @Override
        public void close() throws IOException {
            reader.close();
        }
    }
    
    private static class JsonFileWriter implements FileCompactWriter<JsonNode> {
        private final BufferedWriter writer;
        private final ObjectMapper mapper;
        
        public JsonFileWriter(Path outputFile, ObjectMapper mapper) throws IOException {
            FileSystem fs = outputFile.getFileSystem();
            this.writer = new BufferedWriter(new OutputStreamWriter(fs.create(outputFile, true)));
            this.mapper = mapper;
        }
        
        @Override
        public void write(JsonNode record) throws IOException {
            writer.write(mapper.writeValueAsString(record));
            writer.newLine();
        }
        
        @Override
        public void close() throws IOException {
            writer.close();
        }
    }
}

Compaction Operators and Coordination

Internal components that manage the compaction process in distributed environments.

/**
 * Coordinator for managing compaction across distributed nodes
 */
public class CompactCoordinator {
    // Internal implementation for distributed compaction coordination
}

/**
 * Operator that performs actual compaction work
 */
public class CompactorOperator {
    // Internal implementation for compaction execution
}

/**
 * Service interface for compaction operations
 */
public interface CompactService {
    /**
     * Submits files for compaction
     * @param filesToCompact List of files to be compacted
     * @param targetFile Target output file for compaction result
     */
    void submitCompaction(List<Path> filesToCompact, Path targetFile);
}

Integration with Decoder-Based Reading

Support for compaction with custom decoders for complex file formats.

/**
 * Decoder-based reader for compaction operations
 */
public class DecoderBasedReader<T> {
    /**
     * Creates decoder-based reader for custom formats
     * @param decoder Decoder for reading records
     * @param inputStream Input stream to read from
     */
    public DecoderBasedReader(Decoder<T> decoder, InputStream inputStream);
    
    /**
     * Reads next record using the configured decoder
     * @return Next decoded record, or null if no more records
     * @throws IOException If reading fails
     */
    public T read() throws IOException;
}

/**
 * Simple string decoder for text-based formats
 */
public class SimpleStringDecoder implements Decoder<String> {
    /**
     * Decodes string from input stream
     * @param inputStream Input stream to decode from
     * @return Decoded string
     * @throws IOException If decoding fails
     */
    @Override
    public String decode(InputStream inputStream) throws IOException;
}
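A decoder of this shape is normally driven in a read-until-null loop: call `decode` once per record until the stream is exhausted. A self-contained sketch of that loop with hypothetical names (`SimpleStringDecoder` reads newline-delimited strings in much this way):

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

// Hypothetical line-based decoder: one record per call, null at end of stream.
class LineStringDecoder {
    private final BufferedReader in;

    LineStringDecoder(InputStream stream) {
        this.in = new BufferedReader(new InputStreamReader(stream));
    }

    String decode() throws IOException {
        return in.readLine(); // null signals end of input
    }
}

public class DecoderLoop {
    // Drains the stream by calling the decoder until it reports end of input.
    public static List<String> readAll(InputStream stream) throws IOException {
        LineStringDecoder decoder = new LineStringDecoder(stream);
        List<String> records = new ArrayList<>();
        for (String record = decoder.decode(); record != null; record = decoder.decode()) {
            records.add(record);
        }
        return records;
    }
}
```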

Advanced Usage Examples:

// JSON compaction with custom logic
FileSink<String> jsonSink = FileSink
    .forRowFormat(new Path("/json-output"), new SimpleStringEncoder<String>("UTF-8"))
    .enableCompact(
        FileCompactStrategy.Builder.newBuilder()
            .setSizeThreshold(MemorySize.ofMebiBytes(32).getBytes())
            .enableCompactionOnCheckpoint(2)
            .build(),
        new JsonFileCompactor())
    .build();

// Compaction with high parallelism for large files
FileSink<String> highThroughputSink = FileSink
    .forRowFormat(new Path("/high-volume"), new SimpleStringEncoder<String>("UTF-8"))
    .enableCompact(
        FileCompactStrategy.Builder.newBuilder()
            .setSizeThreshold(MemorySize.ofMebiBytes(256).getBytes())
            .setNumCompactThreads(8)
            .build(),
        new ConcatFileCompactor())
    .build();

// Conditional compaction based on environment
FileCompactor compactor = isProductionEnvironment() 
    ? new ConcatFileCompactor()
    : new IdenticalFileCompactor(); // Disable in development

FileSink<String> conditionalSink = FileSink
    .forRowFormat(new Path("/output"), new SimpleStringEncoder<String>("UTF-8"))
    .enableCompact(
        FileCompactStrategy.Builder.newBuilder()
            .setSizeThreshold(MemorySize.ofMebiBytes(64).getBytes())
            .build(),
        compactor)
    .build();

Error Handling

File compaction handles various error conditions during the compaction process:

  • IOException: File system I/O errors during compaction
  • OutOfMemoryError: Insufficient memory for compaction operations
  • SecurityException: Permission errors accessing files
  • RuntimeException: Format-specific compaction errors

try {
    FileCompactor compactor = new ConcatFileCompactor();
    List<Path> inputFiles = Arrays.asList(
        new Path("/data/file1.txt"),
        new Path("/data/file2.txt")
    );
    compactor.compact(inputFiles, new Path("/data/compacted.txt"));
} catch (IOException e) {
    // Handle I/O errors
} catch (SecurityException e) {
    // Handle permission errors
} catch (Exception e) {
    // Handle other compaction errors
}

Performance Considerations

  • Set appropriate size thresholds to balance compaction frequency and efficiency
  • Use multiple compaction threads for I/O intensive operations
  • Consider file format characteristics when choosing compaction strategies
  • Monitor compaction performance and adjust thread counts based on system resources
  • Balance between checkpoint-based and size-based compaction triggers
  • Implement efficient custom compactors for specialized file formats
  • Consider network and storage costs when compacting in distributed environments
  • Monitor the impact of compaction on job performance and adjust triggers accordingly

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-connector-files
