Apache Flink file connector library for unified file processing in both batch and streaming modes with support for various formats, compression, and distributed processing capabilities.
—
File compaction provides a system for merging small files to improve performance, reduce metadata overhead, and optimize storage efficiency in distributed file systems.
Core interface for implementing file compaction logic.
/**
 * Core interface for file compaction: merges a set of input files into a
 * single output file. Implementations decide how contents are combined
 * (raw byte concatenation, record-wise rewrite, etc.).
 */
public interface FileCompactor {
/**
 * Compacts multiple input files into a single output file.
 * @param inputFiles List of input file paths to compact
 * @param outputFile Target path for the compacted output file
 * @throws Exception If compaction fails (I/O errors, permissions, format problems)
 */
void compact(List<Path> inputFiles, Path outputFile) throws Exception;
}
Interface defining when and how file compaction should be triggered.
/**
 * Strategy interface controlling when file compaction occurs.
 * A strategy can trigger compaction by accumulated file size, by number of
 * completed checkpoints, or both, and configures compaction parallelism.
 */
public interface FileCompactStrategy {
/**
 * Accumulated-size threshold that triggers compaction.
 * @return Size threshold in bytes
 */
long getSizeThreshold();
/**
 * Number of completed checkpoints before compaction is triggered.
 * @return Checkpoint count threshold
 */
int getNumCheckpointsBeforeCompaction();
/**
 * Number of threads used to run compaction operations in parallel.
 * @return Thread count for parallel compaction
 */
int getNumCompactThreads();
}
Builder for creating FileCompactStrategy instances with various configuration options.
/**
 * Builder for configuring {@code FileCompactStrategy} instances.
 * Every setter returns this builder so calls can be chained; call
 * {@link #build()} to obtain the immutable strategy.
 */
public static class Builder {
/**
 * Enables compaction based on checkpoint intervals.
 * @param numCheckpoints Number of checkpoints between compaction runs
 * @return Builder instance for chaining
 */
public Builder enableCompactionOnCheckpoint(int numCheckpoints);
/**
 * Sets the accumulated-size threshold that triggers compaction.
 * @param sizeThreshold Size threshold in bytes
 * @return Builder instance for chaining
 */
public Builder setSizeThreshold(long sizeThreshold);
/**
 * Sets the number of threads used for compaction operations.
 * @param numThreads Number of compaction threads
 * @return Builder instance for chaining
 */
public Builder setNumCompactThreads(int numThreads);
/**
 * Builds the final FileCompactStrategy instance.
 * @return Configured FileCompactStrategy
 */
public FileCompactStrategy build();
}
Ready-to-use compactor implementations for common scenarios.
/**
 * Simple concatenation-based file compactor.
 * Merges files by concatenating their raw contents in order; appropriate for
 * formats where plain byte concatenation yields a valid file (e.g.
 * line-delimited text), not for formats with per-file headers/footers.
 */
public class ConcatFileCompactor extends OutputStreamBasedFileCompactor {
/**
 * Creates a concatenation compactor with default configuration.
 */
public ConcatFileCompactor();
/**
 * Creates the output stream that the concatenated bytes are written to.
 * @param outputFile Target output file path
 * @return OutputStream for writing compacted content
 * @throws IOException If stream creation fails
 */
@Override
protected OutputStream createOutputStream(Path outputFile) throws IOException;
}
/**
 * No-operation compactor that keeps file contents unchanged.
 * Useful for disabling compaction while maintaining interface compatibility.
 */
public class IdenticalFileCompactor implements FileCompactor {
/**
 * No-op compaction that simply copies the first input file to the output.
 * NOTE(review): per the description only the first entry of inputFiles is
 * used — presumably callers pass exactly one file; confirm before relying
 * on this with multiple inputs, as the rest would be silently dropped.
 * @param inputFiles Input files (only first file is copied)
 * @param outputFile Target output file
 * @throws IOException If file copy fails
 */
@Override
public void compact(List<Path> inputFiles, Path outputFile) throws IOException;
}
Base classes for implementing custom compactors with specific patterns.
/**
 * Base class for compactors that emit the compacted result through a single
 * output stream. Subclasses only supply the stream; the merge loop over the
 * input files is provided by the final {@code compact} implementation.
 */
public abstract class OutputStreamBasedFileCompactor implements FileCompactor {
/**
 * Creates the output stream for writing compacted data.
 * @param outputFile Target output file path
 * @return OutputStream for writing
 * @throws IOException If stream creation fails
 */
protected abstract OutputStream createOutputStream(Path outputFile) throws IOException;
/**
 * Compacts files by reading all inputs and writing them to the stream
 * obtained from {@link #createOutputStream}. Declared final: subclasses
 * customize only the destination stream, not the merge procedure.
 * @param inputFiles Input files to compact
 * @param outputFile Target output file
 * @throws Exception If compaction fails
 */
@Override
public final void compact(List<Path> inputFiles, Path outputFile) throws Exception;
}
/**
 * Base class for record-wise file compactors: inputs are decoded into
 * individual records which are then re-encoded into the output, allowing
 * per-record processing for formats where byte concatenation is invalid.
 * @param <T> Type of records being compacted
 */
