tessl/maven-org-apache-flink--flink-connector-files

Apache Flink file connector library for unified file processing in both batch and streaming modes with support for various formats, compression, and distributed processing capabilities.

Workspace: tessl
Visibility: Public
Describes: pkg:maven/org.apache.flink/flink-connector-files@2.1.x (maven)

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-files@2.1.0


Apache Flink File Connector

Apache Flink File Connector provides a unified data source and sink for reading and writing files in both batch and streaming modes. It supports a wide range of file formats through the StreamFormat and BulkFormat interfaces, and adds continuous directory monitoring, file splitting, automatic decompression, and distributed processing across different file systems and object stores.

Package Information

  • Package Name: flink-connector-files
  • Package Type: maven
  • Language: Java
  • Group ID: org.apache.flink
  • Installation:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-files</artifactId>
        <version>2.1.0</version>
    </dependency>

Core Imports

import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;

Basic Usage

Reading Files

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.time.Duration;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Create a file source for reading text files, checking for new files every 10 seconds
FileSource<String> source = FileSource
    .forRecordStreamFormat(new TextLineInputFormat(), new Path("/path/to/input/files"))
    .monitorContinuously(Duration.ofSeconds(10))
    .build();

// Use in DataStream API
DataStream<String> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");

Writing Files

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import java.time.Duration;

// Create a file sink that rolls part files at 128 MiB or every 15 minutes
FileSink<String> sink = FileSink
    .forRowFormat(new Path("/path/to/output"), new SimpleStringEncoder<String>("UTF-8"))
    .withRollingPolicy(DefaultRollingPolicy.builder()
        .withMaxPartSize(MemorySize.ofMebiBytes(128))
        .withRolloverInterval(Duration.ofMinutes(15))
        .build())
    .build();

// Use in DataStream API
stream.sinkTo(sink);

Architecture

The Flink File Connector is built around several key components:

  • File Sources: FileSource provides unified reading with support for both streaming and batch modes
  • File Sinks: FileSink handles writing with exactly-once semantics and file rolling
  • Format Interfaces: StreamFormat for record-by-record reading, BulkFormat for batch-oriented reading
  • File Discovery: FileEnumerator implementations discover files and create splits for parallel processing
  • Split Assignment: FileSplitAssigner manages distribution of file splits to reader nodes with locality awareness
  • Compression Support: Automatic decompression for common formats (.gz, .bz2, .xz, .deflate)
  • File Compaction: Optional compaction system to merge small files for better performance

Capabilities

File Source Operations

Unified file reading with support for various formats, continuous monitoring, and distributed processing.

public static <T> FileSourceBuilder<T> forRecordStreamFormat(
        final StreamFormat<T> streamFormat, final Path... paths);

public static <T> FileSourceBuilder<T> forBulkFileFormat(
        final BulkFormat<T, FileSourceSplit> bulkFormat, final Path... paths);
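
For example, a bounded (batch) source can be built from any BulkFormat instance. A minimal sketch, assuming the format is supplied by the caller (concrete implementations such as Parquet or ORC readers ship in separate Flink format modules):

import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.core.fs.Path;

// Wire an existing BulkFormat into a bounded FileSource.
static <T> FileSource<T> boundedBulkSource(BulkFormat<T, FileSourceSplit> format, Path dir) {
    return FileSource
            .forBulkFileFormat(format, dir)
            .processStaticFileSet()  // batch mode: read the files present now, then finish
            .build();
}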

File Sources

File Sink Operations

Unified file writing with exactly-once semantics, bucketing, rolling policies, and optional compaction.

public static <IN> DefaultRowFormatBuilder<IN> forRowFormat(
        final Path basePath, final Encoder<IN> encoder);

public static <IN> DefaultBulkFormatBuilder<IN> forBulkFormat(
        final Path basePath, final BulkWriter.Factory<IN> bulkWriterFactory);
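
A bulk sink is built the same way from any BulkWriter.Factory (for example, the Avro or Parquet writer factories shipped in separate modules). A minimal sketch, assuming the factory is supplied by the caller:

import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;

// Wire an existing BulkWriter.Factory into a FileSink; bulk formats roll
// part files on every checkpoint by default.
static <T> FileSink<T> bulkSink(BulkWriter.Factory<T> writerFactory, Path baseDir) {
    return FileSink
            .forBulkFormat(baseDir, writerFactory)
            .build();
}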

File Sinks

Stream Format Interface

Interface for record-by-record file reading with automatic compression support.

@PublicEvolving
public interface StreamFormat<T> extends Serializable, ResultTypeQueryable<T> {
    Reader<T> createReader(
            Configuration config, FSDataInputStream stream, long fileLen, long splitEnd)
            throws IOException;
    
    Reader<T> restoreReader(
            Configuration config,
            FSDataInputStream stream,
            long restoredOffset,
            long fileLen,
            long splitEnd)
            throws IOException;
    
    boolean isSplittable();
    
    @Override
    TypeInformation<T> getProducedType();
    
    @PublicEvolving
    interface Reader<T> extends Closeable {
        @Nullable
        T read() throws IOException;
        
        @Override
        void close() throws IOException;
        
        @Nullable
        default CheckpointedPosition getCheckpointedPosition() {
            return null;
        }
    }
}
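
As an illustration, here is a minimal hypothetical implementation: a non-splittable line format that emits every line upper-cased. This is a sketch only; the recovery path simply seeks to the restored byte offset, whereas a production format would also re-align to a record boundary.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import javax.annotation.Nullable;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.core.fs.FSDataInputStream;

// Hypothetical format: reads UTF-8 lines and emits them upper-cased.
public final class UpperCaseLineFormat implements StreamFormat<String> {

    @Override
    public Reader<String> createReader(
            Configuration config, FSDataInputStream stream, long fileLen, long splitEnd)
            throws IOException {
        BufferedReader lines =
                new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8));
        return new Reader<String>() {
            @Nullable
            @Override
            public String read() throws IOException {
                String line = lines.readLine();
                // null signals the end of the split
                return line == null ? null : line.toUpperCase(Locale.ROOT);
            }

            @Override
            public void close() throws IOException {
                lines.close();
            }
        };
    }

    @Override
    public Reader<String> restoreReader(
            Configuration config, FSDataInputStream stream,
            long restoredOffset, long fileLen, long splitEnd) throws IOException {
        // Simplistic recovery: seek to the checkpointed byte offset and resume.
        stream.seek(restoredOffset);
        return createReader(config, stream, fileLen, splitEnd);
    }

    @Override
    public boolean isSplittable() {
        return false;  // line boundaries are unknown up front, so never split a file
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}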

Stream Formats

Bulk Format Interface

Interface for batch-oriented reading optimized for formats like ORC and Parquet.

@PublicEvolving
public interface BulkFormat<T, SplitT extends FileSourceSplit>
        extends Serializable, ResultTypeQueryable<T> {
    
    BulkFormat.Reader<T> createReader(Configuration config, SplitT split) throws IOException;
    
    BulkFormat.Reader<T> restoreReader(Configuration config, SplitT split) throws IOException;
    
    boolean isSplittable();
    
    @Override
    TypeInformation<T> getProducedType();
    
    interface Reader<T> extends Closeable {
        @Nullable
        RecordIterator<T> readBatch() throws IOException;

        @Override
        void close() throws IOException;
    }
    
    interface RecordIterator<T> {
        @Nullable
        RecordAndPosition<T> next();

        void releaseBatch();
    }
}
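
The batch-oriented contract is easiest to see from the consuming side. A sketch of the loop a caller (in Flink, the source framework itself) drives against a reader; the process consumer is a placeholder:

import java.io.IOException;
import java.util.function.Consumer;

import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.src.util.RecordAndPosition;

// Drain a BulkFormat.Reader: fetch a batch, iterate its records, release it, repeat.
static <T> void drain(BulkFormat.Reader<T> reader, Consumer<T> process) throws IOException {
    BulkFormat.RecordIterator<T> batch;
    while ((batch = reader.readBatch()) != null) {     // null means end of split
        RecordAndPosition<T> rec;
        while ((rec = batch.next()) != null) {
            process.accept(rec.getRecord());
        }
        batch.releaseBatch();                          // hand buffers back for reuse
    }
    reader.close();
}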

Bulk Formats

File Discovery and Enumeration

File discovery mechanisms for finding and splitting files across distributed storage systems.

public interface FileEnumerator {
    Collection<FileSourceSplit> enumerateSplits(Path[] paths, int minDesiredSplits) throws IOException;
    
    interface Provider extends Serializable {
        FileEnumerator create();
    }
}
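
Custom enumerators are plugged in through the source builder. A sketch using the built-in NonSplittingRecursiveEnumerator, which produces exactly one split per file (this assumes the builder's setFileEnumerator hook, which takes a FileEnumerator.Provider):

import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.enumerate.NonSplittingRecursiveEnumerator;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;

FileSource<String> source = FileSource
    .forRecordStreamFormat(new TextLineInputFormat(), new Path("/data/in"))
    .setFileEnumerator(NonSplittingRecursiveEnumerator::new)  // one split per file
    .build();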

File Enumeration

Split Assignment

Split assignment strategies for distributing file processing across nodes with locality awareness.

public interface FileSplitAssigner {
    Optional<FileSourceSplit> getNext(String hostname);
    void addSplits(Collection<FileSourceSplit> splits);
    Collection<FileSourceSplit> remainingSplits();
    
    interface Provider extends Serializable {
        FileSplitAssigner create(Collection<FileSourceSplit> splits);
    }
}
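
Assigners are supplied the same way. A sketch swapping in the built-in LocalityAwareSplitAssigner, whose constructor matches the Provider signature (this assumes the builder's setSplitAssigner hook):

import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.assigners.LocalityAwareSplitAssigner;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;

FileSource<String> source = FileSource
    .forRecordStreamFormat(new TextLineInputFormat(), new Path("hdfs:///data/in"))
    .setSplitAssigner(LocalityAwareSplitAssigner::new)  // prefer host-local splits
    .build();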

Split Assignment

File Compaction

File compaction system for merging small files to improve performance and reduce metadata overhead.

public interface FileCompactor {
    void compact(List<Path> inputFiles, Path outputFile) throws Exception;
}

public interface FileCompactStrategy {
    long getSizeThreshold();
    int getNumCheckpointsBeforeCompaction();
    int getNumCompactThreads();
}
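
Compaction is enabled on the sink builder by pairing a strategy with a compactor. A sketch assuming the builder's enableCompact hook and the built-in ConcatFileCompactor, which byte-concatenates row-format part files:

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.sink.compactor.ConcatFileCompactor;
import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
import org.apache.flink.core.fs.Path;

FileSink<String> sink = FileSink
    .forRowFormat(new Path("/data/out"), new SimpleStringEncoder<String>("UTF-8"))
    .enableCompact(
        FileCompactStrategy.Builder.newBuilder()
            .setSizeThreshold(64 * 1024 * 1024)   // compact part files smaller than 64 MiB
            .enableCompactionOnCheckpoint(5)      // trigger compaction every 5 checkpoints
            .build(),
        new ConcatFileCompactor())                // concatenate row-format files byte-wise
    .build();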

File Compaction

Types

Core Split and Position Types

public class FileSourceSplit {
    public FileSourceSplit(String id, Path path, long offset, long length, 
                          long modificationTime, long fileSize, String... hostnames);
    public String splitId();
    public Path path();
    public long offset();
    public long length();
    public long fileModificationTime();
    public long fileSize();
    public String[] hostnames();
    public Optional<CheckpointedPosition> getReaderPosition();
    public FileSourceSplit updateWithCheckpointedPosition(CheckpointedPosition position);
}

public class CheckpointedPosition {
    public static final long NO_OFFSET = -1L;
    
    public CheckpointedPosition(long offset, long recordsAfterOffset);
    public long getOffset();
    public long getRecordsAfterOffset();
}
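
An illustration of how the two types work together; the values are arbitrary. NO_OFFSET marks a position expressed purely as a record count, used by readers that cannot seek to an exact byte offset:

import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.util.CheckpointedPosition;
import org.apache.flink.core.fs.Path;

// A split covering the first 1024 bytes of a file, readable from any host.
FileSourceSplit split = new FileSourceSplit(
        "split-1", new Path("/data/in/part-0.txt"),
        0, 1024,                       // offset, length
        0L, 1024L);                    // modification time, file size

// Record "42 records past an unknown byte offset", the NO_OFFSET form used by
// readers that count records instead of tracking byte positions.
FileSourceSplit restored = split.updateWithCheckpointedPosition(
        new CheckpointedPosition(CheckpointedPosition.NO_OFFSET, 42));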

Configuration Types

public class ContinuousEnumerationSettings {
    public ContinuousEnumerationSettings(Duration discoveryInterval);
    public Duration getDiscoveryInterval();
}