or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

code-generation.md · data-structures.md · filesystem.md · index.md · runtime-operators.md · type-system.md · utilities.md

filesystem.md (docs/)

0

# File System Integration

1

2

This module provides a complete file system table source and sink implementation. It supports partitioning, streaming writes, and file compaction, and integrates with various storage systems, including HDFS, S3, and local file systems.

3

4

## Capabilities

5

6

### File System Table Factory

7

8

Primary entry point for creating file system-based table sources and sinks, implementing the dynamic table factory pattern for Flink SQL integration.

9

10

```java { .api }

11

/**

12

* Primary factory for file system-based tables

13

* Implements dynamic table factory pattern for Flink SQL integration

14

*/

15

class FileSystemTableFactory

16

implements DynamicTableSourceFactory, DynamicTableSinkFactory {

17

18

/** Create dynamic table source for reading from file systems */

19

DynamicTableSource createDynamicTableSource(Context context);

20

21

/** Create dynamic table sink for writing to file systems */

22

DynamicTableSink createDynamicTableSink(Context context);

23

24

/** Get factory identifier */

25

String factoryIdentifier();

26

27

/** Get the set of configuration options that are required by this factory */

28

Set<ConfigOption<?>> requiredOptions();

29

30

/** Get the set of configuration options that are optional for this factory */

31

Set<ConfigOption<?>> optionalOptions();

32

}

33

```

34

35

### File System Tables

36

37

Main table source and sink implementations providing comprehensive file system integration capabilities.

38

39

```java { .api }

40

/** Main table source implementation for file systems */

41

class FileSystemTableSource extends AbstractFileSystemTable

42

implements ScanTableSource, PartitionableTableSource, LimitableTableSource {

43

44

FileSystemTableSource(

45

ObjectIdentifier tableIdentifier,

46

CatalogTable catalogTable,

47

Map<String, String> properties,

48

ReadableConfig tableOptions

49

);

50

51

/** Get scan runtime provider */

52

ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext);

53

54

/** Apply limit push-down optimization */

55

Result applyLimit(long limit);

56

}

57

58

/** Main table sink implementation for file systems */

59

class FileSystemTableSink extends AbstractFileSystemTable

60

implements DynamicTableSink, PartitionableTableSink {

61

62

FileSystemTableSink(

63

ObjectIdentifier tableIdentifier,

64

CatalogTable catalogTable,

65

Map<String, String> properties,

66

ReadableConfig tableOptions

67

);

68

69

/** Get sink runtime provider */

70

SinkRuntimeProvider getSinkRuntimeProvider(Context context);

71

72

/** Apply static partition insertion */

73

void applyStaticPartition(Map<String, String> partition);

74

}

75

76

/** Base class for file system tables */

77

abstract class AbstractFileSystemTable implements DynamicTableSource, DynamicTableSink {

78

/** Get table schema */

79

TableSchema getTableSchema();

80

81

/** Get partition keys */

82

List<String> getPartitionKeys();

83

84

/** Copy table with new properties */

85

abstract AbstractFileSystemTable copy(Map<String, String> newProperties);

86

}

87

```

88

89

### File System Factory Interface

90

91

Core factory interface for creating file system instances, enabling support for different storage systems.

92

93

```java { .api }

94

/**

95

* Factory for file system instances

96

* Enables support for different storage systems (HDFS, S3, local, etc.)

97

*/

98

interface FileSystemFactory extends Serializable {

99

/** Create file system instance */

100

FileSystem create(URI fsUri) throws IOException;

101

}

102

```

103

104

### Partition Management

105

106

Core interfaces and implementations for managing partitioned data, including partition computation, writing, and reading operations.

107

108