public abstract class RecordWiseFileCompactor<T> implements FileCompactor {
/**
 * Creates a reader that decodes records from one input file.
 * @param inputFile Input file path
 * @return Reader for processing records
 * @throws IOException If reader creation fails
 */
protected abstract FileCompactReader<T> createReader(Path inputFile) throws IOException;
/**
 * Creates a writer that encodes records into the output file.
 * @param outputFile Output file path
 * @return Writer for output records
 * @throws IOException If writer creation fails
 */
protected abstract FileCompactWriter<T> createWriter(Path outputFile) throws IOException;
/**
 * Compacts files by reading records from each input and writing them to the
 * output. Declared final: subclasses customize only the reader/writer pair.
 * @param inputFiles Input files to compact
 * @param outputFile Target output file
 * @throws Exception If compaction fails
 */
@Override
public final void compact(List<Path> inputFiles, Path outputFile) throws Exception;
}
Usage Examples:
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.sink.compactor.*;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.fs.Path;
// Basic file sink with concatenation compaction:
// triggers at 64 MiB of accumulated files OR every 3 checkpoints,
// running up to 2 compactions in parallel.
FileSink<String> compactingSink = FileSink
.forRowFormat(new Path("/output"), new SimpleStringEncoder<String>("UTF-8"))
.enableCompact(
FileCompactStrategy.builder()
.setSizeThreshold(MemorySize.ofMebiBytes(64).getBytes())
.enableCompactionOnCheckpoint(3)
.setNumCompactThreads(2)
.build(),
new ConcatFileCompactor())
.build();
// Sink with size-based compaction only (128 MiB threshold, no checkpoint trigger)
FileSink<String> sizeBasedSink = FileSink
.forRowFormat(new Path("/output"), new SimpleStringEncoder<String>("UTF-8"))
.enableCompact(
FileCompactStrategy.builder()
.setSizeThreshold(MemorySize.ofMebiBytes(128).getBytes())
.build(),
new ConcatFileCompactor())
.build();
// Sink with checkpoint-based compaction only (every 5 checkpoints, 4 threads)
FileSink<String> checkpointBasedSink = FileSink
.forRowFormat(new Path("/output"), new SimpleStringEncoder<String>("UTF-8"))
.enableCompact(
FileCompactStrategy.builder()
.enableCompactionOnCheckpoint(5)
.setNumCompactThreads(4)
.build(),
new ConcatFileCompactor())
.build();
Example of implementing a custom compactor for specific file formats.
/**
 * Example custom compactor for newline-delimited JSON files.
 *
 * <p>Each input file is read line by line and parsed into a {@code JsonNode},
 * so malformed lines fail fast during compaction instead of being copied
 * blindly. Records are re-serialized one per line into the output file.
 */
public class JsonFileCompactor extends RecordWiseFileCompactor<JsonNode> {
    /** Jackson mapper shared by all readers and writers created by this compactor. */
    private final ObjectMapper objectMapper;

    public JsonFileCompactor() {
        this.objectMapper = new ObjectMapper();
    }

    /**
     * Creates a line-oriented JSON reader for the given input file.
     * @param inputFile Input file path
     * @return Reader producing one JsonNode per input line
     * @throws IOException If the file cannot be opened
     */
    @Override
    protected FileCompactReader<JsonNode> createReader(Path inputFile) throws IOException {
        return new JsonFileReader(inputFile, objectMapper);
    }

    /**
     * Creates a line-oriented JSON writer for the given output file.
     * @param outputFile Output file path
     * @return Writer serializing one JsonNode per output line
     * @throws IOException If the file cannot be created
     */
    @Override
    protected FileCompactWriter<JsonNode> createWriter(Path outputFile) throws IOException {
        return new JsonFileWriter(outputFile, objectMapper);
    }

    /** Reads one JSON record per line from an input file. */
    private static class JsonFileReader implements FileCompactReader<JsonNode> {
        private final BufferedReader reader;
        private final ObjectMapper mapper;

        public JsonFileReader(Path inputFile, ObjectMapper mapper) throws IOException {
            FileSystem fs = inputFile.getFileSystem();
            // Explicit UTF-8: without it the platform default charset is used
            // (pre-Java 18), making results environment-dependent.
            this.reader = new BufferedReader(new InputStreamReader(
                    fs.open(inputFile), java.nio.charset.StandardCharsets.UTF_8));
            this.mapper = mapper;
        }

        /**
         * Reads and parses the next line.
         * @return The next record, or null once end of file is reached
         * @throws IOException If reading or JSON parsing fails
         */
        @Override
        public JsonNode read() throws IOException {
            String line = reader.readLine();
            if (line == null) {
                return null;
            }
            return mapper.readTree(line);
        }

        @Override
        public void close() throws IOException {
            reader.close();
        }
    }

    /** Writes one JSON record per line to the output file. */
    private static class JsonFileWriter implements FileCompactWriter<JsonNode> {
        private final BufferedWriter writer;
        private final ObjectMapper mapper;

