
# File Compaction

File compaction provides a system for merging small files to improve performance, reduce metadata overhead, and optimize storage efficiency in distributed file systems.

## Capabilities

### FileCompactor Interface

Core interface for implementing file compaction logic.

```java { .api }
/**
 * Interface for implementing file compaction operations
 */
public interface FileCompactor {
    /**
     * Compacts multiple input files into a single output file
     * @param inputFiles List of input file paths to compact
     * @param outputFile Target path for the compacted output file
     * @throws Exception If compaction fails
     */
    void compact(List<Path> inputFiles, Path outputFile) throws Exception;
}
```
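To make the contract concrete, here is a minimal stand-in implementation sketch. It uses `java.nio.file.Path` and plain stream copying instead of the connector's `Path` type and classes, so it runs without the framework; the class name `NioConcatCompactor` is hypothetical, not part of the API above.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

public class NioConcatCompactor {
    // Hypothetical stand-in mirroring FileCompactor.compact(), using
    // java.nio.file.Path instead of the connector's Path type.
    public void compact(List<Path> inputFiles, Path outputFile) throws IOException {
        try (OutputStream out = Files.newOutputStream(outputFile)) {
            for (Path input : inputFiles) {
                Files.copy(input, out); // append each input's bytes in order
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("compact-demo");
        Path a = Files.write(dir.resolve("a.txt"), "hello ".getBytes());
        Path b = Files.write(dir.resolve("b.txt"), "world".getBytes());
        Path out = dir.resolve("compacted.txt");

        new NioConcatCompactor().compact(List.of(a, b), out);
        String merged = new String(Files.readAllBytes(out));
        System.out.println(merged); // hello world
    }
}
```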

### FileCompactStrategy Interface

Interface defining when and how file compaction should be triggered.

```java { .api }
/**
 * Strategy interface for controlling when file compaction occurs
 */
public interface FileCompactStrategy {
    /**
     * Size threshold for triggering compaction
     * @return Size threshold in bytes
     */
    long getSizeThreshold();

    /**
     * Number of checkpoints before compaction is triggered
     * @return Checkpoint count threshold
     */
    int getNumCheckpointsBeforeCompaction();

    /**
     * Number of threads to use for compaction operations
     * @return Thread count for parallel compaction
     */
    int getNumCompactThreads();
}
```

### FileCompactStrategy.Builder

Builder for creating FileCompactStrategy instances with various configuration options.

```java { .api }
/**
 * Builder for configuring file compaction strategies
 */
public static class Builder {
    /**
     * Enables compaction based on checkpoint intervals
     * @param numCheckpoints Number of checkpoints between compaction runs
     * @return Builder instance for chaining
     */
    public Builder enableCompactionOnCheckpoint(int numCheckpoints);

    /**
     * Sets size threshold for triggering compaction
     * @param sizeThreshold Size threshold in bytes
     * @return Builder instance for chaining
     */
    public Builder setSizeThreshold(long sizeThreshold);

    /**
     * Sets number of threads for compaction operations
     * @param numThreads Number of compaction threads
     * @return Builder instance for chaining
     */
    public Builder setNumCompactThreads(int numThreads);

    /**
     * Builds the final FileCompactStrategy instance
     * @return Configured FileCompactStrategy
     */
    public FileCompactStrategy build();
}
```
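A strategy combines two independent triggers: a size threshold and a checkpoint count. The sketch below illustrates one plausible way the two could combine (fire when *either* condition is met); this combination rule, and all names in the sketch, are illustrative assumptions, not the connector's documented semantics.

```java
public class CompactTriggerSketch {
    // Hypothetical trigger check: compaction fires when EITHER the pending
    // data size reaches the size threshold OR enough checkpoints have passed
    // since the last compaction. This OR-combination is an assumption made
    // for illustration only.
    static boolean shouldCompact(long pendingBytes, long sizeThreshold,
                                 int checkpointsSinceCompact, int checkpointThreshold) {
        return pendingBytes >= sizeThreshold
                || checkpointsSinceCompact >= checkpointThreshold;
    }

    public static void main(String[] args) {
        long mib64 = 64L << 20;
        boolean bySize = shouldCompact(70L << 20, mib64, 1, 3);       // size exceeded
        boolean byCheckpoint = shouldCompact(10L << 20, mib64, 3, 3); // checkpoints reached
        boolean neither = shouldCompact(10L << 20, mib64, 1, 3);      // no trigger yet
        System.out.println(bySize + " " + byCheckpoint + " " + neither);
    }
}
```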

### Built-in Compactor Implementations

Ready-to-use compactor implementations for common scenarios.

```java { .api }
/**
 * Simple concatenation-based file compactor.
 * Merges files by concatenating their contents.
 */
public class ConcatFileCompactor extends OutputStreamBasedFileCompactor {
    /**
     * Creates a concatenation compactor with default configuration
     */
    public ConcatFileCompactor();

    /**
     * Creates output stream for writing compacted data
     * @param outputFile Target output file path
     * @return OutputStream for writing compacted content
     * @throws IOException If stream creation fails
     */
    @Override
    protected OutputStream createOutputStream(Path outputFile) throws IOException;
}

/**
 * No-operation compactor that keeps files unchanged.
 * Useful for disabling compaction while maintaining interface compatibility.
 */
public class IdenticalFileCompactor implements FileCompactor {
    /**
     * No-op compaction that simply copies the first input file to the output
     * @param inputFiles Input files (only the first file is copied)
     * @param outputFile Target output file
     * @throws IOException If file copy fails
     */
    @Override
    public void compact(List<Path> inputFiles, Path outputFile) throws IOException;
}
```

### Abstract Base Classes

Base classes for implementing custom compactors with specific patterns.

```java { .api }
/**
 * Base class for output stream-based compactors
 */
public abstract class OutputStreamBasedFileCompactor implements FileCompactor {
    /**
     * Creates output stream for writing compacted data
     * @param outputFile Target output file path
     * @return OutputStream for writing
     * @throws IOException If stream creation fails
     */
    protected abstract OutputStream createOutputStream(Path outputFile) throws IOException;

    /**
     * Compacts files by reading all inputs and writing to the output stream
     * @param inputFiles Input files to compact
     * @param outputFile Target output file
     * @throws Exception If compaction fails
     */
    @Override
    public final void compact(List<Path> inputFiles, Path outputFile) throws Exception;
}

/**
 * Base class for record-wise file compactors
 * @param <T> Type of records being compacted
 */
public abstract class RecordWiseFileCompactor<T> implements FileCompactor {
    /**
     * Creates reader for reading records from an input file
     * @param inputFile Input file path
     * @return Reader for processing records
     * @throws IOException If reader creation fails
     */
    protected abstract FileCompactReader<T> createReader(Path inputFile) throws IOException;

    /**
     * Creates writer for writing records to the output file
     * @param outputFile Output file path
     * @return Writer for output records
     * @throws IOException If writer creation fails
     */
    protected abstract FileCompactWriter<T> createWriter(Path outputFile) throws IOException;

    /**
     * Compacts files by reading records and writing them to the output
     * @param inputFiles Input files to compact
     * @param outputFile Target output file
     * @throws Exception If compaction fails
     */
    @Override
    public final void compact(List<Path> inputFiles, Path outputFile) throws Exception;
}
```
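The record-wise base class boils down to a read-until-null loop per input file. The sketch below reproduces that loop with in-memory stand-ins for the reader and writer, so it runs without a file system; the interfaces `Reader`/`Writer` here are simplified hypothetical mirrors of `FileCompactReader`/`FileCompactWriter`, not the real types.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class RecordLoopSketch {
    // Simplified hypothetical mirrors of FileCompactReader/FileCompactWriter.
    interface Reader<T> { T read(); }
    interface Writer<T> { void write(T record); }

    // The read-until-null loop that a record-wise compact() performs
    // for each input file before moving to the next one.
    static <T> void copyAll(Reader<T> reader, Writer<T> writer) {
        T record;
        while ((record = reader.read()) != null) {
            writer.write(record);
        }
    }

    public static void main(String[] args) {
        List<String> fileA = List.of("r1", "r2"); // records of first "file"
        List<String> fileB = List.of("r3");       // records of second "file"
        List<String> output = new ArrayList<>();
        Writer<String> writer = output::add;

        for (List<String> file : List.of(fileA, fileB)) {
            Iterator<String> it = file.iterator();
            Reader<String> reader = () -> it.hasNext() ? it.next() : null;
            copyAll(reader, writer);
        }
        System.out.println(output); // [r1, r2, r3]
    }
}
```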

**Usage Examples:**

```java
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.sink.compactor.*;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.fs.Path;

// Basic file sink with concatenation compaction
FileSink<String> compactingSink = FileSink
    .forRowFormat(new Path("/output"), new SimpleStringEncoder<String>("UTF-8"))
    .enableCompact(
        FileCompactStrategy.builder()
            .setSizeThreshold(MemorySize.ofMebiBytes(64).getBytes())
            .enableCompactionOnCheckpoint(3)
            .setNumCompactThreads(2)
            .build(),
        new ConcatFileCompactor())
    .build();

// Sink with size-based compaction only
FileSink<String> sizeBasedSink = FileSink
    .forRowFormat(new Path("/output"), new SimpleStringEncoder<String>("UTF-8"))
    .enableCompact(
        FileCompactStrategy.builder()
            .setSizeThreshold(MemorySize.ofMebiBytes(128).getBytes())
            .build(),
        new ConcatFileCompactor())
    .build();

// Sink with checkpoint-based compaction
FileSink<String> checkpointBasedSink = FileSink
    .forRowFormat(new Path("/output"), new SimpleStringEncoder<String>("UTF-8"))
    .enableCompact(
        FileCompactStrategy.builder()
            .enableCompactionOnCheckpoint(5)
            .setNumCompactThreads(4)
            .build(),
        new ConcatFileCompactor())
    .build();
```

### Custom Compactor Implementation

Example of implementing a custom compactor for specific file formats.

```java { .api }
/**
 * Example custom compactor for JSON files with validation
 */
public class JsonFileCompactor extends RecordWiseFileCompactor<JsonNode> {
    private final ObjectMapper objectMapper;

    public JsonFileCompactor() {
        this.objectMapper = new ObjectMapper();
    }

    @Override
    protected FileCompactReader<JsonNode> createReader(Path inputFile) throws IOException {
        return new JsonFileReader(inputFile, objectMapper);
    }

    @Override
    protected FileCompactWriter<JsonNode> createWriter(Path outputFile) throws IOException {
        return new JsonFileWriter(outputFile, objectMapper);
    }

    private static class JsonFileReader implements FileCompactReader<JsonNode> {
        private final BufferedReader reader;
        private final ObjectMapper mapper;

        public JsonFileReader(Path inputFile, ObjectMapper mapper) throws IOException {
            FileSystem fs = inputFile.getFileSystem();
            this.reader = new BufferedReader(new InputStreamReader(fs.open(inputFile)));
            this.mapper = mapper;
        }

        @Override
        public JsonNode read() throws IOException {
            String line = reader.readLine();
            if (line == null) {
                return null;
            }
            return mapper.readTree(line);
        }

        @Override
        public void close() throws IOException {
            reader.close();
        }
    }

    private static class JsonFileWriter implements FileCompactWriter<JsonNode> {
        private final BufferedWriter writer;
        private final ObjectMapper mapper;

        public JsonFileWriter(Path outputFile, ObjectMapper mapper) throws IOException {
            FileSystem fs = outputFile.getFileSystem();
            this.writer = new BufferedWriter(new OutputStreamWriter(fs.create(outputFile, true)));
            this.mapper = mapper;
        }

        @Override
        public void write(JsonNode record) throws IOException {
            writer.write(mapper.writeValueAsString(record));
            writer.newLine();
        }

        @Override
        public void close() throws IOException {
            writer.close();
        }
    }
}
```

### Compaction Operators and Coordination

Internal components that manage the compaction process in distributed environments.

```java { .api }
/**
 * Coordinator for managing compaction across distributed nodes
 */
public class CompactCoordinator {
    // Internal implementation for distributed compaction coordination
}

/**
 * Operator that performs actual compaction work
 */
public class CompactorOperator {
    // Internal implementation for compaction execution
}

/**
 * Service interface for compaction operations
 */
public interface CompactService {
    /**
     * Submits files for compaction
     * @param filesToCompact List of files to be compacted
     * @param targetFile Target output file for the compaction result
     */
    void submitCompaction(List<Path> filesToCompact, Path targetFile);
}
```
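One way to picture a `CompactService`-style component, given that the strategy exposes `getNumCompactThreads()`, is a fixed-size thread pool that merge jobs are submitted to. The sketch below is a hypothetical illustration of that shape: the merge step is replaced by string concatenation so it runs standalone, and none of the names here belong to the real API.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PooledCompactServiceSketch {
    // Hypothetical CompactService-style component: submissions run on a
    // fixed pool sized like getNumCompactThreads(). The actual file merge
    // is stubbed out as string joining to keep the sketch self-contained.
    private final ExecutorService pool;

    public PooledCompactServiceSketch(int numThreads) {
        this.pool = Executors.newFixedThreadPool(numThreads);
    }

    public Future<String> submitCompaction(List<String> filesToCompact) {
        return pool.submit(() -> String.join("+", filesToCompact));
    }

    public void shutdown() {
        pool.shutdown();
    }

    public static void main(String[] args) throws Exception {
        PooledCompactServiceSketch service = new PooledCompactServiceSketch(2);
        String merged = service.submitCompaction(List.of("f1", "f2", "f3")).get();
        service.shutdown();
        System.out.println(merged); // f1+f2+f3
    }
}
```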

### Integration with Decoder-Based Reading

Support for compaction with custom decoders for complex file formats.

```java { .api }
/**
 * Decoder-based reader for compaction operations
 */
public class DecoderBasedReader<T> {
    /**
     * Creates decoder-based reader for custom formats
     * @param decoder Decoder for reading records
     * @param inputStream Input stream to read from
     */
    public DecoderBasedReader(Decoder<T> decoder, InputStream inputStream);

    /**
     * Reads the next record using the configured decoder
     * @return Next decoded record, or null if no more records
     * @throws IOException If reading fails
     */
    public T read() throws IOException;
}

/**
 * Simple string decoder for text-based formats
 */
public class SimpleStringDecoder implements Decoder<String> {
    /**
     * Decodes a string from the input stream
     * @param inputStream Input stream to decode from
     * @return Decoded string
     * @throws IOException If decoding fails
     */
    @Override
    public String decode(InputStream inputStream) throws IOException;
}
```
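To show the "null when no more records" contract in action, here is a small standalone decoder sketch that reads one newline-delimited record per call. It is a hypothetical illustration written against plain `java.io` streams, not the actual `SimpleStringDecoder` implementation.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class LineDecoderSketch {
    // Hypothetical Decoder<String>-style logic: reads one newline-delimited
    // record from the stream, returning null at end of input (mirroring the
    // read() contract described above).
    static String decode(InputStream in) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        int b;
        while ((b = in.read()) != -1 && b != '\n') {
            buf.write(b);
        }
        // End of stream with nothing buffered means no more records.
        return (b == -1 && buf.size() == 0) ? null
                : buf.toString(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        InputStream in = new ByteArrayInputStream(
                "alpha\nbeta\n".getBytes(StandardCharsets.UTF_8));
        String first = decode(in);   // "alpha"
        String second = decode(in);  // "beta"
        String end = decode(in);     // null: stream exhausted
        System.out.println(first + " " + second + " " + end);
    }
}
```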

**Advanced Usage Examples:**

```java
// JSON compaction with custom logic
FileSink<String> jsonSink = FileSink
    .forRowFormat(new Path("/json-output"), new SimpleStringEncoder<String>("UTF-8"))
    .enableCompact(
        FileCompactStrategy.builder()
            .setSizeThreshold(MemorySize.ofMebiBytes(32).getBytes())
            .enableCompactionOnCheckpoint(2)
            .build(),
        new JsonFileCompactor())
    .build();

// Compaction with high parallelism for large files
FileSink<String> highThroughputSink = FileSink
    .forRowFormat(new Path("/high-volume"), new SimpleStringEncoder<String>("UTF-8"))
    .enableCompact(
        FileCompactStrategy.builder()
            .setSizeThreshold(MemorySize.ofMebiBytes(256).getBytes())
            .setNumCompactThreads(8)
            .build(),
        new ConcatFileCompactor())
    .build();

// Conditional compaction based on environment
FileCompactor compactor = isProductionEnvironment()
    ? new ConcatFileCompactor()
    : new IdenticalFileCompactor(); // Disable in development

FileSink<String> conditionalSink = FileSink
    .forRowFormat(new Path("/output"), new SimpleStringEncoder<String>("UTF-8"))
    .enableCompact(
        FileCompactStrategy.builder()
            .setSizeThreshold(MemorySize.ofMebiBytes(64).getBytes())
            .build(),
        compactor)
    .build();
```

## Error Handling

File compaction handles various error conditions during the compaction process:

- **IOException**: File system I/O errors during compaction
- **OutOfMemoryError**: Insufficient memory for compaction operations
- **SecurityException**: Permission errors accessing files
- **RuntimeException**: Format-specific compaction errors

```java
try {
    FileCompactor compactor = new ConcatFileCompactor();
    List<Path> inputFiles = Arrays.asList(
        new Path("/data/file1.txt"),
        new Path("/data/file2.txt")
    );
    compactor.compact(inputFiles, new Path("/data/compacted.txt"));
} catch (IOException e) {
    // Handle I/O errors
} catch (SecurityException e) {
    // Handle permission errors
} catch (Exception e) {
    // Handle other compaction errors
}
```

## Performance Considerations

- Set appropriate size thresholds to balance compaction frequency against efficiency
- Use multiple compaction threads for I/O-intensive operations
- Consider file format characteristics when choosing compaction strategies
- Monitor compaction performance and adjust thread counts based on system resources
- Balance checkpoint-based and size-based compaction triggers
- Implement efficient custom compactors for specialized file formats
- Consider network and storage costs when compacting in distributed environments
- Monitor the impact of compaction on job performance and adjust triggers accordingly