# File Compaction

File compaction provides a system for merging small files to improve performance, reduce metadata overhead, and optimize storage efficiency in distributed file systems.

## Capabilities

### FileCompactor Interface

Core interface for implementing file compaction logic.

```java { .api }
/**
 * Interface for implementing file compaction operations
 */
public interface FileCompactor {

    /**
     * Compacts multiple input files into a single output file
     * @param inputFiles List of input file paths to compact
     * @param outputFile Target path for the compacted output file
     * @throws Exception If compaction fails
     */
    void compact(List<Path> inputFiles, Path outputFile) throws Exception;
}
```
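
To make the contract concrete, here is a minimal, framework-free sketch of the same idea using `java.nio.file.Path` instead of the compaction `Path` type. All names here are illustrative stand-ins, not part of the API above:

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

/** Illustrative stand-in for a FileCompactor: concatenates inputs into one output. */
public class NaiveCompactor {

    /** Merges the input files into outputFile by byte-wise concatenation. */
    public void compact(List<Path> inputFiles, Path outputFile) throws IOException {
        try (OutputStream out = Files.newOutputStream(outputFile)) {
            for (Path input : inputFiles) {
                Files.copy(input, out); // append each input's bytes in order
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("compact-demo");
        Path a = Files.writeString(dir.resolve("a.txt"), "hello ");
        Path b = Files.writeString(dir.resolve("b.txt"), "world");
        Path out = dir.resolve("merged.txt");
        new NaiveCompactor().compact(List.of(a, b), out);
        System.out.println(Files.readString(out)); // prints "hello world"
    }
}
```

Real implementations additionally have to handle partial failures and atomicity of the output file, which the framework's built-in compactors take care of.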

### FileCompactStrategy Interface

Interface defining when and how file compaction should be triggered.

```java { .api }
/**
 * Strategy interface for controlling when file compaction occurs
 */
public interface FileCompactStrategy {

    /**
     * Size threshold for triggering compaction
     * @return Size threshold in bytes
     */
    long getSizeThreshold();

    /**
     * Number of checkpoints before compaction is triggered
     * @return Checkpoint count threshold
     */
    int getNumCheckpointsBeforeCompaction();

    /**
     * Number of threads to use for compaction operations
     * @return Thread count for parallel compaction
     */
    int getNumCompactThreads();
}
```

### FileCompactStrategy.Builder

Builder for creating FileCompactStrategy instances with various configuration options.

```java { .api }
/**
 * Builder for configuring file compaction strategies
 */
public static class Builder {

    /**
     * Creates a new builder with default settings
     * @return New Builder instance
     */
    public static Builder newBuilder();

    /**
     * Enables compaction based on checkpoint intervals
     * @param numCheckpoints Number of checkpoints between compaction runs
     * @return Builder instance for chaining
     */
    public Builder enableCompactionOnCheckpoint(int numCheckpoints);

    /**
     * Sets size threshold for triggering compaction
     * @param sizeThreshold Size threshold in bytes
     * @return Builder instance for chaining
     */
    public Builder setSizeThreshold(long sizeThreshold);

    /**
     * Sets number of threads for compaction operations
     * @param numThreads Number of compaction threads
     * @return Builder instance for chaining
     */
    public Builder setNumCompactThreads(int numThreads);

    /**
     * Builds the final FileCompactStrategy instance
     * @return Configured FileCompactStrategy
     */
    public FileCompactStrategy build();
}
```

### Built-in Compactor Implementations

Ready-to-use compactor implementations for common scenarios.

```java { .api }
/**
 * Simple concatenation-based file compactor.
 * Merges files by concatenating their contents.
 */
public class ConcatFileCompactor extends OutputStreamBasedFileCompactor {

    /**
     * Creates concatenation compactor with default configuration
     */
    public ConcatFileCompactor();

    /**
     * Creates output stream for writing compacted data
     * @param outputFile Target output file path
     * @return OutputStream for writing compacted content
     * @throws IOException If stream creation fails
     */
    @Override
    protected OutputStream createOutputStream(Path outputFile) throws IOException;
}

/**
 * No-operation compactor that keeps files unchanged.
 * Useful for disabling compaction while maintaining interface compatibility.
 */
public class IdenticalFileCompactor implements FileCompactor {

    /**
     * No-op compaction that simply copies the first input file to output
     * @param inputFiles Input files (only the first file is copied)
     * @param outputFile Target output file
     * @throws IOException If file copy fails
     */
    @Override
    public void compact(List<Path> inputFiles, Path outputFile) throws IOException;
}
```

### Abstract Base Classes

Base classes for implementing custom compactors with specific patterns.

```java { .api }
/**
 * Base class for output stream-based compactors
 */
public abstract class OutputStreamBasedFileCompactor implements FileCompactor {

    /**
     * Creates output stream for writing compacted data
     * @param outputFile Target output file path
     * @return OutputStream for writing
     * @throws IOException If stream creation fails
     */
    protected abstract OutputStream createOutputStream(Path outputFile) throws IOException;

    /**
     * Compacts files by reading all inputs and writing to output stream
     * @param inputFiles Input files to compact
     * @param outputFile Target output file
     * @throws Exception If compaction fails
     */
    @Override
    public final void compact(List<Path> inputFiles, Path outputFile) throws Exception;
}

/**
 * Base class for record-wise file compactors
 * @param <T> Type of records being compacted
 */
public abstract class RecordWiseFileCompactor<T> implements FileCompactor {

    /**
     * Creates reader for reading records from input file
     * @param inputFile Input file path
     * @return Reader for processing records
     * @throws IOException If reader creation fails
     */
    protected abstract FileCompactReader<T> createReader(Path inputFile) throws IOException;

    /**
     * Creates writer for writing records to output file
     * @param outputFile Output file path
     * @return Writer for output records
     * @throws IOException If writer creation fails
     */
    protected abstract FileCompactWriter<T> createWriter(Path outputFile) throws IOException;

    /**
     * Compacts files by reading records and writing them to output
     * @param inputFiles Input files to compact
     * @param outputFile Target output file
     * @throws Exception If compaction fails
     */
    @Override
    public final void compact(List<Path> inputFiles, Path outputFile) throws Exception;
}
```
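
The record-wise pattern can be sketched without the framework types: open a reader per input, one writer for the output, and copy records one at a time. This example treats each text line as a record; all names are illustrative, not the classes above:

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

/** Illustrative record-wise merge: treats each line of text as one record. */
public class LineWiseMerge {

    /** Reads every record (line) from each input in order and writes it to the output. */
    public static void merge(List<Path> inputs, Path output) throws IOException {
        try (BufferedWriter writer = Files.newBufferedWriter(output)) {
            for (Path input : inputs) {
                try (BufferedReader reader = Files.newBufferedReader(input)) {
                    String record;
                    while ((record = reader.readLine()) != null) { // one record at a time
                        writer.write(record);
                        writer.newLine();
                    }
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("recordwise-demo");
        Path a = Files.writeString(dir.resolve("a.txt"), "r1\nr2\n");
        Path b = Files.writeString(dir.resolve("b.txt"), "r3\n");
        Path out = dir.resolve("merged.txt");
        merge(List.of(a, b), out);
        System.out.println(Files.readAllLines(out)); // prints [r1, r2, r3]
    }
}
```

Compared with stream-based concatenation, the record-wise approach lets the compactor validate or transform each record, at the cost of parsing overhead.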

**Usage Examples:**

```java
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.sink.compactor.*;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.fs.Path;

// Basic file sink with concatenation compaction
FileSink<String> compactingSink = FileSink
    .forRowFormat(new Path("/output"), new SimpleStringEncoder<String>("UTF-8"))
    .enableCompact(
        FileCompactStrategy.Builder.newBuilder()
            .setSizeThreshold(MemorySize.ofMebiBytes(64).getBytes())
            .enableCompactionOnCheckpoint(3)
            .setNumCompactThreads(2)
            .build(),
        new ConcatFileCompactor())
    .build();

// Sink with size-based compaction only
FileSink<String> sizeBasedSink = FileSink
    .forRowFormat(new Path("/output"), new SimpleStringEncoder<String>("UTF-8"))
    .enableCompact(
        FileCompactStrategy.Builder.newBuilder()
            .setSizeThreshold(MemorySize.ofMebiBytes(128).getBytes())
            .build(),
        new ConcatFileCompactor())
    .build();

// Sink with checkpoint-based compaction
FileSink<String> checkpointBasedSink = FileSink
    .forRowFormat(new Path("/output"), new SimpleStringEncoder<String>("UTF-8"))
    .enableCompact(
        FileCompactStrategy.Builder.newBuilder()
            .enableCompactionOnCheckpoint(5)
            .setNumCompactThreads(4)
            .build(),
        new ConcatFileCompactor())
    .build();
```

### Custom Compactor Implementation

Example of implementing a custom compactor for specific file formats.

```java
/**
 * Example custom compactor for JSON files with validation
 */
public class JsonFileCompactor extends RecordWiseFileCompactor<JsonNode> {

    private final ObjectMapper objectMapper;

    public JsonFileCompactor() {
        this.objectMapper = new ObjectMapper();
    }

    @Override
    protected FileCompactReader<JsonNode> createReader(Path inputFile) throws IOException {
        return new JsonFileReader(inputFile, objectMapper);
    }

    @Override
    protected FileCompactWriter<JsonNode> createWriter(Path outputFile) throws IOException {
        return new JsonFileWriter(outputFile, objectMapper);
    }

    private static class JsonFileReader implements FileCompactReader<JsonNode> {

        private final BufferedReader reader;
        private final ObjectMapper mapper;

        public JsonFileReader(Path inputFile, ObjectMapper mapper) throws IOException {
            FileSystem fs = inputFile.getFileSystem();
            this.reader = new BufferedReader(new InputStreamReader(fs.open(inputFile)));
            this.mapper = mapper;
        }

        @Override
        public JsonNode read() throws IOException {
            String line = reader.readLine();
            if (line == null) {
                return null;
            }
            return mapper.readTree(line);
        }

        @Override
        public void close() throws IOException {
            reader.close();
        }
    }

    private static class JsonFileWriter implements FileCompactWriter<JsonNode> {

        private final BufferedWriter writer;
        private final ObjectMapper mapper;

        public JsonFileWriter(Path outputFile, ObjectMapper mapper) throws IOException {
            FileSystem fs = outputFile.getFileSystem();
            this.writer = new BufferedWriter(new OutputStreamWriter(fs.create(outputFile, true)));
            this.mapper = mapper;
        }

        @Override
        public void write(JsonNode record) throws IOException {
            writer.write(mapper.writeValueAsString(record));
            writer.newLine();
        }

        @Override
        public void close() throws IOException {
            writer.close();
        }
    }
}
```

### Compaction Operators and Coordination

Internal components that manage the compaction process in distributed environments.

```java { .api }
/**
 * Coordinator for managing compaction across distributed nodes
 */
public class CompactCoordinator {
    // Internal implementation for distributed compaction coordination
}

/**
 * Operator that performs actual compaction work
 */
public class CompactorOperator {
    // Internal implementation for compaction execution
}

/**
 * Service interface for compaction operations
 */
public interface CompactService {

    /**
     * Submits files for compaction
     * @param filesToCompact List of files to be compacted
     * @param targetFile Target output file for compaction result
     */
    void submitCompaction(List<Path> filesToCompact, Path targetFile);
}
```
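
To illustrate how such a service might relate to the strategy's `getNumCompactThreads()` setting, here is a hypothetical sketch backed by a fixed-size thread pool. This is not the internal Flink implementation; every name below is illustrative, and a plain `BiConsumer` stands in for the `FileCompactor` type:

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

/** Hypothetical compaction service: runs submitted compactions on a bounded pool. */
public class PooledCompactService {

    private final ExecutorService pool;
    private final BiConsumer<List<String>, String> compactor; // stand-in for FileCompactor

    public PooledCompactService(int numCompactThreads, BiConsumer<List<String>, String> compactor) {
        this.pool = Executors.newFixedThreadPool(numCompactThreads);
        this.compactor = compactor;
    }

    /** Queues one compaction task and returns immediately. */
    public void submitCompaction(List<String> filesToCompact, String targetFile) {
        pool.submit(() -> compactor.accept(filesToCompact, targetFile));
    }

    /** Waits for all queued compactions to finish. */
    public void shutdown() throws InterruptedException {
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
    }
}
```

Bounding the pool size keeps concurrent compactions from saturating file system I/O, which is the same reason the real strategy exposes a thread count.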

### Integration with Decoder-Based Reading

Support for compaction with custom decoders for complex file formats.

```java { .api }
/**
 * Decoder-based reader for compaction operations
 */
public class DecoderBasedReader<T> {

    /**
     * Creates decoder-based reader for custom formats
     * @param decoder Decoder for reading records
     * @param inputStream Input stream to read from
     */
    public DecoderBasedReader(Decoder<T> decoder, InputStream inputStream);

    /**
     * Reads next record using the configured decoder
     * @return Next decoded record, or null if no more records
     * @throws IOException If reading fails
     */
    public T read() throws IOException;
}

/**
 * Simple string decoder for text-based formats
 */
public class SimpleStringDecoder implements Decoder<String> {

    /**
     * Decodes string from input stream
     * @param inputStream Input stream to decode from
     * @return Decoded string
     * @throws IOException If decoding fails
     */
    @Override
    public String decode(InputStream inputStream) throws IOException;
}
```
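
The decoder pattern is easy to demonstrate standalone: a decoder pulls one record at a time from a stream and signals end of input with `null`. The class below is an illustrative stand-in assuming a line-oriented text format, not the API above:

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

/** Illustrative line decoder: each call to decode() yields the next line, or null at EOF. */
public class LineDecoder {

    private final BufferedReader reader;

    public LineDecoder(InputStream in) {
        this.reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
    }

    /** Returns the next record, or null when the stream is exhausted. */
    public String decode() throws IOException {
        return reader.readLine();
    }

    public static void main(String[] args) throws IOException {
        InputStream in = new ByteArrayInputStream("a\nb\n".getBytes(StandardCharsets.UTF_8));
        LineDecoder decoder = new LineDecoder(in);
        String record;
        while ((record = decoder.decode()) != null) {
            System.out.println(record); // prints "a" then "b"
        }
    }
}
```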

**Advanced Usage Examples:**

```java
// JSON compaction with custom logic
FileSink<String> jsonSink = FileSink
    .forRowFormat(new Path("/json-output"), new SimpleStringEncoder<String>("UTF-8"))
    .enableCompact(
        FileCompactStrategy.Builder.newBuilder()
            .setSizeThreshold(MemorySize.ofMebiBytes(32).getBytes())
            .enableCompactionOnCheckpoint(2)
            .build(),
        new JsonFileCompactor())
    .build();

// Compaction with high parallelism for large files
FileSink<String> highThroughputSink = FileSink
    .forRowFormat(new Path("/high-volume"), new SimpleStringEncoder<String>("UTF-8"))
    .enableCompact(
        FileCompactStrategy.Builder.newBuilder()
            .setSizeThreshold(MemorySize.ofMebiBytes(256).getBytes())
            .setNumCompactThreads(8)
            .build(),
        new ConcatFileCompactor())
    .build();

// Conditional compaction based on environment
FileCompactor compactor = isProductionEnvironment()
    ? new ConcatFileCompactor()
    : new IdenticalFileCompactor(); // Disable in development

FileSink<String> conditionalSink = FileSink
    .forRowFormat(new Path("/output"), new SimpleStringEncoder<String>("UTF-8"))
    .enableCompact(
        FileCompactStrategy.Builder.newBuilder()
            .setSizeThreshold(MemorySize.ofMebiBytes(64).getBytes())
            .build(),
        compactor)
    .build();
```

## Error Handling

The compaction process can surface several error conditions:

- **IOException**: File system I/O errors during compaction
- **OutOfMemoryError**: Insufficient memory for compaction operations
- **SecurityException**: Permission errors when accessing files
- **RuntimeException**: Format-specific compaction errors

```java
try {
    FileCompactor compactor = new ConcatFileCompactor();
    List<Path> inputFiles = Arrays.asList(
        new Path("/data/file1.txt"),
        new Path("/data/file2.txt"));
    compactor.compact(inputFiles, new Path("/data/compacted.txt"));
} catch (IOException e) {
    // Handle I/O errors
} catch (SecurityException e) {
    // Handle permission errors
} catch (Exception e) {
    // Handle other compaction errors
}
```
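
Since file system I/O errors are often transient, a caller may want to retry a failed compaction a bounded number of times before giving up. A hedged sketch of such a wrapper, not part of the compaction API:

```java
import java.io.IOException;
import java.util.concurrent.Callable;

/** Illustrative retry helper: re-runs an action on IOException, up to maxAttempts. */
public class RetryingRunner {

    /** Runs the action; on IOException retries until maxAttempts is exhausted. */
    public static <T> T runWithRetry(Callable<T> action, int maxAttempts) throws Exception {
        IOException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (IOException e) {
                last = e; // possibly transient failure: remember it and try again
            }
        }
        throw last; // all attempts failed
    }
}
```

Only `IOException` is retried here; permission errors and format-specific failures are unlikely to succeed on a second attempt and should propagate immediately.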

## Performance Considerations

- Set appropriate size thresholds to balance compaction frequency and efficiency
- Use multiple compaction threads for I/O intensive operations
- Consider file format characteristics when choosing compaction strategies
- Monitor compaction performance and adjust thread counts based on system resources
- Balance between checkpoint-based and size-based compaction triggers
- Implement efficient custom compactors for specialized file formats
- Consider network and storage costs when compacting in distributed environments
- Monitor the impact of compaction on job performance and adjust triggers accordingly
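
As a back-of-the-envelope aid for the first point, a greedy grouping shows how many compacted output files a given size threshold would yield for a set of small-file sizes. This is purely illustrative arithmetic, not part of the compaction machinery:

```java
import java.util.Collections;
import java.util.List;

/** Illustrative estimate: greedily packs file sizes into groups capped by the threshold. */
public class CompactionEstimate {

    /** Returns how many compacted output files a greedy first-fit pass would produce. */
    public static int estimateOutputFiles(List<Long> fileSizesBytes, long sizeThreshold) {
        int groups = 0;
        long current = 0;
        for (long size : fileSizesBytes) {
            if (current > 0 && current + size > sizeThreshold) {
                groups++;      // close the current group and start a new one
                current = 0;
            }
            current += size;
        }
        return current > 0 ? groups + 1 : groups;
    }

    public static void main(String[] args) {
        // 10 files of 20 MiB each with a 64 MiB threshold: groups of 3 (60 MiB), so 4 outputs
        List<Long> sizes = Collections.nCopies(10, 20L * 1024 * 1024);
        System.out.println(estimateOutputFiles(sizes, 64L * 1024 * 1024)); // prints 4
    }
}
```

A larger threshold produces fewer, bigger outputs but defers compaction work; sweeping this estimate over candidate thresholds is a cheap way to reason about that trade-off before tuning a live job.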