# File System Integration

A complete file system table source and sink implementation, with support for partitioning, streaming writes, file compaction, and storage systems such as HDFS, S3, and local file systems.

## Capabilities

### File System Table Factory

Primary entry point for creating file-system-based table sources and sinks. It implements the dynamic table factory pattern used for Flink SQL integration.

```java { .api }
/**
 * Primary factory for file system-based tables
 * Implements dynamic table factory pattern for Flink SQL integration
 */
class FileSystemTableFactory
    implements DynamicTableSourceFactory, DynamicTableSinkFactory {

    /** Create dynamic table source for reading from file systems */
    DynamicTableSource createDynamicTableSource(Context context);

    /** Create dynamic table sink for writing to file systems */
    DynamicTableSink createDynamicTableSink(Context context);

    /** Get factory identifier */
    String factoryIdentifier();

    /** Get required context properties */
    Set<ConfigOption<?>> requiredOptions();

    /** Get optional context properties */
    Set<ConfigOption<?>> optionalOptions();
}
```
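
Before the factory builds a source or sink, the table's options are checked against `requiredOptions()` and `optionalOptions()`. The sketch below imitates that check in plain Java so it runs without Flink; `OptionValidator` is a hypothetical stand-in (Flink performs its own validation through its factory utilities), and the option names used with it are only examples.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical validator mimicking the required/optional option check of a table factory. */
final class OptionValidator {
    private final Set<String> required;
    private final Set<String> optional;

    OptionValidator(Set<String> required, Set<String> optional) {
        this.required = required;
        this.optional = optional;
    }

    /** Returns the sorted list of problems; an empty set means the options are valid. */
    Set<String> validate(Map<String, String> options) {
        Set<String> problems = new TreeSet<>();
        for (String key : required) {
            if (!options.containsKey(key)) {
                problems.add("missing required option: " + key);
            }
        }
        for (String key : options.keySet()) {
            // 'connector' selects the factory itself, so it is always accepted.
            if (!key.equals("connector") && !required.contains(key) && !optional.contains(key)) {
                problems.add("unsupported option: " + key);
            }
        }
        return problems;
    }
}
```

Rejecting unknown keys catches typos such as `formt` when the table is created rather than when the job runs.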
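
### File System Tables

The main table source and sink implementations. The source supports scanning with partition pruning and limit push-down (`PartitionableTableSource`, `LimitableTableSource`); the sink supports partitioned writes, including static partition inserts through `applyStaticPartition`. Both share schema and partition-key handling through `AbstractFileSystemTable`.
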
```java { .api }
/** Main table source implementation for file systems */
class FileSystemTableSource extends AbstractFileSystemTable
    implements ScanTableSource, PartitionableTableSource, LimitableTableSource {

    FileSystemTableSource(
        ObjectIdentifier tableIdentifier,
        CatalogTable catalogTable,
        Map<String, String> properties,
        ReadableConfig tableOptions
    );

    /** Get scan runtime provider */
    ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext);

    /** Apply limit push-down optimization */
    Result applyLimit(long limit);
}

/** Main table sink implementation for file systems */
class FileSystemTableSink extends AbstractFileSystemTable
    implements DynamicTableSink, PartitionableTableSink {

    FileSystemTableSink(
        ObjectIdentifier tableIdentifier,
        CatalogTable catalogTable,
        Map<String, String> properties,
        ReadableConfig tableOptions
    );

    /** Get sink runtime provider */
    SinkRuntimeProvider getSinkRuntimeProvider(Context context);

    /** Apply static partition insertion */
    void applyStaticPartition(Map<String, String> partition);
}

/** Base class for file system tables */
abstract class AbstractFileSystemTable implements DynamicTableSource, DynamicTableSink {
    /** Get table schema */
    TableSchema getTableSchema();

    /** Get partition keys */
    List<String> getPartitionKeys();

    /** Copy table with new properties */
    abstract AbstractFileSystemTable copy(Map<String, String> newProperties);
}
```
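
`applyLimit` lets the planner push a `LIMIT` into the source so that reading can stop early. The following is a minimal sketch under the simplifying assumption that all split readers of one task share a single counter; `SharedLimit` is hypothetical and is not the code of `LimitableBulkFormat`.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical limit shared by all split readers of one source task. */
final class SharedLimit {
    private final long limit;
    private final AtomicLong emitted = new AtomicLong();

    SharedLimit(long limit) {
        this.limit = limit;
    }

    /** Wraps one split's records so that all wrapped splits together yield at most {@code limit} records. */
    <T> Iterator<T> wrap(Iterator<T> split) {
        return new Iterator<T>() {
            private T pending;
            private boolean hasPending;

            @Override
            public boolean hasNext() {
                if (hasPending) {
                    return true;
                }
                if (emitted.get() >= limit || !split.hasNext()) {
                    return false;
                }
                // Reserve a slot before reading; stop if another reader took the last one.
                if (emitted.incrementAndGet() > limit) {
                    return false;
                }
                pending = split.next();
                hasPending = true;
                return true;
            }

            @Override
            public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                hasPending = false;
                return pending;
            }
        };
    }
}
```

Reserving a slot before a record is read keeps concurrent readers from overshooting the limit.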

### File System Factory Interface

Factory interface that creates a `FileSystem` instance for a given URI. This is how different storage systems (HDFS, S3, local, and others) are plugged into the connector.

```java { .api }
/**
 * Factory for file system instances
 * Enables support for different storage systems (HDFS, S3, local, etc.)
 */
interface FileSystemFactory extends Serializable {
    /** Create file system instance */
    FileSystem create(URI fsUri) throws IOException;
}
```
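
A `FileSystemFactory` is selected by the scheme of the path URI, such as `hdfs://`, `s3://`, or `file://`. The sketch below shows scheme-based dispatch with `java.net.URI`; `SchemeRegistry` is hypothetical, its factories return descriptive strings instead of real `FileSystem` instances, and resolving a scheme-less path against a configurable default scheme is an assumption of the sketch.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical registry that picks a file system factory by URI scheme. */
final class SchemeRegistry {
    private final Map<String, Function<URI, String>> factories = new HashMap<>();
    private final String defaultScheme;

    SchemeRegistry(String defaultScheme) {
        this.defaultScheme = defaultScheme;
    }

    void register(String scheme, Function<URI, String> factory) {
        factories.put(scheme.toLowerCase(Locale.ROOT), factory);
    }

    /** Resolves the factory for the URI's scheme, falling back to the default scheme. */
    String create(URI uri) {
        String scheme = uri.getScheme() == null
                ? defaultScheme
                : uri.getScheme().toLowerCase(Locale.ROOT);
        Function<URI, String> factory = factories.get(scheme);
        if (factory == null) {
            throw new IllegalArgumentException("no file system factory for scheme: " + scheme);
        }
        return factory.apply(uri);
    }
}
```

Schemes are compared case-insensitively, so `S3://bucket/key` and `s3://bucket/key` resolve to the same factory.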

### Partition Management

Core interfaces and implementations for managing partitioned data, including partition computation, writing, and reading operations.

```java { .api }
/**
 * Interface for computing partitions
 * Determines which partition a record belongs to
 */
interface PartitionComputer<T> {
    /** Compute partition path for given record */
    String generatePartValues(T record) throws Exception;

    /** Get partition field names */
    String[] getPartitionFieldNames();
}

/**
 * Interface for writing partitioned data
 * Handles the actual writing of records to partition-specific locations
 */
interface PartitionWriter<T> {
    /** Write a record to the appropriate partition */
    void write(T record) throws Exception;

    /** Close the writer and finalize writes */
    void close() throws Exception;

    /** Get commit information */
    List<PartitionCommitInfo> getCommitInfos();
}

/**
 * Interface for reading partitions
 * Provides partition-aware reading capabilities
 */
interface PartitionReader<P, OUT> {
    /** Read partition data */
    OUT read(P partition) throws Exception;

    /** Get partition metadata */
    P[] getPartitions() throws Exception;
}

/** Partition computer implementation for row data */
class RowDataPartitionComputer implements PartitionComputer<RowData> {
    RowDataPartitionComputer(
        String defaultPartValue,
        String[] partitionColumns,
        LogicalType[] partitionTypes,
        String[] fieldNames,
        LogicalType[] fieldTypes
    );
}
```
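
`RowDataPartitionComputer` takes a `defaultPartValue` (the usage example passes `__DEFAULT_PARTITION__`) for records whose partition column has no value. The sketch below shows how partition values can be computed and joined into a Hive-style relative path such as `year=2024/month=5`. It works on a plain `Map` instead of `RowData`, `PartitionPaths` is a hypothetical helper, and it omits the escaping of special characters that a real implementation needs.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper: computes partition values and a Hive-style relative path. */
final class PartitionPaths {
    private PartitionPaths() {}

    /** Picks the partition columns in order, substituting the default for null or blank values. */
    static LinkedHashMap<String, String> partValues(
            Map<String, Object> record, List<String> partitionColumns, String defaultPartValue) {
        LinkedHashMap<String, String> values = new LinkedHashMap<>();
        for (String column : partitionColumns) {
            Object value = record.get(column);
            String text = value == null ? "" : value.toString();
            values.put(column, text.isBlank() ? defaultPartValue : text);
        }
        return values;
    }

    /** Joins the values as key=value segments, e.g. year=2024/month=5 (no escaping). */
    static String path(LinkedHashMap<String, String> values) {
        StringJoiner joiner = new StringJoiner("/");
        values.forEach((k, v) -> joiner.add(k + "=" + v));
        return joiner.toString();
    }
}
```

A `LinkedHashMap` keeps the partition columns in declaration order, which fixes the directory nesting order.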
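
### Partition Writers

Concrete partition writers for different partitioning strategies: `DynamicPartitionWriter` routes each record to a partition computed at runtime and can keep several partitions open at once, `GroupedPartitionWriter` handles input that arrives grouped by partition and switches to a new file when the partition changes, and `SingleDirectoryWriter` writes every record into one directory.
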
```java { .api }
/** Dynamic partition writer implementation */
class DynamicPartitionWriter<T> implements PartitionWriter<T> {
    DynamicPartitionWriter(
        PartitionComputer<T> computer,
        PartitionWriterFactory<T> factory,
        FileSystemCommitter committer
    );

    /** Write record to dynamically determined partition */
    void write(T record) throws Exception;
}

/** Grouped partition writer implementation */
class GroupedPartitionWriter<T> implements PartitionWriter<T> {
    GroupedPartitionWriter(
        PartitionComputer<T> computer,
        PartitionWriterFactory<T> factory,
        long maxOpenWriters
    );

    /** Write record using grouped partitioning strategy */
    void write(T record) throws Exception;
}

/** Single directory writer implementation */
class SingleDirectoryWriter<T> implements PartitionWriter<T> {
    SingleDirectoryWriter(
        OutputFormatFactory<T> formatFactory,
        Path outputDir,
        String filePrefix
    );

    /** Write record to single directory */
    void write(T record) throws Exception;
}

/**
 * Factory for partition writers
 * Creates partition-specific writers on demand
 */
interface PartitionWriterFactory<T> extends Serializable {
    /** Create writer for specific partition */
    PartitionWriter<T> createWriter(String partition) throws IOException;
}
```
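
The dynamic and grouped strategies differ mainly in how many partition writers are open at once. The sketch below models both with an in-memory `Writer` that only collects records; `PartitionRouting` is hypothetical, and the grouped variant assumes its input already arrives grouped (for example sorted) by partition.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of dynamic vs. grouped partition writing. */
final class PartitionRouting {
    /** Stand-in for a file writer; it only records what was written. */
    static final class Writer {
        final String partition;
        final List<String> records = new ArrayList<>();
        boolean closed;

        Writer(String partition) {
            this.partition = partition;
        }
    }

    final List<Writer> created = new ArrayList<>();
    int maxOpen;

    private Writer open(String partition) {
        Writer w = new Writer(partition);
        created.add(w);
        long open = created.stream().filter(x -> !x.closed).count();
        maxOpen = Math.max(maxOpen, (int) open);
        return w;
    }

    /** Dynamic: one writer per partition, all kept open until the input ends. Each record is {partition, value}. */
    void writeDynamic(List<String[]> input) {
        Map<String, Writer> writers = new HashMap<>();
        for (String[] rec : input) {
            writers.computeIfAbsent(rec[0], this::open).records.add(rec[1]);
        }
        writers.values().forEach(w -> w.closed = true);
    }

    /** Grouped: close the current writer as soon as the partition changes. */
    void writeGrouped(List<String[]> input) {
        Writer current = null;
        for (String[] rec : input) {
            if (current == null || !current.partition.equals(rec[0])) {
                if (current != null) {
                    current.closed = true;
                }
                current = open(rec[0]);
            }
            current.records.add(rec[1]);
        }
        if (current != null) {
            current.closed = true;
        }
    }
}
```

If the input is not actually grouped, the grouped strategy reopens a partition each time it reappears and produces extra small files; the dynamic strategy avoids that at the cost of keeping one writer per active partition.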

### Partition Commit Policies

Interfaces and implementations for determining when partitions should be committed and how the commit process should be executed.

```java { .api }
/**
 * Interface for partition commit strategies
 * Determines when and how partitions should be committed
 */
interface PartitionCommitPolicy extends Serializable {
    /** Check if partition should be committed */
    boolean shouldCommit(Context context) throws Exception;

    /** Execute the commit operation */
    void commit(Context context) throws Exception;

    /** Context for commit operations */
    interface Context {
        /** Get partition path */
        String partition();

        /** Get partition commit trigger context */
        PartitionCommitTrigger.Context commitContext();
    }
}

/** Metastore-based commit policy implementation */
class MetastoreCommitPolicy implements PartitionCommitPolicy {
    MetastoreCommitPolicy(
        TableMetaStoreFactory metaStoreFactory,
        ObjectIdentifier tableIdentifier,
        List<String> partitionKeys
    );
}
```
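
In Flink's filesystem connector, the policies to run are chosen with the `sink.partition-commit.policy.kind` option (for example `metastore,success-file`). A minimal sketch of a success-file style policy follows, assuming that committing a partition only means creating an empty marker file in its directory; `SuccessFilePolicy` is hypothetical, and `_SUCCESS` is used here as the conventional marker name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical commit policy that marks a partition as complete with an empty marker file. */
final class SuccessFilePolicy {
    private final String markerName;

    SuccessFilePolicy(String markerName) {
        this.markerName = markerName;
    }

    /** Creates the marker in the partition directory; committing the same partition twice is harmless. */
    Path commit(Path partitionDir) {
        Path marker = partitionDir.resolve(markerName);
        try {
            Files.createDirectories(partitionDir);
            if (Files.notExists(marker)) {
                Files.createFile(marker);
            }
            return marker;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Keeping the commit idempotent matters because a partition may be committed again after a failover.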

### Output Format Factory

Factory interface for creating output formats, enabling integration with different file formats and storage systems.

```java { .api }
/**
 * Factory for output formats
 * Enables integration with different file formats (Parquet, ORC, CSV, etc.)
 */
interface OutputFormatFactory<T> extends Serializable {
    /** Create output format for writing */
    OutputFormat<T> createOutputFormat(Path path);

    /** Get supported format options */
    Set<String> getSupportedOptions();
}
```
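
An `OutputFormatFactory` produces a fresh writer for each target path. The sketch below uses hypothetical `LineFormat` and `CsvLineFormatFactory` types, not Flink's `OutputFormat`, to show a factory that returns a writer rendering each record as one comma-separated line. Quoting and escaping of values are omitted.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

/** Hypothetical writer for one output file. */
interface LineFormat<T> extends AutoCloseable {
    void writeRecord(T record);

    @Override
    void close();
}

/** Hypothetical factory: each call opens a new file and returns a writer bound to it. */
final class CsvLineFormatFactory {
    LineFormat<List<String>> createOutputFormat(Path path) {
        final BufferedWriter out;
        try {
            out = Files.newBufferedWriter(path, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new LineFormat<>() {
            @Override
            public void writeRecord(List<String> record) {
                try {
                    out.write(String.join(",", record)); // no quoting or escaping
                    out.newLine();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }

            @Override
            public void close() {
                try {
                    out.close();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }
        };
    }
}
```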

### File System Operations

Core classes for committing written files, writing records through output formats, and reading bulk formats with an optional record limit.

```java { .api }
/** Committer for file system operations */
class FileSystemCommitter implements Serializable {
    FileSystemCommitter(
        FileSystemFactory fsFactory,
        TableMetaStoreFactory msFactory,
        boolean overwrite,
        Path tmpPath,
        int parallelism,
        List<PartitionCommitPolicy> policies
    );

    /** Commit pending files */
    void commitPartitions(List<PartitionCommitInfo> partitionCommitInfos) throws Exception;

    /** Get commit policies */
    List<PartitionCommitPolicy> getCommitPolicies();
}

/** Output format for file systems */
class FileSystemOutputFormat<T> extends RichOutputFormat<T> {
    FileSystemOutputFormat(
        OutputFormatFactory<T> formatFactory,
        PartitionComputer<T> computer,
        Path outputPath,
        String filePrefix,
        boolean overwrite
    );
}

/** Bulk format with limit support */
class LimitableBulkFormat<T> implements BulkFormat<T> {
    LimitableBulkFormat(BulkFormat<T> format, long limit);

    /** Create reader with limit */
    Reader<T> createReader(Configuration config, FileSourceSplit split) throws IOException;
}
```
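
`FileSystemCommitter` is constructed with a temporary path and an `overwrite` flag. One common way to implement such a commit, sketched here as an assumption rather than a description of Flink's committer, is to move finished files from a staging directory into the final partition directory, clearing the partition first when overwriting. `StagingCommitter` is hypothetical and assumes both directories are on the same file system, so the moves are cheap renames.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical committer: publishes staged files into a partition directory. */
final class StagingCommitter {
    private final boolean overwrite;

    StagingCommitter(boolean overwrite) {
        this.overwrite = overwrite;
    }

    void commit(Path stagingDir, Path partitionDir) {
        try {
            Files.createDirectories(partitionDir);
            if (overwrite) {
                // Remove the partition's existing files before publishing the new ones.
                for (Path old : regularFiles(partitionDir)) {
                    Files.delete(old);
                }
            }
            for (Path staged : regularFiles(stagingDir)) {
                // Fails with FileAlreadyExistsException if the name is already taken.
                Files.move(staged, partitionDir.resolve(staged.getFileName()));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static List<Path> regularFiles(Path dir) throws IOException {
        try (Stream<Path> files = Files.list(dir)) {
            return files.filter(Files::isRegularFile).collect(Collectors.toList());
        }
    }
}
```

Without overwrite, staged file names must be unique within a partition, for example by including the task number in each name.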

### Partition Management Utilities

Utility classes for partition-related operations including temporary file management, partition loading, and time extraction.

```java { .api }
/** Manage temporary partition files */
class PartitionTempFileManager {
    PartitionTempFileManager(
        FileSystemFactory fsFactory,
        Path tmpPath,
        int taskNumber,
        String prefix
    );

    /** Create temporary file for partition */
    Path createPartitionTempFile(String partition) throws IOException;

    /** List temporary files for partition */
    List<Path> listPartitionTempFiles(String partition) throws IOException;
}

/**
 * Load partition information
 * Discovers and loads partition metadata from storage
 */
interface PartitionLoader {
    /** Load all partitions */
    List<Partition> loadPartitions() throws Exception;

    /** Load specific partition */
    Partition loadPartition(Map<String, String> partitionSpec) throws Exception;
}

/**
 * Extract time from partitions
 * Enables time-based partition processing
 */
interface PartitionTimeExtractor extends Serializable {
    /** Extract timestamp from partition path */
    LocalDateTime extract(List<String> partitionKeys, List<String> partitionValues);
}
```
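
`PartitionTimeExtractor` turns partition values into a timestamp that time-based commit triggers can compare with the watermark. Flink's default extractor is driven by a pattern option (`partition.time-extractor.timestamp-pattern`, for example `$dt $hour:00:00`). The sketch below imitates that substitute-then-parse approach; `PatternTimeExtractor` is hypothetical, and parsing the result with `yyyy-MM-dd HH:mm:ss` is an assumption of the sketch.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/** Hypothetical extractor: substitutes $key placeholders with partition values, then parses the result. */
final class PatternTimeExtractor {
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private final String pattern;

    PatternTimeExtractor(String pattern) {
        this.pattern = pattern;
    }

    LocalDateTime extract(List<String> partitionKeys, List<String> partitionValues) {
        // Replace longer keys first so that a key such as "d" cannot clobber "$dt".
        List<Integer> order = IntStream.range(0, partitionKeys.size())
                .boxed()
                .sorted(Comparator.comparingInt((Integer i) -> partitionKeys.get(i).length()).reversed())
                .collect(Collectors.toList());
        String text = pattern;
        for (int i : order) {
            text = text.replace("$" + partitionKeys.get(i), partitionValues.get(i));
        }
        return LocalDateTime.parse(text, FORMAT);
    }
}
```

For keys `dt`, `hour` with values `2024-05-01`, `13`, the pattern `$dt $hour:00:00` becomes `2024-05-01 13:00:00`.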

### Streaming File Operations

Support for streaming (unbounded) writes, including the streaming file writer operator and the streaming sink, both of which emit partition commit information for downstream committers.

```java { .api }
/**
 * Streaming file writer
 * Handles streaming writes with proper watermark and checkpoint integration
 */
class StreamingFileWriter<IN> extends AbstractStreamOperator<PartitionCommitInfo>
    implements OneInputStreamOperator<IN, PartitionCommitInfo> {

    StreamingFileWriter(
        long bucketCheckInterval,
        PartitionComputer<IN> computer,
        PartitionWriterFactory<IN> writerFactory,
        FileSystemCommitter committer
    );

    /** Process streaming input element */
    void processElement(StreamRecord<IN> element) throws Exception;
}

/** Streaming sink implementation */
class StreamingSink<IN> implements Sink<IN> {
    StreamingSink(
        PartitionComputer<IN> computer,
        PartitionWriterFactory<IN> writerFactory,
        FileSystemCommitter committer,
        long bucketCheckInterval
    );

    /** Create sink writer */
    SinkWriter<IN> createWriter(InitContext context) throws IOException;
}
```
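
A streaming writer can hand partitions to the committer only once their files are durable, which in Flink is tied to checkpoints. The sketch below models that bookkeeping: it remembers which partitions received records and emits them as one message when a checkpoint completes. `CheckpointedPartitionTracker` and `CommitMessage` are hypothetical simplifications of `StreamingFileWriter` and `PartitionCommitInfo`, and the sketch ignores records that arrive between taking a checkpoint and its completion.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical simplification of PartitionCommitInfo. */
final class CommitMessage {
    final long checkpointId;
    final int taskIndex;
    final List<String> partitions;

    CommitMessage(long checkpointId, int taskIndex, List<String> partitions) {
        this.checkpointId = checkpointId;
        this.taskIndex = taskIndex;
        this.partitions = partitions;
    }
}

/** Hypothetical bookkeeping for one streaming writer task. */
final class CheckpointedPartitionTracker {
    private final int taskIndex;
    private final Set<String> pending = new TreeSet<>(); // sorted for deterministic output

    CheckpointedPartitionTracker(int taskIndex) {
        this.taskIndex = taskIndex;
    }

    /** Called for every record; only its partition matters for commit bookkeeping. */
    void onRecord(String partition) {
        pending.add(partition);
    }

    /** Called when a checkpoint completes: emit the touched partitions and start over. */
    CommitMessage onCheckpointComplete(long checkpointId) {
        CommitMessage msg = new CommitMessage(checkpointId, taskIndex, new ArrayList<>(pending));
        pending.clear();
        return msg;
    }
}
```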

### Streaming Support Classes

Additional classes supporting streaming operations including partition committers and commit triggers.

```java { .api }
/** Operator for committing partitions in streaming mode */
class PartitionCommitter extends AbstractStreamOperator<Void>
    implements OneInputStreamOperator<PartitionCommitInfo, Void> {

    PartitionCommitter(List<PartitionCommitPolicy> policies);

    /** Process partition commit info */
    void processElement(StreamRecord<PartitionCommitInfo> element) throws Exception;
}

/**
 * Trigger for partition commits
 * Determines when partitions should be committed based on various criteria
 */
interface PartitionCommitTrigger extends Serializable {
    /** Check if partition should be committed */
    boolean shouldCommit(Context context) throws Exception;

    /** Trigger context interface */
    interface Context {
        /** Get current processing time */
        long currentProcessingTime();

        /** Get current watermark */
        long currentWatermark();

        /** Get partition create time */
        long partitionCreateTime();
    }
}

/** Processing time-based commit trigger */
class ProcTimeCommitTrigger implements PartitionCommitTrigger {
    ProcTimeCommitTrigger(long delay);
}
```
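
Flink's filesystem sink selects a trigger with `sink.partition-commit.trigger` (`process-time` or `partition-time`) and a delay with `sink.partition-commit.delay`. The sketch below expresses both decisions using the values exposed by `PartitionCommitTrigger.Context`, plus a partition timestamp (for example from a `PartitionTimeExtractor`) for the watermark case. `CommitRules` is hypothetical, all times are epoch milliseconds, and using `>=` at the boundary is an assumption.

```java
/** Hypothetical commit rules; all values are epoch milliseconds. */
final class CommitRules {
    private CommitRules() {}

    /** process-time style: commit once the delay has elapsed since the partition was created. */
    static boolean procTimeReady(long currentProcessingTime, long partitionCreateTime, long delayMillis) {
        return currentProcessingTime >= partitionCreateTime + delayMillis;
    }

    /** partition-time style: commit once the watermark reaches the partition's own time plus the delay. */
    static boolean partitionTimeReady(long currentWatermark, long partitionTime, long delayMillis) {
        return currentWatermark >= partitionTime + delayMillis;
    }
}
```

With hourly partitions and a one-hour delay, the partition for 13:00 becomes committable once the watermark reaches 14:00, that is, once no more data for that hour is expected.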

### File Compaction

Framework for file compaction, which merges small files into larger ones to improve the file layout and reduce the number of small files that readers and the file system have to handle.

```java { .api }
/**
 * Operator for file compaction
 * Handles merging of small files into larger ones for better performance
 */
class CompactOperator<T> extends AbstractStreamOperator<CompactResult>
    implements OneInputStreamOperator<T, CompactResult> {

    CompactOperator(
        CompactReader<T> reader,
        CompactWriter<T> writer,
        long targetFileSize,
        long compactionInterval
    );
}

/** Coordinator for compaction operations */
class CompactCoordinator extends AbstractStreamOperator<CompactionUnit>
    implements OneInputStreamOperator<PartitionCommitInfo, CompactionUnit> {

    CompactCoordinator(long targetFileSize, int maxConcurrentCompactions);

    /** Process partition commit information for compaction */
    void processElement(StreamRecord<PartitionCommitInfo> element) throws Exception;
}

/**
 * Interface for compact read operations
 * Reads data from files targeted for compaction
 */
interface CompactReader<T> extends Serializable {
    /** Read data from compaction source */
    Iterator<T> read(CompactionUnit unit) throws Exception;
}

/**
 * Interface for compact write operations
 * Writes compacted data to optimized file layouts
 */
interface CompactWriter<T> extends Serializable {
    /** Write compacted data */
    void write(Iterator<T> data, CompactionUnit unit) throws Exception;
}

/** File writer for compaction operations */
class CompactFileWriter<T> implements CompactWriter<T> {
    CompactFileWriter(OutputFormatFactory<T> formatFactory);
}
```
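
`CompactCoordinator` groups small files into compaction units bounded by `targetFileSize`. A minimal greedy grouping is sketched below, assuming files are packed in arrival order, files at or above the target are left alone, and single-file units are dropped because rewriting one file gains nothing. `CompactionPlanner` is hypothetical and is not Flink's coordinator logic.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical greedy planner that packs small files into units of at most targetFileSize bytes. */
final class CompactionPlanner {
    private CompactionPlanner() {}

    /** fileSizes maps file name to size in bytes, in arrival order. */
    static List<List<String>> plan(LinkedHashMap<String, Long> fileSizes, long targetFileSize) {
        List<List<String>> units = new ArrayList<>();
        List<String> current = new ArrayList<>();
        long currentSize = 0;
        for (Map.Entry<String, Long> file : fileSizes.entrySet()) {
            long size = file.getValue();
            if (size >= targetFileSize) {
                continue; // already large enough; leave it untouched
            }
            if (!current.isEmpty() && currentSize + size > targetFileSize) {
                addIfWorthwhile(units, current);
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(file.getKey());
            currentSize += size;
        }
        addIfWorthwhile(units, current);
        return units;
    }

    private static void addIfWorthwhile(List<List<String>> units, List<String> unit) {
        // Rewriting a single file gains nothing, so only multi-file units are kept.
        if (unit.size() > 1) {
            units.add(unit);
        }
    }
}
```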

### Metastore Integration

Interface for integrating with table metastores, enabling metadata management and catalog operations.

```java { .api }
/**
 * Factory for metastore integration
 * Enables metadata management and catalog operations
 */
interface TableMetaStoreFactory extends Serializable {
    /** Create table metastore instance */
    TableMetaStore createTableMetaStore() throws Exception;

    /** Get metastore configuration */
    Map<String, String> getMetaStoreConfig();
}
```
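
When a metastore commit policy runs, it records the committed partition in the table's metastore. The in-memory sketch below shows the property that matters most in practice: registering a partition is idempotent, so a commit retried after failover leaves the metastore unchanged. `InMemoryMetaStore` and `createOrAlterPartition` are hypothetical names, not the `TableMetaStore` API.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical in-memory metastore keyed by partition spec. */
final class InMemoryMetaStore {
    private final Map<LinkedHashMap<String, String>, String> partitions = new LinkedHashMap<>();

    /** Adds the partition or updates its location; repeating the same call changes nothing. */
    void createOrAlterPartition(LinkedHashMap<String, String> spec, String location) {
        partitions.put(new LinkedHashMap<>(spec), location); // copy so later caller edits cannot corrupt the key
    }

    Optional<String> location(LinkedHashMap<String, String> spec) {
        return Optional.ofNullable(partitions.get(spec));
    }

    int partitionCount() {
        return partitions.size();
    }
}
```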
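
## Usage Examples

The example below wires the factory, partition computer, and streaming writer together directly. In a Flink SQL job the factory is normally not instantiated by hand; it is discovered from the `'connector' = 'filesystem'` option of a `CREATE TABLE` statement.
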
```java
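import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.types.logical.LogicalType;
// FileSystemTableFactory, PartitionComputer, RowDataPartitionComputer, StreamingFileWriter,
// PartitionWriterFactory and FileSystemCommitter come from the file system connector
// package, whose name depends on the Flink version.

// Create file system table factory
FileSystemTableFactory factory = new FileSystemTableFactory();

// Configure table properties
Map<String, String> properties = new HashMap<>();
properties.put("connector", "filesystem");
properties.put("path", "/data/my-table");
properties.put("format", "parquet");

// Create table source and sink.
// TestContext and schema are placeholders for a DynamicTableFactory.Context
// implementation and a table schema defined elsewhere.
DynamicTableFactory.Context sourceContext = new TestContext(properties, schema);
DynamicTableSource source = factory.createDynamicTableSource(sourceContext);

DynamicTableFactory.Context sinkContext = new TestContext(properties, schema);
DynamicTableSink sink = factory.createDynamicTableSink(sinkContext);

// Set up the partition computer for a table partitioned by (year, month).
// fieldNames and fieldTypes describe all columns of the table (defined elsewhere).
PartitionComputer<RowData> computer = new RowDataPartitionComputer(
    "__DEFAULT_PARTITION__", // value used when a partition column has no value
    new String[]{"year", "month"},
    new LogicalType[]{DataTypes.INT().getLogicalType(), DataTypes.INT().getLogicalType()},
    fieldNames,
    fieldTypes
);

// Create streaming file writer.
// writerFactory (a PartitionWriterFactory<RowData>) and committer
// (a FileSystemCommitter) are built elsewhere.
StreamingFileWriter<RowData> streamingWriter = new StreamingFileWriter<>(
    60000L, // bucket check interval in milliseconds
    computer,
    writerFactory,
    committer
);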
```