File System Integration

Complete file system table source and sink implementation with support for partitioning, streaming writes, file compaction, and integration with various storage systems including HDFS, S3, and local filesystems.

Capabilities

File System Table Factory

Primary entry point for creating file system-based table sources and sinks, implementing the dynamic table factory pattern for Flink SQL integration.

/**
 * Primary factory for file system-based tables
 * Implements dynamic table factory pattern for Flink SQL integration
 */
class FileSystemTableFactory 
    implements DynamicTableSourceFactory, DynamicTableSinkFactory {
    
    /** Create dynamic table source for reading from file systems */
    DynamicTableSource createDynamicTableSource(Context context);
    
    /** Create dynamic table sink for writing to file systems */
    DynamicTableSink createDynamicTableSink(Context context);
    
    /** Get factory identifier */
    String factoryIdentifier();
    
    /** Get required context properties */
    Set<ConfigOption<?>> requiredOptions();
    
    /** Get optional context properties */
    Set<ConfigOption<?>> optionalOptions();
}
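
In practice the factory is rarely instantiated by hand: declaring a table with the filesystem connector in Flink SQL makes the planner resolve this factory via factoryIdentifier() and call createDynamicTableSource / createDynamicTableSink. A minimal sketch, assuming a Flink TableEnvironment; the table name, schema, and path below are illustrative:

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

// 'connector' = 'filesystem' routes table creation through FileSystemTableFactory
tEnv.executeSql(
    "CREATE TABLE fs_table (" +
    "  user_id BIGINT," +
    "  amount DOUBLE," +
    "  dt STRING" +
    ") PARTITIONED BY (dt) WITH (" +
    "  'connector' = 'filesystem'," +
    "  'path' = 'file:///data/fs_table'," +
    "  'format' = 'parquet'" +
    ")");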

File System Tables

The main table source and sink implementations. The source supports scan reads with partition and limit push-down; the sink supports partitioned writes, including static partition insertion.

/** Main table source implementation for file systems */
class FileSystemTableSource extends AbstractFileSystemTable 
    implements ScanTableSource, PartitionableTableSource, LimitableTableSource {
    
    FileSystemTableSource(
        ObjectIdentifier tableIdentifier,
        CatalogTable catalogTable,
        Map<String, String> properties,
        ReadableConfig tableOptions
    );
    
    /** Get scan runtime provider */
    ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext);
    
    /** Apply limit push-down optimization */
    Result applyLimit(long limit);
}

/** Main table sink implementation for file systems */
class FileSystemTableSink extends AbstractFileSystemTable 
    implements DynamicTableSink, PartitionableTableSink {
    
    FileSystemTableSink(
        ObjectIdentifier tableIdentifier,
        CatalogTable catalogTable,
        Map<String, String> properties,
        ReadableConfig tableOptions
    );
    
    /** Get sink runtime provider */
    SinkRuntimeProvider getSinkRuntimeProvider(Context context);
    
    /** Apply static partition insertion */
    void applyStaticPartition(Map<String, String> partition);
}

/** Base class for file system tables */
abstract class AbstractFileSystemTable implements DynamicTableSource, DynamicTableSink {
    /** Get table schema */
    TableSchema getTableSchema();
    
    /** Get partition keys */
    List<String> getPartitionKeys();
    
    /** Copy table with new properties */
    abstract AbstractFileSystemTable copy(Map<String, String> newProperties);
}
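
In SQL terms, the hooks above correspond to ordinary queries and inserts: a LIMIT clause can be pushed into FileSystemTableSource.applyLimit, and a static PARTITION clause on INSERT is handed to FileSystemTableSink.applyStaticPartition. A hedged sketch, reusing the fs_table declared above (source_table is an assumed second table):

// LIMIT push-down: the scan can stop early instead of reading whole files
tEnv.executeSql("SELECT * FROM fs_table LIMIT 100").print();

// Static partition insertion: the partition spec reaches applyStaticPartition
tEnv.executeSql(
    "INSERT INTO fs_table PARTITION (dt = '2024-01-01') " +
    "SELECT user_id, amount FROM source_table");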

File System Factory Interface

Core factory interface for creating file system instances, enabling support for different storage systems.

/**
 * Factory for file system instances
 * Enables support for different storage systems (HDFS, S3, local, etc.)
 */
interface FileSystemFactory extends Serializable {
    /** Create file system instance */
    FileSystem create(URI fsUri) throws IOException;
}
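
A minimal implementation can delegate to Flink's pluggable FileSystem registry, which resolves the URI scheme (file://, hdfs://, s3://, ...) to a concrete file system. The class name below is illustrative, not part of the API:

import java.io.IOException;
import java.net.URI;
import org.apache.flink.core.fs.FileSystem;

/** Illustrative factory delegating to the file systems registered with the Flink runtime. */
class DelegatingFileSystemFactory implements FileSystemFactory {

    @Override
    public FileSystem create(URI fsUri) throws IOException {
        // Picks the implementation registered for the URI's scheme
        return FileSystem.get(fsUri);
    }
}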

Partition Management

Core interfaces and implementations for managing partitioned data, including partition computation, writing, and reading operations.

/**
 * Interface for computing partitions
 * Determines which partition a record belongs to
 */
interface PartitionComputer<T> {
    /** Compute partition path for given record */
    String generatePartValues(T record) throws Exception;
    
    /** Get partition field names */
    String[] getPartitionFieldNames();
}

/**
 * Interface for writing partitioned data
 * Handles the actual writing of records to partition-specific locations
 */
interface PartitionWriter<T> {
    /** Write a record to the appropriate partition */
    void write(T record) throws Exception;
    
    /** Close the writer and finalize writes */
    void close() throws Exception;
    
    /** Get commit information */
    List<PartitionCommitInfo> getCommitInfos();
}

/**
 * Interface for reading partitions
 * Provides partition-aware reading capabilities
 */
interface PartitionReader<P, OUT> {
    /** Read partition data */
    OUT read(P partition) throws Exception;
    
    /** Get partition metadata */
    P[] getPartitions() throws Exception;
}

/** Partition computer implementation for row data */
class RowDataPartitionComputer implements PartitionComputer<RowData> {
    RowDataPartitionComputer(
        String defaultPartValue,
        String[] partitionColumns,
        LogicalType[] partitionTypes,
        String[] fieldNames,
        LogicalType[] fieldTypes
    );
}
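
As a sketch of the contract, a custom PartitionComputer only has to map a record to a partition path and report the partition columns. The Event type and the year=/month= layout below are illustrative assumptions, not part of the API:

import java.time.LocalDate;

/** Illustrative record type. */
class Event {
    final LocalDate eventDate;
    final String payload;

    Event(LocalDate eventDate, String payload) {
        this.eventDate = eventDate;
        this.payload = payload;
    }
}

/** Illustrative computer producing year=YYYY/month=MM partition paths. */
class EventPartitionComputer implements PartitionComputer<Event> {

    @Override
    public String generatePartValues(Event record) {
        return String.format("year=%d/month=%02d",
                record.eventDate.getYear(), record.eventDate.getMonthValue());
    }

    @Override
    public String[] getPartitionFieldNames() {
        return new String[] {"year", "month"};
    }
}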

Partition Writers

Concrete implementations of partition writers for different partitioning strategies and use cases.

/** Dynamic partition writer implementation */
class DynamicPartitionWriter<T> implements PartitionWriter<T> {
    DynamicPartitionWriter(
        PartitionComputer<T> computer,
        PartitionWriterFactory<T> factory,
        FileSystemCommitter committer
    );
    
    /** Write record to dynamically determined partition */
    void write(T record) throws Exception;
}

/** Grouped partition writer implementation */
class GroupedPartitionWriter<T> implements PartitionWriter<T> {
    GroupedPartitionWriter(
        PartitionComputer<T> computer,
        PartitionWriterFactory<T> factory,
        long maxOpenWriters
    );
    
    /** Write record using grouped partitioning strategy */
    void write(T record) throws Exception;
}

/** Single directory writer implementation */
class SingleDirectoryWriter<T> implements PartitionWriter<T> {
    SingleDirectoryWriter(
        OutputFormatFactory<T> formatFactory,
        Path outputDir,
        String filePrefix
    );
    
    /** Write record to single directory */
    void write(T record) throws Exception;
}

/**
 * Factory for partition writers
 * Creates partition-specific writers on demand
 */
interface PartitionWriterFactory<T> extends Serializable {
    /** Create writer for specific partition */
    PartitionWriter<T> createWriter(String partition) throws IOException;
}
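
Conceptually, the dynamic and grouped writers above route each record to a per-partition writer obtained from the factory and collect commit information on close. A hedged sketch of that routing loop, built only on the interfaces shown in this section:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative driver: route records to per-partition writers, then gather commit info. */
class PartitionedWriteDriver {

    static <T> List<PartitionCommitInfo> writeAll(
            Iterable<T> records,
            PartitionComputer<T> computer,
            PartitionWriterFactory<T> writerFactory) throws Exception {

        Map<String, PartitionWriter<T>> writers = new HashMap<>();
        for (T record : records) {
            String partition = computer.generatePartValues(record);
            PartitionWriter<T> writer = writers.get(partition);
            if (writer == null) {
                writer = writerFactory.createWriter(partition);
                writers.put(partition, writer);
            }
            writer.write(record);
        }

        List<PartitionCommitInfo> commitInfos = new ArrayList<>();
        for (PartitionWriter<T> writer : writers.values()) {
            writer.close();                              // finalize in-progress files
            commitInfos.addAll(writer.getCommitInfos());
        }
        return commitInfos;
    }
}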

Partition Commit Policies

Interfaces and implementations for determining when partitions should be committed and how the commit process should be executed.

/**
 * Interface for partition commit strategies
 * Determines when and how partitions should be committed
 */
interface PartitionCommitPolicy extends Serializable {
    /** Check if partition should be committed */
    boolean shouldCommit(Context context) throws Exception;
    
    /** Execute the commit operation */
    void commit(Context context) throws Exception;
    
    /** Context for commit operations */
    interface Context {
        /** Get partition path */
        String partition();
        
        /** Get partition commit trigger context */
        PartitionCommitTrigger.Context commitContext();
    }
}

/** Metastore-based commit policy implementation */
class MetastoreCommitPolicy implements PartitionCommitPolicy {
    MetastoreCommitPolicy(
        TableMetaStoreFactory metaStoreFactory,
        ObjectIdentifier tableIdentifier,
        List<String> partitionKeys
    );
}
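
A custom policy only needs to answer whether a partition is ready and then perform the commit side effect. The delay-based policy below is an illustrative sketch built on the Context shown above (the delay value and the logging are assumptions):

/** Illustrative policy: commit once the watermark passes partition creation time plus a delay. */
class DelayedCommitPolicy implements PartitionCommitPolicy {

    private final long delayMillis;

    DelayedCommitPolicy(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    @Override
    public boolean shouldCommit(Context context) {
        PartitionCommitTrigger.Context trigger = context.commitContext();
        return trigger.currentWatermark() >= trigger.partitionCreateTime() + delayMillis;
    }

    @Override
    public void commit(Context context) {
        // The commit side effect; here we only log the partition path
        System.out.println("Committing partition " + context.partition());
    }
}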

Output Format Factory

Factory interface for creating output formats, enabling integration with different file formats and storage systems.

/**
 * Factory for output formats
 * Enables integration with different file formats (Parquet, ORC, CSV, etc.)
 */
interface OutputFormatFactory<T> extends Serializable {
    /** Create output format for writing */
    OutputFormat<T> createOutputFormat(Path path);
    
    /** Get supported format options */
    Set<String> getSupportedOptions();
}
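
A format factory binds a concrete OutputFormat to the target path. The sketch below uses Flink's TextOutputFormat, which writes each record's toString(); the factory class name and the empty option set are illustrative:

import java.util.Collections;
import java.util.Set;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.io.TextOutputFormat;
import org.apache.flink.core.fs.Path;

/** Illustrative factory producing a plain-text OutputFormat for each target path. */
class TextOutputFormatFactory<T> implements OutputFormatFactory<T> {

    @Override
    public OutputFormat<T> createOutputFormat(Path path) {
        return new TextOutputFormat<>(path);   // one line of text per record
    }

    @Override
    public Set<String> getSupportedOptions() {
        return Collections.emptySet();
    }
}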

File System Operations

Core classes for file system operations including committing, output formatting, and bulk format handling.

/** Committer for file system operations */
class FileSystemCommitter implements Serializable {
    FileSystemCommitter(
        FileSystemFactory fsFactory,
        TableMetaStoreFactory msFactory,
        boolean overwrite,
        Path tmpPath,
        int parallelism,
        List<PartitionCommitPolicy> policies
    );
    
    /** Commit pending files */
    void commitPartitions(List<PartitionCommitInfo> partitionCommitInfos) throws Exception;
    
    /** Get commit policies */
    List<PartitionCommitPolicy> getCommitPolicies();
}

/** Output format for file systems */
class FileSystemOutputFormat<T> extends RichOutputFormat<T> {
    FileSystemOutputFormat(
        OutputFormatFactory<T> formatFactory,
        PartitionComputer<T> computer,
        Path outputPath,
        String filePrefix,
        boolean overwrite
    );
}

/** Bulk format with limit support */
class LimitableBulkFormat<T> implements BulkFormat<T> {
    LimitableBulkFormat(BulkFormat<T> format, long limit);
    
    /** Create reader with limit */
    Reader<T> createReader(Configuration config, FileSourceSplit split) throws IOException;
}
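
Two small usage sketches, assuming the surrounding pieces exist: commitInfos collected from the partition writers, a committer built as above, and parquetFormat standing in for any concrete BulkFormat:

// Batch epilogue: finalize temporary files and run the configured commit policies
committer.commitPartitions(commitInfos);

// Read path: cap any bulk reader at 100 records, backing LIMIT push-down
BulkFormat<RowData> limited = new LimitableBulkFormat<>(parquetFormat, 100L);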

Partition Management Utilities

Utility classes for partition-related operations including temporary file management, partition loading, and time extraction.

/** Manage temporary partition files */
class PartitionTempFileManager {
    PartitionTempFileManager(
        FileSystemFactory fsFactory,
        Path tmpPath,
        int taskNumber,
        String prefix
    );
    
    /** Create temporary file for partition */
    Path createPartitionTempFile(String partition) throws IOException;
    
    /** List temporary files for partition */
    List<Path> listPartitionTempFiles(String partition) throws IOException;
}

/**
 * Load partition information
 * Discovers and loads partition metadata from storage
 */
interface PartitionLoader {
    /** Load all partitions */
    List<Partition> loadPartitions() throws Exception;
    
    /** Load specific partition */
    Partition loadPartition(Map<String, String> partitionSpec) throws Exception;
}

/**
 * Extract time from partitions
 * Enables time-based partition processing
 */
interface PartitionTimeExtractor extends Serializable {
    /** Extract timestamp from partition path */
    LocalDateTime extract(List<String> partitionKeys, List<String> partitionValues);
}
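
A time extractor is typically a one-liner over the partition values. The sketch below assumes a single dt=yyyy-MM-dd partition column; the class name is illustrative:

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.List;

/** Illustrative extractor for a dt=yyyy-MM-dd partition column. */
class DatePartitionTimeExtractor implements PartitionTimeExtractor {

    @Override
    public LocalDateTime extract(List<String> partitionKeys, List<String> partitionValues) {
        int idx = partitionKeys.indexOf("dt");   // assumes a "dt" partition key is present
        return LocalDate.parse(partitionValues.get(idx)).atStartOfDay();
    }
}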

Streaming File Operations

Support for streaming file operations including streaming writers, sinks, and partition committers for real-time data processing.

/**
 * Streaming file writer
 * Handles streaming writes with proper watermark and checkpoint integration
 */
class StreamingFileWriter<IN> extends AbstractStreamOperator<PartitionCommitInfo>
    implements OneInputStreamOperator<IN, PartitionCommitInfo> {
    
    StreamingFileWriter(
        long bucketCheckInterval,
        PartitionComputer<IN> computer,
        PartitionWriterFactory<IN> writerFactory,
        FileSystemCommitter committer
    );
    
    /** Process streaming input element */
    void processElement(StreamRecord<IN> element) throws Exception;
}

/** Streaming sink implementation */
class StreamingSink<IN> implements Sink<IN> {
    StreamingSink(
        PartitionComputer<IN> computer,
        PartitionWriterFactory<IN> writerFactory,
        FileSystemCommitter committer,
        long bucketCheckInterval
    );
    
    /** Create sink writer */
    SinkWriter<IN> createWriter(InitContext context) throws IOException;
}
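
Because the writer is a stream operator, it is attached to a DataStream with transform. The wiring below is a hedged sketch using the constructor shown above; in practice FileSystemTableSink assembles this topology when the planner translates an INSERT into a filesystem table:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;

/** Illustrative wiring of the streaming writer into a DataStream pipeline. */
class StreamingWriteTopology {

    static DataStream<PartitionCommitInfo> writeStream(
            DataStream<RowData> input,
            PartitionComputer<RowData> computer,
            PartitionWriterFactory<RowData> writerFactory,
            FileSystemCommitter committer) {

        return input.transform(
                "streaming-file-writer",
                TypeInformation.of(PartitionCommitInfo.class),
                new StreamingFileWriter<>(60_000L, computer, writerFactory, committer));
    }
}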

Streaming Support Classes

Additional classes supporting streaming operations including partition committers and commit triggers.

/** Operator for committing partitions in streaming mode */
class PartitionCommitter extends AbstractStreamOperator<Void>
    implements OneInputStreamOperator<PartitionCommitInfo, Void> {
    
    PartitionCommitter(List<PartitionCommitPolicy> policies);
    
    /** Process partition commit info */
    void processElement(StreamRecord<PartitionCommitInfo> element) throws Exception;
}

/**
 * Trigger for partition commits
 * Determines when partitions should be committed based on various criteria
 */
interface PartitionCommitTrigger extends Serializable {
    /** Check if partition should be committed */
    boolean shouldCommit(Context context) throws Exception;
    
    /** Trigger context interface */
    interface Context {
        /** Get current processing time */
        long currentProcessingTime();
        
        /** Get current watermark */
        long currentWatermark();
        
        /** Get partition create time */
        long partitionCreateTime();
    }
}

/** Processing time-based commit trigger */
class ProcTimeCommitTrigger implements PartitionCommitTrigger {
    ProcTimeCommitTrigger(long delay);
}
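
Downstream of the writer, the PartitionCommitter consumes the emitted PartitionCommitInfo records and applies the commit policies. Continuing the illustrative StreamingWriteTopology sketch above (input, computer, writerFactory, committer, and policies are assumed to exist; Types.VOID and DiscardingSink come from Flink's core APIs):

DataStream<PartitionCommitInfo> written =
        StreamingWriteTopology.writeStream(input, computer, writerFactory, committer);

// Commit partitions as their commit info arrives; the Void output is discarded
written.transform("partition-committer", Types.VOID, new PartitionCommitter(policies))
       .addSink(new DiscardingSink<>());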

File Compaction

Framework for file compaction operations, optimizing file layouts and mitigating the small-files problem by merging many small files into fewer, larger ones.

/**
 * Operator for file compaction
 * Handles merging of small files into larger ones for better performance
 */
class CompactOperator<T> extends AbstractStreamOperator<CompactResult>
    implements OneInputStreamOperator<T, CompactResult> {
    
    CompactOperator(
        CompactReader<T> reader,
        CompactWriter<T> writer,
        long targetFileSize,
        long compactionInterval
    );
}

/** Coordinator for compaction operations */
class CompactCoordinator extends AbstractStreamOperator<CompactionUnit>
    implements OneInputStreamOperator<PartitionCommitInfo, CompactionUnit> {
    
    CompactCoordinator(long targetFileSize, int maxConcurrentCompactions);
    
    /** Process partition commit information for compaction */
    void processElement(StreamRecord<PartitionCommitInfo> element) throws Exception;
}

/**
 * Interface for compact read operations
 * Reads data from files targeted for compaction
 */
interface CompactReader<T> extends Serializable {
    /** Read data from compaction source */
    Iterator<T> read(CompactionUnit unit) throws Exception;
}

/**
 * Interface for compact write operations
 * Writes compacted data to optimized file layouts
 */
interface CompactWriter<T> extends Serializable {
    /** Write compacted data */
    void write(Iterator<T> data, CompactionUnit unit) throws Exception;
}

/** File writer for compaction operations */
class CompactFileWriter<T> implements CompactWriter<T> {
    CompactFileWriter(OutputFormatFactory<T> formatFactory);
}
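
At its core, one compaction pass streams every record of a CompactionUnit from the reader into a single writer; scheduling and file selection are the job of CompactCoordinator and CompactOperator. A minimal sketch using only the two interfaces above:

/** Illustrative single compaction pass over one CompactionUnit. */
class CompactionPass {

    static <T> void compact(CompactReader<T> reader, CompactWriter<T> writer, CompactionUnit unit)
            throws Exception {
        // Rewrite the unit's small files as one larger, contiguous output
        writer.write(reader.read(unit), unit);
    }
}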

Metastore Integration

Interface for integrating with table metastores, enabling metadata management and catalog operations.

/**
 * Factory for metastore integration
 * Enables metadata management and catalog operations
 */
interface TableMetaStoreFactory extends Serializable {
    /** Create table metastore instance */
    TableMetaStore createTableMetaStore() throws Exception;
    
    /** Get metastore configuration */
    Map<String, String> getMetaStoreConfig();
}
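
The factory is typically consulted at commit time. Since the TableMetaStore API itself is not documented in this section, only creation is sketched here (metaStoreFactory is assumed to be configured elsewhere):

// Obtain a metastore handle, e.g. inside a commit policy or the committer
TableMetaStore metaStore = metaStoreFactory.createTableMetaStore();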

Usage Examples

// Create file system table factory
FileSystemTableFactory factory = new FileSystemTableFactory();

// Configure table properties
Map<String, String> properties = new HashMap<>();
properties.put("connector", "filesystem");
properties.put("path", "/data/my-table");
properties.put("format", "parquet");

// Create table source
// (the Context is normally supplied by the Flink planner; TestContext is a
//  test-style stand-in here, with `schema` assumed to be defined elsewhere)
Context sourceContext = new TestContext(properties, schema);
DynamicTableSource source = factory.createDynamicTableSource(sourceContext);

// Create table sink
Context sinkContext = new TestContext(properties, schema);
DynamicTableSink sink = factory.createDynamicTableSink(sinkContext);

// Set up partition writer
// (`fieldNames` and `fieldTypes` describe the full row schema and are assumed
//  to be defined elsewhere)
PartitionComputer<RowData> computer = new RowDataPartitionComputer(
    "__DEFAULT_PARTITION__",
    new String[]{"year", "month"},
    new LogicalType[]{DataTypes.INT().getLogicalType(), DataTypes.INT().getLogicalType()},
    fieldNames,
    fieldTypes
);

// Create streaming file writer
// (`writerFactory` and `committer` are assumed to have been constructed earlier)
StreamingFileWriter<RowData> streamingWriter = new StreamingFileWriter<>(
    60000L, // bucket check interval
    computer,
    writerFactory,
    committer
);