```java { .api }

109

/**

110

* Interface for computing partitions

111

* Determines which partition a record belongs to

112

*/

113

interface PartitionComputer<T> {

114

/** Generate the partition values for the given record */

115

String generatePartValues(T record) throws Exception;

116

117

/** Get partition field names */

118

String[] getPartitionFieldNames();

119

}

120

121

/**

122

* Interface for writing partitioned data

123

* Handles the actual writing of records to partition-specific locations

124

*/

125

interface PartitionWriter<T> {

126

/** Write a record to the appropriate partition */

127

void write(T record) throws Exception;

128

129

/** Close the writer and finalize writes */

130

void close() throws Exception;

131

132

/** Get commit information */

133

List<PartitionCommitInfo> getCommitInfos();

134

}

135

136

/**

137

* Interface for reading partitions

138

* Provides partition-aware reading capabilities

139

*/

140

interface PartitionReader<P, OUT> {

141

/** Read partition data */

142

OUT read(P partition) throws Exception;

143

144

/** Get partition metadata */

145

P[] getPartitions() throws Exception;

146

}

147

148

/** Partition computer implementation for row data */

149

class RowDataPartitionComputer implements PartitionComputer<RowData> {

150

RowDataPartitionComputer(

151

String defaultPartValue,

152

String[] partitionColumns,

153

LogicalType[] partitionTypes,

154

String[] fieldNames,

155

LogicalType[] fieldTypes

156

);

157

}

158

```

159

160

### Partition Writers

161

162

Concrete implementations of partition writers for different partitioning strategies and use cases.

163

164

```java { .api }

165

/** Dynamic partition writer implementation */

166

class DynamicPartitionWriter<T> implements PartitionWriter<T> {

167

DynamicPartitionWriter(

168

PartitionComputer<T> computer,

169

PartitionWriterFactory<T> factory,

170

FileSystemCommitter committer

171

);

172

173

/** Write record to dynamically determined partition */

174

void write(T record) throws Exception;

175

}

176

177

/** Grouped partition writer implementation */

178

class GroupedPartitionWriter<T> implements PartitionWriter<T> {

179

GroupedPartitionWriter(

180

PartitionComputer<T> computer,

181

PartitionWriterFactory<T> factory,

182

long maxOpenWriters

183

);

184

185

/** Write record using grouped partitioning strategy */

186

void write(T record) throws Exception;

187

}

188

189

/** Single directory writer implementation */

190

class SingleDirectoryWriter<T> implements PartitionWriter<T> {

191

SingleDirectoryWriter(

192

OutputFormatFactory<T> formatFactory,

193

Path outputDir,

194

String filePrefix

195

);

196

197

/** Write record to single directory */

198

void write(T record) throws Exception;

199

}

200

201

/**

202

* Factory for partition writers

203

* Creates partition-specific writers on demand

204

*/

205

interface PartitionWriterFactory<T> extends Serializable {

206

/** Create writer for specific partition */

207

PartitionWriter<T> createWriter(String partition) throws IOException;

208

}

209

```

210

211

### Partition Commit Policies

212

213

Interfaces and implementations for determining when partitions should be committed and how the commit process should be executed.

214

215

```java { .api }

216

/**

217

* Interface for partition commit strategies

218

* Determines when and how partitions should be committed

219

*/

220

interface PartitionCommitPolicy extends Serializable {

221

/** Check if partition should be committed */

222

boolean shouldCommit(Context context) throws Exception;

223

224

/** Execute the commit operation */

225

void commit(Context context) throws Exception;

226

227

/** Context for commit operations */

228

interface Context {

229

/** Get partition path */

230

String partition();

231

232

/** Get partition commit trigger context */

233

PartitionCommitTrigger.Context commitContext();

234

}

235

}

236

237

/** Metastore-based commit policy implementation */

238

class MetastoreCommitPolicy implements PartitionCommitPolicy {

239

MetastoreCommitPolicy(

240

TableMetaStoreFactory metaStoreFactory,

241

ObjectIdentifier tableIdentifier,

242

List<String> partitionKeys

243

);

244

}

245

```

246

247

### Output Format Factory

248

249

Factory interface for creating output formats, enabling integration with different file formats and storage systems.

250

251

```java { .api }

252

/**

253

* Factory for output formats

254

* Enables integration with different file formats (Parquet, ORC, CSV, etc.)

255

*/

256

interface OutputFormatFactory<T> extends Serializable {

257

/** Create output format for writing */

258

OutputFormat<T> createOutputFormat(Path path);

259

260

/** Get supported format options */

261

Set<String> getSupportedOptions();

262

}

263

```

264

265

### File System Operations

266

267

Core classes for file system operations including committing, output formatting, and bulk format handling.

268

269