        public JsonFileWriter(Path outputFile, ObjectMapper mapper) throws IOException {
            FileSystem fs = outputFile.getFileSystem();
            // Explicit UTF-8 to match the reader and keep output platform-independent.
            this.writer = new BufferedWriter(new OutputStreamWriter(
                    fs.create(outputFile, true), java.nio.charset.StandardCharsets.UTF_8));
            this.mapper = mapper;
        }

        /**
         * Serializes one record as a single JSON line.
         * @param record Record to write
         * @throws IOException If serialization or writing fails
         */
        @Override
        public void write(JsonNode record) throws IOException {
            writer.write(mapper.writeValueAsString(record));
            writer.newLine();
        }

        @Override
        public void close() throws IOException {
            writer.close();
        }
    }
}
Internal components that manage the compaction process in distributed environments.
/**
 * Coordinator for managing compaction across distributed nodes.
 * Internal component — not part of the public API; details intentionally
 * unspecified here.
 */
public class CompactCoordinator {
// Internal implementation for distributed compaction coordination
}
/**
 * Operator that performs the actual compaction work.
 * Internal component — not part of the public API; details intentionally
 * unspecified here.
 */
public class CompactorOperator {
// Internal implementation for compaction execution
}
/**
 * Service interface for compaction operations.
 * Accepts compaction requests; execution semantics (synchronous vs.
 * asynchronous) are implementation-defined.
 */
public interface CompactService {
/**
 * Submits a set of files for compaction into a single target file.
 * @param filesToCompact List of files to be compacted
 * @param targetFile Target output file for compaction result
 */
void submitCompaction(List<Path> filesToCompact, Path targetFile);
}
Support for compaction with custom decoders for complex file formats.
/**
 * Decoder-based reader for compaction operations: adapts a {@code Decoder}
 * over a raw input stream so custom formats can be read record by record.
 */
public class DecoderBasedReader<T> {
/**
 * Creates a decoder-based reader for custom formats.
 * @param decoder Decoder used to turn stream bytes into records
 * @param inputStream Input stream to read from
 */
public DecoderBasedReader(Decoder<T> decoder, InputStream inputStream);
/**
 * Reads the next record using the configured decoder.
 * @return Next decoded record, or null if no more records
 * @throws IOException If reading fails
 */
public T read() throws IOException;
}
/**
 * Simple string decoder for text-based formats.
 * Ready-made {@code Decoder} implementation producing String records.
 */
public class SimpleStringDecoder implements Decoder<String> {
/**
 * Decodes the next string from the input stream.
 * @param inputStream Input stream to decode from
 * @return Decoded string
 * @throws IOException If decoding fails
 */
@Override
public String decode(InputStream inputStream) throws IOException;
}
Advanced Usage Examples:
// JSON compaction with the custom record-wise compactor defined above:
// 32 MiB size threshold or every 2 checkpoints, whichever comes first.
FileSink<String> jsonSink = FileSink
.forRowFormat(new Path("/json-output"), new SimpleStringEncoder<String>("UTF-8"))
.enableCompact(
FileCompactStrategy.builder()
.setSizeThreshold(MemorySize.ofMebiBytes(32).getBytes())
.enableCompactionOnCheckpoint(2)
.build(),
new JsonFileCompactor())
.build();
// Compaction tuned for high-volume output: large 256 MiB threshold with
// 8 threads so many compactions can run concurrently.
FileSink<String> highThroughputSink = FileSink
.forRowFormat(new Path("/high-volume"), new SimpleStringEncoder<String>("UTF-8"))
.enableCompact(
FileCompactStrategy.builder()
.setSizeThreshold(MemorySize.ofMebiBytes(256).getBytes())
.setNumCompactThreads(8)
.build(),
new ConcatFileCompactor())
.build();
// Conditional compaction based on environment: IdenticalFileCompactor keeps
// files unchanged, effectively disabling compaction in development.
FileCompactor compactor = isProductionEnvironment()
? new ConcatFileCompactor()
: new IdenticalFileCompactor(); // Disable in development
FileSink<String> conditionalSink = FileSink
.forRowFormat(new Path("/output"), new SimpleStringEncoder<String>("UTF-8"))
.enableCompact(
FileCompactStrategy.builder()
.setSizeThreshold(MemorySize.ofMebiBytes(64).getBytes())
.build(),
compactor)
.build();
File compaction handles various error conditions during the compaction process:
try {
FileCompactor compactor = new ConcatFileCompactor();
List<Path> inputFiles = Arrays.asList(
new Path("/data/file1.txt"),
new Path("/data/file2.txt")
);
compactor.compact(inputFiles, new Path("/data/compacted.txt"));
} catch (IOException e) {
// Handle I/O errors (missing files, full disk, stream failures)
} catch (SecurityException e) {
// Handle permission errors
} catch (Exception e) {
// Fallback: compact() is declared to throw Exception, so catch-all is
// needed here; real code should log and rethrow rather than swallow.
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-connector-files