```java { .api }

270

/** Committer for file system operations */

271

class FileSystemCommitter implements Serializable {

272

FileSystemCommitter(

273

FileSystemFactory fsFactory,

274

TableMetaStoreFactory msFactory,

275

boolean overwrite,

276

Path tmpPath,

277

int parallelism,

278

List<PartitionCommitPolicy> policies

279

);

280

281

/** Commit the partitions described by the given commit infos */

282

void commitPartitions(List<PartitionCommitInfo> partitionCommitInfos) throws Exception;

283

284

/** Get commit policies */

285

List<PartitionCommitPolicy> getCommitPolicies();

286

}

287

288

/** Output format for file systems */

289

class FileSystemOutputFormat<T> extends RichOutputFormat<T> {

290

FileSystemOutputFormat(

291

OutputFormatFactory<T> formatFactory,

292

PartitionComputer<T> computer,

293

Path outputPath,

294

String filePrefix,

295

boolean overwrite

296

);

297

}

298

299

/** Bulk format with limit support */

300

class LimitableBulkFormat<T> implements BulkFormat<T> {

301

LimitableBulkFormat(BulkFormat<T> format, long limit);

302

303

/** Create reader with limit */

304

Reader<T> createReader(Configuration config, FileSourceSplit split) throws IOException;

305

}

306

```

307

308

### Partition Management Utilities

309

310

Utility classes for partition-related operations including temporary file management, partition loading, and time extraction.

311

312

```java { .api }

313

/** Manage temporary partition files */

314

class PartitionTempFileManager {

315

PartitionTempFileManager(

316

FileSystemFactory fsFactory,

317

Path tmpPath,

318

int taskNumber,

319

String prefix

320

);

321

322

/** Create temporary file for partition */

323

Path createPartitionTempFile(String partition) throws IOException;

324

325

/** List temporary files for partition */

326

List<Path> listPartitionTempFiles(String partition) throws IOException;

327

}

328

329

/**

330

* Load partition information

331

* Discovers and loads partition metadata from storage

332

*/

333

interface PartitionLoader {

334

/** Load all partitions */

335

List<Partition> loadPartitions() throws Exception;

336

337

/** Load specific partition */

338

Partition loadPartition(Map<String, String> partitionSpec) throws Exception;

339

}

340

341

/**

342

* Extract time from partitions

343

* Enables time-based partition processing

344

*/

345

interface PartitionTimeExtractor extends Serializable {

346

/** Extract a timestamp from the given partition keys and their corresponding values */

347

LocalDateTime extract(List<String> partitionKeys, List<String> partitionValues);

348

}

349

```

350

351

### Streaming File Operations

352

353

Support for streaming file operations including streaming writers, sinks, and partition committers for real-time data processing.

354

355

```java { .api }

356

/**

357

* Streaming file writer

358

* Handles streaming writes with proper watermark and checkpoint integration

359

*/

360

class StreamingFileWriter<IN> extends AbstractStreamOperator<PartitionCommitInfo>

361

implements OneInputStreamOperator<IN, PartitionCommitInfo> {

362

363

StreamingFileWriter(

364

long bucketCheckInterval,

365

PartitionComputer<IN> computer,

366

PartitionWriterFactory<IN> writerFactory,

367

FileSystemCommitter committer

368

);

369

370

/** Process streaming input element */

371

void processElement(StreamRecord<IN> element) throws Exception;

372

}

373

374

/** Streaming sink implementation */

375

class StreamingSink<IN> implements Sink<IN> {

376

StreamingSink(

377

PartitionComputer<IN> computer,

378

PartitionWriterFactory<IN> writerFactory,

379

FileSystemCommitter committer,

380

long bucketCheckInterval

381

);

382

383

/** Create sink writer */

384

SinkWriter<IN> createWriter(InitContext context) throws IOException;

385

}

386

```

387

388

### Streaming Support Classes

389

390

Additional classes supporting streaming operations including partition committers and commit triggers.

391

392

```java { .api }

393

/** Operator for committing partitions in streaming mode */

394

class PartitionCommitter extends AbstractStreamOperator<Void>

395

implements OneInputStreamOperator<PartitionCommitInfo, Void> {

396

397

PartitionCommitter(List<PartitionCommitPolicy> policies);

398

399

/** Process partition commit info */

400

void processElement(StreamRecord<PartitionCommitInfo> element) throws Exception;

401

}

402

403

/**

404

* Trigger for partition commits

405

* Determines when partitions should be committed based on various criteria

406

*/

407

interface PartitionCommitTrigger extends Serializable {

408

/** Check if partition should be committed */

409

boolean shouldCommit(Context context) throws Exception;

410

411

/** Trigger context interface */

412

interface Context {

413

/** Get current processing time */

414

long currentProcessingTime();

415

416

/** Get current watermark */

417

long currentWatermark();

418

419

/** Get partition create time */

420

long partitionCreateTime();

421

}

422

}

423

424

/** Processing time-based commit trigger */

425

class ProcTimeCommitTrigger implements PartitionCommitTrigger {

426

ProcTimeCommitTrigger(long delay);

427

}

428

```

429

430

### File Compaction

431

432

Framework for file compaction operations, enabling optimization of file layouts and reducing small file problems.

433

434

```java { .api }

435

/**

436

* Operator for file compaction

437

* Handles merging of small files into larger ones for better performance

438

*/

439

class CompactOperator<T> extends AbstractStreamOperator<CompactResult>

440

implements OneInputStreamOperator<T, CompactResult> {

441

442

CompactOperator(

443

CompactReader<T> reader,

444

CompactWriter<T> writer,

445

long targetFileSize,

446

long compactionInterval

447

);

448

}

449

450

/** Coordinator for compaction operations */

451

class CompactCoordinator extends AbstractStreamOperator<CompactionUnit>

452

implements OneInputStreamOperator<PartitionCommitInfo, CompactionUnit> {

453

454

CompactCoordinator(long targetFileSize, int maxConcurrentCompactions);

455

456

/** Process partition commit information for compaction */

457

void processElement(StreamRecord<PartitionCommitInfo> element) throws Exception;

458

}

459

460

/**

461

* Interface for compact read operations

462

* Reads data from files targeted for compaction

463

*/

464

interface CompactReader<T> extends Serializable {

465

/** Read data from compaction source */

466

Iterator<T> read(CompactionUnit unit) throws Exception;

467

}

468

469

/**

470

* Interface for compact write operations

471

* Writes compacted data to optimized file layouts

472

*/

473

interface CompactWriter<T> extends Serializable {

474

/** Write compacted data */

475

void write(Iterator<T> data, CompactionUnit unit) throws Exception;

476

}

477

478

/** File writer for compaction operations */

479

class CompactFileWriter<T> implements CompactWriter<T> {

480

CompactFileWriter(OutputFormatFactory<T> formatFactory);

481

}

482

```

483

484

### Metastore Integration

485

486

Interface for integrating with table metastores, enabling metadata management and catalog operations.

487

488

```java { .api }

489

/**

490

* Factory for metastore integration

491

* Enables metadata management and catalog operations

492

*/

493

interface TableMetaStoreFactory extends Serializable {

494

/** Create table metastore instance */

495

TableMetaStore createTableMetaStore() throws Exception;

496

497

/** Get metastore configuration */

498

Map<String, String> getMetaStoreConfig();

499

}

500

```

501

502

## Usage Examples

503

504

```java

505

// Create file system table factory

506

FileSystemTableFactory factory = new FileSystemTableFactory();

507

508

// Configure table properties

509

Map<String, String> properties = new HashMap<>();

510

properties.put("connector", "filesystem");

511

properties.put("path", "/data/my-table");

512

properties.put("format", "parquet");

513

514

// Create table source

515

Context sourceContext = new TestContext(properties, schema);

516

DynamicTableSource source = factory.createDynamicTableSource(sourceContext);

517

518

// Create table sink

519

Context sinkContext = new TestContext(properties, schema);

520

DynamicTableSink sink = factory.createDynamicTableSink(sinkContext);

521

522

// Set up partition writer

523

PartitionComputer<RowData> computer = new RowDataPartitionComputer(

524

"__DEFAULT_PARTITION__",

525

new String[]{"year", "month"},

526

new LogicalType[]{DataTypes.INT().getLogicalType(), DataTypes.INT().getLogicalType()},

527

fieldNames,

528

fieldTypes

529

);

530

531

// Create streaming file writer (writerFactory and committer are not defined in this example; create them beforehand)

532

StreamingFileWriter<RowData> streamingWriter = new StreamingFileWriter<>(

533

60000L, // bucket check interval

534

computer,

535

writerFactory,

536

committer

537

);

538

```